Add crawl errors incrementally during crawl (#1561)
Fixes #1558

- Adds crawl errors to the database incrementally during the crawl, rather than after the crawl completes
- Simplifies the crawl /errors API endpoint to always return errors from the database
parent 804f755787
commit da19691184
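The heart of the change is on the operator side: on each reconcile pass the operator now drains the crawler's Redis error list ("<crawl_id>:e") and pushes every entry onto the crawl document's "errors" array in MongoDB, so errors accumulate while the crawl is still running. Below is a minimal sketch of that pattern, assuming a redis.asyncio client and a Motor collection wired up the way this codebase does it; the function name and parameters are illustrative, not part of the commit.

# Illustrative sketch of the incremental error flow introduced by this commit.
# "redis" is assumed to be a redis.asyncio client and "crawls" a Motor
# collection of crawl documents; both names are placeholders.

async def drain_crawl_errors(redis, crawls, crawl_id: str) -> int:
    """Pop each error line from Redis and append it to the crawl's errors field."""
    added = 0
    error = await redis.lpop(f"{crawl_id}:e")
    while error:
        await crawls.find_one_and_update(
            {"_id": crawl_id}, {"$push": {"errors": error}}
        )
        added += 1
        error = await redis.lpop(f"{crawl_id}:e")
    return added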
@@ -384,26 +384,6 @@ class CrawlOps(BaseCrawlOps):
 
         return {"total": total, "matched": matched, "nextOffset": next_offset}
 
-    async def get_errors_from_redis(
-        self, crawl_id: str, page_size: int = DEFAULT_PAGE_SIZE, page: int = 1
-    ):
-        """Get crawl errors from Redis and optionally store in mongodb."""
-        # Zero-index page for query
-        page = page - 1
-        skip = page * page_size
-        upper_bound = skip + page_size - 1
-
-        async with self.get_redis(crawl_id) as redis:
-            try:
-                errors = await redis.lrange(f"{crawl_id}:e", skip, upper_bound)
-                total = await redis.llen(f"{crawl_id}:e")
-            except exceptions.ConnectionError:
-                # pylint: disable=raise-missing-from
-                raise HTTPException(status_code=503, detail="error_logs_not_available")
-
-        parsed_errors = parse_jsonl_error_messages(errors)
-        return parsed_errors, total
-
     async def add_or_remove_exclusion(self, crawl_id, regex, org, user, add):
         """add new exclusion to config or remove exclusion from config
         for given crawl_id, update config on crawl"""
@@ -470,10 +450,10 @@ class CrawlOps(BaseCrawlOps):
             return None, None
         return res.get("state"), res.get("finished")
 
-    async def add_crawl_errors(self, crawl_id, errors):
-        """add crawl errors from redis to mongodb errors field"""
+    async def add_crawl_error(self, crawl_id: str, error: str):
+        """add crawl error from redis to mongodb errors field"""
         await self.crawls.find_one_and_update(
-            {"_id": crawl_id}, {"$push": {"errors": {"$each": errors}}}
+            {"_id": crawl_id}, {"$push": {"errors": error}}
         )
 
     async def add_crawl_file(self, crawl_id, crawl_file, size):
@@ -931,15 +911,11 @@ def init_crawls_api(app, user_dep, *args):
         crawl_raw = await ops.get_crawl_raw(crawl_id, org)
         crawl = Crawl.from_dict(crawl_raw)
 
-        if crawl.finished:
-            skip = (page - 1) * pageSize
-            upper_bound = skip + pageSize
-            errors = crawl.errors[skip:upper_bound]
-            parsed_errors = parse_jsonl_error_messages(errors)
-            total = len(crawl.errors)
-            return paginated_format(parsed_errors, total, page, pageSize)
+        skip = (page - 1) * pageSize
+        upper_bound = skip + pageSize
 
-        errors, total = await ops.get_errors_from_redis(crawl_id, pageSize, page)
-        return paginated_format(errors, total, page, pageSize)
+        errors = crawl.errors[skip:upper_bound]
+        parsed_errors = parse_jsonl_error_messages(errors)
+        return paginated_format(parsed_errors, len(crawl.errors), page, pageSize)
 
     return ops
@@ -300,6 +300,7 @@ class BtrixOperator(K8sAPI):
 
         self.done_key = "crawls-done"
         self.pages_key = "pages"
+        self.errors_key = "e"
 
         self.fast_retry_secs = int(os.environ.get("FAST_RETRY_SECS") or 0)
 
@@ -999,7 +1000,6 @@ class BtrixOperator(K8sAPI):
             )
 
         file_done = await redis.lpop(self.done_key)
-
         while file_done:
             msg = json.loads(file_done)
             # add completed file
@@ -1011,12 +1011,16 @@ class BtrixOperator(K8sAPI):
             file_done = await redis.lpop(self.done_key)
 
         page_crawled = await redis.lpop(f"{crawl.id}:{self.pages_key}")
-
         while page_crawled:
             page_dict = json.loads(page_crawled)
             await self.page_ops.add_page_to_db(page_dict, crawl.id, crawl.oid)
             page_crawled = await redis.lpop(f"{crawl.id}:{self.pages_key}")
 
+        crawl_error = await redis.lpop(f"{crawl.id}:{self.errors_key}")
+        while crawl_error:
+            await self.crawl_ops.add_crawl_error(crawl.id, crawl_error)
+            crawl_error = await redis.lpop(f"{crawl.id}:{self.errors_key}")
+
         # ensure filesAdded and filesAddedSize always set
         status.filesAdded = int(await redis.get("filesAdded") or 0)
         status.filesAddedSize = int(await redis.get("filesAddedSize") or 0)
@@ -1552,8 +1556,6 @@ class BtrixOperator(K8sAPI):
             crawl_id, oid, state
         )
 
-        await self.add_crawl_errors_to_db(crawl_id)
-
         # finally, delete job
         await self.delete_crawl_job(crawl_id)
 
@@ -1582,42 +1584,6 @@ class BtrixOperator(K8sAPI):
             if redis:
                 await redis.close()
 
-    async def add_crawl_errors_to_db(self, crawl_id, inc=100):
-        """Pull crawl errors from redis and write to mongo db"""
-        index = 0
-        redis = None
-        try:
-            redis_url = self.get_redis_url(crawl_id)
-            redis = await self._get_redis(redis_url)
-            if not redis:
-                return
-
-            # ensure this only runs once
-            if not await redis.setnx("errors-exported", "1"):
-                return
-
-            while True:
-                skip = index * inc
-                upper_bound = skip + inc - 1
-                errors = await redis.lrange(f"{crawl_id}:e", skip, upper_bound)
-                if not errors:
-                    break
-
-                await self.crawl_ops.add_crawl_errors(crawl_id, errors)
-
-                if len(errors) < inc:
-                    # If we have fewer than inc errors, we can assume this is the
-                    # last page of data to add.
-                    break
-                index += 1
-        # pylint: disable=bare-except
-        except:
-            # likely redis has already been deleted, so nothing to do
-            pass
-        finally:
-            if redis:
-                await redis.close()
-
     def get_cronjob_crawl_related(self, data: MCBaseRequest):
         """return configmap related to crawl"""
         labels = data.parent.get("metadata", {}).get("labels", {})
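With errors now always persisted on the crawl document, the /errors endpoint no longer needs a Redis fallback and simply paginates the stored list. Below is a rough, standalone sketch of that pagination logic, with simplified stand-ins for the parse_jsonl_error_messages and paginated_format helpers used in the real endpoint.

# Standalone sketch of how the simplified /errors endpoint pages through
# errors stored on the crawl document. These helpers are simplified
# stand-ins, not the codebase's own implementations.
import json
from typing import List


def parse_error_lines(errors: List[str]) -> List[dict]:
    """Parse JSONL error strings, falling back to a raw message on bad lines."""
    parsed = []
    for line in errors:
        try:
            parsed.append(json.loads(line))
        except json.JSONDecodeError:
            parsed.append({"message": line})
    return parsed


def paginate_errors(all_errors: List[str], page: int = 1, page_size: int = 25) -> dict:
    """Return one page of parsed errors plus the total stored count."""
    skip = (page - 1) * page_size
    items = parse_error_lines(all_errors[skip : skip + page_size])
    return {"items": items, "total": len(all_errors), "page": page, "pageSize": page_size}

For example, paginate_errors(crawl_errors, page=2, page_size=50) would return stored errors 51-100 along with the total count, mirroring how the endpoint slices crawl.errors.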