add crawl ending states 'generate-wacz', 'uploading-wacz', 'pending-wait', which occur after a crawl is finished or is being stopped (#1022)

operator: ensure transitions from each of these states are supported, including to 'waiting_capacity'; add an extra check on stopping to avoid transitioning back to a running state after the crawl is finished

ui: add the new states to the UI display and localization, and include them as active states

fixes #263
commit 06cf9c7cc3 · parent d848502f84
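For orientation before the diff: the new ending states extend the state groupings defined in the basecrawls module. Only part of that file appears in the first hunk below, so the following sketch of the constants is inferred from the hunks and the commit message, not copied verbatim from the repository:

# Assumed layout of the crawl-state constants referenced throughout this diff.
# The tuple contents for RUNNING_STATES and STARTING_STATES are an inference.
RUNNING_STATES = ("running", "pending-wait", "generate-wacz", "uploading-wacz")
STARTING_STATES = ("starting", "waiting_capacity", "waiting_org_limit")
FAILED_STATES = ("canceled", "failed")
SUCCESSFUL_STATES = ("complete", "partial_complete")

RUNNING_AND_STARTING_STATES = (*STARTING_STATES, *RUNNING_STATES)
# Added in this commit: the allowed source states for marking a crawl as
# waiting_capacity when no crawler pods are currently running.
RUNNING_AND_STARTING_ONLY = ("starting", *RUNNING_STATES)
NON_RUNNING_STATES = (*FAILED_STATES, *SUCCESSFUL_STATES)
ALL_CRAWL_STATES = (*RUNNING_AND_STARTING_STATES, *NON_RUNNING_STATES)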
@@ -37,6 +37,8 @@ SUCCESSFUL_STATES = ("complete", "partial_complete")
 
 RUNNING_AND_STARTING_STATES = (*STARTING_STATES, *RUNNING_STATES)
 
+RUNNING_AND_STARTING_ONLY = ("starting", *RUNNING_STATES)
+
 NON_RUNNING_STATES = (*FAILED_STATES, *SUCCESSFUL_STATES)
 
 ALL_CRAWL_STATES = (*RUNNING_AND_STARTING_STATES, *NON_RUNNING_STATES)
@@ -563,7 +563,7 @@ async def update_crawl_state_if_allowed(
     kwargs["state"] = state
     query = {"_id": crawl_id, "type": "crawl"}
     if allowed_from:
-        query["state"] = {"$in": allowed_from}
+        query["state"] = {"$in": list(allowed_from)}
 
     return await crawls.find_one_and_update(query, {"$set": kwargs})
 
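Since allowed_from is now usually one of the tuple constants above, list() keeps the $in clause a plain array. For a concrete sense of the guard, this is roughly the query that the waiting_capacity transition in the operator hunks below ends up issuing; the crawl id and the expanded tuple contents here are illustrative, not taken from the repository:

# Illustrative only: the guarded update built by update_crawl_state_if_allowed
# for the "waiting_capacity" transition, assuming the constants sketched above.
query = {
    "_id": "a-hypothetical-crawl-id",
    "type": "crawl",
    "state": {
        "$in": ["starting", "running", "pending-wait",
                "generate-wacz", "uploading-wacz"]
    },
}
update = {"$set": {"state": "waiting_capacity"}}
# find_one_and_update(query, update) matches nothing -- and so changes
# nothing -- once the crawl has reached a NON_RUNNING state such as
# "complete", which is exactly the "no transition back" guarantee.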
@@ -25,7 +25,12 @@ from .k8sapi import K8sAPI
 
 from .db import init_db
 from .orgs import inc_org_stats, get_max_concurrent_crawls
-from .basecrawls import NON_RUNNING_STATES, SUCCESSFUL_STATES
+from .basecrawls import (
+    NON_RUNNING_STATES,
+    RUNNING_STATES,
+    RUNNING_AND_STARTING_ONLY,
+    SUCCESSFUL_STATES,
+)
 from .colls import add_successful_crawl_to_collections
 from .crawlconfigs import stats_recompute_last
 from .crawls import (
@@ -222,6 +227,9 @@ class BtrixOperator(K8sAPI):
         if has_crawl_children:
             pods = data.related[POD]
             status = await self.sync_crawl_state(redis_url, crawl, status, pods)
+            if crawl.stopping:
+                await self.check_if_finished(crawl, status)
+
             if status.finished:
                 return await self.handle_finished_delete_if_needed(
                     crawl_id, status, spec
@@ -465,21 +473,34 @@ class BtrixOperator(K8sAPI):
         except:
             return None
 
+    async def check_if_finished(self, crawl, status):
+        """set fast resync, unless crawl already finished"""
+        actual_state, finished = await get_crawl_state(self.crawls, crawl.id)
+
+        # stopping or finished, keep existing state
+        if actual_state in NON_RUNNING_STATES or finished:
+            # don't resync
+            status.state = actual_state
+            status.finished = to_k8s_date(finished)
+            return True
+
+        return False
+
     async def sync_crawl_state(self, redis_url, crawl, status, pods):
         """sync crawl state for running crawl"""
         # check if at least one pod started running
-        if not await self.check_if_pods_running(pods):
+        if not self.check_if_pods_running(pods):
             if self.should_mark_waiting(status.state, crawl.started):
                 await self.set_state(
                     "waiting_capacity",
                     status,
                     crawl.id,
-                    allowed_from=["starting", "running"],
+                    allowed_from=RUNNING_AND_STARTING_ONLY,
                 )
 
             status.initRedis = False
 
-            # resync after N seconds
+            # if still running, resync after N seconds
             status.resync_after = self.fast_retry_secs
             return status
 
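check_if_finished relies on two helpers that are outside this diff: get_crawl_state, which reads the stored state and finished fields, and to_k8s_date, which serializes a datetime for the operator status object. The bodies below are assumptions sketched for readers following along, not the repository's implementations:

from datetime import datetime
from typing import Optional, Tuple


async def get_crawl_state(crawls, crawl_id: str) -> Tuple[Optional[str], Optional[datetime]]:
    """Assumed shape: return (state, finished) for a crawl from the database."""
    res = await crawls.find_one({"_id": crawl_id}, projection=["state", "finished"])
    if not res:
        return None, None
    return res.get("state"), res.get("finished")


def to_k8s_date(dt: Optional[datetime]) -> Optional[str]:
    """Assumed shape: RFC 3339 timestamp, as Kubernetes status fields expect."""
    return dt.strftime("%Y-%m-%dT%H:%M:%SZ") if dt else None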
@@ -487,14 +508,18 @@ class BtrixOperator(K8sAPI):
 
         redis = await self._get_redis(redis_url)
         if not redis:
-            # resync after N seconds, until redis is inited
+            # if still running, resync after N seconds
             status.resync_after = self.fast_retry_secs
             return status
 
         # set state to running (if not already)
-        await self.set_state(
-            "running", status, crawl.id, allowed_from=["starting", "waiting_capacity"]
-        )
+        if status.state not in RUNNING_STATES:
+            await self.set_state(
+                "running",
+                status,
+                crawl.id,
+                allowed_from=["starting", "waiting_capacity"],
+            )
 
         try:
             file_done = await redis.lpop(self.done_key)
@@ -522,7 +547,7 @@ class BtrixOperator(K8sAPI):
             print(f"Crawl get failed: {exc}, will try again")
             return status
 
-    async def check_if_pods_running(self, pods):
+    def check_if_pods_running(self, pods):
         """check if at least one crawler pod has started"""
         try:
             for pod in pods.values():
@@ -550,7 +575,7 @@ class BtrixOperator(K8sAPI):
 
     def should_mark_waiting(self, state, started):
         """Should the crawl be marked as waiting for capacity?"""
-        if state == "running":
+        if state in RUNNING_STATES:
             return True
 
         if state == "starting":
@@ -612,16 +637,12 @@ class BtrixOperator(K8sAPI):
             status.size = humanize.naturalsize(stats["size"])
 
         # check if done / failed
-        done = 0
-        failed = 0
+        status_count = {}
         for res in results:
-            if res == "done":
-                done += 1
-            elif res == "failed":
-                failed += 1
+            status_count[res] = status_count.get(res, 0) + 1
 
         # check if all crawlers are done
-        if done >= crawl.scale:
+        if status_count.get("done", 0) >= crawl.scale:
             # check if one-page crawls actually succeeded
             # if only one page found, and no files, assume failed
             if status.pagesFound == 1 and not status.filesAdded:
@@ -638,7 +659,7 @@ class BtrixOperator(K8sAPI):
                 )
 
         # check if all crawlers failed
-        if failed >= crawl.scale:
+        elif status_count.get("failed", 0) >= crawl.scale:
             # if stopping, and no pages finished, mark as canceled
             if crawl.stopping and not status.pagesDone:
                 state = "canceled"
@@ -649,6 +670,20 @@ class BtrixOperator(K8sAPI):
                 redis, crawl.id, crawl.cid, status, state=state
             )
 
+        # check for other statuses
+        else:
+            new_status = None
+            if status_count.get("uploading-wacz"):
+                new_status = "uploading-wacz"
+            elif status_count.get("generate-wacz"):
+                new_status = "generate-wacz"
+            elif status_count.get("pending-wait"):
+                new_status = "pending-wait"
+            if new_status:
+                await self.set_state(
+                    new_status, status, crawl.id, allowed_from=RUNNING_STATES
+                )
+
         return status
 
     # pylint: disable=too-many-arguments
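Restated outside the operator, the per-pod aggregation above reduces to a small pure function; the precedence mirrors the elif chain (uploading-wacz, then generate-wacz, then pending-wait), so at scale > 1 the crawl-level state follows the furthest-along pod. The function name and the simplified return values are illustrative only:

from collections import Counter
from typing import Optional, Sequence

ENDING_PRECEDENCE = ("uploading-wacz", "generate-wacz", "pending-wait")


def derive_crawl_status(pod_states: Sequence[str], scale: int) -> Optional[str]:
    """Illustrative reduction of per-pod crawler states to one crawl-level state."""
    counts = Counter(pod_states)
    if counts.get("done", 0) >= scale:
        return "complete"  # the real operator also checks pages/files before finishing
    if counts.get("failed", 0) >= scale:
        return "failed"  # the real operator may use "canceled" when stopping with no pages
    for state in ENDING_PRECEDENCE:
        if counts.get(state):
            return state  # applied via set_state(..., allowed_from=RUNNING_STATES)
    return None  # no change; the crawl keeps its current state


# Example at scale=3: one pod already uploading, two still generating WACZs.
assert derive_crawl_status(
    ["uploading-wacz", "generate-wacz", "generate-wacz"], 3
) == "uploading-wacz"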
@@ -664,7 +699,7 @@ class BtrixOperator(K8sAPI):
             kwargs["stats"] = stats
 
         if state in SUCCESSFUL_STATES:
-            allowed_from = ["running"]
+            allowed_from = RUNNING_STATES
         else:
             allowed_from = []
 
@@ -124,6 +124,42 @@ export class CrawlStatus extends LitElement {
         break;
       }
 
+      case "pending-wait": {
+        icon = html`<sl-icon
+          name="dot"
+          library="app"
+          class="animatePulse"
+          slot="prefix"
+          style="color: var(--sl-color-purple-600)"
+        ></sl-icon>`;
+        label = msg("Finishing Crawl");
+        break;
+      }
+
+      case "generate-wacz": {
+        icon = html`<sl-icon
+          name="dot"
+          library="app"
+          class="animatePulse"
+          slot="prefix"
+          style="color: var(--sl-color-purple-600)"
+        ></sl-icon>`;
+        label = msg("Generating WACZ");
+        break;
+      }
+
+      case "uploading-wacz": {
+        icon = html`<sl-icon
+          name="dot"
+          library="app"
+          class="animatePulse"
+          slot="prefix"
+          style="color: var(--sl-color-purple-600)"
+        ></sl-icon>`;
+        label = msg("Uploading WACZ");
+        break;
+      }
+
       case "complete": {
         icon = html`<sl-icon
           name=${isUpload ? "upload" : "check-circle"}
@@ -95,6 +95,9 @@ export type CrawlState =
   | "waiting_capacity"
   | "waiting_org_limit"
   | "running"
+  | "generate-wacz"
+  | "uploading-wacz"
+  | "pending-wait"
   | "complete"
   | "failed"
   | "partial_complete"
@@ -4,6 +4,9 @@ export const activeCrawlStates: CrawlState[] = [
   "waiting_org_limit",
   "waiting_capacity",
   "running",
+  "generate-wacz",
+  "uploading-wacz",
+  "pending-wait",
   "stopping",
 ];
 export const inactiveCrawlStates: CrawlState[] = [