Add crawl errors endpoint (#757)
* Add crawl errors endpoint If this endpoint is called while the crawl is running, errors are pulled directly from redis. If this endpoint is called when the crawl is finished, errors are pulled from mongodb, where they're written when crawls complete. * Add nightly backend test for errors endpoint * Add errors for failed and cancelled crawls to mongo Co-authored-by: Ilya Kreymer <ikreymer@users.noreply.github.com>
This commit is contained in:
parent
4a46f894a2
commit
6b19f72a89
@ -215,6 +215,8 @@ class CrawlJob(ABC):
|
||||
|
||||
await self.update_crawl(state="failed", finished=self.finished)
|
||||
|
||||
await self.add_crawl_errors_to_mongo()
|
||||
|
||||
async def finish_crawl(self):
|
||||
"""finish crawl"""
|
||||
if self.finished:
|
||||
@ -235,9 +237,29 @@ class CrawlJob(ABC):
|
||||
|
||||
await self.update_crawl(state=state, finished=self.finished)
|
||||
|
||||
await self.add_crawl_errors_to_mongo()
|
||||
|
||||
if completed:
|
||||
await self.inc_crawl_complete_stats()
|
||||
|
||||
async def add_crawl_errors_to_mongo(self, inc=100):
|
||||
"""Pull crawl errors from redis and write to mongo"""
|
||||
index = 0
|
||||
while True:
|
||||
skip = index * inc
|
||||
upper_bound = skip + inc - 1
|
||||
errors = await self.redis.lrange(f"{self.job_id}:e", skip, upper_bound)
|
||||
if not errors:
|
||||
break
|
||||
await self.crawls.find_one_and_update(
|
||||
{"_id": self.job_id}, {"$push": {"errors": {"$each": errors}}}
|
||||
)
|
||||
if len(errors) < inc:
|
||||
# If we have fewer than inc errors, we can assume this is the
|
||||
# last page of data to add.
|
||||
break
|
||||
index += 1
|
||||
|
||||
async def inc_crawl_complete_stats(self):
|
||||
"""Increment Crawl Stats"""
|
||||
|
||||
@ -341,6 +363,8 @@ class CrawlJob(ABC):
|
||||
self.finished = dt_now()
|
||||
await self.update_crawl(state="canceled", finished=self.finished)
|
||||
|
||||
await self.add_crawl_errors_to_mongo()
|
||||
|
||||
await self.delete_crawl()
|
||||
|
||||
return {"success": True}
|
||||
|
@ -101,6 +101,8 @@ class Crawl(CrawlConfigCore):
|
||||
|
||||
notes: Optional[str]
|
||||
|
||||
errors: Optional[List[str]] = []
|
||||
|
||||
|
||||
# ============================================================================
|
||||
class CrawlOut(Crawl):
|
||||
@ -113,6 +115,7 @@ class CrawlOut(Crawl):
|
||||
resources: Optional[List[CrawlFileOut]] = []
|
||||
firstSeed: Optional[str]
|
||||
seedCount: Optional[int] = 0
|
||||
errors: Optional[List[str]]
|
||||
collections: Optional[List[str]] = []
|
||||
|
||||
|
||||
@ -149,6 +152,7 @@ class ListCrawlOut(BaseMongoModel):
|
||||
|
||||
firstSeed: Optional[str]
|
||||
seedCount: Optional[int] = 0
|
||||
errors: Optional[List[str]]
|
||||
|
||||
|
||||
# ============================================================================
|
||||
@ -761,6 +765,24 @@ class CrawlOps:
|
||||
|
||||
return num_removed
|
||||
|
||||
async def get_errors_from_redis(
|
||||
self, crawl_id: str, page_size: int = DEFAULT_PAGE_SIZE, page: int = 1
|
||||
):
|
||||
"""Get crawl errors from Redis and optionally store in mongodb."""
|
||||
# Zero-index page for query
|
||||
page = page - 1
|
||||
skip = page * page_size
|
||||
|
||||
try:
|
||||
redis = await self.get_redis(crawl_id)
|
||||
errors = await redis.lrange(f"{crawl_id}:e", skip, page_size)
|
||||
total = len(errors)
|
||||
except exceptions.ConnectionError:
|
||||
# pylint: disable=raise-missing-from
|
||||
raise HTTPException(status_code=503, detail="redis_connection_error")
|
||||
|
||||
return errors, total
|
||||
|
||||
async def get_redis(self, crawl_id):
|
||||
"""get redis url for crawl id"""
|
||||
# pylint: disable=line-too-long
|
||||
@ -1136,6 +1158,29 @@ def init_crawls_api(app, mdb, users, crawl_manager, crawl_config_ops, orgs, user
|
||||
|
||||
raise HTTPException(status_code=400, detail="crawl_not_finished")
|
||||
|
||||
@app.get(
|
||||
"/orgs/{oid}/crawls/{crawl_id}/errors",
|
||||
tags=["crawls"],
|
||||
)
|
||||
async def get_crawl_errors(
|
||||
crawl_id: str,
|
||||
pageSize: int = DEFAULT_PAGE_SIZE,
|
||||
page: int = 1,
|
||||
org: Organization = Depends(org_crawl_dep),
|
||||
):
|
||||
crawl_raw = await ops.get_crawl_raw(crawl_id, org)
|
||||
crawl = Crawl.from_dict(crawl_raw)
|
||||
|
||||
if crawl.finished:
|
||||
skip = (page - 1) * pageSize
|
||||
upper_bound = skip + pageSize - 1
|
||||
errors = crawl.errors[skip:upper_bound]
|
||||
total = len(errors)
|
||||
return paginated_format(errors, total, page, pageSize)
|
||||
|
||||
errors, total = await ops.get_errors_from_redis(crawl_id, pageSize, page)
|
||||
return paginated_format(errors, total, page, pageSize)
|
||||
|
||||
return ops
|
||||
|
||||
|
||||
|
@ -175,7 +175,6 @@ def large_crawl_id(admin_auth_headers, default_org_id):
|
||||
|
||||
crawl_id = data["run_now_job"]
|
||||
|
||||
# Wait for crawl to start running
|
||||
while True:
|
||||
r = requests.get(
|
||||
f"{API_PREFIX}/orgs/{default_org_id}/crawls/{crawl_id}/replay.json",
|
||||
@ -225,3 +224,33 @@ def timeout_crawl(admin_auth_headers, default_org_id):
|
||||
)
|
||||
data = r.json()
|
||||
return data["run_now_job"]
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def error_crawl_id(admin_auth_headers, default_org_id):
|
||||
crawl_data = {
|
||||
"runNow": True,
|
||||
"name": "Youtube crawl with errors",
|
||||
"config": {
|
||||
"seeds": [{"url": "https://www.youtube.com/watch?v=Sh-x3QmbRZc"}],
|
||||
"limit": 10,
|
||||
},
|
||||
}
|
||||
r = requests.post(
|
||||
f"{API_PREFIX}/orgs/{default_org_id}/crawlconfigs/",
|
||||
headers=admin_auth_headers,
|
||||
json=crawl_data,
|
||||
)
|
||||
data = r.json()
|
||||
|
||||
crawl_id = data["run_now_job"]
|
||||
|
||||
while True:
|
||||
r = requests.get(
|
||||
f"{API_PREFIX}/orgs/{default_org_id}/crawls/{crawl_id}/replay.json",
|
||||
headers=admin_auth_headers,
|
||||
)
|
||||
data = r.json()
|
||||
if data["state"] == "complete":
|
||||
return crawl_id
|
||||
time.sleep(5)
|
||||
|
14
backend/test_nightly/test_crawl_errors.py
Normal file
14
backend/test_nightly/test_crawl_errors.py
Normal file
@ -0,0 +1,14 @@
|
||||
import requests
|
||||
|
||||
from .conftest import API_PREFIX
|
||||
|
||||
|
||||
def test_get_crawl_errors(admin_auth_headers, default_org_id, error_crawl_id):
|
||||
r = requests.get(
|
||||
f"{API_PREFIX}/orgs/{default_org_id}/crawls/{error_crawl_id}/errors",
|
||||
headers=admin_auth_headers,
|
||||
)
|
||||
assert r.status_code == 200
|
||||
data = r.json()
|
||||
assert data["total"] > 0
|
||||
assert data["items"]
|
@ -151,7 +151,7 @@ crawler_namespace: "crawlers"
|
||||
crawl_retries: 1000
|
||||
|
||||
# browsertrix-crawler args:
|
||||
crawler_args: "--logging stats,behaviors,debug --generateWACZ --text --collection thecrawl --screencastPort 9037 --diskUtilization {{ .Values.disk_utilization_threshold | default 90 }} --waitOnDone"
|
||||
crawler_args: "--logging stats,behaviors,debug --generateWACZ --text --collection thecrawl --screencastPort 9037 --logErrorsToRedis --diskUtilization {{ .Values.disk_utilization_threshold | default 90 }} --waitOnDone"
|
||||
|
||||
crawler_browser_instances: 2
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user