diff --git a/backend/crawlconfigs.py b/backend/crawlconfigs.py
new file mode 100644
index 00000000..4ce39f3a
--- /dev/null
+++ b/backend/crawlconfigs.py
@@ -0,0 +1,237 @@
+"""
+Crawl Config API handling
+"""
+
+from typing import List, Union, Optional
+from enum import Enum
+import uuid
+
+from pydantic import BaseModel
+from fastapi import APIRouter, Depends, HTTPException
+
+from users import User
+from archives import Archive
+
+from db import BaseMongoModel
+
+
+# ============================================================================
+class ScopeType(str, Enum):
+    """Crawl scope type"""
+
+    PAGE = "page"
+    PAGE_SPA = "page-spa"
+    PREFIX = "prefix"
+    HOST = "host"
+    ANY = "any"
+
+
+# ============================================================================
+class Seed(BaseModel):
+    """Crawl seed"""
+
+    url: str
+    scopeType: Optional[ScopeType] = ScopeType.PREFIX
+
+    include: Union[str, List[str], None]
+    exclude: Union[str, List[str], None]
+    sitemap: Union[bool, str, None]
+    allowHash: Optional[bool]
+    depth: Optional[int]
+
+
+# ============================================================================
+class RawCrawlConfig(BaseModel):
+    """Base Crawl Config"""
+
+    seeds: List[Union[str, Seed]]
+
+    collection: Optional[str] = "my-web-archive"
+
+    scopeType: Optional[ScopeType] = ScopeType.PREFIX
+    scope: Union[str, List[str], None] = ""
+    exclude: Union[str, List[str], None] = ""
+
+    depth: Optional[int] = -1
+    limit: Optional[int] = 0
+
+    behaviorTimeout: Optional[int] = 90
+
+    workers: Optional[int] = 1
+
+    headless: Optional[bool] = False
+
+    generateWACZ: Optional[bool] = False
+    combineWARC: Optional[bool] = False
+
+    logging: Optional[str] = ""
+    behaviors: Optional[str] = "autoscroll"
+
+
+# ============================================================================
+class CrawlConfigIn(BaseModel):
+    """CrawlConfig input model, submitted via API"""
+
+    schedule: Optional[str] = ""
+    runNow: Optional[bool] = False
+
+    # storageName: Optional[str] = "default"
+
+    config: RawCrawlConfig
+
+
+# ============================================================================
+class CrawlConfig(BaseMongoModel):
+    """Schedulable config"""
+
+    schedule: Optional[str] = ""
+    runNow: Optional[bool] = False
+
+    # storageName: Optional[str] = "default"
+
+    archive: Optional[str]
+
+    config: RawCrawlConfig
+
+
+# ============================================================================
+class CrawlOps:
+    """Crawl Config Operations"""
+
+    def __init__(self, mdb, archive_ops, crawl_manager):
+        self.crawl_configs = mdb["crawl_configs"]
+        self.archive_ops = archive_ops
+        self.crawl_manager = crawl_manager
+        self.default_crawl_params = [
+            "--timeout",
+            "90",
+            "--logging",
+            "behaviors,debug",
+            "--generateWACZ",
+        ]
+
+        self.router = APIRouter(
+            prefix="/crawlconfigs",
+            tags=["crawlconfigs"],
+            responses={404: {"description": "Not found"}},
+        )
+
+    async def add_crawl_config(
+        self, config: CrawlConfigIn, archive: Archive, user: User
+    ):
+        """Add new crawl config"""
+        data = config.dict()
+        data["archive"] = archive.id
+        data["user"] = user.id
+        data["_id"] = str(uuid.uuid4())
+
+        result = await self.crawl_configs.insert_one(data)
+
+        crawlconfig = CrawlConfig.from_dict(data)
+
+        await self.crawl_manager.add_crawl_config(
+            userid=str(user.id),
+            aid=str(archive.id),
+            storage=archive.storage,
+            crawlconfig=crawlconfig,
+            extra_crawl_params=self.default_crawl_params,
+        )
+        return result
+
+    async def get_crawl_configs(self, archive: Archive):
+        """Get all crawl configs for an archive the user is a member of"""
+        cursor = self.crawl_configs.find({"archive": archive.id})
+        results = await cursor.to_list(length=1000)
+        return [CrawlConfig.from_dict(res) for res in results]
+
+    async def get_crawl_config(self, cid: str, archive: Archive):
+        """Get a crawl config for an archive by unique id"""
+        res = await self.crawl_configs.find_one({"_id": cid, "archive": archive.id})
+        return CrawlConfig.from_dict(res)
+
+    async def delete_crawl_config(self, cid: str, archive: Archive):
+        """Delete config"""
+        await self.crawl_manager.delete_crawl_config_by_id(cid)
+
+        return await self.crawl_configs.delete_one({"_id": cid, "archive": archive.id})
+
+    async def delete_crawl_configs(self, archive: Archive):
+        """Delete all crawl configs for an archive"""
+        await self.crawl_manager.delete_crawl_configs_for_archive(archive.id)
+
+        return await self.crawl_configs.delete_many({"archive": archive.id})
+
+
+# ============================================================================
+# pylint: disable=redefined-builtin,invalid-name
+def init_crawl_config_api(mdb, user_dep, archive_ops, crawl_manager):
+    """Init /crawlconfigs api routes"""
+    ops = CrawlOps(mdb, archive_ops, crawl_manager)
+
+    router = ops.router
+
+    archive_dep = archive_ops.archive_dep
+
+    async def crawls_dep(cid: str, archive: Archive = Depends(archive_dep)):
+        crawl_config = await ops.get_crawl_config(cid, archive)
+        if not crawl_config:
+            raise HTTPException(
+                status_code=404, detail=f"Crawl Config '{cid}' not found"
+            )
+
+        return crawl_config
+
+    @router.get("")
+    async def get_crawl_configs(archive: Archive = Depends(archive_dep)):
+        results = await ops.get_crawl_configs(archive)
+        return {"crawl_configs": [res.serialize() for res in results]}
+
+    @router.get("/{cid}")
+    async def get_crawl_config(crawl_config: CrawlConfig = Depends(crawls_dep)):
+        return crawl_config.serialize()
+
+    @router.post("/")
+    async def add_crawl_config(
+        config: CrawlConfigIn,
+        archive: Archive = Depends(archive_dep),
+        user: User = Depends(user_dep),
+    ):
+
+        if not archive.is_crawler(user):
+            raise HTTPException(
+                status_code=403, detail="User does not have permission to modify crawls"
+            )
+
+        res = await ops.add_crawl_config(config, archive, user)
+        return {"added": str(res.inserted_id)}
+
+    @router.delete("")
+    async def delete_crawl_configs(
+        archive: Archive = Depends(archive_dep), user: User = Depends(user_dep)
+    ):
+        if not archive.is_crawler(user):
+            raise HTTPException(
+                status_code=403, detail="User does not have permission to modify crawls"
+            )
+
+        result = await ops.delete_crawl_configs(archive)
+        return {"deleted": result.deleted_count}
+
+    @router.delete("/{id}")
+    async def delete_crawl_config(
+        id: str, archive: Archive = Depends(archive_dep), user: User = Depends(user_dep)
+    ):
+        if not archive.is_crawler(user):
+            raise HTTPException(
+                status_code=403, detail="User does not have permission to modify crawls"
+            )
+
+        result = await ops.delete_crawl_config(id, archive)
+        if not result or not result.deleted_count:
+            raise HTTPException(status_code=404, detail="Crawl Config Not Found")
+
+        return {"deleted": 1}
+
+    archive_ops.router.include_router(router)
+
+    return ops
diff --git a/backend/crawls.py b/backend/crawls.py
index b3c58afe..8420b3f3 100644
--- a/backend/crawls.py
+++ b/backend/crawls.py
@@ -1,246 +1,42 @@
-"""
-Crawl Config API handling
-"""
+""" Crawl API """
 
-from typing import List, Union, Optional
-from enum import Enum
-import uuid
+from typing import Optional
+from datetime import datetime
 
 from pydantic import BaseModel
-from fastapi import APIRouter, Depends, HTTPException
-
-from users import User
-from archives import Archive
-
-from db import BaseMongoModel
 
 
 # ============================================================================
-class ScopeType(str, Enum):
-    """Crawl scope type"""
+class CrawlComplete(BaseModel):
+    """ Store State of Completed Crawls """
+    id: str
 
-    PAGE = "page"
-    PAGE_SPA = "page-spa"
-    PREFIX = "prefix"
-    HOST = "host"
-    ANY = "any"
+    user: str
+    aid: Optional[str]
+    cid: Optional[str]
 
-
-# ============================================================================
-class Seed(BaseModel):
-    """Crawl seed"""
-
-    url: str
-    scopeType: Optional[ScopeType] = ScopeType.PREFIX
-
-    include: Union[str, List[str], None]
-    exclude: Union[str, List[str], None]
-    sitemap: Union[bool, str, None]
-    allowHash: Optional[bool]
-    depth: Optional[int]
-
-
-# ============================================================================
-class RawCrawlConfig(BaseModel):
-    """Base Crawl Config"""
-
-    seeds: List[Union[str, Seed]]
-
-    collection: Optional[str] = "my-web-archive"
-
-    scopeType: Optional[ScopeType] = ScopeType.PREFIX
-    scope: Union[str, List[str], None] = ""
-    exclude: Union[str, List[str], None] = ""
-
-    depth: Optional[int] = -1
-    limit: Optional[int] = 0
-
-    behaviorTimeout: Optional[int] = 90
-
-    workers: Optional[int] = 1
-
-    headless: Optional[bool] = False
-
-    generateWACZ: Optional[bool] = False
-    combineWARC: Optional[bool] = False
-
-    logging: Optional[str] = ""
-    behaviors: Optional[str] = "autoscroll"
-
-
-# ============================================================================
-class CrawlConfigIn(BaseModel):
-    """CrawlConfig input model, submitted via API"""
-
-    schedule: Optional[str] = ""
-    runNow: Optional[bool] = False
-
-    # storageName: Optional[str] = "default"
-
-    config: RawCrawlConfig
-
-
-# ============================================================================
-class CrawlConfig(BaseMongoModel):
-    """Schedulable config"""
-
-    schedule: Optional[str] = ""
-    runNow: Optional[bool] = False
-
-    # storageName: Optional[str] = "default"
-
-    archive: Optional[str]
-
-    config: RawCrawlConfig
-
-
-# ============================================================================
-class CrawlCompleteMsg(BaseModel):
-    filename: Optional[str]
-    user: Optional[str]
-    crawl: Optional[str]
+    filename: str
 
     size: int
     hash: str
 
