diff --git a/backend/crawlconfigs.py b/backend/crawlconfigs.py
index 656b5ced..294bffec 100644
--- a/backend/crawlconfigs.py
+++ b/backend/crawlconfigs.py
@@ -76,6 +76,7 @@ class CrawlConfigIn(BaseModel):
 
     runNow: Optional[bool] = False
     crawlTimeout: Optional[int] = 0
+    parallel: Optional[int] = 1
 
     config: RawCrawlConfig
 
@@ -94,6 +95,7 @@ class CrawlConfig(BaseMongoModel):
 
     config: RawCrawlConfig
 
     crawlTimeout: Optional[int] = 0
+    parallel: Optional[int] = 1
 
     crawlCount: Optional[int] = 0
 
diff --git a/backend/crawls.py b/backend/crawls.py
index 3508f4ab..202495f7 100644
--- a/backend/crawls.py
+++ b/backend/crawls.py
@@ -2,12 +2,13 @@
 
 import asyncio
 
-from typing import Optional, List
+from typing import Optional, List, Dict
 from datetime import datetime
 
 from fastapi import Depends, HTTPException
 from pydantic import BaseModel
 import pymongo
+import aioredis
 
 from db import BaseMongoModel
 from archives import Archive
@@ -20,6 +21,13 @@ class DeleteCrawlList(BaseModel):
 
     crawl_ids: List[str]
 
+# ============================================================================
+class CrawlScale(BaseModel):
+    """ scale the crawl to N parallel containers """
+
+    scale: int = 1
+
+
 # ============================================================================
 class Crawl(BaseMongoModel):
     """ Store State of a Crawl (Finished or Running) """
@@ -36,6 +44,10 @@ class Crawl(BaseMongoModel):
 
     state: str
 
+    scale: int = 1
+
+    stats: Optional[Dict[str, str]]
+
     filename: Optional[str]
     size: Optional[int]
     hash: Optional[str]
@@ -60,14 +72,22 @@ class CrawlCompleteIn(BaseModel):
 class CrawlOps:
     """ Crawl Ops """
 
-    def __init__(self, mdb, crawl_manager, crawl_configs, archives):
+    # pylint: disable=too-many-arguments
+    def __init__(self, mdb, redis_url, crawl_manager, crawl_configs, archives):
         self.crawls = mdb["crawls"]
         self.crawl_manager = crawl_manager
         self.crawl_configs = crawl_configs
         self.archives = archives
+        self.redis = None
+        asyncio.create_task(self.init_redis(redis_url))
 
         self.crawl_manager.set_crawl_ops(self)
 
+    async def init_redis(self, redis_url):
+        """ init redis async """
+        self.redis = await aioredis.from_url(redis_url)
+
     async def on_handle_crawl_complete(self, msg: CrawlCompleteIn):
         """ Handle completed crawl, add to crawls db collection, also update archive usage """
         crawl = await self.crawl_manager.validate_crawl_complete(msg)
@@ -99,8 +119,10 @@ class CrawlOps:
 
         await self.crawl_configs.inc_crawls(crawl.cid)
 
-    async def list_crawls(self, aid: str, cid: str = None):
-        """Get all crawl configs for an archive is a member of"""
+        return True
+
+    async def list_db_crawls(self, aid: str, cid: str = None):
+        """List all finished crawls from the db """
         query = {"aid": aid}
         if cid:
             query["cid"] = cid
@@ -109,6 +131,42 @@ class CrawlOps:
         results = await cursor.to_list(length=1000)
         return [Crawl.from_dict(res) for res in results]
 
+    async def list_crawls(self, aid: str):
+        """ list finished and running crawl data """
+        running_crawls = await self.crawl_manager.list_running_crawls(aid=aid)
+
+        await self.get_redis_stats(running_crawls)
+
+        finished_crawls = await self.list_db_crawls(aid)
+
+        return {
+            "running": [
+                crawl.dict(exclude_none=True, exclude_unset=True)
+                for crawl in running_crawls
+            ],
+            "finished": finished_crawls,
+        }
+
+    # pylint: disable=too-many-arguments
+    async def get_redis_stats(self, crawl_list):
+        """ Add additional live crawl stats from redis """
+        results = None
+
+        def pairwise(iterable):
+            val = iter(iterable)
+            return zip(val, val)
+
+        async with self.redis.pipeline(transaction=True) as pipe:
+            for crawl in crawl_list:
+                key = crawl.id
+                pipe.llen(f"{key}:d")
+                pipe.scard(f"{key}:s")
+
+            results = await pipe.execute()
+
+        for crawl, (done, total) in zip(crawl_list, pairwise(results)):
+            crawl.stats = {"done": done, "found": total}
+
     async def delete_crawls(self, aid: str, delete_list: DeleteCrawlList):
         """ Delete a list of crawls by id for given archive """
         res = await self.crawls.delete_many(
@@ -118,10 +176,11 @@ class CrawlOps:
 
 
 # ============================================================================
-def init_crawls_api(app, mdb, crawl_manager, crawl_config_ops, archives):
+# pylint: disable=too-many-arguments
+def init_crawls_api(app, mdb, redis_url, crawl_manager, crawl_config_ops, archives):
     """ API for crawl management, including crawl done callback"""
 
-    ops = CrawlOps(mdb, crawl_manager, crawl_config_ops, archives)
+    ops = CrawlOps(mdb, redis_url, crawl_manager, crawl_config_ops, archives)
 
     archive_crawl_dep = archives.archive_crawl_dep
 
@@ -134,19 +193,7 @@ def init_crawls_api(app, mdb, crawl_manager, crawl_config_ops, archives):
 
     @app.get("/archives/{aid}/crawls", tags=["crawls"])
     async def list_crawls(archive: Archive = Depends(archive_crawl_dep)):
-        aid = str(archive.id)
-
-        running_crawls = await crawl_manager.list_running_crawls(aid=aid)
-
-        finished_crawls = await ops.list_crawls(aid)
-
-        return {
-            "running": [
-                crawl.dict(exclude_none=True, exclude_unset=True)
-                for crawl in running_crawls
-            ],
-            "finished": finished_crawls,
-        }
+        return await ops.list_crawls(archive.id)
 
     @app.post(
         "/archives/{aid}/crawls/{crawl_id}/cancel",
@@ -207,3 +254,17 @@ def init_crawls_api(app, mdb, crawl_manager, crawl_config_ops, archives):
 
         res = await ops.delete_crawls(archive.id, delete_list)
         return {"deleted": res}
+
+    @app.post(
+        "/archives/{aid}/crawls/{crawl_id}/scale",
+        tags=["crawls"],
+    )
+    async def scale_crawl(
+        scale: CrawlScale, crawl_id, archive: Archive = Depends(archive_crawl_dep)
+    ):
+
+        error = await crawl_manager.scale_crawl(crawl_id, archive.id, scale.scale)
+        if error:
+            raise HTTPException(status_code=400, detail=error)
+
+        return {"scaled": scale.scale}
diff --git a/backend/dockerman.py b/backend/dockerman.py
index ec1a6bb1..98aef8f0 100644
--- a/backend/dockerman.py
+++ b/backend/dockerman.py
@@ -15,10 +15,10 @@ from tempfile import NamedTemporaryFile
 
 import aiodocker
 import aioprocessing
 
-from crawls import Crawl
-
 from scheduler import run_scheduler
 
+from crawls import Crawl
+
 # ============================================================================
 class DockerManager:
@@ -260,6 +260,10 @@ class DockerManager:
 
         return crawl
 
+    async def scale_crawl(self):  # job_name, aid, parallelism=1):
+        """ Scale running crawl, currently only supported in k8s"""
+        return "Not Supported"
+
     async def delete_crawl_config_by_id(self, cid):
         """ Delete Crawl Config by Crawl Config Id"""
         await self._delete_volume_by_labels([f"btrix.crawlconfig={cid}"])
@@ -346,7 +350,13 @@ class DockerManager:
     # pylint: disable=too-many-arguments
     async def _run_crawl_now(self, storage, labels, volume, schedule="", manual=True):
         # Set Run Config
-        command = ["crawl", "--config", "/tmp/crawlconfig/crawl-config.json"]
+        command = [
+            "crawl",
+            "--config",
+            "/tmp/crawlconfig/crawl-config.json",
+            "--redisStoreUrl",
+            "redis://redis:6379/0",
+        ]
 
         if self.extra_crawl_params:
             command += self.extra_crawl_params
diff --git a/backend/k8sman.py b/backend/k8sman.py
index af7ce214..dd1d8df5 100644
--- a/backend/k8sman.py
+++ b/backend/k8sman.py
@@ -12,8 +12,9 @@ from crawls import Crawl
 
 
 # ============================================================================
-DEFAULT_NAMESPACE = os.environ.get("CRAWLER_NAMESPACE") or "crawlers"
+CRAWLER_NAMESPACE = os.environ.get("CRAWLER_NAMESPACE") or "crawlers"
 
+# a 2/31 schedule that will never run, as an empty schedule is not allowed
 DEFAULT_NO_SCHEDULE = "* * 31 2 *"
 
 
@@ -22,7 +23,7 @@ class K8SManager:
     # pylint: disable=too-many-instance-attributes,too-many-locals,too-many-arguments
     """K8SManager, manager creation of k8s resources from crawl api requests"""
 
-    def __init__(self, extra_crawl_params=None, namespace=DEFAULT_NAMESPACE):
+    def __init__(self, namespace=CRAWLER_NAMESPACE):
         config.load_incluster_config()
 
         self.crawl_ops = None
@@ -33,7 +34,7 @@ class K8SManager:
         self.batch_beta_api = client.BatchV1beta1Api()
 
         self.namespace = namespace
-        self.extra_crawl_params = extra_crawl_params or []
+        print(self.namespace, flush=True)
 
         self.crawler_image = os.environ.get("CRAWLER_IMAGE")
         self.crawler_image_pull_policy = "IfNotPresent"
@@ -138,7 +139,11 @@ class K8SManager:
         suspend, schedule, run_now = self._get_schedule_suspend_run_now(crawlconfig)
 
         job_template = self._get_job_template(
-            cid, labels, annotations, crawlconfig.crawlTimeout, self.extra_crawl_params
+            cid,
+            labels,
+            annotations,
+            crawlconfig.crawlTimeout,
+            crawlconfig.parallel,
         )
 
         spec = client.V1beta1CronJobSpec(
@@ -286,6 +291,31 @@ class K8SManager:
 
         return result
 
+    async def scale_crawl(self, job_name, aid, parallelism=1):
+        """ Set the crawl scale (job parallelism) on the specified job """
+
+        try:
+            job = await self.batch_api.read_namespaced_job(
+                name=job_name, namespace=self.namespace
+            )
+        # pylint: disable=broad-except
+        except Exception:
+            return "Crawl not found"
+
+        if not job or job.metadata.labels["btrix.archive"] != aid:
+            return "Invalid Crawl"
+
+        if parallelism < 1 or parallelism > 10:
+            return "Invalid Scale: Must be between 1 and 10"
+
+        job.spec.parallelism = parallelism
+
+        await self.batch_api.patch_namespaced_job(
+            name=job.metadata.name, namespace=self.namespace, body=job
+        )
+
+        return None
+
     async def delete_crawl_configs_for_archive(self, archive):
         """Delete all crawl configs for given archive"""
         return await self._delete_crawl_configs(f"btrix.archive={archive}")
@@ -307,9 +337,12 @@ class K8SManager:
 
         crawl = self._make_crawl_for_job(job, reason, True)
 
-        await self.crawl_ops.store_crawl(crawl)
+        # if the update succeeds, then the crawl has not completed, so likely a failure
+        failure = await self.crawl_ops.store_crawl(crawl)
 
-        await self._delete_job(job_name)
+        # keep failed jobs around, for now
+        if not failure:
+            await self._delete_job(job_name)
 
     # ========================================================================
     # Internal Methods
@@ -322,6 +355,7 @@ class K8SManager:
         return Crawl(
             id=job.metadata.name,
             state=state,
+            scale=job.spec.parallelism or 1,
            user=job.metadata.labels["btrix.user"],
             aid=job.metadata.labels["btrix.archive"],
             cid=job.metadata.labels["btrix.crawlconfig"],
@@ -455,16 +489,9 @@ class K8SManager:
             body=job, namespace=self.namespace
         )
 
-    def _get_job_template(
-        self, uid, labels, annotations, crawl_timeout, extra_crawl_params
-    ):
+    def _get_job_template(self, uid, labels, annotations, crawl_timeout, parallel):
         """Return crawl job template for crawl job, including labels, adding optiona crawl params"""
 
-        command = ["crawl", "--config", "/tmp/crawl-config.json"]
-
-        if extra_crawl_params:
-            command += extra_crawl_params
-
         requests_memory = "256M"
         limit_memory = "1G"
 
@@ -486,6 +513,7 @@ class K8SManager:
             "metadata": {"annotations": annotations},
             "spec": {
                 "backoffLimit": self.crawl_retries,
+                "parallelism": parallel,
                 "template": {
                     "metadata": {"labels": labels},
                     "spec": {
@@ -494,7 +522,11 @@ class K8SManager:
                                 "name": "crawler",
                                 "image": self.crawler_image,
                                 "imagePullPolicy": "Never",
-                                "command": command,
+                                "command": [
+                                    "crawl",
+                                    "--config",
+                                    "/tmp/crawl-config.json",
+                                ],
                                 "volumeMounts": [
                                     {
                                         "name": "crawl-config",
@@ -504,7 +536,8 @@ class K8SManager:
                                     }
                                 ],
                                 "envFrom": [
-                                    {"secretRef": {"name": f"crawl-secret-{uid}"}}
+                                    {"configMapRef": {"name": "shared-crawler-config"}},
+                                    {"secretRef": {"name": f"crawl-secret-{uid}"}},
                                 ],
                                 "env": [
                                     {
diff --git a/backend/main.py b/backend/main.py
index a82b5bbc..174c8b99 100644
--- a/backend/main.py
+++ b/backend/main.py
@@ -9,11 +9,11 @@
 from fastapi import FastAPI, Request, HTTPException
 
 from db import init_db
+from emailsender import EmailSender
 from users import init_users_api, UserDB
 from archives import init_archives_api
 from crawlconfigs import init_crawl_config_api
 from crawls import init_crawls_api
-from emailsender import EmailSender
 
 
 app = FastAPI()
@@ -38,14 +38,6 @@ class BrowsertrixAPI:
 
         self.email = EmailSender()
         self.crawl_manager = None
 
-        self.default_crawl_params = [
-            "--timeout",
-            "90",
-            "--logging",
-            "behaviors,debug",
-            "--generateWACZ",
-        ]
-
         self.mdb = init_db()
 
         self.fastapi_users = init_users_api(
@@ -66,13 +58,11 @@ class BrowsertrixAPI:
         if os.environ.get("KUBERNETES_SERVICE_HOST"):
             from k8sman import K8SManager
 
-            self.crawl_manager = K8SManager(self.default_crawl_params)
+            self.crawl_manager = K8SManager()
         else:
             from dockerman import DockerManager
 
-            self.crawl_manager = DockerManager(
-                self.archive_ops, self.default_crawl_params
-            )
+            self.crawl_manager = DockerManager(self.archive_ops)
 
         self.crawl_config_ops = init_crawl_config_api(
             self.mdb,
@@ -84,6 +74,7 @@ class BrowsertrixAPI:
         init_crawls_api(
             self.app,
             self.mdb,
+            os.environ.get("REDIS_URL"),
             self.crawl_manager,
             self.crawl_config_ops,
             self.archive_ops,
diff --git a/backend/requirements.txt b/backend/requirements.txt
index b58e21bd..48edb2c3 100644
--- a/backend/requirements.txt
+++ b/backend/requirements.txt
@@ -6,3 +6,4 @@ kubernetes-asyncio
 aiodocker
 apscheduler
 aioprocessing
+aioredis
diff --git a/chart/templates/main.yaml b/chart/templates/main.yaml
index f8d63975..89784d86 100644
--- a/chart/templates/main.yaml
+++ b/chart/templates/main.yaml
@@ -1,51 +1,10 @@
----
-apiVersion: v1
-kind: ConfigMap
-metadata:
-  name: {{ .Values.name }}-env-config
-  namespace: {{ .Release.Namespace }}
-
-data:
-  MONGO_HOST: {{ .Values.mongo_host }}
-
-  CRAWLER_NAMESPACE: {{ .Values.crawler_namespace }}
-  CRAWLER_IMAGE: {{ .Values.crawler_image }}
-
-  CRAWL_TIMEOUT: "{{ .Values.crawl_timeout }}"
-  CRAWL_RETRIES: "{{ .Values.crawl_retries }}"
-
-
----
-apiVersion: v1
-kind: Secret
-metadata:
-  name: storage-auth
-  namespace: {{ .Release.Namespace }}
-
-type: Opaque
-stringData:
-  PASSWORD_SECRET: "{{ .Values.api_password_secret }}"
-
-{{- if .Values.minio_local }}
-  MINIO_ROOT_USER: "{{ .Values.storage.access_key }}"
-  MINIO_ROOT_PASSWORD: "{{ .Values.storage.secret_key }}"
-
-  MC_HOST: "{{ .Values.minio_scheme }}://{{ .Values.storage.access_key }}:{{ .Values.storage.secret_key }}@{{ .Values.minio_host }}"
-{{- end }}
-
-  STORE_ACCESS_KEY: "{{ .Values.storage.access_key }}"
-  STORE_SECRET_KEY: "{{ .Values.storage.secret_key }}"
-  STORE_ENDPOINT_URL: "{{ .Values.storage.endpoint }}"
-  #S3_FORCE_PATH_STYLE: "{{ .Values.storage.force_path_style | quote }}"
-
-
-
 ---
 apiVersion: apps/v1
 kind: Deployment
 metadata:
   name: {{ .Values.name }}
   namespace: {{ .Release.Namespace }}
+
 spec:
   selector:
     matchLabels:
@@ -56,6 +15,10 @@ spec:
       labels:
         app: {{ .Values.name }}
 
+      annotations:
+        # force helm to update the deployment each time
+        "helm.update": {{ randAlphaNum 5 | quote }}
+
     spec:
       {{- if .Values.minio_local }}
diff --git a/chart/templates/namespaces.yaml b/chart/templates/namespaces.yaml
index e5747d93..fb4bc2de 100644
--- a/chart/templates/namespaces.yaml
+++ b/chart/templates/namespaces.yaml
@@ -4,3 +4,5 @@ metadata:
   name: {{ .Values.crawler_namespace }}
   labels:
     release: {{ .Release.Name }}
+  annotations:
+    "helm.sh/resource-policy": keep
diff --git a/chart/values.yaml b/chart/values.yaml
index 53599f2b..f7a0072d 100644
--- a/chart/values.yaml
+++ b/chart/values.yaml
@@ -33,6 +33,17 @@ mongo_auth:
   password: example
 
 
+# Redis Image
+# =========================================
+redis_local: true
+
+redis_image: "redis"
+redis_pull_policy: "IfNotPresent"
+
+redis_url: "redis://local-redis.default:6379/1"
+
+
+
 # Crawler Image
 # =========================================
 
@@ -44,6 +55,10 @@ crawler_namespace: "crawlers"
 
 # num retries
 crawl_retries: 1
 
+# browsertrix-crawler args:
+crawler_args: "--timeout 90 --logging stats,behaviors,debug --generateWACZ"
+
+
 # Storage
 # =========================================
 
diff --git a/docker-compose.yml b/docker-compose.yml
index 9805fe92..89dc8700 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -16,12 +16,21 @@ services:
     depends_on:
       - minio
       - mongo
-      - scheduler
+
+  redis:
+    image: redis
+    command: redis-server --appendonly yes
+
+    ports:
+      - 6379:6379
+
+    volumes:
+      - btrix-redis-data:/data
 
   mongo:
     image: mongo
     volumes:
-      - mongodata:/data/db
+      - btrix-mongo-data:/data/db
 
     env_file:
       - ./config.env
@@ -34,7 +43,7 @@ services:
       - 9001:9001
 
     volumes:
-      - miniodata:/data
+      - btrix-minio-data:/data
 
     env_file:
       - ./config.env
@@ -52,8 +61,9 @@ services:
 
 
 volumes:
-  mongodata:
-  miniodata:
+  btrix-redis-data:
+  btrix-mongo-data:
+  btrix-minio-data:
 
 
 networks:
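
Note (not part of the patch): a minimal standalone sketch of the redis lookup pattern that the new get_redis_stats method uses, assuming aioredis 2.x (requirements.txt does not pin a version) and the same key layout the crawler writes when started with --redisStoreUrl ("<crawl id>:d" done list, "<crawl id>:s" seen set). The redis URL and crawl id below are placeholders for local testing only.

# sketch_redis_stats.py -- illustrative only, mirrors CrawlOps.get_redis_stats above
import asyncio

import aioredis  # assumed aioredis 2.x, where from_url() is awaitable


async def fetch_stats(redis_url, crawl_ids):
    """ Return {crawl_id: {"done": ..., "found": ...}} with one pipelined round trip """
    redis = await aioredis.from_url(redis_url)

    async with redis.pipeline(transaction=True) as pipe:
        for crawl_id in crawl_ids:
            pipe.llen(f"{crawl_id}:d")   # pages done
            pipe.scard(f"{crawl_id}:s")  # pages found (seen set)

        results = await pipe.execute()

    # results come back flat: [done_0, found_0, done_1, found_1, ...]
    vals = iter(results)
    return {
        crawl_id: {"done": done, "found": found}
        for crawl_id, (done, found) in zip(crawl_ids, zip(vals, vals))
    }


if __name__ == "__main__":
    # placeholder url and crawl id
    print(asyncio.run(fetch_stats("redis://localhost:6379/0", ["example-crawl-id"])))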