diff --git a/backend/btrixcloud/basecrawls.py b/backend/btrixcloud/basecrawls.py
index 7fe555d3..c6fb2327 100644
--- a/backend/btrixcloud/basecrawls.py
+++ b/backend/btrixcloud/basecrawls.py
@@ -1,6 +1,6 @@
 """base crawl type"""
 
-from datetime import datetime, timedelta
+from datetime import datetime
 from typing import Optional, List, Union, Dict, Any, Type, TYPE_CHECKING, cast, Tuple
 from uuid import UUID
 import os
@@ -29,10 +29,9 @@ from .models import (
     UpdatedResponse,
     DeletedResponseQuota,
     CrawlSearchValuesResponse,
-    PRESIGN_DURATION_SECONDS,
 )
 from .pagination import paginated_format, DEFAULT_PAGE_SIZE
-from .utils import dt_now, date_to_str, get_origin
+from .utils import dt_now, get_origin, date_to_str
 
 if TYPE_CHECKING:
     from .crawlconfigs import CrawlConfigOps
@@ -65,9 +64,6 @@ class BaseCrawlOps:
     background_job_ops: BackgroundJobOps
     page_ops: PageOps
 
-    presign_duration_seconds: int
-    expire_at_duration_seconds: int
-
     def __init__(
         self,
         mdb,
@@ -89,9 +85,6 @@ class BaseCrawlOps:
         self.background_job_ops = background_job_ops
         self.page_ops = cast(PageOps, None)
 
-        # renew when <25% of time remaining
-        self.expire_at_duration_seconds = int(PRESIGN_DURATION_SECONDS * 0.75)
-
     def set_page_ops(self, page_ops):
         """set page ops reference"""
         self.page_ops = page_ops
@@ -124,13 +117,12 @@ class BaseCrawlOps:
         files: List[Dict],
         org: Organization,
         crawlid: str,
-        qa_run_id: Optional[str] = None,
     ) -> List[CrawlFileOut]:
         if not files:
             return []
 
         crawl_files = [CrawlFile(**data) for data in files]
-        return await self.resolve_signed_urls(crawl_files, org, crawlid, qa_run_id)
+        return await self.resolve_signed_urls(crawl_files, org, crawlid)
 
     async def get_wacz_files(self, crawl_id: str, org: Organization):
         """Return list of WACZ files associated with crawl."""
@@ -464,50 +456,19 @@ class BaseCrawlOps:
         files: List[CrawlFile],
         org: Organization,
         crawl_id: Optional[str] = None,
-        qa_run_id: Optional[str] = None,
-        update_presigned_url: bool = False,
+        force_update=False,
     ) -> List[CrawlFileOut]:
         """Regenerate presigned URLs for files as necessary"""
         if not files:
             print("no files")
             return []
 
-        delta = timedelta(seconds=self.expire_at_duration_seconds)
-
         out_files = []
 
         for file_ in files:
-            presigned_url = file_.presignedUrl
-            now = dt_now()
-
-            if (
-                update_presigned_url
-                or not presigned_url
-                or (file_.expireAt and now >= file_.expireAt)
-            ):
-                exp = now + delta
-                presigned_url = await self.storage_ops.get_presigned_url(
-                    org, file_, PRESIGN_DURATION_SECONDS
-                )
-
-                prefix = "files"
-                if qa_run_id:
-                    prefix = f"qaFinished.{qa_run_id}.{prefix}"
-
-                await self.crawls.find_one_and_update(
-                    {f"{prefix}.filename": file_.filename},
-                    {
-                        "$set": {
-                            f"{prefix}.$.presignedUrl": presigned_url,
-                            f"{prefix}.$.expireAt": exp,
-                        }
-                    },
-                )
-                file_.expireAt = exp
-
-            expire_at_str = ""
-            if file_.expireAt:
-                expire_at_str = date_to_str(file_.expireAt)
+            presigned_url, expire_at = await self.storage_ops.get_presigned_url(
+                org, file_, force_update=force_update
+            )
 
             out_files.append(
                 CrawlFileOut(
@@ -517,7 +478,7 @@ class BaseCrawlOps:
                     size=file_.size,
                     crawlId=crawl_id,
                     numReplicas=len(file_.replicas) if file_.replicas else 0,
-                    expireAt=expire_at_str,
+                    expireAt=date_to_str(expire_at),
                 )
             )
 
diff --git a/backend/btrixcloud/crawls.py b/backend/btrixcloud/crawls.py
index 01c9cc41..b36f2309 100644
--- a/backend/btrixcloud/crawls.py
+++ b/backend/btrixcloud/crawls.py
@@ -1026,9 +1026,7 @@ class CrawlOps(BaseCrawlOps):
         if not org:
             raise HTTPException(status_code=400, detail="missing_org")
 
-        resources = await self.resolve_signed_urls(
-            qa_run.files, org, crawl.id, qa_run_id
-        )
+        resources = await self.resolve_signed_urls(qa_run.files, org, crawl.id)
 
         qa_run.files = []
 
diff --git a/backend/btrixcloud/db.py b/backend/btrixcloud/db.py
index fca3a217..cee9132c 100644
--- a/backend/btrixcloud/db.py
+++ b/backend/btrixcloud/db.py
@@ -17,7 +17,7 @@ from pymongo.errors import InvalidName
 
 from .migrations import BaseMigration
 
-CURR_DB_VERSION = "0042"
+CURR_DB_VERSION = "0043"
 
 
 # ============================================================================
@@ -108,6 +108,7 @@ async def update_and_prepare_db(
         invite_ops,
         user_manager,
         page_ops,
+        storage_ops,
     )
     await user_manager.create_super_user()
     await org_ops.create_default_org()
@@ -211,7 +212,14 @@ async def drop_indexes(mdb):
 # ============================================================================
 # pylint: disable=too-many-arguments
 async def create_indexes(
-    org_ops, crawl_ops, crawl_config_ops, coll_ops, invite_ops, user_manager, page_ops
+    org_ops,
+    crawl_ops,
+    crawl_config_ops,
+    coll_ops,
+    invite_ops,
+    user_manager,
+    page_ops,
+    storage_ops,
 ):
     """Create database indexes."""
     print("Creating database indexes", flush=True)
