Scheduled Crawl Refactor: Handle via Operator + Add Skipped Crawls on Quota Reached (#1162)

* use metacontroller's decoratorcontroller to create CrawlJob from Job
* scheduled job work:
- use existing job name for scheduled crawljob
- use suspended job, set startTime, completionTime and succeeded status on job when crawljob is done
- simplify cronjob template: remove job_image, cron_namespace, using same namespace as crawls,
placeholder job image for cronjobs

* move storage quota check to crawljob handler:
- add 'skipped_quota_reached' as new failed status type
- check for storage quota before checking if crawljob can be started, fail if not (check before any pods/pvcs created)

* frontend:
- show all crawls in crawl workflow, no need to filter by status
- add 'skipped_quota_reached' status, show as 'Skipped (Quota Reached)', render same as failed

* migration: make release namespace available as DEFAULT_NAMESPACE, delete old cronjobs in DEFAULT_NAMESPACE and recreate in crawlers namespace with new template
This commit is contained in:
Ilya Kreymer 2023-09-12 13:05:43 -07:00 committed by GitHub
parent 9377a6f456
commit c9c39d47b7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 294 additions and 146 deletions

View File

@ -34,7 +34,7 @@ RUNNING_STATES = ("running", "pending-wait", "generate-wacz", "uploading-wacz")
STARTING_STATES = ("starting", "waiting_capacity", "waiting_org_limit")
FAILED_STATES = ("canceled", "failed")
FAILED_STATES = ("canceled", "failed", "skipped_quota_reached")
SUCCESSFUL_STATES = ("complete", "partial_complete")

View File

@ -28,6 +28,7 @@ from .models import (
User,
PaginatedResponse,
)
from .basecrawls import FAILED_STATES
ALLOWED_SORT_KEYS = (
@ -792,7 +793,7 @@ async def stats_recompute_all(crawl_configs, crawls, cid: uuid.UUID):
update_query["crawlCount"] = len(results)
update_query["crawlSuccessfulCount"] = len(
[res for res in results if res["state"] not in ("canceled", "failed")]
[res for res in results if res["state"] not in FAILED_STATES]
)
last_crawl = results[0]

View File

@ -19,11 +19,6 @@ class CrawlManager(K8sAPI):
def __init__(self):
super().__init__()
self.job_image = os.environ["JOB_IMAGE"]
self.job_image_pull_policy = os.environ.get("JOB_PULL_POLICY", "Always")
self.cron_namespace = os.environ.get("CRON_NAMESPACE", "default")
self._default_storages = {}
self.loop = asyncio.get_running_loop()
@ -338,15 +333,13 @@ class CrawlManager(K8sAPI):
"""Delete Crawl Cron Job and all dependent resources, including configmap and secrets"""
await self.batch_api.delete_collection_namespaced_cron_job(
namespace=self.cron_namespace,
namespace=self.namespace,
label_selector=label,
propagation_policy="Foreground",
)
await self.core_api.delete_collection_namespaced_config_map(
namespace=self.namespace,
label_selector=label,
propagation_policy="Foreground",
)
async def _update_scheduled_job(self, crawlconfig, schedule):
@ -358,7 +351,7 @@ class CrawlManager(K8sAPI):
try:
cron_job = await self.batch_api.read_namespaced_cron_job(
name=cron_job_id,
namespace=self.cron_namespace,
namespace=self.namespace,
)
# pylint: disable=bare-except
except:
@ -368,7 +361,7 @@ class CrawlManager(K8sAPI):
if not crawlconfig.schedule:
if cron_job:
await self.batch_api.delete_namespaced_cron_job(
name=cron_job.metadata.name, namespace=self.cron_namespace
name=cron_job.metadata.name, namespace=self.namespace
)
return
@ -379,7 +372,7 @@ class CrawlManager(K8sAPI):
await self.batch_api.patch_namespaced_cron_job(
name=cron_job.metadata.name,
namespace=self.cron_namespace,
namespace=self.namespace,
body=cron_job,
)
return
@ -387,14 +380,12 @@ class CrawlManager(K8sAPI):
params = {
"id": cron_job_id,
"cid": str(crawlconfig.id),
"image": self.job_image,
"image_pull_policy": self.job_image_pull_policy,
"schedule": schedule,
}
data = self.templates.env.get_template("crawl_cron_job.yaml").render(params)
await self.create_from_yaml(data, self.cron_namespace)
await self.create_from_yaml(data, self.namespace)
return cron_job_id

View File

@ -15,7 +15,7 @@ from pymongo.errors import InvalidName
from .migrations import BaseMigration
CURR_DB_VERSION = "0015"
CURR_DB_VERSION = "0016"
# ============================================================================

View File

@ -70,8 +70,16 @@ class K8sAPI:
)
# pylint: disable=too-many-arguments
async def new_crawl_job(
self, cid, userid, oid, scale=1, crawl_timeout=0, max_crawl_size=0, manual=True
def new_crawl_job_yaml(
self,
cid,
userid,
oid,
scale=1,
crawl_timeout=0,
max_crawl_size=0,
manual=True,
crawl_id=None,
):
"""load job template from yaml"""
if crawl_timeout:
@ -79,6 +87,7 @@ class K8sAPI:
else:
crawl_expire_time = ""
if not crawl_id:
ts_now = dt_now().strftime("%Y%m%d%H%M%S")
prefix = "manual" if manual else "sched"
crawl_id = f"{prefix}-{ts_now}-{cid[:12]}"
@ -95,6 +104,11 @@ class K8sAPI:
}
data = self.templates.env.get_template("crawl_job.yaml").render(params)
return crawl_id, data
async def new_crawl_job(self, *args, **kwargs):
"""load and init crawl job via k8s api"""
crawl_id, data = self.new_crawl_job_yaml(*args, **kwargs)
# create job directly
await self.create_from_yaml(data)

View File

@ -1,87 +0,0 @@
""" entrypoint for cron crawl job"""
import asyncio
import os
import uuid
from .k8sapi import K8sAPI
from .db import init_db
from .crawlconfigs import (
get_crawl_config,
inc_crawl_count,
)
from .crawls import add_new_crawl
from .orgs import storage_quota_reached
from .utils import register_exit_handler
# ============================================================================
class ScheduledJob(K8sAPI):
"""Schedulued Job APIs for starting CrawlJobs on schedule"""
def __init__(self):
super().__init__()
self.cid = os.environ["CID"]
_, mdb = init_db()
self.crawls = mdb["crawls"]
self.crawlconfigs = mdb["crawl_configs"]
self.orgs = mdb["organizations"]
async def run(self):
"""run crawl!"""
register_exit_handler()
config_map = await self.core_api.read_namespaced_config_map(
name=f"crawl-config-{self.cid}", namespace=self.namespace
)
data = config_map.data
userid = data["USER_ID"]
scale = int(data.get("INITIAL_SCALE", 0))
try:
crawl_timeout = int(data.get("CRAWL_TIMEOUT", 0))
# pylint: disable=bare-except
except:
crawl_timeout = 0
oid = data["ORG_ID"]
crawlconfig = await get_crawl_config(self.crawlconfigs, uuid.UUID(self.cid))
if await storage_quota_reached(self.orgs, uuid.UUID(oid)):
print(
f"Scheduled crawl from workflow {self.cid} not started - storage quota reached",
flush=True,
)
return
# k8s create
crawl_id = await self.new_crawl_job(
self.cid, userid, oid, scale, crawl_timeout, manual=False
)
# db create
await inc_crawl_count(self.crawlconfigs, crawlconfig.id)
await add_new_crawl(
self.crawls,
self.crawlconfigs,
crawl_id,
crawlconfig,
uuid.UUID(userid),
manual=False,
)
print("Crawl Created: " + crawl_id)
# ============================================================================
def main():
"""main entrypoint"""
job = ScheduledJob()
loop = asyncio.get_event_loop()
loop.run_until_complete(job.run())
if __name__ == "__main__":
main()

View File

@ -0,0 +1,62 @@
"""
Migration 0016 - Updating scheduled cron jobs after Operator changes v2
"""
import os
from btrixcloud.models import CrawlConfig, UpdateCrawlConfig
from btrixcloud.crawlmanager import CrawlManager
from btrixcloud.migrations import BaseMigration
MIGRATION_VERSION = "0016"
class Migration(BaseMigration):
"""Migration class."""
def __init__(self, mdb, migration_version=MIGRATION_VERSION):
super().__init__(mdb, migration_version)
async def migrate_up(self):
"""Perform migration up.
Find existing workflows with schedule and create new crawl_cron_jobs
from template, back in crawlers workspace and using noop image
"""
# pylint: disable=too-many-locals, duplicate-code
crawl_configs = self.mdb["crawl_configs"]
crawl_manager = CrawlManager()
# Update configmap for crawl configs that have non-zero timeout or scale > 1
match_query = {"schedule": {"$nin": ["", None]}}
configs_to_update = [res async for res in crawl_configs.find(match_query)]
for config_dict in configs_to_update:
config = CrawlConfig.from_dict(config_dict)
print(
f"Updating CronJob for Crawl Config {config.id}: schedule: {config.schedule}"
)
try:
await crawl_manager.update_crawl_config(
config,
UpdateCrawlConfig(
schedule=config.schedule,
),
)
# pylint: disable=broad-except
except Exception as exc:
print(
"Skip crawl config migration due to error, likely missing config",
exc,
)
# Delete existing scheduled jobs from default namespace
print("Deleting cronjobs from default namespace")
default_namespace = os.environ.get("DEFAULT_NAMESPACE", "default")
await crawl_manager.batch_api.delete_collection_namespaced_cron_job(
namespace=default_namespace, label_selector="btrix.crawlconfig"
)
result = await crawl_manager.batch_api.list_namespaced_cron_job(
namespace=default_namespace, label_selector="btrix.crawlconfig"
)
assert len(result.items) == 0

View File

@ -26,7 +26,7 @@ from .utils import (
)
from .k8sapi import K8sAPI
from .orgs import inc_org_stats, get_max_concurrent_crawls
from .orgs import inc_org_stats, get_max_concurrent_crawls, storage_quota_reached
from .basecrawls import (
NON_RUNNING_STATES,
RUNNING_STATES,
@ -44,6 +44,11 @@ from .crawls import (
)
from .models import CrawlFile, CrawlCompleteIn
from .orgs import add_crawl_files_to_org_bytes_stored
from .crawlconfigs import (
get_crawl_config,
inc_crawl_count,
)
from .crawls import add_new_crawl
CMAP = "ConfigMap.v1"
@ -60,11 +65,6 @@ REDIS_TTL = 60
STARTING_TIME_SECS = 60
# ============================================================================
class DeleteCrawlException(Exception):
"""throw to force deletion of crawl objects"""
# ============================================================================
class MCBaseRequest(BaseModel):
"""base metacontroller model, used for customize hook"""
@ -82,6 +82,18 @@ class MCSyncData(MCBaseRequest):
finalizing: bool = False
# ============================================================================
class MCDecoratorSyncData(BaseModel):
"""sync for decoratorcontroller model"""
object: dict
controller: dict
attachments: dict
related: dict
finalizing: bool = False
# ============================================================================
class CrawlSpec(BaseModel):
"""spec from k8s CrawlJob object"""
@ -125,7 +137,7 @@ class CrawlStatus(BaseModel):
# ============================================================================
# pylint: disable=too-many-statements, too-many-public-methods, too-many-branches
# pylint: disable=too-many-instance-attributes,too-many-locals
# pylint: disable=too-many-instance-attributes, too-many-locals, too-many-lines
class BtrixOperator(K8sAPI):
"""BtrixOperator Handler"""
@ -205,6 +217,7 @@ class BtrixOperator(K8sAPI):
return {"status": {}, "children": children}
# pylint: disable=too-many-return-statements
async def sync_crawls(self, data: MCSyncData):
"""sync crawls"""
@ -271,6 +284,20 @@ class BtrixOperator(K8sAPI):
scheduled=spec.get("manual") != "1",
)
# first, check storage quota, and fail immediately if quota reached
if status.state in ("starting", "skipped_quota_reached"):
# only check on very first run, before any pods/pvcs created
# for now, allow if crawl has already started (pods/pvcs created)
if (
not pods
and not data.children[PVC]
and await storage_quota_reached(self.orgs, crawl.oid)
):
await self.mark_finished(
crawl.id, crawl.cid, status, "skipped_quota_reached"
)
return self._empty_response(status)
if status.state in ("starting", "waiting_org_limit"):
if not await self.can_start_new(crawl, data, status):
return self._empty_response(status)
@ -430,6 +457,8 @@ class BtrixOperator(K8sAPI):
if actual_state != state:
print(f"state mismatch, actual state {actual_state}, requested {state}")
if not actual_state and state == "canceled":
return True
if status.state != state:
print(
@ -444,7 +473,7 @@ class BtrixOperator(K8sAPI):
)
def get_related(self, data: MCBaseRequest):
"""return configmap related to crawl"""
"""return objects related to crawl pods"""
spec = data.parent.get("spec", {})
cid = spec["cid"]
# crawl_id = spec["id"]
@ -556,7 +585,7 @@ class BtrixOperator(K8sAPI):
ttl = spec.get("ttlSecondsAfterFinished", DEFAULT_TTL)
finished = from_k8s_date(status.finished)
if (dt_now() - finished).total_seconds() > ttl > 0:
print("Job expired, deleting: " + crawl_id)
print("CrawlJob expired, deleting: " + crawl_id)
finalized = True
else:
finalized = True
@ -860,11 +889,11 @@ class BtrixOperator(K8sAPI):
self.crawl_configs, self.crawls, cid, files_added_size, 1
)
if state in SUCCESSFUL_STATES:
await add_crawl_files_to_org_bytes_stored(
self.crawls, self.orgs, crawl_id, files_added_size
)
if state in SUCCESSFUL_STATES:
await add_successful_crawl_to_collections(
self.crawls, self.crawl_configs, self.collections, crawl_id, cid
)
@ -924,6 +953,96 @@ class BtrixOperator(K8sAPI):
if redis:
await redis.close()
def get_cronjob_crawl_related(self, data: MCBaseRequest):
"""return configmap related to crawl"""
labels = data.parent.get("metadata", {}).get("labels", {})
cid = labels.get("btrix.crawlconfig")
return {
"relatedResources": [
{
"apiVersion": "v1",
"resource": "configmaps",
"labelSelector": {"matchLabels": {"btrix.crawlconfig": cid}},
}
]
}
async def sync_cronjob_crawl(self, data: MCDecoratorSyncData):
"""create crawljobs from a job object spawned by cronjob"""
metadata = data.object["metadata"]
labels = metadata.get("labels", {})
cid = labels.get("btrix.crawlconfig")
name = metadata.get("name")
crawl_id = name
actual_state, finished = await get_crawl_state(self.crawls, crawl_id)
if finished:
status = None
# mark job as completed
if not data.object["status"].get("succeeded"):
print("Cron Job Complete!", finished)
status = {
"succeeded": 1,
"startTime": metadata.get("creationTimestamp"),
"completionTime": to_k8s_date(finished),
}
return {
"attachments": [],
"annotations": {"finished": finished},
"status": status,
}
configmap = data.related[CMAP][f"crawl-config-{cid}"]["data"]
oid = configmap.get("ORG_ID")
userid = configmap.get("USER_ID")
crawljobs = data.attachments[CJS]
crawl_id, crawljob = self.new_crawl_job_yaml(
cid,
userid=userid,
oid=oid,
scale=int(configmap.get("INITIAL_SCALE", 1)),
crawl_timeout=int(configmap.get("CRAWL_TIMEOUT", 0)),
max_crawl_size=int(configmap.get("MAX_CRAWL_SIZE", "0")),
manual=False,
crawl_id=crawl_id,
)
attachments = list(yaml.safe_load_all(crawljob))
if crawl_id in crawljobs:
attachments[0]["status"] = crawljobs[CJS][crawl_id]["status"]
if not actual_state:
# pylint: disable=duplicate-code
crawlconfig = await get_crawl_config(self.crawl_configs, uuid.UUID(cid))
if not crawlconfig:
print(
f"warn: no crawlconfig {cid}. skipping scheduled job. old cronjob left over?"
)
return {"attachments": []}
# db create
await inc_crawl_count(self.crawl_configs, crawlconfig.id)
await add_new_crawl(
self.crawls,
self.crawl_configs,
crawl_id,
crawlconfig,
uuid.UUID(userid),
manual=False,
)
print("Scheduled Crawl Created: " + crawl_id)
return {
"attachments": attachments,
}
# ============================================================================
def init_operator_api(app, mdb, event_webhook_ops):
@ -948,6 +1067,14 @@ def init_operator_api(app, mdb, event_webhook_ops):
async def mc_sync_profile_browsers(data: MCSyncData):
return await oper.sync_profile_browsers(data)
@app.post("/op/cronjob/sync")
async def mc_sync_cronjob_crawls(data: MCDecoratorSyncData):
return await oper.sync_cronjob_crawl(data)
@app.post("/op/cronjob/customize")
async def mc_cronjob_related(data: MCBaseRequest):
return oper.get_cronjob_crawl_related(data)
@app.get("/healthz", include_in_schema=False)
async def healthz():
return {}

View File

@ -8,29 +8,23 @@ metadata:
spec:
concurrencyPolicy: Forbid
successfulJobsHistoryLimit: 0
failedJobsHistoryLimit: 3
successfulJobsHistoryLimit: 2
failedJobsHistoryLimit: 2
schedule: "{{ schedule }}"
jobTemplate:
metadata:
labels:
btrix.crawlconfig: "{{ cid }}"
role: "scheduled-crawljob"
spec:
suspend: true
template:
spec:
restartPolicy: OnFailure
restartPolicy: Never
containers:
- name: scheduled
image: "{{ image }}"
imagePullPolicy: "{{ image_pull_policy }}"
command:
- python
- -m
- btrixcloud.main_scheduled_job
env:
- name: CID
value: "{{ cid }}"
envFrom:
- secretRef:
name: mongo-auth
- name: noop
image: "docker.io/tianon/true"
imagePullPolicy: IfNotPresent

View File

@ -8,19 +8,16 @@ metadata:
data:
APP_ORIGIN: {{ .Values.ingress.tls | ternary "https" "http" }}://{{ .Values.ingress.host | default "localhost:9870" }}
CRON_NAMESPACE: {{ .Release.Namespace }}
CRAWLER_NAMESPACE: {{ .Values.crawler_namespace }}
DEFAULT_NAMESPACE: {{ .Release.Namespace }}
CRAWLER_FQDN_SUFFIX: ".{{ .Values.crawler_namespace }}.svc.cluster.local"
DEFAULT_ORG: "{{ .Values.default_org }}"
INVITE_EXPIRE_SECONDS: "{{ .Values.invite_expire_seconds }}"
JOB_IMAGE: "{{ .Values.backend_image }}"
JOB_PULL_POLICY: "{{ .Values.backend_pull_policy }}"
REGISTRATION_ENABLED: "{{ .Values.registration_enabled | default 0 }}"
ALLOW_DUPE_INVITES: "{{ .Values.allow_dupe_invites | default 0 }}"

