Add webhooks for qaAnalysisStarted, qaAnalysisFinished, and crawlReviewed (#1974)

Fixes #1957 

Adds three new webhook events related to QA: analysis started, analysis
finished, and crawl reviewed.

Tests have been updated accordingly.

---------

Co-authored-by: Ilya Kreymer <ikreymer@gmail.com>
This commit is contained in:
Tessa Walsh 2024-07-25 19:53:49 -04:00 committed by GitHub
parent daeb7448f5
commit 551660bb62
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 266 additions and 13 deletions

View File

@@ -9,6 +9,7 @@ import urllib.parse
import asyncio import asyncio
from fastapi import HTTPException, Depends from fastapi import HTTPException, Depends
from fastapi.responses import StreamingResponse from fastapi.responses import StreamingResponse
import pymongo
from .models import ( from .models import (
CrawlFile, CrawlFile,
@@ -244,13 +245,19 @@ class BaseCrawlOps:
# update in db # update in db
result = await self.crawls.find_one_and_update( result = await self.crawls.find_one_and_update(
query, query, {"$set": update_values}, return_document=pymongo.ReturnDocument.AFTER
{"$set": update_values},
) )
if not result: if not result:
raise HTTPException(status_code=404, detail="crawl_not_found") raise HTTPException(status_code=404, detail="crawl_not_found")
if update_values.get("reviewStatus"):
crawl = BaseCrawl.from_dict(result)
await self.event_webhook_ops.create_crawl_reviewed_notification(
crawl.id, crawl.oid, crawl.reviewStatus, crawl.description
)
return {"updated": True} return {"updated": True}
async def update_crawl_state(self, crawl_id: str, state: str): async def update_crawl_state(self, crawl_id: str, state: str):

View File

@@ -929,12 +929,15 @@ class CrawlOps(BaseCrawlOps):
if crawl.qa.finished and crawl.qa.state in NON_RUNNING_STATES: if crawl.qa.finished and crawl.qa.state in NON_RUNNING_STATES:
query[f"qaFinished.{crawl.qa.id}"] = crawl.qa.dict() query[f"qaFinished.{crawl.qa.id}"] = crawl.qa.dict()
if await self.crawls.find_one_and_update( res = await self.crawls.find_one_and_update(
{"_id": crawl_id, "type": "crawl"}, {"$set": query} {"_id": crawl_id, "type": "crawl"}, {"$set": query}
): )
return True
return False await self.event_webhook_ops.create_qa_analysis_finished_notification(
crawl.qa, crawl.oid, crawl.id
)
return res
async def get_qa_runs( async def get_qa_runs(
self, self,

View File

@@ -1277,6 +1277,9 @@ class OrgWebhookUrls(BaseModel):
crawlStarted: Optional[AnyHttpUrl] = None crawlStarted: Optional[AnyHttpUrl] = None
crawlFinished: Optional[AnyHttpUrl] = None crawlFinished: Optional[AnyHttpUrl] = None
crawlDeleted: Optional[AnyHttpUrl] = None crawlDeleted: Optional[AnyHttpUrl] = None
qaAnalysisStarted: Optional[AnyHttpUrl] = None
qaAnalysisFinished: Optional[AnyHttpUrl] = None
crawlReviewed: Optional[AnyHttpUrl] = None
uploadFinished: Optional[AnyHttpUrl] = None uploadFinished: Optional[AnyHttpUrl] = None
uploadDeleted: Optional[AnyHttpUrl] = None uploadDeleted: Optional[AnyHttpUrl] = None
addedToCollection: Optional[AnyHttpUrl] = None addedToCollection: Optional[AnyHttpUrl] = None
@@ -1735,6 +1738,11 @@ class WebhookEventType(str, Enum):
CRAWL_FINISHED = "crawlFinished" CRAWL_FINISHED = "crawlFinished"
CRAWL_DELETED = "crawlDeleted" CRAWL_DELETED = "crawlDeleted"
QA_ANALYSIS_STARTED = "qaAnalysisStarted"
QA_ANALYSIS_FINISHED = "qaAnalysisFinished"
CRAWL_REVIEWED = "crawlReviewed"
UPLOAD_FINISHED = "uploadFinished" UPLOAD_FINISHED = "uploadFinished"
UPLOAD_DELETED = "uploadDeleted" UPLOAD_DELETED = "uploadDeleted"
@@ -1831,6 +1839,39 @@ class UploadDeletedBody(BaseArchivedItemBody):
event: Literal[WebhookEventType.UPLOAD_DELETED] = WebhookEventType.UPLOAD_DELETED event: Literal[WebhookEventType.UPLOAD_DELETED] = WebhookEventType.UPLOAD_DELETED
# ============================================================================
class QaAnalysisStartedBody(BaseArchivedItemBody):
"""Webhook notification POST body for when qa analysis run starts"""
event: Literal[WebhookEventType.QA_ANALYSIS_STARTED] = (
WebhookEventType.QA_ANALYSIS_STARTED
)
qaRunId: str
# ============================================================================
class QaAnalysisFinishedBody(BaseArchivedItemFinishedBody):
"""Webhook notification POST body for when qa analysis run finishes"""
event: Literal[WebhookEventType.QA_ANALYSIS_FINISHED] = (
WebhookEventType.QA_ANALYSIS_FINISHED
)
qaRunId: str
# ============================================================================
class CrawlReviewedBody(BaseArchivedItemBody):
"""Webhook notification POST body for when crawl is reviewed in qa"""
event: Literal[WebhookEventType.CRAWL_REVIEWED] = WebhookEventType.CRAWL_REVIEWED
reviewStatus: ReviewStatus
reviewStatusLabel: str
description: Optional[str] = None
# ============================================================================ # ============================================================================
class WebhookNotification(BaseMongoModel): class WebhookNotification(BaseMongoModel):
"""Base POST body model for webhook notifications""" """Base POST body model for webhook notifications"""
@@ -1841,6 +1882,9 @@ class WebhookNotification(BaseMongoModel):
CrawlStartedBody, CrawlStartedBody,
CrawlFinishedBody, CrawlFinishedBody,
CrawlDeletedBody, CrawlDeletedBody,
QaAnalysisStartedBody,
QaAnalysisFinishedBody,
CrawlReviewedBody,
UploadFinishedBody, UploadFinishedBody,
UploadDeletedBody, UploadDeletedBody,
CollectionItemAddedBody, CollectionItemAddedBody,

View File

@@ -720,7 +720,7 @@ class CrawlOperator(BaseOperator):
finalized = True finalized = True
if finalized and crawl.is_qa: if finalized and crawl.is_qa:
await self.crawl_ops.qa_run_finished(crawl.db_crawl_id) self.run_task(self.crawl_ops.qa_run_finished(crawl.db_crawl_id))
return { return {
"status": status.dict(exclude_none=True), "status": status.dict(exclude_none=True),
@@ -816,11 +816,18 @@
crawl, crawl,
allowed_from=["starting", "waiting_capacity"], allowed_from=["starting", "waiting_capacity"],
): ):
self.run_task( if not crawl.qa_source_crawl_id:
self.event_webhook_ops.create_crawl_started_notification( self.run_task(
crawl.id, crawl.oid, scheduled=crawl.scheduled self.event_webhook_ops.create_crawl_started_notification(
crawl.id, crawl.oid, scheduled=crawl.scheduled
)
)
else:
self.run_task(
self.event_webhook_ops.create_qa_analysis_started_notification(
crawl.id, crawl.oid, crawl.qa_source_crawl_id
)
) )
)
# update lastActiveTime if crawler is running # update lastActiveTime if crawler is running
if crawler_running: if crawler_running:

View File

@@ -15,6 +15,9 @@ from .models import (
CrawlStartedBody, CrawlStartedBody,
CrawlFinishedBody, CrawlFinishedBody,
CrawlDeletedBody, CrawlDeletedBody,
QaAnalysisStartedBody,
QaAnalysisFinishedBody,
CrawlReviewedBody,
UploadFinishedBody, UploadFinishedBody,
UploadDeletedBody, UploadDeletedBody,
CollectionItemAddedBody, CollectionItemAddedBody,
@@ -22,6 +25,7 @@ from .models import (
CollectionDeletedBody, CollectionDeletedBody,
PaginatedWebhookNotificationResponse, PaginatedWebhookNotificationResponse,
Organization, Organization,
QARun,
) )
from .utils import dt_now from .utils import dt_now
@@ -195,7 +199,7 @@ class EventWebhookOps:
crawl_id: str, crawl_id: str,
org: Organization, org: Organization,
event: str, event: str,
body: Union[CrawlFinishedBody, UploadFinishedBody], body: Union[CrawlFinishedBody, QaAnalysisFinishedBody, UploadFinishedBody],
): ):
"""Create webhook notification for finished crawl/upload.""" """Create webhook notification for finished crawl/upload."""
crawl = await self.crawl_ops.get_crawl_out(crawl_id, org) crawl = await self.crawl_ops.get_crawl_out(crawl_id, org)
@@ -263,6 +267,46 @@
), ),
) )
async def create_qa_analysis_finished_notification(
self, qa_run: QARun, oid: UUID, crawl_id: str
) -> None:
"""Create webhook notification for finished qa analysis run."""
org = await self.org_ops.get_org_by_id(oid)
if not org.webhookUrls or not org.webhookUrls.qaAnalysisFinished:
return
qa_resources = []
# Check both crawl.qa and crawl.qaFinished for files because we don't
# know for certain what state the crawl will be in at this point
try:
qa_resources = await self.crawl_ops.resolve_signed_urls(
qa_run.files, org, crawl_id, qa_run.id
)
# pylint: disable=broad-exception-caught
except Exception as err:
print(f"Error trying to get QA run resources: {err}", flush=True)
notification = WebhookNotification(
id=uuid4(),
event=WebhookEventType.QA_ANALYSIS_FINISHED,
oid=oid,
body=QaAnalysisFinishedBody(
itemId=crawl_id,
qaRunId=qa_run.id,
orgId=str(org.id),
state=qa_run.state,
resources=qa_resources,
),
created=dt_now(),
)
await self.webhooks.insert_one(notification.to_dict())
await self.send_notification(org, notification)
async def create_crawl_deleted_notification( async def create_crawl_deleted_notification(
self, crawl_id: str, org: Organization self, crawl_id: str, org: Organization
) -> None: ) -> None:
@@ -345,6 +389,82 @@ class EventWebhookOps:
await self.send_notification(org, notification) await self.send_notification(org, notification)
async def create_qa_analysis_started_notification(
self, qa_run_id: str, oid: UUID, crawl_id: str
) -> None:
"""Create webhook notification for started qa analysis run."""
org = await self.org_ops.get_org_by_id(oid)
if not org.webhookUrls or not org.webhookUrls.qaAnalysisStarted:
return
# Check if already created this event
existing_notification = await self.webhooks.find_one(
{
"event": WebhookEventType.QA_ANALYSIS_STARTED,
"body.qaRunId": qa_run_id,
}
)
if existing_notification:
return
notification = WebhookNotification(
id=uuid4(),
event=WebhookEventType.QA_ANALYSIS_STARTED,
oid=oid,
body=QaAnalysisStartedBody(
itemId=crawl_id,
qaRunId=qa_run_id,
orgId=str(oid),
),
created=dt_now(),
)
await self.webhooks.insert_one(notification.to_dict())
await self.send_notification(org, notification)
async def create_crawl_reviewed_notification(
self,
crawl_id: str,
oid: UUID,
review_status: Optional[int],
description: Optional[str],
) -> None:
"""Create webhook notification for crawl being reviewed in qa"""
org = await self.org_ops.get_org_by_id(oid)
if not org.webhookUrls or not org.webhookUrls.crawlReviewed:
return
review_status_labels = {
1: "Bad",
2: "Poor",
3: "Fair",
4: "Good",
5: "Excellent",
}
notification = WebhookNotification(
id=uuid4(),
event=WebhookEventType.CRAWL_REVIEWED,
oid=oid,
body=CrawlReviewedBody(
itemId=crawl_id,
orgId=str(oid),
reviewStatus=review_status,
reviewStatusLabel=(
review_status_labels.get(review_status, "") if review_status else ""
),
description=description,
),
created=dt_now(),
)
await self.webhooks.insert_one(notification.to_dict())
await self.send_notification(org, notification)
async def _create_collection_items_modified_notification( async def _create_collection_items_modified_notification(
self, self,
coll_id: UUID, coll_id: UUID,
@@ -507,6 +627,18 @@ def init_openapi_webhooks(app):
def crawl_deleted(body: CrawlDeletedBody): def crawl_deleted(body: CrawlDeletedBody):
"""Sent when a crawl is deleted""" """Sent when a crawl is deleted"""
@app.webhooks.post(WebhookEventType.QA_ANALYSIS_STARTED)
def qa_analysis_started(body: QaAnalysisStartedBody):
"""Sent when a qa analysis run is started"""
@app.webhooks.post(WebhookEventType.QA_ANALYSIS_FINISHED)
def qa_analysis_finished(body: QaAnalysisFinishedBody):
"""Sent when a qa analysis run has finished"""
@app.webhooks.post(WebhookEventType.CRAWL_REVIEWED)
def crawl_reviewed(body: CrawlReviewedBody):
"""Sent when a crawl has been reviewed in qa"""
@app.webhooks.post(WebhookEventType.UPLOAD_FINISHED) @app.webhooks.post(WebhookEventType.UPLOAD_FINISHED)
def upload_finished(body: UploadFinishedBody): def upload_finished(body: UploadFinishedBody):
"""Sent when an upload has finished""" """Sent when an upload has finished"""

View File

@@ -141,6 +141,9 @@ def test_webhooks_sent(
"crawlStarted": ECHO_SERVER_URL_FROM_K8S, "crawlStarted": ECHO_SERVER_URL_FROM_K8S,
"crawlFinished": ECHO_SERVER_URL_FROM_K8S, "crawlFinished": ECHO_SERVER_URL_FROM_K8S,
"crawlDeleted": ECHO_SERVER_URL_FROM_K8S, "crawlDeleted": ECHO_SERVER_URL_FROM_K8S,
"qaAnalysisStarted": ECHO_SERVER_URL_FROM_K8S,
"qaAnalysisFinished": ECHO_SERVER_URL_FROM_K8S,
"crawlReviewed": ECHO_SERVER_URL_FROM_K8S,
"uploadFinished": ECHO_SERVER_URL_FROM_K8S, "uploadFinished": ECHO_SERVER_URL_FROM_K8S,
"uploadDeleted": ECHO_SERVER_URL_FROM_K8S, "uploadDeleted": ECHO_SERVER_URL_FROM_K8S,
"addedToCollection": ECHO_SERVER_URL_FROM_K8S, "addedToCollection": ECHO_SERVER_URL_FROM_K8S,
@@ -195,6 +198,42 @@ def test_webhooks_sent(
break break
time.sleep(5) time.sleep(5)
# Run QA analysis on crawl
r = requests.post(
f"{API_PREFIX}/orgs/{default_org_id}/crawls/{webhooks_crawl_id}/qa/start",
headers=admin_auth_headers,
)
assert r.status_code == 200
qa_run_id = r.json()["started"]
# Wait for QA to complete
count = 0
max_attempts = 24
while count < max_attempts:
r = requests.get(
f"{API_PREFIX}/orgs/{default_org_id}/crawls/{webhooks_crawl_id}/qa/activeQA",
headers=admin_auth_headers,
)
data = r.json()
if not data["qa"]:
break
if count + 1 == max_attempts:
assert False
time.sleep(5)
count += 1
# Review crawl
r = requests.patch(
f"{API_PREFIX}/orgs/{default_org_id}/crawls/{webhooks_crawl_id}",
headers=admin_auth_headers,
json={"reviewStatus": 5, "description": "Perfect crawl"},
)
assert r.status_code == 200
# Create upload and add to collection # Create upload and add to collection
with open(os.path.join(curr_dir, "data", "example.wacz"), "rb") as fh: with open(os.path.join(curr_dir, "data", "example.wacz"), "rb") as fh:
r = requests.put( r = requests.put(
@@ -267,6 +306,9 @@ def test_webhooks_sent(
crawl_started_count = 0 crawl_started_count = 0
crawl_finished_count = 0 crawl_finished_count = 0
crawl_deleted_count = 0 crawl_deleted_count = 0
qa_analysis_started_count = 0
qa_analysis_finished_count = 0
crawl_reviewed_count = 0
upload_finished_count = 0 upload_finished_count = 0
upload_deleted_count = 0 upload_deleted_count = 0
added_to_collection_count = 0 added_to_collection_count = 0
@@ -294,6 +336,21 @@ def test_webhooks_sent(
crawl_deleted_count += 1 crawl_deleted_count += 1
assert post["itemId"] assert post["itemId"]
elif event == "qaAnalysisStarted":
qa_analysis_started_count += 1
assert post["itemId"] == webhooks_crawl_id
assert post["qaRunId"] == qa_run_id
elif event == "qaAnalysisFinished":
qa_analysis_finished_count += 1
assert post["itemId"] == webhooks_crawl_id
assert post["qaRunId"] == qa_run_id
assert post["resources"]
elif event == "crawlReviewed":
crawl_reviewed_count += 1
assert post["itemId"] == webhooks_crawl_id
elif event == "uploadFinished": elif event == "uploadFinished":
upload_finished_count += 1 upload_finished_count += 1
assert post["itemId"] assert post["itemId"]
@@ -327,6 +384,9 @@ def test_webhooks_sent(
assert crawl_started_count >= 1 assert crawl_started_count >= 1
assert crawl_finished_count >= 1 assert crawl_finished_count >= 1
assert crawl_deleted_count == 1 assert crawl_deleted_count == 1
assert qa_analysis_started_count == 1
assert qa_analysis_finished_count == 1
assert crawl_reviewed_count == 1
assert upload_finished_count == 1 assert upload_finished_count == 1
assert upload_deleted_count == 1 assert upload_deleted_count == 1
assert added_to_collection_count >= 2 assert added_to_collection_count >= 2
@@ -339,4 +399,4 @@ def test_webhooks_sent(
headers=admin_auth_headers, headers=admin_auth_headers,
) )
assert r.status_code == 200 assert r.status_code == 200
assert r.json()["total"] >= 7 assert r.json()["total"] >= 10