add redis for storing crawl state data!

- supported in both docker and k8s
- additional pods with same job id automatically use same crawl state in redis
- supports dynamic scaling (#2) via the /scale endpoint - k8s job parallelism is adjusted dynamically for a running job (only supported in k8s so far); see the request sketch below
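For illustration, a call to the new scale endpoint might look like the following sketch (hypothetical host, token, archive id, and crawl id; uses the standalone requests client, which is not part of this commit):

import requests

# All values below are placeholders -- substitute a real API host, auth token,
# archive id ({aid}) and crawl id ({crawl_id}).
API = "http://localhost:8000"
HEADERS = {"Authorization": "Bearer <token>"}

resp = requests.post(
    f"{API}/archives/example-aid/crawls/example-crawl/scale",
    json={"scale": 3},  # CrawlScale body: desired number of parallel crawler pods
    headers=HEADERS,
)
resp.raise_for_status()  # the endpoint answers HTTP 400 with a detail message on error
print(resp.json())       # expected: {"scaled": 3}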
Ilya Kreymer 2021-09-17 15:02:11 -07:00
parent 223658cfa2
commit b6d1e492d7
10 changed files with 186 additions and 98 deletions

View File

@ -76,6 +76,7 @@ class CrawlConfigIn(BaseModel):
runNow: Optional[bool] = False
crawlTimeout: Optional[int] = 0
parallel: Optional[int] = 1
config: RawCrawlConfig
@ -94,6 +95,7 @@ class CrawlConfig(BaseMongoModel):
config: RawCrawlConfig
crawlTimeout: Optional[int] = 0
parallel: Optional[int] = 1
crawlCount: Optional[int] = 0

View File

@ -2,12 +2,13 @@
import asyncio
from typing import Optional, List
from typing import Optional, List, Dict
from datetime import datetime
from fastapi import Depends, HTTPException
from pydantic import BaseModel
import pymongo
import aioredis
from db import BaseMongoModel
from archives import Archive
@ -20,6 +21,13 @@ class DeleteCrawlList(BaseModel):
crawl_ids: List[str]
# ============================================================================
class CrawlScale(BaseModel):
""" scale the crawl to N parallel containers """
scale: int = 1
# ============================================================================
class Crawl(BaseMongoModel):
""" Store State of a Crawl (Finished or Running) """
@ -36,6 +44,10 @@ class Crawl(BaseMongoModel):
state: str
scale: int = 1
stats: Optional[Dict[str, str]]
filename: Optional[str]
size: Optional[int]
hash: Optional[str]
@ -60,14 +72,22 @@ class CrawlCompleteIn(BaseModel):
class CrawlOps:
""" Crawl Ops """
def __init__(self, mdb, crawl_manager, crawl_configs, archives):
# pylint: disable=too-many-arguments
def __init__(self, mdb, redis_url, crawl_manager, crawl_configs, archives):
self.crawls = mdb["crawls"]
self.crawl_manager = crawl_manager
self.crawl_configs = crawl_configs
self.archives = archives
self.redis = None
asyncio.create_task(self.init_redis(redis_url))
self.crawl_manager.set_crawl_ops(self)
async def init_redis(self, redis_url):
""" init redis async """
self.redis = await aioredis.from_url(redis_url)
async def on_handle_crawl_complete(self, msg: CrawlCompleteIn):
""" Handle completed crawl, add to crawls db collection, also update archive usage """
crawl = await self.crawl_manager.validate_crawl_complete(msg)
@ -99,8 +119,10 @@ class CrawlOps:
await self.crawl_configs.inc_crawls(crawl.cid)
async def list_crawls(self, aid: str, cid: str = None):
"""Get all crawl configs for an archive is a member of"""
return True
async def list_db_crawls(self, aid: str, cid: str = None):
"""List all finished crawls from the db """
query = {"aid": aid}
if cid:
query["cid"] = cid
@ -109,6 +131,42 @@ class CrawlOps:
results = await cursor.to_list(length=1000)
return [Crawl.from_dict(res) for res in results]
async def list_crawls(self, aid: str):
""" list finished and running crawl data """
running_crawls = await self.crawl_manager.list_running_crawls(aid=aid)
await self.get_redis_stats(running_crawls)
finished_crawls = await self.list_db_crawls(aid)
return {
"running": [
crawl.dict(exclude_none=True, exclude_unset=True)
for crawl in running_crawls
],
"finished": finished_crawls,
}
# pylint: disable=too-many-arguments
async def get_redis_stats(self, crawl_list):
""" Add additional live crawl stats from redis """
results = None
def pairwise(iterable):
val = iter(iterable)
return zip(val, val)
async with self.redis.pipeline(transaction=True) as pipe:
for crawl in crawl_list:
key = crawl.id
pipe.llen(f"{key}:d")
pipe.scard(f"{key}:s")
results = await pipe.execute()
for crawl, (done, total) in zip(crawl_list, pairwise(results)):
crawl.stats = {"done": done, "found": total}
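The key layout this relies on is an assumption inferred from the llen/scard calls above (it is defined by the crawler, not by this commit): each crawl appears to keep a list of finished pages at <crawl_id>:d and a set of discovered URLs at <crawl_id>:s, so done/found counts can be read without touching the crawler containers. A minimal standalone sketch, assuming aioredis 2.x and a local Redis:

import asyncio
import aioredis  # same client used above, aioredis >= 2.0

async def main():
    redis = aioredis.from_url("redis://localhost:6379/0", decode_responses=True)
    crawl_id = "example-crawl"  # hypothetical crawl id

    # Simulate what the crawler is assumed to write:
    await redis.rpush(f"{crawl_id}:d", "https://example.com/")  # one page done
    await redis.sadd(f"{crawl_id}:s", "https://example.com/", "https://example.com/about")  # two URLs seen

    # Same pipelined read as get_redis_stats(), for a single crawl:
    async with redis.pipeline(transaction=True) as pipe:
        pipe.llen(f"{crawl_id}:d")
        pipe.scard(f"{crawl_id}:s")
        done, found = await pipe.execute()

    print({"done": done, "found": found})  # {'done': 1, 'found': 2}
    await redis.close()

asyncio.run(main())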
async def delete_crawls(self, aid: str, delete_list: DeleteCrawlList):
""" Delete a list of crawls by id for given archive """
res = await self.crawls.delete_many(
@ -118,10 +176,11 @@ class CrawlOps:
# ============================================================================
def init_crawls_api(app, mdb, crawl_manager, crawl_config_ops, archives):
# pylint: disable=too-many-arguments
def init_crawls_api(app, mdb, redis_url, crawl_manager, crawl_config_ops, archives):
""" API for crawl management, including crawl done callback"""
ops = CrawlOps(mdb, crawl_manager, crawl_config_ops, archives)
ops = CrawlOps(mdb, redis_url, crawl_manager, crawl_config_ops, archives)
archive_crawl_dep = archives.archive_crawl_dep
@ -134,19 +193,7 @@ def init_crawls_api(app, mdb, crawl_manager, crawl_config_ops, archives):
@app.get("/archives/{aid}/crawls", tags=["crawls"])
async def list_crawls(archive: Archive = Depends(archive_crawl_dep)):
aid = str(archive.id)
running_crawls = await crawl_manager.list_running_crawls(aid=aid)
finished_crawls = await ops.list_crawls(aid)
return {
"running": [
crawl.dict(exclude_none=True, exclude_unset=True)
for crawl in running_crawls
],
"finished": finished_crawls,
}
return await ops.list_crawls(archive.id)
@app.post(
"/archives/{aid}/crawls/{crawl_id}/cancel",
@ -207,3 +254,17 @@ def init_crawls_api(app, mdb, crawl_manager, crawl_config_ops, archives):
res = await ops.delete_crawls(archive.id, delete_list)
return {"deleted": res}
@app.post(
"/archives/{aid}/crawls/{crawl_id}/scale",
tags=["crawls"],
)
async def scale_crawl(
scale: CrawlScale, crawl_id, archive: Archive = Depends(archive_crawl_dep)
):
error = await crawl_manager.scale_crawl(crawl_id, archive.id, scale.scale)
if error:
raise HTTPException(status_code=400, detail=error)
return {"scaled": scale.scale}

View File

@ -15,10 +15,10 @@ from tempfile import NamedTemporaryFile
import aiodocker
import aioprocessing
from crawls import Crawl
from scheduler import run_scheduler
from crawls import Crawl
# ============================================================================
class DockerManager:
@ -260,6 +260,10 @@ class DockerManager:
return crawl
async def scale_crawl(self): # job_name, aid, parallelism=1):
""" Scale running crawl, currently only supported in k8s"""
return "Not Supported"
async def delete_crawl_config_by_id(self, cid):
""" Delete Crawl Config by Crawl Config Id"""
await self._delete_volume_by_labels([f"btrix.crawlconfig={cid}"])
@ -346,7 +350,13 @@ class DockerManager:
# pylint: disable=too-many-arguments
async def _run_crawl_now(self, storage, labels, volume, schedule="", manual=True):
# Set Run Config
command = ["crawl", "--config", "/tmp/crawlconfig/crawl-config.json"]
command = [
"crawl",
"--config",
"/tmp/crawlconfig/crawl-config.json",
"--redisStoreUrl",
"redis://redis:6379/0",
]
if self.extra_crawl_params:
command += self.extra_crawl_params

View File

@ -12,8 +12,9 @@ from crawls import Crawl
# ============================================================================
DEFAULT_NAMESPACE = os.environ.get("CRAWLER_NAMESPACE") or "crawlers"
CRAWLER_NAMESPACE = os.environ.get("CRAWLER_NAMESPACE") or "crawlers"
# a 2/31 schedule that will never run, as an empty schedule is not allowed
DEFAULT_NO_SCHEDULE = "* * 31 2 *"
@ -22,7 +23,7 @@ class K8SManager:
# pylint: disable=too-many-instance-attributes,too-many-locals,too-many-arguments
"""K8SManager, manager creation of k8s resources from crawl api requests"""
def __init__(self, extra_crawl_params=None, namespace=DEFAULT_NAMESPACE):
def __init__(self, namespace=CRAWLER_NAMESPACE):
config.load_incluster_config()
self.crawl_ops = None
@ -33,7 +34,7 @@ class K8SManager:
self.batch_beta_api = client.BatchV1beta1Api()
self.namespace = namespace
self.extra_crawl_params = extra_crawl_params or []
print(self.namespace, flush=True)
self.crawler_image = os.environ.get("CRAWLER_IMAGE")
self.crawler_image_pull_policy = "IfNotPresent"
@ -138,7 +139,11 @@ class K8SManager:
suspend, schedule, run_now = self._get_schedule_suspend_run_now(crawlconfig)
job_template = self._get_job_template(
cid, labels, annotations, crawlconfig.crawlTimeout, self.extra_crawl_params
cid,
labels,
annotations,
crawlconfig.crawlTimeout,
crawlconfig.parallel,
)
spec = client.V1beta1CronJobSpec(
@ -286,6 +291,31 @@ class K8SManager:
return result
async def scale_crawl(self, job_name, aid, parallelism=1):
""" Set the crawl scale (job parallelism) on the specified job """
try:
job = await self.batch_api.read_namespaced_job(
name=job_name, namespace=self.namespace
)
# pylint: disable=broad-except
except Exception:
return "Crawl not found"
if not job or job.metadata.labels["btrix.archive"] != aid:
return "Invalid Crawled"
if parallelism < 1 or parallelism > 10:
return "Invalid Scale: Must be between 1 and 10"
job.spec.parallelism = parallelism
await self.batch_api.patch_namespaced_job(
name=job.metadata.name, namespace=self.namespace, body=job
)
return None
async def delete_crawl_configs_for_archive(self, archive):
"""Delete all crawl configs for given archive"""
return await self._delete_crawl_configs(f"btrix.archive={archive}")
@ -307,9 +337,12 @@ class K8SManager:
crawl = self._make_crawl_for_job(job, reason, True)
await self.crawl_ops.store_crawl(crawl)
# if update succeeds, then crawl has not completed, so likely a failure
failure = await self.crawl_ops.store_crawl(crawl)
await self._delete_job(job_name)
# keep failed jobs around, for now
if not failure:
await self._delete_job(job_name)
# ========================================================================
# Internal Methods
@ -322,6 +355,7 @@ class K8SManager:
return Crawl(
id=job.metadata.name,
state=state,
scale=job.spec.parallelism or 1,
user=job.metadata.labels["btrix.user"],
aid=job.metadata.labels["btrix.archive"],
cid=job.metadata.labels["btrix.crawlconfig"],
@ -455,16 +489,9 @@ class K8SManager:
body=job, namespace=self.namespace
)
def _get_job_template(
self, uid, labels, annotations, crawl_timeout, extra_crawl_params
):
def _get_job_template(self, uid, labels, annotations, crawl_timeout, parallel):
"""Return crawl job template for crawl job, including labels, adding optiona crawl params"""
command = ["crawl", "--config", "/tmp/crawl-config.json"]
if extra_crawl_params:
command += extra_crawl_params
requests_memory = "256M"
limit_memory = "1G"
@ -486,6 +513,7 @@ class K8SManager:
"metadata": {"annotations": annotations},
"spec": {
"backoffLimit": self.crawl_retries,
"parallelism": parallel,
"template": {
"metadata": {"labels": labels},
"spec": {
@ -494,7 +522,11 @@ class K8SManager:
"name": "crawler",
"image": self.crawler_image,
"imagePullPolicy": "Never",
"command": command,
"command": [
"crawl",
"--config",
"/tmp/crawl-config.json",
],
"volumeMounts": [
{
"name": "crawl-config",
@ -504,7 +536,8 @@ class K8SManager:
}
],
"envFrom": [
{"secretRef": {"name": f"crawl-secret-{uid}"}}
{"configMapRef": {"name": "shared-crawler-config"}},
{"secretRef": {"name": f"crawl-secret-{uid}"}},
],
"env": [
{

View File

@ -9,11 +9,11 @@ from fastapi import FastAPI, Request, HTTPException
from db import init_db
from emailsender import EmailSender
from users import init_users_api, UserDB
from archives import init_archives_api
from crawlconfigs import init_crawl_config_api
from crawls import init_crawls_api
from emailsender import EmailSender
app = FastAPI()
@ -38,14 +38,6 @@ class BrowsertrixAPI:
self.email = EmailSender()
self.crawl_manager = None
self.default_crawl_params = [
"--timeout",
"90",
"--logging",
"behaviors,debug",
"--generateWACZ",
]
self.mdb = init_db()
self.fastapi_users = init_users_api(
@ -66,13 +58,11 @@ class BrowsertrixAPI:
if os.environ.get("KUBERNETES_SERVICE_HOST"):
from k8sman import K8SManager
self.crawl_manager = K8SManager(self.default_crawl_params)
self.crawl_manager = K8SManager()
else:
from dockerman import DockerManager
self.crawl_manager = DockerManager(
self.archive_ops, self.default_crawl_params
)
self.crawl_manager = DockerManager(self.archive_ops)
self.crawl_config_ops = init_crawl_config_api(
self.mdb,
@ -84,6 +74,7 @@ class BrowsertrixAPI:
init_crawls_api(
self.app,
self.mdb,
os.environ.get("REDIS_URL"),
self.crawl_manager,
self.crawl_config_ops,
self.archive_ops,

View File

@ -6,3 +6,4 @@ kubernetes-asyncio
aiodocker
apscheduler
aioprocessing
aioredis

View File

@ -1,51 +1,10 @@
---
apiVersion: v1
kind: ConfigMap
metadata:
name: {{ .Values.name }}-env-config
namespace: {{ .Release.Namespace }}
data:
MONGO_HOST: {{ .Values.mongo_host }}
CRAWLER_NAMESPACE: {{ .Values.crawler_namespace }}
CRAWLER_IMAGE: {{ .Values.crawler_image }}
CRAWL_TIMEOUT: "{{ .Values.crawl_timeout }}"
CRAWL_RETRIES: "{{ .Values.crawl_retries }}"
---
apiVersion: v1
kind: Secret
metadata:
name: storage-auth
namespace: {{ .Release.Namespace }}
type: Opaque
stringData:
PASSWORD_SECRET: "{{ .Values.api_password_secret }}"
{{- if .Values.minio_local }}
MINIO_ROOT_USER: "{{ .Values.storage.access_key }}"
MINIO_ROOT_PASSWORD: "{{ .Values.storage.secret_key }}"
MC_HOST: "{{ .Values.minio_scheme }}://{{ .Values.storage.access_key }}:{{ .Values.storage.secret_key }}@{{ .Values.minio_host }}"
{{- end }}
STORE_ACCESS_KEY: "{{ .Values.storage.access_key }}"
STORE_SECRET_KEY: "{{ .Values.storage.secret_key }}"
STORE_ENDPOINT_URL: "{{ .Values.storage.endpoint }}"
#S3_FORCE_PATH_STYLE: "{{ .Values.storage.force_path_style | quote }}"
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: {{ .Values.name }}
namespace: {{ .Release.Namespace }}
spec:
selector:
matchLabels:
@ -56,6 +15,10 @@ spec:
labels:
app: {{ .Values.name }}
annotations:
# force helm to update the deployment each time
"helm.update": {{ randAlphaNum 5 | quote }}
spec:
{{- if .Values.minio_local }}

View File

@ -4,3 +4,5 @@ metadata:
name: {{ .Values.crawler_namespace }}
labels:
release: {{ .Release.Name }}
annotations:
"helm.sh/resource-policy": keep

View File

@ -33,6 +33,17 @@ mongo_auth:
password: example
# Redis Image
# =========================================
redis_local: true
redis_image: "redis"
redis_pull_policy: "IfNotPresent"
redis_url: "redis://local-redis.default:6379/1"
# Crawler Image
# =========================================
@ -44,6 +55,10 @@ crawler_namespace: "crawlers"
# num retries
crawl_retries: 1
# browsertrix-crawler args:
crawler_args: "--timeout 90 --logging stats,behaviors,debug --generateWACZ"
# Storage
# =========================================

View File

@ -16,12 +16,21 @@ services:
depends_on:
- minio
- mongo
- scheduler
redis:
image: redis
command: redis-server --appendonly yes
ports:
- 6379:6379
volumes:
- btrix-redis-data:/data
mongo:
image: mongo
volumes:
- mongodata:/data/db
- btrix-mongo-data:/data/db
env_file:
- ./config.env
@ -34,7 +43,7 @@ services:
- 9001:9001
volumes:
- miniodata:/data
- btrix-minio-data:/data
env_file:
- ./config.env
@ -52,8 +61,9 @@ services:
volumes:
mongodata:
miniodata:
btrix-redis-data:
btrix-mongo-data:
btrix-minio-data:
networks: