note/api/auth.py
2026-04-23 20:42:16 +08:00

185 lines
5.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
JWT 认证模块
用于验证 Authentik 颁发的 JWT Token
"""
import httpx
from fastapi import HTTPException, Depends, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import jwt
from jose.exceptions import JWTError
from typing import Optional, Dict, Any
from functools import lru_cache
import os
# 从环境变量获取配置 - note 应用
AUTHENTIK_ISSUER = os.getenv(
"AUTHENTIK_ISSUER", "https://auth.yuany3721.site/application/o/note/"
)
AUTHENTIK_CLIENT_ID = os.getenv(
"AUTHENTIK_CLIENT_ID", "xzqQVQHINluS4pW0qhZAEg1YAYsS2j8HMJnVcUT5"
)
print(f"[Auth] AUTHENTIK_ISSUER: {AUTHENTIK_ISSUER}")
print(f"[Auth] AUTHENTIK_CLIENT_ID: {AUTHENTIK_CLIENT_ID}")
# JWKS 缓存
_jwks_cache = None
_jwks_cache_time = None
# Bearer token 验证器
security = HTTPBearer(auto_error=False)
async def fetch_jwks() -> Dict[str, Any]:
"""获取 Authentik 的 JWKS公钥集"""
global _jwks_cache, _jwks_cache_time
# JWKS URL
jwks_url = f"{AUTHENTIK_ISSUER.rstrip('/')}/jwks/"
print(f"[Auth] Fetching JWKS from: {jwks_url}")
try:
async with httpx.AsyncClient() as client:
response = await client.get(jwks_url, timeout=10.0)
response.raise_for_status()
jwks = response.json()
print(
f"[Auth] JWKS fetched successfully, keys: {list(jwks.get('keys', []))}"
)
return jwks
except Exception as e:
print(f"[Auth] 获取 JWKS 失败: {e}")
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="无法获取认证公钥"
)
def get_public_key(jwks: Dict[str, Any], kid: str) -> Optional[str]:
"""从 JWKS 中获取指定 kid 的公钥"""
for key in jwks.get("keys", []):
if key.get("kid") == kid:
return key
return None
async def verify_token(token: str) -> Dict[str, Any]:
"""
验证 JWT Token
"""
try:
print(f"[Auth] Verifying token (first 50 chars): {token[:50]}...")
# 1. 获取 JWKS
jwks = await fetch_jwks()
# 2. 获取 token header 中的 kid
header = jwt.get_unverified_header(token)
kid = header.get("kid")
print(f"[Auth] Token header: {header}")
if not kid:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token 格式无效",
headers={"WWW-Authenticate": "Bearer"},
)
# 3. 获取对应的公钥
public_key = get_public_key(jwks, kid)
if not public_key:
print(f"[Auth] Public key not found for kid: {kid}")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token 签名密钥无效",
headers={"WWW-Authenticate": "Bearer"},
)
print(f"[Auth] Found public key for kid: {kid}")
# 4. 验证 token
print(
f"[Auth] Decoding token with issuer: {AUTHENTIK_ISSUER}, audience: {AUTHENTIK_CLIENT_ID}"
)
payload = jwt.decode(
token,
public_key,
algorithms=["RS256"],
issuer=AUTHENTIK_ISSUER,
audience=AUTHENTIK_CLIENT_ID,
)
print(f"[Auth] Token verified successfully, payload: {payload}")
return payload
except JWTError as e:
print(f"[Auth] JWT verification failed: {e}")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=f"Token 验证失败: {str(e)}",
headers={"WWW-Authenticate": "Bearer"},
)
except Exception as e:
print(f"[Auth] 验证 token 时出错: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="服务器内部错误"
)
async def get_current_user(
credentials: HTTPAuthorizationCredentials = Depends(security),
) -> Dict[str, Any]:
"""
FastAPI 依赖项:获取当前用户
"""
print(f"[Auth] get_current_user called, credentials: {credentials}")
if not credentials:
print("[Auth] No credentials provided")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="缺少认证信息",
headers={"WWW-Authenticate": "Bearer"},
)
token = credentials.credentials
print(f"[Auth] Token received (first 30 chars): {token[:30]}...")
return await verify_token(token)
async def get_current_user_optional(
credentials: HTTPAuthorizationCredentials = Depends(security),
) -> Optional[Dict[str, Any]]:
"""
FastAPI 依赖项:获取当前用户(可选)
"""
if not credentials:
return None
try:
token = credentials.credentials
return await verify_token(token)
except HTTPException:
return None
class AuthRequired:
"""
用于在路由中要求认证的装饰器(类视图方式)
"""
async def __call__(
self, credentials: HTTPAuthorizationCredentials = Depends(security)
):
await get_current_user(credentials)
return True
# 便捷函数:检查 token 是否有效(不抛出异常)
async def is_token_valid(token: str) -> bool:
"""检查 token 是否有效,返回布尔值"""
try:
await verify_token(token)
return True
except HTTPException:
return False