#!/usr/bin/env python3
"""Archive stickers and sticker packs found in configured Telegram dialogs.

The script reads ``config.yaml``, connects with Telethon, walks the message
history of every dialog listed under ``sources`` and saves each sticker
document (and the packs those stickers belong to) below the source's
``destdir`` (``downloads`` when not configured).
"""
import asyncio
import logging
from pathlib import Path

import yaml
# NOTE(review): ``events`` is unused here and ``sync`` looks like it is
# imported for its side effects (the entry point uses a plain ``with client:``)
# -- both are kept because other code may rely on them; confirm before removal.
from telethon import TelegramClient, events, sync  # noqa: F401
from telethon import functions, types, errors

logging.basicConfig(filename=str('bot.log'), level=logging.INFO)
log = logging.getLogger('gentoobot')

# Caps the number of simultaneous file downloads (see
# StickerDownloader.download_task).
semaphore = asyncio.Semaphore(10)


class StickerPackDownloader:
    """Download every sticker of a sticker set, once per set id."""

    _log = logging.getLogger('gentoobot.packdownloader')

    def __init__(self, downloader):
        """Share the client and download root of an existing StickerDownloader.

        Args:
            downloader: the StickerDownloader used to fetch individual
                documents; its ``client`` and ``downloads_root`` are reused.
        """
        self.seen = set()  # ids of sticker sets already requested
        self.client = downloader.client
        self.downloader = downloader
        self.downloads_root = Path(str(downloader.downloads_root))
        # parents=True so a nested download root does not fail.
        self.downloads_root.mkdir(parents=True, exist_ok=True)
        self.sticker_downloads = self.downloads_root / 'stickers'
        self.sticker_downloads.mkdir(exist_ok=True)

    async def download(self, inputpack):
        """Fetch a sticker set, store its metadata and queue all its stickers.

        The set's JSON dump is written to ``stickers/<set id>/meta.txt``.
        Sets whose id was already seen are skipped. A set that no longer
        exists is logged as a warning and otherwise ignored.

        Args:
            inputpack: the sticker set reference taken from a sticker's
                attribute (must expose ``id``).
        """
        if inputpack.id in self.seen:
            return
        self.seen.add(inputpack.id)
        try:
            req = functions.messages.GetStickerSetRequest(
                stickerset=inputpack, hash=0)
            await for_online()
            stickers = await self.client(req)
            self._log.info("downloading stickerset %s %s",
                           inputpack.id, stickers.set.title)
            sticker_dir = self.sticker_downloads / str(inputpack.id)
            sticker_dir.mkdir(exist_ok=True)
            with open(str(sticker_dir / 'meta.txt'), 'w',
                      encoding='utf-8') as meta:
                meta.write(stickers.to_json())
            self._log.debug("preparing to download whole stickerset %s",
                            inputpack.id)
            for doc in stickers.documents:
                await self.downloader.download(doc)
        except errors.rpcerrorlist.StickersetInvalidError:
            self._log.warning("sadly, stickerset %s no longer exists",
                              inputpack.id)


class StickerDownloader:
    """Download individual sticker documents into per-pack directories."""

    # Maps a document's MIME type to the file extension used on disk.
    mimes = {
        'image/webp': 'webp',
        'application/x-tgsticker': 'tgs',
        'video/webm': 'webm',
    }
    _log = logging.getLogger('gentoobot.downloader')

    def __init__(self, client, dl_root, task_group=None):
        """Prepare the ``<dl_root>/stickers`` directory.

        Args:
            client: the Telegram client used for downloading.
            dl_root: root directory for downloads; created if missing.
            task_group: optional ``asyncio.TaskGroup``. When given, download
                tasks are created in it so that leaving the group waits for
                them; otherwise plain ``asyncio.create_task`` is used.
        """
        self.downloads_root = Path(str(dl_root))
        # parents=True so a nested ``destdir`` from the config does not fail.
        self.downloads_root.mkdir(parents=True, exist_ok=True)
        self.sticker_downloads = self.downloads_root / 'stickers'
        self.sticker_downloads.mkdir(exist_ok=True)
        self.seen_stickers = set()  # ids of documents already handled/queued
        self.pending_stickers = set()
        self.client = client
        self.task_group = task_group
        # Strong references to in-flight download tasks: the event loop only
        # keeps weak references, so an unreferenced task may be collected
        # before it finishes.
        self._tasks = set()

    @classmethod
    def get_sticker_metadata(cls, document):
        """Extract id, date and, when present, pack id and alt text.

        Args:
            document: a document with ``id``, ``date`` and ``attributes``.

        Returns:
            dict with keys ``id`` and ``date``; ``pack`` is added when the
            sticker attribute references a set by id, ``alt`` when the
            sticker attribute has an ``alt`` field.
        """
        meta = {
            'id': document.id,
            'date': document.date,
        }
        for a in document.attributes:
            if not isinstance(a, types.DocumentAttributeSticker):
                continue
            if isinstance(a.stickerset, types.InputStickerSetID):
                cls._log.debug("document %s is a normal sticker", document.id)
                meta['pack'] = a.stickerset.id
            if hasattr(a, 'alt'):
                cls._log.debug("sticker has alt text %s", a.alt)
                meta['alt'] = a.alt
        return meta

    async def download_task(self, document):
        """Download one document unless it is already on disk or was seen.

        The target is ``stickers/<pack id or 'inline'>/<document id>.<ext>``.
        A file that exists with the expected size is not downloaded again.
        Any failure is logged and the document is forgotten again so that a
        later occurrence can retry.

        Args:
            document: the sticker document to download.
        """
        if document.id in self.seen_stickers:
            return
        self.seen_stickers.add(document.id)
        try:
            meta = self.get_sticker_metadata(document)
            to_dir = self.sticker_downloads / str(meta.get('pack', 'inline'))
            to_dir.mkdir(exist_ok=True)
            extension = self.mimes.get(document.mime_type, '')
            to_file = to_dir / (str(document.id) + '.' + extension)
            already_complete = (to_file.exists()
                                and to_file.stat().st_size == document.size)
            if already_complete:
                self._log.debug("document %s already downloaded at %s",
                                document.id, str(to_file))
                return
            self._log.debug("downloading %s to %s", document.id, str(to_file))
            await for_online()
            async with semaphore:
                await self.client.download_file(document, to_file)
            self._log.info("downloaded %s to %s", document.id, str(to_file))
        except Exception as e:
            self._log.critical("oops: ", exc_info=e)
            # Allow a retry if the same document shows up again.
            self.seen_stickers.discard(document.id)

    async def download(self, document):
        """Schedule ``download_task(document)`` and return immediately.

        The task is created in ``self.task_group`` when one was supplied, so
        the group does not finish before the download does; otherwise it is
        created with ``asyncio.create_task``. In both cases a reference is
        held until the task completes.

        Args:
            document: the sticker document to download.
        """
        coro = self.download_task(document)
        if self.task_group is not None:
            task = self.task_group.create_task(coro)
        else:
            task = asyncio.create_task(coro)
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)


def load_config(path):
    """Load the YAML configuration file at *path* and return its content.

    Args:
        path: path to the YAML file.

    Returns:
        Whatever ``yaml.safe_load`` produces for the file.
    """
    log.debug("opening %s", repr(path))
    with open(str(path), encoding='utf-8') as cfgstream:
        cfg = yaml.safe_load(cfgstream)
    # The config holds API credentials, so only its top-level section names
    # are logged rather than the whole content.
    sections = sorted(cfg, key=str) if isinstance(cfg, dict) else type(cfg).__name__
    log.debug("loaded config sections: %s", sections)
    return cfg


def make_client(nth, cfg=None):
    """Create a TelegramClient for the *nth* entry of ``cfg['apps']``.

    Args:
        nth: index into the ``apps`` list of the configuration.
        cfg: already loaded configuration; ``config.yaml`` is loaded when
            omitted.

    Returns:
        A ``TelegramClient`` whose session name is
        ``'session' + str(nth) + str(app_id)``.
    """
    if cfg is None:
        cfg = load_config('config.yaml')
    app = cfg['apps'][nth]
    app_id = app['id']
    app_hash = app['hash']
    # The hash is a credential and is deliberately kept out of the log.
    log.debug("client id %s", app_id)
    return TelegramClient('session' + str(nth) + str(app_id), app_id, app_hash)


