crawls work (#1):

- support listing existing crawls
- add 'schedule' and 'manual' annotations to jobs, store in Crawl obj
- ensure manual jobs are deleted when completed
- support deleting crawls by id (but not data)
- rename running crawl delete to '/cancel'

change paths for local minio/mongo to /tmp
Ilya Kreymer 2021-08-23 17:57:16 -07:00
parent 66c4e618eb
commit 7146e054a4
5 changed files with 148 additions and 54 deletions
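
As a rough usage sketch of the reworked crawls API (the endpoint paths come from this commit; the base URL, auth header, and response field names are assumptions for illustration):

import requests

API = "http://localhost:8000"                  # assumed local backend URL
HEADERS = {"Authorization": "Bearer <token>"}  # assumed auth scheme
AID = "<archive-id>"

# New: list running + finished crawls for an archive
crawls = requests.get(f"{API}/archives/{AID}/crawls", headers=HEADERS).json()
# -> {"running": [...], "finished": [...]}

# Renamed: cancel a running crawl (was DELETE /archives/{aid}/crawls/{crawl_id})
crawl_id = crawls["running"][0]["id"]
requests.post(f"{API}/archives/{AID}/crawls/{crawl_id}/cancel", headers=HEADERS)

# New: delete crawls by id (removes db records only, not the crawl data)
requests.post(
    f"{API}/archives/{AID}/crawls/delete",
    headers=HEADERS,
    json={"crawl_ids": ["<crawl-id-1>", "<crawl-id-2>"]},
)  # -> {"deleted": <count>}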


@@ -226,7 +226,6 @@ class ArchiveOps:
res = await self.archives.find_one_and_update(
{"_id": aid}, {"$inc": {f"usage.{yymm}": amount}}
)
print(res, flush=True)
return res is not None


@@ -2,7 +2,7 @@
import asyncio
from typing import Optional
from typing import Optional, List
from datetime import datetime
from fastapi import Depends, HTTPException
@@ -13,15 +13,25 @@ from archives import Archive
# ============================================================================
class CrawlFinished(BaseMongoModel):
""" Store State of Finished Crawls """
class DeleteCrawlList(BaseModel):
""" delete crawl list POST body """
crawl_ids: List[str]
# ============================================================================
class Crawl(BaseMongoModel):
""" Store State of a Crawl (Finished or Running) """
user: str
aid: str
cid: str
schedule: Optional[str]
manual: Optional[bool]
started: datetime
finished: datetime
finished: Optional[datetime]
state: str
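
For reference, a minimal standalone sketch of the reshaped models, using plain pydantic in place of the repo's BaseMongoModel and showing only the fields visible in this hunk (the real Crawl also carries file details such as filename, size, and hash, used further down):

from datetime import datetime
from typing import List, Optional
from pydantic import BaseModel

class Crawl(BaseModel):
    id: str                              # id handling comes from BaseMongoModel in the real code
    user: str
    aid: str
    cid: str
    schedule: Optional[str] = None       # new: cron schedule, if scheduled
    manual: Optional[bool] = None        # new: True for "run now" jobs
    started: datetime
    finished: Optional[datetime] = None  # now optional: unset while running
    state: str

class DeleteCrawlList(BaseModel):
    """ delete crawl list POST body """
    crawl_ids: List[str]

# A still-running crawl simply has no finished timestamp yet:
Crawl(id="job-1", user="u1", aid="a1", cid="c1", manual=True,
      started=datetime.utcnow(), state="running")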
@@ -33,6 +43,7 @@ class CrawlFinished(BaseMongoModel):
# ============================================================================
class CrawlCompleteIn(BaseModel):
""" Completed Crawl Webhook POST message """
id: str
user: str
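
Only part of CrawlCompleteIn is visible in this hunk; judging from the fields the handlers below read (id, user, filename, size, hash, completed), a webhook payload looks roughly like this sketch (all values are placeholders):

crawl_complete_msg = {
    "id": "<k8s job name>",
    "user": "<userid>",
    "filename": "<output file>",
    "size": 123456,
    "hash": "<file hash>",
    "completed": True,   # False would yield state "partial_complete"
}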
@@ -55,27 +66,37 @@ class CrawlOps:
async def on_handle_crawl_complete(self, msg: CrawlCompleteIn):
""" Handle completed crawl, add to crawls db collection, also update archive usage """
crawl_finished = await self.crawl_manager.validate_crawl_complete(msg)
if not crawl_finished:
crawl = await self.crawl_manager.validate_crawl_complete(msg)
if not crawl:
print("Not a valid crawl complete msg!", flush=True)
return
await self.handle_finished(crawl_finished)
await self.handle_finished(crawl)
async def handle_finished(self, crawl_finished: CrawlFinished):
async def handle_finished(self, crawl: Crawl):
""" Add finished crawl to db, increment archive usage """
await self.crawls.insert_one(crawl_finished.to_dict())
await self.crawls.insert_one(crawl.to_dict())
print(crawl_finished)
dura = int((crawl.finished - crawl.started).total_seconds())
dura = int((crawl_finished.finished - crawl_finished.started).total_seconds())
await self.archives.inc_usage(crawl.aid, dura)
print(f"Duration: {dura}", flush=True)
await self.archives.inc_usage(crawl_finished.aid, dura)
async def list_crawls(self, aid: str, cid: str = None):
"""Get all crawl configs for an archive is a member of"""
query = {"aid": aid}
if cid:
query["cid"] = cid
async def delete_crawl(self, cid: str, aid: str):
""" Delete crawl by id """
return await self.crawls.delete_one({"_id": cid, "aid": aid})
cursor = self.crawls.find(query)
results = await cursor.to_list(length=1000)
return [Crawl.from_dict(res) for res in results]
async def delete_crawls(self, aid: str, delete_list: DeleteCrawlList):
""" Delete a list of crawls by id for given archive """
res = await self.crawls.delete_many(
{"_id": {"$in": delete_list.crawl_ids}, "aid": aid}
)
return res.deleted_count
# ============================================================================
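
Note that delete_crawls only removes crawl records from the database; the stored crawl data itself is untouched, per the commit message. A minimal sketch of the same query using motor directly (connection string and db/collection names are assumptions):

import asyncio
from motor.motor_asyncio import AsyncIOMotorClient

async def delete_crawl_records(aid, crawl_ids):
    mdb = AsyncIOMotorClient("mongodb://localhost:27017")["browsertrixcloud"]
    res = await mdb["crawls"].delete_many(
        {"_id": {"$in": crawl_ids}, "aid": aid}  # same filter as delete_crawls()
    )
    return res.deleted_count  # surfaced by the API as {"deleted": ...}

# asyncio.run(delete_crawl_records("<aid>", ["<crawl-id-1>", "<crawl-id-2>"]))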
@@ -93,21 +114,37 @@ def init_crawls_api(app, mdb, crawl_manager, archives):
return {"success": True}
@app.delete(
"/archives/{aid}/crawls/{crawl_id}",
@app.get("/archives/{aid}/crawls", tags=["crawls"])
async def list_crawls(archive: Archive = Depends(archive_crawl_dep)):
aid = str(archive.id)
running_crawls = await crawl_manager.list_running_crawls(aid=aid)
finished_crawls = await ops.list_crawls(aid)
return {
"running": [
crawl.dict(exclude_none=True, exclude_unset=True)
for crawl in running_crawls
],
"finished": finished_crawls,
}
@app.post(
"/archives/{aid}/crawls/{crawl_id}/cancel",
tags=["crawls"],
)
async def crawl_delete_stop(crawl_id, archive: Archive = Depends(archive_crawl_dep)):
async def crawl_cancel_stop(
crawl_id, archive: Archive = Depends(archive_crawl_dep)
):
try:
crawl_finished = await crawl_manager.stop_crawl(
crawl_id, str(archive.id)
)
if not crawl_finished:
crawl = await crawl_manager.stop_crawl(crawl_id, archive.id)
if not crawl:
raise HTTPException(
status_code=404, detail=f"Crawl not found: {crawl_id}"
)
await ops.handle_finished(crawl_finished)
await ops.handle_finished(crawl)
except Exception as exc:
# pylint: disable=raise-missing-from
raise HTTPException(status_code=400, detail=f"Error Canceling Crawl: {exc}")
@@ -135,3 +172,10 @@ def init_crawls_api(app, mdb, crawl_manager, archives):
raise HTTPException(status_code=400, detail=f"Error Canceling Crawl: {exc}")
return {"stopped_gracefully": True}
@app.post("/archives/{aid}/crawls/delete", tags=["crawls"])
async def delete_crawls(
delete_list: DeleteCrawlList, archive: Archive = Depends(archive_crawl_dep)
):
res = await ops.delete_crawls(archive.id, delete_list)
return {"deleted": res}


@@ -7,7 +7,7 @@ import json
from kubernetes_asyncio import client, config
from kubernetes_asyncio.stream import WsApiClient
from crawls import CrawlFinished
from crawls import Crawl
# ============================================================================
@@ -88,11 +88,15 @@ class K8SManager:
# Create Cron Job
annotations = {"btrix.run.schedule": crawlconfig.schedule}
suspend, schedule, run_now = self._get_schedule_suspend_run_now(crawlconfig)
extra_crawl_params = extra_crawl_params or []
job_template = self._get_job_template(cid, labels, extra_crawl_params)
job_template = self._get_job_template(
cid, labels, annotations, extra_crawl_params
)
spec = client.V1beta1CronJobSpec(
schedule=schedule,
@@ -137,7 +141,6 @@ class K8SManager:
cron_job = cron_jobs.items[0]
if crawlconfig.archive != cron_job.metadata.labels["btrix.archive"]:
print("wrong archive")
return
labels = {
@@ -167,6 +170,10 @@ class K8SManager:
changed = True
if changed:
cron_job.spec.job_template.metadata.annotations[
"btrix.run.schedule"
] = crawlconfig.schedule
await self.batch_beta_api.patch_namespaced_cron_job(
name=cron_job.metadata.name, namespace=self.namespace, body=cron_job
)
@@ -177,7 +184,6 @@ class K8SManager:
async def run_crawl_config(self, cid):
""" Run crawl job for cron job based on specified crawlconfig id (cid) """
print(f"btrix.crawlconfig={cid}")
cron_jobs = await self.batch_beta_api.list_namespaced_cron_job(
namespace=self.namespace, label_selector=f"btrix.crawlconfig={cid}"
)
@@ -188,6 +194,38 @@ class K8SManager:
res = await self._create_run_now_job(cron_jobs.items[0])
return res.metadata.name
async def list_running_crawls(self, cid=None, aid=None, userid=None):
""" Return a list of running crawls """
filters = []
if cid:
filters.append(f"btrix.crawlconfig={cid}")
if aid:
filters.append(f"btrix.archive={aid}")
if userid:
filters.append(f"btrix.user={userid}")
jobs = await self.batch_api.list_namespaced_job(
namespace=self.namespace,
label_selector=",".join(filters),
field_selector="status.successful=0",
)
return [
Crawl(
id=job.metadata.name,
state="running",
user=job.metadata.labels["btrix.user"],
aid=job.metadata.labels["btrix.archive"],
cid=job.metadata.labels["btrix.crawlconfig"],
schedule=job.metadata.annotations.get("btrix.run.schedule", ""),
manual=job.metadata.annotations.get("btrix.run.manual") == "1",
started=job.status.start_time.replace(tzinfo=None),
)
for job in jobs.items
]
async def validate_crawl_complete(self, crawlcomplete):
"""Ensure the crawlcomplete data is valid (job exists and user matches)
Fill in additional details about the crawl"""
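
The new list_running_crawls above relies entirely on Kubernetes label and field selectors. A small sketch of what the composed selector ends up as (label keys are from this diff, values are placeholders):

# Filters are joined into a single label selector string, e.g. for cid + aid:
filters = ["btrix.crawlconfig=<cid>", "btrix.archive=<aid>"]
label_selector = ",".join(filters)
# -> "btrix.crawlconfig=<cid>,btrix.archive=<aid>"
#
# field_selector="status.successful=0" then keeps only jobs with no successful
# completion yet, i.e. crawls that are still running.
print(label_selector)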
@@ -198,19 +236,28 @@ class K8SManager:
if not job or job.metadata.labels["btrix.user"] != crawlcomplete.user:
return None
return CrawlFinished(
manual = job.metadata.annotations.get("btrix.run.manual") == "1"
if not manual:
await self.batch_api.delete_namespaced_job(
name=job.metadata.name,
namespace=self.namespace,
grace_period_seconds=10,
propagation_policy="Foreground",
)
return Crawl(
id=crawlcomplete.id,
state="complete" if crawlcomplete.completed else "partial_complete",
user=crawlcomplete.user,
aid=job.metadata.labels["btrix.archive"],
cid=job.metadata.labels["btrix.crawlconfig"],
schedule=job.metadata.annotations.get("btrix.run.schedule", ""),
manual=manual,
started=job.status.start_time.replace(tzinfo=None),
finished=datetime.datetime.utcnow().replace(microsecond=0, tzinfo=None),
filename=crawlcomplete.filename,
size=crawlcomplete.size,
hash=crawlcomplete.hash
hash=crawlcomplete.hash,
)
async def stop_crawl(self, job_id, aid):
@@ -223,18 +270,20 @@ class K8SManager:
return None
await self.batch_api.delete_namespaced_job(
name=job_id, namespace=self.namespace,
name=job_id,
namespace=self.namespace,
grace_period_seconds=10,
propagation_policy="Foreground",
)
return CrawlFinished(
return Crawl(
id=job_id,
state="canceled",
user=job.metadata.labels["btrix.user"],
aid=job.metadata.labels["btrix.archive"],
cid=job.metadata.labels["btrix.crawlconfig"],
schedule=job.metadata.annotations.get("btrix.run.schedule", ""),
manual=job.metadata.annotations.get("btrix.run.manual") == "1",
started=job.status.start_time.replace(tzinfo=None),
finished=datetime.datetime.utcnow().replace(microsecond=0, tzinfo=None),
)
@@ -252,12 +301,13 @@ class K8SManager:
for pod in pods.items:
if pod.metadata.labels["btrix.archive"] != aid:
print("wrong archive")
continue
await self.core_api_ws.connect_get_namespaced_pod_exec(
pod.metadata.name, namespace=self.namespace, command=command,
stdout=True
pod.metadata.name,
namespace=self.namespace,
command=command,
stdout=True,
)
interrupted = True
@@ -328,17 +378,17 @@ class K8SManager:
async def _create_run_now_job(self, cron_job):
"""Create new job from cron job to run instantly"""
annotations = {}
annotations["cronjob.kubernetes.io/instantiate"] = "manual"
annotations = cron_job.spec.job_template.metadata.annotations
annotations["btrix.run.manual"] = "1"
owner_ref = client.V1OwnerReference(
kind="CronJob",
name=cron_job.metadata.name,
block_owner_deletion=True,
controller=True,
uid=cron_job.metadata.uid,
api_version="batch/v1beta1",
)
# owner_ref = client.V1OwnerReference(
# kind="CronJob",
# name=cron_job.metadata.name,
# block_owner_deletion=True,
# controller=True,
# uid=cron_job.metadata.uid,
# api_version="batch/v1beta1",
# )
ts_now = datetime.datetime.utcnow().strftime("%Y%m%d%H%M%S")
name = f"crawl-now-{ts_now}-{cron_job.metadata.labels['btrix.crawlconfig']}"
@@ -347,7 +397,7 @@ class K8SManager:
name=name,
annotations=annotations,
labels=cron_job.metadata.labels,
owner_references=[owner_ref],
# owner_references=[owner_ref],
)
job = client.V1Job(
@@ -361,7 +411,7 @@ class K8SManager:
body=job, namespace=self.namespace
)
def _get_job_template(self, uid, labels, extra_crawl_params):
def _get_job_template(self, uid, labels, annotations, extra_crawl_params):
"""Return crawl job template for crawl job, including labels, adding optiona crawl params"""
command = ["crawl", "--config", "/tmp/crawl-config.json"]
@@ -387,6 +437,7 @@ class K8SManager:
}
return {
"metadata": {"annotations": annotations},
"spec": {
"template": {
"metadata": {"labels": labels},
@@ -438,5 +489,5 @@ class K8SManager:
"restartPolicy": "OnFailure",
},
}
}
},
}


@@ -21,7 +21,7 @@ spec:
volumes:
- name: data-storage
hostPath:
path: /browsertrix-minio-data
path: /tmp/browsertrix-minio-data
type: DirectoryOrCreate
containers:


@@ -33,7 +33,7 @@ spec:
volumes:
- name: data-db
hostPath:
path: /browsertrix-mongo-data
path: /tmp/browsertrix-mongo-data
type: DirectoryOrCreate
containers: