Move org deletion to background job with access to backend ops classes (#2098)

This PR introduces background jobs that have full access to the backend ops classes, and moves org deletion into such a background job. A sketch of the resulting API flow is included after the changed-files summary below.
Tessa Walsh 2024-10-10 14:41:05 -04:00 committed by GitHub
parent 84a74c43a4
commit 1b1819ba5a
10 changed files with 404 additions and 16 deletions
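With these changes, DELETE /orgs/{oid} no longer deletes the org inline: it schedules a delete-org background job and returns the job id, which a superuser can then poll via the new GET /orgs/all/jobs/{job_id} endpoint. A minimal client-side sketch of that flow follows; the paths and response fields come from this diff, while the host, token, and org id are placeholders.

import time
import requests

API_PREFIX = "https://btrix.example.com/api"  # placeholder: your deployment URL
HEADERS = {"Authorization": "Bearer <superuser-token>"}  # placeholder: superuser token
ORG_ID = "<org-uuid>"  # placeholder: id of the org to delete

# Deleting the org now just schedules the background job and returns its id
r = requests.delete(f"{API_PREFIX}/orgs/{ORG_ID}", headers=HEADERS)
r.raise_for_status()
job_id = r.json()["id"]

# Poll the new superuser-only, cross-org job endpoint until the job resolves
for _ in range(18):
    job = requests.get(f"{API_PREFIX}/orgs/all/jobs/{job_id}", headers=HEADERS).json()
    if job.get("success") is True:
        print("org deleted")
        break
    if job.get("success") is False:
        raise RuntimeError("delete-org background job failed")
    time.sleep(10)

A failed job can also be retried through the existing job retry endpoint, which now recognizes BgJobType.DELETE_ORG and re-invokes create_delete_org_job with the existing job id.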

View File

@@ -1,6 +1,7 @@
"""k8s background jobs"""
import asyncio
import os
from datetime import datetime
from typing import Optional, Tuple, Union, List, Dict, TYPE_CHECKING, cast
from uuid import UUID
@@ -19,6 +20,7 @@ from .models import (
BgJobType,
CreateReplicaJob,
DeleteReplicaJob,
DeleteOrgJob,
PaginatedBackgroundJobResponse,
AnyJob,
StorageRef,
@@ -273,6 +275,51 @@ class BackgroundJobOps:
)
return None
async def create_delete_org_job(
self,
org: Organization,
existing_job_id: Optional[str] = None,
) -> Optional[str]:
"""Create background job to delete org and its data"""
try:
job_id = await self.crawl_manager.run_delete_org_job(
oid=str(org.id),
backend_image=os.environ.get("BACKEND_IMAGE", ""),
pull_policy=os.environ.get("BACKEND_IMAGE_PULL_POLICY", ""),
existing_job_id=existing_job_id,
)
if existing_job_id:
delete_org_job = await self.get_background_job(existing_job_id, org.id)
previous_attempt = {
"started": delete_org_job.started,
"finished": delete_org_job.finished,
}
if delete_org_job.previousAttempts:
delete_org_job.previousAttempts.append(previous_attempt)
else:
delete_org_job.previousAttempts = [previous_attempt]
delete_org_job.started = dt_now()
delete_org_job.finished = None
delete_org_job.success = None
else:
delete_org_job = DeleteOrgJob(
id=job_id,
oid=org.id,
started=dt_now(),
)
await self.jobs.find_one_and_update(
{"_id": job_id}, {"$set": delete_org_job.to_dict()}, upsert=True
)
return job_id
# pylint: disable=broad-exception-caught
except Exception as exc:
# pylint: disable=raise-missing-from
print(f"warning: delete org job could not be started: {exc}")
return None
async def job_finished(
self,
job_id: str,
@@ -316,10 +363,13 @@ class BackgroundJobOps:
)
async def get_background_job(
self, job_id: str, oid: UUID
) -> Union[CreateReplicaJob, DeleteReplicaJob]:
self, job_id: str, oid: Optional[UUID] = None
) -> Union[CreateReplicaJob, DeleteReplicaJob, DeleteOrgJob]:
"""Get background job"""
query: dict[str, object] = {"_id": job_id, "oid": oid}
query: dict[str, object] = {"_id": job_id}
if oid:
query["oid"] = oid
res = await self.jobs.find_one(query)
if not res:
raise HTTPException(status_code=404, detail="job_not_found")
@@ -331,9 +381,10 @@
if data["type"] == BgJobType.CREATE_REPLICA:
return CreateReplicaJob.from_dict(data)
return DeleteReplicaJob.from_dict(data)
if data["type"] == BgJobType.DELETE_REPLICA:
return DeleteReplicaJob.from_dict(data)
# return BackgroundJob.from_dict(data)
return DeleteOrgJob.from_dict(data)
async def list_background_jobs(
self,
@@ -432,9 +483,8 @@ class BackgroundJobOps:
if job.success:
raise HTTPException(status_code=400, detail="job_already_succeeded")
file = await self.get_replica_job_file(job, org)
if job.type == BgJobType.CREATE_REPLICA:
file = await self.get_replica_job_file(job, org)
primary_storage = self.storage_ops.get_org_storage_by_ref(org, file.storage)
primary_endpoint, bucket_suffix = self.strip_bucket(
primary_storage.endpoint_url
@@ -452,6 +502,7 @@
)
if job.type == BgJobType.DELETE_REPLICA:
file = await self.get_replica_job_file(job, org)
await self.create_delete_replica_job(
org,
file,
@@ -461,6 +512,12 @@
existing_job_id=job_id,
)
if job.type == BgJobType.DELETE_ORG:
await self.create_delete_org_job(
org,
existing_job_id=job_id,
)
return {"success": True}
async def retry_failed_background_jobs(
@@ -523,6 +580,14 @@ def init_background_jobs_api(
"""Retrieve information for background job"""
return await ops.get_background_job(job_id, org.id)
@app.get("/orgs/all/jobs/{job_id}", response_model=SuccessResponse, tags=["jobs"])
async def get_background_job_all_orgs(job_id: str, user: User = Depends(user_dep)):
"""Get background job from any org"""
if not user.is_superuser:
raise HTTPException(status_code=403, detail="Not Allowed")
return await ops.get_background_job(job_id)
@router.post("/{job_id}/retry", response_model=SuccessResponse)
async def retry_background_job(
job_id: str,

View File

@@ -17,6 +17,8 @@ from .models import StorageRef, CrawlConfig, BgJobType
# ============================================================================
DEFAULT_PROXY_ID: str = os.environ.get("DEFAULT_PROXY_ID", "")
DEFAULT_NAMESPACE: str = os.environ.get("DEFAULT_NAMESPACE", "default")
# ============================================================================
class CrawlManager(K8sAPI):
@@ -110,6 +112,34 @@ class CrawlManager(K8sAPI):
return job_id
async def run_delete_org_job(
self,
oid: str,
backend_image: str,
pull_policy: str,
existing_job_id: Optional[str] = None,
):
"""run job to delete org and all of its data"""
if existing_job_id:
job_id = existing_job_id
else:
job_id = f"delete-org-{oid}-{secrets.token_hex(5)}"
params = {
"id": job_id,
"oid": oid,
"job_type": BgJobType.DELETE_ORG.value,
"backend_image": backend_image,
"pull_policy": pull_policy,
}
data = self.templates.env.get_template("background_job.yaml").render(params)
await self.create_from_yaml(data, namespace=DEFAULT_NAMESPACE)
return job_id
async def create_crawl_job(
self,
crawlconfig: CrawlConfig,

View File

@@ -244,7 +244,7 @@ def main() -> None:
init_uploads_api(*base_crawl_init)
org_ops.set_ops(base_crawl_ops, profiles, coll_ops)
org_ops.set_ops(base_crawl_ops, profiles, coll_ops, background_job_ops)
user_manager.set_ops(org_ops, crawl_config_ops, base_crawl_ops)

View File

@@ -0,0 +1,144 @@
""" entrypoint module for background jobs """
import asyncio
import os
import sys
import traceback
from uuid import UUID
from .crawlmanager import CrawlManager
from .db import init_db
from .emailsender import EmailSender
# from .utils import register_exit_handler
from .models import BgJobType
from .basecrawls import BaseCrawlOps
from .invites import InviteOps
from .users import init_user_manager
from .orgs import OrgOps
from .colls import CollectionOps
from .crawlconfigs import CrawlConfigOps
from .crawls import CrawlOps
from .profiles import ProfileOps
from .storages import StorageOps
from .webhooks import EventWebhookOps
from .background_jobs import BackgroundJobOps
from .pages import PageOps
job_type = os.environ.get("BG_JOB_TYPE")
oid = os.environ.get("OID")
# ============================================================================
# pylint: disable=too-many-function-args, duplicate-code, too-many-locals
async def main():
"""main init"""
email = EmailSender()
crawl_manager = None
dbclient, mdb = init_db()
invite_ops = InviteOps(mdb, email)
user_manager = init_user_manager(mdb, email, invite_ops)
org_ops = OrgOps(mdb, invite_ops, user_manager)
event_webhook_ops = EventWebhookOps(mdb, org_ops)
# pylint: disable=import-outside-toplevel
if not os.environ.get("KUBERNETES_SERVICE_HOST"):
print(
"Sorry, the Browsertrix Backend must be run inside a Kubernetes environment.\
Kubernetes not detected (KUBERNETES_SERVICE_HOST is not set), Exiting"
)
sys.exit(1)
crawl_manager = CrawlManager()
storage_ops = StorageOps(org_ops, crawl_manager)
background_job_ops = BackgroundJobOps(
mdb, email, user_manager, org_ops, crawl_manager, storage_ops
)
profile_ops = ProfileOps(
mdb, org_ops, crawl_manager, storage_ops, background_job_ops
)
crawl_config_ops = CrawlConfigOps(
dbclient,
mdb,
user_manager,
org_ops,
crawl_manager,
profile_ops,
)
coll_ops = CollectionOps(mdb, crawl_manager, org_ops, event_webhook_ops)
base_crawl_ops = BaseCrawlOps(
mdb,
user_manager,
org_ops,
crawl_config_ops,
coll_ops,
storage_ops,
event_webhook_ops,
background_job_ops,
)
crawl_ops = CrawlOps(
crawl_manager,
mdb,
user_manager,
org_ops,
crawl_config_ops,
coll_ops,
storage_ops,
event_webhook_ops,
background_job_ops,
)
page_ops = PageOps(mdb, crawl_ops, org_ops, storage_ops)
base_crawl_ops.set_page_ops(page_ops)
crawl_ops.set_page_ops(page_ops)
background_job_ops.set_ops(crawl_ops, profile_ops)
org_ops.set_ops(base_crawl_ops, profile_ops, coll_ops, background_job_ops)
user_manager.set_ops(org_ops, crawl_config_ops, base_crawl_ops)
background_job_ops.set_ops(base_crawl_ops, profile_ops)
crawl_config_ops.set_coll_ops(coll_ops)
# Run job
if job_type == BgJobType.DELETE_ORG:
if not oid:
print("Org id missing, quitting")
return 1
org = await org_ops.get_org_by_id(UUID(oid))
if not org:
print("Org id invalid, quitting")
return 1
try:
await org_ops.delete_org_and_data(org, user_manager)
return 0
# pylint: disable=broad-exception-caught
except Exception:
traceback.print_exc()
return 1
print(f"Provided job type {job_type} not currently supported")
return 1
# ============================================================================
if __name__ == "__main__":
return_code = asyncio.run(main())
sys.exit(return_code)

View File

@@ -2013,6 +2013,7 @@ class BgJobType(str, Enum):
CREATE_REPLICA = "create-replica"
DELETE_REPLICA = "delete-replica"
DELETE_ORG = "delete-org"
# ============================================================================
@@ -2051,10 +2052,19 @@ class DeleteReplicaJob(BackgroundJob):
replica_storage: StorageRef
# ============================================================================
class DeleteOrgJob(BackgroundJob):
"""Model for tracking deletion of org data jobs"""
type: Literal[BgJobType.DELETE_ORG] = BgJobType.DELETE_ORG
# ============================================================================
# Union of all job types, for response model
AnyJob = RootModel[Union[CreateReplicaJob, DeleteReplicaJob, BackgroundJob]]
AnyJob = RootModel[
Union[CreateReplicaJob, DeleteReplicaJob, BackgroundJob, DeleteOrgJob]
]
# ============================================================================
@@ -2274,6 +2284,13 @@ class DeletedResponse(BaseModel):
deleted: bool
# ============================================================================
class DeletedResponseId(DeletedResponse):
"""Response for delete API endpoints that return job id"""
id: str
# ============================================================================
class DeletedResponseQuota(DeletedResponse):
"""Response for delete API endpoints"""

View File

@@ -67,7 +67,7 @@ from .models import (
PAUSED_PAYMENT_FAILED,
REASON_PAUSED,
ACTIVE,
DeletedResponse,
DeletedResponseId,
UpdatedResponse,
AddedResponse,
AddedResponseId,
@@ -94,8 +94,10 @@ if TYPE_CHECKING:
from .colls import CollectionOps
from .profiles import ProfileOps
from .users import UserManager
from .background_jobs import BackgroundJobOps
else:
InviteOps = BaseCrawlOps = ProfileOps = CollectionOps = UserManager = object
InviteOps = BaseCrawlOps = ProfileOps = CollectionOps = object
BackgroundJobOps = UserManager = object
DEFAULT_ORG = os.environ.get("DEFAULT_ORG", "My Organization")
@@ -151,12 +153,14 @@ class OrgOps:
base_crawl_ops: BaseCrawlOps,
profile_ops: ProfileOps,
coll_ops: CollectionOps,
background_job_ops: BackgroundJobOps,
) -> None:
"""Set base crawl ops"""
# pylint: disable=attribute-defined-outside-init
self.base_crawl_ops = base_crawl_ops
self.profile_ops = profile_ops
self.coll_ops = coll_ops
self.background_job_ops = background_job_ops
def set_default_primary_storage(self, storage: StorageRef):
"""set default primary storage"""
@@ -1451,15 +1455,16 @@ def init_orgs_api(
org_out.execMinutesQuotaReached = ops.exec_mins_quota_reached(org)
return org_out
@router.delete("", tags=["organizations"], response_model=DeletedResponse)
@router.delete("", tags=["organizations"], response_model=DeletedResponseId)
async def delete_org(
org: Organization = Depends(org_dep), user: User = Depends(user_dep)
):
if not user.is_superuser:
raise HTTPException(status_code=403, detail="Not Allowed")
await ops.delete_org_and_data(org, user_manager)
return {"deleted": True}
job_id = await ops.background_job_ops.create_delete_org_job(org)
return {"deleted": True, "id": job_id}
@router.post("/rename", tags=["organizations"], response_model=UpdatedResponse)
async def rename_org(

View File

@@ -54,9 +54,41 @@ def test_delete_org_superadmin(admin_auth_headers, default_org_id):
f"{API_PREFIX}/orgs/{default_org_id}", headers=admin_auth_headers
)
assert r.status_code == 200
assert r.json()["deleted"]
data = r.json()
assert data["deleted"]
job_id = data["id"]
# Check that background job is launched and eventually succeeds
max_attempts = 18
attempts = 1
while True:
try:
r = requests.get(
f"{API_PREFIX}/orgs/all/jobs/{job_id}", headers=admin_auth_headers
)
assert r.status_code == 200
success = r.json()["success"]
if success:
break
if success is False:
assert False
if attempts >= max_attempts:
assert False
time.sleep(10)
except:
pass
attempts += 1
# Ensure org and items got deleted
r = requests.get(f"{API_PREFIX}/orgs/{default_org_id}", headers=admin_auth_headers)
assert r.status_code == 404
# Ensure items got deleted
for item_id in item_ids:
r = requests.get(
f"{API_PREFIX}/orgs/all/all-crawls/{item_id}/replay.json",

View File

@@ -0,0 +1,59 @@
apiVersion: batch/v1
kind: Job
metadata:
name: "{{ id }}"
labels:
role: "background-job"
job_type: {{ job_type }}
btrix.org: {{ oid }}
spec:
ttlSecondsAfterFinished: 0
backoffLimit: 3
template:
spec:
restartPolicy: Never
priorityClassName: bg-job
podFailurePolicy:
rules:
- action: FailJob
onExitCodes:
containerName: btrixbgjob
operator: NotIn
values: [0]
volumes:
- name: ops-configs
secret:
secretName: ops-configs
containers:
- name: btrixbgjob
image: {{ backend_image }}
imagePullPolicy: {{ pull_policy }}
env:
- name: BG_JOB_TYPE
value: {{ job_type }}
- name: OID
value: {{ oid }}
envFrom:
- configMapRef:
name: backend-env-config
- secretRef:
name: mongo-auth
volumeMounts:
- name: ops-configs
mountPath: /ops-configs/
command: ["python3", "-m", "btrixcloud.main_bg"]
resources:
limits:
memory: "200Mi"
requests:
memory: "200Mi"
cpu: "50m"

View File

@@ -77,6 +77,11 @@ data:
LOG_SENT_EMAILS: "{{ .Values.email.log_sent_emails }}"
BACKEND_IMAGE: "{{ .Values.backend_image }}"
BACKEND_IMAGE_PULL_POLICY: "{{ .Values.backend_pull_policy }}"
---
apiVersion: v1
kind: ConfigMap

View File

@@ -21,6 +21,17 @@ rules:
resources: ["pods"]
verbs: ["list"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
namespace: {{ .Release.Namespace }}
name: bg-job
rules:
- apiGroups: ["batch"]
resources: ["jobs"]
verbs: ["get", "list", "watch", "create", "update", "patch", "delete", "deletecollection"]
---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1
@@ -40,3 +51,23 @@ roleRef:
kind: Role
name: crawler-run
apiGroup: rbac.authorization.k8s.io
---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: bg-job-role
namespace: {{ .Release.Namespace }}
subjects:
- kind: ServiceAccount
name: default
namespace: {{ .Release.Namespace }}
- kind: User
name: system:anonymous
namespace: {{ .Release.Namespace }}
roleRef:
kind: Role
name: bg-job
apiGroup: rbac.authorization.k8s.io