#!/usr/bin/env python3
"""Archive stickers and their sticker packs from selected Telegram dialogs.

Configuration is read from ``config.yaml``:

* ``apps`` is a list of mappings with an ``id`` and a ``hash`` used to build
  the Telegram client.
* ``sources`` is a list of mappings with an ``id``, a ``type`` (``'Channel'``
  or ``'User'``) and an optional ``destdir`` (defaults to ``downloads``).

For every dialog matching a source, each message is inspected; documents that
carry a sticker set attribute are downloaded, together with the whole set they
belong to.
"""
import asyncio
import logging
from pathlib import Path

import yaml
from telethon import TelegramClient, events, sync
from telethon import functions, types, errors

logging.basicConfig(filename='bot.log', level=logging.DEBUG)
log = logging.getLogger('gentoobot')

# Strong references to fire-and-forget tasks.  The event loop itself only
# keeps weak references, so a task that nobody else holds on to may be
# garbage-collected before it finishes.
_background_tasks = set()


def _on_task_done(task):
    """Drop the reference to a finished task and report its failure, if any."""
    _background_tasks.discard(task)
    if task.cancelled():
        return
    exc = task.exception()
    if exc is not None:
        log.error("background task %r failed", task, exc_info=exc)


def _spawn(coro):
    """Schedule *coro* as a task that stays referenced until it completes."""
    task = asyncio.create_task(coro)
    _background_tasks.add(task)
    task.add_done_callback(_on_task_done)
    return task


class StickerPackDownloader:
    """Download every sticker of a sticker set, at most once per set id."""

    _log = logging.getLogger('gentoobot.packdownloader')

    def __init__(self, downloader):
        """Share the client and the download root of *downloader*.

        *downloader* is the ``StickerDownloader`` used for the individual
        documents of each set.
        """
        self.seen = set()
        self.client = downloader.client
        self.downloader = downloader
        self.downloads_root = Path(str(downloader.downloads_root))
        self.downloads_root.mkdir(parents=True, exist_ok=True)
        self.sticker_downloads = self.downloads_root / 'stickers'
        self.sticker_downloads.mkdir(exist_ok=True)

    async def download(self, inputpack):
        """Fetch the set *inputpack*, write its metadata and download its stickers.

        The set's JSON description goes to ``stickers/<set id>/meta.txt``.
        A set whose id was already handled (or is being handled by another
        task) is skipped.  A set that no longer exists is logged and stays
        marked as handled, so it is not requested again.
        """
        if inputpack.id in self.seen:
            return
        # Claim the id before the first await: messages are checked by
        # concurrent tasks, and without this each of them would request the
        # same set again.
        self.seen.add(inputpack.id)
        settled = False
        try:
            req = functions.messages.GetStickerSetRequest(stickerset=inputpack, hash=0)
            stickers = await self.client(req)
            sticker_dir = self.sticker_downloads / str(inputpack.id)
            sticker_dir.mkdir(exist_ok=True)
            with open(str(sticker_dir / 'meta.txt'), 'w') as meta:
                meta.write(stickers.to_json())
            self._log.info("preparing to download whole stickerset %s", inputpack.id)
            for doc in stickers.documents:
                await self.downloader.download(doc)
            settled = True
        except errors.rpcerrorlist.StickersetInvalidError:
            settled = True
            self._log.warning("sadly, stickerset %s no longer exists", inputpack.id)
        finally:
            if not settled:
                # Unexpected failure or cancellation: allow a later retry.
                self.seen.discard(inputpack.id)


class StickerDownloader:
    """Download single sticker documents into ``<dl_root>/stickers/<pack>/``."""

    # Maps a document's MIME type to the file extension used on disk.
    mimes = {
        'image/webp': 'webp',
        'application/x-tgsticker': 'tgs',
        'video/webm': 'webm',
    }
    _log = logging.getLogger('gentoobot.downloader')

    def __init__(self, client, dl_root):
        """Create ``dl_root`` and ``dl_root/stickers`` and remember *client*."""
        self.downloads_root = Path(str(dl_root))
        self.downloads_root.mkdir(parents=True, exist_ok=True)
        self.sticker_downloads = self.downloads_root / 'stickers'
        self.sticker_downloads.mkdir(exist_ok=True)
        self.seen_stickers = set()
        self.seen_packs = set()
        self.client = client

    @classmethod
    def get_sticker_metadata(cls, document):
        """Return a dict describing *document*.

        The dict always has ``'id'`` and ``'date'``.  ``'pack'`` (the sticker
        set id) and ``'alt'`` are added when a sticker attribute provides them.
        """
        meta = {
            'id': document.id,
            'date': document.date,
        }
        for a in document.attributes:
            if not isinstance(a, types.DocumentAttributeSticker):
                continue
            if isinstance(a.stickerset, types.InputStickerSetID):
                cls._log.debug("document %s is a normal sticker", document.id)
                meta['pack'] = a.stickerset.id
            if hasattr(a, 'alt'):
                cls._log.debug("sticker has alt text %s", a.alt)
                meta['alt'] = a.alt
        return meta

    async def download(self, document):
        """Download *document* unless it is already present with the right size.

        The target is ``stickers/<pack id or 'inline'>/<document id>.<ext>``;
        a MIME type missing from ``mimes`` yields an empty extension.  Any
        exception is logged and swallowed so one bad sticker does not stop
        the caller; the document may then be retried later.
        """
        if document.id in self.seen_stickers:
            return
        # Claim the id before awaiting so concurrent tasks do not write the
        # same file at the same time.
        self.seen_stickers.add(document.id)
        ok = False
        try:
            meta = self.get_sticker_metadata(document)
            to_dir = self.sticker_downloads / str(meta.get('pack', 'inline'))
            to_dir.mkdir(exist_ok=True)
            to_file = to_dir / (str(document.id) + '.' + self.mimes.get(document.mime_type, ''))
            # A size mismatch means a partial or stale file: fetch it again.
            if not to_file.exists() or to_file.stat().st_size != document.size:
                self._log.info("downloading %s to %s", document.id, str(to_file))
                await self.client.download_file(document, to_file)
            else:
                self._log.info("document %s already downloaded at %s", document.id, str(to_file))
            ok = True
        except Exception as e:
            self._log.critical("oops: ", exc_info=e)
        finally:
            if not ok:
                self.seen_stickers.discard(document.id)


