Crawler pod memory padding + auto scaling (#1631)
- set memory limit to 1.2x the memory request to provide extra padding and avoid OOM
- attempt to resize crawler pods by 1.2x when exceeding 90% of available memory
- do a 'soft OOM' (send an extra SIGTERM) to the pod when it reaches 100% of requested memory, resulting in a faster graceful restart while avoiding an instant OOM kill by the system

Fixes #1632

Co-authored-by: Tessa Walsh <tessa@bitarchivist.net>
parent 86311ab4ea
commit 3438133fcb
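To make the numbers in the summary concrete, here is a small illustrative calculation; the 2 GiB request is a hypothetical value, not one taken from this commit:

    # illustrative arithmetic only -- the 2 GiB request is a made-up example
    request = 2 * 1024**3                  # 2 GiB memory request
    limit = int(request * 1.2)             # hard limit with padding: ~2.4 GiB
    scale_up_at = request * 0.90           # resize trigger: ~1.8 GiB used
    new_request = int(request * 1.2)       # resized request: ~2.4 GiB
    soft_oom_at = request * 1.0            # extra SIGTERM once usage hits 2 GiB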
@@ -288,3 +288,29 @@ class K8sAPI:
         # pylint: disable=broad-exception-caught
         except Exception:
             return False
 
+    async def send_signal_to_pod(self, pod_name, signame) -> bool:
+        """send signal to all pods"""
+        command = ["bash", "-c", f"kill -s {signame} 1"]
+        signaled = False
+
+        try:
+            print(f"Sending {signame} to {pod_name}", flush=True)
+
+            res = await self.core_api_ws.connect_get_namespaced_pod_exec(
+                name=pod_name,
+                namespace=self.namespace,
+                command=command,
+                stdout=True,
+            )
+            if res:
+                print("Result", res, flush=True)
+
+            else:
+                signaled = True
+
+        # pylint: disable=broad-except
+        except Exception as exc:
+            print(f"Send Signal Error: {exc}", flush=True)
+
+        return signaled
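The exec call above goes through `core_api_ws`, which in kubernetes_asyncio must be a CoreV1Api backed by the websocket client rather than the plain HTTP client. A minimal sketch of that setup (assuming cluster config is loaded elsewhere in K8sAPI; names here are illustrative, not taken from this diff):

    from kubernetes_asyncio import client
    from kubernetes_asyncio.stream import WsApiClient

    # regular client for standard API calls
    core_api = client.CoreV1Api(client.ApiClient())
    # websocket-backed client, needed for connect_get_namespaced_pod_exec()
    core_api_ws = client.CoreV1Api(WsApiClient())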
@@ -42,28 +42,35 @@ class K8sOpAPI(K8sAPI):
         """compute memory / cpu resources for crawlers"""
         p = self.shared_params
         num = max(int(p["crawler_browser_instances"]) - 1, 0)
+        crawler_cpu: float = 0
+        crawler_memory: int = 0
         print("crawler resources")
         if not p.get("crawler_cpu"):
             base = parse_quantity(p["crawler_cpu_base"])
             extra = parse_quantity(p["crawler_extra_cpu_per_browser"])
 
             # cpu is a floating value of cpu cores
-            p["crawler_cpu"] = float(base + num * extra)
+            crawler_cpu = float(base + num * extra)
 
-            print(f"cpu = {base} + {num} * {extra} = {p['crawler_cpu']}")
+            print(f"cpu = {base} + {num} * {extra} = {crawler_cpu}")
         else:
-            print(f"cpu = {p['crawler_cpu']}")
+            crawler_cpu = float(parse_quantity(p["crawler_cpu"]))
+            print(f"cpu = {crawler_cpu}")
 
         if not p.get("crawler_memory"):
             base = parse_quantity(p["crawler_memory_base"])
             extra = parse_quantity(p["crawler_extra_memory_per_browser"])
 
             # memory is always an int
-            p["crawler_memory"] = int(base + num * extra)
+            crawler_memory = int(base + num * extra)
 
-            print(f"memory = {base} + {num} * {extra} = {p['crawler_memory']}")
+            print(f"memory = {base} + {num} * {extra} = {crawler_memory}")
         else:
-            print(f"memory = {p['crawler_memory']}")
+            crawler_memory = int(parse_quantity(p["crawler_memory"]))
+            print(f"memory = {crawler_memory}")
 
+        p["crawler_cpu"] = crawler_cpu
+        p["crawler_memory"] = crawler_memory
+
     def compute_profile_resources(self):
         """compute memory /cpu resources for a single profile browser"""
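As a worked example (hypothetical chart values, not ones taken from this diff): with crawler_browser_instances = 2, crawler_cpu_base = 0.9, crawler_extra_cpu_per_browser = 0.6, crawler_memory_base = 1024 MiB and crawler_extra_memory_per_browser = 768 MiB, the computation above yields:

    # hypothetical chart values -- illustrative only
    num = max(2 - 1, 0)                                  # 1 extra browser
    crawler_cpu = float(0.9 + num * 0.6)                 # 1.5 cores
    crawler_memory = int((1024 + num * 768) * 1024**2)   # 1792 MiB, expressed in bytes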
@@ -39,6 +39,7 @@ from .models import (
+    CrawlStatus,
     MCBaseRequest,
     MCSyncData,
     PodInfo,
     POD,
     CMAP,
     PVC,
@@ -61,6 +62,19 @@ STARTING_TIME_SECS = 60
 EXEC_TIME_UPDATE_SECS = 60
 
 
+# scale up if exceeded this threshold of mem usage (eg. 90%)
+MEM_SCALE_UP_THRESHOLD = 0.90
+
+# scale up by this much
+MEM_SCALE_UP = 1.2
+
+# soft OOM if exceeded this threshold of mem usage (eg. 100%)
+MEM_SOFT_OOM_THRESHOLD = 1.0
+
+# set memory limit to this much of request for extra padding
+MEM_LIMIT_PADDING = 1.2
+
+
 # pylint: disable=too-many-public-methods, too-many-locals, too-many-branches, too-many-statements
 # pylint: disable=invalid-name, too-many-lines, too-many-return-statements
 # ============================================================================
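These constants drive the sizing decisions made further down in handle_auto_size. A simplified standalone sketch of that decision (the helper name is made up; the real logic also checks pod exit reasons and the signalAtMem guard):

    def plan_memory(used: int, requested: int) -> tuple[int, bool]:
        """return (new memory request, whether to attempt a soft OOM)"""
        usage = used / requested
        if usage > MEM_SCALE_UP_THRESHOLD:
            # grow the request and, at 100%+ usage, also nudge the pod to restart
            return int(requested * MEM_SCALE_UP), usage >= MEM_SOFT_OOM_THRESHOLD
        return requested, False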
@@ -209,8 +223,10 @@ class CrawlOperator(BaseOperator):
             data.related.get(METRICS, {}),
         )
 
-        # auto sizing handled here
-        self.handle_auto_size(crawl.id, status.podStatus)
+        # auto-scaling not possible without pod metrics
+        if self.k8s.has_pod_metrics:
+            # auto sizing handled here
+            await self.handle_auto_size(status.podStatus)
 
         if status.finished:
             return await self.finalize_response(
@@ -326,6 +342,7 @@ class CrawlOperator(BaseOperator):
         params["name"] = name
         params["cpu"] = pod_info.newCpu or params.get("crawler_cpu")
         params["memory"] = pod_info.newMemory or params.get("crawler_memory")
+        params["memory_limit"] = float(params["memory"]) * MEM_LIMIT_PADDING
         params["do_restart"] = (
            pod_info.should_restart_pod() or params.get("force_restart")
        ) and has_pod
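Continuing the hypothetical 2 GiB example from above: if the auto-sizer has bumped newMemory to roughly 2.4 GiB, the values handed to the pod template become approximately:

    # hypothetical continuation of the 2 GiB example above
    params["memory"] = 2576980377                             # ~2.4 GiB new request
    params["memory_limit"] = float(params["memory"]) * 1.2    # ~2.88 GiB padded limit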
@@ -758,8 +775,6 @@ class CrawlOperator(BaseOperator):
             qa_run_id = crawl.id if crawl.is_qa else None
 
             while page_crawled:
-                print("PAGE DATA", flush=True)
-                print(page_crawled, flush=True)
                 page_dict = json.loads(page_crawled)
                 await self.page_ops.add_page_to_db(
                     page_dict, crawl.db_crawl_id, qa_run_id, crawl.oid
@@ -836,7 +851,7 @@
 
         return crawler_running, redis_running, done
 
-    def handle_terminated_pod(self, name, role, status, terminated):
+    def handle_terminated_pod(self, name, role, status: CrawlStatus, terminated):
         """handle terminated pod state"""
         if not terminated:
             return
@@ -986,7 +1001,9 @@
 
         return False
 
-    async def add_used_stats(self, crawl_id, pod_status, redis, metrics):
+    async def add_used_stats(
+        self, crawl_id, pod_status: dict[str, PodInfo], redis, metrics
+    ):
         """load current usage stats"""
         if redis:
             stats = await redis.info("persistence")
@@ -1010,20 +1027,42 @@
             pod_info.used.memory = int(parse_quantity(usage["memory"]))
             pod_info.used.cpu = float(parse_quantity(usage["cpu"]))
 
-    def handle_auto_size(self, _, pod_status):
+    async def handle_auto_size(self, pod_status: dict[str, PodInfo]) -> None:
         """auto scale pods here, experimental"""
         for name, pod in pod_status.items():
-            # if pod crashed due to OOM, increase mem
-            # if pod.isNewExit and pod.reason == "oom":
-            #    pod.newMemory = int(float(pod.allocated.memory) * 1.2)
-            #    print(f"Resizing pod {name} -> mem {pod.newMemory} - OOM Detected")
+            mem_usage = pod.get_percent_memory()
+            new_memory = int(float(pod.allocated.memory) * MEM_SCALE_UP)
+            send_sig = False
 
-            # if redis is using >0.90 of its memory, increase mem
-            if name.startswith("redis") and pod.get_percent_memory() > 0.90:
-                pod.newMemory = int(float(pod.allocated.memory) * 1.2)
-                print(f"Resizing pod {name} -> mem {pod.newMemory} - Redis Capacity")
+            # if pod is using >MEM_SCALE_UP_THRESHOLD of its memory, increase mem
+            if mem_usage > MEM_SCALE_UP_THRESHOLD:
+                pod.newMemory = new_memory
+                print(
+                    f"Mem {mem_usage}: Resizing pod {name} -> mem {pod.newMemory} - Scale Up"
+                )
 
-    async def log_crashes(self, crawl_id, pod_status, redis):
+                # if crawler pod is using its OOM threshold, attempt a soft OOM
+                # via a second SIGTERM
+                if (
+                    mem_usage >= MEM_SOFT_OOM_THRESHOLD
+                    and name.startswith("crawl")
+                    and pod.signalAtMem != pod.newMemory
+                ):
+                    send_sig = True
+
+            # if any pod crashed due to OOM, increase mem
+            elif pod.isNewExit and pod.reason == "oom":
+                pod.newMemory = new_memory
+                print(
+                    f"Mem {mem_usage}: Resizing pod {name} -> mem {pod.newMemory} - OOM Detected"
+                )
+                send_sig = True
+
+            # avoid resending SIGTERM multiple times after it already succeeded
+            if send_sig and await self.k8s.send_signal_to_pod(name, "SIGTERM"):
+                pod.signalAtMem = pod.newMemory
+
+    async def log_crashes(self, crawl_id, pod_status: dict[str, PodInfo], redis):
         """report/log any pod crashes here"""
         for name, pod in pod_status.items():
             # log only unexpected exits as crashes
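handle_auto_size relies on PodInfo.get_percent_memory(), which is not part of this diff; presumably it is simply the used-to-allocated memory ratio, roughly:

    # assumed shape of the helper used above (not shown in this diff)
    def get_percent_memory(self) -> float:
        """used memory as a fraction of the requested (allocated) memory"""
        return float(self.used.memory) / float(self.allocated.memory) if self.allocated.memory else 0.0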
@@ -114,6 +114,7 @@ class PodInfo(BaseModel):
 
     newCpu: Optional[int] = None
     newMemory: Optional[int] = None
+    signalAtMem: Optional[int] = None
 
     def dict(self, *a, **kw):
         res = super().dict(*a, **kw)
@@ -180,7 +181,7 @@ class CrawlStatus(BaseModel):
     initRedis: bool = False
     crawlerImage: Optional[str] = None
     lastActiveTime: str = ""
-    podStatus: Optional[DefaultDict[str, PodInfo]] = defaultdict(
+    podStatus: DefaultDict[str, PodInfo] = defaultdict(
         lambda: PodInfo()  # pylint: disable=unnecessary-lambda
     )
     # placeholder for pydantic 2.0 -- will require this version
@@ -175,7 +175,7 @@ spec:
 
     resources:
       limits:
-        memory: "{{ memory }}"
+        memory: "{{ memory_limit }}"
 
      requests:
        cpu: "{{ cpu }}"
@@ -7,7 +7,7 @@ metadata:
 rules:
 - apiGroups: [""]
   resources: ["pods", "pods/exec", "pods/log", "services", "configmaps", "secrets", "events", "persistentvolumeclaims"]
-  verbs: ["get", "list", "watch", "create", "update", "patch", "delete", "deletecollection"]
+  verbs: ["get", "list", "watch", "create", "update", "patch", "delete", "deletecollection", "exec"]
 
 - apiGroups: ["batch", "extensions", "apps"]
   resources: ["jobs", "cronjobs", "statefulsets"]