Add API endpoint to delete org (#1448)

Fixes #903 

Adds superuser-only API endpoint to delete an org and all of its data

---------

Co-authored-by: Ilya Kreymer <ikreymer@gmail.com>
This commit is contained in:
Tessa Walsh 2024-07-03 16:00:11 -04:00 committed by GitHub
parent 9088101ef6
commit 192737ea99
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 139 additions and 17 deletions

View File

@ -197,7 +197,8 @@ class BackgroundJobOps:
job_id = await self.create_delete_replica_job(
org, file, object_id, object_type, replica_ref
)
ids.append(job_id)
if job_id:
ids.append(job_id)
return {"added": True, "ids": ids}
@ -209,17 +210,17 @@ class BackgroundJobOps:
object_type: str,
replica_ref: StorageRef,
existing_job_id: Optional[str] = None,
) -> str:
) -> Optional[str]:
"""Create a job to delete one replica of a given file"""
replica_storage = self.storage_ops.get_org_storage_by_ref(org, replica_ref)
replica_endpoint, bucket_suffix = self.strip_bucket(
replica_storage.endpoint_url
)
replica_file_path = bucket_suffix + file.filename
job_type = BgJobType.DELETE_REPLICA.value
try:
replica_storage = self.storage_ops.get_org_storage_by_ref(org, replica_ref)
replica_endpoint, bucket_suffix = self.strip_bucket(
replica_storage.endpoint_url
)
replica_file_path = bucket_suffix + file.filename
job_type = BgJobType.DELETE_REPLICA.value
job_id = await self.crawl_manager.run_replica_job(
oid=str(org.id),
job_type=job_type,
@ -262,11 +263,13 @@ class BackgroundJobOps:
return job_id
# pylint: disable=broad-exception-caught
except Exception as exc:
# pylint: disable=raise-missing-from
raise HTTPException(
status_code=400, detail=f"Error starting background job: {exc}"
print(
"warning: replica deletion job could not be started "
+ f"for {object_type} {file}: {exc}"
)
return None
async def job_finished(
self,

View File

@ -162,7 +162,7 @@ def main():
init_uploads_api(*base_crawl_init)
org_ops.set_base_crawl_ops(base_crawl_ops)
org_ops.set_ops(base_crawl_ops, profiles, coll_ops)
user_manager.set_ops(org_ops, crawl_config_ops, base_crawl_ops)

View File

@ -54,6 +54,7 @@ from .models import (
Collection,
OrgOutExport,
PageWithAllQA,
DeleteCrawlList,
)
from .pagination import DEFAULT_PAGE_SIZE, paginated_format
from .utils import slug_from_name, validate_slug, JSONSerializer
@ -61,15 +62,20 @@ from .utils import slug_from_name, validate_slug, JSONSerializer
if TYPE_CHECKING:
from .invites import InviteOps
from .basecrawls import BaseCrawlOps
from .colls import CollectionOps
from .profiles import ProfileOps
from .users import UserManager
else:
InviteOps = BaseCrawlOps = UserManager = object
InviteOps = BaseCrawlOps = ProfileOps = CollectionOps = UserManager = object
DEFAULT_ORG = os.environ.get("DEFAULT_ORG", "My Organization")
MAX_CRAWL_SCALE = int(os.environ.get("MAX_CRAWL_SCALE", 3))
# number of items to delete at a time
DEL_ITEMS = 1000
# ============================================================================
# pylint: disable=too-many-public-methods, too-many-instance-attributes, too-many-locals
@ -91,6 +97,7 @@ class OrgOps:
self.users_db = mdb["users"]
self.pages_db = mdb["pages"]
self.version_db = mdb["version"]
self.invites_db = mdb["invites"]
self.router = None
self.org_viewer_dep = None
@ -104,9 +111,17 @@ class OrgOps:
self.user_manager = user_manager
self.register_to_org_id = os.environ.get("REGISTER_TO_ORG_ID")
def set_base_crawl_ops(self, base_crawl_ops: BaseCrawlOps) -> None:
def set_ops(
self,
base_crawl_ops: BaseCrawlOps,
profile_ops: ProfileOps,
coll_ops: CollectionOps,
) -> None:
"""Set base crawl ops"""
# pylint: disable=attribute-defined-outside-init
self.base_crawl_ops = base_crawl_ops
self.profile_ops = profile_ops
self.coll_ops = coll_ops
def set_default_primary_storage(self, storage: StorageRef):
"""set default primary storage"""
@ -1023,6 +1038,59 @@ class OrgOps:
collection = json_stream.to_standard_types(collection)
await self.colls_db.insert_one(Collection.from_dict(collection).to_dict())
async def delete_org_and_data(self, org: Organization, user_manager: UserManager):
"""Delete org and all of its associated data."""
# Delete archived items
cursor = self.crawls_db.find({"oid": org.id}, projection=["_id"])
items = await cursor.to_list(length=DEL_ITEMS)
while items:
item_ids = [item["_id"] for item in items]
await self.base_crawl_ops.delete_crawls_all_types(
delete_list=DeleteCrawlList(crawl_ids=item_ids), org=org
)
items = await cursor.to_list(length=DEL_ITEMS)
# Delete workflows and revisions
cursor = self.crawl_configs_db.find({"oid": org.id}, projection=["_id"])
workflows = await cursor.to_list(length=DEL_ITEMS)
while workflows:
workflow_ids = [workflow["_id"] for workflow in workflows]
await self.configs_revs_db.delete_many({"cid": {"$in": workflow_ids}})
workflows = await cursor.to_list(length=DEL_ITEMS)
await self.crawl_configs_db.delete_many({"oid": org.id})
# Delete profiles
async for profile in self.profiles_db.find({"oid": org.id}, projection=["_id"]):
await self.profile_ops.delete_profile(profile["_id"], org)
# Delete collections
async for coll in self.colls_db.find({"oid": org.id}, projection=["_id"]):
await self.coll_ops.delete_collection(coll["_id"], org)
# Delete users that only belong to this org
for org_user_id in org.users.keys():
user = await user_manager.get_by_id(UUID(org_user_id))
if not user:
continue
orgs, total_orgs = await self.get_orgs_for_user(user)
if total_orgs == 1:
first_org = orgs[0]
if first_org.id != org.id:
continue
await self.users_db.delete_one({"id": user.id})
# Delete invites
await self.invites_db.delete_many({"oid": org.id})
# Delete org
await self.orgs.delete_one({"_id": org.id})
return {"deleted": True}
# ============================================================================
# pylint: disable=too-many-statements, too-many-arguments
@ -1165,6 +1233,15 @@ def init_orgs_api(
org_out.execMinutesQuotaReached = await ops.exec_mins_quota_reached(org.id)
return org_out
@router.delete("", tags=["organizations"])
async def delete_org(
org: Organization = Depends(org_dep), user: User = Depends(user_dep)
):
if not user.is_superuser:
raise HTTPException(status_code=403, detail="Not Allowed")
return await ops.delete_org_and_data(org, user_manager)
@router.post("/rename", tags=["organizations"])
async def rename_org(
rename: RenameOrg,

View File

@ -1028,5 +1028,5 @@ def test_delete_form_upload_and_crawls_from_all_crawls(
if count + 1 == MAX_ATTEMPTS:
assert False
time.sleep(5)
time.sleep(10)
count += 1

View File

@ -0,0 +1,42 @@
import requests
from .conftest import API_PREFIX
def test_delete_org_non_superadmin(crawler_auth_headers, default_org_id):
# Assert that non-superadmin can't delete org
r = requests.delete(
f"{API_PREFIX}/orgs/{default_org_id}", headers=crawler_auth_headers
)
assert r.status_code == 403
assert r.json()["detail"] == "Not Allowed"
def test_delete_org_superadmin(admin_auth_headers, default_org_id):
# Track items in org to ensure they're deleted later (we may want to expand
# this, but currently only have the ability to check items across all orgs)
item_ids = []
r = requests.get(
f"{API_PREFIX}/orgs/{default_org_id}/all-crawls", headers=admin_auth_headers
)
assert r.status_code == 200
data = r.json()
assert data["total"] > 0
for item in data["items"]:
item_ids.append(item["id"])
# Delete org and its data
r = requests.delete(
f"{API_PREFIX}/orgs/{default_org_id}", headers=admin_auth_headers
)
assert r.status_code == 200
assert r.json()["deleted"]
# Ensure items got deleted
for item_id in item_ids:
r = requests.get(
f"{API_PREFIX}/orgs/all/all-crawls/{item_id}/replay.json",
headers=admin_auth_headers,
)
assert r.status_code == 404