def load_config(path):
    """Parse the YAML file at *path* and return its content."""
    log.debug("opening %s", repr(path))
    with open(str(path)) as cfgstream:
        cfg = yaml.safe_load(cfgstream)
    # NOTE(review): this writes the whole configuration, including the app
    # hash, to the debug log.
    log.debug(cfg)
    return cfg


def make_client(nth):
    """Build a ``TelegramClient`` from the *nth* entry of ``apps`` in config.yaml."""
    cfg = load_config('config.yaml')
    app_id = cfg['apps'][nth]['id']
    app_hash = cfg['apps'][nth]['hash']
    log.debug("client id %s hash %s", app_id, app_hash)
    return TelegramClient('session' + str(nth) + str(app_id), app_id, app_hash)


class DialogProcessor:
    """Scan one dialog's messages and hand sticker documents to the downloaders."""

    _log = logging.getLogger('gentoobot.messages')

    def __init__(self, client, dialog, downloader, packdownloader):
        self.client = client
        self.downloader = downloader
        self.packdownloader = packdownloader
        self.dialog = dialog

    async def process(self):
        """Start one background check per message of the dialog."""
        async for message in self.client.iter_messages(self.dialog):
            _spawn(self.check_message(message))

    async def check_message(self, message):
        """Download the sticker (and its set) carried by *message*, if any.

        Errors are logged together with the message and never propagate.
        """
        try:
            media = getattr(message, 'media', None)
            if media is None:
                self._log.debug("message %i has no media", message.id)
                return
            # The attribute can exist and still be None, so test the value
            # rather than its presence.
            document = getattr(media, 'document', None)
            if document is None:
                self._log.debug('message %i has no documents', message.id)
                return
            for a in document.attributes:
                if not hasattr(a, 'stickerset'):
                    continue
                self._log.debug("document %s is a sticker", document.id)
                await self.downloader.download(document)
                if isinstance(a.stickerset, types.InputStickerSetID):
                    self._log.debug("document %s belongs to stickerset", document.id)
                    await self.packdownloader.download(a.stickerset)
        except Exception as e:
            self._log.error("somethin wrong happened during checking message: %s",
                            message.stringify(), exc_info=e)


class ArchiveWalker:
    """Find the dialogs listed in *archives* and start processing each of them."""

    _log = logging.getLogger('gentoobot.walker')

    def __init__(self, client, archives):
        self.client = client
        self.archives = archives

    async def fetch_dialogs(self):
        """Yield ``(dialog, source)`` for each dialog matching a source.

        A dialog matches when its entity id equals the source ``id`` and, for
        the types ``'Channel'`` and ``'User'``, the entity is of that type.
        """
        async for dialog in self.client.iter_dialogs():
            self._log.debug(dialog)
            e = dialog.entity
            for src in self.archives:
                if e.id != src['id']:
                    continue
                # dialog.name is the display name for any entity type; user
                # entities have no `title` attribute.
                self._log.debug("dialog %s %s matches by id", e.id, dialog.name)
                if src['type'] == 'Channel' and not isinstance(e, types.Channel):
                    self._log.debug("dialog %s is not a channel", e.id)
                    continue
                if src['type'] == 'User' and not isinstance(e, types.User):
                    self._log.debug("dialog %s is not a user", e.id)
                    continue
                self._log.info(dialog.stringify())
                yield dialog, src

    async def walk(self):
        """Start a background ``DialogProcessor`` for every matching dialog.

        Returns once all processors are scheduled, not once they are done.
        """
        async for dialog, options in self.fetch_dialogs():
            downloader = StickerDownloader(self.client, Path(options.get('destdir', 'downloads')))
            packdownloader = StickerPackDownloader(downloader)
            processor = DialogProcessor(self.client, dialog, downloader, packdownloader)
            _spawn(processor.process())


async def main():
    """Walk the configured sources, then run until the client disconnects.

    Relies on the module-level ``client`` and ``cfg`` set up by the
    ``__main__`` block below.
    """
    log.debug((await client.get_me()).stringify())
    walker = ArchiveWalker(client, cfg['sources'])
    await walker.walk()
    await client.run_until_disconnected()


####################
if __name__ == '__main__':
    cfg = load_config('config.yaml')
    app_id = cfg['apps'][0]['id']
    app_hash = cfg['apps'][0]['hash']
    log.debug("client id %s hash %s", app_id, app_hash)
    client = TelegramClient('session0' + str(app_id), app_id, app_hash)
    with client:
        client.loop.run_until_complete(main())