* Btrixjobs Operator - Phase 1 (#679)
  - add metacontroller and custom crds
  - add main_op entrypoint for operator

* Btrix Operator Crawl Management (#767)

* operator backend:
  - run operator api in a separate container, but in the same pod, with WEB_CONCURRENCY=1
  - operator creates statefulsets and services for CrawlJob and ProfileJob
  - operator: use service hook endpoint, set port in values.yaml

* crawls working with CrawlJob:
  - jobs start with 'crawljob-' prefix
  - update status to reflect current crawl state
  - set sync time to 10 seconds by default, overridable with 'operator_resync_seconds'
  - mark crawl as running, failed, or complete when finished
  - store finished status when crawl is complete
  - support updating scale, forcing rollover, and stopping via patching CrawlJob (see the sketch below)
  - support cancel via deletion
  - requires hack to content-length for patching custom resources
  - auto-delete of CrawlJob via 'ttlSecondsAfterFinished'
  - also delete pvcs until autodelete supported via statefulset (k8s >1.27)
  - ensure filesAdded always set correctly, keep counter in redis, add to status display
  - optimization: attempt to reduce automerging by reusing volumeClaimTemplates from existing children, as these may have additional props added
  - add add_crawl_errors_to_db() for storing crawl errors from redis '<crawl>:e' key to mongodb when crawl is finished/failed/canceled
  - add .status.size to display human-readable crawl size, if available (from webrecorder/browsertrix-crawler#291)
  - support new page size key (>0.9.0) and old page size key (changed in webrecorder/browsertrix-crawler#284)

* support for scheduled jobs!
  - add main_scheduled_job entrypoint to run scheduled jobs
  - add crawl_cron_job.yaml template for declaring CronJob
  - CronJobs moved to default namespace

* operator manages ProfileJobs:
  - jobs start with 'profilejob-' prefix
  - update expiry time by updating ProfileJob object 'expireTime' while profile is active

* refactor/cleanup:
  - remove k8s package
  - merge k8sman and basecrawlmanager into crawlmanager
  - move templates, k8sapi, utils into root package
  - delete all *_job.py files
  - remove dt_now, ts_now from crawls, now in utils
  - all db operations happen in crawl/crawlconfig/org files
  - move shared crawl/crawlconfig/org functions that use the db to be importable directly, including get_crawl_config, add_new_crawl, inc_crawl_stats

* role binding: more secure setup, don't allow crawler namespace any k8s permissions
  - move cronjobs to be created in default namespace
  - grant default namespace access to create cronjobs in default namespace
  - remove role binding from crawler namespace

* additional tweaks to templates:
  - templates: split crawler and redis statefulsets into separate yaml files (in case one needs to be loaded separately)

* stats / redis optimization:
  - don't update stats in mongodb on every operator sync, only when crawl is finished
  - for api access, read stats directly from redis to get up-to-date stats
  - move get_page_stats() to utils, add get_redis_url() to k8sapi to unify access

* add migration for operator changes:
  - update configmap for crawl configs with scale > 1 or crawlTimeout > 0, and recreate CronJobs for configs where a schedule exists
  - add option to rerun the last migration, enabled via env var and by running helm with --set=rerun_last_migration=1

* subcharts:
  - move crawljob and profilejob crds to a separate subchart, as this seems the best way to guarantee proper install order and updates on upgrade with helm; add built btrix-crds-0.1.0.tgz subchart
  - metacontroller: use release from ghcr, add metacontroller-helm-v4.10.1.tgz subchart

* backend api fixes:
  - ensure changing scale of crawl also updates it in the db
  - crawlconfigs: add 'currCrawlSize' and 'lastCrawlSize' to crawlconfig api

---------

Co-authored-by: D. Lee <leepro@gmail.com>
Co-authored-by: Tessa Walsh <tessa@bitarchivist.net>
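For context: crawl control (scale, rollover restart, graceful stop) now works by patching the CrawlJob custom resource and letting the operator reconcile, rather than calling an API on the job pod. Below is a minimal sketch of such a patch, assuming the `kubernetes_asyncio` client and an illustrative `btrix.cloud/v1` group/version; the actual group, version, and spec field names live in the CRDs in this repo, and the content-length workaround mentioned above is omitted here.

```python
import asyncio

from kubernetes_asyncio import client, config


async def scale_crawl(crawl_id: str, scale: int, namespace: str = "crawlers") -> None:
    """Merge-patch the CrawlJob spec; the operator applies the change on its next sync."""
    await config.load_kube_config()  # use config.load_incluster_config() inside a pod
    async with client.ApiClient() as api:
        custom = client.CustomObjectsApi(api)
        await custom.patch_namespaced_custom_object(
            group="btrix.cloud",  # illustrative CRD group
            version="v1",  # illustrative version
            namespace=namespace,
            plural="crawljobs",
            name=f"crawljob-{crawl_id}",  # jobs use the 'crawljob-' prefix
            body={"spec": {"scale": scale}},
        )


if __name__ == "__main__":
    asyncio.run(scale_crawl("e3c2a1b0", 2))
```

The same patch mechanism covers stop (`{"stopping": True}`) and rollover (`{"forceRestart": <timestamp>}`), as seen in the crawl manager below.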
| """ shared crawl manager implementation """
 | |
| 
 | |
| import os
 | |
| import asyncio
 | |
| import secrets
 | |
| import json
 | |
| import base64
 | |
| 
 | |
| from datetime import timedelta
 | |
| 
 | |
| from .orgs import S3Storage
 | |
| 
 | |
| from .k8sapi import K8sAPI
 | |
| 
 | |
| from .utils import dt_now, to_k8s_date
 | |
| 
 | |
| 
 | |
| # ============================================================================
 | |
| class CrawlManager(K8sAPI):
 | |
|     """abstract crawl manager"""
 | |
| 
 | |
|     def __init__(self):
 | |
|         super().__init__()
 | |
|         self.job_image = os.environ["JOB_IMAGE"]
 | |
|         self.job_image_pull_policy = os.environ.get("JOB_PULL_POLICY", "Always")
 | |
| 
 | |
|         self.no_delete_jobs = os.environ.get("NO_DELETE_JOBS", "0") != "0"
 | |
| 
 | |
|         self.crawler_node_type = os.environ.get("CRAWLER_NODE_TYPE", "")
 | |
| 
 | |
|         self.cron_namespace = os.environ.get("CRON_NAMESPACE", "default")
 | |
| 
 | |
|         self._default_storages = {}
 | |
| 
 | |
|         self.loop = asyncio.get_running_loop()
 | |
| 
 | |
|     # pylint: disable=too-many-arguments
 | |
|     async def run_profile_browser(
 | |
|         self,
 | |
|         userid,
 | |
|         oid,
 | |
|         url,
 | |
|         storage=None,
 | |
|         storage_name=None,
 | |
|         baseprofile=None,
 | |
|         profile_path=None,
 | |
|     ):
 | |
|         """run browser for profile creation"""
 | |
| 
 | |
|         # if default storage, use name and path + profiles/
 | |
|         if storage:
 | |
|             storage_name = storage.name
 | |
|             storage_path = storage.path + "profiles/"
 | |
|         # otherwise, use storage name and existing path from secret
 | |
|         else:
 | |
|             storage_path = ""
 | |
| 
 | |
|         await self.check_storage(storage_name)
 | |
| 
 | |
|         browserid = f"prf-{secrets.token_hex(5)}"
 | |
| 
 | |
|         params = {
 | |
|             "id": browserid,
 | |
|             "userid": str(userid),
 | |
|             "oid": str(oid),
 | |
|             "storage_name": storage_name,
 | |
|             "storage_path": storage_path or "",
 | |
|             "base_profile": baseprofile or "",
 | |
|             "profile_filename": profile_path,
 | |
|             "idle_timeout": os.environ.get("IDLE_TIMEOUT", "60"),
 | |
|             "url": url,
 | |
|             "vnc_password": secrets.token_hex(16),
 | |
|             "expire_time": to_k8s_date(dt_now() + timedelta(seconds=30)),
 | |
|         }
 | |
| 
 | |
|         data = self.templates.env.get_template("profile_job.yaml").render(params)
 | |
| 
 | |
|         await self.create_from_yaml(data)
 | |
| 
 | |
|         return browserid
 | |
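
    # Example (hypothetical values, not from this codebase) -- launching a
    # profile browser for a user:
    #
    #     browserid = await crawl_manager.run_profile_browser(
    #         userid=user.id,
    #         oid=org.id,
    #         url="https://example.com/login",
    #         storage=org.storage,
    #     )
    #
    # The rendered profile_job.yaml creates a ProfileJob with an initial
    # 30-second expire_time; callers are expected to keep extending it via
    # ping_profile_browser() below while the profile session is active.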

    async def add_crawl_config(
        self,
        crawlconfig,
        storage,
        run_now,
        out_filename,
        profile_filename,
    ):
        """add new crawl, store crawl config in configmap"""

        if storage.type == "default":
            storage_name = storage.name
            storage_path = storage.path
        else:
            storage_name = str(crawlconfig.oid)
            storage_path = ""

        await self.check_storage(storage_name)

        # Create Config Map
        await self._create_config_map(
            crawlconfig,
            STORE_PATH=storage_path,
            STORE_FILENAME=out_filename,
            STORAGE_NAME=storage_name,
            USER_ID=str(crawlconfig.modifiedBy),
            ORG_ID=str(crawlconfig.oid),
            CRAWL_CONFIG_ID=str(crawlconfig.id),
            PROFILE_FILENAME=profile_filename,
            INITIAL_SCALE=str(crawlconfig.scale),
            CRAWL_TIMEOUT=str(crawlconfig.crawlTimeout)
            # REV=str(crawlconfig.rev),
        )

        crawl_id = None

        if run_now:
            crawl_id = await self.create_crawl_job(
                crawlconfig, str(crawlconfig.modifiedBy)
            )

        await self._update_scheduled_job(crawlconfig, crawlconfig.schedule)

        return crawl_id

    async def create_crawl_job(self, crawlconfig, userid: str):
        """create new crawl job from config"""
        cid = str(crawlconfig.id)

        return await self.new_crawl_job(
            cid, userid, crawlconfig.scale, crawlconfig.crawlTimeout, manual=True
        )
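
    # new_crawl_job() is provided by the K8sAPI base class (not shown in this
    # file); it is assumed to create the CrawlJob custom resource (named with
    # the 'crawljob-' prefix, per the PR notes above) and return the new
    # crawl id.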

    async def update_crawl_config(self, crawlconfig, update, profile_filename=None):
        """Update the schedule or scale for an existing crawl config"""

        has_sched_update = update.schedule is not None
        has_scale_update = update.scale is not None
        has_timeout_update = update.crawlTimeout is not None
        has_config_update = update.config is not None

        if has_sched_update:
            await self._update_scheduled_job(crawlconfig, update.schedule)

        if (
            has_scale_update
            or has_config_update
            or has_timeout_update
            or profile_filename
        ):
            await self._update_config_map(
                crawlconfig,
                update.scale,
                profile_filename,
                update.crawlTimeout,
                has_config_update,
            )

        return True

    # pylint: disable=unused-argument
    async def check_storage(self, storage_name, is_default=False):
        """Check if storage is valid by trying to get the storage secret.
        Will throw if not valid, otherwise return True"""
        await self._get_storage_secret(storage_name)
        return True

    async def update_org_storage(self, oid, userid, storage):
        """Update storage by either creating a per-org secret, if using custom storage,
        or deleting the per-org secret, if using default storage"""
        org_storage_name = f"storage-{oid}"
        if storage.type == "default":
            try:
                await self.core_api.delete_namespaced_secret(
                    org_storage_name,
                    namespace=self.namespace,
                    propagation_policy="Foreground",
                )
            # pylint: disable=bare-except
            except:
                pass

            return

        labels = {"btrix.org": oid, "btrix.user": userid}

        crawl_secret = self.client.V1Secret(
            metadata={
                "name": org_storage_name,
                "namespace": self.namespace,
                "labels": labels,
            },
            string_data={
                "STORE_ENDPOINT_URL": storage.endpoint_url,
                "STORE_ACCESS_KEY": storage.access_key,
                "STORE_SECRET_KEY": storage.secret_key,
            },
        )

        try:
            await self.core_api.create_namespaced_secret(
                namespace=self.namespace, body=crawl_secret
            )

        # pylint: disable=bare-except
        except:
            await self.core_api.patch_namespaced_secret(
                name=org_storage_name, namespace=self.namespace, body=crawl_secret
            )
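
    # Note: the create-then-patch sequence above acts as an upsert: creation
    # fails if the secret already exists (e.g. with a 409 Conflict), in which
    # case the existing secret is patched with the new credentials instead.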

    async def get_default_storage_access_endpoint(self, name):
        """Get access_endpoint for default storage"""
        return (await self.get_default_storage(name)).access_endpoint_url

    async def get_default_storage(self, name):
        """get default storage, caching it after the first lookup"""
        if name not in self._default_storages:
            storage_secret = await self._get_storage_secret(name)

            access_endpoint_url = self._secret_data(
                storage_secret, "STORE_ACCESS_ENDPOINT_URL"
            )
            endpoint_url = self._secret_data(storage_secret, "STORE_ENDPOINT_URL")
            access_key = self._secret_data(storage_secret, "STORE_ACCESS_KEY")
            secret_key = self._secret_data(storage_secret, "STORE_SECRET_KEY")
            region = self._secret_data(storage_secret, "STORE_REGION") or ""
            use_access_for_presign = (
                self._secret_data(storage_secret, "STORE_USE_ACCESS_FOR_PRESIGN") == "1"
            )

            self._default_storages[name] = S3Storage(
                access_key=access_key,
                secret_key=secret_key,
                endpoint_url=endpoint_url,
                access_endpoint_url=access_endpoint_url,
                region=region,
                use_access_for_presign=use_access_for_presign,
            )

        return self._default_storages[name]

    async def get_profile_browser_metadata(self, browserid):
        """get browser profile labels"""
        try:
            browser = await self.get_profile_browser(browserid)

        # pylint: disable=bare-except
        except:
            return {}

        return browser["metadata"]["labels"]

    async def get_configmap(self, cid):
        """get configmap by id"""
        return await self.core_api.read_namespaced_config_map(
            name=f"crawl-config-{cid}", namespace=self.namespace
        )

    async def ping_profile_browser(self, browserid):
        """ping profile browser to keep it alive, extending its expire time"""
        expire_at = dt_now() + timedelta(seconds=30)
        await self._patch_job(
            browserid, {"expireTime": to_k8s_date(expire_at)}, "profilejobs"
        )

    async def rollover_restart_crawl(self, crawl_id, oid):
        """Rolling restart of crawl by updating the forceRestart field"""
        update = to_k8s_date(dt_now())
        return await self._patch_job(crawl_id, {"forceRestart": update})

    async def scale_crawl(self, crawl_id, oid, scale=1):
        """Set the crawl scale (job parallelism) on the specified job"""
        return await self._patch_job(crawl_id, {"scale": scale})

    async def shutdown_crawl(self, crawl_id, oid, graceful=True):
        """Request a crawl stop by patching the CrawlJob, or cancel it
        by deleting the CrawlJob, returning the result"""
        if graceful:
            patch = {"stopping": True}
            return await self._patch_job(crawl_id, patch)

        await self.delete_crawl_job(crawl_id)

        return {"success": True}
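
    # _patch_job() is inherited from K8sAPI (not shown in this file); based on
    # the calls above, it is assumed to merge-patch the named CrawlJob or
    # ProfileJob custom resource's spec (defaulting to the 'crawljobs' plural),
    # so the operator applies the change on its next sync.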

    async def delete_crawl_configs_for_org(self, org):
        """Delete all crawl configs for the given org"""
        return await self._delete_crawl_configs(f"btrix.org={org}")

    async def delete_crawl_config_by_id(self, cid):
        """Delete crawl config by id"""
        return await self._delete_crawl_configs(f"btrix.crawlconfig={cid}")

    # ========================================================================
    # Internal Methods
    def _secret_data(self, secret, name):
        """decode base64-encoded secret data"""
        return base64.standard_b64decode(secret.data[name]).decode()

    async def _create_config_map(self, crawlconfig, **data):
        """Create Config Map based on CrawlConfig"""
        data["crawl-config.json"] = json.dumps(crawlconfig.get_raw_config())

        labels = {
            "btrix.crawlconfig": str(crawlconfig.id),
            "btrix.org": str(crawlconfig.oid),
        }

        config_map = self.client.V1ConfigMap(
            metadata={
                "name": f"crawl-config-{crawlconfig.id}",
                "namespace": self.namespace,
                "labels": labels,
            },
            data=data,
        )

        return await self.core_api.create_namespaced_config_map(
            namespace=self.namespace, body=config_map
        )

    async def _get_storage_secret(self, storage_name):
        """Check if storage_name is valid by reading the existing secret"""
        try:
            return await self.core_api.read_namespaced_secret(
                f"storage-{storage_name}",
                namespace=self.namespace,
            )
        # pylint: disable=broad-except
        except Exception:
            # pylint: disable=broad-exception-raised,raise-missing-from
            raise Exception(f"Storage {storage_name} not found")

    async def _delete_crawl_configs(self, label):
        """Delete Crawl Cron Job and all dependent resources, including configmap and secrets"""

        await self.batch_api.delete_collection_namespaced_cron_job(
            namespace=self.cron_namespace,
            label_selector=label,
            propagation_policy="Foreground",
        )

        await self.core_api.delete_collection_namespaced_config_map(
            namespace=self.namespace,
            label_selector=label,
            propagation_policy="Foreground",
        )

    async def _update_scheduled_job(self, crawlconfig, schedule):
        """create or remove cron job based on crawlconfig schedule"""
        cid = str(crawlconfig.id)

        cron_job_id = f"sched-{cid[:12]}"
        cron_job = None
        try:
            cron_job = await self.batch_api.read_namespaced_cron_job(
                name=cron_job_id,
                namespace=self.cron_namespace,
            )
        # pylint: disable=bare-except
        except:
            pass

        # if no schedule, delete cron_job if it exists, and we're done
        if not crawlconfig.schedule:
            if cron_job:
                await self.batch_api.delete_namespaced_cron_job(
                    name=cron_job.metadata.name, namespace=self.cron_namespace
                )
            return

        # if cron job exists, just patch the schedule
        if cron_job:
            if crawlconfig.schedule != cron_job.spec.schedule:
                cron_job.spec.schedule = crawlconfig.schedule

                await self.batch_api.patch_namespaced_cron_job(
                    name=cron_job.metadata.name,
                    namespace=self.cron_namespace,
                    body=cron_job,
                )
            return

        params = {
            "id": cron_job_id,
            "cid": str(crawlconfig.id),
            "image": self.job_image,
            "image_pull_policy": self.job_image_pull_policy,
            "schedule": schedule,
        }

        data = self.templates.env.get_template("crawl_cron_job.yaml").render(params)

        await self.create_from_yaml(data, self.cron_namespace)

        return cron_job_id
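
    # Example: a crawlconfig.schedule of "0 2 * * *" (standard cron syntax)
    # yields a CronJob that launches the crawl daily at 02:00. CronJobs are
    # created in self.cron_namespace ("default" unless CRON_NAMESPACE is set).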

    async def _update_config_map(
        self,
        crawlconfig,
        scale=None,
        profile_filename=None,
        crawl_timeout=None,
        update_config=False,
    ):
        """Patch the crawl config's configmap with any updated values"""
        config_map = await self.get_configmap(crawlconfig.id)

        if scale is not None:
            config_map.data["INITIAL_SCALE"] = str(scale)

        if profile_filename is not None:
            config_map.data["PROFILE_FILENAME"] = profile_filename

        if crawl_timeout is not None:
            config_map.data["CRAWL_TIMEOUT"] = str(crawl_timeout)

        if update_config:
            config_map.data["crawl-config.json"] = json.dumps(
                crawlconfig.get_raw_config()
            )

        await self.core_api.patch_namespaced_config_map(
            name=config_map.metadata.name, namespace=self.namespace, body=config_map
        )