crawls work (#1), adding support for:

- canceling a crawl (via SIGTERM)
- stopping a crawl gracefully (via custom exec SIGINT)

Ilya Kreymer 2021-08-23 12:25:04 -07:00
parent a8255a76b2
commit 66c4e618eb
7 changed files with 228 additions and 132 deletions
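For context, here is a minimal client-side sketch of the two new crawl-control endpoints introduced below. The routes and their response bodies (DELETE /archives/{aid}/crawls/{crawl_id} returning {"canceled": true}, POST /archives/{aid}/crawls/{crawl_id}/stop returning {"stopped_gracefully": true}) are taken from the diff; the base URL, archive id, crawl id, auth header, and the choice of httpx are placeholders and assumptions, not part of this commit.

import asyncio

import httpx  # assumed HTTP client; any client would do


API = "http://localhost:8000"                  # assumed base URL
AID = "example-archive-id"                     # placeholder archive id
CRAWL_ID = "example-crawl-job-id"              # placeholder crawl (job) id
HEADERS = {"Authorization": "Bearer <token>"}  # placeholder; real auth comes from the fastapi_users setup


async def cancel_crawl():
    """Hard cancel: the backend deletes the crawl job, terminating the crawler pod (SIGTERM)."""
    async with httpx.AsyncClient() as client:
        resp = await client.delete(
            f"{API}/archives/{AID}/crawls/{CRAWL_ID}", headers=HEADERS
        )
        print(resp.json())  # expected: {"canceled": true}


async def stop_crawl_gracefully():
    """Graceful stop: the backend execs `kill -s SIGINT 1` inside the crawler pod(s)."""
    async with httpx.AsyncClient() as client:
        resp = await client.post(
            f"{API}/archives/{AID}/crawls/{CRAWL_ID}/stop", headers=HEADERS
        )
        print(resp.json())  # expected: {"stopped_gracefully": true}


if __name__ == "__main__":
    asyncio.run(cancel_crawl())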

View File

@@ -107,7 +107,7 @@ class ArchiveOps:
self.email = email
self.router = None
self.archive_dep = None
self.archive_crawl_dep = None
async def add_archive(self, archive: Archive):
"""Add new archive"""
@@ -242,6 +242,16 @@ def init_archives_api(app, mdb, users, email, user_dep: User):
return archive
async def archive_crawl_dep(
archive: Archive = Depends(archive_dep), user: User = Depends(user_dep)
):
if not archive.is_crawler(user):
raise HTTPException(
status_code=403, detail="User does not have permission to modify crawls"
)
return archive
router = APIRouter(
prefix="/archives/{aid}",
dependencies=[Depends(archive_dep)],
@@ -249,7 +259,7 @@ def init_archives_api(app, mdb, users, email, user_dep: User):
)
ops.router = router
ops.archive_dep = archive_dep
ops.archive_crawl_dep = archive_crawl_dep
@app.get("/archives", tags=["archives"])
async def get_archives(user: User = Depends(user_dep)):

View File

@@ -87,10 +87,10 @@ class CrawlConfig(BaseMongoModel):
schedule: Optional[str] = ""
runNow: Optional[bool] = False
# storageName: Optional[str] = "default"
archive: Optional[str]
user: Optional[str]
config: RawCrawlConfig
@@ -122,7 +122,7 @@ class CrawlOps:
"""Add new crawl config"""
data = config.dict()
data["archive"] = archive.id
data["user"] = user.id
data["user"] = str(user.id)
data["_id"] = str(uuid.uuid4())
result = await self.crawl_configs.insert_one(data)
@@ -130,10 +130,8 @@ class CrawlOps:
crawlconfig = CrawlConfig.from_dict(data)
await self.crawl_manager.add_crawl_config(
userid=str(user.id),
aid=str(archive.id),
storage=archive.storage,
crawlconfig=crawlconfig,
storage=archive.storage,
extra_crawl_params=self.default_crawl_params,
)
return result
@@ -144,7 +142,7 @@ class CrawlOps:
""" Update existing crawl config"""
data = config.dict()
data["archive"] = archive.id
data["user"] = user.id
data["user"] = str(user.id)
data["_id"] = cid
await self.crawl_configs.find_one_and_replace({"_id": cid}, data)
@@ -185,9 +183,9 @@ def init_crawl_config_api(mdb, user_dep, archive_ops, crawl_manager):
router = ops.router
archive_dep = archive_ops.archive_dep
archive_crawl_dep = archive_ops.archive_crawl_dep
async def crawls_dep(cid: str, archive: Archive = Depends(archive_dep)):
async def crawls_dep(cid: str, archive: Archive = Depends(archive_crawl_dep)):
crawl_config = await ops.get_crawl_config(cid, archive)
if not crawl_config:
raise HTTPException(
@@ -197,7 +195,7 @@ def init_crawl_config_api(mdb, user_dep, archive_ops, crawl_manager):
return archive
@router.get("")
async def get_crawl_configs(archive: Archive = Depends(archive_dep)):
async def get_crawl_configs(archive: Archive = Depends(archive_crawl_dep)):
results = await ops.get_crawl_configs(archive)
return {"crawl_configs": [res.serialize() for res in results]}
@@ -208,15 +206,9 @@ def init_crawl_config_api(mdb, user_dep, archive_ops, crawl_manager):
@router.post("/")
async def add_crawl_config(
config: CrawlConfigIn,
archive: Archive = Depends(archive_dep),
archive: Archive = Depends(archive_crawl_dep),
user: User = Depends(user_dep),
):
if not archive.is_crawler(user):
raise HTTPException(
status_code=403, detail="User does not have permission to modify crawls"
)
res = await ops.add_crawl_config(config, archive, user)
return {"added": str(res.inserted_id)}
@@ -224,15 +216,10 @@ def init_crawl_config_api(mdb, user_dep, archive_ops, crawl_manager):
async def update_crawl_config(
config: CrawlConfigIn,
cid: str,
archive: Archive = Depends(archive_dep),
archive: Archive = Depends(archive_crawl_dep),
user: User = Depends(user_dep),
):
if not archive.is_crawler(user):
raise HTTPException(
status_code=403, detail="User does not have permission to modify crawls"
)
try:
await ops.update_crawl_config(cid, config, archive, user)
except Exception as e:
@@ -244,15 +231,12 @@ def init_crawl_config_api(mdb, user_dep, archive_ops, crawl_manager):
return {"updated": cid}
@router.post("/{cid}/run")
async def run_now(
cid: str,
archive: Archive = Depends(archive_dep),
user: User = Depends(user_dep),
):
async def run_now(cid: str, archive: Archive = Depends(archive_crawl_dep)):
crawl_config = await ops.get_crawl_config(cid, archive)
if not archive.is_crawler(user):
if not crawl_config:
raise HTTPException(
status_code=403, detail="User does not have permission to modify crawls"
status_code=404, detail=f"Crawl Config '{cid}' not found"
)
crawl_id = None
@@ -265,28 +249,14 @@ def init_crawl_config_api(mdb, user_dep, archive_ops, crawl_manager):
return {"started": crawl_id}
@router.delete("")
async def delete_crawl_configs(
archive: Archive = Depends(archive_dep), user: User = Depends(user_dep)
):
if not archive.is_crawler(user):
raise HTTPException(
status_code=403, detail="User does not have permission to modify crawls"
)
async def delete_crawl_configs(archive: Archive = Depends(archive_crawl_dep)):
result = await ops.delete_crawl_configs(archive)
return {"deleted": result.deleted_count}
@router.delete("/{cid}")
async def delete_crawl_config(
cid: str,
archive: Archive = Depends(archive_dep),
user: User = Depends(user_dep),
cid: str, archive: Archive = Depends(archive_crawl_dep)
):
if not archive.is_crawler(user):
raise HTTPException(
status_code=403, detail="User does not have permission to modify crawls"
)
result = await ops.delete_crawl_config(cid, archive)
if not result or not result.deleted_count:
raise HTTPException(status_code=404, detail="Crawl Config Not Found")

View File

@@ -5,49 +5,73 @@ import asyncio
from typing import Optional
from datetime import datetime
from fastapi import Depends, HTTPException
from pydantic import BaseModel
from db import BaseMongoModel
from archives import Archive
# ============================================================================
class CrawlComplete(BaseMongoModel):
""" Store State of Completed Crawls """
class CrawlFinished(BaseMongoModel):
""" Store State of Finished Crawls """
user: str
aid: str
cid: str
started: datetime
finished: datetime
state: str
filename: Optional[str]
size: Optional[int]
hash: Optional[str]
# ============================================================================
class CrawlCompleteIn(BaseModel):
""" Completed Crawl Webhook POST message """
id: str
user: str
aid: Optional[str]
cid: Optional[str]
filename: str
size: int
hash: str
started: Optional[datetime]
finished: Optional[datetime]
completed: Optional[bool] = True
# ============================================================================
class CrawlOps:
""" Crawl Ops """
def __init__(self, mdb, crawl_manager, users, archives):
def __init__(self, mdb, crawl_manager, archives):
self.crawls = mdb["crawls"]
self.crawl_manager = crawl_manager
self.users = users
self.archives = archives
async def on_handle_crawl_complete(self, msg: CrawlComplete):
async def on_handle_crawl_complete(self, msg: CrawlCompleteIn):
""" Handle completed crawl, add to crawls db collection, also update archive usage """
if not await self.crawl_manager.validate_crawl_complete(msg):
crawl_finished = await self.crawl_manager.validate_crawl_complete(msg)
if not crawl_finished:
print("Not a valid crawl complete msg!", flush=True)
return
print(msg, flush=True)
await self.crawls.insert_one(msg.to_dict())
await self.handle_finished(crawl_finished)
dura = int((msg.finished - msg.started).total_seconds())
async def handle_finished(self, crawl_finished: CrawlFinished):
""" Add finished crawl to db, increment archive usage """
await self.crawls.insert_one(crawl_finished.to_dict())
print(crawl_finished)
dura = int((crawl_finished.finished - crawl_finished.started).total_seconds())
print(f"Duration: {dura}", flush=True)
await self.archives.inc_usage(msg.aid, dura)
await self.archives.inc_usage(crawl_finished.aid, dura)
async def delete_crawl(self, cid: str, aid: str):
""" Delete crawl by id """
@@ -55,14 +79,59 @@ class CrawlOps:
# ============================================================================
def init_crawls_api(app, mdb, crawl_manager, users, archives):
def init_crawls_api(app, mdb, crawl_manager, archives):
""" API for crawl management, including crawl done callback"""
ops = CrawlOps(mdb, crawl_manager, users, archives)
ops = CrawlOps(mdb, crawl_manager, archives)
@app.post("/crawls/done")
async def webhook(msg: CrawlComplete):
archive_crawl_dep = archives.archive_crawl_dep
@app.post("/crawls/done", tags=["crawls"])
async def crawl_done(msg: CrawlCompleteIn):
loop = asyncio.get_running_loop()
loop.create_task(ops.on_handle_crawl_complete(msg))
return {"message": "webhook received"}
return {"success": True}
@app.delete(
"/archives/{aid}/crawls/{crawl_id}",
tags=["crawls"],
)
async def crawl_delete_stop(crawl_id, archive: Archive = Depends(archive_crawl_dep)):
try:
crawl_finished = await crawl_manager.stop_crawl(
crawl_id, str(archive.id)
)
if not crawl_finished:
raise HTTPException(
status_code=404, detail=f"Crawl not found: {crawl_id}"
)
await ops.handle_finished(crawl_finished)
except Exception as exc:
# pylint: disable=raise-missing-from
raise HTTPException(status_code=400, detail=f"Error Canceling Crawl: {exc}")
return {"canceled": True}
@app.post(
"/archives/{aid}/crawls/{crawl_id}/stop",
tags=["crawls"],
)
async def crawl_graceful_stop(
crawl_id, archive: Archive = Depends(archive_crawl_dep)
):
try:
canceled = await crawl_manager.stop_crawl_graceful(
crawl_id, str(archive.id)
)
if not canceled:
raise HTTPException(
status_code=404, detail=f"Crawl not found: {crawl_id}"
)
except Exception as exc:
# pylint: disable=raise-missing-from
raise HTTPException(status_code=400, detail=f"Error Canceling Crawl: {exc}")
return {"stopped_gracefully": True}

View File

@@ -6,23 +6,13 @@ class DockerManager:
def __init__(self):
pass
async def test():
print("test async", flush=True)
loop = asyncio.get_running_loop()
loop.create_task(test())
print("starting")
async def add_crawl_config(
self,
userid: str,
aid: str,
storage,
crawlconfig,
storage,
extra_crawl_params: list = None,
):
print("add_crawl_config")
print(storage)
print(crawlconfig)
print(aid)
print(storage)
print(extra_crawl_params)

View File

@@ -5,6 +5,9 @@ import datetime
import json
from kubernetes_asyncio import client, config
from kubernetes_asyncio.stream import WsApiClient
from crawls import CrawlFinished
# ============================================================================
@@ -22,6 +25,7 @@ class K8SManager:
config.load_incluster_config()
self.core_api = client.CoreV1Api()
self.core_api_ws = client.CoreV1Api(api_client=WsApiClient())
self.batch_api = client.BatchV1Api()
self.batch_beta_api = client.BatchV1beta1Api()
@@ -33,44 +37,16 @@ class K8SManager:
# loop = asyncio.get_running_loop()
# loop.create_task(self.watch_job_done())
async def validate_crawl_complete(self, crawlcomplete):
"""Ensure the crawlcomplete data is valid (job exists and user matches)
Fill in additional details about the crawl"""
job = await self.batch_api.read_namespaced_job(
name=crawlcomplete.id, namespace=self.namespace
)
if not job or job.metadata.labels["btrix.user"] != crawlcomplete.user:
return False
# job.metadata.annotations = {
# "crawl.size": str(crawlcomplete.size),
# "crawl.filename": crawlcomplete.filename,
# "crawl.hash": crawlcomplete.hash
# }
# await self.batch_api.patch_namespaced_job(
# name=crawlcomplete.id, namespace=self.namespace, body=job
# )
crawlcomplete.started = job.status.start_time.replace(tzinfo=None)
crawlcomplete.aid = job.metadata.labels["btrix.archive"]
crawlcomplete.cid = job.metadata.labels["btrix.crawlconfig"]
crawlcomplete.finished = datetime.datetime.utcnow().replace(
microsecond=0, tzinfo=None
)
return True
async def add_crawl_config(
self,
userid: str,
aid: str,
storage,
crawlconfig,
storage,
extra_crawl_params: list = None,
):
"""add new crawl as cron job, store crawl config in configmap"""
cid = str(crawlconfig.id)
userid = crawlconfig.user
aid = crawlconfig.archive
labels = {
"btrix.user": userid,
@@ -129,7 +105,7 @@ class K8SManager:
cron_job = client.V1beta1CronJob(
metadata={
"name": f"scheduled-crawl-{cid}",
"name": f"crawl-scheduled-{cid}",
"namespace": self.namespace,
"labels": labels,
},
@@ -165,7 +141,7 @@ class K8SManager:
return
labels = {
"btrix.user": cron_job.metadata.labels["btrix.user"],
"btrix.user": crawlconfig.user,
"btrix.archive": crawlconfig.archive,
"btrix.crawlconfig": cid,
}
@@ -199,6 +175,105 @@ class K8SManager:
if run_now:
await self._create_run_now_job(cron_job)
async def run_crawl_config(self, cid):
""" Run crawl job for cron job based on specified crawlconfig id (cid) """
print(f"btrix.crawlconfig={cid}")
cron_jobs = await self.batch_beta_api.list_namespaced_cron_job(
namespace=self.namespace, label_selector=f"btrix.crawlconfig={cid}"
)
if len(cron_jobs.items) != 1:
raise Exception("Crawl Config Not Found")
res = await self._create_run_now_job(cron_jobs.items[0])
return res.metadata.name
async def validate_crawl_complete(self, crawlcomplete):
"""Ensure the crawlcomplete data is valid (job exists and user matches)
Fill in additional details about the crawl"""
job = await self.batch_api.read_namespaced_job(
name=crawlcomplete.id, namespace=self.namespace
)
if not job or job.metadata.labels["btrix.user"] != crawlcomplete.user:
return None
return CrawlFinished(
id=crawlcomplete.id,
state="complete" if crawlcomplete.completed else "partial_complete",
user=crawlcomplete.user,
aid=job.metadata.labels["btrix.archive"],
cid=job.metadata.labels["btrix.crawlconfig"],
started=job.status.start_time.replace(tzinfo=None),
finished=datetime.datetime.utcnow().replace(microsecond=0, tzinfo=None),
filename=crawlcomplete.filename,
size=crawlcomplete.size,
hash=crawlcomplete.hash
)
async def stop_crawl(self, job_id, aid):
""" Stop Crawl based on crawl job id """
job = await self.batch_api.read_namespaced_job(
name=job_id, namespace=self.namespace
)
if not job or job.metadata.labels["btrix.archive"] != aid:
return None
await self.batch_api.delete_namespaced_job(
name=job_id, namespace=self.namespace,
grace_period_seconds=10,
propagation_policy="Foreground",
)
return CrawlFinished(
id=job_id,
state="canceled",
user=job.metadata.labels["btrix.user"],
aid=job.metadata.labels["btrix.archive"],
cid=job.metadata.labels["btrix.crawlconfig"],
started=job.status.start_time.replace(tzinfo=None),
finished=datetime.datetime.utcnow().replace(microsecond=0, tzinfo=None),
)
async def stop_crawl_graceful(self, job_name, aid):
""" Attempt to gracefully stop crawl by sending a SIGINT to the pod(s)"""
pods = await self.core_api.list_namespaced_pod(
namespace=self.namespace,
label_selector=f"job-name={job_name},btrix.archive={aid}",
)
command = ["kill", "-s", "SIGINT", "1"]
interrupted = False
for pod in pods.items:
if pod.metadata.labels["btrix.archive"] != aid:
print("wrong archive")
continue
await self.core_api_ws.connect_get_namespaced_pod_exec(
pod.metadata.name, namespace=self.namespace, command=command,
stdout=True
)
interrupted = True
return interrupted
async def delete_crawl_configs_for_archive(self, archive):
"""Delete all crawl configs for given archive"""
return await self._delete_crawl_configs(f"btrix.archive={archive}")
async def delete_crawl_config_by_id(self, cid):
"""Delete all crawl configs by id"""
return await self._delete_crawl_configs(f"btrix.crawlconfig={cid}")
# ========================================================================
# Internal Methods
def _create_config_map(self, crawlconfig, labels):
""" Create Config Map based on CrawlConfig + labels """
config_map = client.V1ConfigMap(
@@ -212,7 +287,7 @@ class K8SManager:
return config_map
#pylint: disable=no-self-use
# pylint: disable=no-self-use
def _get_schedule_suspend_run_now(self, crawlconfig):
""" get schedule/suspend/run_now data based on crawlconfig """
@@ -230,14 +305,6 @@ class K8SManager:
return suspend, schedule, run_now
async def delete_crawl_configs_for_archive(self, archive):
"""Delete all crawl configs for given archive"""
return await self._delete_crawl_configs(f"btrix.archive={archive}")
async def delete_crawl_config_by_id(self, cid):
"""Delete all crawl configs by id"""
return await self._delete_crawl_configs(f"btrix.crawlconfig={cid}")
async def _delete_crawl_configs(self, label):
"""Delete Crawl Cron Job and all dependent resources, including configmap and secrets"""
@@ -259,15 +326,6 @@ class K8SManager:
propagation_policy="Foreground",
)
async def run_crawl_config(self, cid):
""" Run crawl job for cron job based on specified crawlconfig id (cid) """
cron_jobs = await self.batch_beta_api.list_namespaced_cron_job(
namespace=self.namespace, label_selector=f"btrix.crawlconfig={cid}"
)
res = await self._create_run_now_job(cron_jobs.items[0])
return res.metadata.name
async def _create_run_now_job(self, cron_job):
"""Create new job from cron job to run instantly"""
annotations = {}

View File

@@ -76,7 +76,6 @@ class BrowsertrixAPI:
self.app,
self.mdb,
self.crawl_manager,
self.fastapi_users.db,
self.archive_ops,
)

View File

@@ -36,7 +36,7 @@ mongo_auth:
# Crawler Image
# =========================================
crawler_image: "webrecorder/browsertrix-crawler:0.4.4"
crawler_image: "webrecorder/browsertrix-crawler:latest"
crawler_pull_policy: "Never"
crawler_namespace: "crawlers"