Fixes #2673 Changes in this PR: - Adds a new `file_uploads.py` module and corresponding `/files` API prefix with methods/endpoints for uploading, GETing, and deleting seed files (can be extended to other types of files moving forward) - Seed files are supported via `CrawlConfig.config.seedFileId` on POST and PATCH endpoints. This seedFileId is replaced by a presigned url when passed to the crawler by the operator - Seed files are read when first uploaded to calculate `firstSeed` and `seedCount` and store them in the database, and this is copied into the workflow and crawl documents when they are created. - Logic is added to store `firstSeed` and `seedCount` for other workflows as well, and a migration added to backfill data, to maintain consistency and fix some of the pymongo aggregations that previously assumed all workflows would have at least one `Seed` object in `CrawlConfig.seeds` - Seed file and thumbnail storage stats are added to org stats - Seed file and thumbnail uploads first check that the org's storage quota has not been exceeded and return a 400 if so - A cron background job (run weekly each Sunday at midnight by default, but configurable) is added to look for seed files at least x minutes old (1440 minutes, or 1 day, by default, but configurable) that are not in use in any workflows, and to delete them when they are found. The backend pods will ensure this k8s batch job exists when starting up and create it if it does not already exist. A database entry for each run of the job is created in the operator on job completion so that it'll appear in the `/jobs` API endpoints, but retrying of this type of regularly scheduled background job is not supported as we don't want to accidentally create multiple competing scheduled jobs. - Adds a `min_seed_file_crawler_image` value to the Helm chart that is checked before creating a crawl from a workflow if set. If a workflow cannot be run, return the detail of the exception in `CrawlConfigAddedResponse.errorDetail` so that we can display the reason in the frontend - Add SeedFile model from base UserFile (former ImageFIle), ensure all APIs returning uploaded files return an absolute pre-signed URL (either with external origin or internal service origin) --------- Co-authored-by: Ilya Kreymer <ikreymer@gmail.com>
		
			
				
	
	
		
			722 lines
		
	
	
		
			22 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			722 lines
		
	
	
		
			22 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import os
 | |
| import requests
 | |
| import uuid
 | |
| 
 | |
| import pytest
 | |
| 
 | |
| from .conftest import API_PREFIX, NON_DEFAULT_ORG_NAME, NON_DEFAULT_ORG_SLUG
 | |
| from .utils import read_in_chunks
 | |
| 
 | |
| curr_dir = os.path.dirname(os.path.realpath(__file__))
 | |
| 
 | |
| new_oid = None
 | |
| 
 | |
| invite_email = "test-user@example.com"
 | |
| 
 | |
| 
 | |
| def test_ensure_only_one_default_org(admin_auth_headers):
 | |
|     r = requests.get(f"{API_PREFIX}/orgs", headers=admin_auth_headers)
 | |
|     data = r.json()
 | |
|     assert data["total"] == 2
 | |
| 
 | |
|     orgs = data["items"]
 | |
|     default_orgs = [org for org in orgs if org["default"]]
 | |
|     assert len(default_orgs) == 1
 | |
| 
 | |
|     default_org_name = default_orgs[0]["name"]
 | |
|     orgs_with_same_name = [org for org in orgs if org["name"] == default_org_name]
 | |
|     assert len(orgs_with_same_name) == 1
 | |
| 
 | |
| 
 | |
| def test_get_org_admin(admin_auth_headers, default_org_id):
 | |
|     """org owners should receive details on users."""
 | |
|     r = requests.get(f"{API_PREFIX}/orgs/{default_org_id}", headers=admin_auth_headers)
 | |
|     assert r.status_code == 200
 | |
|     data = r.json()
 | |
|     assert data["id"] == default_org_id
 | |
|     assert data["name"]
 | |
| 
 | |
|     users = data["users"]
 | |
|     assert users
 | |
|     for _, value in users.items():
 | |
|         assert value["name"]
 | |
|         assert value["email"]
 | |
|         assert value["role"]
 | |
| 
 | |
| 
 | |
| def test_get_org_crawler(crawler_auth_headers, default_org_id):
 | |
|     """non-owners should *not* receive details on users."""
 | |
|     r = requests.get(
 | |
|         f"{API_PREFIX}/orgs/{default_org_id}", headers=crawler_auth_headers
 | |
|     )
 | |
|     assert r.status_code == 200
 | |
|     data = r.json()
 | |
|     assert data["id"] == default_org_id
 | |
|     assert data["name"]
 | |
|     assert data.get("users") == {}
 | |
| 
 | |
| 
 | |
| def test_update_org_crawling_defaults(admin_auth_headers, default_org_id):
 | |
|     r = requests.post(
 | |
|         f"{API_PREFIX}/orgs/{default_org_id}/defaults/crawling",
 | |
|         headers=admin_auth_headers,
 | |
|         json={
 | |
|             "maxCrawlSize": 200000,
 | |
|             "lang": "fr",
 | |
|             "customBehaviors": ["git+https://github.com/webrecorder/custom-behaviors"],
 | |
|         },
 | |
|     )
 | |
| 
 | |
|     assert r.status_code == 200
 | |
|     assert r.json()["updated"] == True
 | |
| 
 | |
|     r = requests.get(f"{API_PREFIX}/orgs/{default_org_id}", headers=admin_auth_headers)
 | |
| 
 | |
|     data = r.json()
 | |
|     assert data["crawlingDefaults"]
 | |
|     assert data["crawlingDefaults"]["maxCrawlSize"] == 200000
 | |
|     assert data["crawlingDefaults"]["lang"] == "fr"
 | |
|     assert data["crawlingDefaults"]["customBehaviors"] == [
 | |
|         "git+https://github.com/webrecorder/custom-behaviors"
 | |
|     ]
 | |
| 
 | |
| 
 | |
| def test_update_org_crawling_defaults_invalid_lang(admin_auth_headers, default_org_id):
 | |
|     for invalid_code in ("f", "fra", "french"):
 | |
|         r = requests.post(
 | |
|             f"{API_PREFIX}/orgs/{default_org_id}/defaults/crawling",
 | |
|             headers=admin_auth_headers,
 | |
|             json={
 | |
|                 "lang": "invalid_code",
 | |
|             },
 | |
|         )
 | |
| 
 | |
|     assert r.status_code == 400
 | |
|     assert r.json()["detail"] == "invalid_lang"
 | |
| 
 | |
| 
 | |
| def test_rename_org(admin_auth_headers, default_org_id):
 | |
|     UPDATED_NAME = "updated org name"
 | |
|     UPDATED_SLUG = "updated-org-name"
 | |
|     rename_data = {"name": UPDATED_NAME, "slug": UPDATED_SLUG}
 | |
|     r = requests.post(
 | |
|         f"{API_PREFIX}/orgs/{default_org_id}/rename",
 | |
|         headers=admin_auth_headers,
 | |
|         json=rename_data,
 | |
|     )
 | |
| 
 | |
|     assert r.status_code == 200
 | |
|     data = r.json()
 | |
|     assert data["updated"]
 | |
| 
 | |
|     # Verify that name and slug are now updated.
 | |
|     r = requests.get(f"{API_PREFIX}/orgs/{default_org_id}", headers=admin_auth_headers)
 | |
|     assert r.status_code == 200
 | |
|     data = r.json()
 | |
|     assert data["name"] == UPDATED_NAME
 | |
|     assert data["slug"] == UPDATED_SLUG
 | |
| 
 | |
| 
 | |
| def test_rename_org_invalid_slug(admin_auth_headers, default_org_id):
 | |
|     UPDATED_NAME = "updated org name"
 | |
|     UPDATED_SLUG = "not a valid slug"
 | |
|     rename_data = {"name": UPDATED_NAME, "slug": UPDATED_SLUG}
 | |
|     r = requests.post(
 | |
|         f"{API_PREFIX}/orgs/{default_org_id}/rename",
 | |
|         headers=admin_auth_headers,
 | |
|         json=rename_data,
 | |
|     )
 | |
| 
 | |
|     assert r.status_code == 400
 | |
|     assert r.json()["detail"] == "invalid_slug"
 | |
| 
 | |
| 
 | |
| @pytest.mark.parametrize(
 | |
|     "name",
 | |
|     [
 | |
|         # Identical name
 | |
|         (NON_DEFAULT_ORG_NAME),
 | |
|         # Identical name, different case
 | |
|         ("Non-Default Org"),
 | |
|     ],
 | |
| )
 | |
| def test_rename_org_duplicate_name(
 | |
|     admin_auth_headers, default_org_id, non_default_org_id, name
 | |
| ):
 | |
|     rename_data = {"name": name, "slug": "this-slug-should-be-okay"}
 | |
