Close sync S3 client (#1481)
Cleanup of boto3 sync client, ensure that it is used as a context manager like async client.
This commit is contained in:
parent
950844dc92
commit
bf38063e0a
@ -11,7 +11,7 @@ from typing import (
|
|||||||
TYPE_CHECKING,
|
TYPE_CHECKING,
|
||||||
)
|
)
|
||||||
from urllib.parse import urlsplit
|
from urllib.parse import urlsplit
|
||||||
from contextlib import asynccontextmanager
|
from contextlib import asynccontextmanager, contextmanager
|
||||||
|
|
||||||
import asyncio
|
import asyncio
|
||||||
import heapq
|
import heapq
|
||||||
@ -275,10 +275,11 @@ class StorageOps:
|
|||||||
) as client:
|
) as client:
|
||||||
yield client, bucket, key
|
yield client, bucket, key
|
||||||
|
|
||||||
def get_sync_s3_client(
|
@contextmanager
|
||||||
self, storage: S3Storage, use_access=False
|
def get_sync_client(self, org: Organization) -> Iterator[tuple[S3Client, str, str]]:
|
||||||
) -> tuple[S3Client, str, str, str]:
|
|
||||||
"""context manager for s3 client"""
|
"""context manager for s3 client"""
|
||||||
|
storage = self.get_org_primary_storage(org)
|
||||||
|
|
||||||
endpoint_url = storage.endpoint_url
|
endpoint_url = storage.endpoint_url
|
||||||
|
|
||||||
if not endpoint_url.endswith("/"):
|
if not endpoint_url.endswith("/"):
|
||||||
@ -289,19 +290,17 @@ class StorageOps:
|
|||||||
|
|
||||||
endpoint_url = parts.scheme + "://" + parts.netloc
|
endpoint_url = parts.scheme + "://" + parts.netloc
|
||||||
|
|
||||||
client = boto3.client(
|
try:
|
||||||
"s3",
|
client = boto3.client(
|
||||||
region_name=storage.region,
|
"s3",
|
||||||
endpoint_url=endpoint_url,
|
region_name=storage.region,
|
||||||
aws_access_key_id=storage.access_key,
|
endpoint_url=endpoint_url,
|
||||||
aws_secret_access_key=storage.secret_key,
|
aws_access_key_id=storage.access_key,
|
||||||
)
|
aws_secret_access_key=storage.secret_key,
|
||||||
|
)
|
||||||
public_endpoint_url = (
|
yield client, bucket, key
|
||||||
storage.endpoint_url if not use_access else storage.access_endpoint_url
|
finally:
|
||||||
)
|
client.close()
|
||||||
|
|
||||||
return client, bucket, key, public_endpoint_url
|
|
||||||
|
|
||||||
async def verify_storage_upload(self, storage: S3Storage, filename: str) -> None:
|
async def verify_storage_upload(self, storage: S3Storage, filename: str) -> None:
|
||||||
"""Test credentials and storage endpoint by uploading an empty test file"""
|
"""Test credentials and storage endpoint by uploading an empty test file"""
|
||||||
@ -367,14 +366,6 @@ class StorageOps:
|
|||||||
|
|
||||||
await client.put_object(Bucket=bucket, Key=key, Body=data)
|
await client.put_object(Bucket=bucket, Key=key, Body=data)
|
||||||
|
|
||||||
def get_sync_client(
|
|
||||||
self, org: Organization, use_access=False
|
|
||||||
) -> tuple[S3Client, str, str, str]:
|
|
||||||
"""get sync client"""
|
|
||||||
s3storage = self.get_org_primary_storage(org)
|
|
||||||
|
|
||||||
return self.get_sync_s3_client(s3storage, use_access=use_access)
|
|
||||||
|
|
||||||
# pylint: disable=too-many-arguments,too-many-locals
|
# pylint: disable=too-many-arguments,too-many-locals
|
||||||
async def do_upload_multipart(
|
async def do_upload_multipart(
|
||||||
self,
|
self,
|
||||||
@ -525,22 +516,21 @@ class StorageOps:
|
|||||||
contexts: List[str],
|
contexts: List[str],
|
||||||
) -> Iterator[bytes]:
|
) -> Iterator[bytes]:
|
||||||
"""Return filtered stream of logs from specified WACZs sorted by timestamp"""
|
"""Return filtered stream of logs from specified WACZs sorted by timestamp"""
|
||||||
client, bucket, key, _ = self.get_sync_client(org)
|
with self.get_sync_client(org) as (client, bucket, key):
|
||||||
|
loop = asyncio.get_event_loop()
|
||||||
|
|
||||||
loop = asyncio.get_event_loop()
|
resp = await loop.run_in_executor(
|
||||||
|
None,
|
||||||
|
self._sync_get_logs,
|
||||||
|
wacz_files,
|
||||||
|
log_levels,
|
||||||
|
contexts,
|
||||||
|
client,
|
||||||
|
bucket,
|
||||||
|
key,
|
||||||
|
)
|
||||||
|
|
||||||
resp = await loop.run_in_executor(
|
return resp
|
||||||
None,
|
|
||||||
self._sync_get_logs,
|
|
||||||
wacz_files,
|
|
||||||
log_levels,
|
|
||||||
contexts,
|
|
||||||
client,
|
|
||||||
bucket,
|
|
||||||
key,
|
|
||||||
)
|
|
||||||
|
|
||||||
return resp
|
|
||||||
|
|
||||||
def _sync_get_logs(
|
def _sync_get_logs(
|
||||||
self,
|
self,
|
||||||
@ -674,15 +664,14 @@ class StorageOps:
|
|||||||
) -> Iterator[bytes]:
|
) -> Iterator[bytes]:
|
||||||
"""return an iter for downloading a stream nested wacz file
|
"""return an iter for downloading a stream nested wacz file
|
||||||
from list of files"""
|
from list of files"""
|
||||||
client, bucket, key, _ = self.get_sync_client(org)
|
with self.get_sync_client(org) as (client, bucket, key):
|
||||||
|
loop = asyncio.get_event_loop()
|
||||||
|
|
||||||
loop = asyncio.get_event_loop()
|
resp = await loop.run_in_executor(
|
||||||
|
None, self._sync_dl, files, client, bucket, key
|
||||||
|
)
|
||||||
|
|
||||||
resp = await loop.run_in_executor(
|
return resp
|
||||||
None, self._sync_dl, files, client, bucket, key
|
|
||||||
)
|
|
||||||
|
|
||||||
return resp
|
|
||||||
|
|
||||||
|
|
||||||
# ============================================================================
|
# ============================================================================
|
||||||
|
Loading…
Reference in New Issue
Block a user