new features:
- sending email for validation + invites, configured via env vars - inviting new users to join an existing archive - /crawldone webhook to track and verify crawl id (next: store crawl complete entry)
This commit is contained in:
parent
627e9a6f14
commit
f2d9d7ba6a
1
.gitignore
vendored
1
.gitignore
vendored
@ -1,2 +1,3 @@
|
||||
**/*.pyc
|
||||
**/node_modules/
|
||||
config.env
|
||||
|
@ -24,6 +24,13 @@ class InviteRequest(BaseModel):
|
||||
role: UserRole
|
||||
|
||||
|
||||
# ============================================================================
|
||||
class NewUserInvite(InvitePending, BaseMongoModel):
    """An invite for a new user, with an email and invite token as id"""

    # e-mail address the invite was issued for
    email: str
|
||||
|
||||
|
||||
# ============================================================================
|
||||
class UpdateRole(InviteRequest):
|
||||
"""Update existing role for user"""
|
||||
@ -88,8 +95,12 @@ class Archive(BaseMongoModel):
|
||||
class ArchiveOps:
|
||||
"""Archive API operations"""
|
||||
|
||||
def __init__(self, db):
|
||||
def __init__(self, db, email):
|
||||
self.archives = db["archives"]
|
||||
|
||||
self.invites = db["invites"]
|
||||
self.email = email
|
||||
|
||||
self.router = None
|
||||
self.archive_dep = None
|
||||
|
||||
@ -149,20 +160,66 @@ class ArchiveOps:
|
||||
res = await self.archives.find_one(query)
|
||||
return Archive.from_dict(res)
|
||||
|
||||
async def get_archive_by_id(self, uid: str):
|
||||
async def get_archive_by_id(self, aid: str):
|
||||
"""Get an archive by id"""
|
||||
res = await self.archives.find_one({"_id": uid})
|
||||
res = await self.archives.find_one({"_id": aid})
|
||||
return Archive.from_dict(res)
|
||||
|
||||
async def update(self, archive: Archive):
    """Update existing archive

    Replaces the stored document whose ``_id`` equals ``archive.id`` with
    the output of ``archive.to_dict()``.
    """
    # Await the write so that callers only continue once it has completed
    # and any database error is raised here instead of being dropped.
    await self.archives.replace_one({"_id": archive.id}, archive.to_dict())
|
||||
|
||||
async def add_new_user_invite(
    self, new_user_invite: NewUserInvite, inviter_email, archive_name
):
    """Store an invite for a new user and send the invite e-mail.

    Raises:
        HTTPException: 403 if an invite already exists for this e-mail.
    """
    invitee = new_user_invite.email

    # Only one outstanding invite per e-mail address is allowed
    already_invited = await self.invites.find_one({"email": invitee})
    if already_invited:
        raise HTTPException(
            status_code=403, detail="This user has already been invited"
        )

    await self.invites.insert_one(new_user_invite.to_dict())

    # The invite id doubles as the token sent to the invitee
    self.email.send_new_user_invite(
        invitee, inviter_email, archive_name, new_user_invite.id
    )
|
||||
|
||||
async def handle_new_user_invite(self, invite_token: str, user: User):
    """Redeem a new-user invite token on behalf of ``user``.

    Looks the invite up by token, checks it was issued for the user's
    e-mail, adds the user to the invited archive and then removes the invite.

    Returns:
        True once the invite has been applied and deleted.

    Raises:
        HTTPException: 400 if the token is unknown or was issued for a
            different e-mail address.
    """
    stored = await self.invites.find_one({"_id": invite_token})
    if not stored:
        raise HTTPException(status_code=400, detail="Invalid Invite Code")

    pending = NewUserInvite.from_dict(stored)
    if user.email != pending.email:
        raise HTTPException(
            status_code=400, detail="Invalid Invite Code for this user"
        )

    await self.add_user_by_invite(pending, user)

    # The invite is removed only after the user has been added
    await self.invites.delete_one({"_id": invite_token})
    return True
