Optionally delay replica deletion (#2252)

Fixes #2170

The number of days by which to delay replica file deletion is configurable
in the Helm chart with `replica_deletion_delay_days` (set in `values.yaml`;
the chart default is 0, i.e. no delay unless an administrator opts in).
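
In the chart, this value is rendered into the backend ConfigMap as the
`REPLICA_DELETION_DELAY_DAYS` environment variable (see the configmap diff
below), and the background-job code reads it with a fallback of 0 when unset.
A minimal standalone sketch of that lookup (the variable name comes from this
PR; the snippet itself is illustrative, not the full `create_delete_replica_job` code):

```python
import os

# 0 (no delay) unless the chart sets replica_deletion_delay_days > 0
delay_days = int(os.environ.get("REPLICA_DELETION_DELAY_DAYS", 0))
```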

When `replica_deletion_delay_days` is set to an integer greater than 0, a
replica deletion job that would otherwise be started immediately as a
Kubernetes Job is instead created as a CronJob, with a cron schedule that
fires yearly, starting the configured number of days from the current moment.
The operator then deletes this CronJob after the job completes successfully.
If a failed background job is retried, it is re-run immediately as a Job
rather than being scheduled out into the future again.
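
Concretely, the delay is turned into a one-shot cron expression: the target
time is the current time plus the delay, and its minute, hour, day-of-month,
and month become the schedule fields, with day-of-week left as `*`, so the
expression matches only one calendar date per year. A minimal sketch of that
computation (the standalone `replica_deletion_schedule` helper below is
hypothetical; in this PR the logic lives in
`CrawlManager.create_replica_deletion_scheduled_job`):

```python
from datetime import datetime, timedelta, timezone

def replica_deletion_schedule(delay_days: int) -> str:
    """Build a cron expression that first fires delay_days from now.

    Day-of-week is left as '*', so the expression matches one calendar
    date per year; since the operator deletes the CronJob after its first
    successful run, it never actually fires a second time.
    """
    run_at = datetime.now(timezone.utc) + timedelta(days=delay_days)
    return f"{run_at.minute} {run_at.hour} {run_at.day} {run_at.month} *"

# e.g. if called at 2024-12-19 21:50 UTC:
# replica_deletion_schedule(7) -> "50 21 26 12 *"
```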

---------
Co-authored-by: Ilya Kreymer <ikreymer@users.noreply.github.com>
Tessa Walsh committed 2024-12-19 21:50:28 -05:00 · commit 589819682e · parent 2060ee78b4
8 changed files with 169 additions and 22 deletions


@@ -96,6 +96,11 @@ class BackgroundJobOps:
         if not res:
             print("File deleted before replication job started, ignoring", flush=True)

+    async def handle_delete_replica_job_finished(self, job: DeleteReplicaJob) -> None:
+        """After successful replica deletion, delete cronjob if scheduled"""
+        if job.schedule:
+            await self.crawl_manager.delete_replica_deletion_scheduled_job(job.id)
+
     async def create_replica_jobs(
         self, oid: UUID, file: BaseFile, object_id: str, object_type: str
     ) -> Dict[str, Union[bool, List[str]]]:
@@ -146,7 +151,7 @@ class BackgroundJobOps:
         job_type = BgJobType.CREATE_REPLICA.value
         try:
-            job_id = await self.crawl_manager.run_replica_job(
+            job_id, _ = await self.crawl_manager.run_replica_job(
                 oid=str(org.id),
                 job_type=job_type,
                 primary_storage=file.storage,
@@ -155,7 +160,7 @@ class BackgroundJobOps:
                 replica_storage=replica_ref,
                 replica_file_path=replica_file_path,
                 replica_endpoint=replica_endpoint,
-                job_id_prefix=f"{job_type}-{object_id}",
+                delay_days=0,
                 existing_job_id=existing_job_id,
             )
             if existing_job_id:
@@ -188,9 +193,13 @@ class BackgroundJobOps:
             )
             return job_id

+        # pylint: disable=broad-exception-caught
         except Exception as exc:
-            # pylint: disable=raise-missing-from
-            raise HTTPException(status_code=500, detail=f"Error starting crawl: {exc}")
+            print(
+                "warning: replica job could not be started "
+                + f"for {object_type} {file}: {exc}"
+            )
+            return ""

     async def create_delete_replica_jobs(
         self, org: Organization, file: BaseFile, object_id: str, object_type: str
@@ -214,8 +223,9 @@ class BackgroundJobOps:
         object_id: str,
         object_type: str,
         replica_ref: StorageRef,
+        force_start_immediately: bool = False,
         existing_job_id: Optional[str] = None,
-    ) -> Optional[str]:
+    ) -> str:
         """Create a job to delete one replica of a given file"""
         try:
             replica_storage = self.storage_ops.get_org_storage_by_ref(org, replica_ref)
@@ -226,20 +236,23 @@ class BackgroundJobOps:
             job_type = BgJobType.DELETE_REPLICA.value

-            job_id = await self.crawl_manager.run_replica_job(
+            delay_days = int(os.environ.get("REPLICA_DELETION_DELAY_DAYS", 0))
+            if force_start_immediately:
+                delay_days = 0
+
+            job_id, schedule = await self.crawl_manager.run_replica_job(
                 oid=str(org.id),
                 job_type=job_type,
                 replica_storage=replica_ref,
                 replica_file_path=replica_file_path,
                 replica_endpoint=replica_endpoint,
-                job_id_prefix=f"{job_type}-{object_id}",
+                delay_days=delay_days,
                 existing_job_id=existing_job_id,
             )
             if existing_job_id:
-                delete_replica_job = await self.get_background_job(
-                    existing_job_id, org.id
-                )
+                job = await self.get_background_job(existing_job_id, org.id)
+                delete_replica_job = cast(DeleteReplicaJob, job)
                 previous_attempt = {
                     "started": delete_replica_job.started,
                     "finished": delete_replica_job.finished,
@@ -251,6 +264,7 @@ class BackgroundJobOps:
                 delete_replica_job.started = dt_now()
                 delete_replica_job.finished = None
                 delete_replica_job.success = None
+                delete_replica_job.schedule = None
             else:
                 delete_replica_job = DeleteReplicaJob(
                     id=job_id,
@@ -260,6 +274,7 @@ class BackgroundJobOps:
                     object_id=object_id,
                     object_type=object_type,
                     replica_storage=replica_ref,
+                    schedule=schedule,
                 )

             await self.jobs.find_one_and_update(
@@ -274,7 +289,7 @@ class BackgroundJobOps:
                 "warning: replica deletion job could not be started "
                 + f"for {object_type} {file}: {exc}"
             )
-            return None
+            return ""

     async def create_delete_org_job(
         self,
@@ -387,6 +402,10 @@ class BackgroundJobOps:
         if success:
             if job_type == BgJobType.CREATE_REPLICA:
                 await self.handle_replica_job_finished(cast(CreateReplicaJob, job))
+            if job_type == BgJobType.DELETE_REPLICA:
+                await self.handle_delete_replica_job_finished(
+                    cast(DeleteReplicaJob, job)
+                )
         else:
             print(
                 f"Background job {job.id} failed, sending email to superuser",
@@ -560,6 +579,7 @@ class BackgroundJobOps:
                 job.object_id,
                 job.object_type,
                 job.replica_storage,
+                force_start_immediately=True,
                 existing_job_id=job_id,
             )


@@ -3,7 +3,7 @@
 import os
 import secrets

-from typing import Optional, Dict
+from typing import Optional, Dict, Tuple
 from datetime import timedelta

 from fastapi import HTTPException
@@ -72,24 +72,21 @@ class CrawlManager(K8sAPI):
         replica_storage: StorageRef,
         replica_file_path: str,
         replica_endpoint: str,
+        delay_days: int = 0,
         primary_storage: Optional[StorageRef] = None,
         primary_file_path: Optional[str] = None,
         primary_endpoint: Optional[str] = None,
-        job_id_prefix: Optional[str] = None,
         existing_job_id: Optional[str] = None,
-    ):
+    ) -> Tuple[str, Optional[str]]:
         """run job to replicate file from primary storage to replica storage"""

         if existing_job_id:
             job_id = existing_job_id
         else:
-            if not job_id_prefix:
-                job_id_prefix = job_type
-
-            # ensure name is <=63 characters
-            job_id = f"{job_id_prefix[:52]}-{secrets.token_hex(5)}"
+            # Keep name shorter than in past to avoid k8s issues with length
+            job_id = f"{job_type}-{secrets.token_hex(5)}"

-        params = {
+        params: Dict[str, object] = {
             "id": job_id,
             "oid": oid,
             "job_type": job_type,
@@ -106,11 +103,17 @@ class CrawlManager(K8sAPI):
             "BgJobType": BgJobType,
         }

+        if job_type == BgJobType.DELETE_REPLICA.value and delay_days > 0:
+            # If replica deletion delay is configured, schedule as cronjob
+            return await self.create_replica_deletion_scheduled_job(
+                job_id, params, delay_days
+            )
+
         data = self.templates.env.get_template("replica_job.yaml").render(params)

         await self.create_from_yaml(data)

-        return job_id
+        return job_id, None

     async def run_delete_org_job(
         self,
@@ -393,3 +396,37 @@ class CrawlManager(K8sAPI):
         await self.create_from_yaml(data, self.namespace)

         return cron_job_id
+
+    async def create_replica_deletion_scheduled_job(
+        self,
+        job_id: str,
+        params: Dict[str, object],
+        delay_days: int,
+    ) -> Tuple[str, Optional[str]]:
+        """create scheduled job to delay replica file in x days"""
+        now = dt_now()
+        run_at = now + timedelta(days=delay_days)
+        schedule = f"{run_at.minute} {run_at.hour} {run_at.day} {run_at.month} *"
+
+        params["schedule"] = schedule
+
+        print(f"Replica deletion cron schedule: '{schedule}'", flush=True)
+
+        data = self.templates.env.get_template("replica_deletion_cron_job.yaml").render(
+            params
+        )
+
+        await self.create_from_yaml(data, self.namespace)
+
+        return job_id, schedule
+
+    async def delete_replica_deletion_scheduled_job(self, job_id: str):
+        """delete scheduled job to delay replica file in x days"""
+        cron_job = await self.batch_api.read_namespaced_cron_job(
+            name=job_id,
+            namespace=self.namespace,
+        )
+        if cron_job:
+            await self.batch_api.delete_namespaced_cron_job(
+                name=cron_job.metadata.name, namespace=self.namespace
+            )


@@ -2058,6 +2058,7 @@ class DeleteReplicaJob(BackgroundJob):
     object_type: str
     object_id: str
     replica_storage: StorageRef
+    schedule: Optional[str] = None


 # ============================================================================


@@ -35,7 +35,7 @@ class BgJobOperator(BaseOperator):
         labels: dict[str, str] = metadata.get("labels", {})
         oid: str = labels.get("btrix.org") or ""
         job_type: str = labels.get("job_type") or ""
-        job_id: str = metadata.get("name")
+        job_id: str = labels.get("job_id") or metadata.get("name")

         status = data.object["status"]
         success = status.get("succeeded") == 1


@@ -0,0 +1,81 @@
+apiVersion: batch/v1
+kind: CronJob
+metadata:
+  name: "{{ id }}"
+  labels:
+    role: "cron-background-job"
+    job_type: {{ job_type }}
+    btrix.org: {{ oid }}
+
+spec:
+  concurrencyPolicy: Forbid
+  successfulJobsHistoryLimit: 0
+  failedJobsHistoryLimit: 2
+
+  schedule: "{{ schedule }}"
+
+  jobTemplate:
+    metadata:
+      labels:
+        role: "background-job"
+        job_type: {{ job_type }}
+        job_id: {{ id }}
+        btrix.org: {{ oid }}
+
+    spec:
+      template:
+        spec:
+          restartPolicy: Never
+          priorityClassName: bg-job
+          podFailurePolicy:
+            rules:
+              - action: FailJob
+                onExitCodes:
+                  containerName: rclone
+                  operator: NotIn
+                  values: [0]
+
+          containers:
+            - name: rclone
+              image: rclone/rclone:latest
+              env:
+                - name: RCLONE_CONFIG_REPLICA_TYPE
+                  value: "s3"
+
+                - name: RCLONE_CONFIG_REPLICA_ACCESS_KEY_ID
+                  valueFrom:
+                    secretKeyRef:
+                      name: "{{ replica_secret_name }}"
+                      key: STORE_ACCESS_KEY
+
+                - name: RCLONE_CONFIG_REPLICA_SECRET_ACCESS_KEY
+                  valueFrom:
+                    secretKeyRef:
+                      name: "{{ replica_secret_name }}"
+                      key: STORE_SECRET_KEY
+
+                - name: RCLONE_CONFIG_REPLICA_REGION
+                  valueFrom:
+                    secretKeyRef:
+                      name: "{{ replica_secret_name }}"
+                      key: STORE_REGION
+
+                - name: RCLONE_CONFIG_REPLICA_PROVIDER
+                  valueFrom:
+                    secretKeyRef:
+                      name: "{{ replica_secret_name }}"
+                      key: STORE_S3_PROVIDER
+
+                - name: RCLONE_CONFIG_REPLICA_ENDPOINT
+                  value: "{{ replica_endpoint }}"
+
+              command: ["rclone", "-vv", "delete", "replica:{{ replica_file_path }}"]
+
+              resources:
+                limits:
+                  memory: "200Mi"
+
+                requests:
+                  memory: "200Mi"
+                  cpu: "50m"


@@ -85,6 +85,8 @@ data:
   LOCALES_ENABLED: "{{ .Values.locales_enabled }}"

+  REPLICA_DELETION_DELAY_DAYS: "{{ .Values.replica_deletion_delay_days | default 0 }}"
+
 ---
 apiVersion: v1


@@ -96,6 +96,10 @@ superuser:
 # Set name for default organization created with superuser
 default_org: "My Organization"

+# Set number of days replica file deletion should be delayed by
+# if set >0, will keep replicas (if any) for this number of days
+replica_deletion_delay_days: 0
+
 # API Image
 # =========================================


@@ -133,6 +133,8 @@ storages:
     access_endpoint_url: "https://my-custom-domain.example.com/path/"
 ```

+When replica locations are set, the default behavior when a crawl, upload, or browser profile is deleted is that the replica files are deleted at the same time as the file in primary storage. To delay deletion of replicas, set `replica_deletion_delay_days` in the Helm chart to the number of days by which to delay replica file deletion. This feature gives Browsertrix administrators time in the event of files being deleted accidentally or maliciously to recover copies from configured replica locations.
+
 ## Horizontal Autoscaling

 Browsertrix also includes support for horizontal auto-scaling for both the backend and frontend pods.