Invite token improvements (#564)

- URL decode email address in invites.invite_user
- Add tests for accepting invites
This commit is contained in:
Tessa Walsh 2023-02-07 23:40:28 -05:00 committed by GitHub
parent a7a5b7fd63
commit 95155e6fbf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 163 additions and 4 deletions

View File

@ -3,8 +3,9 @@
from datetime import datetime from datetime import datetime
from enum import IntEnum from enum import IntEnum
from typing import Optional from typing import Optional
import uuid
import os import os
import urllib.parse
import uuid
from pydantic import BaseModel, UUID4 from pydantic import BaseModel, UUID4
from fastapi import HTTPException from fastapi import HTTPException
@ -135,8 +136,12 @@ class InviteOps:
allow_existing=False, allow_existing=False,
headers: dict = None, headers: dict = None,
): ):
"""create new invite for user to join, optionally an org. """Invite user to org (if not specified, to default org).
if allow_existing is false, don't allow invites to existing users"""
If allow_existing is false, don't allow invites to existing users.
:returns: is_new_user (bool), invite token (str)
"""
invite_code = uuid.uuid4().hex invite_code = uuid.uuid4().hex
if org: if org:
@ -152,7 +157,8 @@ class InviteOps:
oid=oid, oid=oid,
created=datetime.utcnow(), created=datetime.utcnow(),
role=invite.role if hasattr(invite, "role") else None, role=invite.role if hasattr(invite, "role") else None,
email=invite.email, # URL decode email address just in case
email=urllib.parse.unquote(invite.email),
inviterEmail=user.email, inviterEmail=user.email,
) )

View File

@ -1,5 +1,7 @@
import requests import requests
import pytest
from .conftest import API_PREFIX from .conftest import API_PREFIX
@ -38,3 +40,75 @@ def test_pending_invites_crawler(crawler_auth_headers, default_org_id):
# Verify that only superusers can see pending invites # Verify that only superusers can see pending invites
r = requests.get(f"{API_PREFIX}/users/invites", headers=crawler_auth_headers) r = requests.get(f"{API_PREFIX}/users/invites", headers=crawler_auth_headers)
assert r.status_code == 403 assert r.status_code == 403
@pytest.mark.parametrize(
"invite_email, expected_stored_email",
[
# Standard email
("invite-to-accept@example.com", "invite-to-accept@example.com"),
# Email address with comments
("user+comment@example.com", "user+comment@example.com"),
# URL encoded email address with comments
("user%2Bcomment-encoded%40example.com", "user+comment-encoded@example.com"),
# User email with diacritic characters
("diacritic-tést@example.com", "diacritic-tést@example.com"),
# User email with encoded diacritic characters
(
"diacritic-t%C3%A9st-encoded%40example.com",
"diacritic-tést-encoded@example.com",
),
],
)
def test_send_and_accept_invite(
admin_auth_headers, default_org_id, invite_email, expected_stored_email
):
# Send invite
r = requests.post(
f"{API_PREFIX}/users/invite",
headers=admin_auth_headers,
json={"email": invite_email},
)
assert r.status_code == 200
data = r.json()
assert data["invited"] == "new_user"
# Look up token
r = requests.get(
f"{API_PREFIX}/users/invites",
headers=admin_auth_headers,
)
assert r.status_code == 200
data = r.json()
invites_matching_email = [
invite
for invite in data["pending_invites"]
if invite["email"] == expected_stored_email
]
token = invites_matching_email[0]["id"]
# Register user
# Note: This will accept invitation without needing to call the
# accept invite endpoint explicitly due to post-registration hook.
r = requests.post(
f"{API_PREFIX}/auth/register",
headers=admin_auth_headers,
json={
"name": "accepted",
"email": expected_stored_email,
"password": "testpw",
"inviteToken": token,
"newOrg": False,
},
)
assert r.status_code == 201
# Verify user belongs to org
r = requests.get(f"{API_PREFIX}/orgs/{default_org_id}", headers=admin_auth_headers)
assert r.status_code == 200
data = r.json()
users = data["users"]
users_with_invited_email = [
user for user in users.values() if user["email"] == expected_stored_email
]
assert len(users_with_invited_email) == 1

View File

@ -1,5 +1,7 @@
import requests import requests
import pytest
from .conftest import API_PREFIX from .conftest import API_PREFIX
@ -152,3 +154,80 @@ def test_get_pending_org_invites(
assert invite["oid"] == non_default_org_id assert invite["oid"] == non_default_org_id
assert invite["created"] assert invite["created"]
assert invite["role"] assert invite["role"]
@pytest.mark.parametrize(
"invite_email, expected_stored_email",
[
# Standard email
("invite-to-accept-org@example.com", "invite-to-accept-org@example.com"),
# Email address with comments
("user+comment-org@example.com", "user+comment-org@example.com"),
# URL encoded email address with comments
(
"user%2Bcomment-encoded-org%40example.com",
"user+comment-encoded-org@example.com",
),
# User email with diacritic characters
("diacritic-tést-org@example.com", "diacritic-tést-org@example.com"),
# User email with encoded diacritic characters
(
"diacritic-t%C3%A9st-encoded-org%40example.com",
"diacritic-tést-encoded-org@example.com",
),
],
)
def test_send_and_accept_org_invite(
admin_auth_headers, non_default_org_id, invite_email, expected_stored_email
):
# Send invite
r = requests.post(
f"{API_PREFIX}/orgs/{non_default_org_id}/invite",
headers=admin_auth_headers,
json={"email": invite_email, "role": 20},
)
assert r.status_code == 200
data = r.json()
assert data["invited"] == "new_user"
# Look up token
r = requests.get(
f"{API_PREFIX}/orgs/{non_default_org_id}/invites",
headers=admin_auth_headers,
)
assert r.status_code == 200
data = r.json()
invites_matching_email = [
invite
for invite in data["pending_invites"]
if invite["email"] == expected_stored_email
]
token = invites_matching_email[0]["id"]
# Register user
# Note: This will accept invitation without needing to call the
# accept invite endpoint explicitly due to post-registration hook.
r = requests.post(
f"{API_PREFIX}/auth/register",
headers=admin_auth_headers,
json={
"name": "accepted",
"email": expected_stored_email,
"password": "testpw",
"inviteToken": token,
"newOrg": False,
},
)
assert r.status_code == 201
# Verify user belongs to org
r = requests.get(
f"{API_PREFIX}/orgs/{non_default_org_id}", headers=admin_auth_headers
)
assert r.status_code == 200
data = r.json()
users = data["users"]
users_with_invited_email = [
user for user in users.values() if user["email"] == expected_stored_email
]
assert len(users_with_invited_email) == 1