|
||||
|
||||
async def add_user_by_invite(self, invite: InvitePending, user: User):
    """Grant ``user`` the invited role on the archive named by the invite.

    Returns:
        True after the archive has been updated.

    Raises:
        HTTPException: 400 if the invite refers to an archive that
            cannot be found.
    """
    target = await self.get_archive_by_id(invite.aid)

    if target:
        # Archive users are keyed by the stringified user id
        target.users[str(user.id)] = invite.role
        await self.update(target)
        return True

    raise HTTPException(
        status_code=400, detail="Invalid Invite Code, No Such Archive"
    )
|
||||
|
||||
|
||||
# ============================================================================
|
||||
def init_archives_api(app, mdb, users, user_dep: User):
|
||||
def init_archives_api(app, mdb, users, email, user_dep: User):
|
||||
"""Init archives api router for /archives"""
|
||||
ops = ArchiveOps(mdb)
|
||||
ops = ArchiveOps(mdb, email)
|
||||
|
||||
async def archive_dep(aid: str, user: User = Depends(user_dep)):
|
||||
archive = await ops.get_archive_for_user_by_id(aid, user)
|
||||
@ -204,12 +261,26 @@ def init_archives_api(app, mdb, users, user_dep: User):
|
||||
detail="User does not have permission to invite other users",
|
||||
)
|
||||
|
||||
invite_code = uuid.uuid4().hex
|
||||
|
||||
invite_pending = InvitePending(
|
||||
aid=str(archive.id), created=datetime.datetime.utcnow(), role=invite.role
|
||||
)
|
||||
|
||||
other_user = await users.db.get_by_email(invite.email)
|
||||
|
||||
if not other_user:
|
||||
raise HTTPException(
|
||||
status_code=400, detail="No user found for specified e-mail"
|
||||
|
||||
await ops.add_new_user_invite(
|
||||
NewUserInvite(
|
||||
id=invite_code, email=invite.email, **invite_pending.dict()
|
||||
),
|
||||
user.email,
|
||||
archive.name,
|
||||
)
|
||||
|
||||
return {"invited": "new_user"}
|
||||
|
||||
if other_user.email == user.email:
|
||||
raise HTTPException(status_code=400, detail="Can't invite ourselves!")
|
||||
|
||||
@ -218,21 +289,12 @@ def init_archives_api(app, mdb, users, user_dep: User):
|
||||
status_code=400, detail="User already a member of this archive."
|
||||
)
|
||||
|
||||
# try:
|
||||
# role = UserRole[invite.role].name
|
||||
# except KeyError:
|
||||
# # pylint: disable=raise-missing-from
|
||||
# raise HTTPException(status_code=400, detail="Invalid User Role")
|
||||
other_user.invites[invite_code] = invite_pending
|
||||
|
||||
invite_code = uuid.uuid4().hex
|
||||
other_user.invites[invite_code] = InvitePending(
|
||||
aid=str(archive.id), created=datetime.datetime.utcnow(), role=invite.role
|
||||
)
|
||||
await users.db.update(other_user)
|
||||
|
||||
return {
|
||||
"invite_code": invite_code,
|
||||
"email": invite.email,
|
||||
"role": invite.role.value,
|
||||
"invited": "existing_user",
|
||||
}
|
||||
|
||||
@router.patch("/user-role", tags=["invites"])
|
||||
@ -268,14 +330,7 @@ def init_archives_api(app, mdb, users, user_dep: User):
|
||||
if not invite:
|
||||
raise HTTPException(status_code=400, detail="Invalid Invite Code")
|
||||
|
||||
archive = await ops.get_archive_by_id(invite.aid)
|
||||
if not archive:
|
||||
raise HTTPException(
|
||||
status_code=400, detail="Invalid Invite Code, No Such Archive"
|
||||
)
|
||||
|
||||
archive.users[str(user.id)] = invite.role
|
||||
await ops.update(archive)
|
||||
await ops.add_user_by_invite(invite, user)
|
||||
await users.db.update(user)
|
||||
return {"added": True}
|
||||
|
||||
|
@ -75,7 +75,7 @@ class CrawlConfigIn(BaseModel):
|
||||
schedule: Optional[str] = ""
|
||||
runNow: Optional[bool] = False
|
||||
|
||||
#storageName: Optional[str] = "default"
|
||||
# storageName: Optional[str] = "default"
|
||||
|
||||
config: RawCrawlConfig
|
||||
|
||||
@ -87,13 +87,22 @@ class CrawlConfig(BaseMongoModel):
|
||||
schedule: Optional[str] = ""
|
||||
runNow: Optional[bool] = False
|
||||
|
||||
#storageName: Optional[str] = "default"
|
||||
# storageName: Optional[str] = "default"
|
||||
|
||||
archive: Optional[str]
|
||||
|
||||
config: RawCrawlConfig
|
||||
|
||||
|
||||
# ============================================================================
|
||||
class CrawlCompleteMsg(BaseModel):
    # Request body accepted by the /crawldone webhook.
    # NOTE(review): the meaning of filename/size/hash is not shown in this
    # code -- confirm against the payload the crawler actually posts.
    filename: Optional[str]
    # compared against the pod's "btrix.user" label during validation
    user: Optional[str]
    # used as the pod name to look up during validation, then overwritten
    # with the pod's "job-name" label
    crawl: Optional[str]
    size: int
    hash: str
