From 8acb43b1717e1686819e8da7b4ad811e3a9c9473 Mon Sep 17 00:00:00 2001
From: Ilya Kreymer
Date: Tue, 1 Feb 2022 15:58:56 -0800
Subject: [PATCH] backend: use redis to mark crawls as canceled immediately,
 avoid dupes in crawl list (even if paging is added for db results)

---
 backend/crawls.py    |  4 +---
 backend/dockerman.py | 13 ++++++++-----
 backend/k8sman.py    | 22 +++++++++++++++++-----
 3 files changed, 26 insertions(+), 13 deletions(-)

diff --git a/backend/crawls.py b/backend/crawls.py
index ed186bb2..1db8a99a 100644
--- a/backend/crawls.py
+++ b/backend/crawls.py
@@ -330,9 +330,7 @@ class CrawlOps:
         crawl = CrawlOut.from_dict(res)
 
         if not crawl:
-            raise HTTPException(
-                status_code=404, detail=f"Crawl not found: {crawlid}"
-            )
+            raise HTTPException(status_code=404, detail=f"Crawl not found: {crawlid}")
 
         return await self._resolve_crawl_refs(crawl, archive)
 
diff --git a/backend/dockerman.py b/backend/dockerman.py
index 4e17f6cc..adc3c860 100644
--- a/backend/dockerman.py
+++ b/backend/dockerman.py
@@ -258,9 +258,10 @@ class DockerManager:
             if not graceful:
                 await container.kill(signal="SIGABRT")
                 result = self._make_crawl_for_container(container, "canceled", True)
+                await self._mark_is_stopping(crawl_id, "canceled")
             else:
                 result = True
-                await self._mark_is_stopping(crawl_id)
+                await self._mark_is_stopping(crawl_id, "stopping")
                 await container.kill(signal="SIGTERM")
 
         except aiodocker.exceptions.DockerError as exc:
@@ -365,10 +366,12 @@ class DockerManager:
             if aid and container["Config"]["Labels"]["btrix.archive"] != aid:
                 return None
 
-            stopping = await self._get_is_stopping(crawl_id)
+            stop_type = await self._get_is_stopping(crawl_id)
+            if stop_type == "canceled":
+                return None
 
             return self._make_crawl_for_container(
-                container, "stopping" if stopping else "running", False, CrawlOut
+                container, "stopping" if stop_type else "running", False, CrawlOut
             )
         # pylint: disable=broad-except
         except Exception as exc:
@@ -528,9 +531,9 @@ class DockerManager:
         )
         return results
 
-    async def _mark_is_stopping(self, crawl_id):
+    async def _mark_is_stopping(self, crawl_id, stop_type):
         """ mark crawl as stopping in redis """
-        await self.redis.setex(f"{crawl_id}:stop", 600, 1)
+        await self.redis.setex(f"{crawl_id}:stop", 600, stop_type)
 
     async def _get_is_stopping(self, crawl_id):
         """ check redis if crawl is marked for stopping """
diff --git a/backend/k8sman.py b/backend/k8sman.py
index f5a6a420..a64b5b7e 100644
--- a/backend/k8sman.py
+++ b/backend/k8sman.py
@@ -5,6 +5,7 @@ import datetime
 import json
 import asyncio
 import base64
+import aioredis
 
 from kubernetes_asyncio import client, config, watch
 from kubernetes_asyncio.stream import WsApiClient
@@ -45,8 +46,17 @@ class K8SManager:
 
         self.no_delete_jobs = os.environ.get("NO_DELETE_JOBS", "0") != "0"
 
+        self.redis_url = os.environ["REDIS_URL"]
+
         self.loop = asyncio.get_running_loop()
         self.loop.create_task(self.run_event_loop())
+        self.loop.create_task(self.init_redis(self.redis_url))
+
+    async def init_redis(self, redis_url):
+        """ init redis async """
+        self.redis = await aioredis.from_url(
+            redis_url, encoding="utf-8", decode_responses=True
+        )
 
     def set_crawl_ops(self, ops):
         """ Set crawl ops handler """
@@ -276,7 +286,7 @@ class K8SManager:
 
         crawls = []
         for job in jobs.items:
-            status = self._get_crawl_state(job)
+            status = await self._get_crawl_state(job)
             if not status:
                 continue
 
@@ -392,7 +402,7 @@ class K8SManager:
         if not job or job.metadata.labels["btrix.archive"] != aid:
             return None
 
-        status = self._get_crawl_state(job)
+        status = await self._get_crawl_state(job)
 
         if not status:
             return None
@@ -429,6 +439,7 @@ class K8SManager:
             await self._send_sig_to_pods(pods.items, aid)
 
             result = self._make_crawl_for_job(job, "canceled", True)
+            await self.redis.setex(f"{job_name}:stop", 300, "canceled")
         else:
             result = True
 
@@ -492,7 +503,7 @@ class K8SManager:
     # ========================================================================
     # Internal Methods
 
-    def _get_crawl_state(self, job):
+    async def _get_crawl_state(self, job):
         if job.status.active:
             return "running"
 
@@ -500,12 +511,13 @@ class K8SManager:
         finished = (job.status.succeeded or 0) + (job.status.failed or 0)
         total = job.spec.parallelism or 1
         if finished != total:
-            return "stopping"
+            # don't return anything if marked as cancel
+            if await self.redis.get(f"{job.metadata.name}:stop") != "canceled":
+                return "stopping"
 
         # job fully done, do not treat as running or stopping
         return None
 
-    # pylint: disable=no-self-use
     def _make_crawl_for_job(self, job, state, finish_now=False, crawl_cls=Crawl):
         """ Make a crawl object from a job"""
 
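Note for reviewers (not part of the patch): a minimal standalone sketch of the redis stop-marker pattern the patch relies on, assuming aioredis 2.x and a redis instance at redis://localhost. A "{crawl_id}:stop" key is set with a TTL and a stop type ("canceled" or "stopping"); crawl listings then read it back to hide canceled crawls immediately or report "stopping". The helpers mark_stop and crawl_state and the crawl id are hypothetical stand-ins, not code from the patch.

# hypothetical helpers mirroring the stop-marker pattern above; assumes
# aioredis 2.x and a local redis, run only as an illustration
import asyncio

import aioredis


async def mark_stop(redis, crawl_id, stop_type):
    # record the stop type ("canceled" or "stopping") with a TTL so the
    # marker cleans itself up once the crawl container/job is long gone
    await redis.setex(f"{crawl_id}:stop", 600, stop_type)


async def crawl_state(redis, crawl_id, job_active):
    # mimic the listing logic: canceled crawls are hidden immediately,
    # stopping crawls report "stopping" until they actually finish
    stop_type = await redis.get(f"{crawl_id}:stop")
    if stop_type == "canceled":
        return None
    if stop_type == "stopping":
        return "stopping"
    return "running" if job_active else None


async def main():
    redis = await aioredis.from_url(
        "redis://localhost", encoding="utf-8", decode_responses=True
    )
    await mark_stop(redis, "crawl-123", "canceled")
    # prints None: the canceled crawl drops out of listings right away
    print(await crawl_state(redis, "crawl-123", job_active=True))


asyncio.run(main())

The TTL (600s here, 300s in the k8s path of the patch) means no explicit delete is needed: the marker expires on its own after the crawl record has settled in the database.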