security: tweak get /invite endpoints / InviteOut to: (#2087)
don't set inviterEmail / inviterName if the inviter is the superuser: - return fromSuperuser true/false - if fromSuperuser, don't set inviterEmail / inviterName - tests: add tests for non-superuser admin invites
This commit is contained in:
parent
a674689354
commit
62da0fbd6c
@ -1,3 +1,4 @@
|
||||
[MASTER]
|
||||
[MAIN]
|
||||
extension-pkg-whitelist=pydantic
|
||||
ignore-paths=btrixcloud/migrations/migration_0028_page_files_errors.py
|
||||
disable=too-many-positional-arguments,invalid-name
|
||||
|
@ -259,14 +259,23 @@ class InviteOps:
|
||||
self, invite: InvitePending, users: UserManager, include_first_org_admin=False
|
||||
) -> InviteOut:
|
||||
"""format an InvitePending to return via api, resolve name of inviter"""
|
||||
inviter = await users.get_by_email(invite.inviterEmail)
|
||||
if not inviter:
|
||||
raise HTTPException(status_code=400, detail="invalid_invite")
|
||||
from_superuser = invite.fromSuperuser
|
||||
inviter_name = None
|
||||
inviter_email = None
|
||||
inviter = None
|
||||
if not from_superuser:
|
||||
inviter = await users.get_by_email(invite.inviterEmail)
|
||||
if not inviter:
|
||||
raise HTTPException(status_code=400, detail="invalid_invite")
|
||||
|
||||
inviter_name = inviter.name
|
||||
inviter_email = invite.inviterEmail
|
||||
|
||||
invite_out = InviteOut(
|
||||
created=invite.created,
|
||||
inviterEmail=invite.inviterEmail,
|
||||
inviterName=inviter.name,
|
||||
inviterEmail=inviter_email,
|
||||
inviterName=inviter_name,
|
||||
fromSuperuser=from_superuser,
|
||||
oid=invite.oid,
|
||||
role=invite.role,
|
||||
email=invite.email,
|
||||
|
@ -94,8 +94,9 @@ class InviteOut(BaseModel):
|
||||
"""Single invite output model"""
|
||||
|
||||
created: datetime
|
||||
inviterEmail: EmailStr
|
||||
inviterName: str
|
||||
inviterEmail: Optional[EmailStr] = None
|
||||
inviterName: Optional[str] = None
|
||||
fromSuperuser: bool
|
||||
oid: Optional[UUID] = None
|
||||
orgName: Optional[str] = None
|
||||
orgSlug: Optional[str] = None
|
||||
|
@ -256,11 +256,11 @@ class OrgOps:
|
||||
return [Organization.from_dict(data) for data in items], total
|
||||
|
||||
async def get_org_for_user_by_id(
|
||||
self, oid: UUID, user: User, role: UserRole = UserRole.VIEWER
|
||||
self, oid: UUID, user: Optional[User], role: UserRole = UserRole.VIEWER
|
||||
) -> Optional[Organization]:
|
||||
"""Get an org for user by unique id"""
|
||||
query: dict[str, object]
|
||||
if user.is_superuser:
|
||||
if not user or user.is_superuser:
|
||||
query = {"_id": oid}
|
||||
else:
|
||||
query = {f"users.{user.id}": {"$gte": role.value}, "_id": oid}
|
||||
|
@ -206,6 +206,9 @@ def test_login_existing_user_for_invite():
|
||||
assert data["firstOrgAdmin"] == True
|
||||
assert data["orgName"] == data["oid"]
|
||||
assert data["orgName"] == data["orgSlug"]
|
||||
assert data["fromSuperuser"] == True
|
||||
assert not data["inviterEmail"]
|
||||
assert not data["inviterName"]
|
||||
|
||||
# Accept existing user invite
|
||||
r = requests.post(
|
||||
|
@ -18,6 +18,9 @@ VALID_USER_PW = "validpassw0rd!"
|
||||
VALID_USER_PW_RESET = "new!password"
|
||||
VALID_USER_PW_RESET_AGAIN = "new!password1"
|
||||
|
||||
ADMIN_ROLE = 40
|
||||
CRAWLER_ROLE = 20
|
||||
|
||||
my_id = None
|
||||
valid_user_headers = None
|
||||
|
||||
@ -26,6 +29,10 @@ existing_user_invite_token = None
|
||||
wrong_token = None
|
||||
|
||||
|
||||
new_user_auth_headers = None
|
||||
another_user_email = "another-user@example.com"
|
||||
|
||||
|
||||
def test_create_super_user(admin_auth_headers):
|
||||
assert admin_auth_headers
|
||||
auth = admin_auth_headers["Authorization"]
|
||||
@ -67,7 +74,7 @@ def test_me_with_orgs(crawler_auth_headers, default_org_id):
|
||||
assert default_org["id"] == default_org_id
|
||||
assert default_org["name"]
|
||||
assert default_org["default"]
|
||||
assert default_org["role"] == 20
|
||||
assert default_org["role"] == CRAWLER_ROLE
|
||||
|
||||
|
||||
def test_me_id(admin_auth_headers, default_org_id):
|
||||
@ -114,7 +121,7 @@ def test_login_user_info(admin_auth_headers, crawler_userid, default_org_id):
|
||||
assert org["name"] == default_org["name"]
|
||||
assert org["slug"] == default_org["slug"]
|
||||
assert org["default"]
|
||||
assert org["role"] == 20
|
||||
assert org["role"] == CRAWLER_ROLE
|
||||
|
||||
|
||||
def test_login_case_insensitive_email():
|
||||
@ -139,7 +146,7 @@ def test_add_user_to_org_invalid_password(admin_auth_headers, default_org_id):
|
||||
"password": "pw",
|
||||
"name": "invalid pw user",
|
||||
"description": "test invalid password",
|
||||
"role": 20,
|
||||
"role": CRAWLER_ROLE,
|
||||
},
|
||||
headers=admin_auth_headers,
|
||||
)
|
||||
@ -153,7 +160,7 @@ def test_register_user_invalid_password(admin_auth_headers, default_org_id):
|
||||
r = requests.post(
|
||||
f"{API_PREFIX}/orgs/{default_org_id}/invite",
|
||||
headers=admin_auth_headers,
|
||||
json={"email": email, "role": 20},
|
||||
json={"email": email, "role": CRAWLER_ROLE},
|
||||
)
|
||||
assert r.status_code == 200
|
||||
data = r.json()
|
||||
@ -184,7 +191,7 @@ def test_new_user_send_invite(admin_auth_headers, default_org_id):
|
||||
r = requests.post(
|
||||
f"{API_PREFIX}/orgs/{default_org_id}/invite",
|
||||
headers=admin_auth_headers,
|
||||
json={"email": VALID_USER_EMAIL, "role": 20},
|
||||
json={"email": VALID_USER_EMAIL, "role": CRAWLER_ROLE},
|
||||
)
|
||||
assert r.status_code == 200
|
||||
data = r.json()
|
||||
@ -224,6 +231,10 @@ def test_new_user_token():
|
||||
f"{API_PREFIX}/users/invite/{new_user_invite_token}?email={VALID_USER_EMAIL}",
|
||||
)
|
||||
assert r.status_code == 200
|
||||
data = r.json()
|
||||
assert data["fromSuperuser"]
|
||||
assert not data["inviterEmail"]
|
||||
assert not data["inviterName"]
|
||||
|
||||
|
||||
def test_register_user_no_invite():
|
||||
@ -346,7 +357,7 @@ def test_existing_user_send_invite(admin_auth_headers, non_default_org_id):
|
||||
r = requests.post(
|
||||
f"{API_PREFIX}/orgs/{non_default_org_id}/invite",
|
||||
headers=admin_auth_headers,
|
||||
json={"email": VALID_USER_EMAIL, "role": 20},
|
||||
json={"email": VALID_USER_EMAIL, "role": CRAWLER_ROLE},
|
||||
)
|
||||
assert r.status_code == 200
|
||||
data = r.json()
|
||||
@ -401,6 +412,10 @@ def test_login_existing_user_for_invite():
|
||||
headers=auth_headers,
|
||||
)
|
||||
assert r.status_code == 200
|
||||
data = r.json()
|
||||
assert data["fromSuperuser"]
|
||||
assert not data["inviterEmail"]
|
||||
assert not data["inviterName"]
|
||||
|
||||
# Accept existing user invite
|
||||
r = requests.post(
|
||||
@ -408,6 +423,9 @@ def test_login_existing_user_for_invite():
|
||||
headers=auth_headers,
|
||||
)
|
||||
|
||||
global new_user_auth_headers
|
||||
new_user_auth_headers = auth_headers
|
||||
|
||||
|
||||
def test_pending_invites_clear(admin_auth_headers, non_default_org_id):
|
||||
r = requests.get(
|
||||
@ -450,17 +468,52 @@ def test_user_part_of_two_orgs(default_org_id, non_default_org_id):
|
||||
assert non_default_org_id in org_ids
|
||||
|
||||
|
||||
def test_non_crawler_user_cant_invite(default_org_id):
|
||||
# Send invite
|
||||
r = requests.post(
|
||||
f"{API_PREFIX}/orgs/{default_org_id}/invite",
|
||||
headers=new_user_auth_headers,
|
||||
json={"email": another_user_email, "role": CRAWLER_ROLE},
|
||||
)
|
||||
assert r.status_code == 403
|
||||
|
||||
|
||||
def test_user_change_role(admin_auth_headers, default_org_id):
|
||||
r = requests.patch(
|
||||
f"{API_PREFIX}/orgs/{default_org_id}/user-role",
|
||||
headers=admin_auth_headers,
|
||||
json={"email": VALID_USER_EMAIL, "role": 40},
|
||||
json={"email": VALID_USER_EMAIL, "role": ADMIN_ROLE},
|
||||
)
|
||||
|
||||
assert r.status_code == 200
|
||||
assert r.json()["updated"] == True
|
||||
|
||||
|
||||
def test_non_superadmin_admin_can_invite(default_org_id):
|
||||
# Send invite
|
||||
r = requests.post(
|
||||
f"{API_PREFIX}/orgs/{default_org_id}/invite",
|
||||
headers=new_user_auth_headers,
|
||||
json={"email": another_user_email, "role": CRAWLER_ROLE},
|
||||
)
|
||||
assert r.status_code == 200
|
||||
data = r.json()
|
||||
|
||||
assert data["invited"] == "new_user"
|
||||
|
||||
another_token = data["token"]
|
||||
|
||||
# Confirm token is valid (no auth needed)
|
||||
r = requests.get(
|
||||
f"{API_PREFIX}/users/invite/{another_token}?email={another_user_email}",
|
||||
)
|
||||
assert r.status_code == 200
|
||||
data = r.json()
|
||||
assert not data["fromSuperuser"]
|
||||
assert data["inviterEmail"] == VALID_USER_EMAIL
|
||||
assert data["inviterName"] == "valid"
|
||||
|
||||
|
||||
def test_forgot_password():
|
||||
r = requests.post(
|
||||
f"{API_PREFIX}/auth/forgot-password", json={"email": "no-such-user@example.com"}
|
||||
|
Loading…
Reference in New Issue
Block a user