class DialogProcessor:
    """Scan one dialog's messages and archive the stickers found in them."""

    _log = logging.getLogger('gentoobot.messages')

    def __init__(self, client, dialog, task_group, destdir):
        """Create the downloaders for one dialog.

        Args:
            client: the Telegram client.
            dialog: the dialog whose messages are iterated.
            task_group: ``asyncio.TaskGroup`` in which per-message checks
                and sticker downloads are scheduled.
            destdir: download root for this dialog.
        """
        self.client = client
        self.dialog = dialog
        self.task_group = task_group
        self.downloader = StickerDownloader(self.client, destdir, task_group)
        self.packdownloader = StickerPackDownloader(self.downloader)

    async def process(self):
        """Schedule a ``check_message`` task for every message of the dialog."""
        async for message in self.client.iter_messages(self.dialog):
            await for_online()
            # NOTE(review): the semaphore is released right after the task is
            # scheduled, so it does not limit how many checks run at once.
            async with semaphore:
                self._log.debug("starting task for message")
                self.task_group.create_task(self.check_message(message))

    async def check_message(self, message):
        """Queue downloads for a sticker message and for its sticker pack.

        Messages without media or without a document are skipped. Every
        exception is logged together with the message dump and swallowed so
        that one bad message does not abort the task group.

        Args:
            message: a message yielded by ``iter_messages``.
        """
        try:
            media = getattr(message, 'media', None)
            if media is None:
                self._log.debug("message %i has no media", message.id)
                return
            # ``document`` may be absent or None depending on the media type.
            document = getattr(media, 'document', None)
            if document is None:
                self._log.debug('message %i has no documents', message.id)
                return
            for a in document.attributes:
                if not hasattr(a, 'stickerset'):
                    continue
                self._log.debug("document %s is a sticker", document.id)
                await for_online()
                await self.downloader.download(document)
                if isinstance(a.stickerset, types.InputStickerSetID):
                    self._log.debug("document %s belongs to stickerset",
                                    document.id)
                    await for_online()
                    await self.packdownloader.download(a.stickerset)
        except Exception as e:
            self._log.error(
                "somethin wrong happened during checking message: %s",
                message.stringify(), exc_info=e)


class ArchiveWalker:
    """Find the configured source dialogs and process each of them."""

    _log = logging.getLogger('gentoobot.walker')

    def __init__(self, client, archives):
        """Store the client and the list of source descriptions.

        Args:
            client: the Telegram client.
            archives: iterable of mappings with at least ``id`` and ``type``
                (``'Channel'`` or ``'User'`` are checked against the entity
                type) and optionally ``destdir``.
        """
        self.client = client
        self.archives = archives
        self.task_group = asyncio.TaskGroup()

    async def fetch_dialogs(self):
        """Yield ``(dialog, source)`` for every dialog matching a source.

        A dialog matches when its entity id equals the source's ``id`` and,
        for the ``'Channel'`` / ``'User'`` source types, the entity is of
        that type.
        """
        async for dialog in self.client.iter_dialogs():
            self._log.debug(dialog)
            e = dialog.entity
            for src in self.archives:
                if e.id != src['id']:
                    continue
                # ``dialog.name`` instead of ``e.title``: user entities have
                # no ``title`` attribute.
                self._log.debug("dialog %s %s matches by id",
                                e.id, dialog.name)
                if src['type'] == 'Channel' and not isinstance(e, types.Channel):
                    self._log.debug("dialog %s is not a channel", e.id)
                    continue
                elif src['type'] == 'User' and not isinstance(e, types.User):
                    self._log.debug("dialog %s is not a user", e.id)
                    continue
                self._log.info(dialog.stringify())
                yield dialog, src

    async def walk(self):
        """Process all matching dialogs and wait for the scheduled work.

        Returns once every task created in the task group (dialog
        processors, message checks and sticker downloads) has finished.
        """
        async with self.task_group:
            async for dialog, options in self.fetch_dialogs():
                destdir = Path(options.get('destdir', 'downloads'))
                processor = DialogProcessor(
                    self.client, dialog, self.task_group, destdir)
                self._log.debug('starting dialog processor')
                self.task_group.create_task(processor.process())
        self._log.debug('walker finished')


async def for_online():
    """Block until the module-level ``client`` reports being connected.

    Polls every 10 seconds and logs a warning on each unsuccessful check.
    Relies on the global ``client`` assigned by the script entry point.
    """
    while not client.is_connected():
        log.warning('disconnected')
        await asyncio.sleep(10)


async def main():
    """Log the current account and archive all configured sources.

    Uses the module-level ``client`` and ``cfg`` assigned by the script
    entry point. A ``KeyboardInterrupt`` raised inside ends the run quietly.
    """
    try:
        log.debug((await client.get_me()).stringify())
        walker = ArchiveWalker(client, cfg['sources'])
        await walker.walk()
    except KeyboardInterrupt:
        return


####################

if __name__ == '__main__':
    cfg = load_config('config.yaml')
    # Same session name as before ('session0' + app id); the config is
    # passed in so it is not read from disk twice.
    client = make_client(0, cfg)
    with client:
        client.loop.run_until_complete(main())