Track bytes stored per file type and include in org metrics (#1207)

* Add bytes stored per type to org and metrics

The org now tracks bytesStored separately for crawls, uploads, and browser profiles
in addition to the total, and returns these values from the org metrics endpoint.

A migration is added to precompute these values in existing deployments.

In addition, all /metrics storage values are now returned solely as bytes, since
the GB form wasn't being used by the frontend and is unnecessary.
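
For illustration, here is a minimal sketch of reading the new per-type values from the org metrics endpoint, in the same `requests` style as the tests below. The API prefix, org id, and auth headers are placeholders; the field names come from the updated `OrgMetrics` model in this change.

```python
import requests

# Placeholders -- substitute a real deployment URL, org id, and auth token.
API_PREFIX = "https://app.example.com/api"
ORG_ID = "<org-uuid>"
HEADERS = {"Authorization": "Bearer <token>"}

r = requests.get(f"{API_PREFIX}/orgs/{ORG_ID}/metrics", headers=HEADERS)
data = r.json()

# All storage values are now plain byte counts; the GB fields are gone.
per_type_total = (
    data["storageUsedCrawls"]
    + data["storageUsedUploads"]
    + data["storageUsedProfiles"]
)
assert data["storageUsedBytes"] == per_type_total
print("bytes stored:", data["storageUsedBytes"], "quota:", data["storageQuotaBytes"])
```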

* Improve deletion of multiple archived item types via the `/all-crawls` delete endpoint

- Update the `/all-crawls` delete test to check that org and workflow size values
are correct following deletion.
- Fix a bug where only one deleted crawl was assumed per cid and size was not
tracked per cid.
- Add a type check within `delete_crawls`.
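
As a standalone sketch of the per-workflow bookkeeping this fix introduces (names here are illustrative; the real logic lives in `delete_crawls` below), each deleted crawl now contributes its own size and a count of one to its workflow's (`cid`) entry, so workflow stats can be decremented accurately afterwards:

```python
from typing import Dict, List


def tally_deleted_crawls(deleted: List[dict]) -> Dict[str, Dict[str, int]]:
    """Accumulate deleted-crawl count and size per workflow (cid).

    Each entry in `deleted` is assumed to look like
    {"cid": "<workflow-id>", "size": <bytes>} -- illustrative only.
    """
    cids_to_update: Dict[str, Dict[str, int]] = {}
    for crawl in deleted:
        entry = cids_to_update.setdefault(crawl["cid"], {"inc": 0, "size": 0})
        entry["inc"] += 1               # one more crawl deleted for this workflow
        entry["size"] += crawl["size"]  # bytes removed for this workflow
    return cids_to_update


# Two crawls from one workflow plus one from another:
print(tally_deleted_crawls([
    {"cid": "wf-1", "size": 1000},
    {"cid": "wf-1", "size": 2500},
    {"cid": "wf-2", "size": 700},
]))
# -> {'wf-1': {'inc': 2, 'size': 3500}, 'wf-2': {'inc': 1, 'size': 700}}
```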
Tessa Walsh, 2023-09-22 12:55:21 -04:00, committed by GitHub
parent 304ea6d52f
commit 094f27bcff
14 changed files with 329 additions and 52 deletions


@@ -190,28 +190,37 @@ class BaseCrawlOps:
         return {"updated": True}
 
     async def delete_crawls(
-        self, org: Organization, delete_list: DeleteCrawlList, type_=None
+        self, org: Organization, delete_list: DeleteCrawlList, type_: str
     ):
         """Delete a list of crawls by id for given org"""
-        cids_to_update = set()
+        cids_to_update: dict[str, dict[str, int]] = {}
 
         size = 0
         for crawl_id in delete_list.crawl_ids:
             crawl = await self.get_crawl_raw(crawl_id, org)
-            size += await self._delete_crawl_files(crawl, org)
-            if crawl.get("cid"):
-                cids_to_update.add(crawl.get("cid"))
+            if crawl.get("type") != type_:
+                continue
 
-        query = {"_id": {"$in": delete_list.crawl_ids}, "oid": org.id}
-        if type_:
-            query["type"] = type_
+            crawl_size = await self._delete_crawl_files(crawl, org)
+            size += crawl_size
+
+            cid = crawl.get("cid")
+            if cid:
+                if cids_to_update.get(cid):
+                    cids_to_update[cid]["inc"] += 1
+                    cids_to_update[cid]["size"] += crawl_size
+                else:
+                    cids_to_update[cid] = {}
+                    cids_to_update[cid]["inc"] = 1
+                    cids_to_update[cid]["size"] = crawl_size
+
+        query = {"_id": {"$in": delete_list.crawl_ids}, "oid": org.id, "type": type_}
 
         res = await self.crawls.delete_many(query)
 
-        quota_reached = await self.orgs.inc_org_bytes_stored(org.id, -size)
+        quota_reached = await self.orgs.inc_org_bytes_stored(org.id, -size, type_)
 
-        return res.deleted_count, size, cids_to_update, quota_reached
+        return res.deleted_count, cids_to_update, quota_reached
 
     async def _delete_crawl_files(self, crawl, org: Organization):
         """Delete files associated with crawl from storage."""
@@ -496,13 +505,61 @@ class BaseCrawlOps:
         self, delete_list: DeleteCrawlList, org: Organization
     ):
         """Delete uploaded crawls"""
-        deleted_count, _, _, quota_reached = await self.delete_crawls(org, delete_list)
+        if len(delete_list.crawl_ids) == 0:
+            raise HTTPException(status_code=400, detail="nothing_to_delete")
+
+        deleted_count = 0
+        # Value is set in delete calls, but initialize to keep linter happy.
+        quota_reached = False
+
+        crawls_to_delete, uploads_to_delete = await self._split_delete_list_by_type(
+            delete_list, org
+        )
+
+        if len(crawls_to_delete) > 0:
+            crawl_delete_list = DeleteCrawlList(crawl_ids=crawls_to_delete)
+            deleted, cids_to_update, quota_reached = await self.delete_crawls(
+                org, crawl_delete_list, "crawl"
+            )
+            deleted_count += deleted
+
+            for cid, cid_dict in cids_to_update.items():
+                cid_size = cid_dict["size"]
+                cid_inc = cid_dict["inc"]
+                await self.crawl_configs.stats_recompute_last(cid, -cid_size, -cid_inc)
+
+        if len(uploads_to_delete) > 0:
+            upload_delete_list = DeleteCrawlList(crawl_ids=uploads_to_delete)
+            deleted, _, quota_reached = await self.delete_crawls(
+                org, upload_delete_list, "upload"
+            )
+            deleted_count += deleted
 
         if deleted_count < 1:
             raise HTTPException(status_code=404, detail="crawl_not_found")
 
         return {"deleted": True, "storageQuotaReached": quota_reached}
 
+    async def _split_delete_list_by_type(
+        self, delete_list: DeleteCrawlList, org: Organization
+    ):
+        """Return separate crawl and upload arrays from mixed input"""
+        crawls: list[str] = []
+        uploads: list[str] = []
+
+        for crawl_id in delete_list.crawl_ids:
+            try:
+                crawl_raw = await self.get_crawl_raw(crawl_id, org)
+                crawl_type = crawl_raw.get("type")
+                if crawl_type == "crawl":
+                    crawls.append(crawl_id)
+                elif crawl_type == "upload":
+                    uploads.append(crawl_id)
+            # pylint: disable=broad-exception-caught
+            except Exception as err:
+                print(err, flush=True)
+
+        return crawls, uploads
+
     async def get_all_crawl_search_values(
         self, org: Organization, type_: Optional[str] = None
     ):


@@ -503,7 +503,9 @@ class CrawlConfigOps:
         return None
 
-    async def stats_recompute_last(self, cid: uuid.UUID, size: int, inc_crawls=1):
+    async def stats_recompute_last(
+        self, cid: uuid.UUID, size: int, inc_crawls: int = 1
+    ):
         """recompute stats by incrementing size counter and number of crawls"""
         update_query: dict[str, object] = {
             "lastCrawlId": None,


@@ -216,18 +216,17 @@ class CrawlOps(BaseCrawlOps):
     ):
         """Delete a list of crawls by id for given org"""
-        count, size, cids_to_update, quota_reached = await super().delete_crawls(
+        count, cids_to_update, quota_reached = await super().delete_crawls(
             org, delete_list, type_
         )
 
         if count < 1:
             raise HTTPException(status_code=404, detail="crawl_not_found")
 
-        for cid in cids_to_update:
-            if not await self.crawl_configs.stats_recompute_last(cid, -size, -1):
-                raise HTTPException(
-                    status_code=404, detail=f"crawl_config_not_found: {cid}"
-                )
+        for cid, cid_dict in cids_to_update.items():
+            cid_size = cid_dict["size"]
+            cid_inc = cid_dict["inc"]
+            await self.crawl_configs.stats_recompute_last(cid, -cid_size, -cid_inc)
 
         return {"deleted": True, "storageQuotaReached": quota_reached}


@@ -15,7 +15,7 @@ from pymongo.errors import InvalidName
 from .migrations import BaseMigration
 
-CURR_DB_VERSION = "0016"
+CURR_DB_VERSION = "0017"
 
 
 # ============================================================================


@@ -7,7 +7,7 @@ from btrixcloud.migrations import BaseMigration
 MIGRATION_VERSION = "0015"
 
 
-# pylint: disable=too-many-locals
+# pylint: disable=too-many-locals, duplicate-code
 class Migration(BaseMigration):
     """Migration class."""


@@ -0,0 +1,73 @@
+"""
+Migration 0017 - Calculate and store org storage usage by type
+"""
+from btrixcloud.migrations import BaseMigration
+
+
+MIGRATION_VERSION = "0017"
+
+
+# pylint: disable=too-many-locals, duplicate-code
+class Migration(BaseMigration):
+    """Migration class."""
+
+    def __init__(self, mdb, migration_version=MIGRATION_VERSION):
+        super().__init__(mdb, migration_version)
+
+    async def migrate_up(self):
+        """Perform migration up.
+
+        Calculate and store org storage usage
+        """
+        mdb_orgs = self.mdb["organizations"]
+        mdb_crawls = self.mdb["crawls"]
+        mdb_profiles = self.mdb["profiles"]
+
+        orgs = [res async for res in mdb_orgs.find({})]
+        for org in orgs:
+            oid = org.get("_id")
+
+            bytes_stored_crawls = 0
+            bytes_stored_uploads = 0
+            bytes_stored_profiles = 0
+
+            crawls = [
+                res
+                async for res in mdb_crawls.find(
+                    {"oid": oid, "type": {"$in": [None, "crawl"]}}
+                )
+            ]
+            for crawl in crawls:
+                for crawl_file in crawl.get("files", []):
+                    bytes_stored_crawls += crawl_file.get("size", 0)
+
+            uploads = [
+                res async for res in mdb_crawls.find({"oid": oid, "type": "upload"})
+            ]
+            for upload in uploads:
+                for upload_file in upload.get("files", []):
+                    bytes_stored_uploads += upload_file.get("size", 0)
+
+            profiles = [res async for res in mdb_profiles.find({"oid": oid})]
+            for profile in profiles:
+                profile_file = profile.get("resource")
+                if profile_file:
+                    bytes_stored_profiles += profile_file.get("size", 0)
+
+            try:
+                res = await mdb_orgs.find_one_and_update(
+                    {"_id": oid},
+                    {
+                        "$set": {
+                            "bytesStoredCrawls": bytes_stored_crawls,
+                            "bytesStoredUploads": bytes_stored_uploads,
+                            "bytesStoredProfiles": bytes_stored_profiles,
+                        }
+                    },
+                )
+            # pylint: disable=broad-exception-caught
+            except Exception as err:
+                print(
+                    f"Unable to set bytes stored by type for org {oid}: {err}",
+                    flush=True,
+                )


@@ -666,6 +666,9 @@ class Organization(BaseMongoModel):
     usage: Dict[str, int] = {}
 
     bytesStored: int = 0
+    bytesStoredCrawls: int = 0
+    bytesStoredUploads: int = 0
+    bytesStoredProfiles: int = 0
 
     default: bool = False
 
@@ -744,6 +747,9 @@ class OrgOut(BaseMongoModel):
     usage: Optional[Dict[str, int]]
     default: bool = False
     bytesStored: int
+    bytesStoredCrawls: int
+    bytesStoredUploads: int
+    bytesStoredProfiles: int
     origin: Optional[AnyHttpUrl]
 
     webhookUrls: Optional[OrgWebhookUrls] = OrgWebhookUrls()
 
@@ -755,9 +761,10 @@ class OrgMetrics(BaseModel):
     """Organization API metrics model"""
 
     storageUsedBytes: int
-    storageUsedGB: float
+    storageUsedCrawls: int
+    storageUsedUploads: int
+    storageUsedProfiles: int
     storageQuotaBytes: int
-    storageQuotaGB: float
     archivedItemCount: int
     crawlCount: int
     uploadCount: int


@@ -887,10 +887,7 @@ class BtrixOperator(K8sAPI):
             await self.crawl_config_ops.stats_recompute_last(cid, files_added_size, 1)
 
         if state in SUCCESSFUL_STATES and oid:
-            await self.org_ops.add_crawl_files_to_org_bytes_stored(
-                oid, files_added_size
-            )
+            await self.org_ops.inc_org_bytes_stored(oid, files_added_size, "crawl")
             await self.coll_ops.add_successful_crawl_to_collections(crawl_id, cid)
 
         await self.event_webhook_ops.create_crawl_finished_notification(crawl_id, state)


@@ -37,8 +37,6 @@ from .pagination import DEFAULT_PAGE_SIZE, paginated_format
 DEFAULT_ORG = os.environ.get("DEFAULT_ORG", "My Organization")
 
-BYTES_IN_GB = 1_000_000_000
-
 
 # ============================================================================
 # pylint: disable=too-many-public-methods, too-many-instance-attributes
 
@@ -266,11 +264,22 @@ class OrgOps:
             return org.quotas.maxPagesPerCrawl
         return 0
 
-    async def inc_org_bytes_stored(self, oid: uuid.UUID, size: int):
+    async def inc_org_bytes_stored(self, oid: uuid.UUID, size: int, type_="crawl"):
         """Increase org bytesStored count (pass negative value to subtract)."""
-        await self.orgs.find_one_and_update(
-            {"_id": oid}, {"$inc": {"bytesStored": size}}
-        )
+        if type_ == "crawl":
+            await self.orgs.find_one_and_update(
+                {"_id": oid}, {"$inc": {"bytesStored": size, "bytesStoredCrawls": size}}
+            )
+        elif type_ == "upload":
+            await self.orgs.find_one_and_update(
+                {"_id": oid},
+                {"$inc": {"bytesStored": size, "bytesStoredUploads": size}},
+            )
+        elif type_ == "profile":
+            await self.orgs.find_one_and_update(
+                {"_id": oid},
+                {"$inc": {"bytesStored": size, "bytesStoredProfiles": size}},
+            )
         return await self.storage_quota_reached(oid)
 
     # pylint: disable=invalid-name
 
@@ -327,20 +336,10 @@ class OrgOps:
             return org.quotas.maxConcurrentCrawls
         return 0
 
-    async def add_crawl_files_to_org_bytes_stored(self, oid: uuid.UUID, size: int):
-        """Add crawl's files to org bytesStored"""
-        await self.orgs.find_one_and_update(
-            {"_id": oid}, {"$inc": {"bytesStored": size}}
-        )
-
     async def get_org_metrics(self, org: Organization):
         """Calculate and return org metrics"""
         # pylint: disable=too-many-locals
-        storage_quota_gb = 0
         storage_quota = await self.get_org_storage_quota(org.id)
-        if storage_quota:
-            storage_quota_gb = round(storage_quota / BYTES_IN_GB)
         max_concurrent_crawls = await self.get_max_concurrent_crawls(org.id)
 
         # Calculate these counts in loop to avoid having db iterate through
 
@@ -378,9 +377,10 @@
         return {
             "storageUsedBytes": org.bytesStored,
-            "storageUsedGB": round((org.bytesStored / BYTES_IN_GB), 2),
+            "storageUsedCrawls": org.bytesStoredCrawls,
+            "storageUsedUploads": org.bytesStoredUploads,
+            "storageUsedProfiles": org.bytesStoredProfiles,
             "storageQuotaBytes": storage_quota,
-            "storageQuotaGB": storage_quota_gb,
             "archivedItemCount": archived_item_count,
             "crawlCount": crawl_count,
             "uploadCount": upload_count,


@@ -193,7 +193,7 @@ class ProfileOps:
             {"_id": profile.id}, {"$set": profile.to_dict()}, upsert=True
         )
 
-        quota_reached = await self.orgs.inc_org_bytes_stored(oid, file_size)
+        quota_reached = await self.orgs.inc_org_bytes_stored(oid, file_size, "profile")
 
         return {
             "added": True,
 
@@ -310,7 +310,9 @@
         # Delete file from storage
         if profile.resource:
             await delete_crawl_file_object(org, profile.resource, self.crawl_manager)
-            await self.orgs.inc_org_bytes_stored(org.id, -profile.resource.size)
+            await self.orgs.inc_org_bytes_stored(
+                org.id, -profile.resource.size, "profile"
+            )
 
         res = await self.profiles.delete_one(query)
         if not res or res.deleted_count != 1:


@@ -180,13 +180,15 @@ class UploadOps(BaseCrawlOps):
             self.event_webhook_ops.create_upload_finished_notification(crawl_id)
         )
 
-        quota_reached = await self.orgs.inc_org_bytes_stored(org.id, file_size)
+        quota_reached = await self.orgs.inc_org_bytes_stored(
+            org.id, file_size, "upload"
+        )
 
         return {"id": crawl_id, "added": True, "storageQuotaReached": quota_reached}
 
     async def delete_uploads(self, delete_list: DeleteCrawlList, org: Organization):
         """Delete uploaded crawls"""
-        deleted_count, _, _, quota_reached = await self.delete_crawls(
+        deleted_count, _, quota_reached = await self.delete_crawls(
             org, delete_list, "upload"
         )


@@ -22,6 +22,7 @@ _admin_config_id = None
 _crawler_config_id = None
 _auto_add_config_id = None
 _all_crawls_config_id = None
+_all_crawls_delete_config_id = None
 
 NON_DEFAULT_ORG_NAME = "Non-default org"
 
@@ -408,6 +409,69 @@ def uploads_collection_id(crawler_auth_headers, default_org_id):
     return r.json()["id"]
 
 
+@pytest.fixture(scope="session")
+def all_crawls_delete_crawl_ids(admin_auth_headers, default_org_id):
+    crawl_data = {
+        "runNow": True,
+        "name": "All Crawls Delete Test Workflow",
+        "description": "Lorem ipsum",
+        "config": {
+            "seeds": [{"url": "https://webrecorder.net/"}],
+            "exclude": "community",
+        },
+    }
+    r = requests.post(
+        f"{API_PREFIX}/orgs/{default_org_id}/crawlconfigs/",
+        headers=admin_auth_headers,
+        json=crawl_data,
+    )
+    data = r.json()
+
+    global _all_crawls_delete_config_id
+    _all_crawls_delete_config_id = data["id"]
+
+    return_crawl_ids = []
+
+    crawl_id = data["run_now_job"]
+    return_crawl_ids.append(crawl_id)
+
+    # Wait for crawl to complete
+    while True:
+        r = requests.get(
+            f"{API_PREFIX}/orgs/{default_org_id}/crawls/{crawl_id}/replay.json",
+            headers=admin_auth_headers,
+        )
+        data = r.json()
+        if data["state"] in FINISHED_STATES:
+            break
+        time.sleep(5)
+
+    # Run workflow again and wait for second crawl to complete
+    r = requests.post(
+        f"{API_PREFIX}/orgs/{default_org_id}/crawlconfigs/{_all_crawls_delete_config_id}/run",
+        headers=admin_auth_headers,
+    )
+    crawl_2_id = r.json()["started"]
+    return_crawl_ids.append(crawl_2_id)
+
+    while True:
+        r = requests.get(
+            f"{API_PREFIX}/orgs/{default_org_id}/crawls/{crawl_2_id}/replay.json",
+            headers=admin_auth_headers,
+        )
+        data = r.json()
+        if data["state"] in FINISHED_STATES:
+            break
+        time.sleep(5)
+
+    return return_crawl_ids
+
+
+@pytest.fixture(scope="session")
+def all_crawls_delete_config_id(admin_crawl_id):
+    return _all_crawls_delete_config_id
+
+
 @pytest.fixture(scope="function")
 def echo_server():
     print(f"Echo server starting", flush=True)


@@ -369,9 +369,16 @@ def test_org_metrics(crawler_auth_headers, default_org_id):
     data = r.json()
 
     assert data["storageUsedBytes"] > 0
-    assert data["storageUsedGB"] > 0
+    assert data["storageUsedCrawls"] > 0
+    assert data["storageUsedUploads"] >= 0
+    assert data["storageUsedProfiles"] >= 0
+    assert (
+        data["storageUsedBytes"]
+        == data["storageUsedCrawls"]
+        + data["storageUsedUploads"]
+        + data["storageUsedProfiles"]
+    )
     assert data["storageQuotaBytes"] >= 0
-    assert data["storageQuotaGB"] >= 0
     assert data["archivedItemCount"] > 0
     assert data["crawlCount"] > 0
     assert data["uploadCount"] >= 0


@@ -840,12 +840,79 @@ def test_update_upload_metadata_all_crawls(admin_auth_headers, default_org_id):
     assert data["collectionIds"] == []
 
 
-def test_delete_form_upload_from_all_crawls(admin_auth_headers, default_org_id):
+def test_delete_form_upload_and_crawls_from_all_crawls(
+    admin_auth_headers,
+    default_org_id,
+    all_crawls_delete_crawl_ids,
+    all_crawls_delete_config_id,
+):
+    crawls_to_delete = all_crawls_delete_crawl_ids
+    crawls_to_delete.append(upload_id_2)
+
+    # Get org metrics
+    r = requests.get(
+        f"{API_PREFIX}/orgs/{default_org_id}/metrics",
+        headers=admin_auth_headers,
+    )
+    data = r.json()
+
+    org_bytes = data["storageUsedBytes"]
+    org_crawl_bytes = data["storageUsedCrawls"]
+    org_upload_bytes = data["storageUsedUploads"]
+
+    # Get workflow and crawl sizes
+    r = requests.get(
+        f"{API_PREFIX}/orgs/{default_org_id}/crawlconfigs/{all_crawls_delete_config_id}",
+        headers=admin_auth_headers,
+    )
+    workflow_size = r.json()["totalSize"]
+
+    crawl_id_1 = all_crawls_delete_crawl_ids[0]
+    r = requests.get(
+        f"{API_PREFIX}/orgs/{default_org_id}/crawls/{crawl_id_1}/replay.json",
+        headers=admin_auth_headers,
+    )
+    crawl_1_size = r.json()["fileSize"]
+
+    crawl_id_2 = all_crawls_delete_crawl_ids[1]
+    r = requests.get(
+        f"{API_PREFIX}/orgs/{default_org_id}/crawls/{crawl_id_2}/replay.json",
+        headers=admin_auth_headers,
+    )
+    crawl_2_size = r.json()["fileSize"]
+
+    r = requests.get(
+        f"{API_PREFIX}/orgs/{default_org_id}/uploads/{upload_id_2}/replay.json",
+        headers=admin_auth_headers,
+    )
+    upload_size = r.json()["fileSize"]
+
+    combined_crawl_size = crawl_1_size + crawl_2_size
+    total_size = combined_crawl_size + upload_size
+
+    # Delete mixed type archived items
     r = requests.post(
         f"{API_PREFIX}/orgs/{default_org_id}/all-crawls/delete",
         headers=admin_auth_headers,
-        json={"crawl_ids": [upload_id_2]},
+        json={"crawl_ids": crawls_to_delete},
     )
     data = r.json()
     assert data["deleted"]
+    assert data["storageQuotaReached"] is False
+
+    # Check that org and workflow size figures are as expected
+    r = requests.get(
+        f"{API_PREFIX}/orgs/{default_org_id}/metrics",
+        headers=admin_auth_headers,
+    )
+    data = r.json()
+    assert data["storageUsedBytes"] == org_bytes - total_size
+    assert data["storageUsedCrawls"] == org_crawl_bytes - combined_crawl_size
+    assert data["storageUsedUploads"] == org_upload_bytes - upload_size
+
+    r = requests.get(
+        f"{API_PREFIX}/orgs/{default_org_id}/crawlconfigs/{all_crawls_delete_config_id}",
+        headers=admin_auth_headers,
+    )
+    assert r.json()["totalSize"] == workflow_size - combined_crawl_size