Fix: Stream log downloading from WACZ (#1225)

* Fix(backend): Stream logs without causing OOM

Also be smarter about when to use `heapq.merge` and when to use
`itertools.chain`: if all the logs come from the same crawler instance they
are `chain`ed together; otherwise they are `merge`d by timestamp (sketched
just before the diff below).

Iterator fixes:
- group WACZ files by crawler instance using the filename suffix, e.g. -0.wacz, -1.wacz, -2.wacz
- sort WACZ files, and all log files within each WACZ
- chain log iterators for all log files within a WACZ group
- merge log iterators across WACZ files in different groups
- add type hints to help keep track of the iterator helper functions
- add iter_lines(), adapted from botocore, and use it for line parsing for simplicity

---------
Co-authored-by: Ilya Kreymer <ikreymer@gmail.com>
Author: Anish Lakhwara, 2023-09-28 18:54:52 -07:00 (committed by GitHub)
Commit: 037396f3d9, parent: d6bc467c54
2 changed files with 79 additions and 42 deletions
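
The chain/merge strategy described in the commit message can be illustrated on
its own. The following is a minimal sketch, not code from this commit: it
assumes plain filename strings instead of CrawlFile objects and iterators of
already-parsed log dicts carrying a "timestamp" field, and the helper names
(group_by_instance, merged_log_entries) are hypothetical.

import heapq
import itertools
from typing import Dict, Iterator, List

def group_by_instance(filenames: List[str]) -> List[List[str]]:
    """Group WACZ filenames by the crawler-instance suffix (-0.wacz, -1.wacz, ...)."""
    groups: Dict[str, List[str]] = {}
    for name in sorted(filenames):
        instance = name[name.rfind("-") + 1 : name.rfind(".")]
        groups.setdefault(instance, []).append(name)
    return list(groups.values())

def merged_log_entries(
    iters_by_group: List[List[Iterator[dict]]],
) -> Iterator[dict]:
    """Chain log iterators within one instance group (already in timestamp
    order), then lazily merge across groups by timestamp."""
    per_group = [itertools.chain(*iters) for iters in iters_by_group]
    return heapq.merge(*per_group, key=lambda entry: entry["timestamp"])

Because heapq.merge holds only one pending entry per group and itertools.chain
holds none, the combined stream never materializes a whole log file in memory,
which is the OOM fix.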


@@ -1,13 +1,14 @@
"""
Storage API
"""
from typing import Optional, Union
from typing import Optional, Union, Iterator, Iterable, List, Dict
from urllib.parse import urlsplit
from contextlib import asynccontextmanager
import asyncio
import heapq
import json
import itertools
from datetime import datetime
@@ -17,7 +18,7 @@ from stream_zip import stream_zip, NO_COMPRESSION_64
import aiobotocore.session
import boto3
from .models import Organization, DefaultStorage, S3Storage, User
from .models import CrawlFile, Organization, DefaultStorage, S3Storage, User
from .zip import (
sync_get_zip_file,
sync_get_log_stream,
@@ -348,33 +349,34 @@ def _parse_json(line):
# ============================================================================
def _sync_get_logs(wacz_files, log_levels, contexts, client, bucket, key):
def _sync_get_logs(
wacz_files: List[CrawlFile],
log_levels: List[str],
contexts: List[str],
client,
bucket: str,
key: str,
) -> Iterator[bytes]:
"""Generate filtered stream of logs from specified WACZs sorted by timestamp"""
# pylint: disable=too-many-function-args
def stream_log_bytes_as_line_dicts(stream_generator):
"""Yield parsed JSON lines as dicts from stream generator."""
last_line = ""
try:
while True:
next_chunk = next(stream_generator)
next_chunk = next_chunk.decode("utf-8", errors="ignore")
chunk = last_line + next_chunk
chunk_by_line = chunk.split("\n")
last_line = chunk_by_line.pop()
for line in chunk_by_line:
if not line:
continue
json_dict = _parse_json(line)
if json_dict:
yield json_dict
except StopIteration:
if last_line:
json_dict = _parse_json(last_line)
if json_dict:
yield json_dict
def stream_log_lines(
wacz_key, wacz_filename, cd_start, log_zipinfo
) -> Iterator[dict]:
"""Pass lines as json objects"""
def stream_json_lines(iterator, log_levels, contexts):
print(f"Fetching log {log_zipinfo.filename} from {wacz_filename}", flush=True)
line_iter: Iterator[bytes] = sync_get_log_stream(
client, bucket, wacz_key, log_zipinfo, cd_start
)
for line in line_iter:
yield _parse_json(line.decode("utf-8", errors="ignore"))
def stream_json_lines(
iterator: Iterable[dict], log_levels: List[str], contexts: List[str]
) -> Iterator[bytes]:
"""Yield parsed JSON dicts as JSON-lines bytes after filtering as necessary"""
for line_dict in iterator:
if log_levels and line_dict["logLevel"] not in log_levels:
@@ -384,29 +386,50 @@ def _sync_get_logs(wacz_files, log_levels, contexts, client, bucket, key):
json_str = json.dumps(line_dict, ensure_ascii=False) + "\n"
yield json_str.encode("utf-8")
log_generators = []
def organize_based_on_instance_number(
wacz_files: List[CrawlFile],
) -> List[List[CrawlFile]]:
"""Place wacz_files into their own list based on instance number"""
wacz_files.sort(key=lambda file: file.filename)
waczs_groups: Dict[str, List[CrawlFile]] = {}
for file in wacz_files:
instance_number = file.filename[
file.filename.rfind("-") + 1 : file.filename.rfind(".")
]
if instance_number in waczs_groups:
waczs_groups[instance_number].append(file)
else:
waczs_groups[instance_number] = [file]
return list(waczs_groups.values())
for wacz_file in wacz_files:
wacz_key = key + wacz_file.filename
cd_start, zip_file = sync_get_zip_file(client, bucket, wacz_key)
log_generators: List[Iterator[dict]] = []
log_files = [
f
for f in zip_file.filelist
if f.filename.startswith("logs/") and not f.is_dir()
]
waczs_groups = organize_based_on_instance_number(wacz_files)
for instance_list in waczs_groups:
wacz_log_streams: List[Iterator[dict]] = []
wacz_log_streams = []
for wacz_file in instance_list:
wacz_key = key + wacz_file.filename
cd_start, zip_file = sync_get_zip_file(client, bucket, wacz_key)
for log_zipinfo in log_files:
log_stream = sync_get_log_stream(
client, bucket, wacz_key, log_zipinfo, cd_start
)
wacz_log_streams.extend(stream_log_bytes_as_line_dicts(log_stream))
log_files = [
f
for f in zip_file.filelist
if f.filename.startswith("logs/") and not f.is_dir()
]
log_files.sort(key=lambda log_zipinfo: log_zipinfo.filename)
log_generators.append(wacz_log_streams)
for log_zipinfo in log_files:
wacz_log_streams.append(
stream_log_lines(
wacz_key, wacz_file.filename, cd_start, log_zipinfo
)
)
log_generators.append(itertools.chain(*wacz_log_streams))
heap_iter = heapq.merge(*log_generators, key=lambda entry: entry["timestamp"])
return stream_json_lines(heap_iter, log_levels, contexts)
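
Not part of this diff, but for context: a sketch of how a bytes iterator like
the one returned by _sync_get_logs above might be streamed to a client without
buffering, assuming a FastAPI StreamingResponse; the route, parameter names,
and get_log_line_iterator helper are hypothetical stand-ins.

from typing import Iterator

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

def get_log_line_iterator(crawl_id: str) -> Iterator[bytes]:
    # Placeholder standing in for the real lookup that ends in _sync_get_logs().
    yield b'{"timestamp": "2023-09-28T01:02:03.000Z", "logLevel": "info", "context": "general", "message": "example"}\n'

@app.get("/crawls/{crawl_id}/logs")
def stream_crawl_logs(crawl_id: str):
    # StreamingResponse pulls from the iterator piece by piece, so only a small
    # window of log data is held in memory at any time.
    return StreamingResponse(get_log_line_iterator(crawl_id), media_type="text/plain")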


@@ -40,7 +40,21 @@ def sync_get_log_stream(client, bucket, key, log_zipinfo, cd_start):
else:
uncompressed_content = content
return uncompressed_content
return sync_iter_lines(uncompressed_content)
def sync_iter_lines(chunk_iter, keepends=True):
"""
Iter by lines, adapted from botocore
"""
pending = b""
for chunk in chunk_iter:
lines = (pending + chunk).splitlines(True)
for line in lines[:-1]:
yield line.splitlines(keepends)[0]
pending = lines[-1]
if pending:
yield pending.splitlines(keepends)[0]
async def get_zip_file(client, bucket, key):
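
As a quick, self-contained check of the sync_iter_lines helper above (the
chunk data here is made up, not from the repo), it reassembles lines that are
split across chunk boundaries:

def sync_iter_lines(chunk_iter, keepends=True):
    """Iter by lines, adapted from botocore"""
    pending = b""
    for chunk in chunk_iter:
        lines = (pending + chunk).splitlines(True)
        for line in lines[:-1]:
            yield line.splitlines(keepends)[0]
        pending = lines[-1]
    if pending:
        yield pending.splitlines(keepends)[0]

# Simulated S3 chunks with JSON lines broken mid-record:
chunks = [b'{"logLevel": "info"}\n{"logLe', b'vel": "error"}\n{"logL', b'evel": "debug"}\n']
print(list(sync_iter_lines(chunks)))
# [b'{"logLevel": "info"}\n', b'{"logLevel": "error"}\n', b'{"logLevel": "debug"}\n']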