文思AI产品笔记
首页
最新文章
AI编程
AI架构
关于我
  • AI生态圈
  • AI大模型
  • 多维表格
  • AI创业笔记
  • AI落地案例
  • 个人产品推介
  • 问题集
  • 简体中文
  • English
首页
最新文章
AI编程
AI架构
关于我
  • AI生态圈
  • AI大模型
  • 多维表格
  • AI创业笔记
  • AI落地案例
  • 个人产品推介
  • 问题集
  • 简体中文
  • English
  • OpenAI Codex 教程

    • OpenAI Codex 从入门到精通完整教程
    • Codex 快速入门指南
    • Codex 核心特点详解
    • Codex 版本演进历史
    • Codex 架构设计详解
    • Codex 高级功能与集成
    • Codex 最佳实践
    • Codex 性能优化技巧
    • Codex 实战案例集
    • Codex 常见问题与故障排除

07 - Codex 性能优化技巧

优化性能,降低成本,提升效率

📋 本章目标

学完本章,你将能够:

  • 选择合适的模型以平衡成本和性能
  • 优化提示词以降低 token 消耗
  • 实施有效的缓存策略
  • 优化批处理和并发请求

🎯 模型选择策略

模型性能对比

模型成本速度质量适用场景
GPT-4o-mini$⭐⭐⭐⭐⭐⭐⭐⭐简单任务、原型开发
GPT-4o$$⭐⭐⭐⭐⭐⭐⭐⭐日常编码、快速迭代
GPT-4 Turbo$$$⭐⭐⭐⭐⭐⭐⭐复杂项目、大上下文
o1$$$$⭐⭐⭐⭐⭐⭐⭐复杂算法、架构设计
Claude 4 Sonnet$$$⭐⭐⭐⭐⭐⭐⭐⭐⭐代码质量要求高

任务分级策略

class ModelSelector:
    """智能模型选择器"""
    
    def __init__(self):
        self.models = {
            "simple": "gpt-4o-mini",
            "medium": "gpt-4o",
            "complex": "gpt-4-turbo",
            "critical": "o1"
        }
    
    def select_model(self, task: dict) -> str:
        """根据任务复杂度选择模型"""
        
        # 1. 计算任务复杂度
        complexity = self.calculate_complexity(task)
        
        # 2. 选择模型
        if complexity < 0.3:
            return self.models["simple"]
        elif complexity < 0.6:
            return self.models["medium"]
        elif complexity < 0.9:
            return self.models["complex"]
        else:
            return self.models["critical"]
    
    def calculate_complexity(self, task: dict) -> float:
        """计算任务复杂度(0-1)"""
        
        score = 0.0
        
        # 代码行数
        if task.get("lines", 0) > 500:
            score += 0.3
        elif task.get("lines", 0) > 100:
            score += 0.1
        
        # 涉及文件数
        if task.get("files", 0) > 10:
            score += 0.2
        elif task.get("files", 0) > 3:
            score += 0.1
        
        # 算法复杂度
        if task.get("algorithm") in ["dynamic_programming", "graph"]:
            score += 0.3
        elif task.get("algorithm") in ["sorting", "searching"]:
            score += 0.1
        
        # 领域知识要求
        if task.get("domain") in ["security", "ml", "systems"]:
            score += 0.2
        
        return min(score, 1.0)

# 使用示例
selector = ModelSelector()

# 简单任务:使用便宜的模型
task1 = {
    "description": "写一个函数计算平均值",
    "lines": 10,
    "files": 1,
    "algorithm": "basic"
}
model1 = selector.select_model(task1)  # → gpt-4o-mini

# 复杂任务:使用强大的模型
task2 = {
    "description": "实现分布式缓存系统",
    "lines": 1000,
    "files": 20,
    "algorithm": "distributed_systems",
    "domain": "systems"
}
model2 = selector.select_model(task2)  # → o1

成本计算

