Execution time tracking tweaks (#1994)
Tweaks to how execution time is tracked, for better accuracy and to exclude waiting states:

- don't update execution time while the crawl state is a 'waiting state' (waiting for capacity or waiting for the org limit)
- rename "starting states" to "waiting states" for clarity
- reset lastUpdatedTime after two consecutive updates in a non-running state, so that non-running time doesn't count while still tolerating occasional hiccups: if only one update sees a non-running state, don't reset (sketched below)
- webhooks: move the start webhook to when the crawl actually starts for the first time (db lastUpdatedTime is not yet set and the crawl is running)
- don't set lastUpdatedTime until pods are actually running
- set the crawljob update interval to every 10 seconds for more accurate execution time tracking
- frontend: show seconds in the 'Execution Time' display
This commit is contained in:
parent ec29928b28
commit 7fa2b61b29
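The accounting rule in the first, third, and fifth bullets condenses to a small state machine. The following is a minimal sketch under assumed, simplified names (`Tracker`, `update`, `last_updated_time`); it is illustrative only, and the real logic lives in the `CrawlOperator` hunks further down.

```python
# Hypothetical condensed sketch of the accounting rule described above,
# not the actual operator code (see the CrawlOperator diff below for that).
from datetime import datetime, timezone

WAITING_STATES = ("starting", "waiting_capacity", "waiting_org_limit")


class Tracker:
    state = "starting"
    last_state = "starting"
    last_updated_time: datetime | None = None  # unset until pods actually run

    def update(self, now: datetime) -> float:
        """Return exec seconds to credit for this resync; 0 while waiting."""
        if self.state in WAITING_STATES:
            # reset only after two consecutive waiting updates, so a single
            # hiccup between running updates doesn't drop accrued time
            if self.last_state in WAITING_STATES:
                self.last_updated_time = now
            return 0.0
        if self.last_updated_time is None:
            # first running update: start the clock (the PR also fires the
            # start webhook at this point), credit nothing yet
            self.last_updated_time = now
            return 0.0
        elapsed = (now - self.last_updated_time).total_seconds()
        self.last_updated_time = now
        return elapsed


t = Tracker()
t.state = "running"
print(t.update(datetime.now(timezone.utc)))  # 0.0: clock starts here
```

Crediting zero seconds on the first running update mirrors the diff below, where the first `inc_crawl_exec_time` call passes `0` and merely establishes the baseline timestamp.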
```diff
@@ -23,7 +23,7 @@ from .models import (
     PaginatedCrawlOutResponse,
     User,
     StorageRef,
-    RUNNING_AND_STARTING_STATES,
+    RUNNING_AND_WAITING_STATES,
     SUCCESSFUL_STATES,
     QARun,
     UpdatedResponse,
```
```diff
@@ -272,7 +272,7 @@ class BaseCrawlOps:
             {
                 "_id": crawl_id,
                 "type": "crawl",
-                "state": {"$in": RUNNING_AND_STARTING_STATES},
+                "state": {"$in": RUNNING_AND_WAITING_STATES},
             },
             {"$set": data},
         )
```
```diff
@@ -44,7 +44,7 @@ from .models import (
     PaginatedCrawlOutResponse,
     PaginatedSeedResponse,
     PaginatedCrawlErrorResponse,
-    RUNNING_AND_STARTING_STATES,
+    RUNNING_AND_WAITING_STATES,
     SUCCESSFUL_STATES,
     NON_RUNNING_STATES,
     ALL_CRAWL_STATES,
```
```diff
@@ -165,7 +165,7 @@ class CrawlOps(BaseCrawlOps):
             query["userid"] = userid
 
         if running_only:
-            query["state"] = {"$in": RUNNING_AND_STARTING_STATES}
+            query["state"] = {"$in": RUNNING_AND_WAITING_STATES}
 
         # Override running_only if state list is explicitly passed
         if state:
```
```diff
@@ -425,7 +425,7 @@ class CrawlOps(BaseCrawlOps):
 
         state, _ = await self.get_crawl_state(crawl_id, False)
 
-        if state not in RUNNING_AND_STARTING_STATES:
+        if state not in RUNNING_AND_WAITING_STATES:
             raise HTTPException(status_code=400, detail="crawl_not_running")
 
         total = 0
```
```diff
@@ -463,7 +463,7 @@ class CrawlOps(BaseCrawlOps):
         limit <= next_offset < limit + step"""
         state, _ = await self.get_crawl_state(crawl_id, False)
 
-        if state not in RUNNING_AND_STARTING_STATES:
+        if state not in RUNNING_AND_WAITING_STATES:
             raise HTTPException(status_code=400, detail="crawl_not_running")
 
         total = 0
```
```diff
@@ -513,7 +513,7 @@ class CrawlOps(BaseCrawlOps):
 
         crawl = await self.get_crawl(crawl_id, org)
 
-        if crawl.state not in RUNNING_AND_STARTING_STATES:
+        if crawl.state not in RUNNING_AND_WAITING_STATES:
             raise HTTPException(status_code=400, detail="crawl_not_running")
 
         cid = crawl.cid
```
```diff
@@ -591,30 +591,36 @@ class CrawlOps(BaseCrawlOps):
                 "qaCrawlExecSeconds": exec_time,
                 "qa.crawlExecSeconds": exec_time,
             }
+            field = "qa._lut"
         else:
             inc_update = {"crawlExecSeconds": exec_time}
+            field = "_lut"
 
         res = await self.crawls.find_one_and_update(
             {
                 "_id": crawl_id,
                 "type": "crawl",
-                "_lut": {"$ne": last_updated_time},
+                field: {"$ne": last_updated_time},
             },
             {
                 "$inc": inc_update,
-                "$set": {"_lut": last_updated_time},
+                "$set": {field: last_updated_time},
             },
         )
         return res is not None
 
     async def get_crawl_exec_last_update_time(
-        self, crawl_id: str
+        self, crawl_id: str, is_qa: bool
     ) -> Optional[datetime]:
         """get crawl last updated time"""
+        field = "_lut" if not is_qa else "qa._lut"
         res = await self.crawls.find_one(
-            {"_id": crawl_id, "type": "crawl"}, projection=["_lut"]
+            {"_id": crawl_id, "type": "crawl"}, projection=[field]
        )
-        return res and res.get("_lut")
+        if not res:
+            return None
+
+        return res.get("qa", {}).get("_lut") if is_qa else res.get("_lut")
 
     async def get_crawl_state(
         self, crawl_id: str, is_qa: bool
```
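Note the filter change above: by matching only documents whose `_lut` (or `qa._lut`) differs from the incoming timestamp, `find_one_and_update` acts as an atomic compare-and-set, so a retried resync that re-reports the same `last_updated_time` cannot double-count execution seconds. A rough stand-in with a plain dict (assumed field names, no real MongoDB) shows the property:

```python
# Illustrative stand-in for the "$ne" guard: an increment carrying an
# already-recorded timestamp becomes a no-op instead of double-counting.
from datetime import datetime, timezone

doc = {"_id": "crawl1", "crawlExecSeconds": 0, "_lut": None}


def inc_exec_time(doc: dict, exec_time: int, last_updated_time: datetime) -> bool:
    # mirrors find_one_and_update({..., "_lut": {"$ne": last_updated_time}}, ...)
    if doc["_lut"] == last_updated_time:
        return False  # timestamp already applied; skip the increment
    doc["crawlExecSeconds"] += exec_time
    doc["_lut"] = last_updated_time
    return True


now = datetime.now(timezone.utc)
assert inc_exec_time(doc, 10, now) is True
assert inc_exec_time(doc, 10, now) is False  # duplicate resync is ignored
assert doc["crawlExecSeconds"] == 10
```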
```diff
@@ -208,8 +208,8 @@ TYPE_RUNNING_STATES = Literal[
 ]
 RUNNING_STATES = get_args(TYPE_RUNNING_STATES)
 
-TYPE_STARTING_STATES = Literal["starting", "waiting_capacity", "waiting_org_limit"]
-STARTING_STATES = get_args(TYPE_STARTING_STATES)
+TYPE_WAITING_STATES = Literal["starting", "waiting_capacity", "waiting_org_limit"]
+WAITING_STATES = get_args(TYPE_WAITING_STATES)
 
 TYPE_FAILED_STATES = Literal[
     "canceled",
@@ -228,8 +228,8 @@ TYPE_SUCCESSFUL_STATES = Literal[
 ]
 SUCCESSFUL_STATES = get_args(TYPE_SUCCESSFUL_STATES)
 
-TYPE_RUNNING_AND_STARTING_STATES = Literal[TYPE_STARTING_STATES, TYPE_RUNNING_STATES]
-RUNNING_AND_STARTING_STATES = [*STARTING_STATES, *RUNNING_STATES]
+TYPE_RUNNING_AND_WAITING_STATES = Literal[TYPE_WAITING_STATES, TYPE_RUNNING_STATES]
+RUNNING_AND_WAITING_STATES = [*WAITING_STATES, *RUNNING_STATES]
 
 RUNNING_AND_STARTING_ONLY = ["starting", *RUNNING_STATES]
 
@@ -237,9 +237,9 @@ TYPE_NON_RUNNING_STATES = Literal[TYPE_FAILED_STATES, TYPE_SUCCESSFUL_STATES]
 NON_RUNNING_STATES = [*FAILED_STATES, *SUCCESSFUL_STATES]
 
 TYPE_ALL_CRAWL_STATES = Literal[
-    TYPE_RUNNING_AND_STARTING_STATES, TYPE_NON_RUNNING_STATES
+    TYPE_RUNNING_AND_WAITING_STATES, TYPE_NON_RUNNING_STATES
 ]
-ALL_CRAWL_STATES = [*RUNNING_AND_STARTING_STATES, *NON_RUNNING_STATES]
+ALL_CRAWL_STATES = [*RUNNING_AND_WAITING_STATES, *NON_RUNNING_STATES]
 
 
 # ============================================================================
```
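The rename keeps the member states identical; only the grouping names change. A small, assumed reconstruction of the pattern (with `RUNNING_STATES` abbreviated to a single member for the example) shows why both groups are still needed:

```python
# Assumed, abbreviated reconstruction of the grouping pattern; the full
# member lists live in models.py (see the hunks above).
from typing import Literal, get_args

TYPE_WAITING_STATES = Literal["starting", "waiting_capacity", "waiting_org_limit"]
WAITING_STATES = get_args(TYPE_WAITING_STATES)

TYPE_RUNNING_STATES = Literal["running"]  # abbreviated for the example
RUNNING_STATES = get_args(TYPE_RUNNING_STATES)

RUNNING_AND_WAITING_STATES = [*WAITING_STATES, *RUNNING_STATES]
RUNNING_AND_STARTING_ONLY = ["starting", *RUNNING_STATES]

# a crawl waiting for capacity still counts as "active" for API filters...
assert "waiting_capacity" in RUNNING_AND_WAITING_STATES
# ...but is excluded from the narrower group (and from exec-time accrual)
assert "waiting_capacity" not in RUNNING_AND_STARTING_ONLY
```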
```diff
@@ -19,8 +19,9 @@ from btrixcloud.models import (
     TYPE_ALL_CRAWL_STATES,
     NON_RUNNING_STATES,
     RUNNING_STATES,
+    WAITING_STATES,
     RUNNING_AND_STARTING_ONLY,
-    RUNNING_AND_STARTING_STATES,
+    RUNNING_AND_WAITING_STATES,
     SUCCESSFUL_STATES,
     FAILED_STATES,
     CrawlStats,
```
```diff
@@ -119,6 +120,7 @@ class CrawlOperator(BaseOperator):
         """sync crawls"""
 
         status = CrawlStatus(**data.parent.get("status", {}))
+        status.last_state = status.state
 
         spec = data.parent.get("spec", {})
         crawl_id = spec["id"]
```
```diff
@@ -250,11 +252,6 @@ class CrawlOperator(BaseOperator):
 
         else:
             status.scale = 1
-            now = dt_now()
-            await self.crawl_ops.inc_crawl_exec_time(
-                crawl.db_crawl_id, crawl.is_qa, 0, now
-            )
-            status.lastUpdatedTime = to_k8s_date(now)
 
         children = self._load_redis(params, status, data.children)
 
```
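With this removal, the operator no longer writes a zero-second increment or touches `status.lastUpdatedTime` as soon as it scales the job; per the commit message, the baseline timestamp is now established only once pods are actually running, in the exec-time tracking hunk below.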
```diff
@@ -807,25 +804,13 @@ class CrawlOperator(BaseOperator):
             status.resync_after = self.fast_retry_secs
             return status
 
-        # if true (state is set), also run webhook
-        if await self.set_state(
+        # ensure running state is set
+        await self.set_state(
             "running",
             status,
             crawl,
             allowed_from=["starting", "waiting_capacity"],
-        ):
-            if not crawl.qa_source_crawl_id:
-                self.run_task(
-                    self.event_webhook_ops.create_crawl_started_notification(
-                        crawl.id, crawl.oid, scheduled=crawl.scheduled
-                    )
-                )
-            else:
-                self.run_task(
-                    self.event_webhook_ops.create_qa_analysis_started_notification(
-                        crawl.id, crawl.oid, crawl.qa_source_crawl_id
-                    )
-                )
+        )
 
         # update lastActiveTime if crawler is running
         if crawler_running:
```
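The started-webhook calls removed here are not dropped: they reappear in the exec-time tracking hunk below, gated on the database-side last-update time being unset, so they fire exactly once, on the first update where the crawl is genuinely running.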
```diff
@@ -967,11 +952,33 @@ class CrawlOperator(BaseOperator):
         """inc exec time tracking"""
         now = dt_now()
 
+        # don't count time crawl is not running
+        if status.state in WAITING_STATES:
+            # reset lastUpdatedTime if at least 2 consecutive updates of non-running state
+            if status.last_state in WAITING_STATES:
+                status.lastUpdatedTime = to_k8s_date(now)
+            return
+
         update_start_time = await self.crawl_ops.get_crawl_exec_last_update_time(
-            crawl.db_crawl_id
+            crawl.db_crawl_id, crawl.is_qa
         )
 
         if not update_start_time:
+            print("Crawl first started, webhooks called", now, crawl.id)
+            # call initial running webhook
+            if not crawl.qa_source_crawl_id:
+                self.run_task(
+                    self.event_webhook_ops.create_crawl_started_notification(
+                        crawl.id, crawl.oid, scheduled=crawl.scheduled
+                    )
+                )
+            else:
+                self.run_task(
+                    self.event_webhook_ops.create_qa_analysis_started_notification(
+                        crawl.id, crawl.oid, crawl.qa_source_crawl_id
+                    )
+                )
+
             await self.crawl_ops.inc_crawl_exec_time(
                 crawl.db_crawl_id, crawl.is_qa, 0, now
             )
```
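A toy timeline (hypothetical state sequence) makes the debounce concrete: a single waiting "blip" between running updates leaves `lastUpdatedTime` alone, while two waiting updates in a row reset it.

```python
# Toy simulation of the debounce using made-up resync observations.
WAITING = ("starting", "waiting_capacity", "waiting_org_limit")

observed = ["running", "waiting_capacity", "running", "waiting_capacity", "waiting_capacity"]

last_state = "running"
resets = []
for i, state in enumerate(observed):
    if state in WAITING and last_state in WAITING:
        resets.append(i)  # two consecutive waiting updates: reset lastUpdatedTime
    last_state = state

print(resets)  # [4]: only the second consecutive waiting update triggers a reset
```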
```diff
@@ -1414,7 +1421,7 @@ class CrawlOperator(BaseOperator):
 
         finished = dt_now()
 
-        allowed_from = RUNNING_AND_STARTING_STATES
+        allowed_from = RUNNING_AND_WAITING_STATES
 
         # if set_state returns false, already set to same status, return
         if not await self.set_state(
```
```diff
@@ -223,3 +223,6 @@ class CrawlStatus(BaseModel):
 
     # don't include in status, use by metacontroller
     resync_after: Optional[int] = Field(default=None, exclude=True)
+
+    # last state
+    last_state: TYPE_ALL_CRAWL_STATES = Field(default="starting", exclude=True)
```
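`exclude=True` keeps `last_state` out of the serialized status written back for the next reconcile (it is re-derived from `status.state` at the start of each sync, per the hunk above), while remaining settable in memory during a single pass. A minimal illustration (pydantic v2 shown; the field values are made up):

```python
# Excluded fields stay usable on the model instance but never appear in the
# dumped status document.
from typing import Optional
from pydantic import BaseModel, Field


class Status(BaseModel):
    state: str = "starting"
    resync_after: Optional[int] = Field(default=None, exclude=True)
    last_state: str = Field(default="starting", exclude=True)


s = Status(state="running", last_state="waiting_capacity", resync_after=3)
print(s.model_dump())  # {'state': 'running'}: excluded fields are dropped
```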
```diff
@@ -27,7 +27,7 @@ from aiostream import stream
 from .models import (
     SUCCESSFUL_STATES,
     RUNNING_STATES,
-    STARTING_STATES,
+    WAITING_STATES,
     BaseCrawl,
     Organization,
     StorageRef,
```
```diff
@@ -890,7 +890,7 @@ class OrgOps:
             {"oid": org.id, "state": {"$in": RUNNING_STATES}}
         )
         workflows_queued_count = await self.crawls_db.count_documents(
-            {"oid": org.id, "state": {"$in": STARTING_STATES}}
+            {"oid": org.id, "state": {"$in": WAITING_STATES}}
         )
         collections_count = await self.colls_db.count_documents({"oid": org.id})
         public_collections_count = await self.colls_db.count_documents(
```
```diff
@@ -5,7 +5,7 @@ metadata:
   name: crawljobs-operator
 spec:
   generateSelector: false
-  resyncPeriodSeconds: {{ .Values.operator_resync_seconds | default 30 }}
+  resyncPeriodSeconds: {{ .Values.operator_resync_seconds | default 10 }}
   parentResource:
     apiVersion: btrix.cloud/v1
     resource: crawljobs
```
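Dropping the default resync period from 30 to 10 seconds means each reconcile, and therefore each execution-time increment, covers at most roughly 10 seconds of wall clock, which is what makes the seconds-granularity 'Execution Time' display in the frontend hunk below meaningful; deployments can still override this via `operator_resync_seconds`.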
```diff
@@ -802,6 +802,7 @@ export class ArchivedItemDetail extends TailwindElement {
               ? html`<span
                   >${humanizeExecutionSeconds(
                     this.crawl!.crawlExecSeconds,
+                    { displaySeconds: true },
                   )}</span
                 >`
               : html`<span class="text-0-400">${msg("Pending")}</span>`}
```