From 7cf2b11eb752dce01000a4f3efec1c1c2fbc4e70 Mon Sep 17 00:00:00 2001
From: Tessa Walsh
Date: Wed, 13 Sep 2023 01:08:40 -0400
Subject: [PATCH] Add event webhook tests (#1155)

* Add success filter to webhook list GET endpoint

* Add sorting to webhooks list API and add event filter

* Test webhooks via echo server

* Set address to echo server on host from CI env var for k3d and microk8s

* Add -s back to pytest command for k3d ci

* Change pytest test path to avoid hanging on collecting tests

* Revert microk8s to only run on push to main
---
 .github/workflows/k3d-ci.yaml         |   5 +-
 .github/workflows/k3d-nightly-ci.yaml |   2 +-
 .github/workflows/microk8s-ci.yaml    |   8 +-
 backend/btrixcloud/colls.py           |   2 +-
 backend/btrixcloud/models.py          |  11 +-
 backend/btrixcloud/webhooks.py        |  83 ++++++++++--
 backend/test/conftest.py              |  18 +++
 backend/test/echo_server.py           |  29 +++++
 backend/test/test_webhooks.py         | 180 +++++++++++++++++++++++++-
 btrix                                 |   4 +-
 10 files changed, 316 insertions(+), 26 deletions(-)
 create mode 100644 backend/test/echo_server.py

diff --git a/.github/workflows/k3d-ci.yaml b/.github/workflows/k3d-ci.yaml
index 73180334..4750cbb7 100644
--- a/.github/workflows/k3d-ci.yaml
+++ b/.github/workflows/k3d-ci.yaml
@@ -11,6 +11,9 @@ on:
       - 'backend/**'
       - 'chart/**'
 
+env:
+  ECHO_SERVER_HOST_URL: http://host.k3d.internal:18080
+
 jobs:
   btrix-k3d-test:
     runs-on: ubuntu-latest
@@ -82,7 +85,7 @@ jobs:
         run: kubectl wait --for=condition=ready pod --all --timeout=240s
 
       - name: Run Tests
-        run: pytest -s -vv ./backend/test/*.py
+        run: pytest -s -vv ./backend/test/test_*.py
 
       - name: Print Backend Logs (API)
         if: ${{ failure() }}
diff --git a/.github/workflows/k3d-nightly-ci.yaml b/.github/workflows/k3d-nightly-ci.yaml
index cb382793..fcac8ac0 100644
--- a/.github/workflows/k3d-nightly-ci.yaml
+++ b/.github/workflows/k3d-nightly-ci.yaml
@@ -76,7 +76,7 @@ jobs:
         run: kubectl wait --for=condition=ready pod --all --timeout=240s
 
       - name: Run Tests
-        run: pytest -vv ./backend/test_nightly/*.py
+        run: pytest -vv ./backend/test_nightly/test_*.py
 
       - name: Print Backend Logs (API)
         if: ${{ failure() }}
diff --git a/.github/workflows/microk8s-ci.yaml b/.github/workflows/microk8s-ci.yaml
index 83febfc2..cbb9ff23 100644
--- a/.github/workflows/microk8s-ci.yaml
+++ b/.github/workflows/microk8s-ci.yaml
@@ -7,6 +7,10 @@ on:
     paths:
       - 'backend/**'
       - 'chart/**'
+
+env:
+  ECHO_SERVER_HOST_URL: http://10.0.1.1:18080
+
 jobs:
   btrix-microk8s-test:
     runs-on: ubuntu-latest
@@ -14,7 +18,7 @@ jobs:
       - uses: balchua/microk8s-actions@v0.3.1
         with:
           channel: '1.25/stable'
-          addons: '["dns", "helm3", "hostpath-storage", "registry", "metrics-server"]'
+          addons: '["dns", "helm3", "hostpath-storage", "registry", "metrics-server", "host-access"]'
 
       - name: Checkout
         uses: actions/checkout@v3
@@ -60,7 +64,7 @@ jobs:
         run: sudo microk8s kubectl wait --for=condition=ready pod --all --timeout=240s
 
       - name: Run Tests
-        run: pytest -vv ./backend/test/*.py
+        run: pytest -vv ./backend/test/test_*.py
 
       - name: Print Backend Logs
         if: ${{ failure() }}
diff --git a/backend/btrixcloud/colls.py b/backend/btrixcloud/colls.py
index 5eeaece2..73f9a498 100644
--- a/backend/btrixcloud/colls.py
+++ b/backend/btrixcloud/colls.py
@@ -200,7 +200,7 @@ class CollectionOps:
         name_prefix: Optional[str] = None,
     ):
         """List all collections for org"""
-        # pylint: disable=too-many-locals
+        # pylint: disable=too-many-locals, duplicate-code
         # Zero-index page for query
         page = page - 1
         skip = page * page_size
diff --git a/backend/btrixcloud/models.py b/backend/btrixcloud/models.py
index d4bc9675..6f651e14 100644
--- a/backend/btrixcloud/models.py
+++ b/backend/btrixcloud/models.py
@@ -633,11 +633,11 @@ class OrgQuotas(BaseModel):
 class OrgWebhookUrls(BaseModel):
     """Organization webhook URLs"""
 
-    crawlStarted: Optional[HttpUrl]
-    crawlFinished: Optional[HttpUrl]
-    uploadFinished: Optional[HttpUrl]
-    addedToCollection: Optional[HttpUrl]
-    removedFromCollection: Optional[HttpUrl]
+    crawlStarted: Optional[AnyHttpUrl]
+    crawlFinished: Optional[AnyHttpUrl]
+    uploadFinished: Optional[AnyHttpUrl]
+    addedToCollection: Optional[AnyHttpUrl]
+    removedFromCollection: Optional[AnyHttpUrl]
 
 
 # ============================================================================
@@ -953,6 +953,7 @@ class UploadFinishedBody(BaseArchivedItemBody):
     """Webhook notification POST body for when upload finishes"""
 
     event: str = Field(WebhookEventType.UPLOAD_FINISHED, const=True)
+    state: str
 
 
 # ============================================================================
diff --git a/backend/btrixcloud/webhooks.py b/backend/btrixcloud/webhooks.py
index 3c086955..35223182 100644
--- a/backend/btrixcloud/webhooks.py
+++ b/backend/btrixcloud/webhooks.py
@@ -2,7 +2,7 @@
 
 import asyncio
 from datetime import datetime
-from typing import List, Union
+from typing import List, Union, Optional
 import uuid
 
 import aiohttp
@@ -28,6 +28,8 @@ from .models import (
 class EventWebhookOps:
     """Event webhook notification management"""
 
+    # pylint: disable=invalid-name, too-many-arguments, too-many-locals
+
     def __init__(self, mdb, org_ops):
         self.webhooks = mdb["webhooks"]
         self.colls = mdb["collections"]
@@ -53,19 +55,62 @@
         org: Organization,
         page_size: int = DEFAULT_PAGE_SIZE,
         page: int = 1,
+        success: Optional[bool] = None,
+        event: Optional[str] = None,
+        sort_by: Optional[str] = None,
+        sort_direction: Optional[int] = -1,
     ):
         """List all webhook notifications"""
+        # pylint: disable=duplicate-code
         # Zero-index page for query
         page = page - 1
         skip = page_size * page
 
         query = {"oid": org.id}
-        total = await self.webhooks.count_documents(query)
+        if success in (True, False):
+            query["success"] = success
 
-        cursor = self.webhooks.find(query, skip=skip, limit=page_size)
-        results = await cursor.to_list(length=page_size)
-        notifications = [WebhookNotification.from_dict(res) for res in results]
+        if event:
+            query["event"] = event
+
+        aggregate = [{"$match": query}]
+
+        if sort_by:
+            SORT_FIELDS = ("success", "event", "attempts", "created", "lastAttempted")
+            if sort_by not in SORT_FIELDS:
+                raise HTTPException(status_code=400, detail="invalid_sort_by")
+            if sort_direction not in (1, -1):
+                raise HTTPException(status_code=400, detail="invalid_sort_direction")
+
+            aggregate.extend([{"$sort": {sort_by: sort_direction}}])
+
+        aggregate.extend(
+            [
+                {
+                    "$facet": {
+                        "items": [
+                            {"$skip": skip},
+                            {"$limit": page_size},
+                        ],
+                        "total": [{"$count": "count"}],
+                    }
+                },
+            ]
+        )
+
+        # Get total
+        cursor = self.webhooks.aggregate(aggregate)
+        results = await cursor.to_list(length=1)
+        result = results[0]
+        items = result["items"]
+
+        try:
+            total = int(result["total"][0]["count"])
+        except (IndexError, ValueError):
+            total = 0
+
+        notifications = [WebhookNotification.from_dict(res) for res in items]
 
         return notifications, total
 
@@ -79,7 +124,12 @@
 
         return WebhookNotification.from_dict(res)
 
-    @backoff.on_exception(backoff.expo, aiohttp.ClientError, max_tries=5, max_time=60)
+    @backoff.on_exception(
+        backoff.expo,
+        (aiohttp.ClientError, aiohttp.client_exceptions.ClientConnectorError),
+        max_tries=5,
+        max_time=60,
+    )
     async def send_notification(
         self, org: Organization, notification: WebhookNotification
     ):
@@ -183,10 +233,7 @@
             ),
         )
 
-    async def create_upload_finished_notification(
-        self,
-        crawl_id: str,
-    ):
+    async def create_upload_finished_notification(self, crawl_id: str):
         """Create webhook notification for finished upload."""
         crawl_res = await self.crawls.find_one({"_id": crawl_id})
         org = await self.org_ops.get_org_by_id(crawl_res["oid"])
@@ -199,8 +246,7 @@
             org,
             event=WebhookEventType.UPLOAD_FINISHED,
             body=UploadFinishedBody(
-                itemId=crawl_id,
-                orgId=str(org.id),
+                itemId=crawl_id, orgId=str(org.id), state="complete"
             ),
         )
 
@@ -313,6 +359,7 @@
 # pylint: disable=too-many-arguments, too-many-locals, invalid-name, fixme
 def init_event_webhooks_api(mdb, org_ops):
     """init event webhooks system"""
+    # pylint: disable=invalid-name
 
     ops = EventWebhookOps(mdb, org_ops)
 
@@ -325,9 +372,19 @@
         org: Organization = Depends(org_owner_dep),
         pageSize: int = DEFAULT_PAGE_SIZE,
         page: int = 1,
+        success: Optional[bool] = None,
+        event: Optional[str] = None,
+        sortBy: Optional[str] = None,
+        sortDirection: Optional[int] = -1,
     ):
         notifications, total = await ops.list_notifications(
-            org, page_size=pageSize, page=page
+            org,
+            page_size=pageSize,
+            page=page,
+            success=success,
+            event=event,
+            sort_by=sortBy,
+            sort_direction=sortDirection,
         )
         return paginated_format(notifications, total, page, pageSize)
 
diff --git a/backend/test/conftest.py b/backend/test/conftest.py
index 2ec39e81..9fbfa963 100644
--- a/backend/test/conftest.py
+++ b/backend/test/conftest.py
@@ -1,5 +1,8 @@
+import os
 import pytest
 import requests
+import socket
+import subprocess
 import time
 
 
@@ -24,6 +27,8 @@ NON_DEFAULT_ORG_NAME = "Non-default org"
 
 FINISHED_STATES = ("complete", "partial_complete", "canceled", "failed")
 
+curr_dir = os.path.abspath(os.path.dirname(os.path.realpath(__file__)))
+
 
 @pytest.fixture(scope="session")
 def admin_auth_headers():
@@ -401,3 +406,16 @@ def uploads_collection_id(crawler_auth_headers, default_org_id):
     )
     assert r.status_code == 200
     return r.json()["id"]
+
+
+@pytest.fixture(scope="function")
+def echo_server():
+    print("Echo server starting", flush=True)
+    p = subprocess.Popen(["python3", os.path.join(curr_dir, "echo_server.py")])
+    print("Echo server started", flush=True)
+    time.sleep(1)
+    yield p
+    time.sleep(10)
+    print("Echo server terminating", flush=True)
+    p.terminate()
+    print("Echo server terminated", flush=True)
diff --git a/backend/test/echo_server.py b/backend/test/echo_server.py
new file mode 100644
index 00000000..f858c776
--- /dev/null
+++ b/backend/test/echo_server.py
@@ -0,0 +1,29 @@
+#!/usr/bin/env python3
+"""
+A web server to record POST requests and return them on a GET request
+"""
+from http.server import HTTPServer, BaseHTTPRequestHandler
+import json
+
+BIND_HOST = "0.0.0.0"
+PORT = 18080
+
+post_bodies = []
+
+
+class EchoServerHTTPRequestHandler(BaseHTTPRequestHandler):
+    def do_GET(self):
+        self.send_response(200)
+        self.end_headers()
+        self.wfile.write(json.dumps({"post_bodies": post_bodies}).encode("utf-8"))
+
+    def do_POST(self):
+        content_length = int(self.headers.get("content-length", 0))
+        body = self.rfile.read(content_length)
+        self.send_response(200)
+        self.end_headers()
+        post_bodies.append(json.loads(body.decode("utf-8").replace("'", '"')))
+
+
+httpd = HTTPServer((BIND_HOST, PORT), EchoServerHTTPRequestHandler)
+httpd.serve_forever()
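
For illustration, a minimal sketch of exercising the echo server above by hand, assuming it is already running locally on port 18080 and that the requests library is installed; the sample notification body is hypothetical and only mirrors fields the tests below assert on:

    import requests

    # POST a sample body; the echo server appends the parsed JSON to post_bodies
    requests.post(
        "http://localhost:18080",
        json={"event": "uploadFinished", "orgId": "org-id", "itemId": "upload-id", "state": "complete"},
    )

    # GET returns everything recorded so far as {"post_bodies": [...]}
    print(requests.get("http://localhost:18080").json())
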
diff --git a/backend/test/test_webhooks.py b/backend/test/test_webhooks.py
index 04153f3b..7b515786 100644
--- a/backend/test/test_webhooks.py
+++ b/backend/test/test_webhooks.py
@@ -1,11 +1,25 @@
+import json
+import os
 import time
 
 import requests
 
 from .conftest import API_PREFIX
+from .utils import read_in_chunks
 
 _webhook_event_id = None
 
+curr_dir = os.path.dirname(os.path.realpath(__file__))
+
+ECHO_SERVER_URL = "http://localhost:18080"
+
+# Pull address to echo server running on host from CI env var.
+# If not set, default to host.docker.internal (for local testing with
+# Docker Desktop).
+ECHO_SERVER_URL_FROM_K8S = os.environ.get(
+    "ECHO_SERVER_HOST_URL", "http://host.docker.internal:18080"
+)
+
 
 def test_list_webhook_events(admin_auth_headers, default_org_id):
     # Verify that webhook URLs have been set in previous tests
@@ -89,7 +103,7 @@ def test_retry_webhook_event(admin_auth_headers, default_org_id):
     assert r.status_code == 200
     assert r.json()["success"]
 
-    # Give it some time to run
+    # Give it some time to run with exponential backoff retries
     time.sleep(90)
 
     # Verify attempts have been increased
@@ -107,3 +121,167 @@
         assert item["attempts"] == 2
         assert item["created"]
         assert item["lastAttempted"]
+
+
+def test_webhooks_sent(
+    admin_auth_headers,
+    default_org_id,
+    all_crawls_crawl_id,
+    echo_server,
+):
+    # Reconfigure event webhooks to use echo server
+    r = requests.post(
+        f"{API_PREFIX}/orgs/{default_org_id}/event-webhook-urls",
+        headers=admin_auth_headers,
+        json={
+            "crawlStarted": ECHO_SERVER_URL_FROM_K8S,
+            "crawlFinished": ECHO_SERVER_URL_FROM_K8S,
+            "uploadFinished": ECHO_SERVER_URL_FROM_K8S,
+            "addedToCollection": ECHO_SERVER_URL_FROM_K8S,
+            "removedFromCollection": ECHO_SERVER_URL_FROM_K8S,
+        },
+    )
+    assert r.status_code == 200
+    assert r.json()["updated"]
+
+    # Create collection with all_crawls_crawl_id already in it
+    r = requests.post(
+        f"{API_PREFIX}/orgs/{default_org_id}/collections",
+        headers=admin_auth_headers,
+        json={
+            "name": "Event webhooks test collection",
+            "crawlIds": [all_crawls_crawl_id],
+        },
+    )
+    assert r.status_code == 200
+    webhooks_coll_id = r.json()["id"]
+    assert webhooks_coll_id
+
+    # Create and run workflow that adds crawl to collection
+    crawl_data = {
+        "runNow": True,
+        "name": "Webhook crawl test",
+        "autoAddCollections": [webhooks_coll_id],
+        "config": {
+            "seeds": [{"url": "https://webrecorder.net/"}],
+        },
+    }
+    r = requests.post(
+        f"{API_PREFIX}/orgs/{default_org_id}/crawlconfigs/",
+        headers=admin_auth_headers,
+        json=crawl_data,
+    )
+    assert r.status_code == 200
+    data = r.json()
+    webhooks_config_id = data["id"]
+    assert webhooks_config_id
+    webhooks_crawl_id = data["run_now_job"]
+
+    # Wait for crawl to complete
+    while True:
+        r = requests.get(
+            f"{API_PREFIX}/orgs/{default_org_id}/crawls/{webhooks_crawl_id}/replay.json",
+            headers=admin_auth_headers,
+        )
+        data = r.json()
+        if data["state"] == "complete":
+            break
+        time.sleep(5)
+
+    # Create upload and add to collection
+    with open(os.path.join(curr_dir, "data", "example.wacz"), "rb") as fh:
+        r = requests.put(
+            f"{API_PREFIX}/orgs/{default_org_id}/uploads/stream?filename=webhookstest.wacz&name=Webhooks%20Upload&collections={webhooks_coll_id}",
+            headers=admin_auth_headers,
+            data=read_in_chunks(fh),
+        )
+
+    assert r.status_code == 200
+    data = r.json()
+    assert data["added"]
+    webhooks_upload_id = data["id"]
+
+    # Remove upload from collection
+    r = requests.post(
f"{API_PREFIX}/orgs/{default_org_id}/collections/{webhooks_coll_id}/remove", + json={"crawlIds": [webhooks_upload_id]}, + headers=admin_auth_headers, + ) + assert r.status_code == 200 + data = r.json() + assert data["id"] + + # Re-add upload to collection + r = requests.post( + f"{API_PREFIX}/orgs/{default_org_id}/collections/{webhooks_coll_id}/add", + json={"crawlIds": [webhooks_upload_id]}, + headers=admin_auth_headers, + ) + assert r.status_code == 200 + data = r.json() + assert data["id"] + + # Wait to ensure async notifications are all sent + time.sleep(10) + + # Send GET request to echo server to retrieve and verify POSTed data + r = requests.get(ECHO_SERVER_URL) + assert r.status_code == 200 + + data = r.json() + + crawl_started_count = 0 + crawl_finished_count = 0 + upload_finished_count = 0 + added_to_collection_count = 0 + removed_from_collection_count = 0 + + for post in data["post_bodies"]: + assert post["orgId"] + event = post["event"] + assert event + + if event == "crawlStarted": + crawl_started_count += 1 + assert post["itemId"] + assert post["scheduled"] in (True, False) + assert post.get("downloadUrls") is None + + elif event == "crawlFinished": + crawl_finished_count += 1 + assert post["itemId"] + assert post["state"] + assert post["downloadUrls"] + + elif event == "uploadFinished": + upload_finished_count += 1 + assert post["itemId"] + assert post["state"] + assert post["downloadUrls"] + + elif event == "addedToCollection": + added_to_collection_count += 1 + assert post["downloadUrls"] and len(post["downloadUrls"]) == 1 + assert post["itemIds"] + assert post["collectionId"] + + elif event == "removedFromCollection": + removed_from_collection_count += 1 + assert post["downloadUrls"] and len(post["downloadUrls"]) == 1 + assert post["itemIds"] + assert post["collectionId"] + + # Allow for some variability here due to timing of crawls + assert crawl_started_count >= 1 + assert crawl_finished_count >= 1 + assert upload_finished_count == 1 + assert added_to_collection_count >= 3 + assert removed_from_collection_count == 1 + + # Check that we've had expected number of successful webhook notifications + r = requests.get( + f"{API_PREFIX}/orgs/{default_org_id}/webhooks?success=True", + headers=admin_auth_headers, + ) + assert r.status_code == 200 + assert r.json()["total"] >= 7 diff --git a/btrix b/btrix index 763ef2c2..7f6b7f23 100755 --- a/btrix +++ b/btrix @@ -79,12 +79,12 @@ resetMicrok8s(){ runTests() { echo "Running backend tests..." - python3 -m pytest backend/test/*.py + python3 -m pytest backend/test/test_*.py } runNightlyTests() { echo "Running nightly backend tests..." - python3 -m pytest backend/test_nightly/*.py + python3 -m pytest backend/test_nightly/test_*.py } CONTEXT=$(cat ~/.kube/config | grep "current-context:" | sed "s/current-context: //")