Fixes #2406

Converts migration 0042 to launch a background job (parallelized across several pods) that migrates all crawls by optimizing their pages and setting `version: 2` on each crawl when complete. Also optimizes MongoDB queries for better performance.

Migration Improvements:

- Add `isMigrating` and `version` fields to `BaseCrawl`
- Add new background job type to use in the migration, with an accompanying `migration_job.yaml` template that allows for parallelization
- Add new API endpoint to launch this crawl migration job, and ensure that we have list and retry endpoints for superusers that work with background jobs that aren't tied to a specific org
- Rework background job models and methods now that not all background jobs are tied to a single org
- Ensure new crawls and uploads have `version` set to `2`
- Modify crawl and collection replay.json endpoints to only include fields for replay optimization (`initialPages`, `pageQueryUrl`, `preloadResources`) if all relevant crawls/uploads have `version` set to `2`
- Remove `distinct` calls from migration pathways
- Consolidate collection recompute stats

Query Optimizations:

- Remove all uses of `$group` and `$facet`
- Optimize /replay.json endpoints to precompute `preload_resources` and avoid fetching the crawl list twice
- Optimize /collections endpoint by not fetching resources
- Rename /urls -> /pageUrlCounts and avoid `$group`; instead sort with an index, either by seed + ts or by url, to get top matches
- Use `$gte` instead of `$regex` to get prefix matches on URL (see the sketch below)
- Use `$text` instead of `$regex` to get text search on title
- Remove total from /pages and /pageUrlCounts queries by not using `$facet`
- frontend: only call /pageUrlCounts when the dialog is opened

---------

Co-authored-by: Ilya Kreymer <ikreymer@gmail.com>
Co-authored-by: Emma Segal-Grossman <hi@emma.cafe>
Co-authored-by: Ilya Kreymer <ikreymer@users.noreply.github.com>
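As an illustration of the `$gte` prefix-match optimization above, here is a minimal, hypothetical sketch; the field name `url` and the sentinel upper bound are assumptions for the example, not the PR's actual query:

```python
def url_prefix_filter(prefix: str) -> dict:
    """Match documents whose `url` starts with `prefix` using a range filter.

    A range over an indexed field can be answered with an index scan, whereas
    a per-document regex test is generally more expensive.
    """
    # "\uffff" acts as a high sentinel so the range covers every string that
    # begins with `prefix` (sufficient for typical URLs).
    return {"url": {"$gte": prefix, "$lt": prefix + "\uffff"}}


if __name__ == "__main__":
    # Roughly the same intent as {"url": {"$regex": "^https://example\\.com/"}}
    print(url_prefix_filter("https://example.com/"))
```

An anchored `$regex` on a simple prefix can also use an index in MongoDB, but the explicit range keeps the query plan predictable and avoids regex-escaping concerns.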
"""k8s background jobs"""
import asyncio
import os
from datetime import datetime
from typing import Optional, Tuple, Union, List, Dict, TYPE_CHECKING, cast
from uuid import UUID

from urllib.parse import urlsplit

from fastapi import APIRouter, Depends, HTTPException

from .storages import StorageOps
from .crawlmanager import CrawlManager

from .models import (
    BaseFile,
    Organization,
    BackgroundJob,
    BgJobType,
    CreateReplicaJob,
    DeleteReplicaJob,
    DeleteOrgJob,
    RecalculateOrgStatsJob,
    ReAddOrgPagesJob,
    OptimizePagesJob,
    PaginatedBackgroundJobResponse,
    AnyJob,
    StorageRef,
    User,
    SuccessResponse,
    SuccessResponseId,
)
from .pagination import DEFAULT_PAGE_SIZE, paginated_format
from .utils import dt_now

if TYPE_CHECKING:
    from .orgs import OrgOps
    from .basecrawls import BaseCrawlOps
    from .profiles import ProfileOps
else:
    OrgOps = CrawlManager = BaseCrawlOps = ProfileOps = object


# ============================================================================
# pylint: disable=too-many-instance-attributes
class BackgroundJobOps:
    """k8s background job management"""

    org_ops: OrgOps
    crawl_manager: CrawlManager
    storage_ops: StorageOps

    base_crawl_ops: BaseCrawlOps
    profile_ops: ProfileOps

    migration_jobs_scale: int

    # pylint: disable=too-many-locals, too-many-arguments, invalid-name

    def __init__(self, mdb, email, user_manager, org_ops, crawl_manager, storage_ops):
        self.jobs = mdb["jobs"]

        self.email = email
        self.user_manager = user_manager

        self.org_ops = org_ops
        self.crawl_manager = crawl_manager
        self.storage_ops = storage_ops

        self.base_crawl_ops = cast(BaseCrawlOps, None)
        self.profile_ops = cast(ProfileOps, None)

        self.migration_jobs_scale = int(os.environ.get("MIGRATION_JOBS_SCALE", 1))

        self.router = APIRouter(
            prefix="/jobs",
            tags=["jobs"],
            responses={404: {"description": "Not found"}},
        )

    def set_ops(self, base_crawl_ops: BaseCrawlOps, profile_ops: ProfileOps) -> None:
        """basecrawlops and profileops for updating files"""
        self.base_crawl_ops = base_crawl_ops
        self.profile_ops = profile_ops

    def strip_bucket(self, endpoint_url: str) -> tuple[str, str]:
        """split the endpoint_url into the origin and return rest of endpoint as bucket path"""
        parts = urlsplit(endpoint_url)
        return parts.scheme + "://" + parts.netloc + "/", parts.path[1:]

    async def handle_replica_job_finished(self, job: CreateReplicaJob) -> None:
        """Update replicas in corresponding file objects, based on type"""
        res = None
        if job.object_type in ("crawl", "upload"):
            res = await self.base_crawl_ops.add_crawl_file_replica(
                job.object_id, job.file_path, job.replica_storage
            )
        elif job.object_type == "profile":
            res = await self.profile_ops.add_profile_file_replica(
                UUID(job.object_id), job.file_path, job.replica_storage
            )
        if not res:
            print("File deleted before replication job started, ignoring", flush=True)

    async def handle_delete_replica_job_finished(self, job: DeleteReplicaJob) -> None:
        """After successful replica deletion, delete cronjob if scheduled"""
        if job.schedule:
            await self.crawl_manager.delete_replica_deletion_scheduled_job(job.id)

    async def create_replica_jobs(
        self, oid: UUID, file: BaseFile, object_id: str, object_type: str
    ) -> Dict[str, Union[bool, List[str]]]:
        """Create k8s background job to replicate a file to all replica storage locations."""
        org = await self.org_ops.get_org_by_id(oid)

        primary_storage = self.storage_ops.get_org_storage_by_ref(org, file.storage)
        primary_endpoint, bucket_suffix = self.strip_bucket(
            primary_storage.endpoint_url
        )

        primary_file_path = bucket_suffix + file.filename

        ids = []

        for replica_ref in self.storage_ops.get_org_replicas_storage_refs(org):
            job_id = await self.create_replica_job(
                org,
                file,
                object_id,
                object_type,
                replica_ref,
                primary_file_path,
                primary_endpoint,
            )
            ids.append(job_id)

        return {"added": True, "ids": ids}

    async def create_replica_job(
        self,
        org: Organization,
        file: BaseFile,
        object_id: str,
        object_type: str,
        replica_ref: StorageRef,
        primary_file_path: str,
        primary_endpoint: str,
        existing_job_id: Optional[str] = None,
    ) -> str:
        """Create k8s background job to replicate a file to a specific replica storage location."""
        replica_storage = self.storage_ops.get_org_storage_by_ref(org, replica_ref)
        replica_endpoint, bucket_suffix = self.strip_bucket(
            replica_storage.endpoint_url
        )
        replica_file_path = bucket_suffix + file.filename

        job_type = BgJobType.CREATE_REPLICA.value

        try:
            job_id, _ = await self.crawl_manager.run_replica_job(
                oid=str(org.id),
                job_type=job_type,
                primary_storage=file.storage,
                primary_file_path=primary_file_path,
                primary_endpoint=primary_endpoint,
                replica_storage=replica_ref,
                replica_file_path=replica_file_path,
                replica_endpoint=replica_endpoint,
                delay_days=0,
                existing_job_id=existing_job_id,
            )
            if existing_job_id:
                replication_job = await self.get_background_job(existing_job_id, org.id)
                previous_attempt = {
                    "started": replication_job.started,
                    "finished": replication_job.finished,
                }
                if replication_job.previousAttempts:
                    replication_job.previousAttempts.append(previous_attempt)
                else:
                    replication_job.previousAttempts = [previous_attempt]
                replication_job.started = dt_now()
                replication_job.finished = None
                replication_job.success = None
            else:
                replication_job = CreateReplicaJob(
                    id=job_id,
                    oid=org.id,
                    started=dt_now(),
                    file_path=file.filename,
                    object_type=object_type,
                    object_id=object_id,
                    primary=file.storage,
                    replica_storage=replica_ref,
                )

            await self.jobs.find_one_and_update(
                {"_id": job_id}, {"$set": replication_job.to_dict()}, upsert=True
            )

            return job_id
        # pylint: disable=broad-exception-caught
        except Exception as exc:
            print(
                "warning: replica job could not be started "
                + f"for {object_type} {file}: {exc}"
            )
            return ""

    async def create_delete_replica_jobs(
        self, org: Organization, file: BaseFile, object_id: str, object_type: str
    ) -> Dict[str, Union[bool, List[str]]]:
        """Create a job to delete each replica for the given file"""
        ids = []

        for replica_ref in file.replicas or []:
            job_id = await self.create_delete_replica_job(
                org, file, object_id, object_type, replica_ref
            )
            if job_id:
                ids.append(job_id)

        return {"added": True, "ids": ids}

    async def create_delete_replica_job(
        self,
        org: Organization,
        file: BaseFile,
        object_id: str,
        object_type: str,
        replica_ref: StorageRef,
        force_start_immediately: bool = False,
        existing_job_id: Optional[str] = None,
    ) -> str:
        """Create a job to delete one replica of a given file"""
        try:
            replica_storage = self.storage_ops.get_org_storage_by_ref(org, replica_ref)
            replica_endpoint, bucket_suffix = self.strip_bucket(
                replica_storage.endpoint_url
            )
            replica_file_path = bucket_suffix + file.filename

            job_type = BgJobType.DELETE_REPLICA.value

            delay_days = int(os.environ.get("REPLICA_DELETION_DELAY_DAYS", 0))
            if force_start_immediately:
                delay_days = 0

            job_id, schedule = await self.crawl_manager.run_replica_job(
                oid=str(org.id),
                job_type=job_type,
                replica_storage=replica_ref,
                replica_file_path=replica_file_path,
                replica_endpoint=replica_endpoint,
                delay_days=delay_days,
                existing_job_id=existing_job_id,
            )

            if existing_job_id:
                job = await self.get_background_job(existing_job_id, org.id)
                delete_replica_job = cast(DeleteReplicaJob, job)
                previous_attempt = {
                    "started": delete_replica_job.started,
                    "finished": delete_replica_job.finished,
                }
                if delete_replica_job.previousAttempts:
                    delete_replica_job.previousAttempts.append(previous_attempt)
                else:
                    delete_replica_job.previousAttempts = [previous_attempt]
                delete_replica_job.started = dt_now()
                delete_replica_job.finished = None
                delete_replica_job.success = None
                delete_replica_job.schedule = None
            else:
                delete_replica_job = DeleteReplicaJob(
                    id=job_id,
                    oid=org.id,
                    started=dt_now(),
                    file_path=file.filename,
                    object_id=object_id,
                    object_type=object_type,
                    replica_storage=replica_ref,
                    schedule=schedule,
                )

            await self.jobs.find_one_and_update(
                {"_id": job_id}, {"$set": delete_replica_job.to_dict()}, upsert=True
            )

            return job_id

        # pylint: disable=broad-exception-caught
        except Exception as exc:
            print(
                "warning: replica deletion job could not be started "
                + f"for {object_type} {file}: {exc}"
            )
            return ""

    async def create_delete_org_job(
        self,
        org: Organization,
        existing_job_id: Optional[str] = None,
    ) -> Optional[str]:
        """Create background job to delete org and its data"""

        try:
            job_id = await self.crawl_manager.run_delete_org_job(
                oid=str(org.id),
                existing_job_id=existing_job_id,
            )
            if existing_job_id:
                delete_org_job = await self.get_background_job(existing_job_id, org.id)
                previous_attempt = {
                    "started": delete_org_job.started,
                    "finished": delete_org_job.finished,
                }
                if delete_org_job.previousAttempts:
                    delete_org_job.previousAttempts.append(previous_attempt)
                else:
                    delete_org_job.previousAttempts = [previous_attempt]
                delete_org_job.started = dt_now()
                delete_org_job.finished = None
                delete_org_job.success = None
            else:
                delete_org_job = DeleteOrgJob(
                    id=job_id,
                    oid=org.id,
                    started=dt_now(),
                )

            await self.jobs.find_one_and_update(
                {"_id": job_id}, {"$set": delete_org_job.to_dict()}, upsert=True
            )

            return job_id
        # pylint: disable=broad-exception-caught
        except Exception as exc:
            # pylint: disable=raise-missing-from
            print(f"warning: delete org job could not be started: {exc}")
            return None

    async def create_recalculate_org_stats_job(
        self,
        org: Organization,
        existing_job_id: Optional[str] = None,
    ) -> Optional[str]:
        """Create background job to recalculate org stats"""

        try:
            job_id = await self.crawl_manager.run_recalculate_org_stats_job(
                oid=str(org.id),
                existing_job_id=existing_job_id,
            )
            if existing_job_id:
                recalculate_job = await self.get_background_job(existing_job_id, org.id)
                previous_attempt = {
                    "started": recalculate_job.started,
                    "finished": recalculate_job.finished,
                }
                if recalculate_job.previousAttempts:
                    recalculate_job.previousAttempts.append(previous_attempt)
                else:
                    recalculate_job.previousAttempts = [previous_attempt]
                recalculate_job.started = dt_now()
                recalculate_job.finished = None
                recalculate_job.success = None
            else:
                recalculate_job = RecalculateOrgStatsJob(
                    id=job_id,
                    oid=org.id,
                    started=dt_now(),
                )

            await self.jobs.find_one_and_update(
                {"_id": job_id}, {"$set": recalculate_job.to_dict()}, upsert=True
            )

            return job_id
        # pylint: disable=broad-exception-caught
        except Exception as exc:
            # pylint: disable=raise-missing-from
            print(f"warning: recalculate org stats job could not be started: {exc}")
            return None

    async def create_re_add_org_pages_job(
        self,
        oid: UUID,
        crawl_type: Optional[str] = None,
        crawl_id: Optional[str] = None,
        existing_job_id: Optional[str] = None,
    ):
        """Create job to (re)add all pages in an org, optionally filtered by crawl type"""

        try:
            job_id = await self.crawl_manager.run_re_add_org_pages_job(
                oid=str(oid),
                crawl_type=crawl_type,
                crawl_id=crawl_id,
                existing_job_id=existing_job_id,
            )
            if existing_job_id:
                readd_pages_job = await self.get_background_job(existing_job_id, oid)
                previous_attempt = {
                    "started": readd_pages_job.started,
                    "finished": readd_pages_job.finished,
                }
                if readd_pages_job.previousAttempts:
                    readd_pages_job.previousAttempts.append(previous_attempt)
                else:
                    readd_pages_job.previousAttempts = [previous_attempt]
                readd_pages_job.started = dt_now()
                readd_pages_job.finished = None
                readd_pages_job.success = None
            else:
                readd_pages_job = ReAddOrgPagesJob(
                    id=job_id,
                    oid=oid,
                    crawl_type=crawl_type,
                    crawl_id=crawl_id,
                    started=dt_now(),
                )

            await self.jobs.find_one_and_update(
                {"_id": job_id}, {"$set": readd_pages_job.to_dict()}, upsert=True
            )

            return job_id
        # pylint: disable=broad-exception-caught
        except Exception as exc:
            # pylint: disable=raise-missing-from
            print(f"warning: re-add org pages job could not be started: {exc}")
            return None

    async def create_optimize_crawl_pages_job(
        self,
        existing_job_id: Optional[str] = None,
    ):
        """Create job to optimize crawl pages"""

        try:
            job_id = await self.crawl_manager.run_optimize_pages_job(
                existing_job_id=existing_job_id, scale=self.migration_jobs_scale
            )
            if existing_job_id:
                optimize_pages_job = await self.get_background_job(existing_job_id)
                previous_attempt = {
                    "started": optimize_pages_job.started,
                    "finished": optimize_pages_job.finished,
                }
                if optimize_pages_job.previousAttempts:
                    optimize_pages_job.previousAttempts.append(previous_attempt)
                else:
                    optimize_pages_job.previousAttempts = [previous_attempt]
                optimize_pages_job.started = dt_now()
                optimize_pages_job.finished = None
                optimize_pages_job.success = None
            else:
                optimize_pages_job = OptimizePagesJob(
                    id=job_id,
                    started=dt_now(),
                )

            await self.jobs.find_one_and_update(
                {"_id": job_id}, {"$set": optimize_pages_job.to_dict()}, upsert=True
            )

            return job_id
        # pylint: disable=broad-exception-caught
        except Exception as exc:
            # pylint: disable=raise-missing-from
            print(f"warning: optimize pages job could not be started: {exc}")
            return None

    async def job_finished(
        self,
        job_id: str,
        job_type: str,
        success: bool,
        finished: datetime,
        oid: Optional[UUID] = None,
    ) -> None:
        """Update job as finished, including
        job-specific task handling"""

        job = await self.get_background_job(job_id)
        if job.finished:
            return

        if job.type != job_type:
            raise HTTPException(status_code=400, detail="invalid_job_type")

        if success:
            if job_type == BgJobType.CREATE_REPLICA:
                await self.handle_replica_job_finished(cast(CreateReplicaJob, job))
            if job_type == BgJobType.DELETE_REPLICA:
                await self.handle_delete_replica_job_finished(
                    cast(DeleteReplicaJob, job)
                )
        else:
            print(
                f"Background job {job.id} failed, sending email to superuser",
                flush=True,
            )
            superuser = await self.user_manager.get_superuser()
            org = None
            if job.oid:
                org = await self.org_ops.get_org_by_id(job.oid)
            await asyncio.get_event_loop().run_in_executor(
                None,
                self.email.send_background_job_failed,
                job,
                finished,
                superuser.email,
                org,
            )

        await self.jobs.find_one_and_update(
            {"_id": job_id, "oid": oid},
            {"$set": {"success": success, "finished": finished}},
        )

    async def get_background_job(
        self, job_id: str, oid: Optional[UUID] = None
    ) -> Union[
        CreateReplicaJob,
        DeleteReplicaJob,
        DeleteOrgJob,
        RecalculateOrgStatsJob,
        ReAddOrgPagesJob,
        OptimizePagesJob,
    ]:
        """Get background job"""
        query: dict[str, object] = {"_id": job_id}
        if oid:
            query["oid"] = oid

        res = await self.jobs.find_one(query)
        if not res:
            raise HTTPException(status_code=404, detail="job_not_found")

        return self._get_job_by_type_from_data(res)

    def _get_job_by_type_from_data(self, data: dict[str, object]):
        """convert dict to proper background job type"""
        if data["type"] == BgJobType.CREATE_REPLICA:
            return CreateReplicaJob.from_dict(data)

        if data["type"] == BgJobType.DELETE_REPLICA:
            return DeleteReplicaJob.from_dict(data)

        if data["type"] == BgJobType.RECALCULATE_ORG_STATS:
            return RecalculateOrgStatsJob.from_dict(data)

        if data["type"] == BgJobType.READD_ORG_PAGES:
            return ReAddOrgPagesJob.from_dict(data)

        if data["type"] == BgJobType.OPTIMIZE_PAGES:
            return OptimizePagesJob.from_dict(data)

        return DeleteOrgJob.from_dict(data)

    async def list_background_jobs(
        self,
        org: Optional[Organization] = None,
        page_size: int = DEFAULT_PAGE_SIZE,
        page: int = 1,
        success: Optional[bool] = None,
        job_type: Optional[str] = None,
        sort_by: Optional[str] = None,
        sort_direction: Optional[int] = -1,
    ) -> Tuple[List[BackgroundJob], int]:
        """List all background jobs"""
        # pylint: disable=duplicate-code
        # Zero-index page for query
        page = page - 1
        skip = page_size * page

        query: dict[str, object] = {}

        if org:
            query["oid"] = org.id

        if success in (True, False):
            query["success"] = success

        if job_type:
            query["type"] = job_type

        aggregate = [{"$match": query}]

        if sort_by:
            SORT_FIELDS = ("success", "type", "started", "finished")
            if sort_by not in SORT_FIELDS:
                raise HTTPException(status_code=400, detail="invalid_sort_by")
            if sort_direction not in (1, -1):
                raise HTTPException(status_code=400, detail="invalid_sort_direction")

            aggregate.extend([{"$sort": {sort_by: sort_direction}}])

        aggregate.extend(
            [
                {
                    "$facet": {
                        "items": [
                            {"$skip": skip},
                            {"$limit": page_size},
                        ],
                        "total": [{"$count": "count"}],
                    }
                },
            ]
        )

        # Get total
        cursor = self.jobs.aggregate(aggregate)
        results = await cursor.to_list(length=1)
        result = results[0]
        items = result["items"]

        try:
            total = int(result["total"][0]["count"])
        except (IndexError, ValueError):
            total = 0

        jobs = [self._get_job_by_type_from_data(data) for data in items]

        return jobs, total

    async def get_replica_job_file(
        self, job: Union[CreateReplicaJob, DeleteReplicaJob], org: Organization
    ) -> BaseFile:
        """Return file from replica job"""
        try:
            if job.object_type == "profile":
                profile = await self.profile_ops.get_profile(UUID(job.object_id), org)
                assert profile.resource
                return BaseFile(**profile.resource.dict())

            item_res = await self.base_crawl_ops.get_base_crawl(job.object_id, org)
            matching_file = [f for f in item_res.files if f.filename == job.file_path][
                0
            ]
            return matching_file
        # pylint: disable=broad-exception-caught, raise-missing-from
        except Exception:
            raise HTTPException(status_code=404, detail="file_not_found")

    async def retry_background_job(
        self, job_id: str, org: Optional[Organization] = None
    ):
        """Retry background job"""
        job = await self.get_background_job(job_id)
        if not job:
            raise HTTPException(status_code=404, detail="job_not_found")

        if not job.finished:
            raise HTTPException(status_code=400, detail="job_not_finished")

        if job.success:
            raise HTTPException(status_code=400, detail="job_already_succeeded")

        if org:
            return await self.retry_org_background_job(job, org)

        if job.type == BgJobType.OPTIMIZE_PAGES:
            await self.create_optimize_crawl_pages_job(
                existing_job_id=job_id,
            )
            return {"success": True}

        return {"success": False}

    async def retry_org_background_job(
        self, job: BackgroundJob, org: Organization
    ) -> Dict[str, Union[bool, Optional[str]]]:
        """Retry background job specific to one org"""
        if job.type == BgJobType.CREATE_REPLICA:
            job = cast(CreateReplicaJob, job)
            file = await self.get_replica_job_file(job, org)
            primary_storage = self.storage_ops.get_org_storage_by_ref(org, file.storage)
            primary_endpoint, bucket_suffix = self.strip_bucket(
                primary_storage.endpoint_url
            )
            primary_file_path = bucket_suffix + file.filename
            await self.create_replica_job(
                org,
                file,
                job.object_id,
                job.object_type,
                job.replica_storage,
                primary_file_path,
                primary_endpoint,
                existing_job_id=job.id,
            )
            return {"success": True}

        if job.type == BgJobType.DELETE_REPLICA:
            job = cast(DeleteReplicaJob, job)
            file = await self.get_replica_job_file(job, org)
            await self.create_delete_replica_job(
                org,
                file,
                job.object_id,
                job.object_type,
                job.replica_storage,
                force_start_immediately=True,
                existing_job_id=job.id,
            )
            return {"success": True}

        if job.type == BgJobType.DELETE_ORG:
            job = cast(DeleteOrgJob, job)
            await self.create_delete_org_job(
                org,
                existing_job_id=job.id,
            )
            return {"success": True}

        if job.type == BgJobType.RECALCULATE_ORG_STATS:
            job = cast(RecalculateOrgStatsJob, job)
            await self.create_recalculate_org_stats_job(
                org,
                existing_job_id=job.id,
            )
            return {"success": True}

        if job.type == BgJobType.READD_ORG_PAGES:
            job = cast(ReAddOrgPagesJob, job)
            await self.create_re_add_org_pages_job(
                org.id,
                job.crawl_type,
                job.crawl_id,
                existing_job_id=job.id,
            )
            return {"success": True}

        return {"success": False}

    async def retry_failed_org_background_jobs(
        self, org: Organization
    ) -> Dict[str, Union[bool, Optional[str]]]:
        """Retry all failed background jobs in an org

        Keep track of tasks in set to prevent them from being garbage collected
        See: https://stackoverflow.com/a/74059981
        """
        bg_tasks = set()
        async for job in self.jobs.find({"oid": org.id, "success": False}):
            task = asyncio.create_task(self.retry_background_job(job["_id"], org))
            bg_tasks.add(task)
            task.add_done_callback(bg_tasks.discard)
        return {"success": True}

    async def retry_all_failed_background_jobs(
        self,
    ) -> Dict[str, Union[bool, Optional[str]]]:
        """Retry all failed background jobs from all orgs

        Keep track of tasks in set to prevent them from being garbage collected
        See: https://stackoverflow.com/a/74059981
        """
        bg_tasks = set()
        async for job in self.jobs.find({"success": False}):
            org = None
            if job.get("oid"):
                org = await self.org_ops.get_org_by_id(job["oid"])
            task = asyncio.create_task(self.retry_background_job(job["_id"], org))
            bg_tasks.add(task)
            task.add_done_callback(bg_tasks.discard)
        return {"success": True}


# ============================================================================
# pylint: disable=too-many-arguments, too-many-locals, invalid-name, fixme
def init_background_jobs_api(
    app, mdb, email, user_manager, org_ops, crawl_manager, storage_ops, user_dep
):
    """init background jobs system"""
    # pylint: disable=invalid-name

    ops = BackgroundJobOps(
        mdb, email, user_manager, org_ops, crawl_manager, storage_ops
    )

    router = ops.router

    # org_owner_dep = org_ops.org_owner_dep
    org_crawl_dep = org_ops.org_crawl_dep

    @router.get(
        "/{job_id}",
        response_model=AnyJob,
    )
    async def get_org_background_job(
        job_id: str,
        org: Organization = Depends(org_crawl_dep),
    ):
        """Retrieve information for background job"""
        return await ops.get_background_job(job_id, org.id)

    @app.get("/orgs/all/jobs/{job_id}", response_model=AnyJob, tags=["jobs"])
    async def get_background_job_all_orgs(job_id: str, user: User = Depends(user_dep)):
        """Get background job from any org"""
        if not user.is_superuser:
            raise HTTPException(status_code=403, detail="Not Allowed")

        return await ops.get_background_job(job_id)

    @app.post(
        "/orgs/all/jobs/{job_id}/retry", response_model=SuccessResponse, tags=["jobs"]
    )
    async def retry_background_job_no_org(job_id: str, user: User = Depends(user_dep)):
        """Retry background job that doesn't belong to an org, e.g. migration job"""
        if not user.is_superuser:
            raise HTTPException(status_code=403, detail="Not Allowed")

        job = await ops.get_background_job(job_id)

        org = None
        if job.oid:
            org = await ops.org_ops.get_org_by_id(job.oid)

        return await ops.retry_background_job(job_id, org)

    @app.post(
        "/orgs/all/jobs/migrateCrawls", response_model=SuccessResponseId, tags=["jobs"]
    )
    async def create_migrate_crawls_job(user: User = Depends(user_dep)):
        """Launch background job to migrate all crawls to v2 with optimized pages"""
        if not user.is_superuser:
            raise HTTPException(status_code=403, detail="Not Allowed")

        job_id = await ops.create_optimize_crawl_pages_job()

        return {"success": True, "id": job_id}

    @router.post("/{job_id}/retry", response_model=SuccessResponse, tags=["jobs"])
    async def retry_org_background_job(
        job_id: str,
        org: Organization = Depends(org_crawl_dep),
    ):
        """Retry background job"""
        return await ops.retry_background_job(job_id, org)

    @app.post(
        "/orgs/all/jobs/retryFailed", response_model=SuccessResponse, tags=["jobs"]
    )
    async def retry_all_failed_background_jobs(user: User = Depends(user_dep)):
        """Retry failed background jobs from all orgs"""
        if not user.is_superuser:
            raise HTTPException(status_code=403, detail="Not Allowed")

        return await ops.retry_all_failed_background_jobs()

    @router.post("/retryFailed", response_model=SuccessResponse, tags=["jobs"])
    async def retry_failed_org_background_jobs(
        org: Organization = Depends(org_crawl_dep),
    ):
        """Retry failed background jobs"""
        return await ops.retry_failed_org_background_jobs(org)

    @app.get(
        "/orgs/all/jobs", response_model=PaginatedBackgroundJobResponse, tags=["jobs"]
    )
    async def list_all_background_jobs(
        pageSize: int = DEFAULT_PAGE_SIZE,
        page: int = 1,
        success: Optional[bool] = None,
        jobType: Optional[str] = None,
        sortBy: Optional[str] = None,
        sortDirection: Optional[int] = -1,
        user: User = Depends(user_dep),
    ):
        """Retrieve paginated list of background jobs"""
        if not user.is_superuser:
            raise HTTPException(status_code=403, detail="Not Allowed")

        jobs, total = await ops.list_background_jobs(
            org=None,
            page_size=pageSize,
            page=page,
            success=success,
            job_type=jobType,
            sort_by=sortBy,
            sort_direction=sortDirection,
        )
        return paginated_format(jobs, total, page, pageSize)

    @router.get("", response_model=PaginatedBackgroundJobResponse, tags=["jobs"])
    async def list_background_jobs(
        org: Organization = Depends(org_crawl_dep),
        pageSize: int = DEFAULT_PAGE_SIZE,
        page: int = 1,
        success: Optional[bool] = None,
        jobType: Optional[str] = None,
        sortBy: Optional[str] = None,
        sortDirection: Optional[int] = -1,
    ):
        """Retrieve paginated list of background jobs"""
        jobs, total = await ops.list_background_jobs(
            org=org,
            page_size=pageSize,
            page=page,
            success=success,
            job_type=jobType,
            sort_by=sortBy,
            sort_direction=sortDirection,
        )
        return paginated_format(jobs, total, page, pageSize)

    org_ops.router.include_router(router)

    return ops
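For reference, a hypothetical superuser call to the `migrateCrawls` endpoint defined above might look like the following sketch. The base URL, API prefix, and token are placeholders; only the path and the superuser requirement come from the code.

```python
import requests

API_BASE = "https://btrix.example.com/api"  # placeholder deployment URL
TOKEN = "REPLACE_WITH_SUPERUSER_TOKEN"  # placeholder access token

# POST /orgs/all/jobs/migrateCrawls launches the crawl migration background job;
# the endpoint rejects non-superuser accounts with a 403.
resp = requests.post(
    f"{API_BASE}/orgs/all/jobs/migrateCrawls",
    headers={"Authorization": f"Bearer {TOKEN}"},
    timeout=30,
)
resp.raise_for_status()

# SuccessResponseId shape: {"success": true, "id": "<background job id>"}
print(resp.json())
```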