- use python-on-whales to access the docker cli api directly, creating a docker stack for each crawl or profile browser - configure storages via storages.yaml secret - add crawl_job, profile_job, splitting into base and k8s/swarm implementations - split manager into base crawlmanager and k8s/swarm implementations - swarm: load initial scale from db to avoid modifying fixed configs; in k8s, load from configmap - swarm: support scheduled jobs via swarm-cronjob service - remove docker dependencies (aiodocker, apscheduler, scheduling) - swarm: when using local minio, expose via /data/ route in nginx via extra include (in k8s, the include dir is empty and routing is handled via ingress) - k8s: clean up minio chart: move init containers to minio.yaml - swarm: stateful set implementation to be consistent with k8s scaling: - don't use service replicas, - create a unique service with '-N' appended and allocate a unique volume for each replica - allows crawl containers to be restarted without losing data - add volume pruning background service, as volumes can be deleted only after the service shuts down fully - watch: fully simplify routing, route via replica index instead of ip for both k8s and swarm - rename network btrix-cloud-net -> btrix-net to avoid conflict with compose network
		
			
				
	
	
		
			92 lines
		
	
	
		
			3.0 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			92 lines
		
	
	
		
			3.0 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| """ base k8s job driver """
 | |
| 
 | |
| import os
 | |
| import asyncio
 | |
| import sys
 | |
| 
 | |
| import yaml
 | |
| 
 | |
| from fastapi.templating import Jinja2Templates
 | |
| 
 | |
| from .utils import create_from_yaml, get_templates_dir
 | |
| from .k8sapi import K8sAPI
 | |
| 
 | |
| 
 | |
# ============================================================================
# pylint: disable=too-many-instance-attributes,bare-except,broad-except
class K8SJobMixin(K8sAPI):
    """Mixin driving a crawl job's lifecycle via the k8s API.

    Creates job objects by rendering Jinja2 yaml templates with params
    loaded from the mounted config file, and tears down the statefulsets,
    their services, PVCs and finally the job itself when the crawl is done.
    """

    def __init__(self):
        # namespace that crawl objects are created in; defaults to "crawlers"
        self.namespace = os.environ.get("CRAWL_NAMESPACE") or "crawlers"
        # mounted yaml config providing the base template params
        self.config_file = "/config/config.yaml"

        # JOB_ID may be unset (e.g. when run outside the cluster) -- default
        # to "" so the startswith() check below can't raise AttributeError
        self.job_id = os.environ.get("JOB_ID") or ""
        self.orig_job_id = self.job_id
        # k8s job names carry a "job-" prefix; strip it for the internal id
        if self.job_id.startswith("job-"):
            self.job_id = self.job_id[4:]

        self.templates = Jinja2Templates(directory=get_templates_dir())
        super().__init__()

    async def init_job_objects(self, template, extra_params=None):
        """Create k8s objects from the specified template.

        :param template: template filename under the templates dir
        :param extra_params: optional dict of params merged over (and
            overriding) the base params from the config file
        """
        # base params come from the mounted config file
        with open(self.config_file, encoding="utf-8") as fh_config:
            params = yaml.safe_load(fh_config)

        params["id"] = self.job_id

        if extra_params:
            params.update(extra_params)

        data = self.templates.env.get_template(template).render(params)

        await create_from_yaml(self.api_client, data, namespace=self.namespace)

    async def delete_job_objects(self, selector):
        """Delete crawl statefulsets, services, PVCs, then this job itself.

        :param selector: label selector matching the objects to delete
        """
        kwargs = {
            "namespace": self.namespace,
            "label_selector": selector,
        }

        statefulsets = await self.apps_api.list_namespaced_stateful_set(**kwargs)

        for statefulset in statefulsets.items:
            print(f"Deleting service {statefulset.spec.service_name}")
            await self.core_api.delete_namespaced_service(
                name=statefulset.spec.service_name,
                namespace=self.namespace,
                propagation_policy="Foreground",
            )
            print(f"Deleting statefulset {statefulset.metadata.name}")
            await self.apps_api.delete_namespaced_stateful_set(
                name=statefulset.metadata.name,
                namespace=self.namespace,
                propagation_policy="Foreground",
            )

        # PVCs must be deleted explicitly until a delete policy is supported;
        # best-effort: a failure here must not block deleting the job below
        try:
            await self.core_api.delete_collection_namespaced_persistent_volume_claim(
                **kwargs
            )
        except Exception as exc:
            print("PVC Delete failed", exc, flush=True)

        # delete our own job! (use the original, "job-"-prefixed name)
        await self.batch_api.delete_namespaced_job(
            name=self.orig_job_id,
            namespace=self.namespace,
            grace_period_seconds=30,
            propagation_policy="Foreground",
        )

        # exit after a short delay so the delete call above can settle
        asyncio.create_task(self.exit_soon(5))

    async def exit_soon(self, timeout):
        """Sleep *timeout* seconds, then exit the process with status 0."""
        print("k8s objects deleted, job complete, exiting", flush=True)
        await asyncio.sleep(timeout)
        sys.exit(0)
 |