job handling:

- job watch: add a watch loop that detects job failures (BackoffLimitExceeded)
- set job retries + job timeout via chart values
- SIGTERM starts a graceful shutdown by default, including when a job times out (see the signal sketch below)
- use SIGUSR1 to switch to an instant shutdown
- update stop_crawl() to use the new semantics
Ilya Kreymer 2021-08-23 21:19:21 -07:00
parent 7146e054a4
commit ed27f3e3ee
4 changed files with 162 additions and 70 deletions
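The signal semantics above assume the crawl job's pods already receive a SIGTERM when their job is deleted or exceeds its deadline, and that SIGUSR1 is delivered first only when an instant shutdown is wanted. Below is a minimal, illustrative Python sketch of a process honoring those semantics; the class and handler names are hypothetical and not part of this commit (the crawler itself is a separate project).

import signal
import sys


class ShutdownState:
    """ Illustrative only: shut down gracefully by default, instantly after SIGUSR1 """

    def __init__(self):
        self.instant = False
        self.stopping = False

    def on_sigusr1(self, *_args):
        # SIGUSR1 switches any subsequent shutdown to instant mode
        self.instant = True

    def on_sigterm(self, *_args):
        # SIGTERM (sent on stop, cancel, or job timeout) starts the shutdown
        self.stopping = True
        if self.instant:
            sys.exit(0)  # exit immediately instead of finishing current pages


state = ShutdownState()
signal.signal(signal.SIGUSR1, state.on_sigusr1)
signal.signal(signal.SIGTERM, state.on_sigterm)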


@@ -1,6 +1,7 @@
 """ Crawl API """
 import asyncio
+import traceback
 from typing import Optional, List
 from datetime import datetime
@@ -64,6 +65,8 @@ class CrawlOps:
         self.crawl_manager = crawl_manager
         self.archives = archives

+        self.crawl_manager.set_crawl_ops(self)
+
     async def on_handle_crawl_complete(self, msg: CrawlCompleteIn):
         """ Handle completed crawl, add to crawls db collection, also update archive usage """
         crawl = await self.crawl_manager.validate_crawl_complete(msg)
@@ -79,6 +82,9 @@ class CrawlOps:
         dura = int((crawl.finished - crawl.started).total_seconds())

+        print(crawl, flush=True)
+        print(f"Duration: {dura}", flush=True)
+
         await self.archives.inc_usage(crawl.aid, dura)

     async def list_crawls(self, aid: str, cid: str = None):
@@ -138,13 +144,17 @@ def init_crawls_api(app, mdb, crawl_manager, archives):
         crawl_id, archive: Archive = Depends(archive_crawl_dep)
     ):
         try:
-            crawl = await crawl_manager.stop_crawl(crawl_id, archive.id)
+            crawl = await crawl_manager.stop_crawl(crawl_id, archive.id, graceful=False)

             if not crawl:
                 raise HTTPException(
                     status_code=404, detail=f"Crawl not found: {crawl_id}"
                 )

             await ops.handle_finished(crawl)

+        except HTTPException as httpe:
+            raise httpe
+
         except Exception as exc:
             # pylint: disable=raise-missing-from
             raise HTTPException(status_code=400, detail=f"Error Canceling Crawl: {exc}")
@@ -159,17 +169,21 @@ def init_crawls_api(app, mdb, crawl_manager, archives):
         crawl_id, archive: Archive = Depends(archive_crawl_dep)
     ):
         try:
-            canceled = await crawl_manager.stop_crawl_graceful(
-                crawl_id, str(archive.id)
-            )
+            canceled = await crawl_manager.stop_crawl(
+                crawl_id, archive.id, graceful=True
+            )

             if not canceled:
                 raise HTTPException(
                     status_code=404, detail=f"Crawl not found: {crawl_id}"
                 )

+        except HTTPException as httpe:
+            raise httpe
+
         except Exception as exc:
             # pylint: disable=raise-missing-from
-            raise HTTPException(status_code=400, detail=f"Error Canceling Crawl: {exc}")
+            traceback.print_exc()
+            raise HTTPException(status_code=400, detail=f"Error Stopping Crawl: {exc}")

         return {"stopped_gracefully": True}


@@ -3,8 +3,9 @@
 import os
 import datetime
 import json
+import asyncio

-from kubernetes_asyncio import client, config
+from kubernetes_asyncio import client, config, watch
 from kubernetes_asyncio.stream import WsApiClient

 from crawls import Crawl
@@ -24,6 +25,8 @@ class K8SManager:
     def __init__(self, namespace=DEFAULT_NAMESPACE):
         config.load_incluster_config()

+        self.crawl_ops = None
+
         self.core_api = client.CoreV1Api()
         self.core_api_ws = client.CoreV1Api(api_client=WsApiClient())
         self.batch_api = client.BatchV1Api()
@@ -34,8 +37,41 @@ class K8SManager:
         self.crawler_image = os.environ.get("CRAWLER_IMAGE")
         self.crawler_image_pull_policy = "IfNotPresent"

-        # loop = asyncio.get_running_loop()
-        # loop.create_task(self.watch_job_done())
+        self.crawl_timeout = int(os.environ.get("CRAWL_TIMEOUT", "1000000"))
+        self.crawl_retries = int(os.environ.get("CRAWL_RETRIES", "3"))
+
+        self.loop = asyncio.get_running_loop()
+        self.loop.create_task(self.watch_job_loop())
+
+    def set_crawl_ops(self, ops):
+        """ Set crawl ops handler """
+        self.crawl_ops = ops
+
+    async def watch_job_loop(self):
+        """ Get events for completed jobs"""
+        async with watch.Watch().stream(
+            self.core_api.list_namespaced_event,
+            self.namespace,
+            field_selector="involvedObject.kind=Job",
+        ) as stream:
+            async for event in stream:
+                try:
+                    obj = event["object"]
+                    if obj.reason == "BackoffLimitExceeded":
+                        self.loop.create_task(
+                            self.handle_crawl_failed(obj.involved_object.name, "failed")
+                        )
+
+                    # elif obj.reason == "DeadlineExceeded":
+                    #    self.loop.create_task(
+                    #        self.handle_crawl_failed(
+                    #            obj.involved_object.name, "timed_out"
+                    #        )
+                    #    )
+
+                # pylint: disable=broad-except
+                except Exception as exc:
+                    print(exc)
+
     async def add_crawl_config(
         self,
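The watch loop above streams namespaced Events whose involvedObject is a Job and currently reacts only to the BackoffLimitExceeded reason (DeadlineExceeded handling is left commented out). The sketch below shows the reason-to-state mapping it relies on, exercised against a fake object shaped like event["object"]; classify_job_event and the fake names are hypothetical, for illustration only.

from types import SimpleNamespace


def classify_job_event(obj):
    """ Map a Job event reason to a crawl end state (hypothetical helper) """
    if obj.reason == "BackoffLimitExceeded":
        return "failed"  # the job exhausted its retries (backoffLimit)
    if obj.reason == "DeadlineExceeded":
        return "timed_out"  # the job exceeded activeDeadlineSeconds
    return None


# fake event object with the same shape as event["object"] from the watch stream
fake_event_obj = SimpleNamespace(
    reason="BackoffLimitExceeded",
    involved_object=SimpleNamespace(kind="Job", name="example-crawl-job"),
)
assert classify_job_event(fake_event_obj) == "failed"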
@@ -212,19 +248,7 @@ class K8SManager:
             field_selector="status.successful=0",
         )

-        return [
-            Crawl(
-                id=job.metadata.name,
-                state="running",
-                user=job.metadata.labels["btrix.user"],
-                aid=job.metadata.labels["btrix.archive"],
-                cid=job.metadata.labels["btrix.crawlconfig"],
-                schedule=job.metadata.annotations.get("btrix.run.schedule", ""),
-                manual=job.metadata.annotations.get("btrix.run.manual") == "1",
-                started=job.status.start_time.replace(tzinfo=None),
-            )
-            for job in jobs.items
-        ]
+        return [self._make_crawl_for_job(job, "running") for job in jobs.items]

     async def validate_crawl_complete(self, crawlcomplete):
         """Ensure the crawlcomplete data is valid (job exists and user matches)
@@ -238,12 +262,7 @@ class K8SManager:
         manual = job.metadata.annotations.get("btrix.run.manual") == "1"

         if not manual:
-            await self.batch_api.delete_namespaced_job(
-                name=job.metadata.name,
-                namespace=self.namespace,
-                grace_period_seconds=10,
-                propagation_policy="Foreground",
-            )
+            await self._delete_job(job.metadata.name)

         return Crawl(
             id=crawlcomplete.id,
@@ -260,58 +279,37 @@ class K8SManager:
             hash=crawlcomplete.hash,
         )

-    async def stop_crawl(self, job_id, aid):
-        """ Stop Crawl based on crawl job id """
+    async def stop_crawl(self, job_name, aid, graceful=True):
+        """Attempt to stop crawl, either gracefully by issuing a SIGTERM, which
+        will attempt to finish current pages,
+
+        OR abruptly by first issuing a SIGUSR1, followed by SIGTERM, which
+        will terminate immediately"""

         job = await self.batch_api.read_namespaced_job(
-            name=job_id, namespace=self.namespace
+            name=job_name, namespace=self.namespace
         )

         if not job or job.metadata.labels["btrix.archive"] != aid:
             return None

-        await self.batch_api.delete_namespaced_job(
-            name=job_id,
-            namespace=self.namespace,
-            grace_period_seconds=10,
-            propagation_policy="Foreground",
-        )
-
-        return Crawl(
-            id=job_id,
-            state="canceled",
-            user=job.metadata.labels["btrix.user"],
-            aid=job.metadata.labels["btrix.archive"],
-            cid=job.metadata.labels["btrix.crawlconfig"],
-            schedule=job.metadata.annotations.get("btrix.run.schedule", ""),
-            manual=job.metadata.annotations.get("btrix.run.manual") == "1",
-            started=job.status.start_time.replace(tzinfo=None),
-            finished=datetime.datetime.utcnow().replace(microsecond=0, tzinfo=None),
-        )
-
-    async def stop_crawl_graceful(self, job_name, aid):
-        """ Attempt to gracefully stop crawl by sending a SIGINT to the pod(s)"""
-
-        pods = await self.core_api.list_namespaced_pod(
-            namespace=self.namespace,
-            label_selector=f"job-name={job_name},btrix.archive={aid}",
-        )
-
-        command = ["kill", "-s", "SIGINT", "1"]
-        interrupted = False
-
-        for pod in pods.items:
-            if pod.metadata.labels["btrix.archive"] != aid:
-                continue
-
-            await self.core_api_ws.connect_get_namespaced_pod_exec(
-                pod.metadata.name,
-                namespace=self.namespace,
-                command=command,
-                stdout=True,
-            )
-            interrupted = True
-
-        return interrupted
+        result = None
+
+        if not graceful:
+            pods = await self.core_api.list_namespaced_pod(
+                namespace=self.namespace,
+                label_selector=f"job-name={job_name},btrix.archive={aid}",
+            )
+
+            await self._send_sig_to_pods(pods.items, aid)
+
+            result = self._make_crawl_for_job(job, "canceled", True)
+        else:
+            result = True
+
+        await self._delete_job(job_name)
+
+        return result

     async def delete_crawl_configs_for_archive(self, archive):
         """Delete all crawl configs for given archive"""
@@ -321,9 +319,51 @@ class K8SManager:
         """Delete all crawl configs by id"""
         return await self._delete_crawl_configs(f"btrix.crawlconfig={cid}")

+    async def handle_crawl_failed(self, job_name, reason):
+        """ Handle failed crawl job, add to db and then delete """
+        try:
+            job = await self.batch_api.read_namespaced_job(
+                name=job_name, namespace=self.namespace
+            )
+        # pylint: disable=bare-except
+        except:
+            print("Job Failure Already Handled")
+            return
+
+        crawl = self._make_crawl_for_job(job, reason, True)
+
+        await self.crawl_ops.handle_finished(crawl)
+
+        await self._delete_job(job_name)
+
     # ========================================================================
     # Internal Methods

+    # pylint: disable=no-self-use
+    def _make_crawl_for_job(self, job, state, finish_now=False):
+        """ Make a crawl object from a job"""
+        return Crawl(
+            id=job.metadata.name,
+            state=state,
+            user=job.metadata.labels["btrix.user"],
+            aid=job.metadata.labels["btrix.archive"],
+            cid=job.metadata.labels["btrix.crawlconfig"],
+            schedule=job.metadata.annotations.get("btrix.run.schedule", ""),
+            manual=job.metadata.annotations.get("btrix.run.manual") == "1",
+            started=job.status.start_time.replace(tzinfo=None),
+            finished=datetime.datetime.utcnow().replace(microsecond=0, tzinfo=None)
+            if finish_now
+            else None,
+        )
+
+    async def _delete_job(self, name):
+        await self.batch_api.delete_namespaced_job(
+            name=name,
+            namespace=self.namespace,
+            grace_period_seconds=120,
+            propagation_policy="Foreground",
+        )
+
     def _create_config_map(self, crawlconfig, labels):
         """ Create Config Map based on CrawlConfig + labels """
         config_map = client.V1ConfigMap(
@@ -355,6 +395,29 @@ class K8SManager:
         return suspend, schedule, run_now

+    async def _send_sig_to_pods(self, pods, aid):
+        command = ["kill", "-s", "SIGUSR1", "1"]
+        interrupted = False
+
+        try:
+            for pod in pods:
+                if pod.metadata.labels["btrix.archive"] != aid:
+                    continue
+
+                await self.core_api_ws.connect_get_namespaced_pod_exec(
+                    pod.metadata.name,
+                    namespace=self.namespace,
+                    command=command,
+                    stdout=True,
+                )
+                interrupted = True
+
+        # pylint: disable=broad-except
+        except Exception as exc:
+            print(f"Exec Error: {exc}")
+
+        return interrupted
+
     async def _delete_crawl_configs(self, label):
         """Delete Crawl Cron Job and all dependent resources, including configmap and secrets"""
@@ -436,9 +499,10 @@ class K8SManager:
             },
         }

-        return {
+        job_template = {
             "metadata": {"annotations": annotations},
             "spec": {
+                "backoffLimit": self.crawl_retries,
                 "template": {
                     "metadata": {"labels": labels},
                     "spec": {
@@ -488,6 +552,11 @@ class K8SManager:
                         ],
                         "restartPolicy": "OnFailure",
                     },
-                }
+                },
             },
         }

+        if self.crawl_timeout > 0:
+            job_template["spec"]["activeDeadlineSeconds"] = self.crawl_timeout
+
+        return job_template
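Both backoffLimit and activeDeadlineSeconds sit on the Job spec itself, not on the pod template, so a rendered crawl job now looks roughly like the abridged sketch below; the concrete numbers are assumed example values, not defaults from this commit.

# abridged Job template, assuming crawl_retries=3 and crawl_timeout=5400
job_template = {
    "metadata": {"annotations": {}},
    "spec": {
        "backoffLimit": 3,  # pod failures tolerated before BackoffLimitExceeded fires
        "activeDeadlineSeconds": 5400,  # only added when crawl_timeout > 0
        "template": {
            "metadata": {"labels": {}},
            "spec": {"containers": [], "restartPolicy": "OnFailure"},
        },
    },
}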


@@ -11,6 +11,9 @@ data:
   CRAWLER_NAMESPACE: {{ .Values.crawler_namespace }}

   CRAWLER_IMAGE: {{ .Values.crawler_image }}

+  CRAWL_TIMEOUT: "{{ .Values.crawl_timeout }}"
+  CRAWL_RETRIES: "{{ .Values.crawl_retries }}"
+
 ---
 apiVersion: v1


@@ -41,6 +41,12 @@ crawler_pull_policy: "Never"
 crawler_namespace: "crawlers"

+# crawl timeout in seconds; set to 0 to disable the timeout
+crawl_timeout: 0
+
+# number of times a failed crawl job is retried
+crawl_retries: 1
+
 # Storage
 # =========================================
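Taken together, the two new chart values flow from values.yaml into the backend ConfigMap as CRAWL_TIMEOUT and CRAWL_RETRIES, are parsed as integers by K8SManager, and end up as activeDeadlineSeconds and backoffLimit on each crawl Job, with crawl_timeout: 0 skipping the deadline entirely. A compact standalone sketch of that flow, using the manager's fallback defaults:

import os

# values.yaml -> ConfigMap env vars -> K8SManager attributes -> Job spec fields
crawl_timeout = int(os.environ.get("CRAWL_TIMEOUT", "1000000"))
crawl_retries = int(os.environ.get("CRAWL_RETRIES", "3"))

job_spec = {"backoffLimit": crawl_retries}
if crawl_timeout > 0:
    # a chart value of 0 disables the job deadline entirely
    job_spec["activeDeadlineSeconds"] = crawl_timeout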