operator state fixes (follow up from #1639) (#1640)

- increase time for going to waiting_capacity from starting to 150
seconds
- relax requirement for state transitions, allow complete from waiting
- additional type safety for different states, ensure mark_finished()
only called with non-running states, add `Literal` types for all the
state types.
This commit is contained in:
Ilya Kreymer 2024-03-29 15:12:16 -07:00 committed by GitHub
parent c1817cbe04
commit ffc4b5b58f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 47 additions and 27 deletions

View File

@ -9,7 +9,7 @@ import urllib.parse
from datetime import datetime
from uuid import UUID
from typing import Optional, List, Dict, Union, Any
from typing import Optional, List, Dict, Union, Any, Sequence
from fastapi import Depends, HTTPException
from fastapi.responses import StreamingResponse
@ -41,6 +41,7 @@ from .models import (
RUNNING_AND_STARTING_STATES,
SUCCESSFUL_STATES,
ALL_CRAWL_STATES,
TYPE_ALL_CRAWL_STATES,
)
@ -442,8 +443,8 @@ class CrawlOps(BaseCrawlOps):
self,
crawl_id: str,
is_qa: bool,
state: str,
allowed_from: List[str],
state: TYPE_ALL_CRAWL_STATES,
allowed_from: Sequence[TYPE_ALL_CRAWL_STATES],
finished: Optional[datetime] = None,
stats: Optional[CrawlStats] = None,
):

View File

