mirror of
https://github.com/open-webui/open-webui.git
synced 2025-12-13 21:05:19 +00:00
补充邮箱文件 backend/open_webui/utils/email_utils.py
This commit is contained in:
parent
8a92d134b0
commit
948cdb9e98
1 changed files with 141 additions and 0 deletions
141
backend/open_webui/utils/email_utils.py
Normal file
141
backend/open_webui/utils/email_utils.py
Normal file
|
|
@ -0,0 +1,141 @@
|
|||
import json
|
||||
import logging
|
||||
import smtplib
|
||||
import time
|
||||
import secrets
|
||||
from typing import Optional
|
||||
from email.mime.text import MIMEText
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def send_email(
|
||||
*,
|
||||
subject: str,
|
||||
body: str,
|
||||
to_email: str,
|
||||
smtp_server: str,
|
||||
smtp_port: int,
|
||||
smtp_username: str = "",
|
||||
smtp_password: str = "",
|
||||
from_email: str,
|
||||
):
|
||||
"""发送纯文本邮件;默认使用 SMTPS (SSL) 直连端口,如 465"""
|
||||
msg = MIMEText(body)
|
||||
msg["Subject"] = subject
|
||||
msg["From"] = from_email
|
||||
msg["To"] = to_email
|
||||
|
||||
with smtplib.SMTP_SSL(smtp_server, smtp_port, timeout=10) as server:
|
||||
if smtp_username and smtp_password:
|
||||
server.login(smtp_username, smtp_password)
|
||||
server.sendmail(from_email, [to_email], msg.as_string())
|
||||
|
||||
|
||||
def generate_verification_code(length: int = 6) -> str:
|
||||
alphabet = "0123456789"
|
||||
return "".join(secrets.choice(alphabet) for _ in range(length))
|
||||
|
||||
|
||||
class EmailVerificationManager:
|
||||
"""Stores and validates email verification codes using Redis if available."""
|
||||
|
||||
def __init__(self, redis=None, prefix: str = "signup:code"):
|
||||
self.redis = redis
|
||||
self.prefix = prefix
|
||||
self.memory_store: dict[str, dict] = {}
|
||||
|
||||
def _now(self) -> int:
|
||||
return int(time.time())
|
||||
|
||||
def _key(self, email: str) -> str:
|
||||
return f"{self.prefix}:{email.lower()}"
|
||||
|
||||
def _delete(self, email: str):
|
||||
key = self._key(email)
|
||||
if self.redis:
|
||||
self.redis.delete(key)
|
||||
else:
|
||||
self.memory_store.pop(key, None)
|
||||
|
||||
def _load_record(self, email: str) -> Optional[dict]:
|
||||
key = self._key(email)
|
||||
record = None
|
||||
if self.redis:
|
||||
raw = self.redis.get(key)
|
||||
if raw:
|
||||
try:
|
||||
record = json.loads(raw)
|
||||
except Exception:
|
||||
log.debug("Failed to decode email verification record for %s", email)
|
||||
else:
|
||||
record = self.memory_store.get(key)
|
||||
|
||||
if not record:
|
||||
return None
|
||||
|
||||
expires_at = record.get("expires_at")
|
||||
if expires_at and expires_at <= self._now():
|
||||
self._delete(email)
|
||||
return None
|
||||
|
||||
return record
|
||||
|
||||
def _save_record(self, email: str, record: dict, ttl: int):
|
||||
key = self._key(email)
|
||||
ttl = max(ttl, 1)
|
||||
if self.redis:
|
||||
self.redis.set(key, json.dumps(record), ex=ttl)
|
||||
else:
|
||||
self.memory_store[key] = record
|
||||
|
||||
def can_send(self, email: str, send_interval: int) -> tuple[bool, int]:
|
||||
record = self._load_record(email)
|
||||
if not record:
|
||||
return True, 0
|
||||
|
||||
sent_at = record.get("sent_at")
|
||||
if sent_at:
|
||||
delta = self._now() - sent_at
|
||||
remaining = send_interval - delta
|
||||
if remaining > 0:
|
||||
return False, remaining
|
||||
|
||||
return True, 0
|
||||
|
||||
def store_code(
|
||||
self,
|
||||
email: str,
|
||||
code: str,
|
||||
ttl: int,
|
||||
max_attempts: int,
|
||||
ip: Optional[str] = None,
|
||||
):
|
||||
now = self._now()
|
||||
record = {
|
||||
"code": code,
|
||||
"expires_at": now + ttl,
|
||||
"attempts_left": max_attempts,
|
||||
"sent_at": now,
|
||||
**({"ip": ip} if ip else {}),
|
||||
}
|
||||
self._save_record(email, record, ttl)
|
||||
|
||||
def validate_code(self, email: str, code: str) -> bool:
|
||||
record = self._load_record(email)
|
||||
if not record:
|
||||
return False
|
||||
|
||||
ttl_left = max(record.get("expires_at", 0) - self._now(), 0)
|
||||
|
||||
if record.get("code") != code:
|
||||
attempts_left = record.get("attempts_left", 1) - 1
|
||||
if attempts_left <= 0:
|
||||
self._delete(email)
|
||||
else:
|
||||
record["attempts_left"] = attempts_left
|
||||
self._save_record(email, record, ttl_left)
|
||||
return False
|
||||
|
||||
self._delete(email)
|
||||
return True
|
||||
Loading…
Reference in a new issue