From 037396f3d99a04f8b06c56a3c4dc7079a1e6cffe Mon Sep 17 00:00:00 2001
From: Anish Lakhwara
Date: Thu, 28 Sep 2023 18:54:52 -0700
Subject: [PATCH] Fix: Stream log downloading from WACZ (#1225)

* Fix(backend): Stream logs without causing OOM

Also be smarter about when to use `heapq.merge` and when to use
`itertools.chain`: If all the logs are coming from the same instance we
`chain` them, otherwise we'll `merge` them

iterator fixes:
- group wacz files by instance by suffix, eg. -0.wacz, -1.wacz, -2.wacz
- sort wacz files, and all logs within each wacz file
- chain log iterators for all log files within wacz group
- merge log iterators across wacz files in different groups
- add type hints to help keep track of iterator helper functions
- add iter_lines() from botocore, use that for line parsing for simplicity

---------

Co-authored-by: Ilya Kreymer
---
 backend/btrixcloud/storages.py | 105 ++++++++++++++++++++-------------
 backend/btrixcloud/zip.py      |  16 ++++-
 2 files changed, 79 insertions(+), 42 deletions(-)

diff --git a/backend/btrixcloud/storages.py b/backend/btrixcloud/storages.py
index 88cb62a6..c05a754e 100644
--- a/backend/btrixcloud/storages.py
+++ b/backend/btrixcloud/storages.py
@@ -1,13 +1,14 @@
 """
 Storage API
 """
-from typing import Optional, Union
+from typing import Optional, Union, Iterator, Iterable, List, Dict
 from urllib.parse import urlsplit
 from contextlib import asynccontextmanager
 import asyncio
 import heapq
 import json
+import itertools
 
 from datetime import datetime
 
@@ -17,7 +18,7 @@
 from stream_zip import stream_zip, NO_COMPRESSION_64
 import aiobotocore.session
 import boto3
-from .models import Organization, DefaultStorage, S3Storage, User
+from .models import CrawlFile, Organization, DefaultStorage, S3Storage, User
 from .zip import (
     sync_get_zip_file,
     sync_get_log_stream,
@@ -348,33 +349,34 @@ def _parse_json(line):
 
 
 # ============================================================================
-def _sync_get_logs(wacz_files, log_levels, contexts, client, bucket, key):
+def _sync_get_logs(
+    wacz_files: List[CrawlFile],
+    log_levels: List[str],
+    contexts: List[str],
+    client,
+    bucket: str,
+    key: str,
+) -> Iterator[bytes]:
     """Generate filtered stream of logs from specified WACZs sorted by timestamp"""
     # pylint: disable=too-many-function-args
 
-    def stream_log_bytes_as_line_dicts(stream_generator):
-        """Yield parsed JSON lines as dicts from stream generator."""
-        last_line = ""
-        try:
-            while True:
-                next_chunk = next(stream_generator)
-                next_chunk = next_chunk.decode("utf-8", errors="ignore")
-                chunk = last_line + next_chunk
-                chunk_by_line = chunk.split("\n")
-                last_line = chunk_by_line.pop()
-                for line in chunk_by_line:
-                    if not line:
-                        continue
-                    json_dict = _parse_json(line)
-                    if json_dict:
-                        yield json_dict
-        except StopIteration:
-            if last_line:
-                json_dict = _parse_json(last_line)
-                if json_dict:
-                    yield json_dict
+    def stream_log_lines(
+        wacz_key, wacz_filename, cd_start, log_zipinfo
+    ) -> Iterator[dict]:
+        """Pass lines as json objects"""
 
-    def stream_json_lines(iterator, log_levels, contexts):
+        print(f"Fetching log {log_zipinfo.filename} from {wacz_filename}", flush=True)
+
+        line_iter: Iterator[bytes] = sync_get_log_stream(
+            client, bucket, wacz_key, log_zipinfo, cd_start
+        )
+
+        for line in line_iter:
+            yield _parse_json(line.decode("utf-8", errors="ignore"))
+
+    def stream_json_lines(
+        iterator: Iterable[dict], log_levels: List[str], contexts: List[str]
+    ) -> Iterator[bytes]:
         """Yield parsed JSON dicts as JSON-lines bytes after filtering as necessary"""
         for line_dict in iterator:
             if log_levels and line_dict["logLevel"] not in log_levels:
@@ -384,29 +386,50 @@ def _sync_get_logs(wacz_files, log_levels, contexts, client, bucket, key):
                 continue
             if contexts and line_dict["context"] not in contexts:
                 continue
             json_str = json.dumps(line_dict, ensure_ascii=False) + "\n"
             yield json_str.encode("utf-8")
 
-    log_generators = []
+    def organize_based_on_instance_number(
+        wacz_files: List[CrawlFile],
+    ) -> List[List[CrawlFile]]:
+        """Place wacz_files into their own list based on instance number"""
+        wacz_files.sort(key=lambda file: file.filename)
+        waczs_groups: Dict[str, List[CrawlFile]] = {}
+        for file in wacz_files:
+            instance_number = file.filename[
+                file.filename.rfind("-") + 1 : file.filename.rfind(".")
+            ]
+            if instance_number in waczs_groups:
+                waczs_groups[instance_number].append(file)
+            else:
+                waczs_groups[instance_number] = [file]
+        return list(waczs_groups.values())
 
-    for wacz_file in wacz_files:
-        wacz_key = key + wacz_file.filename
-        cd_start, zip_file = sync_get_zip_file(client, bucket, wacz_key)
+    log_generators: List[Iterator[dict]] = []
 
-        log_files = [
-            f
-            for f in zip_file.filelist
-            if f.filename.startswith("logs/") and not f.is_dir()
-        ]
+    waczs_groups = organize_based_on_instance_number(wacz_files)
+    for instance_list in waczs_groups:
+        wacz_log_streams: List[Iterator[dict]] = []
 
-        wacz_log_streams = []
+        for wacz_file in instance_list:
+            wacz_key = key + wacz_file.filename
+            cd_start, zip_file = sync_get_zip_file(client, bucket, wacz_key)
 
-        for log_zipinfo in log_files:
-            log_stream = sync_get_log_stream(
-                client, bucket, wacz_key, log_zipinfo, cd_start
-            )
-            wacz_log_streams.extend(stream_log_bytes_as_line_dicts(log_stream))
+            log_files = [
+                f
+                for f in zip_file.filelist
+                if f.filename.startswith("logs/") and not f.is_dir()
+            ]
+            log_files.sort(key=lambda log_zipinfo: log_zipinfo.filename)
 
-        log_generators.append(wacz_log_streams)
+            for log_zipinfo in log_files:
+                wacz_log_streams.append(
+                    stream_log_lines(
+                        wacz_key, wacz_file.filename, cd_start, log_zipinfo
+                    )
+                )
+
+        log_generators.append(itertools.chain(*wacz_log_streams))
 
     heap_iter = heapq.merge(*log_generators, key=lambda entry: entry["timestamp"])
+
     return stream_json_lines(heap_iter, log_levels, contexts)
diff --git a/backend/btrixcloud/zip.py b/backend/btrixcloud/zip.py
index 8ab03d66..e1c0d445 100644
--- a/backend/btrixcloud/zip.py
+++ b/backend/btrixcloud/zip.py
@@ -40,7 +40,21 @@ def sync_get_log_stream(client, bucket, key, log_zipinfo, cd_start):
     else:
         uncompressed_content = content
 
-    return uncompressed_content
+    return sync_iter_lines(uncompressed_content)
+
+
+def sync_iter_lines(chunk_iter, keepends=True):
+    """
+    Iter by lines, adapted from botocore
+    """
+    pending = b""
+    for chunk in chunk_iter:
+        lines = (pending + chunk).splitlines(True)
+        for line in lines[:-1]:
+            yield line.splitlines(keepends)[0]
+        pending = lines[-1]
+    if pending:
+        yield pending.splitlines(keepends)[0]
 
 
 async def get_zip_file(client, bucket, key):