diff --git a/backend/btrixcloud/crawls.py b/backend/btrixcloud/crawls.py
index 08e2bae8..dca22de3 100644
--- a/backend/btrixcloud/crawls.py
+++ b/backend/btrixcloud/crawls.py
@@ -2,7 +2,6 @@
 # pylint: disable=too-many-lines
 
 import asyncio
-import heapq
 import uuid
 import json
 import re
@@ -17,7 +16,7 @@ from redis import asyncio as exceptions
 import pymongo
 
 from .pagination import DEFAULT_PAGE_SIZE, paginated_format
-from .storages import get_wacz_logs
+from .storages import sync_stream_wacz_logs
 from .utils import dt_now, parse_jsonl_error_messages
 from .basecrawls import BaseCrawlOps
 from .models import (
@@ -915,27 +914,13 @@ def init_crawls_api(
         if context:
             contexts = context.split(",")
 
-        def stream_json_lines(iterator, log_levels, contexts):
-            """Return iterator as generator, filtering as necessary"""
-            for line_dict in iterator:
-                if log_levels and line_dict["logLevel"] not in log_levels:
-                    continue
-                if contexts and line_dict["context"] not in contexts:
-                    continue
-
-                # Convert to JSON-lines bytes
-                json_str = json.dumps(line_dict, ensure_ascii=False) + "\n"
-                yield json_str.encode("utf-8")
-
         # If crawl is finished, stream logs from WACZ files
         if crawl.finished:
-            logs = []
             wacz_files = await ops.get_wacz_files(crawl_id, org)
-            for wacz_file in wacz_files:
-                wacz_logs = await get_wacz_logs(org, wacz_file, crawl_manager)
-                logs.append(wacz_logs)
-            heap_iter = heapq.merge(*logs, key=lambda entry: entry["timestamp"])
-            return StreamingResponse(stream_json_lines(heap_iter, log_levels, contexts))
+            resp = await sync_stream_wacz_logs(
+                org, wacz_files, log_levels, contexts, crawl_manager
+            )
+            return StreamingResponse(resp)
 
         raise HTTPException(status_code=400, detail="crawl_not_finished")
 
diff --git a/backend/btrixcloud/storages.py b/backend/btrixcloud/storages.py
index ffc2f9a9..88cb62a6 100644
--- a/backend/btrixcloud/storages.py
+++ b/backend/btrixcloud/storages.py
@@ -1,11 +1,12 @@
 """
 Storage API
 """
-from typing import Union
+from typing import Optional, Union
 from urllib.parse import urlsplit
 from contextlib import asynccontextmanager
 
 import asyncio
+import heapq
 import json
 from datetime import datetime
 
@@ -17,7 +18,10 @@ import aiobotocore.session
 import boto3
 
 from .models import Organization, DefaultStorage, S3Storage, User
-from .zip import get_zip_file, extract_and_parse_log_file
+from .zip import (
+    sync_get_zip_file,
+    sync_get_log_stream,
+)
 
 CHUNK_SIZE = 1024 * 256
 
@@ -319,39 +323,91 @@ async def delete_file(org, filename, crawl_manager, def_storage_name="default"):
 
 
 # ============================================================================
-async def get_wacz_logs(org, crawlfile, crawl_manager):
-    """Return combined and sorted list of log line dicts from all logs in WACZ."""
-    if crawlfile.def_storage_name:
-        s3storage = await crawl_manager.get_default_storage(crawlfile.def_storage_name)
+async def sync_stream_wacz_logs(org, wacz_files, log_levels, contexts, crawl_manager):
+    """Return filtered stream of logs from specified WACZs sorted by timestamp"""
+    client, bucket, key, _ = await get_sync_client(org, crawl_manager)
 
-    elif org.storage.type == "s3":
-        s3storage = org.storage
+    loop = asyncio.get_event_loop()
 
-    else:
-        raise TypeError("No Default Storage Found, Invalid Storage Type")
+    resp = await loop.run_in_executor(
+        None, _sync_get_logs, wacz_files, log_levels, contexts, client, bucket, key
+    )
+
+    return resp
+
+
+# ============================================================================
+def _parse_json(line):
+    """Parse JSON str into dict."""
+    parsed_json: Optional[dict] = None
+    try:
+        parsed_json = json.loads(line)
+    except json.JSONDecodeError as err:
+        print(f"Error decoding json-l line: {line}. Error: {err}", flush=True)
+    return parsed_json
+
+
+# ============================================================================
+def _sync_get_logs(wacz_files, log_levels, contexts, client, bucket, key):
+    """Generate filtered stream of logs from specified WACZs sorted by timestamp"""
+
+    # pylint: disable=too-many-function-args
+    def stream_log_bytes_as_line_dicts(stream_generator):
+        """Yield parsed JSON lines as dicts from stream generator."""
+        last_line = ""
+        try:
+            while True:
+                next_chunk = next(stream_generator)
+                next_chunk = next_chunk.decode("utf-8", errors="ignore")
+                chunk = last_line + next_chunk
+                chunk_by_line = chunk.split("\n")
+                last_line = chunk_by_line.pop()
+                for line in chunk_by_line:
+                    if not line:
+                        continue
+                    json_dict = _parse_json(line)
+                    if json_dict:
+                        yield json_dict
+        except StopIteration:
+            if last_line:
+                json_dict = _parse_json(last_line)
+                if json_dict:
+                    yield json_dict
+
+    def stream_json_lines(iterator, log_levels, contexts):
+        """Yield parsed JSON dicts as JSON-lines bytes after filtering as necessary"""
+        for line_dict in iterator:
+            if log_levels and line_dict["logLevel"] not in log_levels:
+                continue
+            if contexts and line_dict["context"] not in contexts:
+                continue
+            json_str = json.dumps(line_dict, ensure_ascii=False) + "\n"
+            yield json_str.encode("utf-8")
+
+    log_generators = []
+
+    for wacz_file in wacz_files:
+        wacz_key = key + wacz_file.filename
+        cd_start, zip_file = sync_get_zip_file(client, bucket, wacz_key)
 
-    async with get_s3_client(s3storage, s3storage.use_access_for_presign) as (
-        client,
-        bucket,
-        key,
-    ):
-        key += crawlfile.filename
-        cd_start, zip_file = await get_zip_file(client, bucket, key)
         log_files = [
             f
             for f in zip_file.filelist
            if f.filename.startswith("logs/") and not f.is_dir()
         ]
 
-        combined_log_lines = []
+        wacz_log_streams = []
 
         for log_zipinfo in log_files:
-            parsed_log_lines = await extract_and_parse_log_file(
-                client, bucket, key, log_zipinfo, cd_start
+            log_stream = sync_get_log_stream(
+                client, bucket, wacz_key, log_zipinfo, cd_start
             )
-            combined_log_lines.extend(parsed_log_lines)
+            wacz_log_streams.extend(stream_log_bytes_as_line_dicts(log_stream))
 
-    return combined_log_lines
+        log_generators.append(wacz_log_streams)
+
+    heap_iter = heapq.merge(*log_generators, key=lambda entry: entry["timestamp"])
+    return stream_json_lines(heap_iter, log_levels, contexts)
 
 
 # ============================================================================
diff --git a/backend/btrixcloud/zip.py b/backend/btrixcloud/zip.py
index 99cecf0c..8ab03d66 100644
--- a/backend/btrixcloud/zip.py
+++ b/backend/btrixcloud/zip.py
@@ -2,14 +2,10 @@
 Methods for interacting with zip/WACZ files
 """
 import io
-import json
-import os
 import struct
 import zipfile
 import zlib
 
-from fastapi import HTTPException
-
 
 # ============================================================================
 EOCD_RECORD_SIZE = 22
@@ -18,18 +14,20 @@ ZIP64_EOCD_LOCATOR_SIZE = 20
 
 MAX_STANDARD_ZIP_SIZE = 4_294_967_295
 
+CHUNK_SIZE = 1024 * 256
+
 
 # ============================================================================
-async def extract_and_parse_log_file(client, bucket, key, log_zipinfo, cd_start):
-    """Return parsed JSON from extracted and uncompressed log"""
+def sync_get_log_stream(client, bucket, key, log_zipinfo, cd_start):
+    """Return uncompressed byte stream of log file in WACZ"""
     # pylint: disable=too-many-locals
-    file_head = await fetch(
+    file_head = sync_fetch(
         client, bucket, key, cd_start + log_zipinfo.header_offset + 26, 4
     )
     name_len = parse_little_endian_to_int(file_head[0:2])
     extra_len = parse_little_endian_to_int(file_head[2:4])
 
-    content = await fetch(
+    content = sync_fetch_stream(
         client,
         bucket,
         key,
@@ -42,26 +40,7 @@ async def extract_and_parse_log_file(client, bucket, key, log_zipinfo, cd_start)
     else:
         uncompressed_content = content
 
-    content_length = len(uncompressed_content)
-    if not log_zipinfo.file_size == content_length:
-        # pylint: disable=line-too-long
-        detail = f"Error extracting log file {log_zipinfo.filename} from WACZ {os.path.basename(key)}."
-        detail += f" Expected {log_zipinfo.file_size} bytes uncompressed but found {content_length}"
-        print(detail, flush=True)
-        raise HTTPException(status_code=500, detail=detail)
-
-    parsed_log_lines = []
-
-    for json_line in uncompressed_content.decode("utf-8").split("\n"):
-        if not json_line:
-            continue
-        try:
-            result = json.loads(json_line)
-            parsed_log_lines.append(result)
-        except json.JSONDecodeError as err:
-            print(f"Error decoding json-l line: {json_line}. Error: {err}", flush=True)
-
-    return parsed_log_lines
+    return uncompressed_content
 
 
 async def get_zip_file(client, bucket, key):
@@ -106,12 +85,56 @@
     )
 
 
+def sync_get_zip_file(client, bucket, key):
+    """Fetch enough of the WACZ file be able to read the zip filelist"""
+    file_size = sync_get_file_size(client, bucket, key)
+    eocd_record = sync_fetch(
+        client, bucket, key, file_size - EOCD_RECORD_SIZE, EOCD_RECORD_SIZE
+    )
+
+    if file_size <= MAX_STANDARD_ZIP_SIZE:
+        cd_start, cd_size = get_central_directory_metadata_from_eocd(eocd_record)
+        central_directory = sync_fetch(client, bucket, key, cd_start, cd_size)
+        with zipfile.ZipFile(io.BytesIO(central_directory + eocd_record)) as zip_file:
+            return (cd_start, zip_file)
+
+    zip64_eocd_record = sync_fetch(
+        client,
+        bucket,
+        key,
+        file_size
+        - (EOCD_RECORD_SIZE + ZIP64_EOCD_LOCATOR_SIZE + ZIP64_EOCD_RECORD_SIZE),
+        ZIP64_EOCD_RECORD_SIZE,
+    )
+    zip64_eocd_locator = sync_fetch(
+        client,
+        bucket,
+        key,
+        file_size - (EOCD_RECORD_SIZE + ZIP64_EOCD_LOCATOR_SIZE),
+        ZIP64_EOCD_LOCATOR_SIZE,
+    )
+    cd_start, cd_size = get_central_directory_metadata_from_eocd64(zip64_eocd_record)
+    central_directory = sync_fetch(client, bucket, key, cd_start, cd_size)
+    with zipfile.ZipFile(
+        io.BytesIO(
+            central_directory + zip64_eocd_record + zip64_eocd_locator + eocd_record
+        )
+    ) as zip_file:
+        return (cd_start, zip_file)
+
+
 async def get_file_size(client, bucket, key):
     """Get WACZ file size from HEAD request"""
     head_response = await client.head_object(Bucket=bucket, Key=key)
     return head_response["ContentLength"]
 
 
+def sync_get_file_size(client, bucket, key):
+    """Get WACZ file size from HEAD request"""
+    head_response = client.head_object(Bucket=bucket, Key=key)
+    return head_response["ContentLength"]
+
+
 async def fetch(client, bucket, key, start, length):
     """Fetch a byte range from a file in object storage"""
     end = start + length - 1
@@ -121,6 +144,20 @@
     return await response["Body"].read()
 
 
+def sync_fetch(client, bucket, key, start, length):
+    """Fetch a byte range from a file in object storage"""
+    end = start + length - 1
+    response = client.get_object(Bucket=bucket, Key=key, Range=f"bytes={start}-{end}")
+    return response["Body"].read()
+
+
+def sync_fetch_stream(client, bucket, key, start, length):
+    """Fetch a byte range from a file in object storage as a stream"""
+    end = start + length - 1
+    response = client.get_object(Bucket=bucket, Key=key, Range=f"bytes={start}-{end}")
+    return response["Body"].iter_chunks(chunk_size=CHUNK_SIZE)
+
+
 def get_central_directory_metadata_from_eocd(eocd):
     """Get central directory start and size"""
     cd_size = parse_little_endian_to_int(eocd[12:16])