From a8255a76b2fb48abe3f13064ebe2be5536b78cd3 Mon Sep 17 00:00:00 2001 From: Ilya Kreymer Date: Sat, 21 Aug 2021 22:10:31 -0700 Subject: [PATCH] crawljob: - support run once on existing crawl job - support updating/patching existing crawl job with new crawl config, new schedule and run once --- backend/crawlconfigs.py | 67 +++++++++++++++++++- backend/k8sman.py | 133 ++++++++++++++++++++++++++++++++-------- 2 files changed, 171 insertions(+), 29 deletions(-) diff --git a/backend/crawlconfigs.py b/backend/crawlconfigs.py index 4ce39f3a..05b83210 100644 --- a/backend/crawlconfigs.py +++ b/backend/crawlconfigs.py @@ -138,6 +138,21 @@ class CrawlOps: ) return result + async def update_crawl_config( + self, cid: str, config: CrawlConfigIn, archive: Archive, user: User + ): + """ Update existing crawl config""" + data = config.dict() + data["archive"] = archive.id + data["user"] = user.id + data["_id"] = cid + + await self.crawl_configs.find_one_and_replace({"_id": cid}, data) + + crawlconfig = CrawlConfig.from_dict(data) + + await self.crawl_manager.update_crawl_config(crawlconfig) + async def get_crawl_configs(self, archive: Archive): """Get all crawl configs for an archive is a member of""" cursor = self.crawl_configs.find({"archive": archive.id}) @@ -205,6 +220,50 @@ def init_crawl_config_api(mdb, user_dep, archive_ops, crawl_manager): res = await ops.add_crawl_config(config, archive, user) return {"added": str(res.inserted_id)} + @router.patch("/{cid}") + async def update_crawl_config( + config: CrawlConfigIn, + cid: str, + archive: Archive = Depends(archive_dep), + user: User = Depends(user_dep), + ): + + if not archive.is_crawler(user): + raise HTTPException( + status_code=403, detail="User does not have permission to modify crawls" + ) + + try: + await ops.update_crawl_config(cid, config, archive, user) + except Exception as e: + # pylint: disable=raise-missing-from + raise HTTPException( + status_code=403, detail=f"Error updating crawl config: {e}" + ) + + return {"updated": cid} + + @router.post("/{cid}/run") + async def run_now( + cid: str, + archive: Archive = Depends(archive_dep), + user: User = Depends(user_dep), + ): + + if not archive.is_crawler(user): + raise HTTPException( + status_code=403, detail="User does not have permission to modify crawls" + ) + + crawl_id = None + try: + crawl_id = await crawl_manager.run_crawl_config(cid) + except Exception as e: + # pylint: disable=raise-missing-from + raise HTTPException(status_code=500, detail=f"Error starting crawl: {e}") + + return {"started": crawl_id} + @router.delete("") async def delete_crawl_configs( archive: Archive = Depends(archive_dep), user: User = Depends(user_dep) @@ -217,16 +276,18 @@ def init_crawl_config_api(mdb, user_dep, archive_ops, crawl_manager): result = await ops.delete_crawl_configs(archive) return {"deleted": result.deleted_count} - @router.delete("/{id}") + @router.delete("/{cid}") async def delete_crawl_config( - id: str, archive: Archive = Depends(archive_dep), user: User = Depends(user_dep) + cid: str, + archive: Archive = Depends(archive_dep), + user: User = Depends(user_dep), ): if not archive.is_crawler(user): raise HTTPException( status_code=403, detail="User does not have permission to modify crawls" ) - result = await ops.delete_crawl_config(id, archive) + result = await ops.delete_crawl_config(cid, archive) if not result or not result.deleted_count: raise HTTPException(status_code=404, detail="Crawl Config Not Found") diff --git a/backend/k8sman.py b/backend/k8sman.py index c47d0c9d..4512bcbf 100644 --- a/backend/k8sman.py +++ b/backend/k8sman.py @@ -78,19 +78,10 @@ class K8SManager: "btrix.crawlconfig": cid, } - extra_crawl_params = extra_crawl_params or [] - # Create Config Map - config_map = client.V1ConfigMap( - metadata={ - "name": f"crawl-config-{cid}", - "namespace": self.namespace, - "labels": labels, - }, - data={"crawl-config.json": json.dumps(crawlconfig.config.dict())}, - ) + config_map = self._create_config_map(crawlconfig, labels) - api_response = await self.core_api.create_namespaced_config_map( + await self.core_api.create_namespaced_config_map( namespace=self.namespace, body=config_map ) @@ -115,21 +106,15 @@ class K8SManager: }, ) - api_response = await self.core_api.create_namespaced_secret( + await self.core_api.create_namespaced_secret( namespace=self.namespace, body=crawl_secret ) # Create Cron Job - suspend = False - schedule = crawlconfig.schedule - if not schedule: - schedule = DEFAULT_NO_SCHEDULE - suspend = True + suspend, schedule, run_now = self._get_schedule_suspend_run_now(crawlconfig) - run_now = False - if crawlconfig.runNow: - run_now = True + extra_crawl_params = extra_crawl_params or [] job_template = self._get_job_template(cid, labels, extra_crawl_params) @@ -151,15 +136,99 @@ class K8SManager: spec=spec, ) - api_response = await self.batch_beta_api.create_namespaced_cron_job( + cron_job = await self.batch_beta_api.create_namespaced_cron_job( namespace=self.namespace, body=cron_job ) # Run Job Now if run_now: - await self._create_run_now_job(api_response, labels) + await self._create_run_now_job(cron_job) - return api_response + return cron_job + + async def update_crawl_config(self, crawlconfig): + """ Update existing crawl config """ + + cid = crawlconfig.id + + cron_jobs = await self.batch_beta_api.list_namespaced_cron_job( + namespace=self.namespace, label_selector=f"btrix.crawlconfig={cid}" + ) + + if len(cron_jobs.items) != 1: + return + + cron_job = cron_jobs.items[0] + + if crawlconfig.archive != cron_job.metadata.labels["btrix.archive"]: + print("wrong archive") + return + + labels = { + "btrix.user": cron_job.metadata.labels["btrix.user"], + "btrix.archive": crawlconfig.archive, + "btrix.crawlconfig": cid, + } + + # Update Config Map + config_map = self._create_config_map(crawlconfig, labels) + + await self.core_api.patch_namespaced_config_map( + name=f"crawl-config-{cid}", namespace=self.namespace, body=config_map + ) + + # Update CronJob, if needed + suspend, schedule, run_now = self._get_schedule_suspend_run_now(crawlconfig) + + changed = False + + if schedule != cron_job.spec.schedule: + cron_job.spec.schedule = schedule + changed = True + + if suspend != cron_job.spec.suspend: + cron_job.spec.suspend = suspend + changed = True + + if changed: + await self.batch_beta_api.patch_namespaced_cron_job( + name=cron_job.metadata.name, namespace=self.namespace, body=cron_job + ) + + # Run Job Now + if run_now: + await self._create_run_now_job(cron_job) + + def _create_config_map(self, crawlconfig, labels): + """ Create Config Map based on CrawlConfig + labels """ + config_map = client.V1ConfigMap( + metadata={ + "name": f"crawl-config-{crawlconfig.id}", + "namespace": self.namespace, + "labels": labels, + }, + data={"crawl-config.json": json.dumps(crawlconfig.config.dict())}, + ) + + return config_map + + #pylint: disable=no-self-use + def _get_schedule_suspend_run_now(self, crawlconfig): + """ get schedule/suspend/run_now data based on crawlconfig """ + + # Create Cron Job + suspend = False + schedule = crawlconfig.schedule + + if not schedule: + schedule = DEFAULT_NO_SCHEDULE + suspend = True + + run_now = False + if crawlconfig.runNow: + run_now = True + + return suspend, schedule, run_now async def delete_crawl_configs_for_archive(self, archive): """Delete all crawl configs for given archive""" @@ -190,7 +259,16 @@ class K8SManager: propagation_policy="Foreground", ) - async def _create_run_now_job(self, cron_job, labels): + async def run_crawl_config(self, cid): + """ Run crawl job for cron job based on specified crawlconfig id (cid) """ + cron_jobs = await self.batch_beta_api.list_namespaced_cron_job( + namespace=self.namespace, label_selector=f"btrix.crawlconfig={cid}" + ) + + res = await self._create_run_now_job(cron_jobs.items[0]) + return res.metadata.name + + async def _create_run_now_job(self, cron_job): """Create new job from cron job to run instantly""" annotations = {} annotations["cronjob.kubernetes.io/instantiate"] = "manual" @@ -204,10 +282,13 @@ class K8SManager: api_version="batch/v1beta1", ) + ts_now = datetime.datetime.utcnow().strftime("%Y%m%d%H%M%S") + name = f"crawl-now-{ts_now}-{cron_job.metadata.labels['btrix.crawlconfig']}" + object_meta = client.V1ObjectMeta( - name=cron_job.metadata.name + "-run-now", + name=name, annotations=annotations, - labels=labels, + labels=cron_job.metadata.labels, owner_references=[owner_ref], )