diff --git a/backend/crawlconfigs.py b/backend/crawlconfigs.py
index 9c4ab011..1caf2df0 100644
--- a/backend/crawlconfigs.py
+++ b/backend/crawlconfigs.py
@@ -75,7 +75,7 @@ class CrawlConfigIn(BaseModel):
     schedule: Optional[str] = ""
     runNow: Optional[bool] = False
 
-    # storageName: Optional[str] = "default"
+    crawlTimeout: Optional[int] = 0
 
     config: RawCrawlConfig
 
@@ -93,6 +93,8 @@ class CrawlConfig(BaseMongoModel):
 
     config: RawCrawlConfig
 
+    crawlTimeout: Optional[int] = 0
+
 
 # ============================================================================
 class CrawlOps:
diff --git a/backend/crawls.py b/backend/crawls.py
index 705c1bf6..d1562366 100644
--- a/backend/crawls.py
+++ b/backend/crawls.py
@@ -1,13 +1,13 @@
 """ Crawl API """
 
 import asyncio
-import traceback
 
 from typing import Optional, List
 from datetime import datetime
 
 from fastapi import Depends, HTTPException
 from pydantic import BaseModel
+import pymongo
 
 from db import BaseMongoModel
 from archives import Archive
@@ -74,11 +74,20 @@ class CrawlOps:
             print("Not a valid crawl complete msg!", flush=True)
             return
 
-        await self.handle_finished(crawl)
+        await self.store_crawl(crawl, update_existing=True)
 
-    async def handle_finished(self, crawl: Crawl):
+    async def store_crawl(self, crawl: Crawl, update_existing=False):
         """ Add finished crawl to db, increment archive usage """
-        await self.crawls.insert_one(crawl.to_dict())
+        if update_existing:
+            await self.crawls.find_one_and_replace(
+                {"_id": crawl.id}, crawl.to_dict(), upsert=True
+            )
+        else:
+            try:
+                await self.crawls.insert_one(crawl.to_dict())
+            except pymongo.errors.DuplicateKeyError:
+                print(f"Crawl Already Added: {crawl.id}")
+                return False
 
         dura = int((crawl.finished - crawl.started).total_seconds())
 
@@ -150,7 +159,7 @@ def init_crawls_api(app, mdb, crawl_manager, archives):
                     status_code=404, detail=f"Crawl not found: {crawl_id}"
                 )
 
-            await ops.handle_finished(crawl)
+            await ops.store_crawl(crawl)
 
         except HTTPException as httpe:
             raise httpe
@@ -182,7 +191,6 @@ def init_crawls_api(app, mdb, crawl_manager, archives):
 
         except Exception as exc:
             # pylint: disable=raise-missing-from
-            traceback.print_exc()
            raise HTTPException(status_code=400, detail=f"Error Stopping Crawl: {exc}")
 
         return {"stopped_gracefully": True}
diff --git a/backend/k8sman.py b/backend/k8sman.py
index 741a1164..480f3bdf 100644
--- a/backend/k8sman.py
+++ b/backend/k8sman.py
@@ -37,17 +37,26 @@ class K8SManager:
         self.crawler_image = os.environ.get("CRAWLER_IMAGE")
         self.crawler_image_pull_policy = "IfNotPresent"
 
-        self.crawl_timeout = int(os.environ.get("CRAWL_TIMEOUT", "1000000"))
         self.crawl_retries = int(os.environ.get("CRAWL_RETRIES", "3"))
 
         self.loop = asyncio.get_running_loop()
-        self.loop.create_task(self.watch_job_loop())
+        self.loop.create_task(self.run_event_loop())
 
     def set_crawl_ops(self, ops):
         """ Set crawl ops handler """
         self.crawl_ops = ops
 
-    async def watch_job_loop(self):
+    async def run_event_loop(self):
+        """ Run the job watch loop, retry in case of failure"""
+        while True:
+            try:
+                await self.watch_events()
+            # pylint: disable=broad-except
+            except Exception as exc:
+                print(f"Retrying job loop: {exc}")
+                await asyncio.sleep(10)
+
+    async def watch_events(self):
         """ Get events for completed jobs"""
         async with watch.Watch().stream(
             self.core_api.list_namespaced_event,
@@ -62,12 +71,12 @@ class K8SManager:
                         self.handle_crawl_failed(obj.involved_object.name, "failed")
                     )
 
-                    # elif obj.reason == "DeadlineExceeded":
-                    #    self.loop.create_task(
-                    #        self.handle_crawl_failed(
-                    #            obj.involved_object.name, "timed_out"
-                    #        )
-                    #    )
+                    elif obj.reason == "DeadlineExceeded":
+                        self.loop.create_task(
+                            self.handle_crawl_failed(
+                                obj.involved_object.name, "timed_out"
+                            )
+                        )
 
                 # pylint: disable=broad-except
                 except Exception as exc:
@@ -131,7 +140,7 @@ class K8SManager:
         extra_crawl_params = extra_crawl_params or []
 
         job_template = self._get_job_template(
-            cid, labels, annotations, extra_crawl_params
+            cid, labels, annotations, crawlconfig.crawlTimeout, extra_crawl_params
         )
 
         spec = client.V1beta1CronJobSpec(
@@ -205,6 +214,15 @@ class K8SManager:
             cron_job.spec.suspend = suspend
             changed = True
 
+        if (
+            crawlconfig.crawlTimeout
+            != cron_job.spec.job_template.spec.active_deadline_seconds
+        ):
+            cron_job.spec.job_template.spec.active_deadline_seconds = (
+                crawlconfig.crawlTimeout
+            )
+            changed = True
+
         if changed:
             cron_job.spec.job_template.metadata.annotations[
                 "btrix.run.schedule"
@@ -248,7 +266,11 @@ class K8SManager:
             field_selector="status.successful=0",
         )
 
-        return [self._make_crawl_for_job(job, "running") for job in jobs.items]
+        return [
+            self._make_crawl_for_job(job, "running")
+            for job in jobs.items
+            if job.status.active
+        ]
 
     async def validate_crawl_complete(self, crawlcomplete):
         """Ensure the crawlcomplete data is valid (job exists and user matches)
@@ -332,7 +354,7 @@ class K8SManager:
 
         crawl = self._make_crawl_for_job(job, reason, True)
 
-        await self.crawl_ops.handle_finished(crawl)
+        await self.crawl_ops.store_crawl(crawl)
 
         await self._delete_job(job_name)
 
@@ -360,7 +382,7 @@ class K8SManager:
         await self.batch_api.delete_namespaced_job(
             name=name,
             namespace=self.namespace,
-            grace_period_seconds=120,
+            grace_period_seconds=60,
             propagation_policy="Foreground",
         )
 
@@ -474,7 +496,9 @@ class K8SManager:
             body=job, namespace=self.namespace
         )
 
-    def _get_job_template(self, uid, labels, annotations, extra_crawl_params):
+    def _get_job_template(
+        self, uid, labels, annotations, crawl_timeout, extra_crawl_params
+    ):
         """Return crawl job template for crawl job, including labels, adding optiona crawl params"""
         command = ["crawl", "--config", "/tmp/crawl-config.json"]
 
@@ -556,7 +580,7 @@ class K8SManager:
             },
         }
 
-        if self.crawl_timeout > 0:
-            job_template["spec"]["activeDeadlineSeconds"] = self.crawl_timeout
+        if crawl_timeout > 0:
+            job_template["spec"]["activeDeadlineSeconds"] = crawl_timeout
 
         return job_template
diff --git a/chart/values.yaml b/chart/values.yaml
index 501f1448..53599f2b 100644
--- a/chart/values.yaml
+++ b/chart/values.yaml
@@ -41,9 +41,6 @@ crawler_pull_policy: "Never"
 
 crawler_namespace: "crawlers"
 
-# set 0 to disable timeout
-crawl_timeout: 0
-
 # num retries
 crawl_retries: 1
 
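Reviewer note on the overall flow: the crawl timeout moves from a single CRAWL_TIMEOUT environment variable (and the chart-level crawl_timeout value) to a per-crawl-config crawlTimeout field. K8SManager writes that value into the Job template's activeDeadlineSeconds and reconciles it on the CronJob when the config changes, so Kubernetes itself enforces the limit; when the deadline fires, the event watcher now handles the resulting DeadlineExceeded event and stores the crawl as "timed_out" instead of ignoring it. The sketch below is illustrative only and not code from this patch; the helper names (make_job_spec, classify_event) are hypothetical.

```python
# Minimal, self-contained sketch of the timeout flow this diff introduces.
# Hypothetical helpers -- not the browsertrix-cloud API.

from typing import Optional


def make_job_spec(crawl_timeout: Optional[int]) -> dict:
    """Build a bare Job spec, adding a deadline only when the timeout is > 0."""
    spec = {"template": {"spec": {"restartPolicy": "Never"}}}
    if crawl_timeout and crawl_timeout > 0:
        # Kubernetes terminates the Job once it has been active this long
        # and records an Event with reason "DeadlineExceeded".
        spec["activeDeadlineSeconds"] = crawl_timeout
    return spec


def classify_event(reason: str) -> Optional[str]:
    """Map Job event reasons to the crawl state a watcher would store."""
    return {
        "BackoffLimitExceeded": "failed",  # crawler pods kept failing
        "DeadlineExceeded": "timed_out",   # activeDeadlineSeconds was hit
    }.get(reason)


if __name__ == "__main__":
    print(make_job_spec(3600))  # deadline present
    print(make_job_spec(0))     # 0 disables the timeout, so no deadline key
    print(classify_event("DeadlineExceeded"))  # -> "timed_out"
```

Setting crawlTimeout to 0 preserves the previous "no timeout" behaviour, which is why both CrawlConfigIn and CrawlConfig default the field to 0 and the job template only sets activeDeadlineSeconds when the value is positive.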