browsertrix/backend/btrixcloud/crawl_job.py
Ilya Kreymer 544346d1d4
backend: make crawlconfigs mutable! (#656) (#662)
* backend: make crawlconfigs mutable! (#656)
- crawlconfig PATCH /{id} can now receive a new JSON config to replace the old one (in addition to scale, schedule, tags)
- exclusions: add / remove APIs mutate the current crawlconfig and no longer result in a new crawlconfig being created
- exclusions: ensure crawl job 'config' is updated when exclusions are added/removed, unify add/remove exclusions on crawl
- k8s: crawlconfig json is updated along with scale
- k8s: stateful set is restarted by updating annotation, instead of changing template
- crawl object: now has 'config', as well as 'profileid', 'schedule', 'crawlTimeout', 'jobType' properties to ensure anything that is changeable is stored on the crawl
- crawlconfigcore: store shared properties between crawl and crawlconfig in new crawlconfigcore (includes 'schedule', 'jobType', 'config', 'profileid', 'crawlTimeout', 'tags', 'oid')
- crawlconfig object: remove 'oldId', 'newId', disallow deactivating/deleting while crawl is running
- rename 'userid' -> 'createdBy'
- remove unused 'completions' field
- add missing return to fix /run response
- crawlout: ensure 'profileName' is resolved on CrawlOut from profileid
- crawlout: return 'name' instead of 'configName' for consistent response
- update: 'modified', 'modifiedBy' fields to set modification date and user modifying config
- update: ensure PROFILE_FILENAME is updated in the configmap if profileid is provided, cleared if profileid == ""
- update: return 'settings_changed' and 'metadata_changed' if either crawl settings or metadata changed (see the request sketched after this list)
- tests: update tests to check settings_changed/metadata_changed return values
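
For reference, a client-side sketch of the updated PATCH endpoint; the /api/orgs/{oid}/crawlconfigs/{cid} route prefix, auth header, and seed fields below are illustrative assumptions, not taken from this change:

    import requests  # hypothetical client-side example, not backend code

    api = "https://btrix.example.com/api"          # hypothetical deployment URL
    headers = {"Authorization": "Bearer <token>"}  # placeholder token
    oid, cid = "<org-id>", "<crawlconfig-id>"

    # replace the crawl config JSON in place (no new crawlconfig is created)
    patch = {
        "config": {"seeds": [{"url": "https://example.com/"}], "scopeType": "prefix"},
        "scale": 2,
        "tags": ["weekly"],
    }
    resp = requests.patch(f"{api}/orgs/{oid}/crawlconfigs/{cid}", json=patch, headers=headers)

    # the response now reports what kind of change was applied
    body = resp.json()
    print(body.get("settings_changed"), body.get("metadata_changed"))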

add revision tracking to crawlconfig:
- store each revision in a separate mongo db collection
- revisions accessible via /crawlconfigs/{cid}/revs
- store 'rev' int in crawlconfig and in crawljob
- only add revision history if crawl config changed (see the sketch below)
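
A rough sketch of the revision bookkeeping described above; the class, field, and collection names here are illustrative, not the actual models:

    from datetime import datetime
    from typing import Optional
    from uuid import UUID, uuid4

    from pydantic import BaseModel


    class ConfigRevision(BaseModel):
        """one entry in a (hypothetical) configs_revs collection"""

        id: UUID
        cid: UUID        # crawlconfig this revision belongs to
        rev: int         # revision number at the time of the change
        config: dict     # crawl config JSON as it was before the update
        modified: datetime
        modifiedBy: Optional[UUID]


    async def add_revision_if_changed(revs_coll, orig_config, update):
        """store a revision and bump 'rev' only if the crawl settings changed"""
        if not update.config or update.config == orig_config.config:
            return orig_config.rev

        await revs_coll.insert_one(
            ConfigRevision(
                id=uuid4(),
                cid=orig_config.id,
                rev=orig_config.rev,
                config=orig_config.config,
                modified=datetime.utcnow(),
                modifiedBy=update.modifiedBy,
            ).dict()
        )
        return orig_config.rev + 1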

migration (sketched below):
- update to db v3
- copy fields from crawlconfig -> crawl
- rename userid -> createdBy
- copy userid -> modifiedBy, created -> modified
- skip invalid crawls (missing config), make createdBy optional (just in case)
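
Sketch of the field copies the migration performs, in motor/pymongo style; the collection names and surrounding migration scaffolding are assumptions:

    async def migrate_up(mdb):
        """db v2 -> v3: copy config fields onto crawls, rename userid -> createdBy"""
        crawl_configs = mdb["crawl_configs"]
        crawls = mdb["crawls"]

        async for config in crawl_configs.find({}):
            userid = config.get("userid")

            # rename userid -> createdBy, seed modifiedBy / modified from old values
            await crawl_configs.find_one_and_update(
                {"_id": config["_id"]},
                {
                    "$set": {
                        "createdBy": userid,
                        "modifiedBy": userid,
                        "modified": config.get("created"),
                    },
                    "$unset": {"userid": ""},
                },
            )

            # skip invalid entries with no stored config
            if not config.get("config"):
                continue

            # copy everything changeable onto the crawls run from this config
            await crawls.update_many(
                {"cid": config["_id"]},
                {
                    "$set": {
                        "config": config["config"],
                        "jobType": config.get("jobType"),
                        "profileid": config.get("profileid"),
                        "schedule": config.get("schedule"),
                        "crawlTimeout": config.get("crawlTimeout"),
                    }
                },
            )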

frontend: Update crawl config keys with new API (#681), update frontend to use new PATCH endpoint, load config from crawl object in details view

---------

Co-authored-by: Tessa Walsh <tessa@bitarchivist.net>
Co-authored-by: sua yoo <sua@webrecorder.org>
Co-authored-by: sua yoo <sua@suayoo.com>
2023-03-07 20:36:50 -08:00


""" Crawl Job Management """
import asyncio
import sys
import signal
import os
import json
import uuid
import time
from datetime import datetime
from abc import ABC, abstractmethod
from redis import asyncio as aioredis
import pymongo
from .db import init_db
from .crawls import Crawl, CrawlFile, CrawlCompleteIn, dt_now
from .crawlconfigs import CrawlConfig
# Seconds before allowing another shutdown attempt
SHUTDOWN_ATTEMPT_WAIT = 60
# =============================================================================
# pylint: disable=too-many-instance-attributes,bare-except
class CrawlJob(ABC):
"""Crawl Job"""
started: datetime
finished: datetime
job_id: str
    def __init__(self):
        super().__init__()
        _, mdb = init_db()

        self.orgs = mdb["organizations"]
        self.crawls = mdb["crawls"]
        self.crawl_configs = mdb["crawl_configs"]

        self.crawls_done_key = "crawls-done"

        self.oid = uuid.UUID(os.environ["ORG_ID"])
        self.cid = uuid.UUID(os.environ["CRAWL_CONFIG_ID"])
        self.userid = uuid.UUID(os.environ["USER_ID"])
        self.rev = int(os.environ["REV"])

        self.is_manual = os.environ.get("RUN_MANUAL") == "1"
        self.tags = os.environ.get("TAGS", "").split(",")

        self.scale = int(os.environ.get("INITIAL_SCALE") or 0)

        self.storage_path = os.environ.get("STORE_PATH")
        self.storage_name = os.environ.get("STORAGE_NAME")

        self.last_done = None
        self.last_found = None
        self.redis = None

        self.started = dt_now()
        self.finished = None

        self._cached_params = {}
        self._files_added = False

        self._graceful_shutdown_pending = 0
        self._delete_pending = False

        params = {
            "cid": self.cid,
            "storage_name": self.storage_name or "default",
            "storage_path": self.storage_path or "",
            "redis_url": self.redis_url,
            "env": os.environ,
        }

        asyncio.create_task(self.async_init("crawler.yaml", params))
    async def async_init(self, template, params):
        """async init for k8s job"""
        crawl = await self._get_crawl()

        crawlconfig = None

        try:
            result = await self.crawl_configs.find_one({"_id": self.cid})
            crawlconfig = CrawlConfig.from_dict(result)
            self.scale = self._get_crawl_scale(crawl) or crawlconfig.scale

        # pylint: disable=broad-except
        except Exception as exc:
            print(exc)

        # if crawl job objects don't exist yet, create them, using scale from config
        if not crawl:
            params["scale"] = self.scale
            await self.init_job_objects(template, params)

        await self.init_crawl(crawlconfig)

        prev_start_time = None

        retry = 3

        # init redis
        while True:
            try:
                self.redis = await aioredis.from_url(
                    self.redis_url, encoding="utf-8", decode_responses=True
                )
                prev_start_time = await self.redis.get("start_time")

                print("Redis Connected!", flush=True)
                break
            except:
                print(f"Retrying redis connection in {retry}", flush=True)
                await asyncio.sleep(retry)

        if prev_start_time:
            try:
                self.started = datetime.fromisoformat(prev_start_time)
            except:
                pass
        else:
            await self.redis.set("start_time", str(self.started))

        # run redis loop
        while True:
            try:
                result = await self.redis.blpop(self.crawls_done_key, timeout=5)
                if result:
                    msg = json.loads(result[1])
                    # add completed file
                    if msg.get("filename"):
                        await self.add_file_to_crawl(msg)

                # update stats
                await self.update_running_crawl_stats(self.job_id)

                # check crawl status
                await self.check_crawl_status()

            # pylint: disable=broad-except
            except Exception as exc:
                print(f"Retrying crawls done loop: {exc}")
                await asyncio.sleep(10)
    async def check_crawl_status(self):
        """check if crawl is done or failed, based on the done state set by each crawl worker"""
        results = await self.redis.hvals(f"{self.job_id}:status")

        # count done / failed workers
        done = 0
        failed = 0
        for res in results:
            if res == "done":
                done += 1
            elif res == "failed":
                failed += 1

        # check if all crawlers are done
        if done >= self.scale:
            print("crawl done!", flush=True)
            await self.finish_crawl()

            await self.delete_crawl()

        # check if all crawlers failed
        elif failed >= self.scale:
            print("crawl failed!", flush=True)
            await self.fail_crawl()

            await self.delete_crawl()
    async def delete_crawl(self):
        """delete crawl stateful sets, services and pvcs"""
        self._delete_pending = True

        await self.delete_job_objects(f"crawl={self.job_id}")

    async def scale_to(self, scale):
        """scale to 'scale'"""
        try:
            await self._do_scale(scale)
        # pylint: disable=broad-except
        except Exception as exc:
            return {"success": False, "error": str(exc)}

        self.scale = scale
        await self.update_crawl(scale=scale)

        return {"success": True}
    async def fail_crawl(self):
        """mark crawl as failed"""
        if self.finished:
            return

        self.finished = dt_now()

        await self.update_crawl(state="failed", finished=self.finished)

    async def finish_crawl(self):
        """finish crawl"""
        if self.finished:
            return

        # check if one-page crawls actually succeeded
        # if only one page found, and no files, assume failed
        if self.last_found == 1 and not self._files_added:
            await self.fail_crawl()
            return

        self.finished = dt_now()

        completed = self.last_done and self.last_done >= self.last_found

        state = "complete" if completed else "partial_complete"
        print("marking crawl as: " + state, flush=True)

        await self.update_crawl(state=state, finished=self.finished)

        if completed:
            await self.inc_crawl_complete_stats()

    async def inc_crawl_complete_stats(self):
        """Increment Crawl Stats"""
        duration = int((self.finished - self.started).total_seconds())

        print(f"Duration: {duration}", flush=True)

        # init org crawl stats
        yymm = datetime.utcnow().strftime("%Y-%m")
        await self.orgs.find_one_and_update(
            {"_id": self.oid}, {"$inc": {f"usage.{yymm}": duration}}
        )
    async def update_running_crawl_stats(self, crawl_id):
        """update stats for running crawl"""
        done = await self.redis.llen(f"{crawl_id}:d")
        found = await self.redis.scard(f"{crawl_id}:s")

        if self.last_done == done and self.last_found == found:
            return

        stats = {"found": found, "done": done}

        if not self.last_found and found:
            await self.update_crawl(state="running", stats=stats)
        else:
            await self.update_crawl(stats=stats)

        self.last_found = found
        self.last_done = done

    async def update_crawl(self, **kwargs):
        """update crawl state, and optionally mark as finished"""
        await self.crawls.find_one_and_update({"_id": self.job_id}, {"$set": kwargs})

    async def init_crawl(self, crawlconfig):
        """create the crawl if it doesn't exist; if it already does, mark it as starting"""
        try:
            crawl = self._make_crawl("starting", self.scale, crawlconfig)
            await self.crawls.insert_one(crawl.to_dict())
        except pymongo.errors.DuplicateKeyError:
            await self.update_crawl(state="starting", scale=self.scale)
    async def add_file_to_crawl(self, cc_data):
        """Add a finished CrawlFile to the crawl in the db"""
        filecomplete = CrawlCompleteIn(**cc_data)

        inx = None
        filename = None
        if self.storage_path:
            inx = filecomplete.filename.index(self.storage_path)
            filename = filecomplete.filename[inx:] if inx > 0 else filecomplete.filename

        # storage_name = job.metadata.annotations.get("btrix.storage_name")
        def_storage_name = self.storage_name if inx else None

        crawl_file = CrawlFile(
            def_storage_name=def_storage_name,
            filename=filename or filecomplete.filename,
            size=filecomplete.size,
            hash=filecomplete.hash,
        )

        await self.crawls.find_one_and_update(
            {"_id": self.job_id},
            {
                "$push": {"files": crawl_file.dict()},
            },
        )

        self._files_added = True

        return True
    async def graceful_shutdown(self):
        """attempt to gracefully stop the crawl; all data should be uploaded"""
        if (
            self._graceful_shutdown_pending
            and (time.time() - self._graceful_shutdown_pending) < SHUTDOWN_ATTEMPT_WAIT
        ):
            print("Already trying to stop crawl gracefully", flush=True)
            return {"success": False, "error": "already_stopping"}

        print("Stopping crawl", flush=True)

        if not await self._send_shutdown_signal("SIGUSR1"):
            return {"success": False, "error": "unreachable"}

        await self._send_shutdown_signal("SIGTERM")

        self._graceful_shutdown_pending = time.time()

        await self.update_crawl(state="stopping")

        return {"success": True}

    async def cancel(self):
        """cancel the crawl immediately"""
        print("Canceling crawl", flush=True)

        self.finished = dt_now()

        await self.update_crawl(state="canceled", finished=self.finished)

        await self.delete_crawl()

        return {"success": True}
    def _make_crawl(self, state, scale, crawlconfig):
        """Create the crawl object for this job, copying settings from the crawlconfig"""
        return Crawl(
            id=self.job_id,
            state=state,
            config=crawlconfig.config,
            jobType=crawlconfig.jobType,
            profileid=crawlconfig.profileid,
            cid_rev=crawlconfig.rev,
            schedule=crawlconfig.schedule,
            crawlTimeout=crawlconfig.crawlTimeout,
            userid=self.userid,
            oid=self.oid,
            cid=self.cid,
            manual=self.is_manual,
            scale=scale,
            started=self.started,
            tags=self.tags,
            # colls=json.loads(job.metadata.annotations.get("btrix.colls", [])),
        )
    def register_handlers(self, app):
        """register signal and app handlers"""

        def sig_handler():
            if self._delete_pending:
                print("got SIGTERM/SIGINT, already deleting", flush=True)
                return

            print("got SIGTERM/SIGINT, exiting job", flush=True)
            sys.exit(3)

        loop = asyncio.get_running_loop()
        loop.add_signal_handler(signal.SIGTERM, sig_handler)
        loop.add_signal_handler(signal.SIGINT, sig_handler)

        @app.post("/scale/{size}")
        async def scale(size: int):
            return await self.scale_to(size)

        @app.post("/stop")
        async def stop():
            return await self.graceful_shutdown()

        @app.post("/cancel")
        async def cancel():
            return await self.cancel()

        @app.get("/healthz")
        async def healthz():
            return {}

        @app.post("/rollover")
        async def restart():
            return await self._rollover_restart()
    @abstractmethod
    async def init_job_objects(self, template, params):
        """base for creating objects"""

    @abstractmethod
    async def delete_job_objects(self, job_id):
        """base for deleting objects"""

    @abstractmethod
    async def _get_crawl(self):
        """get runnable object representing this crawl"""

    @abstractmethod
    def _get_crawl_scale(self, crawl):
        """get scale from crawl, if any"""

    @abstractmethod
    async def _do_scale(self, new_scale):
        """set number of replicas"""

    @abstractmethod
    async def _send_shutdown_signal(self, signame):
        """gracefully shutdown crawl"""

    @abstractmethod
    async def _rollover_restart(self):
        """change crawl config for this crawl"""

    @property
    @abstractmethod
    def redis_url(self):
        """get redis url"""