browsertrix/backend/btrixcloud/k8s/k8sman.py
Ilya Kreymer 544346d1d4
backend: make crawlconfigs mutable! (#656) (#662)
* backend: make crawlconfigs mutable! (#656)
- crawlconfig PATCH /{id} can now receive a new JSON config to replace the old one (in addition to scale, schedule, tags)
- exclusions: add / remove APIs mutate the current crawlconfig, do not result in a new crawlconfig created
- exclusions: ensure crawl job 'config' is updated when exclusions are added/removed, unify add/remove exclusions on crawl
- k8s: crawlconfig json is updated along with scale
- k8s: stateful set is restarted by updating annotation, instead of changing template
- crawl object: now has 'config', as well as 'profileid', 'schedule', 'crawlTimeout', 'jobType' properties to ensure anything that is changeable is stored on the crawl
- crawlconfigcore: store shared properties between crawl and crawlconfig in new crawlconfigcore (includes 'schedule', 'jobType', 'config', 'profileid', 'crawlTimeout', 'tags', 'oid')
- crawlconfig object: remove 'oldId', 'newId', disallow deactivating/deleting while crawl is running
- rename 'userid' -> 'createdBy'
- remove unused 'completions' field
- add missing return to fix /run response
- crawlout: ensure 'profileName' is resolved on CrawlOut from profileid
- crawlout: return 'name' instead of 'configName' for consistent response
- update: set 'modified', 'modifiedBy' fields to record the modification date and the user modifying the config
- update: ensure PROFILE_FILENAME is updated in configmap if profileid is provided, cleared if profileid==""
- update: return 'settings_changed' and 'metadata_changed' if either crawl settings or metadata changed
- tests: update tests to check settings_changed/metadata_changed return values
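
A rough sketch of the new update flow (the URL prefix, org id, and auth token below are placeholders, not part of this change; only the payload keys and the settings_changed/metadata_changed flags come from the API described above):

    import requests

    # hypothetical deployment URL and ids, for illustration only
    url = "https://btrix.example.com/api/orgs/<oid>/crawlconfigs/<cid>"
    headers = {"Authorization": "Bearer <access-token>"}

    resp = requests.patch(
        url,
        headers=headers,
        json={
            "tags": ["docs"],         # metadata-only change
            "schedule": "0 3 * * 1",  # crawl settings change
            "config": {"seeds": [{"url": "https://example.com/"}]},  # replaces the stored JSON config
        },
    )
    result = resp.json()
    # metadata_changed: only metadata (e.g. tags) differed
    # settings_changed: schedule/config/profileid differed, so the configmap is updated
    print(result["settings_changed"], result["metadata_changed"])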

add revision tracking to crawlconfig:
- store each revision in a separate mongo db collection
- revisions accessible via /crawlconfigs/{cid}/revs
- store 'rev' int in crawlconfig and in crawljob
- only add revision history if crawl config changed
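
The gist of the revision flow, as a hedged sketch (the collection name, field names, and helper below are illustrative, not the committed implementation):

    # illustrative only: persist the previous settings before applying an update,
    # bumping 'rev' only when crawl settings actually changed
    async def add_revision_if_changed(mdb, crawlconfig, new_settings: dict):
        old_settings = {
            "config": crawlconfig.config.dict(),
            "schedule": crawlconfig.schedule,
            "profileid": crawlconfig.profileid,
        }
        if old_settings == new_settings:
            return crawlconfig.rev  # no settings change -> no new revision

        await mdb["configs_revs"].insert_one(
            {"cid": crawlconfig.id, "rev": crawlconfig.rev, **old_settings}
        )
        return crawlconfig.rev + 1  # stored back on the crawlconfig and on new crawl jobs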

migration:
- update to db v3
- copy fields from crawlconfig -> crawl
- rename userid -> createdBy
- copy userid -> modifiedBy, created -> modified
- skip invalid crawls (missing config), make createdBy optional (just in case)
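
A minimal sketch of the db v3 field moves using standard MongoDB update operators (the collection handle and migration scaffolding here are assumptions):

    # illustrative only: rename userid -> createdBy, then backfill modified/modifiedBy
    async def migrate_up(mdb):
        configs = mdb["crawl_configs"]
        await configs.update_many({}, {"$rename": {"userid": "createdBy"}})

        async for config in configs.find({"modified": {"$exists": False}}):
            await configs.update_one(
                {"_id": config["_id"]},
                {"$set": {
                    "modifiedBy": config.get("createdBy"),
                    "modified": config.get("created"),
                }},
            )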

frontend: Update crawl config keys with new API (#681), update frontend to use new PATCH endpoint, load config from crawl object in details view

---------

Co-authored-by: Tessa Walsh <tessa@bitarchivist.net>
Co-authored-by: sua yoo <sua@webrecorder.org>
Co-authored-by: sua yoo <sua@suayoo.com>
2023-03-07 20:36:50 -08:00


""" K8s support"""
import os
import json
import base64
import yaml
import aiohttp
from ..orgs import S3Storage
from ..crawlmanager import BaseCrawlManager
from .k8sapi import K8sAPI
from .utils import create_from_yaml, send_signal_to_pods, get_templates_dir
# ============================================================================
class K8SManager(BaseCrawlManager, K8sAPI):
    # pylint: disable=too-many-instance-attributes,too-many-locals,too-many-arguments
    """K8SManager, manages creation of k8s resources from crawl api requests"""

    client = None

    def __init__(self):
        super().__init__(get_templates_dir())

        self.namespace = os.environ.get("CRAWLER_NAMESPACE") or "crawlers"
        self._default_storages = {}
    # pylint: disable=unused-argument
    async def check_storage(self, storage_name, is_default=False):
        """Check if storage is valid by trying to get the storage secret
        Will throw if not valid, otherwise return True"""
        await self._get_storage_secret(storage_name)
        return True

    async def update_org_storage(self, oid, userid, storage):
        """Update storage by either creating a per-org secret, if using custom storage
        or deleting per-org secret, if using default storage"""
        org_storage_name = f"storage-{oid}"
        if storage.type == "default":
            try:
                await self.core_api.delete_namespaced_secret(
                    org_storage_name,
                    namespace=self.namespace,
                    propagation_policy="Foreground",
                )
            # pylint: disable=bare-except
            except:
                pass

            return

        labels = {"btrix.org": oid, "btrix.user": userid}

        crawl_secret = self.client.V1Secret(
            metadata={
                "name": org_storage_name,
                "namespace": self.namespace,
                "labels": labels,
            },
            string_data={
                "STORE_ENDPOINT_URL": storage.endpoint_url,
                "STORE_ACCESS_KEY": storage.access_key,
                "STORE_SECRET_KEY": storage.secret_key,
            },
        )

        try:
            await self.core_api.create_namespaced_secret(
                namespace=self.namespace, body=crawl_secret
            )
        # pylint: disable=bare-except
        except:
            await self.core_api.patch_namespaced_secret(
                name=org_storage_name, namespace=self.namespace, body=crawl_secret
            )

    async def get_default_storage_access_endpoint(self, name):
        """Get access_endpoint for default storage"""
        return (await self.get_default_storage(name)).access_endpoint_url

    async def get_default_storage(self, name):
        """get default storage"""
        if name not in self._default_storages:
            storage_secret = await self._get_storage_secret(name)

            access_endpoint_url = self._secret_data(
                storage_secret, "STORE_ACCESS_ENDPOINT_URL"
            )
            endpoint_url = self._secret_data(storage_secret, "STORE_ENDPOINT_URL")
            access_key = self._secret_data(storage_secret, "STORE_ACCESS_KEY")
            secret_key = self._secret_data(storage_secret, "STORE_SECRET_KEY")
            region = self._secret_data(storage_secret, "STORE_REGION") or ""
            use_access_for_presign = (
                self._secret_data(storage_secret, "STORE_USE_ACCESS_FOR_PRESIGN") == "1"
            )

            self._default_storages[name] = S3Storage(
                access_key=access_key,
                secret_key=secret_key,
                endpoint_url=endpoint_url,
                access_endpoint_url=access_endpoint_url,
                region=region,
                use_access_for_presign=use_access_for_presign,
            )

        return self._default_storages[name]
    async def ping_profile_browser(self, browserid):
        """ping profile browser pod(s) to keep the browser alive"""
        pods = await self.core_api.list_namespaced_pod(
            namespace=self.namespace,
            label_selector=f"job-name=job-{browserid},btrix.profile=1",
        )
        if len(pods.items) == 0:
            return False

        await send_signal_to_pods(
            self.core_api_ws, self.namespace, pods.items, "SIGUSR1"
        )
        return True
    async def get_profile_browser_metadata(self, browserid):
        """get browser profile labels"""
        try:
            job = await self.batch_api.read_namespaced_job(
                name=f"job-{browserid}", namespace=self.namespace
            )
            if not job.metadata.labels.get("btrix.profile"):
                return {}

        # pylint: disable=bare-except
        except:
            return {}

        return job.metadata.labels

    async def delete_profile_browser(self, browserid):
        """delete browser job, if it is a profile browser job"""
        return await self._delete_job(f"job-{browserid}")

    # ========================================================================
    # Internal Methods

    async def _create_from_yaml(self, _, yaml_data):
        """create from yaml"""
        await create_from_yaml(self.api_client, yaml_data, namespace=self.namespace)

    def _secret_data(self, secret, name):
        """decode secret data"""
        return base64.standard_b64decode(secret.data[name]).decode()

    async def _delete_job(self, name):
        """delete job"""
        try:
            await self.batch_api.delete_namespaced_job(
                name=name,
                namespace=self.namespace,
                grace_period_seconds=60,
                propagation_policy="Foreground",
            )
            return True

        # pylint: disable=bare-except
        except:
            return False

    async def _create_config_map(self, crawlconfig, **kwargs):
        """Create Config Map based on CrawlConfig"""
        data = kwargs
        data["crawl-config.json"] = json.dumps(crawlconfig.get_raw_config())
        data["INITIAL_SCALE"] = str(crawlconfig.scale)

        labels = {
            "btrix.crawlconfig": str(crawlconfig.id),
            "btrix.org": str(crawlconfig.oid),
        }

        config_map = self.client.V1ConfigMap(
            metadata={
                "name": f"crawl-config-{crawlconfig.id}",
                "namespace": self.namespace,
                "labels": labels,
            },
            data=data,
        )

        return await self.core_api.create_namespaced_config_map(
            namespace=self.namespace, body=config_map
        )

    # pylint: disable=unused-argument
    async def _get_storage_secret(self, storage_name):
        """Check if storage_name is valid by checking existing secret"""
        try:
            return await self.core_api.read_namespaced_secret(
                f"storage-{storage_name}",
                namespace=self.namespace,
            )
        # pylint: disable=broad-except
        except Exception:
            # pylint: disable=broad-exception-raised,raise-missing-from
            raise Exception(f"Storage {storage_name} not found")

        return None

    async def _delete_crawl_configs(self, label):
        """Delete Crawl Cron Job and all dependent resources, including configmap and secrets"""

        await self.batch_api.delete_collection_namespaced_cron_job(
            namespace=self.namespace,
            label_selector=label,
            propagation_policy="Foreground",
        )

        await self.core_api.delete_collection_namespaced_config_map(
            namespace=self.namespace,
            label_selector=label,
            propagation_policy="Foreground",
        )

    async def _post_to_job(self, crawl_id, oid, path, data=None):
        """post to default container in a pod for job
        try all pods in case of many
        """
        job_name = f"job-{crawl_id}"

        pods = await self.core_api.list_namespaced_pod(
            namespace=self.namespace,
            label_selector=f"job-name={job_name},btrix.org={oid}",
        )
        if not pods.items:
            return {"error": "job_not_running"}

        for pod in pods.items:
            async with aiohttp.ClientSession() as session:
                async with session.request(
                    "POST", f"http://{pod.status.pod_ip}:8000{path}", json=data
                ) as resp:
                    # try all in case of multiple pods, return value of first running pod
                    try:
                        return await resp.json()
                    # pylint: disable=bare-except
                    except:
                        # try next pod
                        pass

        return {"error": "post_failed"}
    async def _update_scheduled_job(self, crawlconfig):
        """create or remove cron job based on crawlconfig schedule"""
        cid = str(crawlconfig.id)

        cron_job_id = f"sched-{cid[:12]}"
        cron_job = None
        try:
            cron_job = await self.batch_api.read_namespaced_cron_job(
                name=f"job-{cron_job_id}",
                namespace=self.namespace,
            )
        # pylint: disable=bare-except
        except:
            pass

        if cron_job:
            if crawlconfig.schedule and crawlconfig.schedule != cron_job.spec.schedule:
                cron_job.spec.schedule = crawlconfig.schedule

                await self.batch_api.patch_namespaced_cron_job(
                    name=cron_job.metadata.name, namespace=self.namespace, body=cron_job
                )

            if not crawlconfig.schedule:
                await self.batch_api.delete_namespaced_cron_job(
                    name=cron_job.metadata.name, namespace=self.namespace
                )

            return

        if not crawlconfig.schedule:
            return

        # create new cronjob
        data = await self._load_job_template(crawlconfig, cron_job_id, manual=False)

        job_yaml = yaml.safe_load(data)

        # the k8s api client deserializes from a response-like object with a .data
        # attribute, so wrap the parsed yaml in FakeKubeResponse (defined below)
        job_template = self.api_client.deserialize(
            FakeKubeResponse(job_yaml), "V1JobTemplateSpec"
        )

        metadata = job_yaml["metadata"]

        spec = self.client.V1CronJobSpec(
            schedule=crawlconfig.schedule,
            suspend=False,
            concurrency_policy="Forbid",
            successful_jobs_history_limit=2,
            failed_jobs_history_limit=3,
            job_template=job_template,
        )

        cron_job = self.client.V1CronJob(metadata=metadata, spec=spec)

        await self.batch_api.create_namespaced_cron_job(
            namespace=self.namespace, body=cron_job
        )
    async def _update_config_map(
        self, crawlconfig, scale=None, profile_filename=None, update_config=False
    ):
        """update crawl configmap: scale, profile filename, and/or full crawl config"""
        config_map = await self.core_api.read_namespaced_config_map(
            name=f"crawl-config-{crawlconfig.id}", namespace=self.namespace
        )

        if scale is not None:
            config_map.data["INITIAL_SCALE"] = str(scale)

        if profile_filename is not None:
            config_map.data["PROFILE_FILENAME"] = profile_filename

        if update_config:
            config_map.data["crawl-config.json"] = json.dumps(
                crawlconfig.get_raw_config()
            )

        await self.core_api.patch_namespaced_config_map(
            name=config_map.metadata.name, namespace=self.namespace, body=config_map
        )


# ============================================================================
# pylint: disable=too-few-public-methods
class FakeKubeResponse:
    """wrap k8s response for decoding"""

    def __init__(self, obj):
        self.data = json.dumps(obj)