browsertrix/backend/k8sman.py
Ilya Kreymer 3d4d7049a2
Misc backend fixes for cloud deployment (#26)
* misc backend fixes:
- fix running w/o local minio
- ensure crawler image pull policy is configurable, loaded via chart value
- use digitalocean repo for main backend image (for now)
- add bucket_name to config only if using default bucket

* enable all behaviors, support 'access_endpoint_url' for default storages

* debugging: add 'no_delete_jobs' setting for k8s and docker to disable deletion of completed jobs
2021-11-25 11:58:26 -08:00


""" K8s support"""

import os
import datetime
import json
import asyncio
import base64

from kubernetes_asyncio import client, config, watch
from kubernetes_asyncio.stream import WsApiClient

from crawls import Crawl, CrawlFile


# ============================================================================
CRAWLER_NAMESPACE = os.environ.get("CRAWLER_NAMESPACE") or "crawlers"

# a 2/31 schedule that will never run, as an empty schedule is not allowed
DEFAULT_NO_SCHEDULE = "* * 31 2 *"
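# (cron fields: minute hour day-of-month month day-of-week;
# February 31 never occurs, so this schedule never fires)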

# ============================================================================
class K8SManager:
    # pylint: disable=too-many-instance-attributes,too-many-locals,too-many-arguments
    """K8SManager, manages creation of k8s resources from crawl api requests"""

    def __init__(self, namespace=CRAWLER_NAMESPACE):
        config.load_incluster_config()

        self.crawl_ops = None

        self.core_api = client.CoreV1Api()
        self.core_api_ws = client.CoreV1Api(api_client=WsApiClient())
        self.batch_api = client.BatchV1Api()
        self.batch_beta_api = client.BatchV1beta1Api()

        self.namespace = namespace
        self._default_storage_endpoints = {}

        self.crawler_image = os.environ["CRAWLER_IMAGE"]
        self.crawler_image_pull_policy = os.environ["CRAWLER_PULL_POLICY"]

        self.crawl_retries = int(os.environ.get("CRAWL_RETRIES", "3"))

        self.no_delete_jobs = os.environ.get("NO_DELETE_JOBS", "0") != "0"

        self.loop = asyncio.get_running_loop()
        self.loop.create_task(self.run_event_loop())

    def set_crawl_ops(self, ops):
        """ Set crawl ops handler """
        self.crawl_ops = ops
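
    # Usage note: __init__ calls asyncio.get_running_loop(), so a K8SManager
    # must be constructed from inside a running event loop. A minimal sketch,
    # assuming a FastAPI app and a `crawl_ops` handler (both names are
    # hypothetical here, not part of this module):
    #
    #   @app.on_event("startup")
    #   async def startup():
    #       manager = K8SManager()
    #       manager.set_crawl_ops(crawl_ops)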

    async def run_event_loop(self):
        """ Run the job watch loop, retry in case of failure"""
        while True:
            try:
                await self.watch_events()

            # pylint: disable=broad-except
            except Exception as exc:
                print(f"Retrying job loop: {exc}")
                await asyncio.sleep(10)

    async def watch_events(self):
        """ Get events for completed jobs"""
        async with watch.Watch().stream(
            self.core_api.list_namespaced_event,
            self.namespace,
            field_selector="involvedObject.kind=Job",
        ) as stream:
            async for event in stream:
                try:
                    obj = event["object"]
                    if obj.reason == "BackoffLimitExceeded":
                        self.loop.create_task(
                            self.handle_crawl_failed(obj.involved_object.name, "failed")
                        )

                    elif obj.reason == "DeadlineExceeded":
                        self.loop.create_task(
                            self.handle_crawl_failed(
                                obj.involved_object.name, "timed_out"
                            )
                        )

                # pylint: disable=broad-except
                except Exception as exc:
                    print(exc)

    # pylint: disable=unused-argument
    async def get_storage(self, storage_name, is_default=False):
        """Check if storage_name is valid by checking existing secret
        (the is_default flag is ignored)"""
        try:
            return await self.core_api.read_namespaced_secret(
                f"storage-{storage_name}",
                namespace=self.namespace,
            )
        except Exception:
            # pylint: disable=broad-except,raise-missing-from
            raise Exception(f"Storage {storage_name} not found")

    async def update_archive_storage(self, aid, uid, storage):
        """Update storage by either creating a per-archive secret, if using
        custom storage, or deleting the per-archive secret, if using default storage"""
        archive_storage_name = f"storage-{aid}"
        if storage.type == "default":
            try:
                await self.core_api.delete_namespaced_secret(
                    archive_storage_name,
                    namespace=self.namespace,
                    propagation_policy="Foreground",
                )

            # pylint: disable=bare-except
            except:
                pass

            return

        labels = {"btrix.archive": aid, "btrix.user": uid}

        crawl_secret = client.V1Secret(
            metadata={
                "name": archive_storage_name,
                "namespace": self.namespace,
                "labels": labels,
            },
            string_data={
                "STORE_ENDPOINT_URL": storage.endpoint_url,
                "STORE_ACCESS_KEY": storage.access_key,
                "STORE_SECRET_KEY": storage.secret_key,
            },
        )

        try:
            await self.core_api.create_namespaced_secret(
                namespace=self.namespace, body=crawl_secret
            )

        # pylint: disable=bare-except
        except:
            await self.core_api.patch_namespaced_secret(
                name=archive_storage_name, namespace=self.namespace, body=crawl_secret
            )
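
    # For reference, the created secret is equivalent to this manifest (values
    # are illustrative placeholders; k8s stores stringData base64-encoded):
    #
    #   apiVersion: v1
    #   kind: Secret
    #   metadata:
    #     name: storage-<aid>
    #     labels:
    #       btrix.archive: <aid>
    #       btrix.user: <uid>
    #   stringData:
    #     STORE_ENDPOINT_URL: https://s3.example.com/bucket/
    #     STORE_ACCESS_KEY: <access-key>
    #     STORE_SECRET_KEY: <secret-key>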

    async def add_crawl_config(self, crawlconfig, storage, run_now):
        """add new crawl as cron job, store crawl config in configmap"""
        cid = str(crawlconfig.id)
        userid = crawlconfig.user
        aid = crawlconfig.archive

        # Configure Annotations + Labels
        annotations = {
            "btrix.run.schedule": crawlconfig.schedule,
            "btrix.storage_name": storage.name,
            "btrix.colls": json.dumps(crawlconfig.colls),
        }

        if storage.type == "default":
            storage_name = storage.name
            storage_path = storage.path
            annotations["btrix.def_storage_path"] = storage_path
        else:
            storage_name = aid
            storage_path = ""

        labels = {
            "btrix.user": userid,
            "btrix.archive": aid,
            "btrix.crawlconfig": cid,
        }

        await self.get_storage(storage_name)

        # Create Config Map
        config_map = self._create_config_map(crawlconfig, labels)

        await self.core_api.create_namespaced_config_map(
            namespace=self.namespace, body=config_map
        )

        # Create Cron Job
        suspend, schedule = self._get_schedule_suspend_run_now(crawlconfig)

        job_template = self._get_job_template(
            cid,
            storage_name,
            storage_path,
            labels,
            annotations,
            crawlconfig.crawlTimeout,
            crawlconfig.parallel,
        )

        spec = client.V1beta1CronJobSpec(
            schedule=schedule,
            suspend=suspend,
            concurrency_policy="Forbid",
            successful_jobs_history_limit=2,
            failed_jobs_history_limit=3,
            job_template=job_template,
        )

        cron_job = client.V1beta1CronJob(
            metadata={
                "name": f"crawl-scheduled-{cid}",
                "namespace": self.namespace,
                "labels": labels,
            },
            spec=spec,
        )

        cron_job = await self.batch_beta_api.create_namespaced_cron_job(
            namespace=self.namespace, body=cron_job
        )

        # Run Job Now
        if run_now:
            new_job = await self._create_run_now_job(cron_job)
            return new_job.metadata.name

        return ""
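
    # Resources created per crawl config (names derived from the config id):
    #   ConfigMap  crawl-config-<cid>      -- the serialized crawl config
    #   CronJob    crawl-scheduled-<cid>   -- suspended if no schedule is set
    # plus, if run_now is set, an immediate Job cloned from the cron job template.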

    async def update_crawl_schedule(self, cid, schedule):
        """ Update the schedule for existing crawl config """
        cron_jobs = await self.batch_beta_api.list_namespaced_cron_job(
            namespace=self.namespace, label_selector=f"btrix.crawlconfig={cid}"
        )

        if len(cron_jobs.items) != 1:
            return

        cron_job = cron_jobs.items[0]

        real_schedule = schedule or DEFAULT_NO_SCHEDULE

        if real_schedule != cron_job.spec.schedule:
            cron_job.spec.schedule = real_schedule
            cron_job.spec.suspend = not schedule

            cron_job.spec.job_template.metadata.annotations[
                "btrix.run.schedule"
            ] = schedule

            await self.batch_beta_api.patch_namespaced_cron_job(
                name=cron_job.metadata.name, namespace=self.namespace, body=cron_job
            )

    async def run_crawl_config(self, cid):
        """ Run crawl job for cron job based on specified crawlconfig id (cid) """
        cron_jobs = await self.batch_beta_api.list_namespaced_cron_job(
            namespace=self.namespace, label_selector=f"btrix.crawlconfig={cid}"
        )

        if len(cron_jobs.items) != 1:
            raise Exception("Crawl Config Not Found")

        res = await self._create_run_now_job(cron_jobs.items[0])
        return res.metadata.name

    async def list_running_crawls(self, cid=None, aid=None, userid=None):
        """ Return a list of running crawls """
        filters = []
        if cid:
            filters.append(f"btrix.crawlconfig={cid}")

        if aid:
            filters.append(f"btrix.archive={aid}")

        if userid:
            filters.append(f"btrix.user={userid}")

        jobs = await self.batch_api.list_namespaced_job(
            namespace=self.namespace,
            label_selector=",".join(filters),
            field_selector="status.successful=0",
        )

        return [
            self._make_crawl_for_job(job, "running")
            for job in jobs.items
            if job.status.active
        ]
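
    # e.g. list_running_crawls(cid="abc", aid="xyz") queries with the combined
    # label selector "btrix.crawlconfig=abc,btrix.archive=xyz" (comma acts as a
    # logical AND), plus a field selector excluding already-succeeded jobs.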

    async def init_crawl_screencast(self, crawl_id, aid):
        """ Init service for this job/crawl_id to support screencasting """
        labels = {"btrix.archive": aid}

        service = client.V1Service(
            kind="Service",
            api_version="v1",
            metadata={
                "name": crawl_id,
                "labels": labels,
            },
            spec={
                "selector": {"job-name": crawl_id},
                "ports": [{"protocol": "TCP", "port": 9037, "name": "screencast"}],
            },
        )

        try:
            await self.core_api.create_namespaced_service(
                body=service, namespace=self.namespace
            )
        except client.exceptions.ApiException as api_exc:
            # 409 Conflict means the service already exists, which is fine
            if api_exc.status != 409:
                raise api_exc

    async def process_crawl_complete(self, crawlcomplete):
        """Ensure the crawlcomplete data is valid (job exists and user matches)
        Fill in additional details about the crawl"""
        job = await self.batch_api.read_namespaced_job(
            name=crawlcomplete.id, namespace=self.namespace
        )

        if not job:  # or job.metadata.labels["btrix.user"] != crawlcomplete.user:
            return None, None

        manual = job.metadata.annotations.get("btrix.run.manual") == "1"
        if manual and not self.no_delete_jobs:
            self.loop.create_task(self._delete_job(job.metadata.name))

        crawl = self._make_crawl_for_job(
            job,
            "complete" if crawlcomplete.completed else "partial_complete",
            finish_now=True,
        )

        storage_path = job.metadata.annotations.get("btrix.def_storage_path")

        inx = None
        filename = None
        storage_name = None
        if storage_path:
            inx = crawlcomplete.filename.index(storage_path)
            filename = (
                crawlcomplete.filename[inx:] if inx > 0 else crawlcomplete.filename
            )
            storage_name = job.metadata.annotations.get("btrix.storage_name")

        def_storage_name = storage_name if inx else None

        crawl_file = CrawlFile(
            def_storage_name=def_storage_name,
            filename=filename or crawlcomplete.filename,
            size=crawlcomplete.size,
            hash=crawlcomplete.hash,
        )

        return crawl, crawl_file

    async def get_default_storage_access_endpoint(self, name):
        """ Get access_endpoint for default storage """
        if name not in self._default_storage_endpoints:
            storage_secret = await self.get_storage(name, is_default=True)
            self._default_storage_endpoints[name] = base64.standard_b64decode(
                storage_secret.data["STORE_ACCESS_ENDPOINT_URL"]
            ).decode()

        return self._default_storage_endpoints[name]
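
    # Note: the k8s API returns secret data values base64-encoded, hence the
    # decode above. A minimal illustration (the endpoint URL is a placeholder):
    #   base64.standard_b64decode(b"aHR0cHM6Ly9zMy5leGFtcGxlLmNvbS8=").decode()
    #   == "https://s3.example.com/"
    # Decoded endpoints are cached in _default_storage_endpoints per name.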

    async def is_running(self, job_name, aid):
        """ Return true if the specified crawl (by job_name) is running """
        try:
            job = await self.batch_api.read_namespaced_job(
                name=job_name, namespace=self.namespace
            )

            if not job or job.metadata.labels["btrix.archive"] != aid:
                return False

            return True

        # pylint: disable=broad-except
        except Exception:
            return False

    async def stop_crawl(self, job_name, aid, graceful=True):
        """Attempt to stop crawl, either gracefully via the SIGTERM from job
        deletion, which will attempt to finish current pages,
        OR abruptly, by first issuing a SIGUSR1 via exec (see _send_sig_to_pods)
        and marking the crawl canceled, followed by the SIGTERM from deletion"""
        job = await self.batch_api.read_namespaced_job(
            name=job_name, namespace=self.namespace
        )

        if not job or job.metadata.labels["btrix.archive"] != aid:
            return None

        result = None

        if not graceful:
            pods = await self.core_api.list_namespaced_pod(
                namespace=self.namespace,
                label_selector=f"job-name={job_name},btrix.archive={aid}",
            )

            await self._send_sig_to_pods(pods.items, aid)
            result = self._make_crawl_for_job(job, "canceled", True)
        else:
            result = True

        await self._delete_job(job_name)

        return result

    async def scale_crawl(self, job_name, aid, parallelism=1):
        """ Set the crawl scale (job parallelism) on the specified job """
        try:
            job = await self.batch_api.read_namespaced_job(
                name=job_name, namespace=self.namespace
            )

        # pylint: disable=broad-except
        except Exception:
            return "Crawl not found"

        if not job or job.metadata.labels["btrix.archive"] != aid:
            return "Invalid Crawl"

        if parallelism < 1 or parallelism > 10:
            return "Invalid Scale: Must be between 1 and 10"

        job.spec.parallelism = parallelism

        await self.batch_api.patch_namespaced_job(
            name=job.metadata.name, namespace=self.namespace, body=job
        )

        return None

    async def delete_crawl_configs_for_archive(self, archive):
        """Delete all crawl configs for given archive"""
        return await self._delete_crawl_configs(f"btrix.archive={archive}")

    async def delete_crawl_config_by_id(self, cid):
        """Delete all crawl configs by id"""
        return await self._delete_crawl_configs(f"btrix.crawlconfig={cid}")

    async def handle_crawl_failed(self, job_name, reason):
        """ Handle failed crawl job, add to db and then delete """
        try:
            job = await self.batch_api.read_namespaced_job(
                name=job_name, namespace=self.namespace
            )
        # pylint: disable=bare-except
        except:
            print("Job Failure Already Handled")
            return

        crawl = self._make_crawl_for_job(job, reason, True)

        # if update succeeds, then crawl has not completed, so likely a failure
        failure = await self.crawl_ops.store_crawl(crawl)

        # keep failed jobs around, for now, if no_delete_jobs is set
        if not failure and not self.no_delete_jobs:
            await self._delete_job(job_name)

    # ========================================================================
    # Internal Methods

    # pylint: disable=no-self-use
    def _make_crawl_for_job(self, job, state, finish_now=False):
        """ Make a crawl object from a job"""
        return Crawl(
            id=job.metadata.name,
            state=state,
            scale=job.spec.parallelism or 1,
            user=job.metadata.labels["btrix.user"],
            aid=job.metadata.labels["btrix.archive"],
            cid=job.metadata.labels["btrix.crawlconfig"],
            schedule=job.metadata.annotations.get("btrix.run.schedule", ""),
            manual=job.metadata.annotations.get("btrix.run.manual") == "1",
            started=job.status.start_time.replace(tzinfo=None),
            finished=datetime.datetime.utcnow().replace(microsecond=0, tzinfo=None)
            if finish_now
            else None,
            # default must be the JSON string "[]", as json.loads() can't parse a list
            colls=json.loads(job.metadata.annotations.get("btrix.colls", "[]")),
        )

    async def _delete_job(self, name):
        await self.batch_api.delete_namespaced_job(
            name=name,
            namespace=self.namespace,
            grace_period_seconds=60,
            propagation_policy="Foreground",
        )

        try:
            await self.core_api.delete_namespaced_service(
                name=name,
                namespace=self.namespace,
                grace_period_seconds=60,
                propagation_policy="Foreground",
            )
        # pylint: disable=bare-except
        except:
            pass

    def _create_config_map(self, crawlconfig, labels):
        """ Create Config Map based on CrawlConfig + labels """
        config_map = client.V1ConfigMap(
            metadata={
                "name": f"crawl-config-{crawlconfig.id}",
                "namespace": self.namespace,
                "labels": labels,
            },
            data={"crawl-config.json": json.dumps(crawlconfig.config.dict())},
        )

        return config_map

    # pylint: disable=no-self-use
    def _get_schedule_suspend_run_now(self, crawlconfig):
        """ get schedule/suspend data based on crawlconfig """
        suspend = False
        schedule = crawlconfig.schedule

        if not schedule:
            schedule = DEFAULT_NO_SCHEDULE
            suspend = True

        return suspend, schedule
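
    # e.g. a crawlconfig with schedule="30 2 * * *" yields (False, "30 2 * * *"),
    # while an empty schedule yields (True, "* * 31 2 *"): the cron job is
    # created suspended, with the never-firing placeholder schedule.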

    async def _send_sig_to_pods(self, pods, aid):
        # send SIGUSR1 to pid 1 (the crawler process) in each matching pod
        command = ["kill", "-s", "SIGUSR1", "1"]
        interrupted = False

        try:
            for pod in pods:
                if pod.metadata.labels["btrix.archive"] != aid:
                    continue

                await self.core_api_ws.connect_get_namespaced_pod_exec(
                    pod.metadata.name,
                    namespace=self.namespace,
                    command=command,
                    stdout=True,
                )
                interrupted = True

        # pylint: disable=broad-except
        except Exception as exc:
            print(f"Exec Error: {exc}")

        return interrupted

    async def _delete_crawl_configs(self, label):
        """Delete Crawl Cron Job and all dependent resources, including configmap and secrets"""
        await self.batch_beta_api.delete_collection_namespaced_cron_job(
            namespace=self.namespace,
            label_selector=label,
            propagation_policy="Foreground",
        )

        await self.core_api.delete_collection_namespaced_config_map(
            namespace=self.namespace,
            label_selector=label,
            propagation_policy="Foreground",
        )

    async def _create_run_now_job(self, cron_job):
        """Create new job from cron job to run instantly"""
        annotations = cron_job.spec.job_template.metadata.annotations
        annotations["btrix.run.manual"] = "1"
        annotations["btrix.run.schedule"] = ""

        # owner_ref = client.V1OwnerReference(
        #     kind="CronJob",
        #     name=cron_job.metadata.name,
        #     block_owner_deletion=True,
        #     controller=True,
        #     uid=cron_job.metadata.uid,
        #     api_version="batch/v1beta1",
        # )

        ts_now = datetime.datetime.utcnow().strftime("%Y%m%d%H%M%S")
        name = f"crawl-now-{ts_now}-{cron_job.metadata.labels['btrix.crawlconfig']}"

        object_meta = client.V1ObjectMeta(
            name=name,
            annotations=annotations,
            labels=cron_job.metadata.labels,
            # owner_references=[owner_ref],
        )

        job = client.V1Job(
            kind="Job",
            api_version="batch/v1",
            metadata=object_meta,
            spec=cron_job.spec.job_template.spec,
        )

        return await self.batch_api.create_namespaced_job(
            body=job, namespace=self.namespace
        )
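
    # e.g. a manual run of a crawl config with id "abc123" (a placeholder)
    # started 2021-11-25 11:58:26 UTC would be named
    # "crawl-now-20211125115826-abc123", per the %Y%m%d%H%M%S format above.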

    def _get_job_template(
        self,
        cid,
        storage_name,
        storage_path,
        labels,
        annotations,
        crawl_timeout,
        parallel,
    ):
        """Return job template for crawl job, including labels, adding optional crawl params"""
        requests_memory = "256M"
        limit_memory = "1G"

        requests_cpu = "120m"
        limit_cpu = "1000m"

        resources = {
            "limits": {
                "cpu": limit_cpu,
                "memory": limit_memory,
            },
            "requests": {
                "cpu": requests_cpu,
                "memory": requests_memory,
            },
        }

        job_template = {
            "metadata": {"annotations": annotations},
            "spec": {
                "backoffLimit": self.crawl_retries,
                "parallelism": parallel,
                "template": {
                    "metadata": {"labels": labels},
                    "spec": {
                        "containers": [
                            {
                                "name": "crawler",
                                "image": self.crawler_image,
                                "imagePullPolicy": self.crawler_image_pull_policy,
                                "command": [
                                    "crawl",
                                    "--config",
                                    "/tmp/crawl-config.json",
                                ],
                                "volumeMounts": [
                                    {
                                        "name": "crawl-config",
                                        "mountPath": "/tmp/crawl-config.json",
                                        "subPath": "crawl-config.json",
                                        "readOnly": True,
                                    }
                                ],
                                "envFrom": [
                                    {"configMapRef": {"name": "shared-crawler-config"}},
                                    {"secretRef": {"name": f"storage-{storage_name}"}},
                                ],
                                "env": [
                                    {
                                        "name": "CRAWL_ID",
                                        "valueFrom": {
                                            "fieldRef": {
                                                "fieldPath": "metadata.labels['job-name']"
                                            }
                                        },
                                    },
                                    {"name": "STORE_PATH", "value": storage_path},
                                    {
                                        "name": "STORE_FILENAME",
                                        "value": "@ts-@hostname.wacz",
                                    },
                                ],
                                "resources": resources,
                            }
                        ],
                        "volumes": [
                            {
                                "name": "crawl-config",
                                "configMap": {
                                    "name": f"crawl-config-{cid}",
                                    "items": [
                                        {
                                            "key": "crawl-config.json",
                                            "path": "crawl-config.json",
                                        }
                                    ],
                                },
                            }
                        ],
                        "restartPolicy": "OnFailure",
                    },
                },
            },
        }

        if crawl_timeout > 0:
            job_template["spec"]["activeDeadlineSeconds"] = crawl_timeout

        return job_template
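
    # Note: when crawl_timeout > 0, activeDeadlineSeconds caps the job's total
    # runtime; once exceeded, k8s emits a DeadlineExceeded event, which
    # watch_events() above maps to the "timed_out" crawl state. The crawler
    # container reads its storage credentials from the storage-<name> secret
    # via envFrom, and CRAWL_ID is injected from the pod's job-name label via
    # the downward API fieldRef.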