Provide full resources in archived items finished webhooks (#1308)
Fixes #1306

- Include full `resources` with expireAt (as a string) in crawlFinished and uploadFinished webhook notifications rather than using the `downloadUrls` field (which is retained for collections).
- Set the default presigned duration to one minute short of 1 week and enforce the maximum supported by S3.
- Add 'storage_presign_duration_minutes' (commented out) to helm values.yaml.
- Update tests.

---------

Co-authored-by: Ilya Kreymer <ikreymer@gmail.com>
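For illustration only, a minimal sketch of how a webhook receiver might consume the new payload shape. The field names follow the models changed below; the handler itself and the printed values are hypothetical:

    def handle_item_finished(body):
        # For crawlFinished/uploadFinished, `resources` now carries the full file info;
        # `downloadUrls` is only populated for collection events.
        for resource in body.get("resources") or []:
            # each entry includes a presigned `path` and an `expireAt` ISO timestamp string
            print(resource["path"], "expires at", resource.get("expireAt"))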
This commit is contained in:
parent 2e5952a444
commit d58747dfa2
@@ -1,6 +1,5 @@
 """ base crawl type """

-import asyncio
 import uuid
 import os
 from datetime import timedelta
@@ -44,6 +43,12 @@ NON_RUNNING_STATES = (*FAILED_STATES, *SUCCESSFUL_STATES)
 ALL_CRAWL_STATES = (*RUNNING_AND_STARTING_STATES, *NON_RUNNING_STATES)


+# Presign duration must be less than 604800 seconds (one week),
+# so set this one minute short of a week.
+PRESIGN_MINUTES_MAX = 10079
+PRESIGN_MINUTES_DEFAULT = PRESIGN_MINUTES_MAX
+
+
 # ============================================================================
 # pylint: disable=too-many-instance-attributes
 class BaseCrawlOps:
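A quick arithmetic check of the chosen constant (not part of the diff itself): 10079 minutes is exactly one minute short of the 604800-second (one week) S3 presign limit.

    # sanity check for PRESIGN_MINUTES_MAX (illustrative only)
    assert 10079 * 60 == 604_740
    assert 10079 * 60 < 7 * 24 * 60 * 60  # 604_800 seconds, the S3 maximum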
@@ -62,8 +67,12 @@ class BaseCrawlOps:
         self.colls = colls
         self.storage_ops = storage_ops

+        presign_duration_minutes = int(
+            os.environ.get("PRESIGN_DURATION_MINUTES") or PRESIGN_MINUTES_DEFAULT
+        )
+
         self.presign_duration_seconds = (
-            int(os.environ.get("PRESIGN_DURATION_MINUTES", 60)) * 60
+            min(presign_duration_minutes, PRESIGN_MINUTES_MAX) * 60
         )

     async def get_crawl_raw(
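A standalone sketch of the new duration handling (standard library only; it mirrors the constructor logic above, outside the class): an unset or empty PRESIGN_DURATION_MINUTES falls back to the default, and oversized values are clamped to the S3-supported maximum.

    import os

    PRESIGN_MINUTES_MAX = 10079
    PRESIGN_MINUTES_DEFAULT = PRESIGN_MINUTES_MAX

    os.environ["PRESIGN_DURATION_MINUTES"] = "20160"  # two weeks requested (illustrative value)
    minutes = int(os.environ.get("PRESIGN_DURATION_MINUTES") or PRESIGN_MINUTES_DEFAULT)
    print(min(minutes, PRESIGN_MINUTES_MAX) * 60)  # 604740 -- clamped to one minute under a week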
@@ -362,7 +371,6 @@ class BaseCrawlOps:

         delta = timedelta(seconds=self.presign_duration_seconds)

-        updates = []
         out_files = []

         for file_ in files:
@@ -374,17 +382,20 @@ class BaseCrawlOps:
             presigned_url = await self.storage_ops.get_presigned_url(
                 org, file_, self.presign_duration_seconds
             )
-            updates.append(
-                (
-                    {"files.filename": file_.filename},
-                    {
-                        "$set": {
-                            "files.$.presignedUrl": presigned_url,
-                            "files.$.expireAt": exp,
-                        }
-                    },
-                )
+            await self.crawls.find_one_and_update(
+                {"files.filename": file_.filename},
+                {
+                    "$set": {
+                        "files.$.presignedUrl": presigned_url,
+                        "files.$.expireAt": exp,
+                    }
+                },
             )
+            file_.expireAt = exp
+
+            expire_at_str = ""
+            if file_.expireAt:
+                expire_at_str = file_.expireAt.isoformat()

             out_files.append(
                 CrawlFileOut(
@@ -393,20 +404,12 @@ class BaseCrawlOps:
                     hash=file_.hash,
                     size=file_.size,
                     crawlId=crawl_id,
+                    expireAt=expire_at_str,
                 )
             )

-        if updates:
-            asyncio.create_task(self._update_presigned(updates))
-
-        # print("presigned", out_files)
-
         return out_files

-    async def _update_presigned(self, updates):
-        for update in updates:
-            await self.crawls.find_one_and_update(*update)
-
     @contextlib.asynccontextmanager
     async def get_redis(self, crawl_id):
         """get redis url for crawl id"""
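A small sketch of how expireAt becomes the string stored on the file and returned in resources. Assumptions: standard library only, and `exp` is computed earlier in the method from the presign duration (outside these hunks); the exact timezone handling here is illustrative.

    from datetime import datetime, timedelta, timezone

    presign_duration_seconds = 10079 * 60  # one week minus one minute
    exp = datetime.now(timezone.utc) + timedelta(seconds=presign_duration_seconds)

    expire_at_str = ""
    if exp:
        expire_at_str = exp.isoformat()  # e.g. "2023-10-31T11:59:00+00:00"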
@@ -406,7 +406,9 @@ class CrawlFileOut(BaseModel):
     path: str
     hash: str
     size: int
+
     crawlId: Optional[str]
+    expireAt: Optional[str]


 # ============================================================================
@@ -1053,6 +1055,7 @@ class BaseArchivedItemBody(WebhookNotificationBody):
     """Webhook notification POST body for when archived item is started or finished"""

     itemId: str
+    resources: Optional[List[CrawlFileOut]] = None


 # ============================================================================
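Putting the two model changes together, a minimal self-contained sketch using stand-in models that carry only the fields visible in these hunks (the real classes define more fields, and BaseArchivedItemBody inherits WebhookNotificationBody); all values are invented:

    from typing import List, Optional
    from pydantic import BaseModel

    class CrawlFileOut(BaseModel):
        # stand-in: only the fields shown in the diff above
        path: str
        hash: str
        size: int
        crawlId: Optional[str]
        expireAt: Optional[str]

    class BaseArchivedItemBody(BaseModel):
        # stand-in for the webhook body model
        itemId: str
        resources: Optional[List[CrawlFileOut]] = None

    body = BaseArchivedItemBody(
        itemId="example-item-id",
        resources=[
            CrawlFileOut(
                path="https://storage.example.com/crawl.wacz?X-Amz-Expires=604740",
                hash="sha256:0000",
                size=123456,
                crawlId="example-item-id",
                expireAt="2023-10-31T11:59:00+00:00",
            )
        ],
    )
    print(body.json())  # resources serialize in full, with expireAt as a string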
@@ -189,12 +189,7 @@ class EventWebhookOps:
             print(f"Crawl {crawl_id} not found, skipping event webhook", flush=True)
             return

-        download_urls = []
-        for resource in crawl.resources:
-            download_url = f"{org.origin}{resource.path}"
-            download_urls.append(download_url)
-
-        body.downloadUrls = download_urls
+        body.resources = crawl.resources

         notification = WebhookNotification(
             id=uuid.uuid4(),
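For comparison, the removed loop matches what collection webhooks still do: build plain download URLs from the org origin plus each resource path. A hypothetical helper illustrating that older shape (the function name and sample values are invented):

    def build_download_urls(origin, resource_paths):
        # mirrors the removed loop: prefix each resource path with the org origin
        return [f"{origin}{path}" for path in resource_paths]

    print(build_download_urls("https://app.example.com", ["/replay/example.wacz"]))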
@@ -81,14 +81,17 @@ def test_get_webhook_event(admin_auth_headers, default_org_id):
         assert event

         if event in ("crawlFinished", "uploadFinished"):
-            assert len(body["downloadUrls"]) >= 1
+            assert len(body["resources"]) >= 1
+            assert len(body.get("downloadUrls", [])) == 0
             assert body["itemId"]

         elif event in ("crawlStarted"):
-            assert len(body["downloadUrls"]) == 0
+            assert len(body.get("resources", [])) == 0
+            assert len(body.get("downloadUrls", [])) == 0
             assert body["itemId"]

         elif event in ("addedToCollection", "removedFromCollection"):
+            assert len(body.get("resources", [])) == 0
             assert len(body["downloadUrls"]) == 1
             assert body["collectionId"]
             assert len(body["itemIds"]) >= 1
@@ -246,28 +249,33 @@ def test_webhooks_sent(
            assert post["itemId"]
            assert post["scheduled"] in (True, False)
            assert post.get("downloadUrls") is None
+            assert post.get("resources") is None

        elif event == "crawlFinished":
            crawl_finished_count += 1
            assert post["itemId"]
            assert post["state"]
-            assert post["downloadUrls"]
+            assert post["resources"]
+            assert post.get("downloadUrls") is None

        elif event == "uploadFinished":
            upload_finished_count += 1
            assert post["itemId"]
            assert post["state"]
-            assert post["downloadUrls"]
+            assert post["resources"]
+            assert post.get("downloadUrls") is None

        elif event == "addedToCollection":
            added_to_collection_count += 1
            assert post["downloadUrls"] and len(post["downloadUrls"]) == 1
+            assert post.get("resources") is None
            assert post["itemIds"]
            assert post["collectionId"]

        elif event == "removedFromCollection":
            removed_from_collection_count += 1
            assert post["downloadUrls"] and len(post["downloadUrls"]) == 1
+            assert post.get("resources") is None
            assert post["itemIds"]
            assert post["collectionId"]

@@ -36,7 +36,7 @@ data:

   RERUN_FROM_MIGRATION: "{{ .Values.rerun_from_migration }}"

-  PRESIGN_DURATION_MINUTES: "{{ .Values.storage_presign_duration_minutes | default 60 }}"
+  PRESIGN_DURATION_MINUTES: "{{ .Values.storage_presign_duration_minutes }}"

   FAST_RETRY_SECS: "{{ .Values.operator_fast_resync_secs | default 3 }}"

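Because the template no longer applies `| default 60`, PRESIGN_DURATION_MINUTES can reach the backend as an empty string when storage_presign_duration_minutes is left unset in values.yaml; the `or` fallback in BaseCrawlOps (see the constructor hunk above) covers that case. A small sketch of the fallback behavior:

    import os

    PRESIGN_MINUTES_DEFAULT = 10079

    for raw in ("", "120"):  # empty string (option unset) vs. explicit override
        os.environ["PRESIGN_DURATION_MINUTES"] = raw
        minutes = int(os.environ.get("PRESIGN_DURATION_MINUTES") or PRESIGN_MINUTES_DEFAULT)
        print(repr(raw), "->", minutes)  # '' -> 10079, '120' -> 120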
@@ -276,6 +276,12 @@ storages:
 # shared_storage_profile:


+# optional: duration in minutes for WACZ download links to be valid
+# used by webhooks and replay
+# max value = 10079 (one week minus one minute)
+# storage_presign_duration_minutes: 10079
+
+
 # Email Options
 # =========================================
 email: