前言

用户系统完成后,今天来实现核心业务模块——知识库与文档管理。

用户登录后可以创建知识库、上传文档、管理文件。这是整个产品的数据入口,后面所有的 RAG 问答都基于这里的数据。

1. 知识库 CRUD

1.1 Pydantic Schema

# backend/app/schemas/knowledge_base.py
from pydantic import BaseModel, Field
from typing import Optional, List
from datetime import datetime


class KnowledgeBaseCreate(BaseModel):
    # Request body for creating a knowledge base.
    # (Written as `#` comments rather than a docstring: Pydantic copies a
    # model's docstring into the generated JSON schema description.)
    #
    # Fields that have a default are typed as plain `str` / `int` instead of
    # `Optional[...]`, so an explicit JSON `null` is rejected during validation
    # rather than being forwarded to the service layer as `None`.
    name: str = Field(..., min_length=1, max_length=200, description="知识库名称")
    description: str = Field("", max_length=500, description="知识库描述")
    # Chunking parameters stored on the knowledge base.
    chunk_size: int = Field(512, ge=128, le=2048, description="切分大小")
    chunk_overlap: int = Field(128, ge=0, le=512, description="切分重叠")


class KnowledgeBaseUpdate(BaseModel):
    # Request body for a partial update. A field left as None is treated as
    # "leave unchanged" by the update route and service.
    #
    # `min_length=1` mirrors KnowledgeBaseCreate, so an update cannot set an
    # empty name that creation would have rejected.
    name: Optional[str] = Field(None, min_length=1, max_length=200)
    description: Optional[str] = Field(None, max_length=500)


class KnowledgeBaseResponse(BaseModel):
    # Response body describing a single knowledge base.

    # Pydantic v2 configuration. Replaces the inner `class Config`, which v2
    # still accepts but reports as deprecated.
    model_config = {"from_attributes": True}

    id: str  # knowledge base id, serialized as a string
    name: str
    description: str
    document_count: int
    created_at: datetime
    updated_at: datetime


class KnowledgeBaseListResponse(BaseModel):
    # Response body for the knowledge-base list endpoint.
    items: List[KnowledgeBaseResponse]  # one entry per knowledge base
    total: int  # the list route fills this with len(items)

1.2 知识库 Service

# backend/app/services/knowledge_base_service.py
from typing import List
from uuid import UUID

from sqlalchemy import select, func, delete
from sqlalchemy.ext.asyncio import AsyncSession

from app.models.knowledge_base import KnowledgeBase
from app.models.document import Document


class KnowledgeBaseService:
    """Database operations on knowledge bases, always scoped to the owning user."""

    # Attributes that ``update`` may modify. Identity and ownership attributes
    # such as ``id`` and ``user_id`` are deliberately absent, so a stray keyword
    # argument can never reassign a knowledge base to another user.
    _UPDATABLE_FIELDS = frozenset(
        {"name", "description", "chunk_size", "chunk_overlap"}
    )

    @staticmethod
    async def create(
        db: AsyncSession,
        user_id: str,
        name: str,
        description: str = "",
        chunk_size: int = 512,
        chunk_overlap: int = 128,
    ) -> KnowledgeBase:
        """Insert a knowledge base owned by ``user_id`` and return it.

        The row is committed and then refreshed before being returned.

        Raises:
            ValueError: If ``user_id`` is not a valid UUID string.
        """
        kb = KnowledgeBase(
            user_id=UUID(user_id),
            name=name,
            description=description,
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
        )
        db.add(kb)
        await db.commit()
        await db.refresh(kb)
        return kb

    @staticmethod
    async def list_by_user(db: AsyncSession, user_id: str) -> List[KnowledgeBase]:
        """Return all knowledge bases of ``user_id``, most recently updated first."""
        result = await db.execute(
            select(KnowledgeBase)
            .where(KnowledgeBase.user_id == UUID(user_id))
            .order_by(KnowledgeBase.updated_at.desc())
        )
        return result.scalars().all()

    @staticmethod
    async def get_by_id(db: AsyncSession, kb_id: str, user_id: str) -> KnowledgeBase:
        """Return the knowledge base ``kb_id`` if it belongs to ``user_id``.

        Filtering on both id and owner makes this the ownership check that the
        other methods rely on.

        Raises:
            ValueError: If no matching row exists, or ``kb_id`` is not a valid
                UUID string. Both cases carry the same "not found" message.
        """
        try:
            kb_uuid = UUID(kb_id)
        except ValueError:
            # A malformed id cannot match any row. Report it like a missing
            # one instead of leaking the UUID parser's own error text.
            raise ValueError("知识库不存在") from None

        result = await db.execute(
            select(KnowledgeBase).where(
                KnowledgeBase.id == kb_uuid,
                KnowledgeBase.user_id == UUID(user_id),
            )
        )
        kb = result.scalar_one_or_none()
        if not kb:
            raise ValueError("知识库不存在")
        return kb

    @staticmethod
    async def update(db: AsyncSession, kb_id: str, user_id: str, **kwargs) -> KnowledgeBase:
        """Apply the given field values to a knowledge base owned by ``user_id``.

        Keyword arguments whose value is ``None``, or whose name is not listed
        in ``_UPDATABLE_FIELDS``, are ignored.

        Raises:
            ValueError: If the knowledge base does not exist for this user.
        """
        kb = await KnowledgeBaseService.get_by_id(db, kb_id, user_id)
        for key, value in kwargs.items():
            if (
                value is not None
                and key in KnowledgeBaseService._UPDATABLE_FIELDS
                and hasattr(kb, key)
            ):
                setattr(kb, key, value)
        await db.commit()
        await db.refresh(kb)
        return kb

    @staticmethod
    async def delete(db: AsyncSession, kb_id: str, user_id: str):
        """Delete a knowledge base owned by ``user_id``.

        Only the knowledge-base row is deleted explicitly here.
        NOTE(review): removal of its document rows presumably relies on a
        cascade configured on the models, and nothing in this method removes
        the stored files from object storage - confirm both.

        Raises:
            ValueError: If the knowledge base does not exist for this user.
        """
        kb = await KnowledgeBaseService.get_by_id(db, kb_id, user_id)
        await db.delete(kb)
        await db.commit()

1.3 知识库路由

# backend/app/routers/knowledge_bases.py
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession
from typing import List

from app.database import get_db
from app.services.auth import require_auth
from app.models.user import User
from app.schemas.knowledge_base import (
    KnowledgeBaseCreate, KnowledgeBaseUpdate,
    KnowledgeBaseResponse, KnowledgeBaseListResponse,
)
from app.services.knowledge_base_service import KnowledgeBaseService

# Router for the knowledge-base endpoints defined below; no URL prefix is set here.
router = APIRouter()


@router.post("", response_model=KnowledgeBaseResponse, status_code=status.HTTP_201_CREATED)
async def create_knowledge_base(
    body: KnowledgeBaseCreate,
    user: User = Depends(require_auth),
    db: AsyncSession = Depends(get_db),
):
    """Create a knowledge base owned by the authenticated user."""
    payload = body.model_dump()
    created = await KnowledgeBaseService.create(db, user_id=str(user.id), **payload)
    return _to_response(created)


@router.get("", response_model=KnowledgeBaseListResponse)
async def list_knowledge_bases(
    user: User = Depends(require_auth),
    db: AsyncSession = Depends(get_db),
):
    """Return every knowledge base of the authenticated user with a total count."""
    owner_id = str(user.id)
    knowledge_bases = await KnowledgeBaseService.list_by_user(db, owner_id)
    serialized = [_to_response(entry) for entry in knowledge_bases]
    return KnowledgeBaseListResponse(items=serialized, total=len(knowledge_bases))


@router.get("/{kb_id}", response_model=KnowledgeBaseResponse)
async def get_knowledge_base(
    kb_id: str,
    user: User = Depends(require_auth),
    db: AsyncSession = Depends(get_db),
):
    """Return one knowledge base; a service ValueError becomes HTTP 404."""
    try:
        found = await KnowledgeBaseService.get_by_id(db, kb_id, str(user.id))
    except ValueError as exc:
        raise HTTPException(status_code=404, detail=str(exc))
    else:
        return _to_response(found)


@router.put("/{kb_id}", response_model=KnowledgeBaseResponse)
async def update_knowledge_base(
    kb_id: str,
    body: KnowledgeBaseUpdate,
    user: User = Depends(require_auth),
    db: AsyncSession = Depends(get_db),
):
    """Update a knowledge base; a service ValueError becomes HTTP 404.

    Only the fields that are not None in the request body are forwarded.
    """
    try:
        changes = body.model_dump(exclude_none=True)
        updated = await KnowledgeBaseService.update(db, kb_id, str(user.id), **changes)
    except ValueError as exc:
        raise HTTPException(status_code=404, detail=str(exc))
    else:
        return _to_response(updated)


@router.delete("/{kb_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_knowledge_base(
    kb_id: str,
    user: User = Depends(require_auth),
    db: AsyncSession = Depends(get_db),
):
    """Delete a knowledge base; a service ValueError becomes HTTP 404."""
    owner_id = str(user.id)
    try:
        await KnowledgeBaseService.delete(db, kb_id, owner_id)
    except ValueError as exc:
        raise HTTPException(status_code=404, detail=str(exc))