class CostCalculator:
    """成本计算器"""
    
    # 价格表(每百万 tokens)
    PRICES = {
        "gpt-4o-mini": {"input": 0.15, "output": 0.60},
        "gpt-4o": {"input": 2.50, "output": 10.00},
        "gpt-4-turbo": {"input": 10.00, "output": 30.00},
        "o1": {"input": 15.00, "output": 60.00}
    }
    
    def estimate_cost(
        self,
        model: str,
        input_tokens: int,
        output_tokens: int
    ) -> float:
        """估算成本(美元)"""
        
        prices = self.PRICES[model]
        
        input_cost = (input_tokens / 1_000_000) * prices["input"]
        output_cost = (output_tokens / 1_000_000) * prices["output"]
        
        return input_cost + output_cost
    
    def compare_models(
        self,
        input_tokens: int,
        output_tokens: int
    ) -> dict:
        """对比不同模型的成本"""
        
        results = {}
        
        for model in self.PRICES:
            cost = self.estimate_cost(model, input_tokens, output_tokens)
            results[model] = {
                "cost": f"${cost:.4f}",
                "cost_per_request": f"${cost:.4f}"
            }
        
        return results

# 使用示例
calculator = CostCalculator()

# 典型代码生成任务
input_tokens = 1000   # 提示词 + 上下文
output_tokens = 500   # 生成的代码

comparison = calculator.compare_models(input_tokens, output_tokens)

"""
输出:
{
    "gpt-4o-mini": {
        "cost": "$0.0004",
        "cost_per_request": "$0.0004"
    },
    "gpt-4o": {
        "cost": "$0.0075",
        "cost_per_request": "$0.0075"
    },
    "gpt-4-turbo": {
        "cost": "$0.0250",
        "cost_per_request": "$0.0250"
    },
    "o1": {
        "cost": "$0.0450",
        "cost_per_request": "$0.0450"
    }
}

成本差异:
- 使用 gpt-4o-mini 比 o1 便宜 112 倍
- 1000 次请求:
  - gpt-4o-mini: $0.40
  - gpt-4o: $7.50
  - o1: $45.00
"""

💬 提示词优化

减少 Token 消耗

技巧 1:精简上下文

# ❌ 不好:包含不必要的信息
prompt = """
这是我的项目,非常大,有很多文件。
我现在想要实现一个功能。
这个功能很重要。
我需要你帮我写代码。

[粘贴整个项目的代码...]

请帮我实现用户登录功能。
"""

# ✅ 好:只包含相关信息
prompt = """
实现用户登录功能。

相关代码:
```python
# 现有的 User 模型
class User(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True)
    password_hash = db.Column(db.String(128))

要求:

  • 使用 bcrypt 验证密码
  • 返回 JWT token
  • 处理错误情况 """

#### 技巧 2:使用引用而非重复