|
||||
|
||||
|
||||
# ============================================================================
|
||||
class CrawlOps:
|
||||
"""Crawl Config Operations"""
|
||||
|
56
backend/emailsender.py
Normal file
56
backend/emailsender.py
Normal file
@ -0,0 +1,56 @@
|
||||
""" Basic Email Sending Support"""
|
||||
|
||||
import os
|
||||
import smtplib
|
||||
import ssl
|
||||
|
||||
|
||||
# pylint: disable=too-few-public-methods
|
||||
class EmailSender:
    """SMTP Email Sender

    Settings are read from the environment at construction time:
    EMAIL_SENDER (login and from-address), EMAIL_PASSWORD and
    EMAIL_SMTP_HOST. If no SMTP host is configured, messages are printed
    but not sent.
    """

    def __init__(self):
        self.sender = os.environ.get("EMAIL_SENDER")
        self.password = os.environ.get("EMAIL_PASSWORD")
        self.smtp_server = os.environ.get("EMAIL_SMTP_HOST")

        # Base URL used for links in outgoing mail. Kept without a trailing
        # slash because link paths are appended with a leading "/".
        self.host = "http://localhost:8000"

    def _app_url(self, action, token):
        """Build the link to /app/<action>/<token> on the configured host"""
        # rstrip guards against a host value that does end in "/", which
        # would otherwise produce "//app/..." in the link
        base = self.host.rstrip("/")
        return f"{base}/app/{action}/{token}"

    def _send_encrypted(self, receiver, message):
        """Send Encrypted SMTP Message

        Connects to the configured SMTP host on port 587, upgrades the
        connection with STARTTLS, logs in as the sender and sends
        ``message`` to ``receiver``. Does nothing beyond printing when no
        SMTP host is configured.
        """
        print(message)

        if not self.smtp_server:
            print("Email: No SMTP Server, not sending")
            return

        context = ssl.create_default_context()
        with smtplib.SMTP(self.smtp_server, 587) as server:
            server.ehlo()  # Can be omitted
            server.starttls(context=context)
            server.ehlo()  # Can be omitted
            server.login(self.sender, self.password)
            server.sendmail(self.sender, receiver, message)

    def send_user_validation(self, receiver_email, token):
        """Send email to validate registration email address"""
        verify_url = self._app_url("verify", token)

        message = f"""
Please verify your registration for Browsertrix Cloud for {receiver_email}

You can verify by clicking here: {verify_url}

The verification token is: {token}"""

        self._send_encrypted(receiver_email, message)

    def send_new_user_invite(self, receiver_email, sender, archive_name, token):
        """Send email to invite new user"""
        join_url = self._app_url("join", token)

        message = f"""
You are invited by {sender} to join their archive, {archive_name} on Browsertrix Cloud!

You can join by clicking here: {join_url}

The invite token is: {token}"""

        self._send_encrypted(receiver_email, message)
|
@ -2,7 +2,6 @@
|
||||
|
||||
import os
|
||||
|
||||
# import urllib.parse
|
||||
import json
|
||||
|
||||
from kubernetes_asyncio import client, config
|
||||
@ -31,6 +30,19 @@ class K8SManager:
|
||||
self.crawler_image = os.environ.get("CRAWLER_IMAGE")
|
||||
self.crawler_image_pull_policy = "IfNotPresent"
|
||||
|
||||
async def validate_crawl_data(self, data):
    """Check a crawl-complete message against the pod it claims to come from.

    Reads the pod named ``data.crawl`` in this manager's namespace and
    compares its "btrix.user" label with ``data.user``.

    Returns:
        None if the pod is missing, has no "btrix.user" label, or the
        label does not match ``data.user``. Otherwise a dict with the
        keys "created", "archive" and "crawlconfig" taken from the pod
        metadata.

    Side effect: on success ``data.crawl`` is overwritten with the pod's
    "job-name" label.
    """
    pod = await self.core_api.read_namespaced_pod(data.crawl, self.namespace)
    if not pod:
        return None

    # labels may be unset on a pod; treat that the same as "no labels"
    labels = pod.metadata.labels or {}

    # An absent user label is a mismatch, not an error. It is checked
    # explicitly so that a message with user=None cannot match a pod
    # that simply has no label.
    if "btrix.user" not in labels or labels["btrix.user"] != data.user:
        return None

    data.crawl = labels["job-name"]
    return {
        "created": pod.metadata.creation_timestamp,
        "archive": labels["btrix.archive"],
        "crawlconfig": labels["btrix.crawlconfig"],
    }
