前言

前两篇我们搭建了项目骨架,所有服务都能跑起来了。今天实现第一个完整的功能模块——用户系统

用户系统是几乎所有 Web 应用的基础。我们将实现:
1. 用户注册(邮箱 + 密码)
2. 用户登录(JWT Token)
3. 密码加密与验证
4. 登录状态检查与 Token 刷新
5. 完整的 API + 前端页面

1. 密码加密

密码绝对不能明文存储。我们使用 bcrypt 进行哈希加密。

# backend/app/services/auth.py
from passlib.context import CryptContext

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


def hash_password(password: str) -> str:
    """对密码进行 bcrypt 哈希。"""
    return pwd_context.hash(password)


def verify_password(plain_password: str, hashed_password: str) -> bool:
    """验证密码与哈希是否匹配。"""
    return pwd_context.verify(plain_password, hashed_password)

为什么用 bcrypt?
- 自动加盐(每个密码的 salt 不同)
- 可调节计算成本(随着硬件升级可以增加迭代次数)
- 抗 GPU/ASIC 加速(内存密集型,专用硬件加速效果有限)

绝对不要用:MD5、SHA-1、SHA-256(这些是哈希函数,不是密码哈希函数,没有抗暴力破解设计)。

2. JWT Token

使用 JWT(JSON Web Token)做身份认证。

# backend/app/services/auth.py(续)
from datetime import datetime, timedelta, timezone
from jose import jwt, JWTError
from typing import Optional

from app.config import settings


def create_access_token(user_id: str, expires_delta: Optional[timedelta] = None) -> str:
    """创建 JWT Token。"""
    payload = {
        "sub": user_id,  # subject——用户 ID
        "iat": datetime.now(timezone.utc),  # 签发时间
        "exp": datetime.now(timezone.utc) + (expires_delta or timedelta(hours=settings.JWT_EXPIRATION_HOURS)),  # 过期时间
        "type": "access",
    }
    return jwt.encode(payload, settings.JWT_SECRET, algorithm=settings.JWT_ALGORITHM)


def decode_token(token: str) -> Optional[dict]:
    """解码并验证 JWT Token。"""
    try:
        payload = jwt.decode(token, settings.JWT_SECRET, algorithms=[settings.JWT_ALGORITHM])
        return payload
    except JWTError:
        return None


def get_current_user_id(token: str) -> Optional[str]:
    """从 Token 中提取用户 ID。"""
    payload = decode_token(token)
    if payload is None:
        return None
    return payload.get("sub")

3. 注册接口

3.1 Pydantic Schema

# backend/app/schemas/auth.py
from pydantic import BaseModel, EmailStr, Field


class RegisterRequest(BaseModel):
    email: str = Field(..., description="邮箱地址")
    password: str = Field(..., min_length=6, max_length=100, description="密码")
    nickname: str = Field(..., min_length=1, max_length=50, description="昵称")


class LoginRequest(BaseModel):
    email: str = Field(..., description="邮箱地址")
    password: str = Field(..., description="密码")


class TokenResponse(BaseModel):
    access_token: str
    token_type: str = "bearer"
    user: "UserInfo"


class UserInfo(BaseModel):
    id: str
    email: str
    nickname: str
    avatar_url: str

    class Config:
        from_attributes = True


class ErrorResponse(BaseModel):
    detail: str

3.2 注册逻辑

# backend/app/services/user_service.py
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from app.models.user import User
from app.services.auth import hash_password


class UserService:

    @staticmethod
    async def register(db: AsyncSession, email: str, password: str, nickname: str) -> User:
        """注册新用户。"""
        # 检查邮箱是否已存在
        existing = await db.execute(select(User).where(User.email == email))
        if existing.scalar_one_or_none():
            raise ValueError("该邮箱已被注册")

        # 创建用户
        user = User(
            email=email,
            password_hash=hash_password(password),
            nickname=nickname,
        )
        db.add(user)
        await db.commit()
        await db.refresh(user)
        return user

    @staticmethod
    async def authenticate(db: AsyncSession, email: str, password: str) -> User:
        """验证用户登录。"""
        from app.services.auth import verify_password

        result = await db.execute(select(User).where(User.email == email))
        user = result.scalar_one_or_none()

        if not user:
            raise ValueError("邮箱或密码错误")

        if not verify_password(password, user.password_hash):
            raise ValueError("邮箱或密码错误")

        if not user.is_active:
            raise ValueError("账户已被禁用")

        return user

    @staticmethod
    async def get_by_id(db: AsyncSession, user_id: str) -> User:
        """通过 ID 获取用户。"""
        from uuid import UUID
        result = await db.execute(select(User).where(User.id == UUID(user_id)))
        user = result.scalar_one_or_none()
        if not user:
            raise ValueError("用户不存在")
        return user

3.3 注册路由

# backend/app/routers/auth.py
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import get_db
from app.schemas.auth import (
    RegisterRequest, LoginRequest,
    TokenResponse, UserInfo, ErrorResponse,
)
from app.services.user_service import UserService
from app.services.auth import create_access_token

router = APIRouter()


@router.post("/register", response_model=TokenResponse, status_code=status.HTTP_201_CREATED)
async def register(body: RegisterRequest, db: AsyncSession = Depends(get_db)):
    """用户注册。"""
    try:
        user = await UserService.register(
            db, email=body.email, password=body.password, nickname=body.nickname
        )
    except ValueError as e:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))

    token = create_access_token(str(user.id))
    return TokenResponse(
        access_token=token,
        user=UserInfo(
            id=str(user.id),
            email=user.email,
            nickname=user.nickname,
            avatar_url=user.avatar_url or "",
        ),
    )


@router.post("/login", response_model=TokenResponse)
async def login(body: LoginRequest, db: AsyncSession = Depends(get_db)):
    """用户登录。"""
    try:
        user = await UserService.authenticate(db, email=body.email, password=body.password)
    except ValueError as e:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=str(e))

    token = create_access_token(str(user.id))
    return TokenResponse(
        access_token=token,
        user=UserInfo(
            id=str(user.id),
            email=user.email,
            nickname=user.nickname,
            avatar_url=user.avatar_url or "",
        ),
    )

⚠️ 安全细节
- 登录失败时返回 "邮箱或密码错误",不告诉用户到底是邮箱不存在还是密码错误——防止攻击者枚举邮箱
- 密码传输通过 HTTPS 加密,不在前端做任何哈希(前端哈希等于明文,因为攻击者可以直接调用 API)

4. 认证中间件

需要登录的接口通过依赖注入验证 Token:

# backend/app/services/auth.py(续)
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

security = HTTPBearer()


async def require_auth(
    credentials: HTTPAuthorizationCredentials = Depends(security),
    db: AsyncSession = Depends(get_db),
) -> User:
    """需要登录的接口依赖。"""
    token = credentials.credentials
    user_id = get_current_user_id(token)

    if user_id is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Token 无效或已过期",
        )

    try:
        user = await UserService.get_by_id(db, user_id)
    except ValueError:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="用户不存在",
        )

    return user


@router.get("/me", response_model=UserInfo)
async def get_me(user: User = Depends(require_auth)):
    """获取当前用户信息。"""
    return UserInfo(
        id=str(user.id),
        email=user.email,
        nickname=user.nickname,
        avatar_url=user.avatar_url or "",
    )

5. 注册路由到应用

# backend/app/main.py(更新)
from app.routers import auth

app.include_router(auth.router, prefix="/api/auth", tags=["Auth"])

# 现在 protected 路由也可以使用了
from app.services.auth import require_auth

6. 前端登录/注册页面

6.1 API 层

// frontend/src/api/auth.ts
import api from "@/lib/api";

export interface UserInfo {
  id: string;
  email: string;
  nickname: string;
  avatar_url: string;
}

export interface TokenResponse {
  access_token: string;
  token_type: string;
  user: UserInfo;
}

export async function register(
  email: string,
  password: string,
  nickname: string
): Promise {
  const { data } = await api.post("/auth/register", {
    email,
    password,
    nickname,
  });
  return data;
}