|     r = requests.post(
 | |
|         f"{API_PREFIX}/orgs/{default_org_id}/rename",
 | |
|         headers=admin_auth_headers,
 | |
|         json=rename_data,
 | |
|     )
 | |
| 
 | |
|     assert r.status_code == 400
 | |
|     assert r.json()["detail"] == "duplicate_org_name"
 | |
| 
 | |
| 
 | |
| @pytest.mark.parametrize(
 | |
|     "slug",
 | |
|     [
 | |
|         # Identical slug
 | |
|         (NON_DEFAULT_ORG_SLUG),
 | |
|         # Identical slug, different case
 | |
|         ("Non-Default-Org"),
 | |
|     ],
 | |
| )
 | |
| def test_rename_org_duplicate_slug(
 | |
|     admin_auth_headers, default_org_id, non_default_org_id, slug
 | |
| ):
 | |
|     rename_data = {"name": "Should be okay", "slug": slug}
 | |
|     r = requests.post(
 | |
|         f"{API_PREFIX}/orgs/{default_org_id}/rename",
 | |
|         headers=admin_auth_headers,
 | |
|         json=rename_data,
 | |
|     )
 | |
| 
 | |
|     assert r.status_code == 400
 | |
|     assert r.json()["detail"] == "duplicate_org_slug"
 | |
| 
 | |
| 
 | |
| def test_create_org(admin_auth_headers):
 | |
|     NEW_ORG_NAME = "New Org"
 | |
|     NEW_ORG_SLUG = "new-org"
 | |
|     r = requests.post(
 | |
|         f"{API_PREFIX}/orgs/create",
 | |
|         headers=admin_auth_headers,
 | |
|         json={"name": NEW_ORG_NAME, "slug": NEW_ORG_SLUG},
 | |
|     )
 | |
| 
 | |
|     assert r.status_code == 200
 | |
|     data = r.json()
 | |
|     assert data["added"]
 | |
|     assert data["id"]
 | |
| 
 | |
|     global new_oid
 | |
|     new_oid = data["id"]
 | |
| 
 | |
|     # Verify that org exists.
 | |
|     r = requests.get(f"{API_PREFIX}/orgs/{new_oid}", headers=admin_auth_headers)
 | |
|     assert r.status_code == 200
 | |
|     data = r.json()
 | |
|     assert data["name"] == NEW_ORG_NAME
 | |
|     assert data["slug"] == NEW_ORG_SLUG
 | |
|     assert data["created"]
 | |
| 
 | |
| 
 | |
| @pytest.mark.parametrize(
 | |
|     "name",
 | |
|     [
 | |
|         # Identical name
 | |
|         (NON_DEFAULT_ORG_NAME),
 | |
|         # Identical name, different case
 | |
|         ("Non-Default Org"),
 | |
|     ],
 | |
| )
 | |
| def test_create_org_duplicate_name(admin_auth_headers, non_default_org_id, name):
 | |
|     r = requests.post(
 | |
|         f"{API_PREFIX}/orgs/create",
 | |
|         headers=admin_auth_headers,
 | |
|         json={"name": name, "slug": "another-new-org"},
 | |
|     )
 | |
| 
 | |
|     assert r.status_code == 400
 | |
|     data = r.json()
 | |
|     assert data["detail"] == "duplicate_org_name"
 | |
| 
 | |
| 
 | |
| @pytest.mark.parametrize(
 | |
|     "slug",
 | |
|     [
 | |
|         # Identical slug
 | |
|         (NON_DEFAULT_ORG_SLUG),
 | |
|         # Identical slug, different case
 | |
|         ("Non-Default-Org"),
 | |
|     ],
 | |
| )
 | |
| def test_create_org_duplicate_slug(admin_auth_headers, non_default_org_id, slug):
 | |
|     r = requests.post(
 | |
|         f"{API_PREFIX}/orgs/create",
 | |
|         headers=admin_auth_headers,
 | |
|         json={"name": "Yet another new org", "slug": slug},
 | |
|     )
 | |
| 
 | |
|     assert r.status_code == 400
 | |
|     data = r.json()
 | |
|     assert data["detail"] == "duplicate_org_slug"
 | |
| 
 | |
| 
 | |
| # disable until storage customization is enabled
 | |
| def _test_change_org_storage(admin_auth_headers):
 | |
|     # change to invalid storage
 | |
|     r = requests.post(
 | |
|         f"{API_PREFIX}/orgs/{new_oid}/storage",
 | |
|         headers=admin_auth_headers,
 | |
|         json={"storage": {"name": "invalid-storage", "custom": False}},
 | |
|     )
 | |
| 
 | |
|     assert r.status_code == 400
 | |
| 
 | |
|     # change to invalid storage
 | |
|     r = requests.post(
 | |
|         f"{API_PREFIX}/orgs/{new_oid}/storage",
 | |
|         headers=admin_auth_headers,
 | |
|         json={"storage": {"name": "alt-storage", "custom": True}},
 | |
|     )
 | |
| 
 | |
|     assert r.status_code == 400
 | |
| 
 | |
|     # change to valid storage
 | |
|     r = requests.post(
 | |
|         f"{API_PREFIX}/orgs/{new_oid}/storage",
 | |
|         headers=admin_auth_headers,
 | |
|         json={"storage": {"name": "alt-storage", "custom": False}},
 | |
|     )
 | |
| 
 | |
|     assert r.status_code == 200
 | |
|     assert r.json()["updated"]
 | |
| 
 | |
| 
 | |
| def test_remove_user_from_org(admin_auth_headers, default_org_id):
 | |
|     # Add new user to org
 | |
|     r = requests.post(
 | |
|         f"{API_PREFIX}/orgs/{default_org_id}/add-user",
 | |
|         json={
 | |
|             "email": "toremove@example.com",
 | |
|             "password": "PASSW0RD!",
 | |
|             "name": "toremove",
 | |
|             "role": 10,
 | |
|         },
 | |
|         headers=admin_auth_headers,
 | |
|     )
 | |
|     assert r.status_code == 200
 | |
|     data = r.json()
 | |
|     assert data["added"]
 | |
| 
 | |
|     # Remove user
 | |
|     r = requests.post(
 | |
|         f"{API_PREFIX}/orgs/{default_org_id}/remove",
 | |
|         json={"email": "toremove@example.com"},
 | |
|         headers=admin_auth_headers,
 | |
|     )
 | |
|     assert r.status_code == 200
 | |
|     data = r.json()
 | |
|     assert data["removed"]
 | |
| 
 | |
| 
 | |
| def test_remove_non_existent_user(admin_auth_headers, default_org_id):
 | |
|     # Remove user
 | |
|     r = requests.post(
 | |
|         f"{API_PREFIX}/orgs/{default_org_id}/remove",
 | |
|         json={"email": "toremove@example.com"},
 | |
|         headers=admin_auth_headers,
 | |
|     )
 | |
|     assert r.status_code == 404
 | |
|     data = r.json()
 | |
|     assert data["detail"] == "no_such_org_user"
 | |
| 
 | |
| 
 | |
| def test_get_pending_org_invites(
 | |
|     admin_auth_headers, default_org_id, non_default_org_id
 | |
| ):
 | |
|     # Invite user to non-default org
 | |
|     NON_DEFAULT_INVITE_EMAIL = "non-default-invite@example.com"
 | |
|     r = requests.post(
 | |
|         f"{API_PREFIX}/orgs/{non_default_org_id}/invite",
 | |
|         headers=admin_auth_headers,
 | |
|         json={"email": NON_DEFAULT_INVITE_EMAIL, "role": 20},
 | |
|     )
 | |
|     assert r.status_code == 200
 | |
|     data = r.json()
 | |
|     assert data["invited"] == "new_user"
 | |
| 
 | |
|     # Invite user to default org
 | |
|     DEFAULT_INVITE_EMAIL = "default-invite@example.com"
 | |
|     r = requests.post(
 | |
|         f"{API_PREFIX}/orgs/{default_org_id}/invite",
 | |
|         headers=admin_auth_headers,
 | |
|         json={"email": DEFAULT_INVITE_EMAIL, "role": 10},
 | |
|     )
 | |
|     assert r.status_code == 200
 | |
|     data = r.json()
 | |
|     assert data["invited"] == "new_user"
 | |
| 
 | |
|     # Check that only invite to non-default org is returned
 | |
|     r = requests.get(
 | |
|         f"{API_PREFIX}/orgs/{non_default_org_id}/invites",
 | |
|         headers=admin_auth_headers,
 | |
|     )
 | |
|     assert r.status_code == 200
 | |
|     data = r.json()
 | |
|     invites = data["items"]
 | |
|     assert len(invites) == 1
 | |
|     assert data["total"] == 1
 | |
|     invite = invites[0]
 | |
|     assert invite["email"] == NON_DEFAULT_INVITE_EMAIL
 | |
|     assert invite["oid"] == non_default_org_id
 | |
|     assert invite["created"]
 | |
|     assert invite["role"]
 | |
|     assert invite["firstOrgAdmin"] == False
 | |
| 
 | |
|     # Delete Invites
 | |
|     r = requests.post(
 | |
|         f"{API_PREFIX}/orgs/{non_default_org_id}/invites/delete",
 | |
|         headers=admin_auth_headers,
 | |
|         json={"email": NON_DEFAULT_INVITE_EMAIL},
 | |
|     )
 | |
|     assert r.status_code == 200
 | |
| 
 | |
|     r = requests.post(
 | |
|         f"{API_PREFIX}/orgs/{default_org_id}/invites/delete",
 | |
|         headers=admin_auth_headers,
 | |
|         json={"email": DEFAULT_INVITE_EMAIL},
 | |
|     )
 | |
|     assert r.status_code == 200
 | |
| 
 | |
| 
 | |
| @pytest.mark.parametrize(
 | |
|     "invite_email, expected_stored_email",
 | |
|     [
 | |
|         # Standard email
 | |
|         ("invite-to-accept-org@example.com", "invite-to-accept-org@example.com"),
 | |
|         # Email address with comments
 | |
|         ("user+comment-org@example.com", "user+comment-org@example.com"),
 | |
|         # URL encoded email address with comments
 | |
|         (
 | |
|             "user%2Bcomment-encoded-org@example.com",
 | |
|             "user+comment-encoded-org@example.com",
 | |
|         ),
 | |
|         # User email with diacritic characters
 | |
|         ("diacritic-tést-org@example.com", "diacritic-tést-org@example.com"),
 | |
|         # User email with encoded diacritic characters
 | |
|         (
 | |
|             "diacritic-t%C3%A9st-encoded-org@example.com",
 | |
|             "diacritic-tést-encoded-org@example.com",
 | |
|         ),
 | |
|         # User email with upper case characters, stored as all lowercase
 | |
|         ("exampleName@EXAMple.com", "examplename@example.com"),
 | |
|     ],
 | |
| )
 | |
| def test_send_and_accept_org_invite(
 | |
|     admin_auth_headers, non_default_org_id, invite_email, expected_stored_email
 | |
| ):
 | |
|     # Send invite
 | |
|     r = requests.post(
 | |
|         f"{API_PREFIX}/orgs/{non_default_org_id}/invite",
 | |
|         headers=admin_auth_headers,
 | |
|         json={"email": invite_email, "role": 20},
 | |
|     )
 | |
|     assert r.status_code == 200
 | |
|     data = r.json()
 | |
|     assert data["invited"] == "new_user"
 | |
|     token = data["token"]
 | |
| 
 | |
|     # Register user
 | |
|     # Note: This will accept invitation without needing to call the
 | |
|     # accept invite endpoint explicitly due to post-registration hook.
 | |
|     r = requests.post(
 | |
|         f"{API_PREFIX}/auth/register",
 | |
|         headers=admin_auth_headers,
 | |
|         json={
 | |
|             "name": "accepted",
 | |
|             "email": expected_stored_email,
 | |
|             "password": "testingpassword",
 | |
|             "inviteToken": token,
 | |
|         },
 | |
|     )
 | |
|     assert r.status_code == 201
 | |
| 
 | |
|     # Verify user belongs to org
 | |
|     r = requests.get(
 | |
|         f"{API_PREFIX}/orgs/{non_default_org_id}", headers=admin_auth_headers
 | |
|     )
 | |
|     assert r.status_code == 200
 | |
|     data = r.json()
 | |
|     users = data["users"]
 | |
|     users_with_invited_email = [
 | |
|         user for user in users.values() if user["email"] == expected_stored_email
 | |
|     ]
 | |
|     assert len(users_with_invited_email) == 1
 | |
| 
 | |
| 
 | |
| def test_delete_invite_by_email(admin_auth_headers, non_default_org_id):
 | |
|     # Invite user to non-default org
 | |
|     INVITE_EMAIL = "new-non-default-org-invite-by-email@example.com"
 | |
|     r = requests.post(
 | |
|         f"{API_PREFIX}/orgs/{non_default_org_id}/invite",
 | |
|         headers=admin_auth_headers,
 | |
|         json={"email": INVITE_EMAIL, "role": 20},
 | |
|     )
 | |
|     assert r.status_code == 200
 | |
|     data = r.json()
 | |
|     assert data["invited"] == "new_user"
 | |
| 
 | |
|     # Delete invite by email
 | |
|     r = requests.post(
 | |
|         f"{API_PREFIX}/orgs/{non_default_org_id}/invites/delete",
 | |
|         headers=admin_auth_headers,
 | |
|         json={"email": INVITE_EMAIL},
 | |
|     )
 | |
|     assert r.status_code == 200
 | |
|     data = r.json()
 | |
|     assert data["removed"]
 | |
|     assert data["count"] == 1
 | |
| 
 | |
|     # Verify invite no longer exists
 | |
|     r = requests.get(
 | |
|         f"{API_PREFIX}/orgs/{non_default_org_id}/invites",
 | |
|         headers=admin_auth_headers,
 | |
|     )
 | |
|     assert r.status_code == 200
 | |
|     data = r.json()
 | |
|     invites_matching_email = [
 | |
|         invite for invite in data["items"] if invite["email"] == INVITE_EMAIL
 | |
|     ]
 | |
|     assert len(invites_matching_email) == 0
 | |
| 
 | |
|     # Try to delete non-existent email and test we get 404
 | |
|     bad_token = str(uuid.uuid4())
 | |
|     r = requests.post(
 | |
|         f"{API_PREFIX}/orgs/{non_default_org_id}/invites/delete",
 | |
|         headers=admin_auth_headers,
 | |
|         json={"email": "not-a-valid-invite@example.com"},
 | |
|     )
 | |
|     assert r.status_code == 404
 | |
|     data = r.json()
 | |
|     assert data["detail"] == "invite_not_found"
 | |
| 
 | |
| 
 | |
| def test_org_metrics(crawler_auth_headers, default_org_id):
 | |
|     r = requests.get(
 | |
|         f"{API_PREFIX}/orgs/{default_org_id}/metrics",
 | |
|         headers=crawler_auth_headers,
 | |
|     )
 | |
|     assert r.status_code == 200
 | |
|     data = r.json()
 | |
| 
 | |
|     assert data["storageUsedBytes"] > 0
 | |
|     assert data["storageUsedCrawls"] > 0
 | |
|     assert data["storageUsedUploads"] >= 0
 | |
|     assert data["storageUsedThumbnails"] >= 0
 | |
|     assert data["storageUsedThumbnails"] >= 0
 | |
|     assert data["storageUsedProfiles"] >= 0
 | |
|     assert (
 | |
|         data["storageUsedBytes"]
 | |
|         == data["storageUsedCrawls"]
 | |
|         + data["storageUsedUploads"]
 | |
|         + data["storageUsedProfiles"]
 | |
|         + data["storageUsedSeedFiles"]
 | |
|         + data["storageUsedThumbnails"]
 | |
|     )
 | |
|     assert data["storageQuotaBytes"] >= 0
 | |
|     assert data["archivedItemCount"] > 0
 | |
|     assert data["crawlCount"] > 0
 | |
|     assert data["uploadCount"] >= 0
 | |
|     assert data["archivedItemCount"] == data["crawlCount"] + data["uploadCount"]
 | |
|     assert data["pageCount"] > 0
 | |
|     assert data["profileCount"] >= 0
 | |
|     assert data["workflowsRunningCount"] >= 0
 | |
|     assert data["workflowsQueuedCount"] >= 0
 | |
|     assert data["collectionsCount"] > 0
 | |
|     assert data["publicCollectionsCount"] >= 0
 | |
| 
 | |
| 
 | |
| def test_get_org_slugs(admin_auth_headers):
 | |
|     # Fetch org count and slugs from /orgs
 | |
|     r = requests.get(f"{API_PREFIX}/orgs", headers=admin_auth_headers)
 | |
|     assert r.status_code == 200
 | |
|     data = r.json()
 | |
|     org_count = data["total"]
 | |
|     org_slugs = [item["slug"] for item in data["items"]]
 | |
| 
 | |
|     # Fetch slugs from /orgs/slugs and verify data looks right
 | |
|     r = requests.get(f"{API_PREFIX}/orgs/slugs", headers=admin_auth_headers)
 | |
|     assert r.status_code == 200
 | |
|     slugs = r.json()["slugs"]
 | |
| 
 | |
|     assert len(slugs) == org_count
 | |
