Attempt to auto-adjust PVC storage if: - used storage (as reported in redis by the crawler) * 2.5 > total_storage - will cause PVC to resize, if possible (not supported by all drivers) - uses multiples of 1Gi, rounding up to next GB - AVAIL_STORAGE_RATIO hard-coded to 2.5 for now, to account for 2x space for WACZ plus change for fast updating crawls Some caveats: - only works if the storageClass used for PVCs has `allowVolumeExpansion: true`, if not, it will have no effect - designed as a last resort option: the `crawl_storage` in values and `--sizeLimit` and `--diskUtilization` should generally result in this not being needed. - can be useful in cases where a crawl is rapidly capturing a lot of content in one page, and there's no time to interrupt / restart, since the other limits apply only at page end. - May want to have crawler update the disk usage more frequently, not just at page end to make this more effective.
238 lines
6.6 KiB
Python
238 lines
6.6 KiB
Python
""" Operator Models """
|
|
|
|
from collections import defaultdict
|
|
from uuid import UUID
|
|
from typing import Optional, DefaultDict, Literal, Annotated, Any
|
|
from pydantic import BaseModel, Field
|
|
from kubernetes.utils import parse_quantity
|
|
from btrixcloud.models import StorageRef, TYPE_ALL_CRAWL_STATES
|
|
|
|
|
|
BTRIX_API = "btrix.cloud/v1"
|
|
|
|
CMAP = "ConfigMap.v1"
|
|
PVC = "PersistentVolumeClaim.v1"
|
|
POD = "Pod.v1"
|
|
CJS = f"CrawlJob.{BTRIX_API}"
|
|
|
|
StopReason = Literal[
|
|
"stopped_by_user",
|
|
"time-limit",
|
|
"size-limit",
|
|
"stopped_storage_quota_reached",
|
|
"stopped_time_quota_reached",
|
|
"stopped_org_readonly",
|
|
]
|
|
|
|
|
|
# ============================================================================
|
|
class MCBaseRequest(BaseModel):
|
|
"""base metacontroller model, used for customize hook"""
|
|
|
|
parent: dict
|
|
controller: dict
|
|
|
|
|
|
# ============================================================================
|
|
class MCSyncData(MCBaseRequest):
|
|
"""sync / finalize metacontroller model"""
|
|
|
|
children: dict
|
|
related: dict
|
|
finalizing: bool = False
|
|
|
|
|
|
# ============================================================================
|
|
class MCDecoratorSyncData(BaseModel):
|
|
"""sync for decoratorcontroller model"""
|
|
|
|
object: dict
|
|
controller: dict
|
|
|
|
attachments: dict
|
|
related: dict
|
|
finalizing: bool = False
|
|
|
|
|
|
# ============================================================================
|
|
class MCDecoratorSyncResponse(BaseModel):
|
|
"""Response model for decoratorcontroller sync api"""
|
|
|
|
attachments: list[dict[str, Any]]
|
|
status: Optional[dict[str, Any]] = None
|
|
annotations: Optional[dict[str, str]] = None
|
|
|
|
|
|
# ============================================================================
|
|
class CrawlSpec(BaseModel):
|
|
"""spec from k8s CrawlJob object"""
|
|
|
|
id: str
|
|
cid: UUID
|
|
oid: UUID
|
|
scale: int = 1
|
|
storage: StorageRef
|
|
started: str
|
|
crawler_channel: str
|
|
stopping: bool = False
|
|
scheduled: bool = False
|
|
timeout: int = 0
|
|
max_crawl_size: int = 0
|
|
qa_source_crawl_id: Optional[str] = ""
|
|
|
|
@property
|
|
def db_crawl_id(self) -> str:
|
|
"""return actual crawl_id for db, if qa run"""
|
|
return self.qa_source_crawl_id or self.id
|
|
|
|
@property
|
|
def is_qa(self) -> bool:
|
|
"""return true if qa run"""
|
|
return bool(self.qa_source_crawl_id)
|
|
|
|
|
|
# ============================================================================
|
|
class PodResourcePercentage(BaseModel):
|
|
"""Resource usage percentage ratios"""
|
|
|
|
memory: float = 0
|
|
cpu: float = 0
|
|
storage: float = 0
|
|
|
|
|
|
# ============================================================================
|
|
class PodResources(BaseModel):
|
|
"""Pod Resources"""
|
|
|
|
memory: int = 0
|
|
cpu: float = 0
|
|
storage: int = 0
|
|
|
|
def __init__(self, *a, **kw):
|
|
if "memory" in kw:
|
|
kw["memory"] = int(parse_quantity(kw["memory"]))
|
|
if "cpu" in kw:
|
|
kw["cpu"] = float(parse_quantity(kw["cpu"]))
|
|
if "storage" in kw:
|
|
kw["storage"] = int(parse_quantity(kw["storage"]))
|
|
super().__init__(*a, **kw)
|
|
|
|
|
|
# ============================================================================
|
|
class PodInfo(BaseModel):
|
|
"""Aggregate pod status info held in CrawlJob"""
|
|
|
|
exitTime: Optional[str] = None
|
|
exitCode: Optional[int] = None
|
|
isNewExit: Optional[bool] = Field(default=None, exclude=True)
|
|
reason: Optional[str] = None
|
|
|
|
allocated: PodResources = PodResources()
|
|
used: PodResources = PodResources()
|
|
|
|
newCpu: Optional[int] = None
|
|
newMemory: Optional[int] = None
|
|
newStorage: Optional[str] = None
|
|
signalAtMem: Optional[int] = None
|
|
|
|
evicted: Optional[bool] = False
|
|
|
|
def dict(self, *a, **kw):
|
|
res = super().dict(*a, **kw)
|
|
percent = {
|
|
"memory": self.get_percent_memory(),
|
|
"cpu": self.get_percent_cpu(),
|
|
"storage": self.get_percent_storage(),
|
|
}
|
|
res["percent"] = percent
|
|
return res
|
|
|
|
def get_percent_memory(self) -> float:
|
|
"""compute percent memory used"""
|
|
return (
|
|
float(self.used.memory) / float(self.allocated.memory)
|
|
if self.allocated.memory
|
|
else 0
|
|
)
|
|
|
|
def get_percent_cpu(self) -> float:
|
|
"""compute percent cpu used"""
|
|
return (
|
|
float(self.used.cpu) / float(self.allocated.cpu)
|
|
if self.allocated.cpu
|
|
else 0
|
|
)
|
|
|
|
def get_percent_storage(self) -> float:
|
|
"""compute percent storage used"""
|
|
return (
|
|
float(self.used.storage) / float(self.allocated.storage)
|
|
if self.allocated.storage
|
|
else 0
|
|
)
|
|
|
|
def should_restart_pod(self, forced: bool = False) -> Optional[str]:
|
|
"""return true if pod should be restarted"""
|
|
if self.newMemory and self.newMemory != self.allocated.memory:
|
|
return "newMemory"
|
|
|
|
if self.newCpu and self.newCpu != self.allocated.cpu:
|
|
return "newCpu"
|
|
|
|
if self.evicted:
|
|
return "evicted"
|
|
|
|
if forced:
|
|
return "forced"
|
|
|
|
return None
|
|
|
|
|
|
# ============================================================================
|
|
# pylint: disable=invalid-name
|
|
class CrawlStatus(BaseModel):
|
|
"""status from k8s CrawlJob object"""
|
|
|
|
state: TYPE_ALL_CRAWL_STATES = "starting"
|
|
pagesFound: int = 0
|
|
pagesDone: int = 0
|
|
size: int = 0
|
|
# human readable size string
|
|
sizeHuman: str = ""
|
|
scale: int = 1
|
|
filesAdded: int = 0
|
|
filesAddedSize: int = 0
|
|
finished: Optional[str] = None
|
|
stopping: bool = False
|
|
stopReason: Optional[StopReason] = None
|
|
initRedis: bool = False
|
|
crawlerImage: Optional[str] = None
|
|
lastActiveTime: str = ""
|
|
podStatus: DefaultDict[str, Annotated[PodInfo, Field(default_factory=PodInfo)]] = (
|
|
defaultdict(lambda: PodInfo()) # pylint: disable=unnecessary-lambda
|
|
)
|
|
|
|
restartTime: Optional[str] = None
|
|
canceled: bool = False
|
|
|
|
# updated on pod exits and at regular interval
|
|
# Crawl Execution Time -- time all crawler pods have been running
|
|
# used to track resource usage and enforce execution minutes limit
|
|
crawlExecTime: int = 0
|
|
|
|
# Elapsed Exec Time -- time crawl has been running in at least one pod
|
|
# used for crawl timeouts
|
|
elapsedCrawlTime: int = 0
|
|
|
|
# last exec time update
|
|
lastUpdatedTime: str = ""
|
|
|
|
# any pods exited
|
|
anyCrawlPodNewExit: Optional[bool] = Field(default=False, exclude=True)
|
|
|
|
# don't include in status, use by metacontroller
|
|
resync_after: Optional[int] = Field(default=None, exclude=True)
|
|
|
|
# last state
|
|
last_state: TYPE_ALL_CRAWL_STATES = Field(default="starting", exclude=True)
|