export async function login(
  email: string,
  password: string
): Promise {
  const { data } = await api.post("/auth/login", { email, password });
  return data;
}

export async function getMe(): Promise {
  const { data } = await api.get("/auth/me");
  return data;
}

6.2 Auth Context

// frontend/src/hooks/useAuth.tsx
import { createContext, useContext, useState, useEffect, ReactNode } from "react";
import { UserInfo, login as apiLogin, register as apiRegister, getMe } from "@/api/auth";

interface AuthContextType {
  user: UserInfo | null;
  token: string | null;
  isLoading: boolean;
  login: (email: string, password: string) => Promise;
  register: (email: string, password: string, nickname: string) => Promise;
  logout: () => void;
}

const AuthContext = createContext(null);

export function AuthProvider({ children }: { children: ReactNode }) {
  const [user, setUser] = useState(null);
  const [token, setToken] = useState(() =>
    localStorage.getItem("token")
  );
  const [isLoading, setIsLoading] = useState(true);

  useEffect(() => {
    if (token) {
      getMe()
        .then(setUser)
        .catch(() => {
          localStorage.removeItem("token");
          setToken(null);
        })
        .finally(() => setIsLoading(false));
    } else {
      setIsLoading(false);
    }
  }, [token]);

  const login = async (email: string, password: string) => {
    const res = await apiLogin(email, password);
    localStorage.setItem("token", res.access_token);
    setToken(res.access_token);
    setUser(res.user);
  };

  const register = async (email: string, password: string, nickname: string) => {
    const res = await apiRegister(email, password, nickname);
    localStorage.setItem("token", res.access_token);
    setToken(res.access_token);
    setUser(res.user);
  };

  const logout = () => {
    localStorage.removeItem("token");
    setToken(null);
    setUser(null);
  };

  return (

      {children}

  );
}

export function useAuth() {
  const ctx = useContext(AuthContext);
  if (!ctx) throw new Error("useAuth must be used within AuthProvider");
  return ctx;
}

6.3 登录页面

// frontend/src/pages/Login.tsx
import { useState } from "react";
import { useNavigate, Link } from "react-router-dom";
import { useAuth } from "@/hooks/useAuth";
import { Button } from "@/components/ui/button";
import { Input } from "@/components/ui/input";
import { Card, CardContent, CardDescription, CardFooter, CardHeader, CardTitle } from "@/components/ui/card";

export default function Login() {
  const [email, setEmail] = useState("");
  const [password, setPassword] = useState("");
  const [error, setError] = useState("");
  const [loading, setLoading] = useState(false);
  const { login } = useAuth();
  const navigate = useNavigate();

  const handleSubmit = async (e: React.FormEvent) => {
    e.preventDefault();
    setError("");
    setLoading(true);

    try {
      await login(email, password);
      navigate("/dashboard");
    } catch (err: any) {
      setError(err.response?.data?.detail || "登录失败");
    } finally {
      setLoading(false);
    }
  };

  return (



          登录 KNow
          登录你的知识库账户



            {error && (

                {error}

            )}

              邮箱
               setEmail(e.target.value)}
                required
              />


              密码
               setPassword(e.target.value)}
                required
              />




              {loading ? "登录中..." : "登录"}


              还没有账户{" "}

                注册






  );
}

6.4 注册页面

// frontend/src/pages/Register.tsx
import { useState } from "react";
import { useNavigate, Link } from "react-router-dom";
import { useAuth } from "@/hooks/useAuth";
import { Button } from "@/components/ui/button";
import { Input } from "@/components/ui/input";
import { Card, CardContent, CardDescription, CardFooter, CardHeader, CardTitle } from "@/components/ui/card";

export default function Register() {
  const [nickname, setNickname] = useState("");
  const [email, setEmail] = useState("");
  const [password, setPassword] = useState("");
  const [confirmPassword, setConfirmPassword] = useState("");
  const [error, setError] = useState("");
  const [loading, setLoading] = useState(false);
  const { register } = useAuth();
  const navigate = useNavigate();

  const handleSubmit = async (e: React.FormEvent) => {
    e.preventDefault();
    setError("");

    if (password !== confirmPassword) {
      setError("两次密码不一致");
      return;
    }
    if (password.length 


          注册 KNow
          创建你的知识库账户



            {error && (

                {error}

            )}

              昵称
               setNickname(e.target.value)}
                required
              />


              邮箱
               setEmail(e.target.value)}
                required
              />


              密码
               setPassword(e.target.value)}
                required
              />


              确认密码
               setConfirmPassword(e.target.value)}
                required
              />




              {loading ? "注册中..." : "注册"}


              已有账户{" "}

                登录






  );
}

6.5 受保护的路由

// frontend/src/components/ProtectedRoute.tsx
import { Navigate } from "react-router-dom";
import { useAuth } from "@/hooks/useAuth";

export function ProtectedRoute({ children }: { children: React.ReactNode }) {
  const { user, isLoading } = useAuth();

  if (isLoading) {
    return (



    );
  }

  if (!user) {
    return ;
  }

  return {children};
}

6.6 路由更新

// frontend/src/App.tsx
// 添加 ProtectedRoute
import { ProtectedRoute } from "@/components/ProtectedRoute";

// 在 Routes 中包裹需要登录的页面
}>
  } />
  } />
  } />

7. 验证

7.1 测试注册

# 注册
curl -X POST http://localhost:8000/api/auth/register \
  -H "Content-Type: application/json" \
  -d '{"email":"test@example.com","password":"123456","nickname":"测试用户"}'

# 响应
{"access_token":"eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...","token_type":"bearer","user":{"id":"uuid","email":"test@example.com","nickname":"测试用户","avatar_url":""}}

7.2 测试登录

# 登录
curl -X POST http://localhost:8000/api/auth/login \
  -H "Content-Type: application/json" \
  -d '{"email":"test@example.com","password":"123456"}'

# 使用 Token 访问受保护接口
curl http://localhost:8000/api/auth/me \
  -H "Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."

7.3 测试错误密码

# 错误密码
curl -X POST http://localhost:8000/api/auth/login \
  -H "Content-Type: application/json" \
  -d '{"email":"test@example.com","password":"wrong"}'

# 响应 401
{"detail":"邮箱或密码错误"}

8. 常见安全问题

问题 解决方案
密码明文传输 HTTPS + 后端接收明文密码(不要前端哈希)
Token 被盗 设置合理的过期时间(72h),HTTPS 传输
暴力破解 登录失败计数 + 限流(后文实现)
SQL 注入 使用 ORM(SQLAlchemy)参数化查询
XSS 前端不要用 dangerouslySetInnerHTML,用安全的 Markdown 渲染

关于密码策略:目前只要求 6 位密码。实际产品建议:
- 最少 8 位
- 至少包含字母和数字
- 提供密码强度指示器(前端实时检测)
- 但不强制特殊字符(这只会让用户写密码在便利贴上)

总结

今天完成了完整的用户系统:

组件 说明
密码加密 bcrypt 哈希存储
JWT Token 签发/验证/过期
注册接口 邮箱唯一性检查 + 密码规范
登录接口 密码验证 + Token 返回
认证中间件 保护需要登录的接口
前端登录页 完整的登录表单 + 错误处理
前端注册页 完整的注册表单 + 密码确认
Auth Context 全局认证状态管理
路由保护 未登录自动跳转登录页

现在用户可以用邮箱注册、登录,前端会保存 Token 并在后续请求中自动带上。

下一篇我们将实现知识库与文档管理——用户可以创建知识库、上传文件。


本文是 《AI 全栈开发实战——做一个真正的产品》 系列的第 3 篇。
系列目录:
1. ✅ 产品定义与架构设计
2. ✅ 技术选型与项目初始化
3. ✅ 用户系统(注册/登录/JWT)← 你在这里
4. 📝 知识库与文档管理
5. 📝 文档处理 Pipeline
...

本文由 Zyentor(智元界) 原创发布