Add event webhook tests (#1155)

* Add success filter to webhook list GET endpoint

* Add sorting to webhooks list API and add event filter

* Test webhooks via echo server

* Set address to echo server on host from CI env var for k3d and microk8s

* Add -s back to pytest command for k3d ci

* Change pytest test path to avoid hanging on collecting tests

* Revert microk8s to only run on push to main
Tessa Walsh 2023-09-13 01:08:40 -04:00 committed by GitHub
parent f980c3c509
commit 7cf2b11eb7
10 changed files with 316 additions and 26 deletions

View File

@@ -11,6 +11,9 @@ on:
- 'backend/**'
- 'chart/**'
env:
ECHO_SERVER_HOST_URL: http://host.k3d.internal:18080
jobs:
btrix-k3d-test:
runs-on: ubuntu-latest
@@ -82,7 +85,7 @@ jobs:
run: kubectl wait --for=condition=ready pod --all --timeout=240s
- name: Run Tests
run: pytest -s -vv ./backend/test/*.py
run: pytest -s -vv ./backend/test/test_*.py
- name: Print Backend Logs (API)
if: ${{ failure() }}

View File

@@ -76,7 +76,7 @@ jobs:
run: kubectl wait --for=condition=ready pod --all --timeout=240s
- name: Run Tests
run: pytest -vv ./backend/test_nightly/*.py
run: pytest -vv ./backend/test_nightly/test_*.py
- name: Print Backend Logs (API)
if: ${{ failure() }}

View File

@@ -7,6 +7,10 @@ on:
paths:
- 'backend/**'
- 'chart/**'
env:
ECHO_SERVER_HOST_URL: http://10.0.1.1:18080
jobs:
btrix-microk8s-test:
runs-on: ubuntu-latest
@@ -14,7 +18,7 @@ jobs:
- uses: balchua/microk8s-actions@v0.3.1
with:
channel: '1.25/stable'
addons: '["dns", "helm3", "hostpath-storage", "registry", "metrics-server"]'
addons: '["dns", "helm3", "hostpath-storage", "registry", "metrics-server", "host-access"]'
- name: Checkout
uses: actions/checkout@v3
@@ -60,7 +64,7 @@ jobs:
run: sudo microk8s kubectl wait --for=condition=ready pod --all --timeout=240s
- name: Run Tests
run: pytest -vv ./backend/test/*.py
run: pytest -vv ./backend/test/test_*.py
- name: Print Backend Logs
if: ${{ failure() }}

View File

@@ -200,7 +200,7 @@ class CollectionOps:
name_prefix: Optional[str] = None,
):
"""List all collections for org"""
# pylint: disable=too-many-locals
# pylint: disable=too-many-locals, duplicate-code
# Zero-index page for query
page = page - 1
skip = page * page_size

View File

@@ -633,11 +633,11 @@ class OrgQuotas(BaseModel):
class OrgWebhookUrls(BaseModel):
"""Organization webhook URLs"""
crawlStarted: Optional[HttpUrl]
crawlFinished: Optional[HttpUrl]
uploadFinished: Optional[HttpUrl]
addedToCollection: Optional[HttpUrl]
removedFromCollection: Optional[HttpUrl]
crawlStarted: Optional[AnyHttpUrl]
crawlFinished: Optional[AnyHttpUrl]
uploadFinished: Optional[AnyHttpUrl]
addedToCollection: Optional[AnyHttpUrl]
removedFromCollection: Optional[AnyHttpUrl]
# ============================================================================
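Switching the webhook URL fields from HttpUrl to AnyHttpUrl relaxes pydantic's host validation so targets without a public TLD (such as the local echo server addresses used in CI) pass validation. A minimal sketch of the difference, assuming pydantic v1 (as the const=True Field usage elsewhere in this file suggests); the model and field names here are purely illustrative:

```python
from typing import Optional

from pydantic import AnyHttpUrl, BaseModel, HttpUrl, ValidationError


class StrictUrls(BaseModel):
    # pydantic v1 HttpUrl requires a top-level domain in the URL host
    crawlStarted: Optional[HttpUrl]


class RelaxedUrls(BaseModel):
    # AnyHttpUrl drops the TLD requirement, so bare hostnames validate
    crawlStarted: Optional[AnyHttpUrl]


url = "http://localhost:18080"

try:
    StrictUrls(crawlStarted=url)
except ValidationError as err:
    print("HttpUrl rejects it:", err.errors()[0]["msg"])

print("AnyHttpUrl accepts it:", RelaxedUrls(crawlStarted=url).crawlStarted)
```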
@@ -953,6 +953,7 @@ class UploadFinishedBody(BaseArchivedItemBody):
"""Webhook notification POST body for when upload finishes"""
event: str = Field(WebhookEventType.UPLOAD_FINISHED, const=True)
state: str
# ============================================================================

View File

@@ -2,7 +2,7 @@
import asyncio
from datetime import datetime
from typing import List, Union
from typing import List, Union, Optional
import uuid
import aiohttp
@@ -28,6 +28,8 @@ from .models import (
class EventWebhookOps:
"""Event webhook notification management"""
# pylint: disable=invalid-name, too-many-arguments, too-many-locals
def __init__(self, mdb, org_ops):
self.webhooks = mdb["webhooks"]
self.colls = mdb["collections"]
@@ -53,19 +55,62 @@ class EventWebhookOps:
org: Organization,
page_size: int = DEFAULT_PAGE_SIZE,
page: int = 1,
success: Optional[bool] = None,
event: Optional[str] = None,
sort_by: Optional[str] = None,
sort_direction: Optional[int] = -1,
):
"""List all webhook notifications"""
# pylint: disable=duplicate-code
# Zero-index page for query
page = page - 1
skip = page_size * page
query = {"oid": org.id}
total = await self.webhooks.count_documents(query)
if success in (True, False):
query["success"] = success
cursor = self.webhooks.find(query, skip=skip, limit=page_size)
results = await cursor.to_list(length=page_size)
notifications = [WebhookNotification.from_dict(res) for res in results]
if event:
query["event"] = event
aggregate = [{"$match": query}]
if sort_by:
SORT_FIELDS = ("success", "event", "attempts", "created", "lastAttempted")
if sort_by not in SORT_FIELDS:
raise HTTPException(status_code=400, detail="invalid_sort_by")
if sort_direction not in (1, -1):
raise HTTPException(status_code=400, detail="invalid_sort_direction")
aggregate.extend([{"$sort": {sort_by: sort_direction}}])
aggregate.extend(
[
{
"$facet": {
"items": [
{"$skip": skip},
{"$limit": page_size},
],
"total": [{"$count": "count"}],
}
},
]
)
# Get total
cursor = self.webhooks.aggregate(aggregate)
results = await cursor.to_list(length=1)
result = results[0]
items = result["items"]
try:
total = int(result["total"][0]["count"])
except (IndexError, ValueError):
total = 0
notifications = [WebhookNotification.from_dict(res) for res in items]
return notifications, total
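The rewritten list_notifications builds a single aggregation so that filtering, sorting, and pagination all happen server-side, with $facet returning the page of items and the total count in one round trip. A hedged sketch of the pipeline the code above would produce for a filtered, sorted request; the org id and page size are placeholders:

```python
# Illustrative pipeline for list_notifications(org, page=1, page_size=25,
# success=True, event="crawlFinished", sort_by="created", sort_direction=-1);
# "ORG-UUID" stands in for the real org id.
pipeline = [
    {"$match": {"oid": "ORG-UUID", "success": True, "event": "crawlFinished"}},
    {"$sort": {"created": -1}},
    {
        "$facet": {
            # page 1 => zero-indexed skip of 0, limited to one page of results
            "items": [{"$skip": 0}, {"$limit": 25}],
            # total count of matching documents, independent of pagination
            "total": [{"$count": "count"}],
        }
    },
]
```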
@@ -79,7 +124,12 @@ class EventWebhookOps:
return WebhookNotification.from_dict(res)
@backoff.on_exception(backoff.expo, aiohttp.ClientError, max_tries=5, max_time=60)
@backoff.on_exception(
backoff.expo,
(aiohttp.ClientError, aiohttp.client_exceptions.ClientConnectorError),
max_tries=5,
max_time=60,
)
async def send_notification(
self, org: Organization, notification: WebhookNotification
):
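The widened @backoff.on_exception decorator retries delivery on connection-level failures as well, with up to 5 attempts spread over at most 60 seconds. A standalone sketch of the same retry pattern from the backoff library, using a stand-in coroutine rather than the real notification sender:

```python
import aiohttp
import backoff


# Retry with exponential delays on client errors, giving up after 5 attempts
# or 60 seconds total -- the same budget the decorator above configures.
# (ClientConnectorError is already a subclass of aiohttp.ClientError, so the
# broader exception type covers it here.)
@backoff.on_exception(backoff.expo, aiohttp.ClientError, max_tries=5, max_time=60)
async def flaky_post(session: aiohttp.ClientSession, url: str, body: dict) -> int:
    async with session.post(url, json=body) as resp:
        resp.raise_for_status()
        return resp.status
```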
@@ -183,10 +233,7 @@ class EventWebhookOps:
),
)
async def create_upload_finished_notification(
self,
crawl_id: str,
):
async def create_upload_finished_notification(self, crawl_id: str):
"""Create webhook notification for finished upload."""
crawl_res = await self.crawls.find_one({"_id": crawl_id})
org = await self.org_ops.get_org_by_id(crawl_res["oid"])
@@ -199,8 +246,7 @@ class EventWebhookOps:
org,
event=WebhookEventType.UPLOAD_FINISHED,
body=UploadFinishedBody(
itemId=crawl_id,
orgId=str(org.id),
itemId=crawl_id, orgId=str(org.id), state="complete"
),
)
@@ -313,6 +359,7 @@ class EventWebhookOps:
# pylint: disable=too-many-arguments, too-many-locals, invalid-name, fixme
def init_event_webhooks_api(mdb, org_ops):
"""init event webhooks system"""
# pylint: disable=invalid-name
ops = EventWebhookOps(mdb, org_ops)
@@ -325,9 +372,19 @@ def init_event_webhooks_api(mdb, org_ops):
org: Organization = Depends(org_owner_dep),
pageSize: int = DEFAULT_PAGE_SIZE,
page: int = 1,
success: Optional[bool] = None,
event: Optional[str] = None,
sortBy: Optional[str] = None,
sortDirection: Optional[int] = -1,
):
notifications, total = await ops.list_notifications(
org, page_size=pageSize, page=page
org,
page_size=pageSize,
page=page,
success=success,
event=event,
sort_by=sortBy,
sort_direction=sortDirection,
)
return paginated_format(notifications, total, page, pageSize)
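With the new query parameters wired through to list_notifications, the webhooks list endpoint can be filtered and sorted directly. A usage sketch with requests; API_PREFIX, the org id, and the auth header below are placeholders standing in for what the test suite's fixtures provide:

```python
import requests

# Placeholders: substitute a real API prefix, org UUID, and admin token.
API_PREFIX = "http://127.0.0.1:30870/api"
org_id = "ORG-UUID"
admin_auth_headers = {"Authorization": "Bearer ADMIN-TOKEN"}

r = requests.get(
    f"{API_PREFIX}/orgs/{org_id}/webhooks"
    "?success=true&event=crawlFinished&sortBy=created&sortDirection=-1",
    headers=admin_auth_headers,
)
r.raise_for_status()
data = r.json()
print(data["total"], "successful crawlFinished notifications, newest first")
for notification in data["items"]:
    print(notification["event"], notification["created"], notification["attempts"])
```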

View File

@@ -1,5 +1,8 @@
import os
import pytest
import requests
import socket
import subprocess
import time
@@ -24,6 +27,8 @@ NON_DEFAULT_ORG_NAME = "Non-default org"
FINISHED_STATES = ("complete", "partial_complete", "canceled", "failed")
curr_dir = os.path.abspath(os.path.dirname(os.path.realpath(__file__)))
@pytest.fixture(scope="session")
def admin_auth_headers():
@@ -401,3 +406,16 @@ def uploads_collection_id(crawler_auth_headers, default_org_id):
)
assert r.status_code == 200
return r.json()["id"]
@pytest.fixture(scope="function")
def echo_server():
print(f"Echo server starting", flush=True)
p = subprocess.Popen(["python3", os.path.join(curr_dir, "echo_server.py")])
print(f"Echo server started", flush=True)
time.sleep(1)
yield p
time.sleep(10)
print(f"Echo server terminating", flush=True)
p.terminate()
print(f"Echo server terminated", flush=True)

View File

@@ -0,0 +1,29 @@
#!/usr/bin/env python3
"""
A web server to record POST requests and return them on a GET request
"""
from http.server import HTTPServer, BaseHTTPRequestHandler
import json
BIND_HOST = "0.0.0.0"
PORT = 18080
post_bodies = []
class EchoServerHTTPRequestHandler(BaseHTTPRequestHandler):
def do_GET(self):
self.send_response(200)
self.end_headers()
self.wfile.write(json.dumps({"post_bodies": post_bodies}).encode("utf-8"))
def do_POST(self):
content_length = int(self.headers.get("content-length", 0))
body = self.rfile.read(content_length)
self.send_response(200)
self.end_headers()
post_bodies.append(json.loads(body.decode("utf-8").replace("'", '"')))
httpd = HTTPServer((BIND_HOST, PORT), EchoServerHTTPRequestHandler)
httpd.serve_forever()
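The echo server keeps every JSON POST body it receives in memory and returns them all on a GET, which is what the webhook test reads back. A quick manual check against a locally running instance, assuming the default bind address and port above:

```python
import requests

ECHO_URL = "http://localhost:18080"

# Simulate a webhook delivery, then read back everything the server recorded.
requests.post(ECHO_URL, json={"event": "crawlFinished", "itemId": "example"})
recorded = requests.get(ECHO_URL).json()
print(recorded["post_bodies"])  # -> [{'event': 'crawlFinished', 'itemId': 'example'}]
```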

View File

@@ -1,11 +1,25 @@
import json
import os
import time
import requests
from .conftest import API_PREFIX
from .utils import read_in_chunks
_webhook_event_id = None
curr_dir = os.path.dirname(os.path.realpath(__file__))
ECHO_SERVER_URL = "http://localhost:18080"
# Pull address to echo server running on host from CI env var.
# If not set, default to host.docker.internal (for local testing with
# Docker Desktop).
ECHO_SERVER_URL_FROM_K8S = os.environ.get(
"ECHO_SERVER_HOST_URL", "http://host.docker.internal:18080"
)
def test_list_webhook_events(admin_auth_headers, default_org_id):
# Verify that webhook URLs have been set in previous tests
@@ -89,7 +103,7 @@ def test_retry_webhook_event(admin_auth_headers, default_org_id):
assert r.status_code == 200
assert r.json()["success"]
# Give it some time to run
# Give it some time to run with exponential backoff retries
time.sleep(90)
# Verify attempts have been increased
@@ -107,3 +121,167 @@ def test_retry_webhook_event(admin_auth_headers, default_org_id):
assert item["attempts"] == 2
assert item["created"]
assert item["lastAttempted"]
def test_webhooks_sent(
admin_auth_headers,
default_org_id,
all_crawls_crawl_id,
echo_server,
):
# Reconfigure event webhooks to use echo server
r = requests.post(
f"{API_PREFIX}/orgs/{default_org_id}/event-webhook-urls",
headers=admin_auth_headers,
json={
"crawlStarted": ECHO_SERVER_URL_FROM_K8S,
"crawlFinished": ECHO_SERVER_URL_FROM_K8S,
"uploadFinished": ECHO_SERVER_URL_FROM_K8S,
"addedToCollection": ECHO_SERVER_URL_FROM_K8S,
"removedFromCollection": ECHO_SERVER_URL_FROM_K8S,
},
)
assert r.status_code == 200
assert r.json()["updated"]
# Create collection with all_crawls_crawl_id already in it
r = requests.post(
f"{API_PREFIX}/orgs/{default_org_id}/collections",
headers=admin_auth_headers,
json={
"name": "Event webhooks test collection",
"crawlIds": [all_crawls_crawl_id],
},
)
assert r.status_code == 200
webhooks_coll_id = r.json()["id"]
assert webhooks_coll_id
# Create and run workflow that adds crawl to collection
crawl_data = {
"runNow": True,
"name": "Webhook crawl test",
"autoAddCollections": [webhooks_coll_id],
"config": {
"seeds": [{"url": "https://webrecorder.net/"}],
},
}
r = requests.post(
f"{API_PREFIX}/orgs/{default_org_id}/crawlconfigs/",
headers=admin_auth_headers,
json=crawl_data,
)
assert r.status_code == 200
data = r.json()
webhooks_config_id = data["id"]
assert webhooks_config_id
webhooks_crawl_id = data["run_now_job"]
# Wait for crawl to complete
while True:
r = requests.get(
f"{API_PREFIX}/orgs/{default_org_id}/crawls/{webhooks_crawl_id}/replay.json",
headers=admin_auth_headers,
)
data = r.json()
if data["state"] == "complete":
break
time.sleep(5)
# Create upload and add to collection
with open(os.path.join(curr_dir, "data", "example.wacz"), "rb") as fh:
r = requests.put(
f"{API_PREFIX}/orgs/{default_org_id}/uploads/stream?filename=webhookstest.wacz&name=Webhooks%20Upload&collections={webhooks_coll_id}",
headers=admin_auth_headers,
data=read_in_chunks(fh),
)
assert r.status_code == 200
data = r.json()
assert data["added"]
webhooks_upload_id = data["id"]
# Remove upload from collection
r = requests.post(
f"{API_PREFIX}/orgs/{default_org_id}/collections/{webhooks_coll_id}/remove",
json={"crawlIds": [webhooks_upload_id]},
headers=admin_auth_headers,
)
assert r.status_code == 200
data = r.json()
assert data["id"]
# Re-add upload to collection
r = requests.post(
f"{API_PREFIX}/orgs/{default_org_id}/collections/{webhooks_coll_id}/add",
json={"crawlIds": [webhooks_upload_id]},
headers=admin_auth_headers,
)
assert r.status_code == 200
data = r.json()
assert data["id"]
# Wait to ensure async notifications are all sent
time.sleep(10)
# Send GET request to echo server to retrieve and verify POSTed data
r = requests.get(ECHO_SERVER_URL)
assert r.status_code == 200
data = r.json()
crawl_started_count = 0
crawl_finished_count = 0
upload_finished_count = 0
added_to_collection_count = 0
removed_from_collection_count = 0
for post in data["post_bodies"]:
assert post["orgId"]
event = post["event"]
assert event
if event == "crawlStarted":
crawl_started_count += 1
assert post["itemId"]
assert post["scheduled"] in (True, False)
assert post.get("downloadUrls") is None
elif event == "crawlFinished":
crawl_finished_count += 1
assert post["itemId"]
assert post["state"]
assert post["downloadUrls"]
elif event == "uploadFinished":
upload_finished_count += 1
assert post["itemId"]
assert post["state"]
assert post["downloadUrls"]
elif event == "addedToCollection":
added_to_collection_count += 1
assert post["downloadUrls"] and len(post["downloadUrls"]) == 1
assert post["itemIds"]
assert post["collectionId"]
elif event == "removedFromCollection":
removed_from_collection_count += 1
assert post["downloadUrls"] and len(post["downloadUrls"]) == 1
assert post["itemIds"]
assert post["collectionId"]
# Allow for some variability here due to timing of crawls
assert crawl_started_count >= 1
assert crawl_finished_count >= 1
assert upload_finished_count == 1
assert added_to_collection_count >= 3
assert removed_from_collection_count == 1
# Check that we've had expected number of successful webhook notifications
r = requests.get(
f"{API_PREFIX}/orgs/{default_org_id}/webhooks?success=True",
headers=admin_auth_headers,
)
assert r.status_code == 200
assert r.json()["total"] >= 7
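For reference, a hedged sketch of a single recorded POST body of the kind the loop above inspects, limited to the fields the assertions check; real notifications may carry additional fields, and all values here are placeholders:

```python
# Illustrative shape of one "crawlFinished" body as recorded by the echo server.
example_post_body = {
    "event": "crawlFinished",
    "orgId": "ORG-UUID",
    "itemId": "CRAWL-ID",
    "state": "complete",
    "downloadUrls": ["https://example.invalid/archive.wacz"],
}
```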

btrix
View File

@@ -79,12 +79,12 @@ resetMicrok8s(){
runTests() {
echo "Running backend tests..."
python3 -m pytest backend/test/*.py
python3 -m pytest backend/test/test_*.py
}
runNightlyTests() {
echo "Running nightly backend tests..."
python3 -m pytest backend/test_nightly/*.py
python3 -m pytest backend/test_nightly/test_*.py
}
CONTEXT=$(cat ~/.kube/config | grep "current-context:" | sed "s/current-context: //")