Implement retry API endpoint for failed background jobs (#1356)
Fixes #1328

- Adds a /retry endpoint for retrying failed background jobs.
- Returns a 400 error if the previous job is still running or has already succeeded.
- Keeps track of previous failed attempts in a previousAttempts array on the failed job.
- Also amends the similar webhook /retry endpoint to use `POST` for consistency.
- Removes a duplicate API tag for backgroundjobs.
parent 82a5d1e4e4
commit 1afc411114
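As a usage sketch of the new endpoint (the `/orgs/{oid}/jobs` mount point, host, and credentials below are assumptions modeled on the routes and tests in this diff, not confirmed by it):

```python
import requests

API_PREFIX = "http://localhost:30870/api"  # assumed local backend URL
ORG_ID = "<org-uuid>"  # placeholder
JOB_ID = "<failed-job-id>"  # placeholder
HEADERS = {"Authorization": "Bearer <token>"}  # admin auth assumed

# Retry a failed background job; assumes the backgroundjobs router is
# mounted under /orgs/{oid}/jobs, matching the /{job_id}/retry route below.
r = requests.post(f"{API_PREFIX}/orgs/{ORG_ID}/jobs/{JOB_ID}/retry", headers=HEADERS)

if r.status_code == 200:
    assert r.json() == {"success": True}
else:
    # 404 "job_not_found"; 400 "job_not_finished" while the previous job is
    # still running, or "job_already_succeeded" if it has already succeeded
    print(r.status_code, r.json()["detail"])
```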
@@ -20,6 +20,7 @@ from .models import (
     DeleteReplicaJob,
     PaginatedResponse,
     AnyJob,
+    StorageRef,
 )
 from .pagination import DEFAULT_PAGE_SIZE, paginated_format
@@ -90,9 +91,8 @@ class BackgroundJobOps:

     async def create_replica_jobs(
         self, oid: UUID, file: BaseFile, object_id: str, object_type: str
-    ) -> Dict:
-        """Create k8s background job to replicate a file to another storage location."""
-
+    ) -> Dict[str, Union[bool, List[str]]]:
+        """Create k8s background job to replicate a file to all replica storage locations."""
         org = await self.org_ops.get_org_by_id(oid)

         primary_storage = self.storage_ops.get_org_storage_by_ref(org, file.storage)
@@ -105,23 +105,42 @@ class BackgroundJobOps:
         ids = []

         for replica_ref in self.storage_ops.get_org_replicas_storage_refs(org):
-            replica_storage = self.storage_ops.get_org_storage_by_ref(org, replica_ref)
-            replica_endpoint, bucket_suffix = self.strip_bucket(
-                replica_storage.endpoint_url
+            job_id = await self.create_replica_job(
+                org,
+                file,
+                object_id,
+                object_type,
+                replica_ref,
+                primary_file_path,
+                primary_endpoint,
             )
-            replica_file_path = bucket_suffix + file.filename
+            ids.append(job_id)

-            # print(f"primary: {file.storage.get_storage_secret_name(str(oid))}")
-            # print(f"  endpoint: {primary_endpoint}")
-            # print(f"  path: {primary_file_path}")
-            # print(f"replica: {replica_ref.get_storage_secret_name(str(oid))}")
-            # print(f"  endpoint: {replica_endpoint}")
-            # print(f"  path: {replica_file_path}")
+        return {"added": True, "ids": ids}

-            job_type = BgJobType.CREATE_REPLICA.value
+    async def create_replica_job(
+        self,
+        org: Organization,
+        file: BaseFile,
+        object_id: str,
+        object_type: str,
+        replica_ref: StorageRef,
+        primary_file_path: str,
+        primary_endpoint: str,
+        existing_job_id: Optional[str] = None,
+    ) -> str:
+        """Create k8s background job to replicate a file to a specific replica storage location."""
+        replica_storage = self.storage_ops.get_org_storage_by_ref(org, replica_ref)
+        replica_endpoint, bucket_suffix = self.strip_bucket(
+            replica_storage.endpoint_url
+        )
+        replica_file_path = bucket_suffix + file.filename
+
+        job_type = BgJobType.CREATE_REPLICA.value
+
         try:
             job_id = await self.crawl_manager.run_replica_job(
-                oid=str(oid),
+                oid=str(org.id),
                 job_type=job_type,
                 primary_storage=file.storage,
                 primary_file_path=primary_file_path,
@@ -130,74 +149,122 @@ class BackgroundJobOps:
                 replica_file_path=replica_file_path,
                 replica_endpoint=replica_endpoint,
                 job_id_prefix=f"{job_type}-{object_id}",
+                existing_job_id=existing_job_id,
             )
-            replication_job = CreateReplicaJob(
-                id=job_id,
-                oid=oid,
-                started=datetime.now(),
-                file_path=file.filename,
-                object_type=object_type,
-                object_id=object_id,
-                primary=file.storage,
-                replica_storage=replica_ref,
-            )
-            # print(
-            #     f"File path written into replication_job: {file.filename}", flush=True
-            # )
+            if existing_job_id:
+                replication_job = await self.get_background_job(existing_job_id, org.id)
+                previous_attempt = {
+                    "started": replication_job.started,
+                    "finished": replication_job.finished,
+                }
+                if replication_job.previousAttempts:
+                    replication_job.previousAttempts.append(previous_attempt)
+                else:
+                    replication_job.previousAttempts = [previous_attempt]
+                replication_job.started = datetime.now()
+                replication_job.finished = None
+                replication_job.success = None
+            else:
+                replication_job = CreateReplicaJob(
+                    id=job_id,
+                    oid=org.id,
+                    started=datetime.now(),
+                    file_path=file.filename,
+                    object_type=object_type,
+                    object_id=object_id,
+                    primary=file.storage,
+                    replica_storage=replica_ref,
+                )

             await self.jobs.find_one_and_update(
                 {"_id": job_id}, {"$set": replication_job.to_dict()}, upsert=True
             )
-            ids.append(job_id)

-        return {"added": True, "ids": ids}
+            return job_id
         except Exception as exc:
             # pylint: disable=raise-missing-from
             raise HTTPException(status_code=500, detail=f"Error starting crawl: {exc}")

     async def create_delete_replica_jobs(
         self, org: Organization, file: BaseFile, object_id: str, object_type: str
     ) -> Dict[str, Union[bool, List[str]]]:
         """Create a job to delete each replica for the given file"""

         ids = []
-        oid = str(org.id)

         for replica_ref in file.replicas or []:
-            replica_storage = self.storage_ops.get_org_storage_by_ref(org, replica_ref)
-            replica_endpoint, bucket_suffix = self.strip_bucket(
-                replica_storage.endpoint_url
+            job_id = await self.create_delete_replica_job(
+                org, file, object_id, object_type, replica_ref
             )
-            replica_file_path = bucket_suffix + file.filename
+            ids.append(job_id)

-            # print(f"replica: {replica_ref.get_storage_secret_name(oid)}")
-            # print(f"  endpoint: {replica_endpoint}")
-            # print(f"  path: {replica_file_path}")
+        return {"added": True, "ids": ids}

-            job_type = BgJobType.DELETE_REPLICA.value
+    async def create_delete_replica_job(
+        self,
+        org: Organization,
+        file: BaseFile,
+        object_id: str,
+        object_type: str,
+        replica_ref: StorageRef,
+        existing_job_id: Optional[str] = None,
+    ) -> str:
+        """Create a job to delete one replica of a given file"""
+        replica_storage = self.storage_ops.get_org_storage_by_ref(org, replica_ref)
+        replica_endpoint, bucket_suffix = self.strip_bucket(
+            replica_storage.endpoint_url
+        )
+        replica_file_path = bucket_suffix + file.filename
+
+        job_type = BgJobType.DELETE_REPLICA.value
+
         try:
             job_id = await self.crawl_manager.run_replica_job(
-                oid=oid,
+                oid=str(org.id),
                 job_type=job_type,
                 replica_storage=replica_ref,
                 replica_file_path=replica_file_path,
                 replica_endpoint=replica_endpoint,
                 job_id_prefix=f"{job_type}-{object_id}",
+                existing_job_id=existing_job_id,
             )

-            delete_replica_job = DeleteReplicaJob(
-                id=job_id,
-                oid=oid,
-                started=datetime.now(),
-                file_path=file.filename,
-                object_id=object_id,
-                object_type=object_type,
-                replica_storage=replica_ref,
-            )
+            if existing_job_id:
+                delete_replica_job = await self.get_background_job(
+                    existing_job_id, org.id
+                )
+                previous_attempt = {
+                    "started": delete_replica_job.started,
+                    "finished": delete_replica_job.finished,
+                }
+                if delete_replica_job.previousAttempts:
+                    delete_replica_job.previousAttempts.append(previous_attempt)
+                else:
+                    delete_replica_job.previousAttempts = [previous_attempt]
+                delete_replica_job.started = datetime.now()
+                delete_replica_job.finished = None
+                delete_replica_job.success = None
+            else:
+                delete_replica_job = DeleteReplicaJob(
+                    id=job_id,
+                    oid=org.id,
+                    started=datetime.now(),
+                    file_path=file.filename,
+                    object_id=object_id,
+                    object_type=object_type,
+                    replica_storage=replica_ref,
+                )

             await self.jobs.find_one_and_update(
                 {"_id": job_id}, {"$set": delete_replica_job.to_dict()}, upsert=True
             )

-            ids.append(job_id)
+            return job_id

-        return {"added": True, "ids": ids}
         except Exception as exc:
             # pylint: disable=raise-missing-from
             raise HTTPException(
                 status_code=400, detail=f"Error starting background job: {exc}"
             )

     async def job_finished(
         self,
@@ -241,7 +308,9 @@ class BackgroundJobOps:
             {"$set": {"success": success, "finished": finished}},
         )

-    async def get_background_job(self, job_id: str, oid: UUID) -> BackgroundJob:
+    async def get_background_job(
+        self, job_id: str, oid: UUID
+    ) -> Union[CreateReplicaJob, DeleteReplicaJob]:
         """Get background job"""
         query: dict[str, object] = {"_id": job_id, "oid": oid}
         res = await self.jobs.find_one(query)
@@ -255,10 +324,9 @@ class BackgroundJobOps:
         if data["type"] == BgJobType.CREATE_REPLICA:
             return CreateReplicaJob.from_dict(data)

-        if data["type"] == BgJobType.DELETE_REPLICA:
-            return DeleteReplicaJob.from_dict(data)
+        return DeleteReplicaJob.from_dict(data)

-        return BackgroundJob.from_dict(data)
+        # return BackgroundJob.from_dict(data)

     async def list_background_jobs(
         self,
@@ -324,6 +392,69 @@ class BackgroundJobOps:

         return jobs, total

+    async def get_replica_job_file(
+        self, job: Union[CreateReplicaJob, DeleteReplicaJob], org: Organization
+    ) -> BaseFile:
+        """Return file from replica job"""
+        try:
+            if job.object_type == "profile":
+                profile = await self.profile_ops.get_profile(UUID(job.object_id), org)
+                return BaseFile(**profile.resource.dict())
+
+            item_res = await self.base_crawl_ops.get_crawl_raw(job.object_id, org)
+            matching_file = [
+                f for f in item_res.get("files", []) if f["filename"] == job.file_path
+            ][0]
+            return BaseFile(**matching_file)
+        # pylint: disable=broad-exception-caught, raise-missing-from
+        except Exception:
+            raise HTTPException(status_code=404, detail="file_not_found")
+
+    async def retry_background_job(
+        self, job_id: str, org: Organization
+    ) -> Dict[str, Union[bool, Optional[str]]]:
+        """Retry background job and return new job id"""
+        job = await self.get_background_job(job_id, org.id)
+        if not job:
+            raise HTTPException(status_code=404, detail="job_not_found")
+
+        if not job.finished:
+            raise HTTPException(status_code=400, detail="job_not_finished")
+
+        if job.success:
+            raise HTTPException(status_code=400, detail="job_already_succeeded")
+
+        file = await self.get_replica_job_file(job, org)
+
+        if job.type == BgJobType.CREATE_REPLICA:
+            primary_storage = self.storage_ops.get_org_storage_by_ref(org, file.storage)
+            primary_endpoint, bucket_suffix = self.strip_bucket(
+                primary_storage.endpoint_url
+            )
+            primary_file_path = bucket_suffix + file.filename
+            await self.create_replica_job(
+                org,
+                file,
+                job.object_id,
+                job.object_type,
+                job.replica_storage,
+                primary_file_path,
+                primary_endpoint,
+                existing_job_id=job_id,
+            )
+
+        if job.type == BgJobType.DELETE_REPLICA:
+            await self.create_delete_replica_job(
+                org,
+                file,
+                job.object_id,
+                job.object_type,
+                job.replica_storage,
+                existing_job_id=job_id,
+            )
+
+        return {"success": True}
+
+
 # ============================================================================
 # pylint: disable=too-many-arguments, too-many-locals, invalid-name, fixme
@@ -344,7 +475,6 @@ def init_background_jobs_api(

     @router.get(
         "/{job_id}",
-        tags=["backgroundjobs"],
         response_model=AnyJob,
     )
     async def get_background_job(
@@ -354,7 +484,17 @@ def init_background_jobs_api(
         """Retrieve information for background job"""
         return await ops.get_background_job(job_id, org.id)

-    @router.get("", tags=["backgroundjobs"], response_model=PaginatedResponse)
+    @router.post(
+        "/{job_id}/retry",
+    )
+    async def retry_background_job(
+        job_id: str,
+        org: Organization = Depends(org_crawl_dep),
+    ):
+        """Retry background job"""
+        return await ops.retry_background_job(job_id, org)
+
+    @router.get("", response_model=PaginatedResponse)
     async def list_background_jobs(
         org: Organization = Depends(org_crawl_dep),
         pageSize: int = DEFAULT_PAGE_SIZE,
@@ -74,14 +74,18 @@ class CrawlManager(K8sAPI):
         primary_file_path: Optional[str] = None,
         primary_endpoint: Optional[str] = None,
         job_id_prefix: Optional[str] = None,
+        existing_job_id: Optional[str] = None,
     ):
         """run job to replicate file from primary storage to replica storage"""

-        if not job_id_prefix:
-            job_id_prefix = job_type
+        if existing_job_id:
+            job_id = existing_job_id
+        else:
+            if not job_id_prefix:
+                job_id_prefix = job_type

-        # ensure name is <=63 characters
-        job_id = f"{job_id_prefix[:52]}-{secrets.token_hex(5)}"
+            # ensure name is <=63 characters
+            job_id = f"{job_id_prefix[:52]}-{secrets.token_hex(5)}"

         params = {
             "id": job_id,
@@ -1237,6 +1237,8 @@ class BackgroundJob(BaseMongoModel):
     started: datetime
     finished: Optional[datetime] = None

+    previousAttempts: Optional[List[Dict[str, Optional[datetime]]]] = None
+

 # ============================================================================
 class CreateReplicaJob(BackgroundJob):
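The previousAttempts entries added above are written by create_replica_job and create_delete_replica_job before they reset started, finished, and success for the retry. A rough sketch of a retried job document (all values illustrative; the "create-replica" string assumes BgJobType.CREATE_REPLICA's value):

```python
from datetime import datetime

# Hypothetical MongoDB job document after one failed run and one retry.
job_doc = {
    "_id": "create-replica-profile-1234-abcdef0123",  # made-up job id
    "type": "create-replica",  # assumed BgJobType.CREATE_REPLICA value
    "started": datetime(2023, 11, 20, 12, 30),  # current (retried) attempt
    "finished": None,  # cleared until the retry completes
    "success": None,   # cleared as well
    "previousAttempts": [
        # timing of the original failed run, appended on retry
        {
            "started": datetime(2023, 11, 20, 11, 0),
            "finished": datetime(2023, 11, 20, 11, 2),
        }
    ],
}
```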
@@ -400,7 +400,7 @@ def init_event_webhooks_api(mdb, org_ops, app):
     ):
         return await ops.get_notification(org, notificationid)

-    @router.get("/{notificationid}/retry")
+    @router.post("/{notificationid}/retry")
     async def retry_notification(
         notificationid: UUID,
         org: Organization = Depends(org_owner_dep),
@@ -99,7 +99,7 @@ def test_get_webhook_event(admin_auth_headers, default_org_id):

 def test_retry_webhook_event(admin_auth_headers, default_org_id):
     # Expect to fail because we haven't set up URLs that accept webhooks
-    r = requests.get(
+    r = requests.post(
         f"{API_PREFIX}/orgs/{default_org_id}/webhooks/{_webhook_event_id}/retry",
         headers=admin_auth_headers,
     )