View File

@ -70,3 +70,40 @@ spec:
name: {{ .Values.name }}-backend
port: {{ .Values.opPort }}
path: /op/profilebrowsers/sync
---
apiVersion: metacontroller.k8s.io/v1alpha1
kind: DecoratorController
metadata:
name: cron-crawljobs-operator
spec:
resyncPeriodSeconds: 30
resources:
- apiVersion: batch/v1
resource: jobs
labelSelector:
matchLabels:
role: scheduled-crawljob
attachments:
- apiVersion: btrix.cloud/v1
resource: crawljobs
updateStrategy:
method: InPlace
hooks:
sync:
webhook:
service:
namespace: {{ .Release.Namespace }}
name: {{ .Values.name }}-backend
port: {{ .Values.opPort }}
path: /op/cronjob/sync
customize:
webhook:
service:
namespace: {{ .Release.Namespace }}
name: {{ .Values.name }}-backend
port: {{ .Values.opPort }}
path: /op/cronjob/customize

View File

@ -180,6 +180,16 @@ export class CrawlStatus extends LitElement {
break;
}
case "skipped_quota_reached": {
icon = html`<sl-icon
name="exclamation-triangle"
slot="prefix"
style="color: var(--danger)"
></sl-icon>`;
label = msg("Skipped (Quota Reached)");
break;
}
case "partial_complete": {
icon = html`<sl-icon
name="dash-circle"

View File

@ -1240,7 +1240,7 @@ export class WorkflowDetail extends LiteElement {
private async getCrawls(): Promise<APIPaginatedList> {
const query = queryString.stringify(
{
state: this.filterBy.state || inactiveCrawlStates,
state: this.filterBy.state,
cid: this.workflowId,
sortBy: "started",
},

View File

@ -102,6 +102,7 @@ export type CrawlState =
| "pending-wait"
| "complete"
| "failed"
| "skipped_quota_reached"
| "partial_complete"
| "timed_out"
| "stopping"

View File

@ -13,6 +13,7 @@ export const inactiveCrawlStates: CrawlState[] = [
"complete",
"canceled",
"partial_complete",
"skipped_quota_reached",
"timed_out",
"failed",
];