diff --git a/backend/btrixcloud/crawlmanager.py b/backend/btrixcloud/crawlmanager.py index d4d6681d..1ce00cd6 100644 --- a/backend/btrixcloud/crawlmanager.py +++ b/backend/btrixcloud/crawlmanager.py @@ -37,6 +37,7 @@ class CrawlManager(K8sAPI): baseprofile: str = "", profile_filename: str = "", proxy_id: str = "", + profileid: str = "", ) -> str: """run browser for profile creation""" @@ -60,6 +61,7 @@ class CrawlManager(K8sAPI): "crawler_image": crawler_image, "image_pull_policy": image_pull_policy, "proxy_id": proxy_id or DEFAULT_PROXY_ID, + "profileid": profileid, } data = self.templates.env.get_template("profile_job.yaml").render(params) @@ -365,14 +367,21 @@ class CrawlManager(K8sAPI): except: return {} - return browser["metadata"]["labels"] + metadata = browser["metadata"]["labels"] - async def ping_profile_browser(self, browserid: str) -> None: - """return ping profile browser""" + metadata["committing"] = browser.get("spec", {}).get("committing") + + return metadata + + async def keep_alive_profile_browser(self, browserid: str, committing="") -> None: + """update profile browser to not expire""" expire_at = dt_now() + timedelta(seconds=30) - await self._patch_job( - browserid, {"expireTime": date_to_str(expire_at)}, "profilejobs" - ) + + update = {"expireTime": date_to_str(expire_at)} + if committing: + update["committing"] = committing + + await self._patch_job(browserid, update, "profilejobs") async def rollover_restart_crawl(self, crawl_id: str) -> dict: """Rolling restart of crawl by updating restartTime field""" diff --git a/backend/btrixcloud/profiles.py b/backend/btrixcloud/profiles.py index dee56c53..38df55a1 100644 --- a/backend/btrixcloud/profiles.py +++ b/backend/btrixcloud/profiles.py @@ -3,6 +3,7 @@ from typing import Optional, TYPE_CHECKING, Any, cast, Dict, List, Tuple from uuid import UUID, uuid4 import os +import asyncio from urllib.parse import urlencode @@ -61,6 +62,8 @@ class ProfileOps: browser_fqdn_suffix: str router: APIRouter + bg_tasks: set + def __init__(self, mdb, orgs, crawl_manager, storage_ops, background_job_ops): self.profiles = mdb["profiles"] self.orgs = orgs @@ -79,6 +82,10 @@ class ProfileOps: self.crawlconfigs = cast(CrawlConfigOps, None) + # to avoid background tasks being garbage collected + # see: https://stackoverflow.com/a/74059981 + self.bg_tasks = set() + def set_crawlconfigs(self, crawlconfigs): """set crawlconfigs ops""" self.crawlconfigs = crawlconfigs @@ -128,6 +135,7 @@ class ProfileOps: baseprofile=prev_profile_id, profile_filename=prev_profile_path, proxy_id=proxy_id, + profileid=str(uuid4()), ) if not browserid: @@ -166,8 +174,6 @@ class ProfileOps: async def ping_profile_browser(self, browserid: str) -> dict[str, Any]: """ping profile browser to keep it running""" - await self.crawl_manager.ping_profile_browser(browserid) - json = await self._send_browser_req(browserid, "/ping") return {"success": True, "origins": json.get("origins") or []} @@ -182,93 +188,130 @@ class ProfileOps: async def commit_to_profile( self, + metadata: dict, browser_commit: ProfileCreate, org: Organization, user: User, - metadata: dict, existing_profile: Optional[Profile] = None, ) -> dict[str, Any]: - """commit profile and shutdown profile browser""" - # pylint: disable=too-many-locals - - now = dt_now() - - if existing_profile: - profileid = existing_profile.id - created = existing_profile.created - created_by = existing_profile.createdBy - created_by_name = existing_profile.createdByName - else: - profileid = uuid4() - created = now - created_by = user.id - created_by_name = user.name if user.name else user.email - - filename_data = {"filename": f"profiles/profile-{profileid}.tar.gz"} - - json = await self._send_browser_req( - browser_commit.browserid, "/createProfileJS", "POST", json=filename_data - ) - - try: - resource = json["resource"] - except: - # pylint: disable=raise-missing-from + """commit to profile async, returning if committed, or waiting""" + profileid = metadata.get("profileid") + if not profileid: raise HTTPException(status_code=400, detail="browser_not_valid") - await self.crawl_manager.delete_profile_browser(browser_commit.browserid) - - # backwards compatibility - file_size = resource.get("size") or resource.get("bytes") - - profile_file = ProfileFile( - hash=resource["hash"], - size=file_size, - filename=resource["path"], - storage=org.storage, - ) - - baseid = metadata.get("btrix.baseprofile") - if baseid: - print("baseid", baseid) - baseid = UUID(baseid) - self.orgs.can_write_data(org, include_time=False) - profile = Profile( - id=profileid, - name=browser_commit.name, - description=browser_commit.description, - created=created, - createdBy=created_by, - createdByName=created_by_name, - modified=now, - modifiedBy=user.id, - modifiedByName=user.name if user.name else user.email, - origins=json["origins"], - resource=profile_file, - userid=UUID(metadata.get("btrix.user")), - oid=org.id, - baseid=baseid, - crawlerChannel=browser_commit.crawlerChannel, - proxyId=browser_commit.proxyId, - ) + committing = metadata.get("committing") + if not committing: + self._run_task( + self.do_commit_to_profile( + metadata=metadata, + browser_commit=browser_commit, + org=org, + user=user, + existing_profile=existing_profile, + ) + ) - await self.profiles.find_one_and_update( - {"_id": profile.id}, {"$set": profile.to_dict()}, upsert=True - ) + if committing == "done": + await self.crawl_manager.delete_profile_browser(browser_commit.browserid) + return { + "added": True, + "id": profileid, + "storageQuotaReached": self.orgs.storage_quota_reached(org), + } - await self.background_job_ops.create_replica_jobs( - org.id, profile_file, str(profileid), "profile" - ) + raise HTTPException(status_code=200, detail="waiting_for_browser") - await self.orgs.inc_org_bytes_stored(org.id, file_size, "profile") + async def do_commit_to_profile( + self, + metadata: dict, + browser_commit: ProfileCreate, + org: Organization, + user: User, + existing_profile: Optional[Profile] = None, + ) -> bool: + """commit profile and shutdown profile browser""" + # pylint: disable=too-many-locals + try: + now = dt_now() - return { - "added": True, - "id": str(profile.id), - "storageQuotaReached": self.orgs.storage_quota_reached(org), - } + if existing_profile: + profileid = existing_profile.id + created = existing_profile.created + created_by = existing_profile.createdBy + created_by_name = existing_profile.createdByName + else: + profileid = UUID(metadata["profileid"]) + created = now + created_by = user.id + created_by_name = user.name if user.name else user.email + + filename_data = {"filename": f"profiles/profile-{profileid}.tar.gz"} + + json = await self._send_browser_req( + browser_commit.browserid, + "/createProfileJS", + "POST", + json=filename_data, + committing="committing", + ) + resource = json["resource"] + + # backwards compatibility + file_size = resource.get("size") or resource.get("bytes") + + profile_file = ProfileFile( + hash=resource["hash"], + size=file_size, + filename=resource["path"], + storage=org.storage, + ) + + baseid = metadata.get("btrix.baseprofile") + if baseid: + print("baseid", baseid) + baseid = UUID(baseid) + + profile = Profile( + id=profileid, + name=browser_commit.name, + description=browser_commit.description, + created=created, + createdBy=created_by, + createdByName=created_by_name, + modified=now, + modifiedBy=user.id, + modifiedByName=user.name if user.name else user.email, + origins=json["origins"], + resource=profile_file, + userid=UUID(metadata.get("btrix.user")), + oid=org.id, + baseid=baseid, + crawlerChannel=browser_commit.crawlerChannel, + proxyId=browser_commit.proxyId, + ) + + await self.profiles.find_one_and_update( + {"_id": profile.id}, {"$set": profile.to_dict()}, upsert=True + ) + + await self.background_job_ops.create_replica_jobs( + org.id, profile_file, str(profileid), "profile" + ) + + await self.orgs.inc_org_bytes_stored(org.id, file_size, "profile") + + await self.crawl_manager.keep_alive_profile_browser( + browser_commit.browserid, committing="done" + ) + + # pylint: disable=broad-except + except Exception as e: + print("Profile commit failed", e) + return False + + return True async def update_profile_metadata( self, profileid: UUID, update: ProfileUpdate, user: User @@ -432,8 +475,13 @@ class ProfileOps: path: str, method: str = "GET", json: Optional[dict[str, Any]] = None, + committing="", ) -> dict[str, Any]: """make request to browser api to get state""" + await self.crawl_manager.keep_alive_profile_browser( + browserid, committing=committing + ) + try: async with aiohttp.ClientSession() as session: async with session.request( @@ -443,7 +491,8 @@ class ProfileOps: ) as resp: json = await resp.json() - except Exception: + except Exception as e: + print(e) # pylint: disable=raise-missing-from raise HTTPException(status_code=200, detail="waiting_for_browser") @@ -470,6 +519,12 @@ class ProfileOps: return total_size + def _run_task(self, func) -> None: + """add bg tasks to set to avoid premature garbage collection""" + task = asyncio.create_task(func) + self.bg_tasks.add(task) + task.add_done_callback(self.bg_tasks.discard) + # ============================================================================ # pylint: disable=redefined-builtin,invalid-name,too-many-locals,too-many-arguments @@ -529,7 +584,9 @@ def init_profiles_api( ): metadata = await browser_get_metadata(browser_commit.browserid, org) - return await ops.commit_to_profile(browser_commit, org, user, metadata) + return await ops.commit_to_profile( + browser_commit=browser_commit, org=org, user=user, metadata=metadata + ) @router.patch("/{profileid}", response_model=UpdatedResponse) async def commit_browser_to_existing( diff --git a/backend/test/conftest.py b/backend/test/conftest.py index cc4f30df..1848049b 100644 --- a/backend/test/conftest.py +++ b/backend/test/conftest.py @@ -702,7 +702,7 @@ def profile_id(admin_auth_headers, default_org_id, profile_browser_id): # Create profile start_time = time.monotonic() - time_limit = 300 + time_limit = 30 while True: try: r = requests.post( @@ -793,7 +793,7 @@ def profile_2_id(admin_auth_headers, default_org_id, profile_browser_2_id): # Create profile start_time = time.monotonic() - time_limit = 300 + time_limit = 30 while True: try: r = requests.post( diff --git a/backend/test/test_profiles.py b/backend/test/test_profiles.py index f6dbf776..75971bcf 100644 --- a/backend/test/test_profiles.py +++ b/backend/test/test_profiles.py @@ -206,20 +206,24 @@ def test_commit_browser_to_existing_profile( ) # Commit new browser to existing profile - r = requests.patch( - f"{API_PREFIX}/orgs/{default_org_id}/profiles/{profile_id}", - headers=admin_auth_headers, - json={ - "browserid": profile_browser_3_id, - "name": PROFILE_NAME_UPDATED, - "description": PROFILE_DESC_UPDATED, - }, - ) - assert r.status_code == 200 + while True: + r = requests.patch( + f"{API_PREFIX}/orgs/{default_org_id}/profiles/{profile_id}", + headers=admin_auth_headers, + json={ + "browserid": profile_browser_3_id, + "name": PROFILE_NAME_UPDATED, + "description": PROFILE_DESC_UPDATED, + }, + ) + assert r.status_code == 200 + if r.json().get("detail") == "waiting_for_browser": + time.sleep(5) + continue + + break + assert r.json()["updated"] - - time.sleep(5) - # Ensure modified was updated but created was not r = requests.get( f"{API_PREFIX}/orgs/{default_org_id}/profiles/{profile_id}", @@ -340,7 +344,7 @@ def test_create_profile_read_only_org( # Try to create profile, verify we get 403 forbidden start_time = time.monotonic() - time_limit = 300 + time_limit = 30 while True: try: r = requests.post( @@ -357,9 +361,9 @@ def test_create_profile_read_only_org( if detail == "waiting_for_browser": time.sleep(5) continue - if detail == "org_set_to_read_only": - assert r.status_code == 403 - break + assert detail == "org_set_to_read_only" + assert r.status_code == 403 + break except: if time.monotonic() - start_time > time_limit: raise diff --git a/chart/app-templates/profile_job.yaml b/chart/app-templates/profile_job.yaml index 24d67cc9..5fcea65a 100644 --- a/chart/app-templates/profile_job.yaml +++ b/chart/app-templates/profile_job.yaml @@ -11,6 +11,7 @@ metadata: btrix.baseprofile: "{{ base_profile }}" {%- endif %} btrix.storage: "{{ storage_name }}" + profileid: {{ profileid }} spec: selector: diff --git a/frontend/src/pages/org/browser-profiles-detail.ts b/frontend/src/pages/org/browser-profiles-detail.ts index 45b4838f..d57abb7a 100644 --- a/frontend/src/pages/org/browser-profiles-detail.ts +++ b/frontend/src/pages/org/browser-profiles-detail.ts @@ -70,7 +70,10 @@ export class BrowserProfilesDetail extends BtrixElement { private readonly validateNameMax = maxLengthValidator(50); private readonly validateDescriptionMax = maxLengthValidator(500); + private updatedProfileTimer?: number; + disconnectedCallback() { + window.clearTimeout(this.updatedProfileTimer); if (this.browserId) { void this.deleteBrowser(this.browserId); } @@ -679,27 +682,45 @@ export class BrowserProfilesDetail extends BtrixElement { }; try { - const data = await this.api.fetch<{ updated: boolean }>( - `/orgs/${this.orgId}/profiles/${this.profileId}`, - { + let retriesLeft = 300; + + while (retriesLeft > 0) { + const data = await this.api.fetch<{ + updated?: boolean; + detail?: string; + }>(`/orgs/${this.orgId}/profiles/${this.profileId}`, { method: "PATCH", body: JSON.stringify(params), - }, - ); - - if (data.updated) { - this.notify.toast({ - message: msg("Successfully saved browser profile."), - variant: "success", - icon: "check2-circle", - id: "browser-profile-save-status", }); + if (data.updated !== undefined) { + break; + } + if (data.detail === "waiting_for_browser") { + await new Promise((resolve) => { + this.updatedProfileTimer = window.setTimeout(resolve, 2000); + }); + } else { + throw new Error("unknown response"); + } - this.browserId = undefined; - } else { - throw data; + retriesLeft -= 1; } + + if (!retriesLeft) { + throw new Error("too many retries waiting for browser"); + } + + this.notify.toast({ + message: msg("Successfully saved browser profile."), + variant: "success", + icon: "check2-circle", + id: "browser-profile-save-status", + }); + + this.browserId = undefined; } catch (e) { + console.debug(e); + this.notify.toast({ message: msg("Sorry, couldn't save browser profile at this time."), variant: "danger", diff --git a/frontend/src/pages/org/browser-profiles-new.ts b/frontend/src/pages/org/browser-profiles-new.ts index 5a4f7f55..62cb8828 100644 --- a/frontend/src/pages/org/browser-profiles-new.ts +++ b/frontend/src/pages/org/browser-profiles-new.ts @@ -313,13 +313,36 @@ export class BrowserProfilesNew extends BtrixElement { }; try { - const data = await this.api.fetch<{ id: string }>( - `/orgs/${this.orgId}/profiles`, - { - method: "POST", - body: JSON.stringify(params), - }, - ); + let data; + let retriesLeft = 300; + + while (retriesLeft > 0) { + data = await this.api.fetch<{ id?: string; detail?: string }>( + `/orgs/${this.orgId}/profiles`, + { + method: "POST", + body: JSON.stringify(params), + }, + ); + if (data.id) { + break; + } + if (data.detail === "waiting_for_browser") { + await new Promise((resolve) => setTimeout(resolve, 2000)); + } else { + throw new Error("unknown response"); + } + + retriesLeft -= 1; + } + + if (!retriesLeft) { + throw new Error("too many retries waiting for browser"); + } + + if (!data) { + throw new Error("unknown response"); + } this.notify.toast({ message: msg("Successfully created browser profile."), @@ -332,6 +355,8 @@ export class BrowserProfilesNew extends BtrixElement { `${this.navigate.orgBasePath}/browser-profiles/profile/${data.id}`, ); } catch (e) { + console.debug(e); + this.isSubmitting = false; let message = msg("Sorry, couldn't create browser profile at this time.");