-
-# ============================================================================
-class CrawlOps:
-    """Crawl Config Operations"""
-
-    def __init__(self, mdb, archive_ops, crawl_manager):
-        self.crawl_configs = mdb["crawl_configs"]
-        self.archive_ops = archive_ops
-        self.crawl_manager = crawl_manager
-        self.default_crawl_params = [
-            "--timeout",
-            "90",
-            "--logging",
-            "behaviors,debug",
-            "--generateWACZ",
-        ]
-
-        self.router = APIRouter(
-            prefix="/crawlconfigs",
-            tags=["crawlconfigs"],
-            responses={404: {"description": "Not found"}},
-        )
-
-    async def add_crawl_config(
-        self, config: CrawlConfigIn, archive: Archive, user: User
-    ):
-        """Add new crawl config"""
-        data = config.dict()
-        data["archive"] = archive.id
-        data["user"] = user.id
-        data["_id"] = str(uuid.uuid4())
-
-        result = await self.crawl_configs.insert_one(data)
-
-        crawlconfig = CrawlConfig.from_dict(data)
-
-        await self.crawl_manager.add_crawl_config(
-            userid=str(user.id),
-            aid=str(archive.id),
-            storage=archive.storage,
-            crawlconfig=crawlconfig,
-            extra_crawl_params=self.default_crawl_params,
-        )
-        return result
-
-    async def get_crawl_configs(self, archive: Archive):
"""Get all crawl configs for an archive is a member of""" - cursor = self.crawl_configs.find({"archive": archive.id}) - results = await cursor.to_list(length=1000) - return [CrawlConfig.from_dict(res) for res in results] - - async def get_crawl_config(self, cid: str, archive: Archive): - """Get an archive for user by unique id""" - res = await self.crawl_configs.find_one({"_id": cid, "archive": archive.id}) - return CrawlConfig.from_dict(res) - - async def delete_crawl_config(self, cid: str, archive: Archive): - """Delete config""" - await self.crawl_manager.delete_crawl_config_by_id(cid) - - return await self.crawl_configs.delete_one({"_id": cid, "archive": archive.id}) - - async def delete_crawl_configs(self, archive: Archive): - """Delete all crawl configs for user""" - await self.crawl_manager.delete_crawl_configs_for_archive(archive.id) - - return await self.crawl_configs.delete_many({"archive": archive.id}) + created: Optional[datetime] + finished: Optional[datetime] # ============================================================================ -# pylint: disable=redefined-builtin,invalid-name -def init_crawl_config_api(mdb, user_dep, archive_ops, crawl_manager): - """Init /crawlconfigs api routes""" - ops = CrawlOps(mdb, archive_ops, crawl_manager) +def init_crawls_api(app, crawl_manager): + """ API for crawl management, including crawl done callback""" - router = ops.router + async def on_handle_crawl_complete(msg: CrawlComplete): + data = await crawl_manager.validate_crawl_data(msg) + if data: + print(msg) + else: + print("Not a valid crawl complete msg!") - archive_dep = archive_ops.archive_dep - - async def crawls_dep(cid: str, archive: Archive = Depends(archive_dep)): - crawl_config = await ops.get_crawl_config(cid, archive) - if not crawl_config: - raise HTTPException( - status_code=404, detail=f"Crawl Config '{cid}' not found" - ) - - return archive - - @router.get("") - async def get_crawl_configs(archive: Archive = Depends(archive_dep)): - results = await ops.get_crawl_configs(archive) - return {"crawl_configs": [res.serialize() for res in results]} - - @router.get("/{cid}") - async def get_crawl_config(crawl_config: CrawlConfig = Depends(crawls_dep)): - return crawl_config.serialize() - - @router.post("/") - async def add_crawl_config( - config: CrawlConfigIn, - archive: Archive = Depends(archive_dep), - user: User = Depends(user_dep), - ): - - if not archive.is_crawler(user): - raise HTTPException( - status_code=403, detail="User does not have permission to modify crawls" - ) - - res = await ops.add_crawl_config(config, archive, user) - return {"added": str(res.inserted_id)} - - @router.delete("") - async def delete_crawl_configs( - archive: Archive = Depends(archive_dep), user: User = Depends(user_dep) - ): - if not archive.is_crawler(user): - raise HTTPException( - status_code=403, detail="User does not have permission to modify crawls" - ) - - result = await ops.delete_crawl_configs(archive) - return {"deleted": result.deleted_count} - - @router.delete("/{id}") - async def delete_crawl_config( - id: str, archive: Archive = Depends(archive_dep), user: User = Depends(user_dep) - ): - if not archive.is_crawler(user): - raise HTTPException( - status_code=403, detail="User does not have permission to modify crawls" - ) - - result = await ops.delete_crawl_config(id, archive) - if not result or not result.deleted_count: - raise HTTPException(status_code=404, detail="Crawl Config Not Found") - - return {"deleted": 1} - - archive_ops.router.include_router(router) - - return ops 
+    @app.post("/crawls/done")
+    async def webhook(msg: CrawlComplete):
+        #background_tasks.add_task(on_handle_crawl_complete, msg)
+        #asyncio.ensure_future(on_handle_crawl_complete(msg))
+        await on_handle_crawl_complete(msg)
+        return {"message": "webhook received"}
diff --git a/backend/k8sman.py b/backend/k8sman.py
index cffb2a70..1aa06931 100644
--- a/backend/k8sman.py
+++ b/backend/k8sman.py
@@ -1,7 +1,7 @@
 """ K8s support"""
 
 import os
