Compute crawl execution time in operator (#1256)

* store execution time in operator:
- rename isNewCrash -> isNewExit, crashTime -> exitTime
- keep track of exitCode
- add an execTime counter, incremented when a pod's terminated state has both 'startedAt' and 'finishedAt' (see the sketch after this list)
- ensure pods are complete before deleting
- store 'crawlExecSeconds' at the crawl and org levels; add the field to the Crawl, CrawlOut, and Organization models
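
The execution time for a crawler pod is the delta between the Kubernetes 'startedAt' and 'finishedAt' timestamps of its container state. A minimal sketch of that computation (helper names here are illustrative, not the operator's actual API):

    from datetime import datetime

    def parse_k8s_date(value: str) -> datetime:
        # Kubernetes reports container times in RFC 3339, e.g. "2023-10-09T17:45:00Z"
        return datetime.strptime(value, "%Y-%m-%dT%H:%M:%SZ")

    def pod_exec_seconds(terminated: dict) -> int:
        """Seconds a crawler container actually ran, per its terminated state."""
        started_at = terminated.get("startedAt")
        finished_at = terminated.get("finishedAt")
        if not started_at or not finished_at:
            # missing timestamps are logged and skipped rather than counted
            return 0
        delta = parse_k8s_date(finished_at) - parse_k8s_date(started_at)
        return int(delta.total_seconds())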

* support for fast cancel:
- set the redis ':canceled' key to cancel the crawl immediately (see the sketch after this list)
- delete crawl pods to ensure they exit immediately
- in the finalizer, don't wait for pods to complete when canceling (but still check if they terminated)
- when canceling, count execution time for each still-running pod from its pod.status.running.startedAt up to the current time
- logging: log exec time; warn when 'finishedAt' is missing
- logging: don't log exit code 11 (interrupt due to time/size limits) as a crash
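
The fast-cancel signal amounts to setting a per-crawl key in redis. A minimal sketch, assuming redis.asyncio (redis-py) and an already-known redis URL for the crawl; mark_canceled is a hypothetical helper, not the operator's method:

    import redis.asyncio as aioredis

    async def mark_canceled(redis_url: str, crawl_id: str) -> bool:
        # connect to the crawl's redis instance (URL format is deployment-specific)
        redis = aioredis.from_url(redis_url, decode_responses=True)
        try:
            # the ':canceled' key tells the crawl to shut down immediately
            await redis.set(f"{crawl_id}:canceled", "1")
            return True
        finally:
            await redis.close()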

* don't wait for pods to complete on failed crawls, to stay compatible with the existing browsertrix-crawler image

---------
Co-authored-by: Tessa Walsh <tessa@bitarchivist.net>
Ilya Kreymer 2023-10-09 17:45:00 -07:00 committed by GitHub
parent 748c86700d
commit 5cad9acee9
4 changed files with 175 additions and 36 deletions

View File

@@ -521,6 +521,13 @@ class CrawlOps(BaseCrawlOps):
query = {"_id": crawl_id, "type": "crawl", "state": "running"}
return await self.crawls.find_one_and_update(query, {"$set": {"stats": stats}})
async def store_exec_time(self, crawl_id, exec_time):
"""set exec time, only if not already set"""
query = {"_id": crawl_id, "type": "crawl", "execTime": {"$in": [0, None]}}
return await self.crawls.find_one_and_update(
query, {"$set": {"execTime": exec_time}}
)
async def get_crawl_state(self, crawl_id):
"""return current crawl state of a crawl"""
res = await self.crawls.find_one(

View File

@@ -374,6 +374,8 @@ class CrawlOut(BaseMongoModel):
collectionIds: Optional[List[UUID4]] = []
crawlExecSeconds: int = 0
# automated crawl fields
config: Optional[RawCrawlConfig]
cid: Optional[UUID4]
@@ -441,6 +443,8 @@ class Crawl(BaseCrawl, CrawlConfigCore):
stopping: Optional[bool] = False
crawlExecSeconds: int = 0
# ============================================================================
class CrawlCompleteIn(BaseModel):
@@ -666,6 +670,7 @@ class Organization(BaseMongoModel):
storage: Union[S3Storage, DefaultStorage]
usage: Dict[str, int] = {}
crawlExecSeconds: Dict[str, int] = {}
bytesStored: int = 0
bytesStoredCrawls: int = 0
@@ -713,6 +718,7 @@ class Organization(BaseMongoModel):
if not self.is_crawler(user):
exclude.add("usage")
exclude.add("crawlExecSeconds")
result = self.to_dict(
exclude_unset=True,
@@ -747,6 +753,7 @@ class OrgOut(BaseMongoModel):
name: str
users: Optional[Dict[str, Any]]
usage: Optional[Dict[str, int]]
crawlExecSeconds: Optional[Dict[str, int]]
default: bool = False
bytesStored: int
bytesStoredCrawls: int

View File

@@ -132,8 +132,9 @@ class PodResources(BaseModel):
class PodInfo(BaseModel):
"""Aggregate pod status info held in CrawlJob"""
crashTime: Optional[str] = None
isNewCrash: Optional[bool] = Field(default=None, exclude=True)
exitTime: Optional[str] = None
exitCode: Optional[int] = None
isNewExit: Optional[bool] = Field(default=None, exclude=True)
reason: Optional[str] = None
allocated: PodResources = PodResources()
@@ -211,13 +212,15 @@ class CrawlStatus(BaseModel):
lambda: PodInfo() # pylint: disable=unnecessary-lambda
)
restartTime: Optional[str]
execTime: int = 0
canceled: bool = False
# don't include in status, use by metacontroller
resync_after: Optional[int] = None
# ============================================================================
# pylint: disable=too-many-statements, too-many-public-methods, too-many-branches
# pylint: disable=too-many-statements, too-many-public-methods, too-many-branches, too-many-nested-blocks
# pylint: disable=too-many-instance-attributes, too-many-locals, too-many-lines, too-many-arguments
class BtrixOperator(K8sAPI):
"""BtrixOperator Handler"""
@@ -331,15 +334,17 @@ class BtrixOperator(K8sAPI):
if data.finalizing:
if not status.finished:
# if can't cancel, already finished
if not await self.mark_finished(
crawl_id, uuid.UUID(cid), uuid.UUID(oid), status, "canceled"
if not await self.cancel_crawl(
crawl_id, uuid.UUID(cid), uuid.UUID(oid), status, data.children[POD]
):
# instead of fetching the state (that was already set)
# return exception to ignore this request, keep previous
# finished state
raise HTTPException(status_code=400, detail="out_of_sync_status")
return self.finalize_response(crawl_id, status, spec, data.children, params)
return await self.finalize_response(
crawl_id, uuid.UUID(oid), status, spec, data.children, params
)
# just in case, finished but not deleted, can only get here if
# do_crawl_finished_tasks() doesn't reach the end or taking too long
@@ -348,7 +353,9 @@ class BtrixOperator(K8sAPI):
f"warn crawl {crawl_id} finished but not deleted, post-finish taking too long?"
)
asyncio.create_task(self.delete_crawl_job(crawl_id))
return self.finalize_response(crawl_id, status, spec, data.children, params)
return await self.finalize_response(
crawl_id, uuid.UUID(oid), status, spec, data.children, params
)
try:
configmap = data.related[CMAP][f"crawl-config-{cid}"]["data"]
@@ -407,8 +414,8 @@ class BtrixOperator(K8sAPI):
self.handle_auto_size(crawl.id, status.podStatus)
if status.finished:
return self.finalize_response(
crawl_id, status, spec, data.children, params
return await self.finalize_response(
crawl_id, uuid.UUID(oid), status, spec, data.children, params
)
else:
status.scale = crawl.scale
@@ -654,6 +661,43 @@ class BtrixOperator(K8sAPI):
)
return False
async def cancel_crawl(self, crawl_id, cid, oid, status, pods):
"""Mark crawl as canceled"""
if not await self.mark_finished(crawl_id, cid, oid, status, "canceled"):
return False
await self.mark_for_cancelation(crawl_id)
if not status.canceled:
cancel_time = datetime.utcnow()
for name, pod in pods.items():
pstatus = pod["status"]
role = pod["metadata"]["labels"]["role"]
if role != "crawler":
continue
if "containerStatuses" not in pstatus:
continue
cstatus = pstatus["containerStatuses"][0]
running = cstatus["state"].get("running")
if running:
self.inc_exec_time(
name, status, cancel_time, running.get("startedAt")
)
self.handle_terminated_pod(
name, role, status, cstatus["state"].get("terminated")
)
status.canceled = True
return status.canceled
async def fail_crawl(self, crawl_id, cid, status, pods, stats=None):
"""Mark crawl as failed, log crawl state and print crawl logs, if possible"""
prev_state = status.state
@@ -683,16 +727,29 @@ class BtrixOperator(K8sAPI):
"children": [],
}
def finalize_response(self, crawl_id, status, spec, children, params):
async def finalize_response(
self,
crawl_id: str,
oid: uuid.UUID,
status: CrawlStatus,
spec: dict,
children: dict,
params: dict,
):
"""ensure crawl id ready for deletion"""
redis_pod = f"redis-{crawl_id}"
new_children = []
finalized = False
if redis_pod in children[POD]:
exec_updated = False
pods = children[POD]
if redis_pod in pods:
# if has other pods, keep redis pod until they are removed
if len(children[POD]) > 1:
if len(pods) > 1:
new_children = self._load_redis(params, status, children)
# keep pvs until pods are removed
@@ -700,6 +757,9 @@ class BtrixOperator(K8sAPI):
new_children.extend(list(children[PVC].values()))
if not children[POD] and not children[PVC]:
# ensure exec time was successfully updated
exec_updated = await self.store_exec_time(crawl_id, oid, status.execTime)
# keep parent until ttl expired, if any
if status.finished:
ttl = spec.get("ttlSecondsAfterFinished", DEFAULT_TTL)
@@ -713,7 +773,7 @@ class BtrixOperator(K8sAPI):
return {
"status": status.dict(exclude_none=True, exclude={"resync_after": True}),
"children": new_children,
"finalized": finalized,
"finalized": finalized and exec_updated,
}
async def _get_redis(self, redis_url):
@@ -735,7 +795,7 @@ class BtrixOperator(K8sAPI):
async def sync_crawl_state(self, redis_url, crawl, status, pods, metrics):
"""sync crawl state for running crawl"""
# check if at least one crawler pod started running
crawler_running, redis_running = self.sync_pod_status(pods, status)
crawler_running, redis_running, done = self.sync_pod_status(pods, status)
redis = None
try:
@@ -809,7 +869,7 @@ class BtrixOperator(K8sAPI):
status.filesAddedSize = int(await redis.get("filesAddedSize") or 0)
# update stats and get status
return await self.update_crawl_state(redis, crawl, status, pods)
return await self.update_crawl_state(redis, crawl, status, pods, done)
# pylint: disable=broad-except
except Exception as exc:
@@ -825,6 +885,7 @@ class BtrixOperator(K8sAPI):
"""check status of pods"""
crawler_running = False
redis_running = False
done = True
try:
for name, pod in pods.items():
running = False
@@ -848,30 +909,50 @@ class BtrixOperator(K8sAPI):
):
running = True
terminated = cstatus["state"].get("terminated")
exit_code = terminated and terminated.get("exitCode")
if terminated and exit_code:
crash_time = terminated.get("finishedAt")
pod_status = status.podStatus[name]
pod_status.isNewCrash = pod_status.crashTime != crash_time
pod_status.crashTime = crash_time
# detect reason
if terminated.get("reason") == "OOMKilled" or exit_code == 137:
pod_status.reason = "oom"
else:
pod_status.reason = "interrupt: " + str(exit_code)
self.handle_terminated_pod(
name, role, status, cstatus["state"].get("terminated")
)
if role == "crawler":
crawler_running = crawler_running or running
done = done and phase == "Succeeded"
elif role == "redis":
redis_running = redis_running or running
# pylint: disable=broad-except
except Exception as exc:
done = False
print(exc)
return crawler_running, redis_running
return crawler_running, redis_running, done
def handle_terminated_pod(self, name, role, status, terminated):
"""handle terminated pod state"""
if not terminated:
return
exit_time = terminated.get("finishedAt")
if not exit_time:
print("warn: terminated pod missing finishedAt", flush=True)
return
pod_status = status.podStatus[name]
pod_status.isNewExit = pod_status.exitTime != exit_time
if pod_status.isNewExit and role == "crawler":
self.inc_exec_time(name, status, exit_time, terminated.get("startedAt"))
pod_status.exitTime = exit_time
# detect reason
exit_code = terminated.get("exitCode")
pod_status.reason = "done"
if exit_code == 0:
pod_status.reason = "done"
elif terminated.get("reason") == "OOMKilled" or exit_code == 137:
pod_status.reason = "oom"
else:
pod_status.reason = "interrupt: " + str(exit_code)
def should_mark_waiting(self, state, started):
"""Should the crawl be marked as waiting for capacity?"""
@@ -912,7 +993,7 @@ class BtrixOperator(K8sAPI):
"""auto scale pods here, experimental"""
for name, pod in pod_status.items():
# if pod crashed due to OOM, increase mem
# if pod.isNewCrash and pod.reason == "oom":
# if pod.isNewExit and pod.reason == "oom":
# pod.newMemory = int(float(pod.allocated.memory) * 1.2)
# print(f"Resizing pod {name} -> mem {pod.newMemory} - OOM Detected")
@@ -924,7 +1005,10 @@ class BtrixOperator(K8sAPI):
async def log_crashes(self, crawl_id, pod_status, redis):
"""report/log any pod crashes here"""
for name, pod in pod_status.items():
if not pod.isNewCrash:
# log only unexpected exits as crashes
# - 0 is success / intended shutdown
# - 11 is interrupt / intended restart
if not pod.isNewExit or pod.exitCode in (0, 11):
continue
log = self.get_log_line(
@@ -991,7 +1075,7 @@ class BtrixOperator(K8sAPI):
return False
async def update_crawl_state(self, redis, crawl, status, pods):
async def update_crawl_state(self, redis, crawl, status, pods, done):
"""update crawl state and check if crawl is now done"""
results = await redis.hgetall(f"{crawl.id}:status")
stats, sizes = await get_redis_crawl_stats(redis, crawl.id)
@@ -1034,7 +1118,7 @@ class BtrixOperator(K8sAPI):
status_count[res] = status_count.get(res, 0) + 1
# check if all crawlers are done
if status_count.get("done", 0) >= crawl.scale:
if done and status_count.get("done", 0) >= crawl.scale:
# check if one-page crawls actually succeeded
# if only one page found, and no files, assume failed
if status.pagesFound == 1 and not status.filesAdded:
@@ -1138,6 +1222,32 @@ class BtrixOperator(K8sAPI):
# finally, delete job
await self.delete_crawl_job(crawl_id)
def inc_exec_time(self, name, status, finished_at, started_at):
"""increment execTime on pod status"""
end_time = (
from_k8s_date(finished_at)
if not isinstance(finished_at, datetime)
else finished_at
)
start_time = from_k8s_date(started_at)
exec_time = int((end_time - start_time).total_seconds())
status.execTime += exec_time
print(f"{name} exec time: {exec_time}")
return exec_time
async def store_exec_time(self, crawl_id, oid, exec_time):
"""store execTime in crawl (if not already set), and increment org counter"""
try:
if await self.crawl_ops.store_exec_time(crawl_id, exec_time):
print(f"Exec Time: {exec_time}", flush=True)
await self.org_ops.inc_org_time_stats(oid, exec_time, True)
return True
# pylint: disable=broad-except
except Exception as exc:
print(exc, flush=True)
return False
async def inc_crawl_complete_stats(self, crawl, finished):
"""Increment Crawl Stats"""
@@ -1147,7 +1257,21 @@ class BtrixOperator(K8sAPI):
print(f"Duration: {duration}", flush=True)
await self.org_ops.inc_org_stats(crawl.oid, duration)
await self.org_ops.inc_org_time_stats(crawl.oid, duration)
async def mark_for_cancelation(self, crawl_id):
"""mark crawl as canceled in redis"""
try:
redis_url = self.get_redis_url(crawl_id)
redis = await self._get_redis(redis_url)
if not redis:
return False
await redis.set(f"{crawl_id}:canceled", "1")
return True
finally:
if redis:
await redis.close()
async def add_crawl_errors_to_db(self, crawl_id, inc=100):
"""Pull crawl errors from redis and write to mongo db"""

View File

@@ -320,12 +320,13 @@ class OrgOps:
{"_id": org.id}, {"$set": {"origin": origin}}
)
async def inc_org_stats(self, oid, duration):
async def inc_org_time_stats(self, oid, duration, is_exec_time=False):
"""inc crawl duration stats for org oid"""
# init org crawl stats
key = "crawlExecSeconds" if is_exec_time else "usage"
yymm = datetime.utcnow().strftime("%Y-%m")
await self.orgs.find_one_and_update(
{"_id": oid}, {"$inc": {f"usage.{yymm}": duration}}
{"_id": oid}, {"$inc": {f"{key}.{yymm}": duration}}
)
async def get_max_concurrent_crawls(self, oid):