storages: use asynccontextmanager instead of sync to close client (#1521)
Follow-up to #1481, use the asyncontextmanager with `async with` as only used in async functions (which call run_in_executor)
This commit is contained in:
parent
b2a5dbf2cd
commit
65fec64197
@ -12,7 +12,7 @@ from typing import (
|
|||||||
TYPE_CHECKING,
|
TYPE_CHECKING,
|
||||||
)
|
)
|
||||||
from urllib.parse import urlsplit
|
from urllib.parse import urlsplit
|
||||||
from contextlib import asynccontextmanager, contextmanager
|
from contextlib import asynccontextmanager
|
||||||
|
|
||||||
import asyncio
|
import asyncio
|
||||||
import heapq
|
import heapq
|
||||||
@ -276,8 +276,10 @@ class StorageOps:
|
|||||||
) as client:
|
) as client:
|
||||||
yield client, bucket, key
|
yield client, bucket, key
|
||||||
|
|
||||||
@contextmanager
|
@asynccontextmanager
|
||||||
def get_sync_client(self, org: Organization) -> Iterator[tuple[S3Client, str, str]]:
|
async def get_sync_client(
|
||||||
|
self, org: Organization
|
||||||
|
) -> AsyncIterator[tuple[S3Client, str, str]]:
|
||||||
"""context manager for s3 client"""
|
"""context manager for s3 client"""
|
||||||
storage = self.get_org_primary_storage(org)
|
storage = self.get_org_primary_storage(org)
|
||||||
|
|
||||||
@ -517,7 +519,7 @@ class StorageOps:
|
|||||||
contexts: List[str],
|
contexts: List[str],
|
||||||
) -> Iterator[bytes]:
|
) -> Iterator[bytes]:
|
||||||
"""Return filtered stream of logs from specified WACZs sorted by timestamp"""
|
"""Return filtered stream of logs from specified WACZs sorted by timestamp"""
|
||||||
with self.get_sync_client(org) as (client, bucket, key):
|
async with self.get_sync_client(org) as (client, bucket, key):
|
||||||
loop = asyncio.get_event_loop()
|
loop = asyncio.get_event_loop()
|
||||||
|
|
||||||
resp = await loop.run_in_executor(
|
resp = await loop.run_in_executor(
|
||||||
@ -665,7 +667,7 @@ class StorageOps:
|
|||||||
) -> Iterator[bytes]:
|
) -> Iterator[bytes]:
|
||||||
"""return an iter for downloading a stream nested wacz file
|
"""return an iter for downloading a stream nested wacz file
|
||||||
from list of files"""
|
from list of files"""
|
||||||
with self.get_sync_client(org) as (client, bucket, key):
|
async with self.get_sync_client(org) as (client, bucket, key):
|
||||||
loop = asyncio.get_event_loop()
|
loop = asyncio.get_event_loop()
|
||||||
|
|
||||||
resp = await loop.run_in_executor(
|
resp = await loop.run_in_executor(
|
||||||
|
Loading…
Reference in New Issue
Block a user