From b417d7c185272225937d12306f9175b97e98d807 Mon Sep 17 00:00:00 2001
From: Ilya Kreymer
Date: Wed, 25 Aug 2021 12:21:03 -0700
Subject: [PATCH] docker manager: support scheduling with apscheduler and
 separate 'scheduler' process

---
 backend/crawlconfigs.py  |  19 +++++-
 backend/crawls.py        |   2 +-
 backend/dockerman.py     | 121 ++++++++++++++++++++++++++++++++++-----
 backend/k8sman.py        |   5 +-
 backend/main.py          |   2 +-
 backend/requirements.txt |   1 +
 docker-compose.yml       |  14 ++++-
 7 files changed, 145 insertions(+), 19 deletions(-)

diff --git a/backend/crawlconfigs.py b/backend/crawlconfigs.py
index 7ec91a3f..cb18c126 100644
--- a/backend/crawlconfigs.py
+++ b/backend/crawlconfigs.py
@@ -96,6 +96,14 @@ class CrawlConfig(BaseMongoModel):
     crawlTimeout: Optional[int] = 0
 
 
+# ============================================================================
+class TriggerCrawl(BaseModel):
+    """ Crawl trigger from internal scheduler """
+
+    id: str
+    schedule: str
+
+
 # ============================================================================
 class CrawlOps:
     """Crawl Config Operations"""
@@ -170,8 +178,8 @@
 
 
 # ============================================================================
-# pylint: disable=redefined-builtin,invalid-name
-def init_crawl_config_api(mdb, user_dep, archive_ops, crawl_manager):
+# pylint: disable=redefined-builtin,invalid-name,too-many-locals
+def init_crawl_config_api(app, mdb, user_dep, archive_ops, crawl_manager):
     """Init /crawlconfigs api routes"""
 
     ops = CrawlOps(mdb, archive_ops, crawl_manager)
@@ -242,6 +250,13 @@
 
         return {"started": crawl_id}
 
+    @app.post("/crawls/trigger", tags=["crawlconfigs"])
+    async def trigger_crawl(trigger: TriggerCrawl):
+        await crawl_manager.run_crawl_config(
+            trigger.id, manual=False, schedule=trigger.schedule
+        )
+        return {}
+
     @router.delete("")
     async def delete_crawl_configs(archive: Archive = Depends(archive_crawl_dep)):
         result = await ops.delete_crawl_configs(archive)
diff --git a/backend/crawls.py b/backend/crawls.py
index 1ee55996..768aabe4 100644
--- a/backend/crawls.py
+++ b/backend/crawls.py
@@ -86,7 +86,7 @@
         try:
             await self.crawls.insert_one(crawl.to_dict())
         except pymongo.errors.DuplicateKeyError:
-            print(f"Crawl Already Added: {crawl.id} - {crawl.state}")
+            # print(f"Crawl Already Added: {crawl.id} - {crawl.state}")
             return False
 
         dura = int((crawl.finished - crawl.started).total_seconds())
diff --git a/backend/dockerman.py b/backend/dockerman.py
index 49fa6fdd..bb6b2ae2 100644
--- a/backend/dockerman.py
+++ b/backend/dockerman.py
@@ -20,6 +20,7 @@ from crawls import Crawl
 class DockerManager:
     """ Docker Crawl Manager Interface"""
 
+    # pylint: disable=too-many-instance-attributes
     def __init__(self, archive_ops, extra_crawl_params=None):
         self.client = aiodocker.Docker()
 
@@ -66,23 +67,83 @@
             "btrix.user": userid,
             "btrix.archive": aid,
             "btrix.crawlconfig": cid,
-            "btrix.run.schedule": crawlconfig.schedule,
-            "btrix.run.manual": "1" if crawlconfig.runNow else "0",
             "btrix.coll": crawlconfig.config.collection,
         }
 
         # Create Config Volume
         volume = await self._create_volume(crawlconfig, labels)
 
-        await self._run_crawl_now(storage, labels, volume, self.extra_crawl_params)
+        if crawlconfig.schedule:
+            print("Scheduling...", flush=True)
+
+            await self._send_sched_msg(
+                {"type": "add", "id": crawlconfig.id, "schedule": crawlconfig.schedule}
+            )
+
+        if crawlconfig.runNow:
+            await self._run_crawl_now(
+                storage,
+                labels,
+                volume,
+            )
 
     async def update_crawl_config(self, crawlconfig):
-        """ Updating not supported for now (labels can not be altered) """
-        raise Exception("Unsupported")
+        """ Only updating the schedule + run now """
 
-    async def run_crawl_config(self, cid):
+        if crawlconfig.schedule:
+            print("Updating Schedule..", flush=True)
+
+            await self._send_sched_msg(
+                {"type": "add", "id": crawlconfig.id, "schedule": crawlconfig.schedule}
+            )
+        else:
+            await self._send_sched_msg(
+                {"type": "remove", "id": crawlconfig.id}
+            )
+
+        if crawlconfig.runNow:
+            await self.run_crawl_config(crawlconfig.id)
+
+    async def list_running_crawls(self, aid):
+        """ List running containers for this archive """
+        containers = await self._list_running_containers([f"btrix.archive={aid}"])
+
+        running = []
+
+        for container in containers:
+            full_container = await self.client.containers.get(container["Id"])
+            running.append(self._make_crawl_for_container(full_container, "running"))
+
+        return running
+
+    async def stop_crawl(self, crawl_id, aid, graceful=True):
+        """ Stop crawl, if not graceful, issue SIGUSR1 to indicate cancelation """
+        container = await self.client.containers.get(crawl_id)
+
+        if container["Config"]["Labels"]["btrix.archive"] != aid:
+            return None
+
+        if not graceful:
+            await container.kill(signal="SIGUSR1")
+            result = self._make_crawl_for_container(container, "canceled", True)
+        else:
+            result = True
+
+        await container.kill(signal="SIGTERM")
+
+        return result
+
+    async def run_crawl_config(self, cid, manual=True, schedule=""):
         """ Run crawl job for cron job based on specified crawlconfig id (cid) """
+        if not manual:
+            if await self._is_scheduled_crawl_for_config_running(cid):
+                print(
+                    f"Crawl for {cid} already running, not starting new crawl",
+                    flush=True,
+                )
+                return None
+
         volume_name = f"crawl-config-{cid}"
         volume_obj = aiodocker.docker.DockerVolume(self.client, volume_name)
 
@@ -95,15 +156,16 @@
         try:
             archive = await self.archive_ops.get_archive_by_id(labels["btrix.archive"])
             storage = archive.storage
+        # pylint: disable=broad-except
         except Exception as exc:
             print(exc, flush=True)
             return None
 
         container = await self._run_crawl_now(
-            storage, labels, volume_name, self.extra_crawl_params
+            storage, labels, volume_name, schedule, manual
         )
 
-        return container["Id"]
+        return container["id"][:12]
 
     async def validate_crawl_complete(self, crawlcomplete):
         """Validate that crawl is valid by checking that container exists and label matches
@@ -189,16 +251,33 @@
         )
 
         for volume in resp["Volumes"]:
-            print(vol_obj, flush=True)
             vol_obj = aiodocker.docker.DockerVolume(self.client, volume["Name"])
-            await vol_obj.delete()
 
-    async def _run_crawl_now(self, storage, labels, volume, extra_crawl_params=None):
+            await self._send_sched_msg(
+                {"type": "remove", "id": volume["Labels"]["btrix.crawlconfig"]}
+            )
+
+            try:
+                await vol_obj.delete()
+            # pylint: disable=bare-except
+            except:
+                print("Warning: Volume Delete Failed, Container in Use", flush=True)
+
+    async def _send_sched_msg(self, msg):
+        reader, writer = await asyncio.open_connection("scheduler", 9017)
+        writer.write(json.dumps(msg).encode("utf-8") + b"\n")
+        await writer.drain()
+        await reader.readline()
+        writer.close()
+        await writer.wait_closed()
+
+    # pylint: disable=too-many-arguments
+    async def _run_crawl_now(self, storage, labels, volume, schedule="", manual=True):
         # Set Run Config
         command = ["crawl", "--config", "/tmp/crawlconfig/crawl-config.json"]
 
-        if extra_crawl_params:
-            command += extra_crawl_params
+        if self.extra_crawl_params:
+            command += self.extra_crawl_params
 
         endpoint_with_coll_url = os.path.join(
             storage.endpoint_url, "collections", labels["btrix.coll"] + "/"
@@ -213,6 +292,9 @@
             "WEBHOOK_URL=http://backend:8000/crawls/done",
         ]
 
+        labels["btrix.run.schedule"] = schedule
+        labels["btrix.run.manual"] = "1" if manual else "0"
+
         run_config = {
             "Image": self.crawler_image,
             "Volumes": {volume: {}},
@@ -227,6 +309,18 @@
 
         return await self.client.containers.run(run_config)
 
+    async def _list_running_containers(self, labels):
+        results = await self.client.containers.list(
+            filters=json.dumps({"status": ["running"], "label": labels})
+        )
+        return results
+
+    async def _is_scheduled_crawl_for_config_running(self, cid):
+        results = await self._list_running_containers(
+            [f"btrix.crawlconfig={cid}", "btrix.run.manual=0"]
+        )
+        return len(results) > 0
+
     async def _handle_container_die(self, actor):
         """ Handle crawl container shutdown """
         container = await self.client.containers.get(actor["ID"])
@@ -243,6 +337,7 @@
     ):
         """ Make a crawl object from a container data"""
         labels = container["Config"]["Labels"]
+
         return Crawl(
             id=container["Id"],
             state=state,
diff --git a/backend/k8sman.py b/backend/k8sman.py
index 67a255ed..7fce030f 100644
--- a/backend/k8sman.py
+++ b/backend/k8sman.py
@@ -234,12 +234,15 @@
         if run_now:
             await self._create_run_now_job(cron_job)
 
-    async def run_crawl_config(self, cid):
+    async def run_crawl_config(self, cid, manual=True, schedule=""):
         """ Run crawl job for cron job based on specified crawlconfig id (cid) """
         cron_jobs = await self.batch_beta_api.list_namespaced_cron_job(
             namespace=self.namespace, label_selector=f"btrix.crawlconfig={cid}"
         )
 
+        if not manual or schedule:
+            raise Exception("Manual trigger not supported")
+
         if len(cron_jobs.items) != 1:
             raise Exception("Crawl Config Not Found")
 
diff --git a/backend/main.py b/backend/main.py
index adcde24d..9bab8b49 100644
--- a/backend/main.py
+++ b/backend/main.py
@@ -73,9 +73,9 @@
             self.crawl_manager = DockerManager(
                 self.archive_ops, self.default_crawl_params
             )
-            # raise Exception("Currently, only running in Kubernetes is supported")
 
         self.crawl_config_ops = init_crawl_config_api(
+            self.app,
             self.mdb,
             current_active_user,
             self.archive_ops,
diff --git a/backend/requirements.txt b/backend/requirements.txt
index 63daf7fc..5f040aa3 100644
--- a/backend/requirements.txt
+++ b/backend/requirements.txt
@@ -4,3 +4,4 @@ loguru
 aiofiles
 kubernetes-asyncio
 aiodocker
+apscheduler
diff --git a/docker-compose.yml b/docker-compose.yml
index 34cb0692..1d5c7d63 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -14,8 +14,20 @@ services:
       - ./config.env
 
     depends_on:
-      - mongo
       - minio
+      - mongo
+      - scheduler
+
+  scheduler:
+    build: ./backend
+    image: webrecorder/browsertrix-api
+    command: python -u scheduler.py
+
+    env_file:
+      - ./config.env
+
+    depends_on:
+      - mongo
 
   mongo:
     image: mongo
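
The compose change above runs `python -u scheduler.py` in its own container, but scheduler.py itself is not included in this patch. What the diff does show: DockerManager._send_sched_msg() writes one newline-terminated JSON message ({"type": "add"|"remove", "id": ..., "schedule": ...}) to scheduler:9017 and waits for a one-line reply, the backend gains a POST /crawls/trigger route, and apscheduler is added to requirements.txt. Below is a minimal sketch of a scheduler process consistent with that protocol; the framing details, the "ok" ack, the helper names (trigger_crawl, handle_conn), and the use of aiohttp as the HTTP client are assumptions for illustration, not part of the commit.

"""
Hypothetical scheduler.py sketch (NOT part of this patch): an apscheduler-based
process that accepts add/remove messages from DockerManager._send_sched_msg()
and POSTs to the backend's /crawls/trigger route when a cron slot fires.
Assumed details: one JSON object per line, a single-line ack, aiohttp available.
"""

import asyncio
import json

import aiohttp
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger

# Route added in this patch (see backend/crawlconfigs.py above)
TRIGGER_URL = "http://backend:8000/crawls/trigger"

scheduler = AsyncIOScheduler()


async def trigger_crawl(cid, schedule):
    """ Ask the backend to start a scheduled (manual=False) crawl """
    async with aiohttp.ClientSession() as session:
        async with session.post(
            TRIGGER_URL, json={"id": cid, "schedule": schedule}
        ) as resp:
            print(f"Triggered {cid}: {resp.status}", flush=True)


async def handle_conn(reader, writer):
    """ Handle one add/remove message from the backend """
    msg = json.loads(await reader.readline())

    if msg["type"] == "add":
        # replace_existing lets update_crawl_config() reuse "add" for updates
        scheduler.add_job(
            trigger_crawl,
            CronTrigger.from_crontab(msg["schedule"]),
            args=[msg["id"], msg["schedule"]],
            id=msg["id"],
            replace_existing=True,
        )
    elif msg["type"] == "remove":
        try:
            scheduler.remove_job(msg["id"])
        except Exception:
            # job may not exist; ignore
            pass

    # _send_sched_msg() blocks on readline() before closing, so always ack
    writer.write(b"ok\n")
    await writer.drain()
    writer.close()


async def main():
    scheduler.start()
    server = await asyncio.start_server(handle_conn, "0.0.0.0", 9017)
    async with server:
        await server.serve_forever()


if __name__ == "__main__":
    asyncio.run(main())

Note that this sketch keeps jobs in apscheduler's in-memory store, so schedules would be lost when the scheduler container restarts; a real implementation would presumably rebuild its job list from the stored crawl configs on startup.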
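
The new /crawls/trigger route takes the same id/schedule pair and calls run_crawl_config(..., manual=False, ...), so a trigger is skipped when a scheduled crawl for that config is already running; note it is registered on the app without a user dependency. One way to exercise it by hand might look like the following sketch; the localhost URL, placeholder config id, and cron string are illustrative only (inside the compose network the caller is the scheduler container and the host would be backend:8000).

# Illustrative manual test of the new trigger route (not part of the patch).
# The URL, config id, and cron string below are placeholders.
import json
import urllib.request

payload = {"id": "example-crawlconfig-id", "schedule": "0 * * * *"}

req = urllib.request.Request(
    "http://localhost:8000/crawls/trigger",
    data=json.dumps(payload).encode("utf-8"),
    headers={"Content-Type": "application/json"},
    method="POST",
)

with urllib.request.urlopen(req) as resp:
    print(resp.status, resp.read().decode())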