diff --git a/backend/crawlconfigs.py b/backend/crawlconfigs.py
index cb18c126..67d83c03 100644
--- a/backend/crawlconfigs.py
+++ b/backend/crawlconfigs.py
@@ -95,12 +95,13 @@ class CrawlConfig(BaseMongoModel):
 
     crawlTimeout: Optional[int] = 0
 
+    crawlCount: Optional[int] = 0
+
 
 # ============================================================================
-class TriggerCrawl(BaseModel):
-    """ Crawl trigger from internal scheduler """
+class UpdateSchedule(BaseModel):
+    """ Update the crawl schedule """
 
-    id: str
     schedule: str
 
 
@@ -138,20 +139,21 @@ class CrawlOps:
         )
         return result
 
-    async def update_crawl_config(
-        self, cid: str, config: CrawlConfigIn, archive: Archive, user: User
-    ):
-        """ Update existing crawl config"""
-        data = config.dict()
-        data["archive"] = archive.id
-        data["user"] = str(user.id)
-        data["_id"] = cid
+    async def update_crawl_schedule(self, cid: str, update: UpdateSchedule):
+        """ Update schedule for existing crawl config"""
+        if not await self.crawl_configs.find_one_and_update(
+            {"_id": cid}, {"$set": {"schedule": update.schedule}}
+        ):
+            return None
 
-        await self.crawl_configs.find_one_and_replace({"_id": cid}, data)
+        await self.crawl_manager.update_crawl_schedule(cid, update.schedule)
+        return True
 
-        crawlconfig = CrawlConfig.from_dict(data)
-
-        await self.crawl_manager.update_crawl_config(crawlconfig)
+    async def inc_crawls(self, cid: str):
+        """ Increment Crawl Counter """
+        await self.crawl_configs.find_one_and_update(
+            {"_id": cid}, {"$inc": {"crawlCount": 1}}
+        )
 
     async def get_crawl_configs(self, archive: Archive):
         """Get all crawl configs for an archive is a member of"""
@@ -179,7 +181,7 @@ class CrawlOps:
 
 # ============================================================================
 # pylint: disable=redefined-builtin,invalid-name,too-many-locals
-def init_crawl_config_api(app, mdb, user_dep, archive_ops, crawl_manager):
+def init_crawl_config_api(mdb, user_dep, archive_ops, crawl_manager):
     """Init /crawlconfigs api routes"""
 
     ops = CrawlOps(mdb, archive_ops, crawl_manager)
@@ -214,16 +216,18 @@ def init_crawl_config_api(app, mdb, user_dep, archive_ops, crawl_manager):
         res = await ops.add_crawl_config(config, archive, user)
         return {"added": str(res.inserted_id)}
 
-    @router.patch("/{cid}")
-    async def update_crawl_config(
-        config: CrawlConfigIn,
+    @router.patch("/{cid}/schedule", dependencies=[Depends(archive_crawl_dep)])
+    async def update_crawl_schedule(
+        update: UpdateSchedule,
         cid: str,
-        archive: Archive = Depends(archive_crawl_dep),
-        user: User = Depends(user_dep),
     ):
         try:
-            await ops.update_crawl_config(cid, config, archive, user)
+            if not await ops.update_crawl_schedule(cid, update):
+                raise HTTPException(
+                    status_code=404, detail=f"Crawl Config '{cid}' not found"
+                )
+
         except Exception as e:
             # pylint: disable=raise-missing-from
             raise HTTPException(
@@ -250,13 +254,6 @@ def init_crawl_config_api(app, mdb, user_dep, archive_ops, crawl_manager):
 
         return {"started": crawl_id}
 
-    @app.post("/crawls/trigger", tags=["crawlconfigs"])
-    async def trigger_crawl(trigger: TriggerCrawl):
-        await crawl_manager.run_crawl_config(
-            trigger.id, manual=False, schedule=trigger.schedule
-        )
-        return {}
-
     @router.delete("")
     async def delete_crawl_configs(archive: Archive = Depends(archive_crawl_dep)):
         result = await ops.delete_crawl_configs(archive)
diff --git a/backend/crawls.py b/backend/crawls.py
index 768aabe4..1290e656 100644
--- a/backend/crawls.py
+++ b/backend/crawls.py
@@ -60,9 +60,10 @@ class CrawlCompleteIn(BaseModel):
 class CrawlOps:
     """ Crawl Ops """
 
-    def __init__(self, mdb, crawl_manager, archives):
+    def __init__(self, mdb, crawl_manager, crawl_configs, archives):
         self.crawls = mdb["crawls"]
         self.crawl_manager = crawl_manager
+        self.crawl_configs = crawl_configs
         self.archives = archives
 
         self.crawl_manager.set_crawl_ops(self)
@@ -96,6 +97,8 @@ class CrawlOps:
 
         await self.archives.inc_usage(crawl.aid, dura)
 
+        await self.crawl_configs.inc_crawls(crawl.cid)
+
     async def list_crawls(self, aid: str, cid: str = None):
         """Get all crawl configs for an archive is a member of"""
         query = {"aid": aid}
@@ -115,10 +118,10 @@ class CrawlOps:
 
 
 # ============================================================================
-def init_crawls_api(app, mdb, crawl_manager, archives):
+def init_crawls_api(app, mdb, crawl_manager, crawl_config_ops, archives):
     """ API for crawl management, including crawl done callback"""
 
-    ops = CrawlOps(mdb, crawl_manager, archives)
+    ops = CrawlOps(mdb, crawl_manager, crawl_config_ops, archives)
 
     archive_crawl_dep = archives.archive_crawl_dep
 
diff --git a/backend/dockerman.py b/backend/dockerman.py
index bb6b2ae2..7b2ef101 100644
--- a/backend/dockerman.py
+++ b/backend/dockerman.py
@@ -5,6 +5,7 @@ Docker crawl manager
 import tarfile
 import os
 import json
+import time
 import asyncio
 
 from datetime import datetime
@@ -12,9 +13,12 @@ from io import BytesIO
 from tempfile import NamedTemporaryFile
 
 import aiodocker
+import aioprocessing
 
 from crawls import Crawl
 
+from scheduler import run_scheduler
+
 
 # ============================================================================
 class DockerManager:
@@ -30,10 +34,33 @@ class DockerManager:
         self.archive_ops = archive_ops
         self.crawl_ops = None
 
-        self.loop = asyncio.get_running_loop()
-        self.loop.create_task(self.run_event_loop())
-
         self.extra_crawl_params = extra_crawl_params or []
 
+        self._event_q = None
+
+        self.loop = asyncio.get_running_loop()
+
+        self.loop.create_task(self.run_event_loop())
+        self.loop.create_task(self.init_trigger_queue())
+        self.loop.create_task(self.cleanup_loop())
+
+    # pylint: disable=no-member
+    async def init_trigger_queue(self):
+        """ Crawl trigger queue from separate scheduling process """
+        self._event_q = aioprocessing.AioQueue()
+        _trigger_q = aioprocessing.AioQueue()
+
+        self.sched = aioprocessing.AioProcess(
+            target=run_scheduler, args=(self._event_q, _trigger_q)
+        )
+        self.sched.start()
+
+        while True:
+            try:
+                result = await _trigger_q.coro_get()
+                self.loop.create_task(self.run_crawl_config(manual=False, **result))
+            # pylint: disable=broad-except
+            except Exception as exc:
+                print(f"Error trigger crawl: {exc}")
+
     async def run_event_loop(self):
         """ Run Docker event loop"""
@@ -49,6 +76,46 @@ class DockerManager:
             if event["Action"] == "die":
                 self.loop.create_task(self._handle_container_die(event["Actor"]))
 
+    async def cleanup_loop(self):
+        """Clean-up any orphaned crawler images that are not running.
+        Stop containers whose crawlTimeout has been exceeded"""
+
+        while True:
+            # cleanup orphaned
+            results = await self.client.containers.list(
+                filters=json.dumps(
+                    {
+                        "label": ["btrix.crawlconfig"],
+                        "status": ["exited"],
+                        "exited": ["1"],
+                    }
+                )
+            )
+
+            for container in results:
+                print(f"Cleaning Up Orphan Container {container['Id']}", flush=True)
+                await container.delete()
+
+            results = await self.client.containers.list(
+                filters=json.dumps(
+                    {
+                        "label": ["btrix.timeout"],
+                        "status": ["running"],
+                    }
+                )
+            )
+
+            for container in results:
+                timeout = int(container["Labels"]["btrix.timeout"])
+                actual = int(time.time()) - int(container["Created"])
+                if actual >= timeout:
+                    print(
+                        f"Crawl {container['Id']} running for {actual} seconds, exceeded timeout {timeout}, stopping..."
+                    )
+                    await container.kill(signal="SIGTERM")
+
+            await asyncio.sleep(30)
+
     def set_crawl_ops(self, ops):
         """ set crawl ops """
         self.crawl_ops = ops
@@ -70,14 +137,17 @@ class DockerManager:
             "btrix.coll": crawlconfig.config.collection,
         }
 
+        if crawlconfig.crawlTimeout:
+            labels["btrix.timeout"] = str(crawlconfig.crawlTimeout)
+
         # Create Config Volume
         volume = await self._create_volume(crawlconfig, labels)
 
         if crawlconfig.schedule:
             print("Scheduling...", flush=True)
 
-            await self._send_sched_msg(
-                {"type": "add", "id": crawlconfig.id, "schedule": crawlconfig.schedule}
+            await self._schedule_update(
+                cid=crawlconfig.id, schedule=crawlconfig.schedule
             )
 
         if crawlconfig.runNow:
@@ -87,22 +157,15 @@ class DockerManager:
                 volume,
             )
 
-    async def update_crawl_config(self, crawlconfig):
-        """ Only updating the schedule + run now """
+    async def update_crawl_schedule(self, cid, schedule):
+        """ Update the schedule for existing crawl config """
 
-        if crawlconfig.schedule:
+        if schedule:
             print("Updating Schedule..", flush=True)
-            await self._send_sched_msg(
-                {"type": "add", "id": crawlconfig.id, "schedule": crawlconfig.schedule}
-            )
+            await self._schedule_update(cid=cid, schedule=schedule)
 
         else:
-            await self._send_sched_msg(
-                {"type": "remove", "id": crawlconfig.id}
-            )
-
-        if crawlconfig.runNow:
-            await self.run_crawl_config(crawlconfig.id)
+            await self._schedule_update(cid=cid, schedule="")
 
     async def list_running_crawls(self, aid):
         """ List running containers for this archive """
@@ -189,13 +252,11 @@ class DockerManager:
 
     async def delete_crawl_config_by_id(self, cid):
         """ Delete Crawl Config by Crawl Config Id"""
-        await self._delete_volume_by_labels(
-            filters={"label": [f"btrix.crawlconfig={cid}"]}
-        )
+        await self._delete_volume_by_labels([f"btrix.crawlconfig={cid}"])
 
     async def delete_crawl_configs_for_archive(self, aid):
         """ Delete Crawl Config by Archive Id"""
-        await self._delete_volume_by_labels(filters={"label": [f"btrix.archive={aid}"]})
+        await self._delete_volume_by_labels([f"btrix.archive={aid}"])
 
     # ========================================================================
     async def _create_volume(self, crawlconfig, labels):
@@ -242,19 +303,25 @@ class DockerManager:
 
             await container.delete()
 
-    async def _delete_volume_by_labels(self, filters):
+    async def _delete_volume_by_labels(self, labels):
         """ Delete Crawl Configs by specified filter """
 
+        containers = await self._list_running_containers(labels)
+        if len(containers):
+            raise Exception("Cannot delete crawl config, in use for running crawl")
+
         # pylint: disable=protected-access
         resp = await self.client._query_json(
-            "volumes", method="GET", params={"filters": json.dumps(filters)}
+            "volumes",
+            method="GET",
+            params={"filters": json.dumps({"label": labels})},
         )
 
         for volume in resp["Volumes"]:
             vol_obj = aiodocker.docker.DockerVolume(self.client, volume["Name"])
 
-            await self._send_sched_msg(
-                {"type": "remove", "id": volume["Labels"]["btrix.crawlconfig"]}
+            await self._schedule_update(
+                cid=volume["Labels"]["btrix.crawlconfig"], schedule=""
             )
 
             try:
@@ -263,13 +330,8 @@ class DockerManager:
             except:
                 print("Warning: Volume Delete Failed, Container in Use", flush=True)
 
-    async def _send_sched_msg(self, msg):
-        reader, writer = await asyncio.open_connection("scheduler", 9017)
-        writer.write(json.dumps(msg).encode("utf-8") + b"\n")
-        await writer.drain()
-        await reader.readline()
-        writer.close()
-        await writer.wait_closed()
+    async def _schedule_update(self, cid, schedule=""):
+        await self._event_q.coro_put({"cid": cid, "schedule": schedule})
 
     # pylint: disable=too-many-arguments
     async def _run_crawl_now(self, storage, labels, volume, schedule="", manual=True):
diff --git a/backend/k8sman.py b/backend/k8sman.py
index 7fce030f..911f334a 100644
--- a/backend/k8sman.py
+++ b/backend/k8sman.py
@@ -169,10 +169,8 @@ class K8SManager:
         return cron_job
 
-    async def update_crawl_config(self, crawlconfig):
-        """ Update existing crawl config """
-
-        cid = crawlconfig.id
+    async def update_crawl_config(self, cid, schedule):
+        """ Update the schedule for existing crawl config """
 
         cron_jobs = await self.batch_beta_api.list_namespaced_cron_job(
             namespace=self.namespace, label_selector=f"btrix.crawlconfig={cid}"
         )
@@ -183,57 +181,20 @@ class K8SManager:
         cron_job = cron_jobs.items[0]
 
-        if crawlconfig.archive != cron_job.metadata.labels["btrix.archive"]:
-            return
+        real_schedule = schedule or DEFAULT_NO_SCHEDULE
 
-        labels = {
-            "btrix.user": crawlconfig.user,
-            "btrix.archive": crawlconfig.archive,
-            "btrix.crawlconfig": cid,
-        }
+        if real_schedule != cron_job.spec.schedule:
+            cron_job.spec.schedule = real_schedule
+            cron_job.spec.suspend = not schedule
 
-        # Update Config Map
-        config_map = self._create_config_map(crawlconfig, labels)
-
-        await self.core_api.patch_namespaced_config_map(
-            name=f"crawl-config-{cid}", namespace=self.namespace, body=config_map
-        )
-
-        # Update CronJob, if needed
-        suspend, schedule, run_now = self._get_schedule_suspend_run_now(crawlconfig)
-
-        changed = False
-
-        if schedule != cron_job.spec.schedule:
-            cron_job.spec.schedule = schedule
-            changed = True
-
-        if suspend != cron_job.spec.suspend:
-            cron_job.spec.suspend = suspend
-            changed = True
-
-        if (
-            crawlconfig.crawlTimeout
-            != cron_job.spec.job_template.spec.active_deadline_seconds
-        ):
-            cron_job.spec.job_template.spec.active_deadline_seconds = (
-                crawlconfig.crawlTimeout
-            )
-            changed = True
-
-        if changed:
             cron_job.spec.job_template.metadata.annotations[
                 "btrix.run.schedule"
-            ] = crawlconfig.schedule
+            ] = schedule
 
             await self.batch_beta_api.patch_namespaced_cron_job(
                 name=cron_job.metadata.name, namespace=self.namespace, body=cron_job
             )
 
-        # Run Job Now
-        if run_now:
-            await self._create_run_now_job(cron_job)
-
     async def run_crawl_config(self, cid, manual=True, schedule=""):
         """ Run crawl job for cron job based on specified crawlconfig id (cid) """
 
         cron_jobs = await self.batch_beta_api.list_namespaced_cron_job(
diff --git a/backend/main.py b/backend/main.py
index 9bab8b49..a82b5bbc 100644
--- a/backend/main.py
+++ b/backend/main.py
@@ -75,7 +75,6 @@ class BrowsertrixAPI:
         )
 
         self.crawl_config_ops = init_crawl_config_api(
-            self.app,
             self.mdb,
             current_active_user,
             self.archive_ops,
@@ -86,6 +85,7 @@ class BrowsertrixAPI:
             self.app,
             self.mdb,
             self.crawl_manager,
+            self.crawl_config_ops,
             self.archive_ops,
         )
 
diff --git a/backend/requirements.txt b/backend/requirements.txt
index 5f040aa3..b58e21bd 100644
--- a/backend/requirements.txt
+++ b/backend/requirements.txt
@@ -5,3 +5,4 @@ aiofiles
 kubernetes-asyncio
 aiodocker
 apscheduler
+aioprocessing
diff --git a/docker-compose.yml b/docker-compose.yml
index 1d5c7d63..d1caba5b 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -18,17 +18,6 @@ services:
       - mongo
       - scheduler
 
-  scheduler:
-    build: ./backend
-    image: webrecorder/browsertrix-api
-    command: python -u scheduler.py
-
-    env_file:
-      - ./config.env
-
-    depends_on:
-      - mongo
-
   mongo:
     image: mongo
     volumes:
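
Note on backend/scheduler.py: this patch imports run_scheduler() from it and drops the standalone scheduler container from docker-compose.yml, but the changes to scheduler.py itself are not included above. As a rough illustration of how the two aioprocessing queues fit together — _schedule_update() puts {"cid": ..., "schedule": ...} on the event queue, and init_trigger_queue() consumes the trigger queue and calls run_crawl_config(manual=False, **msg) — a minimal run_scheduler() could look like the following sketch. It assumes apscheduler (already in requirements.txt); everything beyond the queue message shapes visible in dockerman.py (function body, job ids, cron parsing) is an assumption, not the actual file.

# Hypothetical sketch only -- not the scheduler.py from this patch.
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger


def run_scheduler(event_q, trigger_q):
    """ Entry point started in a child process via aioprocessing.AioProcess """
    scheduler = BackgroundScheduler()
    scheduler.start()

    def queue_crawl(cid, schedule):
        # Hand the fired job back to the API process; init_trigger_queue()
        # turns this message into run_crawl_config(manual=False, **msg)
        trigger_q.put({"cid": cid, "schedule": schedule})

    while True:
        # aioprocessing queues also expose the plain blocking get() in the child process
        msg = event_q.get()
        cid = msg["cid"]
        schedule = msg["schedule"]

        if schedule:
            # add or replace the cron job for this crawl config
            scheduler.add_job(
                queue_crawl,
                CronTrigger.from_crontab(schedule),
                id=cid,
                args=[cid, schedule],
                replace_existing=True,
            )
        else:
            # empty schedule means unschedule (see _schedule_update(cid, schedule=""))
            try:
                scheduler.remove_job(cid)
            except Exception:
                pass

Running the scheduler as a child process keeps apscheduler's blocking machinery out of the API's asyncio event loop, and replaces the old TCP protocol (_send_sched_msg to the scheduler service on port 9017) with plain queue messages.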