Fixes #1432 Refactors the invite + registration system to be simpler and more consistent with regards to existing user invites. Previously, per-user invites are stored in the user.invites dict instead of in the invites collection, which creates a few issues: - Existing user do not show up in Org Invites list: #1432 - Existing user invites also do not expire, unlike new user invites, creating potential security issue. Instead, existing user invites should be treated like new user invites. This PR moves them into the same collection, adding a `userid` field to InvitePending to match with an existing user. If a user already exists, it will be matched by userid, instead of by email. This allows for user to update their email while still being invited. Note that the email of the invited existing user will not change in the invite email. This is also by design: an admin of one org should not be given any hint that an invited user already has an account, such as by having their email automatically update. For an org admin, the invite to a new or existing user should be indistinguishable. The sha256 of invite token is stored instead of actual token for better security. The registration system has also been refactored with the following changes: - Auto-creation of new orgs for new users has been removed - User.create_user() replaces the old User._create() and just creates the user with additional complex logic around org auto-add - Users are added to org in org add_user_to_org() - Users are added to org through invites with add_user_with_invite() Tests: - Additional tests include verifying that existing and new pending invites appear in the pending invites list - Tests for `/users/invite/<token>?email=` and `/users/me/invite/<token>` endpoints - Deleting pending invites - Additional tests added for user self-registration, including existing user self-registration to default org of existing user (in nightly tests)
168 lines
5.2 KiB
Python
168 lines
5.2 KiB
Python
""" Basic Email Sending Support"""
|
|
|
|
from datetime import datetime
|
|
import os
|
|
import smtplib
|
|
import ssl
|
|
from uuid import UUID
|
|
from typing import Optional, Union
|
|
|
|
from email.message import EmailMessage
|
|
from email.mime.text import MIMEText
|
|
from email.mime.multipart import MIMEMultipart
|
|
from fastapi import HTTPException
|
|
from fastapi.templating import Jinja2Templates
|
|
|
|
from .models import CreateReplicaJob, DeleteReplicaJob, Organization, InvitePending
|
|
from .utils import is_bool
|
|
|
|
|
|
# pylint: disable=too-few-public-methods, too-many-instance-attributes
|
|
class EmailSender:
|
|
"""SMTP Email Sender"""
|
|
|
|
sender: str
|
|
password: str
|
|
reply_to: str
|
|
smtp_server: Optional[str]
|
|
smtp_port: int
|
|
smtp_use_tls: bool
|
|
support_email: str
|
|
templates: Jinja2Templates
|
|
|
|
def __init__(self):
|
|
self.sender = os.environ.get("EMAIL_SENDER") or "Browsertrix admin"
|
|
self.password = os.environ.get("EMAIL_PASSWORD") or ""
|
|
self.reply_to = os.environ.get("EMAIL_REPLY_TO") or self.sender
|
|
self.support_email = os.environ.get("EMAIL_SUPPORT") or self.reply_to
|
|
self.smtp_server = os.environ.get("EMAIL_SMTP_HOST")
|
|
self.smtp_port = int(os.environ.get("EMAIL_SMTP_PORT", 587))
|
|
self.smtp_use_tls = is_bool(os.environ.get("EMAIL_SMTP_USE_TLS"))
|
|
|
|
self.default_origin = os.environ.get("APP_ORIGIN")
|
|
|
|
self.templates = Jinja2Templates(
|
|
directory=os.path.join(os.path.dirname(__file__), "email-templates")
|
|
)
|
|
|
|
def _send_encrypted(self, receiver: str, name: str, **kwargs) -> None:
|
|
"""Send Encrypted SMTP Message using given template name"""
|
|
|
|
full = self.templates.env.get_template(name).render(kwargs)
|
|
parts = full.split("~~~")
|
|
if len(parts) == 3:
|
|
subject, html, text = parts
|
|
elif len(parts) == 2:
|
|
subject, text = parts
|
|
html = None
|
|
else:
|
|
raise HTTPException(status_code=500, detail="invalid_email_template")
|
|
|
|
print(full, flush=True)
|
|
|
|
if not self.smtp_server:
|
|
print("Email: No SMTP Server, not sending", flush=True)
|
|
return
|
|
|
|
msg: Union[EmailMessage, MIMEMultipart]
|
|
|
|
if html:
|
|
msg = MIMEMultipart("alternative")
|
|
msg.attach(MIMEText(text.strip(), "plain"))
|
|
msg.attach(MIMEText(html.strip(), "html"))
|
|
else:
|
|
msg = EmailMessage()
|
|
msg.set_content(text.strip())
|
|
|
|
msg["Subject"] = subject.strip()
|
|
msg["From"] = self.reply_to
|
|
msg["To"] = receiver
|
|
msg["Reply-To"] = msg["From"]
|
|
|
|
context = ssl.create_default_context()
|
|
with smtplib.SMTP(self.smtp_server, self.smtp_port) as server:
|
|
if self.smtp_use_tls:
|
|
server.ehlo()
|
|
server.starttls(context=context)
|
|
server.ehlo()
|
|
if self.password:
|
|
server.login(self.sender, self.password)
|
|
server.send_message(msg)
|
|
# server.sendmail(self.sender, receiver, message)
|
|
|
|
def get_origin(self, headers):
|
|
"""Return origin of the received request"""
|
|
if not headers:
|
|
return self.default_origin
|
|
|
|
scheme = headers.get("X-Forwarded-Proto")
|
|
host = headers.get("Host")
|
|
if not scheme or not host:
|
|
return self.default_origin
|
|
|
|
return scheme + "://" + host
|
|
|
|
def send_user_validation(
|
|
self, receiver_email: str, token: str, headers: Optional[dict] = None
|
|
):
|
|
"""Send email to validate registration email address"""
|
|
|
|
origin = self.get_origin(headers)
|
|
|
|
self._send_encrypted(receiver_email, "validate", origin=origin, token=token)
|
|
|
|
# pylint: disable=too-many-arguments
|
|
def send_user_invite(
|
|
self,
|
|
invite: InvitePending,
|
|
token: UUID,
|
|
org_name: str,
|
|
is_new: bool,
|
|
headers: Optional[dict] = None,
|
|
):
|
|
"""Send email to invite new user"""
|
|
|
|
origin = self.get_origin(headers)
|
|
|
|
receiver_email = invite.email or ""
|
|
|
|
invite_url = (
|
|
f"{origin}/join/{token}?email={receiver_email}"
|
|
if is_new
|
|
else f"{origin}/invite/accept/{token}?email={receiver_email}"
|
|
)
|
|
|
|
self._send_encrypted(
|
|
receiver_email,
|
|
"invite",
|
|
invite_url=invite_url,
|
|
is_new=is_new,
|
|
sender=invite.inviterEmail if not invite.fromSuperuser else "",
|
|
org_name=org_name,
|
|
support_email=self.support_email,
|
|
)
|
|
|
|
def send_user_forgot_password(self, receiver_email, token, headers=None):
|
|
"""Send password reset email with token"""
|
|
origin = self.get_origin(headers)
|
|
|
|
self._send_encrypted(
|
|
receiver_email,
|
|
"password_reset",
|
|
origin=origin,
|
|
token=token,
|
|
support_email=self.support_email,
|
|
)
|
|
|
|
def send_background_job_failed(
|
|
self,
|
|
job: Union[CreateReplicaJob, DeleteReplicaJob],
|
|
org: Organization,
|
|
finished: datetime,
|
|
receiver_email: str,
|
|
):
|
|
"""Send background job failed email to superuser"""
|
|
self._send_encrypted(
|
|
receiver_email, "failed_bg_job", job=job, org=org, finished=finished
|
|
)
|