def _to_response(kb: KnowledgeBase) -> KnowledgeBaseResponse:
    """Convert a KnowledgeBase row into its API response model.

    A falsy description becomes "" and a falsy document count becomes 0.
    """
    fields = {
        "id": str(kb.id),
        "name": kb.name,
        "description": kb.description if kb.description else "",
        "document_count": kb.document_count if kb.document_count else 0,
        "created_at": kb.created_at,
        "updated_at": kb.updated_at,
    }
    return KnowledgeBaseResponse(**fields)

2. 文档管理

2.1 MinIO 文件存储

# backend/app/services/storage.py
from minio import Minio
from app.config import settings
import uuid
from pathlib import Path


class FileStorage:
    """File storage service backed by a MinIO (S3-compatible) client.

    NOTE(review): the ``async`` methods call the MinIO client directly with no
    ``await``; if that client is blocking, these calls hold the event loop for
    their duration - confirm whether that is acceptable.
    """

    def __init__(self):
        """Create the client from ``settings`` and make sure the bucket exists."""
        self.client = Minio(
            settings.MINIO_ENDPOINT,
            access_key=settings.MINIO_ACCESS_KEY,
            secret_key=settings.MINIO_SECRET_KEY,
            secure=False,  # plain HTTP on the internal network
        )
        self.bucket = settings.MINIO_BUCKET
        self._ensure_bucket()

    def _ensure_bucket(self):
        """Create the configured bucket if it does not exist yet."""
        if not self.client.bucket_exists(self.bucket):
            self.client.make_bucket(self.bucket)

    async def upload(self, file_data: bytes, filename: str, content_type: str) -> dict:
        """Store ``file_data`` under a generated object name.

        The object name is a random hex UUID followed by the lowercase suffix
        of ``filename``; the rest of the original name is not used.

        Returns:
            A dict with ``object_name``, ``file_url`` and ``file_size``.
        """
        from io import BytesIO

        ext = Path(filename).suffix.lower()
        object_name = f"{uuid.uuid4().hex}{ext}"
        size = len(file_data)

        self.client.put_object(
            bucket_name=self.bucket,
            object_name=object_name,
            # put_object reads from a stream object, so the raw bytes are
            # wrapped rather than passed directly.
            data=BytesIO(file_data),
            length=size,
            content_type=content_type,
        )

        # The endpoint setting is used verbatim, so no URL scheme is added here.
        file_url = f"{settings.MINIO_ENDPOINT}/{self.bucket}/{object_name}"
        return {
            "object_name": object_name,
            "file_url": file_url,
            "file_size": size,
        }

    async def delete(self, object_name: str):
        """Remove ``object_name`` from the bucket."""
        self.client.remove_object(self.bucket, object_name)

    async def get_download_url(self, object_name: str, expires: int = 3600) -> str:
        """Return a presigned download URL valid for ``expires`` seconds."""
        from datetime import timedelta

        # The client expects the lifetime as a timedelta, not a bare integer.
        return self.client.presigned_get_object(
            self.bucket, object_name, expires=timedelta(seconds=expires)
        )


# Module-level shared instance. Constructing it runs FileStorage.__init__, which
# checks for (and may create) the bucket, so importing this module needs storage access.
file_storage = FileStorage()

2.2 文档 Schema

# backend/app/schemas/document.py
from pydantic import BaseModel
from typing import List, Optional
from datetime import datetime


class DocumentResponse(BaseModel):
    # Response body describing a single document.

    # Pydantic v2 configuration. Replaces the inner `class Config`, which v2
    # still accepts but reports as deprecated.
    model_config = {"from_attributes": True}

    id: str  # document id, serialized as a string
    filename: str
    file_size: int
    file_type: str
    status: str  # pending / processing / ready / failed
    chunk_count: int
    created_at: datetime


class DocumentListResponse(BaseModel):
    # Response body for the document list endpoint.
    items: List[DocumentResponse]  # one entry per document
    total: int  # the list route fills this with len(items)

2.3 文档 Service

# backend/app/services/document_service.py
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func
from uuid import UUID
from typing import List
from pathlib import Path

from app.models.document import Document
from app.services.storage import file_storage


# File extensions (lowercase, with the leading dot) accepted by DocumentService.upload.
ALLOWED_EXTENSIONS = {".pdf", ".txt", ".md", ".docx"}
# Largest accepted upload, in bytes.
MAX_FILE_SIZE = 50 * 1024 * 1024  # 50MB