@@ -222,6 +230,7 @@ async def create_indexes(
     await invite_ops.init_index()
     await user_manager.init_index()
     await page_ops.init_index()
+    await storage_ops.init_index()
 
 
 # ============================================================================
diff --git a/backend/btrixcloud/main.py b/backend/btrixcloud/main.py
index 618cfd1d..a3e58b38 100644
--- a/backend/btrixcloud/main.py
+++ b/backend/btrixcloud/main.py
@@ -190,7 +190,9 @@ def main() -> None:
 
     crawl_manager = CrawlManager()
 
-    storage_ops = init_storages_api(org_ops, crawl_manager)
+    storage_ops = init_storages_api(
+        org_ops, crawl_manager, app, mdb, current_active_user
+    )
 
     background_job_ops = init_background_jobs_api(
         app,
diff --git a/backend/btrixcloud/migrations/migration_0043_unset_file_expireat.py b/backend/btrixcloud/migrations/migration_0043_unset_file_expireat.py
new file mode 100644
index 00000000..ef0a4b12
--- /dev/null
+++ b/backend/btrixcloud/migrations/migration_0043_unset_file_expireat.py
@@ -0,0 +1,59 @@
+"""
+Migration 0043 - Remove expireAt and presignedUrl from files, now stored in separate collection
+"""
+
+from btrixcloud.migrations import BaseMigration
+
+
+MIGRATION_VERSION = "0043"
+
+
+class Migration(BaseMigration):
+    """Migration class."""
+
+    # pylint: disable=unused-argument
+    def __init__(self, mdb, **kwargs):
+        super().__init__(mdb, migration_version=MIGRATION_VERSION)
+
+        self.crawls = mdb["crawls"]
+
+    async def migrate_up(self) -> None:
+        """Perform migration up."""
+
+        print("Clearing crawl file WACZ presigned URLs", flush=True)
+        await self.crawls.update_many(
+            {},
+            {
+                "$unset": {
+                    "files.$[].presignedUrl": None,
+                    "files.$[].expireAt": None,
+                }
+            },
+        )
+
+        # Clear presign for QA crawl files
+        qa_query = {
+            "type": "crawl",
+            "qaFinished": {"$nin": [None, {}]},
+        }
+
+        total = await self.crawls.count_documents(qa_query)
+        index = 1
+
+        async for crawl_with_qa in self.crawls.find(qa_query):
+            print(f"Clearing QA WACZ presigned URLs, crawl {index}/{total}", flush=True)
+            index += 1
+
+            qa_finished = crawl_with_qa.get("qaFinished")
+            if not qa_finished:
+                continue
+            for qa_run_id in qa_finished:
+                await self.crawls.find_one_and_update(
+                    {"_id": crawl_with_qa.get("_id")},
+                    {
+                        "$set": {
+                            f"qaFinished.{qa_run_id}.files.$[].presignedUrl": None,
+                            f"qaFinished.{qa_run_id}.files.$[].expireAt": None,
+                        }
+                    },
+                )
diff --git a/backend/btrixcloud/models.py b/backend/btrixcloud/models.py
index 5e301ecf..241dfd2c 100644
--- a/backend/btrixcloud/models.py
+++ b/backend/btrixcloud/models.py
@@ -708,6 +708,16 @@ class StorageRef(BaseModel):
         return ""
 
 
+# ============================================================================
+class PresignedUrl(BaseMongoModel):
+    """Base model for presigned url"""
+
+    id: str
+    url: str
+    oid: UUID
+    signedAt: datetime
+
+
 # ============================================================================
 class BaseFile(BaseModel):
     """Base model for crawl and profile files"""
@@ -724,9 +734,6 @@ class BaseFile(BaseModel):
 class CrawlFile(BaseFile):
     """file from a crawl"""
 
-    presignedUrl: Optional[str] = None
-    expireAt: Optional[datetime] = None
-
 
 # ============================================================================
 class CrawlFileOut(BaseModel):
@@ -1151,9 +1158,7 @@ class ImageFile(BaseFile):
 
     async def get_image_file_out(self, org, storage_ops) -> ImageFileOut:
         """Get ImageFileOut with new presigned url"""
-        presigned_url = await storage_ops.get_presigned_url(
-            org, self, PRESIGN_DURATION_SECONDS
-        )
+        presigned_url, _ = await storage_ops.get_presigned_url(org, self)
 
         return ImageFileOut(
             name=self.filename,
@@ -1169,9 +1174,7 @@ class ImageFile(BaseFile):
 
     async def get_public_image_file_out(self, org, storage_ops) -> PublicImageFileOut:
         """Get PublicImageFileOut with new presigned url"""
-        presigned_url = await storage_ops.get_presigned_url(
-            org, self, PRESIGN_DURATION_SECONDS
-        )
+        presigned_url, _ = await storage_ops.get_presigned_url(org, self)
 
         return PublicImageFileOut(
             name=self.filename,
diff --git a/backend/btrixcloud/ops.py b/backend/btrixcloud/ops.py
index c50fef4e..4b64f69d 100644
--- a/backend/btrixcloud/ops.py
+++ b/backend/btrixcloud/ops.py
@@ -51,7 +51,7 @@ def init_ops() -> Tuple[
 
     crawl_manager = CrawlManager()
 
-    storage_ops = StorageOps(org_ops, crawl_manager)
+    storage_ops = StorageOps(org_ops, crawl_manager, mdb)
 
     background_job_ops = BackgroundJobOps(
         mdb, email, user_manager, org_ops, crawl_manager, storage_ops
diff --git a/backend/btrixcloud/orgs.py b/backend/btrixcloud/orgs.py
index ad915e97..ac939391 100644
--- a/backend/btrixcloud/orgs.py
+++ b/backend/btrixcloud/orgs.py
@@ -1305,7 +1305,7 @@ class OrgOps:
 
         # Regenerate presigned URLs
         await self.base_crawl_ops.resolve_signed_urls(
-            item_obj.files, org, update_presigned_url=True, crawl_id=item_id
+            item_obj.files, org, crawl_id=item_id, force_update=True
         )
 
         # pages
diff --git a/backend/btrixcloud/storages.py b/backend/btrixcloud/storages.py
index 768e255d..012aeddd 100644
--- a/backend/btrixcloud/storages.py
+++ b/backend/btrixcloud/storages.py
@@ -24,10 +24,10 @@ import zlib
 import json
 import os
 
-from datetime import datetime
+from datetime import datetime, timedelta
 from zipfile import ZipInfo
 
-from fastapi import Depends, HTTPException
+from fastapi import Depends, HTTPException, APIRouter
 from stream_zip import stream_zip, NO_COMPRESSION_64, Method
 from remotezip import RemoteZip
 from aiobotocore.config import AioConfig
@@ -50,9 +50,13 @@ from .models import (
     DeletedResponse,
     UpdatedResponse,
     AddedResponseName,
+    PRESIGN_DURATION_SECONDS,
+    PresignedUrl,
+    SuccessResponse,
+    User,
 )
 
-from .utils import slug_from_name
+from .utils import slug_from_name, dt_now
 from .version import __version__
 
 
@@ -81,10 +85,19 @@ class StorageOps:
 
     frontend_origin: str
 
-    def __init__(self, org_ops, crawl_manager) -> None:
+    expire_at_duration_seconds: int
+    signed_duration_delta: timedelta
+
+    def __init__(self, org_ops, crawl_manager, mdb) -> None:
         self.org_ops = org_ops
         self.crawl_manager = crawl_manager
 
+        self.presigned_urls = mdb["presigned_urls"]
+
+        # renew when <25% of time remaining
+        self.expire_at_duration_seconds = int(PRESIGN_DURATION_SECONDS * 0.75)
+        self.signed_duration_delta = timedelta(seconds=self.expire_at_duration_seconds)
+
         frontend_origin = os.environ.get(
             "FRONTEND_ORIGIN", "http://browsertrix-cloud-frontend"
         )
@@ -129,6 +142,12 @@ class StorageOps:
 
         self.org_ops.set_default_primary_storage(self.default_primary)
 
+    async def init_index(self):
+        """init index for storages"""
+        await self.presigned_urls.create_index(
+            "signedAt", expireAfterSeconds=self.expire_at_duration_seconds
+        )
+
     def _create_s3_storage(self, storage: dict[str, str]) -> S3Storage:
         """create S3Storage object"""
         endpoint_url = storage["endpoint_url"]
@@ -445,10 +464,17 @@ class StorageOps:
         return False
 
     async def get_presigned_url(
-        self, org: Organization, crawlfile: CrawlFile, duration=3600
-    ) -> str:
+        self, org: Organization, crawlfile: CrawlFile, force_update=False
+    ) -> tuple[str, datetime]:
         """generate pre-signed url for crawl file"""
 
+        res = None
+        if not force_update:
+            res = await self.presigned_urls.find_one({"_id": crawlfile.filename})
+        if res:
+            presigned = PresignedUrl.from_dict(res)
+            return presigned.url, presigned.signedAt + self.signed_duration_delta
+
         s3storage = self.get_org_storage_by_ref(org, crawlfile.storage)
 
         async with self.get_s3_client(
@@ -459,7 +485,9 @@ class StorageOps:
             key += crawlfile.filename
 
             presigned_url = await client.generate_presigned_url(
-                "get_object", Params={"Bucket": bucket, "Key": key}, ExpiresIn=duration
+                "get_object",
+                Params={"Bucket": bucket, "Key": key},
+                ExpiresIn=PRESIGN_DURATION_SECONDS,
             )
 
             if (
@@ -474,7 +502,20 @@ class StorageOps:
                     host_endpoint_url, s3storage.access_endpoint_url
                 )
 
-        return presigned_url
+        now = dt_now()
+
+        presigned = PresignedUrl(
+            id=crawlfile.filename, url=presigned_url, signedAt=dt_now(), oid=org.id
+        )
+        await self.presigned_urls.find_one_and_update(
+            {"_id": crawlfile.filename},
+            {
+                "$set": presigned.to_dict(),
+            },
+            upsert=True,
+        )
+
+        return presigned_url, now + self.signed_duration_delta
 
     async def delete_file_object(self, org: Organization, crawlfile: BaseFile) -> bool:
         """delete crawl file from storage."""
@@ -753,10 +794,12 @@ def _parse_json(line) -> dict:
 
 
 # ============================================================================
-def init_storages_api(org_ops, crawl_manager):
+def init_storages_api(
+    org_ops: OrgOps, crawl_manager: CrawlManager, app: APIRouter, mdb, user_dep
+) -> StorageOps:
     """API for updating storage for an org"""
 
-    storage_ops = StorageOps(org_ops, crawl_manager)
+    storage_ops = StorageOps(org_ops, crawl_manager, mdb)
 
     if not org_ops.router:
         return storage_ops
@@ -775,6 +818,29 @@ def init_storages_api(org_ops, crawl_manager):
     def get_available_storages(org: Organization = Depends(org_owner_dep)):
         return storage_ops.get_available_storages(org)
 
+    @router.post(
+        "/clear-presigned-urls",
+        tags=["organizations"],
+        response_model=SuccessResponse,
+    )
+    async def clear_presigned_url(org: Organization = Depends(org_owner_dep)):
+        await storage_ops.presigned_urls.delete_many({"oid": org.id})
+
+        return {"success": True}
+
+    @app.post(
+        "/orgs/clear-presigned-urls",
+        tags=["organizations"],
+        response_model=SuccessResponse,
+    )
+    async def clear_all_presigned_urls(user: User = Depends(user_dep)):
+        if not user.is_superuser:
+            raise HTTPException(status_code=403, detail="Not Allowed")
Allowed") + + await storage_ops.presigned_urls.delete_many({}) + + return {"success": True} + # pylint: disable=unreachable, fixme # todo: enable when ready to support custom storage return storage_ops diff --git a/backend/btrixcloud/webhooks.py b/backend/btrixcloud/webhooks.py index 251cc251..1222ccea 100644 --- a/backend/btrixcloud/webhooks.py +++ b/backend/btrixcloud/webhooks.py @@ -282,7 +282,7 @@ class EventWebhookOps: # know for certain what state the crawl will be in at this point try: qa_resources = await self.crawl_ops.resolve_signed_urls( - qa_run.files, org, crawl_id, qa_run.id + qa_run.files, org, crawl_id ) # pylint: disable=broad-exception-caught diff --git a/backend/test/test_uploads.py b/backend/test/test_uploads.py index 1dea54ea..ff35cad9 100644 --- a/backend/test/test_uploads.py +++ b/backend/test/test_uploads.py @@ -1030,6 +1030,39 @@ def test_update_upload_metadata_all_crawls( assert data["collectionIds"] == [] +def test_clear_all_presigned_urls( + admin_auth_headers, crawler_auth_headers, default_org_id +): + # All orgs + r = requests.post( + f"{API_PREFIX}/orgs/clear-presigned-urls", + headers=crawler_auth_headers, + ) + assert r.status_code == 403 + assert r.json()["detail"] == "Not Allowed" + + r = requests.post( + f"{API_PREFIX}/orgs/clear-presigned-urls", + headers=admin_auth_headers, + ) + assert r.status_code == 200 + assert r.json()["success"] + + # Per-org + r = requests.post( + f"{API_PREFIX}/orgs/{default_org_id}/clear-presigned-urls", + headers=crawler_auth_headers, + ) + assert r.status_code == 403 + + r = requests.post( + f"{API_PREFIX}/orgs/{default_org_id}/clear-presigned-urls", + headers=admin_auth_headers, + ) + assert r.status_code == 200 + assert r.json()["success"] + + def test_delete_form_upload_and_crawls_from_all_crawls( admin_auth_headers, crawler_auth_headers, diff --git a/backend/test/test_webhooks.py b/backend/test/test_webhooks.py index 9141ff13..fad3244f 100644 --- a/backend/test/test_webhooks.py +++ b/backend/test/test_webhooks.py @@ -85,6 +85,7 @@ def test_get_webhook_event(admin_auth_headers, default_org_id): if event in ("crawlFinished", "uploadFinished"): assert len(body["resources"]) >= 1 + assert body["resources"][0]["expireAt"] assert body["itemId"] elif event in ("crawlStarted"):