Concurrent Crawl Limit (#874)
Concurrent crawl limits (addresses #866):
- support limits on the number of concurrent crawls that can run within a single org
- split the 'waiting' state into 'waiting_org_limit' (org concurrent crawl limit reached) and 'waiting_capacity' (cluster at capacity)

orgs:
- add 'maxConcurrentCrawls' to a new 'quotas' object on orgs
- add a /quotas endpoint for updating the quotas object

operator:
- add all crawljobs as related resources; they appear to be returned in creation order
- if a concurrent crawl limit is set, ensure the current job is within the first N crawljobs (as provided via the 'related' list of crawljob objects) before it can proceed to 'starting', otherwise set it to 'waiting_org_limit'
- remove the 'new' state; always start with 'starting'
- crawljob: add 'oid' to the crawljob spec and labels for easier querying
- more stringent state transitions: add allowed_from to set_state(), ensuring transitions only happen from allowed states, while 'failed'/'canceled' can happen from any state
- sync 'finished' and state from the db if a transition is not allowed
- add crawl indices by oid and cid

api:
- add the org /quotas endpoint for configuring quotas

frontend:
- show the two waiting states as 'Waiting (Crawl Limit)' and 'Waiting (At Capacity)'
- add a gear icon on the orgs admin page, opening an initial popup for setting org quotas that shows all properties from the org 'quotas' object

tests:
- add concurrent crawl limit nightly tests
- fix state waiting -> waiting_capacity
- ci: add logging of operator output on test failure
parent ab518f51fb
commit 00fb8ac048
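For reference, a superadmin can set the new limit through the quotas endpoint. A minimal sketch using Python requests, mirroring the nightly test added in this commit (the API prefix, org id, and token values are placeholders):

import requests

API_PREFIX = "http://127.0.0.1:30870/api"  # placeholder: local dev deployment
ORG_ID = "<org-uuid>"                      # placeholder
HEADERS = {"Authorization": "Bearer <superadmin-token>"}  # placeholder

# allow at most one concurrent crawl in this org
r = requests.post(
    f"{API_PREFIX}/orgs/{ORG_ID}/quotas",
    headers=HEADERS,
    json={"maxConcurrentCrawls": 1},
)
assert r.json()["updated"] is True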
.github/workflows/k3d-ci.yaml (vendored): 8 lines changed
@@ -84,6 +84,10 @@ jobs:
       - name: Run Tests
         run: pytest -vv ./backend/test/*.py

-      - name: Print Backend Logs
+      - name: Print Backend Logs (API)
         if: ${{ failure() }}
-        run: kubectl logs svc/browsertrix-cloud-backend
+        run: kubectl logs svc/browsertrix-cloud-backend -c api
+
+      - name: Print Backend Logs (Operator)
+        if: ${{ failure() }}
+        run: kubectl logs svc/browsertrix-cloud-backend -c op
@@ -125,7 +125,12 @@ class CrawlManager(K8sAPI):
         cid = str(crawlconfig.id)

         return await self.new_crawl_job(
-            cid, userid, crawlconfig.scale, crawlconfig.crawlTimeout, manual=True
+            cid,
+            userid,
+            crawlconfig.oid,
+            crawlconfig.scale,
+            crawlconfig.crawlTimeout,
+            manual=True,
         )

     async def update_crawl_config(self, crawlconfig, update, profile_filename=None):
@@ -275,9 +280,7 @@ class CrawlManager(K8sAPI):
             patch = {"stopping": True}
             return await self._patch_job(crawl_id, patch)

-        await self.delete_crawl_job(crawl_id)
-
-        return {"success": True}
+        return await self.delete_crawl_job(crawl_id)

     async def delete_crawl_configs_for_org(self, org):
         """Delete all crawl configs for given org"""
@@ -35,13 +35,17 @@ from .utils import dt_now, ts_now, get_redis_crawl_stats, parse_jsonl_error_message

 RUNNING_STATES = ("running", "pending-wait", "generate-wacz", "uploading-wacz")

-RUNNING_AND_STARTING_STATES = ("starting", "waiting", *RUNNING_STATES)
+STARTING_STATES = ("starting", "waiting_capacity", "waiting_org_limit")

 FAILED_STATES = ("canceled", "failed")

 SUCCESSFUL_STATES = ("complete", "partial_complete", "timed_out")

-ALL_CRAWL_STATES = (*RUNNING_AND_STARTING_STATES, *FAILED_STATES, *SUCCESSFUL_STATES)
+RUNNING_AND_STARTING_STATES = (*STARTING_STATES, *RUNNING_STATES)
+
+NON_RUNNING_STATES = (*FAILED_STATES, *SUCCESSFUL_STATES)
+
+ALL_CRAWL_STATES = (*RUNNING_AND_STARTING_STATES, *NON_RUNNING_STATES)


 # ============================================================================
@@ -218,6 +222,9 @@ class CrawlOps:
     async def init_index(self):
         """init index for crawls db collection"""
         await self.crawls.create_index([("finished", pymongo.DESCENDING)])
+        await self.crawls.create_index([("oid", pymongo.HASHED)])
+        await self.crawls.create_index([("cid", pymongo.HASHED)])
+        await self.crawls.create_index([("state", pymongo.HASHED)])

     async def list_crawls(
         self,
@@ -591,6 +598,22 @@ class CrawlOps:

         return True

+    async def update_crawl_state(self, crawl_id: str, state: str):
+        """called only when job container is being stopped/canceled"""
+
+        data = {"state": state}
+        # if cancelation, set the finish time here
+        if state == "canceled":
+            data["finished"] = dt_now()
+
+        await self.crawls.find_one_and_update(
+            {
+                "_id": crawl_id,
+                "state": {"$in": RUNNING_AND_STARTING_STATES},
+            },
+            {"$set": data},
+        )
+
     async def shutdown_crawl(self, crawl_id: str, org: Organization, graceful: bool):
         """stop or cancel specified crawl"""
         result = None
@@ -614,6 +637,15 @@ class CrawlOps:
                 status_code=404, detail=f"crawl_not_found, (details: {exc})"
             )

+        # if job no longer running, canceling is considered success,
+        # but graceful stoppage is not possible, so would be a failure
+        if result.get("error") == "Not Found":
+            if not graceful:
+                await self.update_crawl_state(crawl_id, "canceled")
+                crawl = await self.get_crawl_raw(crawl_id, org)
+                await self.crawl_configs.stats_recompute_remove_crawl(crawl["cid"], 0)
+                return {"success": True}
+
         # return whatever detail may be included in the response
         raise HTTPException(status_code=400, detail=result)

@@ -880,16 +912,25 @@ async def add_new_crawl(


 # ============================================================================
-async def update_crawl_state_if_changed(crawls, crawl_id, state, **kwargs):
+async def update_crawl_state_if_allowed(
+    crawls, crawl_id, state, allowed_from, **kwargs
+):
     """update crawl state and other properties in db if state has changed"""
     kwargs["state"] = state
-    res = await crawls.find_one_and_update(
-        {"_id": crawl_id, "state": {"$ne": state}},
-        {"$set": kwargs},
-        return_document=pymongo.ReturnDocument.AFTER,
-    )
-    print("** UPDATE", crawl_id, state, res is not None)
-    return res
+    query = {"_id": crawl_id}
+    if allowed_from:
+        query["state"] = {"$in": allowed_from}
+
+    return await crawls.find_one_and_update(query, {"$set": kwargs})
+
+
+# ============================================================================
+async def get_crawl_state(crawls, crawl_id):
+    """return current crawl state of a crawl"""
+    res = await crawls.find_one({"_id": crawl_id}, projection=["state", "finished"])
+    if not res:
+        return None, None
+    return res.get("state"), res.get("finished")


 # ============================================================================
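The allowed_from parameter above turns the state update into a guarded, single-step Mongo operation: the document is only modified when its current state is one of the allowed source states, so a disallowed transition is simply a no-op. A small sketch of the filter it builds (illustrative only, not part of the commit):

def build_update_query(crawl_id, allowed_from):
    # matches the crawl only if its current state is an allowed source state
    query = {"_id": crawl_id}
    if allowed_from:
        query["state"] = {"$in": allowed_from}
    return query

# promoting a crawl queued behind the org limit back to 'starting':
print(build_update_query("crawl-123", ["waiting_org_limit"]))
# {'_id': 'crawl-123', 'state': {'$in': ['waiting_org_limit']}}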
@@ -11,7 +11,7 @@ from kubernetes_asyncio.stream import WsApiClient
 from kubernetes_asyncio.client.api_client import ApiClient
 from kubernetes_asyncio.client.api import custom_objects_api
 from kubernetes_asyncio.utils import create_from_dict
-
+from kubernetes_asyncio.client.exceptions import ApiException

 from fastapi.templating import Jinja2Templates
 from .utils import get_templates_dir, dt_now, to_k8s_date
@@ -63,7 +63,9 @@ class K8sAPI:
         return redis_url

     # pylint: disable=too-many-arguments
-    async def new_crawl_job(self, cid, userid, scale=1, crawl_timeout=0, manual=True):
+    async def new_crawl_job(
+        self, cid, userid, oid, scale=1, crawl_timeout=0, manual=True
+    ):
         """load job template from yaml"""
         if crawl_timeout:
             crawl_expire_time = to_k8s_date(dt_now() + timedelta(seconds=crawl_timeout))
@@ -77,6 +79,7 @@ class K8sAPI:
         params = {
             "id": crawl_id,
             "cid": cid,
+            "oid": oid,
             "userid": userid,
             "scale": scale,
             "expire_time": crawl_expire_time,
@@ -135,12 +138,10 @@ class K8sAPI:
                 grace_period_seconds=0,
                 propagation_policy="Foreground",
             )
-            return True
+            return {"success": True}

-        # pylint: disable=broad-except
-        except Exception as exc:
-            print("CrawlJob delete failed", exc)
-            return False
+        except ApiException as api_exc:
+            return {"error": str(api_exc.reason)}

     async def delete_profile_browser(self, browserid):
         """delete custom crawljob object"""
@@ -156,9 +157,7 @@ class K8sAPI:
             )
             return True

-        # pylint: disable=broad-except
-        except Exception as exc:
-            print("ProfileJob delete failed", exc)
+        except ApiException:
             return False

     async def get_profile_browser(self, browserid):
@@ -37,12 +37,13 @@ class ScheduledJob(K8sAPI):
         userid = data["USER_ID"]
         scale = int(data.get("INITIAL_SCALE", 0))
         crawl_timeout = int(data.get("CRAWL_TIMEOUT", 0))
+        oid = data["ORG_ID"]

         crawlconfig = await get_crawl_config(self.crawlconfigs, uuid.UUID(self.cid))

         # k8s create
         crawl_id = await self.new_crawl_job(
-            self.cid, userid, scale, crawl_timeout, manual=False
+            self.cid, userid, oid, scale, crawl_timeout, manual=False
        )

         # db create
@@ -23,15 +23,17 @@ from .utils import (
 from .k8sapi import K8sAPI

 from .db import init_db
-from .orgs import inc_org_stats
+from .orgs import inc_org_stats, get_max_concurrent_crawls
 from .colls import add_successful_crawl_to_collections
 from .crawlconfigs import stats_recompute_last
 from .crawls import (
     CrawlFile,
     CrawlCompleteIn,
     add_crawl_file,
-    update_crawl_state_if_changed,
+    update_crawl_state_if_allowed,
+    get_crawl_state,
     add_crawl_errors,
+    NON_RUNNING_STATES,
     SUCCESSFUL_STATES,
 )

@@ -40,6 +42,7 @@ STS = "StatefulSet.apps/v1"
 CMAP = "ConfigMap.v1"
 PVC = "PersistentVolumeClaim.v1"
 POD = "Pod.v1"
+CJS = "CrawlJob.btrix.cloud/v1"

 DEFAULT_TTL = 30

@@ -98,6 +101,7 @@ class CrawlStatus(BaseModel):


 # ============================================================================
+# pylint: disable=too-many-statements
 class BtrixOperator(K8sAPI):
     """BtrixOperator Handler"""

@@ -152,6 +156,7 @@ class BtrixOperator(K8sAPI):
         spec = data.parent.get("spec", {})
         crawl_id = spec["id"]
         cid = spec["cid"]
+        oid = spec["oid"]

         scale = spec.get("scale", 1)
         status.scale = scale
@@ -186,7 +191,7 @@ class BtrixOperator(K8sAPI):
         crawl = CrawlSpec(
             id=crawl_id,
             cid=cid,
-            oid=configmap["ORG_ID"],
+            oid=oid,
             storage_name=configmap["STORAGE_NAME"],
             storage_path=configmap["STORE_PATH"],
             scale=scale,
@@ -195,6 +200,14 @@ class BtrixOperator(K8sAPI):
             expire_time=from_k8s_date(spec.get("expireTime")),
         )

+        if status.state in ("starting", "waiting_org_limit"):
+            if not await self.can_start_new(crawl, data, status):
+                return self._done_response(status)
+
+            await self.set_state(
+                "starting", status, crawl.id, allowed_from=["waiting_org_limit"]
+            )
+
         crawl_sts = f"crawl-{crawl_id}"
         redis_sts = f"redis-{crawl_id}"

@@ -202,9 +215,10 @@ class BtrixOperator(K8sAPI):
         if has_crawl_children:
             pods = data.related[POD]
             status = await self.sync_crawl_state(redis_url, crawl, status, pods)
-            if status.finished:
-                return await self.handle_finished_delete_if_needed(crawl_id, status, spec)
+
+            if status.finished:
+                return await self.handle_finished_delete_if_needed(
+                    crawl_id, status, spec
+                )

         params = {}
         params.update(self.shared_params)
@@ -239,6 +253,53 @@ class BtrixOperator(K8sAPI):

         return {"status": status.dict(exclude_none=True), "children": children}

+    async def set_state(self, state, status, crawl_id, allowed_from, **kwargs):
+        """set status state and update db, if changed
+        if allowed_from passed in, can only transition from allowed_from state,
+        otherwise get current state from db and return
+        the following state transitions are supported:
+
+        from starting to org concurrent crawl limit and back:
+        - starting -> waiting_org_limit -> starting
+
+        from starting to running:
+        - starting -> running
+
+        from running to complete or partial_complete:
+        - running -> complete
+        - running -> partial_complete
+
+        from starting or running to waiting for capacity (pods pending) and back:
+        - starting -> waiting_capacity
+        - running -> waiting_capacity
+        - waiting_capacity -> running
+
+        from any state to canceled or failed:
+        - <any> -> canceled
+        - <any> -> failed
+        """
+        if not allowed_from or status.state in allowed_from:
+            res = await update_crawl_state_if_allowed(
+                self.crawls, crawl_id, state=state, allowed_from=allowed_from, **kwargs
+            )
+            if res:
+                print(f"Setting state: {status.state} -> {state}, {crawl_id}")
+                status.state = state
+                return True
+
+        # transition not allowed: sync actual crawl state from the db instead
+        new_state, finished = await get_crawl_state(self.crawls, crawl_id)
+        if new_state:
+            status.state = new_state
+        if finished:
+            status.finished = to_k8s_date(finished)
+
+        if status.state != state:
+            print(
+                f"Not setting state: {status.state} -> {state}, {crawl_id} not allowed"
+            )
+        return False
+
     def load_from_yaml(self, filename, params):
         """load and parse k8s template from yaml file"""
         return list(
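The transitions documented in the docstring can be condensed into a lookup table. A sketch for reference only (this table does not appear in the commit; set_state() enforces the rules via per-call allowed_from lists instead):

# allowed target states, keyed by current state
ALLOWED_TRANSITIONS = {
    "starting": {"waiting_org_limit", "waiting_capacity", "running"},
    "waiting_org_limit": {"starting"},
    "waiting_capacity": {"running"},
    "running": {"waiting_capacity", "complete", "partial_complete"},
}

def is_allowed(current: str, new: str) -> bool:
    # canceled/failed are reachable from any state
    if new in ("canceled", "failed"):
        return True
    return new in ALLOWED_TRANSITIONS.get(current, set())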
@@ -250,6 +311,7 @@ class BtrixOperator(K8sAPI):
         spec = data.parent.get("spec", {})
         cid = spec["cid"]
         crawl_id = spec["id"]
+        oid = spec.get("oid")
         return {
             "relatedResources": [
                 {
@@ -269,9 +331,51 @@ class BtrixOperator(K8sAPI):
                     "matchLabels": {"crawl": crawl_id, "role": "crawler"}
                 },
             },
+            {
+                "apiVersion": "btrix.cloud/v1",
+                "resource": "crawljobs",
+                "labelSelector": {"matchLabels": {"oid": oid}},
+            },
         ]
     }

+    async def can_start_new(self, crawl: CrawlSpec, data: MCSyncData, status):
+        """return true if crawl can start, otherwise set crawl to
+        'waiting_org_limit' state until more crawls for this org finish"""
+        max_crawls = await get_max_concurrent_crawls(self.orgs, crawl.oid)
+        if not max_crawls:
+            return True
+
+        if len(data.related[CJS]) <= max_crawls:
+            return True
+
+        name = data.parent.get("metadata").get("name")
+
+        # related crawljobs are returned in creation order: allow this crawl
+        # to start only if it is within the first max_crawls active jobs
+        i = 0
+        for crawl_sorted in data.related[CJS].values():
+            if crawl_sorted.get("status", {}).get("state") in NON_RUNNING_STATES:
+                continue
+
+            if crawl_sorted.get("metadata").get("name") == name:
+                if i < max_crawls:
+                    return True
+
+                break
+            i += 1
+
+        await self.set_state(
+            "waiting_org_limit", status, crawl.id, allowed_from=["starting"]
+        )
+        return False
+
     async def handle_finished_delete_if_needed(self, crawl_id, status, spec):
         """return status for finished job (no children)
         also check if deletion is necessary
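To make the first-N check concrete: related crawljobs arrive in creation order, finished jobs are skipped, and a crawl may start only if it occupies one of the first max_crawls active slots. A standalone sketch with sample data (names and states are illustrative):

NON_RUNNING = {"canceled", "failed", "complete", "partial_complete", "timed_out"}

def may_start(jobs, name, max_crawls):
    """jobs: (name, state) pairs in crawljob creation order"""
    i = 0
    for job_name, state in jobs:
        if state in NON_RUNNING:
            continue  # finished jobs no longer hold a slot
        if job_name == name:
            return i < max_crawls
        i += 1
    return False

jobs = [("crawl-a", "running"), ("crawl-b", "starting")]
assert may_start(jobs, "crawl-a", max_crawls=1)      # first active slot
assert not may_start(jobs, "crawl-b", max_crawls=1)  # -> waiting_org_limit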
@@ -443,22 +547,21 @@ class BtrixOperator(K8sAPI):
         # check if at least one pod started running
         # otherwise, mark as 'waiting_capacity' and return
         if not await self.check_if_pods_running(pods):
-            if status.state not in ("waiting", "canceled"):
-                await update_crawl_state_if_changed(
-                    self.crawls, crawl.id, state="waiting"
-                )
-                status.state = "waiting"
+            await self.set_state(
+                "waiting_capacity",
+                status,
+                crawl.id,
+                allowed_from=["starting", "running"],
+            )

             return status

-        # optimization: don't update db once crawl is already running
-        # will set stats at when crawl is finished, otherwise can read
-        # directly from redis
-        if status.state != "running":
-            await update_crawl_state_if_changed(self.crawls, crawl.id, state="running")
+        # set state to running (if not already)
+        await self.set_state(
+            "running", status, crawl.id, allowed_from=["starting", "waiting_capacity"]
+        )

-        # update status
-        status.state = "running"
         status.pagesDone = stats["done"]
         status.pagesFound = stats["found"]
         if stats["size"] is not None:
@@ -510,27 +613,27 @@ class BtrixOperator(K8sAPI):
     ):
         """mark crawl as finished, set finished timestamp and final state"""

-        # already marked as finished
-        if status.state == state:
-            print("already finished, ignoring mark_finished")
-            return status
-
         finished = dt_now()

-        status.state = state
-        status.finished = to_k8s_date(finished)
-
         kwargs = {"finished": finished}
         if stats:
             kwargs["stats"] = stats

-        if not await update_crawl_state_if_changed(
-            self.crawls, crawl_id, state=state, **kwargs
+        if state in SUCCESSFUL_STATES:
+            allowed_from = ["running"]
+        else:
+            allowed_from = []
+
+        # if set_state returns false, already set to same status, return
+        if not await self.set_state(
+            state, status, crawl_id, allowed_from=allowed_from, **kwargs
         ):
             print("already finished, ignoring mark_finished")
             return status

-        if crawl:
+        status.finished = to_k8s_date(finished)
+
+        if crawl and state in SUCCESSFUL_STATES:
             await self.inc_crawl_complete_stats(crawl, finished)

         asyncio.create_task(
@@ -78,6 +78,13 @@ class S3Storage(BaseModel):
     use_access_for_presign: Optional[bool] = True


+# ============================================================================
+class OrgQuotas(BaseModel):
+    """Organization quotas (settable by superadmin)"""
+
+    maxConcurrentCrawls: Optional[int] = 0
+
+
 # ============================================================================
 class Organization(BaseMongoModel):
     """Organization Base Model"""
@@ -92,6 +99,8 @@ class Organization(BaseMongoModel):

     default: bool = False

+    quotas: Optional[OrgQuotas] = OrgQuotas()
+
     def is_owner(self, user):
         """Check if user is owner"""
         return self._is_auth(user, UserRole.OWNER)
@@ -162,6 +171,8 @@ class OrgOut(BaseMongoModel):
     usage: Optional[Dict[str, int]]
     default: bool = False

+    quotas: Optional[OrgQuotas] = OrgQuotas()
+

 # ============================================================================
 class OrgOps:
@@ -315,6 +326,19 @@ class OrgOps:
             {"_id": org.id}, {"$set": {"storage": storage.dict()}}
         )

+    async def update_quotas(self, org: Organization, quotas: OrgQuotas):
+        """update organization quotas"""
+        return await self.orgs.find_one_and_update(
+            {"_id": org.id},
+            {
+                "$set": {
+                    "quotas": quotas.dict(
+                        exclude_unset=True, exclude_defaults=True, exclude_none=True
+                    )
+                }
+            },
+        )
+
     async def handle_new_user_invite(self, invite_token: str, user: User):
         """Handle invite from a new user"""
         new_user_invite = await self.invites.get_valid_invite(invite_token, user.email)
@@ -363,6 +387,16 @@ async def inc_org_stats(orgs, oid, duration):
     await orgs.find_one_and_update({"_id": oid}, {"$inc": {f"usage.{yymm}": duration}})


+# ============================================================================
+async def get_max_concurrent_crawls(orgs, oid):
+    """return max allowed concurrent crawls, if any"""
+    org = await orgs.find_one({"_id": oid})
+    if org:
+        org = Organization.from_dict(org)
+        return org.quotas.maxConcurrentCrawls
+    return 0
+
+
 # ============================================================================
 # pylint: disable=too-many-statements
 def init_orgs_api(app, mdb, user_manager, invites, user_dep: User):
@@ -447,9 +481,10 @@ def init_orgs_api(app, mdb, user_manager, invites, user_dep: User):
             users={},
             storage=DefaultStorage(name="default", path=storage_path),
         )
-        await ops.add_org(org)
+        if not await ops.add_org(org):
+            return {"added": False, "error": "already_exists"}

-        return {"added": True}
+        return {"id": id_, "added": True}

     @router.get("", tags=["organizations"])
     async def get_org(
@@ -471,6 +506,19 @@ def init_orgs_api(app, mdb, user_manager, invites, user_dep: User):

         return {"updated": True}

+    @router.post("/quotas", tags=["organizations"])
+    async def update_quotas(
+        quotas: OrgQuotas,
+        org: Organization = Depends(org_owner_dep),
+        user: User = Depends(user_dep),
+    ):
+        if not user.is_superuser:
+            raise HTTPException(status_code=403, detail="Not Allowed")
+
+        await ops.update_quotas(org, quotas)
+
+        return {"updated": True}
+
     @router.patch("/user-role", tags=["organizations"])
     async def set_role(
         update: UpdateRole,
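Note that update_quotas() serializes with exclude_unset / exclude_defaults / exclude_none, so a partial payload only overwrites the fields the caller actually sent. A quick pydantic illustration of the exclude_unset part:

from typing import Optional
from pydantic import BaseModel

class OrgQuotas(BaseModel):
    maxConcurrentCrawls: Optional[int] = 0

# explicitly set -> included in the resulting $set document
print(OrgQuotas(maxConcurrentCrawls=2).dict(exclude_unset=True))
# {'maxConcurrentCrawls': 2}

# nothing set -> empty dict; stored quotas are left untouched
print(OrgQuotas().dict(exclude_unset=True))
# {}

One side effect of also passing exclude_defaults: explicitly resetting a quota back to its default of 0 is dropped from the update as well.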
@@ -5,6 +5,8 @@ metadata:
   labels:
     crawl: "{{ id }}"
     role: "job"
+    oid: "{{ oid }}"
+    userid: "{{ userid }}"

 spec:
   selector:
@@ -14,6 +16,7 @@ spec:
     id: "{{ id }}"
     userid: "{{ userid }}"
     cid: "{{ cid }}"
+    oid: "{{ oid }}"
     scale: {{ scale }}
   ttlSecondsAfterFinished: 30
@@ -48,7 +48,7 @@ def test_cancel_crawl(default_org_id, crawler_auth_headers):

     data = get_crawl(default_org_id, crawler_auth_headers, crawl_id)

-    while data["state"] in ("running", "waiting"):
+    while data["state"] in ("running", "waiting_capacity"):
         data = get_crawl(default_org_id, crawler_auth_headers, crawl_id)

     assert data["state"] == "canceled"
@@ -87,7 +87,7 @@ def test_start_crawl_and_stop_immediately(
     )
     assert r.json()["lastCrawlStopping"] == True

-    while data["state"] in ("starting", "running", "waiting"):
+    while data["state"] in ("starting", "running", "waiting_capacity"):
         data = get_crawl(default_org_id, crawler_auth_headers, crawl_id)

     assert data["state"] in ("canceled", "partial_complete")
@@ -1,6 +1,7 @@
 import pytest
 import requests
 import time
+import datetime


 HOST_PREFIX = "http://127.0.0.1:30870"
@@ -257,3 +258,14 @@ def error_crawl_id(admin_auth_headers, default_org_id):
         if data["state"] == "complete":
             return crawl_id
         time.sleep(5)
+
+
+@pytest.fixture(scope="session")
+def org_with_quotas(admin_auth_headers):
+    name = "Quota Org " + datetime.datetime.utcnow().isoformat()
+    r = requests.post(
+        f"{API_PREFIX}/orgs/create", headers=admin_auth_headers, json={"name": name}
+    )
+    data = r.json()
+
+    return data["id"]
backend/test_nightly/test_concurrent_crawl_limit.py (new file): 107 lines
@@ -0,0 +1,107 @@
+import requests
+import time
+
+from .conftest import API_PREFIX
+
+crawl_id_a = None
+crawl_id_b = None
+
+
+def test_set_concurrent_crawl_limit(org_with_quotas, admin_auth_headers):
+    r = requests.post(
+        f"{API_PREFIX}/orgs/{org_with_quotas}/quotas",
+        headers=admin_auth_headers,
+        json={"maxConcurrentCrawls": 1},
+    )
+    data = r.json()
+    assert data.get("updated") == True
+
+
+def test_run_two_only_one_concurrent(org_with_quotas, admin_auth_headers):
+    global crawl_id_a
+    crawl_id_a = run_crawl(org_with_quotas, admin_auth_headers)
+    time.sleep(1)
+
+    global crawl_id_b
+    crawl_id_b = run_crawl(org_with_quotas, admin_auth_headers)
+
+    while (
+        get_crawl_status(org_with_quotas, crawl_id_a, admin_auth_headers) == "starting"
+    ):
+        time.sleep(2)
+
+    assert (
+        get_crawl_status(org_with_quotas, crawl_id_a, admin_auth_headers) == "running"
+    )
+
+    while (
+        get_crawl_status(org_with_quotas, crawl_id_b, admin_auth_headers) == "starting"
+    ):
+        time.sleep(2)
+
+    assert (
+        get_crawl_status(org_with_quotas, crawl_id_b, admin_auth_headers)
+        == "waiting_org_limit"
+    )
+
+
+def test_cancel_and_run_other(org_with_quotas, admin_auth_headers):
+    r = requests.post(
+        f"{API_PREFIX}/orgs/{org_with_quotas}/crawls/{crawl_id_a}/cancel",
+        headers=admin_auth_headers,
+    )
+    data = r.json()
+    assert data["success"] == True
+
+    while (
+        get_crawl_status(org_with_quotas, crawl_id_a, admin_auth_headers) != "canceled"
+    ):
+        time.sleep(2)
+
+    while (
+        get_crawl_status(org_with_quotas, crawl_id_b, admin_auth_headers)
+        == "waiting_org_limit"
+    ):
+        time.sleep(5)
+
+    assert get_crawl_status(org_with_quotas, crawl_id_b, admin_auth_headers) in (
+        "starting",
+        "running",
+    )
+
+    # cancel second crawl as well
+    r = requests.post(
+        f"{API_PREFIX}/orgs/{org_with_quotas}/crawls/{crawl_id_b}/cancel",
+        headers=admin_auth_headers,
+    )
+    data = r.json()
+    assert data["success"] == True
+
+
+def run_crawl(org_id, headers):
+    crawl_data = {
+        "runNow": True,
+        "name": "Concurrent Crawl",
+        "config": {
+            "seeds": [{"url": "https://specs.webrecorder.net/"}],
+            "limit": 1,
+        },
+    }
+    r = requests.post(
+        f"{API_PREFIX}/orgs/{org_id}/crawlconfigs/",
+        headers=headers,
+        json=crawl_data,
+    )
+    data = r.json()
+
+    return data["run_now_job"]
+
+
+def get_crawl_status(org_id, crawl_id, headers):
+    r = requests.get(
+        f"{API_PREFIX}/orgs/{org_id}/crawls/{crawl_id}/replay.json",
+        headers=headers,
+    )
+    data = r.json()
+    return data["state"]
@@ -79,14 +79,15 @@ export class CrawlStatus extends LitElement {
       break;
     }

-    case "waiting": {
+    case "waiting_capacity":
+    case "waiting_org_limit": {
       icon = html`<sl-icon
         name="hourglass-split"
         class="animatePulse"
         slot="prefix"
         style="color: var(--sl-color-purple-600)"
       ></sl-icon>`;
-      label = msg("Waiting");
+      label = state === "waiting_capacity" ? msg("Waiting (At Capacity)") : msg("Waiting (Crawl Limit)");
       break;
     }
@@ -7,6 +7,7 @@ import LiteElement, { html } from "../utils/LiteElement";

 import { isAdmin } from "../utils/orgs";
 import { DASHBOARD_ROUTE } from "../routes";
+import { SlInput } from "@shoelace-style/shoelace";

 @localized()
 export class OrgsList extends LiteElement {
@@ -22,6 +23,9 @@ export class OrgsList extends LiteElement {
   @property({ type: Boolean })
   skeleton? = false;

+  @property({ type: Object })
+  currOrg?: OrgData | null = null;
+
   render() {
     if (this.skeleton) {
       return this.renderSkeleton();
@@ -30,10 +34,63 @@ export class OrgsList extends LiteElement {
     return html`
       <ul class="border rounded-lg overflow-hidden">
         ${this.orgList?.map(this.renderOrg)}
+        ${this.renderOrgQuotas()}
       </ul>
     `;
   }

+  private renderOrgQuotas() {
+    if (!this.currOrg) {
+      return html``;
+    }
+
+    return html`
+      <sl-dialog
+        label=${msg(str`Quotas for: ${this.currOrg.name}`)}
+        ?open=${!!this.currOrg}
+        @sl-request-close=${() => (this.currOrg = null)}
+      >
+        ${Object.entries(this.currOrg.quotas).map(([key, value]) => {
+          return html`
+            <sl-input
+              name=${key}
+              value=${value}
+              type="number"
+              @sl-input="${this.onUpdateQuota}"
+            ><span slot="prefix">${key}</span></sl-input>`;
+        })}
+        <sl-button @click="${this.onSubmitQuotas}" class="mt-2" variant="primary">Update Quotas</sl-button>
+      </sl-dialog>
+    `;
+  }
+
+  private onUpdateQuota(e: CustomEvent) {
+    const inputEl = e.target as SlInput;
+    const quotas = this.currOrg?.quotas;
+    if (quotas) {
+      quotas[inputEl.name] = Number(inputEl.value);
+    }
+  }
+
+  private onSubmitQuotas() {
+    if (this.currOrg) {
+      this.dispatchEvent(new CustomEvent("update-quotas", { detail: this.currOrg }));
+    }
+    this.currOrg = null;
+  }
+
+  private showQuotas(org: OrgData) {
+    const stop = (e: Event) => {
+      e.preventDefault();
+      e.stopPropagation();
+      this.currOrg = org;
+      return false;
+    };
+
+    return stop;
+  }
+
   private renderOrg = (org: OrgData) => {
     let defaultLabel: any;
     if (this.defaultOrg && org.id === this.defaultOrg.id) {
@@ -52,10 +109,15 @@ export class OrgsList extends LiteElement {
           <div class="font-medium mr-2 transition-colors">
             ${defaultLabel}${org.name}
           </div>
-          <div class="text-xs text-neutral-400">
-            ${memberCount === 1
-              ? msg(`1 member`)
-              : msg(str`${memberCount} members`)}
+          <div class="flex flex-row items-center">
+            <sl-button size="small" class="mr-3" @click="${this.showQuotas(org)}">
+              <sl-icon name="gear" slot="prefix"></sl-icon>
+            </sl-button>
+            <div class="text-xs text-neutral-400">
+              ${memberCount === 1
+                ? msg(`1 member`)
+                : msg(str`${memberCount} members`)}
+            </div>
           </div>
         </li>
     `;
@@ -155,6 +155,7 @@ export class Home extends LiteElement {
           .defaultOrg=${ifDefined(
             this.userInfo?.orgs.find((org) => org.default === true)
           )}
+          @update-quotas=${this.onUpdateOrgQuotas}
         ></btrix-orgs-list>
       </section>
     </div>
@@ -315,6 +316,15 @@ export class Home extends LiteElement {
     this.isSubmittingNewOrg = false;
   }

+  async onUpdateOrgQuotas(e: CustomEvent) {
+    const org = e.detail as OrgData;
+
+    await this.apiFetch(`/orgs/${org.id}/quotas`, this.authState!, {
+      method: "POST",
+      body: JSON.stringify(org.quotas),
+    });
+  }
+
   async checkFormValidity(formEl: HTMLFormElement) {
     await this.updateComplete;
     return !formEl.querySelector("[data-invalid]");
@@ -90,7 +90,8 @@ export class CrawlDetail extends LiteElement {
     return (
       this.crawl.state === "running" ||
       this.crawl.state === "starting" ||
-      this.crawl.state === "waiting" ||
+      this.crawl.state === "waiting_capacity" ||
+      this.crawl.state === "waiting_org_limit" ||
       this.crawl.state === "stopping"
     );
   }
@@ -924,21 +924,31 @@ export class WorkflowDetail extends LiteElement {
   private renderWatchCrawl = () => {
     if (!this.authState || !this.workflow?.lastCrawlState) return "";

-    const isStarting = this.workflow.lastCrawlState === "starting";
-    const isWaiting = this.workflow.lastCrawlState === "waiting";
+    let waitingMsg = null;
+
+    switch (this.workflow.lastCrawlState) {
+      case "starting":
+        waitingMsg = msg("Crawl starting...");
+        break;
+
+      case "waiting_capacity":
+        waitingMsg = msg("Crawl waiting for available resources before it can start...");
+        break;
+
+      case "waiting_org_limit":
+        waitingMsg = msg("Crawl waiting for others to finish, concurrent limit per Organization reached...");
+        break;
+    }
+
     const isRunning = this.workflow.lastCrawlState === "running";
     const isStopping = this.workflow.lastCrawlStopping;
     const authToken = this.authState.headers.Authorization.split(" ")[1];

     return html`
-      ${isStarting || isWaiting
+      ${waitingMsg
         ? html`<div class="rounded border p-3">
             <p class="text-sm text-neutral-600 motion-safe:animate-pulse">
-              ${isStarting
-                ? msg("Crawl starting...")
-                : msg(
-                    "Crawl waiting for available resources before it can start..."
-                  )}
+              ${waitingMsg}
             </p>
           </div>`
         : isActive(this.workflow.lastCrawlState)
@@ -88,7 +88,8 @@ export type Profile = {

 export type CrawlState =
   | "starting"
-  | "waiting"
+  | "waiting_capacity"
+  | "waiting_org_limit"
   | "running"
   | "complete"
   | "failed"
@@ -11,6 +11,7 @@ export const AccessCode: Record<UserRole, number> = {
 export type OrgData = {
   id: string;
   name: string;
+  quotas: Record<string, number>;
   users?: {
     [id: string]: {
       role: (typeof AccessCode)[UserRole];
@@ -1,7 +1,8 @@
 import type { CrawlState } from "../types/crawler";
 export const activeCrawlStates: CrawlState[] = [
   "starting",
-  "waiting",
+  "waiting_org_limit",
+  "waiting_capacity",
   "running",
   "stopping",
 ];