From 62da0fbd6ce5277a5e34681b8e37e1f11faf9de1 Mon Sep 17 00:00:00 2001 From: Ilya Kreymer Date: Fri, 20 Sep 2024 11:52:56 -0700 Subject: [PATCH] security: tweak get /invite endpoints / InviteOut to: (#2087) don't set inviterEmail / inviterName if the inviter is the superuser: - return fromSuperuser true/false - if fromSuperuser, don't set inviterEmail / inviterName - tests: add tests for non-superuser admin invites --- backend/.pylintrc | 3 +- backend/btrixcloud/invites.py | 19 +++++++--- backend/btrixcloud/models.py | 5 +-- backend/btrixcloud/orgs.py | 4 +-- backend/test/test_org_subs.py | 3 ++ backend/test/test_users.py | 67 +++++++++++++++++++++++++++++++---- 6 files changed, 84 insertions(+), 17 deletions(-) diff --git a/backend/.pylintrc b/backend/.pylintrc index 673170f0..2e703c84 100644 --- a/backend/.pylintrc +++ b/backend/.pylintrc @@ -1,3 +1,4 @@ -[MASTER] +[MAIN] extension-pkg-whitelist=pydantic ignore-paths=btrixcloud/migrations/migration_0028_page_files_errors.py +disable=too-many-positional-arguments,invalid-name diff --git a/backend/btrixcloud/invites.py b/backend/btrixcloud/invites.py index 03d90963..22a1e9af 100644 --- a/backend/btrixcloud/invites.py +++ b/backend/btrixcloud/invites.py @@ -259,14 +259,23 @@ class InviteOps: self, invite: InvitePending, users: UserManager, include_first_org_admin=False ) -> InviteOut: """format an InvitePending to return via api, resolve name of inviter""" - inviter = await users.get_by_email(invite.inviterEmail) - if not inviter: - raise HTTPException(status_code=400, detail="invalid_invite") + from_superuser = invite.fromSuperuser + inviter_name = None + inviter_email = None + inviter = None + if not from_superuser: + inviter = await users.get_by_email(invite.inviterEmail) + if not inviter: + raise HTTPException(status_code=400, detail="invalid_invite") + + inviter_name = inviter.name + inviter_email = invite.inviterEmail invite_out = InviteOut( created=invite.created, - inviterEmail=invite.inviterEmail, - inviterName=inviter.name, + inviterEmail=inviter_email, + inviterName=inviter_name, + fromSuperuser=from_superuser, oid=invite.oid, role=invite.role, email=invite.email, diff --git a/backend/btrixcloud/models.py b/backend/btrixcloud/models.py index 0ff0347f..a011cbf1 100644 --- a/backend/btrixcloud/models.py +++ b/backend/btrixcloud/models.py @@ -94,8 +94,9 @@ class InviteOut(BaseModel): """Single invite output model""" created: datetime - inviterEmail: EmailStr - inviterName: str + inviterEmail: Optional[EmailStr] = None + inviterName: Optional[str] = None + fromSuperuser: bool oid: Optional[UUID] = None orgName: Optional[str] = None orgSlug: Optional[str] = None diff --git a/backend/btrixcloud/orgs.py b/backend/btrixcloud/orgs.py index c517bb07..2b278caf 100644 --- a/backend/btrixcloud/orgs.py +++ b/backend/btrixcloud/orgs.py @@ -256,11 +256,11 @@ class OrgOps: return [Organization.from_dict(data) for data in items], total async def get_org_for_user_by_id( - self, oid: UUID, user: User, role: UserRole = UserRole.VIEWER + self, oid: UUID, user: Optional[User], role: UserRole = UserRole.VIEWER ) -> Optional[Organization]: """Get an org for user by unique id""" query: dict[str, object] - if user.is_superuser: + if not user or user.is_superuser: query = {"_id": oid} else: query = {f"users.{user.id}": {"$gte": role.value}, "_id": oid} diff --git a/backend/test/test_org_subs.py b/backend/test/test_org_subs.py index 88d93d79..9d795d1a 100644 --- a/backend/test/test_org_subs.py +++ b/backend/test/test_org_subs.py @@ -206,6 +206,9 @@ def test_login_existing_user_for_invite(): assert data["firstOrgAdmin"] == True assert data["orgName"] == data["oid"] assert data["orgName"] == data["orgSlug"] + assert data["fromSuperuser"] == True + assert not data["inviterEmail"] + assert not data["inviterName"] # Accept existing user invite r = requests.post( diff --git a/backend/test/test_users.py b/backend/test/test_users.py index 2ad8e0a3..71d013dc 100644 --- a/backend/test/test_users.py +++ b/backend/test/test_users.py @@ -18,6 +18,9 @@ VALID_USER_PW = "validpassw0rd!" VALID_USER_PW_RESET = "new!password" VALID_USER_PW_RESET_AGAIN = "new!password1" +ADMIN_ROLE = 40 +CRAWLER_ROLE = 20 + my_id = None valid_user_headers = None @@ -26,6 +29,10 @@ existing_user_invite_token = None wrong_token = None +new_user_auth_headers = None +another_user_email = "another-user@example.com" + + def test_create_super_user(admin_auth_headers): assert admin_auth_headers auth = admin_auth_headers["Authorization"] @@ -67,7 +74,7 @@ def test_me_with_orgs(crawler_auth_headers, default_org_id): assert default_org["id"] == default_org_id assert default_org["name"] assert default_org["default"] - assert default_org["role"] == 20 + assert default_org["role"] == CRAWLER_ROLE def test_me_id(admin_auth_headers, default_org_id): @@ -114,7 +121,7 @@ def test_login_user_info(admin_auth_headers, crawler_userid, default_org_id): assert org["name"] == default_org["name"] assert org["slug"] == default_org["slug"] assert org["default"] - assert org["role"] == 20 + assert org["role"] == CRAWLER_ROLE def test_login_case_insensitive_email(): @@ -139,7 +146,7 @@ def test_add_user_to_org_invalid_password(admin_auth_headers, default_org_id): "password": "pw", "name": "invalid pw user", "description": "test invalid password", - "role": 20, + "role": CRAWLER_ROLE, }, headers=admin_auth_headers, ) @@ -153,7 +160,7 @@ def test_register_user_invalid_password(admin_auth_headers, default_org_id): r = requests.post( f"{API_PREFIX}/orgs/{default_org_id}/invite", headers=admin_auth_headers, - json={"email": email, "role": 20}, + json={"email": email, "role": CRAWLER_ROLE}, ) assert r.status_code == 200 data = r.json() @@ -184,7 +191,7 @@ def test_new_user_send_invite(admin_auth_headers, default_org_id): r = requests.post( f"{API_PREFIX}/orgs/{default_org_id}/invite", headers=admin_auth_headers, - json={"email": VALID_USER_EMAIL, "role": 20}, + json={"email": VALID_USER_EMAIL, "role": CRAWLER_ROLE}, ) assert r.status_code == 200 data = r.json() @@ -224,6 +231,10 @@ def test_new_user_token(): f"{API_PREFIX}/users/invite/{new_user_invite_token}?email={VALID_USER_EMAIL}", ) assert r.status_code == 200 + data = r.json() + assert data["fromSuperuser"] + assert not data["inviterEmail"] + assert not data["inviterName"] def test_register_user_no_invite(): @@ -346,7 +357,7 @@ def test_existing_user_send_invite(admin_auth_headers, non_default_org_id): r = requests.post( f"{API_PREFIX}/orgs/{non_default_org_id}/invite", headers=admin_auth_headers, - json={"email": VALID_USER_EMAIL, "role": 20}, + json={"email": VALID_USER_EMAIL, "role": CRAWLER_ROLE}, ) assert r.status_code == 200 data = r.json() @@ -401,6 +412,10 @@ def test_login_existing_user_for_invite(): headers=auth_headers, ) assert r.status_code == 200 + data = r.json() + assert data["fromSuperuser"] + assert not data["inviterEmail"] + assert not data["inviterName"] # Accept existing user invite r = requests.post( @@ -408,6 +423,9 @@ def test_login_existing_user_for_invite(): headers=auth_headers, ) + global new_user_auth_headers + new_user_auth_headers = auth_headers + def test_pending_invites_clear(admin_auth_headers, non_default_org_id): r = requests.get( @@ -450,17 +468,52 @@ def test_user_part_of_two_orgs(default_org_id, non_default_org_id): assert non_default_org_id in org_ids +def test_non_crawler_user_cant_invite(default_org_id): + # Send invite + r = requests.post( + f"{API_PREFIX}/orgs/{default_org_id}/invite", + headers=new_user_auth_headers, + json={"email": another_user_email, "role": CRAWLER_ROLE}, + ) + assert r.status_code == 403 + + def test_user_change_role(admin_auth_headers, default_org_id): r = requests.patch( f"{API_PREFIX}/orgs/{default_org_id}/user-role", headers=admin_auth_headers, - json={"email": VALID_USER_EMAIL, "role": 40}, + json={"email": VALID_USER_EMAIL, "role": ADMIN_ROLE}, ) assert r.status_code == 200 assert r.json()["updated"] == True +def test_non_superadmin_admin_can_invite(default_org_id): + # Send invite + r = requests.post( + f"{API_PREFIX}/orgs/{default_org_id}/invite", + headers=new_user_auth_headers, + json={"email": another_user_email, "role": CRAWLER_ROLE}, + ) + assert r.status_code == 200 + data = r.json() + + assert data["invited"] == "new_user" + + another_token = data["token"] + + # Confirm token is valid (no auth needed) + r = requests.get( + f"{API_PREFIX}/users/invite/{another_token}?email={another_user_email}", + ) + assert r.status_code == 200 + data = r.json() + assert not data["fromSuperuser"] + assert data["inviterEmail"] == VALID_USER_EMAIL + assert data["inviterName"] == "valid" + + def test_forgot_password(): r = requests.post( f"{API_PREFIX}/auth/forgot-password", json={"email": "no-such-user@example.com"}