diff --git a/backend/btrixcloud/operator/crawls.py b/backend/btrixcloud/operator/crawls.py
index e6eee4df..af1fd233 100644
--- a/backend/btrixcloud/operator/crawls.py
+++ b/backend/btrixcloud/operator/crawls.py
@@ -248,7 +248,7 @@ class CrawlOperator(BaseOperator):
                 )
             else:
-                status.scale = crawl.scale
+                status.scale = 1

                 now = dt_now()
                 await self.crawl_ops.inc_crawl_exec_time(
                     crawl.db_crawl_id, crawl.is_qa, 0, now
@@ -410,8 +410,8 @@ class CrawlOperator(BaseOperator):
             actual_scale -= 1

         # ensure at least enough pages for the scale
-        if status.pagesFound and status.pagesFound < desired_scale:
-            desired_scale = status.pagesFound
+        if status.pagesFound < desired_scale:
+            desired_scale = max(1, status.pagesFound)

         # if desired_scale same or scaled up, return desired_scale
         if desired_scale >= actual_scale:
@@ -514,13 +514,15 @@ class CrawlOperator(BaseOperator):
                 status.finished = to_k8s_date(finished)

             if actual_state != state:
-                print(f"state mismatch, actual state {actual_state}, requested {state}")
+                print(
+                    f"State mismatch, actual state {actual_state}, requested {state}, {crawl.id}"
+                )
                 if not actual_state and state == "canceled":
                     return True

         if status.state != state:
             print(
-                f"Not setting state: {status.state} -> {state}, {crawl.id} not allowed"
+                f"Not setting state: {status.state} -> {state}, not allowed, {crawl.id}"
             )
         return False
@@ -730,7 +732,9 @@ class CrawlOperator(BaseOperator):
     ):
         """sync crawl state for running crawl"""
         # check if at least one crawler pod started running
-        crawler_running, redis_running, done = self.sync_pod_status(pods, status)
+        crawler_running, redis_running, pod_done_count = self.sync_pod_status(
+            pods, status
+        )
         redis = None

         try:
@@ -745,7 +749,9 @@ class CrawlOperator(BaseOperator):

             if not crawler_running or not redis:
                 # if either crawler is not running or redis is inaccessible
-                if self.should_mark_waiting(status.state, crawl.started):
+                if not pod_done_count and self.should_mark_waiting(
+                    status.state, crawl.started
+                ):
                     # mark as waiting (if already running)
                     await self.set_state(
                         "waiting_capacity",
@@ -755,8 +761,10 @@ class CrawlOperator(BaseOperator):

                 if not crawler_running and redis:
-                    # if crawler running, but no redis, stop redis instance until crawler
-                    # is running
+                    # if crawler is not running for REDIS_TTL seconds, also stop redis
+                    # but not right away in case crawler pod is just restarting.
+                    # avoids keeping redis pods around while no crawler pods are up
+                    # (eg. due to resource constraints)
                     if status.lastActiveTime and (
                         (
                             dt_now() - from_k8s_date(status.lastActiveTime)
@@ -770,7 +778,6 @@ class CrawlOperator(BaseOperator):
                 elif crawler_running and not redis:
                     # if crawler is running, but no redis, init redis
                     status.initRedis = True
-                    status.lastActiveTime = to_k8s_date(dt_now())

                 # if no crawler / no redis, resync after N seconds
                 status.resync_after = self.fast_retry_secs
@@ -791,6 +798,10 @@ class CrawlOperator(BaseOperator):
                     )
                 )

+            # update lastActiveTime if crawler is running
+            if crawler_running:
+                status.lastActiveTime = to_k8s_date(dt_now())
+
             file_done = await redis.lpop(self.done_key)
             while file_done:
                 msg = json.loads(file_done)
@@ -824,7 +835,9 @@ class CrawlOperator(BaseOperator):
             status.filesAddedSize = int(await redis.get("filesAddedSize") or 0)

             # update stats and get status
-            return await self.update_crawl_state(redis, crawl, status, pods, done)
+            return await self.update_crawl_state(
+                redis, crawl, status, pods, pod_done_count
+            )

         # pylint: disable=broad-except
         except Exception as exc:
@@ -836,11 +849,13 @@ class CrawlOperator(BaseOperator):
             if redis:
                 await redis.close()

-    def sync_pod_status(self, pods: dict[str, dict], status: CrawlStatus):
+    def sync_pod_status(
+        self, pods: dict[str, dict], status: CrawlStatus
+    ) -> tuple[bool, bool, int]:
         """check status of pods"""
         crawler_running = False
         redis_running = False
-        done = True
+        pod_done_count = 0

         try:
             for name, pod in pods.items():
@@ -871,16 +886,16 @@ class CrawlOperator(BaseOperator):
                 if role == "crawler":
                     crawler_running = crawler_running or running
-                    done = done and phase == "Succeeded"
+                    if phase == "Succeeded":
+                        pod_done_count += 1
                 elif role == "redis":
                     redis_running = redis_running or running

         # pylint: disable=broad-except
         except Exception as exc:
-            done = False
             print(exc)

-        return crawler_running, redis_running, done
+        return crawler_running, redis_running, pod_done_count

     def handle_terminated_pod(self, name, role, status: CrawlStatus, terminated):
         """handle terminated pod state"""
@@ -1231,7 +1246,7 @@ class CrawlOperator(BaseOperator):
         crawl: CrawlSpec,
         status: CrawlStatus,
         pods: dict[str, dict],
-        done: bool,
+        pod_done_count: int,
     ) -> CrawlStatus:
         """update crawl state and check if crawl is now done"""
         results = await redis.hgetall(f"{crawl.id}:status")
@@ -1274,13 +1289,20 @@ class CrawlOperator(BaseOperator):

         # check if done / failed
         status_count: dict[str, int] = {}
-        for i in range(crawl.scale):
+        for i in range(status.scale):
             res = results.get(f"crawl-{crawl.id}-{i}")
             if res:
                 status_count[res] = status_count.get(res, 0) + 1

-        # check if all crawlers are done
-        if done and status_count.get("done", 0) >= crawl.scale:
+        num_done = status_count.get("done", 0)
+        num_failed = status_count.get("failed", 0)
+        # all expected pods are either done or failed
+        all_completed = (num_done + num_failed) >= status.scale
+
+        # if at least one is done according to redis, consider crawl successful
+        # ensure pod successfully exited as well
+        # pylint: disable=chained-comparison
+        if all_completed and num_done >= 1 and pod_done_count >= num_done:
             # check if one-page crawls actually succeeded
             # if only one page found, and no files, assume failed
             if status.pagesFound == 1 and not status.filesAdded:
@@ -1297,8 +1319,8 @@ class CrawlOperator(BaseOperator):
             await self.mark_finished(crawl, status, state, stats)

-        # check if all crawlers failed
-        elif status_count.get("failed", 0) >= crawl.scale:
+        # check if all crawlers failed -- no crawl data was generated
+        elif all_completed and num_done == 0 and num_failed > 0:
             # if stopping, and no pages finished, mark as canceled
             if status.stopping and not status.pagesDone:
                 await self.mark_finished(crawl, status, "canceled", stats)
@@ -1318,6 +1340,7 @@ class CrawlOperator(BaseOperator):
                 new_status = "uploading-wacz"
             elif status_count.get("pending-wait"):
                 new_status = "pending-wait"
+
             if new_status:
                 await self.set_state(
                     new_status, status, crawl, allowed_from=RUNNING_STATES
diff --git a/chart/test/test.yaml b/chart/test/test.yaml
index 51bd3842..40582ffe 100644
--- a/chart/test/test.yaml
+++ b/chart/test/test.yaml
@@ -12,6 +12,8 @@ default_crawl_filename_template: "@ts-testing-@hostsuffix.wacz"

 operator_resync_seconds: 3

+qa_scale: 2
+
 # for testing only
 crawler_extra_cpu_per_browser: 300m
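For reviewers, the crawls.py hunks above converge on one revised completion check in update_crawl_state. Below is a minimal sketch of that logic as standalone functions; the helper names (is_successful, is_failed) are hypothetical, and only the quantities visible in the diff come from the patch: the per-crawler status_count map read from redis, the pod_done_count of crawler pods whose phase is "Succeeded", and status.scale.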
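# minimal sketch, assuming status_count comes from redis (hgetall of the
# per-crawler statuses) and pod_done_count from counting crawler pods with
# phase == "Succeeded"; is_successful / is_failed are illustrative names,
# not the operator's real API


def is_successful(
    status_count: dict[str, int], pod_done_count: int, scale: int
) -> bool:
    """Success: every expected crawler reached a terminal redis state,
    at least one reported 'done', and each redis 'done' is backed by a
    pod that actually exited with phase 'Succeeded'."""
    num_done = status_count.get("done", 0)
    num_failed = status_count.get("failed", 0)
    all_completed = (num_done + num_failed) >= scale
    return all_completed and num_done >= 1 and pod_done_count >= num_done


def is_failed(status_count: dict[str, int], scale: int) -> bool:
    """Failure: all crawlers completed, but none produced crawl data."""
    num_done = status_count.get("done", 0)
    num_failed = status_count.get("failed", 0)
    return (num_done + num_failed) >= scale and num_done == 0 and num_failed > 0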
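The substantive shift is from a single done boolean, which was true only when every crawler pod had phase "Succeeded" and was forced to False by any exception in sync_pod_status, to an integer pod_done_count. A crawl where one of several crawlers fails can now still finish as successful, while the pod_done_count >= num_done guard prevents redis-reported "done" states from marking a crawl finished before the corresponding pods have actually exited cleanly.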