# Listing metadata (from the file viewer): 820 lines, 23 KiB, Python
import requests
|
|
import time
|
|
from uuid import uuid4
|
|
|
|
from .conftest import (
|
|
API_PREFIX,
|
|
CRAWLER_USERNAME,
|
|
CRAWLER_USERNAME_LOWERCASE,
|
|
CRAWLER_PW,
|
|
ADMIN_USERNAME,
|
|
FINISHED_STATES,
|
|
)
|
|
|
|
# Accounts and passwords used by the tests in this module.
INVALID_PASSWORD_EMAIL = "invalidpassword@example.com"
VALID_USER_EMAIL = "validpassword@example.com"
another_user_email = "another-user@example.com"

VALID_USER_PW = "validpassw0rd!"
VALID_USER_PW_RESET = "new!password"
VALID_USER_PW_RESET_AGAIN = "new!password1"

# Numeric role values sent to and compared against the org endpoints.
ADMIN_ROLE, CRAWLER_ROLE = 40, 20

# State handed from one test to the next; each name starts out as None and
# is assigned (via ``global``) by the test that produces the value.
my_id = None
valid_user_headers = None
new_user_auth_headers = None

new_user_invite_token = None
existing_user_invite_token = None
wrong_token = None
|
|
|
|
|
|
def test_create_super_user(admin_auth_headers):
    """Check that the admin fixture carries a usable-looking bearer token."""
    assert admin_auth_headers

    bearer_token = admin_auth_headers["Authorization"].replace("Bearer ", "")

    # The token must not be the string "None" and must be longer than 4 chars.
    assert bearer_token != "None"
    assert len(bearer_token) > 4
|
|
|
|
|
|
def test_create_non_super_user(viewer_auth_headers):
    """Check that the viewer fixture carries a usable-looking bearer token."""
    assert viewer_auth_headers

    bearer_token = viewer_auth_headers["Authorization"].replace("Bearer ", "")

    # The token must not be the string "None" and must be longer than 4 chars.
    assert bearer_token != "None"
    assert len(bearer_token) > 4
|
|
|
|
|
|
def test_me_with_orgs(crawler_auth_headers, default_org_id):
    """Fetch the crawler's own profile and verify its fields and single org.

    The returned user id is stored in the module-level ``my_id`` so that
    later tests in this module can build URLs from it.
    """
    global my_id

    resp = requests.get(f"{API_PREFIX}/users/me", headers=crawler_auth_headers)
    assert resp.status_code == 200

    profile = resp.json()
    assert profile["email"] == CRAWLER_USERNAME_LOWERCASE
    assert profile["id"]
    assert profile["is_superuser"] is False
    assert profile["is_verified"] is True
    assert profile["name"] == "new-crawler"

    memberships = profile["orgs"]
    assert len(memberships) == 1

    my_id = profile["id"]

    # The only membership must be the default org, with the crawler role.
    membership = memberships[0]
    assert membership["id"] == default_org_id
    assert membership["name"]
    assert membership["default"]
    assert membership["role"] == CRAWLER_ROLE
|
|
|
|
|
|
def test_me_id(admin_auth_headers, default_org_id):
    """A GET on ``/users/{id}`` is expected to answer 404.

    Relies on the module-level ``my_id`` assigned by ``test_me_with_orgs``.
    """
    user_url = f"{API_PREFIX}/users/{my_id}"
    resp = requests.get(user_url, headers=admin_auth_headers)
    assert resp.status_code == 404
|
|
|
|
|
|
def test_login_user_info(admin_auth_headers, crawler_userid, default_org_id):
    """Log in as the crawler and verify the ``user_info`` in the response."""
    # Fetch the default org as listed for the admin, to compare against below.
    orgs_resp = requests.get(f"{API_PREFIX}/orgs", headers=admin_auth_headers)
    expected_org = [o for o in orgs_resp.json()["items"] if o["default"]][0]

    # Log in with the crawler credentials and check the token fields.
    credentials = {
        "username": CRAWLER_USERNAME,
        "password": CRAWLER_PW,
        "grant_type": "password",
    }
    login_resp = requests.post(f"{API_PREFIX}/auth/jwt/login", data=credentials)
    payload = login_resp.json()
    assert login_resp.status_code == 200
    assert payload["access_token"]
    assert payload["token_type"] == "bearer"

    info = payload["user_info"]
    assert info

    assert info["id"] == crawler_userid
    assert info["name"] == "new-crawler"
    assert info["email"] == CRAWLER_USERNAME_LOWERCASE
    assert info["is_superuser"] is False
    assert info["is_verified"]

    # Exactly one org is expected, matching the default org fetched above.
    memberships = info["orgs"]
    assert len(memberships) == 1
    membership = memberships[0]

    assert membership["id"] == default_org_id
    assert membership["name"] == expected_org["name"]
    assert membership["slug"] == expected_org["slug"]
    assert membership["default"]
    assert membership["role"] == CRAWLER_ROLE
