From 9bd402fa17301273a340124869cf5abd38e69cb1 Mon Sep 17 00:00:00 2001
From: Ilya Kreymer
Date: Tue, 22 Feb 2022 10:33:10 -0800
Subject: [PATCH] New WS Endpoint for Watching Crawl (#152)

* backend support for new watch system (#134):
- support for watch via redis pubsub and websocket connection to backend
- watch works across any number of crawler instances, supporting scaled crawls
- use /archives/{aid}/crawls/{crawl_id}/watch/ws websocket endpoint
- ws: ignore graceful ConnectionClosedOK exception, log other exceptions
- set logging to info instead of debug for now (debug logs all ws traffic)
- remove old watch APIs in backend
- remove old websocket routing to crawler instance for old watch system
- oauth bearer check: support websockets, use websocket object if no request object
- crawler args: replace --screencastPort with --screencastRedis
---
 backend/Dockerfile            |  2 +-
 backend/crawls.py             | 51 ++++++++++++++++++++++++++++-------
 backend/k8sman.py             | 36 -------------------------
 backend/requirements.txt      |  1 +
 backend/users.py              | 15 ++++++++---
 chart/templates/frontend.yaml |  3 ---
 chart/values.yaml             |  2 +-
 configs/config.sample.env     |  2 +-
 docker-compose.yml            |  1 -
 frontend/nginx.conf.template  | 31 +--------------------
 10 files changed, 57 insertions(+), 87 deletions(-)

diff --git a/backend/Dockerfile b/backend/Dockerfile
index d3dd0d0f..c234f497 100644
--- a/backend/Dockerfile
+++ b/backend/Dockerfile
@@ -8,5 +8,5 @@ RUN pip install -r requirements.txt
 
 ADD . /app
 
-CMD uvicorn main:app --host 0.0.0.0 --root-path /api --reload --access-log --log-level debug
+CMD uvicorn main:app --host 0.0.0.0 --root-path /api --reload --access-log --log-level info
diff --git a/backend/crawls.py b/backend/crawls.py
index 692a2d04..7c062f45 100644
--- a/backend/crawls.py
+++ b/backend/crawls.py
@@ -8,7 +8,8 @@ import os
 from typing import Optional, List, Dict, Union
 from datetime import datetime
 
-from fastapi import Depends, Request, HTTPException
+import websockets
+from fastapi import Depends, HTTPException, WebSocket
 from pydantic import BaseModel, UUID4, conint
 import pymongo
 import aioredis
@@ -168,6 +169,7 @@ class CrawlOps:
         self.redis = await aioredis.from_url(
             redis_url, encoding="utf-8", decode_responses=True
         )
+        self.pubsub = self.redis.pubsub()
 
         loop = asyncio.get_running_loop()
         loop.create_task(self.run_crawl_complete_loop())
@@ -426,6 +428,37 @@ class CrawlOps:
         )
         return res.deleted_count
 
+    async def handle_watch_ws(self, crawl_id: str, websocket: WebSocket):
+        """ Handle watch WS by proxying screencast data via redis pubsub """
+        # ensure websocket connected
+        await websocket.accept()
+
+        ctrl_channel = f"c:{crawl_id}:ctrl"
+        cast_channel = f"c:{crawl_id}:cast"
+
+        await self.redis.publish(ctrl_channel, "connect")
+
+        async with self.pubsub as chan:
+            await chan.subscribe(cast_channel)
+
+            # pylint: disable=broad-except
+            try:
+                while True:
+                    message = await chan.get_message(ignore_subscribe_messages=True)
+                    if not message:
+                        continue
+
+                    await websocket.send_text(message["data"])
+
+            except websockets.exceptions.ConnectionClosedOK:
+                pass
+
+            except Exception as exc:
+                print(exc, flush=True)
+
+            finally:
+                await self.redis.publish(ctrl_channel, "disconnect")
+
 
 # ============================================================================
 # pylint: disable=too-many-arguments, too-many-locals
@@ -526,15 +559,13 @@ def init_crawls_api(
 
         return {"scaled": scale.scale}
 
-    @app.post("/archives/{aid}/crawls/{crawl_id}/watch", tags=["crawls"])
-    async def watch_crawl(
-        crawl_id, request: Request, archive: Archive = Depends(archive_crawl_dep)
+    @app.websocket("/archives/{aid}/crawls/{crawl_id}/watch/ws")
+    async def watch_ws(
+        crawl_id, websocket: WebSocket, archive: Archive = Depends(archive_crawl_dep)
     ):
-        aid_str = archive.id_str
-        await crawl_manager.init_crawl_screencast(crawl_id, aid_str)
-        watch_url = (
-            f"{request.url.scheme}://{request.url.netloc}/watch/{aid_str}/{crawl_id}/ws"
-        )
-        return {"watch_url": watch_url}
+        # ensure crawl exists
+        await ops.get_crawl(crawl_id, archive)
+
+        await ops.handle_watch_ws(crawl_id, websocket)
 
     return ops
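[Reviewer note] The new watch flow: the crawler publishes screencast frames to the redis channel c:{crawl_id}:cast and listens on c:{crawl_id}:ctrl, while handle_watch_ws() above forwards cast messages to the websocket and signals "connect"/"disconnect" on ctrl. Below is a minimal sketch of the publisher side of that contract, for illustration only; the real publisher is browsertrix-crawler's --screencastRedis mode, and the frame payload shape here is assumed, not taken from this patch.

    import asyncio
    import json

    import aioredis


    async def screencast_publisher(redis_url: str, crawl_id: str):
        """Toy publisher honoring the ctrl/cast channel contract."""
        redis = await aioredis.from_url(
            redis_url, encoding="utf-8", decode_responses=True
        )
        pubsub = redis.pubsub()
        # the backend publishes "connect" / "disconnect" on the ctrl channel
        await pubsub.subscribe(f"c:{crawl_id}:ctrl")

        watching = False
        while True:
            msg = await pubsub.get_message(ignore_subscribe_messages=True, timeout=1.0)
            if msg:
                watching = msg["data"] == "connect"
            if watching:
                # payload shape is hypothetical; only the channel name comes from this patch
                frame = json.dumps({"msg": "screencast", "data": "<frame>"})
                await redis.publish(f"c:{crawl_id}:cast", frame)
                await asyncio.sleep(0.1)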
diff --git a/backend/k8sman.py b/backend/k8sman.py
index db39731e..c50974ed 100644
--- a/backend/k8sman.py
+++ b/backend/k8sman.py
@@ -301,31 +301,6 @@ class K8SManager:
 
         return crawls
 
-    async def init_crawl_screencast(self, crawl_id, aid):
-        """ Init service for this job/crawl_id to support screencasting """
-        labels = {"btrix.archive": aid}
-
-        service = client.V1Service(
-            kind="Service",
-            api_version="v1",
-            metadata={
-                "name": crawl_id,
-                "labels": labels,
-            },
-            spec={
-                "selector": {"job-name": crawl_id},
-                "ports": [{"protocol": "TCP", "port": 9037, "name": "screencast"}],
-            },
-        )
-
-        try:
-            await self.core_api.create_namespaced_service(
-                body=service, namespace=self.namespace
-            )
-        except client.exceptions.ApiException as api_exc:
-            if api_exc.status != 409:
-                raise api_exc
-
     async def process_crawl_complete(self, crawlcomplete):
         """Ensure the crawlcomplete data is valid (job exists and user matches)
         Fill in additional details about the crawl"""
@@ -549,17 +524,6 @@
             propagation_policy="Foreground",
         )
 
-        try:
-            await self.core_api.delete_namespaced_service(
-                name=name,
-                namespace=self.namespace,
-                grace_period_seconds=60,
-                propagation_policy="Foreground",
-            )
-        # pylint: disable=bare-except
-        except:
-            pass
-
     def _create_config_map(self, crawlconfig, labels):
         """ Create Config Map based on CrawlConfig + labels """
         config_map = client.V1ConfigMap(
diff --git a/backend/requirements.txt b/backend/requirements.txt
index b8cdb69b..c741b56c 100644
--- a/backend/requirements.txt
+++ b/backend/requirements.txt
@@ -9,3 +9,4 @@ apscheduler
 aioprocessing
 aiobotocore
 aioredis
+websockets
diff --git a/backend/users.py b/backend/users.py
index 275ddc8a..6b2bffdf 100644
--- a/backend/users.py
+++ b/backend/users.py
@@ -11,7 +11,7 @@ from typing import Dict, Optional
 from pydantic import EmailStr, UUID4
 import passlib.pwd
 
-from fastapi import Request, Response, HTTPException, Depends
+from fastapi import Request, Response, HTTPException, Depends, WebSocket
 from fastapi.security import OAuth2PasswordBearer
 
 from fastapi_users import FastAPIUsers, models, BaseUserManager
@@ -261,9 +261,13 @@ def init_user_manager(mdb, emailsender, invites):
 class OA2BearerOrQuery(OAuth2PasswordBearer):
     """ Override bearer check to also test query """
 
-    async def __call__(self, request: Request) -> Optional[str]:
+    async def __call__(
+        self, request: Request = None, websocket: WebSocket = None
+    ) -> Optional[str]:
         param = None
         exc = None
+        # use websocket as request if no request
+        request = request or websocket
         try:
             param = await super().__call__(request)
             if param:
@@ -275,10 +279,13 @@
 
         param = request.query_params.get("auth_bearer")
 
-        if not param and exc:
+        if param:
+            return param
+
+        if exc:
             raise exc
 
-        return param
+        raise HTTPException(status_code=404, detail="Not Found")
 
 
 # ============================================================================
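[Reviewer note] Browsers cannot set an Authorization header on a websocket handshake, so the OA2BearerOrQuery override above also accepts the token as an auth_bearer query parameter. Below is a minimal client sketch for the new endpoint, using the same websockets library this patch adds to requirements.txt; the host, ids, and token are placeholders, and the /api prefix assumes the default frontend proxy plus uvicorn's --root-path /api:

    import asyncio

    import websockets


    async def watch(host: str, aid: str, crawl_id: str, token: str):
        url = (
            f"wss://{host}/api/archives/{aid}/crawls/{crawl_id}/watch/ws"
            f"?auth_bearer={token}"
        )
        async with websockets.connect(url) as ws:
            # each message is a screencast frame forwarded from redis pubsub
            async for message in ws:
                print(message, flush=True)


    # asyncio.run(watch("btrix.example.com", "<aid>", "<crawl-id>", "<jwt>"))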
diff --git a/chart/templates/frontend.yaml b/chart/templates/frontend.yaml
index 6b656c70..36de9892 100644
--- a/chart/templates/frontend.yaml
+++ b/chart/templates/frontend.yaml
@@ -52,9 +52,6 @@ spec:
           - name: BACKEND_HOST
             value: {{ .Values.name }}-backend
 
-          - name: BROWSER_SCREENCAST_URL
-            value: http://$2.crawlers.svc.cluster.local:9037
-
           resources:
             limits:
               cpu: {{ .Values.nginx_limits_cpu }}
diff --git a/chart/values.yaml b/chart/values.yaml
index 8bffacf8..49e8c25c 100644
--- a/chart/values.yaml
+++ b/chart/values.yaml
@@ -96,7 +96,7 @@ crawler_namespace: "crawlers"
 crawl_retries: 3
 
 # browsertrix-crawler args:
-crawler_args: "--timeout 90 --logging stats,behaviors,debug --generateWACZ --screencastPort 9037 --text --workers 2"
+crawler_args: "--timeout 90 --logging stats,behaviors,debug --generateWACZ --text --workers 2 --screencastRedis"
 
 crawler_requests_cpu: "800m"
 crawler_limits_cpu: "1200m"
diff --git a/configs/config.sample.env b/configs/config.sample.env
index ab343005..8f02a84c 100644
--- a/configs/config.sample.env
+++ b/configs/config.sample.env
@@ -32,7 +32,7 @@ REDIS_URL=redis://redis/0
 # Browsertrix Crawler image to use
 CRAWLER_IMAGE=webrecorder/browsertrix-crawler
 
-CRAWL_ARGS="--timeout 90 --logging stats,behaviors,debug --generateWACZ --screencastPort 9037"
+CRAWL_ARGS="--timeout 90 --logging stats,behaviors,debug --generateWACZ --screencastRedis"
 
 REGISTRATION_ENABLED=1
 
diff --git a/docker-compose.yml b/docker-compose.yml
index 4a4a1bf8..be04251a 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -28,7 +28,6 @@ services:
 
     environment:
       - BACKEND_HOST=backend
-      - BROWSER_SCREENCAST_URL=http://$$2:9037
 
   redis:
     image: redis
diff --git a/frontend/nginx.conf.template b/frontend/nginx.conf.template
index eadcba8a..8896de05 100644
--- a/frontend/nginx.conf.template
+++ b/frontend/nginx.conf.template
@@ -10,7 +10,7 @@ server {
     index index.html index.htm;
 
     error_page 500 501 502 503 504 /50x.html;
-
+    merge_slashes off;
 
     location = /50x.html {
         root /usr/share/nginx/html;
@@ -19,35 +19,6 @@ server {
     # fallback to index for any page
     error_page 404 /index.html;
 
-    location ~* /watch/([^/]+)/([^/]+)/ws {
-        set $archive $1;
-        set $crawl $2;
-        #auth_request /authcheck;
-
-        proxy_pass ${BROWSER_SCREENCAST_URL}/ws;
-        proxy_set_header Host "localhost";
-
-        proxy_http_version 1.1;
-        proxy_set_header Upgrade $http_upgrade;
-        proxy_set_header Connection $http_connection;
-    }
-
-    location ~* /watch/([^/]+)/([^/]+)/ {
-        set $archive $1;
-        set $crawl $2;
-        #auth_request /authcheck;
-
-        proxy_pass ${BROWSER_SCREENCAST_URL}/;
-        proxy_set_header Host "localhost";
-    }
-
-    location = /authcheck {
-        internal;
-        proxy_pass http://localhost:8000/archives/$archive/crawls/$crawl;
-        proxy_pass_request_body off;
-        proxy_set_header Content-Length "";
-    }
-
     location / {
         root /usr/share/nginx/html;
         index index.html index.htm;
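[Reviewer note] With the nginx watch routes and the per-crawl Service gone, the pubsub plumbing can be smoke-tested without a live crawler: publish a frame by hand on the cast channel and it should arrive on every websocket watching that crawl id. A rough sketch of such a check; the redis URL matches REDIS_URL in configs/config.sample.env (adjust it if you are not on the compose/cluster network), and the payload is an arbitrary test string:

    import asyncio

    import aioredis


    async def send_fake_frame(crawl_id: str):
        redis = await aioredis.from_url(
            "redis://redis/0", encoding="utf-8", decode_responses=True
        )
        await redis.publish(
            f"c:{crawl_id}:cast", '{"msg": "screencast", "data": "test"}'
        )


    # asyncio.run(send_fake_frame("<crawl-id>"))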