backend fixes: fix graceful stop + stats (#122)
* backend fixes: fix graceful stop + stats
  - use redis to track stopping state, to be overwritten when finished
  - also include stats in completed crawls
  - docker: use short container id for crawl id
  - graceful stop returns 'stopping_gracefully' instead of 'stopped_gracefully'
  - don't set stopping state when complete!
  - beginning files support: resolve absolute urls for crawl detail (not pre-signing yet)
parent be4bf3742f
commit 542680daf7
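The core idea: the "stopping" state lives only in redis, keyed per crawl with a short TTL, so it expires or is superseded once the crawl is actually stored as finished. A minimal sketch of the pattern, assuming an aioredis-style async client (`is_stopping` is a hypothetical helper, not part of this commit; the key name and TTL are taken from the diff below):

    async def mark_stopping(redis, crawl_id):
        # short-lived flag: expires on its own after 10 minutes,
        # and is simply ignored once the crawl is stored as finished
        await redis.setex(f"{crawl_id}:stop", 600, 1)

    async def is_stopping(redis, crawl_id):
        # non-None while the flag is still alive
        return bool(await redis.get(f"{crawl_id}:stop"))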
@@ -184,6 +184,8 @@ class CrawlOps:
         """Add finished crawl to db, increment archive usage.
         If crawl file provided, update and add file"""
         if crawl_file:
+            await self.get_redis_stats([crawl], False)
+
             crawl_update = {
                 "$set": crawl.to_dict(exclude={"files", "completions"}),
                 "$push": {"files": crawl_file.dict()},
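Calling `get_redis_stats([crawl], False)` before building the update captures the final done/found counters into the persisted document, so completed crawls now carry stats as well. Illustrative shape of the stored record (values made up):

    {
        "state": "complete",
        "stats": {"done": 142, "found": 142},
        # ... other crawl fields, plus the pushed "files" entry
    }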
@@ -205,10 +207,6 @@ class CrawlOps:
                 # print(f"Crawl Already Added: {crawl.id} - {crawl.state}")
                 return False

-        if crawl.state == "stopping":
-            print("Stopping Crawl...", flush=True)
-            return True
-
         dura = int((crawl.finished - crawl.started).total_seconds())

         print(f"Duration: {dura}", flush=True)
@@ -282,7 +280,7 @@ class CrawlOps:
             aid=archive.id_str
         )

-        await self.get_redis_stats(running_crawls)
+        await self.get_redis_stats(running_crawls, True)

         finished_crawls = await self.list_finished_crawls(
             aid=archive.id, exclude_files=True
@@ -300,17 +298,20 @@ class CrawlOps:

     async def get_crawl(self, crawlid: str, archive: Archive):
         """ Get data for single crawl """
-        res = await self.crawls.find_one({"_id": crawlid, "aid": archive.id})
-        if not res:
-            crawl = await self.crawl_manager.get_running_crawl(crawlid, archive.id_str)
-            if crawl:
-                await self.get_redis_stats([crawl])
+        crawl = await self.crawl_manager.get_running_crawl(crawlid, archive.id_str)
+        if crawl:
+            await self.get_redis_stats([crawl], True)

         else:
+            res = await self.crawls.find_one({"_id": crawlid, "aid": archive.id})
+            if not res:
+                raise HTTPException(
+                    status_code=404, detail=f"Crawl not found: {crawlid}"
+                )
+
             crawl = CrawlOut.from_dict(res)
+            await self._resolve_filenames(crawl)

-        if not crawl:
-            raise HTTPException(status_code=404, detail=f"Crawl not found: {crawlid}")
-
         return await self._resolve_crawl(crawl, archive)
@@ -327,26 +328,48 @@ class CrawlOps:

         return crawl

+    async def _resolve_filenames(self, crawl: CrawlOut):
+        """ Resolve absolute filenames for each file """
+        if not crawl.files:
+            return
+
+        for file_ in crawl.files:
+            if file_.def_storage_name:
+                storage_prefix = (
+                    await self.crawl_manager.get_default_storage_access_endpoint(
+                        file_.def_storage_name
+                    )
+                )
+                file_.filename = storage_prefix + file_.filename
+
     # pylint: disable=too-many-arguments
-    async def get_redis_stats(self, crawl_list):
+    async def get_redis_stats(self, crawl_list, set_stopping=False):
         """ Add additional live crawl stats from redis """
         results = None

         def pairwise(iterable):
             val = iter(iterable)
-            return zip(val, val)
+            return zip(val, val, val)

         async with self.redis.pipeline(transaction=True) as pipe:
             for crawl in crawl_list:
                 key = crawl.id
                 pipe.llen(f"{key}:d")
                 pipe.scard(f"{key}:s")
+                pipe.get(f"{key}:stop")

             results = await pipe.execute()

-        for crawl, (done, total) in zip(crawl_list, pairwise(results)):
+        for crawl, (done, total, stopping) in zip(crawl_list, pairwise(results)):
+            if set_stopping and stopping:
+                crawl.state = "stopping"
+
             crawl.stats = {"done": done, "found": total}

+    async def mark_stopping(self, crawl_id):
+        """ Mark crawl as in process of stopping in redis """
+        await self.redis.setex(f"{crawl_id}:stop", 600, 1)
+
     async def delete_crawls(self, aid: uuid.UUID, delete_list: DeleteCrawlList):
         """ Delete a list of crawls by id for given archive """
         res = await self.crawls.delete_many(
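The `pairwise` helper exploits the fact that `zip` consumes the same iterator in lockstep, so the flat list the pipeline returns (llen, scard, get per crawl) comes back regrouped per crawl; with the third `val` it now yields triples, making the name a slight misnomer. A quick self-contained check:

    def pairwise(iterable):
        val = iter(iterable)
        return zip(val, val, val)

    # pipeline results for two crawls: (llen, scard, get) each
    results = [10, 25, None, 3, 3, "1"]
    print(list(pairwise(results)))
    # -> [(10, 25, None), (3, 3, '1')]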
@@ -401,9 +424,9 @@ def init_crawls_api(
     async def crawl_graceful_stop(
         crawl_id, archive: Archive = Depends(archive_crawl_dep)
     ):
-        crawl = None
+        stopping = False
         try:
-            crawl = await crawl_manager.stop_crawl(
+            stopping = await crawl_manager.stop_crawl(
                 crawl_id, archive.id_str, graceful=True
             )
@@ -411,12 +434,12 @@ def init_crawls_api(
             # pylint: disable=raise-missing-from
            raise HTTPException(status_code=400, detail=f"Error Stopping Crawl: {exc}")

-        if not crawl:
+        if not stopping:
            raise HTTPException(status_code=404, detail=f"Crawl not found: {crawl_id}")

-        await ops.store_crawl(crawl)
+        await ops.mark_stopping(crawl_id)

-        return {"stopped_gracefully": True}
+        return {"stopping_gracefully": True}

     @app.post("/archives/{aid}/crawls/delete", tags=["crawls"])
     async def delete_crawls(
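Net effect on the endpoint contract: `stop_crawl` now reports success as a bool, no crawl record is written at stop time (that happens when the crawl actually exits and is stored as finished), and the renamed response key says the stop is in progress rather than already done. Response body, before and after (Python dicts as FastAPI returns them):

    {"stopped_gracefully": True}   # before: implied the crawl had already stopped
    {"stopping_gracefully": True}  # after: stopping is underway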
@@ -234,7 +234,7 @@ class DockerManager:
         running = []

         for container in containers:
-            crawl = await self.get_running_crawl(container["Id"], aid)
+            crawl = await self.get_running_crawl(container["Id"][:12], aid)
             if crawl:
                 running.append(crawl)

@@ -502,7 +502,7 @@ class DockerManager:
         }

         container = await self.client.containers.run(run_config)
-        return container["id"]
+        return container["id"][:12]

     async def _list_running_containers(self, labels):
         results = await self.client.containers.list(
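The crawl id doubles as a redis key prefix and a mongo _id, so every place that produces or looks up an id must truncate the container id identically; hence the same `[:12]` slice in all three DockerManager spots. Docker's short form is just the first 12 hex chars of the full 64-char id, e.g. (hypothetical helper, not in this commit):

    def short_id(container_id: str) -> str:
        # docker's short id: first 12 hex chars of the full 64-char id
        return container_id[:12]

    assert short_id("a" * 64) == "a" * 12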
@@ -536,7 +536,7 @@ class DockerManager:
         labels = container["Config"]["Labels"]

         return crawl_cls(
-            id=container["Id"],
+            id=container["Id"][:12],
             state=state,
             userid=labels["btrix.user"],
             aid=labels["btrix.archive"],
@@ -424,7 +424,7 @@ class K8SManager:

             result = self._make_crawl_for_job(job, "canceled", True)
         else:
-            result = self._make_crawl_for_job(job, "stopping", False)
+            result = True

         await self._delete_job(job_name)

@@ -21,9 +21,9 @@ data:
   REDIS_CRAWLS_DONE_KEY: "crawls-done"

-  NO_DELETE_JOBS: "{{ .Values.no_delete_jobs | default '0' }}"
+  NO_DELETE_JOBS: "{{ .Values.no_delete_jobs | default 0 }}"

-  REGISTRATION_ENABLED: "{{ .Values.registration_enabled | default '0' }}"
+  REGISTRATION_ENABLED: "{{ .Values.registration_enabled | default 0 }}"

   JWT_TOKEN_LIFETIME_MINUTES: "{{ .Values.jwt_token_lifetime_minutes | default 60 }}"

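The quoting fix matters because the value is already wrapped in double quotes, and in Go templates single quotes denote a rune constant rather than a string, so `default '0'` would not render as a literal "0"; the bare `0` does. With `no_delete_jobs` and `registration_enabled` unset, the chart should now render:

    NO_DELETE_JOBS: "0"
    REGISTRATION_ENABLED: "0"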