Implement sync streaming for finished crawl logs (#1168)

- Crawl logs streamed from WACZs using the sync boto client
Tessa Walsh 2023-09-14 20:05:19 -04:00 committed by GitHub
parent 6ddba105f4
commit 2efc461b9b
3 changed files with 148 additions and 70 deletions
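
As context for the changes below, this is roughly the sync-streaming pattern the commit message refers to, sketched in isolation: a synchronous boto3 client issues a ranged GET, the body is consumed in fixed-size chunks, and the chunks are reassembled into JSON log lines. The bucket, key, and helper names here are illustrative, not taken from the codebase.

    # Minimal sketch (illustrative names): sync ranged streaming with boto3,
    # reassembling chunk boundaries into complete JSON log lines.
    import json

    import boto3

    CHUNK_SIZE = 1024 * 256


    def fetch_stream(client, bucket, key, start, length):
        """Fetch a byte range as a chunk iterator, as sync_fetch_stream does below."""
        end = start + length - 1
        resp = client.get_object(Bucket=bucket, Key=key, Range=f"bytes={start}-{end}")
        return resp["Body"].iter_chunks(chunk_size=CHUNK_SIZE)


    def iter_json_lines(chunks):
        """Carry the trailing partial line across chunks, yielding parsed dicts."""
        last_line = ""
        for chunk in chunks:
            text = last_line + chunk.decode("utf-8", errors="ignore")
            *lines, last_line = text.split("\n")
            for line in lines:
                if line:
                    yield json.loads(line)
        if last_line:
            yield json.loads(last_line)


    # Hypothetical usage; "crawl-bucket" and "logs/crawl.log" are placeholders.
    s3 = boto3.client("s3")
    stream = fetch_stream(s3, "crawl-bucket", "logs/crawl.log", 0, 10_000_000)
    for entry in iter_json_lines(stream):
        print(entry["timestamp"], entry["logLevel"], entry["context"])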


@@ -2,7 +2,6 @@
 # pylint: disable=too-many-lines
 import asyncio
-import heapq
 import uuid
 import json
 import re
@@ -17,7 +16,7 @@ from redis import asyncio as exceptions
 import pymongo

 from .pagination import DEFAULT_PAGE_SIZE, paginated_format
-from .storages import get_wacz_logs
+from .storages import sync_stream_wacz_logs
 from .utils import dt_now, parse_jsonl_error_messages
 from .basecrawls import BaseCrawlOps
 from .models import (
@@ -915,27 +914,13 @@ def init_crawls_api(
         if context:
             contexts = context.split(",")

-        def stream_json_lines(iterator, log_levels, contexts):
-            """Return iterator as generator, filtering as necessary"""
-            for line_dict in iterator:
-                if log_levels and line_dict["logLevel"] not in log_levels:
-                    continue
-                if contexts and line_dict["context"] not in contexts:
-                    continue
-                # Convert to JSON-lines bytes
-                json_str = json.dumps(line_dict, ensure_ascii=False) + "\n"
-                yield json_str.encode("utf-8")
-
         # If crawl is finished, stream logs from WACZ files
         if crawl.finished:
-            logs = []
             wacz_files = await ops.get_wacz_files(crawl_id, org)
-            for wacz_file in wacz_files:
-                wacz_logs = await get_wacz_logs(org, wacz_file, crawl_manager)
-                logs.append(wacz_logs)
-            heap_iter = heapq.merge(*logs, key=lambda entry: entry["timestamp"])
-            return StreamingResponse(stream_json_lines(heap_iter, log_levels, contexts))
+            resp = await sync_stream_wacz_logs(
+                org, wacz_files, log_levels, contexts, crawl_manager
+            )
+            return StreamingResponse(resp)

         raise HTTPException(status_code=400, detail="crawl_not_finished")
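
The endpoint now awaits sync_stream_wacz_logs, which (as the storage changes below show) offloads the blocking boto3 work to the default thread-pool executor and returns an ordinary synchronous generator; Starlette's StreamingResponse iterates non-async iterators in a threadpool, so the event loop stays free while logs stream. A minimal sketch of that wiring, with an illustrative route and helpers:

    # Sketch (illustrative route and helpers): blocking setup in an executor,
    # then streaming the resulting sync generator through FastAPI.
    import asyncio
    import time

    from fastapi import FastAPI
    from fastapi.responses import StreamingResponse

    app = FastAPI()


    def build_log_stream():
        """Stands in for the sync boto3 work; returns a plain generator."""
        time.sleep(0.1)  # placeholder for S3 round trips

        def generate():
            for i in range(3):
                yield f'{{"line": {i}}}\n'.encode("utf-8")

        return generate()


    @app.get("/example-logs")
    async def example_logs():
        loop = asyncio.get_event_loop()
        stream = await loop.run_in_executor(None, build_log_stream)
        # The sync generator is iterated in a threadpool by Starlette,
        # so chunk reads never block the event loop.
        return StreamingResponse(stream, media_type="application/jsonl")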


@@ -1,11 +1,12 @@
 """
 Storage API
 """
-from typing import Union
+from typing import Optional, Union
 from urllib.parse import urlsplit
 from contextlib import asynccontextmanager
 import asyncio
+import heapq
 import json
 from datetime import datetime
@@ -17,7 +18,10 @@ import aiobotocore.session
 import boto3

 from .models import Organization, DefaultStorage, S3Storage, User
-from .zip import get_zip_file, extract_and_parse_log_file
+from .zip import (
+    sync_get_zip_file,
+    sync_get_log_stream,
+)

 CHUNK_SIZE = 1024 * 256
@@ -319,39 +323,91 @@ async def delete_file(org, filename, crawl_manager, def_storage_name="default"):
 # ============================================================================
-async def get_wacz_logs(org, crawlfile, crawl_manager):
-    """Return combined and sorted list of log line dicts from all logs in WACZ."""
-    if crawlfile.def_storage_name:
-        s3storage = await crawl_manager.get_default_storage(crawlfile.def_storage_name)
-    elif org.storage.type == "s3":
-        s3storage = org.storage
-    else:
-        raise TypeError("No Default Storage Found, Invalid Storage Type")
-
-    async with get_s3_client(s3storage, s3storage.use_access_for_presign) as (
-        client,
-        bucket,
-        key,
-    ):
-        key += crawlfile.filename
-        cd_start, zip_file = await get_zip_file(client, bucket, key)
+async def sync_stream_wacz_logs(org, wacz_files, log_levels, contexts, crawl_manager):
+    """Return filtered stream of logs from specified WACZs sorted by timestamp"""
+    client, bucket, key, _ = await get_sync_client(org, crawl_manager)
+
+    loop = asyncio.get_event_loop()
+
+    resp = await loop.run_in_executor(
+        None, _sync_get_logs, wacz_files, log_levels, contexts, client, bucket, key
+    )
+
+    return resp
+
+
+# ============================================================================
+def _parse_json(line):
+    """Parse JSON str into dict."""
+    parsed_json: Optional[dict] = None
+    try:
+        parsed_json = json.loads(line)
+    except json.JSONDecodeError as err:
+        print(f"Error decoding json-l line: {line}. Error: {err}", flush=True)
+    return parsed_json
+
+
+# ============================================================================
+def _sync_get_logs(wacz_files, log_levels, contexts, client, bucket, key):
+    """Generate filtered stream of logs from specified WACZs sorted by timestamp"""
+    # pylint: disable=too-many-function-args
+    def stream_log_bytes_as_line_dicts(stream_generator):
+        """Yield parsed JSON lines as dicts from stream generator."""
+        last_line = ""
+        try:
+            while True:
+                next_chunk = next(stream_generator)
+                next_chunk = next_chunk.decode("utf-8", errors="ignore")
+                chunk = last_line + next_chunk
+                chunk_by_line = chunk.split("\n")
+                last_line = chunk_by_line.pop()
+                for line in chunk_by_line:
+                    if not line:
+                        continue
+                    json_dict = _parse_json(line)
+                    if json_dict:
+                        yield json_dict
+        except StopIteration:
+            if last_line:
+                json_dict = _parse_json(last_line)
+                if json_dict:
+                    yield json_dict
+
+    def stream_json_lines(iterator, log_levels, contexts):
+        """Yield parsed JSON dicts as JSON-lines bytes after filtering as necessary"""
+        for line_dict in iterator:
+            if log_levels and line_dict["logLevel"] not in log_levels:
+                continue
+            if contexts and line_dict["context"] not in contexts:
+                continue
+            json_str = json.dumps(line_dict, ensure_ascii=False) + "\n"
+            yield json_str.encode("utf-8")
+
+    log_generators = []
+    for wacz_file in wacz_files:
+        wacz_key = key + wacz_file.filename
+        cd_start, zip_file = sync_get_zip_file(client, bucket, wacz_key)
         log_files = [
             f
             for f in zip_file.filelist
             if f.filename.startswith("logs/") and not f.is_dir()
         ]

-        combined_log_lines = []
+        wacz_log_streams = []
         for log_zipinfo in log_files:
-            parsed_log_lines = await extract_and_parse_log_file(
-                client, bucket, key, log_zipinfo, cd_start
+            log_stream = sync_get_log_stream(
+                client, bucket, wacz_key, log_zipinfo, cd_start
             )
-            combined_log_lines.extend(parsed_log_lines)
+            wacz_log_streams.extend(stream_log_bytes_as_line_dicts(log_stream))

-        return combined_log_lines
+        log_generators.append(wacz_log_streams)
+
+    heap_iter = heapq.merge(*log_generators, key=lambda entry: entry["timestamp"])
+
+    return stream_json_lines(heap_iter, log_levels, contexts)


 # ============================================================================
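
The key ordering step above is heapq.merge: each per-WACZ sequence is already sorted by timestamp, and merge interleaves them lazily on that key, so the combined output is globally time-ordered without sorting everything up front. A small self-contained illustration with made-up entries:

    # Sketch: merging per-WACZ log sequences by timestamp (made-up data).
    import heapq

    wacz_a = [
        {"timestamp": "2023-09-14T20:00:00.000Z", "logLevel": "info", "context": "general"},
        {"timestamp": "2023-09-14T20:00:02.000Z", "logLevel": "error", "context": "general"},
    ]
    wacz_b = [
        {"timestamp": "2023-09-14T20:00:01.000Z", "logLevel": "info", "context": "behavior"},
    ]

    for entry in heapq.merge(wacz_a, wacz_b, key=lambda entry: entry["timestamp"]):
        print(entry["timestamp"], entry["logLevel"], entry["context"])
    # Prints the three entries in timestamp order, pulling one element at a
    # time from each input; the inputs only need to be sorted individually.

Note that in the committed code each WACZ's lines are still collected into a list via extend() before the merge, so the parts that stream lazily are the cross-WACZ merge, the level/context filtering, and the JSON-lines encoding.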


@@ -2,14 +2,10 @@
 Methods for interacting with zip/WACZ files
 """
 import io
-import json
-import os
 import struct
 import zipfile
 import zlib
-from fastapi import HTTPException


 # ============================================================================
 EOCD_RECORD_SIZE = 22
@@ -18,18 +14,20 @@ ZIP64_EOCD_LOCATOR_SIZE = 20
 MAX_STANDARD_ZIP_SIZE = 4_294_967_295

 CHUNK_SIZE = 1024 * 256


 # ============================================================================
-async def extract_and_parse_log_file(client, bucket, key, log_zipinfo, cd_start):
-    """Return parsed JSON from extracted and uncompressed log"""
+def sync_get_log_stream(client, bucket, key, log_zipinfo, cd_start):
+    """Return uncompressed byte stream of log file in WACZ"""
     # pylint: disable=too-many-locals
-    file_head = await fetch(
+    file_head = sync_fetch(
         client, bucket, key, cd_start + log_zipinfo.header_offset + 26, 4
     )
     name_len = parse_little_endian_to_int(file_head[0:2])
     extra_len = parse_little_endian_to_int(file_head[2:4])

-    content = await fetch(
+    content = sync_fetch_stream(
         client,
         bucket,
         key,
@@ -42,26 +40,7 @@ async def extract_and_parse_log_file(client, bucket, key, log_zipinfo, cd_start)
     else:
         uncompressed_content = content

-    content_length = len(uncompressed_content)
-    if not log_zipinfo.file_size == content_length:
-        # pylint: disable=line-too-long
-        detail = f"Error extracting log file {log_zipinfo.filename} from WACZ {os.path.basename(key)}."
-        detail += f" Expected {log_zipinfo.file_size} bytes uncompressed but found {content_length}"
-        print(detail, flush=True)
-        raise HTTPException(status_code=500, detail=detail)
-
-    parsed_log_lines = []
-
-    for json_line in uncompressed_content.decode("utf-8").split("\n"):
-        if not json_line:
-            continue
-        try:
-            result = json.loads(json_line)
-            parsed_log_lines.append(result)
-        except json.JSONDecodeError as err:
-            print(f"Error decoding json-l line: {json_line}. Error: {err}", flush=True)
-
-    return parsed_log_lines
+    return uncompressed_content


 async def get_zip_file(client, bucket, key):
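
For reference, the cd_start + header_offset + 26 arithmetic in sync_get_log_stream comes from the zip local file header layout: the fixed part of the header is 30 bytes, with the little-endian file-name length stored at offset 26 and the extra-field length at offset 28, so an entry's stored data begins name_len + extra_len bytes past those 30. A short sketch of that calculation (the helper name is hypothetical):

    # Sketch: offsets 26-29 of a zip local file header hold the two lengths
    # needed to find where an entry's (possibly deflated) data starts.
    import struct

    LOCAL_FILE_HEADER_FIXED_SIZE = 30  # bytes before the variable name/extra fields


    def entry_data_offset(header_offset, file_head):
        """file_head: the 4 bytes read at header_offset + 26 (name len, extra len)."""
        name_len, extra_len = struct.unpack("<HH", file_head)
        return header_offset + LOCAL_FILE_HEADER_FIXED_SIZE + name_len + extra_len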
@@ -106,12 +85,56 @@ async def get_zip_file(client, bucket, key):
     )


+def sync_get_zip_file(client, bucket, key):
+    """Fetch enough of the WACZ file be able to read the zip filelist"""
+    file_size = sync_get_file_size(client, bucket, key)
+    eocd_record = sync_fetch(
+        client, bucket, key, file_size - EOCD_RECORD_SIZE, EOCD_RECORD_SIZE
+    )
+
+    if file_size <= MAX_STANDARD_ZIP_SIZE:
+        cd_start, cd_size = get_central_directory_metadata_from_eocd(eocd_record)
+        central_directory = sync_fetch(client, bucket, key, cd_start, cd_size)
+        with zipfile.ZipFile(io.BytesIO(central_directory + eocd_record)) as zip_file:
+            return (cd_start, zip_file)
+
+    zip64_eocd_record = sync_fetch(
+        client,
+        bucket,
+        key,
+        file_size
+        - (EOCD_RECORD_SIZE + ZIP64_EOCD_LOCATOR_SIZE + ZIP64_EOCD_RECORD_SIZE),
+        ZIP64_EOCD_RECORD_SIZE,
+    )
+    zip64_eocd_locator = sync_fetch(
+        client,
+        bucket,
+        key,
+        file_size - (EOCD_RECORD_SIZE + ZIP64_EOCD_LOCATOR_SIZE),
+        ZIP64_EOCD_LOCATOR_SIZE,
+    )
+    cd_start, cd_size = get_central_directory_metadata_from_eocd64(zip64_eocd_record)
+    central_directory = sync_fetch(client, bucket, key, cd_start, cd_size)
+    with zipfile.ZipFile(
+        io.BytesIO(
+            central_directory + zip64_eocd_record + zip64_eocd_locator + eocd_record
+        )
+    ) as zip_file:
+        return (cd_start, zip_file)
+
+
 async def get_file_size(client, bucket, key):
     """Get WACZ file size from HEAD request"""
     head_response = await client.head_object(Bucket=bucket, Key=key)
     return head_response["ContentLength"]


+def sync_get_file_size(client, bucket, key):
+    """Get WACZ file size from HEAD request"""
+    head_response = client.head_object(Bucket=bucket, Key=key)
+    return head_response["ContentLength"]
+
+
 async def fetch(client, bucket, key, start, length):
     """Fetch a byte range from a file in object storage"""
     end = start + length - 1
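
sync_get_zip_file is the synchronous twin of get_zip_file: it lists a remote WACZ with a few ranged requests by reading the 22-byte end-of-central-directory (EOCD) record from the tail of the file, fetching the central directory it points to, and handing both to zipfile.ZipFile. Per the zip spec, a standard EOCD stores the central directory size at bytes 12-15 and its offset at bytes 16-19, which is what get_central_directory_metadata_from_eocd extracts (see the context lines below). A standalone sketch of that parsing, assuming the archive has no trailing comment:

    # Sketch: parsing a standard (non-zip64) EOCD record per the zip spec.
    import struct

    EOCD_RECORD_SIZE = 22  # fixed size when the archive has no trailing comment


    def central_directory_metadata(eocd: bytes):
        """Return (cd_start, cd_size) from a 22-byte end-of-central-directory record."""
        cd_size = struct.unpack("<I", eocd[12:16])[0]   # central directory size in bytes
        cd_start = struct.unpack("<I", eocd[16:20])[0]  # offset of its first byte
        return cd_start, cd_size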
@@ -121,6 +144,20 @@ async def fetch(client, bucket, key, start, length):
     return await response["Body"].read()


+def sync_fetch(client, bucket, key, start, length):
+    """Fetch a byte range from a file in object storage"""
+    end = start + length - 1
+    response = client.get_object(Bucket=bucket, Key=key, Range=f"bytes={start}-{end}")
+    return response["Body"].read()
+
+
+def sync_fetch_stream(client, bucket, key, start, length):
+    """Fetch a byte range from a file in object storage as a stream"""
+    end = start + length - 1
+    response = client.get_object(Bucket=bucket, Key=key, Range=f"bytes={start}-{end}")
+    return response["Body"].iter_chunks(chunk_size=CHUNK_SIZE)
+
+
 def get_central_directory_metadata_from_eocd(eocd):
     """Get central directory start and size"""
     cd_size = parse_little_endian_to_int(eocd[12:16])