Fixes #1297

Ensures proper typing for UUIDs in FastAPI input models, avoiding explicit conversions that may throw errors. This prevents possible 500 errors (from uncaught `ValueError` exceptions) when converting UUIDs from user input; instead, FastAPI returns 422 validation errors. The remaining UUID conversions are in operator / profile handling, where UUIDs are retrieved from previously set fields; the remaining user-input conversions in user auth and the collection list are wrapped in exception handlers.

For `profileid`, the FastAPI models are updated to support a union of UUID, null, and EmptyStr (a new empty-string-only type), to differentiate removing a profile (empty string) from not changing it at all (null) in config updates.
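As a rough illustration of the model change described above (a minimal sketch assuming Pydantic v1; the `EmptyStr` definition and the model/field names here are illustrative and may differ from the actual code in this PR):

```python
from typing import Union
from uuid import UUID

from pydantic import BaseModel, ConstrainedStr


class EmptyStr(ConstrainedStr):
    """Constrained string type that only accepts the empty string."""

    min_length = 0
    max_length = 0


class CrawlConfigUpdate(BaseModel):
    """Hypothetical update model: typing the field as UUID lets FastAPI reject
    malformed input with a 422 instead of a ValueError surfacing as a 500."""

    # None (null)   -> leave the profile unchanged
    # "" (EmptyStr) -> remove the profile
    # UUID          -> switch to the given profile
    profileid: Union[UUID, EmptyStr, None] = None
```

With this shape, validation happens at request parsing time: a valid UUID string is coerced to `UUID`, an empty string matches `EmptyStr`, an omitted field stays `None`, and anything else fails validation with a 422.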
""" shared crawl manager implementation """
|
|
|
|
import os
|
|
import asyncio
|
|
import secrets
|
|
import json
|
|
|
|
from datetime import timedelta
|
|
|
|
from .k8sapi import K8sAPI
|
|
from .utils import dt_now, to_k8s_date
|
|
|
|
|
|
# ============================================================================
|
|
class CrawlManager(K8sAPI):
|
|
"""abstract crawl manager"""
|
|
|
|
def __init__(self):
|
|
super().__init__()
|
|
|
|
self.loop = asyncio.get_running_loop()
|
|
|
|
# pylint: disable=too-many-arguments
|
|
async def run_profile_browser(
|
|
self,
|
|
userid,
|
|
oid,
|
|
url,
|
|
storage=None,
|
|
storage_name=None,
|
|
baseprofile=None,
|
|
profile_path=None,
|
|
):
|
|
"""run browser for profile creation"""
|
|
|
|
# if default storage, use name and path + profiles/
|
|
if storage:
|
|
storage_name = storage.name
|
|
storage_path = storage.path + "profiles/"
|
|
# otherwise, use storage name and existing path from secret
|
|
else:
|
|
storage_path = ""
|
|
|
|
await self.has_storage(storage_name)
|
|
|
|
browserid = f"prf-{secrets.token_hex(5)}"
|
|
|
|
params = {
|
|
"id": browserid,
|
|
"userid": str(userid),
|
|
"oid": str(oid),
|
|
"storage_name": storage_name,
|
|
"storage_path": storage_path or "",
|
|
"base_profile": baseprofile or "",
|
|
"profile_filename": profile_path,
|
|
"idle_timeout": os.environ.get("IDLE_TIMEOUT", "60"),
|
|
"url": url,
|
|
"vnc_password": secrets.token_hex(16),
|
|
"expire_time": to_k8s_date(dt_now() + timedelta(seconds=30)),
|
|
}
|
|
|
|
data = self.templates.env.get_template("profile_job.yaml").render(params)
|
|
|
|
await self.create_from_yaml(data)
|
|
|
|
return browserid
|
|
|
|
async def add_crawl_config(
|
|
self,
|
|
crawlconfig,
|
|
storage,
|
|
run_now,
|
|
out_filename,
|
|
profile_filename,
|
|
):
|
|
"""add new crawl, store crawl config in configmap"""
|
|
|
|
if storage.type == "default":
|
|
storage_name = storage.name
|
|
storage_path = storage.path
|
|
else:
|
|
storage_name = str(crawlconfig.oid)
|
|
storage_path = ""
|
|
|
|
await self.has_storage(storage_name)
|
|
|
|
# Create Config Map
|
|
await self._create_config_map(
|
|
crawlconfig,
|
|
USER_ID=str(crawlconfig.modifiedBy),
|
|
ORG_ID=str(crawlconfig.oid),
|
|
CRAWL_CONFIG_ID=str(crawlconfig.id),
|
|
STORE_PATH=storage_path,
|
|
STORE_FILENAME=out_filename,
|
|
STORAGE_NAME=storage_name,
|
|
PROFILE_FILENAME=profile_filename,
|
|
INITIAL_SCALE=str(crawlconfig.scale),
|
|
CRAWL_TIMEOUT=str(crawlconfig.crawlTimeout or 0),
|
|
MAX_CRAWL_SIZE=str(crawlconfig.maxCrawlSize or 0)
|
|
# REV=str(crawlconfig.rev),
|
|
)
|
|
|
|
crawl_id = None
|
|
|
|
if run_now:
|
|
crawl_id = await self.create_crawl_job(
|
|
crawlconfig, str(crawlconfig.modifiedBy)
|
|
)
|
|
|
|
await self._update_scheduled_job(crawlconfig, crawlconfig.schedule)
|
|
|
|
return crawl_id
|
|
|
|
async def create_crawl_job(self, crawlconfig, userid: str):
|
|
"""create new crawl job from config"""
|
|
cid = str(crawlconfig.id)
|
|
|
|
return await self.new_crawl_job(
|
|
cid,
|
|
userid,
|
|
crawlconfig.oid,
|
|
crawlconfig.scale,
|
|
crawlconfig.crawlTimeout,
|
|
crawlconfig.maxCrawlSize,
|
|
manual=True,
|
|
)
|
|
|
|
async def update_crawl_config(self, crawlconfig, update, profile_filename=None):
|
|
"""Update the schedule or scale for existing crawl config"""
|
|
|
|
has_sched_update = update.schedule is not None
|
|
has_scale_update = update.scale is not None
|
|
has_timeout_update = update.crawlTimeout is not None
|
|
has_max_crawl_size_update = update.maxCrawlSize is not None
|
|
has_config_update = update.config is not None
|
|
|
|
if has_sched_update:
|
|
await self._update_scheduled_job(crawlconfig, update.schedule)
|
|
|
|
if (
|
|
has_scale_update
|
|
or has_config_update
|
|
or has_timeout_update
|
|
or profile_filename is not None
|
|
or has_max_crawl_size_update
|
|
):
|
|
await self._update_config_map(
|
|
crawlconfig,
|
|
update,
|
|
profile_filename,
|
|
has_config_update,
|
|
)
|
|
|
|
return True
|
|
|
|
async def has_storage(self, storage_name):
|
|
"""Check if storage is valid by trying to get the storage secret
|
|
Will throw if not valid, otherwise return True"""
|
|
try:
|
|
await self.core_api.read_namespaced_secret(
|
|
f"storage-{storage_name}",
|
|
namespace=self.namespace,
|
|
)
|
|
return True
|
|
|
|
# pylint: disable=broad-except
|
|
except Exception:
|
|
# pylint: disable=broad-exception-raised,raise-missing-from
|
|
raise Exception(f"Storage {storage_name} not found")
|
|
|
|
async def update_org_storage(self, oid, userid, storage):
|
|
"""Update storage by either creating a per-org secret, if using custom storage
|
|
or deleting per-org secret, if using default storage"""
|
|
org_storage_name = f"storage-{oid}"
|
|
if storage.type == "default":
|
|
try:
|
|
await self.core_api.delete_namespaced_secret(
|
|
org_storage_name,
|
|
namespace=self.namespace,
|
|
propagation_policy="Foreground",
|
|
)
|
|
# pylint: disable=bare-except
|
|
except:
|
|
pass
|
|
|
|
return
|
|
|
|
labels = {"btrix.org": oid, "btrix.user": userid}
|
|
|
|
crawl_secret = self.client.V1Secret(
|
|
metadata={
|
|
"name": org_storage_name,
|
|
"namespace": self.namespace,
|
|
"labels": labels,
|
|
},
|
|
string_data={
|
|
"STORE_ENDPOINT_URL": storage.endpoint_url,
|
|
"STORE_ACCESS_KEY": storage.access_key,
|
|
"STORE_SECRET_KEY": storage.secret_key,
|
|
},
|
|
)
|
|
|
|
try:
|
|
await self.core_api.create_namespaced_secret(
|
|
namespace=self.namespace, body=crawl_secret
|
|
)
|
|
|
|
# pylint: disable=bare-except
|
|
except:
|
|
await self.core_api.patch_namespaced_secret(
|
|
name=org_storage_name, namespace=self.namespace, body=crawl_secret
|
|
)
|
|
|
|
async def get_profile_browser_metadata(self, browserid):
|
|
"""get browser profile labels"""
|
|
try:
|
|
browser = await self.get_profile_browser(browserid)
|
|
|
|
# pylint: disable=bare-except
|
|
except:
|
|
return {}
|
|
|
|
return browser["metadata"]["labels"]
|
|
|
|
async def get_configmap(self, cid):
|
|
"""get configmap by id"""
|
|
return await self.core_api.read_namespaced_config_map(
|
|
name=f"crawl-config-{cid}", namespace=self.namespace
|
|
)
|
|
|
|
async def ping_profile_browser(self, browserid):
|
|
"""return ping profile browser"""
|
|
expire_at = dt_now() + timedelta(seconds=30)
|
|
await self._patch_job(
|
|
browserid, {"expireTime": to_k8s_date(expire_at)}, "profilejobs"
|
|
)
|
|
|
|
async def rollover_restart_crawl(self, crawl_id):
|
|
"""Rolling restart of crawl by updating restartTime field"""
|
|
update = to_k8s_date(dt_now())
|
|
return await self._patch_job(crawl_id, {"restartTime": update})
|
|
|
|
async def scale_crawl(self, crawl_id, scale=1):
|
|
"""Set the crawl scale (job parallelism) on the specified job"""
|
|
return await self._patch_job(crawl_id, {"scale": scale})
|
|
|
|
async def shutdown_crawl(self, crawl_id, graceful=True):
|
|
"""Request a crawl cancelation or stop by calling an API
|
|
on the job pod/container, returning the result"""
|
|
if graceful:
|
|
patch = {"stopping": True}
|
|
return await self._patch_job(crawl_id, patch)
|
|
|
|
return await self.delete_crawl_job(crawl_id)
|
|
|
|
async def delete_crawl_configs_for_org(self, org):
|
|
"""Delete all crawl configs for given org"""
|
|
return await self._delete_crawl_configs(f"btrix.org={org}")
|
|
|
|
async def delete_crawl_config_by_id(self, cid):
|
|
"""Delete all crawl configs by id"""
|
|
return await self._delete_crawl_configs(f"btrix.crawlconfig={cid}")
|
|
|
|
# ========================================================================
|
|
# Internal Methods
|
|
async def _create_config_map(self, crawlconfig, **data):
|
|
"""Create Config Map based on CrawlConfig"""
|
|
data["crawl-config.json"] = json.dumps(crawlconfig.get_raw_config())
|
|
|
|
labels = {
|
|
"btrix.crawlconfig": str(crawlconfig.id),
|
|
"btrix.org": str(crawlconfig.oid),
|
|
}
|
|
|
|
config_map = self.client.V1ConfigMap(
|
|
metadata={
|
|
"name": f"crawl-config-{crawlconfig.id}",
|
|
"namespace": self.namespace,
|
|
"labels": labels,
|
|
},
|
|
data=data,
|
|
)
|
|
|
|
return await self.core_api.create_namespaced_config_map(
|
|
namespace=self.namespace, body=config_map
|
|
)
|
|
|
|
async def _delete_crawl_configs(self, label):
|
|
"""Delete Crawl Cron Job and all dependent resources, including configmap and secrets"""
|
|
|
|
await self.batch_api.delete_collection_namespaced_cron_job(
|
|
namespace=self.namespace,
|
|
label_selector=label,
|
|
)
|
|
|
|
await self.core_api.delete_collection_namespaced_config_map(
|
|
namespace=self.namespace,
|
|
label_selector=label,
|
|
)
|
|
|
|
async def _update_scheduled_job(self, crawlconfig, schedule):
|
|
"""create or remove cron job based on crawlconfig schedule"""
|
|
cid = str(crawlconfig.id)
|
|
|
|
cron_job_id = f"sched-{cid[:12]}"
|
|
cron_job = None
|
|
try:
|
|
cron_job = await self.batch_api.read_namespaced_cron_job(
|
|
name=cron_job_id,
|
|
namespace=self.namespace,
|
|
)
|
|
# pylint: disable=bare-except
|
|
except:
|
|
pass
|
|
|
|
# if no schedule, delete cron_job if exists and we're done
|
|
if not crawlconfig.schedule:
|
|
if cron_job:
|
|
await self.batch_api.delete_namespaced_cron_job(
|
|
name=cron_job.metadata.name, namespace=self.namespace
|
|
)
|
|
return
|
|
|
|
# if cron job exists, just patch schedule
|
|
if cron_job:
|
|
if crawlconfig.schedule != cron_job.spec.schedule:
|
|
cron_job.spec.schedule = crawlconfig.schedule
|
|
|
|
await self.batch_api.patch_namespaced_cron_job(
|
|
name=cron_job.metadata.name,
|
|
namespace=self.namespace,
|
|
body=cron_job,
|
|
)
|
|
return
|
|
|
|
params = {
|
|
"id": cron_job_id,
|
|
"cid": str(crawlconfig.id),
|
|
"schedule": schedule,
|
|
}
|
|
|
|
data = self.templates.env.get_template("crawl_cron_job.yaml").render(params)
|
|
|
|
await self.create_from_yaml(data, self.namespace)
|
|
|
|
return cron_job_id
|
|
|
|
    async def _update_config_map(
        self,
        crawlconfig,
        update,
        profile_filename=None,
        update_config=False,
    ):
        """Patch the crawl config's ConfigMap with any updated fields"""
        config_map = await self.get_configmap(crawlconfig.id)

        if update.scale is not None:
            config_map.data["INITIAL_SCALE"] = str(update.scale)

        if update.crawlTimeout is not None:
            config_map.data["CRAWL_TIMEOUT"] = str(update.crawlTimeout)

        if update.maxCrawlSize is not None:
            config_map.data["MAX_CRAWL_SIZE"] = str(update.maxCrawlSize)

        if update.crawlFilenameTemplate is not None:
            config_map.data["STORE_FILENAME"] = update.crawlFilenameTemplate

        if profile_filename is not None:
            config_map.data["PROFILE_FILENAME"] = profile_filename

        if update_config:
            config_map.data["crawl-config.json"] = json.dumps(
                crawlconfig.get_raw_config()
            )

        await self.core_api.patch_namespaced_config_map(
            name=config_map.metadata.name, namespace=self.namespace, body=config_map
        )