```python
# ❌ 不好:重复发送相同的上下文
for endpoint in endpoints:
    prompt = f"""
    {long_context}  # 每次都发送完整上下文
    
    创建API端点:{endpoint}
    """
    generate_code(prompt)

# ✅ 好:使用引用
# 第一次发送完整上下文
system_message = long_context

# 后续只发送增量信息
for endpoint in endpoints:
    prompt = f"创建API端点:{endpoint}"
    generate_code(prompt, system=system_message)

技巧 3:结构化提示词

class PromptOptimizer:
    """提示词优化器"""
    
    @staticmethod
    def optimize(prompt: str) -> str:
        """优化提示词,减少 token 消耗"""
        
        # 1. 移除多余的空白
        prompt = " ".join(prompt.split())
        
        # 2. 使用缩写(保持可读性)
        replacements = {
            "请你帮我": "",
            "非常感谢": "",
            "麻烦你": "",
            "能否": ""
        }
        
        for old, new in replacements.items():
            prompt = prompt.replace(old, new)
        
        # 3. 简化结构
        prompt = prompt.replace("。\n\n", "。")
        prompt = prompt.replace("\n\n\n", "\n")
        
        return prompt.strip()

# 示例
original = """
请你帮我写一个函数。

这个函数需要做以下事情:

1. 接收一个列表
2. 计算平均值
3. 返回结果

非常感谢!
"""

optimized = PromptOptimizer.optimize(original)
# → "写一个函数:接收列表,计算平均值,返回结果。"

# Token 节省:从 ~50 tokens 减少到 ~15 tokens(节省 70%)

提高生成质量

使用明确的约束

# ❌ 模糊的提示
"写一个排序函数"

# ✅ 明确的提示
"""
实现快速排序:
- 输入:整数列表
- 输出:升序排序的列表
- 要求:原地排序(in-place)
- 时间复杂度:O(n log n)
- 包含注释说明算法步骤
"""

使用示例

# ❌ 无示例
"创建数据验证函数"

# ✅ 有示例
"""
创建数据验证函数,参考以下模式:

示例 1:
def validate_email(email: str) -> bool:
    pattern = r'^[\w\.-]+@[\w\.-]+\.\w+$'
    return bool(re.match(pattern, email))

示例 2:
def validate_phone(phone: str) -> bool:
    pattern = r'^1[3-9]\d{9}$'
    return bool(re.match(pattern, phone))

现在创建:
def validate_url(url: str) -> bool:
    # 验证 URL 格式(http/https)
"""

💾 缓存策略

结果缓存

import hashlib
import json
from typing import Any, Optional
from datetime import datetime, timedelta

class ResponseCache:
    """响应缓存"""
    
    def __init__(self, ttl: int = 3600):
        self.cache = {}
        self.ttl = ttl  # 秒
    
    def get(self, prompt: str, model: str) -> Optional[str]:
        """获取缓存的响应"""
        
        cache_key = self._generate_key(prompt, model)
        
        if cache_key in self.cache:
            item = self.cache[cache_key]
            
            # 检查是否过期
            if datetime.now() < item['expires_at']:
                print(f"✅ 缓存命中:{cache_key[:8]}...")
                return item['response']
            else:
                # 删除过期项
                del self.cache[cache_key]
        
        print(f"❌ 缓存未命中")
        return None
    
    def set(self, prompt: str, model: str, response: str):
        """设置缓存"""
        
        cache_key = self._generate_key(prompt, model)
        
        self.cache[cache_key] = {
            'response': response,
            'created_at': datetime.now(),
            'expires_at': datetime.now() + timedelta(seconds=self.ttl)
        }
        
        print(f"💾 已缓存:{cache_key[:8]}...")
    
    def _generate_key(self, prompt: str, model: str) -> str:
        """生成缓存键"""
        
        # 规范化提示词(移除空白差异)
        normalized = " ".join(prompt.split())
        
        # 生成哈希
        content = f"{model}:{normalized}"
        return hashlib.md5(content.encode()).hexdigest()
    
    def clear_expired(self):
        """清理过期缓存"""
        
        now = datetime.now()
        expired_keys = [
            key for key, item in self.cache.items()
            if now >= item['expires_at']
        ]
        
        for key in expired_keys:
            del self.cache[key]
        
        print(f"🧹 清理了 {len(expired_keys)} 个过期缓存")
    
    def stats(self) -> dict:
        """缓存统计"""
        
        total = len(self.cache)
        expired = sum(
            1 for item in self.cache.values()
            if datetime.now() >= item['expires_at']
        )
        
        return {
            "total": total,
            "active": total - expired,
            "expired": expired
        }

# 使用示例
cache = ResponseCache(ttl=3600)

def generate_with_cache(prompt: str, model: str = "gpt-4o"):
    """带缓存的代码生成"""
    
    # 尝试从缓存获取
    cached = cache.get(prompt, model)
    if cached:
        return cached
    
    # 缓存未命中,调用 API
    print("📡 调用 API...")
    response = call_openai_api(prompt, model)
    
    # 存入缓存
    cache.set(prompt, model, response)
    
    return response

# 测试
result1 = generate_with_cache("写一个冒泡排序")
# 输出:❌ 缓存未命中
#      📡 调用 API...
#      💾 已缓存:a1b2c3d4...

result2 = generate_with_cache("写一个冒泡排序")
# 输出:✅ 缓存命中:a1b2c3d4...
# 节省了一次 API 调用!

# 查看统计
print(cache.stats())
# {'total': 1, 'active': 1, 'expired': 0}

语义缓存

from sentence_transformers import SentenceTransformer
import numpy as np

class SemanticCache:
    """语义缓存:相似的提示词共享缓存"""
    
    def __init__(self, similarity_threshold: float = 0.9):
        self.model = SentenceTransformer('all-MiniLM-L6-v2')
        self.cache = []  # [(embedding, response)]
        self.threshold = similarity_threshold
    
    def get(self, prompt: str) -> Optional[str]:
        """获取语义相似的缓存"""
        
        # 计算提示词的向量
        prompt_embedding = self.model.encode([prompt])[0]
        
        # 查找相似的缓存
        for cached_embedding, response in self.cache:
            similarity = self._cosine_similarity(
                prompt_embedding,
                cached_embedding
            )
            
            if similarity >= self.threshold:
                print(f"✅ 语义缓存命中(相似度:{similarity:.2f})")
                return response
        
        print(f"❌ 语义缓存未命中")
        return None
    
    def set(self, prompt: str, response: str):
        """设置缓存"""
        
        embedding = self.model.encode([prompt])[0]
        self.cache.append((embedding, response))
        print(f"💾 已缓存(共 {len(self.cache)} 项)")
    
    def _cosine_similarity(self, a: np.ndarray, b: np.ndarray) -> float:
        """计算余弦相似度"""
        return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))

# 使用示例
semantic_cache = SemanticCache(similarity_threshold=0.85)

# 第一次请求
result1 = generate_with_semantic_cache(
    "写一个函数计算列表平均值"
)
# ❌ 语义缓存未命中
# 💾 已缓存(共 1 项)

# 相似的请求(不同表述)
result2 = generate_with_semantic_cache(
    "创建一个方法来求数组的平均数"
)
# ✅ 语义缓存命中(相似度:0.92)
# 返回之前的结果,节省API调用!

# 不相似的请求
result3 = generate_with_semantic_cache(
    "写一个排序算法"
)
# ❌ 语义缓存未命中

⚡ 批处理优化

批量请求

import asyncio
from typing import List

class BatchProcessor:
    """批量处理请求"""
    
    def __init__(self, batch_size: int = 10):
        self.batch_size = batch_size
    
    async def process_batch(
        self,
        prompts: List[str],
        model: str = "gpt-4o"
    ) -> List[str]:
        """批量处理多个提示词"""
        
        results = []
        
        # 分批处理
        for i in range(0, len(prompts), self.batch_size):
            batch = prompts[i:i+self.batch_size]
            
            # 并行处理当前批次
            batch_results = await asyncio.gather(*[
                self._process_single(prompt, model)
                for prompt in batch
            ])
            
            results.extend(batch_results)
            
            print(f"✅ 完成批次 {i//self.batch_size + 1}")
        
        return results
    
    async def _process_single(self, prompt: str, model: str) -> str:
        """处理单个请求"""
        
        # 检查缓存
        cached = cache.get(prompt, model)
        if cached:
            return cached
        
        # 调用 API
        response = await call_openai_api_async(prompt, model)
        
        # 缓存结果
        cache.set(prompt, model, response)
        
        return response

# 使用示例
async def main():
    processor = BatchProcessor(batch_size=10)
    
    # 生成100个测试用例
    prompts = [
        f"为函数 {func_name} 生成单元测试"
        for func_name in get_all_functions()
    ]
    
    # 批量处理
    results = await processor.process_batch(prompts)
    
    print(f"✅ 完成 {len(results)} 个测试用例生成")

# 运行
asyncio.run(main())

# 性能对比:
# 串行处理:100 个请求 × 2秒 = 200秒
# 批处理(10):10 批次 × 2秒 = 20秒
# 提升:10倍

并发控制

import asyncio
from asyncio import Semaphore

class RateLimitedProcessor:
    """带速率限制的处理器"""
    
    def __init__(self, max_concurrent: int = 5, requests_per_minute: int = 60):
        self.semaphore = Semaphore(max_concurrent)
        self.requests_per_minute = requests_per_minute
        self.request_times = []
    
    async def process(self, prompt: str, model: str) -> str:
        """处理单个请求(带速率限制)"""
        
        # 等待信号量
        async with self.semaphore:
            # 检查速率限制
            await self._wait_for_rate_limit()
            
            # 记录请求时间
            self.request_times.append(asyncio.get_event_loop().time())
            
            # 执行请求
            return await call_openai_api_async(prompt, model)
    
    async def _wait_for_rate_limit(self):
        """等待直到可以发送请求"""
        
        now = asyncio.get_event_loop().time()
        
        # 移除1分钟前的记录
        self.request_times = [
            t for t in self.request_times
            if now - t < 60
        ]
        
        # 如果超过速率限制,等待
        if len(self.request_times) >= self.requests_per_minute:
            oldest = self.request_times[0]
            wait_time = 60 - (now - oldest)
            
            if wait_time > 0:
                print(f"⏸️  速率限制:等待 {wait_time:.1f}秒")
                await asyncio.sleep(wait_time)

# 使用示例
processor = RateLimitedProcessor(
    max_concurrent=5,         # 最多5个并发请求
    requests_per_minute=60    # 每分钟最多60个请求
)

async def main():
    prompts = ["prompt 1", "prompt 2", ...]  # 100个提示词
    
    # 并发处理(自动控制速率)
    results = await asyncio.gather(*[
        processor.process(prompt, "gpt-4o")
        for prompt in prompts
    ])
    
    return results

asyncio.run(main())

📊 性能监控

使用统计

class UsageTracker:
    """使用量追踪"""
    
    def __init__(self):
        self.stats = {
            "requests": 0,
            "total_tokens": 0,
            "input_tokens": 0,
            "output_tokens": 0,
            "total_cost": 0.0,
            "cache_hits": 0,
            "cache_misses": 0
        }
    
    def track_request(
        self,
        model: str,
        input_tokens: int,
        output_tokens: int,
        from_cache: bool = False
    ):
        """记录请求"""
        
        self.stats["requests"] += 1
        
        if from_cache:
            self.stats["cache_hits"] += 1
        else:
            self.stats["cache_misses"] += 1
            self.stats["input_tokens"] += input_tokens
            self.stats["output_tokens"] += output_tokens
            self.stats["total_tokens"] += input_tokens + output_tokens
            
            # 计算成本
            cost = calculator.estimate_cost(model, input_tokens, output_tokens)
            self.stats["total_cost"] += cost
    
    def report(self) -> dict:
        """生成报告"""
        
        cache_hit_rate = 0
        if self.stats["requests"] > 0:
            cache_hit_rate = (
                self.stats["cache_hits"] / self.stats["requests"] * 100
            )
        
        return {
            "总请求数": self.stats["requests"],
            "总Token数": f"{self.stats['total_tokens']:,}",
            "总成本": f"${self.stats['total_cost']:.2f}",
            "缓存命中率": f"{cache_hit_rate:.1f}%",
            "节省成本": f"${self._calculate_savings():.2f}"
        }
    
    def _calculate_savings(self) -> float:
        """计算缓存节省的成本"""
        
        # 假设缓存命中的请求平均有 500 input + 500 output tokens
        avg_cost_per_request = calculator.estimate_cost(
            "gpt-4o", 500, 500
        )
        
        return self.stats["cache_hits"] * avg_cost_per_request

# 使用示例
tracker = UsageTracker()

# 记录请求
tracker.track_request("gpt-4o", 1000, 500, from_cache=False)
tracker.track_request("gpt-4o", 0, 0, from_cache=True)

# 生成报告
report = tracker.report()
print(json.dumps(report, indent=2, ensure_ascii=False))

"""
输出:
{
  "总请求数": 100,
  "总Token数": "75,000",
  "总成本": "$12.50",
  "缓存命中率": "45.0%",
  "节省成本": "$3.38"
}
"""

💡 优化总结

核心策略

1. 选择合适的模型
   └─ 简单任务用便宜的模型

2. 优化提示词
   └─ 精简上下文,减少token消耗

3. 实施缓存
   └─ 相同/相似请求直接返回缓存

4. 批量处理
   └─ 并发处理多个请求

5. 监控使用量
   └─ 追踪成本,持续优化

优化效果

优化前:
- 模型:统一使用 GPT-4 Turbo
- 缓存:无
- 并发:串行处理
- 成本:$100/天

优化后:
- 模型:根据任务选择(70% 使用 gpt-4o-mini)
- 缓存:45% 命中率
- 并发:批处理(10倍速度提升)
- 成本:$15/天

节省:85%

🎯 下一步

掌握了优化技巧后,可以:

  • 📖 08 - 实战案例集 - 实践应用
  • 📖 09 - 常见问题与故障排除 - 解决问题
  • 返回 README - 回顾整个教程

👉 下一章:08 - 实战案例集

最近更新: 2025/12/22 14:25
Contributors: wsyx
Prev
Codex 最佳实践
Next
Codex 实战案例集