From be86505347f9d99b1e751de59af676bf1c81f0a1 Mon Sep 17 00:00:00 2001
From: Ilya Kreymer
Date: Sun, 30 Jan 2022 21:59:35 -0800
Subject: [PATCH] backend: crawls api: better fix for graceful stop

- k8s: don't use redis, set to 'stopping' if status.active is not set,
  toggled immediately on delete_job
- docker: set custom redis key to indicate 'stopping' state (container
  still running)
- api: remove crawl is_running endpoint, redundant with general get crawl api
---
 backend/crawls.py    | 40 +++++++++++++++-------------------------
 backend/dockerman.py | 27 ++++++++++++++++++++++-----
 backend/k8sman.py    | 25 ++++++-------------------
 3 files changed, 43 insertions(+), 49 deletions(-)

diff --git a/backend/crawls.py b/backend/crawls.py
index 66193749..c3a59e7c 100644
--- a/backend/crawls.py
+++ b/backend/crawls.py
@@ -184,7 +184,7 @@ class CrawlOps:
         """Add finished crawl to db, increment archive usage.
         If crawl file provided, update and add file"""
         if crawl_file:
-            await self.get_redis_stats([crawl], False)
+            await self.get_redis_stats([crawl])
 
             crawl_update = {
                 "$set": crawl.to_dict(exclude={"files", "completions"}),
@@ -280,7 +280,7 @@ class CrawlOps:
             aid=archive.id_str
         )
 
-        await self.get_redis_stats(running_crawls, True)
+        await self.get_redis_stats(running_crawls)
         finished_crawls = await self.list_finished_crawls(
             aid=archive.id, exclude_files=True
         )
@@ -300,7 +300,7 @@ class CrawlOps:
         """ Get data for single crawl """
         crawl = await self.crawl_manager.get_running_crawl(crawlid, archive.id_str)
         if crawl:
-            await self.get_redis_stats([crawl], True)
+            await self.get_redis_stats([crawl])
 
         else:
             res = await self.crawls.find_one({"_id": crawlid, "aid": archive.id})
@@ -343,33 +343,25 @@ class CrawlOps:
                 file_.filename = storage_prefix + file_.filename
 
     # pylint: disable=too-many-arguments
-    async def get_redis_stats(self, crawl_list, set_stopping=False):
+    async def get_redis_stats(self, crawl_list):
         """ Add additional live crawl stats from redis """
         results = None
 
         def pairwise(iterable):
             val = iter(iterable)
-            return zip(val, val, val)
+            return zip(val, val)
 
         async with self.redis.pipeline(transaction=True) as pipe:
             for crawl in crawl_list:
                 key = crawl.id
                 pipe.llen(f"{key}:d")
                 pipe.scard(f"{key}:s")
-                pipe.get(f"{key}:stop")
 
             results = await pipe.execute()
 
-        for crawl, (done, total, stopping) in zip(crawl_list, pairwise(results)):
-            if set_stopping and stopping:
-                crawl.state = "stopping"
-
+        for crawl, (done, total) in zip(crawl_list, pairwise(results)):
             crawl.stats = {"done": done, "found": total}
 
-    async def mark_stopping(self, crawl_id):
-        """ Mark crawl as in process of stopping in redis """
-        await self.redis.setex(f"{crawl_id}:stop", 600, 1)
-
     async def delete_crawls(self, aid: uuid.UUID, delete_list: DeleteCrawlList):
         """ Delete a list of crawls by id for given archive """
         res = await self.crawls.delete_many(
@@ -437,8 +429,6 @@ def init_crawls_api(
         if not stopping:
             raise HTTPException(status_code=404, detail=f"Crawl not found: {crawl_id}")
 
-        await ops.mark_stopping(crawl_id)
-
         return {"stopping_gracefully": True}
 
     @app.post("/archives/{aid}/crawls/delete", tags=["crawls"])
@@ -463,15 +453,15 @@ def init_crawls_api(
     async def get_crawl(crawl_id, archive: Archive = Depends(archive_crawl_dep)):
         return await ops.get_crawl(crawl_id, archive)
 
-    @app.get(
-        "/archives/{aid}/crawls/{crawl_id}/running",
-        tags=["crawls"],
-    )
-    async def get_running(crawl_id, archive: Archive = Depends(archive_crawl_dep)):
-        if not crawl_manager.is_running(crawl_id, archive.id_str):
-            raise HTTPException(status_code=404, detail="No Such Crawl")
-
-        return {"running": True}
+    # @app.get(
+    #     "/archives/{aid}/crawls/{crawl_id}/running",
+    #     tags=["crawls"],
+    # )
+    # async def get_running(crawl_id, archive: Archive = Depends(archive_crawl_dep)):
+    #     if not crawl_manager.is_running(crawl_id, archive.id_str):
+    #         raise HTTPException(status_code=404, detail="No Such Crawl")
+    #
+    #     return {"running": True}
 
     @app.post(
         "/archives/{aid}/crawls/{crawl_id}/scale",
diff --git a/backend/dockerman.py b/backend/dockerman.py
index 0eed7fde..2e1027a0 100644
--- a/backend/dockerman.py
+++ b/backend/dockerman.py
@@ -15,6 +15,7 @@ from tempfile import NamedTemporaryFile
 
 import aiodocker
 import aioprocessing
+import aioredis
 
 from scheduler import run_scheduler
 
@@ -62,6 +63,13 @@ class DockerManager:
         self.loop.create_task(self.run_event_loop())
         self.loop.create_task(self.init_trigger_queue())
         self.loop.create_task(self.cleanup_loop())
+        self.loop.create_task(self.init_redis(self.redis_url))
+
+    async def init_redis(self, redis_url):
+        """ init redis async """
+        self.redis = await aioredis.from_url(
+            redis_url, encoding="utf-8", decode_responses=True
+        )
 
     # pylint: disable=no-member
     async def init_trigger_queue(self):
@@ -257,6 +265,7 @@ class DockerManager:
                 result = self._make_crawl_for_container(container, "canceled", True)
             else:
                 result = True
+                await self._mark_is_stopping(crawl_id)
                 await container.kill(signal="SIGTERM")
 
         except aiodocker.exceptions.DockerError as exc:
@@ -358,16 +367,16 @@ class DockerManager:
             if aid and container["Config"]["Labels"]["btrix.archive"] != aid:
                 return None
 
-            return self._make_crawl_for_container(container, "running", False, CrawlOut)
+            stopping = await self._get_is_stopping(crawl_id)
+
+            return self._make_crawl_for_container(
+                container, "stopping" if stopping else "running", False, CrawlOut
+            )
         # pylint: disable=broad-except
         except Exception as exc:
             print(exc, flush=True)
             return None
 
-    async def is_running(self, crawl_id, aid):
-        """ Return true is crawl with given id is running """
-        return await self.get_running_crawl(crawl_id, aid) is not None
-
     async def scale_crawl(self):  # job_name, aid, parallelism=1):
         """ Scale running crawl, currently only supported in k8s"""
         return "Not Supported"
@@ -510,6 +519,14 @@ class DockerManager:
         )
         return results
 
+    async def _mark_is_stopping(self, crawl_id):
+        """ mark crawl as stopping in redis """
+        await self.redis.setex(f"{crawl_id}:stop", 600, 1)
+
+    async def _get_is_stopping(self, crawl_id):
+        """ check redis if crawl is marked for stopping """
+        return await self.redis.get(f"{crawl_id}:stop")
+
     async def _is_scheduled_crawl_for_config_running(self, cid):
         results = await self._list_running_containers(
             [f"btrix.crawlconfig={cid}", "btrix.run.manual=0"]
         )
diff --git a/backend/k8sman.py b/backend/k8sman.py
index 8c64f181..5add66c7 100644
--- a/backend/k8sman.py
+++ b/backend/k8sman.py
@@ -281,9 +281,10 @@ class K8SManager:
         )
 
         return [
-            self._make_crawl_for_job(job, "running", False, CrawlOut)
+            self._make_crawl_for_job(
+                job, "running" if job.status.active else "stopping", False, CrawlOut
+            )
             for job in jobs.items
-            if job.status.active
         ]
 
     async def init_crawl_screencast(self, crawl_id, aid):
@@ -374,8 +375,9 @@ class K8SManager:
             if not job or job.metadata.labels["btrix.archive"] != aid:
                 return None
 
-            if job.status.active:
-                return self._make_crawl_for_job(job, "running", False, CrawlOut)
+            return self._make_crawl_for_job(
+                job, "running" if job.status.active else "stopping", False, CrawlOut
+            )
 
         # pylint: disable=broad-except
         except Exception:
@@ -383,21 +385,6 @@ class K8SManager:
 
         return None
 
-    async def is_running(self, job_name, aid):
-        """ Return true if the specified crawl (by job_name) is running """
-        try:
-            job = await self.batch_api.read_namespaced_job(
-                name=job_name, namespace=self.namespace
-            )
-
-            if not job or job.metadata.labels["btrix.archive"] != aid:
-                return False
-
-            return True
-        # pylint: disable=broad-except
-        except Exception:
-            return False
-
     async def stop_crawl(self, job_name, aid, graceful=True):
         """Attempt to stop crawl, either gracefully by issuing a SIGTERM which
         will attempt to finish current pages