tg_gentoo_bot/bot.py

156 lines
6.1 KiB
Python
Raw Normal View History

2022-09-27 17:53:53 +00:00
#!/usr/bin/env python3
import yaml
from telethon import TelegramClient, events, sync
import logging
from telethon import functions, types, errors
from pathlib import Path
# Send every record at DEBUG level and above to the file "bot.log"
# (a relative path, so it is resolved against the working directory).
logging.basicConfig(level=logging.DEBUG, filename="bot.log")

# Module-wide logger used by the free functions below.
log = logging.getLogger("gentoobot")
def fetch_dialogs(dialogs, sources):
    """Yield the dialogs that match one of the configured sources.

    A dialog matches a source entry when its entity id equals ``src["id"]``
    and, for the source types ``"Channel"`` and ``"User"``, the entity is an
    instance of the corresponding ``types`` class. Any other ``src["type"]``
    value matches on the id alone. A dialog is yielded once per matching
    source entry.

    Args:
        dialogs: iterable of dialog objects exposing ``entity`` and
            ``stringify()``.
        sources: re-iterable collection of mappings with ``"id"`` and
            ``"type"`` keys (it is walked once per dialog).

    Yields:
        Every dialog object that matched a source entry.
    """
    for dialog in dialogs:
        log.debug(dialog)
        entity = dialog.entity
        for src in sources:
            if entity.id != src["id"]:
                continue
            # Telethon's User type has no ``title`` field (channels and chats
            # do), so read it defensively instead of raising AttributeError.
            log.debug('dialog %s %s matches by id', entity.id, getattr(entity, "title", None))
            wanted = src["type"]
            if wanted == "Channel" and not isinstance(entity, types.Channel):
                log.debug('dialog %s is not a channel', entity.id)
                continue
            if wanted == "User" and not isinstance(entity, types.User):
                # Log before skipping; this message used to sit after the
                # ``continue`` and was therefore unreachable.
                log.debug('dialog %s is not a user', entity.id)
                continue
            log.info(dialog.stringify())
            yield dialog
def iter_sources(client, sources):
    """Walk all messages of the configured sources and yield their stickers.

    For every dialog selected by ``fetch_dialogs`` the complete message
    history is iterated (``limit=None``). For each document attribute that
    has a ``stickerset`` field the document itself is yielded; if that field
    is a ``types.InputStickerSetID`` the sticker set reference is yielded
    right after it.

    Args:
        client: object providing ``iter_dialogs()`` and
            ``iter_messages(dialog, limit=None)``.
        sources: source descriptions, passed through to ``fetch_dialogs``.

    Yields:
        Sticker documents and ``types.InputStickerSetID`` references.
    """
    for dialog in fetch_dialogs(client.iter_dialogs(), sources):
        for message in client.iter_messages(dialog, limit=None):
            try:
                media = getattr(message, "media", None)
                if media is None:
                    log.debug("message %i has no media", message.id)
                    continue
                # ``document`` may be missing or None; treat both as "no
                # document" instead of tripping over None.attributes below.
                document = getattr(media, "document", None)
                if document is None:
                    log.debug("message %i has no documents", message.id)
                    continue
                # Tolerate document objects that carry no attribute list.
                for attr in getattr(document, "attributes", None) or ():
                    if not hasattr(attr, "stickerset"):
                        continue
                    log.debug("document %s is a sticker", document.id)
                    yield document
                    if isinstance(attr.stickerset, types.InputStickerSetID):
                        log.debug("document %s belongs to stickerset", document.id)
                        yield attr.stickerset
            except Exception as e:
                # Best effort: one malformed message must not abort the scan.
                log.error("somethin wrong happened during checking message: %s", message.stringify(), exc_info=e)
2022-09-27 17:53:53 +00:00
class StickerDownloader:
    """Download sticker documents and whole sticker packs to disk.

    Files are stored as
    ``<dl_root>/stickers/<pack id or "inline">/<document id>.<extension>``
    and the JSON description of a pack is written to
    ``<dl_root>/stickers/<pack id>/meta.txt``. Ids of documents and packs
    that were already handled are remembered for the lifetime of the
    instance so they are processed only once.
    """

    # mime type -> file extension; unknown mime types get an empty extension
    mimes = {
        "image/webp": "webp",
        "application/x-tgsticker": "tgs",
        "video/webm": "webm",
    }
    _log = logging.getLogger("gentoobot.downloader")

    def __init__(self, client, dl_root):
        """Create the download directories and remember the client.

        Args:
            client: object providing ``download_file(document, path)`` and
                being callable with a request object.
            dl_root: root directory for all downloads (anything ``str()``
                turns into a path).
        """
        self.downloads_root = Path(str(dl_root))
        self.sticker_downloads = self.downloads_root / "stickers"
        # parents=True creates downloads_root (and anything above it) too
        self.sticker_downloads.mkdir(parents=True, exist_ok=True)
        self.seen_stickers = set()
        self.seen_packs = set()
        self.client = client

    @classmethod
    def get_sticker_metadata(cls, document):
        """Collect id, date and sticker attributes of *document*.

        Returns:
            dict with the keys ``"id"`` and ``"date"``; ``"pack"`` is added
            when a sticker attribute references an ``InputStickerSetID`` and
            ``"alt"`` when the sticker attribute has an ``alt`` field.
        """
        meta = {
            "id": document.id,
            "date": document.date,
        }
        for attr in document.attributes:
            if not isinstance(attr, types.DocumentAttributeSticker):
                continue
            if isinstance(attr.stickerset, types.InputStickerSetID):
                cls._log.debug("document %s is a normal sticker", document.id)
                meta["pack"] = attr.stickerset.id
            if hasattr(attr, 'alt'):
                cls._log.debug('sticker has alt text %s', attr.alt)
                meta["alt"] = attr.alt
        return meta

    def download(self, document):
        """Download *document* unless a file of the expected size exists.

        Errors are logged and swallowed so that one failing document does
        not stop a bulk run; a failed document is not marked as seen and is
        therefore retried the next time it shows up.
        """
        if document.id in self.seen_stickers:
            return
        try:
            meta = self.get_sticker_metadata(document)
            to_dir = self.sticker_downloads / str(meta.get("pack", "inline"))
            to_dir.mkdir(exist_ok=True)
            to_file = to_dir / (str(document.id) + "." + self.mimes.get(document.mime_type, ""))
            if to_file.exists() and to_file.stat().st_size == document.size:
                self._log.info("document %s already downloaded at %s", document.id, str(to_file))
            else:
                self._log.info("downloading %s to %s", document.id, str(to_file))
                self.client.download_file(document, to_file)
            # Mark as seen in both cases, so a document that is already on
            # disk is not stat()-ed and logged again on every occurrence.
            self.seen_stickers.add(document.id)
        except Exception as e:
            self._log.critical("oops: ", exc_info=e)

    def download_pack(self, inputpack):
        """Fetch the sticker set *inputpack* and download all its documents.

        The JSON form of the server response is written to ``meta.txt`` in
        the pack directory. A pack that no longer exists is logged as a
        warning and remembered, so it is not requested again.
        """
        if inputpack.id in self.seen_packs:
            return
        try:
            req = functions.messages.GetStickerSetRequest(stickerset=inputpack, hash=0)
            stickers = self.client(req)
            sticker_dir = self.sticker_downloads / str(inputpack.id)
            sticker_dir.mkdir(exist_ok=True)
            with open(str(sticker_dir / "meta.txt"), "w", encoding="utf-8") as meta:
                meta.write(stickers.to_json())
            self._log.info("preparing to download whole stickerset %s", inputpack.id)
            for doc in stickers.documents:
                self.download(doc)
            self.seen_packs.add(inputpack.id)
        except errors.rpcerrorlist.StickersetInvalidError:
            self._log.warning("sadly, stickerset %s no longer exists", inputpack.id)
            # Remember the dead pack; otherwise the same failing request is
            # repeated for every sticker that still references it.
            self.seen_packs.add(inputpack.id)
def load_config(path):
    """Parse the YAML file at *path* and return the loaded object.

    Args:
        path: location of the YAML file (anything ``str()`` turns into a
            path).

    Returns:
        Whatever ``yaml.safe_load`` produces for the file content.
    """
    log.debug("opening %s", repr(path))
    # Open in binary mode so PyYAML detects the encoding itself (BOM check,
    # UTF-8 otherwise) instead of depending on the locale's default encoding.
    with open(str(path), "rb") as cfgstream:
        cfg = yaml.safe_load(cfgstream)
    # NOTE(review): this writes the complete config, including whatever
    # secrets it contains, to the debug log - confirm that is intended.
    log.debug(cfg)
    return cfg
def make_client(nth):
    """Build a TelegramClient for the *nth* entry of ``apps`` in config.yaml.

    The session name is ``"session"`` followed by *nth* and the app id.

    Args:
        nth: index into the ``apps`` list of the configuration.

    Returns:
        A new ``TelegramClient``; it is neither started nor connected here.
    """
    app = load_config("config.yaml")["apps"][nth]
    app_id, app_hash = app["id"], app['hash']
    # NOTE(review): the app hash ends up in the debug log - confirm intended.
    log.debug("client id %s hash %s", app_id, app_hash)
    session_name = "".join(("session", str(nth), str(app_id)))
    return TelegramClient(session_name, app_id, app_hash)
def main():
    """Download every sticker found in the sources listed in config.yaml.

    Uses the first entry of ``apps`` for the client, scans the configured
    sources and hands each result to a ``StickerDownloader`` rooted at
    ``./download``. Ctrl-C stops the run with a warning; the client is
    disconnected however the run ends.
    """
    cfg = load_config("config.yaml")
    app_id = cfg["apps"][0]["id"]
    app_hash = cfg["apps"][0]['hash']
    # NOTE(review): the app hash ends up in the debug log - confirm intended.
    log.debug("client id %s hash %s", app_id, app_hash)
    client = TelegramClient("session0" + str(app_id), app_id, app_hash)
    client.start()
    try:
        log.debug(client.get_me().stringify())
        dl = StickerDownloader(client, "./download")
        for found in iter_sources(client, cfg["sources"]):
            if isinstance(found, types.Document):
                dl.download(found)
            elif isinstance(found, types.InputStickerSetID):
                dl.download_pack(found)
    except KeyboardInterrupt:
        # Logger.warn is a deprecated alias; use warning().
        log.warning("exiting: interrupted by keyboard")
    finally:
        # Disconnect on unexpected errors too, not only on the normal path.
        client.disconnect()
####################
# Script entry point: start the download run only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()