Additional operator edge case fixes (#2007)
Fix a few edge-case situations: - Restart evicted pods that have reached the terminal `Failed` state with reason `Evicted`, by just recreating them. These pods will not be automatically retried, so need to be recreated (usually happens due to memory pressure from the node) - Don't treat containers in ContainerCreating as running, even though this state is usually quick, its possible for containers to get stuck there, and will improve accuracy of exec seconds tracking. - Consolidate state transition for running states, either sets to running or to pending-wait/generate-wacz/upload-wacz and allows changing from to either of these states from each other or waiting_capacity
This commit is contained in:
		
							parent
							
								
									3923996aaf
								
							
						
					
					
						commit
						4ec7cf8adc
					
				| @ -305,7 +305,7 @@ class CrawlOperator(BaseOperator): | ||||
|             "resyncAfterSeconds": status.resync_after, | ||||
|         } | ||||
| 
 | ||||
|     def _load_redis(self, params, status, children): | ||||
|     def _load_redis(self, params, status: CrawlStatus, children): | ||||
|         name = f"redis-{params['id']}" | ||||
|         has_pod = name in children[POD] | ||||
| 
 | ||||
| @ -313,11 +313,13 @@ class CrawlOperator(BaseOperator): | ||||
|         params["name"] = name | ||||
|         params["cpu"] = pod_info.newCpu or params.get("redis_cpu") | ||||
|         params["memory"] = pod_info.newMemory or params.get("redis_memory") | ||||
|         restart = pod_info.should_restart_pod() and has_pod | ||||
|         if restart: | ||||
|             print(f"Restart {name}") | ||||
|         restart_reason = None | ||||
|         if has_pod: | ||||
|             restart_reason = pod_info.should_restart_pod() | ||||
|             if restart_reason: | ||||
|                 print(f"Restarting {name}, reason: {restart_reason}") | ||||
| 
 | ||||
|         params["init_redis"] = status.initRedis and not restart | ||||
|         params["init_redis"] = status.initRedis and not restart_reason | ||||
| 
 | ||||
|         return self.load_from_yaml("redis.yaml", params) | ||||
| 
 | ||||
| @ -362,7 +364,7 @@ class CrawlOperator(BaseOperator): | ||||
|         params["qa_source_replay_json"] = crawl_replay.json(include={"resources"}) | ||||
|         return self.load_from_yaml("qa_configmap.yaml", params) | ||||
| 
 | ||||
|     def _load_crawler(self, params, i, status, children): | ||||
|     def _load_crawler(self, params, i, status: CrawlStatus, children): | ||||
|         name = f"crawl-{params['id']}-{i}" | ||||
|         has_pod = name in children[POD] | ||||
| 
 | ||||
| @ -387,11 +389,12 @@ class CrawlOperator(BaseOperator): | ||||
|         else: | ||||
|             params["memory_limit"] = self.k8s.max_crawler_memory_size | ||||
|         params["workers"] = params.get(worker_field) or 1 | ||||
|         params["do_restart"] = ( | ||||
|             pod_info.should_restart_pod() or params.get("force_restart") | ||||
|         ) and has_pod | ||||
|         if params.get("do_restart"): | ||||
|             print(f"Restart {name}") | ||||
|         params["do_restart"] = False | ||||
|         if has_pod: | ||||
|             restart_reason = pod_info.should_restart_pod(params.get("force_restart")) | ||||
|             if restart_reason: | ||||
|                 print(f"Restarting {name}, reason: {restart_reason}") | ||||
|                 params["do_restart"] = True | ||||
| 
 | ||||
|         return self.load_from_yaml("crawler.yaml", params) | ||||
| 
 | ||||
| @ -523,7 +526,7 @@ class CrawlOperator(BaseOperator): | ||||
|                 finished=finished, | ||||
|                 stats=stats, | ||||
|             ) | ||||
|             if res: | ||||
|             if res and status.state != state: | ||||
|                 print(f"Setting state: {status.state} -> {state}, {crawl.id}") | ||||
|                 status.state = state | ||||
|                 return True | ||||
| @ -804,14 +807,6 @@ class CrawlOperator(BaseOperator): | ||||
|                 status.resync_after = self.fast_retry_secs | ||||
|                 return status | ||||
| 
 | ||||
|             # ensure running state is set | ||||
|             await self.set_state( | ||||
|                 "running", | ||||
|                 status, | ||||
|                 crawl, | ||||
|                 allowed_from=["starting", "waiting_capacity"], | ||||
|             ) | ||||
| 
 | ||||
|             # update lastActiveTime if crawler is running | ||||
|             if crawler_running: | ||||
|                 status.lastActiveTime = to_k8s_date(dt_now()) | ||||
| @ -874,6 +869,7 @@ class CrawlOperator(BaseOperator): | ||||
|         try: | ||||
|             for name, pod in pods.items(): | ||||
|                 running = False | ||||
|                 evicted = False | ||||
| 
 | ||||
