diff --git a/.gitignore b/.gitignore index 66ae6d3..87cdcef 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,6 @@ /config /env +/config.yaml +/download +/bot.log +/session* diff --git a/bot.py b/bot.py index dcb45a5..d9582ba 100644 --- a/bot.py +++ b/bot.py @@ -5,86 +5,146 @@ import logging from telethon import functions, types, errors from pathlib import Path -mimes={ - "image/webp": "webp", - "application/x-tgsticker": "tgs", - "video/webm": "webm" - } -w = Path(".") -wn = w / "env" -wn.mkdir(exist_ok=True) -wd = wn/"download" -wd.mkdir(exist_ok=True) -wdi = wd/"inline" -wdi.mkdir(exist_ok=True) -logging.basicConfig(filename=str(wn/"bot.log"),level=logging.DEBUG) +logging.basicConfig(filename=str("bot.log"),level=logging.DEBUG) log = logging.getLogger("gentoobot") -dllog=logging.getLogger("gentoobot.download") -def download_sticker(client, document): - stickerdir, setid = get_sticker_setid(document) - if stickerdir is None: - return - dldir = wd / stickerdir - dldir.mkdir(exist_ok=True) - dlpath = dldir / (str(document.id)+"."+mimes.get(document.mime_type, "")) - if not dlpath.exists() or dlpath.stat().st_size != document.size: - dllog.info("downloading %s to %s", document.id, str(dlpath)) - client.download_file(document, dlpath) - else: - dllog.info("document %s already downloaded at %s", document.id, str(dlpath)) - -def get_sticker_setid(document): - stickerset = None - for a in document.attributes: - if hasattr(a, "stickerset"): - stickerset = a.stickerset - if stickerset is None: - dllog.debug("document %s is not a sticker", document.id) - return None, None - if isinstance(stickerset, types.InputStickerSetID): - log.debug("document %s is a normal sticker", document.id) - return str(stickerset.id), stickerset - if isinstance(stickerset, types.InputStickerSetEmpty): - dllog.debug("document %s is an inline sticker", document.id) - return "inline", stickerset - -def fetch_dialogs(client): - sticker_archives=list() - for dialog in client.iter_dialogs(): +def fetch_dialogs(dialogs, sources): + for dialog in dialogs: log.debug(dialog) - if dialog.entity.id in cfg["stickers"]["sources"]["dialogs"]: + e = dialog.entity + for src in sources: + if e.id != src["id"]: + continue + log.debug('dialog %s %s matches by id', e.id, e.title) + if src["type"] == "Channel" and not isinstance(e, types.Channel): + log.debug('dialog %s is not a channel', e.id) + continue + elif src["type"] == "User" and not isinstance(e, types.User): + continue + log.debug('dialog %s is not a user', e.id) log.info(dialog.stringify()) yield dialog -def process_archive(archive, stickerset_seen=set()): - for msg in client.iter_messages(sticker_archive, limit=None): - log.debug(msg) - if not hasattr(msg, "media"): - log.debug("message %i has no media", msg.id) - continue - if not hasattr(msg.media, "document"): - log.debug("message %i has no documents", msg.id) - continue - try: - dldir, setid = get_sticker_setid(msg.media.document) - if dldir == "inline": - download_sticker(client, msg.media.document) - elif dldir is not None: - download_sticker(client, msg.media.document) - if setid.id not in stickerset_seen: - log.info("preparing to download whole stickerset %s as %s", setid, dldir) - stickerset_seen.add(setid.id) - try: - for doc in client(functions.messages.GetStickerSetRequest(stickerset=setid,hash=0)).documents: - try: - download_sticker(client, doc) - except Exception as e: - log.critical("oops: %s", exc_info=e) - except errors.rpcerrorlist.StickersetInvalidError: - log.warning("sadly, stickerset %s no longer exists", setid.id) - except Exception as e: - log.error("somethin wrong happened during checking message: %s", msg.stringify(), exc_info=e) +def iter_sources(client, sources,): + for dialog in fetch_dialogs(client.iter_dialogs(), sources): + for message in client.iter_messages(dialog, limit=None): + try: + if not hasattr(message, "media"): + log.debug("message %i has no media", message.id) + continue + if not hasattr(message.media, "document"): + log.debug("message %i has no documents", message.id) + continue + for a in message.media.document.attributes: + if hasattr(a, "stickerset"): + log.debug("document %s is a sticker", message.media.document.id) + yield message.media.document + if isinstance(a.stickerset, types.InputStickerSetID): + log.debug("document %s belongs to stickerset", message.media.document.id) + yield a.stickerset + except Exception as e: + log.error("somethin wrong happened during checking message: %s", message.stringify(), exc_info=e) + + +class StickerDownloader: + mimes={ + "image/webp": "webp", + "application/x-tgsticker": "tgs", + "video/webm": "webm" + } + _log = logging.getLogger("gentoobot.downloader") + + def __init__(self, client, dl_root): + self.downloads_root = Path(str(dl_root)) + self.downloads_root.mkdir(exist_ok=True) + self.sticker_downloads = self.downloads_root/"stickers" + self.sticker_downloads.mkdir(exist_ok=True) + self.downloads_root.mkdir(exist_ok=True) + self.seen_stickers = set() + self.seen_packs = set() + self.client = client + + @classmethod + def get_sticker_metadata(cls,document): + meta = { + "id": document.id, + "date": document.date + } + for a in document.attributes: + if isinstance(a, types.DocumentAttributeSticker): + if isinstance(a.stickerset, types.InputStickerSetID): + cls._log.debug("document %s is a normal sticker", document.id) + meta["pack"] = a.stickerset.id + if hasattr(a, 'alt'): + cls._log.debug('sticker has alt text %s', a.alt) + meta["alt"] = a.alt + return meta + + def download(self, document): + if document.id not in self.seen_stickers: + try: + meta = self.get_sticker_metadata(document) + to_dir = self.sticker_downloads / str(meta.get("pack", "inline")) + to_dir.mkdir(exist_ok=True) + to_file = to_dir / (str(document.id)+"."+self.mimes.get(document.mime_type, "")) + if not to_file.exists() or to_file.stat().st_size != document.size: + self._log.info("downloading %s to %s", document.id, str(to_file)) + self.client.download_file(document, to_file) + self.seen_stickers.add(document.id) + else: + self._log.info("document %s already downloaded at %s", document.id, str(to_file)) + except Exception as e: + self._log.critical("oops: ", exc_info=e) + + + def download_pack(self, inputpack): + if inputpack.id not in self.seen_packs: + try: + req = functions.messages.GetStickerSetRequest(stickerset=inputpack,hash=0) + stickers=self.client(req) + sticker_dir = self.sticker_downloads/str(inputpack.id) + sticker_dir.mkdir(exist_ok=True) + with open(str(sticker_dir/"meta.txt"), "w") as meta: + meta.write(stickers.to_json()) + self._log.info("preparing to download whole stickerset %s", inputpack.id) + for doc in stickers.documents: + self.download(doc) + self.seen_packs.add(inputpack.id) + except errors.rpcerrorlist.StickersetInvalidError: + self._log.warning("sadly, stickerset %s no longer exists", inputpack.id) + +def load_config(path): + log.debug("opening %s", repr(path)) + with open(str(path)) as cfgstream: + cfg = yaml.safe_load(cfgstream) + log.debug(cfg) + return cfg + +def make_client(nth): + cfg = load_config("config.yaml") + app_id = cfg["apps"][nth]["id"] + app_hash = cfg["apps"][nth]['hash'] + log.debug("client id %s hash %s", app_id, app_hash) + return TelegramClient("session"+str(nth)+str(app_id), app_id, app_hash) + +def main(): + cfg = load_config("config.yaml") + app_id = cfg["apps"][0]["id"] + app_hash = cfg["apps"][0]['hash'] + log.debug("client id %s hash %s", app_id, app_hash) + client = TelegramClient("session0"+str(app_id), app_id, app_hash) + client.start() + log.debug(client.get_me().stringify()) + dl = StickerDownloader(client, "./download") + try: + for found in iter_sources(client, cfg["sources"]): + if isinstance(found, types.Document): + dl.download(found) + elif isinstance(found, types.InputStickerSetID): + dl.download_pack(found) + except KeyboardInterrupt: + log.warn("exiting: interrupted by keyboard") + client.disconnect() @@ -92,18 +152,4 @@ def process_archive(archive, stickerset_seen=set()): if __name__ == "__main__": - log.debug("opening %s", repr("config/bot.yaml")) - with open("config/bot.yaml") as cfgstream: - cfg = yaml.safe_load(cfgstream) - log.debug(cfg) - app = cfg["apps"][0] - log.debug("starting client with id %s and hash %s", app['id'], app['hash']) - client = TelegramClient('env/gentoo_session', app['id'], app['hash']) - client.start() - log.debug(client.get_me().stringify()) - try: - for sticker_archive in fetch_dialogs(client): - process_archive(sticker_archive) - except KeyboardInterrupt: - pass - client.disconnect() + main()