New WS Endpoint for Watching Crawl (#152)

* backend support for new watch system (#134):
- support watching a crawl via Redis pub/sub and a websocket connection to the backend
- works with any number of crawler instances, so scaled crawls can be watched
- use the /archives/{aid}/crawls/{crawl_id}/watch/ws websocket endpoint (see the client sketch after this list)
- ws: ignore the graceful ConnectionClosedOK exception, log other exceptions
- set logging to info instead of debug for now (debug logs all ws traffic)
- remove old watch APIs in backend
- remove old websocket routing to the crawler instance used by the old watch system
- OAuth bearer check: support websockets, using the websocket object when no request object is present
- crawler args: replace --screencastPort with --screencastRedis
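
As a quick illustration of the new endpoint (not part of the diff), a minimal watch client might look like the sketch below. It assumes the backend is reachable directly at localhost:8000 and that a valid access token is available; the archive ID, crawl ID, and token are placeholders, and the auth_bearer query parameter is the fallback accepted by the bearer check changed in this commit:

# minimal watch-client sketch; host, port, and IDs below are assumptions
import asyncio
import websockets


async def watch(aid: str, crawl_id: str, token: str):
    # auth_bearer query param is used since websocket clients often can't set headers
    url = (
        f"ws://localhost:8000/archives/{aid}/crawls/{crawl_id}/watch/ws"
        f"?auth_bearer={token}"
    )
    async with websockets.connect(url) as ws:
        async for msg in ws:
            # each message is a screencast payload relayed from redis pubsub
            print(msg)


# asyncio.run(watch("<archive-id>", "<crawl-id>", "<access-token>"))

When running behind the nginx frontend, the same path would typically be reached under the /api prefix (matching the --root-path /api used below).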
Ilya Kreymer 2022-02-22 10:33:10 -08:00 committed by GitHub
parent aa5207915c
commit 9bd402fa17
10 changed files with 57 additions and 87 deletions

View File

@@ -8,5 +8,5 @@ RUN pip install -r requirements.txt
ADD . /app

-CMD uvicorn main:app --host 0.0.0.0 --root-path /api --reload --access-log --log-level debug
+CMD uvicorn main:app --host 0.0.0.0 --root-path /api --reload --access-log --log-level info

View File

@@ -8,7 +8,8 @@ import os
from typing import Optional, List, Dict, Union
from datetime import datetime

-from fastapi import Depends, Request, HTTPException
+import websockets
+from fastapi import Depends, HTTPException, WebSocket
from pydantic import BaseModel, UUID4, conint
import pymongo
import aioredis
@@ -168,6 +169,7 @@ class CrawlOps:
        self.redis = await aioredis.from_url(
            redis_url, encoding="utf-8", decode_responses=True
        )
+        self.pubsub = self.redis.pubsub()

        loop = asyncio.get_running_loop()
        loop.create_task(self.run_crawl_complete_loop())
@@ -426,6 +428,37 @@ class CrawlOps:
        )
        return res.deleted_count

+    async def handle_watch_ws(self, crawl_id: str, websocket: WebSocket):
+        """ Handle watch WS by proxying screencast data via redis pubsub """
+        # ensure websocket connected
+        await websocket.accept()
+
+        ctrl_channel = f"c:{crawl_id}:ctrl"
+        cast_channel = f"c:{crawl_id}:cast"
+
+        await self.redis.publish(ctrl_channel, "connect")
+
+        async with self.pubsub as chan:
+            await chan.subscribe(cast_channel)
+
+            # pylint: disable=broad-except
+            try:
+                while True:
+                    message = await chan.get_message(ignore_subscribe_messages=True)
+                    if not message:
+                        continue
+
+                    await websocket.send_text(message["data"])
+
+            except websockets.exceptions.ConnectionClosedOK:
+                pass
+
+            except Exception as exc:
+                print(exc, flush=True)
+
+            finally:
+                await self.redis.publish(ctrl_channel, "disconnect")
+

# ============================================================================
# pylint: disable=too-many-arguments, too-many-locals
@@ -526,15 +559,13 @@ def init_crawls_api(
        return {"scaled": scale.scale}

-    @app.post("/archives/{aid}/crawls/{crawl_id}/watch", tags=["crawls"])
-    async def watch_crawl(
-        crawl_id, request: Request, archive: Archive = Depends(archive_crawl_dep)
+    @app.websocket("/archives/{aid}/crawls/{crawl_id}/watch/ws")
+    async def watch_ws(
+        crawl_id, websocket: WebSocket, archive: Archive = Depends(archive_crawl_dep)
    ):
-        aid_str = archive.id_str
-        await crawl_manager.init_crawl_screencast(crawl_id, aid_str)
-        watch_url = (
-            f"{request.url.scheme}://{request.url.netloc}/watch/{aid_str}/{crawl_id}/ws"
-        )
-        return {"watch_url": watch_url}
+        # ensure crawl exists
+        await ops.get_crawl(crawl_id, archive)
+
+        await ops.handle_watch_ws(crawl_id, websocket)

    return ops

View File

@@ -301,31 +301,6 @@ class K8SManager:

        return crawls

-    async def init_crawl_screencast(self, crawl_id, aid):
-        """ Init service for this job/crawl_id to support screencasting """
-        labels = {"btrix.archive": aid}
-
-        service = client.V1Service(
-            kind="Service",
-            api_version="v1",
-            metadata={
-                "name": crawl_id,
-                "labels": labels,
-            },
-            spec={
-                "selector": {"job-name": crawl_id},
-                "ports": [{"protocol": "TCP", "port": 9037, "name": "screencast"}],
-            },
-        )
-
-        try:
-            await self.core_api.create_namespaced_service(
-                body=service, namespace=self.namespace
-            )
-
-        except client.exceptions.ApiException as api_exc:
-            if api_exc.status != 409:
-                raise api_exc
-
    async def process_crawl_complete(self, crawlcomplete):
        """Ensure the crawlcomplete data is valid (job exists and user matches)
        Fill in additional details about the crawl"""
@@ -549,17 +524,6 @@ class K8SManager:
                propagation_policy="Foreground",
            )

-            try:
-                await self.core_api.delete_namespaced_service(
-                    name=name,
-                    namespace=self.namespace,
-                    grace_period_seconds=60,
-                    propagation_policy="Foreground",
-                )
-            # pylint: disable=bare-except
-            except:
-                pass
-
    def _create_config_map(self, crawlconfig, labels):
        """ Create Config Map based on CrawlConfig + labels """
        config_map = client.V1ConfigMap(

View File

@@ -9,3 +9,4 @@ apscheduler
aioprocessing
aiobotocore
aioredis
+websockets

View File

@@ -11,7 +11,7 @@ from typing import Dict, Optional
from pydantic import EmailStr, UUID4
import passlib.pwd

-from fastapi import Request, Response, HTTPException, Depends
+from fastapi import Request, Response, HTTPException, Depends, WebSocket
from fastapi.security import OAuth2PasswordBearer

from fastapi_users import FastAPIUsers, models, BaseUserManager
@@ -261,9 +261,13 @@ def init_user_manager(mdb, emailsender, invites):
class OA2BearerOrQuery(OAuth2PasswordBearer):
    """ Override bearer check to also test query """

-    async def __call__(self, request: Request) -> Optional[str]:
+    async def __call__(
+        self, request: Request = None, websocket: WebSocket = None
+    ) -> Optional[str]:
        param = None
        exc = None
+        # use websocket as request if no request
+        request = request or websocket
        try:
            param = await super().__call__(request)
            if param:
@@ -275,10 +279,13 @@ class OA2BearerOrQuery(OAuth2PasswordBearer):
            param = request.query_params.get("auth_bearer")

-        if not param and exc:
+        if param:
+            return param
+
+        if exc:
            raise exc

-        return param
+        raise HTTPException(status_code=404, detail="Not Found")


# ============================================================================
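
For reference, a minimal sketch of how the modified dependency behaves on a websocket route: FastAPI injects the WebSocket object in place of the missing Request, and the token falls back to the auth_bearer query parameter. The route path, tokenUrl, and import are hypothetical:

# usage sketch; assumes OA2BearerOrQuery from the users.py change above is importable
# from users import OA2BearerOrQuery  # import path is an assumption
from fastapi import Depends, FastAPI, WebSocket

app = FastAPI()
bearer_or_query = OA2BearerOrQuery(tokenUrl="/auth/jwt/login")  # tokenUrl is an assumption


@app.websocket("/example/ws")
async def example_ws(websocket: WebSocket, token: str = Depends(bearer_or_query)):
    # connect with e.g. ws://host/example/ws?auth_bearer=<jwt>
    await websocket.accept()
    await websocket.send_text(f"token length: {len(token)}")
    await websocket.close()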

View File

@@ -52,9 +52,6 @@ spec:
            - name: BACKEND_HOST
              value: {{ .Values.name }}-backend

-            - name: BROWSER_SCREENCAST_URL
-              value: http://$2.crawlers.svc.cluster.local:9037
-
          resources:
            limits:
              cpu: {{ .Values.nginx_limits_cpu }}

View File

@@ -96,7 +96,7 @@ crawler_namespace: "crawlers"
crawl_retries: 3

# browsertrix-crawler args:
-crawler_args: "--timeout 90 --logging stats,behaviors,debug --generateWACZ --screencastPort 9037 --text --workers 2"
+crawler_args: "--timeout 90 --logging stats,behaviors,debug --generateWACZ --text --workers 2 --screencastRedis"

crawler_requests_cpu: "800m"
crawler_limits_cpu: "1200m"

View File

@@ -32,7 +32,7 @@ REDIS_URL=redis://redis/0

# Browsertrix Crawler image to use
CRAWLER_IMAGE=webrecorder/browsertrix-crawler

-CRAWL_ARGS="--timeout 90 --logging stats,behaviors,debug --generateWACZ --screencastPort 9037"
+CRAWL_ARGS="--timeout 90 --logging stats,behaviors,debug --generateWACZ --screencastRedis"

REGISTRATION_ENABLED=1

View File

@@ -28,7 +28,6 @@ services:
    environment:
      - BACKEND_HOST=backend
-      - BROWSER_SCREENCAST_URL=http://$$2:9037

  redis:
    image: redis

View File

@@ -10,7 +10,7 @@ server {
    index index.html index.htm;

    error_page 500 501 502 503 504 /50x.html;
    merge_slashes off;

    location = /50x.html {
        root /usr/share/nginx/html;
@@ -19,35 +19,6 @@ server {
    # fallback to index for any page
    error_page 404 /index.html;

-    location ~* /watch/([^/]+)/([^/]+)/ws {
-        set $archive $1;
-        set $crawl $2;
-
-        #auth_request /authcheck;
-
-        proxy_pass ${BROWSER_SCREENCAST_URL}/ws;
-        proxy_set_header Host "localhost";
-
-        proxy_http_version 1.1;
-        proxy_set_header Upgrade $http_upgrade;
-        proxy_set_header Connection $http_connection;
-    }
-
-    location ~* /watch/([^/]+)/([^/]+)/ {
-        set $archive $1;
-        set $crawl $2;
-
-        #auth_request /authcheck;
-
-        proxy_pass ${BROWSER_SCREENCAST_URL}/;
-        proxy_set_header Host "localhost";
-    }
-
-    location = /authcheck {
-        internal;
-        proxy_pass http://localhost:8000/archives/$archive/crawls/$crawl;
-        proxy_pass_request_body off;
-        proxy_set_header Content-Length "";
-    }
-
    location / {
        root /usr/share/nginx/html;
        index index.html index.htm;