|     for slug in slugs:
 | |
|         assert slug in org_slugs
 | |
| 
 | |
| 
 | |
| def test_get_org_slugs_non_superadmin(crawler_auth_headers):
 | |
|     r = requests.get(f"{API_PREFIX}/orgs/slugs", headers=crawler_auth_headers)
 | |
|     assert r.status_code == 403
 | |
|     assert r.json()["detail"] == "Not Allowed"
 | |
| 
 | |
| 
 | |
| def test_get_org_slug_lookup(admin_auth_headers):
 | |
|     # Build an expected return from /orgs list to compare against
 | |
|     expected_return = {}
 | |
|     r = requests.get(f"{API_PREFIX}/orgs", headers=admin_auth_headers)
 | |
|     assert r.status_code == 200
 | |
|     for org in r.json()["items"]:
 | |
|         expected_return[org["id"]] = org["slug"]
 | |
| 
 | |
|     # Fetch data from /orgs/slug-lookup and verify data is correct
 | |
|     r = requests.get(f"{API_PREFIX}/orgs/slug-lookup", headers=admin_auth_headers)
 | |
|     assert r.status_code == 200
 | |
|     assert r.json() == expected_return
 | |
| 
 | |
| 
 | |
| def test_get_org_slug_lookup_non_superadmin(crawler_auth_headers):
 | |
|     r = requests.get(f"{API_PREFIX}/orgs/slug-lookup", headers=crawler_auth_headers)
 | |
|     assert r.status_code == 403
 | |
|     assert r.json()["detail"] == "Not Allowed"
 | |
| 
 | |
| 
 | |
| def test_update_read_only(admin_auth_headers, default_org_id):
 | |
|     r = requests.get(f"{API_PREFIX}/orgs/{default_org_id}", headers=admin_auth_headers)
 | |
|     data = r.json()
 | |
|     assert data["readOnly"] in (False, None)
 | |
|     assert data["readOnlyReason"] in (None, "")
 | |
| 
 | |
|     r = requests.post(
 | |
|         f"{API_PREFIX}/orgs/{default_org_id}/read-only",
 | |
|         headers=admin_auth_headers,
 | |
|         json={"readOnly": True, "readOnlyReason": "Payment suspended"},
 | |
|     )
 | |
|     assert r.json()["updated"]
 | |
| 
 | |
|     r = requests.get(f"{API_PREFIX}/orgs/{default_org_id}", headers=admin_auth_headers)
 | |
|     data = r.json()
 | |
|     assert data["readOnly"] is True
 | |
|     assert data["readOnlyReason"] == "Payment suspended"
 | |
| 
 | |
|     # Try to start crawl from new workflow, should fail
 | |
|     crawl_data = {
 | |
|         "runNow": True,
 | |
|         "name": "Read Only Test Crawl",
 | |
|         "description": "Should not run now",
 | |
|         "tags": [],
 | |
|         "config": {
 | |
|             "seeds": [{"url": "https://webrecorder.net/", "depth": 1}],
 | |
|             "exclude": "community",
 | |
|         },
 | |
|     }
 | |
|     r = requests.post(
 | |
|         f"{API_PREFIX}/orgs/{default_org_id}/crawlconfigs/",
 | |
|         headers=admin_auth_headers,
 | |
|         json=crawl_data,
 | |
|     )
 | |
|     data = r.json()
 | |
| 
 | |
|     assert data["added"]
 | |
|     assert data["run_now_job"] is None
 | |
| 
 | |
|     cid = data["id"]
 | |
|     assert cid
 | |
| 
 | |
|     # Try to start crawl from existing workflow, should fail
 | |
|     r = requests.post(
 | |
|         f"{API_PREFIX}/orgs/{default_org_id}/crawlconfigs/{cid}/run",
 | |
|         headers=admin_auth_headers,
 | |
|         json=crawl_data,
 | |
|     )
 | |
|     assert r.status_code == 403
 | |
|     assert r.json()["detail"] == "org_set_to_read_only"
 | |
| 
 | |
|     # Try to upload a WACZ, should fail
 | |
|     with open(os.path.join(curr_dir, "data", "example.wacz"), "rb") as fh:
 | |
|         r = requests.put(
 | |
|             f"{API_PREFIX}/orgs/{default_org_id}/uploads/stream?filename=test.wacz&name=My%20New%20Upload&description=Should%20Fail&collections=&tags=",
 | |
|             headers=admin_auth_headers,
 | |
|             data=read_in_chunks(fh),
 | |
|         )
 | |
