rename crawls.py -> crawlconfigs.py

add crawls.py for crawl API management
Ilya Kreymer 2021-08-20 12:09:28 -07:00
parent f2d9d7ba6a
commit 170958be37
4 changed files with 282 additions and 264 deletions

backend/crawlconfigs.py (new file, +237 lines)

@@ -0,0 +1,237 @@
"""
Crawl Config API handling
"""
from typing import List, Union, Optional
from enum import Enum
import uuid
from pydantic import BaseModel
from fastapi import APIRouter, Depends, HTTPException
from users import User
from archives import Archive
from db import BaseMongoModel
# ============================================================================
class ScopeType(str, Enum):
"""Crawl scope type"""
PAGE = "page"
PAGE_SPA = "page-spa"
PREFIX = "prefix"
HOST = "host"
ANY = "any"
# ============================================================================
class Seed(BaseModel):
"""Crawl seed"""
url: str
scopeType: Optional[ScopeType] = ScopeType.PREFIX
include: Union[str, List[str], None]
exclude: Union[str, List[str], None]
sitemap: Union[bool, str, None]
allowHash: Optional[bool]
depth: Optional[int]
# ============================================================================
class RawCrawlConfig(BaseModel):
"""Base Crawl Config"""
seeds: List[Union[str, Seed]]
collection: Optional[str] = "my-web-archive"
scopeType: Optional[ScopeType] = ScopeType.PREFIX
scope: Union[str, List[str], None] = ""
exclude: Union[str, List[str], None] = ""
depth: Optional[int] = -1
limit: Optional[int] = 0
behaviorTimeout: Optional[int] = 90
workers: Optional[int] = 1
headless: Optional[bool] = False
generateWACZ: Optional[bool] = False
combineWARC: Optional[bool] = False
logging: Optional[str] = ""
behaviors: Optional[str] = "autoscroll"
# ============================================================================
class CrawlConfigIn(BaseModel):
"""CrawlConfig input model, submitted via API"""
schedule: Optional[str] = ""
runNow: Optional[bool] = False
# storageName: Optional[str] = "default"
config: RawCrawlConfig
# ============================================================================
class CrawlConfig(BaseMongoModel):
"""Schedulable config"""
schedule: Optional[str] = ""
runNow: Optional[bool] = False
# storageName: Optional[str] = "default"
archive: Optional[str]
config: RawCrawlConfig
# ============================================================================
class CrawlOps:
"""Crawl Config Operations"""
def __init__(self, mdb, archive_ops, crawl_manager):
self.crawl_configs = mdb["crawl_configs"]
self.archive_ops = archive_ops
self.crawl_manager = crawl_manager
self.default_crawl_params = [
"--timeout",
"90",
"--logging",
"behaviors,debug",
"--generateWACZ",
]
self.router = APIRouter(
prefix="/crawlconfigs",
tags=["crawlconfigs"],
responses={404: {"description": "Not found"}},
)
async def add_crawl_config(
self, config: CrawlConfigIn, archive: Archive, user: User
):
"""Add new crawl config"""
data = config.dict()
data["archive"] = archive.id
data["user"] = user.id
data["_id"] = str(uuid.uuid4())
result = await self.crawl_configs.insert_one(data)
crawlconfig = CrawlConfig.from_dict(data)
await self.crawl_manager.add_crawl_config(
userid=str(user.id),
aid=str(archive.id),
storage=archive.storage,
crawlconfig=crawlconfig,
extra_crawl_params=self.default_crawl_params,
)
return result
async def get_crawl_configs(self, archive: Archive):
"""Get all crawl configs for an archive is a member of"""
cursor = self.crawl_configs.find({"archive": archive.id})
results = await cursor.to_list(length=1000)
return [CrawlConfig.from_dict(res) for res in results]
async def get_crawl_config(self, cid: str, archive: Archive):
"""Get an archive for user by unique id"""
res = await self.crawl_configs.find_one({"_id": cid, "archive": archive.id})
return CrawlConfig.from_dict(res)
async def delete_crawl_config(self, cid: str, archive: Archive):
"""Delete config"""
await self.crawl_manager.delete_crawl_config_by_id(cid)
return await self.crawl_configs.delete_one({"_id": cid, "archive": archive.id})
async def delete_crawl_configs(self, archive: Archive):
"""Delete all crawl configs for user"""
await self.crawl_manager.delete_crawl_configs_for_archive(archive.id)
return await self.crawl_configs.delete_many({"archive": archive.id})
# ============================================================================
# pylint: disable=redefined-builtin,invalid-name
def init_crawl_config_api(mdb, user_dep, archive_ops, crawl_manager):
"""Init /crawlconfigs api routes"""
ops = CrawlOps(mdb, archive_ops, crawl_manager)
router = ops.router
archive_dep = archive_ops.archive_dep
async def crawls_dep(cid: str, archive: Archive = Depends(archive_dep)):
crawl_config = await ops.get_crawl_config(cid, archive)
if not crawl_config:
raise HTTPException(
status_code=404, detail=f"Crawl Config '{cid}' not found"
)
return archive
@router.get("")
async def get_crawl_configs(archive: Archive = Depends(archive_dep)):
results = await ops.get_crawl_configs(archive)
return {"crawl_configs": [res.serialize() for res in results]}
@router.get("/{cid}")
async def get_crawl_config(crawl_config: CrawlConfig = Depends(crawls_dep)):
return crawl_config.serialize()
@router.post("/")
async def add_crawl_config(
config: CrawlConfigIn,
archive: Archive = Depends(archive_dep),
user: User = Depends(user_dep),
):
if not archive.is_crawler(user):
raise HTTPException(
status_code=403, detail="User does not have permission to modify crawls"
)
res = await ops.add_crawl_config(config, archive, user)
return {"added": str(res.inserted_id)}
@router.delete("")
async def delete_crawl_configs(
archive: Archive = Depends(archive_dep), user: User = Depends(user_dep)
):
if not archive.is_crawler(user):
raise HTTPException(
status_code=403, detail="User does not have permission to modify crawls"
)
result = await ops.delete_crawl_configs(archive)
return {"deleted": result.deleted_count}
@router.delete("/{id}")
async def delete_crawl_config(
id: str, archive: Archive = Depends(archive_dep), user: User = Depends(user_dep)
):
if not archive.is_crawler(user):
raise HTTPException(
status_code=403, detail="User does not have permission to modify crawls"
)
result = await ops.delete_crawl_config(id, archive)
if not result or not result.deleted_count:
raise HTTPException(status_code=404, detail="Crawl Config Not Found")
return {"deleted": 1}
archive_ops.router.include_router(router)
return ops
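
For illustration, a request body for the new add endpoint (POST / under the /crawlconfigs prefix) might look like the sketch below. Only the field names are taken from CrawlConfigIn and RawCrawlConfig above; the seed URL, limit, and other values are placeholders, not part of this commit.

# Hypothetical request body for creating a crawl config (illustrative values;
# field names come from CrawlConfigIn / RawCrawlConfig defined above)
example_crawl_config = {
    "schedule": "",            # optional schedule string, empty by default
    "runNow": True,            # ask the backend to start a crawl immediately
    "config": {
        "seeds": ["https://example.com/"],   # placeholder seed URL
        "scopeType": "prefix",
        "limit": 10,
        "generateWACZ": True,
    },
}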


@@ -1,246 +1,42 @@
"""
Crawl Config API handling
"""
""" Crawl API """
from typing import List, Union, Optional
from enum import Enum
import uuid
from typing import Optional
from datetime import datetime
from pydantic import BaseModel
from fastapi import APIRouter, Depends, HTTPException
from users import User
from archives import Archive
from db import BaseMongoModel
# ============================================================================
class ScopeType(str, Enum):
"""Crawl scope type"""
class CrawlComplete(BaseModel):
""" Store State of Completed Crawls """
id: str
PAGE = "page"
PAGE_SPA = "page-spa"
PREFIX = "prefix"
HOST = "host"
ANY = "any"
user: str
aid: Optional[str]
cid: Optional[str]
# ============================================================================
class Seed(BaseModel):
"""Crawl seed"""
url: str
scopeType: Optional[ScopeType] = ScopeType.PREFIX
include: Union[str, List[str], None]
exclude: Union[str, List[str], None]
sitemap: Union[bool, str, None]
allowHash: Optional[bool]
depth: Optional[int]
# ============================================================================
class RawCrawlConfig(BaseModel):
"""Base Crawl Config"""
seeds: List[Union[str, Seed]]
collection: Optional[str] = "my-web-archive"
scopeType: Optional[ScopeType] = ScopeType.PREFIX
scope: Union[str, List[str], None] = ""
exclude: Union[str, List[str], None] = ""
depth: Optional[int] = -1
limit: Optional[int] = 0
behaviorTimeout: Optional[int] = 90
workers: Optional[int] = 1
headless: Optional[bool] = False
generateWACZ: Optional[bool] = False
combineWARC: Optional[bool] = False
logging: Optional[str] = ""
behaviors: Optional[str] = "autoscroll"
# ============================================================================
class CrawlConfigIn(BaseModel):
"""CrawlConfig input model, submitted via API"""
schedule: Optional[str] = ""
runNow: Optional[bool] = False
# storageName: Optional[str] = "default"
config: RawCrawlConfig
# ============================================================================
class CrawlConfig(BaseMongoModel):
"""Schedulable config"""
schedule: Optional[str] = ""
runNow: Optional[bool] = False
# storageName: Optional[str] = "default"
archive: Optional[str]
config: RawCrawlConfig
# ============================================================================
class CrawlCompleteMsg(BaseModel):
filename: Optional[str]
user: Optional[str]
crawl: Optional[str]
filename: str
size: int
hash: str
# ============================================================================
class CrawlOps:
"""Crawl Config Operations"""
def __init__(self, mdb, archive_ops, crawl_manager):
self.crawl_configs = mdb["crawl_configs"]
self.archive_ops = archive_ops
self.crawl_manager = crawl_manager
self.default_crawl_params = [
"--timeout",
"90",
"--logging",
"behaviors,debug",
"--generateWACZ",
]
self.router = APIRouter(
prefix="/crawlconfigs",
tags=["crawlconfigs"],
responses={404: {"description": "Not found"}},
)
async def add_crawl_config(
self, config: CrawlConfigIn, archive: Archive, user: User
):
"""Add new crawl config"""
data = config.dict()
data["archive"] = archive.id
data["user"] = user.id
data["_id"] = str(uuid.uuid4())
result = await self.crawl_configs.insert_one(data)
crawlconfig = CrawlConfig.from_dict(data)
await self.crawl_manager.add_crawl_config(
userid=str(user.id),
aid=str(archive.id),
storage=archive.storage,
crawlconfig=crawlconfig,
extra_crawl_params=self.default_crawl_params,
)
return result
async def get_crawl_configs(self, archive: Archive):
"""Get all crawl configs for an archive is a member of"""
cursor = self.crawl_configs.find({"archive": archive.id})
results = await cursor.to_list(length=1000)
return [CrawlConfig.from_dict(res) for res in results]
async def get_crawl_config(self, cid: str, archive: Archive):
"""Get an archive for user by unique id"""
res = await self.crawl_configs.find_one({"_id": cid, "archive": archive.id})
return CrawlConfig.from_dict(res)
async def delete_crawl_config(self, cid: str, archive: Archive):
"""Delete config"""
await self.crawl_manager.delete_crawl_config_by_id(cid)
return await self.crawl_configs.delete_one({"_id": cid, "archive": archive.id})
async def delete_crawl_configs(self, archive: Archive):
"""Delete all crawl configs for user"""
await self.crawl_manager.delete_crawl_configs_for_archive(archive.id)
return await self.crawl_configs.delete_many({"archive": archive.id})
created: Optional[datetime]
finished: Optional[datetime]
# ============================================================================
# pylint: disable=redefined-builtin,invalid-name
def init_crawl_config_api(mdb, user_dep, archive_ops, crawl_manager):
"""Init /crawlconfigs api routes"""
ops = CrawlOps(mdb, archive_ops, crawl_manager)
def init_crawls_api(app, crawl_manager):
""" API for crawl management, including crawl done callback"""
router = ops.router
async def on_handle_crawl_complete(msg: CrawlComplete):
data = await crawl_manager.validate_crawl_data(msg)
if data:
print(msg)
else:
print("Not a valid crawl complete msg!")
archive_dep = archive_ops.archive_dep
async def crawls_dep(cid: str, archive: Archive = Depends(archive_dep)):
crawl_config = await ops.get_crawl_config(cid, archive)
if not crawl_config:
raise HTTPException(
status_code=404, detail=f"Crawl Config '{cid}' not found"
)
return archive
@router.get("")
async def get_crawl_configs(archive: Archive = Depends(archive_dep)):
results = await ops.get_crawl_configs(archive)
return {"crawl_configs": [res.serialize() for res in results]}
@router.get("/{cid}")
async def get_crawl_config(crawl_config: CrawlConfig = Depends(crawls_dep)):
return crawl_config.serialize()
@router.post("/")
async def add_crawl_config(
config: CrawlConfigIn,
archive: Archive = Depends(archive_dep),
user: User = Depends(user_dep),
):
if not archive.is_crawler(user):
raise HTTPException(
status_code=403, detail="User does not have permission to modify crawls"
)
res = await ops.add_crawl_config(config, archive, user)
return {"added": str(res.inserted_id)}
@router.delete("")
async def delete_crawl_configs(
archive: Archive = Depends(archive_dep), user: User = Depends(user_dep)
):
if not archive.is_crawler(user):
raise HTTPException(
status_code=403, detail="User does not have permission to modify crawls"
)
result = await ops.delete_crawl_configs(archive)
return {"deleted": result.deleted_count}
@router.delete("/{id}")
async def delete_crawl_config(
id: str, archive: Archive = Depends(archive_dep), user: User = Depends(user_dep)
):
if not archive.is_crawler(user):
raise HTTPException(
status_code=403, detail="User does not have permission to modify crawls"
)
result = await ops.delete_crawl_config(id, archive)
if not result or not result.deleted_count:
raise HTTPException(status_code=404, detail="Crawl Config Not Found")
return {"deleted": 1}
archive_ops.router.include_router(router)
return ops
@app.post("/crawls/done")
async def webhook(msg: CrawlComplete):
#background_tasks.add_task(on_handle_crawl_complete, msg)
#asyncio.ensure_future(on_handle_crawl_complete(msg))
await on_handle_crawl_complete(msg)
return {"message": "webhook received"}


@@ -1,7 +1,7 @@
""" K8s support"""
import os
import datetime
import json
from kubernetes_asyncio import client, config
@@ -30,18 +30,20 @@ class K8SManager:
self.crawler_image = os.environ.get("CRAWLER_IMAGE")
self.crawler_image_pull_policy = "IfNotPresent"
async def validate_crawl_data(self, data):
pod = await self.core_api.read_namespaced_pod(data.crawl, self.namespace)
async def validate_crawl_data(self, crawlcomplete):
""" Ensure the crawlcomplete data is valid (pod exists and user matches)
Fill in additional details about the crawl """
pod = await self.core_api.read_namespaced_pod(name=crawlcomplete.id, namespace=self.namespace)
if not pod or pod.metadata.labels["btrix.user"] != data.user:
return None
if not pod or pod.metadata.labels["btrix.user"] != crawlcomplete.user:
return False
result = {}
data.crawl = pod.metadata.labels["job-name"]
result["created"] = pod.metadata.creation_timestamp
result["archive"] = pod.metadata.labels["btrix.archive"]
result["crawlconfig"] = pod.metadata.labels["btrix.crawlconfig"]
return result
crawlcomplete.id = pod.metadata.labels["job-name"]
crawlcomplete.created = pod.metadata.creation_timestamp
crawlcomplete.aid = pod.metadata.labels["btrix.archive"]
crawlcomplete.cid = pod.metadata.labels["btrix.crawlconfig"]
crawlcomplete.finished = datetime.datetime.utcnow()
return True
async def add_crawl_config(
self,
@@ -93,7 +95,7 @@ class K8SManager:
"STORE_ENDPOINT_URL": endpoint_with_coll_url,
"STORE_ACCESS_KEY": storage.access_key,
"STORE_SECRET_KEY": storage.secret_key,
"WEBHOOK_URL": "http://browsertrix-cloud.default:8000/crawldone",
"WEBHOOK_URL": "http://browsertrix-cloud.default:8000/crawls/done",
},
)
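
To summarize the validation step above: validate_crawl_data looks up the pod named by crawlcomplete.id, rejects the message if the btrix.user label does not match, and then copies pod metadata back onto the CrawlComplete object. A plain-dict sketch of that mapping follows (placeholder values; label names are the ones read in the diff above).

# Label-to-field mapping performed by validate_crawl_data (placeholder values)
pod_labels = {
    "btrix.user": "user-uuid",            # must equal crawlcomplete.user
    "btrix.archive": "archive-uuid",      # copied to crawlcomplete.aid
    "btrix.crawlconfig": "config-uuid",   # copied to crawlcomplete.cid
    "job-name": "crawl-job-example",      # copied to crawlcomplete.id
}
# crawlcomplete.created comes from pod.metadata.creation_timestamp, and
# crawlcomplete.finished is stamped with datetime.datetime.utcnow().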


@@ -4,15 +4,15 @@ supports docker and kubernetes based deployments of multiple browsertrix-crawler
"""
import os
import asyncio
from fastapi import FastAPI, Request, HTTPException, BackgroundTasks
from fastapi import FastAPI, Request, HTTPException
from db import init_db
from users import init_users_api, UserDB
from archives import init_archives_api
from crawls import init_crawl_config_api, CrawlCompleteMsg
from crawlconfigs import init_crawl_config_api
from crawls import init_crawls_api
from emailsender import EmailSender
app = FastAPI()
@@ -72,31 +72,14 @@ class BrowsertrixAPI:
self.crawl_manager,
)
init_crawls_api(self.app, self.crawl_manager)
self.app.include_router(self.archive_ops.router)
# @app.get("/")
# async def root():
# return {"message": "Hello World"}
async def on_handle_crawl_complete(msg: CrawlCompleteMsg):
print("crawl complete started")
try:
data = await self.crawl_manager.validate_crawl_data(msg)
if data:
data.update(msg.dict())
print(data)
else:
print("Not a valid crawl complete msg!")
except Exception as e:
print(e)
@app.post("/crawldone")
async def webhook(msg: CrawlCompleteMsg, background_tasks: BackgroundTasks):
#background_tasks.add_task(on_handle_crawl_complete, msg)
#asyncio.ensure_future(on_handle_crawl_complete(msg))
await on_handle_crawl_complete(msg)
return {"message": "webhook received"}
# pylint: disable=no-self-use, unused-argument
async def on_after_register(self, user: UserDB, request: Request):