|
|
|
|
|
|
def test_login_case_insensitive_email():
    """Log in using the lowercase form of the crawler's email address."""
    credentials = {
        "username": CRAWLER_USERNAME_LOWERCASE,
        "password": CRAWLER_PW,
        "grant_type": "password",
    }
    resp = requests.post(f"{API_PREFIX}/auth/jwt/login", data=credentials)
    payload = resp.json()
    assert resp.status_code == 200
    assert payload["access_token"]
|
|
|
|
|
|
def test_add_user_to_org_invalid_password(admin_auth_headers, default_org_id):
    """Adding a user with the password ``"pw"`` must be rejected with 400."""
    new_user = {
        "email": INVALID_PASSWORD_EMAIL,
        "password": "pw",
        "name": "invalid pw user",
        "description": "test invalid password",
        "role": CRAWLER_ROLE,
    }
    resp = requests.post(
        f"{API_PREFIX}/orgs/{default_org_id}/add-user",
        headers=admin_auth_headers,
        json=new_user,
    )
    assert resp.status_code == 400
    assert resp.json()["detail"] == "invalid_password"
|
|
|
|
|
|
def test_register_user_invalid_password(admin_auth_headers, default_org_id):
    """Invite a new user, then fail to register them with a rejected password.

    The invite token is stored in the module-level ``wrong_token``, which
    ``test_register_user_wrong_invite`` later submits for a different email.
    """
    global wrong_token

    invitee = INVALID_PASSWORD_EMAIL

    # Step 1: send the invite.
    invite_resp = requests.post(
        f"{API_PREFIX}/orgs/{default_org_id}/invite",
        headers=admin_auth_headers,
        json={"email": invitee, "role": CRAWLER_ROLE},
    )
    assert invite_resp.status_code == 200
    invite = invite_resp.json()
    assert invite["invited"] == "new_user"

    wrong_token = invite["token"]

    # Step 2: try to register with the invite, using the password "passwd".
    registration = {
        "name": "invalid",
        "email": invitee,
        "password": "passwd",
        "inviteToken": wrong_token,
    }
    register_resp = requests.post(
        f"{API_PREFIX}/auth/register",
        headers=admin_auth_headers,
        json=registration,
    )
    assert register_resp.status_code == 400
    assert register_resp.json()["detail"] == "invalid_password"
|
|
|
|
|
|
def test_new_user_send_invite(admin_auth_headers, default_org_id):
    """Invite ``VALID_USER_EMAIL`` to the default org as a new user.

    The invite token is stored in the module-level ``new_user_invite_token``.
    """
    global new_user_invite_token

    resp = requests.post(
        f"{API_PREFIX}/orgs/{default_org_id}/invite",
        json={"email": VALID_USER_EMAIL, "role": CRAWLER_ROLE},
        headers=admin_auth_headers,
    )
    assert resp.status_code == 200

    invite = resp.json()
    assert invite["invited"] == "new_user"

    new_user_invite_token = invite["token"]
|
|
|
|
|
|
def test_pending_invite_new_user(admin_auth_headers, default_org_id):
    """The default org must list exactly the two invites sent so far."""
    invites_url = f"{API_PREFIX}/orgs/{default_org_id}/invites"
    resp = requests.get(invites_url, headers=admin_auth_headers)
    assert resp.status_code == 200

    listing = resp.json()
    pending = listing["items"]
    assert len(pending) == 2

    assert listing["total"] == 2

    expected_emails = (VALID_USER_EMAIL, INVALID_PASSWORD_EMAIL)
    for entry in pending:
        assert entry["email"] in expected_emails
        assert entry["oid"] == default_org_id
        assert entry["created"]
        assert entry["role"]
        assert entry["firstOrgAdmin"] == False
