From 66c4e618eb0cb7afa8ab65118f880bbb0aeae839 Mon Sep 17 00:00:00 2001
From: Ilya Kreymer
Date: Mon, 23 Aug 2021 12:25:04 -0700
Subject: [PATCH] crawls work (#1), support for:
 - canceling a crawl (via sigterm)
 - stopping a crawl gracefully (via custom exec sigint)

---
 backend/archives.py     |  14 +++-
 backend/crawlconfigs.py |  62 ++++------------
 backend/crawls.py       | 107 ++++++++++++++++++++++-----
 backend/dockerman.py    |  14 +---
 backend/k8sman.py       | 160 +++++++++++++++++++++++++++-------------
 backend/main.py         |   1 -
 chart/values.yaml       |   2 +-
 7 files changed, 228 insertions(+), 132 deletions(-)

diff --git a/backend/archives.py b/backend/archives.py
index 42bf5e8e..7079717e 100644
--- a/backend/archives.py
+++ b/backend/archives.py
@@ -107,7 +107,7 @@ class ArchiveOps:
 
         self.email = email
         self.router = None
-        self.archive_dep = None
+        self.archive_crawl_dep = None
 
     async def add_archive(self, archive: Archive):
         """Add new archive"""
@@ -242,6 +242,16 @@ def init_archives_api(app, mdb, users, email, user_dep: User):
 
         return archive
 
+    async def archive_crawl_dep(
+        archive: Archive = Depends(archive_dep), user: User = Depends(user_dep)
+    ):
+        if not archive.is_crawler(user):
+            raise HTTPException(
+                status_code=403, detail="User does not have permission to modify crawls"
+            )
+
+        return archive
+
     router = APIRouter(
         prefix="/archives/{aid}",
         dependencies=[Depends(archive_dep)],
@@ -249,7 +259,7 @@ def init_archives_api(app, mdb, users, email, user_dep: User):
     )
 
     ops.router = router
-    ops.archive_dep = archive_dep
+    ops.archive_crawl_dep = archive_crawl_dep
 
     @app.get("/archives", tags=["archives"])
     async def get_archives(user: User = Depends(user_dep)):
diff --git a/backend/crawlconfigs.py b/backend/crawlconfigs.py
index 05b83210..9c4ab011 100644
--- a/backend/crawlconfigs.py
+++ b/backend/crawlconfigs.py
@@ -87,10 +87,10 @@ class CrawlConfig(BaseMongoModel):
     schedule: Optional[str] = ""
     runNow: Optional[bool] = False
 
-    # storageName: Optional[str] = "default"
-
     archive: Optional[str]
+    user: Optional[str]
+
     config: RawCrawlConfig
 
@@ -122,7 +122,7 @@ class CrawlOps:
         """Add new crawl config"""
         data = config.dict()
         data["archive"] = archive.id
-        data["user"] = user.id
+        data["user"] = str(user.id)
         data["_id"] = str(uuid.uuid4())
 
         result = await self.crawl_configs.insert_one(data)
@@ -130,10 +130,8 @@ class CrawlOps:
         crawlconfig = CrawlConfig.from_dict(data)
 
         await self.crawl_manager.add_crawl_config(
-            userid=str(user.id),
-            aid=str(archive.id),
-            storage=archive.storage,
             crawlconfig=crawlconfig,
+            storage=archive.storage,
             extra_crawl_params=self.default_crawl_params,
         )
         return result
@@ -144,7 +142,7 @@ class CrawlOps:
         """ Update existing crawl config"""
         data = config.dict()
         data["archive"] = archive.id
-        data["user"] = user.id
+        data["user"] = str(user.id)
         data["_id"] = cid
 
         await self.crawl_configs.find_one_and_replace({"_id": cid}, data)
@@ -185,9 +183,9 @@ def init_crawl_config_api(mdb, user_dep, archive_ops, crawl_manager):
 
     router = ops.router
 
-    archive_dep = archive_ops.archive_dep
+    archive_crawl_dep = archive_ops.archive_crawl_dep
 
-    async def crawls_dep(cid: str, archive: Archive = Depends(archive_dep)):
+    async def crawls_dep(cid: str, archive: Archive = Depends(archive_crawl_dep)):
         crawl_config = await ops.get_crawl_config(cid, archive)
         if not crawl_config:
             raise HTTPException(
@@ -197,7 +195,7 @@ def init_crawl_config_api(mdb, user_dep, archive_ops, crawl_manager):
         return archive
 
     @router.get("")
-    async def get_crawl_configs(archive: Archive = Depends(archive_dep)):
+    async def get_crawl_configs(archive: Archive = Depends(archive_crawl_dep)):
         results = await ops.get_crawl_configs(archive)
         return {"crawl_configs": [res.serialize() for res in results]}
 
@@ -208,15 +206,9 @@ def init_crawl_config_api(mdb, user_dep, archive_ops, crawl_manager):
 
     @router.post("/")
     async def add_crawl_config(
         config: CrawlConfigIn,
-        archive: Archive = Depends(archive_dep),
+        archive: Archive = Depends(archive_crawl_dep),
         user: User = Depends(user_dep),
     ):
-
-        if not archive.is_crawler(user):
-            raise HTTPException(
-                status_code=403, detail="User does not have permission to modify crawls"
-            )
-
         res = await ops.add_crawl_config(config, archive, user)
         return {"added": str(res.inserted_id)}
 
@@ -224,15 +216,10 @@ def init_crawl_config_api(mdb, user_dep, archive_ops, crawl_manager):
     async def update_crawl_config(
         config: CrawlConfigIn,
         cid: str,
-        archive: Archive = Depends(archive_dep),
+        archive: Archive = Depends(archive_crawl_dep),
         user: User = Depends(user_dep),
     ):
 
-        if not archive.is_crawler(user):
-            raise HTTPException(
-                status_code=403, detail="User does not have permission to modify crawls"
-            )
-
         try:
             await ops.update_crawl_config(cid, config, archive, user)
         except Exception as e:
@@ -244,15 +231,12 @@ def init_crawl_config_api(mdb, user_dep, archive_ops, crawl_manager):
         return {"updated": cid}
 
     @router.post("/{cid}/run")
-    async def run_now(
-        cid: str,
-        archive: Archive = Depends(archive_dep),
-        user: User = Depends(user_dep),
-    ):
+    async def run_now(cid: str, archive: Archive = Depends(archive_crawl_dep)):
+        crawl_config = await ops.get_crawl_config(cid, archive)
 
-        if not archive.is_crawler(user):
+        if not crawl_config:
             raise HTTPException(
-                status_code=403, detail="User does not have permission to modify crawls"
+                status_code=404, detail=f"Crawl Config '{cid}' not found"
             )
 
         crawl_id = None
@@ -265,28 +249,14 @@ def init_crawl_config_api(mdb, user_dep, archive_ops, crawl_manager):
         return {"started": crawl_id}
 
     @router.delete("")
-    async def delete_crawl_configs(
-        archive: Archive = Depends(archive_dep), user: User = Depends(user_dep)
-    ):
-        if not archive.is_crawler(user):
-            raise HTTPException(
-                status_code=403, detail="User does not have permission to modify crawls"
-            )
-
+    async def delete_crawl_configs(archive: Archive = Depends(archive_crawl_dep)):
         result = await ops.delete_crawl_configs(archive)
         return {"deleted": result.deleted_count}
 
     @router.delete("/{cid}")
     async def delete_crawl_config(
-        cid: str,
-        archive: Archive = Depends(archive_dep),
-        user: User = Depends(user_dep),
+        cid: str, archive: Archive = Depends(archive_crawl_dep)
     ):
-        if not archive.is_crawler(user):
-            raise HTTPException(
-                status_code=403, detail="User does not have permission to modify crawls"
-            )
-
         result = await ops.delete_crawl_config(cid, archive)
         if not result or not result.deleted_count:
             raise HTTPException(status_code=404, detail="Crawl Config Not Found")
diff --git a/backend/crawls.py b/backend/crawls.py
index f52acef3..d77bdcf1 100644
--- a/backend/crawls.py
+++ b/backend/crawls.py
@@ -5,49 +5,73 @@
 import asyncio
 from typing import Optional
 from datetime import datetime
 
+from fastapi import Depends, HTTPException
+from pydantic import BaseModel
+
 from db import BaseMongoModel
+from archives import Archive
+
 
 # ============================================================================
-class CrawlComplete(BaseMongoModel):
-    """ Store State of Completed Crawls """
+class CrawlFinished(BaseMongoModel):
+    """ Store State of Finished Crawls """
+    user: str
+    aid: str
+    cid: str
+
+    started: datetime
+    finished: datetime
+
+    state: str
+
+    filename: Optional[str]
+    size: Optional[int]
+    hash: Optional[str]
+
+
+# ============================================================================
+class CrawlCompleteIn(BaseModel):
+    """ Completed Crawl Webhook POST message """
     id: str
     user: str
 
-    aid: Optional[str]
-    cid: Optional[str]
-
     filename: str
     size: int
     hash: str
 
-    started: Optional[datetime]
-    finished: Optional[datetime]
+    completed: Optional[bool] = True
 
 
 # ============================================================================
 class CrawlOps:
     """ Crawl Ops """
 
-    def __init__(self, mdb, crawl_manager, users, archives):
+    def __init__(self, mdb, crawl_manager, archives):
         self.crawls = mdb["crawls"]
         self.crawl_manager = crawl_manager
-        self.users = users
         self.archives = archives
 
-    async def on_handle_crawl_complete(self, msg: CrawlComplete):
+    async def on_handle_crawl_complete(self, msg: CrawlCompleteIn):
         """ Handle completed crawl, add to crawls db collection,
         also update archive usage """
-        if not await self.crawl_manager.validate_crawl_complete(msg):
+        crawl_finished = await self.crawl_manager.validate_crawl_complete(msg)
+        if not crawl_finished:
             print("Not a valid crawl complete msg!", flush=True)
             return
 
-        print(msg, flush=True)
-        await self.crawls.insert_one(msg.to_dict())
+        await self.handle_finished(crawl_finished)
 
-        dura = int((msg.finished - msg.started).total_seconds())
+    async def handle_finished(self, crawl_finished: CrawlFinished):
+        """ Add finished crawl to db, increment archive usage """
+        await self.crawls.insert_one(crawl_finished.to_dict())
+
+        print(crawl_finished)
+
+        dura = int((crawl_finished.finished - crawl_finished.started).total_seconds())
         print(f"Duration: {dura}", flush=True)
 
-        await self.archives.inc_usage(msg.aid, dura)
+        await self.archives.inc_usage(crawl_finished.aid, dura)
 
     async def delete_crawl(self, cid: str, aid: str):
         """ Delete crawl by id """
@@ -55,14 +79,59 @@ class CrawlOps:
 
 
 # ============================================================================
-def init_crawls_api(app, mdb, crawl_manager, users, archives):
+def init_crawls_api(app, mdb, crawl_manager, archives):
     """ API for crawl management, including crawl done callback"""
 
-    ops = CrawlOps(mdb, crawl_manager, users, archives)
+    ops = CrawlOps(mdb, crawl_manager, archives)
 
-    @app.post("/crawls/done")
-    async def webhook(msg: CrawlComplete):
+    archive_crawl_dep = archives.archive_crawl_dep
+
+    @app.post("/crawls/done", tags=["crawls"])
+    async def crawl_done(msg: CrawlCompleteIn):
         loop = asyncio.get_running_loop()
         loop.create_task(ops.on_handle_crawl_complete(msg))
-        return {"message": "webhook received"}
+        return {"success": True}
+
+    @app.delete(
+        "/archives/{aid}/crawls/{crawl_id}",
+        tags=["crawls"],
+    )
+    async def crawl_delete_stop(crawl_id, archive: Archive = Depends(archive_crawl_dep)):
+        try:
+            crawl_finished = await crawl_manager.stop_crawl(
+                crawl_id, str(archive.id)
+            )
+            if not crawl_finished:
+                raise HTTPException(
+                    status_code=404, detail=f"Crawl not found: {crawl_id}"
+                )
+
+            await ops.handle_finished(crawl_finished)
+        except Exception as exc:
+            # pylint: disable=raise-missing-from
+            raise HTTPException(status_code=400, detail=f"Error Canceling Crawl: {exc}")
+
+        return {"canceled": True}
+
+    @app.post(
+        "/archives/{aid}/crawls/{crawl_id}/stop",
+        tags=["crawls"],
+    )
+    async def crawl_graceful_stop(
+        crawl_id, archive: Archive = Depends(archive_crawl_dep)
+    ):
+        try:
+            canceled = await crawl_manager.stop_crawl_graceful(
+                crawl_id, str(archive.id)
+            )
+            if not canceled:
+                raise HTTPException(
+                    status_code=404, detail=f"Crawl not found: {crawl_id}"
+                )
+
+        except Exception as exc:
+            # pylint: disable=raise-missing-from
+            raise HTTPException(status_code=400, detail=f"Error Canceling Crawl: {exc}")
+
+        return {"stopped_gracefully": True}
diff --git a/backend/dockerman.py b/backend/dockerman.py
index 8dbb73d8..e49b1059 100644
--- a/backend/dockerman.py
+++ b/backend/dockerman.py
@@ -6,23 +6,13 @@ class DockerManager:
     def __init__(self):
         pass
 
-        async def test():
-            print("test async", flush=True)
-
-        loop = asyncio.get_running_loop()
-        loop.create_task(test())
-        print("starting")
-
     async def add_crawl_config(
         self,
-        userid: str,
-        aid: str,
-        storage,
         crawlconfig,
+        storage,
         extra_crawl_params: list = None,
     ):
         print("add_crawl_config")
-        print(storage)
         print(crawlconfig)
-        print(aid)
+        print(storage)
         print(extra_crawl_params)
diff --git a/backend/k8sman.py b/backend/k8sman.py
index 4512bcbf..294c4dc4 100644
--- a/backend/k8sman.py
+++ b/backend/k8sman.py
@@ -5,6 +5,9 @@
 import datetime
 import json
 
 from kubernetes_asyncio import client, config
+from kubernetes_asyncio.stream import WsApiClient
+
+from crawls import CrawlFinished
 
 # ============================================================================
@@ -22,6 +25,7 @@ class K8SManager:
         config.load_incluster_config()
 
         self.core_api = client.CoreV1Api()
+        self.core_api_ws = client.CoreV1Api(api_client=WsApiClient())
         self.batch_api = client.BatchV1Api()
         self.batch_beta_api = client.BatchV1beta1Api()
 
@@ -33,44 +37,16 @@ class K8SManager:
         # loop = asyncio.get_running_loop()
         # loop.create_task(self.watch_job_done())
 
-    async def validate_crawl_complete(self, crawlcomplete):
-        """Ensure the crawlcomplete data is valid (job exists and user matches)
-        Fill in additional details about the crawl"""
-        job = await self.batch_api.read_namespaced_job(
-            name=crawlcomplete.id, namespace=self.namespace
-        )
-
-        if not job or job.metadata.labels["btrix.user"] != crawlcomplete.user:
-            return False
-
-        # job.metadata.annotations = {
-        #    "crawl.size": str(crawlcomplete.size),
-        #    "crawl.filename": crawlcomplete.filename,
-        #    "crawl.hash": crawlcomplete.hash
-        # }
-
-        # await self.batch_api.patch_namespaced_job(
-        #    name=crawlcomplete.id, namespace=self.namespace, body=job
-        # )
-
-        crawlcomplete.started = job.status.start_time.replace(tzinfo=None)
-        crawlcomplete.aid = job.metadata.labels["btrix.archive"]
-        crawlcomplete.cid = job.metadata.labels["btrix.crawlconfig"]
-        crawlcomplete.finished = datetime.datetime.utcnow().replace(
-            microsecond=0, tzinfo=None
-        )
-        return True
-
     async def add_crawl_config(
         self,
-        userid: str,
-        aid: str,
-        storage,
         crawlconfig,
+        storage,
         extra_crawl_params: list = None,
     ):
         """add new crawl as cron job, store crawl config in configmap"""
         cid = str(crawlconfig.id)
+        userid = crawlconfig.user
+        aid = crawlconfig.archive
 
         labels = {
             "btrix.user": userid,
@@ -129,7 +105,7 @@ class K8SManager:
 
         cron_job = client.V1beta1CronJob(
             metadata={
-                "name": f"scheduled-crawl-{cid}",
+                "name": f"crawl-scheduled-{cid}",
                 "namespace": self.namespace,
                 "labels": labels,
             },
@@ -165,7 +141,7 @@ class K8SManager:
             return
 
         labels = {
-            "btrix.user": cron_job.metadata.labels["btrix.user"],
+            "btrix.user": crawlconfig.user,
             "btrix.archive": crawlconfig.archive,
             "btrix.crawlconfig": cid,
         }
@@ -199,6 +175,105 @@ class K8SManager:
         if run_now:
             await self._create_run_now_job(cron_job)
 
+    async def run_crawl_config(self, cid):
+        """ Run crawl job for cron job based on specified crawlconfig id (cid) """
+        print(f"btrix.crawlconfig={cid}")
+        cron_jobs = await self.batch_beta_api.list_namespaced_cron_job(
+            namespace=self.namespace, label_selector=f"btrix.crawlconfig={cid}"
+        )
+
+        if len(cron_jobs.items) != 1:
+            raise Exception("Crawl Config Not Found")
+
+        res = await self._create_run_now_job(cron_jobs.items[0])
+        return res.metadata.name
+
+    async def validate_crawl_complete(self, crawlcomplete):
+        """Ensure the crawlcomplete data is valid (job exists and user matches)
+        Fill in additional details about the crawl"""
+        job = await self.batch_api.read_namespaced_job(
+            name=crawlcomplete.id, namespace=self.namespace
+        )
+
+        if not job or job.metadata.labels["btrix.user"] != crawlcomplete.user:
+            return None
+
+        return CrawlFinished(
+            id=crawlcomplete.id,
+            state="complete" if crawlcomplete.completed else "partial_complete",
+
+            user=crawlcomplete.user,
+            aid=job.metadata.labels["btrix.archive"],
+            cid=job.metadata.labels["btrix.crawlconfig"],
+            started=job.status.start_time.replace(tzinfo=None),
+            finished=datetime.datetime.utcnow().replace(microsecond=0, tzinfo=None),
+
+            filename=crawlcomplete.filename,
+            size=crawlcomplete.size,
+            hash=crawlcomplete.hash
+        )
+
+    async def stop_crawl(self, job_id, aid):
+        """ Stop Crawl based on crawl job id """
+        job = await self.batch_api.read_namespaced_job(
+            name=job_id, namespace=self.namespace
+        )
+
+        if not job or job.metadata.labels["btrix.archive"] != aid:
+            return None
+
+        await self.batch_api.delete_namespaced_job(
+            name=job_id, namespace=self.namespace,
+            grace_period_seconds=10,
+            propagation_policy="Foreground",
+        )
+
+        return CrawlFinished(
+            id=job_id,
+            state="canceled",
+
+            user=job.metadata.labels["btrix.user"],
+            aid=job.metadata.labels["btrix.archive"],
+            cid=job.metadata.labels["btrix.crawlconfig"],
+            started=job.status.start_time.replace(tzinfo=None),
+            finished=datetime.datetime.utcnow().replace(microsecond=0, tzinfo=None),
+        )
+
+    async def stop_crawl_graceful(self, job_name, aid):
+        """ Attempt to gracefully stop crawl by sending a SIGINT to the pod(s)"""
+
+        pods = await self.core_api.list_namespaced_pod(
+            namespace=self.namespace,
+            label_selector=f"job-name={job_name},btrix.archive={aid}",
+        )
+
+        command = ["kill", "-s", "SIGINT", "1"]
+        interrupted = False
+
+        for pod in pods.items:
+            if pod.metadata.labels["btrix.archive"] != aid:
+                print("wrong archive")
+                continue
+
+            await self.core_api_ws.connect_get_namespaced_pod_exec(
+                pod.metadata.name, namespace=self.namespace, command=command,
+                stdout=True
+            )
+            interrupted = True
+
+        return interrupted
+
+    async def delete_crawl_configs_for_archive(self, archive):
+        """Delete all crawl configs for given archive"""
+        return await self._delete_crawl_configs(f"btrix.archive={archive}")
+
+    async def delete_crawl_config_by_id(self, cid):
+        """Delete all crawl configs by id"""
+        return await self._delete_crawl_configs(f"btrix.crawlconfig={cid}")
+
+    # ========================================================================
+    # Internal Methods
+
     def _create_config_map(self, crawlconfig, labels):
         """ Create Config Map based on CrawlConfig + labels """
         config_map = client.V1ConfigMap(
@@ -212,7 +287,7 @@ class K8SManager:
 
         return config_map
 
-    #pylint: disable=no-self-use
+    # pylint: disable=no-self-use
     def _get_schedule_suspend_run_now(self, crawlconfig):
         """ get schedule/suspend/run_now data based on crawlconfig """
 
@@ -230,14 +305,6 @@ class K8SManager:
 
         return suspend, schedule, run_now
 
-    async def delete_crawl_configs_for_archive(self, archive):
-        """Delete all crawl configs for given archive"""
-        return await self._delete_crawl_configs(f"btrix.archive={archive}")
-
-    async def delete_crawl_config_by_id(self, cid):
-        """Delete all crawl configs by id"""
-        return await self._delete_crawl_configs(f"btrix.crawlconfig={cid}")
-
     async def _delete_crawl_configs(self, label):
         """Delete Crawl Cron Job and all dependent resources,
         including configmap and secrets"""
@@ -259,15 +326,6 @@ class K8SManager:
             propagation_policy="Foreground",
         )
 
-    async def run_crawl_config(self, cid):
-        """ Run crawl job for cron job based on specified crawlconfig id (cid) """
-        cron_jobs = await self.batch_beta_api.list_namespaced_cron_job(
-            namespace=self.namespace, label_selector=f"btrix.crawlconfig={cid}"
-        )
-
-        res = await self._create_run_now_job(cron_jobs.items[0])
-        return res.metadata.name
-
     async def _create_run_now_job(self, cron_job):
         """Create new job from cron job to run instantly"""
         annotations = {}
diff --git a/backend/main.py b/backend/main.py
index 72dca1ef..6db70b87 100644
--- a/backend/main.py
+++ b/backend/main.py
@@ -76,7 +76,6 @@ class BrowsertrixAPI:
             self.app,
             self.mdb,
             self.crawl_manager,
-            self.fastapi_users.db,
             self.archive_ops,
         )
 
diff --git a/chart/values.yaml b/chart/values.yaml
index 36cfa980..d6b1af98 100644
--- a/chart/values.yaml
+++ b/chart/values.yaml
@@ -36,7 +36,7 @@ mongo_auth:
 
 # Crawler Image
 # =========================================
-crawler_image: "webrecorder/browsertrix-crawler:0.4.4"
+crawler_image: "webrecorder/browsertrix-crawler:latest"
 crawler_pull_policy: "Never"
 crawler_namespace: "crawlers"