tg_gentoo_bot/bot.py
2023-02-07 09:55:37 +03:00

213 lines
8.3 KiB
Python

#!/usr/bin/env python3
import yaml
from telethon import TelegramClient, events, sync
import logging
from telethon import functions, types, errors
from pathlib import Path
import asyncio
# Send log records at INFO level and above to ``bot.log`` in the working directory.
logging.basicConfig(filename='bot.log', level=logging.INFO)

# Module-wide logger; the classes in this file use child loggers of 'gentoobot'.
log = logging.getLogger('gentoobot')

# Shared limiter admitting at most 10 concurrent holders.
semaphore = asyncio.Semaphore(10)
class StickerPackDownloader:
    """Download complete sticker sets, delegating each sticker to a downloader.

    The ``downloader`` passed to the constructor must expose ``client``,
    ``downloads_root`` and an awaitable ``download(document)``.
    """

    _log = logging.getLogger('gentoobot.packdownloader')

    def __init__(self, downloader):
        """Remember the downloader and make sure ``<downloads_root>/stickers`` exists."""
        # Ids of the sticker sets that were already requested by this instance.
        self.seen = set()
        self.client = downloader.client
        self.downloader = downloader
        self.downloads_root = Path(str(downloader.downloads_root))
        self.sticker_downloads = self.downloads_root / 'stickers'
        # parents=True also creates the download root itself, even when it is
        # a nested path whose parent directories do not exist yet.
        self.sticker_downloads.mkdir(parents=True, exist_ok=True)

    async def download(self, inputpack):
        """Fetch the sticker set ``inputpack`` once and download all its documents.

        The set description is stored as JSON in ``<stickers>/<pack id>/meta.txt``.
        A set that no longer exists is logged as a warning and skipped.
        """
        if inputpack.id in self.seen:
            return
        self.seen.add(inputpack.id)
        try:
            req = functions.messages.GetStickerSetRequest(stickerset=inputpack, hash=0)
            await for_online()
            stickers = await self.client(req)
            self._log.info("downloading stickerset %s %s", inputpack.id, stickers.set.title)
            sticker_dir = self.sticker_downloads / str(inputpack.id)
            sticker_dir.mkdir(exist_ok=True)
            # Fixed encoding so the metadata file does not depend on the locale.
            with open(str(sticker_dir / 'meta.txt'), 'w', encoding='utf-8') as meta:
                meta.write(stickers.to_json())
            self._log.debug("preparing to download whole stickerset %s", inputpack.id)
            # Hand every document over at once: the pack's stickers stay
            # concurrent whether download() only schedules the transfer or
            # performs it before returning.
            await asyncio.gather(
                *(self.downloader.download(doc) for doc in stickers.documents)
            )
        except errors.rpcerrorlist.StickersetInvalidError:
            self._log.warning("sadly, stickerset %s no longer exists", inputpack.id)
