make crawlTimeout a per-crawlconfig property

allow crawl complete / partial complete to update existing crawl state, e.g. after a timeout
enable handling of BackoffLimitExceeded / DeadlineExceeded failures, with a possible later success able to override the failure state
include only active jobs in the running crawls listing
Ilya Kreymer 2021-08-24 11:27:34 -07:00
parent ed27f3e3ee
commit 20b19f932f
4 changed files with 57 additions and 26 deletions

View File

@ -75,7 +75,7 @@ class CrawlConfigIn(BaseModel):
schedule: Optional[str] = ""
runNow: Optional[bool] = False
# storageName: Optional[str] = "default"
crawlTimeout: Optional[int] = 0
config: RawCrawlConfig
@ -93,6 +93,8 @@ class CrawlConfig(BaseMongoModel):
config: RawCrawlConfig
crawlTimeout: Optional[int] = 0
# ============================================================================
class CrawlOps:
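
With crawlTimeout moved onto the config models above, each crawl config can carry its own timeout instead of relying on the cluster-wide CRAWL_TIMEOUT setting. A minimal sketch of a creation payload that validates against CrawlConfigIn; the RawCrawlConfig fields shown are assumptions, not part of this diff:

# illustrative payload only; "seeds" and "scopeType" are assumed RawCrawlConfig fields
new_config = {
    "schedule": "0 0 * * *",   # optional cron schedule
    "runNow": False,
    "crawlTimeout": 3600,      # seconds; applied as the crawl job's activeDeadlineSeconds
    "config": {
        "seeds": ["https://example.com/"],
        "scopeType": "prefix",
    },
}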

View File

@ -1,13 +1,13 @@
""" Crawl API """
import asyncio
import traceback
from typing import Optional, List
from datetime import datetime
from fastapi import Depends, HTTPException
from pydantic import BaseModel
import pymongo
from db import BaseMongoModel
from archives import Archive
@ -74,11 +74,20 @@ class CrawlOps:
print("Not a valid crawl complete msg!", flush=True)
return
await self.handle_finished(crawl)
await self.store_crawl(crawl, update_existing=True)
async def handle_finished(self, crawl: Crawl):
async def store_crawl(self, crawl: Crawl, update_existing=False):
""" Add finished crawl to db, increment archive usage """
if update_existing:
await self.crawls.find_one_and_replace(
{"_id": crawl.id}, crawl.to_dict(), upsert=True
)
else:
try:
await self.crawls.insert_one(crawl.to_dict())
except pymongo.errors.DuplicateKeyError:
print(f"Crawl Already Added: {crawl.id}")
return False
dura = int((crawl.finished - crawl.started).total_seconds())
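
The new update_existing path is what lets a later success override an earlier failure: the failure handler inserts a crawl record first, and a crawl-complete message for the same id then replaces it wholesale via find_one_and_replace. A rough sketch of that sequence against a motor collection; the state value and fields other than _id are illustrative:

import pymongo

async def override_example(crawls):
    # job failure (e.g. DeadlineExceeded) handled first: a timed_out record is inserted
    failed = {"_id": "crawl-1", "state": "timed_out"}
    try:
        await crawls.insert_one(failed)
    except pymongo.errors.DuplicateKeyError:
        pass  # a record already exists; leave it as-is

    # the crawl complete message arrives later and stores with update_existing=True,
    # replacing the timed_out record (or creating it if the webhook won the race)
    done = {"_id": "crawl-1", "state": "complete"}
    await crawls.find_one_and_replace({"_id": done["_id"]}, done, upsert=True)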
@ -150,7 +159,7 @@ def init_crawls_api(app, mdb, crawl_manager, archives):
status_code=404, detail=f"Crawl not found: {crawl_id}"
)
await ops.handle_finished(crawl)
await ops.store_crawl(crawl)
except HTTPException as httpe:
raise httpe
@ -182,7 +191,6 @@ def init_crawls_api(app, mdb, crawl_manager, archives):
except Exception as exc:
# pylint: disable=raise-missing-from
traceback.print_exc()
raise HTTPException(status_code=400, detail=f"Error Stopping Crawl: {exc}")
return {"stopped_gracefully": True}

View File

@ -37,17 +37,26 @@ class K8SManager:
self.crawler_image = os.environ.get("CRAWLER_IMAGE")
self.crawler_image_pull_policy = "IfNotPresent"
self.crawl_timeout = int(os.environ.get("CRAWL_TIMEOUT", "1000000"))
self.crawl_retries = int(os.environ.get("CRAWL_RETRIES", "3"))
self.loop = asyncio.get_running_loop()
self.loop.create_task(self.watch_job_loop())
self.loop.create_task(self.run_event_loop())
def set_crawl_ops(self, ops):
""" Set crawl ops handler """
self.crawl_ops = ops
async def watch_job_loop(self):
async def run_event_loop(self):
""" Run the job watch loop, retry in case of failure"""
while True:
try:
await self.watch_events()
# pylint: disable=broad-except
except Exception as exc:
print(f"Retrying job loop: {exc}")
await asyncio.sleep(10)
async def watch_events(self):
""" Get events for completed jobs"""
async with watch.Watch().stream(
self.core_api.list_namespaced_event,
@ -62,12 +71,12 @@ class K8SManager:
self.handle_crawl_failed(obj.involved_object.name, "failed")
)
# elif obj.reason == "DeadlineExceeded":
# self.loop.create_task(
# self.handle_crawl_failed(
# obj.involved_object.name, "timed_out"
# )
# )
elif obj.reason == "DeadlineExceeded":
self.loop.create_task(
self.handle_crawl_failed(
obj.involved_object.name, "timed_out"
)
)
# pylint: disable=broad-except
except Exception as exc:
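
Wrapping the watch in run_event_loop means a dropped watch connection or API error restarts the event stream after a short pause instead of killing the task. A condensed sketch of the pattern with kubernetes_asyncio; the event unpacking and the mapping of BackoffLimitExceeded to the "failed" state are inferred from the commit message rather than copied from these hunks:

import asyncio
from kubernetes_asyncio import watch

async def run_event_loop(core_api, namespace, handle_crawl_failed):
    """ Sketch: retry the job-event watch forever """
    while True:
        try:
            async with watch.Watch().stream(
                core_api.list_namespaced_event, namespace=namespace
            ) as stream:
                async for event in stream:
                    obj = event["object"]
                    if obj.reason == "BackoffLimitExceeded":
                        asyncio.create_task(
                            handle_crawl_failed(obj.involved_object.name, "failed")
                        )
                    elif obj.reason == "DeadlineExceeded":
                        asyncio.create_task(
                            handle_crawl_failed(obj.involved_object.name, "timed_out")
                        )
        # pylint: disable=broad-except
        except Exception as exc:
            print(f"Retrying job loop: {exc}")
            await asyncio.sleep(10)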
@ -131,7 +140,7 @@ class K8SManager:
extra_crawl_params = extra_crawl_params or []
job_template = self._get_job_template(
cid, labels, annotations, extra_crawl_params
cid, labels, annotations, crawlconfig.crawlTimeout, extra_crawl_params
)
spec = client.V1beta1CronJobSpec(
@ -205,6 +214,15 @@ class K8SManager:
cron_job.spec.suspend = suspend
changed = True
if (
crawlconfig.crawlTimeout
!= cron_job.spec.job_template.spec.active_deadline_seconds
):
cron_job.spec.job_template.spec.active_deadline_seconds = (
crawlconfig.crawlTimeout
)
changed = True
if changed:
cron_job.spec.job_template.metadata.annotations[
"btrix.run.schedule"
@ -248,7 +266,11 @@ class K8SManager:
field_selector="status.successful=0",
)
return [self._make_crawl_for_job(job, "running") for job in jobs.items]
return [
self._make_crawl_for_job(job, "running")
for job in jobs.items
if job.status.active
]
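
The extra job.status.active check matters because the status.successful=0 field selector also matches failed and timed-out jobs; only jobs that still have active pods should appear as running crawls. A standalone sketch of the combined filter (label selectors omitted):

from kubernetes_asyncio import client

async def list_running_jobs(batch_api: client.BatchV1Api, namespace: str):
    """ Sketch: successful=0 excludes only completed jobs, so also require active pods """
    jobs = await batch_api.list_namespaced_job(
        namespace=namespace,
        field_selector="status.successful=0",
    )
    # failed / timed-out jobs have successful=0 too, but status.active is None for them
    return [job for job in jobs.items if job.status.active]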
async def validate_crawl_complete(self, crawlcomplete):
"""Ensure the crawlcomplete data is valid (job exists and user matches)
@ -332,7 +354,7 @@ class K8SManager:
crawl = self._make_crawl_for_job(job, reason, True)
await self.crawl_ops.handle_finished(crawl)
await self.crawl_ops.store_crawl(crawl)
await self._delete_job(job_name)
@ -360,7 +382,7 @@ class K8SManager:
await self.batch_api.delete_namespaced_job(
name=name,
namespace=self.namespace,
grace_period_seconds=120,
grace_period_seconds=60,
propagation_policy="Foreground",
)
@ -474,7 +496,9 @@ class K8SManager:
body=job, namespace=self.namespace
)
def _get_job_template(self, uid, labels, annotations, extra_crawl_params):
def _get_job_template(
self, uid, labels, annotations, crawl_timeout, extra_crawl_params
):
"""Return crawl job template for crawl job, including labels, adding optiona crawl params"""
command = ["crawl", "--config", "/tmp/crawl-config.json"]
@ -556,7 +580,7 @@ class K8SManager:
},
}
if self.crawl_timeout > 0:
job_template["spec"]["activeDeadlineSeconds"] = self.crawl_timeout
if crawl_timeout > 0:
job_template["spec"]["activeDeadlineSeconds"] = crawl_timeout
return job_template
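
In the generated template the per-config value ends up as the Job's activeDeadlineSeconds, which is what the watch loop later sees as a DeadlineExceeded event; the retry budget presumably maps to backoffLimit and surfaces as BackoffLimitExceeded when exhausted. A trimmed sketch of where the two fields sit; the backoffLimit placement is an assumption based on crawl_retries, and the pod template is elided:

def sketch_job_spec(crawl_timeout: int, crawl_retries: int) -> dict:
    """ Sketch only: timeout/retry fields of the job template """
    job_template = {
        "spec": {
            "backoffLimit": crawl_retries,  # assumed; exhausting retries -> BackoffLimitExceeded
            "template": {},                 # pod template elided
        },
    }
    if crawl_timeout > 0:
        # per-crawlconfig timeout -> DeadlineExceeded once the job runs past it
        job_template["spec"]["activeDeadlineSeconds"] = crawl_timeout
    return job_template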

View File

@ -41,9 +41,6 @@ crawler_pull_policy: "Never"
crawler_namespace: "crawlers"
# set 0 to disable timeout
crawl_timeout: 0
# num retries
crawl_retries: 1