import os import requests import uuid import pytest from .conftest import API_PREFIX, NON_DEFAULT_ORG_NAME, NON_DEFAULT_ORG_SLUG from .utils import read_in_chunks curr_dir = os.path.dirname(os.path.realpath(__file__)) new_oid = None invite_email = "test-user@example.com" def test_ensure_only_one_default_org(admin_auth_headers): r = requests.get(f"{API_PREFIX}/orgs", headers=admin_auth_headers) data = r.json() assert data["total"] == 2 orgs = data["items"] default_orgs = [org for org in orgs if org["default"]] assert len(default_orgs) == 1 default_org_name = default_orgs[0]["name"] orgs_with_same_name = [org for org in orgs if org["name"] == default_org_name] assert len(orgs_with_same_name) == 1 def test_get_org_admin(admin_auth_headers, default_org_id): """org owners should receive details on users.""" r = requests.get(f"{API_PREFIX}/orgs/{default_org_id}", headers=admin_auth_headers) assert r.status_code == 200 data = r.json() assert data["id"] == default_org_id assert data["name"] users = data["users"] assert users for _, value in users.items(): assert value["name"] assert value["email"] assert value["role"] def test_get_org_crawler(crawler_auth_headers, default_org_id): """non-owners should *not* receive details on users.""" r = requests.get( f"{API_PREFIX}/orgs/{default_org_id}", headers=crawler_auth_headers ) assert r.status_code == 200 data = r.json() assert data["id"] == default_org_id assert data["name"] assert data.get("users") == {} def test_update_org_crawling_defaults(admin_auth_headers, default_org_id): r = requests.post( f"{API_PREFIX}/orgs/{default_org_id}/defaults/crawling", headers=admin_auth_headers, json={ "maxCrawlSize": 200000, "lang": "fr", "customBehaviors": ["git+https://github.com/webrecorder/custom-behaviors"], }, ) assert r.status_code == 200 assert r.json()["updated"] == True r = requests.get(f"{API_PREFIX}/orgs/{default_org_id}", headers=admin_auth_headers) data = r.json() assert data["crawlingDefaults"] assert data["crawlingDefaults"]["maxCrawlSize"] == 200000 assert data["crawlingDefaults"]["lang"] == "fr" assert data["crawlingDefaults"]["customBehaviors"] == [ "git+https://github.com/webrecorder/custom-behaviors" ] def test_update_org_crawling_defaults_invalid_lang(admin_auth_headers, default_org_id): for invalid_code in ("f", "fra", "french"): r = requests.post( f"{API_PREFIX}/orgs/{default_org_id}/defaults/crawling", headers=admin_auth_headers, json={ "lang": "invalid_code", }, ) assert r.status_code == 400 assert r.json()["detail"] == "invalid_lang" def test_rename_org(admin_auth_headers, default_org_id): UPDATED_NAME = "updated org name" UPDATED_SLUG = "updated-org-name" rename_data = {"name": UPDATED_NAME, "slug": UPDATED_SLUG} r = requests.post( f"{API_PREFIX}/orgs/{default_org_id}/rename", headers=admin_auth_headers, json=rename_data, ) assert r.status_code == 200 data = r.json() assert data["updated"] # Verify that name and slug are now updated. r = requests.get(f"{API_PREFIX}/orgs/{default_org_id}", headers=admin_auth_headers) assert r.status_code == 200 data = r.json() assert data["name"] == UPDATED_NAME assert data["slug"] == UPDATED_SLUG def test_rename_org_invalid_slug(admin_auth_headers, default_org_id): UPDATED_NAME = "updated org name" UPDATED_SLUG = "not a valid slug" rename_data = {"name": UPDATED_NAME, "slug": UPDATED_SLUG} r = requests.post( f"{API_PREFIX}/orgs/{default_org_id}/rename", headers=admin_auth_headers, json=rename_data, ) assert r.status_code == 400 assert r.json()["detail"] == "invalid_slug" @pytest.mark.parametrize( "name", [ # Identical name (NON_DEFAULT_ORG_NAME), # Identical name, different case ("Non-Default Org"), ], ) def test_rename_org_duplicate_name( admin_auth_headers, default_org_id, non_default_org_id, name ): rename_data = {"name": name, "slug": "this-slug-should-be-okay"} r = requests.post( f"{API_PREFIX}/orgs/{default_org_id}/rename", headers=admin_auth_headers, json=rename_data, ) assert r.status_code == 400 assert r.json()["detail"] == "duplicate_org_name" @pytest.mark.parametrize( "slug", [ # Identical slug (NON_DEFAULT_ORG_SLUG), # Identical slug, different case ("Non-Default-Org"), ], ) def test_rename_org_duplicate_slug( admin_auth_headers, default_org_id, non_default_org_id, slug ): rename_data = {"name": "Should be okay", "slug": slug} r = requests.post( f"{API_PREFIX}/orgs/{default_org_id}/rename", headers=admin_auth_headers, json=rename_data, ) assert r.status_code == 400 assert r.json()["detail"] == "duplicate_org_slug" def test_create_org(admin_auth_headers): NEW_ORG_NAME = "New Org" NEW_ORG_SLUG = "new-org" r = requests.post( f"{API_PREFIX}/orgs/create", headers=admin_auth_headers, json={"name": NEW_ORG_NAME, "slug": NEW_ORG_SLUG}, ) assert r.status_code == 200 data = r.json() assert data["added"] assert data["id"] global new_oid new_oid = data["id"] # Verify that org exists. r = requests.get(f"{API_PREFIX}/orgs/{new_oid}", headers=admin_auth_headers) assert r.status_code == 200 data = r.json() assert data["name"] == NEW_ORG_NAME assert data["slug"] == NEW_ORG_SLUG assert data["created"] @pytest.mark.parametrize( "name", [ # Identical name (NON_DEFAULT_ORG_NAME), # Identical name, different case ("Non-Default Org"), ], ) def test_create_org_duplicate_name(admin_auth_headers, non_default_org_id, name): r = requests.post( f"{API_PREFIX}/orgs/create", headers=admin_auth_headers, json={"name": name, "slug": "another-new-org"}, ) assert r.status_code == 400 data = r.json() assert data["detail"] == "duplicate_org_name" @pytest.mark.parametrize( "slug", [ # Identical slug (NON_DEFAULT_ORG_SLUG), # Identical slug, different case ("Non-Default-Org"), ], ) def test_create_org_duplicate_slug(admin_auth_headers, non_default_org_id, slug): r = requests.post( f"{API_PREFIX}/orgs/create", headers=admin_auth_headers, json={"name": "Yet another new org", "slug": slug}, ) assert r.status_code == 400 data = r.json() assert data["detail"] == "duplicate_org_slug" # disable until storage customization is enabled def _test_change_org_storage(admin_auth_headers): # change to invalid storage r = requests.post( f"{API_PREFIX}/orgs/{new_oid}/storage", headers=admin_auth_headers, json={"storage": {"name": "invalid-storage", "custom": False}}, ) assert r.status_code == 400 # change to invalid storage r = requests.post( f"{API_PREFIX}/orgs/{new_oid}/storage", headers=admin_auth_headers, json={"storage": {"name": "alt-storage", "custom": True}}, ) assert r.status_code == 400 # change to valid storage r = requests.post( f"{API_PREFIX}/orgs/{new_oid}/storage", headers=admin_auth_headers, json={"storage": {"name": "alt-storage", "custom": False}}, ) assert r.status_code == 200 assert r.json()["updated"] def test_remove_user_from_org(admin_auth_headers, default_org_id): # Add new user to org r = requests.post( f"{API_PREFIX}/orgs/{default_org_id}/add-user", json={ "email": "toremove@example.com", "password": "PASSW0RD!", "name": "toremove", "role": 10, }, headers=admin_auth_headers, ) assert r.status_code == 200 data = r.json() assert data["added"] # Remove user r = requests.post( f"{API_PREFIX}/orgs/{default_org_id}/remove", json={"email": "toremove@example.com"}, headers=admin_auth_headers, ) assert r.status_code == 200 data = r.json() assert data["removed"] def test_remove_non_existent_user(admin_auth_headers, default_org_id): # Remove user r = requests.post( f"{API_PREFIX}/orgs/{default_org_id}/remove", json={"email": "toremove@example.com"}, headers=admin_auth_headers, ) assert r.status_code == 404 data = r.json() assert data["detail"] == "no_such_org_user" def test_get_pending_org_invites( admin_auth_headers, default_org_id, non_default_org_id ): # Invite user to non-default org NON_DEFAULT_INVITE_EMAIL = "non-default-invite@example.com" r = requests.post( f"{API_PREFIX}/orgs/{non_default_org_id}/invite", headers=admin_auth_headers, json={"email": NON_DEFAULT_INVITE_EMAIL, "role": 20}, ) assert r.status_code == 200 data = r.json() assert data["invited"] == "new_user" # Invite user to default org DEFAULT_INVITE_EMAIL = "default-invite@example.com" r = requests.post( f"{API_PREFIX}/orgs/{default_org_id}/invite", headers=admin_auth_headers, json={"email": DEFAULT_INVITE_EMAIL, "role": 10}, ) assert r.status_code == 200 data = r.json() assert data["invited"] == "new_user" # Check that only invite to non-default org is returned r = requests.get( f"{API_PREFIX}/orgs/{non_default_org_id}/invites", headers=admin_auth_headers, ) assert r.status_code == 200 data = r.json() invites = data["items"] assert len(invites) == 1 assert data["total"] == 1 invite = invites[0] assert invite["email"] == NON_DEFAULT_INVITE_EMAIL assert invite["oid"] == non_default_org_id assert invite["created"] assert invite["role"] assert invite["firstOrgAdmin"] == False # Delete Invites r = requests.post( f"{API_PREFIX}/orgs/{non_default_org_id}/invites/delete", headers=admin_auth_headers, json={"email": NON_DEFAULT_INVITE_EMAIL}, ) assert r.status_code == 200 r = requests.post( f"{API_PREFIX}/orgs/{default_org_id}/invites/delete", headers=admin_auth_headers, json={"email": DEFAULT_INVITE_EMAIL}, ) assert r.status_code == 200 @pytest.mark.parametrize( "invite_email, expected_stored_email", [ # Standard email ("invite-to-accept-org@example.com", "invite-to-accept-org@example.com"), # Email address with comments ("user+comment-org@example.com", "user+comment-org@example.com"), # URL encoded email address with comments ( "user%2Bcomment-encoded-org@example.com", "user+comment-encoded-org@example.com", ), # User email with diacritic characters ("diacritic-tést-org@example.com", "diacritic-tést-org@example.com"), # User email with encoded diacritic characters ( "diacritic-t%C3%A9st-encoded-org@example.com", "diacritic-tést-encoded-org@example.com", ), # User email with upper case characters, stored as all lowercase ("exampleName@EXAMple.com", "examplename@example.com"), ], ) def test_send_and_accept_org_invite( admin_auth_headers, non_default_org_id, invite_email, expected_stored_email ): # Send invite r = requests.post( f"{API_PREFIX}/orgs/{non_default_org_id}/invite", headers=admin_auth_headers, json={"email": invite_email, "role": 20}, ) assert r.status_code == 200 data = r.json() assert data["invited"] == "new_user" token = data["token"] # Register user # Note: This will accept invitation without needing to call the # accept invite endpoint explicitly due to post-registration hook. r = requests.post( f"{API_PREFIX}/auth/register", headers=admin_auth_headers, json={ "name": "accepted", "email": expected_stored_email, "password": "testingpassword", "inviteToken": token, }, ) assert r.status_code == 201 # Verify user belongs to org r = requests.get( f"{API_PREFIX}/orgs/{non_default_org_id}", headers=admin_auth_headers ) assert r.status_code == 200 data = r.json() users = data["users"] users_with_invited_email = [ user for user in users.values() if user["email"] == expected_stored_email ] assert len(users_with_invited_email) == 1 def test_delete_invite_by_email(admin_auth_headers, non_default_org_id): # Invite user to non-default org INVITE_EMAIL = "new-non-default-org-invite-by-email@example.com" r = requests.post( f"{API_PREFIX}/orgs/{non_default_org_id}/invite", headers=admin_auth_headers, json={"email": INVITE_EMAIL, "role": 20}, ) assert r.status_code == 200 data = r.json() assert data["invited"] == "new_user" # Delete invite by email r = requests.post( f"{API_PREFIX}/orgs/{non_default_org_id}/invites/delete", headers=admin_auth_headers, json={"email": INVITE_EMAIL}, ) assert r.status_code == 200 data = r.json() assert data["removed"] assert data["count"] == 1 # Verify invite no longer exists r = requests.get( f"{API_PREFIX}/orgs/{non_default_org_id}/invites", headers=admin_auth_headers, ) assert r.status_code == 200 data = r.json() invites_matching_email = [ invite for invite in data["items"] if invite["email"] == INVITE_EMAIL ] assert len(invites_matching_email) == 0 # Try to delete non-existent email and test we get 404 bad_token = str(uuid.uuid4()) r = requests.post( f"{API_PREFIX}/orgs/{non_default_org_id}/invites/delete", headers=admin_auth_headers, json={"email": "not-a-valid-invite@example.com"}, ) assert r.status_code == 404 data = r.json() assert data["detail"] == "invite_not_found" def test_org_metrics(crawler_auth_headers, default_org_id): r = requests.get( f"{API_PREFIX}/orgs/{default_org_id}/metrics", headers=crawler_auth_headers, ) assert r.status_code == 200 data = r.json() assert data["storageUsedBytes"] > 0 assert data["storageUsedCrawls"] > 0 assert data["storageUsedUploads"] >= 0 assert data["storageUsedThumbnails"] >= 0 assert data["storageUsedThumbnails"] >= 0 assert data["storageUsedProfiles"] >= 0 assert ( data["storageUsedBytes"] == data["storageUsedCrawls"] + data["storageUsedUploads"] + data["storageUsedProfiles"] + data["storageUsedSeedFiles"] + data["storageUsedThumbnails"] ) assert data["storageQuotaBytes"] >= 0 assert data["archivedItemCount"] > 0 assert data["crawlCount"] > 0 assert data["uploadCount"] >= 0 assert data["archivedItemCount"] == data["crawlCount"] + data["uploadCount"] assert data["pageCount"] > 0 assert data["profileCount"] >= 0 assert data["workflowsRunningCount"] >= 0 assert data["workflowsQueuedCount"] >= 0 assert data["collectionsCount"] > 0 assert data["publicCollectionsCount"] >= 0 def test_get_org_slugs(admin_auth_headers): # Fetch org count and slugs from /orgs r = requests.get(f"{API_PREFIX}/orgs", headers=admin_auth_headers) assert r.status_code == 200 data = r.json() org_count = data["total"] org_slugs = [item["slug"] for item in data["items"]] # Fetch slugs from /orgs/slugs and verify data looks right r = requests.get(f"{API_PREFIX}/orgs/slugs", headers=admin_auth_headers) assert r.status_code == 200 slugs = r.json()["slugs"] assert len(slugs) == org_count for slug in slugs: assert slug in org_slugs def test_get_org_slugs_non_superadmin(crawler_auth_headers): r = requests.get(f"{API_PREFIX}/orgs/slugs", headers=crawler_auth_headers) assert r.status_code == 403 assert r.json()["detail"] == "Not Allowed" def test_get_org_slug_lookup(admin_auth_headers): # Build an expected return from /orgs list to compare against expected_return = {} r = requests.get(f"{API_PREFIX}/orgs", headers=admin_auth_headers) assert r.status_code == 200 for org in r.json()["items"]: expected_return[org["id"]] = org["slug"] # Fetch data from /orgs/slug-lookup and verify data is correct r = requests.get(f"{API_PREFIX}/orgs/slug-lookup", headers=admin_auth_headers) assert r.status_code == 200 assert r.json() == expected_return def test_get_org_slug_lookup_non_superadmin(crawler_auth_headers): r = requests.get(f"{API_PREFIX}/orgs/slug-lookup", headers=crawler_auth_headers) assert r.status_code == 403 assert r.json()["detail"] == "Not Allowed" def test_update_read_only(admin_auth_headers, default_org_id): r = requests.get(f"{API_PREFIX}/orgs/{default_org_id}", headers=admin_auth_headers) data = r.json() assert data["readOnly"] in (False, None) assert data["readOnlyReason"] in (None, "") r = requests.post( f"{API_PREFIX}/orgs/{default_org_id}/read-only", headers=admin_auth_headers, json={"readOnly": True, "readOnlyReason": "Payment suspended"}, ) assert r.json()["updated"] r = requests.get(f"{API_PREFIX}/orgs/{default_org_id}", headers=admin_auth_headers) data = r.json() assert data["readOnly"] is True assert data["readOnlyReason"] == "Payment suspended" # Try to start crawl from new workflow, should fail crawl_data = { "runNow": True, "name": "Read Only Test Crawl", "description": "Should not run now", "tags": [], "config": { "seeds": [{"url": "https://webrecorder.net/", "depth": 1}], "exclude": "community", }, } r = requests.post( f"{API_PREFIX}/orgs/{default_org_id}/crawlconfigs/", headers=admin_auth_headers, json=crawl_data, ) data = r.json() assert data["added"] assert data["run_now_job"] is None cid = data["id"] assert cid # Try to start crawl from existing workflow, should fail r = requests.post( f"{API_PREFIX}/orgs/{default_org_id}/crawlconfigs/{cid}/run", headers=admin_auth_headers, json=crawl_data, ) assert r.status_code == 403 assert r.json()["detail"] == "org_set_to_read_only" # Try to upload a WACZ, should fail with open(os.path.join(curr_dir, "data", "example.wacz"), "rb") as fh: r = requests.put( f"{API_PREFIX}/orgs/{default_org_id}/uploads/stream?filename=test.wacz&name=My%20New%20Upload&description=Should%20Fail&collections=&tags=", headers=admin_auth_headers, data=read_in_chunks(fh), ) assert r.status_code == 403 assert r.json()["detail"] == "org_set_to_read_only" # Reset back to False, future tests should be unaffected r = requests.post( f"{API_PREFIX}/orgs/{default_org_id}/read-only", headers=admin_auth_headers, json={"readOnly": False}, ) assert r.json()["updated"] r = requests.get(f"{API_PREFIX}/orgs/{default_org_id}", headers=admin_auth_headers) data = r.json() assert data["readOnly"] is False # Test that reason is unset when readOnly is set to false, even implicitly assert data["readOnlyReason"] == "" def test_sort_orgs(admin_auth_headers): # Create a few new orgs for testing r = requests.post( f"{API_PREFIX}/orgs/create", headers=admin_auth_headers, json={"name": "abc", "slug": "abc"}, ) assert r.status_code == 200 r = requests.post( f"{API_PREFIX}/orgs/create", headers=admin_auth_headers, json={"name": "Mno", "slug": "mno"}, ) assert r.status_code == 200 r = requests.post( f"{API_PREFIX}/orgs/create", headers=admin_auth_headers, json={"name": "xyz", "slug": "xyz"}, ) assert r.status_code == 200 # Check default sorting # Default org should come first, followed by alphabetical sorting ascending # Ensure org names are sorted lexically, not by character code r = requests.get(f"{API_PREFIX}/orgs", headers=admin_auth_headers) data = r.json() orgs = data["items"] assert orgs[0]["default"] other_orgs = orgs[1:] last_name = None for org in other_orgs: org_name = org["name"] org_name_lower = org_name.lower() if last_name: assert org_name_lower > last_name last_name = org_name_lower # Sort by name descending, ensure default org still first r = requests.get( f"{API_PREFIX}/orgs?sortBy=name&sortDirection=-1", headers=admin_auth_headers ) data = r.json() orgs = data["items"] assert orgs[0]["default"] other_orgs = orgs[1:] last_name = None for org in other_orgs: org_name = org["name"] org_name_lower = org_name.lower() if last_name: assert org_name_lower < last_name last_name = org_name_lower # Sort desc by lastCrawlFinished, ensure default org still first r = requests.get( f"{API_PREFIX}/orgs?sortBy=lastCrawlFinished&sortDirection=-1", headers=admin_auth_headers, ) data = r.json() orgs = data["items"] assert orgs[0]["default"] other_orgs = orgs[1:] last_last_crawl_finished = None for org in other_orgs: last_crawl_finished = org.get("lastCrawlFinished") if not last_crawl_finished: continue if last_last_crawl_finished: assert last_crawl_finished <= last_last_crawl_finished last_last_crawl_finished = last_crawl_finished