diff --git a/backend/btrixcloud/crawls.py b/backend/btrixcloud/crawls.py
index 7f35da1e..cde1a0b6 100644
--- a/backend/btrixcloud/crawls.py
+++ b/backend/btrixcloud/crawls.py
@@ -6,7 +6,7 @@
 import re
 import urllib.parse
 from uuid import UUID
-from typing import Optional, List
+from typing import Optional, List, Dict, Union

 from fastapi import Depends, HTTPException
 from fastapi.responses import StreamingResponse
@@ -14,7 +14,7 @@
 from redis import asyncio as exceptions
 import pymongo
 from .pagination import DEFAULT_PAGE_SIZE, paginated_format
-from .utils import dt_now, parse_jsonl_error_messages
+from .utils import dt_now, parse_jsonl_error_messages, stream_dict_list_as_csv
 from .basecrawls import BaseCrawlOps
 from .models import (
     UpdateCrawl,
@@ -497,6 +497,74 @@ class CrawlOps(BaseCrawlOps):
         except Exception:
             return [], 0

+    async def get_crawl_stats(
+        self, org: Optional[Organization] = None
+    ) -> List[Dict[str, Union[str, int]]]:
+        """Return crawl statistics"""
+        # pylint: disable=too-many-locals
+        org_slugs = await self.orgs.get_org_slugs_by_ids()
+        user_emails = await self.user_manager.get_user_emails_by_ids()
+
+        crawls_data: List[Dict[str, Union[str, int]]] = []
+
+        query: Dict[str, Union[str, UUID]] = {"type": "crawl"}
+        if org:
+            query["oid"] = org.id
+
+        async for crawl in self.crawls.find(query):
+            data: Dict[str, Union[str, int]] = {}
+            data["id"] = str(crawl.get("_id"))
+
+            oid = crawl.get("oid")
+            data["oid"] = str(oid)
+            data["org"] = org_slugs[oid]
+
+            data["cid"] = str(crawl.get("cid"))
+            crawl_name = crawl.get("name")
+            data["name"] = f'"{crawl_name}"' if crawl_name else ""
+            data["state"] = crawl.get("state")
+
+            userid = crawl.get("userid")
+            data["userid"] = str(userid)
+            data["user"] = user_emails.get(userid)
+
+            started = crawl.get("started")
+            finished = crawl.get("finished")
+
+            data["started"] = str(started)
+            data["finished"] = str(finished)
+
+            data["duration"] = 0
+            if started and finished:
+                duration = finished - started
+                duration_seconds = int(duration.total_seconds())
+                if duration_seconds:
+                    data["duration"] = duration_seconds
+
+            done_stats = None
+            if crawl.get("stats") and crawl.get("stats").get("done"):
+                done_stats = crawl["stats"]["done"]
+
+            data["pages"] = 0
+            if done_stats:
+                data["pages"] = done_stats
+
+            data["filesize"] = crawl.get("fileSize", 0)
+
+            data["avg_page_time"] = 0
+            if (
+                done_stats
+                and done_stats != 0
+                and started
+                and finished
+                and duration_seconds
+            ):
+                data["avg_page_time"] = int(duration_seconds / done_stats)
+
+            crawls_data.append(data)
+
+        return crawls_data
+

 # ============================================================================
 async def recompute_crawl_file_count_and_size(crawls, crawl_id):
@@ -646,6 +714,23 @@ def init_crawls_api(app, user_dep, *args):
     ):
         return await ops.delete_crawls(org, delete_list, "crawl", user)

+    @app.get("/orgs/all/crawls/stats", tags=["crawls"])
+    async def get_all_orgs_crawl_stats(
+        user: User = Depends(user_dep),
+    ):
+        if not user.is_superuser:
+            raise HTTPException(status_code=403, detail="Not Allowed")
+
+        crawl_stats = await ops.get_crawl_stats()
+        return stream_dict_list_as_csv(crawl_stats, "crawling-stats.csv")
+
+    @app.get("/orgs/{oid}/crawls/stats", tags=["crawls"])
+    async def get_org_crawl_stats(
+        org: Organization = Depends(org_crawl_dep),
+    ):
+        crawl_stats = await ops.get_crawl_stats(org)
+        return stream_dict_list_as_csv(crawl_stats, f"crawling-stats-{org.id}.csv")
+
     @app.get(
         "/orgs/all/crawls/{crawl_id}/replay.json",
         tags=["crawls"],
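
Not part of the patch: a minimal client-side sketch of how the new stats endpoints added above might be consumed. The base URL, token, and org id below are placeholder assumptions; the route paths and the column names mirror what get_crawl_stats() and the two new routes define. The /orgs/all/crawls/stats variant additionally requires a superuser token.

import csv
import io

import requests

# Assumed values -- substitute a real deployment URL, access token, and org id.
API_BASE = "https://app.example.com/api"
AUTH_HEADERS = {"Authorization": "Bearer <access-token>"}
ORG_ID = "<org-uuid>"


def fetch_org_crawl_stats(org_id: str) -> list:
    """Download the per-org crawl stats CSV and parse it into dicts."""
    # GET /orgs/{oid}/crawls/stats streams back text/csv with an
    # attachment Content-Disposition header set by stream_dict_list_as_csv().
    resp = requests.get(
        f"{API_BASE}/orgs/{org_id}/crawls/stats",
        headers=AUTH_HEADERS,
        timeout=30,
    )
    resp.raise_for_status()
    return list(csv.DictReader(io.StringIO(resp.text)))


if __name__ == "__main__":
    for row in fetch_org_crawl_stats(ORG_ID):
        # Columns: id, oid, org, cid, name, state, userid, user, started,
        # finished, duration, pages, filesize, avg_page_time.
        print(row["id"], row["state"], row["pages"], row["filesize"])
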
diff --git a/backend/btrixcloud/orgs.py b/backend/btrixcloud/orgs.py
index 1c177262..06128d72 100644
--- a/backend/btrixcloud/orgs.py
+++ b/backend/btrixcloud/orgs.py
@@ -1,6 +1,7 @@
 """
 Organization API handling
 """
+# pylint: disable=too-many-lines
 import math
 import os
 import time
@@ -653,7 +654,7 @@ class OrgOps:
         slugs = await self.orgs.distinct("slug", {})
         return {"slugs": slugs}

-    async def get_all_org_slugs_with_ids(self):
+    async def get_org_slugs_by_ids(self):
         """Return dict with {id: slug} for all orgs."""
         slug_id_map = {}
         async for org in self.orgs.find({}):
@@ -933,6 +934,6 @@ def init_orgs_api(app, mdb, user_manager, invites, user_dep):
     async def get_all_org_slugs_with_ids(user: User = Depends(user_dep)):
         if not user.is_superuser:
             raise HTTPException(status_code=403, detail="Not Allowed")
-        return await ops.get_all_org_slugs_with_ids()
+        return await ops.get_org_slugs_by_ids()

     return ops
diff --git a/backend/btrixcloud/users.py b/backend/btrixcloud/users.py
index 474e8b94..f542a172 100644
--- a/backend/btrixcloud/users.py
+++ b/backend/btrixcloud/users.py
@@ -213,6 +213,13 @@ class UserManager:
         )
         return await cursor.to_list(length=1000)

+    async def get_user_emails_by_ids(self):
+        """return dict of user emails keyed by id"""
+        email_id_map = {}
+        async for user in self.users.find({}):
+            email_id_map[user["id"]] = user["email"]
+        return email_id_map
+
     async def get_superuser(self) -> Optional[User]:
         """return current superuser, if any"""
         user_data = await self.users.find_one({"is_superuser": True})
diff --git a/backend/btrixcloud/utils.py b/backend/btrixcloud/utils.py
index f9b7964a..66774ba8 100644
--- a/backend/btrixcloud/utils.py
+++ b/backend/btrixcloud/utils.py
@@ -1,15 +1,19 @@
 """ k8s utils """

-from typing import Optional
-import os
 import asyncio
-import json
-import sys
-import signal
 import atexit
+import csv
+import io
+import json
+import signal
+import os
+import sys

 from datetime import datetime
+from typing import Optional, Dict, Union, List

+from fastapi import HTTPException
+from fastapi.responses import StreamingResponse
 from slugify import slugify


@@ -97,3 +101,22 @@ def is_bool(stri: Optional[str]) -> bool:
 def slug_from_name(name: str) -> str:
     """Generate slug from name"""
     return slugify(name.replace("'", ""))
+
+
+def stream_dict_list_as_csv(data: List[Dict[str, Union[str, int]]], filename: str):
+    """Stream list of dictionaries as CSV with attachment filename header"""
+    if not data:
+        raise HTTPException(status_code=404, detail="crawls_not_found")
+
+    keys = data[0].keys()
+
+    buffer = io.StringIO()
+    dict_writer = csv.DictWriter(buffer, keys, quoting=csv.QUOTE_NONNUMERIC)
+    dict_writer.writeheader()
+    dict_writer.writerows(data)
+
+    return StreamingResponse(
+        iter([buffer.getvalue()]),
+        media_type="text/csv",
+        headers={"Content-Disposition": f"attachment;filename={filename}"},
+    )
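
Not part of the patch: a standalone sketch of the DictWriter/StreamingResponse pattern used by stream_dict_list_as_csv() above, wired into a throwaway FastAPI app so the headers and quoting behavior can be inspected locally. The route name and sample rows are invented for illustration; only the CSV and streaming pattern itself comes from the patch.

import csv
import io
from typing import Dict, List, Union

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from fastapi.testclient import TestClient

app = FastAPI()


@app.get("/example-stats")
def example_stats():
    # Hypothetical rows standing in for the output of get_crawl_stats().
    rows: List[Dict[str, Union[str, int]]] = [
        {"id": "crawl-1", "state": "complete", "pages": 10, "filesize": 2048},
        {"id": "crawl-2", "state": "failed", "pages": 0, "filesize": 0},
    ]
    buffer = io.StringIO()
    # QUOTE_NONNUMERIC quotes string columns but leaves integer values bare.
    writer = csv.DictWriter(buffer, rows[0].keys(), quoting=csv.QUOTE_NONNUMERIC)
    writer.writeheader()
    writer.writerows(rows)
    return StreamingResponse(
        iter([buffer.getvalue()]),
        media_type="text/csv",
        headers={"Content-Disposition": "attachment;filename=example-stats.csv"},
    )


client = TestClient(app)
resp = client.get("/example-stats")
print(resp.headers["content-disposition"])  # attachment;filename=example-stats.csv
print(resp.text.splitlines()[0])            # "id","state","pages","filesize"
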
diff --git a/backend/test/test_run_crawl.py b/backend/test/test_run_crawl.py
index 515f4fcd..ab42a4c1 100644
--- a/backend/test/test_run_crawl.py
+++ b/backend/test/test_run_crawl.py
@@ -4,6 +4,8 @@ import time
 import io
 import zipfile
 import re
+import csv
+import codecs

 from .conftest import API_PREFIX, HOST_PREFIX
 from .test_collections import UPDATED_NAME as COLLECTION_NAME
@@ -297,6 +299,81 @@
     assert not data["description"]


+def test_crawl_stats_all_orgs_not_superadmin(crawler_auth_headers):
+    r = requests.get(
+        f"{API_PREFIX}/orgs/all/crawls/stats", headers=crawler_auth_headers
+    )
+    assert r.status_code == 403
+
+
+def test_crawl_stats_all_orgs(admin_auth_headers):
+    with requests.get(
+        f"{API_PREFIX}/orgs/all/crawls/stats", headers=admin_auth_headers, stream=True
+    ) as r:
+        assert r.status_code == 200
+
+        # Wait for stream content
+        if not r.content:
+            while True:
+                if r.content:
+                    break
+                time.sleep(5)
+
+        buffer = r.iter_lines()
+        for row in csv.DictReader(
+            codecs.iterdecode(buffer, "utf-8"), skipinitialspace=True
+        ):
+            assert row["id"]
+            assert row["oid"]
+            assert row["org"]
+            assert row["cid"]
+            assert row["name"] or row["name"] == ""
+            assert row["state"]
+            assert row["userid"]
+            assert row["user"]
+            assert row["started"]
+            assert row["finished"] or row["finished"] is None
+            assert row["duration"] or row["duration"] == 0
+            assert row["pages"] or row["pages"] == 0
+            assert row["filesize"] or row["filesize"] == 0
+            assert row["avg_page_time"] or row["avg_page_time"] == 0
+
+
+def test_crawl_stats(crawler_auth_headers, default_org_id):
+    with requests.get(
+        f"{API_PREFIX}/orgs/{default_org_id}/crawls/stats",
+        headers=crawler_auth_headers,
+        stream=True,
+    ) as r:
+        assert r.status_code == 200
+
+        # Wait for stream content
+        if not r.content:
+            while True:
+                if r.content:
+                    break
+                time.sleep(5)
+
+        buffer = r.iter_lines()
+        for row in csv.DictReader(
+            codecs.iterdecode(buffer, "utf-8"), skipinitialspace=True
+        ):
+            assert row["id"]
+            assert row["oid"] == default_org_id
+            assert row["org"]
+            assert row["cid"]
+            assert row["name"] or row["name"] == ""
+            assert row["state"]
+            assert row["userid"]
+            assert row["user"]
+            assert row["started"]
+            assert row["finished"] or row["finished"] is None
+            assert row["duration"] or row["duration"] == 0
+            assert row["pages"] or row["pages"] == 0
+            assert row["filesize"] or row["filesize"] == 0
+            assert row["avg_page_time"] or row["avg_page_time"] == 0
+
+
 def test_delete_crawls_crawler(
     crawler_auth_headers, default_org_id, admin_crawl_id, crawler_crawl_id
 ):