Watch Stream Directly from Browsertrix Crawler (#189)

* watch work: proxy directly to crawls instead of redis pubsub
- add 'watchIPs' to crawl detail output
- cache crawl ips for quick access for auth
- add '/ipaccess/{ip}' endpoint for watch ws connection to ensure ws has access to the specified container ip
- enable 'auth_request' in nginx frontend
- requirements: update to latest redis-py
remaining fixes for #134
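
Taken together, the changes work like this: the crawl detail output now includes the crawler container IPs in 'watchIPs'; the viewer opens a websocket through the nginx frontend at /watch/{aid}/{crawl_id}/{crawler_ip}/ws; nginx issues an auth_request subrequest to the new /ipaccess/{crawler_ip} endpoint, which checks the Redis-cached IP set for that crawl; and on success nginx proxies the socket directly to the crawler on port 9037. A minimal client-side sketch of that flow (the hostname, token handling, and use of the websockets client library are assumptions for illustration, not part of this commit):

    # Illustrative client-side sketch only: hostname, token handling, and the
    # `websockets` client library are assumptions, not code from this commit.
    import asyncio
    import websockets


    async def watch_crawl(aid: str, crawl_id: str, crawler_ip: str, token: str):
        # nginx matches /watch/{archive}/{crawl}/{crawler_ip}/ws, runs an
        # auth_request against /archives/{aid}/crawls/{crawl_id}/ipaccess/{crawler_ip},
        # and on a 200 response proxies the socket to http://{crawler_ip}:9037/ws
        url = (
            f"wss://app.example.com/watch/{aid}/{crawl_id}/{crawler_ip}/ws"
            f"?auth_bearer={token}"
        )
        async with websockets.connect(url) as ws:
            async for msg in ws:
                print(msg)  # screencast messages streamed directly by the crawler


    # crawler_ip comes from the new 'watchIPs' field on the crawl detail output:
    # asyncio.run(watch_crawl(aid, crawl_id, watch_ips[0], token))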
Ilya Kreymer 2022-03-04 14:55:11 -08:00 committed by GitHub
parent c18418ff09
commit cdd0ab34a3
6 changed files with 80 additions and 55 deletions

View File

@@ -8,11 +8,11 @@ import os
from typing import Optional, List, Dict, Union
from datetime import datetime
import websockets
from fastapi import Depends, HTTPException, WebSocket
from fastapi import Depends, HTTPException
from pydantic import BaseModel, UUID4, conint
import pymongo
import aioredis
from redis import asyncio as aioredis
from db import BaseMongoModel
from archives import Archive, MAX_CRAWL_SCALE
@@ -89,6 +89,8 @@ class CrawlOut(Crawl):
configName: Optional[str]
resources: Optional[List[CrawlFileOut]] = []
watchIPs: Optional[List[str]] = []
# ============================================================================
class ListCrawlOut(BaseMongoModel):
@@ -169,7 +171,6 @@ class CrawlOps:
self.redis = await aioredis.from_url(
redis_url, encoding="utf-8", decode_responses=True
)
self.pubsub = self.redis.pubsub()
loop = asyncio.get_running_loop()
loop.create_task(self.run_crawl_complete_loop())
@@ -324,6 +325,7 @@ class CrawlOps:
crawl = await self.crawl_manager.get_running_crawl(crawlid, archive.id_str)
if crawl:
await self.get_redis_stats([crawl])
await self.cache_ips(crawl)
else:
files = [CrawlFile(**data) for data in res["files"]]
@@ -421,6 +423,19 @@ class CrawlOps:
for crawl, (done, total) in zip(crawl_list, pairwise(results)):
crawl.stats = {"done": done, "found": total}
async def cache_ips(self, crawl: CrawlOut):
""" cache ips for ws auth check """
if crawl.watchIPs:
await self.redis.sadd(f"{crawl.id}:ips", *crawl.watchIPs)
await self.redis.expire(f"{crawl.id}:ips", 300)
async def ip_access_check(self, crawl_id, crawler_ip):
""" check if ip has access to this crawl based on redis cached ip """
if await self.redis.sismember(f"{crawl_id}:ips", crawler_ip):
return {}
raise HTTPException(status_code=403, detail="Unauthorized")
async def delete_crawls(self, aid: uuid.UUID, delete_list: DeleteCrawlList):
""" Delete a list of crawls by id for given archive """
res = await self.crawls.delete_many(
@@ -428,37 +443,6 @@ class CrawlOps:
)
return res.deleted_count
async def handle_watch_ws(self, crawl_id: str, websocket: WebSocket):
""" Handle watch WS by proxying screencast data via redis pubsub """
# ensure websocket connected
await websocket.accept()
ctrl_channel = f"c:{crawl_id}:ctrl"
cast_channel = f"c:{crawl_id}:cast"
await self.redis.publish(ctrl_channel, "connect")
async with self.pubsub as chan:
await chan.subscribe(cast_channel)
# pylint: disable=broad-except
try:
while True:
message = await chan.get_message(ignore_subscribe_messages=True)
if not message:
continue
await websocket.send_text(message["data"])
except websockets.exceptions.ConnectionClosedOK:
pass
except Exception as exc:
print(exc, flush=True)
finally:
await self.redis.publish(ctrl_channel, "disconnect")
# ============================================================================
# pylint: disable=too-many-arguments, too-many-locals
@@ -559,13 +543,15 @@ def init_crawls_api(
return {"scaled": scale.scale}
@app.websocket("/archives/{aid}/crawls/{crawl_id}/watch/ws")
async def watch_ws(
crawl_id, websocket: WebSocket, archive: Archive = Depends(archive_crawl_dep)
):
# ensure crawl exists
await ops.get_crawl(crawl_id, archive)
@app.get(
"/archives/{aid}/crawls/{crawl_id}/ipaccess/{crawler_ip}",
tags=["crawls"],
)
await ops.handle_watch_ws(crawl_id, websocket)
# pylint: disable=unused-argument
async def ip_access_check(
crawl_id, crawler_ip, archive: Archive = Depends(archive_crawl_dep)
):
return await ops.ip_access_check(crawl_id, crawler_ip)
return ops

View File

@@ -15,7 +15,8 @@ from tempfile import NamedTemporaryFile
import aiodocker
import aioprocessing
import aioredis
from redis import asyncio as aioredis
from scheduler import run_scheduler
@@ -360,6 +361,7 @@ class DockerManager:
async def get_running_crawl(self, crawl_id, aid=None):
""" Return a single running crawl as CrawlOut """
# pylint: disable=broad-except,bare-except
try:
container = await self.client.containers.get(crawl_id)
@@ -373,10 +375,17 @@
if stop_type == "canceled":
return None
return self._make_crawl_for_container(
crawl = self._make_crawl_for_container(
container, "stopping" if stop_type else "running", False, CrawlOut
)
# pylint: disable=broad-except
try:
crawl.watchIPs = [container.attrs["NetworkSettings"]["IPAddress"]]
except:
crawl.watchIPs = []
return crawl
except Exception as exc:
print(exc, flush=True)
return None

View File

@@ -5,7 +5,8 @@ import datetime
import json
import asyncio
import base64
import aioredis
from redis import asyncio as aioredis
from kubernetes_asyncio import client, config, watch
from kubernetes_asyncio.stream import WsApiClient
@@ -388,7 +389,17 @@ class K8SManager:
if not status:
return None
return self._make_crawl_for_job(job, status, False, CrawlOut)
crawl = self._make_crawl_for_job(job, status, False, CrawlOut)
pods = await self.core_api.list_namespaced_pod(
namespace=self.namespace,
label_selector=f"job-name={name},btrix.archive={aid}",
)
crawl.watchIPs = [
pod.status.pod_ip for pod in pods.items if pod.status.pod_ip
]
return crawl
# pylint: disable=broad-except
except Exception:

View File

@@ -8,5 +8,4 @@ aiodocker
apscheduler
aioprocessing
aiobotocore
aioredis
websockets
redis>=4.2.0rc1

View File

@@ -64,18 +64,16 @@ spec:
httpGet:
path: /healthz
port: 8000
failureThreshold: 12
initialDelaySeconds: 20
periodSeconds: 5
timeoutSeconds: 3
failureThreshold: 5
failureThreshold: 30
readinessProbe:
httpGet:
path: /healthz
port: 8000
initialDelaySeconds: 5
initialDelaySeconds: 15
periodSeconds: 30
timeoutSeconds: 3
failureThreshold: 5
livenessProbe:
@@ -84,7 +82,6 @@ spec:
port: 8000
initialDelaySeconds: 15
periodSeconds: 30
timeoutSeconds: 3
failureThreshold: 5

View File

@@ -36,5 +36,28 @@ server {
proxy_set_header Host $http_host;
proxy_set_header X-Forwarded-Proto $scheme;
}
location ~* /watch/([^/]+)/([^/]+)/([^/]+)/ws {
set $archive $1;
set $crawl $2;
set $crawlerip $3;
set $auth_bearer $arg_auth_bearer;
auth_request /ipaccess;
proxy_pass http://$crawlerip:9037/ws;
proxy_set_header Host "localhost";
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection $http_connection;
}
location = /ipaccess {
internal;
proxy_pass http://${BACKEND_HOST}:8000/archives/$archive/crawls/$crawl/ipaccess/$crawlerip?auth_bearer=$auth_bearer;
proxy_pass_request_body off;
proxy_set_header Content-Length "";
}
}