From ec8008c29e81ba539e0e4c1ae761bfe9bc6fd785 Mon Sep 17 00:00:00 2001 From: Aleksey Chubukov Date: Mon, 6 Feb 2023 22:14:45 +0300 Subject: [PATCH] async version --- Pipfile.lock | 6 +- bot.py | 208 +++++++++++++++++++++++++++++++-------------------- 2 files changed, 131 insertions(+), 83 deletions(-) diff --git a/Pipfile.lock b/Pipfile.lock index 5592b11..6f27392 100644 --- a/Pipfile.lock +++ b/Pipfile.lock @@ -96,11 +96,11 @@ }, "telethon": { "hashes": [ - "sha256:06edc1852ae0eacef6f598b96638cf1fbd30e505bd314268ff762eaf3c1d550f", - "sha256:3a6c89fb3108cbc6872a5056ad3dddd0895825f9b08a549216f35f231ac2e611" + "sha256:21fb26051adc521a4a00a157e6f4a9e87711940ac3504414f96e66056918ef61", + "sha256:39ae3c3335ddd5acc80e395969f27556df140a73e58e9d3bb45863c766c23a8c" ], "index": "pypi", - "version": "==1.25.1" + "version": "==1.27.0" } }, "develop": {} diff --git a/bot.py b/bot.py index d9582ba..857f5f5 100644 --- a/bot.py +++ b/bot.py @@ -4,62 +4,53 @@ from telethon import TelegramClient, events, sync import logging from telethon import functions, types, errors from pathlib import Path +import asyncio -logging.basicConfig(filename=str("bot.log"),level=logging.DEBUG) -log = logging.getLogger("gentoobot") +logging.basicConfig(filename=str('bot.log'),level=logging.DEBUG) +log = logging.getLogger('gentoobot') -def fetch_dialogs(dialogs, sources): - for dialog in dialogs: - log.debug(dialog) - e = dialog.entity - for src in sources: - if e.id != src["id"]: - continue - log.debug('dialog %s %s matches by id', e.id, e.title) - if src["type"] == "Channel" and not isinstance(e, types.Channel): - log.debug('dialog %s is not a channel', e.id) - continue - elif src["type"] == "User" and not isinstance(e, types.User): - continue - log.debug('dialog %s is not a user', e.id) - log.info(dialog.stringify()) - yield dialog -def iter_sources(client, sources,): - for dialog in fetch_dialogs(client.iter_dialogs(), sources): - for message in client.iter_messages(dialog, limit=None): + +class StickerPackDownloader: + _log = logging.getLogger('gentoobot.packdownloader') + def __init__(self, downloader): + self.seen=set() + self.client = downloader.client + self.downloader = downloader + self.downloads_root = Path(str(downloader.downloads_root)) + self.downloads_root.mkdir(exist_ok=True) + self.sticker_downloads = self.downloads_root/'stickers' + self.sticker_downloads.mkdir(exist_ok=True) + + async def download(self, inputpack): + if inputpack.id not in self.seen: try: - if not hasattr(message, "media"): - log.debug("message %i has no media", message.id) - continue - if not hasattr(message.media, "document"): - log.debug("message %i has no documents", message.id) - continue - for a in message.media.document.attributes: - if hasattr(a, "stickerset"): - log.debug("document %s is a sticker", message.media.document.id) - yield message.media.document - if isinstance(a.stickerset, types.InputStickerSetID): - log.debug("document %s belongs to stickerset", message.media.document.id) - yield a.stickerset - except Exception as e: - log.error("somethin wrong happened during checking message: %s", message.stringify(), exc_info=e) - + req = functions.messages.GetStickerSetRequest(stickerset=inputpack,hash=0) + stickers = await self.client(req) + sticker_dir = self.sticker_downloads/str(inputpack.id) + sticker_dir.mkdir(exist_ok=True) + with open(str(sticker_dir/'meta.txt'), 'w') as meta: + meta.write(stickers.to_json()) + self._log.info("preparing to download whole stickerset %s", inputpack.id) + for doc in stickers.documents: + await self.downloader.download(doc) + self.seen.add(inputpack.id) + except errors.rpcerrorlist.StickersetInvalidError: + self._log.warning("sadly, stickerset %s no longer exists", inputpack.id) class StickerDownloader: mimes={ - "image/webp": "webp", - "application/x-tgsticker": "tgs", - "video/webm": "webm" + 'image/webp': 'webp', + 'application/x-tgsticker': 'tgs', + 'video/webm': 'webm' } - _log = logging.getLogger("gentoobot.downloader") + _log = logging.getLogger('gentoobot.downloader') def __init__(self, client, dl_root): self.downloads_root = Path(str(dl_root)) self.downloads_root.mkdir(exist_ok=True) - self.sticker_downloads = self.downloads_root/"stickers" + self.sticker_downloads = self.downloads_root/'stickers' self.sticker_downloads.mkdir(exist_ok=True) - self.downloads_root.mkdir(exist_ok=True) self.seen_stickers = set() self.seen_packs = set() self.client = client @@ -67,29 +58,29 @@ class StickerDownloader: @classmethod def get_sticker_metadata(cls,document): meta = { - "id": document.id, - "date": document.date + 'id': document.id, + 'date': document.date } for a in document.attributes: if isinstance(a, types.DocumentAttributeSticker): if isinstance(a.stickerset, types.InputStickerSetID): cls._log.debug("document %s is a normal sticker", document.id) - meta["pack"] = a.stickerset.id + meta['pack'] = a.stickerset.id if hasattr(a, 'alt'): - cls._log.debug('sticker has alt text %s', a.alt) - meta["alt"] = a.alt + cls._log.debug("sticker has alt text %s", a.alt) + meta['alt'] = a.alt return meta - def download(self, document): + async def download(self, document): if document.id not in self.seen_stickers: try: meta = self.get_sticker_metadata(document) - to_dir = self.sticker_downloads / str(meta.get("pack", "inline")) + to_dir = self.sticker_downloads / str(meta.get('pack', 'inline')) to_dir.mkdir(exist_ok=True) - to_file = to_dir / (str(document.id)+"."+self.mimes.get(document.mime_type, "")) + to_file = to_dir / (str(document.id)+'.'+self.mimes.get(document.mime_type, '')) if not to_file.exists() or to_file.stat().st_size != document.size: self._log.info("downloading %s to %s", document.id, str(to_file)) - self.client.download_file(document, to_file) + await self.client.download_file(document, to_file) self.seen_stickers.add(document.id) else: self._log.info("document %s already downloaded at %s", document.id, str(to_file)) @@ -97,21 +88,6 @@ class StickerDownloader: self._log.critical("oops: ", exc_info=e) - def download_pack(self, inputpack): - if inputpack.id not in self.seen_packs: - try: - req = functions.messages.GetStickerSetRequest(stickerset=inputpack,hash=0) - stickers=self.client(req) - sticker_dir = self.sticker_downloads/str(inputpack.id) - sticker_dir.mkdir(exist_ok=True) - with open(str(sticker_dir/"meta.txt"), "w") as meta: - meta.write(stickers.to_json()) - self._log.info("preparing to download whole stickerset %s", inputpack.id) - for doc in stickers.documents: - self.download(doc) - self.seen_packs.add(inputpack.id) - except errors.rpcerrorlist.StickersetInvalidError: - self._log.warning("sadly, stickerset %s no longer exists", inputpack.id) def load_config(path): log.debug("opening %s", repr(path)) @@ -121,35 +97,107 @@ def load_config(path): return cfg def make_client(nth): - cfg = load_config("config.yaml") - app_id = cfg["apps"][nth]["id"] - app_hash = cfg["apps"][nth]['hash'] + cfg = load_config('config.yaml') + app_id = cfg['apps'][nth]['id'] + app_hash = cfg['apps'][nth]['hash'] log.debug("client id %s hash %s", app_id, app_hash) - return TelegramClient("session"+str(nth)+str(app_id), app_id, app_hash) + return TelegramClient('session'+str(nth)+str(app_id), app_id, app_hash) def main(): - cfg = load_config("config.yaml") - app_id = cfg["apps"][0]["id"] - app_hash = cfg["apps"][0]['hash'] + cfg = load_config('config.yaml') + app_id = cfg['apps'][0]['id'] + app_hash = cfg['apps'][0]['hash'] log.debug("client id %s hash %s", app_id, app_hash) - client = TelegramClient("session0"+str(app_id), app_id, app_hash) + client = TelegramClient('session0'+str(app_id), app_id, app_hash) client.start() log.debug(client.get_me().stringify()) - dl = StickerDownloader(client, "./download") + dl = StickerDownloader(client, './download') try: - for found in iter_sources(client, cfg["sources"]): + for found in iter_sources(client, cfg['sources']): if isinstance(found, types.Document): dl.download(found) elif isinstance(found, types.InputStickerSetID): dl.download_pack(found) except KeyboardInterrupt: - log.warn("exiting: interrupted by keyboard") + log.warn('exiting: interrupted by keyboard') client.disconnect() +class DialogProcessor: + _log = logging.getLogger('gentoobot.messages') + def __init__(self, client, dialog, downloader, packdownloader): + self.client = client + self.downloader = downloader + self.packdownloader = packdownloader + self.dialog = dialog + async def process(self): + async for message in self.client.iter_messages(self.dialog): + asyncio.create_task(self.check_message(message)) + + async def check_message(self, message): + try: + if not hasattr(message, 'media'): + self._log.debug("message %i has no media", message.id) + return + if not hasattr(message.media, 'document'): + self._log.debug('message %i has no documents', message.id) + return + for a in message.media.document.attributes: + if hasattr(a, 'stickerset'): + self._log.debug("document %s is a sticker", message.media.document.id) + await self.downloader.download(message.media.document) + if isinstance(a.stickerset, types.InputStickerSetID): + self._log.debug("document %s belongs to stickerset", message.media.document.id) + await self.packdownloader.download(a.stickerset) + except Exception as e: + self._log.error("somethin wrong happened during checking message: %s", message.stringify(), exc_info=e) + +class ArchiveWalker: + _log = logging.getLogger('gentoobot.walker') + def __init__(self, client, archives): + self.client = client + self.archives = archives + + async def fetch_dialogs(self): + async for dialog in self.client.iter_dialogs(): + self._log.debug(dialog) + e = dialog.entity + for src in self.archives: + if e.id != src['id']: + continue + self._log.debug("dialog %s %s matches by id", e.id, e.title) + if src['type'] == 'Channel' and not isinstance(e, types.Channel): + self._log.debug("dialog %s is not a channel", e.id) + continue + elif src['type'] == 'User' and not isinstance(e, types.User): + continue + self._log.debug("dialog %s is not a user", e.id) + self._log.info(dialog.stringify()) + yield dialog, src + + async def walk(self): + async for dialog, options in self.fetch_dialogs(): + downloader = StickerDownloader(self.client, Path(options.get('destdir', 'downloads'))) + packdownloader = StickerPackDownloader(downloader) + processor = DialogProcessor(self.client, dialog, downloader, packdownloader) + asyncio.create_task(processor.process()) + +async def main(): + log.debug((await client.get_me()).stringify()) + walker = ArchiveWalker(client, cfg['sources']) + await walker.walk() + await client.run_until_disconnected() + + #################### -if __name__ == "__main__": - main() +if __name__ == '__main__': + cfg = load_config('config.yaml') + app_id = cfg['apps'][0]['id'] + app_hash = cfg['apps'][0]['hash'] + log.debug("client id %s hash %s", app_id, app_hash) + client = TelegramClient('session0'+str(app_id), app_id, app_hash) + with client: + client.loop.run_until_complete(main())