-
+import datetime
 import json
 
 from kubernetes_asyncio import client, config
@@ -30,18 +30,20 @@ class K8SManager:
         self.crawler_image = os.environ.get("CRAWLER_IMAGE")
         self.crawler_image_pull_policy = "IfNotPresent"
 
-    async def validate_crawl_data(self, data):
-        pod = await self.core_api.read_namespaced_pod(data.crawl, self.namespace)
+    async def validate_crawl_data(self, crawlcomplete):
+        """ Ensure the crawlcomplete data is valid (pod exists and user matches)
+            Fill in additional details about the crawl """
+        pod = await self.core_api.read_namespaced_pod(name=crawlcomplete.id, namespace=self.namespace)
 
-        if not pod or pod.metadata.labels["btrix.user"] != data.user:
-            return None
+        if not pod or pod.metadata.labels["btrix.user"] != crawlcomplete.user:
+            return False
 
-        result = {}
-        data.crawl = pod.metadata.labels["job-name"]
-        result["created"] = pod.metadata.creation_timestamp
-        result["archive"] = pod.metadata.labels["btrix.archive"]
-        result["crawlconfig"] = pod.metadata.labels["btrix.crawlconfig"]
-        return result
+        crawlcomplete.id = pod.metadata.labels["job-name"]
+        crawlcomplete.created = pod.metadata.creation_timestamp
+        crawlcomplete.aid = pod.metadata.labels["btrix.archive"]
+        crawlcomplete.cid = pod.metadata.labels["btrix.crawlconfig"]
+        crawlcomplete.finished = datetime.datetime.utcnow()
+        return True
 
     async def add_crawl_config(
         self,
@@ -93,7 +95,7 @@
                 "STORE_ENDPOINT_URL": endpoint_with_coll_url,
                 "STORE_ACCESS_KEY": storage.access_key,
                 "STORE_SECRET_KEY": storage.secret_key,
-                "WEBHOOK_URL": "http://browsertrix-cloud.default:8000/crawldone",
+                "WEBHOOK_URL": "http://browsertrix-cloud.default:8000/crawls/done",
             },
         )
