Fixes #2406 Converts migration 0042 to launch a background job (parallelized across several pods) to migrate all crawls by optimizing their pages and setting `version: 2` on the crawl when complete. Also optimizes MongoDB queries for better performance. Migration Improvements: - Add `isMigrating` and `version` fields to `BaseCrawl` - Add new background job type to use in migration with accompanying `migration_job.yaml` template that allows for parallelization - Add new API endpoint to launch this crawl migration job, and ensure that we have list and retry endpoints for superusers that work with background jobs that aren't tied to a specific org - Rework background job models and methods now that not all background jobs are tied to a single org - Ensure new crawls and uploads have `version` set to `2` - Modify crawl and collection replay.json endpoints to only include fields for replay optimization (`initialPages`, `pageQueryUrl`, `preloadResources`) if all relevant crawls/uploads have `version` set to `2` - Remove `distinct` calls from migration pathways - Consolidate collection recompute stats Query Optimizations: - Remove all uses of $group and $facet - Optimize /replay.json endpoints to precompute preload_resources, avoid fetching crawl list twice - Optimize /collections endpoint by not fetching resources - Rename /urls -> /pageUrlCounts and avoid $group, instead sort with index, either by seed + ts or by url to get top matches. - Use $gte instead of $regex to get prefix matches on URL - Use $text instead of $regex to get text search on title - Remove total from /pages and /pageUrlCounts queries by not using $facet - frontend: only call /pageUrlCounts when dialog is opened. --------- Co-authored-by: Ilya Kreymer <ikreymer@gmail.com> Co-authored-by: Emma Segal-Grossman <hi@emma.cafe> Co-authored-by: Ilya Kreymer <ikreymer@users.noreply.github.com>
73 lines
2.2 KiB
Python
73 lines
2.2 KiB
Python
"""Operator handler for BackgroundJobs"""
|
|
|
|
from uuid import UUID
|
|
import traceback
|
|
|
|
from btrixcloud.utils import (
|
|
str_to_date,
|
|
dt_now,
|
|
)
|
|
|
|
from .models import MCDecoratorSyncData
|
|
from .baseoperator import BaseOperator
|
|
|
|
|
|
# ============================================================================
|
|
# ============================================================================
class BgJobOperator(BaseOperator):
    """Operator handler for finished Kubernetes background jobs.

    Registers the metacontroller webhook routes for background jobs and,
    on finalize, reports the job's outcome to ``background_job_ops``.
    """

    def init_routes(self, app):
        """init routes for this operator"""

        # nop, but needed for metacontroller
        @app.post("/op/backgroundjob/sync")
        async def mc_sync_background_jobs():
            return {"attachments": []}

        @app.post("/op/backgroundjob/finalize")
        async def mc_finalize_background_jobs(data: MCDecoratorSyncData):
            return await self.finalize_background_job(data)

    async def finalize_background_job(self, data: MCDecoratorSyncData) -> dict:
        """Handle a finished background job.

        Extracts the job's identity (org id, job type, job id) from the k8s
        object's labels, derives success and completion time from its status,
        and notifies ``background_job_ops.job_finished()``. Always returns
        ``finalized: True`` so the metacontroller finalizer is removed even
        if recording the result fails.

        :param data: metacontroller decorator sync payload for the Job object
        :returns: metacontroller finalize response dict
        """
        metadata = data.object["metadata"]
        labels: dict[str, str] = metadata.get("labels", {})
        # org label may be absent: some background jobs are not tied to an org
        oid: str = labels.get("btrix.org") or ""
        job_type: str = labels.get("job_type") or ""
        job_id: str = labels.get("job_id") or metadata.get("name")

        status = data.object["status"]
        success = status.get("succeeded") == 1
        completion_time = status.get("completionTime")

        # prefer the Job's completionTime; fall back to "now" if it is
        # missing or unparseable
        finished = None
        if completion_time:
            finished = str_to_date(completion_time)
        if not finished:
            finished = dt_now()

        # UUID() on a str can only raise ValueError (oid is always a str
        # here, including ""); a missing/invalid org label means "no org"
        try:
            org_id = UUID(oid)
        except ValueError:
            org_id = None

        try:
            await self.background_job_ops.job_finished(
                job_id, job_type, success=success, finished=finished, oid=org_id
            )
        # pylint: disable=broad-except
        except Exception:
            # best-effort: log and continue so the finalizer is still removed
            print("Update Background Job Error", flush=True)
            traceback.print_exc()

        return {"attachments": [], "finalized": True}
|