dockermanager + scheduler:
- run as child process using aioprocessing
- cleanup: support cleanup of orphaned containers
- timeout: support crawlTimeout via check in cleanup loop
- support crawl listing + crawl stopping
This commit is contained in:
  parent b417d7c185
  commit 60b48ee8a6
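(Not part of the diff.) scheduler.py itself is not shown in this commit; below is a minimal sketch of what run_scheduler on the other end of the two aioprocessing queues might look like, assuming APScheduler (already listed in requirements.txt) and the message shapes used by DockerManager later in this diff: schedule updates arrive as {"cid": ..., "schedule": ...} with an empty schedule meaning "unschedule", and trigger messages are dicts the parent unpacks into run_crawl_config(manual=False, **result).

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger


def run_scheduler(event_q, trigger_q):
    """Hypothetical child process: keep cron jobs in sync with schedule
    updates from event_q, and push crawl triggers back via trigger_q."""
    scheduler = BackgroundScheduler()
    scheduler.start()

    while True:
        # block until the parent sends a schedule update
        msg = event_q.get()
        cid, schedule = msg["cid"], msg["schedule"]

        # replace any existing cron job for this crawl config
        if scheduler.get_job(cid):
            scheduler.remove_job(cid)

        if schedule:
            # when the cron fires, hand a trigger back to the parent,
            # which starts the crawl via run_crawl_config(manual=False, ...)
            scheduler.add_job(
                trigger_q.put,
                CronTrigger.from_crontab(schedule),
                args=[{"cid": cid, "schedule": schedule}],
                id=cid,
            )

Running the cron state in a child process keeps APScheduler off the API's event loop; the parent and child only exchange small dicts over the two queues, replacing the standalone scheduler service and the socket-based _send_sched_msg that this commit removes.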
@@ -95,12 +95,13 @@ class CrawlConfig(BaseMongoModel):

    crawlTimeout: Optional[int] = 0

    crawlCount: Optional[int] = 0


# ============================================================================
class TriggerCrawl(BaseModel):
    """ Crawl trigger from internal scheduler """
class UpdateSchedule(BaseModel):
    """ Update the crawl schedule """

    id: str
    schedule: str


@@ -138,20 +139,21 @@ class CrawlOps:
        )
        return result

    async def update_crawl_config(
        self, cid: str, config: CrawlConfigIn, archive: Archive, user: User
    ):
        """ Update existing crawl config"""
        data = config.dict()
        data["archive"] = archive.id
        data["user"] = str(user.id)
        data["_id"] = cid
    async def update_crawl_schedule(self, cid: str, update: UpdateSchedule):
        """ Update schedule for existing crawl config"""
        if not await self.crawl_configs.find_one_and_update(
            {"_id": cid}, {"$set": {"schedule": update.schedule}}
        ):
            return None

        await self.crawl_configs.find_one_and_replace({"_id": cid}, data)
        await self.crawl_manager.update_crawl_schedule(cid, update.schedule)
        return True

        crawlconfig = CrawlConfig.from_dict(data)

        await self.crawl_manager.update_crawl_config(crawlconfig)
    async def inc_crawls(self, cid: str):
        """ Increment Crawl Counter """
        await self.crawl_configs.find_one_and_update(
            {"_id": cid}, {"$inc": {"crawlCount": 1}}
        )

    async def get_crawl_configs(self, archive: Archive):
        """Get all crawl configs for an archive is a member of"""
@@ -179,7 +181,7 @@ class CrawlOps:

# ============================================================================
# pylint: disable=redefined-builtin,invalid-name,too-many-locals
def init_crawl_config_api(app, mdb, user_dep, archive_ops, crawl_manager):
def init_crawl_config_api(mdb, user_dep, archive_ops, crawl_manager):
    """Init /crawlconfigs api routes"""
    ops = CrawlOps(mdb, archive_ops, crawl_manager)

@@ -214,16 +216,18 @@ def init_crawl_config_api(app, mdb, user_dep, archive_ops, crawl_manager):
        res = await ops.add_crawl_config(config, archive, user)
        return {"added": str(res.inserted_id)}

    @router.patch("/{cid}")
    async def update_crawl_config(
        config: CrawlConfigIn,
    @router.patch("/{cid}/schedule", dependencies=[Depends(archive_crawl_dep)])
    async def update_crawl_schedule(
        update: UpdateSchedule,
        cid: str,
        archive: Archive = Depends(archive_crawl_dep),
        user: User = Depends(user_dep),
    ):

        try:
            await ops.update_crawl_config(cid, config, archive, user)
            if not await ops.update_crawl_schedule(cid, update):
                raise HTTPException(
                    status_code=404, detail=f"Crawl Config '{cid}' not found"
                )

        except Exception as e:
            # pylint: disable=raise-missing-from
            raise HTTPException(
@@ -250,13 +254,6 @@ def init_crawl_config_api(app, mdb, user_dep, archive_ops, crawl_manager):

        return {"started": crawl_id}

    @app.post("/crawls/trigger", tags=["crawlconfigs"])
    async def trigger_crawl(trigger: TriggerCrawl):
        await crawl_manager.run_crawl_config(
            trigger.id, manual=False, schedule=trigger.schedule
        )
        return {}

    @router.delete("")
    async def delete_crawl_configs(archive: Archive = Depends(archive_crawl_dep)):
        result = await ops.delete_crawl_configs(archive)

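(Not part of the diff.) For context, a hypothetical call to the new schedule endpoint, assuming an httpx client; the host, router mount point, and auth header below are placeholders, since only the "/{cid}/schedule" suffix and the UpdateSchedule body shape come from this commit:

import httpx

# Hypothetical usage sketch of the new PATCH .../{cid}/schedule route.
# The base URL, path prefix, and token are assumptions, not part of the diff.
resp = httpx.patch(
    "http://localhost:8000/archives/<aid>/crawlconfigs/<cid>/schedule",
    json={"id": "<cid>", "schedule": "0 0 * * *"},
    headers={"Authorization": "Bearer <token>"},
)
print(resp.status_code)  # 404 if the crawl config does not exist
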
@@ -60,9 +60,10 @@ class CrawlCompleteIn(BaseModel):
class CrawlOps:
    """ Crawl Ops """

    def __init__(self, mdb, crawl_manager, archives):
    def __init__(self, mdb, crawl_manager, crawl_configs, archives):
        self.crawls = mdb["crawls"]
        self.crawl_manager = crawl_manager
        self.crawl_configs = crawl_configs
        self.archives = archives

        self.crawl_manager.set_crawl_ops(self)
@@ -96,6 +97,8 @@ class CrawlOps:

        await self.archives.inc_usage(crawl.aid, dura)

        await self.crawl_configs.inc_crawls(crawl.cid)

    async def list_crawls(self, aid: str, cid: str = None):
        """Get all crawl configs for an archive is a member of"""
        query = {"aid": aid}
@@ -115,10 +118,10 @@ class CrawlOps:


# ============================================================================
def init_crawls_api(app, mdb, crawl_manager, archives):
def init_crawls_api(app, mdb, crawl_manager, crawl_config_ops, archives):
    """ API for crawl management, including crawl done callback"""

    ops = CrawlOps(mdb, crawl_manager, archives)
    ops = CrawlOps(mdb, crawl_manager, crawl_config_ops, archives)

    archive_crawl_dep = archives.archive_crawl_dep

@@ -5,6 +5,7 @@ Docker crawl manager
import tarfile
import os
import json
import time
import asyncio

from datetime import datetime
@@ -12,9 +13,12 @@ from io import BytesIO
from tempfile import NamedTemporaryFile

import aiodocker
import aioprocessing

from crawls import Crawl

from scheduler import run_scheduler


# ============================================================================
class DockerManager:
@@ -30,10 +34,33 @@ class DockerManager:
        self.archive_ops = archive_ops
        self.crawl_ops = None

        self.loop = asyncio.get_running_loop()
        self.loop.create_task(self.run_event_loop())

        self.extra_crawl_params = extra_crawl_params or []
        self._event_q = None

        self.loop = asyncio.get_running_loop()

        self.loop.create_task(self.run_event_loop())
        self.loop.create_task(self.init_trigger_queue())
        self.loop.create_task(self.cleanup_loop())

    # pylint: disable=no-member
    async def init_trigger_queue(self):
        """ Crawl trigger queue from separate scheduling process """
        self._event_q = aioprocessing.AioQueue()
        _trigger_q = aioprocessing.AioQueue()

        self.sched = aioprocessing.AioProcess(
            target=run_scheduler, args=(self._event_q, _trigger_q)
        )
        self.sched.start()

        while True:
            try:
                result = await _trigger_q.coro_get()
                self.loop.create_task(self.run_crawl_config(manual=False, **result))
            # pylint: disable=broad-except
            except Exception as exc:
                print(f"Error trigger crawl: {exc}")

    async def run_event_loop(self):
        """ Run Docker event loop"""
@@ -49,6 +76,46 @@ class DockerManager:
            if event["Action"] == "die":
                self.loop.create_task(self._handle_container_die(event["Actor"]))

    async def cleanup_loop(self):
        """Clean-up any orphaned crawler images that are not running.
        Stop containers whose crawlTimeout has been exceeded"""

        while True:
            # cleanup orphaned
            results = await self.client.containers.list(
                filters=json.dumps(
                    {
                        "label": ["btrix.crawlconfig"],
                        "status": ["exited"],
                        "exited": ["1"],
                    }
                )
            )

            for container in results:
                print(f"Cleaning Up Orphan Container {container['Id']}", flush=True)
                await container.delete()

            results = await self.client.containers.list(
                filters=json.dumps(
                    {
                        "label": ["btrix.timeout"],
                        "status": ["running"],
                    }
                )
            )

            for container in results:
                timeout = int(container["Labels"]["btrix.timeout"])
                actual = int(time.time()) - int(container["Created"])
                if actual >= timeout:
                    print(
                        f"Crawl {container['Id']} running for {actual} seconds, exceeded timeout {timeout}, stopping..."
                    )
                    await container.kill(signal="SIGTERM")

            await asyncio.sleep(30)

    def set_crawl_ops(self, ops):
        """ set crawl ops """
        self.crawl_ops = ops
@@ -70,14 +137,17 @@ class DockerManager:
            "btrix.coll": crawlconfig.config.collection,
        }

        if crawlconfig.crawlTimeout:
            labels["btrix.timeout"] = str(crawlconfig.crawlTimeout)

        # Create Config Volume
        volume = await self._create_volume(crawlconfig, labels)

        if crawlconfig.schedule:
            print("Scheduling...", flush=True)

            await self._send_sched_msg(
                {"type": "add", "id": crawlconfig.id, "schedule": crawlconfig.schedule}
            await self._schedule_update(
                cid=crawlconfig.id, schedule=crawlconfig.schedule
            )

        if crawlconfig.runNow:
@@ -87,22 +157,15 @@ class DockerManager:
                volume,
            )

    async def update_crawl_config(self, crawlconfig):
        """ Only updating the schedule + run now """
    async def update_crawl_schedule(self, cid, schedule):
        """ Update the schedule for existing crawl config """

        if crawlconfig.schedule:
        if schedule:
            print("Updating Schedule..", flush=True)

            await self._send_sched_msg(
                {"type": "add", "id": crawlconfig.id, "schedule": crawlconfig.schedule}
            )
            await self._schedule_update(cid=cid, schedule=schedule)
        else:
            await self._send_sched_msg(
                {"type": "remove", "id": crawlconfig.id}
            )

        if crawlconfig.runNow:
            await self.run_crawl_config(crawlconfig.id)
            await self._schedule_update(cid=cid, schedule="")

    async def list_running_crawls(self, aid):
        """ List running containers for this archive """
@@ -189,13 +252,11 @@ class DockerManager:

    async def delete_crawl_config_by_id(self, cid):
        """ Delete Crawl Config by Crawl Config Id"""
        await self._delete_volume_by_labels(
            filters={"label": [f"btrix.crawlconfig={cid}"]}
        )
        await self._delete_volume_by_labels([f"btrix.crawlconfig={cid}"])

    async def delete_crawl_configs_for_archive(self, aid):
        """ Delete Crawl Config by Archive Id"""
        await self._delete_volume_by_labels(filters={"label": [f"btrix.archive={aid}"]})
        await self._delete_volume_by_labels([f"btrix.archive={aid}"])

    # ========================================================================
    async def _create_volume(self, crawlconfig, labels):
@@ -242,19 +303,25 @@ class DockerManager:

        await container.delete()

    async def _delete_volume_by_labels(self, filters):
    async def _delete_volume_by_labels(self, labels):
        """ Delete Crawl Configs by specified filter """

        containers = await self._list_running_containers(labels)
        if len(containers):
            raise Exception("Cannot delete crawl config, in use for running crawl")

        # pylint: disable=protected-access
        resp = await self.client._query_json(
            "volumes", method="GET", params={"filters": json.dumps(filters)}
            "volumes",
            method="GET",
            params={"filters": json.dumps({"label": labels})},
        )

        for volume in resp["Volumes"]:
            vol_obj = aiodocker.docker.DockerVolume(self.client, volume["Name"])

            await self._send_sched_msg(
                {"type": "remove", "id": volume["Labels"]["btrix.crawlconfig"]}
            await self._schedule_update(
                cid=volume["Labels"]["btrix.crawlconfig"], schedule=""
            )

            try:
@@ -263,13 +330,8 @@ class DockerManager:
            except:
                print("Warning: Volume Delete Failed, Container in Use", flush=True)

    async def _send_sched_msg(self, msg):
        reader, writer = await asyncio.open_connection("scheduler", 9017)
        writer.write(json.dumps(msg).encode("utf-8") + b"\n")
        await writer.drain()
        await reader.readline()
        writer.close()
        await writer.wait_closed()
    async def _schedule_update(self, cid, schedule=""):
        await self._event_q.coro_put({"cid": cid, "schedule": schedule})

    # pylint: disable=too-many-arguments
    async def _run_crawl_now(self, storage, labels, volume, schedule="", manual=True):

@@ -169,10 +169,8 @@ class K8SManager:

        return cron_job

    async def update_crawl_config(self, crawlconfig):
        """ Update existing crawl config """

        cid = crawlconfig.id
    async def update_crawl_config(self, cid, schedule):
        """ Update the schedule for existing crawl config """

        cron_jobs = await self.batch_beta_api.list_namespaced_cron_job(
            namespace=self.namespace, label_selector=f"btrix.crawlconfig={cid}"
@@ -183,57 +181,20 @@ class K8SManager:

        cron_job = cron_jobs.items[0]

        if crawlconfig.archive != cron_job.metadata.labels["btrix.archive"]:
            return
        real_schedule = schedule or DEFAULT_NO_SCHEDULE

        labels = {
            "btrix.user": crawlconfig.user,
            "btrix.archive": crawlconfig.archive,
            "btrix.crawlconfig": cid,
        }
        if real_schedule != cron_job.spec.schedule:
            cron_job.spec.schedule = real_schedule
            cron_job.spec.suspend = not schedule

        # Update Config Map
        config_map = self._create_config_map(crawlconfig, labels)

        await self.core_api.patch_namespaced_config_map(
            name=f"crawl-config-{cid}", namespace=self.namespace, body=config_map
        )

        # Update CronJob, if needed
        suspend, schedule, run_now = self._get_schedule_suspend_run_now(crawlconfig)

        changed = False

        if schedule != cron_job.spec.schedule:
            cron_job.spec.schedule = schedule
            changed = True

        if suspend != cron_job.spec.suspend:
            cron_job.spec.suspend = suspend
            changed = True

        if (
            crawlconfig.crawlTimeout
            != cron_job.spec.job_template.spec.active_deadline_seconds
        ):
            cron_job.spec.job_template.spec.active_deadline_seconds = (
                crawlconfig.crawlTimeout
            )
            changed = True

        if changed:
            cron_job.spec.job_template.metadata.annotations[
                "btrix.run.schedule"
            ] = crawlconfig.schedule
            ] = schedule

            await self.batch_beta_api.patch_namespaced_cron_job(
                name=cron_job.metadata.name, namespace=self.namespace, body=cron_job
            )

        # Run Job Now
        if run_now:
            await self._create_run_now_job(cron_job)

    async def run_crawl_config(self, cid, manual=True, schedule=""):
        """ Run crawl job for cron job based on specified crawlconfig id (cid) """
        cron_jobs = await self.batch_beta_api.list_namespaced_cron_job(

@@ -75,7 +75,6 @@ class BrowsertrixAPI:
            )

        self.crawl_config_ops = init_crawl_config_api(
            self.app,
            self.mdb,
            current_active_user,
            self.archive_ops,
@@ -86,6 +85,7 @@ class BrowsertrixAPI:
            self.app,
            self.mdb,
            self.crawl_manager,
            self.crawl_config_ops,
            self.archive_ops,
        )

@@ -5,3 +5,4 @@ aiofiles
kubernetes-asyncio
aiodocker
apscheduler
aioprocessing

@@ -18,17 +18,6 @@ services:
      - mongo
      - scheduler

  scheduler:
    build: ./backend
    image: webrecorder/browsertrix-api
    command: python -u scheduler.py

    env_file:
      - ./config.env

    depends_on:
      - mongo

  mongo:
    image: mongo
    volumes: