diff --git a/backend/btrixcloud/crawls.py b/backend/btrixcloud/crawls.py
index 1b6639aa..9b072e61 100644
--- a/backend/btrixcloud/crawls.py
+++ b/backend/btrixcloud/crawls.py
@@ -916,7 +916,7 @@ def init_crawls_api(
         if crawl.finished:
             wacz_files = await ops.get_wacz_files(crawl_id, org)
             resp = await storage_ops.sync_stream_wacz_logs(
-                org, wacz_files, log_levels, contexts, crawl_manager
+                org, wacz_files, log_levels, contexts
             )
 
             return StreamingResponse(resp)
diff --git a/backend/btrixcloud/models.py b/backend/btrixcloud/models.py
index b6a92f34..b81e466c 100644
--- a/backend/btrixcloud/models.py
+++ b/backend/btrixcloud/models.py
@@ -633,9 +633,9 @@ class S3Storage(BaseModel):
     endpoint_url: str
     access_key: str
     secret_key: str
-    access_endpoint_url: Optional[str]
-    region: Optional[str] = ""
-    use_access_for_presign: Optional[bool] = True
+    access_endpoint_url: str
+    region: str = ""
+    use_access_for_presign: bool = True
 
 
 # ============================================================================
diff --git a/backend/btrixcloud/storages.py b/backend/btrixcloud/storages.py
index 9c58b5d8..3c1d61c3 100644
--- a/backend/btrixcloud/storages.py
+++ b/backend/btrixcloud/storages.py
@@ -1,7 +1,7 @@
 """
 Storage API
 """
-from typing import Optional, Union, Iterator, Iterable, List, Dict
+from typing import Optional, Union, Iterator, Iterable, List, Dict, AsyncIterator
 from urllib.parse import urlsplit
 from contextlib import asynccontextmanager
 
@@ -19,7 +19,18 @@ from stream_zip import stream_zip, NO_COMPRESSION_64
 import aiobotocore.session
 import boto3
 
-from .models import CrawlFile, Organization, DefaultStorage, S3Storage, User
+from mypy_boto3_s3.client import S3Client
+from mypy_boto3_s3.type_defs import CompletedPartTypeDef
+from types_aiobotocore_s3 import S3Client as AIOS3Client
+
+from .models import (
+    CrawlFile,
+    CrawlFileOut,
+    Organization,
+    DefaultStorage,
+    S3Storage,
+    User,
+)
 from .zip import (
     sync_get_zip_file,
     sync_get_log_stream,
@@ -55,7 +66,7 @@ class StorageOps:
             # expand when additional storage options are supported
             raise TypeError("Only s3 storage supported for now")
 
-    def _create_s3_storage(self, storage):
+    def _create_s3_storage(self, storage: dict[str, str]) -> S3Storage:
         """create S3Storage object"""
         endpoint_url = storage["endpoint_url"]
         bucket_name = storage.get("bucket_name")
@@ -78,13 +89,13 @@ class StorageOps:
             use_access_for_presign=use_access_for_presign,
         )
 
-    def has_storage(self, name):
+    def has_storage(self, name) -> bool:
         """assert the specified storage exists"""
         return name in self.storages
 
     async def update_storage(
         self, storage: Union[S3Storage, DefaultStorage], org: Organization, user: User
-    ):
+    ) -> dict[str, bool]:
         """update storage for org"""
         if storage.type == "default":
             if not self.has_storage(storage.name):
@@ -109,7 +120,9 @@ class StorageOps:
         return {"updated": True}
 
     @asynccontextmanager
-    async def get_s3_client(self, storage, use_access=False):
+    async def get_s3_client(
+        self, storage: S3Storage, use_access=False
+    ) -> AsyncIterator[tuple[AIOS3Client, str, str]]:
         """context manager for s3 client"""
         endpoint_url = (
             storage.endpoint_url if not use_access else storage.access_endpoint_url
@@ -133,7 +146,9 @@ class StorageOps:
         ) as client:
             yield client, bucket, key
 
-    def get_sync_s3_client(self, storage, use_access=False):
+    def get_sync_s3_client(
+        self, storage: S3Storage, use_access=False
+    ) -> tuple[S3Client, str, str, str]:
         """context manager for s3 client"""
         endpoint_url = storage.endpoint_url
 
@@ -159,7 +174,7 @@ class StorageOps:
 
         return client, bucket, key, public_endpoint_url
 
-    async def verify_storage_upload(self, storage, filename):
+    async def verify_storage_upload(self, storage: S3Storage, filename: str) -> None:
         """Test credentials and storage endpoint by uploading an empty test file"""
 
         async with self.get_s3_client(storage) as (client, bucket, key):
@@ -169,7 +184,9 @@ class StorageOps:
             resp = await client.put_object(Bucket=bucket, Key=key, Body=data)
             assert resp["ResponseMetadata"]["HTTPStatusCode"] == 200
 
-    def get_org_storage(self, org, storage_name="default", check_name_first=False):
+    def get_org_storage(
+        self, org: Organization, storage_name="default", check_name_first=False
+    ) -> S3Storage:
         """get storage for org, either looking for default storage name first
         or custom storage from the org. Check default storage first if flag set to true"""
 
@@ -186,16 +203,20 @@ class StorageOps:
 
         return s3storage
 
-    async def do_upload_single(self, org, filename, data, storage_name="default"):
+    async def do_upload_single(
+        self, org: Organization, filename: str, data, storage_name="default"
+    ) -> None:
         """do upload to specified key"""
         s3storage = self.get_org_storage(org, storage_name)
 
         async with self.get_s3_client(s3storage) as (client, bucket, key):
             key += filename
 
-            return await client.put_object(Bucket=bucket, Key=key, Body=data)
+            await client.put_object(Bucket=bucket, Key=key, Body=data)
 
-    def get_sync_client(self, org, storage_name="default", use_access=False):
+    def get_sync_client(
+        self, org: Organization, storage_name="default", use_access=False
+    ) -> tuple[S3Client, str, str, str]:
         """get sync client"""
         s3storage = self.get_org_storage(org, storage_name)
 
@@ -203,12 +224,17 @@ class StorageOps:
 
     # pylint: disable=too-many-arguments,too-many-locals
     async def do_upload_multipart(
-        self, org, filename, file_, min_size, storage_name="default"
-    ):
+        self,
+        org: Organization,
+        filename: str,
+        file_: AsyncIterator,
+        min_size: int,
+        storage_name="default",
+    ) -> bool:
         """do upload to specified key using multipart chunking"""
         s3storage = self.get_org_storage(org, storage_name)
 
-        async def get_next_chunk(file_, min_size):
+        async def get_next_chunk(file_, min_size) -> bytes:
             total = 0
             bufs = []
 
@@ -252,7 +278,12 @@ class StorageOps:
                     flush=True,
                 )
 
-                parts.append({"PartNumber": part_number, "ETag": resp["ETag"]})
+                part: CompletedPartTypeDef = {
+                    "PartNumber": part_number,
+                    "ETag": resp["ETag"],
+                }
+
+                parts.append(part)
 
                 part_number += 1
 
@@ -280,7 +311,9 @@ class StorageOps:
 
             return False
 
-    async def get_presigned_url(self, org, crawlfile, duration=3600):
+    async def get_presigned_url(
+        self, org: Organization, crawlfile: CrawlFile, duration=3600
+    ) -> str:
         """generate pre-signed url for crawl file"""
         s3storage = self.get_org_storage(org, crawlfile.def_storage_name, True)
 
@@ -307,13 +340,17 @@ class StorageOps:
 
         return presigned_url
 
-    async def delete_crawl_file_object(self, org, crawlfile):
+    async def delete_crawl_file_object(
+        self, org: Organization, crawlfile: CrawlFile
+    ) -> bool:
         """delete crawl file from storage."""
         return await self.delete_file(
             org, crawlfile.filename, crawlfile.def_storage_name
         )
 
-    async def delete_file(self, org, filename, def_storage_name="default"):
+    async def delete_file(
+        self, org: Organization, filename: str, def_storage_name="default"
+    ) -> bool:
         """delete specified file from storage"""
         status_code = None
 
@@ -330,7 +367,13 @@ class StorageOps:
 
         return status_code == 204
 
-    async def sync_stream_wacz_logs(self, org, wacz_files, log_levels, contexts):
+    async def sync_stream_wacz_logs(
+        self,
+        org: Organization,
+        wacz_files: List[CrawlFile],
+        log_levels: List[str],
+        contexts: List[str],
+    ) -> Iterator[bytes]:
         """Return filtered stream of logs from specified WACZs sorted by timestamp"""
         client, bucket, key, _ = self.get_sync_client(org)
 
@@ -435,7 +478,9 @@ class StorageOps:
 
         return stream_json_lines(heap_iter, log_levels, contexts)
 
-    def _sync_dl(self, all_files, client, bucket, key):
+    def _sync_dl(
+        self, all_files: List[CrawlFileOut], client: S3Client, bucket: str, key: str
+    ) -> Iterator[bytes]:
         """generate streaming zip as sync"""
         for file_ in all_files:
             file_.path = file_.name
@@ -444,9 +489,9 @@ class StorageOps:
             "profile": "multi-wacz-package",
             "resources": [file_.dict() for file_ in all_files],
         }
-        datapackage = json.dumps(datapackage).encode("utf-8")
+        datapackage_str = json.dumps(datapackage).encode("utf-8")
 
-        def get_file(name):
+        def get_file(name) -> Iterator[bytes]:
             response = client.get_object(Bucket=bucket, Key=key + name)
             return response["Body"].iter_chunks(chunk_size=CHUNK_SIZE)
 
@@ -467,12 +512,14 @@ class StorageOps:
                 modified_at,
                 perms,
                 NO_COMPRESSION_64,
-                (datapackage,),
+                (datapackage_str,),
             )
 
         return stream_zip(member_files(), chunk_size=CHUNK_SIZE)
 
-    async def download_streaming_wacz(self, org, files):
+    async def download_streaming_wacz(
+        self, org: Organization, files: List[CrawlFileOut]
+    ) -> Iterator[bytes]:
         """return an iter for downloading a stream nested wacz file
         from list of files"""
         client, bucket, key, _ = self.get_sync_client(org)
diff --git a/backend/requirements.txt b/backend/requirements.txt
index 66d35a62..01e1cc3d 100644
--- a/backend/requirements.txt
+++ b/backend/requirements.txt
@@ -16,3 +16,5 @@ https://github.com/ikreymer/stream-zip/archive/refs/heads/stream-uncompress.zip
 boto3
 backoff>=2.2.1
 python-slugify>=8.0.1
+mypy_boto3_s3
+types_aiobotocore_s3
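
A minimal standalone sketch (not part of the diff) of the typing pattern the storages.py changes rely on: an @asynccontextmanager generator is annotated as returning an AsyncIterator of whatever it yields, and the stub-only S3Client from types_aiobotocore_s3 types the aiobotocore client without changing runtime behavior. The names s3_client and upload_empty and the credential values are placeholders, not code from this PR.

from contextlib import asynccontextmanager
from typing import AsyncIterator

import aiobotocore.session
from types_aiobotocore_s3 import S3Client as AIOS3Client


@asynccontextmanager
async def s3_client(endpoint_url: str) -> AsyncIterator[AIOS3Client]:
    """yield a typed async S3 client; mypy sees the yielded value as AIOS3Client"""
    session = aiobotocore.session.get_session()
    async with session.create_client(
        "s3",
        endpoint_url=endpoint_url,
        aws_access_key_id="placeholder",  # placeholder credentials
        aws_secret_access_key="placeholder",
    ) as client:
        yield client


async def upload_empty(endpoint_url: str, bucket: str, key: str) -> None:
    """upload an empty object, mirroring verify_storage_upload above"""
    async with s3_client(endpoint_url) as client:
        # put_object and its response TypedDict are now statically checked
        resp = await client.put_object(Bucket=bucket, Key=key, Body=b"")
        assert resp["ResponseMetadata"]["HTTPStatusCode"] == 200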
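
Similarly, a sketch (again standalone, with a placeholder function name and caller-supplied bucket/key) of why do_upload_multipart now builds each part as a CompletedPartTypeDef: annotating the dict lets mypy check it against the shape complete_multipart_upload expects in MultipartUpload={"Parts": parts}.

from typing import List

from mypy_boto3_s3.client import S3Client
from mypy_boto3_s3.type_defs import CompletedPartTypeDef


def upload_in_parts(
    client: S3Client, bucket: str, key: str, chunks: List[bytes]
) -> None:
    """multipart upload with statically typed completed-part dicts"""
    upload_id = client.create_multipart_upload(Bucket=bucket, Key=key)["UploadId"]

    parts: List[CompletedPartTypeDef] = []
    for part_number, chunk in enumerate(chunks, start=1):
        # note: against real S3, every chunk except the last must be >= 5 MiB
        resp = client.upload_part(
            Bucket=bucket,
            Key=key,
            PartNumber=part_number,
            UploadId=upload_id,
            Body=chunk,
        )
        # the dict literal is checked against CompletedPartTypeDef
        part: CompletedPartTypeDef = {"PartNumber": part_number, "ETag": resp["ETag"]}
        parts.append(part)

    client.complete_multipart_upload(
        Bucket=bucket,
        Key=key,
        UploadId=upload_id,
        MultipartUpload={"Parts": parts},
    )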