|
|
|
|
|
|
def test_new_user_token():
    """Look up the new user's invite by token, without and with the email."""
    invite_url = f"{API_PREFIX}/users/invite/{new_user_invite_token}"

    # Without the email query parameter the lookup must answer 422.
    resp = requests.get(invite_url)
    assert resp.status_code == 422

    # With the email the token is confirmed; no auth headers are sent.
    resp = requests.get(f"{invite_url}?email={VALID_USER_EMAIL}")
    assert resp.status_code == 200

    details = resp.json()
    assert details["fromSuperuser"]
    assert not details["inviterEmail"]
    assert not details["inviterName"]
|
|
|
|
|
|
def test_register_user_no_invite():
    """Registering without an ``inviteToken`` must be refused with 400."""
    registration = {
        "name": "valid",
        "email": VALID_USER_EMAIL,
        "password": VALID_USER_PW,
    }
    resp = requests.post(f"{API_PREFIX}/auth/register", json=registration)
    assert resp.json()["detail"] == "invite_token_required"
    assert resp.status_code == 400
|
|
|
|
|
|
def test_register_user_wrong_invite():
    """Registering with an invite token that does not match must fail.

    Three tokens are tried: the one stored in ``wrong_token`` (issued for
    ``INVALID_PASSWORD_EMAIL``), a freshly generated UUID, and the string
    ``"abcdefg"``.
    """
    register_url = f"{API_PREFIX}/auth/register"

    def registration_with(token):
        # Same user details every time; only the invite token varies.
        return {
            "name": "valid",
            "email": VALID_USER_EMAIL,
            "password": VALID_USER_PW,
            "inviteToken": token,
        }

    # A token belonging to another invite, then a random UUID: both must be
    # rejected as an invalid invite.
    for bad_token in (wrong_token, str(uuid4())):
        resp = requests.post(register_url, json=registration_with(bad_token))
        assert resp.json()["detail"] == "invalid_invite"
        assert resp.status_code == 400

    # A token that is not a UUID at all must answer 422.
    resp = requests.post(register_url, json=registration_with("abcdefg"))
    assert resp.status_code == 422
|
|
|
|
|
|
def test_register_user_valid_password():
    """Register ``VALID_USER_EMAIL`` with the invite token; expect 201."""
    registration = {
        "name": "valid",
        "email": VALID_USER_EMAIL,
        "password": VALID_USER_PW,
        "inviteToken": new_user_invite_token,
    }
    resp = requests.post(f"{API_PREFIX}/auth/register", json=registration)
    assert resp.status_code == 201
    assert resp.json()["is_verified"] == True
|
|
|
|
|
|
def test_register_dupe():
    """Repeating the registration with the same details must answer 400."""
    registration = {
        "name": "valid",
        "email": VALID_USER_EMAIL,
        "password": VALID_USER_PW,
        "inviteToken": new_user_invite_token,
    }
    resp = requests.post(f"{API_PREFIX}/auth/register", json=registration)
    assert resp.status_code == 400
|
|
|
|
|
|
def test_delete_invite(admin_auth_headers, default_org_id):
    """Delete the unused invite for ``INVALID_PASSWORD_EMAIL``, then retry."""
    delete_url = f"{API_PREFIX}/orgs/{default_org_id}/invites/delete"
    body = {"email": INVALID_PASSWORD_EMAIL}

    # First deletion removes exactly one invite.
    resp = requests.post(delete_url, headers=admin_auth_headers, json=body)
    assert resp.status_code == 200
    outcome = resp.json()
    assert outcome["removed"] == True
    assert outcome["count"] == 1

    # The identical request now answers 404.
    resp = requests.post(delete_url, headers=admin_auth_headers, json=body)
    assert resp.status_code == 404
|
|
|
|
|
|
def test_pending_invites_clear_new_user(admin_auth_headers, default_org_id):
    """The default org must have no pending invites left."""
    invites_url = f"{API_PREFIX}/orgs/{default_org_id}/invites"
    resp = requests.get(invites_url, headers=admin_auth_headers)
    assert resp.status_code == 200

    pending = resp.json()["items"]
    assert len(pending) == 0
