From da19691184081b714aef1398e866edccb000dbf3 Mon Sep 17 00:00:00 2001
From: Tessa Walsh
Date: Thu, 29 Feb 2024 12:16:34 -0500
Subject: [PATCH] Add crawl errors incrementally during crawl (#1561)

Fixes #1558

- Adds crawl errors to database incrementally during crawl rather than after crawl completes
- Simplifies crawl /errors API endpoint to always return errors from database
---
 backend/btrixcloud/crawls.py   | 40 ++++++-----------------------
 backend/btrixcloud/operator.py | 46 +++++-----------------------------
 2 files changed, 14 insertions(+), 72 deletions(-)

diff --git a/backend/btrixcloud/crawls.py b/backend/btrixcloud/crawls.py
index 231a5c03..74897c63 100644
--- a/backend/btrixcloud/crawls.py
+++ b/backend/btrixcloud/crawls.py
@@ -384,26 +384,6 @@ class CrawlOps(BaseCrawlOps):
 
         return {"total": total, "matched": matched, "nextOffset": next_offset}
 
-    async def get_errors_from_redis(
-        self, crawl_id: str, page_size: int = DEFAULT_PAGE_SIZE, page: int = 1
-    ):
-        """Get crawl errors from Redis and optionally store in mongodb."""
-        # Zero-index page for query
-        page = page - 1
-        skip = page * page_size
-        upper_bound = skip + page_size - 1
-
-        async with self.get_redis(crawl_id) as redis:
-            try:
-                errors = await redis.lrange(f"{crawl_id}:e", skip, upper_bound)
-                total = await redis.llen(f"{crawl_id}:e")
-            except exceptions.ConnectionError:
-                # pylint: disable=raise-missing-from
-                raise HTTPException(status_code=503, detail="error_logs_not_available")
-
-        parsed_errors = parse_jsonl_error_messages(errors)
-        return parsed_errors, total
-
     async def add_or_remove_exclusion(self, crawl_id, regex, org, user, add):
         """add new exclusion to config
         or remove exclusion from config for given crawl_id, update config on crawl"""
@@ -470,10 +450,10 @@ class CrawlOps(BaseCrawlOps):
             return None, None
         return res.get("state"), res.get("finished")
 
-    async def add_crawl_errors(self, crawl_id, errors):
-        """add crawl errors from redis to mongodb errors field"""
+    async def add_crawl_error(self, crawl_id: str, error: str):
+        """add crawl error from redis to mongodb errors field"""
         await self.crawls.find_one_and_update(
-            {"_id": crawl_id}, {"$push": {"errors": {"$each": errors}}}
+            {"_id": crawl_id}, {"$push": {"errors": error}}
         )
 
     async def add_crawl_file(self, crawl_id, crawl_file, size):
@@ -931,15 +911,11 @@ def init_crawls_api(app, user_dep, *args):
         crawl_raw = await ops.get_crawl_raw(crawl_id, org)
         crawl = Crawl.from_dict(crawl_raw)
 
-        if crawl.finished:
-            skip = (page - 1) * pageSize
-            upper_bound = skip + pageSize
-            errors = crawl.errors[skip:upper_bound]
-            parsed_errors = parse_jsonl_error_messages(errors)
-            total = len(crawl.errors)
-            return paginated_format(parsed_errors, total, page, pageSize)
+        skip = (page - 1) * pageSize
+        upper_bound = skip + pageSize
 
-        errors, total = await ops.get_errors_from_redis(crawl_id, pageSize, page)
-        return paginated_format(errors, total, page, pageSize)
+        errors = crawl.errors[skip:upper_bound]
+        parsed_errors = parse_jsonl_error_messages(errors)
+        return paginated_format(parsed_errors, len(crawl.errors), page, pageSize)
 
     return ops
 
diff --git a/backend/btrixcloud/operator.py b/backend/btrixcloud/operator.py
index 0f1e8fa1..bbe7fc29 100644
--- a/backend/btrixcloud/operator.py
+++ b/backend/btrixcloud/operator.py
@@ -300,6 +300,7 @@ class BtrixOperator(K8sAPI):
 
         self.done_key = "crawls-done"
         self.pages_key = "pages"
+        self.errors_key = "e"
 
         self.fast_retry_secs = int(os.environ.get("FAST_RETRY_SECS") or 0)
 
@@ -999,7 +1000,6 @@ class BtrixOperator(K8sAPI):
         )
 
         file_done = await redis.lpop(self.done_key)
-
         while file_done:
             msg = json.loads(file_done)
             # add completed file
@@ -1011,12 +1011,16 @@ class BtrixOperator(K8sAPI):
             file_done = await redis.lpop(self.done_key)
 
         page_crawled = await redis.lpop(f"{crawl.id}:{self.pages_key}")
-
         while page_crawled:
             page_dict = json.loads(page_crawled)
             await self.page_ops.add_page_to_db(page_dict, crawl.id, crawl.oid)
             page_crawled = await redis.lpop(f"{crawl.id}:{self.pages_key}")
 
+        crawl_error = await redis.lpop(f"{crawl.id}:{self.errors_key}")
+        while crawl_error:
+            await self.crawl_ops.add_crawl_error(crawl.id, crawl_error)
+            crawl_error = await redis.lpop(f"{crawl.id}:{self.errors_key}")
+
         # ensure filesAdded and filesAddedSize always set
         status.filesAdded = int(await redis.get("filesAdded") or 0)
         status.filesAddedSize = int(await redis.get("filesAddedSize") or 0)
@@ -1552,8 +1556,6 @@ class BtrixOperator(K8sAPI):
                 crawl_id, oid, state
             )
 
-        await self.add_crawl_errors_to_db(crawl_id)
-
         # finally, delete job
         await self.delete_crawl_job(crawl_id)
 
@@ -1582,42 +1584,6 @@ class BtrixOperator(K8sAPI):
             if redis:
                 await redis.close()
 
-    async def add_crawl_errors_to_db(self, crawl_id, inc=100):
-        """Pull crawl errors from redis and write to mongo db"""
-        index = 0
-        redis = None
-        try:
-            redis_url = self.get_redis_url(crawl_id)
-            redis = await self._get_redis(redis_url)
-            if not redis:
-                return
-
-            # ensure this only runs once
-            if not await redis.setnx("errors-exported", "1"):
-                return
-
-            while True:
-                skip = index * inc
-                upper_bound = skip + inc - 1
-                errors = await redis.lrange(f"{crawl_id}:e", skip, upper_bound)
-                if not errors:
-                    break
-
-                await self.crawl_ops.add_crawl_errors(crawl_id, errors)
-
-                if len(errors) < inc:
-                    # If we have fewer than inc errors, we can assume this is the
-                    # last page of data to add.
-                    break
-                index += 1
-        # pylint: disable=bare-except
-        except:
-            # likely redis has already been deleted, so nothing to do
-            pass
-        finally:
-            if redis:
-                await redis.close()
-
     def get_cronjob_crawl_related(self, data: MCBaseRequest):
         """return configmap related to crawl"""
         labels = data.parent.get("metadata", {}).get("labels", {})
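
For illustration, a minimal standalone sketch of the incremental drain pattern the operator hunk above introduces: errors are lpop'd one at a time from the crawl's f"{crawl_id}:e" Redis list and each entry is $push'd onto the crawl document's errors array in MongoDB, so errors are persisted while the crawl is still running. This sketch is not part of the patch; the drain_crawl_errors helper, connection URLs, and database/collection names are assumptions for the example, while the key format and the $push update mirror the diff.

    import asyncio

    from motor.motor_asyncio import AsyncIOMotorClient
    from redis import asyncio as aioredis


    async def drain_crawl_errors(redis, crawls, crawl_id: str) -> int:
        """Pop pending errors for crawl_id from Redis and append each one to the
        crawl document's errors array in MongoDB. Returns the number drained."""
        drained = 0
        # Same shape as the operator loop: lpop one entry at a time until empty
        error = await redis.lpop(f"{crawl_id}:e")
        while error:
            await crawls.find_one_and_update(
                {"_id": crawl_id}, {"$push": {"errors": error}}
            )
            drained += 1
            error = await redis.lpop(f"{crawl_id}:e")
        return drained


    async def main() -> None:
        # Connection URLs and db/collection names are assumptions for the example
        redis = aioredis.from_url("redis://localhost:6379", decode_responses=True)
        crawls = AsyncIOMotorClient("mongodb://localhost:27017")["btrix"]["crawls"]
        print("drained", await drain_crawl_errors(redis, crawls, "example-crawl-id"))


    if __name__ == "__main__":
        asyncio.run(main())

Compared with the deleted add_crawl_errors_to_db batch export, this per-item lpop approach needs no setnx guard or lrange paging: each error is consumed exactly once and is already in the database by the time the crawl finishes, which is what lets the /errors endpoint read solely from MongoDB.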