browsertrix/backend/btrixcloud/crawlmanager.py
Ilya Kreymer 544346d1d4
backend: make crawlconfigs mutable! (#656) (#662)
* backend: make crawlconfigs mutable! (#656)
- crawlconfig PATCH /{id} can now receive a new JSON config to replace the old one (in addition to scale, schedule, tags)
- exclusions: add / remove APIs mutate the current crawlconfig, do not result in a new crawlconfig created
- exclusions: ensure crawl job 'config' is updated when exclusions are added/removed, unify add/remove exclusions on crawl
- k8s: crawlconfig json is updated along with scale
- k8s: stateful set is restarted by updating annotation, instead of changing template
- crawl object: now has 'config', as well as 'profileid', 'schedule', 'crawlTimeout', 'jobType' properties to ensure anything that is changeable is stored on the crawl
- crawlconfigcore: store share properties between crawl and crawlconfig in new crawlconfigcore (includes 'schedule', 'jobType', 'config', 'profileid', 'schedule', 'crawlTimeout', 'tags', 'oid')
- crawlconfig object: remove 'oldId', 'newId', disallow deactivating/deleting while crawl is running
- rename 'userid' -> 'createdBy'
- remove unused 'completions' field
- add missing return to fix /run response
- crawlout: ensure 'profileName' is resolved on CrawlOut from profileid
- crawlout: return 'name' instead of 'configName' for consistent response
- update: 'modified', 'modifiedBy' fields to set modification date and user modifying config
- update: ensure PROFILE_FILENAME is updated in configmap is profileid provided, clear if profileid==""
- update: return 'settings_changed' and 'metadata_changed' if either crawl settings or metadata changed
- tests: update tests to check settings_changed/metadata_changed return values

add revision tracking to crawlconfig:
- store each revision separate mongo db collection
- revisions accessible via /crawlconfigs/{cid}/revs
- store 'rev' int in crawlconfig and in crawljob
- only add revision history if crawl config changed

migration:
- update to db v3
- copy fields from crawlconfig -> crawl
- rename userid -> createdBy
- copy userid -> modifiedBy, created -> modified
- skip invalid crawls (missing config), make createdBy optional (just in case)

frontend: Update crawl config keys with new API (#681), update frontend to use new PATCH endpoint, load config from crawl object in details view

---------

Co-authored-by: Tessa Walsh <tessa@bitarchivist.net>
Co-authored-by: sua yoo <sua@webrecorder.org>
Co-authored-by: sua yoo <sua@suayoo.com>
2023-03-07 20:36:50 -08:00

227 lines
7.2 KiB
Python

""" shared crawl manager implementation """
import os
import asyncio
import datetime
import secrets
from abc import ABC, abstractmethod
from fastapi.templating import Jinja2Templates
from .db import resolve_db_url
# ============================================================================
class BaseCrawlManager(ABC):
"""abstract crawl manager"""
def __init__(self, templates):
super().__init__()
self.job_image = os.environ["JOB_IMAGE"]
self.job_pull_policy = os.environ.get("JOB_PULL_POLICY", "Always")
self.no_delete_jobs = os.environ.get("NO_DELETE_JOBS", "0") != "0"
self.crawler_node_type = os.environ.get("CRAWLER_NODE_TYPE", "")
self.templates = Jinja2Templates(directory=templates)
self.loop = asyncio.get_running_loop()
# pylint: disable=too-many-arguments
async def run_profile_browser(
self,
userid,
oid,
url,
storage=None,
storage_name=None,
baseprofile=None,
profile_path=None,
):
"""run browser for profile creation"""
# if default storage, use name and path + profiles/
if storage:
storage_name = storage.name
storage_path = storage.path + "profiles/"
# otherwise, use storage name and existing path from secret
else:
storage_path = ""
await self.check_storage(storage_name)
browserid = f"prf-{secrets.token_hex(5)}"
params = {
"id": browserid,
"userid": str(userid),
"oid": str(oid),
"job_image": self.job_image,
"job_pull_policy": self.job_pull_policy,
"storage_name": storage_name,
"storage_path": storage_path or "",
"baseprofile": baseprofile or "",
"profile_path": profile_path,
"idle_timeout": os.environ.get("IDLE_TIMEOUT", "60"),
"url": url,
"env": os.environ,
}
data = self.templates.env.get_template("profile_job.yaml").render(params)
await self._create_from_yaml(f"job-{browserid}", data)
return browserid
async def add_crawl_config(
self,
crawlconfig,
storage,
run_now,
out_filename,
profile_filename,
):
"""add new crawl as cron job, store crawl config in configmap"""
if storage.type == "default":
storage_name = storage.name
storage_path = storage.path
else:
storage_name = str(crawlconfig.oid)
storage_path = ""
await self.check_storage(storage_name)
# Create Config Map
await self._create_config_map(
crawlconfig,
STORE_PATH=storage_path,
STORE_FILENAME=out_filename,
STORAGE_NAME=storage_name,
USER_ID=str(crawlconfig.modifiedBy),
ORG_ID=str(crawlconfig.oid),
CRAWL_CONFIG_ID=str(crawlconfig.id),
PROFILE_FILENAME=profile_filename,
)
crawl_id = None
if run_now:
crawl_id = await self._create_manual_job(crawlconfig)
await self._update_scheduled_job(crawlconfig)
return crawl_id
# pylint: disable=unused-argument
async def run_crawl_config(self, crawlconfig, userid=None):
"""Run crawl job for cron job based on specified crawlconfig
optionally set different user"""
return await self._create_manual_job(crawlconfig)
async def update_crawl_config(self, crawlconfig, update, profile_filename=None):
"""Update the schedule or scale for existing crawl config"""
has_sched_update = update.schedule is not None
has_scale_update = update.scale is not None
has_config_update = update.config is not None
if has_sched_update:
await self._update_scheduled_job(crawlconfig)
if has_scale_update or has_config_update or profile_filename:
await self._update_config_map(
crawlconfig, update.scale, profile_filename, has_config_update
)
return True
async def shutdown_crawl(self, crawl_id, oid, graceful=True):
"""Request a crawl cancelation or stop by calling an API
on the job pod/container, returning the result"""
return await self._post_to_job(
crawl_id, oid, "/stop" if graceful else "/cancel"
)
async def scale_crawl(self, crawl_id, oid, scale=1):
"""Set the crawl scale (job parallelism) on the specified job"""
return await self._post_to_job(crawl_id, oid, f"/scale/{scale}")
async def rollover_restart_crawl(self, crawl_id, oid):
"""Rolling restart of crawl"""
return await self._post_to_job(crawl_id, oid, "/rollover")
async def delete_crawl_configs_for_org(self, org):
"""Delete all crawl configs for given org"""
return await self._delete_crawl_configs(f"btrix.org={org}")
async def delete_crawl_config_by_id(self, cid):
"""Delete all crawl configs by id"""
return await self._delete_crawl_configs(f"btrix.crawlconfig={cid}")
async def _create_manual_job(self, crawlconfig):
cid = str(crawlconfig.id)
ts_now = datetime.datetime.utcnow().strftime("%Y%m%d%H%M%S")
crawl_id = f"manual-{ts_now}-{cid[:12]}"
data = await self._load_job_template(crawlconfig, crawl_id, manual=True)
# create job directly
await self._create_from_yaml(f"job-{crawl_id}", data)
return crawl_id
async def _load_job_template(self, crawlconfig, job_id, manual, schedule=None):
params = {
"id": job_id,
"cid": str(crawlconfig.id),
"rev": str(crawlconfig.rev),
"userid": str(crawlconfig.modifiedBy),
"oid": str(crawlconfig.oid),
"job_image": self.job_image,
"job_pull_policy": self.job_pull_policy,
"manual": "1" if manual else "0",
"crawler_node_type": self.crawler_node_type,
"schedule": schedule,
"env": os.environ,
"mongo_db_url": resolve_db_url(),
"tags": ",".join(crawlconfig.tags),
}
return self.templates.env.get_template("crawl_job.yaml").render(params)
async def _update_config_map(
self, crawlconfig, scale=None, profile_filename=None, update_config=False
):
"""update initial scale and crawler config in config, if needed (k8s only)"""
@abstractmethod
async def check_storage(self, storage_name, is_default=False):
"""check if given storage is valid"""
@abstractmethod
async def _create_from_yaml(self, id_, yaml_data):
"""check if given storage is valid"""
@abstractmethod
async def _create_config_map(self, crawlconfig, **kwargs):
"""create config map for config"""
@abstractmethod
async def _update_scheduled_job(self, crawlconfig):
"""update schedule on crawl job"""
@abstractmethod
async def _post_to_job(self, crawl_id, oid, path, data=None):
"""make a POST request to the container for specified crawl job"""
@abstractmethod
async def _delete_crawl_configs(self, label):
"""delete crawl configs by specified label"""