|
|
|
|
|
|
def test_existing_user_send_invite(admin_auth_headers, non_default_org_id):
    """Invite the already-registered user to the non-default org.

    The invite token is stored in the module-level
    ``existing_user_invite_token``.
    """
    global existing_user_invite_token

    resp = requests.post(
        f"{API_PREFIX}/orgs/{non_default_org_id}/invite",
        json={"email": VALID_USER_EMAIL, "role": CRAWLER_ROLE},
        headers=admin_auth_headers,
    )
    assert resp.status_code == 200

    invite = resp.json()
    assert invite["invited"] == "existing_user"

    existing_user_invite_token = invite["token"]
|
|
|
|
|
|
def test_pending_invite_existing_user(admin_auth_headers, non_default_org_id):
    """The non-default org must list exactly one invite, for the valid user."""
    invites_url = f"{API_PREFIX}/orgs/{non_default_org_id}/invites"
    resp = requests.get(invites_url, headers=admin_auth_headers)
    assert resp.status_code == 200

    listing = resp.json()
    pending = listing["items"]
    assert len(pending) == 1

    assert listing["total"] == 1

    entry = pending[0]
    assert entry["email"] == VALID_USER_EMAIL
    assert entry["oid"] == non_default_org_id
    assert entry["created"]
    assert entry["role"]
    assert entry["firstOrgAdmin"] == False
|
|
|
|
|
|
def test_pending_invites_crawler(crawler_auth_headers, default_org_id):
    """A crawler requesting ``/users/invites`` must be refused with 403."""
    # Verify that only admins can see pending invites
    invites_url = f"{API_PREFIX}/users/invites"
    resp = requests.get(invites_url, headers=crawler_auth_headers)
    assert resp.status_code == 403
|
|
|
|
|
|
def test_login_existing_user_for_invite():
    """Log in as the registered user, inspect the pending invite, accept it.

    The login headers are stored in the module-level
    ``new_user_auth_headers`` for the tests that follow.
    """
    global new_user_auth_headers

    r = requests.post(
        f"{API_PREFIX}/auth/jwt/login",
        data={
            "username": VALID_USER_EMAIL,
            "password": VALID_USER_PW,
            "grant_type": "password",
        },
    )
    data = r.json()
    assert r.status_code == 200
    login_token = data["access_token"]

    auth_headers = {"Authorization": "bearer " + login_token}

    # Get existing user invite to confirm it is valid
    r = requests.get(
        f"{API_PREFIX}/users/me/invite/{existing_user_invite_token}",
        headers=auth_headers,
    )
    assert r.status_code == 200
    data = r.json()
    assert data["fromSuperuser"]
    assert not data["inviterEmail"]
    assert not data["inviterName"]

    # Accept existing user invite
    r = requests.post(
        f"{API_PREFIX}/orgs/invite-accept/{existing_user_invite_token}",
        headers=auth_headers,
    )

    # Publish the headers before checking the accept response, so later tests
    # still receive them exactly as they did when this response was unchecked.
    new_user_auth_headers = auth_headers

    # Previously the accept response was never inspected, so a failure only
    # showed up indirectly in later tests. Any non-error status is accepted.
    assert r.ok, f"invite-accept failed: {r.status_code} {r.text}"
|
|
|
|
|
|
def test_pending_invites_clear(admin_auth_headers, non_default_org_id):
    """The non-default org must have no pending invites left."""
    invites_url = f"{API_PREFIX}/orgs/{non_default_org_id}/invites"
    resp = requests.get(invites_url, headers=admin_auth_headers)
    assert resp.status_code == 200

    pending = resp.json()["items"]
    assert len(pending) == 0
|
|
|
|
|
|
def test_user_part_of_two_orgs(default_org_id, non_default_org_id):
    """The registered user must now belong to both orgs."""
    credentials = {
        "username": VALID_USER_EMAIL,
        "password": VALID_USER_PW,
        "grant_type": "password",
    }
    login_resp = requests.post(f"{API_PREFIX}/auth/jwt/login", data=credentials)
    login_payload = login_resp.json()
    assert login_resp.status_code == 200

    headers = {"Authorization": "bearer " + login_payload["access_token"]}

    # Fetch the user's own profile with the fresh token.
    me_resp = requests.get(f"{API_PREFIX}/users/me", headers=headers)
    assert me_resp.status_code == 200
    profile = me_resp.json()

    # Exactly two memberships, covering both the default and non-default org.
    assert len(profile["orgs"]) == 2
    member_of = [membership["id"] for membership in profile["orgs"]]
    assert default_org_id in member_of
    assert non_default_org_id in member_of
