文思AI产品笔记
首页
最新文章
AI编程
AI架构
关于我
  • AI生态圈
  • AI大模型
  • 多维表格
  • AI创业笔记
  • AI落地案例
  • 个人产品推介
  • 问题集
  • 简体中文
  • English
首页
最新文章
AI编程
AI架构
关于我
  • AI生态圈
  • AI大模型
  • 多维表格
  • AI创业笔记
  • AI落地案例
  • 个人产品推介
  • 问题集
  • 简体中文
  • English
  • OpenAI Codex 教程

    • OpenAI Codex 从入门到精通完整教程
    • Codex 快速入门指南
    • Codex 核心特点详解
    • Codex 版本演进历史
    • Codex 架构设计详解
    • Codex 高级功能与集成
    • Codex 最佳实践
    • Codex 性能优化技巧
    • Codex 实战案例集
    • Codex 常见问题与故障排除

08 - Codex 实战案例集

从实战中学习:8个完整的项目案例

📋 本章概览

本章包含 8 个完整的实战案例,涵盖:

  • REST API 开发
  • 前端组件开发
  • 数据处理脚本
  • 测试用例生成
  • 代码重构
  • 文档生成
  • Bug 修复
  • 性能优化

案例 1:构建 REST API

项目需求

创建一个博客系统的 REST API,包含:

  • 文章管理(CRUD)
  • 用户认证
  • 评论功能
  • 搜索功能

技术栈

  • FastAPI
  • SQLAlchemy
  • PostgreSQL
  • JWT 认证

实现步骤

Step 1:项目结构设计

提示词:

设计一个 FastAPI 博客系统的项目结构,包含:
1. 文章管理
2. 用户认证
3. 评论功能
4. 搜索功能

要求:
- 清晰的目录结构
- 模块化设计
- 遵循最佳实践

AI 生成的结构:

blog-api/
├── app/
│   ├── __init__.py
│   ├── main.py              # 应用入口
│   ├── config.py            # 配置
│   ├── database.py          # 数据库连接
│   ├── models/              # 数据模型
│   │   ├── __init__.py
│   │   ├── user.py
│   │   ├── post.py
│   │   └── comment.py
│   ├── schemas/             # Pydantic schemas
│   │   ├── __init__.py
│   │   ├── user.py
│   │   ├── post.py
│   │   └── comment.py
│   ├── api/                 # API 路由
│   │   ├── __init__.py
│   │   ├── auth.py
│   │   ├── posts.py
│   │   └── comments.py
│   ├── services/            # 业务逻辑
│   │   ├── __init__.py
│   │   ├── auth_service.py
│   │   └── post_service.py
│   └── utils/               # 工具函数
│       ├── __init__.py
│       ├── security.py
│       └── pagination.py
├── tests/                   # 测试
├── requirements.txt
└── README.md

Step 2:数据模型

提示词:

实现 blog-api 的数据模型:

1. User 模型:
   - id, username, email, password_hash
   - created_at, updated_at
   - 关系:has many posts, has many comments

2. Post 模型:
   - id, title, content, author_id
   - created_at, updated_at, published_at
   - 关系:belongs to user, has many comments

3. Comment 模型:
   - id, content, post_id, author_id
   - created_at

使用 SQLAlchemy,包含类型注解和文档字符串。

AI 生成代码:完整的模型定义(略)

Step 3:API 端点

提示词:

实现文章管理的 API 端点:

POST   /api/posts          - 创建文章
GET    /api/posts          - 获取文章列表(分页)
GET    /api/posts/{id}     - 获取单篇文章
PUT    /api/posts/{id}     - 更新文章
DELETE /api/posts/{id}     - 删除文章
GET    /api/posts/search   - 搜索文章

要求:
- 使用 FastAPI
- JWT 认证(创建、更新、删除需要认证)
- 请求验证(Pydantic)
- 错误处理
- 包含 OpenAPI 文档

AI 生成代码:完整的 API 实现(略)

完整项目链接

查看完整代码:GitHub - blog-api-example


案例 2:React 组件开发

项目需求

创建一个数据表格组件,支持:

  • 排序
  • 筛选
  • 分页
  • 行选择
  • 导出数据

实现示例

提示词:

创建一个 React 数据表格组件,要求:

【功能】
- 显示表格数据
- 列排序(升序/降序)
- 列筛选
- 分页(可配置每页数量)
- 行选择(单选/多选)
- 导出为 CSV

【技术要求】
- TypeScript
- React Hooks
- 响应式设计

【组件 API】
<DataTable
  data={data}
  columns={columns}
  pageSize={10}
  onRowSelect={(rows) => {}}
  onExport={() => {}}
/>

AI 生成的组件:见完整代码(略)


案例 3:数据处理脚本

项目需求

从多个 CSV 文件中提取、转换和加载数据到数据库。

实现

提示词:

创建一个 ETL 脚本:

【输入】
- 多个 CSV 文件(用户、订单、产品)
- CSV 格式不统一,需要清洗

【处理】
1. 读取所有 CSV 文件
2. 数据清洗:
   - 移除重复行
   - 处理缺失值
   - 标准化日期格式
   - 验证数据类型
3. 数据转换:
   - 合并相关数据
   - 计算衍生字段
4. 加载到 PostgreSQL

【要求】
- 使用 pandas
- 批量插入(提升性能)
- 错误处理和日志
- 进度显示
- 可恢复(中断后可继续)

AI 生成代码:完整的 ETL 脚本(略)


案例 4:测试用例生成

项目需求

为现有代码库自动生成单元测试。

实现

提示词:

为以下函数生成完整的单元测试:

```python
def process_payment(
    amount: float,
    currency: str,
    payment_method: str,
    user_id: int
) -> dict:
    """
    处理支付
    
    Args:
        amount: 支付金额
        currency: 货币代码(USD, EUR等)
        payment_method: 支付方式(credit_card, paypal等)
        user_id: 用户ID
    
    Returns:
        支付结果字典
    
    Raises:
        ValueError: 无效的参数
        PaymentError: 支付失败
    """
    # 实现...

【要求】

  • 使用 pytest
  • 测试正常情况
  • 测试边界条件
  • 测试异常情况
  • 使用 fixtures
  • 使用 parametrize
  • Mock 外部依赖
  • 测试覆盖率 > 90%

**AI 生成的测试**:完整测试代码(略)

---

## 案例 5:代码重构

### 项目需求

重构遗留代码,提升可读性和可维护性。

### Before(遗留代码)

```python
def p(d):
    r = []
    for i in d:
        if i['a'] > 18:
            n = i['n'].upper()
            e = i['e'].lower()
            r.append({'n': n, 'e': e, 'a': i['a']})
    return r

重构提示词

重构以下代码,要求:

1. 改进命名
   - 使用清晰的变量名
   - 使用描述性的函数名

2. 添加类型注解
   - 参数类型
   - 返回值类型

3. 添加文档
   - Docstring
   - 行内注释(必要时)

4. 优化逻辑
   - 使用列表推导式或生成器(如适用)
   - 提取魔法数字为常量

5. 错误处理
   - 验证输入
   - 处理异常情况

原始代码:
[粘贴代码]

After(重构后)

from typing import List, Dict, TypedDict

class UserInput(TypedDict):
    """用户输入数据结构"""
    n: str  # name
    e: str  # email
    a: int  # age

class ProcessedUser(TypedDict):
    """处理后的用户数据结构"""
    n: str
    e: str
    a: int

ADULT_AGE_THRESHOLD = 18

def process_adult_users(users: List[UserInput]) -> List[ProcessedUser]:
    """
    处理成年用户数据
    
    过滤出年龄大于18岁的用户,并标准化其姓名和邮箱格式:
    - 姓名:转换为大写
    - 邮箱:转换为小写
    
    Args:
        users: 用户数据列表,每个用户包含姓名、邮箱和年龄
    
    Returns:
        处理后的成年用户列表
    
    Raises:
        ValueError: 如果用户数据格式无效
    
    Example:
        >>> users = [
        ...     {'n': 'Alice', 'e': 'ALICE@EXAMPLE.COM', 'a': 25},
        ...     {'n': 'Bob', 'e': 'bob@example.com', 'a': 17}
        ... ]
        >>> process_adult_users(users)
        [{'n': 'ALICE', 'e': 'alice@example.com', 'a': 25}]
    """
    if not isinstance(users, list):
        raise ValueError("users must be a list")
    
    processed_users = []
    
    for user in users:
        # 验证必需字段
        if not all(key in user for key in ['n', 'e', 'a']):
            raise ValueError(f"Invalid user data: {user}")
        
        # 过滤成年用户
        if user['a'] > ADULT_AGE_THRESHOLD:
            processed_users.append({
                'n': user['n'].upper(),
                'e': user['e'].lower(),
                'a': user['a']
            })
    
    return processed_users