|
||||
|
||||
async def add_crawl_config(
|
||||
self,
|
||||
userid: str,
|
||||
@ -66,7 +78,7 @@ class K8SManager:
|
||||
|
||||
# Create Secret
|
||||
endpoint_with_coll_url = os.path.join(
|
||||
storage.endpoint_url, crawlconfig.config.collection + "/"
|
||||
storage.endpoint_url, "collections", crawlconfig.config.collection + "/"
|
||||
)
|
||||
|
||||
crawl_secret = client.V1Secret(
|
||||
@ -81,6 +93,7 @@ class K8SManager:
|
||||
"STORE_ENDPOINT_URL": endpoint_with_coll_url,
|
||||
"STORE_ACCESS_KEY": storage.access_key,
|
||||
"STORE_SECRET_KEY": storage.secret_key,
|
||||
"WEBHOOK_URL": "http://browsertrix-cloud.default:8000/crawldone",
|
||||
},
|
||||
)
|
||||
|
||||
@ -238,7 +251,15 @@ class K8SManager:
|
||||
"envFrom": [
|
||||
{"secretRef": {"name": f"crawl-secret-{uid}"}}
|
||||
],
|
||||
"resources": resources
|
||||
"env": [
|
||||
{
|
||||
"name": "CRAWL_ID",
|
||||
"valueFrom": {
|
||||
"fieldRef": {"fieldPath": "metadata.name"}
|
||||
},
|
||||
}
|
||||
],
|
||||
"resources": resources,
|
||||
}
|
||||
],
|
||||
"volumes": [
|
||||
|
@ -4,15 +4,16 @@ supports docker and kubernetes based deployments of multiple browsertrix-crawler
|
||||
"""
|
||||
|
||||
import os
|
||||
import asyncio
|
||||
|
||||
from fastapi import FastAPI, Request
|
||||
from fastapi import FastAPI, Request, HTTPException, BackgroundTasks
|
||||
|
||||
from db import init_db
|
||||
|
||||
from users import init_users_api, UserDB
|
||||
from archives import init_archives_api
|
||||
from crawls import init_crawl_config_api
|
||||
|
||||
from crawls import init_crawl_config_api, CrawlCompleteMsg
|
||||
from emailsender import EmailSender
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
@ -34,6 +35,9 @@ class BrowsertrixAPI:
|
||||
self.default_storage_access_key = os.environ.get("STORE_ACCESS_KEY", "access")
|
||||
self.default_storage_secret_key = os.environ.get("STORE_SECRET_KEY", "secret")
|
||||
|
||||
self.email = EmailSender()
|
||||
self.crawl_manager = None
|
||||
|
||||
# pylint: disable=import-outside-toplevel
|
||||
if os.environ.get("KUBERNETES_SERVICE_HOST"):
|
||||
from k8sman import K8SManager
|
||||
@ -58,7 +62,7 @@ class BrowsertrixAPI:
|
||||
current_active_user = self.fastapi_users.current_user(active=True)
|
||||
|
||||
self.archive_ops = init_archives_api(
|
||||
self.app, self.mdb, self.fastapi_users, current_active_user
|
||||
self.app, self.mdb, self.fastapi_users, self.email, current_active_user
|
||||
)
|
||||
|
||||
self.crawl_config_ops = init_crawl_config_api(
|
||||
@ -74,20 +78,55 @@ class BrowsertrixAPI:
|
||||
# async def root():
|
||||
# return {"message": "Hello World"}
|
||||
|
||||
async def on_handle_crawl_complete(msg: CrawlCompleteMsg):
    """Validate a crawl-complete message and print the merged result.

    Any exception raised while validating is printed, not propagated.
    """
    print("crawl complete started")
    try:
        data = await self.crawl_manager.validate_crawl_data(msg)
        if not data:
            print("Not a valid crawl complete msg!")
        else:
            # merge the message fields into the validated pod metadata
            data.update(msg.dict())
            print(data)
    except Exception as exc:
        print(exc)
|
||||
|
||||
@app.post("/crawldone")
async def webhook(msg: CrawlCompleteMsg, background_tasks: BackgroundTasks):
    # Handles the crawl-complete message inline, so the response is only
    # returned after on_handle_crawl_complete has finished.
    # Two deferred-execution alternatives are left disabled below;
    # background_tasks is currently unused as a result.
    #background_tasks.add_task(on_handle_crawl_complete, msg)
    #asyncio.ensure_future(on_handle_crawl_complete(msg))
    await on_handle_crawl_complete(msg)
    return {"message": "webhook received"}
|
||||
|
||||
|
||||
# pylint: disable=no-self-use, unused-argument
|
||||
async def on_after_register(self, user: UserDB, request):
|
||||
async def on_after_register(self, user: UserDB, request: Request):
|
||||
"""callback after registeration"""
|
||||
|
||||
await self.archive_ops.create_new_archive_for_user(
|
||||
archive_name="default",
|
||||
base_endpoint_url=self.default_storage_endpoint_url,
|
||||
access_key=self.default_storage_access_key,
|
||||
secret_key=self.default_storage_secret_key,
|
||||
user=user,
|
||||
)
|
||||
|
||||
print(f"User {user.id} has registered.")
|
||||
|
||||
req_data = await request.json()
|
||||
|
||||
if req_data.get("newArchive"):
|
||||
print(f"Creating new archive for {user.id}")
|
||||
|
||||
archive_name = req_data.get("name") or f"{user.email} Archive"
|
||||
|
||||
await self.archive_ops.create_new_archive_for_user(
|
||||
archive_name=archive_name,
|
||||
base_endpoint_url=self.default_storage_endpoint_url,
|
||||
access_key=self.default_storage_access_key,
|
||||
secret_key=self.default_storage_secret_key,
|
||||
user=user,
|
||||
)
|
||||
|
||||
if req_data.get("inviteToken"):
|
||||
try:
|
||||
await self.archive_ops.handle_new_user_invite(
|
||||
req_data.get("inviteToken"), user
|
||||
)
|
||||
except HTTPException as exc:
|
||||
print(exc)
|
||||
|
||||
# pylint: disable=no-self-use, unused-argument
|
||||
def on_after_forgot_password(self, user: UserDB, token: str, request: Request):
|
||||
"""callback after password forgot"""
|
||||
@ -96,7 +135,8 @@ class BrowsertrixAPI:
|
||||
# pylint: disable=no-self-use, unused-argument
|
||||
def on_after_verification_request(self, user: UserDB, token: str, request: Request):
    """callback after verification request

    Prints the verification token and e-mails it to the user's address.
    """
    print(f"Verification requested for user {user.id}. Verification token: {token}")

    # send_user_validation takes (receiver_email, token); the address goes
    # first so the mail is sent to the user and the link carries the token
    self.email.send_user_validation(user.email, token)
|
||||
|
||||
|
||||
# ============================================================================
|
||||
|
@ -7,7 +7,7 @@ import uuid
|
||||
|
||||
from datetime import datetime
|
||||
|
||||
from typing import Dict
|
||||
from typing import Dict, Optional
|
||||
from enum import IntEnum
|
||||
|
||||
|
||||
@ -44,8 +44,6 @@ class User(models.BaseUser):
|
||||
Base User Model
|
||||
"""
|
||||
|
||||
invites: Dict[str, InvitePending] = {}
|
||||
|
||||
|
||||
# ============================================================================
|
||||
class UserCreate(models.BaseUserCreate):
|
||||
@ -53,7 +51,8 @@ class UserCreate(models.BaseUserCreate):
|
||||
User Creation Model
|
||||
"""
|
||||
|
||||
invites: Dict[str, InvitePending] = {}
|
||||
inviteToken: Optional[str]
|
||||
newArchive: bool
|
||||
|
||||
|
||||
# ============================================================================
|
||||
@ -62,8 +61,6 @@ class UserUpdate(User, models.BaseUserUpdate):
|
||||
User Update Model
|
||||
"""
|
||||
|
||||
invites: Dict[str, InvitePending] = {}
|
||||
|
||||
|
||||
# ============================================================================
|
||||
class UserDB(User, models.BaseUserDB):
|
||||
@ -126,6 +123,7 @@ def init_users_api(
|
||||
prefix="/auth",
|
||||
tags=["auth"],
|
||||
)
|
||||
|
||||
app.include_router(
|
||||
fastapi_users.get_users_router(), prefix="/users", tags=["users"]
|
||||
)
|
||||
|
@ -7,15 +7,13 @@ services:
|
||||
ports:
|
||||
- 8000:8000
|
||||
|
||||
environment:
|
||||
MONGO_HOST: mongo
|
||||
PASSWORD_SECRET: 'c9085f33ecce4347aa1d69339e16c499'
|
||||
env_file:
|
||||
- ./config.env
|
||||
|
||||
mongo:
|
||||
image: mongo
|
||||
environment:
|
||||
MONGO_INITDB_ROOT_USERNAME: root
|
||||
MONGO_INITDB_ROOT_PASSWORD: example
|
||||
env_file:
|
||||
- ./config.env
|
||||
|
||||
volumes:
|
||||
- mongodata:/data/db
|
||||
|
Loading…
Reference in New Issue
Block a user