|
|
|
|
|
|
def test_non_crawler_user_cant_invite(default_org_id):
    """An invite sent with ``new_user_auth_headers`` must be refused (403).

    Those headers belong to the user who was invited with ``CRAWLER_ROLE``.
    """
    resp = requests.post(
        f"{API_PREFIX}/orgs/{default_org_id}/invite",
        json={"email": another_user_email, "role": CRAWLER_ROLE},
        headers=new_user_auth_headers,
    )
    assert resp.status_code == 403
|
|
|
|
|
|
def test_user_change_role(admin_auth_headers, default_org_id):
    """Set the valid user's role in the default org to ``ADMIN_ROLE``."""
    role_change = {"email": VALID_USER_EMAIL, "role": ADMIN_ROLE}
    resp = requests.patch(
        f"{API_PREFIX}/orgs/{default_org_id}/user-role",
        json=role_change,
        headers=admin_auth_headers,
    )

    assert resp.status_code == 200
    assert resp.json()["updated"] == True
|
|
|
|
|
|
def test_non_superadmin_admin_can_invite(default_org_id):
    """With the admin role, the same user's invite must now succeed.

    The invite is then looked up by token to check that it names the
    inviting user rather than a superuser.
    """
    invite_resp = requests.post(
        f"{API_PREFIX}/orgs/{default_org_id}/invite",
        json={"email": another_user_email, "role": CRAWLER_ROLE},
        headers=new_user_auth_headers,
    )
    assert invite_resp.status_code == 200
    invite = invite_resp.json()

    assert invite["invited"] == "new_user"

    token = invite["token"]

    # Look the invite up by token and email; no auth headers are sent.
    lookup_resp = requests.get(
        f"{API_PREFIX}/users/invite/{token}?email={another_user_email}"
    )
    assert lookup_resp.status_code == 200

    details = lookup_resp.json()
    assert not details["fromSuperuser"]
    assert details["inviterEmail"] == VALID_USER_EMAIL
    assert details["inviterName"] == "valid"
    assert details["firstOrgAdmin"] == False
|
|
|
|
|
|
def test_forgot_password():
    """Request a password reset for an unknown and for a known email.

    Both requests must be answered with 202 and a truthy ``success`` flag.
    Previously the flag comparison was assigned to an unused local instead
    of being asserted, so it was never actually checked.
    """
    for email in ("no-such-user@example.com", VALID_USER_EMAIL):
        r = requests.post(
            f"{API_PREFIX}/auth/forgot-password", json={"email": email}
        )
        # always return success for security reasons even if user doesn't exist
        assert r.status_code == 202
        assert r.json()["success"] == True
|
|
|
|
|
|
def test_reset_invalid_password(admin_auth_headers):
    """Changing to the new password ``"12345"`` must be rejected with 400."""
    change = {
        "email": ADMIN_USERNAME,
        "password": "PASSW0RD!",
        "newPassword": "12345",
    }
    resp = requests.put(
        f"{API_PREFIX}/users/me/password-change",
        json=change,
        headers=admin_auth_headers,
    )
    assert resp.status_code == 400
    assert resp.json()["detail"] == "invalid_password"
|
|
|
|
|
|
def test_reset_patch_id_endpoint_invalid(admin_auth_headers, default_org_id):
    """A PATCH on ``/users/{id}`` is expected to answer 404.

    Relies on the module-level ``my_id`` assigned by ``test_me_with_orgs``.
    """
    user_url = f"{API_PREFIX}/users/{my_id}"
    patch_body = {"email": ADMIN_USERNAME, "password": "newpassword"}
    resp = requests.patch(user_url, headers=admin_auth_headers, json=patch_body)
    assert resp.status_code == 404
|
|
|
|
|
|
def test_reset_password_invalid_current(admin_auth_headers):
    """A password change with the wrong current password must answer 400."""
    change = {
        "email": ADMIN_USERNAME,
        "password": "invalid",
        "newPassword": "newpassword",
    }
    resp = requests.put(
        f"{API_PREFIX}/users/me/password-change",
        json=change,
        headers=admin_auth_headers,
    )
    assert resp.status_code == 400
    assert resp.json()["detail"] == "invalid_current_password"
