* misc backend fixes:
  - fix running w/o local minio
  - ensure crawler image pull policy is configurable, loaded via chart value
  - use digitalocean repo for main backend image (for now)
  - add bucket_name to config only if using default bucket
* enable all behaviors, support 'access_endpoint_url' for default storages
* debugging: add 'no_delete_jobs' setting for k8s and docker to disable deletion of completed jobs
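
For reference, a minimal sketch of how two of these settings surface as environment variables read by the backend (the variable names match the K8SManager module below; the example values are illustrative assumptions, not documented defaults):

    import os

    # hypothetical deployment values; CRAWLER_PULL_POLICY and NO_DELETE_JOBS
    # are read in K8SManager.__init__ below
    os.environ["CRAWLER_PULL_POLICY"] = "IfNotPresent"
    os.environ["NO_DELETE_JOBS"] = "1"  # any value other than "0" keeps completed jobs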
| """ K8s support"""
 | |
| 
 | |
| import os
 | |
| import datetime
 | |
| import json
 | |
| import asyncio
 | |
| import base64
 | |
| 
 | |
| from kubernetes_asyncio import client, config, watch
 | |
| from kubernetes_asyncio.stream import WsApiClient
 | |
| 
 | |
| from crawls import Crawl, CrawlFile
 | |
| 
 | |
| 
 | |
| # ============================================================================
 | |
| CRAWLER_NAMESPACE = os.environ.get("CRAWLER_NAMESPACE") or "crawlers"
 | |
| 
 | |
| # an 2/31 schedule that will never run as empty is not allowed
 | |
| DEFAULT_NO_SCHEDULE = "* * 31 2 *"
 | |
| 
 | |
| 
 | |
| # ============================================================================
 | |
| class K8SManager:
 | |
|     # pylint: disable=too-many-instance-attributes,too-many-locals,too-many-arguments
 | |
|     """K8SManager, manager creation of k8s resources from crawl api requests"""
 | |
| 
 | |
|     def __init__(self, namespace=CRAWLER_NAMESPACE):
 | |
|         config.load_incluster_config()
 | |
| 
 | |
|         self.crawl_ops = None
 | |
| 
 | |
|         self.core_api = client.CoreV1Api()
 | |
|         self.core_api_ws = client.CoreV1Api(api_client=WsApiClient())
 | |
|         self.batch_api = client.BatchV1Api()
 | |
|         self.batch_beta_api = client.BatchV1beta1Api()
 | |
| 
 | |
|         self.namespace = namespace
 | |
|         self._default_storage_endpoints = {}
 | |
| 
 | |
|         self.crawler_image = os.environ["CRAWLER_IMAGE"]
 | |
|         self.crawler_image_pull_policy = os.environ["CRAWLER_PULL_POLICY"]
 | |
| 
 | |
|         self.crawl_retries = int(os.environ.get("CRAWL_RETRIES", "3"))
 | |
| 
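        # debugging aid: any value other than "0" keeps completed jobs around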
        self.no_delete_jobs = os.environ.get("NO_DELETE_JOBS", "0") != "0"

        self.loop = asyncio.get_running_loop()
        self.loop.create_task(self.run_event_loop())

    def set_crawl_ops(self, ops):
        """ Set crawl ops handler """
        self.crawl_ops = ops

    async def run_event_loop(self):
        """ Run the job watch loop, retry in case of failure"""
        while True:
            try:
                await self.watch_events()
            # pylint: disable=broad-except
            except Exception as exc:
                print(f"Retrying job loop: {exc}")
                await asyncio.sleep(10)

    async def watch_events(self):
        """ Get events for completed jobs"""
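        # watch only events whose involvedObject is a Job:
        # BackoffLimitExceeded -> job failed after all retries,
        # DeadlineExceeded -> activeDeadlineSeconds (crawl timeout) was hit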
        async with watch.Watch().stream(
            self.core_api.list_namespaced_event,
            self.namespace,
            field_selector="involvedObject.kind=Job",
        ) as stream:
            async for event in stream:
                try:
                    obj = event["object"]
                    if obj.reason == "BackoffLimitExceeded":
                        self.loop.create_task(
                            self.handle_crawl_failed(obj.involved_object.name, "failed")
                        )

                    elif obj.reason == "DeadlineExceeded":
                        self.loop.create_task(
                            self.handle_crawl_failed(
                                obj.involved_object.name, "timed_out"
                            )
                        )

                # pylint: disable=broad-except
                except Exception as exc:
                    print(exc)

    # pylint: disable=unused-argument
    async def get_storage(self, storage_name, is_default=False):
        """Check if storage_name is valid by checking existing secret
        is_default flag ignored"""
        try:
            return await self.core_api.read_namespaced_secret(
                f"storage-{storage_name}",
                namespace=self.namespace,
            )
        except Exception:
            # pylint: disable=broad-except,raise-missing-from
            raise Exception(f"Storage {storage_name} not found")

    async def update_archive_storage(self, aid, uid, storage):
        """Update storage by either creating a per-archive secret, if using custom storage,
        or deleting the per-archive secret, if using default storage"""
        archive_storage_name = f"storage-{aid}"
        if storage.type == "default":
            try:
                await self.core_api.delete_namespaced_secret(
                    archive_storage_name,
                    namespace=self.namespace,
                    propagation_policy="Foreground",
                )
            # pylint: disable=bare-except
            except:
                pass

            return

        labels = {"btrix.archive": aid, "btrix.user": uid}

        crawl_secret = client.V1Secret(
            metadata={
                "name": archive_storage_name,
                "namespace": self.namespace,
                "labels": labels,
            },
            string_data={
                "STORE_ENDPOINT_URL": storage.endpoint_url,
                "STORE_ACCESS_KEY": storage.access_key,
                "STORE_SECRET_KEY": storage.secret_key,
            },
        )

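        # upsert: create the secret, or patch it if it already exists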
        try:
            await self.core_api.create_namespaced_secret(
                namespace=self.namespace, body=crawl_secret
            )

        # pylint: disable=bare-except
        except:
            await self.core_api.patch_namespaced_secret(
                name=archive_storage_name, namespace=self.namespace, body=crawl_secret
            )

    async def add_crawl_config(self, crawlconfig, storage, run_now):
        """add new crawl as cron job, store crawl config in configmap"""
        cid = str(crawlconfig.id)
        userid = crawlconfig.user
        aid = crawlconfig.archive

        annotations = {
            "btrix.run.schedule": crawlconfig.schedule,
            "btrix.storage_name": storage.name,
            "btrix.colls": json.dumps(crawlconfig.colls),
        }

        # Configure Annotations + Labels
        if storage.type == "default":
            storage_name = storage.name
            storage_path = storage.path
            annotations["btrix.def_storage_path"] = storage_path
        else:
            storage_name = aid
            storage_path = ""

        labels = {
            "btrix.user": userid,
            "btrix.archive": aid,
            "btrix.crawlconfig": cid,
        }

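        # verify the storage secret exists before creating any resources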
        await self.get_storage(storage_name)

        # Create Config Map
        config_map = self._create_config_map(crawlconfig, labels)

        await self.core_api.create_namespaced_config_map(
            namespace=self.namespace, body=config_map
        )

        suspend, schedule = self._get_schedule_suspend_run_now(crawlconfig)

        job_template = self._get_job_template(
            cid,
            storage_name,
            storage_path,
            labels,
            annotations,
            crawlconfig.crawlTimeout,
            crawlconfig.parallel,
        )

        # Create Cron Job
        spec = client.V1beta1CronJobSpec(
            schedule=schedule,
            suspend=suspend,
            concurrency_policy="Forbid",
            successful_jobs_history_limit=2,
            failed_jobs_history_limit=3,
            job_template=job_template,
        )

        cron_job = client.V1beta1CronJob(
            metadata={
                "name": f"crawl-scheduled-{cid}",
                "namespace": self.namespace,
                "labels": labels,
            },
            spec=spec,
        )

        cron_job = await self.batch_beta_api.create_namespaced_cron_job(
            namespace=self.namespace, body=cron_job
        )

        # Run Job Now
        if run_now:
            new_job = await self._create_run_now_job(cron_job)
            return new_job.metadata.name

        return ""

    async def update_crawl_schedule(self, cid, schedule):
        """ Update the schedule for existing crawl config """

        cron_jobs = await self.batch_beta_api.list_namespaced_cron_job(
            namespace=self.namespace, label_selector=f"btrix.crawlconfig={cid}"
        )

        if len(cron_jobs.items) != 1:
            return

        cron_job = cron_jobs.items[0]

        real_schedule = schedule or DEFAULT_NO_SCHEDULE

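        # when no schedule is set, suspend the cron job with a placeholder schedule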
        if real_schedule != cron_job.spec.schedule:
            cron_job.spec.schedule = real_schedule
            cron_job.spec.suspend = not schedule

            cron_job.spec.job_template.metadata.annotations[
                "btrix.run.schedule"
            ] = schedule

            await self.batch_beta_api.patch_namespaced_cron_job(
                name=cron_job.metadata.name, namespace=self.namespace, body=cron_job
            )

    async def run_crawl_config(self, cid):
        """ Run crawl job for cron job based on specified crawlconfig id (cid) """
        cron_jobs = await self.batch_beta_api.list_namespaced_cron_job(
            namespace=self.namespace, label_selector=f"btrix.crawlconfig={cid}"
        )

        if len(cron_jobs.items) != 1:
            raise Exception("Crawl Config Not Found")

        res = await self._create_run_now_job(cron_jobs.items[0])
        return res.metadata.name

    async def list_running_crawls(self, cid=None, aid=None, userid=None):
        """ Return a list of running crawls """
        filters = []
        if cid:
            filters.append(f"btrix.crawlconfig={cid}")

        if aid:
            filters.append(f"btrix.archive={aid}")

        if userid:
            filters.append(f"btrix.user={userid}")

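        # status.successful=0 filters out jobs that have already completed successfully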
        jobs = await self.batch_api.list_namespaced_job(
            namespace=self.namespace,
            label_selector=",".join(filters),
            field_selector="status.successful=0",
        )

        return [
            self._make_crawl_for_job(job, "running")
            for job in jobs.items
            if job.status.active
        ]

    async def init_crawl_screencast(self, crawl_id, aid):
        """ Init service for this job/crawl_id to support screencasting """
        labels = {"btrix.archive": aid}

        service = client.V1Service(
            kind="Service",
            api_version="v1",
            metadata={
                "name": crawl_id,
                "labels": labels,
            },
            spec={
                "selector": {"job-name": crawl_id},
                "ports": [{"protocol": "TCP", "port": 9037, "name": "screencast"}],
            },
        )

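        # tolerate 409 Conflict: the service for this crawl already exists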
        try:
            await self.core_api.create_namespaced_service(
                body=service, namespace=self.namespace
            )
        except client.exceptions.ApiException as api_exc:
            if api_exc.status != 409:
                raise api_exc

    async def process_crawl_complete(self, crawlcomplete):
        """Ensure the crawlcomplete data is valid (job exists and user matches)
        Fill in additional details about the crawl"""
        job = await self.batch_api.read_namespaced_job(
            name=crawlcomplete.id, namespace=self.namespace
        )

        if not job:  # or job.metadata.labels["btrix.user"] != crawlcomplete.user:
            return None, None

        manual = job.metadata.annotations.get("btrix.run.manual") == "1"
        if manual and not self.no_delete_jobs:
            self.loop.create_task(self._delete_job(job.metadata.name))

        crawl = self._make_crawl_for_job(
            job,
            "complete" if crawlcomplete.completed else "partial_complete",
            finish_now=True,
        )

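        # for default storage, record the filename relative to the storage path prefix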
        storage_path = job.metadata.annotations.get("btrix.def_storage_path")
        inx = None
        filename = None
        storage_name = None
        if storage_path:
            inx = crawlcomplete.filename.index(storage_path)
            filename = (
                crawlcomplete.filename[inx:] if inx > 0 else crawlcomplete.filename
            )
            storage_name = job.metadata.annotations.get("btrix.storage_name")

        def_storage_name = storage_name if inx is not None else None

        crawl_file = CrawlFile(
            def_storage_name=def_storage_name,
            filename=filename or crawlcomplete.filename,
            size=crawlcomplete.size,
            hash=crawlcomplete.hash,
        )

        return crawl, crawl_file

    async def get_default_storage_access_endpoint(self, name):
        """ Get access_endpoint for default storage """
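        # secret data values are base64-encoded; cache the decoded endpoint per storage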
        if name not in self._default_storage_endpoints:
            storage_secret = await self.get_storage(name, is_default=True)
            self._default_storage_endpoints[name] = base64.standard_b64decode(
                storage_secret.data["STORE_ACCESS_ENDPOINT_URL"]
            ).decode()

        return self._default_storage_endpoints[name]

    async def is_running(self, job_name, aid):
        """ Return true if the specified crawl (by job_name) is running """
        try:
            job = await self.batch_api.read_namespaced_job(
                name=job_name, namespace=self.namespace
            )

            if not job or job.metadata.labels["btrix.archive"] != aid:
                return False

            return True
        # pylint: disable=broad-except
        except Exception:
            return False

    async def stop_crawl(self, job_name, aid, graceful=True):
        """Attempt to stop crawl, either gracefully by issuing a SIGTERM which
        will attempt to finish current pages

        OR, abruptly by first issuing a SIGINT, followed by SIGTERM, which
        will terminate immediately"""

        job = await self.batch_api.read_namespaced_job(
            name=job_name, namespace=self.namespace
        )

        if not job or job.metadata.labels["btrix.archive"] != aid:
            return None

        result = None

        if not graceful:
            pods = await self.core_api.list_namespaced_pod(
                namespace=self.namespace,
                label_selector=f"job-name={job_name},btrix.archive={aid}",
            )

            await self._send_sig_to_pods(pods.items, aid)

            result = self._make_crawl_for_job(job, "canceled", True)
        else:
            result = True

        await self._delete_job(job_name)

        return result

    async def scale_crawl(self, job_name, aid, parallelism=1):
        """ Set the crawl scale (job parallelism) on the specified job """

        try:
            job = await self.batch_api.read_namespaced_job(
                name=job_name, namespace=self.namespace
            )
        # pylint: disable=broad-except
        except Exception:
            return "Crawl not found"

        if not job or job.metadata.labels["btrix.archive"] != aid:
            return "Invalid Crawl"

        if parallelism < 1 or parallelism > 10:
            return "Invalid Scale: Must be between 1 and 10"

        job.spec.parallelism = parallelism

        await self.batch_api.patch_namespaced_job(
            name=job.metadata.name, namespace=self.namespace, body=job
        )

        return None

    async def delete_crawl_configs_for_archive(self, archive):
        """Delete all crawl configs for given archive"""
        return await self._delete_crawl_configs(f"btrix.archive={archive}")

    async def delete_crawl_config_by_id(self, cid):
        """Delete crawl config by id"""
        return await self._delete_crawl_configs(f"btrix.crawlconfig={cid}")

    async def handle_crawl_failed(self, job_name, reason):
        """ Handle failed crawl job, add to db and then delete """
        try:
            job = await self.batch_api.read_namespaced_job(
                name=job_name, namespace=self.namespace
            )
        # pylint: disable=bare-except
        except:
            print("Job Failure Already Handled")
            return

        crawl = self._make_crawl_for_job(job, reason, True)

        # if update succeeds, then crawl has not completed, so likely a failure
        failure = await self.crawl_ops.store_crawl(crawl)

        # keep failed jobs around, for now
        if not failure and not self.no_delete_jobs:
            await self._delete_job(job_name)

    # ========================================================================
    # Internal Methods

    # pylint: disable=no-self-use
    def _make_crawl_for_job(self, job, state, finish_now=False):
        """ Make a crawl object from a job"""
        return Crawl(
            id=job.metadata.name,
            state=state,
            scale=job.spec.parallelism or 1,
            user=job.metadata.labels["btrix.user"],
            aid=job.metadata.labels["btrix.archive"],
            cid=job.metadata.labels["btrix.crawlconfig"],
            schedule=job.metadata.annotations.get("btrix.run.schedule", ""),
            manual=job.metadata.annotations.get("btrix.run.manual") == "1",
            started=job.status.start_time.replace(tzinfo=None),
            finished=datetime.datetime.utcnow().replace(microsecond=0, tzinfo=None)
            if finish_now
            else None,
            colls=json.loads(job.metadata.annotations.get("btrix.colls", "[]")),
        )

    async def _delete_job(self, name):
        await self.batch_api.delete_namespaced_job(
            name=name,
            namespace=self.namespace,
            grace_period_seconds=60,
            propagation_policy="Foreground",
        )

        try:
            await self.core_api.delete_namespaced_service(
                name=name,
                namespace=self.namespace,
                grace_period_seconds=60,
                propagation_policy="Foreground",
            )
        # pylint: disable=bare-except
        except:
            pass

    def _create_config_map(self, crawlconfig, labels):
        """ Create Config Map based on CrawlConfig + labels """
        config_map = client.V1ConfigMap(
            metadata={
                "name": f"crawl-config-{crawlconfig.id}",
                "namespace": self.namespace,
                "labels": labels,
            },
            data={"crawl-config.json": json.dumps(crawlconfig.config.dict())},
        )

        return config_map

    # pylint: disable=no-self-use
    def _get_schedule_suspend_run_now(self, crawlconfig):
        """ get schedule/suspend/run_now data based on crawlconfig """

        suspend = False
        schedule = crawlconfig.schedule

        if not schedule:
            schedule = DEFAULT_NO_SCHEDULE
            suspend = True

        return suspend, schedule

    async def _send_sig_to_pods(self, pods, aid):
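        """ Exec 'kill -s SIGUSR1 1' in each matching pod to interrupt the crawl """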
        command = ["kill", "-s", "SIGUSR1", "1"]
        interrupted = False

        try:
            for pod in pods:
                if pod.metadata.labels["btrix.archive"] != aid:
                    continue

                await self.core_api_ws.connect_get_namespaced_pod_exec(
                    pod.metadata.name,
                    namespace=self.namespace,
                    command=command,
                    stdout=True,
                )
                interrupted = True

        # pylint: disable=broad-except
        except Exception as exc:
            print(f"Exec Error: {exc}")

        return interrupted

    async def _delete_crawl_configs(self, label):
        """Delete Crawl Cron Job and all dependent resources, including configmap and secrets"""

        await self.batch_beta_api.delete_collection_namespaced_cron_job(
            namespace=self.namespace,
            label_selector=label,
            propagation_policy="Foreground",
        )

        await self.core_api.delete_collection_namespaced_config_map(
            namespace=self.namespace,
            label_selector=label,
            propagation_policy="Foreground",
        )

    async def _create_run_now_job(self, cron_job):
        """Create new job from cron job to run instantly"""
        annotations = cron_job.spec.job_template.metadata.annotations
        annotations["btrix.run.manual"] = "1"
        annotations["btrix.run.schedule"] = ""

        # owner_ref = client.V1OwnerReference(
        #    kind="CronJob",
        #    name=cron_job.metadata.name,
        #    block_owner_deletion=True,
        #    controller=True,
        #    uid=cron_job.metadata.uid,
        #    api_version="batch/v1beta1",
        # )

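        # job name embeds a UTC timestamp plus the originating crawlconfig id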
        ts_now = datetime.datetime.utcnow().strftime("%Y%m%d%H%M%S")
        name = f"crawl-now-{ts_now}-{cron_job.metadata.labels['btrix.crawlconfig']}"

        object_meta = client.V1ObjectMeta(
            name=name,
            annotations=annotations,
            labels=cron_job.metadata.labels,
            # owner_references=[owner_ref],
        )

        job = client.V1Job(
            kind="Job",
            api_version="batch/v1",
            metadata=object_meta,
            spec=cron_job.spec.job_template.spec,
        )

        return await self.batch_api.create_namespaced_job(
            body=job, namespace=self.namespace
        )

    def _get_job_template(
        self,
        cid,
        storage_name,
        storage_path,
        labels,
        annotations,
        crawl_timeout,
        parallel,
    ):
        """Return crawl job template for crawl job, including labels, adding optional crawl params"""

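        # resource requests/limits are currently hardcoded per crawler container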
        requests_memory = "256M"
        limit_memory = "1G"

        requests_cpu = "120m"
        limit_cpu = "1000m"

        resources = {
            "limits": {
                "cpu": limit_cpu,
                "memory": limit_memory,
            },
            "requests": {
                "cpu": requests_cpu,
                "memory": requests_memory,
            },
        }

        job_template = {
            "metadata": {"annotations": annotations},
            "spec": {
                "backoffLimit": self.crawl_retries,
                "parallelism": parallel,
                "template": {
                    "metadata": {"labels": labels},
                    "spec": {
                        "containers": [
                            {
                                "name": "crawler",
                                "image": self.crawler_image,
                                "imagePullPolicy": self.crawler_image_pull_policy,
                                "command": [
                                    "crawl",
                                    "--config",
                                    "/tmp/crawl-config.json",
                                ],
                                "volumeMounts": [
                                    {
                                        "name": "crawl-config",
                                        "mountPath": "/tmp/crawl-config.json",
                                        "subPath": "crawl-config.json",
                                        "readOnly": True,
                                    }
                                ],
                                "envFrom": [
                                    {"configMapRef": {"name": "shared-crawler-config"}},
                                    {"secretRef": {"name": f"storage-{storage_name}"}},
                                ],
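                                # CRAWL_ID is derived from the generated job name
                                # via the downward API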
                                "env": [
                                    {
                                        "name": "CRAWL_ID",
                                        "valueFrom": {
                                            "fieldRef": {
                                                "fieldPath": "metadata.labels['job-name']"
                                            }
                                        },
                                    },
                                    {"name": "STORE_PATH", "value": storage_path},
                                    {
                                        "name": "STORE_FILENAME",
                                        "value": "@ts-@hostname.wacz",
                                    },
                                ],
                                "resources": resources,
                            }
                        ],
                        "volumes": [
                            {
                                "name": "crawl-config",
                                "configMap": {
                                    "name": f"crawl-config-{cid}",
                                    "items": [
                                        {
                                            "key": "crawl-config.json",
                                            "path": "crawl-config.json",
                                        }
                                    ],
                                },
                            }
                        ],
                        "restartPolicy": "OnFailure",
                    },
                },
            },
        }

        if crawl_timeout > 0:
            job_template["spec"]["activeDeadlineSeconds"] = crawl_timeout

        return job_template