Add crawl, upload, and collection delete webhook event notifications (#1363)

Fixes #1307
Fixes #1132
Related to #1306

The new deletion webhook notifications include the org id and the id of the
deleted item or collection. This PR also adds API docs for the new webhooks
and extends the existing tests to cover them.

It also includes some cleanup of the existing webhooks:
- Remove `downloadUrls` from item finished webhook bodies
- Rename collection webhook body `downloadUrls` to `downloadUrl`, since
we only ever have one per collection
- Fix API docs for existing webhooks, one of which had the wrong
response body
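
For illustration only (not part of this commit), the new deletion payloads look
roughly like the sketch below, based on the body models added in this PR; the
id values are placeholders:

```python
# Approximate shapes of the new deletion webhook POST bodies (illustrative only).
crawl_deleted = {"event": "crawlDeleted", "orgId": "<org-uuid>", "itemId": "<crawl-id>"}
upload_deleted = {"event": "uploadDeleted", "orgId": "<org-uuid>", "itemId": "<upload-id>"}
collection_deleted = {
    "event": "collectionDeleted",
    "orgId": "<org-uuid>",
    "collectionId": "<collection-uuid>",
}
```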
Tessa Walsh 2023-11-09 21:19:08 -05:00 committed by GitHub
parent 1afc411114
commit f3cbd9e179
6 changed files with 225 additions and 24 deletions

@@ -7,6 +7,7 @@ from uuid import UUID
import urllib.parse
import contextlib
import asyncio
from fastapi import HTTPException, Depends
from .models import (
@@ -346,6 +347,19 @@ class BaseCrawlOps:
cids_to_update[cid]["inc"] = 1
cids_to_update[cid]["size"] = crawl_size
if type_ == "crawl":
asyncio.create_task(
self.event_webhook_ops.create_crawl_deleted_notification(
crawl_id, org
)
)
if type_ == "upload":
asyncio.create_task(
self.event_webhook_ops.create_upload_deleted_notification(
crawl_id, org
)
)
query = {"_id": {"$in": delete_list.crawl_ids}, "oid": org.id, "type": type_}
res = await self.crawls.delete_many(query)

@@ -305,6 +305,10 @@ class CollectionOps:
if result.deleted_count < 1:
raise HTTPException(status_code=404, detail="collection_not_found")
asyncio.create_task(
self.event_webhook_ops.create_collection_deleted_notification(coll_id, org)
)
return {"success": True}
async def download_collection(self, coll_id: UUID, org: Organization):

@@ -824,9 +824,12 @@ class OrgWebhookUrls(BaseModel):
crawlStarted: Optional[AnyHttpUrl] = None
crawlFinished: Optional[AnyHttpUrl] = None
crawlDeleted: Optional[AnyHttpUrl] = None
uploadFinished: Optional[AnyHttpUrl] = None
uploadDeleted: Optional[AnyHttpUrl] = None
addedToCollection: Optional[AnyHttpUrl] = None
removedFromCollection: Optional[AnyHttpUrl] = None
collectionDeleted: Optional[AnyHttpUrl] = None
# ============================================================================
@@ -1119,8 +1122,6 @@ class UserUpdatePassword(BaseModel):
class WebhookNotificationBody(BaseModel):
"""Base POST body model for webhook notifications"""
downloadUrls: Optional[List] = None
# Store as str, not UUID, to make JSON-serializable
orgId: str
@@ -1131,10 +1132,14 @@ class WebhookEventType(str, Enum):
CRAWL_STARTED = "crawlStarted"
CRAWL_FINISHED = "crawlFinished"
CRAWL_DELETED = "crawlDeleted"
UPLOAD_FINISHED = "uploadFinished"
UPLOAD_DELETED = "uploadDeleted"
ADDED_TO_COLLECTION = "addedToCollection"
REMOVED_FROM_COLLECTION = "removedFromCollection"
COLLECTION_DELETED = "collectionDeleted"
# ============================================================================
@@ -1143,6 +1148,7 @@ class BaseCollectionItemBody(WebhookNotificationBody):
collectionId: str
itemIds: List[str]
downloadUrl: str
# ============================================================================
@@ -1163,12 +1169,29 @@ class CollectionItemRemovedBody(BaseCollectionItemBody):
] = WebhookEventType.REMOVED_FROM_COLLECTION
# ============================================================================
class CollectionDeletedBody(WebhookNotificationBody):
"""Webhook notification base POST body for collection changes"""
event: Literal[
WebhookEventType.COLLECTION_DELETED
] = WebhookEventType.COLLECTION_DELETED
collectionId: str
# ============================================================================
class BaseArchivedItemBody(WebhookNotificationBody):
"""Webhook notification POST body for when archived item is started or finished"""
itemId: str
resources: Optional[List[CrawlFileOut]] = None
# ============================================================================
class BaseArchivedItemFinishedBody(BaseArchivedItemBody):
"""Webhook notification POST body for when archived item is finished"""
resources: List[CrawlFileOut]
state: str
# ============================================================================
@@ -1180,19 +1203,31 @@ class CrawlStartedBody(BaseArchivedItemBody):
# ============================================================================
class CrawlFinishedBody(BaseArchivedItemBody):
class CrawlFinishedBody(BaseArchivedItemFinishedBody):
"""Webhook notification POST body for when crawl finishes"""
event: Literal[WebhookEventType.CRAWL_FINISHED] = WebhookEventType.CRAWL_FINISHED
state: str
# ============================================================================
class UploadFinishedBody(BaseArchivedItemBody):
class CrawlDeletedBody(BaseArchivedItemBody):
"""Webhook notification POST body for when crawl is deleted"""
event: Literal[WebhookEventType.CRAWL_DELETED] = WebhookEventType.CRAWL_DELETED
# ============================================================================
class UploadFinishedBody(BaseArchivedItemFinishedBody):
"""Webhook notification POST body for when upload finishes"""
event: Literal[WebhookEventType.UPLOAD_FINISHED] = WebhookEventType.UPLOAD_FINISHED
state: str
# ============================================================================
class UploadDeletedBody(BaseArchivedItemBody):
"""Webhook notification POST body for when upload finishes"""
event: Literal[WebhookEventType.UPLOAD_DELETED] = WebhookEventType.UPLOAD_DELETED
# ============================================================================
@@ -1204,9 +1239,12 @@ class WebhookNotification(BaseMongoModel):
body: Union[
CrawlStartedBody,
CrawlFinishedBody,
CrawlDeletedBody,
UploadFinishedBody,
UploadDeletedBody,
CollectionItemAddedBody,
CollectionItemRemovedBody,
CollectionDeletedBody,
]
success: bool = False
attempts: int = 0
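
Because each body's `event` field is a `Literal` with a default, callers construct
these models without passing `event` explicitly. A minimal sketch, assuming the
models are importable from `btrixcloud.models` and pydantic v1-style `.dict()`
serialization:

```python
from btrixcloud.models import CrawlDeletedBody, WebhookEventType  # import path assumed

# The Literal default fills in the event name automatically; the ids are placeholders.
body = CrawlDeletedBody(itemId="<crawl-id>", orgId="<org-uuid>")
assert body.event == WebhookEventType.CRAWL_DELETED
print(body.dict())  # includes {"event": "crawlDeleted", "orgId": "<org-uuid>", "itemId": "<crawl-id>", ...}
```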

@@ -15,9 +15,12 @@ from .models import (
WebhookNotification,
CrawlStartedBody,
CrawlFinishedBody,
CrawlDeletedBody,
UploadFinishedBody,
UploadDeletedBody,
CollectionItemAddedBody,
CollectionItemRemovedBody,
CollectionDeletedBody,
PaginatedResponse,
Organization,
)
@@ -217,6 +220,25 @@ class EventWebhookOps:
crawl_ids=[crawl_id], coll_id=coll_id, org=org
)
async def _create_deleted_notification(
self,
org: Organization,
event: str,
body: Union[CrawlDeletedBody, UploadDeletedBody, CollectionDeletedBody],
):
"""Create webhook notification for deleted crawl/upload/collection."""
notification = WebhookNotification(
id=uuid4(),
event=event,
oid=org.id,
body=body,
created=datetime.utcnow(),
)
await self.webhooks.insert_one(notification.to_dict())
await self.send_notification(org, notification)
async def create_crawl_finished_notification(
self, crawl_id: str, oid: UUID, state: str
) -> None:
@@ -234,6 +256,23 @@ class EventWebhookOps:
itemId=crawl_id,
orgId=str(org.id),
state=state,
resources=[],
),
)
async def create_crawl_deleted_notification(
self, crawl_id: str, org: Organization
) -> None:
"""Create webhook notification for deleted crawl."""
if not org.webhookUrls or not org.webhookUrls.crawlDeleted:
return
await self._create_deleted_notification(
org,
event=WebhookEventType.CRAWL_DELETED,
body=CrawlDeletedBody(
itemId=crawl_id,
orgId=str(org.id),
),
)
@@ -251,10 +290,23 @@ class EventWebhookOps:
org,
event=WebhookEventType.UPLOAD_FINISHED,
body=UploadFinishedBody(
itemId=crawl_id, orgId=str(org.id), state="complete"
itemId=crawl_id, orgId=str(org.id), state="complete", resources=[]
),
)
async def create_upload_deleted_notification(
self, crawl_id: str, org: Organization
) -> None:
"""Create webhook notification for deleted upload."""
if not org.webhookUrls or not org.webhookUrls.uploadDeleted:
return
await self._create_deleted_notification(
org,
event=WebhookEventType.UPLOAD_DELETED,
body=UploadDeletedBody(itemId=crawl_id, orgId=str(org.id)),
)
async def create_crawl_started_notification(
self, crawl_id: str, oid: UUID, scheduled: bool = False
) -> None:
@@ -304,7 +356,7 @@ class EventWebhookOps:
f"{org.origin}/api/orgs/{org.id}/collections/{coll_id}/download"
)
body.downloadUrls = [coll_download_url]
body.downloadUrl = coll_download_url
notification = WebhookNotification(
id=uuid4(),
@@ -334,6 +386,7 @@ class EventWebhookOps:
event=WebhookEventType.ADDED_TO_COLLECTION,
body=CollectionItemAddedBody(
itemIds=crawl_ids,
downloadUrl="",
collectionId=str(coll_id),
orgId=str(org.id),
),
@@ -355,6 +408,25 @@ class EventWebhookOps:
event=WebhookEventType.REMOVED_FROM_COLLECTION,
body=CollectionItemRemovedBody(
itemIds=crawl_ids,
downloadUrl="",
collectionId=str(coll_id),
orgId=str(org.id),
),
)
async def create_collection_deleted_notification(
self,
coll_id: UUID,
org: Organization,
) -> None:
"""Create webhook notification for item removed from collection"""
if not org.webhookUrls or not org.webhookUrls.collectionDeleted:
return
await self._create_deleted_notification(
org,
event=WebhookEventType.COLLECTION_DELETED,
body=CollectionDeletedBody(
collectionId=str(coll_id),
orgId=str(org.id),
),
@@ -426,18 +498,30 @@ def init_openapi_webhooks(app):
@app.webhooks.post(WebhookEventType.CRAWL_FINISHED)
def crawl_finished(body: CrawlFinishedBody):
"""Sent when a crawl if finished"""
"""Sent when a crawl has finished"""
@app.webhooks.post(WebhookEventType.CRAWL_DELETED)
def crawl_deleted(body: CrawlDeletedBody):
"""Sent when a crawl is deleted"""
@app.webhooks.post(WebhookEventType.UPLOAD_FINISHED)
def upload_finished(body: UploadFinishedBody):
"""Sent when an upload has finished"""
@app.webhooks.post(WebhookEventType.UPLOAD_DELETED)
def upload_deleted(body: UploadDeletedBody):
"""Sent when an upload is deleted"""
@app.webhooks.post(WebhookEventType.ADDED_TO_COLLECTION)
def added_to_collection(body: CollectionItemAddedBody):
"""Sent when an archived item (crawl or upload)
is added to a collection"""
@app.webhooks.post(WebhookEventType.REMOVED_FROM_COLLECTION)
def remove_from_collection(body: CrawlStartedBody):
def remove_from_collection(body: CollectionItemRemovedBody):
"""Sent when an archived item (crawl or upload)
is removed from a collection"""
@app.webhooks.post(WebhookEventType.COLLECTION_DELETED)
def collection_deleted(body: CollectionDeletedBody):
"""Sent when a collection is deleted"""

@@ -344,16 +344,22 @@ def test_update_event_webhook_urls_org_admin(admin_auth_headers, default_org_id)
webhooks = data.get("webhooks")
assert webhooks.get("crawlStarted") is None
assert webhooks.get("crawlFinished") is None
assert webhooks.get("crawlDeleted") is None
assert webhooks.get("uploadFinished") is None
assert webhooks.get("uploadDeleted") is None
assert webhooks.get("addedToCollection") is None
assert webhooks.get("removedFromCollection") is None
assert webhooks.get("collectionDeleted") is None
# Set URLs and verify
CRAWL_STARTED_URL = "https://example.com/crawl/started"
CRAWL_FINISHED_URL = "https://example.com/crawl/finished"
UPLOAD_FINISHED_URL = "https://example.com/crawl/finished"
CRAWL_DELETED_URL = "https://example.com/crawl/deleted"
UPLOAD_FINISHED_URL = "https://example.com/upload/finished"
UPLOAD_DELETED_URL = "https://example.com/upload/deleted"
COLL_ADDED_URL = "https://example.com/coll/added"
COLL_REMOVED_URL = "http://example.com/coll/removed"
COLL_DELETED_URL = "http://example.com/coll/deleted"
r = requests.post(
f"{API_PREFIX}/orgs/{default_org_id}/event-webhook-urls",
@@ -361,9 +367,12 @@ def test_update_event_webhook_urls_org_admin(admin_auth_headers, default_org_id)
json={
"crawlStarted": CRAWL_STARTED_URL,
"crawlFinished": CRAWL_FINISHED_URL,
"crawlDeleted": CRAWL_DELETED_URL,
"uploadFinished": UPLOAD_FINISHED_URL,
"uploadDeleted": UPLOAD_DELETED_URL,
"addedToCollection": COLL_ADDED_URL,
"removedFromCollection": COLL_REMOVED_URL,
"collectionDeleted": COLL_DELETED_URL,
},
)
assert r.status_code == 200
@@ -378,9 +387,14 @@ def test_update_event_webhook_urls_org_admin(admin_auth_headers, default_org_id)
urls = data["webhookUrls"]
assert urls["crawlStarted"] == CRAWL_STARTED_URL
assert urls["crawlFinished"] == CRAWL_FINISHED_URL
assert urls["crawlDeleted"] == CRAWL_DELETED_URL
assert urls["uploadFinished"] == UPLOAD_FINISHED_URL
assert urls["uploadDeleted"] == UPLOAD_DELETED_URL
assert urls["addedToCollection"] == COLL_ADDED_URL
assert urls["removedFromCollection"] == COLL_REMOVED_URL
assert urls["collectionDeleted"] == COLL_DELETED_URL
def test_update_event_webhook_urls_org_crawler(crawler_auth_headers, default_org_id):

@@ -32,9 +32,12 @@ def test_list_webhook_events(admin_auth_headers, default_org_id):
urls = data["webhookUrls"]
assert urls["crawlStarted"]
assert urls["crawlFinished"]
assert urls["crawlDeleted"]
assert urls["uploadFinished"]
assert urls["uploadDeleted"]
assert urls["addedToCollection"]
assert urls["removedFromCollection"]
assert urls["collectionDeleted"]
# Verify list endpoint works as expected
r = requests.get(
@@ -82,17 +85,15 @@ def test_get_webhook_event(admin_auth_headers, default_org_id):
if event in ("crawlFinished", "uploadFinished"):
assert len(body["resources"]) >= 1
assert len(body.get("downloadUrls", [])) == 0
assert body["itemId"]
elif event in ("crawlStarted"):
assert len(body.get("resources", [])) == 0
assert len(body.get("downloadUrls", [])) == 0
assert body["itemId"]
elif event in ("addedToCollection", "removedFromCollection"):
assert len(body.get("resources", [])) == 0
assert len(body["downloadUrls"]) == 1
assert body["downloadUrl"]
assert body["collectionId"]
assert len(body["itemIds"]) >= 1
@@ -139,9 +140,12 @@ def test_webhooks_sent(
json={
"crawlStarted": ECHO_SERVER_URL_FROM_K8S,
"crawlFinished": ECHO_SERVER_URL_FROM_K8S,
"crawlDeleted": ECHO_SERVER_URL_FROM_K8S,
"uploadFinished": ECHO_SERVER_URL_FROM_K8S,
"uploadDeleted": ECHO_SERVER_URL_FROM_K8S,
"addedToCollection": ECHO_SERVER_URL_FROM_K8S,
"removedFromCollection": ECHO_SERVER_URL_FROM_K8S,
"collectionDeleted": ECHO_SERVER_URL_FROM_K8S,
},
)
assert r.status_code == 200
@@ -214,16 +218,43 @@ def test_webhooks_sent(
data = r.json()
assert data["id"]
# Re-add upload to collection
# Delete upload
r = requests.post(
f"{API_PREFIX}/orgs/{default_org_id}/collections/{webhooks_coll_id}/add",
json={"crawlIds": [webhooks_upload_id]},
f"{API_PREFIX}/orgs/{default_org_id}/uploads/delete",
json={"crawl_ids": [webhooks_upload_id]},
headers=admin_auth_headers,
)
assert r.status_code == 200
data = r.json()
assert data["deleted"]
# Remove crawls from collection
r = requests.post(
f"{API_PREFIX}/orgs/{default_org_id}/collections/{webhooks_coll_id}/remove",
json={"crawlIds": [webhooks_crawl_id, all_crawls_crawl_id]},
headers=admin_auth_headers,
)
assert r.status_code == 200
data = r.json()
assert data["id"]
# Delete crawl
r = requests.post(
f"{API_PREFIX}/orgs/{default_org_id}/crawls/delete",
json={"crawl_ids": [webhooks_crawl_id]},
headers=admin_auth_headers,
)
assert r.status_code == 200
data = r.json()
assert data["deleted"]
# Delete collection
r = requests.delete(
f"{API_PREFIX}/orgs/{default_org_id}/collections/{webhooks_coll_id}",
headers=admin_auth_headers,
)
assert r.status_code == 200
# Wait to ensure async notifications are all sent
time.sleep(30)
@@ -235,9 +266,12 @@ def test_webhooks_sent(
crawl_started_count = 0
crawl_finished_count = 0
crawl_deleted_count = 0
upload_finished_count = 0
upload_deleted_count = 0
added_to_collection_count = 0
removed_from_collection_count = 0
collection_deleted_count = 0
for post in data["post_bodies"]:
assert post["orgId"]
@@ -248,7 +282,6 @@ def test_webhooks_sent(
crawl_started_count += 1
assert post["itemId"]
assert post["scheduled"] in (True, False)
assert post.get("downloadUrls") is None
assert post.get("resources") is None
elif event == "crawlFinished":
@@ -256,7 +289,10 @@ def test_webhooks_sent(
assert post["itemId"]
assert post["state"]
assert post["resources"]
assert post.get("downloadUrls") is None
elif event == "crawlDeleted":
crawl_deleted_count += 1
assert post["itemId"]
elif event == "uploadFinished":
upload_finished_count += 1
@@ -265,26 +301,37 @@ def test_webhooks_sent(
assert post["resources"]
assert post.get("downloadUrls") is None
elif event == "uploadDeleted":
upload_deleted_count += 1
assert post["itemId"]
elif event == "addedToCollection":
added_to_collection_count += 1
assert post["downloadUrls"] and len(post["downloadUrls"]) == 1
assert post["downloadUrl"]
assert post.get("resources") is None
assert post["itemIds"]
assert post["collectionId"]
elif event == "removedFromCollection":
removed_from_collection_count += 1
assert post["downloadUrls"] and len(post["downloadUrls"]) == 1
assert post["downloadUrl"]
assert post.get("resources") is None
assert post["itemIds"]
assert post["collectionId"]
elif event == "collectionDeleted":
collection_deleted_count += 1
assert post["collectionId"]
# Allow for some variability here due to timing of crawls
assert crawl_started_count >= 1
assert crawl_finished_count >= 1
assert crawl_deleted_count == 1
assert upload_finished_count == 1
assert added_to_collection_count >= 3
assert removed_from_collection_count == 1
assert upload_deleted_count == 1
assert added_to_collection_count >= 2
assert removed_from_collection_count == 2
assert collection_deleted_count == 1
# Check that we've had expected number of successful webhook notifications
r = requests.get(