class StickerDownloader:
    """Download single sticker documents into ``<dl_root>/stickers``.

    Files are grouped in one directory per sticker pack id; documents for which
    no pack id was found are stored under ``inline``.
    """

    # File extension used for each known sticker MIME type.
    mimes = {
        'image/webp': 'webp',
        'application/x-tgsticker': 'tgs',
        'video/webm': 'webm'
    }
    _log = logging.getLogger('gentoobot.downloader')

    def __init__(self, client, dl_root):
        """Store ``client`` and create ``<dl_root>/stickers`` if it is missing."""
        self.downloads_root = Path(str(dl_root))
        self.sticker_downloads = self.downloads_root / 'stickers'
        # parents=True also creates dl_root itself, even when it is a nested
        # path whose parent directories do not exist yet.
        self.sticker_downloads.mkdir(parents=True, exist_ok=True)
        # Ids of the documents that were already handled (or are in progress).
        self.seen_stickers = set()
        # Kept for backward compatibility; not used by this class.
        self.pending_stickers = set()
        self.client = client

    @classmethod
    def get_sticker_metadata(cls, document):
        """Return a dict with ``id`` and ``date`` plus ``pack``/``alt`` when present."""
        meta = {
            'id': document.id,
            'date': document.date
        }
        for a in document.attributes:
            if isinstance(a, types.DocumentAttributeSticker):
                if isinstance(a.stickerset, types.InputStickerSetID):
                    cls._log.debug("document %s is a normal sticker", document.id)
                    meta['pack'] = a.stickerset.id
                if hasattr(a, 'alt'):
                    cls._log.debug("sticker has alt text %s", a.alt)
                    meta['alt'] = a.alt
        return meta

    async def download_task(self, document):
        """Download ``document`` unless it was seen before or is already on disk.

        Any failure is logged and the document is forgotten again, so a later
        call can retry it.
        """
        if document.id in self.seen_stickers:
            return
        self.seen_stickers.add(document.id)
        try:
            meta = self.get_sticker_metadata(document)
            to_dir = self.sticker_downloads / str(meta.get('pack', 'inline'))
            to_dir.mkdir(exist_ok=True)
            # Unknown MIME types get an empty extension (the name ends with '.').
            to_file = to_dir / (str(document.id) + '.' + self.mimes.get(document.mime_type, ''))
            # A size mismatch means a previous transfer was incomplete.
            if not to_file.exists() or to_file.stat().st_size != document.size:
                self._log.debug("downloading %s to %s", document.id, str(to_file))
                await for_online()
                async with semaphore:
                    await self.client.download_file(document, to_file)
                self._log.info("downloaded %s to %s", document.id, str(to_file))
            else:
                self._log.debug("document %s already downloaded at %s", document.id, str(to_file))
        except Exception as e:
            self._log.critical("oops: ", exc_info=e)
            # discard() instead of remove(): never raise from the error path.
            self.seen_stickers.discard(document.id)

    async def download(self, document):
        """Download ``document`` and return when the transfer has finished.

        The transfer is awaited instead of being started with a bare
        ``asyncio.create_task``: the event loop keeps only weak references to
        tasks, and a task that nobody awaits is abandoned when the caller's
        work is done and the loop stops, leaving files missing.
        """
        await self.download_task(document)
def load_config(path):
    """Parse the YAML file at ``path`` and return the resulting object.

    The file is parsed with ``yaml.safe_load``. Errors from opening or parsing
    the file are not handled here and propagate to the caller.
    """
    log.debug("opening %s", repr(path))
    # Decode explicitly as UTF-8 instead of relying on the locale default,
    # which differs between platforms.
    with open(str(path), encoding='utf-8') as cfgstream:
        cfg = yaml.safe_load(cfgstream)
    # NOTE(review): this writes the whole configuration, including the API
    # hashes read elsewhere in this file, to the log when DEBUG is enabled.
    log.debug(cfg)
    return cfg
def make_client(nth):
    """Create a TelegramClient from entry ``nth`` of ``apps`` in config.yaml.

    The session name is ``session<nth><app id>``.
    """
    apps = load_config('config.yaml')['apps']
    app_id, app_hash = apps[nth]['id'], apps[nth]['hash']
    log.debug("client id %s hash %s", app_id, app_hash)
    session_name = 'session' + str(nth) + str(app_id)
    return TelegramClient(session_name, app_id, app_hash)
class DialogProcessor:
    """Walk the messages of one dialog and download every sticker found."""

    _log = logging.getLogger('gentoobot.messages')

    def __init__(self, client, dialog, task_group, destdir):
        """Bind the processor to ``dialog`` and create downloaders rooted at ``destdir``."""
        self.client = client
        self.dialog = dialog
        self.task_group = task_group
        self.downloader = StickerDownloader(self.client, destdir)
        self.packdownloader = StickerPackDownloader(self.downloader)

    async def process(self):
        """Schedule a ``check_message`` task in the task group for every message."""
        async for message in self.client.iter_messages(self.dialog):
            await for_online()
            # NOTE(review): the semaphore is held only while the task is being
            # created, so it does not limit how many checks run concurrently.
            async with semaphore:
                self._log.debug("starting task for message")
                self.task_group.create_task(self.check_message(message))

    async def check_message(self, message):
        """Download the sticker, and its sticker set, attached to ``message``.

        Messages without media or without a document are skipped. Any exception
        is logged together with the message and is not re-raised.
        """
        try:
            media = getattr(message, 'media', None)
            if media is None:
                self._log.debug("message %i has no media", message.id)
                return
            # The document can be absent or None; checking the value instead of
            # the attribute avoids an AttributeError that would be reported as
            # an error for a perfectly ordinary message.
            document = getattr(media, 'document', None)
            if document is None:
                self._log.debug('message %i has no documents', message.id)
                return
            for a in document.attributes:
                if hasattr(a, 'stickerset'):
                    self._log.debug("document %s is a sticker", document.id)
                    await for_online()
                    await self.downloader.download(document)
                    if isinstance(a.stickerset, types.InputStickerSetID):
                        self._log.debug("document %s belongs to stickerset", document.id)
                        await for_online()
                        await self.packdownloader.download(a.stickerset)
        except Exception as e:
            self._log.error("somethin wrong happened during checking message: %s", message.stringify(), exc_info=e)
