Add crawl /logs API endpoint

If a crawl is complete, the endpoint streams the logs from the log
files in all of the crawl's WACZ files, merged and sorted by timestamp.

The API endpoint supports filtering by log_level and context, whether
or not the crawl is still running.

This is not yet proper streaming because the entire log file is read
into memory before being streamed to the client. We will want to
switch to proper streaming eventually, but are currently blocked by
an aiobotocore bug - see:

https://github.com/aio-libs/aiobotocore/issues/991?#issuecomment-1490737762
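For reference, the endpoint can be consumed as newline-delimited JSON with any HTTP client. Below is a minimal sketch using the Python requests library; the base URL, org ID, crawl ID, and token are placeholders, while the path, query parameters, and JSON fields match the endpoint added in this commit:

import json
import requests

API_PREFIX = "https://app.example.com/api"     # placeholder deployment URL
ORG_ID = "<org-id>"                            # placeholder org UUID
CRAWL_ID = "<crawl-id>"                        # placeholder crawl id
HEADERS = {"Authorization": "Bearer <token>"}  # placeholder auth token

# Both filters accept comma-separated lists
params = {"logLevel": "info,debug", "context": "general,worker"}

with requests.get(
    f"{API_PREFIX}/orgs/{ORG_ID}/crawls/{CRAWL_ID}/logs",
    headers=HEADERS,
    params=params,
    stream=True,
) as resp:
    resp.raise_for_status()
    for raw_line in resp.iter_lines():
        if not raw_line:
            continue
        entry = json.loads(raw_line)  # one JSON object per line
        print(entry["timestamp"], entry["logLevel"], entry["context"])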
Tessa Walsh 2023-03-01 15:43:15 -05:00
parent 631c84e488
commit fb80a04f18
6 changed files with 381 additions and 4 deletions


@@ -2,6 +2,7 @@
# pylint: disable=too-many-lines
import asyncio
import heapq
import uuid
import os
import json
@@ -12,16 +13,17 @@ from typing import Optional, List, Dict, Union
from datetime import datetime, timedelta
from fastapi import Depends, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, UUID4, conint, HttpUrl
from redis import asyncio as aioredis, exceptions
import pymongo
from .crawlconfigs import Seed, CrawlConfigCore, CrawlConfig
from .db import BaseMongoModel
from .users import User
from .orgs import Organization, MAX_CRAWL_SCALE
from .pagination import DEFAULT_PAGE_SIZE, paginated_format
from .storages import get_presigned_url, delete_crawl_file_object
from .storages import get_presigned_url, delete_crawl_file_object, get_wacz_logs
from .users import User
CRAWL_STATES = (
@@ -487,6 +489,16 @@ class CrawlOps:
if status_code != 204:
raise HTTPException(status_code=400, detail="file_deletion_error")
async def get_wacz_files(self, crawl_id: str, org: Organization):
"""Return list of WACZ files associated with crawl."""
wacz_files = []
crawl_raw = await self.get_crawl_raw(crawl_id, org)
crawl = Crawl.from_dict(crawl_raw)
for file_ in crawl.files:
if file_.filename.endswith(".wacz"):
wacz_files.append(file_)
return wacz_files
async def add_new_crawl(self, crawl_id: str, crawlconfig: CrawlConfig, user: User):
"""initialize new crawl"""
crawl = Crawl(
@@ -756,7 +768,7 @@ class CrawlOps:
# ============================================================================
# pylint: disable=too-many-arguments, too-many-locals
# pylint: disable=too-many-arguments, too-many-locals, too-many-statements
def init_crawls_api(app, mdb, users, crawl_manager, crawl_config_ops, orgs, user_dep):
"""API for crawl management, including crawl done callback"""
# pylint: disable=invalid-name
@@ -1024,6 +1036,46 @@ def init_crawls_api(app, mdb, users, crawl_manager, crawl_config_ops, orgs, user
):
return await ops.add_or_remove_exclusion(crawl_id, regex, org, user, add=False)
@app.get("/orgs/{oid}/crawls/{crawl_id}/logs", tags=["crawls"])
async def stream_crawl_logs(
crawl_id,
org: Organization = Depends(org_viewer_dep),
logLevel: Optional[str] = None,
context: Optional[str] = None,
):
crawl = await ops.get_crawl(crawl_id, org)
log_levels = []
contexts = []
if logLevel:
log_levels = logLevel.split(",")
if context:
contexts = context.split(",")
def stream_json_lines(iterator, log_levels, contexts):
"""Return iterator as generator, filtering as necessary"""
for line_dict in iterator:
if log_levels and line_dict["logLevel"] not in log_levels:
continue
if contexts and line_dict["context"] not in contexts:
continue
# Convert to JSON-lines bytes
json_str = json.dumps(line_dict, ensure_ascii=False) + "\n"
yield json_str.encode("utf-8")
# If crawl is finished, stream logs from WACZ files
if crawl.finished:
logs = []
wacz_files = await ops.get_wacz_files(crawl_id, org)
for wacz_file in wacz_files:
wacz_logs = await get_wacz_logs(org, wacz_file, crawl_manager)
logs.append(wacz_logs)
heap_iter = heapq.merge(*logs, key=lambda entry: entry["timestamp"])
return StreamingResponse(stream_json_lines(heap_iter, log_levels, contexts))
raise HTTPException(status_code=400, detail="crawl_not_finished")
return ops
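The merge step in the endpoint relies on each per-WACZ log list already being ordered by timestamp, so heapq.merge can interleave the lists lazily instead of concatenating and re-sorting them. A standalone sketch of that pattern (the sample entries below are made up):

import heapq

# Each list stands in for the parsed log lines of one WACZ file,
# already ordered by timestamp within that file.
wacz_a = [
    {"timestamp": "2023-03-01T20:00:00.000Z", "logLevel": "info", "context": "general", "details": {}},
    {"timestamp": "2023-03-01T20:00:02.000Z", "logLevel": "debug", "context": "worker", "details": {}},
]
wacz_b = [
    {"timestamp": "2023-03-01T20:00:01.000Z", "logLevel": "info", "context": "worker", "details": {}},
]

# Entries come out in global timestamp order, one at a time.
for entry in heapq.merge(wacz_a, wacz_b, key=lambda e: e["timestamp"]):
    print(entry["timestamp"], entry["context"])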


@@ -7,8 +7,8 @@ import base64
import yaml
import aiohttp
from ..orgs import S3Storage
from ..crawlmanager import BaseCrawlManager
from ..orgs import S3Storage
from .k8sapi import K8sAPI


@@ -10,6 +10,7 @@ from aiobotocore.session import get_session
from .orgs import Organization, DefaultStorage, S3Storage
from .users import User
from .zip import get_zip_file, extract_and_parse_log_file
# ============================================================================
@@ -148,3 +149,39 @@ async def delete_crawl_file_object(org, crawlfile, crawl_manager):
status_code = response["ResponseMetadata"]["HTTPStatusCode"]
return status_code
# ============================================================================
async def get_wacz_logs(org, crawlfile, crawl_manager):
"""Return combined and sorted list of log line dicts from all logs in WACZ."""
if crawlfile.def_storage_name:
s3storage = await crawl_manager.get_default_storage(crawlfile.def_storage_name)
elif org.storage.type == "s3":
s3storage = org.storage
else:
raise TypeError("No Default Storage Found, Invalid Storage Type")
async with get_s3_client(s3storage, s3storage.use_access_for_presign) as (
client,
bucket,
key,
):
key += crawlfile.filename
cd_start, zip_file = await get_zip_file(client, bucket, key)
log_files = [
f
for f in zip_file.filelist
if f.filename.startswith("logs/") and not f.is_dir()
]
combined_log_lines = []
for log_zipinfo in log_files:
parsed_log_lines = await extract_and_parse_log_file(
client, bucket, key, log_zipinfo, cd_start
)
combined_log_lines.extend(parsed_log_lines)
return combined_log_lines

backend/btrixcloud/zip.py (new file, 147 lines)

@@ -0,0 +1,147 @@
"""
Methods for interacting with zip/WACZ files
"""
import io
import json
import os
import struct
import zipfile
import zlib
from fastapi import HTTPException
# ============================================================================
EOCD_RECORD_SIZE = 22
ZIP64_EOCD_RECORD_SIZE = 56
ZIP64_EOCD_LOCATOR_SIZE = 20
MAX_STANDARD_ZIP_SIZE = 4_294_967_295
# ============================================================================
async def extract_and_parse_log_file(client, bucket, key, log_zipinfo, cd_start):
"""Return parsed JSON from extracted and uncompressed log"""
# pylint: disable=too-many-locals
file_head = await fetch(
client, bucket, key, cd_start + log_zipinfo.header_offset + 26, 4
)
name_len = parse_little_endian_to_int(file_head[0:2])
extra_len = parse_little_endian_to_int(file_head[2:4])
content = await fetch(
client,
bucket,
key,
cd_start + log_zipinfo.header_offset + 30 + name_len + extra_len,
log_zipinfo.compress_size,
)
if log_zipinfo.compress_type == zipfile.ZIP_DEFLATED:
uncompressed_content = zlib.decompressobj(-zlib.MAX_WBITS).decompress(content)
else:
uncompressed_content = content
content_length = len(uncompressed_content)
if not log_zipinfo.file_size == content_length:
# pylint: disable=line-too-long
detail = f"Error extracting log file {log_zipinfo.filename} from WACZ {os.path.basename(key)}."
detail += f" Expected {log_zipinfo.file_size} bytes uncompressed but found {content_length}"
print(detail, flush=True)
raise HTTPException(status_code=500, detail=detail)
parsed_log_lines = []
for json_line in uncompressed_content.decode("utf-8").split("\n"):
if not json_line:
continue
try:
result = json.loads(json_line)
parsed_log_lines.append(result)
except json.JSONDecodeError as err:
print(f"Error decoding json-l line: {json_line}. Error: {err}", flush=True)
return parsed_log_lines
async def get_zip_file(client, bucket, key):
"""Fetch enough of the WACZ file be able to read the zip filelist"""
file_size = await get_file_size(client, bucket, key)
eocd_record = await fetch(
client, bucket, key, file_size - EOCD_RECORD_SIZE, EOCD_RECORD_SIZE
)
if file_size <= MAX_STANDARD_ZIP_SIZE:
cd_start, cd_size = get_central_directory_metadata_from_eocd(eocd_record)
central_directory = await fetch(client, bucket, key, cd_start, cd_size)
return (
cd_start,
zipfile.ZipFile(io.BytesIO(central_directory + eocd_record)),
)
zip64_eocd_record = await fetch(
client,
bucket,
key,
file_size
- (EOCD_RECORD_SIZE + ZIP64_EOCD_LOCATOR_SIZE + ZIP64_EOCD_RECORD_SIZE),
ZIP64_EOCD_RECORD_SIZE,
)
zip64_eocd_locator = await fetch(
client,
bucket,
key,
file_size - (EOCD_RECORD_SIZE + ZIP64_EOCD_LOCATOR_SIZE),
ZIP64_EOCD_LOCATOR_SIZE,
)
cd_start, cd_size = get_central_directory_metadata_from_eocd64(zip64_eocd_record)
central_directory = await fetch(client, bucket, key, cd_start, cd_size)
return (
cd_start,
zipfile.ZipFile(
io.BytesIO(
central_directory + zip64_eocd_record + zip64_eocd_locator + eocd_record
)
),
)
async def get_file_size(client, bucket, key):
"""Get WACZ file size from HEAD request"""
head_response = await client.head_object(Bucket=bucket, Key=key)
return head_response["ContentLength"]
async def fetch(client, bucket, key, start, length):
"""Fetch a byte range from a file in object storage"""
end = start + length - 1
response = await client.get_object(
Bucket=bucket, Key=key, Range=f"bytes={start}-{end}"
)
return await response["Body"].read()
def get_central_directory_metadata_from_eocd(eocd):
"""Get central directory start and size"""
cd_size = parse_little_endian_to_int(eocd[12:16])
cd_start = parse_little_endian_to_int(eocd[16:20])
return cd_start, cd_size
def get_central_directory_metadata_from_eocd64(eocd64):
"""Get central directory start and size for zip64"""
cd_size = parse_little_endian_to_int(eocd64[40:48])
cd_start = parse_little_endian_to_int(eocd64[48:56])
return cd_start, cd_size
def parse_little_endian_to_int(little_endian_bytes):
"""Convert little endian used in zip spec to int"""
byte_length = len(little_endian_bytes)
format_character = "q"
if byte_length == 4:
format_character = "i"
elif byte_length == 2:
format_character = "h"
return struct.unpack("<" + format_character, little_endian_bytes)[0]
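The byte offsets above come straight from the ZIP format: the end-of-central-directory (EOCD) record stores the central directory size at bytes 12-16 and its start offset at bytes 16-20, and each local file header is 30 fixed bytes followed by the file name and extra field, whose lengths sit at header offsets 26-28 and 28-30. Here is a minimal local sketch of the same arithmetic against an in-memory archive, with no object storage involved (the sample log content is made up):

import io
import struct
import zipfile
import zlib

EOCD_RECORD_SIZE = 22  # fixed size when there is no archive comment

# Build a small in-memory archive with one compressed "log" member.
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf:
    zf.writestr("logs/crawl.log", '{"logLevel": "info", "context": "general"}\n')
data = buf.getvalue()

# Parse the EOCD record at the very end of the archive.
eocd = data[-EOCD_RECORD_SIZE:]
cd_size = struct.unpack("<i", eocd[12:16])[0]
cd_start = struct.unpack("<i", eocd[16:20])[0]

# The central directory plus the EOCD is enough for zipfile to list members;
# this is what get_zip_file() fetches from object storage via ranged requests.
zip_file = zipfile.ZipFile(io.BytesIO(data[cd_start:cd_start + cd_size] + eocd))
log_zipinfo = zip_file.filelist[0]

# Locate the member's compressed bytes via its local file header, adding
# cd_start back as extract_and_parse_log_file() does.
head = data[cd_start + log_zipinfo.header_offset + 26:cd_start + log_zipinfo.header_offset + 30]
name_len, extra_len = struct.unpack("<HH", head)
start = cd_start + log_zipinfo.header_offset + 30 + name_len + extra_len
compressed = data[start:start + log_zipinfo.compress_size]

# ZIP_DEFLATED members are raw deflate streams.
print(zlib.decompressobj(-zlib.MAX_WBITS).decompress(compressed).decode("utf-8"))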


@@ -150,3 +150,56 @@ def crawl_config_info(admin_auth_headers, default_org_id):
if data["state"] == "complete":
return (crawl_config_id, crawl_id, second_crawl_id)
time.sleep(5)
@pytest.fixture(scope="session")
def large_crawl_id(admin_auth_headers, default_org_id):
# Start crawl
crawl_data = {
"runNow": True,
"name": "Large Test Crawl",
"tags": ["wacz-logs"],
"config": {
"seeds": [{"url": "https://webrecorder.net/"}],
"scopeType": "domain",
"limit": 100,
"extraHops": 1,
},
}
r = requests.post(
f"{API_PREFIX}/orgs/{default_org_id}/crawlconfigs/",
headers=admin_auth_headers,
json=crawl_data,
)
data = r.json()
crawl_id = data["run_now_job"]
# Wait for crawl to start running
while True:
r = requests.get(
f"{API_PREFIX}/orgs/{default_org_id}/crawls/{crawl_id}/replay.json",
headers=admin_auth_headers,
)
data = r.json()
if data["state"] == "running":
# Give crawl time to start properly
time.sleep(30)
return crawl_id
time.sleep(5)
@pytest.fixture(scope="session")
def large_crawl_finished(admin_auth_headers, default_org_id, large_crawl_id):
# Wait for crawl to complete
while True:
r = requests.get(
f"{API_PREFIX}/orgs/{default_org_id}/crawls/{large_crawl_id}/replay.json",
headers=admin_auth_headers,
)
data = r.json()
if data["state"] == "complete":
# Give some time for WACZ files to be stored
time.sleep(30)
break
time.sleep(5)


@@ -0,0 +1,88 @@
import json
import requests
import time
import pytest
from .conftest import API_PREFIX
LINES_TO_TEST = 10
@pytest.mark.parametrize(
"log_level, context",
[
# No filtering
(None, None),
# Filter log level
("info", None),
("info,debug", None),
# Filter context
(None, "general"),
(None, "general,worker"),
# Filter both
("info,debug", "general,worker"),
],
)
def test_stream_crawl_logs_wacz(
admin_auth_headers,
default_org_id,
large_crawl_id,
large_crawl_finished,
log_level,
context,
):
"""Test that streaming logs after crawl concludes from WACZs works."""
api_url = f"{API_PREFIX}/orgs/{default_org_id}/crawls/{large_crawl_id}/logs"
if log_level and context:
api_url = api_url + f"?logLevel={log_level}&context={context}"
elif log_level:
api_url = api_url + f"?logLevel={log_level}"
elif context:
api_url = api_url + f"?context={context}"
log_levels = []
contexts = []
if log_level:
log_levels = log_level.split(",")
if context:
contexts = context.split(",")
with requests.get(api_url, headers=admin_auth_headers, stream=True) as r:
assert r.status_code == 200
last_timestamp = None
line_index = 0
# Wait for stream content
if not r.content:
while True:
if r.content:
break
time.sleep(5)
for line in r.iter_lines():
if line_index >= LINES_TO_TEST:
r.close()
return
line = line.decode("utf-8")
log_line_dict = json.loads(line)
assert log_line_dict["logLevel"]
if log_level:
assert log_line_dict["logLevel"] in log_levels
assert log_line_dict["context"]
if context:
assert log_line_dict["context"] in contexts
assert log_line_dict["details"] or log_line_dict["details"] == {}
timestamp = log_line_dict["timestamp"]
assert timestamp
if last_timestamp:
assert timestamp >= last_timestamp
last_timestamp = timestamp
line_index += 1