# 或者使用列表推导式的简洁版本
def process_adult_users_compact(users: List[UserInput]) -> List[ProcessedUser]:
    """处理成年用户数据(简洁版本)"""
    return [
        {
            'n': user['n'].upper(),
            'e': user['e'].lower(),
            'a': user['a']
        }
        for user in users
        if user['a'] > ADULT_AGE_THRESHOLD
    ]

案例 6:文档生成

项目需求

为 Python 模块自动生成 README 和 API 文档。

实现

提示词:

分析以下 Python 模块,生成完整的 README.md:

[粘贴模块代码]

README 应包含:
1. 项目标题和简介
2. 功能特性
3. 安装说明
4. 快速开始
5. API 文档
   - 每个函数/类的说明
   - 参数说明
   - 返回值
   - 使用示例
6. 配置选项
7. 常见问题
8. 贡献指南
9. 许可证

使用清晰的格式和代码示例。

AI 生成的 README:完整文档(略)


案例 7:Bug 修复

问题描述

API 在高并发下出现数据不一致问题。

排查过程

Step 1:问题描述

问题:
在高并发场景下,库存扣减出现负数。

代码:
```python
def decrease_stock(product_id: int, quantity: int):
    product = db.query(Product).filter_by(id=product_id).first()
    if product.stock >= quantity:
        product.stock -= quantity
        db.commit()
    else:
        raise InsufficientStockError()

环境:

  • Python 3.10
  • SQLAlchemy 2.0
  • PostgreSQL 14
  • 并发:100 requests/second

现象:

  • 单线程测试正常
  • 并发测试时,库存可能变成负数

请分析问题并提供解决方案。


**AI 分析**:

问题根因:竞态条件(Race Condition)

当多个请求同时执行时:

  1. 请求A读取库存:10
  2. 请求B读取库存:10
  3. 请求A扣减:10 - 5 = 5,提交
  4. 请求B扣减:10 - 8 = 2,提交
  5. 最终库存:2(正确应该是 -3 或拒绝请求B)

解决方案1:数据库层面加锁 [代码]

解决方案2:使用乐观锁 [代码]

解决方案3:使用 Redis 分布式锁 [代码]

推荐使用方案1(最简单可靠)


**修复后的代码**:
```python
from sqlalchemy import select
from sqlalchemy.orm import Session

def decrease_stock(
    db: Session,
    product_id: int,
    quantity: int
) -> None:
    """
    线程安全的库存扣减
    
    使用 SELECT FOR UPDATE 锁定行,防止并发问题
    """
    # 使用行级锁
    product = db.execute(
        select(Product)
        .filter_by(id=product_id)
        .with_for_update()  # 关键:加锁
    ).scalar_one()
    
    # 检查库存
    if product.stock < quantity:
        raise InsufficientStockError(
            f"Insufficient stock. Available: {product.stock}, "
            f"Required: {quantity}"
        )
    
    # 扣减库存
    product.stock -= quantity
    
    # 提交事务(自动释放锁)
    db.commit()

案例 8:性能优化

问题

处理10万条用户数据耗时 > 5分钟。

原始代码

def process_users(users: List[dict]) -> List[dict]:
    """处理用户数据"""
    results = []
    
    for user in users:
        # 查询数据库获取额外信息
        profile = db.query(UserProfile).filter_by(
            user_id=user['id']
        ).first()
        
        # 调用外部 API
        score = requests.get(
            f"https://api.example.com/score/{user['id']}"
        ).json()['score']
        
        # 复杂计算
        result = calculate_user_metrics(user, profile, score)
        results.append(result)
    
    return results

优化提示词

优化以下代码的性能:

[粘贴代码]

