Fixes #2673 Changes in this PR: - Adds a new `file_uploads.py` module and corresponding `/files` API prefix with methods/endpoints for uploading, GETing, and deleting seed files (can be extended to other types of files moving forward) - Seed files are supported via `CrawlConfig.config.seedFileId` on POST and PATCH endpoints. This seedFileId is replaced by a presigned url when passed to the crawler by the operator - Seed files are read when first uploaded to calculate `firstSeed` and `seedCount` and store them in the database, and this is copied into the workflow and crawl documents when they are created. - Logic is added to store `firstSeed` and `seedCount` for other workflows as well, and a migration added to backfill data, to maintain consistency and fix some of the pymongo aggregations that previously assumed all workflows would have at least one `Seed` object in `CrawlConfig.seeds` - Seed file and thumbnail storage stats are added to org stats - Seed file and thumbnail uploads first check that the org's storage quota has not been exceeded and return a 400 if so - A cron background job (run weekly each Sunday at midnight by default, but configurable) is added to look for seed files at least x minutes old (1440 minutes, or 1 day, by default, but configurable) that are not in use in any workflows, and to delete them when they are found. The backend pods will ensure this k8s batch job exists when starting up and create it if it does not already exist. A database entry for each run of the job is created in the operator on job completion so that it'll appear in the `/jobs` API endpoints, but retrying of this type of regularly scheduled background job is not supported as we don't want to accidentally create multiple competing scheduled jobs. - Adds a `min_seed_file_crawler_image` value to the Helm chart that is checked before creating a crawl from a workflow if set. If a workflow cannot be run, return the detail of the exception in `CrawlConfigAddedResponse.errorDetail` so that we can display the reason in the frontend - Add SeedFile model from base UserFile (former ImageFIle), ensure all APIs returning uploaded files return an absolute pre-signed URL (either with external origin or internal service origin) --------- Co-authored-by: Ilya Kreymer <ikreymer@gmail.com>
381 lines
12 KiB
Python
381 lines
12 KiB
Python
"""K8S API Access"""
|
|
|
|
import os
|
|
import traceback
|
|
from typing import Optional
|
|
|
|
import yaml
|
|
|
|
from kubernetes_asyncio import client, config
|
|
from kubernetes_asyncio.stream import WsApiClient
|
|
from kubernetes_asyncio.client.api_client import ApiClient
|
|
from kubernetes_asyncio.client.api import custom_objects_api
|
|
from kubernetes_asyncio.utils import create_from_dict
|
|
from kubernetes_asyncio.client.exceptions import ApiException
|
|
|
|
from redis import asyncio as aioredis
|
|
|
|
from fastapi import HTTPException
|
|
from fastapi.templating import Jinja2Templates
|
|
|
|
from .utils import get_templates_dir, dt_now
|
|
|
|
|
|
# ============================================================================
|
|
# pylint: disable=too-many-instance-attributes
|
|
class K8sAPI:
|
|
"""K8S API accessors"""
|
|
|
|
def __init__(self):
|
|
super().__init__()
|
|
self.namespace = os.environ.get("CRAWLER_NAMESPACE") or "crawlers"
|
|
self.custom_resources = {}
|
|
|
|
self.templates = Jinja2Templates(
|
|
directory=get_templates_dir(), autoescape=False
|
|
)
|
|
|
|
config.load_incluster_config()
|
|
self.client = client
|
|
|
|
self.api_client = ApiClient()
|
|
|
|
self.core_api = client.CoreV1Api(self.api_client)
|
|
self.core_api_ws = client.CoreV1Api(api_client=WsApiClient())
|
|
self.batch_api = client.BatchV1Api(self.api_client)
|
|
self.apps_api = client.AppsV1Api(self.api_client)
|
|
|
|
# try separate api client to avoid content-type issues
|
|
self.custom_api = custom_objects_api.CustomObjectsApi(self.api_client)
|
|
|
|
# custom resource's client API
|
|
self.add_custom_resource("CrawlJob", "crawljobs")
|
|
self.add_custom_resource("ProfileJob", "profilejobs")
|
|
|
|
def add_custom_resource(self, name, plural):
|
|
"""add custom resource"""
|
|
self.custom_resources[name] = plural
|
|
|
|
def get_custom_api(self, kind):
|
|
"""return custom API"""
|
|
return self.custom_resources[kind] if kind in self.custom_resources else None
|
|
|
|
def get_redis_url(self, crawl_id):
|
|
"""get redis url for crawl id"""
|
|
redis_url = (
|
|
f"redis://redis-{crawl_id}.redis.{self.namespace}.svc.cluster.local/0"
|
|
)
|
|
return redis_url
|
|
|
|
async def get_redis_client(self, redis_url):
|
|
"""return redis client with correct params for one-time use"""
|
|
return aioredis.from_url(
|
|
redis_url,
|
|
decode_responses=True,
|
|
auto_close_connection_pool=True,
|
|
socket_timeout=20,
|
|
)
|
|
|
|
# pylint: disable=too-many-arguments, too-many-locals
|
|
def new_crawl_job_yaml(
|
|
self,
|
|
cid: str,
|
|
userid: str,
|
|
oid: str,
|
|
storage: str,
|
|
crawler_channel: Optional[str] = "",
|
|
scale: Optional[int] = 1,
|
|
browser_windows: Optional[int] = 1,
|
|
crawl_timeout: Optional[int] = 0,
|
|
max_crawl_size: Optional[int] = 0,
|
|
manual: bool = True,
|
|
crawl_id: Optional[str] = None,
|
|
warc_prefix: Optional[str] = "",
|
|
storage_filename: str = "",
|
|
profile_filename: str = "",
|
|
qa_source: str = "",
|
|
proxy_id: str = "",
|
|
is_single_page: bool = False,
|
|
seed_file_url: str = "",
|
|
):
|
|
"""load job template from yaml"""
|
|
if not crawl_id:
|
|
ts_now = dt_now().strftime("%Y%m%d%H%M%S")
|
|
prefix = "manual" if manual else "sched"
|
|
crawl_id = f"{prefix}-{ts_now}-{cid[:12]}"
|
|
|
|
params = {
|
|
"id": crawl_id,
|
|
"cid": cid,
|
|
"userid": userid,
|
|
"oid": oid,
|
|
"storage_name": storage,
|
|
"crawler_channel": crawler_channel,
|
|
"scale": scale,
|
|
"browser_windows": browser_windows,
|
|
"timeout": crawl_timeout,
|
|
"max_crawl_size": max_crawl_size or 0,
|
|
"manual": "1" if manual else "0",
|
|
"warc_prefix": warc_prefix,
|
|
"storage_filename": storage_filename,
|
|
"profile_filename": profile_filename,
|
|
"qa_source": qa_source,
|
|
"proxy_id": proxy_id,
|
|
"is_single_page": "1" if is_single_page else "0",
|
|
"seed_file_url": seed_file_url,
|
|
}
|
|
|
|
data = self.templates.env.get_template("crawl_job.yaml").render(params)
|
|
return crawl_id, data
|
|
|
|
async def new_crawl_job(
|
|
self,
|
|
cid: str,
|
|
userid: str,
|
|
oid: str,
|
|
storage: str,
|
|
crawler_channel: Optional[str] = "",
|
|
scale: Optional[int] = 1,
|
|
browser_windows: Optional[int] = 1,
|
|
crawl_timeout: Optional[int] = 0,
|
|
max_crawl_size: Optional[int] = 0,
|
|
manual: bool = True,
|
|
crawl_id: Optional[str] = None,
|
|
warc_prefix: Optional[str] = "",
|
|
storage_filename: str = "",
|
|
profile_filename: str = "",
|
|
qa_source: str = "",
|
|
proxy_id: str = "",
|
|
is_single_page: bool = False,
|
|
seed_file_url: str = "",
|
|
) -> str:
|
|
"""load and init crawl job via k8s api"""
|
|
crawl_id, data = self.new_crawl_job_yaml(
|
|
cid=cid,
|
|
userid=userid,
|
|
oid=oid,
|
|
storage=storage,
|
|
crawler_channel=crawler_channel,
|
|
scale=scale,
|
|
browser_windows=browser_windows,
|
|
crawl_timeout=crawl_timeout,
|
|
max_crawl_size=max_crawl_size,
|
|
manual=manual,
|
|
crawl_id=crawl_id,
|
|
warc_prefix=warc_prefix or "",
|
|
storage_filename=storage_filename,
|
|
profile_filename=profile_filename,
|
|
qa_source=qa_source,
|
|
proxy_id=proxy_id,
|
|
is_single_page=is_single_page,
|
|
seed_file_url=seed_file_url,
|
|
)
|
|
|
|
# create job directly
|
|
await self.create_from_yaml(data)
|
|
|
|
return crawl_id or ""
|
|
|
|
async def create_from_yaml(self, doc, namespace=None):
|
|
"""init k8s objects from yaml"""
|
|
yml_document_all = yaml.safe_load_all(doc)
|
|
k8s_objects = []
|
|
for yml_document in yml_document_all:
|
|
custom = self.custom_resources.get(yml_document["kind"])
|
|
if custom is not None:
|
|
created = await self.create_custom_from_dict(
|
|
custom, yml_document, namespace
|
|
)
|
|
else:
|
|
created = await create_from_dict(
|
|
self.api_client,
|
|
yml_document,
|
|
verbose=False,
|
|
namespace=namespace or self.namespace,
|
|
)
|
|
k8s_objects.append(created)
|
|
|
|
return k8s_objects
|
|
|
|
async def create_custom_from_dict(self, custom, doc, namespace):
|
|
"""create custom from dict"""
|
|
apiver = doc["apiVersion"].split("/")
|
|
created = await self.custom_api.create_namespaced_custom_object(
|
|
group=apiver[0],
|
|
version=apiver[1],
|
|
plural=custom,
|
|
body=doc,
|
|
namespace=namespace or self.namespace,
|
|
)
|
|
return created
|
|
|
|
async def has_storage_secret(self, storage_secret) -> bool:
|
|
"""Check if storage is valid by trying to get the storage secret
|
|
Will throw if not valid, otherwise return True"""
|
|
try:
|
|
await self.core_api.read_namespaced_secret(
|
|
storage_secret,
|
|
namespace=self.namespace,
|
|
)
|
|
return True
|
|
|
|
# pylint: disable=broad-except
|
|
except Exception:
|
|
# pylint: disable=broad-exception-raised,raise-missing-from
|
|
raise HTTPException(
|
|
status_code=400, detail="invalid_config_missing_storage_secret"
|
|
)
|
|
|
|
async def delete_crawl_job(self, crawl_id):
|
|
"""delete custom crawljob object"""
|
|
try:
|
|
name = f"crawljob-{crawl_id}"
|
|
|
|
await self.custom_api.delete_namespaced_custom_object(
|
|
group="btrix.cloud",
|
|
version="v1",
|
|
namespace=self.namespace,
|
|
plural="crawljobs",
|
|
name=name,
|
|
grace_period_seconds=0,
|
|
# delete as background to allow operator to do proper cleanup
|
|
propagation_policy="Background",
|
|
)
|
|
return {"success": True}
|
|
|
|
except ApiException as api_exc:
|
|
return {"error": str(api_exc.reason)}
|
|
|
|
async def delete_profile_browser(self, browserid):
|
|
"""delete custom crawljob object"""
|
|
try:
|
|
await self.custom_api.delete_namespaced_custom_object(
|
|
group="btrix.cloud",
|
|
version="v1",
|
|
namespace=self.namespace,
|
|
plural="profilejobs",
|
|
name=f"profilejob-{browserid}",
|
|
grace_period_seconds=0,
|
|
propagation_policy="Background",
|
|
)
|
|
return True
|
|
|
|
except ApiException:
|
|
return False
|
|
|
|
async def get_profile_browser(self, browserid):
|
|
"""get profile browser"""
|
|
return await self.custom_api.get_namespaced_custom_object(
|
|
group="btrix.cloud",
|
|
version="v1",
|
|
namespace=self.namespace,
|
|
plural="profilejobs",
|
|
name=f"profilejob-{browserid}",
|
|
)
|
|
|
|
async def _patch_job(self, crawl_id, body, pluraltype="crawljobs") -> dict:
|
|
try:
|
|
name = f"{pluraltype[:-1]}-{crawl_id}"
|
|
|
|
await self.custom_api.patch_namespaced_custom_object(
|
|
group="btrix.cloud",
|
|
version="v1",
|
|
namespace=self.namespace,
|
|
plural=pluraltype,
|
|
name=name,
|
|
body={"spec": body},
|
|
_content_type="application/merge-patch+json",
|
|
)
|
|
return {"success": True}
|
|
# pylint: disable=broad-except
|
|
except Exception as exc:
|
|
traceback.print_exc()
|
|
return {"error": str(exc)}
|
|
|
|
async def unsuspend_k8s_job(self, name) -> dict:
|
|
"""unsuspend k8s Job"""
|
|
try:
|
|
await self.batch_api.patch_namespaced_job(
|
|
name=name, namespace=self.namespace, body={"spec": {"suspend": False}}
|
|
)
|
|
return {"success": True}
|
|
# pylint: disable=broad-except
|
|
except Exception as exc:
|
|
traceback.print_exc()
|
|
return {"error": str(exc)}
|
|
|
|
async def print_pod_logs(self, pod_names, lines=100):
|
|
"""print pod logs"""
|
|
for pod in pod_names:
|
|
print(f"============== LOGS FOR POD: {pod} ==============")
|
|
try:
|
|
resp = await self.core_api.read_namespaced_pod_log(
|
|
pod, self.namespace, tail_lines=lines
|
|
)
|
|
print(resp)
|
|
# pylint: disable=bare-except
|
|
except:
|
|
print("Logs Not Found")
|
|
|
|
async def is_pod_metrics_available(self) -> bool:
|
|
"""return true/false if metrics server api is available by
|
|
attempting list operation. if operation succeeds, then
|
|
metrics are available, otherwise not available
|
|
"""
|
|
try:
|
|
await self.custom_api.list_namespaced_custom_object(
|
|
group="metrics.k8s.io",
|
|
version="v1beta1",
|
|
namespace=self.namespace,
|
|
plural="pods",
|
|
limit=1,
|
|
)
|
|
return True
|
|
# pylint: disable=broad-exception-caught
|
|
except Exception as exc:
|
|
print(exc)
|
|
return False
|
|
|
|
async def has_custom_jobs_with_label(self, plural, label) -> bool:
|
|
"""return true/false if any crawljobs or profilejobs
|
|
match given label"""
|
|
try:
|
|
await self.custom_api.list_namespaced_custom_object(
|
|
group="btrix.cloud",
|
|
version="v1",
|
|
namespace=self.namespace,
|
|
plural=plural,
|
|
label_selector=label,
|
|
limit=1,
|
|
)
|
|
return True
|
|
# pylint: disable=broad-exception-caught
|
|
except Exception:
|
|
return False
|
|
|
|
async def send_signal_to_pod(self, pod_name, signame) -> bool:
|
|
"""send signal to all pods"""
|
|
command = ["bash", "-c", f"kill -s {signame} 1"]
|
|
signaled = False
|
|
|
|
try:
|
|
print(f"Sending {signame} to {pod_name}", flush=True)
|
|
|
|
res = await self.core_api_ws.connect_get_namespaced_pod_exec(
|
|
name=pod_name,
|
|
namespace=self.namespace,
|
|
command=command,
|
|
stdout=True,
|
|
)
|
|
if res:
|
|
print("Result", res, flush=True)
|
|
|
|
else:
|
|
signaled = True
|
|
|
|
# pylint: disable=broad-except
|
|
except Exception as exc:
|
|
print(f"Send Signal Error: {exc}", flush=True)
|
|
|
|
return signaled
|