class DocumentService:
    """Document operations inside a knowledge base owned by the calling user."""

    @staticmethod
    async def upload(
        db: AsyncSession,
        kb_id: str,
        user_id: str,
        file_data: bytes,
        filename: str,
    ) -> Document:
        """Validate, store and register an uploaded document.

        Args:
            db: Session used for the ownership check and the insert.
            kb_id: Target knowledge base id (UUID string).
            user_id: Id of the caller; the knowledge base must belong to them.
            file_data: Complete file content.
            filename: Original file name; only its suffix decides the type.

        Returns:
            The committed ``Document`` row, created with ``status="pending"``.

        Raises:
            ValueError: If the extension is not in ``ALLOWED_EXTENSIONS``, the
                content exceeds ``MAX_FILE_SIZE``, or the knowledge base does
                not exist for this user.
        """
        # Function-scope imports, matching the other service import in this
        # class. ``update`` and ``KnowledgeBase`` are needed for the counter.
        from sqlalchemy import update

        from app.models.knowledge_base import KnowledgeBase
        from app.services.knowledge_base_service import KnowledgeBaseService

        # Validate the file type. A missing filename is treated as having no
        # extension, so it is rejected here rather than failing in Path().
        ext = Path(filename or "").suffix.lower()
        if ext not in ALLOWED_EXTENSIONS:
            # Sorted so the message is identical from one run to the next.
            allowed = ", ".join(sorted(ALLOWED_EXTENSIONS))
            raise ValueError(f"不支持的文件格式:{ext},仅支持 {allowed}")

        # Validate the file size.
        if len(file_data) > MAX_FILE_SIZE:
            raise ValueError("文件大小不能超过 50MB")

        # Verify ownership before anything is stored, so a user cannot upload
        # into a knowledge base that belongs to someone else.
        await KnowledgeBaseService.get_by_id(db, kb_id, user_id)
        kb_uuid = UUID(kb_id)

        # Store the file in object storage.
        stored = await file_storage.upload(file_data, filename, _content_type(ext))

        # Insert the row and bump the counter in one transaction, so the two
        # cannot drift apart if the second statement fails.
        doc = Document(
            knowledge_base_id=kb_uuid,
            filename=filename,
            file_path=stored["object_name"],
            file_size=stored["file_size"],
            file_type=ext.lstrip("."),
            status="pending",
        )
        db.add(doc)
        try:
            await db.execute(
                update(KnowledgeBase)
                .where(KnowledgeBase.id == kb_uuid)
                # coalesce: a NULL counter would otherwise stay NULL after "+ 1".
                .values(document_count=func.coalesce(KnowledgeBase.document_count, 0) + 1)
            )
            await db.commit()
        except Exception:
            # Undo the database work and remove the stored object, which no
            # row references any more, then let the caller see the error.
            await db.rollback()
            await file_storage.delete(stored["object_name"])
            raise

        await db.refresh(doc)
        return doc

    @staticmethod
    async def list_by_kb(db: AsyncSession, kb_id: str, user_id: str) -> List[Document]:
        """Return the documents of a knowledge base, newest first.

        Raises:
            ValueError: If the knowledge base does not exist for this user.
        """
        # Verify ownership of the knowledge base first.
        from app.services.knowledge_base_service import KnowledgeBaseService
        await KnowledgeBaseService.get_by_id(db, kb_id, user_id)

        result = await db.execute(
            select(Document)
            .where(Document.knowledge_base_id == UUID(kb_id))
            .order_by(Document.created_at.desc())
        )
        return result.scalars().all()

    @staticmethod
    async def delete(db: AsyncSession, doc_id: str, kb_id: str, user_id: str):
        """Delete a document row, decrement the counter and remove its file.

        Raises:
            ValueError: If the knowledge base does not exist for this user, or
                the document is not part of it (including a malformed id).
        """
        from sqlalchemy import update

        from app.models.knowledge_base import KnowledgeBase
        from app.services.knowledge_base_service import KnowledgeBaseService

        # Verify ownership of the knowledge base first.
        await KnowledgeBaseService.get_by_id(db, kb_id, user_id)
        kb_uuid = UUID(kb_id)

        try:
            doc_uuid = UUID(doc_id)
        except ValueError:
            # A malformed id cannot match any row; report it as not found.
            raise ValueError("文档不存在") from None

        result = await db.execute(
            select(Document).where(
                Document.id == doc_uuid,
                Document.knowledge_base_id == kb_uuid,
            )
        )
        doc = result.scalar_one_or_none()
        if not doc:
            raise ValueError("文档不存在")

        # Read the object name before the row is deleted and committed.
        object_name = doc.file_path

        # Delete the row and decrement the counter in one transaction.
        await db.delete(doc)
        await db.execute(
            update(KnowledgeBase)
            .where(KnowledgeBase.id == kb_uuid)
            .values(document_count=func.coalesce(KnowledgeBase.document_count, 0) - 1)
        )
        await db.commit()

        # Remove the stored file only after the commit succeeded, so a failed
        # transaction never leaves a row that points at a missing file.
        await file_storage.delete(object_name)


def _content_type(ext: str) -> str:
    return {
        ".pdf": "application/pdf",
        ".txt": "text/plain",
        ".md": "text/markdown",
        ".docx": "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
    }.get(ext, "application/octet-stream")

2.4 文档路由

# backend/app/routers/documents.py
from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, status
from sqlalchemy.ext.asyncio import AsyncSession

from app.database import get_db
from app.services.auth import require_auth
from app.models.user import User
from app.schemas.document import DocumentResponse, DocumentListResponse
from app.services.document_service import DocumentService

# Router for the document endpoints defined below; no URL prefix is set here.
router = APIRouter()


@router.post("/{kb_id}/documents", response_model=DocumentResponse, status_code=201)
async def upload_document(
    kb_id: str,
    file: UploadFile = File(...),
    user: User = Depends(require_auth),
    db: AsyncSession = Depends(get_db),
):
    """Upload one file into a knowledge base.

    A ValueError raised by the service is returned as HTTP 400 with the
    service's message as the detail.
    """
    from app.services.document_service import MAX_FILE_SIZE

    # Read at most one byte beyond the limit: enough for the service's size
    # check to reject an oversized upload, without buffering an arbitrarily
    # large body in memory first.
    file_data = await file.read(MAX_FILE_SIZE + 1)
    try:
        doc = await DocumentService.upload(
            # The filename may be absent; pass "" so the service rejects it
            # through its extension check.
            db, kb_id, str(user.id), file_data, file.filename or ""
        )
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))

    return _doc_to_response(doc)


@router.get("/{kb_id}/documents", response_model=DocumentListResponse)
async def list_documents(
    kb_id: str,
    user: User = Depends(require_auth),
    db: AsyncSession = Depends(get_db),
):
    """List the documents of a knowledge base; a service ValueError becomes HTTP 404."""
    owner_id = str(user.id)
    try:
        documents = await DocumentService.list_by_kb(db, kb_id, owner_id)
    except ValueError as exc:
        raise HTTPException(status_code=404, detail=str(exc))

    serialized = [_doc_to_response(entry) for entry in documents]
    return DocumentListResponse(items=serialized, total=len(documents))


@router.delete("/{kb_id}/documents/{doc_id}", status_code=204)
async def delete_document(
    kb_id: str,
    doc_id: str,
    user: User = Depends(require_auth),
    db: AsyncSession = Depends(get_db),
):
    """Delete one document; a service ValueError becomes HTTP 404."""
    owner_id = str(user.id)
    try:
        await DocumentService.delete(db, doc_id, kb_id, owner_id)
    except ValueError as exc:
        raise HTTPException(status_code=404, detail=str(exc))


def _doc_to_response(doc) -> DocumentResponse:
    """Convert a document row into its API response model."""
    return DocumentResponse(
        id=str(doc.id),
        filename=doc.filename,
        file_size=doc.file_size,
        file_type=doc.file_type,
        status=doc.status,
        # Same guard the knowledge-base converter applies to its counter: a
        # missing count is reported as 0 instead of failing the int field.
        chunk_count=doc.chunk_count or 0,
        created_at=doc.created_at,
    )

3. 注册路由

# backend/app/main.py(更新)
from app.routers import auth, knowledge_bases, documents

# NOTE(review): `app` is not defined in this excerpt; presumably it is the
# FastAPI application created elsewhere in main.py - confirm.
app.include_router(auth.router, prefix="/api/auth", tags=["Auth"])
# The two routers below share one prefix, so the document routes are served
# under /api/knowledge-bases/{kb_id}/documents.
app.include_router(knowledge_bases.router, prefix="/api/knowledge-bases", tags=["Knowledge Bases"])
app.include_router(documents.router, prefix="/api/knowledge-bases", tags=["Documents"])

4. 前端页面

4.1 API 层

// frontend/src/api/knowledgeBase.ts
import api from "@/lib/api";

// A knowledge base as returned by the knowledge-base endpoints.
export interface KnowledgeBase {
  id: string;
  name: string;
  description: string;
  document_count: number;
  created_at: string;
  // Returned by the backend response model alongside created_at.
  updated_at: string;
}

// A document as returned by the document endpoints.
export interface Document {
  id: string;
  filename: string;
  file_size: number;
  file_type: string;
  status: string; // pending / processing / ready / failed
  chunk_count: number;
  created_at: string;
}

// Fetch the knowledge bases of the current user together with their count.
export async function listKnowledgeBases() {
  const response = await api.get("/knowledge-bases");
  return response.data as { items: KnowledgeBase[]; total: number };
}

// Create a knowledge base and return the created record.
// chunk_size and chunk_overlap are optional; the backend applies its own
// defaults when they are omitted.
export async function createKnowledgeBase(body: {
  name: string;
  description?: string;
  chunk_size?: number;
  chunk_overlap?: number;
}) {
  const { data } = await api.post("/knowledge-bases", body);
  return data as KnowledgeBase;
}

// Delete the knowledge base with the given id.
export async function deleteKnowledgeBase(id: string) {
  const url = `/knowledge-bases/${id}`;
  await api.delete(url);
}

// Fetch the documents of one knowledge base together with their count.
export async function listDocuments(kbId: string) {
  const response = await api.get(`/knowledge-bases/${kbId}/documents`);
  return response.data as { items: Document[]; total: number };
}

// Upload one file into a knowledge base as multipart form data (field "file").
export async function uploadDocument(kbId: string, file: File) {
  const payload = new FormData();
  payload.append("file", file);
  const response = await api.post(`/knowledge-bases/${kbId}/documents`, payload);
  return response.data as Document;
}

// Delete one document from a knowledge base.
export async function deleteDocument(kbId: string, docId: string) {
  const url = `/knowledge-bases/${kbId}/documents/${docId}`;
  await api.delete(url);
}

4.2 仪表盘页面

// frontend/src/pages/Dashboard.tsx
import { useState, useEffect } from "react";
import { useNavigate } from "react-router-dom";
import { useAuth } from "@/hooks/useAuth";
import {
  listKnowledgeBases, createKnowledgeBase,
  deleteKnowledgeBase, KnowledgeBase,
} from "@/api/knowledgeBase";
import { Button } from "@/components/ui/button";
import { Card, CardContent, CardHeader, CardTitle } from "@/components/ui/card";
import { Dialog, DialogContent, DialogHeader, DialogTitle, DialogTrigger } from "@/components/ui/dialog";
import { Input } from "@/components/ui/input";
import { Textarea } from "@/components/ui/textarea";

// Dashboard page: lists the user's knowledge bases as cards and offers a
// dialog for creating a new one.
// NOTE(review): the markup below was reconstructed from the surviving text,
// handlers and imports. Class names, element choices and the two input
// placeholders are assumptions - confirm against the original component.
export default function Dashboard() {
  const { user, logout } = useAuth();
  const navigate = useNavigate();
  const [kbs, setKbs] = useState<KnowledgeBase[]>([]);
  const [loading, setLoading] = useState(true);
  const [open, setOpen] = useState(false);
  const [name, setName] = useState("");
  const [desc, setDesc] = useState("");

  const load = async () => {
    setLoading(true);
    try {
      const res = await listKnowledgeBases();
      setKbs(res.items);
    } finally {
      // Clear the flag even when the request fails, so the page does not
      // stay on the loading text forever.
      setLoading(false);
    }
  };

  useEffect(() => { load(); }, []);

  const handleCreate = async () => {
    if (!name.trim()) return;
    await createKnowledgeBase({ name, description: desc });
    setOpen(false);
    setName("");
    setDesc("");
    load();
  };

  const handleDelete = async (id: string) => {
    if (!confirm("确定删除此知识库?文档也会被删除。")) return;
    await deleteKnowledgeBase(id);
    load();
  };

  // Format a byte count as B / KB / MB.
  const formatSize = (bytes: number) => {
    if (bytes < 1024) return `${bytes} B`;
    if (bytes < 1024 * 1024) return `${(bytes / 1024).toFixed(1)} KB`;
    return `${(bytes / 1024 / 1024).toFixed(1)} MB`;
  };

  return (
    <div className="max-w-5xl mx-auto p-6">
      <div className="flex items-center justify-between mb-6">
        <div>
          <h1 className="text-2xl font-bold">我的知识库</h1>
          <p className="text-sm text-gray-500">欢迎回来{user?.nickname}</p>
        </div>
        <div className="flex gap-2">
          <Dialog open={open} onOpenChange={setOpen}>
            <DialogTrigger asChild>
              <Button>新建知识库</Button>
            </DialogTrigger>
            <DialogContent>
              <DialogHeader>
                <DialogTitle>新建知识库</DialogTitle>
              </DialogHeader>
              <div className="space-y-4">
                <Input placeholder="知识库名称" value={name}
                  onChange={(e) => setName(e.target.value)} />
                <Textarea placeholder="知识库描述" value={desc}
                  onChange={(e) => setDesc(e.target.value)} />
                <Button onClick={handleCreate}>创建</Button>
              </div>
            </DialogContent>
          </Dialog>
          <Button variant="outline" onClick={logout}>退出</Button>
        </div>
      </div>

      {loading ? (
        <div className="text-center text-gray-400 py-12">加载中...</div>
      ) : kbs.length === 0 ? (
        <div className="text-center py-16">
          <div className="text-4xl mb-4">📚</div>
          <h3 className="text-lg font-medium">还没有知识库</h3>
          <p className="text-sm text-gray-500 mb-4">创建一个知识库开始上传文档</p>
          <Button onClick={() => setOpen(true)}>新建知识库</Button>
        </div>
      ) : (
        <div className="grid grid-cols-1 md:grid-cols-2 lg:grid-cols-3 gap-4">
          {kbs.map((kb) => (
            <Card key={kb.id} className="cursor-pointer hover:shadow-md"
              onClick={() => navigate(`/knowledge-bases/${kb.id}`)}>
              <CardHeader>
                <CardTitle>{kb.name}</CardTitle>
              </CardHeader>
              <CardContent>
                <p className="text-sm text-gray-500 mb-3">
                  {kb.description || "暂无描述"}
                </p>
                <div className="flex items-center justify-between">
                  <span className="text-xs text-gray-400">{kb.document_count} 个文档</span>
                  {/* Stop propagation so deleting does not also open the card. */}
                  <Button variant="ghost" size="sm" onClick={(e) => {
                    e.stopPropagation();
                    handleDelete(kb.id);
                  }}>删除</Button>
                </div>
              </CardContent>
            </Card>
          ))}
        </div>
      )}
    </div>
  );
}

4.3 知识库详情页(文件管理)

// frontend/src/pages/KnowledgeBaseDetail.tsx
import { useState, useEffect, useRef } from "react";
import { useParams, useNavigate } from "react-router-dom";
import {
  listDocuments, uploadDocument,
  deleteDocument, Document,
} from "@/api/knowledgeBase";
import { Button } from "@/components/ui/button";
import { Card, CardContent } from "@/components/ui/card";

// Display label for each document status value.
const STATUS_MAP: Record<string, string> = {
  pending: "等待处理",
  processing: "处理中",
  ready: "已完成",
  failed: "处理失败",
};

// Badge CSS classes for each document status value.
const STATUS_CLASS: Record<string, string> = {
  pending: "bg-yellow-100 text-yellow-700",
  processing: "bg-blue-100 text-blue-700",
  ready: "bg-green-100 text-green-700",
  failed: "bg-red-100 text-red-700",
};

// Knowledge base detail page: lists the documents of one knowledge base and
// lets the user upload and delete files.
// NOTE(review): the markup below was reconstructed from the surviving text,
// handlers and imports. Class names, element choices, the back arrow, the
// separators in the empty-state hint and the hidden file input's attributes
// are assumptions - confirm against the original component.
export default function KnowledgeBaseDetail() {
  const { id } = useParams<{ id: string }>();
  const navigate = useNavigate();
  const [docs, setDocs] = useState<Document[]>([]);
  const [loading, setLoading] = useState(true);
  const [uploading, setUploading] = useState(false);
  const fileRef = useRef<HTMLInputElement>(null);

  const load = async () => {
    if (!id) return;
    setLoading(true);
    try {
      const res = await listDocuments(id);
      setDocs(res.items);
    } finally {
      // Clear the flag even when the request fails.
      setLoading(false);
    }
  };

  useEffect(() => { load(); }, [id]);

  // Upload the selected files one after another; a failed file is logged and
  // the remaining files are still attempted.
  const handleUpload = async (e: React.ChangeEvent<HTMLInputElement>) => {
    const files = e.target.files;
    if (!files || !files.length || !id) return;
    setUploading(true);
    for (const file of Array.from(files)) {
      try {
        await uploadDocument(id, file);
      } catch (err) {
        console.error("Upload failed:", file.name, err);
      }
    }
    setUploading(false);
    load();
    // Reset the input so choosing the same file again fires onChange.
    if (fileRef.current) fileRef.current.value = "";
  };

  const handleDelete = async (docId: string) => {
    if (!id) return;
    if (!confirm("确定删除此文档?")) return;
    await deleteDocument(id, docId);
    load();
  };

  // Format a byte count as B / KB / MB.
  const formatSize = (bytes: number) => {
    if (bytes < 1024) return `${bytes} B`;
    if (bytes < 1024 * 1024) return `${(bytes / 1024).toFixed(1)} KB`;
    return `${(bytes / 1024 / 1024).toFixed(1)} MB`;
  };

  return (
    <div className="max-w-4xl mx-auto p-6">
      <div className="flex items-center justify-between mb-6">
        <div>
          <button onClick={() => navigate("/dashboard")}
            className="text-sm text-gray-400 hover:text-gray-600 mb-1 block">
            ← 返回知识库列表
          </button>
          <h1 className="text-2xl font-bold">文档管理</h1>
        </div>
        <div className="flex gap-2">
          <Button variant="outline" onClick={() => navigate(`/chat?kb=${id}`)}>
            💬 开始问答
          </Button>
          <Button disabled={uploading} onClick={() => fileRef.current?.click()}>
            {uploading ? "上传中..." : "上传文档"}
          </Button>
        </div>
      </div>

      {/* Hidden file picker opened by the upload buttons. */}
      <input ref={fileRef} type="file" multiple accept=".pdf,.txt,.md,.docx"
        className="hidden" onChange={handleUpload} />

      {loading ? (
        <div className="text-center text-gray-400 py-12">加载中...</div>
      ) : docs.length === 0 ? (
        <div className="text-center py-16">
          <div className="text-4xl mb-4">📄</div>
          <h3 className="text-lg font-medium">还没有文档</h3>
          <p className="text-sm text-gray-500 mb-4">上传 PDF、TXT、MD 或 DOCX 文件</p>
          <Button onClick={() => fileRef.current?.click()}>
            上传第一个文档
          </Button>
        </div>
      ) : (
        <div className="space-y-2">
          {docs.map((doc) => (
            <Card key={doc.id}>
              <CardContent className="flex items-center justify-between p-4">
                <div className="flex items-center gap-3">
                  <span className="text-2xl">
                    {doc.file_type === "pdf" ? "📕" : doc.file_type === "md" ? "📝" : "📄"}
                  </span>
                  <div>
                    <p className="font-medium">{doc.filename}</p>
                    <p className="text-xs text-gray-400">
                      {formatSize(doc.file_size)} · {doc.chunk_count} 个片段
                    </p>
                  </div>
                </div>
                <div className="flex items-center gap-3">
                  <span className={`text-xs px-2 py-1 rounded ${STATUS_CLASS[doc.status]}`}>
                    {STATUS_MAP[doc.status]}
                  </span>
                  <Button variant="ghost" size="sm" onClick={() => handleDelete(doc.id)}>删除</Button>
                </div>
              </CardContent>
            </Card>
          ))}
        </div>
      )}
    </div>
  );
}

5. 路由注册

// frontend/src/App.tsx(更新)
import KnowledgeBaseDetail from "./pages/KnowledgeBaseDetail";

// Add inside ProtectedRoute. The path matches the dashboard's
// navigate(`/knowledge-bases/${kb.id}`) call.
<Route path="/knowledge-bases/:id" element={<KnowledgeBaseDetail />} />

6. 验证

# Set these first:
#   TOKEN  - access token obtained from the login endpoint
#   KB_ID  - "id" from the create response in step 1
#   DOC_ID - "id" from the upload response in step 2

# 1. Create a knowledge base
curl -X POST http://localhost:8000/api/knowledge-bases \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{"name":"我的技术文档","description":"存储技术相关文档"}'

# Response:
# {"id":"uuid","name":"我的技术文档","document_count":0,...}

# 2. Upload a document
curl -X POST "http://localhost:8000/api/knowledge-bases/$KB_ID/documents" \
  -H "Authorization: Bearer $TOKEN" \
  -F "file=@/path/to/document.pdf"

# 3. List the documents
curl "http://localhost:8000/api/knowledge-bases/$KB_ID/documents" \
  -H "Authorization: Bearer $TOKEN"

# 4. Delete a document
curl -X DELETE "http://localhost:8000/api/knowledge-bases/$KB_ID/documents/$DOC_ID" \
  -H "Authorization: Bearer $TOKEN"

7. 数据流总结

用户操作        API                                                 后端服务         存储
──────────────────────────────────────────────────────────────────────────────────────────────
创建知识库    → POST   /api/knowledge-bases                         → PostgreSQL    → users + knowledge_bases
上传文档      → POST   /api/knowledge-bases/:id/documents           → MinIO + DB    → object storage + documents
查看文档列表  → GET    /api/knowledge-bases/:id/documents           → PostgreSQL    → documents
删除文档      → DELETE /api/knowledge-bases/:id/documents/:doc_id   → MinIO + DB    → 删除文件 + 记录

总结

今天完成了:

| 组件 | 说明 |
| --- | --- |
| 知识库 CRUD | 创建/列表/详情/更新/删除 |
| 文档上传 | 文件类型验证 + 大小限制 + MinIO 存储 |
| 文档列表/删除 | 按创建时间倒序显示(暂未分页)+ 级联删除 |
| 前端仪表盘 | 知识库卡片网格 + 创建对话框 |
| 前端文件管理 | 拖拽上传 + 状态展示 + 删除 |
| MinIO 集成 | 对象存储 + 预签名 URL |

现在用户可以创建知识库、上传文档、管理文件了。

下一篇我们将实现文档处理 Pipeline——PDF 解析、文本切分、Embedding 向量化、存入 Qdrant,让文档变成可检索的知识。


本文是 《AI 全栈开发实战——做一个真正的产品》 系列的第 4 篇。
系列目录:
1. ✅ 产品定义与架构设计
2. ✅ 技术选型与项目初始化
3. ✅ 用户系统
4. ✅ 知识库与文档管理 ← 你在这里
5. 📝 文档处理 Pipeline
...

本文由 Zyentor(智元界) 原创发布