问题:
- 处理 100,000 条数据耗时 > 5分钟
- 目标:< 30秒

分析性能瓶颈并提供优化方案:
1. 识别瓶颈
2. 提出具体优化方案
3. 实现优化后的代码
4. 估算性能提升

AI 分析和优化

瓶颈分析:

1. 数据库查询:N+1 问题
   - 每个用户一次查询
   - 100,000 次查询

2. 外部 API 调用:串行执行
   - 每次调用 ~50ms
   - 总耗时:100,000 × 50ms = 5000秒

3. 计算:可能不是瓶颈,但可以优化

优化后的代码:

import asyncio
import aiohttp
from sqlalchemy import select
from typing import List, Dict
from concurrent.futures import ProcessPoolExecutor

async def process_users_optimized(
    users: List[dict],
    batch_size: int = 1000
) -> List[dict]:
    """优化后的用户数据处理"""
    
    # 优化1:批量查询数据库(解决 N+1 问题)
    user_ids = [u['id'] for u in users]
    profiles = await fetch_profiles_batch(user_ids)
    profiles_map = {p.user_id: p for p in profiles}
    
    # 优化2:并发调用外部 API
    async with aiohttp.ClientSession() as session:
        scores = await fetch_scores_batch(session, user_ids, batch_size)
    
    # 优化3:并行计算(使用多进程)
    with ProcessPoolExecutor() as executor:
        tasks = [
            executor.submit(
                calculate_user_metrics,
                user,
                profiles_map.get(user['id']),
                scores.get(user['id'])
            )
            for user in users
        ]
        
        results = [task.result() for task in tasks]
    
    return results

async def fetch_profiles_batch(user_ids: List[int]) -> List[UserProfile]:
    """批量获取用户资料"""
    return await db.execute(
        select(UserProfile)
        .filter(UserProfile.user_id.in_(user_ids))
    ).scalars().all()

async def fetch_scores_batch(
    session: aiohttp.ClientSession,
    user_ids: List[int],
    batch_size: int
) -> Dict[int, float]:
    """批量获取评分(并发)"""
    scores = {}
    
    # 分批并发请求
    for i in range(0, len(user_ids), batch_size):
        batch = user_ids[i:i+batch_size]
        
        # 并发请求当前批次
        tasks = [
            fetch_score_single(session, uid)
            for uid in batch
        ]
        
        batch_scores = await asyncio.gather(*tasks)
        
        # 合并结果
        scores.update({
            uid: score
            for uid, score in zip(batch, batch_scores)
        })
    
    return scores

async def fetch_score_single(
    session: aiohttp.ClientSession,
    user_id: int
) -> float:
    """获取单个用户评分"""
    async with session.get(
        f"https://api.example.com/score/{user_id}"
    ) as response:
        data = await response.json()
        return data['score']

# 性能对比
"""
优化前:
- 数据库查询:100,000 × 10ms = 1000秒
- API 调用:100,000 × 50ms = 5000秒
- 计算:100,000 × 1ms = 100秒
- 总计:6100秒 (~102分钟)

优化后:
- 数据库查询:1次批量查询 = 1秒
- API 调用:100,000 / 100(并发) × 50ms = 50秒
- 计算:100,000 / 8(核心) × 1ms = 12.5秒
- 总计:63.5秒

提升:96倍!
"""

💡 案例学习总结

关键takeaways

  1. 明确需求

    • 详细的功能说明
    • 技术约束
    • 性能要求
  2. 提供上下文

    • 现有代码
    • 项目结构
    • 技术栈
  3. 迭代优化

    • 先生成框架
    • 再完善细节
    • 持续改进
  4. 人工审查

    • 验证逻辑正确性
    • 检查安全问题
    • 测试边界情况

更多案例

查看项目仓库获取完整代码:

  • GitHub - Codex Examples
  • CodeSandbox - Live Demos

🎯 下一步

完成实战练习后:

  • 📖 09 - 常见问题与故障排除 - 解决常见问题
  • 返回 README - 回顾整个教程
  • 开始你自己的项目!

👉 下一章:09 - 常见问题与故障排除

最近更新: 2025/12/22 14:25
Contributors: wsyx
Prev
Codex 性能优化技巧
Next
Codex 常见问题与故障排除