Add org metrics API endpoint (#1196)
* Initial implementation of org metrics (this can eventually be sped up significantly by precomputing the values and storing them in the db)
* Rename storageQuota to storageQuotaBytes to be consistent
* Update tests to include metrics
parent bd99840fca
commit 83f80d4103
@@ -750,6 +750,26 @@ class OrgOut(BaseMongoModel):
     quotas: Optional[OrgQuotas] = OrgQuotas()


+# ============================================================================
+class OrgMetrics(BaseModel):
+    """Organization API metrics model"""
+
+    storageUsedBytes: int
+    storageUsedGB: float
+    storageQuotaBytes: int
+    storageQuotaGB: float
+    archivedItemCount: int
+    crawlCount: int
+    uploadCount: int
+    pageCount: int
+    profileCount: int
+    workflowsRunningCount: int
+    maxConcurrentCrawls: int
+    workflowsQueuedCount: int
+    collectionsCount: int
+    publicCollectionsCount: int
+
+
 # ============================================================================

 ### PAGINATION ###
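
For orientation, here is roughly what a response from the new metrics endpoint (registered further down as GET /metrics under the org routes) looks like once serialized from this model. The field names come from OrgMetrics above; every value below is made up for illustration:

example_metrics_response = {
    "storageUsedBytes": 2_500_000_000,
    "storageUsedGB": 2.5,
    "storageQuotaBytes": 10_000_000_000,
    "storageQuotaGB": 10.0,
    "archivedItemCount": 12,
    "crawlCount": 10,
    "uploadCount": 2,
    "pageCount": 340,
    "profileCount": 1,
    "workflowsRunningCount": 0,
    "maxConcurrentCrawls": 2,
    "workflowsQueuedCount": 0,
    "collectionsCount": 3,
    "publicCollectionsCount": 1,
}
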
@@ -13,11 +13,13 @@ from pymongo import ReturnDocument
 from pymongo.errors import AutoReconnect, DuplicateKeyError
 from fastapi import APIRouter, Depends, HTTPException, Request

+from .basecrawls import SUCCESSFUL_STATES, RUNNING_STATES, STARTING_STATES
 from .models import (
     Organization,
     DefaultStorage,
     S3Storage,
     OrgQuotas,
+    OrgMetrics,
     OrgWebhookUrls,
     RenameOrg,
     UpdateRole,
@@ -35,14 +37,19 @@ from .pagination import DEFAULT_PAGE_SIZE, paginated_format

 DEFAULT_ORG = os.environ.get("DEFAULT_ORG", "My Organization")

+BYTES_IN_GB = 1_000_000_000
+

 # ============================================================================
-# pylint: disable=too-many-public-methods
+# pylint: disable=too-many-public-methods, too-many-instance-attributes
 class OrgOps:
     """Organization API operations"""

     def __init__(self, mdb, invites):
         self.orgs = mdb["organizations"]
+        self.crawls_db = mdb["crawls"]
+        self.profiles_db = mdb["profiles"]
+        self.colls_db = mdb["collections"]

         self.router = None
         self.org_viewer_dep = None
@@ -326,6 +333,66 @@ class OrgOps:
             {"_id": oid}, {"$inc": {"bytesStored": size}}
         )

+    async def get_org_metrics(self, org: Organization):
+        """Calculate and return org metrics"""
+        # pylint: disable=too-many-locals
+        storage_quota_gb = 0
+        storage_quota = await self.get_org_storage_quota(org.id)
+        if storage_quota:
+            storage_quota_gb = round(storage_quota / BYTES_IN_GB)
+
+        max_concurrent_crawls = await self.get_max_concurrent_crawls(org.id)
+
+        # Calculate these counts in loop to avoid having db iterate through
+        # archived items several times.
+        archived_item_count = 0
+        crawl_count = 0
+        upload_count = 0
+        page_count = 0
+
+        cursor = self.crawls_db.find({"oid": org.id})
+        items = await cursor.to_list(length=10_000)
+        for item in items:
+            if item["state"] not in SUCCESSFUL_STATES:
+                continue
+            archived_item_count += 1
+            type_ = item.get("type")
+            if type_ == "crawl":
+                crawl_count += 1
+            if type_ == "upload":
+                upload_count += 1
+            if item.get("stats"):
+                page_count += item.get("stats", {}).get("done", 0)
+
+        profile_count = await self.profiles_db.count_documents({"oid": org.id})
+        workflows_running_count = await self.crawls_db.count_documents(
+            {"oid": org.id, "state": {"$in": list(RUNNING_STATES)}}
+        )
+        workflows_queued_count = await self.crawls_db.count_documents(
+            {"oid": org.id, "state": {"$in": list(STARTING_STATES)}}
+        )
+        collections_count = await self.colls_db.count_documents({"oid": org.id})
+        public_collections_count = await self.colls_db.count_documents(
+            {"oid": org.id, "isPublic": True}
+        )
+
+        return {
+            "storageUsedBytes": org.bytesStored,
+            "storageUsedGB": round((org.bytesStored / BYTES_IN_GB), 2),
+            "storageQuotaBytes": storage_quota,
+            "storageQuotaGB": storage_quota_gb,
+            "archivedItemCount": archived_item_count,
+            "crawlCount": crawl_count,
+            "uploadCount": upload_count,
+            "pageCount": page_count,
+            "profileCount": profile_count,
+            "workflowsRunningCount": workflows_running_count,
+            "maxConcurrentCrawls": max_concurrent_crawls,
+            "workflowsQueuedCount": workflows_queued_count,
+            "collectionsCount": collections_count,
+            "publicCollectionsCount": public_collections_count,
+        }
+

 # ============================================================================
 # pylint: disable=too-many-statements
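
The per-item loop in get_org_metrics() above pulls up to 10,000 crawl documents into application memory to derive the archived-item, crawl, upload, and page counts. As the commit message notes, this can eventually be sped up; one possible direction, sketched here purely as an illustration (the helper name and its parameters are assumptions, not part of this commit), is to have MongoDB group the successful items by type in a single aggregation:

async def count_archived_items_by_type(crawls_db, oid, successful_states):
    """Sketch only: derive crawl/upload/page counts server-side in one aggregation."""
    pipeline = [
        # keep only successful archived items for this org
        {"$match": {"oid": oid, "state": {"$in": list(successful_states)}}},
        {
            "$group": {
                "_id": "$type",  # one bucket per item type ("crawl", "upload", ...)
                "count": {"$sum": 1},
                "pageCount": {"$sum": {"$ifNull": ["$stats.done", 0]}},
            }
        },
    ]
    buckets = [doc async for doc in crawls_db.aggregate(pipeline)]
    crawl_count = sum(b["count"] for b in buckets if b["_id"] == "crawl")
    upload_count = sum(b["count"] for b in buckets if b["_id"] == "upload")
    page_count = sum(b["pageCount"] for b in buckets)
    return crawl_count, upload_count, page_count

The trade-off is that checks currently done in Python, such as only adding pages when "stats" is present, have to be expressed as pipeline operators instead.
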
@@ -579,4 +646,8 @@ def init_orgs_api(app, mdb, user_manager, invites, user_dep):
         await set_role(update_role, org, user)
         return {"added": True}

+    @router.get("/metrics", tags=["organizations"], response_model=OrgMetrics)
+    async def get_org_metrics(org: Organization = Depends(org_dep)):
+        return await ops.get_org_metrics(org)
+
     return ops
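
Since the endpoint recomputes every value on each request, the commit message's note about precomputing the metrics and storing them in the db is the natural follow-up. A rough sketch of that idea, not part of this change (the metrics and metricsUpdated field names and the standalone helper are hypothetical):

from datetime import datetime

async def cache_org_metrics(orgs_collection, oid, metrics: dict) -> None:
    """Hypothetical: store a precomputed metrics snapshot on the org document."""
    await orgs_collection.find_one_and_update(
        {"_id": oid},
        {"$set": {"metrics": metrics, "metricsUpdated": datetime.utcnow()}},
    )

The read path could then return the cached snapshot directly and fall back to the live calculation when the snapshot is missing or stale.
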
@@ -358,3 +358,27 @@ def test_update_event_webhook_urls_org_crawler(crawler_auth_headers, default_org
     )
     assert r.status_code == 403
     assert r.json()["detail"] == "User does not have permission to perform this action"
+
+
+def test_org_metrics(crawler_auth_headers, default_org_id):
+    r = requests.get(
+        f"{API_PREFIX}/orgs/{default_org_id}/metrics",
+        headers=crawler_auth_headers,
+    )
+    assert r.status_code == 200
+    data = r.json()
+
+    assert data["storageUsedBytes"] > 0
+    assert data["storageUsedGB"] > 0
+    assert data["storageQuotaBytes"] >= 0
+    assert data["storageQuotaGB"] >= 0
+    assert data["archivedItemCount"] > 0
+    assert data["crawlCount"] > 0
+    assert data["uploadCount"] >= 0
+    assert data["archivedItemCount"] == data["crawlCount"] + data["uploadCount"]
+    assert data["pageCount"] > 0
+    assert data["profileCount"] >= 0
+    assert data["workflowsRunningCount"] >= 0
+    assert data["workflowsQueuedCount"] >= 0
+    assert data["collectionsCount"] > 0
+    assert data["publicCollectionsCount"] >= 0