Profiles: Make browser commit API call idempotent (#2728)
- Fix race condition related to browser commit time: the profile commit request previously waited for the browser to actually finish and the profile to be saved. This could cause the request to time out, triggering a retry after the browser had already been closed.
- With these changes, the commit is now idempotent and returns 'waiting_for_browser' until the profile is actually committed.
- On the frontend, keep polling the commit endpoint (with a retry limit) while 'waiting_for_browser' is returned; the profile is actually committed once the endpoint returns a profile id.

---------

Co-authored-by: sua yoo <sua@suayoo.com>
This commit is contained in: parent 3043b67e49, commit 8107b054f6
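For context, the client-side half of the new contract can be sketched as a short polling loop. This is a minimal sketch, not code from this commit: `api_prefix`, `org_id`, `headers`, and `params` are hypothetical placeholders, while the endpoint path, the `waiting_for_browser` detail string, and the returned `id` field follow the backend and tests in the diff below.

```python
import time

import requests  # assumed available, as in the test suite below


def commit_profile(api_prefix, org_id, headers, params, poll_interval=5, time_limit=30):
    """Poll the idempotent commit endpoint until the profile id is returned."""
    start = time.monotonic()
    while True:
        r = requests.post(
            f"{api_prefix}/orgs/{org_id}/profiles", headers=headers, json=params
        )
        data = r.json()
        # Committed: the endpoint returns the new profile id.
        if data.get("id"):
            return data["id"]
        # Still committing: the endpoint answers 200 with this detail string,
        # so the identical request can simply be repeated.
        if data.get("detail") == "waiting_for_browser":
            if time.monotonic() - start > time_limit:
                raise TimeoutError("profile commit did not finish in time")
            time.sleep(poll_interval)
            continue
        # Anything else is a real error (e.g. org_set_to_read_only -> 403).
        r.raise_for_status()
        raise RuntimeError(f"unexpected response: {data}")
```

The notable design choice, visible in `_send_browser_req` below, is that the transient state is surfaced as `HTTPException(status_code=200, detail="waiting_for_browser")`, so the retry signal never looks like a failure to clients or intermediaries.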
@@ -37,6 +37,7 @@ class CrawlManager(K8sAPI):
         baseprofile: str = "",
         profile_filename: str = "",
         proxy_id: str = "",
+        profileid: str = "",
     ) -> str:
         """run browser for profile creation"""
 
@@ -60,6 +61,7 @@ class CrawlManager(K8sAPI):
             "crawler_image": crawler_image,
             "image_pull_policy": image_pull_policy,
             "proxy_id": proxy_id or DEFAULT_PROXY_ID,
+            "profileid": profileid,
         }
 
         data = self.templates.env.get_template("profile_job.yaml").render(params)
@@ -365,14 +367,21 @@ class CrawlManager(K8sAPI):
         except:
             return {}
 
-        return browser["metadata"]["labels"]
+        metadata = browser["metadata"]["labels"]
 
-    async def ping_profile_browser(self, browserid: str) -> None:
-        """return ping profile browser"""
+        metadata["committing"] = browser.get("spec", {}).get("committing")
+
+        return metadata
+
+    async def keep_alive_profile_browser(self, browserid: str, committing="") -> None:
+        """update profile browser to not expire"""
         expire_at = dt_now() + timedelta(seconds=30)
-        await self._patch_job(
-            browserid, {"expireTime": date_to_str(expire_at)}, "profilejobs"
-        )
+
+        update = {"expireTime": date_to_str(expire_at)}
+        if committing:
+            update["committing"] = committing
+
+        await self._patch_job(browserid, update, "profilejobs")
 
     async def rollover_restart_crawl(self, crawl_id: str) -> dict:
         """Rolling restart of crawl by updating restartTime field"""
@@ -3,6 +3,7 @@
 from typing import Optional, TYPE_CHECKING, Any, cast, Dict, List, Tuple
 from uuid import UUID, uuid4
 import os
+import asyncio
 
 from urllib.parse import urlencode
 
@@ -61,6 +62,8 @@ class ProfileOps:
     browser_fqdn_suffix: str
     router: APIRouter
 
+    bg_tasks: set
+
     def __init__(self, mdb, orgs, crawl_manager, storage_ops, background_job_ops):
         self.profiles = mdb["profiles"]
         self.orgs = orgs
@@ -79,6 +82,10 @@ class ProfileOps:
 
         self.crawlconfigs = cast(CrawlConfigOps, None)
 
+        # to avoid background tasks being garbage collected
+        # see: https://stackoverflow.com/a/74059981
+        self.bg_tasks = set()
+
     def set_crawlconfigs(self, crawlconfigs):
         """set crawlconfigs ops"""
         self.crawlconfigs = crawlconfigs
@@ -128,6 +135,7 @@ class ProfileOps:
             baseprofile=prev_profile_id,
             profile_filename=prev_profile_path,
             proxy_id=proxy_id,
+            profileid=str(uuid4()),
         )
 
         if not browserid:
@@ -166,8 +174,6 @@ class ProfileOps:
 
     async def ping_profile_browser(self, browserid: str) -> dict[str, Any]:
         """ping profile browser to keep it running"""
-        await self.crawl_manager.ping_profile_browser(browserid)
-
         json = await self._send_browser_req(browserid, "/ping")
 
         return {"success": True, "origins": json.get("origins") or []}
@@ -182,93 +188,130 @@ class ProfileOps:
     async def commit_to_profile(
         self,
+        metadata: dict,
         browser_commit: ProfileCreate,
         org: Organization,
         user: User,
-        metadata: dict,
         existing_profile: Optional[Profile] = None,
     ) -> dict[str, Any]:
-        """commit profile and shutdown profile browser"""
-        # pylint: disable=too-many-locals
-
-        now = dt_now()
-
-        if existing_profile:
-            profileid = existing_profile.id
-            created = existing_profile.created
-            created_by = existing_profile.createdBy
-            created_by_name = existing_profile.createdByName
-        else:
-            profileid = uuid4()
-            created = now
-            created_by = user.id
-            created_by_name = user.name if user.name else user.email
-
-        filename_data = {"filename": f"profiles/profile-{profileid}.tar.gz"}
-
-        json = await self._send_browser_req(
-            browser_commit.browserid, "/createProfileJS", "POST", json=filename_data
-        )
-
-        try:
-            resource = json["resource"]
-        except:
-            # pylint: disable=raise-missing-from
+        """commit to profile async, returning if committed, or waiting"""
+        profileid = metadata.get("profileid")
+        if not profileid:
             raise HTTPException(status_code=400, detail="browser_not_valid")
 
-        await self.crawl_manager.delete_profile_browser(browser_commit.browserid)
-
-        # backwards compatibility
-        file_size = resource.get("size") or resource.get("bytes")
-
-        profile_file = ProfileFile(
-            hash=resource["hash"],
-            size=file_size,
-            filename=resource["path"],
-            storage=org.storage,
-        )
-
-        baseid = metadata.get("btrix.baseprofile")
-        if baseid:
-            print("baseid", baseid)
-            baseid = UUID(baseid)
-
         self.orgs.can_write_data(org, include_time=False)
 
-        profile = Profile(
-            id=profileid,
-            name=browser_commit.name,
-            description=browser_commit.description,
-            created=created,
-            createdBy=created_by,
-            createdByName=created_by_name,
-            modified=now,
-            modifiedBy=user.id,
-            modifiedByName=user.name if user.name else user.email,
-            origins=json["origins"],
-            resource=profile_file,
-            userid=UUID(metadata.get("btrix.user")),
-            oid=org.id,
-            baseid=baseid,
-            crawlerChannel=browser_commit.crawlerChannel,
-            proxyId=browser_commit.proxyId,
-        )
+        committing = metadata.get("committing")
+        if not committing:
+            self._run_task(
+                self.do_commit_to_profile(
+                    metadata=metadata,
+                    browser_commit=browser_commit,
+                    org=org,
+                    user=user,
+                    existing_profile=existing_profile,
+                )
+            )
 
-        await self.profiles.find_one_and_update(
-            {"_id": profile.id}, {"$set": profile.to_dict()}, upsert=True
-        )
+        if committing == "done":
+            await self.crawl_manager.delete_profile_browser(browser_commit.browserid)
+            return {
+                "added": True,
+                "id": profileid,
+                "storageQuotaReached": self.orgs.storage_quota_reached(org),
+            }
 
-        await self.background_job_ops.create_replica_jobs(
-            org.id, profile_file, str(profileid), "profile"
-        )
+        raise HTTPException(status_code=200, detail="waiting_for_browser")
 
-        await self.orgs.inc_org_bytes_stored(org.id, file_size, "profile")
+    async def do_commit_to_profile(
+        self,
+        metadata: dict,
+        browser_commit: ProfileCreate,
+        org: Organization,
+        user: User,
+        existing_profile: Optional[Profile] = None,
+    ) -> bool:
+        """commit profile and shutdown profile browser"""
+        # pylint: disable=too-many-locals
+        try:
+            now = dt_now()
 
-        return {
-            "added": True,
-            "id": str(profile.id),
-            "storageQuotaReached": self.orgs.storage_quota_reached(org),
-        }
+            if existing_profile:
+                profileid = existing_profile.id
+                created = existing_profile.created
+                created_by = existing_profile.createdBy
+                created_by_name = existing_profile.createdByName
+            else:
+                profileid = UUID(metadata["profileid"])
+                created = now
+                created_by = user.id
+                created_by_name = user.name if user.name else user.email
+
+            filename_data = {"filename": f"profiles/profile-{profileid}.tar.gz"}
+
+            json = await self._send_browser_req(
+                browser_commit.browserid,
+                "/createProfileJS",
+                "POST",
+                json=filename_data,
+                committing="committing",
+            )
+            resource = json["resource"]
+
+            # backwards compatibility
+            file_size = resource.get("size") or resource.get("bytes")
+
+            profile_file = ProfileFile(
+                hash=resource["hash"],
+                size=file_size,
+                filename=resource["path"],
+                storage=org.storage,
+            )
+
+            baseid = metadata.get("btrix.baseprofile")
+            if baseid:
+                print("baseid", baseid)
+                baseid = UUID(baseid)
+
+            profile = Profile(
+                id=profileid,
+                name=browser_commit.name,
+                description=browser_commit.description,
+                created=created,
+                createdBy=created_by,
+                createdByName=created_by_name,
+                modified=now,
+                modifiedBy=user.id,
+                modifiedByName=user.name if user.name else user.email,
+                origins=json["origins"],
+                resource=profile_file,
+                userid=UUID(metadata.get("btrix.user")),
+                oid=org.id,
+                baseid=baseid,
+                crawlerChannel=browser_commit.crawlerChannel,
+                proxyId=browser_commit.proxyId,
+            )
+
+            await self.profiles.find_one_and_update(
+                {"_id": profile.id}, {"$set": profile.to_dict()}, upsert=True
+            )
+
+            await self.background_job_ops.create_replica_jobs(
+                org.id, profile_file, str(profileid), "profile"
+            )
+
+            await self.orgs.inc_org_bytes_stored(org.id, file_size, "profile")
+
+            await self.crawl_manager.keep_alive_profile_browser(
+                browser_commit.browserid, committing="done"
+            )
+
+        # pylint: disable=broad-except
+        except Exception as e:
+            print("Profile commit failed", e)
+            return False
+
+        return True
 
     async def update_profile_metadata(
         self, profileid: UUID, update: ProfileUpdate, user: User
@@ -432,8 +475,13 @@ class ProfileOps:
         path: str,
         method: str = "GET",
         json: Optional[dict[str, Any]] = None,
+        committing="",
     ) -> dict[str, Any]:
         """make request to browser api to get state"""
+        await self.crawl_manager.keep_alive_profile_browser(
+            browserid, committing=committing
+        )
+
         try:
             async with aiohttp.ClientSession() as session:
                 async with session.request(
@@ -443,7 +491,8 @@ class ProfileOps:
                 ) as resp:
                     json = await resp.json()
 
-        except Exception:
+        except Exception as e:
+            print(e)
             # pylint: disable=raise-missing-from
             raise HTTPException(status_code=200, detail="waiting_for_browser")
 
@@ -470,6 +519,12 @@ class ProfileOps:
 
         return total_size
 
+    def _run_task(self, func) -> None:
+        """add bg tasks to set to avoid premature garbage collection"""
+        task = asyncio.create_task(func)
+        self.bg_tasks.add(task)
+        task.add_done_callback(self.bg_tasks.discard)
+
 
 # ============================================================================
 # pylint: disable=redefined-builtin,invalid-name,too-many-locals,too-many-arguments
@@ -529,7 +584,9 @@ def init_profiles_api(
     ):
         metadata = await browser_get_metadata(browser_commit.browserid, org)
 
-        return await ops.commit_to_profile(browser_commit, org, user, metadata)
+        return await ops.commit_to_profile(
+            browser_commit=browser_commit, org=org, user=user, metadata=metadata
+        )
 
     @router.patch("/{profileid}", response_model=UpdatedResponse)
     async def commit_browser_to_existing(
@@ -702,7 +702,7 @@ def profile_id(admin_auth_headers, default_org_id, profile_browser_id):
 
     # Create profile
     start_time = time.monotonic()
-    time_limit = 300
+    time_limit = 30
     while True:
         try:
             r = requests.post(
@@ -793,7 +793,7 @@ def profile_2_id(admin_auth_headers, default_org_id, profile_browser_2_id):
 
     # Create profile
    start_time = time.monotonic()
-    time_limit = 300
+    time_limit = 30
     while True:
         try:
             r = requests.post(
@@ -206,20 +206,24 @@ def test_commit_browser_to_existing_profile(
     )
 
     # Commit new browser to existing profile
-    r = requests.patch(
-        f"{API_PREFIX}/orgs/{default_org_id}/profiles/{profile_id}",
-        headers=admin_auth_headers,
-        json={
-            "browserid": profile_browser_3_id,
-            "name": PROFILE_NAME_UPDATED,
-            "description": PROFILE_DESC_UPDATED,
-        },
-    )
-    assert r.status_code == 200
+    while True:
+        r = requests.patch(
+            f"{API_PREFIX}/orgs/{default_org_id}/profiles/{profile_id}",
+            headers=admin_auth_headers,
+            json={
+                "browserid": profile_browser_3_id,
+                "name": PROFILE_NAME_UPDATED,
+                "description": PROFILE_DESC_UPDATED,
+            },
+        )
+        assert r.status_code == 200
+        if r.json().get("detail") == "waiting_for_browser":
+            time.sleep(5)
+            continue
+
+        break
 
     assert r.json()["updated"]
 
     time.sleep(5)
 
     # Ensure modified was updated but created was not
     r = requests.get(
         f"{API_PREFIX}/orgs/{default_org_id}/profiles/{profile_id}",
@@ -340,7 +344,7 @@ def test_create_profile_read_only_org(
 
     # Try to create profile, verify we get 403 forbidden
     start_time = time.monotonic()
-    time_limit = 300
+    time_limit = 30
     while True:
         try:
             r = requests.post(
@@ -357,9 +361,9 @@ def test_create_profile_read_only_org(
             if detail == "waiting_for_browser":
                 time.sleep(5)
                 continue
-            if detail == "org_set_to_read_only":
-                assert r.status_code == 403
-                break
+            assert detail == "org_set_to_read_only"
+            assert r.status_code == 403
+            break
         except:
             if time.monotonic() - start_time > time_limit:
                 raise
@@ -11,6 +11,7 @@ metadata:
     btrix.baseprofile: "{{ base_profile }}"
     {%- endif %}
     btrix.storage: "{{ storage_name }}"
+    profileid: {{ profileid }}
 
 spec:
   selector:
@@ -70,7 +70,10 @@ export class BrowserProfilesDetail extends BtrixElement {
   private readonly validateNameMax = maxLengthValidator(50);
   private readonly validateDescriptionMax = maxLengthValidator(500);
 
+  private updatedProfileTimer?: number;
+
   disconnectedCallback() {
+    window.clearTimeout(this.updatedProfileTimer);
     if (this.browserId) {
       void this.deleteBrowser(this.browserId);
     }
@@ -679,27 +682,45 @@ export class BrowserProfilesDetail extends BtrixElement {
     };
 
     try {
-      const data = await this.api.fetch<{ updated: boolean }>(
-        `/orgs/${this.orgId}/profiles/${this.profileId}`,
-        {
+      let retriesLeft = 300;
+
+      while (retriesLeft > 0) {
+        const data = await this.api.fetch<{
+          updated?: boolean;
+          detail?: string;
+        }>(`/orgs/${this.orgId}/profiles/${this.profileId}`, {
           method: "PATCH",
           body: JSON.stringify(params),
-        },
-      );
+        });
 
-      if (data.updated) {
-        this.notify.toast({
-          message: msg("Successfully saved browser profile."),
-          variant: "success",
-          icon: "check2-circle",
-          id: "browser-profile-save-status",
-        });
+        if (data.updated !== undefined) {
+          break;
+        }
+        if (data.detail === "waiting_for_browser") {
+          await new Promise((resolve) => {
+            this.updatedProfileTimer = window.setTimeout(resolve, 2000);
+          });
+        } else {
+          throw new Error("unknown response");
+        }
 
-        this.browserId = undefined;
-      } else {
-        throw data;
+        retriesLeft -= 1;
       }
+
+      if (!retriesLeft) {
+        throw new Error("too many retries waiting for browser");
+      }
+
+      this.notify.toast({
+        message: msg("Successfully saved browser profile."),
+        variant: "success",
+        icon: "check2-circle",
+        id: "browser-profile-save-status",
+      });
+
+      this.browserId = undefined;
     } catch (e) {
       console.debug(e);
 
       this.notify.toast({
         message: msg("Sorry, couldn't save browser profile at this time."),
         variant: "danger",
@@ -313,13 +313,36 @@ export class BrowserProfilesNew extends BtrixElement {
     };
 
     try {
-      const data = await this.api.fetch<{ id: string }>(
-        `/orgs/${this.orgId}/profiles`,
-        {
-          method: "POST",
-          body: JSON.stringify(params),
-        },
-      );
+      let data;
+      let retriesLeft = 300;
+
+      while (retriesLeft > 0) {
+        data = await this.api.fetch<{ id?: string; detail?: string }>(
+          `/orgs/${this.orgId}/profiles`,
+          {
+            method: "POST",
+            body: JSON.stringify(params),
+          },
+        );
+        if (data.id) {
+          break;
+        }
+        if (data.detail === "waiting_for_browser") {
+          await new Promise((resolve) => setTimeout(resolve, 2000));
+        } else {
+          throw new Error("unknown response");
+        }
+
+        retriesLeft -= 1;
+      }
+
+      if (!retriesLeft) {
+        throw new Error("too many retries waiting for browser");
+      }
+
+      if (!data) {
+        throw new Error("unknown response");
+      }
 
       this.notify.toast({
         message: msg("Successfully created browser profile."),
@@ -332,6 +355,8 @@ export class BrowserProfilesNew extends BtrixElement {
         `${this.navigate.orgBasePath}/browser-profiles/profile/${data.id}`,
       );
     } catch (e) {
+      console.debug(e);
+
       this.isSubmitting = false;
 
       let message = msg("Sorry, couldn't create browser profile at this time.");