|
|
|
|
|
|
def test_reset_valid_password(admin_auth_headers, default_org_id):
    """Log in as the registered user and change their password.

    The login is retried, up to seven attempts with a five-second pause
    after each failure, until the response contains an access token. The
    resulting headers are stored in the module-level ``valid_user_headers``.
    """
    global valid_user_headers

    max_attempts = 7
    access_token = None
    for _ in range(max_attempts):
        r = requests.post(
            f"{API_PREFIX}/auth/jwt/login",
            data={
                "username": VALID_USER_EMAIL,
                "password": VALID_USER_PW,
                "grant_type": "password",
            },
        )
        data = r.json()
        try:
            access_token = data["access_token"]
        except (KeyError, TypeError):
            # Only a missing token is retried; a bare ``except`` would also
            # have swallowed KeyboardInterrupt and real programming errors.
            print("Waiting for valid user auth headers")
            time.sleep(5)
        else:
            break

    # Fail here with a clear message rather than sending the next request
    # with ``headers=None`` and failing on an unrelated status code.
    assert access_token is not None, "login never returned an access token"
    valid_user_headers = {"Authorization": f"Bearer {access_token}"}

    r = requests.put(
        f"{API_PREFIX}/users/me/password-change",
        headers=valid_user_headers,
        json={
            "email": VALID_USER_EMAIL,
            "password": VALID_USER_PW,
            "newPassword": VALID_USER_PW_RESET,
        },
    )
    assert r.status_code == 200
    assert r.json()["updated"] == True
|
|
|
|
|
|
def test_lock_out_user_after_failed_logins():
    """Lock the valid user out with failed logins, then unlock by reset.

    Uses the module-level ``valid_user_headers`` for the password change.
    """

    def attempt_login(password):
        # One login request for the valid user with the given password.
        return requests.post(
            f"{API_PREFIX}/auth/jwt/login",
            data={
                "username": VALID_USER_EMAIL,
                "password": password,
                "grant_type": "password",
            },
        )

    # Five consecutive failed attempts, one second apart.
    for _ in range(5):
        attempt_login("incorrect")
        time.sleep(1)

    # The next failed attempt must be answered with 429.
    locked = attempt_login("incorrect")
    assert locked.status_code == 429
    assert locked.json()["detail"] == "too_many_login_attempts"

    # Even the correct password must not get in while locked out.
    still_locked = attempt_login(VALID_USER_PW_RESET)
    assert still_locked.status_code in (400, 429)

    # Change the password using the headers obtained before the lockout.
    change = requests.put(
        f"{API_PREFIX}/users/me/password-change",
        headers=valid_user_headers,
        json={
            "email": VALID_USER_EMAIL,
            "password": VALID_USER_PW_RESET,
            "newPassword": VALID_USER_PW_RESET_AGAIN,
        },
    )
    assert change.status_code == 200

    time.sleep(5)

    # A wrong password is now an ordinary bad-credentials error, not a 429,
    # since the password reset unlocked the user.
    rejected = attempt_login("incorrect")
    assert rejected.status_code == 400
    assert rejected.json()["detail"] == "login_bad_credentials"

    # And the newly set password logs in successfully.
    accepted = attempt_login(VALID_USER_PW_RESET_AGAIN)
    assert accepted.status_code == 200
|
|
|
|
|
|
def test_lock_out_unregistered_user_after_failed_logins():
    """Failed logins for ``notregistered@example.com`` must also end in 429."""
    unregistered_email = "notregistered@example.com"

    def attempt_login():
        # One login request for the unregistered email with a wrong password.
        return requests.post(
            f"{API_PREFIX}/auth/jwt/login",
            data={
                "username": unregistered_email,
                "password": "incorrect",
                "grant_type": "password",
            },
        )

    # Five consecutive failed attempts, one second apart.
    for _ in range(5):
        attempt_login()
        time.sleep(1)

    # The next failed attempt must be answered with 429.
    locked = attempt_login()
    assert locked.status_code == 429
    assert locked.json()["detail"] == "too_many_login_attempts"
|
|
|
|
|
|
def test_patch_me_endpoint(admin_auth_headers, default_org_id, admin_userid):
    """Rename the admin and check that workflows and crawls show the new name.

    A crawl is started and awaited first, so that there is at least one
    workflow and one crawl attributed to the admin to check.
    """
    org_url = f"{API_PREFIX}/orgs/{default_org_id}"

    # Start a new crawl.
    workflow = {
        "runNow": True,
        "name": "name change test crawl",
        "config": {
            "seeds": [{"url": "https://specs.webrecorder.net/", "depth": 1}],
        },
    }
    created = requests.post(
        f"{org_url}/crawlconfigs/",
        headers=admin_auth_headers,
        json=workflow,
    ).json()
    crawl_id = created["run_now_job"]

    # Poll every five seconds until the crawl reaches a finished state.
    # NOTE(review): there is no upper bound on this wait; a crawl that never
    # finishes would block the test run indefinitely.
    replay_url = f"{org_url}/crawls/{crawl_id}/replay.json"
    while (
        requests.get(replay_url, headers=admin_auth_headers).json()["state"]
        not in FINISHED_STATES
    ):
        time.sleep(5)

    # Change the admin's name and email.
    new_name = "New Admin"
    patch_resp = requests.patch(
        f"{API_PREFIX}/users/me",
        headers=admin_auth_headers,
        json={"email": "admin2@example.com", "name": new_name},
    )
    assert patch_resp.status_code == 200

    # Workflows: every "<field>Name" that belongs to the admin must be updated.
    for workflow_field in ("createdBy", "modifiedBy", "lastStartedBy"):
        listing_resp = requests.get(
            f"{org_url}/crawlconfigs?{workflow_field}={admin_userid}",
            headers=admin_auth_headers,
        )
        assert listing_resp.status_code == 200
        listing = listing_resp.json()
        assert listing["total"] > 0
        for entry in listing["items"]:
            if entry[workflow_field] == admin_userid:
                assert entry[f"{workflow_field}Name"] == new_name

    # Crawls: the same check on "userName".
    crawls_resp = requests.get(
        f"{org_url}/crawls?userid={admin_userid}",
        headers=admin_auth_headers,
    )
    assert crawls_resp.status_code == 200
    crawls = crawls_resp.json()
    assert crawls["total"] > 0
    for crawl in crawls["items"]:
        if crawl["userid"] == admin_userid:
            assert crawl["userName"] == new_name
|
|
|
|
|
|
def test_patch_me_invalid_email_in_use(admin_auth_headers, default_org_id):
    """Changing the admin's email to ``VALID_USER_EMAIL`` must answer 400."""
    resp = requests.patch(
        f"{API_PREFIX}/users/me",
        json={"email": VALID_USER_EMAIL},
        headers=admin_auth_headers,
    )
    assert resp.status_code == 400
    assert resp.json()["detail"] == "user_already_exists"
|
|
|
|
|
|
def test_user_emails_endpoint_non_superuser(crawler_auth_headers, default_org_id):
    """A crawler requesting ``/users/emails`` must be refused with 403."""
    emails_url = f"{API_PREFIX}/users/emails"
    resp = requests.get(emails_url, headers=crawler_auth_headers)
    assert resp.status_code == 403
    assert resp.json()["detail"] == "not_allowed"
|
|
|
|
|
|
def test_user_emails_endpoint_superuser(admin_auth_headers, default_org_id):
    """The admin can list user emails; check the shape of every entry."""
    emails_url = f"{API_PREFIX}/users/emails"
    resp = requests.get(emails_url, headers=admin_auth_headers)
    assert resp.status_code == 200
    listing = resp.json()

    total = listing["total"]
    entries = listing["items"]

    assert total > 0
    assert total == len(entries)

    for entry in entries:
        assert entry["email"]
        # Neither the user id nor the superuser flag may appear in an entry.
        assert "id" not in entry
        assert "is_superuser" not in entry
        assert entry["is_verified"] == True

        # An empty org list needs no special case: the loop simply does not run.
        for membership in entry.get("orgs"):
            assert membership["id"]
            assert membership["name"]
            assert membership["slug"]
            assert membership["default"] in (True, False)
            assert "readOnly" in membership
            assert "readOnlyReason" in membership
            assert "subscription" in membership
            role = membership["role"]
            assert role
            assert isinstance(role, int)
|