third version

* added ratelimiting and task grouping
* bumped python to 3.11
This commit is contained in:
Aleksey Chubukov 2023-02-07 09:06:58 +03:00
parent ec8008c29e
commit d9fb692eeb
2 changed files with 74 additions and 65 deletions

View File

@ -10,7 +10,7 @@ telethon = "*"
[dev-packages] [dev-packages]
[requires] [requires]
python_version = "3.9" python_version = "3.11"
[scripts] [scripts]
bot = "python3 ./bot.py" bot = "python3 ./bot.py"

137
bot.py
View File

@ -6,10 +6,9 @@ from telethon import functions, types, errors
from pathlib import Path from pathlib import Path
import asyncio import asyncio
logging.basicConfig(filename=str('bot.log'),level=logging.DEBUG) logging.basicConfig(filename=str('bot.log'),level=logging.INFO)
log = logging.getLogger('gentoobot') log = logging.getLogger('gentoobot')
semaphore = asyncio.Semaphore(10)
class StickerPackDownloader: class StickerPackDownloader:
_log = logging.getLogger('gentoobot.packdownloader') _log = logging.getLogger('gentoobot.packdownloader')
@ -23,20 +22,23 @@ class StickerPackDownloader:
self.sticker_downloads.mkdir(exist_ok=True) self.sticker_downloads.mkdir(exist_ok=True)
async def download(self, inputpack):
    """Fetch a sticker set once, save its metadata and queue its stickers.

    ``inputpack`` is sent as the ``stickerset`` of a ``GetStickerSetRequest``
    and its ``id`` names the directory created under
    ``self.sticker_downloads``.  The set's JSON dump is written to
    ``meta.txt`` in that directory and every document of the set is handed
    to ``self.downloader.download``.

    A set whose id is already in ``self.seen`` is skipped.  A set reported
    as invalid is logged and stays marked as seen.  Any other failure
    un-marks the set, so a later encounter can retry it, and is re-raised
    to the caller.
    """
    pack_id = inputpack.id
    if pack_id in self.seen:
        return
    # Mark before the first await so that concurrent calls for the same set
    # return early instead of requesting it twice.
    self.seen.add(pack_id)
    try:
        req = functions.messages.GetStickerSetRequest(stickerset=inputpack, hash=0)
        await for_online()
        stickers = await self.client(req)
        self._log.info("downloadding stickerset %s %s", pack_id, stickers.set.title)
        sticker_dir = self.sticker_downloads / str(pack_id)
        sticker_dir.mkdir(exist_ok=True)
        with open(str(sticker_dir / 'meta.txt'), 'w') as meta:
            meta.write(stickers.to_json())
        self._log.debug("preparing to download whole stickerset %s", pack_id)
        for doc in stickers.documents:
            await self.downloader.download(doc)
    except errors.rpcerrorlist.StickersetInvalidError:
        # Nothing to retry: keep the id in self.seen.
        self._log.warning("sadly, stickerset %s no longer exists", pack_id)
    except Exception:
        # Unexpected failure: forget the set so it is not skipped for the
        # rest of the run, then let the caller see the error as before.
        self.seen.discard(pack_id)
        raise
class StickerDownloader: class StickerDownloader:
mimes={ mimes={
@ -52,7 +54,7 @@ class StickerDownloader:
self.sticker_downloads = self.downloads_root/'stickers' self.sticker_downloads = self.downloads_root/'stickers'
self.sticker_downloads.mkdir(exist_ok=True) self.sticker_downloads.mkdir(exist_ok=True)
self.seen_stickers = set() self.seen_stickers = set()
self.seen_packs = set() self.pending_stickers = set()
self.client = client self.client = client
@classmethod @classmethod
@ -71,21 +73,30 @@ class StickerDownloader:
meta['alt'] = a.alt meta['alt'] = a.alt
return meta return meta
async def download_task(self, document):
    """Download one sticker document unless it is already on disk.

    The target path is ``<sticker_downloads>/<pack or 'inline'>/<id>.<ext>``
    where the extension is looked up in ``self.mimes`` by MIME type.  The
    transfer is skipped when a file of the expected size already exists.
    Any exception is logged and the document id is un-marked again.
    """
    sticker_id = document.id
    if sticker_id in self.seen_stickers:
        return
    # Claim the id up front; later calls for the same sticker return above.
    self.seen_stickers.add(sticker_id)
    try:
        metadata = self.get_sticker_metadata(document)
        pack_dir = self.sticker_downloads / str(metadata.get('pack', 'inline'))
        pack_dir.mkdir(exist_ok=True)
        extension = self.mimes.get(document.mime_type, '')
        target = pack_dir / (str(sticker_id) + '.' + extension)
        if target.exists() and target.stat().st_size == document.size:
            self._log.debug("document %s already downloaded at %s", sticker_id, str(target))
        else:
            self._log.debug("downloading %s to %s", sticker_id, str(target))
            await for_online()
            # The module-level semaphore caps simultaneous file transfers.
            async with semaphore:
                await self.client.download_file(document, target)
            self._log.info("downloaded %s to %s", sticker_id, str(target))
    except Exception as e:
        self._log.critical("oops: ", exc_info=e)
        self.seen_stickers.remove(sticker_id)
async def download(self, document):
    """Schedule ``download_task`` for ``document`` and return immediately.

    The download runs as a background task; this coroutine does not wait
    for it to finish.
    """
    task = asyncio.create_task(self.download_task(document))
    # The event loop keeps only weak references to tasks, so a task nobody
    # else references may be garbage-collected before it completes.  Hold it
    # in pending_stickers until it is done, then drop it.
    self.pending_stickers.add(task)
    task.add_done_callback(self.pending_stickers.discard)
@ -103,36 +114,21 @@ def make_client(nth):
log.debug("client id %s hash %s", app_id, app_hash) log.debug("client id %s hash %s", app_id, app_hash)
return TelegramClient('session'+str(nth)+str(app_id), app_id, app_hash) return TelegramClient('session'+str(nth)+str(app_id), app_id, app_hash)
def main():
cfg = load_config('config.yaml')
app_id = cfg['apps'][0]['id']
app_hash = cfg['apps'][0]['hash']
log.debug("client id %s hash %s", app_id, app_hash)
client = TelegramClient('session0'+str(app_id), app_id, app_hash)
client.start()
log.debug(client.get_me().stringify())
dl = StickerDownloader(client, './download')
try:
for found in iter_sources(client, cfg['sources']):
if isinstance(found, types.Document):
dl.download(found)
elif isinstance(found, types.InputStickerSetID):
dl.download_pack(found)
except KeyboardInterrupt:
log.warn('exiting: interrupted by keyboard')
client.disconnect()
class DialogProcessor: class DialogProcessor:
_log = logging.getLogger('gentoobot.messages') _log = logging.getLogger('gentoobot.messages')
def __init__(self, client, dialog, task_group, destdir):
    """Bind the processor to one dialog and build its own downloaders.

    ``client`` is used to iterate the dialog's messages, ``task_group``
    receives one task per message, and ``destdir`` is passed to the
    ``StickerDownloader`` created here.
    """
    self.client = client
    self.dialog = dialog
    self.task_group = task_group
    # One sticker downloader per processor; the pack downloader wraps it.
    sticker_downloader = StickerDownloader(self.client, destdir)
    self.downloader = sticker_downloader
    self.packdownloader = StickerPackDownloader(sticker_downloader)
async def process(self):
    """Start a ``check_message`` task for every message of the dialog.

    Before each task is spawned the coroutine waits for the client to be
    connected and for a free slot in the module-level semaphore.  The slot
    is held only while the task is being created, not while it runs.
    """
    spawn = self.task_group.create_task
    messages = self.client.iter_messages(self.dialog)
    async for message in messages:
        await for_online()
        async with semaphore:
            self._log.debug("starting task for message")
            spawn(self.check_message(message))
async def check_message(self, message): async def check_message(self, message):
try: try:
@ -145,9 +141,11 @@ class DialogProcessor:
for a in message.media.document.attributes: for a in message.media.document.attributes:
if hasattr(a, 'stickerset'): if hasattr(a, 'stickerset'):
self._log.debug("document %s is a sticker", message.media.document.id) self._log.debug("document %s is a sticker", message.media.document.id)
await for_online()
await self.downloader.download(message.media.document) await self.downloader.download(message.media.document)
if isinstance(a.stickerset, types.InputStickerSetID): if isinstance(a.stickerset, types.InputStickerSetID):
self._log.debug("document %s belongs to stickerset", message.media.document.id) self._log.debug("document %s belongs to stickerset", message.media.document.id)
await for_online()
await self.packdownloader.download(a.stickerset) await self.packdownloader.download(a.stickerset)
except Exception as e: except Exception as e:
self._log.error("somethin wrong happened during checking message: %s", message.stringify(), exc_info=e) self._log.error("somethin wrong happened during checking message: %s", message.stringify(), exc_info=e)
@ -157,6 +155,7 @@ class ArchiveWalker:
def __init__(self, client, archives): def __init__(self, client, archives):
self.client = client self.client = client
self.archives = archives self.archives = archives
self.task_group = asyncio.TaskGroup()
async def fetch_dialogs(self): async def fetch_dialogs(self):
async for dialog in self.client.iter_dialogs(): async for dialog in self.client.iter_dialogs():
@ -176,17 +175,27 @@ class ArchiveWalker:
yield dialog, src yield dialog, src
async def walk(self):
    """Run a ``DialogProcessor`` for every dialog from ``fetch_dialogs``.

    Each ``(dialog, options)`` pair gets its own processor whose
    ``process()`` coroutine is started in ``self.task_group``.  The
    processor's destination directory is ``options['destdir']``, or
    ``downloads`` when that key is missing.  Leaving the ``async with``
    block waits for the tasks in the group; completion is then logged.
    """
    group = self.task_group
    async with group:
        async for dialog, options in self.fetch_dialogs():
            destdir = Path(options.get('destdir', 'downloads'))
            processor = DialogProcessor(self.client, dialog, group, destdir)
            self._log.debug('starting dialog processor')
            group.create_task(processor.process())
    self._log.debug('walker finished')
async def for_online():
    """Wait until the module-level ``client`` reports it is connected.

    While ``client.is_connected()`` is false, a warning is logged and the
    coroutine sleeps 10 seconds before checking again.  Returns at once
    when the client is already connected.
    """
    while not client.is_connected():
        # Logger.warn is a deprecated alias; warning() is the supported name.
        log.warning('disconnected')
        await asyncio.sleep(10)
async def main(): async def main():
log.debug((await client.get_me()).stringify()) try:
walker = ArchiveWalker(client, cfg['sources']) log.debug((await client.get_me()).stringify())
await walker.walk() walker = ArchiveWalker(client, cfg['sources'])
await client.run_until_disconnected() await walker.walk()
except KeyboardInterrupt:
return