@ -7,7 +7,7 @@ from enum import Enum, IntEnum
from uuid import UUID
import os
from typing import Optional, List, Dict, Union, Literal, Any
from typing import Optional, List, Dict, Union, Literal, Any, get_args
from pydantic import (
BaseModel,
conint,
@ -153,20 +153,31 @@ class UserOut(BaseModel):
### CRAWL STATES
# ============================================================================
RUNNING_STATES = ["running", "pending-wait", "generate-wacz", "uploading-wacz"]
TYPE_RUNNING_STATES = Literal[
"running", "pending-wait", "generate-wacz", "uploading-wacz"
]
RUNNING_STATES = get_args(TYPE_RUNNING_STATES)
STARTING_STATES = ["starting", "waiting_capacity", "waiting_org_limit"]
TYPE_STARTING_STATES = Literal["starting", "waiting_capacity", "waiting_org_limit"]
STARTING_STATES = get_args(TYPE_STARTING_STATES)
FAILED_STATES = ["canceled", "failed", "skipped_quota_reached"]
TYPE_FAILED_STATES = Literal["canceled", "failed", "skipped_quota_reached"]
FAILED_STATES = get_args(TYPE_FAILED_STATES)
SUCCESSFUL_STATES = ["complete", "stopped_by_user", "stopped_quota_reached"]
TYPE_SUCCESSFUL_STATES = Literal["complete", "stopped_by_user", "stopped_quota_reached"]
SUCCESSFUL_STATES = get_args(TYPE_SUCCESSFUL_STATES)
TYPE_RUNNING_AND_STARTING_STATES = Literal[TYPE_STARTING_STATES, TYPE_RUNNING_STATES]
RUNNING_AND_STARTING_STATES = [*STARTING_STATES, *RUNNING_STATES]
RUNNING_AND_STARTING_ONLY = ["starting", *RUNNING_STATES]
TYPE_NON_RUNNING_STATES = Literal[TYPE_FAILED_STATES, TYPE_SUCCESSFUL_STATES]
NON_RUNNING_STATES = [*FAILED_STATES, *SUCCESSFUL_STATES]
TYPE_ALL_CRAWL_STATES = Literal[
TYPE_RUNNING_AND_STARTING_STATES, TYPE_NON_RUNNING_STATES
]
ALL_CRAWL_STATES = [*RUNNING_AND_STARTING_STATES, *NON_RUNNING_STATES]

View File

@ -3,7 +3,7 @@
import traceback
import os
from pprint import pprint
from typing import Optional, Any
from typing import Optional, Any, Sequence
from datetime import datetime
import json
@ -14,6 +14,9 @@ from kubernetes.utils import parse_quantity
from redis import asyncio as exceptions
from btrixcloud.models import (
TYPE_NON_RUNNING_STATES,
TYPE_RUNNING_STATES,
TYPE_ALL_CRAWL_STATES,
NON_RUNNING_STATES,
RUNNING_STATES,
RUNNING_AND_STARTING_ONLY,
@ -37,6 +40,7 @@ from .baseoperator import BaseOperator, Redis
from .models import (
CrawlSpec,
CrawlStatus,
StopReason,
MCBaseRequest,
MCSyncData,
PodInfo,
@ -56,7 +60,7 @@ DEFAULT_TTL = 30
REDIS_TTL = 60
# time in seconds before a crawl is deemed 'waiting' instead of 'starting'
STARTING_TIME_SECS = 60
STARTING_TIME_SECS = 150
# how often to update execution time seconds
EXEC_TIME_UPDATE_SECS = 60
@ -428,10 +432,10 @@ class CrawlOperator(BaseOperator):
async def set_state(
self,
state: str,
state: TYPE_ALL_CRAWL_STATES,
status: CrawlStatus,
crawl: CrawlSpec,
allowed_from: list[str],
allowed_from: Sequence[TYPE_ALL_CRAWL_STATES],
finished: Optional[datetime] = None,
stats: Optional[CrawlStats] = None,
):
@ -1132,7 +1136,7 @@ class CrawlOperator(BaseOperator):
async def is_crawl_stopping(
self, crawl: CrawlSpec, status: CrawlStatus
) -> Optional[str]:
) -> Optional[StopReason]:
"""check if crawl is stopping and set reason"""
# if user requested stop, then enter stopping phase
if crawl.stopping:
@ -1242,8 +1246,11 @@ class CrawlOperator(BaseOperator):
await self.fail_crawl(crawl, status, pods, stats)
return status
if status.stopReason in ("stopped_by_user", "stopped_quota_reached"):
state = status.stopReason
state: TYPE_NON_RUNNING_STATES
if status.stopReason == "stopped_by_user":
state = "stopped_by_user"
elif status.stopReason == "stopped_quota_reached":
state = "stopped_quota_reached"
else:
state = "complete"
@ -1259,7 +1266,7 @@ class CrawlOperator(BaseOperator):
# check for other statuses
else:
new_status = None
new_status: TYPE_RUNNING_STATES
if status_count.get("running"):
if status.state in ("generate-wacz", "uploading-wacz", "pending-wacz"):
new_status = "running"
@ -1282,17 +1289,14 @@ class CrawlOperator(BaseOperator):
self,
crawl: CrawlSpec,
status: CrawlStatus,
state: str,
state: TYPE_NON_RUNNING_STATES,
stats: Optional[CrawlStats] = None,
) -> bool:
"""mark crawl as finished, set finished timestamp and final state"""
finished = dt_now()
if state in SUCCESSFUL_STATES:
allowed_from = RUNNING_STATES
else:
allowed_from = RUNNING_AND_STARTING_STATES
allowed_from = RUNNING_AND_STARTING_STATES
# if set_state returns false, already set to same status, return
if not await self.set_state(
@ -1329,7 +1333,7 @@ class CrawlOperator(BaseOperator):
self,
crawl: CrawlSpec,
status: CrawlStatus,
state: str,
state: TYPE_NON_RUNNING_STATES,
) -> None:
"""Run tasks after crawl completes in asyncio.task coroutine."""
await self.crawl_config_ops.stats_recompute_last(
@ -1357,7 +1361,7 @@ class CrawlOperator(BaseOperator):
async def do_qa_run_finished_tasks(
self,
crawl: CrawlSpec,
state: str,
state: TYPE_NON_RUNNING_STATES,
) -> None:
"""Run tasks after qa run completes in asyncio.task coroutine."""

View File

@ -2,10 +2,10 @@
from collections import defaultdict
from uuid import UUID
from typing import Optional, DefaultDict
from typing import Optional, DefaultDict, Literal
from pydantic import BaseModel, Field
from kubernetes.utils import parse_quantity
from btrixcloud.models import StorageRef
from btrixcloud.models import StorageRef, TYPE_ALL_CRAWL_STATES
BTRIX_API = "btrix.cloud/v1"
@ -15,6 +15,10 @@ PVC = "PersistentVolumeClaim.v1"
POD = "Pod.v1"
CJS = f"CrawlJob.{BTRIX_API}"
StopReason = Literal[
"stopped_by_user", "time-limit", "size-limit", "stopped_quota_reached"
]
# ============================================================================
class MCBaseRequest(BaseModel):
@ -166,7 +170,7 @@ class PodInfo(BaseModel):
class CrawlStatus(BaseModel):
"""status from k8s CrawlJob object"""
state: str = "starting"
state: TYPE_ALL_CRAWL_STATES = "starting"
pagesFound: int = 0
pagesDone: int = 0
size: int = 0
@ -177,7 +181,7 @@ class CrawlStatus(BaseModel):
filesAddedSize: int = 0
finished: Optional[str] = None
stopping: bool = False
stopReason: Optional[str] = None
stopReason: Optional[StopReason] = None
initRedis: bool = False
crawlerImage: Optional[str] = None
lastActiveTime: str = ""