diff --git a/backend/main.py b/backend/main.py
index 4e773ac5..217d79cc 100644
--- a/backend/main.py
+++ b/backend/main.py
@@ -4,15 +4,15 @@ supports docker and kubernetes based deployments of multiple browsertrix-crawlers
 """
 import os
-import asyncio
 
-from fastapi import FastAPI, Request, HTTPException, BackgroundTasks
+from fastapi import FastAPI, Request, HTTPException
 
 from db import init_db
 
 from users import init_users_api, UserDB
 from archives import init_archives_api
-from crawls import init_crawl_config_api, CrawlCompleteMsg
+from crawlconfigs import init_crawl_config_api
+from crawls import init_crawls_api
 
 from emailsender import EmailSender
 
 app = FastAPI()
@@ -72,31 +72,14 @@
             self.crawl_manager,
         )
 
+        init_crawls_api(self.app, self.crawl_manager)
+
         self.app.include_router(self.archive_ops.router)
 
         # @app.get("/")
         # async def root():
         #    return {"message": "Hello World"}
 
-    async def on_handle_crawl_complete(msg: CrawlCompleteMsg):
-        print("crawl complete started")
-        try:
-            data = await self.crawl_manager.validate_crawl_data(msg)
-            if data:
-                data.update(msg.dict())
-                print(data)
-            else:
-                print("Not a valid crawl complete msg!")
-        except Exception as e:
-            print(e)
-
-    @app.post("/crawldone")
-    async def webhook(msg: CrawlCompleteMsg, background_tasks: BackgroundTasks):
-        #background_tasks.add_task(on_handle_crawl_complete, msg)
-        #asyncio.ensure_future(on_handle_crawl_complete(msg))
-        await on_handle_crawl_complete(msg)
-        return {"message": "webhook received"}
-
     # pylint: disable=no-self-use, unused-argument
     async def on_after_register(self, user: UserDB, request: Request):