- Add slug field with uniqueness constraint to Organization - Use python-slugify to generate slug from name and import that in migration - Require name in all /rename and org creation requests - Auto-generate slug for new org with no slug or when /rename is called w/o a slug - Auto-generate slug for 'default-org' based on name - Add /api/orgs/slugs GET endpoint to return all slugs in use - tests: extend backend test-requirements.txt from requirements to allow testing slugify - tests: move get_redis_crawl_stats() to avoid extra dependency in utils
100 lines
2.3 KiB
Python
100 lines
2.3 KiB
Python
""" k8s utils """
|
|
|
|
from typing import Optional
|
|
import os
|
|
import asyncio
|
|
import json
|
|
import sys
|
|
import signal
|
|
import atexit
|
|
|
|
from datetime import datetime
|
|
|
|
from slugify import slugify
|
|
|
|
|
|
def get_templates_dir():
|
|
"""return directory containing templates for loading"""
|
|
return os.path.join(os.path.dirname(__file__), "templates")
|
|
|
|
|
|
def from_k8s_date(string):
|
|
"""convert k8s date string to datetime"""
|
|
return datetime.fromisoformat(string[:-1]) if string else None
|
|
|
|
|
|
def to_k8s_date(dt_val):
|
|
"""convert datetime to string for k8s"""
|
|
return dt_val.isoformat("T") + "Z"
|
|
|
|
|
|
def dt_now():
|
|
"""get current ts"""
|
|
return datetime.utcnow().replace(microsecond=0, tzinfo=None)
|
|
|
|
|
|
def ts_now():
|
|
"""get current ts"""
|
|
return str(dt_now())
|
|
|
|
|
|
def run_once_lock(name):
|
|
"""run once lock via temp directory
|
|
- if dir doesn't exist, return true
|
|
- if exists, return false"""
|
|
lock_dir = "/tmp/." + name
|
|
try:
|
|
os.mkdir(lock_dir)
|
|
# pylint: disable=bare-except
|
|
except:
|
|
return False
|
|
|
|
# just in case, delete dir on exit
|
|
def del_dir():
|
|
print("release lock: " + lock_dir, flush=True)
|
|
os.rmdir(lock_dir)
|
|
|
|
atexit.register(del_dir)
|
|
return True
|
|
|
|
|
|
def register_exit_handler():
|
|
"""register exit handler to exit on SIGTERM"""
|
|
loop = asyncio.get_running_loop()
|
|
|
|
def exit_handler():
|
|
"""sigterm handler"""
|
|
print("SIGTERM received, exiting")
|
|
sys.exit(1)
|
|
|
|
loop.add_signal_handler(signal.SIGTERM, exit_handler)
|
|
|
|
|
|
def parse_jsonl_error_messages(errors):
|
|
"""parse json-l error strings from redis/db into json"""
|
|
parsed_errors = []
|
|
for error_line in errors:
|
|
if not error_line:
|
|
continue
|
|
try:
|
|
result = json.loads(error_line)
|
|
parsed_errors.append(result)
|
|
except json.JSONDecodeError as err:
|
|
print(
|
|
f"Error decoding json-l error line: {error_line}. Error: {err}",
|
|
flush=True,
|
|
)
|
|
return parsed_errors
|
|
|
|
|
|
def is_bool(stri: Optional[str]) -> bool:
|
|
"""Check if the string parameter is stringly true"""
|
|
if stri:
|
|
return stri.lower() in ("true", "1", "yes")
|
|
return False
|
|
|
|
|
|
def slug_from_name(name: str) -> str:
|
|
"""Generate slug from name"""
|
|
return slugify(name.replace("'", ""))
|