Exclusions are already updated dynamically if the crawler pod is running; this change ensures new exclusions are also picked up when the crawler pod is restarted:

- mount the configmap in a separate path, avoiding subPath, to allow dynamic updates of the mounted volume
- add a lastConfigUpdate timestamp to CrawlJob
- if lastConfigUpdate in the spec differs from the current value, the configmap is recreated by the operator
- operator: also update the image from the channel, to avoid any issues with updating the crawler in a channel
- only handles exclusion add/remove so far; can later be expanded to other crawler settings (see #2355 for broader running-crawl config updates)
- fixes #2408
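A minimal sketch of the operator-side check this enables (hypothetical names throughout; the actual operator code lives elsewhere and may differ):

```python
# Hypothetical sketch, not the actual operator implementation: on each
# reconcile, recreate the crawl configmap when the CrawlJob spec's
# lastConfigUpdate timestamp differs from the last one handled.
async def handle_config_update(spec: dict, status: dict, k8s) -> None:
    last_update = spec.get("lastConfigUpdate", "")
    if last_update and last_update != status.get("lastConfigUpdate"):
        # re-render the configmap with current exclusions; because it is
        # mounted without subPath, running pods see the update too
        await k8s.recreate_crawl_configmap(spec)
        status["lastConfigUpdate"] = last_update
```

On the backend side, `reload_running_crawl_config()` below simply patches this timestamp to the current time to trigger the refresh.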
"""shared crawl manager implementation"""

import os
import secrets

from typing import Optional, Dict, Tuple
from datetime import timedelta

from fastapi import HTTPException

from .utils import dt_now, date_to_str
from .k8sapi import K8sAPI

from .models import StorageRef, CrawlConfig, BgJobType


# ============================================================================
DEFAULT_PROXY_ID: str = os.environ.get("DEFAULT_PROXY_ID", "")

DEFAULT_NAMESPACE: str = os.environ.get("DEFAULT_NAMESPACE", "default")


# ============================================================================
class CrawlManager(K8sAPI):
    """abstract crawl manager"""

    # pylint: disable=too-many-arguments
    async def run_profile_browser(
        self,
        userid: str,
        oid: str,
        url: str,
        storage: StorageRef,
        crawler_image: str,
        baseprofile: str = "",
        profile_filename: str = "",
        proxy_id: str = "",
    ) -> str:
        """run browser for profile creation"""

        storage_secret = storage.get_storage_secret_name(oid)

        await self.has_storage_secret(storage_secret)

        browserid = f"prf-{secrets.token_hex(5)}"

        params = {
            "id": browserid,
            "userid": str(userid),
            "oid": str(oid),
            "storage_name": str(storage),
            "base_profile": baseprofile or "",
            "profile_filename": profile_filename or "",
            "idle_timeout": os.environ.get("IDLE_TIMEOUT", "60"),
            "url": url,
            "vnc_password": secrets.token_hex(16),
            "expire_time": date_to_str(dt_now() + timedelta(seconds=30)),
            "crawler_image": crawler_image,
            "proxy_id": proxy_id or DEFAULT_PROXY_ID,
        }

        data = self.templates.env.get_template("profile_job.yaml").render(params)

        await self.create_from_yaml(data)

        return browserid

    async def run_replica_job(
        self,
        oid: str,
        job_type: str,
        replica_storage: StorageRef,
        replica_file_path: str,
        replica_endpoint: str,
        delay_days: int = 0,
        primary_storage: Optional[StorageRef] = None,
        primary_file_path: Optional[str] = None,
        primary_endpoint: Optional[str] = None,
        existing_job_id: Optional[str] = None,
    ) -> Tuple[str, Optional[str]]:
        """run job to replicate file from primary storage to replica storage"""

        if existing_job_id:
            job_id = existing_job_id
        else:
            # Keep name shorter than in past to avoid k8s issues with length
            job_id = f"{job_type}-{secrets.token_hex(5)}"

        params: Dict[str, object] = {
            "id": job_id,
            "oid": oid,
            "job_type": job_type,
            "replica_secret_name": replica_storage.get_storage_secret_name(oid),
            "replica_file_path": replica_file_path,
            "replica_endpoint": replica_endpoint,
            "primary_secret_name": (
                primary_storage.get_storage_secret_name(oid)
                if primary_storage
                else None
            ),
            "primary_file_path": primary_file_path if primary_file_path else None,
            "primary_endpoint": primary_endpoint if primary_endpoint else None,
            "BgJobType": BgJobType,
        }

        if job_type == BgJobType.DELETE_REPLICA.value and delay_days > 0:
            # If replica deletion delay is configured, schedule as cronjob
            return await self.create_replica_deletion_scheduled_job(
                job_id, params, delay_days
            )

        data = self.templates.env.get_template("replica_job.yaml").render(params)

        await self.create_from_yaml(data)

        return job_id, None

    async def run_delete_org_job(
        self,
        oid: str,
        existing_job_id: Optional[str] = None,
    ) -> str:
        """run job to delete org and all of its data"""

        if existing_job_id:
            job_id = existing_job_id
        else:
            job_id = f"delete-org-{oid}-{secrets.token_hex(5)}"

        return await self._run_bg_job_with_ops_classes(
            oid, job_id, job_type=BgJobType.DELETE_ORG.value
        )

    async def run_recalculate_org_stats_job(
        self,
        oid: str,
        existing_job_id: Optional[str] = None,
    ) -> str:
        """run job to recalculate storage stats for the org"""

        if existing_job_id:
            job_id = existing_job_id
        else:
            job_id = f"org-stats-{oid}-{secrets.token_hex(5)}"

        return await self._run_bg_job_with_ops_classes(
            oid,
            job_id,
            job_type=BgJobType.RECALCULATE_ORG_STATS.value,
        )

    async def run_re_add_org_pages_job(
        self,
        oid: str,
        crawl_type: Optional[str] = None,
        existing_job_id: Optional[str] = None,
    ) -> str:
        """run job to re-add pages for the org, optionally for a given crawl type"""

        if existing_job_id:
            job_id = existing_job_id
        else:
            job_id = f"org-pages-{oid}-{secrets.token_hex(5)}"

        return await self._run_bg_job_with_ops_classes(
            oid,
            job_id,
            job_type=BgJobType.READD_ORG_PAGES.value,
            crawl_type=crawl_type,
        )

    async def _run_bg_job_with_ops_classes(
        self, oid: str, job_id: str, job_type: str, **kwargs
    ) -> str:
        """run background job with access to ops classes"""

        params = {
            "id": job_id,
            "oid": oid,
            "job_type": job_type,
            "backend_image": os.environ.get("BACKEND_IMAGE", ""),
            "pull_policy": os.environ.get("BACKEND_IMAGE_PULL_POLICY", ""),
            **kwargs,
        }

        data = self.templates.env.get_template("background_job.yaml").render(params)

        await self.create_from_yaml(data, namespace=DEFAULT_NAMESPACE)

        return job_id

    async def create_crawl_job(
        self,
        crawlconfig: CrawlConfig,
        storage: StorageRef,
        userid: str,
        warc_prefix: str,
        storage_filename: str,
        profile_filename: str,
    ) -> str:
        """create new crawl job from config"""
        cid = str(crawlconfig.id)
        storage_secret = storage.get_storage_secret_name(str(crawlconfig.oid))

        await self.has_storage_secret(storage_secret)

        return await self.new_crawl_job(
            cid,
            userid,
            str(crawlconfig.oid),
            str(storage),
            crawlconfig.crawlerChannel,
            crawlconfig.scale,
            crawlconfig.crawlTimeout,
            crawlconfig.maxCrawlSize,
            manual=True,
            warc_prefix=warc_prefix,
            storage_filename=storage_filename,
            profile_filename=profile_filename,
            proxy_id=crawlconfig.proxyId or DEFAULT_PROXY_ID,
        )

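    # The operator watches lastConfigUpdate on the CrawlJob and recreates the
    # crawl configmap when it changes; since the configmap is mounted without
    # subPath, running crawler pods also pick up updated exclusions, and
    # restarted pods get the latest config.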
    async def reload_running_crawl_config(self, crawl_id: str):
        """force reload of configmap for crawl"""
        return await self._patch_job(
            crawl_id, {"lastConfigUpdate": date_to_str(dt_now())}
        )

    async def create_qa_crawl_job(
        self,
        crawlconfig: CrawlConfig,
        storage: StorageRef,
        userid: str,
        storage_filename: str,
        qa_source: str,
    ) -> str:
        """create new QA Run crawl job with qa source crawl id"""
        cid = str(crawlconfig.id)

        storage_secret = storage.get_storage_secret_name(str(crawlconfig.oid))

        await self.has_storage_secret(storage_secret)

        ts_now = dt_now().strftime("%Y%m%d%H%M%S")
        crawl_id = f"qa-{ts_now}-{cid[:12]}"

        return await self.new_crawl_job(
            cid,
            userid,
            str(crawlconfig.oid),
            str(storage),
            crawlconfig.crawlerChannel,
            1,  # scale
            0,  # no crawl timeout
            0,  # no max crawl size
            warc_prefix="qa",
            storage_filename=storage_filename,
            crawl_id=crawl_id,
            qa_source=qa_source,
        )

    async def remove_org_storage(self, storage: StorageRef, oid: str) -> bool:
        """Delete custom org storage secret"""
        storage_secret = storage.get_storage_secret_name(oid)
        storage_label = f"btrix.storage={storage_secret}"

        if await self.has_custom_jobs_with_label("crawljobs", storage_label):
            raise HTTPException(status_code=400, detail="storage_in_use")

        if await self.has_custom_jobs_with_label("profilejobs", storage_label):
            raise HTTPException(status_code=400, detail="storage_in_use")

        try:
            await self.core_api.delete_namespaced_secret(
                storage_secret,
                namespace=self.namespace,
            )
            return True
        # pylint: disable=bare-except
        except:
            return False

    async def add_org_storage(
        self, storage: StorageRef, string_data: Dict[str, str], oid: str
    ) -> None:
        """Add custom org storage secret"""
        labels = {"btrix.org": oid}

        storage_secret = storage.get_storage_secret_name(oid)

        crawl_secret = self.client.V1Secret(
            metadata={
                "name": storage_secret,
                "namespace": self.namespace,
                "labels": labels,
            },
            string_data=string_data,
        )

        try:
            await self.core_api.create_namespaced_secret(
                namespace=self.namespace, body=crawl_secret
            )

        # pylint: disable=bare-except
        except:
            await self.core_api.patch_namespaced_secret(
                name=storage_secret, namespace=self.namespace, body=crawl_secret
            )

    async def get_profile_browser_metadata(self, browserid: str) -> dict[str, str]:
        """get browser profile labels"""
        try:
            browser = await self.get_profile_browser(browserid)

        # pylint: disable=bare-except
        except:
            return {}

        return browser["metadata"]["labels"]

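    # profile browser jobs start with a 30-second expireTime (set in
    # run_profile_browser above); callers are expected to ping periodically
    # to keep the browser alive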
    async def ping_profile_browser(self, browserid: str) -> None:
        """ping profile browser to keep it alive, extending expire time by 30s"""
        expire_at = dt_now() + timedelta(seconds=30)
        await self._patch_job(
            browserid, {"expireTime": date_to_str(expire_at)}, "profilejobs"
        )

    async def rollover_restart_crawl(self, crawl_id: str) -> dict:
        """Rolling restart of crawl by updating restartTime field"""
        update = date_to_str(dt_now())
        return await self._patch_job(crawl_id, {"restartTime": update})

    async def scale_crawl(self, crawl_id: str, scale: int = 1) -> dict:
        """Set the crawl scale (job parallelism) on the specified job"""
        return await self._patch_job(crawl_id, {"scale": scale})

    async def shutdown_crawl(self, crawl_id: str, graceful=True) -> dict:
        """Request a graceful stop (by patching the crawl job) or an
        immediate cancellation (by deleting the crawl job), returning the result"""
        if graceful:
            patch = {"stopping": True}
            return await self._patch_job(crawl_id, patch)

        return await self.delete_crawl_job(crawl_id)

    async def delete_crawl_configs_for_org(self, org: str) -> None:
        """Delete all crawl configs for given org"""
        await self._delete_crawl_configs(f"btrix.org={org}")

    async def delete_crawl_config_by_id(self, cid: str) -> None:
        """Delete crawl config by id"""
        await self._delete_crawl_configs(f"btrix.crawlconfig={cid}")

    # ========================================================================
    # Internal Methods
    async def _delete_crawl_configs(self, label) -> None:
        """Delete any crawl config specific resources (now only cron jobs)"""

        await self.batch_api.delete_collection_namespaced_cron_job(
            namespace=self.namespace,
            label_selector=label,
        )

    async def update_scheduled_job(
        self, crawlconfig: CrawlConfig, userid: Optional[str] = None
    ) -> Optional[str]:
        """create or remove cron job based on crawlconfig schedule"""
        cid = str(crawlconfig.id)

        cron_job_id = f"sched-{cid[:12]}"
        cron_job = None
        try:
            cron_job = await self.batch_api.read_namespaced_cron_job(
                name=cron_job_id,
                namespace=self.namespace,
            )
        # pylint: disable=bare-except
        except:
            pass

        # if no schedule, delete cron_job if exists and we're done
        if not crawlconfig.schedule:
            if cron_job:
                await self.batch_api.delete_namespaced_cron_job(
                    name=cron_job.metadata.name, namespace=self.namespace
                )
            return None

        # if cron job exists, just patch schedule
        if cron_job:
            if crawlconfig.schedule != cron_job.spec.schedule:
                cron_job.spec.schedule = crawlconfig.schedule

                await self.batch_api.patch_namespaced_cron_job(
                    name=cron_job.metadata.name,
                    namespace=self.namespace,
                    body=cron_job,
                )
            return None

        params = {
            "id": cron_job_id,
            "cid": str(crawlconfig.id),
            "oid": str(crawlconfig.oid),
            "schedule": crawlconfig.schedule,
            "userid": userid,
        }

        data = self.templates.env.get_template("crawl_cron_job.yaml").render(params)

        await self.create_from_yaml(data, self.namespace)

        return cron_job_id

    async def create_replica_deletion_scheduled_job(
        self,
        job_id: str,
        params: Dict[str, object],
        delay_days: int,
    ) -> Tuple[str, Optional[str]]:
        """create scheduled job to delete replica file in x days"""
        now = dt_now()
        run_at = now + timedelta(days=delay_days)
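        # the schedule below encodes a single future date (minute, hour, day,
        # month); a standard cron spec has no year field, so the cron job is
        # expected to be removed via delete_replica_deletion_scheduled_job
        # once the deletion job has run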
        schedule = f"{run_at.minute} {run_at.hour} {run_at.day} {run_at.month} *"

        params["schedule"] = schedule

        print(f"Replica deletion cron schedule: '{schedule}'", flush=True)

        data = self.templates.env.get_template("replica_deletion_cron_job.yaml").render(
            params
        )

        await self.create_from_yaml(data, self.namespace)

        return job_id, schedule

    async def delete_replica_deletion_scheduled_job(self, job_id: str):
        """delete the scheduled replica deletion cron job"""
        cron_job = await self.batch_api.read_namespaced_cron_job(
            name=job_id,
            namespace=self.namespace,
        )
        if cron_job:
            await self.batch_api.delete_namespaced_cron_job(
                name=cron_job.metadata.name, namespace=self.namespace
            )