- ensure crawlFilenameTemplate is part of the CrawlConfig model
- change CrawlConfig init to use type-safe construction
- add a run_now_internal() that is shared for starting a crawl, either on demand or from a new config
- add OrgOps.can_run_crawls() to check against org quotas for crawling
- clean up profile updates: remove _lookup_profile, only check for EmptyStr in update

---------

Co-authored-by: Tessa Walsh <tessa@bitarchivist.net>
Parent: 27059c91a5 · Commit: 4db3053a9f
backend/btrixcloud/crawlconfigs.py:

```diff
@@ -4,7 +4,7 @@ Crawl Config API handling
 
 # pylint: disable=too-many-lines
 
-from typing import List, Union, Optional, Tuple, TYPE_CHECKING, cast
+from typing import List, Union, Optional, TYPE_CHECKING, cast
 
 import asyncio
 import json
```
```diff
@@ -164,108 +164,96 @@ class CrawlConfigOps:
 
     async def get_profile_filename(
         self, profileid: Optional[UUID], org: Organization
-    ) -> Optional[str]:
+    ) -> str:
         """lookup filename from profileid"""
-        _, profile_filename = await self._lookup_profile(profileid, org)
-        return profile_filename
-
-    async def _lookup_profile(
-        self, profileid: Union[UUID, EmptyStr, None], org: Organization
-    ) -> tuple[Optional[UUID], Optional[str]]:
-        if profileid is None:
-            return None, None
-
-        if isinstance(profileid, EmptyStr) or profileid == "":
-            return None, ""
+        if not profileid:
+            return ""
 
         profile_filename = await self.profiles.get_profile_storage_path(profileid, org)
         if not profile_filename:
             raise HTTPException(status_code=400, detail="invalid_profile_id")
 
-        return profileid, profile_filename
+        return profile_filename
 
     # pylint: disable=invalid-name
     async def add_crawl_config(
         self,
-        config: CrawlConfigIn,
+        config_in: CrawlConfigIn,
         org: Organization,
         user: User,
-    ) -> Tuple[str, Optional[str], bool, bool]:
+    ) -> CrawlConfigAddedResponse:
         """Add new crawl config"""
-        data = config.dict()
-        data["oid"] = org.id
-        data["createdBy"] = user.id
-        data["createdByName"] = user.name
-        data["modifiedBy"] = user.id
-        data["modifiedByName"] = user.name
-        data["_id"] = uuid4()
-        data["created"] = dt_now()
-        data["modified"] = data["created"]
 
-        if config.runNow:
-            data["lastStartedBy"] = user.id
-            data["lastStartedByName"] = user.name
+        # ensure crawlChannel is valid
+        if not self.get_channel_crawler_image(config_in.crawlerChannel):
+            raise HTTPException(status_code=404, detail="crawler_not_found")
+
+        # ensure profile is valid, if provided
+        if config_in.profileid:
+            await self.profiles.get_profile(config_in.profileid, org)
+
+        now = dt_now()
+        crawlconfig = CrawlConfig(
+            id=uuid4(),
+            oid=org.id,
+            createdBy=user.id,
+            createdByName=user.name,
+            modifiedBy=user.id,
+            modifiedByName=user.name,
+            created=now,
+            modified=now,
+            schedule=config_in.schedule,
+            config=config_in.config,
+            name=config_in.name,
+            description=config_in.description,
+            tags=config_in.tags,
+            jobType=config_in.jobType,
+            crawlTimeout=config_in.crawlTimeout,
+            maxCrawlSize=config_in.maxCrawlSize,
+            scale=config_in.scale,
+            autoAddCollections=config_in.autoAddCollections,
+            profileid=config_in.profileid,
+            crawlerChannel=config_in.crawlerChannel,
+            crawlFilenameTemplate=config_in.crawlFilenameTemplate,
+        )
+
+        if config_in.runNow:
+            crawlconfig.lastStartedBy = user.id
+            crawlconfig.lastStartedByName = user.name
 
         # Ensure page limit is below org maxPagesPerCall if set
         max_pages = await self.org_ops.get_max_pages_per_crawl(org.id)
         if max_pages > 0:
-            data["config"]["limit"] = max_pages
-
-        data["profileid"], profile_filename = await self._lookup_profile(
-            config.profileid, org
-        )
-
-        if config.autoAddCollections:
-            data["autoAddCollections"] = config.autoAddCollections
-
-        if not self.get_channel_crawler_image(config.crawlerChannel):
-            raise HTTPException(status_code=404, detail="crawler_not_found")
-
-        result = await self.crawl_configs.insert_one(data)
-
-        crawlconfig = CrawlConfig.from_dict(data)
-
-        storage_filename = (
-            data.get("crawlFilenameTemplate") or self.default_filename_template
-        )
-
-        run_now = config.runNow
-        storage_quota_reached = await self.org_ops.storage_quota_reached(org.id)
-        exec_mins_quota_reached = await self.org_ops.exec_mins_quota_reached(org.id)
-
-        if org.readOnly:
-            run_now = False
-            print(f"Org {org.id} set to read-only", flush=True)
-
-        if storage_quota_reached:
-            run_now = False
-            print(f"Storage quota exceeded for org {org.id}", flush=True)
-
-        if exec_mins_quota_reached:
-            run_now = False
-            print(f"Execution minutes quota exceeded for org {org.id}", flush=True)
+            crawlconfig.config.limit = max_pages
+
+        # add CrawlConfig to DB here
+        result = await self.crawl_configs.insert_one(crawlconfig.to_dict())
 
         await self.crawl_manager.update_scheduled_job(crawlconfig, str(user.id))
 
         crawl_id = None
+        storage_quota_reached = False
+        exec_mins_quota_reached = False
 
-        if run_now:
-            crawl_id = await self.crawl_manager.create_crawl_job(
-                crawlconfig,
-                org.storage,
-                userid=str(crawlconfig.modifiedBy),
-                warc_prefix=self.get_warc_prefix(org, crawlconfig),
-                storage_filename=storage_filename,
-                profile_filename=profile_filename or "",
-            )
-
-            await self.add_new_crawl(crawl_id, crawlconfig, user, manual=True)
-
-        return (
-            result.inserted_id,
-            crawl_id,
-            storage_quota_reached,
-            exec_mins_quota_reached,
-        )
+        if config_in.runNow:
+            try:
+                crawl_id = await self.run_now_internal(crawlconfig, org, user)
+            except HTTPException as e:
+                if e.detail == "storage_quota_reached":
+                    storage_quota_reached = True
+                elif e.detail == "exec_minutes_quota_reached":
+                    exec_mins_quota_reached = True
+                print(f"Can't run crawl now: {e.detail}", flush=True)
+        else:
+            storage_quota_reached = await self.org_ops.storage_quota_reached(org.id)
+            exec_mins_quota_reached = await self.org_ops.exec_mins_quota_reached(org.id)
 
+        return CrawlConfigAddedResponse(
+            added=True,
+            id=str(result.inserted_id),
+            run_now_job=crawl_id,
+            storageQuotaReached=storage_quota_reached,
+            execMinutesQuotaReached=exec_mins_quota_reached,
+        )
 
     async def add_new_crawl(
```
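This is the headline "type-safe construction" change: rather than accumulating a raw `data` dict and hydrating it with `CrawlConfig.from_dict(data)`, the workflow is built with keyword arguments that a type checker (with the pydantic plugin) can verify at the call site. A minimal sketch of the failure mode this eliminates, using a hypothetical stand-in model, not the real `CrawlConfig`:

```python
from uuid import UUID, uuid4

from pydantic import BaseModel


class WorkflowStub(BaseModel):
    """Hypothetical stand-in for CrawlConfig, for illustration only."""

    id: UUID
    name: str
    scale: int = 1


# Dict-based construction: a typo'd key is silently ignored (pydantic
# drops unknown fields by default), so scale stays 1 and nothing fails.
data = {"id": uuid4(), "name": "nightly", "scael": 2}
stub = WorkflowStub(**data)
assert stub.scale == 1  # the bug ships

# Keyword construction: the same typo is a static error — a type checker
# reports the unexpected keyword argument before the code ever runs.
stub = WorkflowStub(id=uuid4(), name="nightly", scale=2)
assert stub.scale == 2
```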
```diff
@@ -377,7 +365,13 @@ class CrawlConfigOps:
         query["modifiedByName"] = user.name
         query["modified"] = dt_now()
 
-        query["profileid"], _ = await self._lookup_profile(update.profileid, org)
+        # if empty str, just clear the profile
+        if isinstance(update.profileid, EmptyStr) or update.profileid == "":
+            query["profileid"] = None
+        # else, ensure its a valid profile
+        elif update.profileid:
+            await self.profiles.get_profile(update.profileid, org)
+            query["profileid"] = update.profileid
 
         if update.config is not None:
             query["config"] = update.config.dict()
```
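The branching above encodes PATCH tri-state semantics: an omitted `profileid` means "leave the stored profile alone", an explicit empty string means "detach the profile", and a UUID means "attach this profile (after validation)". A sketch of that mapping with a stand-in `EmptyStr` (the real type lives in the Browsertrix models module):

```python
from typing import Union
from uuid import UUID


class EmptyStr(str):
    """Stand-in: a str subtype that pydantic only accepts when empty."""


def profileid_update(profileid: Union[UUID, EmptyStr, None]) -> dict:
    """Map an incoming PATCH value onto a partial DB update."""
    # field omitted -> leave the stored profile unchanged
    if profileid is None:
        return {}
    # explicit empty string -> clear the profile reference
    if isinstance(profileid, EmptyStr) or profileid == "":
        return {"profileid": None}
    # real UUID -> set it (the real code validates it against the org first)
    return {"profileid": profileid}


assert profileid_update(None) == {}
assert profileid_update(EmptyStr("")) == {"profileid": None}
```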
```diff
@@ -822,35 +816,29 @@ class CrawlConfigOps:
             "workflowIds": workflow_ids,
         }
 
-    async def prepare_for_run_crawl(self, cid: UUID, org: Organization) -> CrawlConfig:
-        """prepare for running a crawl, returning crawlconfig and
-        validating that running crawls is allowed"""
+    async def run_now(self, cid: UUID, org: Organization, user: User) -> str:
+        """run new crawl for cid now, if possible"""
         crawlconfig = await self.get_crawl_config(cid, org.id)
 
         if not crawlconfig:
             raise HTTPException(
                 status_code=404, detail=f"Crawl Config '{cid}' not found"
             )
 
-        if org.readOnly:
-            raise HTTPException(status_code=403, detail="org_set_to_read_only")
-
-        if await self.org_ops.storage_quota_reached(org.id):
-            raise HTTPException(status_code=403, detail="storage_quota_reached")
-
-        if await self.org_ops.exec_mins_quota_reached(org.id):
-            raise HTTPException(status_code=403, detail="exec_minutes_quota_reached")
-
-        return crawlconfig
-
-    async def run_now(self, cid: UUID, org: Organization, user: User):
-        """run specified crawlconfig now"""
-        crawlconfig = await self.prepare_for_run_crawl(cid, org)
+        return await self.run_now_internal(crawlconfig, org, user)
+
+    async def run_now_internal(
+        self, crawlconfig: CrawlConfig, org: Organization, user: User
+    ) -> str:
+        """run new crawl for specified crawlconfig now"""
+        await self.org_ops.can_run_crawls(org)
 
         if await self.get_running_crawl(crawlconfig):
            raise HTTPException(status_code=400, detail="crawl_already_running")
 
         profile_filename = await self.get_profile_filename(crawlconfig.profileid, org)
+        storage_filename = (
+            crawlconfig.crawlFilenameTemplate or self.default_filename_template
+        )
 
         try:
             crawl_id = await self.crawl_manager.create_crawl_job(
```
```diff
@@ -858,7 +846,7 @@ class CrawlConfigOps:
                 org.storage,
                 userid=str(user.id),
                 warc_prefix=self.get_warc_prefix(org, crawlconfig),
-                storage_filename=self.default_filename_template,
+                storage_filename=storage_filename,
                 profile_filename=profile_filename or "",
             )
             await self.add_new_crawl(crawl_id, crawlconfig, user, manual=True)
```
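Note the second fix folded into this hunk: `create_crawl_job` previously always received `self.default_filename_template`, so a per-workflow `crawlFilenameTemplate` was ignored on manual runs. The new `or` fallback treats `None` and an empty template alike. A small sketch of that resolution (the default template string here is a placeholder, not the real deployment value):

```python
from typing import Optional

DEFAULT_FILENAME_TEMPLATE = "@ts-@id.wacz"  # placeholder for illustration


def resolve_storage_filename(template: Optional[str]) -> str:
    # `or` treats None and "" alike, so an unset or blank per-workflow
    # template falls back to the deployment-wide default
    return template or DEFAULT_FILENAME_TEMPLATE


assert resolve_storage_filename(None) == DEFAULT_FILENAME_TEMPLATE
assert resolve_storage_filename("") == DEFAULT_FILENAME_TEMPLATE
assert resolve_storage_filename("@id-custom.wacz") == "@id-custom.wacz"
```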
```diff
@@ -1120,19 +1108,7 @@ def init_crawl_config_api(
         org: Organization = Depends(org_crawl_dep),
         user: User = Depends(user_dep),
     ):
-        (
-            cid,
-            new_job_name,
-            storage_quota_reached,
-            exec_mins_quota_reached,
-        ) = await ops.add_crawl_config(config, org, user)
-        return {
-            "added": True,
-            "id": str(cid),
-            "run_now_job": new_job_name,
-            "storageQuotaReached": storage_quota_reached,
-            "execMinutesQuotaReached": exec_mins_quota_reached,
-        }
+        return await ops.add_crawl_config(config, org, user)
 
     @router.patch(
         "/{cid}",
```
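With `add_crawl_config` returning a `CrawlConfigAddedResponse`, the route body collapses to a single delegation and FastAPI serializes the model itself. A self-contained sketch of the pattern; the field names mirror the removed dict above, but the route path and the model's defaults here are assumptions:

```python
from typing import Optional

from fastapi import APIRouter
from pydantic import BaseModel

router = APIRouter()


class CrawlConfigAddedResponse(BaseModel):
    # mirrors the keys of the dict the old endpoint built by hand
    added: bool = True
    id: str
    run_now_job: Optional[str] = None
    storageQuotaReached: bool = False
    execMinutesQuotaReached: bool = False


@router.post("/crawlconfigs", response_model=CrawlConfigAddedResponse)
async def add_crawl_config_demo() -> CrawlConfigAddedResponse:
    # the ops layer returns the typed model; declaring response_model keeps
    # the OpenAPI schema and the actual payload in sync automatically
    return CrawlConfigAddedResponse(id="demo-id", run_now_job="demo-job")
```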
backend/btrixcloud/crawls.py:

```diff
@@ -776,7 +776,9 @@ class CrawlOps(BaseCrawlOps):
         if not crawl.cid or crawl.type != "crawl":
             raise HTTPException(status_code=400, detail="invalid_crawl_for_qa")
 
-        crawlconfig = await self.crawl_configs.prepare_for_run_crawl(crawl.cid, org)
+        await self.orgs.can_run_crawls(org)
+
+        crawlconfig = await self.crawl_configs.get_crawl_config(crawl.cid, org.id)
 
         try:
             qa_run_id = await self.crawl_manager.create_qa_crawl_job(
```
backend/btrixcloud/models.py:

```diff
@@ -314,7 +314,7 @@ class CrawlConfigIn(BaseModel):
 
     jobType: Optional[JobType] = JobType.CUSTOM
 
-    profileid: Union[UUID, EmptyStr, None]
+    profileid: Optional[UUID] = None
     crawlerChannel: str = "default"
 
     autoAddCollections: Optional[List[UUID]] = []
```
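Tightening `CrawlConfigIn.profileid` to `Optional[UUID]` means the empty-string "detach" sentinel is no longer accepted when creating a workflow; only the update model keeps `EmptyStr`. A quick sketch of the validation behavior with a stand-in model:

```python
from typing import Optional
from uuid import UUID, uuid4

from pydantic import BaseModel, ValidationError


class CreateIn(BaseModel):
    """Stand-in mirroring the tightened CrawlConfigIn field."""

    profileid: Optional[UUID] = None


CreateIn()                   # ok: field omitted, defaults to None
CreateIn(profileid=uuid4())  # ok: a real profile id
try:
    CreateIn(profileid="")   # empty string is now rejected on create
except ValidationError:
    print("empty profileid rejected at validation time")
```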
```diff
@@ -407,6 +407,8 @@ class CrawlConfigAdditional(BaseModel):
 
     isCrawlRunning: Optional[bool] = False
 
+    crawlFilenameTemplate: Optional[str] = None
+
 
 # ============================================================================
 class CrawlConfig(CrawlConfigCore, CrawlConfigAdditional):
```
backend/btrixcloud/orgs.py:

```diff
@@ -697,6 +697,17 @@ class OrgOps:
 
         return False
 
+    async def can_run_crawls(self, org: Organization) -> None:
+        """check crawl quotas and readOnly state, throw if can not run"""
+        if org.readOnly:
+            raise HTTPException(status_code=403, detail="org_set_to_read_only")
+
+        if await self.storage_quota_reached(org.id):
+            raise HTTPException(status_code=403, detail="storage_quota_reached")
+
+        if await self.exec_mins_quota_reached(org.id):
+            raise HTTPException(status_code=403, detail="exec_minutes_quota_reached")
+
     async def get_monthly_crawl_exec_seconds(self, oid: UUID) -> int:
         """Return monthlyExecSeconds for current month"""
         org_data = await self.orgs.find_one({"_id": oid})
```
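Because `can_run_crawls` raises rather than returning a flag, one guard serves every entry point: API routes let the 403 propagate to the client with a machine-readable detail string, while `add_crawl_config` catches it and reports quota state as response flags. A hedged sketch of the two consumption styles (`org_ops` here is whatever object exposes the method):

```python
from fastapi import HTTPException


async def api_path(org_ops, org, start_job):
    # route handlers just await the guard; a failure surfaces as a 403
    await org_ops.can_run_crawls(org)
    return await start_job()


async def internal_path(org_ops, org) -> bool:
    # internal callers convert the raise back into a boolean when the
    # surrounding operation (e.g. workflow creation) must still succeed
    try:
        await org_ops.can_run_crawls(org)
        return True
    except HTTPException as e:
        print(f"Can't run crawl now: {e.detail}", flush=True)
        return False
```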