Better caching of presigned URLs + support for thumbnails (#2446)
Overhauls URL presigning:

- cache presigned URLs in a flat, separate MongoDB collection with an expiring (TTL) index (see the sketch below)
- regenerate presigned URLs automatically when they are missing or have expired out of the index
- remove the logic for storing presignedUrl on file objects
- support caching presigned URLs for thumbnails
- add endpoints to clear presigned URLs for an org, or for all files in all orgs (superadmin only)
- supersedes #2438, fixes #2437
- removes previous presignedUrl and expireAt data from crawls and QA runs

---------

Co-authored-by: Tessa Walsh <tessa@bitarchivist.net>
commit 702c9ab3b7 (parent 631b019baf)
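The core mechanism here is a small cache layer: presigned URLs live in a dedicated presigned_urls collection keyed by filename, a TTL index on signedAt lets MongoDB expire entries on its own, and lookups fall back to re-signing (and upserting) on a miss or when an update is forced. Below is a minimal, standalone sketch of that pattern, not the committed code: the sign_url() helper, the 3600-second duration, and the MongoDB connection details are placeholder assumptions, while the collection name, the signedAt field, and the 75% renewal window mirror the diff.

"""Minimal sketch of the TTL-cached presigned-URL pattern (assumptions noted above)."""

import asyncio
from datetime import datetime, timedelta, timezone

from motor.motor_asyncio import AsyncIOMotorClient

PRESIGN_DURATION_SECONDS = 3600  # assumed duration; the real value is configured elsewhere
# renew when <25% of the signed duration remains, mirroring the PR
EXPIRE_AT_SECONDS = int(PRESIGN_DURATION_SECONDS * 0.75)


async def init_index(db) -> None:
    # TTL index: MongoDB drops cache entries once signedAt is older than the window
    await db["presigned_urls"].create_index(
        "signedAt", expireAfterSeconds=EXPIRE_AT_SECONDS
    )


async def sign_url(filename: str) -> str:
    # placeholder for the real S3 generate_presigned_url() call
    return f"https://bucket.example.com/{filename}?X-Amz-Signature=stub"


async def get_presigned_url(db, filename: str, oid: str, force_update: bool = False):
    """Return (url, expire_at), reusing a cached URL unless missing, expired, or forced."""
    urls = db["presigned_urls"]

    if not force_update:
        cached = await urls.find_one({"_id": filename})
        if cached:
            return cached["url"], cached["signedAt"] + timedelta(seconds=EXPIRE_AT_SECONDS)

    url = await sign_url(filename)
    now = datetime.now(timezone.utc)
    # upsert keyed by filename so repeated signing simply refreshes the cached entry
    await urls.find_one_and_update(
        {"_id": filename},
        {"$set": {"url": url, "oid": oid, "signedAt": now}},
        upsert=True,
    )
    return url, now + timedelta(seconds=EXPIRE_AT_SECONDS)


async def main() -> None:
    # assumed local MongoDB instance and database name
    db = AsyncIOMotorClient("mongodb://localhost:27017")["presign_sketch"]
    await init_index(db)
    url, expire_at = await get_presigned_url(db, "crawl-0001.wacz", oid="org-1")
    print(url, expire_at)


if __name__ == "__main__":
    asyncio.run(main())

Because the cache lives in one flat collection, the new clear endpoints reduce to delete_many({"oid": org.id}) for a single org and delete_many({}) for the superadmin-only variant, as seen in the storages.py hunk below.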
@@ -1,6 +1,6 @@
"""base crawl type"""

from datetime import datetime, timedelta
from datetime import datetime
from typing import Optional, List, Union, Dict, Any, Type, TYPE_CHECKING, cast, Tuple
from uuid import UUID
import os
@@ -29,10 +29,9 @@ from .models import (
UpdatedResponse,
DeletedResponseQuota,
CrawlSearchValuesResponse,
PRESIGN_DURATION_SECONDS,
)
from .pagination import paginated_format, DEFAULT_PAGE_SIZE
from .utils import dt_now, date_to_str, get_origin
from .utils import dt_now, get_origin, date_to_str

if TYPE_CHECKING:
from .crawlconfigs import CrawlConfigOps
@@ -65,9 +64,6 @@ class BaseCrawlOps:
background_job_ops: BackgroundJobOps
page_ops: PageOps

presign_duration_seconds: int
expire_at_duration_seconds: int

def __init__(
self,
mdb,

@@ -89,9 +85,6 @@ class BaseCrawlOps:
self.background_job_ops = background_job_ops
self.page_ops = cast(PageOps, None)

# renew when <25% of time remaining
self.expire_at_duration_seconds = int(PRESIGN_DURATION_SECONDS * 0.75)

def set_page_ops(self, page_ops):
"""set page ops reference"""
self.page_ops = page_ops
@@ -124,13 +117,12 @@ class BaseCrawlOps:
files: List[Dict],
org: Organization,
crawlid: str,
qa_run_id: Optional[str] = None,
) -> List[CrawlFileOut]:
if not files:
return []

crawl_files = [CrawlFile(**data) for data in files]
return await self.resolve_signed_urls(crawl_files, org, crawlid, qa_run_id)
return await self.resolve_signed_urls(crawl_files, org, crawlid)

async def get_wacz_files(self, crawl_id: str, org: Organization):
"""Return list of WACZ files associated with crawl."""
@@ -464,50 +456,19 @@ class BaseCrawlOps:
files: List[CrawlFile],
org: Organization,
crawl_id: Optional[str] = None,
qa_run_id: Optional[str] = None,
update_presigned_url: bool = False,
force_update=False,
) -> List[CrawlFileOut]:
"""Regenerate presigned URLs for files as necessary"""
if not files:
print("no files")
return []

delta = timedelta(seconds=self.expire_at_duration_seconds)

out_files = []

for file_ in files:
presigned_url = file_.presignedUrl
now = dt_now()

if (
update_presigned_url
or not presigned_url
or (file_.expireAt and now >= file_.expireAt)
):
exp = now + delta
presigned_url = await self.storage_ops.get_presigned_url(
org, file_, PRESIGN_DURATION_SECONDS
)

prefix = "files"
if qa_run_id:
prefix = f"qaFinished.{qa_run_id}.{prefix}"

await self.crawls.find_one_and_update(
{f"{prefix}.filename": file_.filename},
{
"$set": {
f"{prefix}.$.presignedUrl": presigned_url,
f"{prefix}.$.expireAt": exp,
}
},
)
file_.expireAt = exp

expire_at_str = ""
if file_.expireAt:
expire_at_str = date_to_str(file_.expireAt)
presigned_url, expire_at = await self.storage_ops.get_presigned_url(
org, file_, force_update=force_update
)

out_files.append(
CrawlFileOut(

@@ -517,7 +478,7 @@ class BaseCrawlOps:
size=file_.size,
crawlId=crawl_id,
numReplicas=len(file_.replicas) if file_.replicas else 0,
expireAt=expire_at_str,
expireAt=date_to_str(expire_at),
)
)
@@ -1026,9 +1026,7 @@ class CrawlOps(BaseCrawlOps):
if not org:
raise HTTPException(status_code=400, detail="missing_org")

resources = await self.resolve_signed_urls(
qa_run.files, org, crawl.id, qa_run_id
)
resources = await self.resolve_signed_urls(qa_run.files, org, crawl.id)

qa_run.files = []
@@ -17,7 +17,7 @@ from pymongo.errors import InvalidName
from .migrations import BaseMigration


CURR_DB_VERSION = "0042"
CURR_DB_VERSION = "0043"


# ============================================================================

@@ -108,6 +108,7 @@ async def update_and_prepare_db(
invite_ops,
user_manager,
page_ops,
storage_ops,
)
await user_manager.create_super_user()
await org_ops.create_default_org()
@@ -211,7 +212,14 @@ async def drop_indexes(mdb):
# ============================================================================
# pylint: disable=too-many-arguments
async def create_indexes(
org_ops, crawl_ops, crawl_config_ops, coll_ops, invite_ops, user_manager, page_ops
org_ops,
crawl_ops,
crawl_config_ops,
coll_ops,
invite_ops,
user_manager,
page_ops,
storage_ops,
):
"""Create database indexes."""
print("Creating database indexes", flush=True)

@@ -222,6 +230,7 @@ async def create_indexes(
await invite_ops.init_index()
await user_manager.init_index()
await page_ops.init_index()
await storage_ops.init_index()


# ============================================================================
@@ -190,7 +190,9 @@ def main() -> None:

crawl_manager = CrawlManager()

storage_ops = init_storages_api(org_ops, crawl_manager)
storage_ops = init_storages_api(
org_ops, crawl_manager, app, mdb, current_active_user
)

background_job_ops = init_background_jobs_api(
app,
@@ -0,0 +1,59 @@
"""
Migration 0043 - Remove expireAt and presignedUrl from files, now stored in separate collection
"""

from btrixcloud.migrations import BaseMigration


MIGRATION_VERSION = "0043"


class Migration(BaseMigration):
"""Migration class."""

# pylint: disable=unused-argument
def __init__(self, mdb, **kwargs):
super().__init__(mdb, migration_version=MIGRATION_VERSION)

self.crawls = mdb["crawls"]

async def migrate_up(self) -> None:
"""Perform migration up."""

print("Clearing crawl file WACZ presigned URLs", flush=True)
await self.crawls.update_many(
{},
{
"$unset": {
"files.$[].presignedUrl": None,
"files.$[].expireAt": None,
}
},
)

# Clear presign for QA crawl files
qa_query = {
"type": "crawl",
"qaFinished": {"$nin": [None, {}]},
}

total = await self.crawls.count_documents(qa_query)
index = 1

async for crawl_with_qa in self.crawls.find(qa_query):
print(f"Clearing QA WACZ presigned URLs, crawl {index}/{total}", flush=True)
index += 1

qa_finished = crawl_with_qa.get("qaFinished")
if not qa_finished:
continue
for qa_run_id in qa_finished:
await self.crawls.find_one_and_update(
{"_id": crawl_with_qa.get("id")},
{
"$set": {
f"qaFinished.{qa_run_id}.files.$[].presignedUrl": None,
f"qaFinished.{qa_run_id}.files.$[].expireAt": None,
}
},
)
@@ -708,6 +708,16 @@ class StorageRef(BaseModel):
return ""


# ============================================================================
class PresignedUrl(BaseMongoModel):
"""Base model for presigned url"""

id: str
url: str
oid: UUID
signedAt: datetime


# ============================================================================
class BaseFile(BaseModel):
"""Base model for crawl and profile files"""

@@ -724,9 +734,6 @@ class BaseFile(BaseModel):
class CrawlFile(BaseFile):
"""file from a crawl"""

presignedUrl: Optional[str] = None
expireAt: Optional[datetime] = None


# ============================================================================
class CrawlFileOut(BaseModel):
@@ -1151,9 +1158,7 @@ class ImageFile(BaseFile):

async def get_image_file_out(self, org, storage_ops) -> ImageFileOut:
"""Get ImageFileOut with new presigned url"""
presigned_url = await storage_ops.get_presigned_url(
org, self, PRESIGN_DURATION_SECONDS
)
presigned_url, _ = await storage_ops.get_presigned_url(org, self)

return ImageFileOut(
name=self.filename,

@@ -1169,9 +1174,7 @@ class ImageFile(BaseFile):

async def get_public_image_file_out(self, org, storage_ops) -> PublicImageFileOut:
"""Get PublicImageFileOut with new presigned url"""
presigned_url = await storage_ops.get_presigned_url(
org, self, PRESIGN_DURATION_SECONDS
)
presigned_url, _ = await storage_ops.get_presigned_url(org, self)

return PublicImageFileOut(
name=self.filename,
@@ -51,7 +51,7 @@ def init_ops() -> Tuple[

crawl_manager = CrawlManager()

storage_ops = StorageOps(org_ops, crawl_manager)
storage_ops = StorageOps(org_ops, crawl_manager, mdb)

background_job_ops = BackgroundJobOps(
mdb, email, user_manager, org_ops, crawl_manager, storage_ops

@@ -1305,7 +1305,7 @@ class OrgOps:

# Regenerate presigned URLs
await self.base_crawl_ops.resolve_signed_urls(
item_obj.files, org, update_presigned_url=True, crawl_id=item_id
item_obj.files, org, crawl_id=item_id, force_update=True
)

# pages
@@ -24,10 +24,10 @@ import zlib
import json
import os

from datetime import datetime
from datetime import datetime, timedelta
from zipfile import ZipInfo

from fastapi import Depends, HTTPException
from fastapi import Depends, HTTPException, APIRouter
from stream_zip import stream_zip, NO_COMPRESSION_64, Method
from remotezip import RemoteZip
from aiobotocore.config import AioConfig

@@ -50,9 +50,13 @@ from .models import (
DeletedResponse,
UpdatedResponse,
AddedResponseName,
PRESIGN_DURATION_SECONDS,
PresignedUrl,
SuccessResponse,
User,
)

from .utils import slug_from_name
from .utils import slug_from_name, dt_now
from .version import __version__
@@ -81,10 +85,19 @@ class StorageOps:

frontend_origin: str

def __init__(self, org_ops, crawl_manager) -> None:
expire_at_duration_seconds: int
signed_duration_delta: timedelta

def __init__(self, org_ops, crawl_manager, mdb) -> None:
self.org_ops = org_ops
self.crawl_manager = crawl_manager

self.presigned_urls = mdb["presigned_urls"]

# renew when <25% of time remaining
self.expire_at_duration_seconds = int(PRESIGN_DURATION_SECONDS * 0.75)
self.signed_duration_delta = timedelta(seconds=self.expire_at_duration_seconds)

frontend_origin = os.environ.get(
"FRONTEND_ORIGIN", "http://browsertrix-cloud-frontend"
)

@@ -129,6 +142,12 @@ class StorageOps:

self.org_ops.set_default_primary_storage(self.default_primary)

async def init_index(self):
"""init index for storages"""
await self.presigned_urls.create_index(
"signedAt", expireAfterSeconds=self.expire_at_duration_seconds
)

def _create_s3_storage(self, storage: dict[str, str]) -> S3Storage:
"""create S3Storage object"""
endpoint_url = storage["endpoint_url"]
@@ -445,10 +464,17 @@ class StorageOps:
return False

async def get_presigned_url(
self, org: Organization, crawlfile: CrawlFile, duration=3600
) -> str:
self, org: Organization, crawlfile: CrawlFile, force_update=False
) -> tuple[str, datetime]:
"""generate pre-signed url for crawl file"""

res = None
if not force_update:
res = await self.presigned_urls.find_one({"_id": crawlfile.filename})
if res:
presigned = PresignedUrl.from_dict(res)
return presigned.url, presigned.signedAt + self.signed_duration_delta

s3storage = self.get_org_storage_by_ref(org, crawlfile.storage)

async with self.get_s3_client(

@@ -459,7 +485,9 @@ class StorageOps:
key += crawlfile.filename

presigned_url = await client.generate_presigned_url(
"get_object", Params={"Bucket": bucket, "Key": key}, ExpiresIn=duration
"get_object",
Params={"Bucket": bucket, "Key": key},
ExpiresIn=PRESIGN_DURATION_SECONDS,
)

if (
@@ -474,7 +502,20 @@ class StorageOps:
host_endpoint_url, s3storage.access_endpoint_url
)

return presigned_url
now = dt_now()

presigned = PresignedUrl(
id=crawlfile.filename, url=presigned_url, signedAt=dt_now(), oid=org.id
)
await self.presigned_urls.find_one_and_update(
{"_id": crawlfile.filename},
{
"$set": presigned.to_dict(),
},
upsert=True,
)

return presigned_url, now + self.signed_duration_delta

async def delete_file_object(self, org: Organization, crawlfile: BaseFile) -> bool:
"""delete crawl file from storage."""

@@ -753,10 +794,12 @@ def _parse_json(line) -> dict:


# ============================================================================
def init_storages_api(org_ops, crawl_manager):
def init_storages_api(
org_ops: OrgOps, crawl_manager: CrawlManager, app: APIRouter, mdb, user_dep
) -> StorageOps:
"""API for updating storage for an org"""

storage_ops = StorageOps(org_ops, crawl_manager)
storage_ops = StorageOps(org_ops, crawl_manager, mdb)

if not org_ops.router:
return storage_ops
@@ -775,6 +818,29 @@ def init_storages_api(org_ops, crawl_manager):
def get_available_storages(org: Organization = Depends(org_owner_dep)):
return storage_ops.get_available_storages(org)

@router.post(
"/clear-presigned-urls",
tags=["organizations"],
response_model=SuccessResponse,
)
async def clear_presigned_url(org: Organization = Depends(org_owner_dep)):
await storage_ops.presigned_urls.delete_many({"oid": org.id})

return {"success": True}

@app.post(
"/orgs/clear-presigned-urls",
tags=["organizations"],
response_model=SuccessResponse,
)
async def clear_all_presigned_urls(user: User = Depends(user_dep)):
if not user.is_superuser:
raise HTTPException(status_code=403, detail="Not Allowed")

await storage_ops.presigned_urls.delete_many({})

return {"success": True}

# pylint: disable=unreachable, fixme
# todo: enable when ready to support custom storage
return storage_ops
@@ -282,7 +282,7 @@ class EventWebhookOps:
# know for certain what state the crawl will be in at this point
try:
qa_resources = await self.crawl_ops.resolve_signed_urls(
qa_run.files, org, crawl_id, qa_run.id
qa_run.files, org, crawl_id
)

# pylint: disable=broad-exception-caught
@@ -1030,6 +1030,39 @@ def test_update_upload_metadata_all_crawls(
assert data["collectionIds"] == []


def test_clear_all_presigned_urls(
admin_auth_headers, crawler_auth_headers, default_org_id
):
# All orgs
r = requests.post(
f"{API_PREFIX}/orgs/clear-presigned-urls",
headers=crawler_auth_headers,
)
assert r.status_code == 403
assert r.json()["detail"] == "Not Allowed"

r = requests.post(
f"{API_PREFIX}/orgs/clear-presigned-urls",
headers=admin_auth_headers,
)
assert r.status_code == 200
assert r.json()["success"]

# Per-org
r = requests.post(
f"{API_PREFIX}/orgs/{default_org_id}/clear-presigned-urls",
headers=crawler_auth_headers,
)
assert r.status_code == 403

r = requests.post(
f"{API_PREFIX}/orgs/{default_org_id}/clear-presigned-urls",
headers=admin_auth_headers,
)
assert r.status_code == 200
assert r.json()["success"]


def test_delete_form_upload_and_crawls_from_all_crawls(
admin_auth_headers,
crawler_auth_headers,
@@ -85,6 +85,7 @@ def test_get_webhook_event(admin_auth_headers, default_org_id):

if event in ("crawlFinished", "uploadFinished"):
assert len(body["resources"]) >= 1
assert body["resources"][0]["expireAt"]
assert body["itemId"]

elif event in ("crawlStarted"):