diff --git a/Pipfile b/Pipfile index 97a1824..202ffbf 100644 --- a/Pipfile +++ b/Pipfile @@ -10,7 +10,7 @@ telethon = "*" [dev-packages] [requires] -python_version = "3.9" +python_version = "3.11" [scripts] bot = "python3 ./bot.py" diff --git a/bot.py b/bot.py index 857f5f5..ce74019 100644 --- a/bot.py +++ b/bot.py @@ -6,10 +6,9 @@ from telethon import functions, types, errors from pathlib import Path import asyncio -logging.basicConfig(filename=str('bot.log'),level=logging.DEBUG) +logging.basicConfig(filename=str('bot.log'),level=logging.INFO) log = logging.getLogger('gentoobot') - - +semaphore = asyncio.Semaphore(10) class StickerPackDownloader: _log = logging.getLogger('gentoobot.packdownloader') @@ -23,20 +22,23 @@ class StickerPackDownloader: self.sticker_downloads.mkdir(exist_ok=True) async def download(self, inputpack): - if inputpack.id not in self.seen: - try: - req = functions.messages.GetStickerSetRequest(stickerset=inputpack,hash=0) - stickers = await self.client(req) - sticker_dir = self.sticker_downloads/str(inputpack.id) - sticker_dir.mkdir(exist_ok=True) - with open(str(sticker_dir/'meta.txt'), 'w') as meta: - meta.write(stickers.to_json()) - self._log.info("preparing to download whole stickerset %s", inputpack.id) - for doc in stickers.documents: - await self.downloader.download(doc) - self.seen.add(inputpack.id) - except errors.rpcerrorlist.StickersetInvalidError: - self._log.warning("sadly, stickerset %s no longer exists", inputpack.id) + if inputpack.id in self.seen: + return + self.seen.add(inputpack.id) + try: + req = functions.messages.GetStickerSetRequest(stickerset=inputpack,hash=0) + await for_online() + stickers = await self.client(req) + self._log.info("downloadding stickerset %s %s", inputpack.id, stickers.set.title) + sticker_dir = self.sticker_downloads/str(inputpack.id) + sticker_dir.mkdir(exist_ok=True) + with open(str(sticker_dir/'meta.txt'), 'w') as meta: + meta.write(stickers.to_json()) + self._log.debug("preparing to download whole stickerset %s", inputpack.id) + for doc in stickers.documents: + await self.downloader.download(doc) + except errors.rpcerrorlist.StickersetInvalidError: + self._log.warning("sadly, stickerset %s no longer exists", inputpack.id) class StickerDownloader: mimes={ @@ -52,7 +54,7 @@ class StickerDownloader: self.sticker_downloads = self.downloads_root/'stickers' self.sticker_downloads.mkdir(exist_ok=True) self.seen_stickers = set() - self.seen_packs = set() + self.pending_stickers = set() self.client = client @classmethod @@ -71,21 +73,30 @@ class StickerDownloader: meta['alt'] = a.alt return meta - async def download(self, document): - if document.id not in self.seen_stickers: - try: - meta = self.get_sticker_metadata(document) - to_dir = self.sticker_downloads / str(meta.get('pack', 'inline')) - to_dir.mkdir(exist_ok=True) - to_file = to_dir / (str(document.id)+'.'+self.mimes.get(document.mime_type, '')) - if not to_file.exists() or to_file.stat().st_size != document.size: - self._log.info("downloading %s to %s", document.id, str(to_file)) + async def download_task(self, document): + if document.id in self.seen_stickers: + return + self.seen_stickers.add(document.id) + try: + meta = self.get_sticker_metadata(document) + to_dir = self.sticker_downloads / str(meta.get('pack', 'inline')) + to_dir.mkdir(exist_ok=True) + to_file = to_dir / (str(document.id)+'.'+self.mimes.get(document.mime_type, '')) + if not to_file.exists() or to_file.stat().st_size != document.size: + self._log.debug("downloading %s to %s", document.id, str(to_file)) + await for_online() + async with semaphore: await self.client.download_file(document, to_file) - self.seen_stickers.add(document.id) - else: - self._log.info("document %s already downloaded at %s", document.id, str(to_file)) - except Exception as e: - self._log.critical("oops: ", exc_info=e) + self._log.info("downloaded %s to %s", document.id, str(to_file)) + else: + self._log.debug("document %s already downloaded at %s", document.id, str(to_file)) + except Exception as e: + self._log.critical("oops: ", exc_info=e) + self.seen_stickers.remove(document.id) + + + async def download(self, document): + asyncio.create_task(self.download_task(document)) @@ -103,36 +114,21 @@ def make_client(nth): log.debug("client id %s hash %s", app_id, app_hash) return TelegramClient('session'+str(nth)+str(app_id), app_id, app_hash) -def main(): - cfg = load_config('config.yaml') - app_id = cfg['apps'][0]['id'] - app_hash = cfg['apps'][0]['hash'] - log.debug("client id %s hash %s", app_id, app_hash) - client = TelegramClient('session0'+str(app_id), app_id, app_hash) - client.start() - log.debug(client.get_me().stringify()) - dl = StickerDownloader(client, './download') - try: - for found in iter_sources(client, cfg['sources']): - if isinstance(found, types.Document): - dl.download(found) - elif isinstance(found, types.InputStickerSetID): - dl.download_pack(found) - except KeyboardInterrupt: - log.warn('exiting: interrupted by keyboard') - client.disconnect() - class DialogProcessor: _log = logging.getLogger('gentoobot.messages') - def __init__(self, client, dialog, downloader, packdownloader): + def __init__(self, client, dialog, task_group, destdir): self.client = client - self.downloader = downloader - self.packdownloader = packdownloader self.dialog = dialog + self.task_group = task_group + self.downloader = StickerDownloader(self.client, destdir) + self.packdownloader = StickerPackDownloader(self.downloader) async def process(self): async for message in self.client.iter_messages(self.dialog): - asyncio.create_task(self.check_message(message)) + await for_online() + async with semaphore: + self._log.debug("starting task for message") + self.task_group.create_task(self.check_message(message)) async def check_message(self, message): try: @@ -145,9 +141,11 @@ class DialogProcessor: for a in message.media.document.attributes: if hasattr(a, 'stickerset'): self._log.debug("document %s is a sticker", message.media.document.id) + await for_online() await self.downloader.download(message.media.document) if isinstance(a.stickerset, types.InputStickerSetID): self._log.debug("document %s belongs to stickerset", message.media.document.id) + await for_online() await self.packdownloader.download(a.stickerset) except Exception as e: self._log.error("somethin wrong happened during checking message: %s", message.stringify(), exc_info=e) @@ -157,6 +155,7 @@ class ArchiveWalker: def __init__(self, client, archives): self.client = client self.archives = archives + self.task_group = asyncio.TaskGroup() async def fetch_dialogs(self): async for dialog in self.client.iter_dialogs(): @@ -176,17 +175,27 @@ class ArchiveWalker: yield dialog, src async def walk(self): - async for dialog, options in self.fetch_dialogs(): - downloader = StickerDownloader(self.client, Path(options.get('destdir', 'downloads'))) - packdownloader = StickerPackDownloader(downloader) - processor = DialogProcessor(self.client, dialog, downloader, packdownloader) - asyncio.create_task(processor.process()) + async with self.task_group: + async for dialog, options in self.fetch_dialogs(): + processor = DialogProcessor(self.client, dialog, self.task_group, Path(options.get('destdir', 'downloads'))) + self._log.debug('starting dialog processor') + self.task_group.create_task(processor.process()) + self._log.debug('walker finished') + + +async def for_online(): + while not client.is_connected(): + log.warn('disconnected') + await asyncio.sleep(10) + async def main(): - log.debug((await client.get_me()).stringify()) - walker = ArchiveWalker(client, cfg['sources']) - await walker.walk() - await client.run_until_disconnected() + try: + log.debug((await client.get_me()).stringify()) + walker = ArchiveWalker(client, cfg['sources']) + await walker.walk() + except KeyboardInterrupt: + return