diff --git a/backend/btrixcloud/operator.py b/backend/btrixcloud/operator.py index 62f03a8e..ab8f021f 100644 --- a/backend/btrixcloud/operator.py +++ b/backend/btrixcloud/operator.py @@ -2,6 +2,7 @@ import asyncio import traceback +import os from typing import Optional from datetime import datetime @@ -44,6 +45,9 @@ CJS = "CrawlJob.btrix.cloud/v1" DEFAULT_TTL = 30 +# time in seconds before a crawl is deemed 'waiting' instead of 'starting' +STARTING_TIME_SECS = 60 + # ============================================================================ class DeleteCrawlException(Exception): @@ -95,7 +99,10 @@ class CrawlStatus(BaseModel): filesAddedSize: int = 0 finished: Optional[str] = None stopping: bool = False - # forceRestart: Optional[str] + initRedis: bool = False + + # don't include in status, use by metacontroller + resync_after: Optional[int] = None # ============================================================================ @@ -117,6 +124,8 @@ class BtrixOperator(K8sAPI): self.done_key = "crawls-done" + self.fast_retry_secs = int(os.environ.get("FAST_RETRY_SECS") or 0) + with open(self.config_file, encoding="utf-8") as fh_config: self.shared_params = yaml.safe_load(fh_config) @@ -232,6 +241,7 @@ class BtrixOperator(K8sAPI): params["force_restart"] = spec.get("forceRestart") params["redis_url"] = redis_url + params["redis_scale"] = 1 if status.initRedis else 0 children = self.load_from_yaml("crawler.yaml", params) children.extend(self.load_from_yaml("redis.yaml", params)) @@ -249,7 +259,11 @@ class BtrixOperator(K8sAPI): "spec" ]["volumeClaimTemplates"] - return {"status": status.dict(exclude_none=True), "children": children} + return { + "status": status.dict(exclude_none=True, exclude={"resync_after": True}), + "children": children, + "resyncAfterSeconds": status.resync_after, + } async def set_state(self, state, status, crawl_id, allowed_from, **kwargs): """set status state and update db, if changed @@ -286,12 +300,15 @@ class BtrixOperator(K8sAPI): return True # get actual crawl state - new_state, finished = await get_crawl_state(self.crawls, crawl_id) - if new_state: - status.state = state + actual_state, finished = await get_crawl_state(self.crawls, crawl_id) + if actual_state: + status.state = actual_state if finished: status.finished = to_k8s_date(finished) + if actual_state != state: + print(f"state mismatch, actual state {actual_state}, requested {state}") + if status.state != state: print( f"Not setting state: {status.state} -> {state}, {crawl_id} not allowed" @@ -416,7 +433,7 @@ class BtrixOperator(K8sAPI): def _done_response(self, status, finalized=False): """done response for removing crawl""" return { - "status": status.dict(exclude_none=True), + "status": status.dict(exclude_none=True, exclude={"resync_after": True}), "children": [], "finalized": finalized, } @@ -450,12 +467,34 @@ class BtrixOperator(K8sAPI): async def sync_crawl_state(self, redis_url, crawl, status, pods): """sync crawl state for running crawl""" - redis = await self._get_redis(redis_url) - if not redis: + # check if at least one pod started running + if not await self.check_if_pods_running(pods): + if self.should_mark_waiting(status.state, crawl.started): + await self.set_state( + "waiting_capacity", + status, + crawl.id, + allowed_from=["starting", "running"], + ) + + status.initRedis = False + + # resync after N seconds + status.resync_after = self.fast_retry_secs return status - # if not prev_start_time: - # await redis.set("start_time", str(self.started)) + status.initRedis = True + + redis = await self._get_redis(redis_url) + if not redis: + # resync after N seconds, until redis is inited + status.resync_after = self.fast_retry_secs + return status + + # set state to running (if not already) + await self.set_state( + "running", status, crawl.id, allowed_from=["starting", "waiting_capacity"] + ) try: file_done = await redis.lpop(self.done_key) @@ -475,7 +514,7 @@ class BtrixOperator(K8sAPI): status.filesAddedSize = int(await redis.get("filesAddedSize") or 0) # update stats and get status - return await self.update_crawl_state(redis, crawl, status, pods) + return await self.update_crawl_state(redis, crawl, status) # pylint: disable=broad-except except Exception as exc: @@ -487,8 +526,21 @@ class BtrixOperator(K8sAPI): """check if at least one crawler pod has started""" try: for pod in pods.values(): - if pod["status"]["phase"] == "Running": + status = pod["status"] + if status["phase"] == "Running": return True + + # consider 'ContainerCreating' as running + if status["phase"] == "Pending": + if ( + "containerStatuses" in status + and status["containerStatuses"][0]["state"]["waiting"]["reason"] + == "ContainerCreating" + ): + return True + + # print("non-running pod status", pod["status"], flush=True) + # pylint: disable=bare-except except: # assume no valid pod found @@ -496,6 +548,17 @@ class BtrixOperator(K8sAPI): return False + def should_mark_waiting(self, state, started): + """Should the crawl be marked as waiting for capacity?""" + if state == "running": + return True + + if state == "starting": + started = from_k8s_date(started) + return (datetime.utcnow() - started).total_seconds() > STARTING_TIME_SECS + + return False + async def add_file_to_crawl(self, cc_data, crawl, redis): """Handle finished CrawlFile to db""" @@ -523,7 +586,7 @@ class BtrixOperator(K8sAPI): return True # pylint: disable=too-many-branches - async def update_crawl_state(self, redis, crawl, status, pods): + async def update_crawl_state(self, redis, crawl, status): """update crawl state and check if crawl is now done""" results = await redis.hvals(f"{crawl.id}:status") stats = await get_redis_crawl_stats(redis, crawl.id) @@ -542,23 +605,6 @@ class BtrixOperator(K8sAPI): # backwards compatibility with older crawler await redis.set("crawl-stop", "1") - # check if at least one pod started running - # otherwise, mark as 'waiting' and return - if not await self.check_if_pods_running(pods): - await self.set_state( - "waiting_capacity", - status, - crawl.id, - allowed_from=["starting", "running"], - ) - - return status - - # set state to running (if not already) - await self.set_state( - "running", status, crawl.id, allowed_from=["starting", "waiting_capacity"] - ) - # update status status.pagesDone = stats["done"] status.pagesFound = stats["found"] diff --git a/backend/btrixcloud/templates/redis.yaml b/backend/btrixcloud/templates/redis.yaml index 99dcf416..9f3ffe2f 100644 --- a/backend/btrixcloud/templates/redis.yaml +++ b/backend/btrixcloud/templates/redis.yaml @@ -18,7 +18,7 @@ spec: role: redis serviceName: redis-{{ id }} - replicas: 1 + replicas: {{ redis_scale }} podManagementPolicy: Parallel # not yet supported diff --git a/chart/templates/configmap.yaml b/chart/templates/configmap.yaml index c7f09e6c..faf7a16a 100644 --- a/chart/templates/configmap.yaml +++ b/chart/templates/configmap.yaml @@ -64,6 +64,8 @@ data: PRESIGN_DURATION_MINUTES: "{{ .Values.storage_presign_duration_minutes | default 60 }}" + FAST_RETRY_SECS: "{{ .Values.operator_fast_resync_secs | default 3 }}" + --- apiVersion: v1