backend: use redis to mark crawls as canceled immediately, avoid dupes in crawl list (even if paging is added for db results)
commit 8acb43b171 (parent 4b7522920a)
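In short: when a crawl is canceled (and, in the Docker manager, when it is gracefully stopped), the backend now writes a short-lived <crawl_id>:stop key to Redis holding the stop type. The crawl-listing code checks that key, so a canceled crawl drops out of the running-crawls list immediately instead of appearing as a duplicate of the finished record in the database, even once paging is added for db results. Below is a minimal sketch of the pattern using the key name and values from the diff; the connection URL and crawl id are placeholders, and it assumes aioredis 2.x and a reachable Redis server:

import asyncio

import aioredis


async def main():
    # decode_responses=True makes get() return str rather than bytes,
    # so the comparison against "canceled" below works as intended
    redis = await aioredis.from_url(
        "redis://localhost", encoding="utf-8", decode_responses=True
    )

    crawl_id = "crawl-abc123"  # placeholder id for illustration

    # cancel path: mark the crawl, with a TTL so stale markers expire
    # on their own (the diff uses 600s in Docker, 300s in Kubernetes)
    await redis.setex(f"{crawl_id}:stop", 600, "canceled")

    # listing path: resolve the crawl's display state from the marker
    stop_type = await redis.get(f"{crawl_id}:stop")
    if stop_type == "canceled":
        print("hidden: canceled, db record is the source of truth")
    elif stop_type:
        print("state: stopping")
    else:
        print("state: running")


asyncio.run(main())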
@@ -330,9 +330,7 @@ class CrawlOps:
         crawl = CrawlOut.from_dict(res)

         if not crawl:
-            raise HTTPException(
-                status_code=404, detail=f"Crawl not found: {crawlid}"
-            )
+            raise HTTPException(status_code=404, detail=f"Crawl not found: {crawlid}")

         return await self._resolve_crawl_refs(crawl, archive)

@@ -258,9 +258,10 @@ class DockerManager:
             if not graceful:
                 await container.kill(signal="SIGABRT")
                 result = self._make_crawl_for_container(container, "canceled", True)
+                await self._mark_is_stopping(crawl_id, "canceled")
             else:
                 result = True
-                await self._mark_is_stopping(crawl_id)
+                await self._mark_is_stopping(crawl_id, "stopping")

                 await container.kill(signal="SIGTERM")
         except aiodocker.exceptions.DockerError as exc:
@@ -365,10 +366,12 @@ class DockerManager:
             if aid and container["Config"]["Labels"]["btrix.archive"] != aid:
                 return None

-            stopping = await self._get_is_stopping(crawl_id)
+            stop_type = await self._get_is_stopping(crawl_id)
+            if stop_type == "canceled":
+                return None

             return self._make_crawl_for_container(
-                container, "stopping" if stopping else "running", False, CrawlOut
+                container, "stopping" if stop_type else "running", False, CrawlOut
             )
         # pylint: disable=broad-except
         except Exception as exc:
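The effect of this hunk is a three-way mapping for a live container; condensed from the lines above, with comments tying it back to the commit's goal:

# stop_type comes from redis via _get_is_stopping():
#   "canceled"  -> hide the container entirely; the finished record in
#                  the db is the single source of truth, so no dupes
#   "stopping"  -> surface the crawl as "stopping"
#   None        -> no marker set, crawl is running normally
stop_type = await self._get_is_stopping(crawl_id)
if stop_type == "canceled":
    return None
return self._make_crawl_for_container(
    container, "stopping" if stop_type else "running", False, CrawlOut
)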
@@ -528,9 +531,9 @@ class DockerManager:
             )
         return results

-    async def _mark_is_stopping(self, crawl_id):
+    async def _mark_is_stopping(self, crawl_id, stop_type):
         """ mark crawl as stopping in redis """
-        await self.redis.setex(f"{crawl_id}:stop", 600, 1)
+        await self.redis.setex(f"{crawl_id}:stop", 600, stop_type)

     async def _get_is_stopping(self, crawl_id):
         """ check redis if crawl is marked for stopping """
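For reference, the two helpers after this change read roughly as below. _mark_is_stopping is taken from the hunk above; _get_is_stopping is cut off by the diff, so its body here is an assumption (a plain GET on the same key):

    async def _mark_is_stopping(self, crawl_id, stop_type):
        """ mark crawl as stopping in redis """
        # store the stop type ("canceled" or "stopping") instead of a bare 1,
        # with a 10-minute TTL so stale markers clean themselves up
        await self.redis.setex(f"{crawl_id}:stop", 600, stop_type)

    async def _get_is_stopping(self, crawl_id):
        """ check redis if crawl is marked for stopping """
        # assumed body: returns "canceled", "stopping", or None if unset
        return await self.redis.get(f"{crawl_id}:stop")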
@@ -5,6 +5,7 @@ import datetime
 import json
 import asyncio
 import base64
+import aioredis

 from kubernetes_asyncio import client, config, watch
 from kubernetes_asyncio.stream import WsApiClient
@@ -45,8 +46,17 @@ class K8SManager:

         self.no_delete_jobs = os.environ.get("NO_DELETE_JOBS", "0") != "0"

+        self.redis_url = os.environ["REDIS_URL"]
+
         self.loop = asyncio.get_running_loop()
         self.loop.create_task(self.run_event_loop())
+        self.loop.create_task(self.init_redis(self.redis_url))
+
+    async def init_redis(self, redis_url):
+        """ init redis async """
+        self.redis = await aioredis.from_url(
+            redis_url, encoding="utf-8", decode_responses=True
+        )

     def set_crawl_ops(self, ops):
         """ Set crawl ops handler """
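Two details worth noting here: init_redis is scheduled with create_task rather than awaited, so the manager's constructor stays synchronous and the connection is established in the background; and decode_responses=True makes redis.get() return str rather than bytes, which the string comparisons in this commit rely on. A small illustrative check of the latter (URL is a placeholder, assuming aioredis 2.x):

import asyncio

import aioredis


async def check():
    raw = await aioredis.from_url("redis://localhost")
    dec = await aioredis.from_url(
        "redis://localhost", encoding="utf-8", decode_responses=True
    )

    await dec.setex("demo:stop", 60, "canceled")

    # without decode_responses, values come back as bytes
    print(await raw.get("demo:stop") == "canceled")  # False: b"canceled"
    # with it, values come back as str and compare as expected
    print(await dec.get("demo:stop") == "canceled")  # True


asyncio.run(check())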
@@ -276,7 +286,7 @@ class K8SManager:
         crawls = []

         for job in jobs.items:
-            status = self._get_crawl_state(job)
+            status = await self._get_crawl_state(job)
             if not status:
                 continue

@@ -392,7 +402,7 @@ class K8SManager:
         if not job or job.metadata.labels["btrix.archive"] != aid:
             return None

-        status = self._get_crawl_state(job)
+        status = await self._get_crawl_state(job)
         if not status:
             return None

@@ -429,6 +439,7 @@ class K8SManager:
             await self._send_sig_to_pods(pods.items, aid)

             result = self._make_crawl_for_job(job, "canceled", True)
+            await self.redis.setex(f"{job_name}:stop", 300, "canceled")
         else:
             result = True

@@ -492,7 +503,7 @@ class K8SManager:
     # ========================================================================
     # Internal Methods

-    def _get_crawl_state(self, job):
+    async def _get_crawl_state(self, job):
         if job.status.active:
             return "running"

@@ -500,12 +511,13 @@ class K8SManager:
         finished = (job.status.succeeded or 0) + (job.status.failed or 0)
         total = job.spec.parallelism or 1
         if finished != total:
-            return "stopping"
+            # don't return anything if marked as cancel
+            if await self.redis.get(f"{job.metadata.name}:stop") != "canceled":
+                return "stopping"

         # job fully done, do not treat as running or stopping
         return None

     # pylint: disable=no-self-use
     def _make_crawl_for_job(self, job, state, finish_now=False, crawl_cls=Crawl):
         """ Make a crawl object from a job"""
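Putting the last two hunks together, _get_crawl_state after this commit reads roughly as follows (reconstructed from the diff context; any lines between the two hunks are not shown). Note that a canceled job that has not yet finished skips the "stopping" return and falls through to the final return None, which is what removes it from the list immediately:

    async def _get_crawl_state(self, job):
        if job.status.active:
            return "running"

        finished = (job.status.succeeded or 0) + (job.status.failed or 0)
        total = job.spec.parallelism or 1
        if finished != total:
            # don't return anything if marked as cancel
            if await self.redis.get(f"{job.metadata.name}:stop") != "canceled":
                return "stopping"

        # job fully done, do not treat as running or stopping
        return None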