#!/usr/bin/env python3
"""Download stickers found in configured Telegram dialogs.

Reads ``config/bot.yaml``, logs in with the first configured app, walks the
dialogs listed under ``stickers.sources.dialogs`` and stores every sticker
document (plus the whole sticker set it belongs to) under ``env/download``.
"""
import logging
from pathlib import Path

import yaml
# NOTE: `sync` looks unused, but the client methods below are called without
# `await`, which relies on `telethon.sync` having been imported.
from telethon import TelegramClient, events, sync
from telethon import functions, types, errors

# Known sticker MIME types mapped to the file extension used on disk.
mimes = {
    "image/webp": "webp",
    "application/x-tgsticker": "tgs",
    "video/webm": "webm",
}

# Working directories: env/ (session + log), env/download/ (one directory per
# sticker set) and env/download/inline/ (stickers without a set).
w = Path(".")
wn = w / "env"
wn.mkdir(exist_ok=True)
wd = wn / "download"
wd.mkdir(exist_ok=True)
wdi = wd / "inline"
wdi.mkdir(exist_ok=True)

logging.basicConfig(filename=str(wn / "bot.log"), level=logging.DEBUG)
log = logging.getLogger("gentoobot")
dllog = logging.getLogger("gentoobot.download")

# Sticker set ids whose full set was already requested in this run.  Used by
# process_archive() when the caller does not supply its own set, so that the
# de-duplication spans all archives processed by the script.
_SEEN_STICKERSETS = set()


def download_sticker(client, document):
    """Download *document* into its sticker-set directory unless already present.

    Documents that are not stickers (see get_sticker_setid) are ignored.  A
    file counts as already downloaded when it exists and its size equals
    ``document.size``; otherwise it is (re)downloaded.
    """
    stickerdir, setid = get_sticker_setid(document)
    if stickerdir is None:
        return

    dldir = wd / stickerdir
    dldir.mkdir(exist_ok=True)
    # Unknown MIME types keep the historical "<id>." name (empty extension).
    dlpath = dldir / (str(document.id) + "." + mimes.get(document.mime_type, ""))

    if not dlpath.exists() or dlpath.stat().st_size != document.size:
        dllog.info("downloading %s to %s", document.id, str(dlpath))
        client.download_file(document, dlpath)
    else:
        dllog.info("document %s already downloaded at %s", document.id, str(dlpath))


def get_sticker_setid(document):
    """Return ``(directory_name, stickerset)`` for a sticker document.

    * ``(str(set id), stickerset)`` when the set is an ``InputStickerSetID``
    * ``("inline", stickerset)`` when the set is an ``InputStickerSetEmpty``
    * ``(None, None)`` when the document has no ``stickerset`` attribute or
      the set is of any other type

    The result is always a 2-tuple, so callers can unpack it safely.
    """
    stickerset = None
    for attribute in document.attributes:
        if hasattr(attribute, "stickerset"):
            stickerset = attribute.stickerset

    if stickerset is None:
        dllog.debug("document %s is not a sticker", document.id)
        return None, None
    if isinstance(stickerset, types.InputStickerSetID):
        dllog.debug("document %s is a normal sticker", document.id)
        return str(stickerset.id), stickerset
    if isinstance(stickerset, types.InputStickerSetEmpty):
        dllog.debug("document %s is an inline sticker", document.id)
        return "inline", stickerset

    # Any other stickerset reference type: previously this fell off the end
    # and returned a bare None, which broke tuple unpacking in the callers.
    dllog.debug(
        "document %s has an unsupported stickerset reference %r",
        document.id,
        stickerset,
    )
    return None, None


def fetch_dialogs(client):
    """Yield the dialogs whose entity id is listed in the configuration.

    Reads the module-level ``cfg`` (loaded in the ``__main__`` section) and
    matches against ``cfg["stickers"]["sources"]["dialogs"]``.
    """
    for dialog in client.iter_dialogs():
        log.debug(dialog)
        if dialog.entity.id in cfg["stickers"]["sources"]["dialogs"]:
            log.info(dialog.stringify())
            yield dialog


def _download_whole_stickerset(setid):
    """Fetch sticker set *setid* and download each of its documents."""
    try:
        request = functions.messages.GetStickerSetRequest(stickerset=setid, hash=0)
        for doc in client(request).documents:
            try:
                download_sticker(client, doc)
            except Exception as e:
                # Best effort: one failing document must not abort the set.
                log.critical("oops: %s", doc.id, exc_info=e)
    except errors.rpcerrorlist.StickersetInvalidError:
        log.warning("sadly, stickerset %s no longer exists", setid.id)


def process_archive(archive, stickerset_seen=None):
    """Download every sticker posted in *archive*, plus the sets they belong to.

    Uses the module-level ``client`` created in the ``__main__`` section.

    archive         -- dialog/entity whose messages are scanned
    stickerset_seen -- set of sticker set ids already fetched in full; it is
                       updated in place.  When omitted, a module-wide set is
                       used so sets are fetched once across all archives.

    Errors while handling a single message are logged and do not stop the scan.
    """
    if stickerset_seen is None:
        stickerset_seen = _SEEN_STICKERSETS

    for msg in client.iter_messages(archive, limit=None):
        log.debug(msg)
        if not hasattr(msg, "media"):
            log.debug("message %i has no media", msg.id)
            continue
        if not hasattr(msg.media, "document"):
            log.debug("message %i has no documents", msg.id)
            continue

        try:
            document = msg.media.document
            dldir, setid = get_sticker_setid(document)
            if dldir is None:
                continue

            download_sticker(client, document)
            # Inline stickers have no set to fetch; only the document itself.
            if dldir == "inline" or setid.id in stickerset_seen:
                continue

            log.info("preparing to download whole stickerset %s as %s", setid, dldir)
            stickerset_seen.add(setid.id)
            _download_whole_stickerset(setid)
        except Exception as e:
            log.error(
                "somethin wrong happened during checking message: %s",
                msg.stringify(),
                exc_info=e,
            )


####################

if __name__ == "__main__":
    log.debug("opening %s", repr("config/bot.yaml"))
    with open("config/bot.yaml") as cfgstream:
        cfg = yaml.safe_load(cfgstream)
    # NOTE(review): the next two debug calls write the full config and the
    # API hash to env/bot.log -- consider redacting these secrets.
    log.debug(cfg)

    app = cfg["apps"][0]
    log.debug("starting client with id %s and hash %s", app['id'], app['hash'])
    client = TelegramClient('env/gentoo_session', app['id'], app['hash'])
    client.start()
    try:
        log.debug(client.get_me().stringify())
        for sticker_archive in fetch_dialogs(client):
            process_archive(sticker_archive)
    except KeyboardInterrupt:
        log.info("interrupted by user, shutting down")
    finally:
        # Always disconnect, including when an unexpected error escapes.
        client.disconnect()