From 065365763704643741fbe004e3df20d6f4789e79 Mon Sep 17 00:00:00 2001
From: Ilya Kreymer
Date: Fri, 9 Feb 2024 16:14:29 -0800
Subject: [PATCH] better handling of failed redis connection + exec time updates (#1520)

This PR addresses a possible failure when the Redis pod was inaccessible from the crawler pod.

- Ensure the crawl is set to 'waiting_for_capacity' if either no crawler pods are available or there is no redis pod. Previously, a missing/inaccessible redis did not result in 'waiting_for_capacity' when crawler pods were available.
- Rework the logic: if there are no crawler pods but redis is still up after >60 seconds, shut down redis; if crawler pods are running but redis is not, init (or reinit) redis.
- Track 'lastUpdatedTime' in the db when incrementing exec time to avoid double counting when lastUpdatedTime has not changed, e.g. if the operator sync fails.
- Add a redis socket timeout of 20 seconds to avoid timing out operator responses; if the redis connection takes too long, assume redis is unavailable.
---
 backend/btrixcloud/crawls.py   |  9 ++++--
 backend/btrixcloud/k8sapi.py   |  5 +++-
 backend/btrixcloud/operator.py | 51 +++++++++++++++++++++-------------
 3 files changed, 41 insertions(+), 24 deletions(-)

diff --git a/backend/btrixcloud/crawls.py b/backend/btrixcloud/crawls.py
index db6fcd26..231a5c03 100644
--- a/backend/btrixcloud/crawls.py
+++ b/backend/btrixcloud/crawls.py
@@ -451,11 +451,14 @@ class CrawlOps(BaseCrawlOps):
         query = {"_id": crawl_id, "type": "crawl", "state": "running"}
         return await self.crawls.find_one_and_update(query, {"$set": {"stats": stats}})
 
-    async def inc_crawl_exec_time(self, crawl_id, exec_time):
+    async def inc_crawl_exec_time(self, crawl_id, exec_time, last_updated_time):
         """increment exec time"""
         return await self.crawls.find_one_and_update(
-            {"_id": crawl_id, "type": "crawl"},
-            {"$inc": {"crawlExecSeconds": exec_time}},
+            {"_id": crawl_id, "type": "crawl", "_lut": {"$ne": last_updated_time}},
+            {
+                "$inc": {"crawlExecSeconds": exec_time},
+                "$set": {"_lut": last_updated_time},
+            },
         )
 
     async def get_crawl_state(self, crawl_id):
diff --git a/backend/btrixcloud/k8sapi.py b/backend/btrixcloud/k8sapi.py
index 237c8744..766756b8 100644
--- a/backend/btrixcloud/k8sapi.py
+++ b/backend/btrixcloud/k8sapi.py
@@ -67,7 +67,10 @@ class K8sAPI:
     async def get_redis_client(self, redis_url):
         """return redis client with correct params for one-time use"""
         return aioredis.from_url(
-            redis_url, decode_responses=True, auto_close_connection_pool=True
+            redis_url,
+            decode_responses=True,
+            auto_close_connection_pool=True,
+            socket_timeout=20,
         )
 
     # pylint: disable=too-many-arguments, too-many-locals
diff --git a/backend/btrixcloud/operator.py b/backend/btrixcloud/operator.py
index c90e6c8f..382b2d5d 100644
--- a/backend/btrixcloud/operator.py
+++ b/backend/btrixcloud/operator.py
@@ -942,8 +942,10 @@ class BtrixOperator(K8sAPI):
         if status.anyCrawlPodNewExit:
             await self.log_crashes(crawl.id, status.podStatus, redis)
 
-        if not crawler_running:
+        if not crawler_running or not redis:
+            # if either crawler is not running or redis is inaccessible
             if self.should_mark_waiting(status.state, crawl.started):
+                # mark as waiting (if already running)
                 await self.set_state(
                     "waiting_capacity",
                     status,
@@ -951,25 +953,25 @@
                     allowed_from=RUNNING_AND_STARTING_ONLY,
                 )
 
-            # for now, don't reset redis once inited
-            if status.lastActiveTime and (
-                (dt_now() - from_k8s_date(status.lastActiveTime)).total_seconds()
-                > REDIS_TTL
-            ):
-                print(
-                    f"Pausing redis, no running crawler pods for >{REDIS_TTL} secs"
-                )
-                status.initRedis = False
+            if not crawler_running and redis:
+                # if no crawler pods are running but redis is still up, stop redis
+                # until the crawler is running again
+                if status.lastActiveTime and (
+                    (
+                        dt_now() - from_k8s_date(status.lastActiveTime)
+                    ).total_seconds()
+                    > REDIS_TTL
+                ):
+                    print(
+                        f"Pausing redis, no running crawler pods for >{REDIS_TTL} secs"
+                    )
+                    status.initRedis = False
+            elif crawler_running and not redis:
+                # if crawler is running, but no redis, init redis
+                status.initRedis = True
+                status.lastActiveTime = to_k8s_date(dt_now())
 
-            # if still running, resync after N seconds
-            status.resync_after = self.fast_retry_secs
-            return status
-
-        status.initRedis = True
-        status.lastActiveTime = to_k8s_date(dt_now())
-
-        if not redis:
-            # if still running, resync after N seconds
+            # if no crawler / no redis, resync after N seconds
             status.resync_after = self.fast_retry_secs
             return status
 
@@ -1182,7 +1184,16 @@ class BtrixOperator(K8sAPI):
             max_duration = max(duration, max_duration)
 
         if exec_time:
-            await self.crawl_ops.inc_crawl_exec_time(crawl_id, exec_time)
+            if not await self.crawl_ops.inc_crawl_exec_time(
+                crawl_id, exec_time, status.lastUpdatedTime
+            ):
+                # if lastUpdatedTime is same as previous, something is wrong, don't update!
+                print(
+                    "Already updated for lastUpdatedTime, skipping execTime update!",
+                    flush=True,
+                )
+                return
+
             await self.org_ops.inc_org_time_stats(oid, exec_time, True)
             status.crawlExecTime += exec_time
             status.elapsedCrawlTime += max_duration
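
The double-counting guard added in `inc_crawl_exec_time` can be tried out on its own. The sketch below is not part of the patch: it assumes motor and a locally running MongoDB, and the `btrix.crawls` collection name plus the `main()` harness are made up for illustration. The filter `"_lut": {"$ne": last_updated_time}` only matches when the stored timestamp differs from the one the operator is reporting, so a repeated sync carrying the same `lastUpdatedTime` updates nothing, `find_one_and_update` returns `None`, and the exec time is not counted twice.

```python
import asyncio
from datetime import datetime, timezone

from motor.motor_asyncio import AsyncIOMotorClient


async def inc_crawl_exec_time(crawls, crawl_id, exec_time, last_updated_time):
    """Increment crawlExecSeconds only if last_updated_time has changed."""
    # "$ne" also matches documents that have no "_lut" field yet,
    # so the very first increment for a crawl always succeeds
    return await crawls.find_one_and_update(
        {"_id": crawl_id, "type": "crawl", "_lut": {"$ne": last_updated_time}},
        {
            "$inc": {"crawlExecSeconds": exec_time},
            "$set": {"_lut": last_updated_time},
        },
    )


async def main():
    # hypothetical local MongoDB and database/collection names, for illustration only
    crawls = AsyncIOMotorClient("mongodb://localhost:27017").btrix.crawls
    # reset the demo document so the script can be re-run
    await crawls.replace_one(
        {"_id": "crawl-1"},
        {"_id": "crawl-1", "type": "crawl", "crawlExecSeconds": 0},
        upsert=True,
    )

    sync_time = datetime.now(timezone.utc)
    # first sync with this timestamp updates the document
    assert await inc_crawl_exec_time(crawls, "crawl-1", 30, sync_time) is not None
    # a repeated sync with the same timestamp matches nothing and returns None,
    # which the operator treats as "already counted, skip"
    assert await inc_crawl_exec_time(crawls, "crawl-1", 30, sync_time) is None


if __name__ == "__main__":
    asyncio.run(main())
```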
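
The 20-second redis timeout can likewise be exercised outside the operator. This is a rough sketch rather than backend code: it assumes redis-py >= 4.2 (`redis.asyncio`, the current home of the `aioredis` API used in `k8sapi.py`), and the URL and helper name are invented. The point is that an unreachable or very slow redis surfaces as a connection/timeout error within 20 seconds instead of stalling the operator response, and the caller can then treat redis as unavailable and mark the crawl as waiting for capacity.

```python
import asyncio

from redis import asyncio as aioredis
from redis import exceptions as redis_exc


async def get_redis_or_none(redis_url: str):
    """Return a pinged redis client, or None if redis does not answer within 20s."""
    client = aioredis.from_url(
        redis_url,
        decode_responses=True,
        socket_timeout=20,
    )
    try:
        await client.ping()
        return client
    except (redis_exc.ConnectionError, redis_exc.TimeoutError):
        # a slow or unreachable redis is treated the same as a missing redis pod
        await client.close()
        return None


async def main():
    redis = await get_redis_or_none("redis://localhost:6379/0")  # hypothetical URL
    if redis is None:
        print("redis unavailable, crawl should be marked waiting for capacity")
    else:
        print("redis reachable")
        await redis.close()


if __name__ == "__main__":
    asyncio.run(main())
```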