| 
 | |
|     assert r.status_code == 403
 | |
|     assert r.json()["detail"] == "org_set_to_read_only"
 | |
| 
 | |
|     # Reset back to False, future tests should be unaffected
 | |
|     r = requests.post(
 | |
|         f"{API_PREFIX}/orgs/{default_org_id}/read-only",
 | |
|         headers=admin_auth_headers,
 | |
|         json={"readOnly": False},
 | |
|     )
 | |
|     assert r.json()["updated"]
 | |
| 
 | |
|     r = requests.get(f"{API_PREFIX}/orgs/{default_org_id}", headers=admin_auth_headers)
 | |
|     data = r.json()
 | |
|     assert data["readOnly"] is False
 | |
|     # Test that reason is unset when readOnly is set to false, even implicitly
 | |
|     assert data["readOnlyReason"] == ""
 | |
| 
 | |
| 
 | |
| def test_sort_orgs(admin_auth_headers):
 | |
|     # Create a few new orgs for testing
 | |
|     r = requests.post(
 | |
|         f"{API_PREFIX}/orgs/create",
 | |
|         headers=admin_auth_headers,
 | |
|         json={"name": "abc", "slug": "abc"},
 | |
|     )
 | |
|     assert r.status_code == 200
 | |
| 
 | |
|     r = requests.post(
 | |
|         f"{API_PREFIX}/orgs/create",
 | |
|         headers=admin_auth_headers,
 | |
|         json={"name": "Mno", "slug": "mno"},
 | |
|     )
 | |
|     assert r.status_code == 200
 | |
| 
 | |
|     r = requests.post(
 | |
|         f"{API_PREFIX}/orgs/create",
 | |
|         headers=admin_auth_headers,
 | |
|         json={"name": "xyz", "slug": "xyz"},
 | |
|     )
 | |
|     assert r.status_code == 200
 | |
| 
 | |
|     # Check default sorting
 | |
|     # Default org should come first, followed by alphabetical sorting ascending
 | |
|     # Ensure org names are sorted lexically, not by character code
 | |
|     r = requests.get(f"{API_PREFIX}/orgs", headers=admin_auth_headers)
 | |
|     data = r.json()
 | |
|     orgs = data["items"]
 | |
| 
 | |
|     assert orgs[0]["default"]
 | |
| 
 | |
|     other_orgs = orgs[1:]
 | |
|     last_name = None
 | |
|     for org in other_orgs:
 | |
|         org_name = org["name"]
 | |
|         org_name_lower = org_name.lower()
 | |
|         if last_name:
 | |
|             assert org_name_lower > last_name
 | |
|         last_name = org_name_lower
 | |
| 
 | |
|     # Sort by name descending, ensure default org still first
 | |
|     r = requests.get(
 | |
|         f"{API_PREFIX}/orgs?sortBy=name&sortDirection=-1", headers=admin_auth_headers
 | |
|     )
 | |
|     data = r.json()
 | |
|     orgs = data["items"]
 | |
| 
 | |
|     assert orgs[0]["default"]
 | |
| 
 | |
|     other_orgs = orgs[1:]
 | |
|     last_name = None
 | |
|     for org in other_orgs:
 | |
|         org_name = org["name"]
 | |
|         org_name_lower = org_name.lower()
 | |
|         if last_name:
 | |
|             assert org_name_lower < last_name
 | |
|         last_name = org_name_lower
 | |
| 
 | |
|     # Sort desc by lastCrawlFinished, ensure default org still first
 | |
|     r = requests.get(
 | |
|         f"{API_PREFIX}/orgs?sortBy=lastCrawlFinished&sortDirection=-1",
 | |
|         headers=admin_auth_headers,
 | |
|     )
 | |
|     data = r.json()
 | |
|     orgs = data["items"]
 | |
| 
 | |
|     assert orgs[0]["default"]
 | |
| 
 | |
|     other_orgs = orgs[1:]
 | |
|     last_last_crawl_finished = None
 | |
|     for org in other_orgs:
 | |
|         last_crawl_finished = org.get("lastCrawlFinished")
 | |
|         if not last_crawl_finished:
 | |
|             continue
 | |
|         if last_last_crawl_finished:
 | |
|             assert last_crawl_finished <= last_last_crawl_finished
 | |
|         last_last_crawl_finished = last_crawl_finished
 |