Improved Scale Handling (#1889)
Fixes #1888

Refactors scale handling:
- Ensures the number of scaled instances does not exceed the number of pages, but is also at minimum 1
- Checks for the finish condition to be numFailed + numDone >= desired scale
- If at least one instance succeeds, the crawl is considered successful / done
- If all instances fail, the crawl is considered failed
- Ensures that pod done count >= redis done count

---------

Co-authored-by: Tessa Walsh <tessa@bitarchivist.net>
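The resulting logic, distilled into a standalone sketch (hypothetical helper functions with illustrative names and bare arguments standing in for the operator's state fields, not the operator code itself):

```python
from typing import Optional


def clamp_scale(desired_scale: int, pages_found: int) -> int:
    """Scale never exceeds the number of pages found, but is at least 1."""
    if pages_found < desired_scale:
        desired_scale = max(1, pages_found)
    return desired_scale


def crawl_outcome(
    scale: int, num_done: int, num_failed: int, pod_done_count: int
) -> Optional[str]:
    """Return "successful" / "failed", or None while instances are still running.

    num_done and num_failed come from redis; pod_done_count is how many
    crawler pods actually exited with phase "Succeeded".
    """
    # finish condition: every expected instance is either done or failed
    if num_done + num_failed < scale:
        return None
    # at least one success, confirmed by both redis and pod exit status
    if num_done >= 1 and pod_done_count >= num_done:
        return "successful"
    # no instance succeeded and at least one failed: the whole crawl failed
    if num_done == 0 and num_failed > 0:
        return "failed"
    # redis counts more done instances than pods that have exited; keep waiting
    return None
```

The `pod_done_count >= num_done` guard keeps the pod done count at or above the redis done count before the crawl is marked finished, so an instance that redis already counts as done but whose pod has not yet exited cleanly keeps the crawl in a running state.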
This commit is contained in:

parent 9140dd75bc
commit 6df10d5fb0
@@ -248,7 +248,7 @@ class CrawlOperator(BaseOperator):
             )

         else:
-            status.scale = crawl.scale
+            status.scale = 1
             now = dt_now()
             await self.crawl_ops.inc_crawl_exec_time(
                 crawl.db_crawl_id, crawl.is_qa, 0, now
@@ -410,8 +410,8 @@ class CrawlOperator(BaseOperator):
             actual_scale -= 1

         # ensure at least enough pages for the scale
-        if status.pagesFound and status.pagesFound < desired_scale:
-            desired_scale = status.pagesFound
+        if status.pagesFound < desired_scale:
+            desired_scale = max(1, status.pagesFound)

         # if desired_scale same or scaled up, return desired_scale
         if desired_scale >= actual_scale:
@@ -514,13 +514,15 @@ class CrawlOperator(BaseOperator):
                 status.finished = to_k8s_date(finished)

             if actual_state != state:
-                print(f"state mismatch, actual state {actual_state}, requested {state}")
+                print(
+                    f"State mismatch, actual state {actual_state}, requested {state}, {crawl.id}"
+                )
                 if not actual_state and state == "canceled":
                     return True

         if status.state != state:
             print(
-                f"Not setting state: {status.state} -> {state}, {crawl.id} not allowed"
+                f"Not setting state: {status.state} -> {state}, not allowed, {crawl.id}"
             )
         return False

@@ -730,7 +732,9 @@ class CrawlOperator(BaseOperator):
     ):
         """sync crawl state for running crawl"""
         # check if at least one crawler pod started running
-        crawler_running, redis_running, done = self.sync_pod_status(pods, status)
+        crawler_running, redis_running, pod_done_count = self.sync_pod_status(
+            pods, status
+        )
         redis = None

         try:
@@ -745,7 +749,9 @@ class CrawlOperator(BaseOperator):

             if not crawler_running or not redis:
                 # if either crawler is not running or redis is inaccessible
-                if self.should_mark_waiting(status.state, crawl.started):
+                if not pod_done_count and self.should_mark_waiting(
+                    status.state, crawl.started
+                ):
                     # mark as waiting (if already running)
                     await self.set_state(
                         "waiting_capacity",
@@ -755,8 +761,10 @@ class CrawlOperator(BaseOperator):
                     )

                 if not crawler_running and redis:
-                    # if crawler running, but no redis, stop redis instance until crawler
-                    # is running
+                    # if crawler is not running for REDIS_TTL seconds, also stop redis
+                    # but not right away in case crawler pod is just restarting.
+                    # avoids keeping redis pods around while no crawler pods are up
+                    # (eg. due to resource constraints)
                     if status.lastActiveTime and (
                         (
                             dt_now() - from_k8s_date(status.lastActiveTime)
@@ -770,7 +778,6 @@ class CrawlOperator(BaseOperator):
                 elif crawler_running and not redis:
                     # if crawler is running, but no redis, init redis
                     status.initRedis = True
-                    status.lastActiveTime = to_k8s_date(dt_now())

                 # if no crawler / no redis, resync after N seconds
                 status.resync_after = self.fast_retry_secs
@@ -791,6 +798,10 @@ class CrawlOperator(BaseOperator):
                         )
                     )

+            # update lastActiveTime if crawler is running
+            if crawler_running:
+                status.lastActiveTime = to_k8s_date(dt_now())
+
             file_done = await redis.lpop(self.done_key)
             while file_done:
                 msg = json.loads(file_done)
@@ -824,7 +835,9 @@ class CrawlOperator(BaseOperator):
             status.filesAddedSize = int(await redis.get("filesAddedSize") or 0)

             # update stats and get status
-            return await self.update_crawl_state(redis, crawl, status, pods, done)
+            return await self.update_crawl_state(
+                redis, crawl, status, pods, pod_done_count
+            )

         # pylint: disable=broad-except
         except Exception as exc:
@@ -836,11 +849,13 @@ class CrawlOperator(BaseOperator):
             if redis:
                 await redis.close()

-    def sync_pod_status(self, pods: dict[str, dict], status: CrawlStatus):
+    def sync_pod_status(
+        self, pods: dict[str, dict], status: CrawlStatus
+    ) -> tuple[bool, bool, int]:
         """check status of pods"""
         crawler_running = False
         redis_running = False
-        done = True
+        pod_done_count = 0

         try:
             for name, pod in pods.items():
@@ -871,16 +886,16 @@ class CrawlOperator(BaseOperator):

                 if role == "crawler":
                     crawler_running = crawler_running or running
-                    done = done and phase == "Succeeded"
+                    if phase == "Succeeded":
+                        pod_done_count += 1
                 elif role == "redis":
                     redis_running = redis_running or running

         # pylint: disable=broad-except
         except Exception as exc:
-            done = False
             print(exc)

-        return crawler_running, redis_running, done
+        return crawler_running, redis_running, pod_done_count

     def handle_terminated_pod(self, name, role, status: CrawlStatus, terminated):
         """handle terminated pod state"""
@@ -1231,7 +1246,7 @@ class CrawlOperator(BaseOperator):
         crawl: CrawlSpec,
         status: CrawlStatus,
         pods: dict[str, dict],
-        done: bool,
+        pod_done_count: int,
     ) -> CrawlStatus:
         """update crawl state and check if crawl is now done"""
         results = await redis.hgetall(f"{crawl.id}:status")
@@ -1274,13 +1289,20 @@ class CrawlOperator(BaseOperator):

         # check if done / failed
         status_count: dict[str, int] = {}
-        for i in range(crawl.scale):
+        for i in range(status.scale):
             res = results.get(f"crawl-{crawl.id}-{i}")
             if res:
                 status_count[res] = status_count.get(res, 0) + 1

-        # check if all crawlers are done
-        if done and status_count.get("done", 0) >= crawl.scale:
+        num_done = status_count.get("done", 0)
+        num_failed = status_count.get("failed", 0)
+        # all expected pods are either done or failed
+        all_completed = (num_done + num_failed) >= status.scale
+
+        # if at least one is done according to redis, consider crawl successful
+        # ensure pod successfully exited as well
+        # pylint: disable=chained-comparison
+        if all_completed and num_done >= 1 and pod_done_count >= num_done:
             # check if one-page crawls actually succeeded
             # if only one page found, and no files, assume failed
             if status.pagesFound == 1 and not status.filesAdded:
@@ -1297,8 +1319,8 @@ class CrawlOperator(BaseOperator):

             await self.mark_finished(crawl, status, state, stats)

-        # check if all crawlers failed
-        elif status_count.get("failed", 0) >= crawl.scale:
+        # check if all crawlers failed -- no crawl data was generated
+        elif all_completed and num_done == 0 and num_failed > 0:
             # if stopping, and no pages finished, mark as canceled
             if status.stopping and not status.pagesDone:
                 await self.mark_finished(crawl, status, "canceled", stats)
@@ -1318,6 +1340,7 @@ class CrawlOperator(BaseOperator):
                 new_status = "uploading-wacz"
             elif status_count.get("pending-wait"):
                 new_status = "pending-wait"
+
             if new_status:
                 await self.set_state(
                     new_status, status, crawl, allowed_from=RUNNING_STATES
@@ -12,6 +12,8 @@ default_crawl_filename_template: "@ts-testing-@hostsuffix.wacz"

 operator_resync_seconds: 3

+qa_scale: 2
+
 # for testing only
 crawler_extra_cpu_per_browser: 300m
