Crawl Config Editing Support (#141)

* support inactive configs in the same collection: configs with `inactive` set to true (#137)
- add `inactive`, `newId`, `oldId` fields to crawl configs
- filter out inactive configs by default for most operations
- add an index on the `aid` + `inactive` fields for faster querying
- delete returns a status of 'deactivated' or 'deleted'
- if no crawls have run, the config is deleted outright; otherwise it is deactivated (see the usage sketch below)
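
A minimal usage sketch of the new delete behavior. The base URL, ids, token, and the assumption that the crawl config routes are mounted under `/archives/{aid}/crawlconfigs` are placeholders for illustration; the response shape (`success` plus a `status` of `deleted` or `deactivated`) comes from this change.

```python
# Hedged sketch: delete a crawl config and inspect whether it was deleted
# outright or deactivated. Route prefix and auth header are assumptions;
# the response fields mirror do_make_inactive in this diff.
import httpx

API = "https://example.org/api"                # placeholder base URL
HEADERS = {"Authorization": "Bearer <token>"}  # placeholder token

resp = httpx.delete(
    f"{API}/archives/<archive-id>/crawlconfigs/<config-id>",
    headers=HEADERS,
)
resp.raise_for_status()

data = resp.json()
# "deleted" if no crawls ever ran for this config;
# "deactivated" if crawls exist, in which case the config stays in the db
# with inactive=True and is only removed from the crawl manager.
print(data["success"], data["status"])
```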

* update crawl config endpoints: add a general PATCH crawl config endpoint supporting updates to both schedule and name (see the sketch below); the old `/{cid}/schedule` endpoint is kept but deprecated
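
A hedged sketch of calling the new PATCH endpoint, with the same assumptions about mount point and auth as above. The request body fields match the `UpdateScheduleOrName` model; either field may be omitted, but an empty body yields a 400 `no_update_data` error.

```python
# Hedged sketch: update the name and/or schedule of an existing crawl config.
# Route prefix and auth are assumptions; the body mirrors UpdateScheduleOrName.
import httpx

API = "https://example.org/api"                # placeholder base URL
HEADERS = {"Authorization": "Bearer <token>"}  # placeholder token

body = {
    "name": "nightly crawl",
    "schedule": "0 2 * * *",  # assumed cron-style schedule string
}

resp = httpx.patch(
    f"{API}/archives/<archive-id>/crawlconfigs/<config-id>",
    json=body,
    headers=HEADERS,
)
resp.raise_for_status()
print(resp.json())  # {"success": true}
```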
Ilya Kreymer 2022-02-17 16:04:07 -08:00 committed by GitHub
parent e9d6c68f6a
commit d05f04be9f
4 changed files with 205 additions and 99 deletions

View File

@ -5,8 +5,10 @@ Crawl Config API handling
from typing import List, Union, Optional
from enum import Enum
import uuid
import asyncio
from datetime import datetime
import pymongo
from pydantic import BaseModel, UUID4
from fastapi import APIRouter, Depends, HTTPException
@ -85,6 +87,8 @@ class CrawlConfigIn(BaseModel):
crawlTimeout: Optional[int] = 0
parallel: Optional[int] = 1
oldId: Optional[UUID4]
# ============================================================================
class CrawlConfig(BaseMongoModel):
@ -108,14 +112,18 @@ class CrawlConfig(BaseMongoModel):
userid: UUID4
crawlCount: Optional[int] = 0
lastCrawlId: Optional[str]
lastCrawlTime: Optional[datetime]
lastCrawlState: Optional[str]
newId: Optional[UUID4]
oldId: Optional[UUID4]
inactive: Optional[bool] = False
def get_raw_config(self):
""" serialize config for browsertrix-crawler """
return self.config.dict(
exclude_unset=True, exclude_none=True
)
return self.config.dict(exclude_unset=True, exclude_none=True)
# ============================================================================
@ -134,17 +142,20 @@ class CrawlConfigsResponse(BaseModel):
# ============================================================================
class UpdateSchedule(BaseModel):
""" Update the crawl schedule """
class UpdateScheduleOrName(BaseModel):
""" Update crawl config name or crawl schedule """
schedule: str
name: Optional[str]
schedule: Optional[str]
# ============================================================================
class CrawlOps:
# pylint: disable=too-many-instance-attributes,too-many-arguments
class CrawlConfigOps:
"""Crawl Config Operations"""
def __init__(self, mdb, user_manager, archive_ops, crawl_manager):
def __init__(self, dbclient, mdb, user_manager, archive_ops, crawl_manager):
self.dbclient = dbclient
self.crawl_configs = mdb["crawl_configs"]
self.user_manager = user_manager
self.archive_ops = archive_ops
@ -158,6 +169,14 @@ class CrawlOps:
self.coll_ops = None
asyncio.create_task(self.init_index())
async def init_index(self):
""" init index for crawls db """
await self.crawl_configs.create_index(
[("aid", pymongo.HASHED), ("inactive", pymongo.ASCENDING)]
)
def set_coll_ops(self, coll_ops):
""" set collection ops """
self.coll_ops = coll_ops
@ -177,7 +196,17 @@ class CrawlOps:
archive.id, config.colls
)
result = await self.crawl_configs.insert_one(data)
old_id = data.get("oldId")
if old_id:
old_config = await self.get_crawl_config(old_id, archive)
async with await self.dbclient.start_session() as sesh:
async with sesh.start_transaction():
await self.make_inactive(old_config, data["_id"])
result = await self.crawl_configs.insert_one(data)
else:
result = await self.crawl_configs.insert_one(data)
crawlconfig = CrawlConfig.from_dict(data)
@ -187,24 +216,55 @@ class CrawlOps:
return result, new_name
async def update_crawl_schedule(self, cid: str, update: UpdateSchedule):
""" Update schedule for existing crawl config"""
async def update_crawl_config(self, cid: uuid.UUID, update: UpdateScheduleOrName):
""" Update name and/or schedule for an existing crawl config """
if update.schedule is None and update.name is None:
raise HTTPException(status_code=400, detail="no_update_data")
# update schedule in crawl manager first
if update.schedule is not None:
try:
await self.crawl_manager.update_crawl_schedule(
str(cid), update.schedule
)
except Exception:
# pylint: disable=raise-missing-from
raise HTTPException(
status_code=404, detail=f"Crawl Config '{cid}' not found"
)
# set update query
query = {}
if update.schedule is not None:
query["schedule"] = update.schedule
if update.name is not None:
query["name"] = update.name
# update in db
if not await self.crawl_configs.find_one_and_update(
{"_id": uuid.UUID(cid)}, {"$set": {"schedule": update.schedule}}
{"_id": cid, "inactive": {"$ne": True}}, {"$set": query}
):
return False
raise HTTPException(
status_code=404, detail=f"Crawl Config '{cid}' not found"
)
await self.crawl_manager.update_crawl_schedule(cid, update.schedule)
return True
return {"success": True}
async def inc_crawls(self, cid: uuid.UUID, crawl_id: str, finished: datetime):
async def inc_crawls(
self, cid: uuid.UUID, crawl_id: str, finished: datetime, state: str
):
""" Increment Crawl Counter """
await self.crawl_configs.find_one_and_update(
{"_id": cid},
{"_id": cid, "inactive": {"$ne": True}},
{
"$inc": {"crawlCount": 1},
"$set": {"lastCrawlId": crawl_id, "lastCrawlTime": finished},
"$set": {
"lastCrawlId": crawl_id,
"lastCrawlTime": finished,
"lastCrawlState": state,
},
},
)
@ -212,7 +272,7 @@ class CrawlOps:
"""Get all crawl configs for an archive is a member of"""
cursor = self.crawl_configs.aggregate(
[
{"$match": {"aid": archive.id}},
{"$match": {"aid": archive.id, "inactive": {"$ne": True}}},
{
"$lookup": {
"from": "users",
@ -242,64 +302,118 @@ class CrawlOps:
return CrawlConfigsResponse(crawlConfigs=configs)
async def get_crawl_config_out(self, crawlconfig):
""" Return CrawlConfigOut, including state of currently running crawl"""
async def get_running_crawl(self, crawlconfig: CrawlConfig):
""" Return the id of currently running crawl for this config, if any """
crawls = await self.crawl_manager.list_running_crawls(cid=crawlconfig.id)
out = CrawlConfigOut(**crawlconfig.serialize())
if len(crawls) == 1:
out.currCrawlId = crawls[0].id
return crawls[0].id
user = await self.user_manager.get(crawlconfig.userid)
# pylint: disable=invalid-name
if user:
out.userName = user.name
return None
return out
async def get_crawl_config_out(self, cid: uuid.UUID, archive: Archive):
"""Return CrawlConfigOut, including state of currently running crawl, if active
also include inactive crawl configs"""
async def get_crawl_config(self, cid: uuid.UUID, archive: Archive):
"""Get an archive for user by unique id"""
res = await self.crawl_configs.find_one({"_id": cid, "aid": archive.id})
return CrawlConfig.from_dict(res)
async def delete_crawl_config(self, cid: str, archive: Archive):
"""Delete config"""
await self.crawl_manager.delete_crawl_config_by_id(cid)
return await self.crawl_configs.delete_one({"_id": cid, "aid": archive.id})
# async def delete_crawl_configs(self, archive: Archive):
# """Delete all crawl configs for user"""
# await self.crawl_manager.delete_crawl_configs_for_archive(archive.id_str)
# return await self.crawl_configs.delete_many({"aid": archive.id})
# ============================================================================
# pylint: disable=redefined-builtin,invalid-name,too-many-locals
def init_crawl_config_api(mdb, user_dep, user_manager, archive_ops, crawl_manager):
"""Init /crawlconfigs api routes"""
ops = CrawlOps(mdb, user_manager, archive_ops, crawl_manager)
router = ops.router
archive_crawl_dep = archive_ops.archive_crawl_dep
async def crawls_dep(cid: str, archive: Archive = Depends(archive_crawl_dep)):
crawl_config = await ops.get_crawl_config(uuid.UUID(cid), archive)
if not crawl_config:
crawlconfig = await self.get_crawl_config(
cid, archive, active_only=False, config_cls=CrawlConfigOut
)
if not crawlconfig:
raise HTTPException(
status_code=404, detail=f"Crawl Config '{cid}' not found"
)
return crawl_config
if not crawlconfig.inactive:
crawlconfig.currCrawlId = await self.get_running_crawl(crawlconfig)
user = await self.user_manager.get(crawlconfig.userid)
# pylint: disable=invalid-name
if user:
crawlconfig.userName = user.name
return crawlconfig
async def get_crawl_config(
self,
cid: uuid.UUID,
archive: Archive,
active_only: bool = True,
config_cls=CrawlConfig,
):
"""Get an archive for user by unique id"""
query = {"_id": cid, "aid": archive.id}
if active_only:
query["inactive"] = {"$ne": True}
res = await self.crawl_configs.find_one(query)
return config_cls.from_dict(res)
async def make_inactive(self, crawlconfig: CrawlConfig, new_id: uuid.UUID = None):
"""Delete config, if no crawls ran yet. Otherwise, move to inactive list"""
if new_id:
crawlconfig.newId = new_id
if await self.get_running_crawl(crawlconfig):
raise HTTPException(status_code=400, detail="crawl_running_cant_deactivate")
# set to either "deleted" or "deactivated"
status = None
# if no crawls have been run, actually delete
if not crawlconfig.crawlCount:
result = await self.crawl_configs.delete_one(
{"_id": crawlconfig.id, "aid": crawlconfig.aid}
)
if result.deleted_count != 1:
raise HTTPException(status_code=404, detail="failed_to_delete")
status = "deleted"
else:
if not await self.crawl_configs.find_one_and_update(
{"_id": crawlconfig.id, "inactive": {"$ne": True}},
{"$set": {"inactive": True}},
):
raise HTTPException(status_code=404, detail="failed_to_deactivate")
status = "deactivated"
# delete from crawl manager, but not from db
await self.crawl_manager.delete_crawl_config_by_id(crawlconfig.id)
return status
async def do_make_inactive(self, crawlconfig: CrawlConfig):
""" perform make_inactive in a transaction """
async with await self.dbclient.start_session() as sesh:
async with sesh.start_transaction():
status = await self.make_inactive(crawlconfig)
return {"success": True, "status": status}
# ============================================================================
# pylint: disable=redefined-builtin,invalid-name,too-many-locals,too-many-arguments
def init_crawl_config_api(
dbclient, mdb, user_dep, user_manager, archive_ops, crawl_manager
):
"""Init /crawlconfigs api routes"""
ops = CrawlConfigOps(dbclient, mdb, user_manager, archive_ops, crawl_manager)
router = ops.router
archive_crawl_dep = archive_ops.archive_crawl_dep
@router.get("", response_model=CrawlConfigsResponse)
async def get_crawl_configs(archive: Archive = Depends(archive_crawl_dep)):
return await ops.get_crawl_configs(archive)
@router.get("/{cid}", response_model=CrawlConfigOut)
async def get_crawl_config(crawl_config: CrawlConfig = Depends(crawls_dep)):
return await ops.get_crawl_config_out(crawl_config)
async def get_crawl_config(cid: str, archive: Archive = Depends(archive_crawl_dep)):
return await ops.get_crawl_config_out(uuid.UUID(cid), archive)
@router.post("/")
async def add_crawl_config(
@ -310,28 +424,20 @@ def init_crawl_config_api(mdb, user_dep, user_manager, archive_ops, crawl_manage
res, new_job_name = await ops.add_crawl_config(config, archive, user)
return {"added": str(res.inserted_id), "run_now_job": new_job_name}
@router.patch("/{cid}/schedule", dependencies=[Depends(archive_crawl_dep)])
async def update_crawl_schedule(
update: UpdateSchedule,
@router.patch("/{cid}", dependencies=[Depends(archive_crawl_dep)])
async def update_crawl_config(
update: UpdateScheduleOrName,
cid: str,
):
return await ops.update_crawl_config(uuid.UUID(cid), update)
success = False
try:
success = await ops.update_crawl_schedule(cid, update)
except Exception as e:
# pylint: disable=raise-missing-from
raise HTTPException(
status_code=403, detail=f"Error updating crawl config: {e}"
)
if not success:
raise HTTPException(
status_code=404, detail=f"Crawl Config '{cid}' not found"
)
return {"updated": cid}
# deprecated: to remove in favor of general patch
@router.patch("/{cid}/schedule", dependencies=[Depends(archive_crawl_dep)])
async def update_crawl_schedule(
update: UpdateScheduleOrName,
cid: str,
):
return await ops.update_crawl_config(uuid.UUID(cid), update)
@router.post("/{cid}/run")
async def run_now(
@ -339,9 +445,9 @@ def init_crawl_config_api(mdb, user_dep, user_manager, archive_ops, crawl_manage
archive: Archive = Depends(archive_crawl_dep),
user: User = Depends(user_dep),
):
crawl_config = await ops.get_crawl_config(uuid.UUID(cid), archive)
crawlconfig = await ops.get_crawl_config(uuid.UUID(cid), archive)
if not crawl_config:
if not crawlconfig:
raise HTTPException(
status_code=404, detail=f"Crawl Config '{cid}' not found"
)
@ -355,22 +461,17 @@ def init_crawl_config_api(mdb, user_dep, user_manager, archive_ops, crawl_manage
return {"started": crawl_id}
# @router.delete("")
# async def delete_crawl_configs(archive: Archive = Depends(archive_crawl_dep)):
# result = await ops.delete_crawl_configs(archive)
# return {"deleted": result.deleted_count}
@router.delete("/{cid}")
async def delete_crawl_config(
cid: str, archive: Archive = Depends(archive_crawl_dep)
):
result = await ops.delete_crawl_config(cid, archive)
if not result or not result.deleted_count:
async def make_inactive(cid: str, archive: Archive = Depends(archive_crawl_dep)):
crawlconfig = await ops.get_crawl_config(uuid.UUID(cid), archive)
if not crawlconfig:
raise HTTPException(
status_code=404, detail=f"Crawl Config '{cid}' Not Found"
status_code=404, detail=f"Crawl Config '{cid}' not found"
)
return {"deleted": 1}
return await ops.do_make_inactive(crawlconfig)
archive_ops.router.include_router(router)

View File

@ -228,7 +228,9 @@ class CrawlOps:
await self.archives.inc_usage(crawl.aid, dura)
await self.crawl_configs.inc_crawls(crawl.cid, crawl.id, crawl.finished)
await self.crawl_configs.inc_crawls(
crawl.cid, crawl.id, crawl.finished, crawl.state
)
return True
@ -338,7 +340,9 @@ class CrawlOps:
self, crawl: Union[CrawlOut, ListCrawlOut], archive: Archive
):
""" Resolve running crawl data """
config = await self.crawl_configs.get_crawl_config(crawl.cid, archive)
config = await self.crawl_configs.get_crawl_config(
crawl.cid, archive, active_only=False
)
if config:
crawl.configName = config.name

View File

@ -24,7 +24,7 @@ def init_db():
mdb = client["browsertrixcloud"]
return mdb
return client, mdb
# ============================================================================

View File

@ -30,7 +30,7 @@ def main():
email = EmailSender()
crawl_manager = None
mdb = init_db()
dbclient, mdb = init_db()
settings = {
"registrationEnabled": os.environ.get("REGISTRATION_ENABLED") == "1",
@ -64,6 +64,7 @@ def main():
init_storages_api(archive_ops, crawl_manager, current_active_user)
crawl_config_ops = init_crawl_config_api(
dbclient,
mdb,
current_active_user,
user_manager,