Crawl Timeout via elapsed time (#1338)
Fixes #1337

Crawl timeout is now tracked via the `elapsedCrawlTime` field on the crawl status. It is similar to regular crawl execution time, but counts only the longest-running pod when scale > 1; at scale == 1 the two are equivalent. The crawl is gracefully stopped once the elapsed crawl time exceeds the timeout. For better responsiveness, the check also includes the crawl time accrued since the last update interval.

Details:
- handle crawl timeout via elapsed crawl time (the longest running time of a single pod) instead of an expire time
- include the current running time since the last update for best precision
- more accurately count the elapsed time the crawl is actually running
- store elapsedCrawlTime in addition to crawlExecTime, adding the longest duration of any single pod since the last update interval

---------

Co-authored-by: Tessa Walsh <tessa@bitarchivist.net>
parent 5530ca92e1
commit b4fd5e6e94
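Note: a minimal sketch, with hypothetical numbers not taken from this PR, of why the timeout is checked against elapsed time rather than execution time:

# Hypothetical crawl at scale 3 that has been running for one wall-clock hour:
scale = 3
wall_clock_seconds = 3600

crawl_exec_time = scale * wall_clock_seconds  # 10800 -- sums every pod; used for exec-minute quotas
elapsed_crawl_time = wall_clock_seconds       # 3600  -- longest single pod; used for the timeout

timeout = 7200  # a 2-hour crawl timeout
assert elapsed_crawl_time < timeout  # crawl keeps running, as intended
assert crawl_exec_time > timeout     # exec time would have stopped it ~3x too early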
@@ -2,8 +2,6 @@
 import os
 import traceback
 
-from datetime import timedelta
-
 import yaml
 
 from kubernetes_asyncio import client, config
@@ -18,7 +16,7 @@ from redis import asyncio as aioredis
 from fastapi import HTTPException
 from fastapi.templating import Jinja2Templates
 
-from .utils import get_templates_dir, dt_now, to_k8s_date
+from .utils import get_templates_dir, dt_now
 
 
 # ============================================================================
@@ -85,11 +83,6 @@ class K8sAPI:
         crawl_id=None,
     ):
         """load job template from yaml"""
-        if crawl_timeout:
-            crawl_expire_time = to_k8s_date(dt_now() + timedelta(seconds=crawl_timeout))
-        else:
-            crawl_expire_time = ""
-
         if not crawl_id:
             ts_now = dt_now().strftime("%Y%m%d%H%M%S")
             prefix = "manual" if manual else "sched"
@@ -101,7 +94,7 @@ class K8sAPI:
             "oid": oid,
             "userid": userid,
             "scale": scale,
-            "expire_time": crawl_expire_time or 0,
+            "timeout": crawl_timeout,
             "max_crawl_size": max_crawl_size or 0,
             "storage_name": str(storage),
             "manual": "1" if manual else "0",
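For context, this params dict feeds a Jinja2 job template (Jinja2Templates is already imported in this file). A minimal standalone sketch of that flow; the inline template and values here are stand-ins, not the real chart file:

from jinja2 import Template

# Stand-in for the crawl_job template edited at the end of this diff
JOB_TEMPLATE = """spec:
  timeout: {{ timeout }}
  maxCrawlSize: {{ max_crawl_size }}
  manual: {{ manual }}
"""

params = {"timeout": 3600, "max_crawl_size": 0, "manual": "1"}
print(Template(JOB_TEMPLATE).render(params))
# spec:
#   timeout: 3600
#   maxCrawlSize: 0
#   manual: 1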
@@ -8,7 +8,6 @@ from typing import Optional, DefaultDict, TYPE_CHECKING
 
 from collections import defaultdict
 
-from datetime import datetime
 import json
 from uuid import UUID
 from fastapi import HTTPException
@@ -114,8 +113,8 @@ class CrawlSpec(BaseModel):
     started: str
     stopping: bool = False
     scheduled: bool = False
-    expire_time: Optional[datetime] = None
-    max_crawl_size: Optional[int] = None
+    timeout: int = 0
+    max_crawl_size: int = 0
 
 
 # ============================================================================
@@ -232,9 +231,15 @@ class CrawlStatus(BaseModel):
     restartTime: Optional[str]
     canceled: bool = False
 
-    # Execution Time -- updated on pod exits and at regular interval
+    # updated on pod exits and at regular interval
+    # Crawl Execution Time -- time all crawler pods have been running
+    # used to track resource usage and enforce execution minutes limit
     crawlExecTime: int = 0
 
+    # Elapsed Exec Time -- time crawl has been running in at least one pod
+    # used for crawl timeouts
+    elapsedCrawlTime: int = 0
+
     # last exec time update
     lastUpdatedTime: str = ""
 
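lastUpdatedTime is stored as a string; a sketch of the k8s-style timestamp round-trip it relies on (the exact format string is an assumption; the real helpers are dt_now/to_k8s_date/from_k8s_date in .utils):

from datetime import datetime, timezone

now = datetime.now(timezone.utc).replace(microsecond=0)
as_k8s = now.strftime("%Y-%m-%dT%H:%M:%SZ")  # e.g. "2023-10-24T12:00:00Z"
parsed = datetime.strptime(as_k8s, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc)
assert parsed == now  # round-trips cleanly at whole-second precision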
@@ -440,7 +445,7 @@ class BtrixOperator(K8sAPI):
             scale=spec.get("scale", 1),
             started=data.parent["metadata"]["creationTimestamp"],
             stopping=spec.get("stopping", False),
-            expire_time=from_k8s_date(spec.get("expireTime")),
+            timeout=spec.get("timeout") or 0,
             max_crawl_size=int(spec.get("maxCrawlSize") or 0),
             scheduled=spec.get("manual") != "1",
         )
@@ -1081,6 +1086,7 @@ class BtrixOperator(K8sAPI):
             return
 
         exec_time = 0
+        max_duration = 0
         print(
             f"Exec Time Update: {reason}: {now} - {update_start_time} = {update_duration}"
         )
@@ -1131,11 +1137,13 @@ class BtrixOperator(K8sAPI):
                     f" - {name}: {pod_state}: {end_time} - {start_time} = {duration}"
                 )
                 exec_time += duration
+                max_duration = max(duration, max_duration)
 
         if exec_time:
             await self.crawl_ops.inc_crawl_exec_time(crawl_id, exec_time)
             await self.org_ops.inc_org_time_stats(oid, exec_time, True)
             status.crawlExecTime += exec_time
+            status.elapsedCrawlTime += max_duration
 
         print(
             f" Exec Time Total: {status.crawlExecTime}, Incremented By: {exec_time}",
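The accounting pass above can be replayed standalone; the pod names and (start, end) tuples below are illustrative, not the operator's actual data structures:

pods = {
    "crawler-0": (0, 120),   # (start, end) offsets in seconds within this interval
    "crawler-1": (30, 120),
    "crawler-2": (60, 120),
}

exec_time = 0
max_duration = 0
for _name, (start, end) in pods.items():
    duration = end - start
    exec_time += duration                       # every pod counts toward crawlExecTime
    max_duration = max(duration, max_duration)  # longest pod counts toward elapsedCrawlTime

assert exec_time == 270    # 120 + 90 + 60
assert max_duration == 120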
@@ -1254,7 +1262,7 @@ class BtrixOperator(K8sAPI):
 
         return True
 
-    def is_crawl_stopping(self, crawl, size):
+    def is_crawl_stopping(self, crawl: CrawlSpec, status: CrawlStatus) -> bool:
         """return true if crawl should begin graceful stopping phase"""
 
         # if user requested stop, then enter stopping phase
@@ -1262,12 +1270,19 @@ class BtrixOperator(K8sAPI):
             print("Graceful Stop: User requested stop")
             return True
 
-        # check crawl expiry
-        if crawl.expire_time and dt_now() > crawl.expire_time:
-            print(f"Graceful Stop: Job duration expired at {crawl.expire_time}")
-            return True
+        # check timeout if timeout time exceeds elapsed time
+        if crawl.timeout:
+            elapsed = (
+                status.elapsedCrawlTime
+                + (dt_now() - from_k8s_date(status.lastUpdatedTime)).total_seconds()
+            )
+            if elapsed > crawl.timeout:
+                print(
+                    f"Graceful Stop: Crawl running time exceeded {crawl.timeout} second timeout"
+                )
+                return True
 
-        if crawl.max_crawl_size and size > crawl.max_crawl_size:
+        if crawl.max_crawl_size and status.size > crawl.max_crawl_size:
             print(f"Graceful Stop: Maximum crawl size {crawl.max_crawl_size} hit")
             return True
 
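The new timeout check reads as a small pure function; a sketch assuming ISO "Z"-suffixed timestamps (the operator's from_k8s_date/dt_now helpers do the equivalent parsing):

from datetime import datetime, timezone

def current_elapsed(elapsed_crawl_time: int, last_updated_iso: str) -> float:
    """Elapsed crawl time right now, including the interval still in
    flight since the last accounting pass."""
    last_updated = datetime.strptime(last_updated_iso, "%Y-%m-%dT%H:%M:%SZ").replace(
        tzinfo=timezone.utc
    )
    return elapsed_crawl_time + (datetime.now(timezone.utc) - last_updated).total_seconds()

# e.g. a crawl with a 90-minute timeout:
# stop = current_elapsed(status.elapsedCrawlTime, status.lastUpdatedTime) > 5400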
@@ -1311,7 +1326,7 @@ class BtrixOperator(K8sAPI):
             pod_info = status.podStatus[key]
             pod_info.used.storage = value
 
-        status.stopping = self.is_crawl_stopping(crawl, status.size)
+        status.stopping = self.is_crawl_stopping(crawl, status)
 
         # check exec time quotas and stop if reached limit
         if not status.stopping:
@@ -12,6 +12,7 @@ from .utils import (
 
 curr_dir = os.path.dirname(os.path.realpath(__file__))
 
+
 def test_upload_stream(admin_auth_headers, default_org_id):
     with open(os.path.join(curr_dir, "..", "test", "data", "example.wacz"), "rb") as fh:
         r = requests.put(
@@ -20,11 +20,9 @@ spec:
   oid: "{{ oid }}"
   scale: {{ scale }}
   maxCrawlSize: {{ max_crawl_size }}
+  timeout: {{ timeout }}
   manual: {{ manual }}
   ttlSecondsAfterFinished: 30
 
   storageName: "{{ storage_name }}"
 
-  {% if expire_time %}
-  expireTime: "{{ expire_time }}"
-  {% endif %}