* Btrixjobs Operator - Phase 1 (#679)
  - add metacontroller and custom crds
  - add main_op entrypoint for operator

* Btrix Operator Crawl Management (#767)

* operator backend:
  - run operator api in a separate container but in the same pod, with WEB_CONCURRENCY=1
  - operator creates statefulsets and services for CrawlJob and ProfileJob
  - operator: use service hook endpoint, set port in values.yaml

* crawls working with CrawlJob:
  - jobs start with 'crawljob-' prefix
  - update status to reflect current crawl state
  - set sync time to 10 seconds by default, overridable with 'operator_resync_seconds'
  - mark crawl as running, failed, or complete when finished
  - store finished status when crawl is complete
  - support updating scale, forcing rollover, and stopping via patching CrawlJob
  - support cancel via deletion
  - requires hack to content-length for patching custom resources
  - auto-delete of CrawlJob via 'ttlSecondsAfterFinished'
  - also delete pvcs until autodelete is supported via statefulset (k8s >1.27)
  - ensure filesAdded always set correctly, keep counter in redis, add to status display
  - optimization: attempt to reduce automerging by reusing volumeClaimTemplates from existing children, as these may have additional props added
  - add add_crawl_errors_to_db() for storing crawl errors from redis '<crawl>:e' key to mongodb when crawl is finished/failed/canceled
  - add .status.size to display human-readable crawl size, if available (from webrecorder/browsertrix-crawler#291)
  - support new page size key (>0.9.0) and old page size key (changed in webrecorder/browsertrix-crawler#284)

* support for scheduled jobs!
  - add main_scheduled_job entrypoint to run scheduled jobs
  - add crawl_cron_job.yaml template for declaring CronJob
  - CronJobs moved to default namespace

* operator manages ProfileJobs:
  - jobs start with 'profilejob-' prefix
  - update expiry time by updating ProfileJob object 'expireTime' while profile is active

* refactor/cleanup:
  - remove k8s package
  - merge k8sman and basecrawlmanager into crawlmanager
  - move templates, k8sapi, utils into root package
  - delete all *_job.py files
  - remove dt_now, ts_now from crawls, now in utils
  - all db operations happen in crawl/crawlconfig/org files
  - move shared crawl/crawlconfig/org functions that use the db to be importable directly, including get_crawl_config, add_new_crawl, inc_crawl_stats

* role binding: more secure setup, don't allow crawler namespace any k8s permissions
  - move cronjobs to be created in default namespace
  - grant default namespace access to create cronjobs in default namespace
  - remove role binding from crawler namespace

* additional tweaks to templates:
  - templates: split crawler and redis statefulsets into separate yaml files (in case one needs to be loaded separately from the other)

* stats / redis optimization:
  - don't update stats in mongodb on every operator sync, only when crawl is finished
  - for api access, read stats directly from redis to get up-to-date stats
  - move get_page_stats() to utils, add get_redis_url() to k8sapi to unify access

* add migration for operator changes
  - update configmap for crawl configs with scale > 1 or crawlTimeout > 0 and an existing schedule, to recreate CronJobs
  - add option to rerun last migration, enabled via env var and by running helm with --set=rerun_last_migration=1

* subcharts:
  - move crawljob and profilejob crds to a separate subchart, as this seems the best way to guarantee proper install order and update on upgrade with helm; add built btrix-crds-0.1.0.tgz subchart
  - metacontroller: use release from ghcr, add metacontroller-helm-v4.10.1.tgz subchart

* backend api fixes
  - ensure changing scale of crawl also updates it in the db
  - crawlconfigs: add 'currCrawlSize' and 'lastCrawlSize' to crawlconfig api

---------

Co-authored-by: D. Lee <leepro@gmail.com>
Co-authored-by: Tessa Walsh <tessa@bitarchivist.net>
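For orientation, the operator is driven by metacontroller's service hooks: on each resync, metacontroller POSTs the CrawlJob parent, its current children, and related resources to the sync endpoint, and the operator responds with the desired status and children. A minimal sketch of that exchange is below; the ids and counts are invented for illustration, and the field names follow the `MCSyncData` model and the return value of `sync_crawls()` in the file that follows.

```python
# Illustrative only: shape of one metacontroller sync-hook exchange for a CrawlJob.
# The ids, counts, and timestamps here are made-up example values.
sync_request = {
    "parent": {  # the CrawlJob custom resource
        "metadata": {
            "name": "crawljob-abc123",
            "creationTimestamp": "2023-01-01T00:00:00Z",
        },
        "spec": {"id": "abc123", "cid": "<uuid>", "scale": 2, "stopping": False},
        "status": {"state": "running", "pagesDone": 5, "pagesFound": 10},
    },
    "children": {"StatefulSet.apps/v1": {"crawl-abc123": {}, "redis-abc123": {}}},
    "related": {"ConfigMap.v1": {"crawl-config-<uuid>": {"data": {}}}},
    "finalizing": False,
}

# The operator answers with the updated status plus the desired child objects
# (crawler and redis statefulsets/services rendered from the yaml templates):
sync_response = {
    "status": {"state": "running", "scale": 2, "pagesDone": 5, "pagesFound": 10},
    "children": [],  # list of rendered k8s object dicts
}
```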
492 lines · 16 KiB · Python
""" btrixjob operator (working for metacontroller) """
|
|
|
|
import asyncio
|
|
import traceback
|
|
from typing import Optional
|
|
|
|
from datetime import datetime
|
|
import json
|
|
import uuid
|
|
|
|
import yaml
|
|
import humanize
|
|
|
|
from pydantic import BaseModel
|
|
from redis import asyncio as aioredis
|
|
|
|
from .utils import from_k8s_date, to_k8s_date, dt_now, get_redis_crawl_stats
|
|
from .k8sapi import K8sAPI
|
|
|
|
from .db import init_db
|
|
from .orgs import inc_org_stats
|
|
from .crawls import (
|
|
CrawlFile,
|
|
CrawlCompleteIn,
|
|
add_crawl_file,
|
|
update_crawl,
|
|
add_crawl_errors,
|
|
)
|
|
|
|
|
|
STS = "StatefulSet.apps/v1"
|
|
CMAP = "ConfigMap.v1"
|
|
|
|
DEFAULT_TTL = 30
|
|
|
|
|
|
# ============================================================================
|
|
class DeleteCrawlException(Exception):
|
|
"""throw to force deletion of crawl objects"""
|
|
|
|
|
|
# ============================================================================
|
|
class MCBaseRequest(BaseModel):
|
|
"""base metacontroller model, used for customize hook"""
|
|
|
|
parent: dict
|
|
controller: dict
|
|
|
|
|
|
# ============================================================================
|
|
class MCSyncData(MCBaseRequest):
|
|
"""sync / finalize metacontroller model"""
|
|
|
|
children: dict
|
|
related: dict
|
|
finalizing: bool = False
|
|
|
|
|
|
# ============================================================================
|
|
class CrawlSpec(BaseModel):
|
|
"""spec from k8s CrawlJob object"""
|
|
|
|
id: str
|
|
cid: uuid.UUID
|
|
oid: uuid.UUID
|
|
scale: int
|
|
storage_path: str
|
|
storage_name: str
|
|
started: str
|
|
stopping: bool = False
|
|
expire_time: Optional[datetime] = None
|
|
|
|
|
|
# ============================================================================
|
|
class CrawlStatus(BaseModel):
|
|
"""status from k8s CrawlJob object"""
|
|
|
|
state: str = "waiting"
|
|
pagesFound: int = 0
|
|
pagesDone: int = 0
|
|
size: str = ""
|
|
scale: int = 1
|
|
filesAdded: int = 0
|
|
finished: Optional[str] = None
|
|
|
|
|
|
# ============================================================================
|
|
class BtrixOperator(K8sAPI):
    """BtrixOperator Handler"""

    # pylint: disable=too-many-instance-attributes,too-many-locals

    def __init__(self):
        super().__init__()
        self.config_file = "/config/config.yaml"

        _, mdb = init_db()
        self.crawls = mdb["crawls"]
        self.orgs = mdb["organizations"]

        self.done_key = "crawls-done"

        with open(self.config_file, encoding="utf-8") as fh_config:
            self.shared_params = yaml.safe_load(fh_config)

    async def sync_profile_browsers(self, data: MCSyncData):
        """sync profile browsers"""
        spec = data.parent.get("spec", {})

        expire_time = from_k8s_date(spec.get("expireTime"))
        browserid = spec.get("id")

        if dt_now() >= expire_time:
            asyncio.create_task(self.delete_profile_browser(browserid))
            return {"status": {}, "children": []}

        params = {}
        params.update(self.shared_params)
        params["id"] = browserid
        params["userid"] = spec.get("userid", "")

        params["storage_name"] = spec.get("storageName", "")
        params["storage_path"] = spec.get("storagePath", "")
        params["profile_filename"] = spec.get("profileFilename", "")
        params["url"] = spec.get("startUrl", "about:blank")
        params["vnc_password"] = spec.get("vncPassword")

        children = self.load_from_yaml("profilebrowser.yaml", params)

        return {"status": {}, "children": children}

    async def sync_crawls(self, data: MCSyncData):
        """sync crawls"""

        status = CrawlStatus(**data.parent.get("status", {}))

        spec = data.parent.get("spec", {})
        crawl_id = spec["id"]

        scale = spec.get("scale", 1)
        status.scale = scale

        if status.finished:
            return await self.handle_finished_delete_if_needed(crawl_id, status, spec)

        cid = spec["cid"]

        redis_url = self.get_redis_url(crawl_id)

        try:
            configmap = data.related[CMAP][f"crawl-config-{cid}"]["data"]
        # pylint: disable=bare-except, broad-except
        except:
            return await self.cancel_crawl(redis_url, crawl_id, status, "failed")

        crawl = CrawlSpec(
            id=crawl_id,
            cid=cid,
            oid=configmap["ORG_ID"],
            storage_name=configmap["STORAGE_NAME"],
            storage_path=configmap["STORE_PATH"],
            scale=scale,
            started=data.parent["metadata"]["creationTimestamp"],
            stopping=spec.get("stopping", False),
            expire_time=from_k8s_date(spec.get("expireTime")),
        )

        # if finalizing and not finished, job is being deleted, so assume crawl has been canceled
        if data.finalizing:
            return await self.cancel_crawl(redis_url, crawl_id, status, "canceled")

        crawl_sts = f"crawl-{crawl_id}"
        redis_sts = f"redis-{crawl_id}"

        has_crawl_children = STS in data.children and crawl_sts in data.children[STS]
        if has_crawl_children:
            status = await self.sync_crawl_state(redis_url, crawl, status)
        else:
            status.state = "starting"

        if status.finished:
            return await self.handle_finished_delete_if_needed(crawl.id, status, spec)

        params = {}
        params.update(self.shared_params)
        params["id"] = crawl_id
        params["cid"] = cid
        params["userid"] = spec.get("userid", "")

        params["storage_name"] = configmap["STORAGE_NAME"]
        params["store_path"] = configmap["STORE_PATH"]
        params["store_filename"] = configmap["STORE_FILENAME"]
        params["profile_filename"] = configmap["PROFILE_FILENAME"]
        params["scale"] = spec.get("scale", 1)
        params["force_restart"] = spec.get("forceRestart")

        params["redis_url"] = redis_url

        children = self.load_from_yaml("crawler.yaml", params)
        children.extend(self.load_from_yaml("redis.yaml", params))

        # to minimize merging, just patch in volumeClaimTemplates from actual children
        # as they may get additional settings that cause more frequent updates
        if has_crawl_children:
            children[0]["spec"]["volumeClaimTemplates"] = data.children[STS][crawl_sts][
                "spec"
            ]["volumeClaimTemplates"]

        has_redis_children = STS in data.children and redis_sts in data.children[STS]
        if has_redis_children:
            children[2]["spec"]["volumeClaimTemplates"] = data.children[STS][redis_sts][
                "spec"
            ]["volumeClaimTemplates"]

        return {"status": status.dict(exclude_none=True), "children": children}

    def load_from_yaml(self, filename, params):
        """load and parse k8s template from yaml file"""
        return list(
            yaml.safe_load_all(self.templates.env.get_template(filename).render(params))
        )

    def get_related(self, data: MCBaseRequest):
        """return configmap related to crawl"""
        spec = data.parent.get("spec", {})
        cid = spec.get("cid")
        return {
            "relatedResources": [
                {
                    "apiVersion": "v1",
                    "resource": "configmaps",
                    "labelSelector": {"matchLabels": {"btrix.crawlconfig": cid}},
                }
            ]
        }

    async def handle_finished_delete_if_needed(self, crawl_id, status, spec):
        """return status for finished job (no children)
        also check if deletion is necessary
        """

        ttl = spec.get("ttlSecondsAfterFinished", DEFAULT_TTL)
        finished = from_k8s_date(status.finished)
        if (dt_now() - finished).total_seconds() > ttl:
            print("Job expired, deleting: " + crawl_id)

            asyncio.create_task(self.delete_crawl_job(crawl_id))

        return self._done_response(status)

    async def delete_crawl_job(self, crawl_id):
        """delete the crawl job object and its pvcs"""
        # delete the crawljob itself
        await super().delete_crawl_job(crawl_id)

        # until delete policy is supported in StatefulSet,
        # delete pvcs explicitly for now
        # (don't want to make them children as already owned by sts)
        try:
            await self.core_api.delete_collection_namespaced_persistent_volume_claim(
                namespace=self.namespace, label_selector=f"crawl={crawl_id}"
            )
        # pylint: disable=bare-except, broad-except
        except Exception as exc:
            print("PVC Delete failed", exc, flush=True)

    async def cancel_crawl(self, redis_url, crawl_id, status, state):
        """immediately cancel crawl with specified state"""
        redis = await self._get_redis(redis_url)
        await self.mark_finished(redis, crawl_id, status, state)
        return self._done_response(status)

    def _done_response(self, status):
        """response for when crawl job is done/to be deleted"""
        return {
            "status": status.dict(exclude_none=True),
            "children": [],
            "finalized": True,
        }

    async def _get_redis(self, redis_url):
        """init redis, ensure connectivity"""
        redis = None
        try:
            redis = await aioredis.from_url(
                redis_url, encoding="utf-8", decode_responses=True
            )
            # test connection
            await redis.ping()
            return redis

        # pylint: disable=bare-except
        except:
            return None

    async def sync_crawl_state(self, redis_url, crawl, status):
        """sync crawl state for running crawl"""
        redis = await self._get_redis(redis_url)
        if not redis:
            return status

        # if not prev_start_time:
        #     await redis.set("start_time", str(self.started))

        try:
            file_done = await redis.lpop(self.done_key)

            while file_done:
                msg = json.loads(file_done)
                # add completed file
                if msg.get("filename"):
                    await self.add_file_to_crawl(msg, crawl)
                    await redis.incr("filesAdded")

                # get next file done
                file_done = await redis.lpop(self.done_key)

            # ensure filesAdded always set
            status.filesAdded = int(await redis.get("filesAdded") or 0)

            # update stats and get status
            return await self.update_crawl_state(redis, crawl, status)

        # pylint: disable=broad-except
        except Exception as exc:
            traceback.print_exc()
            print(f"Crawl get failed: {exc}, will try again")
            return status

    async def add_file_to_crawl(self, cc_data, crawl):
        """Handle finished CrawlFile to db"""

        filecomplete = CrawlCompleteIn(**cc_data)

        inx = None
        filename = None
        if crawl.storage_path:
            inx = filecomplete.filename.index(crawl.storage_path)
            filename = filecomplete.filename[inx:] if inx > 0 else filecomplete.filename

        def_storage_name = crawl.storage_name if inx else None

        crawl_file = CrawlFile(
            def_storage_name=def_storage_name,
            filename=filename or filecomplete.filename,
            size=filecomplete.size,
            hash=filecomplete.hash,
        )

        await add_crawl_file(self.crawls, crawl.id, crawl_file)

        return True

    async def update_crawl_state(self, redis, crawl, status):
        """update crawl state and check if crawl is now done"""
        results = await redis.hvals(f"{crawl.id}:status")
        stats = await get_redis_crawl_stats(redis, crawl.id)

        # check crawl expiry
        if crawl.expire_time and datetime.utcnow() > crawl.expire_time:
            crawl.stopping = True
            print(
                f"Job duration expired at {crawl.expire_time}, "
                + "gracefully stopping crawl"
            )

        if crawl.stopping:
            await redis.set(f"{crawl.id}:stopping", "1")

        # optimization: don't update db once crawl is already running
        # stats are written when the crawl is finished; until then they can be
        # read directly from redis
        if status.state != "running":
            await update_crawl(self.crawls, crawl.id, state="running")

        # update status
        status.state = "running"
        status.pagesDone = stats["done"]
        status.pagesFound = stats["found"]
        if stats["size"] is not None:
            status.size = humanize.naturalsize(stats["size"])

        # check if done / failed
        done = 0
        failed = 0
        for res in results:
            if res == "done":
                done += 1
            elif res == "failed":
                failed += 1

        # check if all crawlers are done
        if done >= crawl.scale:
            # check if one-page crawls actually succeeded
            # if only one page found, and no files, assume failed
            if status.pagesFound == 1 and not status.filesAdded:
                return await self.mark_finished(redis, crawl.id, status, state="failed")

            completed = status.pagesDone and status.pagesDone >= status.pagesFound

            state = "complete" if completed else "partial_complete"

            status = await self.mark_finished(
                redis, crawl.id, status, state, crawl, stats
            )

        # check if all crawlers failed
        if failed >= crawl.scale:
            status = await self.mark_finished(redis, crawl.id, status, state="failed")

        return status

    # pylint: disable=too-many-arguments
    async def mark_finished(
        self, redis, crawl_id, status, state, crawl=None, stats=None
    ):
        """mark crawl as finished, set finished timestamp and final state"""
        finished = dt_now()

        kwargs = {"state": state, "finished": finished}
        if stats:
            kwargs["stats"] = stats

        await update_crawl(self.crawls, crawl_id, **kwargs)

        if redis:
            await self.add_crawl_errors_to_db(redis, crawl_id)

        status.state = state
        status.finished = to_k8s_date(finished)

        if crawl:
            await self.inc_crawl_complete_stats(crawl, finished)

        return status

    async def inc_crawl_complete_stats(self, crawl, finished):
        """Increment Crawl Stats"""

        started = from_k8s_date(crawl.started)

        duration = int((finished - started).total_seconds())

        print(f"Duration: {duration}", flush=True)

        await inc_org_stats(self.orgs, crawl.oid, duration)

    async def add_crawl_errors_to_db(self, redis, crawl_id, inc=100):
        """Pull crawl errors from redis and write to mongo db"""
        index = 0
        while True:
            skip = index * inc
            upper_bound = skip + inc - 1
            errors = await redis.lrange(f"{crawl_id}:e", skip, upper_bound)
            if not errors:
                break

            await add_crawl_errors(self.crawls, crawl_id, errors)

            if len(errors) < inc:
                # If we have fewer than inc errors, we can assume this is the
                # last page of data to add.
                break
            index += 1


# ============================================================================
def init_operator_webhook(app):
    """registers webhook handlers for metacontroller"""

    oper = BtrixOperator()

    @app.post("/op/crawls/sync")
    async def mc_sync_crawls(data: MCSyncData):
        return await oper.sync_crawls(data)

    # reuse sync path, but distinct endpoint for better logging
    @app.post("/op/crawls/finalize")
    async def mc_sync_finalize(data: MCSyncData):
        return await oper.sync_crawls(data)

    @app.post("/op/crawls/customize")
    async def mc_related(data: MCBaseRequest):
        return oper.get_related(data)

    @app.post("/op/profilebrowsers/sync")
    async def mc_sync_profile_browsers(data: MCSyncData):
        return await oper.sync_profile_browsers(data)

    @app.get("/healthz", include_in_schema=False)
    async def healthz():
        return {}