|                 pstatus = pod["status"] | ||||
|                 phase = pstatus["phase"] | ||||
| @ -881,18 +877,24 @@ class CrawlOperator(BaseOperator): | ||||
| 
 | ||||
|                 if phase in ("Running", "Succeeded"): | ||||
|                     running = True | ||||
|                 elif phase == "Failed" and pstatus.get("reason") == "Evicted": | ||||
|                     evicted = True | ||||
| 
 | ||||
|                 status.podStatus[name].evicted = evicted | ||||
| 
 | ||||
|                 if "containerStatuses" in pstatus: | ||||
|                     cstatus = pstatus["containerStatuses"][0] | ||||
| 
 | ||||
|                     # consider 'ContainerCreating' as running | ||||
|                     waiting = cstatus["state"].get("waiting") | ||||
|                     if ( | ||||
|                         phase == "Pending" | ||||
|                         and waiting | ||||
|                         and waiting.get("reason") == "ContainerCreating" | ||||
|                     ): | ||||
|                         running = True | ||||
|                     # don't consider 'ContainerCreating' as running for now | ||||
|                     # may be stuck in this state for other reasons | ||||
|                     # | ||||
|                     # waiting = cstatus["state"].get("waiting") | ||||
|                     # if ( | ||||
|                     #    phase == "Pending" | ||||
|                     #    and waiting | ||||
|                     #    and waiting.get("reason") == "ContainerCreating" | ||||
|                     # ): | ||||
|                     #    running = True | ||||
| 
 | ||||
|                     self.handle_terminated_pod( | ||||
|                         name, role, status, cstatus["state"].get("terminated") | ||||
| @ -1388,24 +1390,20 @@ class CrawlOperator(BaseOperator): | ||||
|             else: | ||||
|                 await self.fail_crawl(crawl, status, pods, stats) | ||||
| 
 | ||||
|         # check for other statuses | ||||
|         # check for other statuses, default to "running" | ||||
|         else: | ||||
|             new_status: Optional[TYPE_RUNNING_STATES] = None | ||||
|             if status_count.get("running"): | ||||
|                 if status.state in ("generate-wacz", "uploading-wacz", "pending-wacz"): | ||||
|                     new_status = "running" | ||||
|             new_status: TYPE_RUNNING_STATES = "running" | ||||
| 
 | ||||
|             elif status_count.get("generate-wacz"): | ||||
|             if status_count.get("generate-wacz"): | ||||
|                 new_status = "generate-wacz" | ||||
|             elif status_count.get("uploading-wacz"): | ||||
|                 new_status = "uploading-wacz" | ||||
|             elif status_count.get("pending-wait"): | ||||
|                 new_status = "pending-wait" | ||||
| 
 | ||||
|             if new_status: | ||||
|                 await self.set_state( | ||||
|                     new_status, status, crawl, allowed_from=RUNNING_STATES | ||||
|                 ) | ||||
|             await self.set_state( | ||||
|                 new_status, status, crawl, allowed_from=RUNNING_AND_WAITING_STATES | ||||
|             ) | ||||
| 
 | ||||
|         return status | ||||
| 
 | ||||
|  | ||||
| @ -134,6 +134,8 @@ class PodInfo(BaseModel): | ||||
|     newMemory: Optional[int] = None | ||||
|     signalAtMem: Optional[int] = None | ||||
| 
 | ||||
|     evicted: Optional[bool] = False | ||||
| 
 | ||||
|     def dict(self, *a, **kw): | ||||
|         res = super().dict(*a, **kw) | ||||
|         percent = { | ||||
| @ -168,15 +170,21 @@ class PodInfo(BaseModel): | ||||
|             else 0 | ||||
|         ) | ||||
| 
 | ||||
|     def should_restart_pod(self): | ||||
|     def should_restart_pod(self, forced: bool = False) -> Optional[str]: | ||||
|         """return true if pod should be restarted""" | ||||
|         if self.newMemory and self.newMemory != self.allocated.memory: | ||||
|             return True | ||||
|             return "newMemory" | ||||
| 
 | ||||
|         if self.newCpu and self.newCpu != self.allocated.cpu: | ||||
|             return True | ||||
|             return "newCpu" | ||||
| 
 | ||||
|         return False | ||||
|         if self.evicted: | ||||
|             return "evicted" | ||||
| 
 | ||||
|         if forced: | ||||
|             return "forced" | ||||
| 
 | ||||
|         return None | ||||
| 
 | ||||
| 
 | ||||
| # ============================================================================ | ||||
|  | ||||
		Loading…
	
		Reference in New Issue
	
	Block a user