fix redis connection leaks + exclusions error (fixes #1065)

- use contextmanager for accessing redis to ensure redis.close() is always called
- add get_redis_client() to k8sapi to ensure unified place to get redis client
- use ConnectionPool.from_url() until redis 5.0.0 is released to ensure auto-close and single-client settings are applied
- also: catch invalid regex passed to re.compile() in queue regex check, return 400 instead of 500 for invalid regex
- redis requirements: bump to 5.0.0rc2
parent 89983542f9
commit 2e73148bea
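The core of the change is the pattern described in the commit message: build the redis client from an explicit connection pool, hand it out through an async context manager, and close it in a finally block so the connection is released even when the caller raises. Below is a minimal standalone sketch of that pattern; in the commit itself these helpers live on the K8sAPI and BaseCrawlOps classes, so the free-function form and the bare `redis_url` argument here are simplifications for illustration, not the exact code in the diff.

import contextlib

from redis.asyncio import Redis
from redis.asyncio.connection import ConnectionPool


async def get_redis_client(redis_url: str) -> Redis:
    # build the client from an explicit pool so the pool is closed together
    # with the client; manual settings until redis-py 5.0.0 applies them
    pool = ConnectionPool.from_url(redis_url, decode_responses=True)
    redis = Redis(connection_pool=pool, decode_responses=True)
    redis.auto_close_connection_pool = True
    return redis


@contextlib.asynccontextmanager
async def get_redis(redis_url: str):
    # yield the client and always close it, even if the caller raises
    redis = await get_redis_client(redis_url)
    try:
        yield redis
    finally:
        await redis.close()

Call sites then take the shape `async with get_redis(redis_url) as redis: ...`, which is what the hunks below switch to.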
@@ -6,10 +6,11 @@ import os
 from datetime import timedelta
 from typing import Optional, List, Union
 import urllib.parse
+import contextlib

 from pydantic import UUID4
 from fastapi import HTTPException, Depends
-from redis import asyncio as aioredis, exceptions
+from redis import exceptions

 from .models import (
     CrawlFile,
@@ -216,7 +217,7 @@ class BaseCrawlOps:
         # more responsive, saves db update in operator
         if crawl.state in RUNNING_STATES:
             try:
-                redis = await self.get_redis(crawl.id)
+                async with self.get_redis(crawl.id) as redis:
                     crawl.stats = await get_redis_crawl_stats(redis, crawl.id)
             # redis not available, ignore
             except exceptions.ConnectionError:
@@ -281,13 +282,17 @@ class BaseCrawlOps:
         for update in updates:
             await self.crawls.find_one_and_update(*update)

+    @contextlib.asynccontextmanager
     async def get_redis(self, crawl_id):
         """get redis url for crawl id"""
         redis_url = self.crawl_manager.get_redis_url(crawl_id)

-        return await aioredis.from_url(
-            redis_url, encoding="utf-8", decode_responses=True
-        )
+        redis = await self.crawl_manager.get_redis_client(redis_url)
+
+        try:
+            yield redis
+        finally:
+            await redis.close()

     async def add_to_collection(
         self, crawl_ids: List[uuid.UUID], collection_id: uuid.UUID, org: Organization
@@ -363,23 +363,26 @@ class CrawlOps(BaseCrawlOps):

         total = 0
         results = []
-        redis = None

         try:
-            redis = await self.get_redis(crawl_id)
+            async with self.get_redis(crawl_id) as redis:
                 total = await self._crawl_queue_len(redis, f"{crawl_id}:q")
                 results = await self._crawl_queue_range(
                     redis, f"{crawl_id}:q", offset, count
                 )
                 results = [json.loads(result)["url"] for result in results]

         except exceptions.ConnectionError:
             # can't connect to redis, likely not initialized yet
             pass

         matched = []
         if regex:
+            try:
                 regex = re.compile(regex)
+            except re.error as exc:
+                raise HTTPException(status_code=400, detail="invalid_regex") from exc

             matched = [result for result in results if regex.search(result)]

         return {"total": total, "results": results, "matched": matched}
@@ -387,21 +390,25 @@ class CrawlOps(BaseCrawlOps):
     async def match_crawl_queue(self, crawl_id, regex):
         """get list of urls that match regex"""
         total = 0
-        redis = None
+        matched = []
+        step = 50

+        async with self.get_redis(crawl_id) as redis:
             try:
-                redis = await self.get_redis(crawl_id)
                 total = await self._crawl_queue_len(redis, f"{crawl_id}:q")
             except exceptions.ConnectionError:
                 # can't connect to redis, likely not initialized yet
                 pass

+            try:
                 regex = re.compile(regex)
-                matched = []
-                step = 50
+            except re.error as exc:
+                raise HTTPException(status_code=400, detail="invalid_regex") from exc

             for count in range(0, total, step):
-                results = await self._crawl_queue_range(redis, f"{crawl_id}:q", count, step)
+                results = await self._crawl_queue_range(
+                    redis, f"{crawl_id}:q", count, step
+                )
                 for result in results:
                     url = json.loads(result)["url"]
                     if regex.search(url):
@@ -413,24 +420,26 @@ class CrawlOps(BaseCrawlOps):
         """filter out urls that match regex"""
         # pylint: disable=too-many-locals
         total = 0
-        redis = None

         q_key = f"{crawl_id}:q"
         s_key = f"{crawl_id}:s"
+        step = 50
+        num_removed = 0

+        async with self.get_redis(crawl_id) as redis:
             try:
-                redis = await self.get_redis(crawl_id)
                 total = await self._crawl_queue_len(redis, f"{crawl_id}:q")
             except exceptions.ConnectionError:
                 # can't connect to redis, likely not initialized yet
                 pass

             dircount = -1

+            try:
                 regex = re.compile(regex)
-                step = 50
+            except re.error as exc:
+                raise HTTPException(status_code=400, detail="invalid_regex") from exc

             count = 0
-            num_removed = 0

             # pylint: disable=fixme
             # todo: do this in a more efficient way?
@@ -475,8 +484,8 @@ class CrawlOps(BaseCrawlOps):
         skip = page * page_size
         upper_bound = skip + page_size - 1

+        async with self.get_redis(crawl_id) as redis:
             try:
-                redis = await self.get_redis(crawl_id)
                 errors = await redis.lrange(f"{crawl_id}:e", skip, upper_bound)
                 total = await redis.llen(f"{crawl_id}:e")
             except exceptions.ConnectionError:
@@ -13,6 +13,9 @@ from kubernetes_asyncio.client.api import custom_objects_api
 from kubernetes_asyncio.utils import create_from_dict
 from kubernetes_asyncio.client.exceptions import ApiException

+from redis.asyncio import Redis
+from redis.asyncio.connection import ConnectionPool
+
 from fastapi.templating import Jinja2Templates
 from .utils import get_templates_dir, dt_now, to_k8s_date

@@ -62,6 +65,17 @@ class K8sAPI:
         )
         return redis_url

+    async def get_redis_client(self, redis_url):
+        """return redis client with correct params for one-time use"""
+        # manual settings until redis 5.0.0 is released
+        pool = ConnectionPool.from_url(redis_url, decode_responses=True)
+        redis = Redis(
+            connection_pool=pool,
+            decode_responses=True,
+        )
+        redis.auto_close_connection_pool = True
+        return redis
+
     # pylint: disable=too-many-arguments
     async def new_crawl_job(
         self, cid, userid, oid, scale=1, crawl_timeout=0, manual=True
@@ -13,7 +13,6 @@ import yaml
 import humanize

 from pydantic import BaseModel
-from redis import asyncio as aioredis

 from .utils import (
     from_k8s_date,
@@ -430,6 +429,7 @@ class BtrixOperator(K8sAPI):
     async def cancel_crawl(self, redis_url, crawl_id, cid, status, state):
         """immediately cancel crawl with specified state
         return true if db mark_finished update succeeds"""
+        redis = None
         try:
             redis = await self._get_redis(redis_url)
             await self.mark_finished(redis, crawl_id, uuid.UUID(cid), status, state)
@@ -438,6 +438,10 @@ class BtrixOperator(K8sAPI):
         except:
             return False

+        finally:
+            if redis:
+                await redis.close()
+
     def _done_response(self, status, finalized=False):
         """done response for removing crawl"""
         return {
@@ -462,15 +466,16 @@ class BtrixOperator(K8sAPI):
         """init redis, ensure connectivity"""
         redis = None
         try:
-            redis = await aioredis.from_url(
-                redis_url, encoding="utf-8", decode_responses=True
-            )
+            redis = await self.get_redis_client(redis_url)
             # test connection
             await redis.ping()
             return redis

         # pylint: disable=bare-except
         except:
+            if redis:
+                await redis.close()
+
             return None

     async def check_if_finished(self, crawl, status):
@@ -512,6 +517,7 @@ class BtrixOperator(K8sAPI):
             status.resync_after = self.fast_retry_secs
             return status

+        try:
             # set state to running (if not already)
             if status.state not in RUNNING_STATES:
                 await self.set_state(
@@ -521,7 +527,6 @@ class BtrixOperator(K8sAPI):
                     allowed_from=["starting", "waiting_capacity"],
                 )

-            try:
                 file_done = await redis.lpop(self.done_key)

                 while file_done:
@@ -547,6 +552,9 @@ class BtrixOperator(K8sAPI):
             print(f"Crawl get failed: {exc}, will try again")
             return status

+        finally:
+            await redis.close()
+
     def check_if_pods_running(self, pods):
         """check if at least one crawler pod has started"""
         try:
@@ -5,7 +5,7 @@ loguru
 aiofiles
 kubernetes-asyncio==22.6.5
 aiobotocore
-redis>=4.2.0rc1
+redis>=5.0.0rc2
 pyyaml
 jinja2
 humanize