* Btrixjobs Operator - Phase 1 (#679) - add metacontroller and custom crds - add main_op entrypoint for operator * Btrix Operator Crawl Management (#767) * operator backend: - run operator api in separate container but in same pod, with WEB_CONCURRENCY=1 - operator creates statefulsets and services for CrawlJob and ProfileJob - operator: use service hook endpoint, set port in values.yaml * crawls working with CrawlJob - jobs start with 'crawljob-' prefix - update status to reflect current crawl state - set sync time to 10 seconds by default, overridable with 'operator_resync_seconds' - mark crawl as running, failed, complete when finished - store finished status when crawl is complete - support updating scale, forcing rollover, stop via patching CrawlJob - support cancel via deletion - requires hack to content-length for patching custom resources - auto-delete of CrawlJob via 'ttlSecondsAfterFinished' - also delete pvcs until autodelete supported via statefulset (k8s >1.27) - ensure filesAdded always set correctly, keep counter in redis, add to status display - optimization: attempt to reduce automerging, by reusing volumeClaimTemplates from existing children, as these may have additional props added - add add_crawl_errors_to_db() for storing crawl errors from redis '<crawl>:e' key to mongodb when crawl is finished/failed/canceled - add .status.size to display human-readable crawl size, if available (from webrecorder/browsertrix-crawler#291) - support new page size, >0.9.0 and old page size key (changed in webrecorder/browsertrix-crawler#284) * support for scheduled jobs! - add main_scheduled_job entrypoint to run scheduled jobs - add crawl_cron_job.yaml template for declaring CronJob - CronJobs moved to default namespace * operator manages ProfileJobs: - jobs start with 'profilejob-' - update expiry time by updating ProfileJob object 'expireTime' while profile is active * refactor/cleanup: - remove k8s package - merge k8sman and basecrawlmanager into crawlmanager - move templates, k8sapi, utils into root package - delete all *_job.py files - remove dt_now, ts_now from crawls, now in utils - all db operations happen in crawl/crawlconfig/org files - move shared crawl/crawlconfig/org functions that use the db to be importable directly, including get_crawl_config, add_new_crawl, inc_crawl_stats * role binding: more secure setup, don't allow crawler namespace any k8s permissions - move cronjobs to be created in default namespace - grant default namespace access to create cronjobs in default namespace - remove role binding from crawler namespace * additional tweaks to templates: - templates: split crawler and redis statefulset into separate yaml file (in case need to load one or other separately) * stats / redis optimization: - don't update stats in mongodb on every operator sync, only when crawl is finished - for api access, read stats directly from redis to get up-to-date stats - move get_page_stats() to utils, add get_redis_url() to k8sapi to unify access * Add migration for operator changes - Update configmap for crawl configs with scale > 1 or crawlTimeout > 0 and schedule exists to recreate CronJobs - add option to rerun last migration, enabled via env var and by running helm with --set=rerun_last_migration=1 * subcharts: move crawljob and profilejob crds to separate subchart, as this seems best way to guarantee proper install order with + update on upgrade with helm, add built btrix-crds-0.1.0.tgz subchart - metacontroller: use release from ghcr, add metacontroller-helm-v4.10.1.tgz subchart * backend api fixes - ensure changing scale of crawl also updates it in the db - crawlconfigs: add 'currCrawlSize' and 'lastCrawlSize' to crawlconfig api --------- Co-authored-by: D. Lee <leepro@gmail.com> Co-authored-by: Tessa Walsh <tessa@bitarchivist.net>
		
			
				
	
	
		
			79 lines
		
	
	
		
			2.6 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			79 lines
		
	
	
		
			2.6 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| """
 | |
| BaseMigration class to subclass in each migration module
 | |
| """
 | |
| import os
 | |
| from pymongo.errors import OperationFailure
 | |
| 
 | |
| 
 | |
| class MigrationError(Exception):
 | |
|     """Custom migration exception class"""
 | |
| 
 | |
| 
 | |
| class BaseMigration:
 | |
|     """Base Migration class."""
 | |
| 
 | |
|     def __init__(self, mdb, migration_version="0001"):
 | |
|         self.mdb = mdb
 | |
|         self.migration_version = migration_version
 | |
|         self.rerun_migration = os.environ.get("RERUN_LAST_MIGRATION") == "1"
 | |
| 
 | |
|     async def get_db_version(self):
 | |
|         """Get current db version from database."""
 | |
|         db_version = None
 | |
|         version_collection = self.mdb["version"]
 | |
|         version_record = await version_collection.find_one()
 | |
|         if not version_record:
 | |
|             return db_version
 | |
|         try:
 | |
|             db_version = version_record["version"]
 | |
|         except KeyError:
 | |
|             pass
 | |
|         return db_version
 | |
| 
 | |
|     async def set_db_version(self):
 | |
|         """Set db version to migration_version."""
 | |
|         version_collection = self.mdb["version"]
 | |
|         await version_collection.find_one_and_update(
 | |
|             {}, {"$set": {"version": self.migration_version}}, upsert=True
 | |
|         )
 | |
| 
 | |
|     async def migrate_up_needed(self):
 | |
|         """Verify migration up is needed and return boolean indicator."""
 | |
|         db_version = await self.get_db_version()
 | |
|         print(f"Current database version before migration: {db_version}")
 | |
|         print(f"Migration available to apply: {self.migration_version}")
 | |
|         # Databases from prior to migrations will not have a version set.
 | |
|         if not db_version:
 | |
|             return True
 | |
|         if db_version < self.migration_version:
 | |
|             return True
 | |
| 
 | |
|         if self.rerun_migration and db_version == self.migration_version:
 | |
|             print("Rerunning last migration")
 | |
|             return True
 | |
|         return False
 | |
| 
 | |
|     async def migrate_up(self):
 | |
|         """Perform migration up."""
 | |
|         raise NotImplementedError(
 | |
|             "Not implemented in base class - implement in subclass"
 | |
|         )
 | |
| 
 | |
|     async def run(self):
 | |
|         """Run migrations."""
 | |
|         if await self.migrate_up_needed():
 | |
|             print("Performing migration up", flush=True)
 | |
|             try:
 | |
|                 await self.migrate_up()
 | |
|                 await self.set_db_version()
 | |
|             except OperationFailure as err:
 | |
|                 print(f"Error running migration {self.migration_version}: {err}")
 | |
|                 return False
 | |
| 
 | |
|         else:
 | |
|             print("No migration to apply - skipping", flush=True)
 | |
|             return False
 | |
| 
 | |
|         print(f"Database successfully migrated to {self.migration_version}", flush=True)
 | |
|         return True
 |