backend: Fix for total crawl time limit. (#665)
* backend: fix for total crawl time limit: the time limit is computed against the job's total run time; when the limit is exceeded, the job begins stopping crawls gracefully, equivalent to the 'stop crawl' operation. Fixes #664. * rename crawl-timeout -> crawl_expire_time * fix lint
This commit is contained in:
parent
8ca4276c57
commit
86ca9c4bac
@ -57,6 +57,10 @@ class CrawlJob(ABC):
|
|||||||
self.storage_path = os.environ.get("STORE_PATH")
|
self.storage_path = os.environ.get("STORE_PATH")
|
||||||
self.storage_name = os.environ.get("STORAGE_NAME")
|
self.storage_name = os.environ.get("STORAGE_NAME")
|
||||||
|
|
||||||
|
self.crawl_expire_time = os.environ.get("CRAWL_EXPIRE_TIME")
|
||||||
|
if self.crawl_expire_time:
|
||||||
|
self.crawl_expire_time = datetime.fromisoformat(self.crawl_expire_time)
|
||||||
|
|
||||||
self.last_done = None
|
self.last_done = None
|
||||||
self.last_found = None
|
self.last_found = None
|
||||||
self.redis = None
|
self.redis = None
|
||||||
@ -141,7 +145,7 @@ class CrawlJob(ABC):
|
|||||||
# check crawl status
|
# check crawl status
|
||||||
await self.check_crawl_status()
|
await self.check_crawl_status()
|
||||||
|
|
||||||
# pylint: disable=broad-except
|
# pylint: disable=broad-except
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
print(f"Retrying crawls done loop: {exc}")
|
print(f"Retrying crawls done loop: {exc}")
|
||||||
await asyncio.sleep(10)
|
await asyncio.sleep(10)
|
||||||
@ -174,6 +178,15 @@ class CrawlJob(ABC):
|
|||||||
|
|
||||||
await self.delete_crawl()
|
await self.delete_crawl()
|
||||||
|
|
||||||
|
# check crawl expiry
|
||||||
|
if self.crawl_expire_time and datetime.utcnow() > self.crawl_expire_time:
|
||||||
|
res = await self.graceful_shutdown()
|
||||||
|
if res.get("success"):
|
||||||
|
print(
|
||||||
|
"Job duration expired at {self.crawl_expire_time}, "
|
||||||
|
+ "gracefully stopping crawl"
|
||||||
|
)
|
||||||
|
|
||||||
async def delete_crawl(self):
|
async def delete_crawl(self):
|
||||||
"""delete crawl stateful sets, services and pvcs"""
|
"""delete crawl stateful sets, services and pvcs"""
|
||||||
self._delete_pending = True
|
self._delete_pending = True
|
||||||
|
@ -178,12 +178,21 @@ class BaseCrawlManager(ABC):
|
|||||||
return crawl_id
|
return crawl_id
|
||||||
|
|
||||||
async def _load_job_template(self, crawlconfig, job_id, manual, schedule=None):
|
async def _load_job_template(self, crawlconfig, job_id, manual, schedule=None):
|
||||||
|
if crawlconfig.crawlTimeout:
|
||||||
|
crawl_expire_time = datetime.datetime.utcnow() + datetime.timedelta(
|
||||||
|
seconds=crawlconfig.crawlTimeout
|
||||||
|
)
|
||||||
|
crawl_expire_time = crawl_expire_time.isoformat()
|
||||||
|
else:
|
||||||
|
crawl_expire_time = ""
|
||||||
|
|
||||||
params = {
|
params = {
|
||||||
"id": job_id,
|
"id": job_id,
|
||||||
"cid": str(crawlconfig.id),
|
"cid": str(crawlconfig.id),
|
||||||
"rev": str(crawlconfig.rev),
|
"rev": str(crawlconfig.rev),
|
||||||
"userid": str(crawlconfig.modifiedBy),
|
"userid": str(crawlconfig.modifiedBy),
|
||||||
"oid": str(crawlconfig.oid),
|
"oid": str(crawlconfig.oid),
|
||||||
|
"crawl_expire_time": crawl_expire_time,
|
||||||
"job_image": self.job_image,
|
"job_image": self.job_image,
|
||||||
"job_pull_policy": self.job_pull_policy,
|
"job_pull_policy": self.job_pull_policy,
|
||||||
"manual": "1" if manual else "0",
|
"manual": "1" if manual else "0",
|
||||||
|
@ -76,6 +76,9 @@ spec:
|
|||||||
- name: TAGS
|
- name: TAGS
|
||||||
value: "{{ tags }}"
|
value: "{{ tags }}"
|
||||||
|
|
||||||
|
- name: CRAWL_EXPIRE_TIME
|
||||||
|
value: "{{ crawl_expire_time }}"
|
||||||
|
|
||||||
- name: STORE_PATH
|
- name: STORE_PATH
|
||||||
valueFrom:
|
valueFrom:
|
||||||
configMapKeyRef:
|
configMapKeyRef:
|
||||||
|
Loading…
Reference in New Issue
Block a user