diff --git a/backend/crawls.py b/backend/crawls.py
index a7f87581..705c1bf6 100644
--- a/backend/crawls.py
+++ b/backend/crawls.py
@@ -1,6 +1,7 @@
 """ Crawl API """
 
 import asyncio
+import traceback
 
 from typing import Optional, List
 from datetime import datetime
@@ -64,6 +65,8 @@ class CrawlOps:
         self.crawl_manager = crawl_manager
         self.archives = archives
 
+        self.crawl_manager.set_crawl_ops(self)
+
     async def on_handle_crawl_complete(self, msg: CrawlCompleteIn):
         """ Handle completed crawl, add to crawls db collection, also update archive usage """
         crawl = await self.crawl_manager.validate_crawl_complete(msg)
@@ -79,6 +82,9 @@ class CrawlOps:
 
         dura = int((crawl.finished - crawl.started).total_seconds())
 
+        print(crawl, flush=True)
+        print(f"Duration: {dura}", flush=True)
+
        await self.archives.inc_usage(crawl.aid, dura)
 
     async def list_crawls(self, aid: str, cid: str = None):
@@ -138,13 +144,17 @@ def init_crawls_api(app, mdb, crawl_manager, archives):
         crawl_id, archive: Archive = Depends(archive_crawl_dep)
     ):
         try:
-            crawl = await crawl_manager.stop_crawl(crawl_id, archive.id)
+            crawl = await crawl_manager.stop_crawl(crawl_id, archive.id, graceful=False)
 
             if not crawl:
                 raise HTTPException(
                     status_code=404, detail=f"Crawl not found: {crawl_id}"
                 )
 
             await ops.handle_finished(crawl)
+
+        except HTTPException as httpe:
+            raise httpe
+
         except Exception as exc:
             # pylint: disable=raise-missing-from
             raise HTTPException(status_code=400, detail=f"Error Canceling Crawl: {exc}")
@@ -159,17 +169,21 @@ def init_crawls_api(app, mdb, crawl_manager, archives):
         crawl_id, archive: Archive = Depends(archive_crawl_dep)
     ):
         try:
-            canceled = await crawl_manager.stop_crawl_graceful(
-                crawl_id, str(archive.id)
+            canceled = await crawl_manager.stop_crawl(
+                crawl_id, archive.id, graceful=True
             )
 
             if not canceled:
                 raise HTTPException(
                     status_code=404, detail=f"Crawl not found: {crawl_id}"
                 )
 
+        except HTTPException as httpe:
+            raise httpe
+
         except Exception as exc:
             # pylint: disable=raise-missing-from
-            raise HTTPException(status_code=400, detail=f"Error Canceling Crawl: {exc}")
+            traceback.print_exc()
+            raise HTTPException(status_code=400, detail=f"Error Stopping Crawl: {exc}")
 
         return {"stopped_gracefully": True}
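
The `except HTTPException` guard added to both endpoints above is easy to miss but load-bearing: without it, the broad `except Exception` handler would swallow the intended 404 and re-wrap it as a 400. A minimal standalone sketch of the pattern, with a hypothetical route that is not part of the patch:

from fastapi import FastAPI, HTTPException

app = FastAPI()

@app.post("/crawls/{crawl_id}/cancel")
async def cancel_crawl(crawl_id: str):
    try:
        crawl = None  # stand-in for: await crawl_manager.stop_crawl(...); always 404s here
        if not crawl:
            # the intended client-facing error
            raise HTTPException(status_code=404, detail=f"Crawl not found: {crawl_id}")
    except HTTPException as httpe:
        # re-raise unchanged so the client sees the real 404
        raise httpe
    except Exception as exc:  # pylint: disable=raise-missing-from
        # only genuinely unexpected failures become a 400
        raise HTTPException(status_code=400, detail=f"Error Canceling Crawl: {exc}")
    return {"canceled": True}
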
diff --git a/backend/k8sman.py b/backend/k8sman.py
index cfde6a94..741a1164 100644
--- a/backend/k8sman.py
+++ b/backend/k8sman.py
@@ -3,8 +3,9 @@
 import os
 import datetime
 import json
+import asyncio
 
-from kubernetes_asyncio import client, config
+from kubernetes_asyncio import client, config, watch
 from kubernetes_asyncio.stream import WsApiClient
 
 from crawls import Crawl
@@ -24,6 +25,8 @@ class K8SManager:
     def __init__(self, namespace=DEFAULT_NAMESPACE):
         config.load_incluster_config()
 
+        self.crawl_ops = None
+
         self.core_api = client.CoreV1Api()
         self.core_api_ws = client.CoreV1Api(api_client=WsApiClient())
         self.batch_api = client.BatchV1Api()
@@ -34,8 +37,41 @@ class K8SManager:
         self.crawler_image = os.environ.get("CRAWLER_IMAGE")
         self.crawler_image_pull_policy = "IfNotPresent"
 
-        # loop = asyncio.get_running_loop()
-        # loop.create_task(self.watch_job_done())
+        self.crawl_timeout = int(os.environ.get("CRAWL_TIMEOUT", "1000000"))
+        self.crawl_retries = int(os.environ.get("CRAWL_RETRIES", "3"))
+
+        self.loop = asyncio.get_running_loop()
+        self.loop.create_task(self.watch_job_loop())
+
+    def set_crawl_ops(self, ops):
+        """ Set crawl ops handler """
+        self.crawl_ops = ops
+
+    async def watch_job_loop(self):
+        """ Watch Job events and handle failed jobs """
+        async with watch.Watch().stream(
+            self.core_api.list_namespaced_event,
+            self.namespace,
+            field_selector="involvedObject.kind=Job",
+        ) as stream:
+            async for event in stream:
+                try:
+                    obj = event["object"]
+                    if obj.reason == "BackoffLimitExceeded":
+                        self.loop.create_task(
+                            self.handle_crawl_failed(obj.involved_object.name, "failed")
+                        )
+
+                    # elif obj.reason == "DeadlineExceeded":
+                    #    self.loop.create_task(
+                    #        self.handle_crawl_failed(
+                    #            obj.involved_object.name, "timed_out"
+                    #        )
+                    #    )
+
+                # pylint: disable=broad-except
+                except Exception as exc:
+                    print(exc)
 
     async def add_crawl_config(
         self,
@@ -212,19 +248,7 @@ class K8SManager:
             field_selector="status.successful=0",
         )
 
-        return [
-            Crawl(
-                id=job.metadata.name,
-                state="running",
-                user=job.metadata.labels["btrix.user"],
-                aid=job.metadata.labels["btrix.archive"],
-                cid=job.metadata.labels["btrix.crawlconfig"],
-                schedule=job.metadata.annotations.get("btrix.run.schedule", ""),
-                manual=job.metadata.annotations.get("btrix.run.manual") == "1",
-                started=job.status.start_time.replace(tzinfo=None),
-            )
-            for job in jobs.items
-        ]
+        return [self._make_crawl_for_job(job, "running") for job in jobs.items]
 
     async def validate_crawl_complete(self, crawlcomplete):
         """Ensure the crawlcomplete data is valid (job exists and user matches)
@@ -238,12 +262,7 @@ class K8SManager:
         manual = job.metadata.annotations.get("btrix.run.manual") == "1"
 
         if not manual:
-            await self.batch_api.delete_namespaced_job(
-                name=job.metadata.name,
-                namespace=self.namespace,
-                grace_period_seconds=10,
-                propagation_policy="Foreground",
-            )
+            await self._delete_job(job.metadata.name)
 
         return Crawl(
             id=crawlcomplete.id,
@@ -260,58 +279,37 @@ class K8SManager:
             hash=crawlcomplete.hash,
         )
 
-    async def stop_crawl(self, job_id, aid):
-        """ Stop Crawl based on crawl job id """
+    async def stop_crawl(self, job_name, aid, graceful=True):
+        """Attempt to stop crawl, either gracefully by issuing a SIGTERM, which
+        allows the crawler to finish current pages,
+
+        OR abruptly, by first issuing a SIGUSR1, followed by SIGTERM, which
+        will terminate immediately"""
+
         job = await self.batch_api.read_namespaced_job(
-            name=job_id, namespace=self.namespace
+            name=job_name, namespace=self.namespace
        )
 
         if not job or job.metadata.labels["btrix.archive"] != aid:
             return None
 
-        await self.batch_api.delete_namespaced_job(
-            name=job_id,
-            namespace=self.namespace,
-            grace_period_seconds=10,
-            propagation_policy="Foreground",
-        )
+        result = None
 
-        return Crawl(
-            id=job_id,
-            state="canceled",
-            user=job.metadata.labels["btrix.user"],
-            aid=job.metadata.labels["btrix.archive"],
-            cid=job.metadata.labels["btrix.crawlconfig"],
-            schedule=job.metadata.annotations.get("btrix.run.schedule", ""),
-            manual=job.metadata.annotations.get("btrix.run.manual") == "1",
-            started=job.status.start_time.replace(tzinfo=None),
-            finished=datetime.datetime.utcnow().replace(microsecond=0, tzinfo=None),
-        )
-
-    async def stop_crawl_graceful(self, job_name, aid):
-        """ Attempt to gracefully stop crawl by sending a SIGINT to the pod(s)"""
-
-        pods = await self.core_api.list_namespaced_pod(
-            namespace=self.namespace,
-            label_selector=f"job-name={job_name},btrix.archive={aid}",
-        )
-
-        command = ["kill", "-s", "SIGINT", "1"]
-        interrupted = False
-
-        for pod in pods.items:
-            if pod.metadata.labels["btrix.archive"] != aid:
-                continue
-
-            await self.core_api_ws.connect_get_namespaced_pod_exec(
-                pod.metadata.name,
+        if not graceful:
+            pods = await self.core_api.list_namespaced_pod(
                 namespace=self.namespace,
-                command=command,
-                stdout=True,
+                label_selector=f"job-name={job_name},btrix.archive={aid}",
             )
-            interrupted = True
 
-        return interrupted
+            await self._send_sig_to_pods(pods.items, aid)
+
+            result = self._make_crawl_for_job(job, "canceled", True)
+        else:
+            result = True
+
+        await self._delete_job(job_name)
+
+        return result
 
     async def delete_crawl_configs_for_archive(self, archive):
         """Delete all crawl configs for given archive"""
@@ -321,9 +319,51 @@ class K8SManager:
         """Delete all crawl configs by id"""
         return await self._delete_crawl_configs(f"btrix.crawlconfig={cid}")
 
+    async def handle_crawl_failed(self, job_name, reason):
+        """ Handle failed crawl job, add to db and then delete """
+        try:
+            job = await self.batch_api.read_namespaced_job(
+                name=job_name, namespace=self.namespace
+            )
+        # pylint: disable=bare-except
+        except:
+            print("Job Failure Already Handled")
+            return
+
+        crawl = self._make_crawl_for_job(job, reason, True)
+
+        await self.crawl_ops.handle_finished(crawl)
+
+        await self._delete_job(job_name)
+
     # ========================================================================
     # Internal Methods
 
+    # pylint: disable=no-self-use
+    def _make_crawl_for_job(self, job, state, finish_now=False):
+        """ Make a crawl object from a job """
+        return Crawl(
+            id=job.metadata.name,
+            state=state,
+            user=job.metadata.labels["btrix.user"],
+            aid=job.metadata.labels["btrix.archive"],
+            cid=job.metadata.labels["btrix.crawlconfig"],
+            schedule=job.metadata.annotations.get("btrix.run.schedule", ""),
+            manual=job.metadata.annotations.get("btrix.run.manual") == "1",
+            started=job.status.start_time.replace(tzinfo=None),
+            finished=datetime.datetime.utcnow().replace(microsecond=0, tzinfo=None)
+            if finish_now
+            else None,
+        )
+
+    async def _delete_job(self, name):
+        await self.batch_api.delete_namespaced_job(
+            name=name,
+            namespace=self.namespace,
+            grace_period_seconds=120,
+            propagation_policy="Foreground",
+        )
+
     def _create_config_map(self, crawlconfig, labels):
         """ Create Config Map based on CrawlConfig + labels """
         config_map = client.V1ConfigMap(
@@ -355,6 +395,29 @@ class K8SManager:
 
         return suspend, schedule, run_now
 
+    async def _send_sig_to_pods(self, pods, aid):
+        command = ["kill", "-s", "SIGUSR1", "1"]
+        interrupted = False
+
+        try:
+            for pod in pods:
+                if pod.metadata.labels["btrix.archive"] != aid:
+                    continue
+
+                await self.core_api_ws.connect_get_namespaced_pod_exec(
+                    pod.metadata.name,
+                    namespace=self.namespace,
+                    command=command,
+                    stdout=True,
+                )
+                interrupted = True
+
+        # pylint: disable=broad-except
+        except Exception as exc:
+            print(f"Exec Error: {exc}")
+
+        return interrupted
+
     async def _delete_crawl_configs(self, label):
         """Delete Crawl Cron Job and all dependent resources,
        including configmap and secrets"""
 
@@ -436,9 +499,10 @@ class K8SManager:
             },
         }
 
-        return {
+        job_template = {
             "metadata": {"annotations": annotations},
             "spec": {
+                "backoffLimit": self.crawl_retries,
                 "template": {
                     "metadata": {"labels": labels},
                     "spec": {
@@ -488,6 +552,11 @@ class K8SManager:
                         ],
                         "restartPolicy": "OnFailure",
                     },
-                }
+                },
             },
         }
+
+        if self.crawl_timeout > 0:
+            job_template["spec"]["activeDeadlineSeconds"] = self.crawl_timeout
+
+        return job_template
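
backoffLimit and activeDeadlineSeconds are the two Job-level knobs this patch wires up, and they produce exactly the event reasons the new watch loop keys on. A condensed sketch of the resulting spec, with illustrative values rather than the patch's defaults:

# Condensed sketch of the Job spec built above; values are illustrative.
crawl_retries = 3
crawl_timeout = 7200  # seconds

job_template = {
    "metadata": {"annotations": {}},  # run annotations elided
    "spec": {
        # Failed crawler pods are recreated up to this many times; once the
        # limit is exceeded, the Job fails and Kubernetes records an event
        # with reason "BackoffLimitExceeded", which watch_job_loop() handles.
        "backoffLimit": crawl_retries,
        "template": {},  # pod template elided
    },
}

# A timeout of 0 means "no deadline", so the key is only set when positive;
# once the deadline passes, the Job fails with reason "DeadlineExceeded"
# (handling for that reason is still commented out above).
if crawl_timeout > 0:
    job_template["spec"]["activeDeadlineSeconds"] = crawl_timeout
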
diff --git a/chart/templates/main.yaml b/chart/templates/main.yaml
index 1d755ffa..f8d63975 100644
--- a/chart/templates/main.yaml
+++ b/chart/templates/main.yaml
@@ -11,6 +11,9 @@ data:
   CRAWLER_NAMESPACE: {{ .Values.crawler_namespace }}
   CRAWLER_IMAGE: {{ .Values.crawler_image }}
 
+  CRAWL_TIMEOUT: "{{ .Values.crawl_timeout }}"
+  CRAWL_RETRIES: "{{ .Values.crawl_retries }}"
+
 ---
 apiVersion: v1
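
One subtlety in the template above: ConfigMap data values must be strings, which is why the two new entries are quoted even though they hold integers. The backend converts them back on startup; a sketch of the round trip, using hypothetical values:

import os

# Simulate the env a pod receives from the rendered ConfigMap: both values
# arrive as strings (values here are hypothetical).
os.environ["CRAWL_TIMEOUT"] = "0"
os.environ["CRAWL_RETRIES"] = "1"

# Mirrors the parsing in K8SManager.__init__; int() raises ValueError on
# malformed input, so a bad chart value fails fast at startup.
crawl_timeout = int(os.environ.get("CRAWL_TIMEOUT", "1000000"))
crawl_retries = int(os.environ.get("CRAWL_RETRIES", "3"))

assert crawl_timeout == 0 and crawl_retries == 1
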
diff --git a/chart/values.yaml b/chart/values.yaml
index d6b1af98..501f1448 100644
--- a/chart/values.yaml
+++ b/chart/values.yaml
@@ -41,6 +41,12 @@ crawler_pull_policy: "Never"
 
 crawler_namespace: "crawlers"
 
+# crawl timeout in seconds, set to 0 to disable
+crawl_timeout: 0
+
+# number of retries for a failed crawl
+crawl_retries: 1
+
 # Storage
 # =========================================
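
Taken together, the patch collapses the old stop_crawl/stop_crawl_graceful pair into a single method. A hedged usage sketch from the API layer's perspective (the names `crawl_manager` and `ops` and the job id are stand-ins, not part of the patch):

# Usage sketch, assuming a wired-up K8SManager (crawl_manager) and CrawlOps (ops).
async def stop_examples(crawl_manager, ops, aid):
    # Cancel: pods are signalled, the job is deleted, and a Crawl object in
    # state "canceled" comes back for immediate persistence, since no
    # completion webhook will follow.
    crawl = await crawl_manager.stop_crawl("crawl-job-123", aid, graceful=False)
    if crawl:
        await ops.handle_finished(crawl)

    # Graceful stop: only the job is deleted and stop_crawl() returns True;
    # the finished crawl is recorded later through the normal completion
    # path (on_handle_crawl_complete).
    stopped = await crawl_manager.stop_crawl("crawl-job-123", aid, graceful=True)
    return stopped
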