class ArchiveWalker:
    """Find the configured dialogs and run a DialogProcessor on each of them."""

    _log = logging.getLogger('gentoobot.walker')

    def __init__(self, client, archives):
        """Store ``client`` and ``archives`` (an iterable of source dicts).

        Each source dict is read for ``id`` and ``type``; ``destdir`` is optional.
        """
        self.client = client
        self.archives = archives
        self.task_group = asyncio.TaskGroup()

    async def fetch_dialogs(self):
        """Yield ``(dialog, source)`` for every dialog matching a configured source.

        A source matches when the entity id equals ``source['id']`` and, for the
        types ``'Channel'`` and ``'User'``, the entity is of that class.
        """
        async for dialog in self.client.iter_dialogs():
            self._log.debug(dialog)
            e = dialog.entity
            for src in self.archives:
                if e.id != src['id']:
                    continue
                # Not every entity has a ``title`` (the 'User' branch below
                # exists for such entities); fall back to the dialog's name so
                # that logging cannot raise.
                label = getattr(e, 'title', getattr(dialog, 'name', None))
                self._log.debug("dialog %s %s matches by id", e.id, label)
                if src['type'] == 'Channel' and not isinstance(e, types.Channel):
                    self._log.debug("dialog %s is not a channel", e.id)
                    continue
                elif src['type'] == 'User' and not isinstance(e, types.User):
                    # Log before skipping; after ``continue`` it would never run.
                    self._log.debug("dialog %s is not a user", e.id)
                    continue
                self._log.info(dialog.stringify())
                yield dialog, src

    async def walk(self):
        """Process all matching dialogs and wait until every task has finished."""
        async with self.task_group:
            async for dialog, options in self.fetch_dialogs():
                processor = DialogProcessor(self.client, dialog, self.task_group, Path(options.get('destdir', 'downloads')))
                self._log.debug('starting dialog processor')
                self.task_group.create_task(processor.process())
        self._log.debug('walker finished')
async def for_online():
    """Wait until the module-level ``client`` reports that it is connected.

    While disconnected, a warning is logged and the check is repeated every
    10 seconds. Returns immediately when the client is already connected.
    """
    while not client.is_connected():
        # Logger.warn is a deprecated alias; warning() is the supported name.
        log.warning('disconnected')
        await asyncio.sleep(10)
async def main():
    """Log the current account, then walk every source listed in the config.

    Reads the module-level ``client`` and ``cfg``. A KeyboardInterrupt ends
    the run quietly.
    """
    try:
        me = await client.get_me()
        log.debug(me.stringify())
        await ArchiveWalker(client, cfg['sources']).walk()
    except KeyboardInterrupt:
        # Nothing to clean up here; just stop.
        pass
####################
if __name__ == '__main__':
    # ``cfg`` and ``client`` are module globals read by main() and for_online().
    cfg = load_config('config.yaml')
    # Credentials come from the first entry of the ``apps`` list.
    app_id, app_hash = cfg['apps'][0]['id'], cfg['apps'][0]['hash']
    log.debug("client id %s hash %s", app_id, app_hash)
    session_name = 'session0'+str(app_id)
    client = TelegramClient(session_name, app_id, app_hash)
    # Run main() to completion on the client's loop inside the client context.
    with client:
        client.loop.run_until_complete(main())