diff --git a/backend/btrixcloud/crawls.py b/backend/btrixcloud/crawls.py
index 18d737b2..deade084 100644
--- a/backend/btrixcloud/crawls.py
+++ b/backend/btrixcloud/crawls.py
@@ -2,6 +2,7 @@
 # pylint: disable=too-many-lines
 
 import asyncio
+import heapq
 import uuid
 import os
 import json
@@ -12,16 +13,17 @@ from typing import Optional, List, Dict, Union
 from datetime import datetime, timedelta
 
 from fastapi import Depends, HTTPException
+from fastapi.responses import StreamingResponse
 from pydantic import BaseModel, UUID4, conint, HttpUrl
 from redis import asyncio as aioredis, exceptions
 import pymongo
 
 from .crawlconfigs import Seed, CrawlConfigCore, CrawlConfig
 from .db import BaseMongoModel
-from .users import User
 from .orgs import Organization, MAX_CRAWL_SCALE
 from .pagination import DEFAULT_PAGE_SIZE, paginated_format
-from .storages import get_presigned_url, delete_crawl_file_object
+from .storages import get_presigned_url, delete_crawl_file_object, get_wacz_logs
+from .users import User
 
 
 CRAWL_STATES = (
@@ -487,6 +489,16 @@ class CrawlOps:
         if status_code != 204:
             raise HTTPException(status_code=400, detail="file_deletion_error")
 
+    async def get_wacz_files(self, crawl_id: str, org: Organization):
+        """Return list of WACZ files associated with crawl."""
+        wacz_files = []
+        crawl_raw = await self.get_crawl_raw(crawl_id, org)
+        crawl = Crawl.from_dict(crawl_raw)
+        for file_ in crawl.files:
+            if file_.filename.endswith(".wacz"):
+                wacz_files.append(file_)
+        return wacz_files
+
     async def add_new_crawl(self, crawl_id: str, crawlconfig: CrawlConfig, user: User):
         """initialize new crawl"""
         crawl = Crawl(
@@ -756,7 +768,7 @@ class CrawlOps:
 
 
 # ============================================================================
-# pylint: disable=too-many-arguments, too-many-locals
+# pylint: disable=too-many-arguments, too-many-locals, too-many-statements
 def init_crawls_api(app, mdb, users, crawl_manager, crawl_config_ops, orgs, user_dep):
     """API for crawl management, including crawl done callback"""
     # pylint: disable=invalid-name
@@ -1024,6 +1036,46 @@ def init_crawls_api(app, mdb, users, crawl_manager, crawl_config_ops, orgs, user
     ):
         return await ops.add_or_remove_exclusion(crawl_id, regex, org, user, add=False)
 
+    @app.get("/orgs/{oid}/crawls/{crawl_id}/logs", tags=["crawls"])
+    async def stream_crawl_logs(
+        crawl_id,
+        org: Organization = Depends(org_viewer_dep),
+        logLevel: Optional[str] = None,
+        context: Optional[str] = None,
+    ):
+        crawl = await ops.get_crawl(crawl_id, org)
+
+        log_levels = []
+        contexts = []
+        if logLevel:
+            log_levels = logLevel.split(",")
+        if context:
+            contexts = context.split(",")
+
+        def stream_json_lines(iterator, log_levels, contexts):
+            """Return iterator as generator, filtering as necessary"""
+            for line_dict in iterator:
+                if log_levels and line_dict["logLevel"] not in log_levels:
+                    continue
+                if contexts and line_dict["context"] not in contexts:
+                    continue
+
+                # Convert to JSON-lines bytes
+                json_str = json.dumps(line_dict, ensure_ascii=False) + "\n"
+                yield json_str.encode("utf-8")
+
+        # If crawl is finished, stream logs from WACZ files
+        if crawl.finished:
+            logs = []
+            wacz_files = await ops.get_wacz_files(crawl_id, org)
+            for wacz_file in wacz_files:
+                wacz_logs = await get_wacz_logs(org, wacz_file, crawl_manager)
+                logs.append(wacz_logs)
+            heap_iter = heapq.merge(*logs, key=lambda entry: entry["timestamp"])
+            return StreamingResponse(stream_json_lines(heap_iter, log_levels, contexts))
+
+        raise HTTPException(status_code=400, detail="crawl_not_finished")
+
     return ops
diff --git a/backend/btrixcloud/k8s/k8sman.py b/backend/btrixcloud/k8s/k8sman.py
index b3c86c11..5ec988a4 100644
--- a/backend/btrixcloud/k8s/k8sman.py
+++ b/backend/btrixcloud/k8s/k8sman.py
@@ -7,8 +7,8 @@ import base64
 import yaml
 import aiohttp
 
-from ..orgs import S3Storage
 from ..crawlmanager import BaseCrawlManager
+from ..orgs import S3Storage
 from .k8sapi import K8sAPI
 
 
diff --git a/backend/btrixcloud/storages.py b/backend/btrixcloud/storages.py
index 3386663e..e1f3e765 100644
--- a/backend/btrixcloud/storages.py
+++ b/backend/btrixcloud/storages.py
@@ -10,6 +10,7 @@ from aiobotocore.session import get_session
 
 from .orgs import Organization, DefaultStorage, S3Storage
 from .users import User
+from .zip import get_zip_file, extract_and_parse_log_file
 
 
 # ============================================================================
@@ -148,3 +149,39 @@ async def delete_crawl_file_object(org, crawlfile, crawl_manager):
 
     status_code = response["ResponseMetadata"]["HTTPStatusCode"]
     return status_code
+
+
+# ============================================================================
+async def get_wacz_logs(org, crawlfile, crawl_manager):
+    """Return combined and sorted list of log line dicts from all logs in WACZ."""
+    if crawlfile.def_storage_name:
+        s3storage = await crawl_manager.get_default_storage(crawlfile.def_storage_name)
+
+    elif org.storage.type == "s3":
+        s3storage = org.storage
+
+    else:
+        raise TypeError("No Default Storage Found, Invalid Storage Type")
+
+    async with get_s3_client(s3storage, s3storage.use_access_for_presign) as (
+        client,
+        bucket,
+        key,
+    ):
+        key += crawlfile.filename
+        cd_start, zip_file = await get_zip_file(client, bucket, key)
+        log_files = [
+            f
+            for f in zip_file.filelist
+            if f.filename.startswith("logs/") and not f.is_dir()
+        ]
+
+        combined_log_lines = []
+
+        for log_zipinfo in log_files:
+            parsed_log_lines = await extract_and_parse_log_file(
+                client, bucket, key, log_zipinfo, cd_start
+            )
+            combined_log_lines.extend(parsed_log_lines)
+
+        return combined_log_lines
diff --git a/backend/btrixcloud/zip.py b/backend/btrixcloud/zip.py
new file mode 100644
index 00000000..99cecf0c
--- /dev/null
+++ b/backend/btrixcloud/zip.py
@@ -0,0 +1,147 @@
+"""
+Methods for interacting with zip/WACZ files
+"""
+import io
+import json
+import os
+import struct
+import zipfile
+import zlib
+
+from fastapi import HTTPException
+
+
+# ============================================================================
+EOCD_RECORD_SIZE = 22
+ZIP64_EOCD_RECORD_SIZE = 56
+ZIP64_EOCD_LOCATOR_SIZE = 20
+
+MAX_STANDARD_ZIP_SIZE = 4_294_967_295
+
+
+# ============================================================================
+async def extract_and_parse_log_file(client, bucket, key, log_zipinfo, cd_start):
+    """Return parsed JSON from extracted and uncompressed log"""
+    # pylint: disable=too-many-locals
+    file_head = await fetch(
+        client, bucket, key, cd_start + log_zipinfo.header_offset + 26, 4
+    )
+    name_len = parse_little_endian_to_int(file_head[0:2])
+    extra_len = parse_little_endian_to_int(file_head[2:4])
+
+    content = await fetch(
+        client,
+        bucket,
+        key,
+        cd_start + log_zipinfo.header_offset + 30 + name_len + extra_len,
+        log_zipinfo.compress_size,
+    )
+
+    if log_zipinfo.compress_type == zipfile.ZIP_DEFLATED:
+        uncompressed_content = zlib.decompressobj(-zlib.MAX_WBITS).decompress(content)
+    else:
+        uncompressed_content = content
+
+    content_length = len(uncompressed_content)
+    if not log_zipinfo.file_size == content_length:
+        # pylint: disable=line-too-long
+        detail = f"Error extracting log file {log_zipinfo.filename} from WACZ {os.path.basename(key)}."
+        detail += f" Expected {log_zipinfo.file_size} bytes uncompressed but found {content_length}"
+        print(detail, flush=True)
+        raise HTTPException(status_code=500, detail=detail)
+
+    parsed_log_lines = []
+
+    for json_line in uncompressed_content.decode("utf-8").split("\n"):
+        if not json_line:
+            continue
+        try:
+            result = json.loads(json_line)
+            parsed_log_lines.append(result)
+        except json.JSONDecodeError as err:
+            print(f"Error decoding json-l line: {json_line}. Error: {err}", flush=True)
+
+    return parsed_log_lines
+
+
+async def get_zip_file(client, bucket, key):
+    """Fetch enough of the WACZ file to be able to read the zip filelist"""
+    file_size = await get_file_size(client, bucket, key)
+    eocd_record = await fetch(
+        client, bucket, key, file_size - EOCD_RECORD_SIZE, EOCD_RECORD_SIZE
+    )
+
+    if file_size <= MAX_STANDARD_ZIP_SIZE:
+        cd_start, cd_size = get_central_directory_metadata_from_eocd(eocd_record)
+        central_directory = await fetch(client, bucket, key, cd_start, cd_size)
+        return (
+            cd_start,
+            zipfile.ZipFile(io.BytesIO(central_directory + eocd_record)),
+        )
+
+    zip64_eocd_record = await fetch(
+        client,
+        bucket,
+        key,
+        file_size
+        - (EOCD_RECORD_SIZE + ZIP64_EOCD_LOCATOR_SIZE + ZIP64_EOCD_RECORD_SIZE),
+        ZIP64_EOCD_RECORD_SIZE,
+    )
+    zip64_eocd_locator = await fetch(
+        client,
+        bucket,
+        key,
+        file_size - (EOCD_RECORD_SIZE + ZIP64_EOCD_LOCATOR_SIZE),
+        ZIP64_EOCD_LOCATOR_SIZE,
+    )
+    cd_start, cd_size = get_central_directory_metadata_from_eocd64(zip64_eocd_record)
+    central_directory = await fetch(client, bucket, key, cd_start, cd_size)
+    return (
+        cd_start,
+        zipfile.ZipFile(
+            io.BytesIO(
+                central_directory + zip64_eocd_record + zip64_eocd_locator + eocd_record
+            )
+        ),
+    )
+
+
+async def get_file_size(client, bucket, key):
+    """Get WACZ file size from HEAD request"""
+    head_response = await client.head_object(Bucket=bucket, Key=key)
+    return head_response["ContentLength"]
+
+
+async def fetch(client, bucket, key, start, length):
+    """Fetch a byte range from a file in object storage"""
+    end = start + length - 1
+    response = await client.get_object(
+        Bucket=bucket, Key=key, Range=f"bytes={start}-{end}"
+    )
+    return await response["Body"].read()
+
+
+def get_central_directory_metadata_from_eocd(eocd):
+    """Get central directory start and size"""
+    cd_size = parse_little_endian_to_int(eocd[12:16])
+    cd_start = parse_little_endian_to_int(eocd[16:20])
+    return cd_start, cd_size
+
+
+def get_central_directory_metadata_from_eocd64(eocd64):
+    """Get central directory start and size for zip64"""
+    cd_size = parse_little_endian_to_int(eocd64[40:48])
+    cd_start = parse_little_endian_to_int(eocd64[48:56])
+    return cd_start, cd_size
+
+
+def parse_little_endian_to_int(little_endian_bytes):
+    """Convert little endian used in zip spec to int"""
+    byte_length = len(little_endian_bytes)
+    format_character = "q"
+    if byte_length == 4:
+        format_character = "i"
+    elif byte_length == 2:
+        format_character = "h"
+
+    return struct.unpack("<" + format_character, little_endian_bytes)[0]
diff --git a/backend/test_nightly/conftest.py b/backend/test_nightly/conftest.py
index 27d74c61..23d983da 100644
--- a/backend/test_nightly/conftest.py
+++ b/backend/test_nightly/conftest.py
@@ -150,3 +150,56 @@ def crawl_config_info(admin_auth_headers, default_org_id):
         if data["state"] == "complete":
             return (crawl_config_id, crawl_id, second_crawl_id)
         time.sleep(5)
+
+
+@pytest.fixture(scope="session")
+def large_crawl_id(admin_auth_headers, default_org_id):
+    # Start crawl
+    crawl_data = {
+        "runNow": True,
+        "name": "Large Test Crawl",
+        "tags": ["wacz-logs"],
+        "config": {
+            "seeds": [{"url": "https://webrecorder.net/"}],
+            "scopeType": "domain",
+            "limit": 100,
+            "extraHops": 1,
+        },
+    }
+    r = requests.post(
+        f"{API_PREFIX}/orgs/{default_org_id}/crawlconfigs/",
+        headers=admin_auth_headers,
+        json=crawl_data,
+    )
+    data = r.json()
+
+    crawl_id = data["run_now_job"]
+
+    # Wait for crawl to start running
+    while True:
+        r = requests.get(
+            f"{API_PREFIX}/orgs/{default_org_id}/crawls/{crawl_id}/replay.json",
+            headers=admin_auth_headers,
+        )
+        data = r.json()
+        if data["state"] == "running":
+            # Give crawl time to start properly
+            time.sleep(30)
+            return crawl_id
+        time.sleep(5)
+
+
+@pytest.fixture(scope="session")
+def large_crawl_finished(admin_auth_headers, default_org_id, large_crawl_id):
+    # Wait for crawl to complete
+    while True:
+        r = requests.get(
+            f"{API_PREFIX}/orgs/{default_org_id}/crawls/{large_crawl_id}/replay.json",
+            headers=admin_auth_headers,
+        )
+        data = r.json()
+        if data["state"] == "complete":
+            # Give some time for WACZ files to be stored
+            time.sleep(30)
+            break
+        time.sleep(5)
diff --git a/backend/test_nightly/test_crawl_logs.py b/backend/test_nightly/test_crawl_logs.py
new file mode 100644
index 00000000..ffdb970a
--- /dev/null
+++ b/backend/test_nightly/test_crawl_logs.py
@@ -0,0 +1,88 @@
+import json
+import requests
+import time
+
+import pytest
+
+from .conftest import API_PREFIX
+
+
+LINES_TO_TEST = 10
+
+
+@pytest.mark.parametrize(
+    "log_level, context",
+    [
+        # No filtering
+        (None, None),
+        # Filter log level
+        ("info", None),
+        ("info,debug", None),
+        # Filter context
+        (None, "general"),
+        (None, "general,worker"),
+        # Filter both
+        ("info,debug", "general,worker"),
+    ],
+)
+def test_stream_crawl_logs_wacz(
+    admin_auth_headers,
+    default_org_id,
+    large_crawl_id,
+    large_crawl_finished,
+    log_level,
+    context,
+):
+    """Test that streaming logs after crawl concludes from WACZs works."""
+    api_url = f"{API_PREFIX}/orgs/{default_org_id}/crawls/{large_crawl_id}/logs"
+    if log_level and context:
+        api_url = api_url + f"?logLevel={log_level}&context={context}"
+    elif log_level:
+        api_url = api_url + f"?logLevel={log_level}"
+    elif context:
+        api_url = api_url + f"?context={context}"
+
+    log_levels = []
+    contexts = []
+    if log_level:
+        log_levels = log_level.split(",")
+    if context:
+        contexts = context.split(",")
+
+    with requests.get(api_url, headers=admin_auth_headers, stream=True) as r:
+        assert r.status_code == 200
+
+        last_timestamp = None
+        line_index = 0
+
+        # Wait for stream content
+        if not r.content:
+            while True:
+                if r.content:
+                    break
+                time.sleep(5)
+
+        for line in r.iter_lines():
+            if line_index >= LINES_TO_TEST:
+                r.close()
+                return
+
+            line = line.decode("utf-8")
+            log_line_dict = json.loads(line)
+
+            assert log_line_dict["logLevel"]
+            if log_level:
+                assert log_line_dict["logLevel"] in log_levels
+
+            assert log_line_dict["context"]
+            if context:
+                assert log_line_dict["context"] in contexts
+            assert log_line_dict["details"] or log_line_dict["details"] == {}
+
+            timestamp = log_line_dict["timestamp"]
+            assert timestamp
+            if last_timestamp:
+                assert timestamp >= last_timestamp
+            last_timestamp = timestamp
+
+            line_index += 1