switch to simpler streaming download + multiwacz metadata improvements: (#1982)
- Download via presigned URLs using requests instead of the boto APIs, and remove boto (follow-up to #1933 for streaming download improvements).
- Fix datapackage.json in multi-WACZ to contain the same resources objects as a single WACZ, with `name`, `path`, `hash`, and `bytes`.
- Add additional metadata to the multi-WACZ datapackage.json: `type` (`crawl`, `upload`, `collection`, or `qaRun`), `id` (unique id for the object), `title` / `description` if available (for crawl/upload/collection), and `crawlId` for `qaRun`. An illustrative sketch of the resulting datapackage.json follows below.
parent 2429bb620c
commit 104ea097c4
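For illustration, here is a minimal sketch of the datapackage.json shape the new multi-WACZ packaging produces, assuming a collection download; the file name, hash, size, id, and version string are made up, and the metadata keys mirror what the callers changed below pass in. A crawl or upload download looks the same except `type` differs and `crawlId` appears only for `qaRun`.

# Illustrative sketch only: values are hypothetical, not from a real archive.
import json

metadata = {  # supplied by the caller (here, a collection download)
    "type": "collection",
    "id": "a3f1c2d4-0000-0000-0000-000000000000",
    "title": "My Collection",
    "organization": "my-org",
}

datapackage = {
    "profile": "multi-wacz-package",
    "resources": [
        {
            # each entry mirrors a single-WACZ package resource
            "name": "crawl-20240101000000-abc123.wacz",
            "path": "crawl-20240101000000-abc123.wacz",
            "hash": "sha256:" + "0" * 64,
            "bytes": 123456,
        }
    ],
    "software": "Browsertrix v1.10.0",
    **metadata,
}

print(json.dumps(datapackage, indent=2))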
@@ -54,7 +54,7 @@ PRESIGN_MINUTES_DEFAULT = PRESIGN_MINUTES_MAX


 # ============================================================================
-# pylint: disable=too-many-instance-attributes, too-many-public-methods
+# pylint: disable=too-many-instance-attributes, too-many-public-methods, too-many-lines
 class BaseCrawlOps:
     """operations that apply to all crawls"""

@@ -823,7 +823,14 @@ class BaseCrawlOps:
         if not crawl.resources:
            raise HTTPException(status_code=400, detail="no_crawl_resources")

-        resp = await self.storage_ops.download_streaming_wacz(org, crawl.resources)
+        metadata = {"type": crawl.type, "id": crawl_id, "organization": org.slug}
+        if crawl.name:
+            metadata["title"] = crawl.name
+
+        if crawl.description:
+            metadata["description"] = crawl.description
+
+        resp = await self.storage_ops.download_streaming_wacz(metadata, crawl.resources)

         headers = {"Content-Disposition": f'attachment; filename="{crawl_id}.wacz"'}
         return StreamingResponse(
@@ -323,7 +323,16 @@ class CollectionOps:
         """Download all WACZs in collection as streaming nested WACZ"""
         coll = await self.get_collection(coll_id, org, resources=True)

-        resp = await self.storage_ops.download_streaming_wacz(org, coll.resources)
+        metadata = {
+            "type": "collection",
+            "id": str(coll_id),
+            "title": coll.name,
+            "organization": org.slug,
+        }
+        if coll.description:
+            metadata["description"] = coll.description
+
+        resp = await self.storage_ops.download_streaming_wacz(metadata, coll.resources)

         headers = {"Content-Disposition": f'attachment; filename="{coll.name}.wacz"'}
         return StreamingResponse(
@@ -1034,7 +1034,16 @@ class CrawlOps(BaseCrawlOps):
         if not qa_run.resources:
             raise HTTPException(status_code=400, detail="qa_run_no_resources")

-        resp = await self.storage_ops.download_streaming_wacz(org, qa_run.resources)
+        metadata = {
+            "type": "qaRun",
+            "id": qa_run_id,
+            "crawlId": crawl_id,
+            "organization": org.slug,
+        }
+
+        resp = await self.storage_ops.download_streaming_wacz(
+            metadata, qa_run.resources
+        )

         finished = qa_run.finished.isoformat()
@@ -31,11 +31,10 @@ from stream_zip import stream_zip, NO_COMPRESSION_64, Method
 from remotezip import RemoteZip

 import aiobotocore.session
-import boto3
+import requests

-from mypy_boto3_s3.client import S3Client
-from mypy_boto3_s3.type_defs import CompletedPartTypeDef
 from types_aiobotocore_s3 import S3Client as AIOS3Client
+from types_aiobotocore_s3.type_defs import CompletedPartTypeDef

 from .models import (
     BaseFile,
@@ -52,6 +51,7 @@ from .models import (
 )

 from .utils import is_bool, slug_from_name
+from .version import __version__


 if TYPE_CHECKING:
@@ -289,35 +289,6 @@ class StorageOps:
         ) as client:
             yield client, bucket, key

-    @asynccontextmanager
-    async def get_sync_client(
-        self, org: Organization
-    ) -> AsyncIterator[tuple[S3Client, str, str]]:
-        """context manager for s3 client"""
-        storage = self.get_org_primary_storage(org)
-
-        endpoint_url = storage.endpoint_url
-
-        if not endpoint_url.endswith("/"):
-            endpoint_url += "/"
-
-        parts = urlsplit(endpoint_url)
-        bucket, key = parts.path[1:].split("/", 1)
-
-        endpoint_url = parts.scheme + "://" + parts.netloc
-
-        try:
-            client = boto3.client(
-                "s3",
-                region_name=storage.region,
-                endpoint_url=endpoint_url,
-                aws_access_key_id=storage.access_key,
-                aws_secret_access_key=storage.secret_key,
-            )
-            yield client, bucket, key
-        finally:
-            client.close()
-
     async def verify_storage_upload(self, storage: S3Storage, filename: str) -> None:
         """Test credentials and storage endpoint by uploading an empty test file"""

@@ -683,21 +654,32 @@ class StorageOps:
             yield from file_stream

     def _sync_dl(
-        self, all_files: List[CrawlFileOut], client: S3Client, bucket: str, key: str
+        self, metadata: dict[str, str], all_files: List[CrawlFileOut]
     ) -> Iterator[bytes]:
         """generate streaming zip as sync"""
-        for file_ in all_files:
-            file_.path = file_.name
-
         datapackage = {
             "profile": "multi-wacz-package",
-            "resources": [file_.dict() for file_ in all_files],
+            "resources": [
+                {
+                    "name": file_.name,
+                    "path": file_.name,
+                    "hash": "sha256:" + file_.hash,
+                    "bytes": file_.size,
+                }
+                for file_ in all_files
+            ],
+            "software": f"Browsertrix v{__version__}",
+            **metadata,
         }
-        datapackage_bytes = json.dumps(datapackage).encode("utf-8")
+        datapackage_bytes = json.dumps(datapackage, indent=2).encode("utf-8")
+
+        def get_datapackage() -> Iterable[bytes]:
+            yield datapackage_bytes

-        def get_file(name) -> Iterator[bytes]:
-            response = client.get_object(Bucket=bucket, Key=key + name)
-            return response["Body"].iter_chunks(chunk_size=CHUNK_SIZE)
+        def get_file(path: str) -> Iterable[bytes]:
+            path = self.resolve_internal_access_path(path)
+            r = requests.get(path, stream=True, timeout=None)
+            yield from r.iter_content(CHUNK_SIZE)

         def member_files() -> (
             Iterable[tuple[str, datetime, int, Method, Iterable[bytes]]]
@@ -710,7 +692,7 @@ class StorageOps:
                 modified_at,
                 perms,
                 NO_COMPRESSION_64(file_.size, 0),
-                get_file(file_.name),
+                get_file(file_.path),
             )

         yield (
@@ -720,23 +702,20 @@ class StorageOps:
                 NO_COMPRESSION_64(
                     len(datapackage_bytes), zlib.crc32(datapackage_bytes)
                 ),
-                (datapackage_bytes,),
+                get_datapackage(),
             )

         # stream_zip() is an Iterator but defined as an Iterable, can cast
         return cast(Iterator[bytes], stream_zip(member_files(), chunk_size=CHUNK_SIZE))

     async def download_streaming_wacz(
-        self, org: Organization, files: List[CrawlFileOut]
+        self, metadata: dict[str, str], files: List[CrawlFileOut]
     ) -> Iterator[bytes]:
         """return an iter for downloading a stream nested wacz file
         from list of files"""
-        async with self.get_sync_client(org) as (client, bucket, key):
-            loop = asyncio.get_event_loop()
+        loop = asyncio.get_event_loop()

-            resp = await loop.run_in_executor(
-                None, self._sync_dl, files, client, bucket, key
-            )
+        resp = await loop.run_in_executor(None, self._sync_dl, metadata, files)

-            return resp
+        return resp

@@ -11,6 +11,7 @@ aiofiles
 kubernetes-asyncio==29.0.0
 kubernetes
 aiobotocore
+requests
 redis>=5.0.0
 pyyaml
 jinja2
@@ -18,10 +19,8 @@ humanize
 python-multipart
 pathvalidate
 https://github.com/ikreymer/stream-zip/archive/refs/heads/crc32-optional.zip
-boto3
 backoff>=2.2.1
 python-slugify>=8.0.1
-mypy_boto3_s3
 types_aiobotocore_s3
 types-redis
 types-python-slugify
@@ -6,6 +6,7 @@ import zipfile
 import re
 import csv
 import codecs
+import json
 from tempfile import TemporaryFile
 from zipfile import ZipFile, ZIP_STORED
|
|
||||||
@ -406,6 +407,15 @@ def test_download_wacz_crawls(
|
|||||||
assert filename.endswith(".wacz") or filename == "datapackage.json"
|
assert filename.endswith(".wacz") or filename == "datapackage.json"
|
||||||
assert zip_file.getinfo(filename).compress_type == ZIP_STORED
|
assert zip_file.getinfo(filename).compress_type == ZIP_STORED
|
||||||
|
|
||||||
|
if filename == "datapackage.json":
|
||||||
|
data = zip_file.read(filename).decode("utf-8")
|
||||||
|
datapackage = json.loads(data)
|
||||||
|
assert len(datapackage["resources"]) == 1
|
||||||
|
for resource in datapackage["resources"]:
|
||||||
|
assert resource["name"] == resource["path"]
|
||||||
|
assert resource["hash"]
|
||||||
|
assert resource["bytes"]
|
||||||
|
|
||||||
|
|
||||||
def test_update_crawl(
|
def test_update_crawl(
|
||||||
admin_auth_headers,
|
admin_auth_headers,
|
||||||