From 00fb8ac0480ec3ffabed154f07e149cf6ce564b1 Mon Sep 17 00:00:00 2001
From: Ilya Kreymer
Date: Tue, 30 May 2023 15:38:03 -0700
Subject: [PATCH] Concurrent Crawl Limit (#874)

concurrent crawl limits: (addresses #866)
- support limits on concurrent crawls that can be run within a single org
- change 'waiting' state to 'waiting_org_limit' for the concurrent crawl limit
  and 'waiting_capacity' for capacity-based waits

orgs:
- add 'maxConcurrentCrawls' to new 'quotas' object on orgs
- add /quotas endpoint for updating the quotas object

operator:
- add all crawljobs as related resources; they appear to be returned in
  creation order
- if a concurrent crawl limit is set, ensure the current job is within the
  first N crawljobs (as provided via the 'related' list of crawljob objects)
  before it can proceed to 'starting', otherwise set it to 'waiting_org_limit'
- api: add org /quotas endpoint for configuring quotas
- remove 'new' state, always start with 'starting'
- crawljob: add 'oid' to crawljob spec and label for easier querying
- more stringent state transitions: add allowed_from to set_state()
- ensure state transitions only happen from allowed states, while
  failed/canceled can happen from any state
- ensure finished and state are synced from db if a transition is not allowed
- add crawl indices by oid and cid

frontend:
- show different waiting states on frontend: 'Waiting (Crawl Limit)' and
  'Waiting (At Capacity)'
- add gear icon on orgs admin page
- add initial popup for setting org quotas, showing all properties from the
  org 'quotas' object

tests:
- add concurrent crawl limit nightly tests
- fix state waiting -> waiting_capacity
- ci: add logging of operator output on test failure
---
 .github/workflows/k3d-ci.yaml                 |   8 +-
 backend/btrixcloud/crawlmanager.py            |  11 +-
 backend/btrixcloud/crawls.py                  |  61 +++++--
 backend/btrixcloud/k8sapi.py                  |  19 +--
 backend/btrixcloud/main_scheduled_job.py      |   3 +-
 backend/btrixcloud/operator.py                | 159 +++++++++++++++---
 backend/btrixcloud/orgs.py                    |  52 +++++-
 backend/btrixcloud/templates/crawl_job.yaml   |   3 +
 backend/test/test_stop_cancel_crawl.py        |   4 +-
 backend/test_nightly/conftest.py              |  12 ++
 .../test_concurrent_crawl_limit.py            | 107 ++++++++++++
 frontend/src/components/crawl-status.ts       |   5 +-
 frontend/src/components/orgs-list.ts          |  70 +++++++-
 frontend/src/pages/home.ts                    |  10 ++
 frontend/src/pages/org/crawl-detail.ts        |   3 +-
 frontend/src/pages/org/workflow-detail.ts     |  26 ++-
 frontend/src/types/crawler.ts                 |   3 +-
 frontend/src/types/org.ts                     |   1 +
 frontend/src/utils/crawler.ts                 |   3 +-
 19 files changed, 484 insertions(+), 76 deletions(-)
 create mode 100644 backend/test_nightly/test_concurrent_crawl_limit.py

diff --git a/.github/workflows/k3d-ci.yaml b/.github/workflows/k3d-ci.yaml
index 0945bd69..31680336 100644
--- a/.github/workflows/k3d-ci.yaml
+++ b/.github/workflows/k3d-ci.yaml
@@ -84,6 +84,10 @@ jobs:
       - name: Run Tests
         run: pytest -vv ./backend/test/*.py

-      - name: Print Backend Logs
+      - name: Print Backend Logs (API)
         if: ${{ failure() }}
-        run: kubectl logs svc/browsertrix-cloud-backend
+        run: kubectl logs svc/browsertrix-cloud-backend -c api
+
+      - name: Print Backend Logs (Operator)
+        if: ${{ failure() }}
+        run: kubectl logs svc/browsertrix-cloud-backend -c op
diff --git a/backend/btrixcloud/crawlmanager.py b/backend/btrixcloud/crawlmanager.py
index c9988096..8bdae825 100644
--- a/backend/btrixcloud/crawlmanager.py
+++ b/backend/btrixcloud/crawlmanager.py
@@ -125,7 +125,12 @@ class CrawlManager(K8sAPI):
         cid = str(crawlconfig.id)

         return await self.new_crawl_job(
-            cid, userid, crawlconfig.scale, 
crawlconfig.crawlTimeout, manual=True + cid, + userid, + crawlconfig.oid, + crawlconfig.scale, + crawlconfig.crawlTimeout, + manual=True, ) async def update_crawl_config(self, crawlconfig, update, profile_filename=None): @@ -275,9 +280,7 @@ class CrawlManager(K8sAPI): patch = {"stopping": True} return await self._patch_job(crawl_id, patch) - await self.delete_crawl_job(crawl_id) - - return {"success": True} + return await self.delete_crawl_job(crawl_id) async def delete_crawl_configs_for_org(self, org): """Delete all crawl configs for given org""" diff --git a/backend/btrixcloud/crawls.py b/backend/btrixcloud/crawls.py index e6ae3f77..6d969227 100644 --- a/backend/btrixcloud/crawls.py +++ b/backend/btrixcloud/crawls.py @@ -35,13 +35,17 @@ from .utils import dt_now, ts_now, get_redis_crawl_stats, parse_jsonl_error_mess RUNNING_STATES = ("running", "pending-wait", "generate-wacz", "uploading-wacz") -RUNNING_AND_STARTING_STATES = ("starting", "waiting", *RUNNING_STATES) +STARTING_STATES = ("starting", "waiting_capacity", "waiting_org_limit") FAILED_STATES = ("canceled", "failed") SUCCESSFUL_STATES = ("complete", "partial_complete", "timed_out") -ALL_CRAWL_STATES = (*RUNNING_AND_STARTING_STATES, *FAILED_STATES, *SUCCESSFUL_STATES) +RUNNING_AND_STARTING_STATES = (*STARTING_STATES, *RUNNING_STATES) + +NON_RUNNING_STATES = (*FAILED_STATES, *SUCCESSFUL_STATES) + +ALL_CRAWL_STATES = (*RUNNING_AND_STARTING_STATES, *NON_RUNNING_STATES) # ============================================================================ @@ -218,6 +222,9 @@ class CrawlOps: async def init_index(self): """init index for crawls db collection""" await self.crawls.create_index([("finished", pymongo.DESCENDING)]) + await self.crawls.create_index([("oid", pymongo.HASHED)]) + await self.crawls.create_index([("cid", pymongo.HASHED)]) + await self.crawls.create_index([("state", pymongo.HASHED)]) async def list_crawls( self, @@ -591,6 +598,22 @@ class CrawlOps: return True + async def update_crawl_state(self, crawl_id: str, state: str): + """called only when job container is being stopped/canceled""" + + data = {"state": state} + # if cancelation, set the finish time here + if state == "canceled": + data["finished"] = dt_now() + + await self.crawls.find_one_and_update( + { + "_id": crawl_id, + "state": {"$in": RUNNING_AND_STARTING_STATES}, + }, + {"$set": data}, + ) + async def shutdown_crawl(self, crawl_id: str, org: Organization, graceful: bool): """stop or cancel specified crawl""" result = None @@ -614,6 +637,15 @@ class CrawlOps: status_code=404, detail=f"crawl_not_found, (details: {exc})" ) + # if job no longer running, canceling is considered success, + # but graceful stoppage is not possible, so would be a failure + if result.get("error") == "Not Found": + if not graceful: + await self.update_crawl_state(crawl_id, "canceled") + crawl = await self.get_crawl_raw(crawl_id, org) + await self.crawl_configs.stats_recompute_remove_crawl(crawl["cid"], 0) + return {"success": True} + # return whatever detail may be included in the response raise HTTPException(status_code=400, detail=result) @@ -880,16 +912,25 @@ async def add_new_crawl( # ============================================================================ -async def update_crawl_state_if_changed(crawls, crawl_id, state, **kwargs): +async def update_crawl_state_if_allowed( + crawls, crawl_id, state, allowed_from, **kwargs +): """update crawl state and other properties in db if state has changed""" kwargs["state"] = state - res = await crawls.find_one_and_update( - {"_id": 
crawl_id, "state": {"$ne": state}}, - {"$set": kwargs}, - return_document=pymongo.ReturnDocument.AFTER, - ) - print("** UPDATE", crawl_id, state, res is not None) - return res + query = {"_id": crawl_id} + if allowed_from: + query["state"] = {"$in": allowed_from} + + return await crawls.find_one_and_update(query, {"$set": kwargs}) + + +# ============================================================================ +async def get_crawl_state(crawls, crawl_id): + """return current crawl state of a crawl""" + res = await crawls.find_one({"_id": crawl_id}, projection=["state", "finished"]) + if not res: + return None, None + return res.get("state"), res.get("finished") # ============================================================================ diff --git a/backend/btrixcloud/k8sapi.py b/backend/btrixcloud/k8sapi.py index 3086c57e..414ee62a 100644 --- a/backend/btrixcloud/k8sapi.py +++ b/backend/btrixcloud/k8sapi.py @@ -11,7 +11,7 @@ from kubernetes_asyncio.stream import WsApiClient from kubernetes_asyncio.client.api_client import ApiClient from kubernetes_asyncio.client.api import custom_objects_api from kubernetes_asyncio.utils import create_from_dict - +from kubernetes_asyncio.client.exceptions import ApiException from fastapi.templating import Jinja2Templates from .utils import get_templates_dir, dt_now, to_k8s_date @@ -63,7 +63,9 @@ class K8sAPI: return redis_url # pylint: disable=too-many-arguments - async def new_crawl_job(self, cid, userid, scale=1, crawl_timeout=0, manual=True): + async def new_crawl_job( + self, cid, userid, oid, scale=1, crawl_timeout=0, manual=True + ): """load job template from yaml""" if crawl_timeout: crawl_expire_time = to_k8s_date(dt_now() + timedelta(seconds=crawl_timeout)) @@ -77,6 +79,7 @@ class K8sAPI: params = { "id": crawl_id, "cid": cid, + "oid": oid, "userid": userid, "scale": scale, "expire_time": crawl_expire_time, @@ -135,12 +138,10 @@ class K8sAPI: grace_period_seconds=0, propagation_policy="Foreground", ) - return True + return {"success": True} - # pylint: disable=broad-except - except Exception as exc: - print("CrawlJob delete failed", exc) - return False + except ApiException as api_exc: + return {"error": str(api_exc.reason)} async def delete_profile_browser(self, browserid): """delete custom crawljob object""" @@ -156,9 +157,7 @@ class K8sAPI: ) return True - # pylint: disable=broad-except - except Exception as exc: - print("ProfileJob delete failed", exc) + except ApiException: return False async def get_profile_browser(self, browserid): diff --git a/backend/btrixcloud/main_scheduled_job.py b/backend/btrixcloud/main_scheduled_job.py index 8cb88e6b..bd13e747 100644 --- a/backend/btrixcloud/main_scheduled_job.py +++ b/backend/btrixcloud/main_scheduled_job.py @@ -37,12 +37,13 @@ class ScheduledJob(K8sAPI): userid = data["USER_ID"] scale = int(data.get("INITIAL_SCALE", 0)) crawl_timeout = int(data.get("CRAWL_TIMEOUT", 0)) + oid = data["ORG_ID"] crawlconfig = await get_crawl_config(self.crawlconfigs, uuid.UUID(self.cid)) # k8s create crawl_id = await self.new_crawl_job( - self.cid, userid, scale, crawl_timeout, manual=False + self.cid, userid, oid, scale, crawl_timeout, manual=False ) # db create diff --git a/backend/btrixcloud/operator.py b/backend/btrixcloud/operator.py index c08227ef..6f002baa 100644 --- a/backend/btrixcloud/operator.py +++ b/backend/btrixcloud/operator.py @@ -23,15 +23,17 @@ from .utils import ( from .k8sapi import K8sAPI from .db import init_db -from .orgs import inc_org_stats +from .orgs import inc_org_stats, 
get_max_concurrent_crawls from .colls import add_successful_crawl_to_collections from .crawlconfigs import stats_recompute_last from .crawls import ( CrawlFile, CrawlCompleteIn, add_crawl_file, - update_crawl_state_if_changed, + update_crawl_state_if_allowed, + get_crawl_state, add_crawl_errors, + NON_RUNNING_STATES, SUCCESSFUL_STATES, ) @@ -40,6 +42,7 @@ STS = "StatefulSet.apps/v1" CMAP = "ConfigMap.v1" PVC = "PersistentVolumeClaim.v1" POD = "Pod.v1" +CJS = "CrawlJob.btrix.cloud/v1" DEFAULT_TTL = 30 @@ -98,6 +101,7 @@ class CrawlStatus(BaseModel): # ============================================================================ +# pylint: disable=too-many-statements class BtrixOperator(K8sAPI): """BtrixOperator Handler""" @@ -152,6 +156,7 @@ class BtrixOperator(K8sAPI): spec = data.parent.get("spec", {}) crawl_id = spec["id"] cid = spec["cid"] + oid = spec["oid"] scale = spec.get("scale", 1) status.scale = scale @@ -186,7 +191,7 @@ class BtrixOperator(K8sAPI): crawl = CrawlSpec( id=crawl_id, cid=cid, - oid=configmap["ORG_ID"], + oid=oid, storage_name=configmap["STORAGE_NAME"], storage_path=configmap["STORE_PATH"], scale=scale, @@ -195,6 +200,14 @@ class BtrixOperator(K8sAPI): expire_time=from_k8s_date(spec.get("expireTime")), ) + if status.state in ("starting", "waiting_org_limit"): + if not await self.can_start_new(crawl, data, status): + return self._done_response(status) + + await self.set_state( + "starting", status, crawl.id, allowed_from=["waiting_org_limit"] + ) + crawl_sts = f"crawl-{crawl_id}" redis_sts = f"redis-{crawl_id}" @@ -202,9 +215,10 @@ class BtrixOperator(K8sAPI): if has_crawl_children: pods = data.related[POD] status = await self.sync_crawl_state(redis_url, crawl, status, pods) - - if status.finished: - return await self.handle_finished_delete_if_needed(crawl_id, status, spec) + if status.finished: + return await self.handle_finished_delete_if_needed( + crawl_id, status, spec + ) params = {} params.update(self.shared_params) @@ -239,6 +253,53 @@ class BtrixOperator(K8sAPI): return {"status": status.dict(exclude_none=True), "children": children} + async def set_state(self, state, status, crawl_id, allowed_from, **kwargs): + """set status state and update db, if changed + if allowed_from passed in, can only transition from allowed_from state, + otherwise get current state from db and return + the following state transitions are supported: + + from starting to org concurrent crawl limit and back: + - starting -> waiting_org_capacity -> starting + + from starting to running: + - starting -> running + + from running to complete or partial_complete: + - running -> complete + - running -> partial_complete + + from starting or running to waiting for capacity (pods pending) and back: + - starting -> waiting_capacity + - running -> waiting_capacity + - waiting_capacity -> running + + from any state to canceled or failed: + - -> canceled + - -> failed + """ + if not allowed_from or status.state in allowed_from: + res = await update_crawl_state_if_allowed( + self.crawls, crawl_id, state=state, allowed_from=allowed_from, **kwargs + ) + if res: + print(f"Setting state: {status.state} -> {state}, {crawl_id}") + status.state = state + return True + + # get actual crawl state + new_state, finished = await get_crawl_state(self.crawls, crawl_id) + if new_state: + status.state = state + if finished: + status.finished = to_k8s_date(finished) + + if status.state != state: + print( + f"Not setting state: {status.state} -> {state}, {crawl_id} not allowed" + ) + return False + def 
load_from_yaml(self, filename, params): """load and parse k8s template from yaml file""" return list( @@ -250,6 +311,7 @@ class BtrixOperator(K8sAPI): spec = data.parent.get("spec", {}) cid = spec["cid"] crawl_id = spec["id"] + oid = spec.get("oid") return { "relatedResources": [ { @@ -269,9 +331,51 @@ class BtrixOperator(K8sAPI): "matchLabels": {"crawl": crawl_id, "role": "crawler"} }, }, + { + "apiVersion": "btrix.cloud/v1", + "resource": "crawljobs", + "labelSelector": {"matchLabels": {"oid": oid}}, + }, ] } + async def can_start_new(self, crawl: CrawlSpec, data: MCSyncData, status): + """return true if crawl can start, otherwise set crawl to 'queued' state + until more crawls for org finish""" + max_crawls = await get_max_concurrent_crawls(self.orgs, crawl.oid) + if not max_crawls: + return True + + if len(data.related[CJS]) <= max_crawls: + return True + + name = data.parent.get("metadata").get("name") + + # def metadata_key(val): + # return val.get("metadata").get("creationTimestamp") + + # all_crawljobs = sorted(data.related[CJS].values(), key=metadata_key) + # print(list(data.related[CJS].keys())) + + i = 0 + for crawl_sorted in data.related[CJS].values(): + if crawl_sorted.get("status", {}).get("state") in NON_RUNNING_STATES: + continue + + # print(i, crawl_sorted.get("metadata").get("name")) + if crawl_sorted.get("metadata").get("name") == name: + # print("found: ", name, "index", i) + if i < max_crawls: + return True + + break + i += 1 + + await self.set_state( + "waiting_org_limit", status, crawl.id, allowed_from=["starting"] + ) + return False + async def handle_finished_delete_if_needed(self, crawl_id, status, spec): """return status for finished job (no children) also check if deletion is necessary @@ -443,22 +547,21 @@ class BtrixOperator(K8sAPI): # check if at least one pod started running # otherwise, mark as 'waiting' and return if not await self.check_if_pods_running(pods): - if status.state not in ("waiting", "canceled"): - await update_crawl_state_if_changed( - self.crawls, crawl.id, state="waiting" - ) - status.state = "waiting" + await self.set_state( + "waiting_capacity", + status, + crawl.id, + allowed_from=["starting", "running"], + ) return status - # optimization: don't update db once crawl is already running - # will set stats at when crawl is finished, otherwise can read - # directly from redis - if status.state != "running": - await update_crawl_state_if_changed(self.crawls, crawl.id, state="running") + # set state to running (if not already) + await self.set_state( + "running", status, crawl.id, allowed_from=["starting", "waiting_capacity"] + ) # update status - status.state = "running" status.pagesDone = stats["done"] status.pagesFound = stats["found"] if stats["size"] is not None: @@ -510,27 +613,27 @@ class BtrixOperator(K8sAPI): ): """mark crawl as finished, set finished timestamp and final state""" - # already marked as finished - if status.state == state: - print("already finished, ignoring mark_finished") - return status - finished = dt_now() - status.state = state - status.finished = to_k8s_date(finished) - kwargs = {"finished": finished} if stats: kwargs["stats"] = stats - if not await update_crawl_state_if_changed( - self.crawls, crawl_id, state=state, **kwargs + if state in SUCCESSFUL_STATES: + allowed_from = ["running"] + else: + allowed_from = [] + + # if set_state returns false, already set to same status, return + if not await self.set_state( + state, status, crawl_id, allowed_from=allowed_from, **kwargs ): print("already finished, ignoring 
mark_finished") return status - if crawl: + status.finished = to_k8s_date(finished) + + if crawl and state in SUCCESSFUL_STATES: await self.inc_crawl_complete_stats(crawl, finished) asyncio.create_task( diff --git a/backend/btrixcloud/orgs.py b/backend/btrixcloud/orgs.py index f0619124..64d8ee13 100644 --- a/backend/btrixcloud/orgs.py +++ b/backend/btrixcloud/orgs.py @@ -78,6 +78,13 @@ class S3Storage(BaseModel): use_access_for_presign: Optional[bool] = True +# ============================================================================ +class OrgQuotas(BaseModel): + """Organization quotas (settable by superadmin)""" + + maxConcurrentCrawls: Optional[int] = 0 + + # ============================================================================ class Organization(BaseMongoModel): """Organization Base Model""" @@ -92,6 +99,8 @@ class Organization(BaseMongoModel): default: bool = False + quotas: Optional[OrgQuotas] = OrgQuotas() + def is_owner(self, user): """Check if user is owner""" return self._is_auth(user, UserRole.OWNER) @@ -162,6 +171,8 @@ class OrgOut(BaseMongoModel): usage: Optional[Dict[str, int]] default: bool = False + quotas: Optional[OrgQuotas] = OrgQuotas() + # ============================================================================ class OrgOps: @@ -315,6 +326,19 @@ class OrgOps: {"_id": org.id}, {"$set": {"storage": storage.dict()}} ) + async def update_quotas(self, org: Organization, quotas: OrgQuotas): + """update organization quotas""" + return await self.orgs.find_one_and_update( + {"_id": org.id}, + { + "$set": { + "quotas": quotas.dict( + exclude_unset=True, exclude_defaults=True, exclude_none=True + ) + } + }, + ) + async def handle_new_user_invite(self, invite_token: str, user: User): """Handle invite from a new user""" new_user_invite = await self.invites.get_valid_invite(invite_token, user.email) @@ -363,6 +387,16 @@ async def inc_org_stats(orgs, oid, duration): await orgs.find_one_and_update({"_id": oid}, {"$inc": {f"usage.{yymm}": duration}}) +# ============================================================================ +async def get_max_concurrent_crawls(orgs, oid): + """return max allowed concurrent crawls, if any""" + org = await orgs.find_one({"_id": oid}) + if org: + org = Organization.from_dict(org) + return org.quotas.maxConcurrentCrawls + return 0 + + # ============================================================================ # pylint: disable=too-many-statements def init_orgs_api(app, mdb, user_manager, invites, user_dep: User): @@ -447,9 +481,10 @@ def init_orgs_api(app, mdb, user_manager, invites, user_dep: User): users={}, storage=DefaultStorage(name="default", path=storage_path), ) - await ops.add_org(org) + if not await ops.add_org(org): + return {"added": False, "error": "already_exists"} - return {"added": True} + return {"id": id_, "added": True} @router.get("", tags=["organizations"]) async def get_org( @@ -471,6 +506,19 @@ def init_orgs_api(app, mdb, user_manager, invites, user_dep: User): return {"updated": True} + @router.post("/quotas", tags=["organizations"]) + async def update_quotas( + quotas: OrgQuotas, + org: Organization = Depends(org_owner_dep), + user: User = Depends(user_dep), + ): + if not user.is_superuser: + raise HTTPException(status_code=403, detail="Not Allowed") + + await ops.update_quotas(org, quotas) + + return {"updated": True} + @router.patch("/user-role", tags=["organizations"]) async def set_role( update: UpdateRole, diff --git a/backend/btrixcloud/templates/crawl_job.yaml 
b/backend/btrixcloud/templates/crawl_job.yaml index 4f3df710..0fbe4054 100644 --- a/backend/btrixcloud/templates/crawl_job.yaml +++ b/backend/btrixcloud/templates/crawl_job.yaml @@ -5,6 +5,8 @@ metadata: labels: crawl: "{{ id }}" role: "job" + oid: "{{ oid }}" + userid: "{{ userid }}" spec: selector: @@ -14,6 +16,7 @@ spec: id: "{{ id }}" userid: "{{ userid }}" cid: "{{ cid }}" + oid: "{{ oid }}" scale: {{ scale }} ttlSecondsAfterFinished: 30 diff --git a/backend/test/test_stop_cancel_crawl.py b/backend/test/test_stop_cancel_crawl.py index b13ef38f..e4b98e75 100644 --- a/backend/test/test_stop_cancel_crawl.py +++ b/backend/test/test_stop_cancel_crawl.py @@ -48,7 +48,7 @@ def test_cancel_crawl(default_org_id, crawler_auth_headers): data = get_crawl(default_org_id, crawler_auth_headers, crawl_id) - while data["state"] in ("running", "waiting"): + while data["state"] in ("running", "waiting_capacity"): data = get_crawl(default_org_id, crawler_auth_headers, crawl_id) assert data["state"] == "canceled" @@ -87,7 +87,7 @@ def test_start_crawl_and_stop_immediately( ) assert r.json()["lastCrawlStopping"] == True - while data["state"] in ("starting", "running", "waiting"): + while data["state"] in ("starting", "running", "waiting_capacity"): data = get_crawl(default_org_id, crawler_auth_headers, crawl_id) assert data["state"] in ("canceled", "partial_complete") diff --git a/backend/test_nightly/conftest.py b/backend/test_nightly/conftest.py index cbf84e4c..8199ea08 100644 --- a/backend/test_nightly/conftest.py +++ b/backend/test_nightly/conftest.py @@ -1,6 +1,7 @@ import pytest import requests import time +import datetime HOST_PREFIX = "http://127.0.0.1:30870" @@ -257,3 +258,14 @@ def error_crawl_id(admin_auth_headers, default_org_id): if data["state"] == "complete": return crawl_id time.sleep(5) + + +@pytest.fixture(scope="session") +def org_with_quotas(admin_auth_headers): + name = "Quota Org " + datetime.datetime.utcnow().isoformat() + r = requests.post( + f"{API_PREFIX}/orgs/create", headers=admin_auth_headers, json={"name": name} + ) + data = r.json() + + return data["id"] diff --git a/backend/test_nightly/test_concurrent_crawl_limit.py b/backend/test_nightly/test_concurrent_crawl_limit.py new file mode 100644 index 00000000..84bfa645 --- /dev/null +++ b/backend/test_nightly/test_concurrent_crawl_limit.py @@ -0,0 +1,107 @@ +import requests +import time + +from .conftest import API_PREFIX + +crawl_id_a = None +crawl_id_b = None + + +def test_set_concurrent_crawl_limit(org_with_quotas, admin_auth_headers): + r = requests.post( + f"{API_PREFIX}/orgs/{org_with_quotas}/quotas", + headers=admin_auth_headers, + json={"maxConcurrentCrawls": 1}, + ) + data = r.json() + assert data.get("updated") == True + + +def test_run_two_only_one_concurrent(org_with_quotas, admin_auth_headers): + global crawl_id_a + crawl_id_a = run_crawl(org_with_quotas, admin_auth_headers) + time.sleep(1) + + global crawl_id_b + crawl_id_b = run_crawl(org_with_quotas, admin_auth_headers) + + while ( + get_crawl_status(org_with_quotas, crawl_id_a, admin_auth_headers) == "starting" + ): + time.sleep(2) + + assert ( + get_crawl_status(org_with_quotas, crawl_id_a, admin_auth_headers) == "running" + ) + + while ( + get_crawl_status(org_with_quotas, crawl_id_b, admin_auth_headers) == "starting" + ): + time.sleep(2) + + assert ( + get_crawl_status(org_with_quotas, crawl_id_b, admin_auth_headers) + == "waiting_org_limit" + ) + + +def test_cancel_and_run_other(org_with_quotas, admin_auth_headers): + r = requests.post( + 
f"{API_PREFIX}/orgs/{org_with_quotas}/crawls/{crawl_id_a}/cancel", + headers=admin_auth_headers, + ) + data = r.json() + assert data["success"] == True + + while ( + get_crawl_status(org_with_quotas, crawl_id_a, admin_auth_headers) != "canceled" + ): + time.sleep(2) + + while ( + get_crawl_status(org_with_quotas, crawl_id_b, admin_auth_headers) + == "waiting_org_limit" + ): + time.sleep(5) + + assert get_crawl_status(org_with_quotas, crawl_id_b, admin_auth_headers) in ( + "starting", + "running", + ) + + # cancel second crawl as well + r = requests.post( + f"{API_PREFIX}/orgs/{org_with_quotas}/crawls/{crawl_id_b}/cancel", + headers=admin_auth_headers, + ) + data = r.json() + assert data["success"] == True + + +def run_crawl(org_id, headers): + crawl_data = { + "runNow": True, + "name": "Concurrent Crawl", + "config": { + "seeds": [{"url": "https://specs.webrecorder.net/"}], + "limit": 1, + }, + } + r = requests.post( + f"{API_PREFIX}/orgs/{org_id}/crawlconfigs/", + headers=headers, + json=crawl_data, + ) + data = r.json() + + return data["run_now_job"] + + +def get_crawl_status(org_id, crawl_id, headers): + while True: + r = requests.get( + f"{API_PREFIX}/orgs/{org_id}/crawls/{crawl_id}/replay.json", + headers=headers, + ) + data = r.json() + return data["state"] diff --git a/frontend/src/components/crawl-status.ts b/frontend/src/components/crawl-status.ts index 914fad29..4982122b 100644 --- a/frontend/src/components/crawl-status.ts +++ b/frontend/src/components/crawl-status.ts @@ -79,14 +79,15 @@ export class CrawlStatus extends LitElement { break; } - case "waiting": { + case "waiting_capacity": + case "waiting_org_limit": { icon = html``; - label = msg("Waiting"); + label = state === "waiting_capacity" ? msg("Waiting (At Capacity)") : msg("Waiting (Crawl Limit)"); break; } diff --git a/frontend/src/components/orgs-list.ts b/frontend/src/components/orgs-list.ts index 99aa6b1d..41111df1 100644 --- a/frontend/src/components/orgs-list.ts +++ b/frontend/src/components/orgs-list.ts @@ -7,6 +7,7 @@ import LiteElement, { html } from "../utils/LiteElement"; import { isAdmin } from "../utils/orgs"; import { DASHBOARD_ROUTE } from "../routes"; +import { SlInput } from "@shoelace-style/shoelace"; @localized() export class OrgsList extends LiteElement { @@ -22,6 +23,9 @@ export class OrgsList extends LiteElement { @property({ type: Boolean }) skeleton? = false; + @property({ type: Object }) + currOrg?: OrgData | null = null; + render() { if (this.skeleton) { return this.renderSkeleton(); @@ -30,10 +34,63 @@ export class OrgsList extends LiteElement { return html`
    ${this.orgList?.map(this.renderOrg)} + ${this.renderOrgQuotas()}
`; } + private renderOrgQuotas() { + if (!this.currOrg) { + return html``; + } + + return html` + (this.currOrg = null)} + > + ${Object.entries(this.currOrg.quotas).map(([key, value]) => { + return html` + ${key}`; + })} + Update Quotas + + + `; + } + + private onUpdateQuota(e: CustomEvent) { + const inputEl = e.target as SlInput; + const quotas = this.currOrg?.quotas; + if (quotas) { + quotas[inputEl.name] = Number(inputEl.value); + } + } + + private onSubmitQuotas() { + if (this.currOrg) { + this.dispatchEvent(new CustomEvent("update-quotas", {detail: this.currOrg})); + } + this.currOrg = null; + } + + private showQuotas(org: OrgData) { + const stop = (e: Event) => { + e.preventDefault(); + e.stopPropagation(); + this.currOrg = org; + return false; + }; + + return stop; + } + private renderOrg = (org: OrgData) => { let defaultLabel: any; if (this.defaultOrg && org.id === this.defaultOrg.id) { @@ -52,10 +109,15 @@ export class OrgsList extends LiteElement {
${defaultLabel}${org.name}
-
- ${memberCount === 1 - ? msg(`1 member`) - : msg(str`${memberCount} members`)} +
+ + + +
+ ${memberCount === 1 + ? msg(`1 member`) + : msg(str`${memberCount} members`)} +
`; diff --git a/frontend/src/pages/home.ts b/frontend/src/pages/home.ts index b97ec195..0596f322 100644 --- a/frontend/src/pages/home.ts +++ b/frontend/src/pages/home.ts @@ -155,6 +155,7 @@ export class Home extends LiteElement { .defaultOrg=${ifDefined( this.userInfo?.orgs.find((org) => org.default === true) )} + @update-quotas=${this.onUpdateOrgQuotas} >
@@ -315,6 +316,15 @@ export class Home extends LiteElement { this.isSubmittingNewOrg = false; } + async onUpdateOrgQuotas(e: CustomEvent) { + const org = e.detail as OrgData; + + await this.apiFetch(`/orgs/${org.id}/quotas`, this.authState!, { + method: "POST", + body: JSON.stringify(org.quotas), + }); + } + async checkFormValidity(formEl: HTMLFormElement) { await this.updateComplete; return !formEl.querySelector("[data-invalid]"); diff --git a/frontend/src/pages/org/crawl-detail.ts b/frontend/src/pages/org/crawl-detail.ts index 22b1bffb..69a65281 100644 --- a/frontend/src/pages/org/crawl-detail.ts +++ b/frontend/src/pages/org/crawl-detail.ts @@ -90,7 +90,8 @@ export class CrawlDetail extends LiteElement { return ( this.crawl.state === "running" || this.crawl.state === "starting" || - this.crawl.state === "waiting" || + this.crawl.state === "waiting_capacity" || + this.crawl.state === "waiting_org_limit" || this.crawl.state === "stopping" ); } diff --git a/frontend/src/pages/org/workflow-detail.ts b/frontend/src/pages/org/workflow-detail.ts index aa98b327..3420a9bd 100644 --- a/frontend/src/pages/org/workflow-detail.ts +++ b/frontend/src/pages/org/workflow-detail.ts @@ -924,21 +924,31 @@ export class WorkflowDetail extends LiteElement { private renderWatchCrawl = () => { if (!this.authState || !(this.workflow?.lastCrawlState)) return ""; - const isStarting = this.workflow.lastCrawlState === "starting"; - const isWaiting = this.workflow.lastCrawlState === "waiting"; + let waitingMsg = null; + + switch (this.workflow.lastCrawlState) { + case "starting": + waitingMsg = msg("Crawl starting..."); + break; + + case "waiting_capacity": + waitingMsg = msg("Crawl waiting for available resources before it can start..."); + break; + + case "waiting_org_limit": + waitingMsg = msg("Crawl waiting for others to finish, concurrent limit per Organization reached..."); + break; + } + const isRunning = this.workflow.lastCrawlState === "running"; const isStopping = this.workflow.lastCrawlStopping; const authToken = this.authState.headers.Authorization.split(" ")[1]; return html` - ${isStarting || isWaiting + ${waitingMsg ? html`

- ${isStarting - ? msg("Crawl starting...") - : msg( - "Crawl waiting for available resources before it can start..." - )} + ${waitingMsg}

` : isActive(this.workflow.lastCrawlState) diff --git a/frontend/src/types/crawler.ts b/frontend/src/types/crawler.ts index 879a3ddf..5ea9d685 100644 --- a/frontend/src/types/crawler.ts +++ b/frontend/src/types/crawler.ts @@ -88,7 +88,8 @@ export type Profile = { export type CrawlState = | "starting" - | "waiting" + | "waiting_capacity" + | "waiting_org_limit" | "running" | "complete" | "failed" diff --git a/frontend/src/types/org.ts b/frontend/src/types/org.ts index f07139ad..7ad558a4 100644 --- a/frontend/src/types/org.ts +++ b/frontend/src/types/org.ts @@ -11,6 +11,7 @@ export const AccessCode: Record = { export type OrgData = { id: string; name: string; + quotas: Record; users?: { [id: string]: { role: (typeof AccessCode)[UserRole]; diff --git a/frontend/src/utils/crawler.ts b/frontend/src/utils/crawler.ts index a1828602..79fe8423 100644 --- a/frontend/src/utils/crawler.ts +++ b/frontend/src/utils/crawler.ts @@ -1,7 +1,8 @@ import type { CrawlState } from "../types/crawler"; export const activeCrawlStates: CrawlState[] = [ "starting", - "waiting", + "waiting_org_limit", + "waiting_capacity", "running", "stopping", ];