Storage ops followup type checking (#1274)

* storage ops: follow up to #1257:
- fix refactor typo
- add type hints for all StorageOps APIs (add mypy_boto3_s3 and types_aiobotocore_s3 for type hints)
Ilya Kreymer 2023-10-11 14:03:00 -07:00 committed by GitHub
parent f1dcc7e48a
commit 41c054d209
4 changed files with 78 additions and 29 deletions
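
For reference, a minimal sketch of how the new stub packages are typically used (names and parameters below are illustrative, not taken from the diff): mypy_boto3_s3 and types_aiobotocore_s3 supply type information only, so annotations can reference their S3Client types while the runtime objects still come from boto3 and aiobotocore.

    # Illustrative sketch: the stub packages add no runtime behavior; the
    # S3Client annotation exists purely for mypy and editors.
    import boto3
    from mypy_boto3_s3.client import S3Client

    def make_s3_client(endpoint_url: str, access_key: str, secret_key: str) -> S3Client:
        # boto3.client() returns a plain botocore client at runtime
        return boto3.client(
            "s3",
            endpoint_url=endpoint_url,
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
        )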


@@ -916,7 +916,7 @@ def init_crawls_api(
         if crawl.finished:
             wacz_files = await ops.get_wacz_files(crawl_id, org)
             resp = await storage_ops.sync_stream_wacz_logs(
-                org, wacz_files, log_levels, contexts, crawl_manager
+                org, wacz_files, log_levels, contexts
             )
             return StreamingResponse(resp)


@@ -633,9 +633,9 @@ class S3Storage(BaseModel):
     endpoint_url: str
     access_key: str
     secret_key: str
-    access_endpoint_url: Optional[str]
-    region: Optional[str] = ""
-    use_access_for_presign: Optional[bool] = True
+    access_endpoint_url: str
+    region: str = ""
+    use_access_for_presign: bool = True
 # ============================================================================
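
With the Optional[] wrappers removed, access_endpoint_url is now a required string and region / use_access_for_presign carry concrete defaults. A minimal sketch of the stricter model (field set trimmed to the lines shown above, assuming a pydantic BaseModel):

    from pydantic import BaseModel

    class S3Storage(BaseModel):
        endpoint_url: str
        access_key: str
        secret_key: str
        access_endpoint_url: str             # previously Optional[str]
        region: str = ""                     # previously Optional[str]
        use_access_for_presign: bool = True  # previously Optional[bool]

    # access_endpoint_url can no longer be omitted or left as None
    storage = S3Storage(
        endpoint_url="https://s3.example.com/bucket/path/",
        access_key="ACCESS",
        secret_key="SECRET",
        access_endpoint_url="https://data.example.com/bucket/path/",
    )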


@@ -1,7 +1,7 @@
 """
 Storage API
 """
-from typing import Optional, Union, Iterator, Iterable, List, Dict
+from typing import Optional, Union, Iterator, Iterable, List, Dict, AsyncIterator
 from urllib.parse import urlsplit
 from contextlib import asynccontextmanager
@@ -19,7 +19,18 @@ from stream_zip import stream_zip, NO_COMPRESSION_64
 import aiobotocore.session
 import boto3
-from .models import CrawlFile, Organization, DefaultStorage, S3Storage, User
+from mypy_boto3_s3.client import S3Client
+from mypy_boto3_s3.type_defs import CompletedPartTypeDef
+from types_aiobotocore_s3 import S3Client as AIOS3Client
+from .models import (
+    CrawlFile,
+    CrawlFileOut,
+    Organization,
+    DefaultStorage,
+    S3Storage,
+    User,
+)
 from .zip import (
     sync_get_zip_file,
     sync_get_log_stream,
@@ -55,7 +66,7 @@ class StorageOps:
                 # expand when additional storage options are supported
                 raise TypeError("Only s3 storage supported for now")
-    def _create_s3_storage(self, storage):
+    def _create_s3_storage(self, storage: dict[str, str]) -> S3Storage:
         """create S3Storage object"""
         endpoint_url = storage["endpoint_url"]
         bucket_name = storage.get("bucket_name")
@@ -78,13 +89,13 @@ class StorageOps:
             use_access_for_presign=use_access_for_presign,
         )
-    def has_storage(self, name):
+    def has_storage(self, name) -> bool:
         """assert the specified storage exists"""
         return name in self.storages
     async def update_storage(
         self, storage: Union[S3Storage, DefaultStorage], org: Organization, user: User
-    ):
+    ) -> dict[str, bool]:
         """update storage for org"""
         if storage.type == "default":
             if not self.has_storage(storage.name):
@@ -109,7 +120,9 @@ class StorageOps:
         return {"updated": True}
     @asynccontextmanager
-    async def get_s3_client(self, storage, use_access=False):
+    async def get_s3_client(
+        self, storage: S3Storage, use_access=False
+    ) -> AsyncIterator[tuple[AIOS3Client, str, str]]:
         """context manager for s3 client"""
         endpoint_url = (
             storage.endpoint_url if not use_access else storage.access_endpoint_url
@@ -133,7 +146,9 @@ class StorageOps:
         ) as client:
             yield client, bucket, key
-    def get_sync_s3_client(self, storage, use_access=False):
+    def get_sync_s3_client(
+        self, storage: S3Storage, use_access=False
+    ) -> tuple[S3Client, str, str, str]:
         """context manager for s3 client"""
         endpoint_url = storage.endpoint_url
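
The AsyncIterator import and the AIOS3Client alias type what get_s3_client yields. A minimal sketch of the assumed pattern (simplified; the real method also resolves the bucket and key from the storage config):

    from contextlib import asynccontextmanager
    from typing import AsyncIterator

    import aiobotocore.session
    from types_aiobotocore_s3 import S3Client as AIOS3Client

    @asynccontextmanager
    async def s3_client(
        endpoint_url: str, access_key: str, secret_key: str
    ) -> AsyncIterator[AIOS3Client]:
        session = aiobotocore.session.get_session()
        # create_client() is itself an async context manager in aiobotocore
        async with session.create_client(
            "s3",
            endpoint_url=endpoint_url,
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
        ) as client:
            yield client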
@@ -159,7 +174,7 @@ class StorageOps:
         return client, bucket, key, public_endpoint_url
-    async def verify_storage_upload(self, storage, filename):
+    async def verify_storage_upload(self, storage: S3Storage, filename: str) -> None:
         """Test credentials and storage endpoint by uploading an empty test file"""
         async with self.get_s3_client(storage) as (client, bucket, key):
@@ -169,7 +184,9 @@ class StorageOps:
             resp = await client.put_object(Bucket=bucket, Key=key, Body=data)
             assert resp["ResponseMetadata"]["HTTPStatusCode"] == 200
-    def get_org_storage(self, org, storage_name="default", check_name_first=False):
+    def get_org_storage(
+        self, org: Organization, storage_name="default", check_name_first=False
+    ) -> S3Storage:
         """get storage for org, either looking for default storage name first
         or custom storage from the org. Check default storage first if flag
         set to true"""
@@ -186,16 +203,20 @@ class StorageOps:
         return s3storage
-    async def do_upload_single(self, org, filename, data, storage_name="default"):
+    async def do_upload_single(
+        self, org: Organization, filename: str, data, storage_name="default"
+    ) -> None:
         """do upload to specified key"""
         s3storage = self.get_org_storage(org, storage_name)
         async with self.get_s3_client(s3storage) as (client, bucket, key):
             key += filename
-            return await client.put_object(Bucket=bucket, Key=key, Body=data)
+            await client.put_object(Bucket=bucket, Key=key, Body=data)
-    def get_sync_client(self, org, storage_name="default", use_access=False):
+    def get_sync_client(
+        self, org: Organization, storage_name="default", use_access=False
+    ) -> tuple[S3Client, str, str, str]:
         """get sync client"""
         s3storage = self.get_org_storage(org, storage_name)
@@ -203,12 +224,17 @@ class StorageOps:
     # pylint: disable=too-many-arguments,too-many-locals
     async def do_upload_multipart(
-        self, org, filename, file_, min_size, storage_name="default"
-    ):
+        self,
+        org: Organization,
+        filename: str,
+        file_: AsyncIterator,
+        min_size: int,
+        storage_name="default",
+    ) -> bool:
         """do upload to specified key using multipart chunking"""
         s3storage = self.get_org_storage(org, storage_name)
-        async def get_next_chunk(file_, min_size):
+        async def get_next_chunk(file_, min_size) -> bytes:
             total = 0
             bufs = []
@@ -252,7 +278,12 @@ class StorageOps:
                     flush=True,
                 )
-                parts.append({"PartNumber": part_number, "ETag": resp["ETag"]})
+                part: CompletedPartTypeDef = {
+                    "PartNumber": part_number,
+                    "ETag": resp["ETag"],
+                }
+                parts.append(part)
                 part_number += 1
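
CompletedPartTypeDef describes the dicts that S3's CompleteMultipartUpload call expects under MultipartUpload["Parts"]. A minimal sketch of the standard multipart flow the typed parts list feeds into (hypothetical helper; the surrounding code is not shown in this hunk):

    from typing import AsyncIterator, List

    from mypy_boto3_s3.type_defs import CompletedPartTypeDef
    from types_aiobotocore_s3 import S3Client as AIOS3Client

    async def upload_in_parts(
        client: AIOS3Client, bucket: str, key: str, chunks: AsyncIterator[bytes]
    ) -> None:
        mup = await client.create_multipart_upload(Bucket=bucket, Key=key)
        parts: List[CompletedPartTypeDef] = []
        part_number = 1
        async for chunk in chunks:
            resp = await client.upload_part(
                Bucket=bucket,
                Key=key,
                UploadId=mup["UploadId"],
                PartNumber=part_number,
                Body=chunk,
            )
            # each completed part is recorded with its number and ETag
            parts.append({"PartNumber": part_number, "ETag": resp["ETag"]})
            part_number += 1
        await client.complete_multipart_upload(
            Bucket=bucket,
            Key=key,
            UploadId=mup["UploadId"],
            MultipartUpload={"Parts": parts},
        )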
@@ -280,7 +311,9 @@ class StorageOps:
             return False
-    async def get_presigned_url(self, org, crawlfile, duration=3600):
+    async def get_presigned_url(
+        self, org: Organization, crawlfile: CrawlFile, duration=3600
+    ) -> str:
         """generate pre-signed url for crawl file"""
         s3storage = self.get_org_storage(org, crawlfile.def_storage_name, True)
@@ -307,13 +340,17 @@ class StorageOps:
         return presigned_url
-    async def delete_crawl_file_object(self, org, crawlfile):
+    async def delete_crawl_file_object(
+        self, org: Organization, crawlfile: CrawlFile
+    ) -> bool:
         """delete crawl file from storage."""
         return await self.delete_file(
             org, crawlfile.filename, crawlfile.def_storage_name
         )
-    async def delete_file(self, org, filename, def_storage_name="default"):
+    async def delete_file(
+        self, org: Organization, filename: str, def_storage_name="default"
+    ) -> bool:
         """delete specified file from storage"""
         status_code = None
@@ -330,7 +367,13 @@ class StorageOps:
         return status_code == 204
-    async def sync_stream_wacz_logs(self, org, wacz_files, log_levels, contexts):
+    async def sync_stream_wacz_logs(
+        self,
+        org: Organization,
+        wacz_files: List[CrawlFile],
+        log_levels: List[str],
+        contexts: List[str],
+    ) -> Iterator[bytes]:
         """Return filtered stream of logs from specified WACZs sorted by timestamp"""
         client, bucket, key, _ = self.get_sync_client(org)
@@ -435,7 +478,9 @@ class StorageOps:
         return stream_json_lines(heap_iter, log_levels, contexts)
-    def _sync_dl(self, all_files, client, bucket, key):
+    def _sync_dl(
+        self, all_files: List[CrawlFileOut], client: S3Client, bucket: str, key: str
+    ) -> Iterator[bytes]:
         """generate streaming zip as sync"""
         for file_ in all_files:
             file_.path = file_.name
@@ -444,9 +489,9 @@ class StorageOps:
             "profile": "multi-wacz-package",
             "resources": [file_.dict() for file_ in all_files],
         }
-        datapackage = json.dumps(datapackage).encode("utf-8")
+        datapackage_str = json.dumps(datapackage).encode("utf-8")
-        def get_file(name):
+        def get_file(name) -> Iterator[bytes]:
             response = client.get_object(Bucket=bucket, Key=key + name)
             return response["Body"].iter_chunks(chunk_size=CHUNK_SIZE)
@@ -467,12 +512,14 @@ class StorageOps:
                 modified_at,
                 perms,
                 NO_COMPRESSION_64,
-                (datapackage,),
+                (datapackage_str,),
             )
         return stream_zip(member_files(), chunk_size=CHUNK_SIZE)
-    async def download_streaming_wacz(self, org, files):
+    async def download_streaming_wacz(
+        self, org: Organization, files: List[CrawlFileOut]
+    ) -> Iterator[bytes]:
         """return an iter for downloading a stream nested wacz file
         from list of files"""
         client, bucket, key, _ = self.get_sync_client(org)
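
The member tuples in the hunk above follow the stream-zip contract: (name, modified_at, mode, method, byte_chunks). A minimal sketch of that contract with illustrative values (the real member_files() yields the WACZ files plus the generated datapackage.json):

    from datetime import datetime
    from stat import S_IFREG

    from stream_zip import stream_zip, NO_COMPRESSION_64

    def member_files():
        # each member: (name, modified_at, mode, method, iterable of byte chunks)
        yield (
            "datapackage.json",
            datetime.now(),
            S_IFREG | 0o664,
            NO_COMPRESSION_64,
            (b'{"profile": "multi-wacz-package"}',),
        )

    # stream_zip yields archive bytes incrementally, so nothing is buffered
    for chunk in stream_zip(member_files(), chunk_size=1024 * 256):
        ...  # e.g. hand chunks to a StreamingResponse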


@@ -16,3 +16,5 @@ https://github.com/ikreymer/stream-zip/archive/refs/heads/stream-uncompress.zip
 boto3
 backoff>=2.2.1
 python-slugify>=8.0.1
+mypy_boto3_s3
+types_aiobotocore_s3
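
Note that the stub packages land in the runtime requirements because the Storage API module imports them at module load time (from mypy_boto3_s3.client import S3Client, etc.). If they were only needed during type checking, the imports could instead be guarded, roughly:

    from typing import TYPE_CHECKING

    if TYPE_CHECKING:
        # only evaluated by type checkers, so the stubs could stay a dev-only dependency
        from mypy_boto3_s3.client import S3Client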