Fixes #1328

- Adds `/retry` endpoint for retrying failed background jobs.
- Returns a 400 error if the previous job is still running or has already succeeded.
- Keeps track of previous failed attempts in a `previousAttempts` array on the failed job.
- Also amends the similar webhook `/retry` endpoint to use `POST` for consistency.
- Removes duplicate API tag for background jobs.
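For reviewers, a rough client-side sketch of how the new endpoint could be exercised; the host, token, org/job ids, and the `/api/orgs/{oid}` prefix are illustrative placeholders (the router itself is mounted under `/jobs` on the org router):

```python
# Illustrative sketch only (not part of this PR); host, ids, and prefix are assumptions.
import requests

API = "https://btrix.example.com/api"          # hypothetical deployment URL
HEADERS = {"Authorization": "Bearer <token>"}  # auth elided

org_id = "<org-uuid>"
job_id = "<failed-job-id>"

# POST /orgs/{oid}/jobs/{job_id}/retry re-runs the replica job under the same
# job id; the prior run is recorded in the job's previousAttempts array.
resp = requests.post(f"{API}/orgs/{org_id}/jobs/{job_id}/retry", headers=HEADERS)

if resp.status_code == 400:
    # detail is "job_not_finished" (still running) or "job_already_succeeded"
    print("Cannot retry:", resp.json()["detail"])
else:
    print(resp.json())  # {"success": true}
```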
"""k8s background jobs"""
 | 
						|
import asyncio
 | 
						|
from datetime import datetime
 | 
						|
from typing import Optional, Tuple, Union, List, Dict, TYPE_CHECKING, cast
 | 
						|
from uuid import UUID
 | 
						|
 | 
						|
from urllib.parse import urlsplit
 | 
						|
 | 
						|
from fastapi import APIRouter, Depends, HTTPException
 | 
						|
 | 
						|
from .storages import StorageOps
 | 
						|
from .crawlmanager import CrawlManager
 | 
						|
 | 
						|
from .models import (
 | 
						|
    BaseFile,
 | 
						|
    Organization,
 | 
						|
    BackgroundJob,
 | 
						|
    BgJobType,
 | 
						|
    CreateReplicaJob,
 | 
						|
    DeleteReplicaJob,
 | 
						|
    PaginatedResponse,
 | 
						|
    AnyJob,
 | 
						|
    StorageRef,
 | 
						|
)
 | 
						|
from .pagination import DEFAULT_PAGE_SIZE, paginated_format
 | 
						|
 | 
						|
if TYPE_CHECKING:
 | 
						|
    from .orgs import OrgOps
 | 
						|
    from .basecrawls import BaseCrawlOps
 | 
						|
    from .profiles import ProfileOps
 | 
						|
else:
 | 
						|
    OrgOps = CrawlManager = BaseCrawlOps = ProfileOps = object
 | 
						|
 | 
						|
 | 
						|
# ============================================================================
 | 
						|
# pylint: disable=too-many-instance-attributes
 | 
						|
class BackgroundJobOps:
    """k8s background job management"""

    org_ops: OrgOps
    crawl_manager: CrawlManager
    storage_ops: StorageOps

    base_crawl_ops: BaseCrawlOps
    profile_ops: ProfileOps

    # pylint: disable=too-many-locals, too-many-arguments, invalid-name

    def __init__(self, mdb, email, user_manager, org_ops, crawl_manager, storage_ops):
        self.jobs = mdb["jobs"]

        self.email = email
        self.user_manager = user_manager

        self.org_ops = org_ops
        self.crawl_manager = crawl_manager
        self.storage_ops = storage_ops

        self.base_crawl_ops = cast(BaseCrawlOps, None)
        self.profile_ops = cast(ProfileOps, None)

        self.router = APIRouter(
            prefix="/jobs",
            tags=["jobs"],
            responses={404: {"description": "Not found"}},
        )

    def set_ops(self, base_crawl_ops: BaseCrawlOps, profile_ops: ProfileOps) -> None:
        """basecrawlops and profileops for updating files"""
        self.base_crawl_ops = base_crawl_ops
        self.profile_ops = profile_ops

    def strip_bucket(self, endpoint_url: str) -> tuple[str, str]:
        """split the endpoint_url into the origin and return rest of endpoint as bucket path"""
        parts = urlsplit(endpoint_url)
        return parts.scheme + "://" + parts.netloc + "/", parts.path[1:]

    async def handle_replica_job_finished(self, job: CreateReplicaJob) -> None:
        """Update replicas in corresponding file objects, based on type"""
        res = None
        if job.object_type in ("crawl", "upload"):
            res = await self.base_crawl_ops.add_crawl_file_replica(
                job.object_id, job.file_path, job.replica_storage
            )
        elif job.object_type == "profile":
            res = await self.profile_ops.add_profile_file_replica(
                UUID(job.object_id), job.file_path, job.replica_storage
            )
        if not res:
            print("File deleted before replication job started, ignoring", flush=True)

    async def create_replica_jobs(
        self, oid: UUID, file: BaseFile, object_id: str, object_type: str
    ) -> Dict[str, Union[bool, List[str]]]:
        """Create k8s background job to replicate a file to all replica storage locations."""
        org = await self.org_ops.get_org_by_id(oid)

        primary_storage = self.storage_ops.get_org_storage_by_ref(org, file.storage)
        primary_endpoint, bucket_suffix = self.strip_bucket(
            primary_storage.endpoint_url
        )

        primary_file_path = bucket_suffix + file.filename

        ids = []

        for replica_ref in self.storage_ops.get_org_replicas_storage_refs(org):
            job_id = await self.create_replica_job(
                org,
                file,
                object_id,
                object_type,
                replica_ref,
                primary_file_path,
                primary_endpoint,
            )
            ids.append(job_id)

        return {"added": True, "ids": ids}

    async def create_replica_job(
        self,
        org: Organization,
        file: BaseFile,
        object_id: str,
        object_type: str,
        replica_ref: StorageRef,
        primary_file_path: str,
        primary_endpoint: str,
        existing_job_id: Optional[str] = None,
    ) -> str:
        """Create k8s background job to replicate a file to a specific replica storage location."""
        replica_storage = self.storage_ops.get_org_storage_by_ref(org, replica_ref)
        replica_endpoint, bucket_suffix = self.strip_bucket(
            replica_storage.endpoint_url
        )
        replica_file_path = bucket_suffix + file.filename

        job_type = BgJobType.CREATE_REPLICA.value

        try:
            job_id = await self.crawl_manager.run_replica_job(
                oid=str(org.id),
                job_type=job_type,
                primary_storage=file.storage,
                primary_file_path=primary_file_path,
                primary_endpoint=primary_endpoint,
                replica_storage=replica_ref,
                replica_file_path=replica_file_path,
                replica_endpoint=replica_endpoint,
                job_id_prefix=f"{job_type}-{object_id}",
                existing_job_id=existing_job_id,
            )
            if existing_job_id:
                # Retry of a failed job: record the prior attempt in
                # previousAttempts and reset the job's state
                replication_job = await self.get_background_job(existing_job_id, org.id)
                previous_attempt = {
                    "started": replication_job.started,
                    "finished": replication_job.finished,
                }
                if replication_job.previousAttempts:
                    replication_job.previousAttempts.append(previous_attempt)
                else:
                    replication_job.previousAttempts = [previous_attempt]
                replication_job.started = datetime.now()
                replication_job.finished = None
                replication_job.success = None
            else:
                replication_job = CreateReplicaJob(
                    id=job_id,
                    oid=org.id,
                    started=datetime.now(),
                    file_path=file.filename,
                    object_type=object_type,
                    object_id=object_id,
                    primary=file.storage,
                    replica_storage=replica_ref,
                )

            await self.jobs.find_one_and_update(
                {"_id": job_id}, {"$set": replication_job.to_dict()}, upsert=True
            )

            return job_id
        except Exception as exc:
            # pylint: disable=raise-missing-from
            raise HTTPException(
                status_code=500, detail=f"Error starting background job: {exc}"
            )

    async def create_delete_replica_jobs(
        self, org: Organization, file: BaseFile, object_id: str, object_type: str
    ) -> Dict[str, Union[bool, List[str]]]:
        """Create a job to delete each replica for the given file"""
        ids = []

        for replica_ref in file.replicas or []:
            job_id = await self.create_delete_replica_job(
                org, file, object_id, object_type, replica_ref
            )
            ids.append(job_id)

        return {"added": True, "ids": ids}

    async def create_delete_replica_job(
        self,
        org: Organization,
        file: BaseFile,
        object_id: str,
        object_type: str,
        replica_ref: StorageRef,
        existing_job_id: Optional[str] = None,
    ) -> str:
        """Create a job to delete one replica of a given file"""
        replica_storage = self.storage_ops.get_org_storage_by_ref(org, replica_ref)
        replica_endpoint, bucket_suffix = self.strip_bucket(
            replica_storage.endpoint_url
        )
        replica_file_path = bucket_suffix + file.filename

        job_type = BgJobType.DELETE_REPLICA.value

        try:
            job_id = await self.crawl_manager.run_replica_job(
                oid=str(org.id),
                job_type=job_type,
                replica_storage=replica_ref,
                replica_file_path=replica_file_path,
                replica_endpoint=replica_endpoint,
                job_id_prefix=f"{job_type}-{object_id}",
                existing_job_id=existing_job_id,
            )

            if existing_job_id:
                # Retry of a failed job: record the prior attempt in
                # previousAttempts and reset the job's state
                delete_replica_job = await self.get_background_job(
                    existing_job_id, org.id
                )
                previous_attempt = {
                    "started": delete_replica_job.started,
                    "finished": delete_replica_job.finished,
                }
                if delete_replica_job.previousAttempts:
                    delete_replica_job.previousAttempts.append(previous_attempt)
                else:
                    delete_replica_job.previousAttempts = [previous_attempt]
                delete_replica_job.started = datetime.now()
                delete_replica_job.finished = None
                delete_replica_job.success = None
            else:
                delete_replica_job = DeleteReplicaJob(
                    id=job_id,
                    oid=org.id,
                    started=datetime.now(),
                    file_path=file.filename,
                    object_id=object_id,
                    object_type=object_type,
                    replica_storage=replica_ref,
                )

            await self.jobs.find_one_and_update(
                {"_id": job_id}, {"$set": delete_replica_job.to_dict()}, upsert=True
            )

            return job_id

        except Exception as exc:
            # pylint: disable=raise-missing-from
            raise HTTPException(
                status_code=400, detail=f"Error starting background job: {exc}"
            )

    async def job_finished(
        self,
        job_id: str,
        job_type: str,
        oid: UUID,
        success: bool,
        finished: datetime,
    ) -> None:
        """Update job as finished, including
        job-specific task handling"""

        job = await self.get_background_job(job_id, oid)
        if job.finished:
            return

        if job.type != job_type:
            raise HTTPException(status_code=400, detail="invalid_job_type")

        if success:
            if job_type == BgJobType.CREATE_REPLICA:
                await self.handle_replica_job_finished(cast(CreateReplicaJob, job))
        else:
            print(
                f"Background job {job.id} failed, sending email to superuser",
                flush=True,
            )
            superuser = await self.user_manager.get_superuser()
            org = await self.org_ops.get_org_by_id(job.oid)
            await asyncio.get_event_loop().run_in_executor(
                None,
                self.email.send_background_job_failed,
                job,
                org,
                finished,
                superuser.email,
            )

        await self.jobs.find_one_and_update(
            {"_id": job_id, "oid": oid},
            {"$set": {"success": success, "finished": finished}},
        )

    async def get_background_job(
        self, job_id: str, oid: UUID
    ) -> Union[CreateReplicaJob, DeleteReplicaJob]:
        """Get background job"""
        query: dict[str, object] = {"_id": job_id, "oid": oid}
        res = await self.jobs.find_one(query)
        if not res:
            raise HTTPException(status_code=404, detail="job_not_found")

        return self._get_job_by_type_from_data(res)

    def _get_job_by_type_from_data(self, data: dict[str, object]):
        """convert dict to proper background job type"""
        if data["type"] == BgJobType.CREATE_REPLICA:
            return CreateReplicaJob.from_dict(data)

        return DeleteReplicaJob.from_dict(data)

        # return BackgroundJob.from_dict(data)

    async def list_background_jobs(
        self,
        org: Organization,
        page_size: int = DEFAULT_PAGE_SIZE,
        page: int = 1,
        success: Optional[bool] = None,
        job_type: Optional[str] = None,
        sort_by: Optional[str] = None,
        sort_direction: Optional[int] = -1,
    ) -> Tuple[List[BackgroundJob], int]:
        """List all background jobs"""
        # pylint: disable=duplicate-code
        # Zero-index page for query
        page = page - 1
        skip = page_size * page

        query: dict[str, object] = {"oid": org.id}

        if success in (True, False):
            query["success"] = success

        if job_type:
            query["type"] = job_type

        aggregate = [{"$match": query}]

        if sort_by:
            SORT_FIELDS = ("success", "type", "started", "finished")
            if sort_by not in SORT_FIELDS:
                raise HTTPException(status_code=400, detail="invalid_sort_by")
            if sort_direction not in (1, -1):
                raise HTTPException(status_code=400, detail="invalid_sort_direction")

            aggregate.extend([{"$sort": {sort_by: sort_direction}}])

        aggregate.extend(
            [
                {
                    "$facet": {
                        "items": [
                            {"$skip": skip},
                            {"$limit": page_size},
                        ],
                        "total": [{"$count": "count"}],
                    }
                },
            ]
        )

        # Get total
        cursor = self.jobs.aggregate(aggregate)
        results = await cursor.to_list(length=1)
        result = results[0]
        items = result["items"]

        try:
            total = int(result["total"][0]["count"])
        except (IndexError, ValueError):
            total = 0

        jobs = [self._get_job_by_type_from_data(data) for data in items]

        return jobs, total

    async def get_replica_job_file(
        self, job: Union[CreateReplicaJob, DeleteReplicaJob], org: Organization
    ) -> BaseFile:
        """Return file from replica job"""
        try:
            if job.object_type == "profile":
                profile = await self.profile_ops.get_profile(UUID(job.object_id), org)
                return BaseFile(**profile.resource.dict())

            item_res = await self.base_crawl_ops.get_crawl_raw(job.object_id, org)
            matching_file = [
                f for f in item_res.get("files", []) if f["filename"] == job.file_path
            ][0]
            return BaseFile(**matching_file)
        # pylint: disable=broad-exception-caught, raise-missing-from
        except Exception:
            raise HTTPException(status_code=404, detail="file_not_found")

    async def retry_background_job(
        self, job_id: str, org: Organization
    ) -> Dict[str, Union[bool, Optional[str]]]:
        """Retry background job and return new job id"""
        job = await self.get_background_job(job_id, org.id)
        if not job:
            raise HTTPException(status_code=404, detail="job_not_found")

        if not job.finished:
            raise HTTPException(status_code=400, detail="job_not_finished")

        if job.success:
            raise HTTPException(status_code=400, detail="job_already_succeeded")

        file = await self.get_replica_job_file(job, org)

        if job.type == BgJobType.CREATE_REPLICA:
            primary_storage = self.storage_ops.get_org_storage_by_ref(org, file.storage)
            primary_endpoint, bucket_suffix = self.strip_bucket(
                primary_storage.endpoint_url
            )
            primary_file_path = bucket_suffix + file.filename
            await self.create_replica_job(
                org,
                file,
                job.object_id,
                job.object_type,
                job.replica_storage,
                primary_file_path,
                primary_endpoint,
                existing_job_id=job_id,
            )

        if job.type == BgJobType.DELETE_REPLICA:
            await self.create_delete_replica_job(
                org,
                file,
                job.object_id,
                job.object_type,
                job.replica_storage,
                existing_job_id=job_id,
            )

        return {"success": True}


# ============================================================================
# pylint: disable=too-many-arguments, too-many-locals, invalid-name, fixme
def init_background_jobs_api(
    mdb, email, user_manager, org_ops, crawl_manager, storage_ops
):
    """init background jobs system"""
    # pylint: disable=invalid-name

    ops = BackgroundJobOps(
        mdb, email, user_manager, org_ops, crawl_manager, storage_ops
    )

    router = ops.router

    # org_owner_dep = org_ops.org_owner_dep
    org_crawl_dep = org_ops.org_crawl_dep

    @router.get(
        "/{job_id}",
        response_model=AnyJob,
    )
    async def get_background_job(
        job_id: str,
        org: Organization = Depends(org_crawl_dep),
    ):
        """Retrieve information for background job"""
        return await ops.get_background_job(job_id, org.id)

    @router.post(
        "/{job_id}/retry",
    )
    async def retry_background_job(
        job_id: str,
        org: Organization = Depends(org_crawl_dep),
    ):
        """Retry background job"""
        return await ops.retry_background_job(job_id, org)

    @router.get("", response_model=PaginatedResponse)
    async def list_background_jobs(
        org: Organization = Depends(org_crawl_dep),
        pageSize: int = DEFAULT_PAGE_SIZE,
        page: int = 1,
        success: Optional[bool] = None,
        jobType: Optional[str] = None,
        sortBy: Optional[str] = None,
        sortDirection: Optional[int] = -1,
    ):
        """Retrieve paginated list of background jobs"""
        jobs, total = await ops.list_background_jobs(
            org,
            page_size=pageSize,
            page=page,
            success=success,
            job_type=jobType,
            sort_by=sortBy,
            sort_direction=sortDirection,
        )
        return paginated_format(jobs, total, page, pageSize)

    org_ops.router.include_router(router)

    return ops