From b4fd5e6e944db490b56f25aec72eb31c1024c882 Mon Sep 17 00:00:00 2001
From: Ilya Kreymer
Date: Mon, 6 Nov 2023 16:32:58 -0800
Subject: [PATCH] Crawl Timeout via elapsed time (#1338)

Fixes #1337

Crawl timeout is now tracked via the `elapsedCrawlTime` field on the
crawl status. It is similar to the regular crawl execution time, but
counts only one pod when scale > 1 (when scale == 1, the two values
are equivalent). The crawl is gracefully stopped when the elapsed
execution time exceeds the timeout. For better responsiveness, the
crawl time accrued since the last update interval is also included.

Details:
- handle crawl timeout via elapsed crawl time (the longest running
  time of a single pod) instead of an expire time
- include current running time since the last update for best precision
- more accurately count elapsed time the crawl is actually running
- store elapsedCrawlTime in addition to crawlExecTime, adding the
  longest duration of each pod since the last update interval

---------

Co-authored-by: Tessa Walsh
---
 backend/btrixcloud/k8sapi.py                 | 11 +-----
 backend/btrixcloud/operator.py               | 39 ++++++++++++++------
 backend/test_nightly/test_upload_replicas.py |  1 +
 chart/app-templates/crawl_job.yaml           |  4 +-
 4 files changed, 31 insertions(+), 24 deletions(-)

diff --git a/backend/btrixcloud/k8sapi.py b/backend/btrixcloud/k8sapi.py
index 4899eb8f..21c9674f 100644
--- a/backend/btrixcloud/k8sapi.py
+++ b/backend/btrixcloud/k8sapi.py
@@ -2,8 +2,6 @@
 
 import os
 import traceback
 
-from datetime import timedelta
-
 import yaml
 from kubernetes_asyncio import client, config
@@ -18,7 +16,7 @@ from redis import asyncio as aioredis
 from fastapi import HTTPException
 from fastapi.templating import Jinja2Templates
 
-from .utils import get_templates_dir, dt_now, to_k8s_date
+from .utils import get_templates_dir, dt_now
 
 
 # ============================================================================
@@ -85,11 +83,6 @@ class K8sAPI:
         crawl_id=None,
     ):
         """load job template from yaml"""
-        if crawl_timeout:
-            crawl_expire_time = to_k8s_date(dt_now() + timedelta(seconds=crawl_timeout))
-        else:
-            crawl_expire_time = ""
-
         if not crawl_id:
             ts_now = dt_now().strftime("%Y%m%d%H%M%S")
             prefix = "manual" if manual else "sched"
@@ -101,7 +94,7 @@ class K8sAPI:
             "oid": oid,
             "userid": userid,
             "scale": scale,
-            "expire_time": crawl_expire_time or 0,
+            "timeout": crawl_timeout,
             "max_crawl_size": max_crawl_size or 0,
             "storage_name": str(storage),
             "manual": "1" if manual else "0",
diff --git a/backend/btrixcloud/operator.py b/backend/btrixcloud/operator.py
index ff7d3b3f..bed99a90 100644
--- a/backend/btrixcloud/operator.py
+++ b/backend/btrixcloud/operator.py
@@ -8,7 +8,6 @@ from typing import Optional, DefaultDict, TYPE_CHECKING
 
 from collections import defaultdict
-from datetime import datetime
 import json
 from uuid import UUID
 
 from fastapi import HTTPException
@@ -114,8 +113,8 @@ class CrawlSpec(BaseModel):
     started: str
     stopping: bool = False
     scheduled: bool = False
-    expire_time: Optional[datetime] = None
-    max_crawl_size: Optional[int] = None
+    timeout: int = 0
+    max_crawl_size: int = 0
 
 
 # ============================================================================
@@ -232,9 +231,15 @@ class CrawlStatus(BaseModel):
     restartTime: Optional[str]
     canceled: bool = False
 
-    # Execution Time -- updated on pod exits and at regular interval
+    # updated on pod exits and at regular interval
+    # Crawl Execution Time -- time all crawler pods have been running
+    # used to track resource usage and enforce execution minutes limit
     crawlExecTime: int = 0
 
+    # Elapsed Exec Time -- time crawl has been running in at least one pod
+    # used for crawl timeouts
+    elapsedCrawlTime: int = 0
+
     # last exec time update
     lastUpdatedTime: str = ""
 
@@ -440,7 +445,7 @@ class BtrixOperator(K8sAPI):
             scale=spec.get("scale", 1),
             started=data.parent["metadata"]["creationTimestamp"],
             stopping=spec.get("stopping", False),
-            expire_time=from_k8s_date(spec.get("expireTime")),
+            timeout=spec.get("timeout") or 0,
             max_crawl_size=int(spec.get("maxCrawlSize") or 0),
             scheduled=spec.get("manual") != "1",
         )
@@ -1081,6 +1086,7 @@ class BtrixOperator(K8sAPI):
             return
 
         exec_time = 0
+        max_duration = 0
        print(
             f"Exec Time Update: {reason}: {now} - {update_start_time} = {update_duration}"
         )
@@ -1131,11 +1137,13 @@ class BtrixOperator(K8sAPI):
                     f"  - {name}: {pod_state}: {end_time} - {start_time} = {duration}"
                 )
                 exec_time += duration
+                max_duration = max(duration, max_duration)
 
         if exec_time:
             await self.crawl_ops.inc_crawl_exec_time(crawl_id, exec_time)
             await self.org_ops.inc_org_time_stats(oid, exec_time, True)
             status.crawlExecTime += exec_time
+            status.elapsedCrawlTime += max_duration
 
             print(
                 f"  Exec Time Total: {status.crawlExecTime}, Incremented By: {exec_time}",
@@ -1254,7 +1262,7 @@ class BtrixOperator(K8sAPI):
 
         return True
 
-    def is_crawl_stopping(self, crawl, size):
+    def is_crawl_stopping(self, crawl: CrawlSpec, status: CrawlStatus) -> bool:
         """return true if crawl should begin graceful stopping phase"""
 
         # if user requested stop, then enter stopping phase
@@ -1262,12 +1270,19 @@ class BtrixOperator(K8sAPI):
             print("Graceful Stop: User requested stop")
             return True
 
-        # check crawl expiry
-        if crawl.expire_time and dt_now() > crawl.expire_time:
-            print(f"Graceful Stop: Job duration expired at {crawl.expire_time}")
-            return True
+        # check timeout if timeout time exceeds elapsed time
+        if crawl.timeout:
+            elapsed = (
+                status.elapsedCrawlTime
+                + (dt_now() - from_k8s_date(status.lastUpdatedTime)).total_seconds()
+            )
+            if elapsed > crawl.timeout:
+                print(
+                    f"Graceful Stop: Crawl running time exceeded {crawl.timeout} second timeout"
+                )
+                return True
 
-        if crawl.max_crawl_size and size > crawl.max_crawl_size:
+        if crawl.max_crawl_size and status.size > crawl.max_crawl_size:
             print(f"Graceful Stop: Maximum crawl size {crawl.max_crawl_size} hit")
             return True
 
@@ -1311,7 +1326,7 @@ class BtrixOperator(K8sAPI):
             pod_info = status.podStatus[key]
             pod_info.used.storage = value
 
-        status.stopping = self.is_crawl_stopping(crawl, status.size)
+        status.stopping = self.is_crawl_stopping(crawl, status)
 
         # check exec time quotas and stop if reached limit
         if not status.stopping:
diff --git a/backend/test_nightly/test_upload_replicas.py b/backend/test_nightly/test_upload_replicas.py
index 422d10ce..540d2f9e 100644
--- a/backend/test_nightly/test_upload_replicas.py
+++ b/backend/test_nightly/test_upload_replicas.py
@@ -12,6 +12,7 @@ from .utils import (
 
 curr_dir = os.path.dirname(os.path.realpath(__file__))
 
+
 def test_upload_stream(admin_auth_headers, default_org_id):
     with open(os.path.join(curr_dir, "..", "test", "data", "example.wacz"), "rb") as fh:
         r = requests.put(
diff --git a/chart/app-templates/crawl_job.yaml b/chart/app-templates/crawl_job.yaml
index d8ff3480..e09f53f5 100644
--- a/chart/app-templates/crawl_job.yaml
+++ b/chart/app-templates/crawl_job.yaml
@@ -20,11 +20,9 @@ spec:
   oid: "{{ oid }}"
   scale: {{ scale }}
   maxCrawlSize: {{ max_crawl_size }}
+  timeout: {{ timeout }}
   manual: {{ manual }}
   ttlSecondsAfterFinished: 30
   storageName: "{{ storage_name }}"
 
-  {% if expire_time %}
-  expireTime: "{{ expire_time }}"
-  {% endif %}
 
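
Reviewer note (not part of the patch): the new timeout check in
is_crawl_stopping() reduces to simple arithmetic -- the stored
elapsedCrawlTime plus the wall-clock time since lastUpdatedTime is
compared against crawl.timeout. The standalone Python sketch below
illustrates that logic under stated assumptions: the function name
is_timeout_exceeded and the fixed datetimes are illustrative only,
and datetime parsing is simplified relative to the operator's
from_k8s_date() helper.

    from datetime import datetime, timezone

    def is_timeout_exceeded(
        elapsed_crawl_time: int,  # status.elapsedCrawlTime, in seconds
        last_updated: datetime,   # parsed status.lastUpdatedTime
        timeout: int,             # crawl.timeout in seconds; 0 disables it
        now: datetime,
    ) -> bool:
        """Mirror the patch's timeout check: stored elapsed time plus
        time accrued since the last status update vs. the timeout."""
        if not timeout:
            return False
        elapsed = elapsed_crawl_time + (now - last_updated).total_seconds()
        return elapsed > timeout

    # 3500s already recorded, 200s since the last update, 3600s timeout:
    # 3500 + 200 = 3700 > 3600, so the crawl should begin graceful stop.
    now = datetime(2023, 11, 6, 12, 10, 0, tzinfo=timezone.utc)
    last = datetime(2023, 11, 6, 12, 6, 40, tzinfo=timezone.utc)
    assert is_timeout_exceeded(3500, last, 3600, now)

Because elapsedCrawlTime accrues max(duration) across pods per update
interval rather than the sum, the timeout measures how long the crawl
has been running in at least one pod, independent of scale.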