07 - Codex 性能优化技巧
优化性能,降低成本,提升效率
📋 本章目标
学完本章,你将能够:
- 选择合适的模型以平衡成本和性能
- 优化提示词以降低 token 消耗
- 实施有效的缓存策略
- 优化批处理和并发请求
🎯 模型选择策略
模型性能对比
| 模型 | 成本 | 速度 | 质量 | 适用场景 |
|---|---|---|---|---|
| GPT-4o-mini | $ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ | 简单任务、原型开发 |
| GPT-4o | $$ | ⭐⭐⭐⭐ | ⭐⭐⭐⭐ | 日常编码、快速迭代 |
| GPT-4 Turbo | $$$ | ⭐⭐⭐ | ⭐⭐⭐⭐ | 复杂项目、大上下文 |
| o1 | $$$$ | ⭐⭐ | ⭐⭐⭐⭐⭐ | 复杂算法、架构设计 |
| Claude 4 Sonnet | $$$ | ⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | 代码质量要求高 |
任务分级策略
class ModelSelector:
"""智能模型选择器"""
def __init__(self):
self.models = {
"simple": "gpt-4o-mini",
"medium": "gpt-4o",
"complex": "gpt-4-turbo",
"critical": "o1"
}
def select_model(self, task: dict) -> str:
"""根据任务复杂度选择模型"""
# 1. 计算任务复杂度
complexity = self.calculate_complexity(task)
# 2. 选择模型
if complexity < 0.3:
return self.models["simple"]
elif complexity < 0.6:
return self.models["medium"]
elif complexity < 0.9:
return self.models["complex"]
else:
return self.models["critical"]
def calculate_complexity(self, task: dict) -> float:
"""计算任务复杂度(0-1)"""
score = 0.0
# 代码行数
if task.get("lines", 0) > 500:
score += 0.3
elif task.get("lines", 0) > 100:
score += 0.1
# 涉及文件数
if task.get("files", 0) > 10:
score += 0.2
elif task.get("files", 0) > 3:
score += 0.1
# 算法复杂度
if task.get("algorithm") in ["dynamic_programming", "graph"]:
score += 0.3
elif task.get("algorithm") in ["sorting", "searching"]:
score += 0.1
# 领域知识要求
if task.get("domain") in ["security", "ml", "systems"]:
score += 0.2
return min(score, 1.0)
# 使用示例
selector = ModelSelector()
# 简单任务:使用便宜的模型
task1 = {
"description": "写一个函数计算平均值",
"lines": 10,
"files": 1,
"algorithm": "basic"
}
model1 = selector.select_model(task1) # → gpt-4o-mini
# 复杂任务:使用强大的模型
task2 = {
"description": "实现分布式缓存系统",
"lines": 1000,
"files": 20,
"algorithm": "distributed_systems",
"domain": "systems"
}
model2 = selector.select_model(task2) # → o1
成本计算
class CostCalculator:
"""成本计算器"""
# 价格表(每百万 tokens)
PRICES = {
"gpt-4o-mini": {"input": 0.15, "output": 0.60},
"gpt-4o": {"input": 2.50, "output": 10.00},
"gpt-4-turbo": {"input": 10.00, "output": 30.00},
"o1": {"input": 15.00, "output": 60.00}
}
def estimate_cost(
self,
model: str,
input_tokens: int,
output_tokens: int
) -> float:
"""估算成本(美元)"""
prices = self.PRICES[model]
input_cost = (input_tokens / 1_000_000) * prices["input"]
output_cost = (output_tokens / 1_000_000) * prices["output"]
return input_cost + output_cost
def compare_models(
self,
input_tokens: int,
output_tokens: int
) -> dict:
"""对比不同模型的成本"""
results = {}
for model in self.PRICES:
cost = self.estimate_cost(model, input_tokens, output_tokens)
results[model] = {
"cost": f"${cost:.4f}",
"cost_per_request": f"${cost:.4f}"
}
return results
# 使用示例
calculator = CostCalculator()
# 典型代码生成任务
input_tokens = 1000 # 提示词 + 上下文
output_tokens = 500 # 生成的代码
comparison = calculator.compare_models(input_tokens, output_tokens)
"""
输出:
{
"gpt-4o-mini": {
"cost": "$0.0004",
"cost_per_request": "$0.0004"
},
"gpt-4o": {
"cost": "$0.0075",
"cost_per_request": "$0.0075"
},
"gpt-4-turbo": {
"cost": "$0.0250",
"cost_per_request": "$0.0250"
},
"o1": {
"cost": "$0.0450",
"cost_per_request": "$0.0450"
}
}
成本差异:
- 使用 gpt-4o-mini 比 o1 便宜 112 倍
- 1000 次请求:
- gpt-4o-mini: $0.40
- gpt-4o: $7.50
- o1: $45.00
"""
💬 提示词优化
减少 Token 消耗
技巧 1:精简上下文
# ❌ 不好:包含不必要的信息
prompt = """
这是我的项目,非常大,有很多文件。
我现在想要实现一个功能。
这个功能很重要。
我需要你帮我写代码。
[粘贴整个项目的代码...]
请帮我实现用户登录功能。
"""
# ✅ 好:只包含相关信息
prompt = """
实现用户登录功能。
相关代码:
```python
# 现有的 User 模型
class User(db.Model):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(80), unique=True)
password_hash = db.Column(db.String(128))
要求:
- 使用 bcrypt 验证密码
- 返回 JWT token
- 处理错误情况 """
#### 技巧 2:使用引用而非重复
```python
# ❌ 不好:重复发送相同的上下文
for endpoint in endpoints:
prompt = f"""
{long_context} # 每次都发送完整上下文
创建API端点:{endpoint}
"""
generate_code(prompt)
# ✅ 好:使用引用
# 第一次发送完整上下文
system_message = long_context
# 后续只发送增量信息
for endpoint in endpoints:
prompt = f"创建API端点:{endpoint}"
generate_code(prompt, system=system_message)
技巧 3:结构化提示词
class PromptOptimizer:
"""提示词优化器"""
@staticmethod
def optimize(prompt: str) -> str:
"""优化提示词,减少 token 消耗"""
# 1. 移除多余的空白
prompt = " ".join(prompt.split())
# 2. 使用缩写(保持可读性)
replacements = {
"请你帮我": "",
"非常感谢": "",
"麻烦你": "",
"能否": ""
}
for old, new in replacements.items():
prompt = prompt.replace(old, new)
# 3. 简化结构
prompt = prompt.replace("。\n\n", "。")
prompt = prompt.replace("\n\n\n", "\n")
return prompt.strip()
# 示例
original = """
请你帮我写一个函数。
这个函数需要做以下事情:
1. 接收一个列表
2. 计算平均值
3. 返回结果
非常感谢!
"""
optimized = PromptOptimizer.optimize(original)
# → "写一个函数:接收列表,计算平均值,返回结果。"
# Token 节省:从 ~50 tokens 减少到 ~15 tokens(节省 70%)
提高生成质量
使用明确的约束
# ❌ 模糊的提示
"写一个排序函数"
# ✅ 明确的提示
"""
实现快速排序:
- 输入:整数列表
- 输出:升序排序的列表
- 要求:原地排序(in-place)
- 时间复杂度:O(n log n)
- 包含注释说明算法步骤
"""
使用示例
# ❌ 无示例
"创建数据验证函数"
# ✅ 有示例
"""
创建数据验证函数,参考以下模式:
示例 1:
def validate_email(email: str) -> bool:
pattern = r'^[\w\.-]+@[\w\.-]+\.\w+$'
return bool(re.match(pattern, email))
示例 2:
def validate_phone(phone: str) -> bool:
pattern = r'^1[3-9]\d{9}$'
return bool(re.match(pattern, phone))
现在创建:
def validate_url(url: str) -> bool:
# 验证 URL 格式(http/https)
"""
💾 缓存策略
结果缓存
import hashlib
import json
from typing import Any, Optional
from datetime import datetime, timedelta
class ResponseCache:
"""响应缓存"""
def __init__(self, ttl: int = 3600):
self.cache = {}
self.ttl = ttl # 秒
def get(self, prompt: str, model: str) -> Optional[str]:
"""获取缓存的响应"""
cache_key = self._generate_key(prompt, model)
if cache_key in self.cache:
item = self.cache[cache_key]
# 检查是否过期
if datetime.now() < item['expires_at']:
print(f"✅ 缓存命中:{cache_key[:8]}...")
return item['response']
else:
# 删除过期项
del self.cache[cache_key]
print(f"❌ 缓存未命中")
return None
def set(self, prompt: str, model: str, response: str):
"""设置缓存"""
cache_key = self._generate_key(prompt, model)
self.cache[cache_key] = {
'response': response,
'created_at': datetime.now(),
'expires_at': datetime.now() + timedelta(seconds=self.ttl)
}
print(f"💾 已缓存:{cache_key[:8]}...")
def _generate_key(self, prompt: str, model: str) -> str:
"""生成缓存键"""
# 规范化提示词(移除空白差异)
normalized = " ".join(prompt.split())
# 生成哈希
content = f"{model}:{normalized}"
return hashlib.md5(content.encode()).hexdigest()
def clear_expired(self):
"""清理过期缓存"""
now = datetime.now()
expired_keys = [
key for key, item in self.cache.items()
if now >= item['expires_at']
]
for key in expired_keys:
del self.cache[key]
print(f"🧹 清理了 {len(expired_keys)} 个过期缓存")
def stats(self) -> dict:
"""缓存统计"""
total = len(self.cache)
expired = sum(
1 for item in self.cache.values()
if datetime.now() >= item['expires_at']
)
return {
"total": total,
"active": total - expired,
"expired": expired
}
# 使用示例
cache = ResponseCache(ttl=3600)
def generate_with_cache(prompt: str, model: str = "gpt-4o"):
"""带缓存的代码生成"""
# 尝试从缓存获取
cached = cache.get(prompt, model)
if cached:
return cached
# 缓存未命中,调用 API
print("📡 调用 API...")
response = call_openai_api(prompt, model)
# 存入缓存
cache.set(prompt, model, response)
return response
# 测试
result1 = generate_with_cache("写一个冒泡排序")
# 输出:❌ 缓存未命中
# 📡 调用 API...
# 💾 已缓存:a1b2c3d4...
result2 = generate_with_cache("写一个冒泡排序")
# 输出:✅ 缓存命中:a1b2c3d4...
# 节省了一次 API 调用!
# 查看统计
print(cache.stats())
# {'total': 1, 'active': 1, 'expired': 0}
语义缓存
from sentence_transformers import SentenceTransformer
import numpy as np
class SemanticCache:
"""语义缓存:相似的提示词共享缓存"""
def __init__(self, similarity_threshold: float = 0.9):
self.model = SentenceTransformer('all-MiniLM-L6-v2')
self.cache = [] # [(embedding, response)]
self.threshold = similarity_threshold
def get(self, prompt: str) -> Optional[str]:
"""获取语义相似的缓存"""
# 计算提示词的向量
prompt_embedding = self.model.encode([prompt])[0]
# 查找相似的缓存
for cached_embedding, response in self.cache:
similarity = self._cosine_similarity(
prompt_embedding,
cached_embedding
)
if similarity >= self.threshold:
print(f"✅ 语义缓存命中(相似度:{similarity:.2f})")
return response
print(f"❌ 语义缓存未命中")
return None
def set(self, prompt: str, response: str):
"""设置缓存"""
embedding = self.model.encode([prompt])[0]
self.cache.append((embedding, response))
print(f"💾 已缓存(共 {len(self.cache)} 项)")
def _cosine_similarity(self, a: np.ndarray, b: np.ndarray) -> float:
"""计算余弦相似度"""
return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
# 使用示例
semantic_cache = SemanticCache(similarity_threshold=0.85)
# 第一次请求
result1 = generate_with_semantic_cache(
"写一个函数计算列表平均值"
)
# ❌ 语义缓存未命中
# 💾 已缓存(共 1 项)
# 相似的请求(不同表述)
result2 = generate_with_semantic_cache(
"创建一个方法来求数组的平均数"
)
# ✅ 语义缓存命中(相似度:0.92)
# 返回之前的结果,节省API调用!
# 不相似的请求
result3 = generate_with_semantic_cache(
"写一个排序算法"
)
# ❌ 语义缓存未命中
⚡ 批处理优化
批量请求
import asyncio
from typing import List
class BatchProcessor:
"""批量处理请求"""
def __init__(self, batch_size: int = 10):
self.batch_size = batch_size
async def process_batch(
self,
prompts: List[str],
model: str = "gpt-4o"
) -> List[str]:
"""批量处理多个提示词"""
results = []
# 分批处理
for i in range(0, len(prompts), self.batch_size):
batch = prompts[i:i+self.batch_size]
# 并行处理当前批次
batch_results = await asyncio.gather(*[
self._process_single(prompt, model)
for prompt in batch
])
results.extend(batch_results)
print(f"✅ 完成批次 {i//self.batch_size + 1}")
return results
async def _process_single(self, prompt: str, model: str) -> str:
"""处理单个请求"""
# 检查缓存
cached = cache.get(prompt, model)
if cached:
return cached
# 调用 API
response = await call_openai_api_async(prompt, model)
# 缓存结果
cache.set(prompt, model, response)
return response
# 使用示例
async def main():
processor = BatchProcessor(batch_size=10)
# 生成100个测试用例
prompts = [
f"为函数 {func_name} 生成单元测试"
for func_name in get_all_functions()
]
# 批量处理
results = await processor.process_batch(prompts)
print(f"✅ 完成 {len(results)} 个测试用例生成")
# 运行
asyncio.run(main())
# 性能对比:
# 串行处理:100 个请求 × 2秒 = 200秒
# 批处理(10):10 批次 × 2秒 = 20秒
# 提升:10倍
并发控制
import asyncio
from asyncio import Semaphore
class RateLimitedProcessor:
"""带速率限制的处理器"""
def __init__(self, max_concurrent: int = 5, requests_per_minute: int = 60):
self.semaphore = Semaphore(max_concurrent)
self.requests_per_minute = requests_per_minute
self.request_times = []
async def process(self, prompt: str, model: str) -> str:
"""处理单个请求(带速率限制)"""
# 等待信号量
async with self.semaphore:
# 检查速率限制
await self._wait_for_rate_limit()
# 记录请求时间
self.request_times.append(asyncio.get_event_loop().time())
# 执行请求
return await call_openai_api_async(prompt, model)
async def _wait_for_rate_limit(self):
"""等待直到可以发送请求"""
now = asyncio.get_event_loop().time()
# 移除1分钟前的记录
self.request_times = [
t for t in self.request_times
if now - t < 60
]
# 如果超过速率限制,等待
if len(self.request_times) >= self.requests_per_minute:
oldest = self.request_times[0]
wait_time = 60 - (now - oldest)
if wait_time > 0:
print(f"⏸️ 速率限制:等待 {wait_time:.1f}秒")
await asyncio.sleep(wait_time)
# 使用示例
processor = RateLimitedProcessor(
max_concurrent=5, # 最多5个并发请求
requests_per_minute=60 # 每分钟最多60个请求
)
async def main():
prompts = ["prompt 1", "prompt 2", ...] # 100个提示词
# 并发处理(自动控制速率)
results = await asyncio.gather(*[
processor.process(prompt, "gpt-4o")
for prompt in prompts
])
return results
asyncio.run(main())
📊 性能监控
使用统计
class UsageTracker:
"""使用量追踪"""
def __init__(self):
self.stats = {
"requests": 0,
"total_tokens": 0,
"input_tokens": 0,
"output_tokens": 0,
"total_cost": 0.0,
"cache_hits": 0,
"cache_misses": 0
}
def track_request(
self,
model: str,
input_tokens: int,
output_tokens: int,
from_cache: bool = False
):
"""记录请求"""
self.stats["requests"] += 1
if from_cache:
self.stats["cache_hits"] += 1
else:
self.stats["cache_misses"] += 1
self.stats["input_tokens"] += input_tokens
self.stats["output_tokens"] += output_tokens
self.stats["total_tokens"] += input_tokens + output_tokens
# 计算成本
cost = calculator.estimate_cost(model, input_tokens, output_tokens)
self.stats["total_cost"] += cost
def report(self) -> dict:
"""生成报告"""
cache_hit_rate = 0
if self.stats["requests"] > 0:
cache_hit_rate = (
self.stats["cache_hits"] / self.stats["requests"] * 100
)
return {
"总请求数": self.stats["requests"],
"总Token数": f"{self.stats['total_tokens']:,}",
"总成本": f"${self.stats['total_cost']:.2f}",
"缓存命中率": f"{cache_hit_rate:.1f}%",
"节省成本": f"${self._calculate_savings():.2f}"
}
def _calculate_savings(self) -> float:
"""计算缓存节省的成本"""
# 假设缓存命中的请求平均有 500 input + 500 output tokens
avg_cost_per_request = calculator.estimate_cost(
"gpt-4o", 500, 500
)
return self.stats["cache_hits"] * avg_cost_per_request
# 使用示例
tracker = UsageTracker()
# 记录请求
tracker.track_request("gpt-4o", 1000, 500, from_cache=False)
tracker.track_request("gpt-4o", 0, 0, from_cache=True)
# 生成报告
report = tracker.report()
print(json.dumps(report, indent=2, ensure_ascii=False))
"""
输出:
{
"总请求数": 100,
"总Token数": "75,000",
"总成本": "$12.50",
"缓存命中率": "45.0%",
"节省成本": "$3.38"
}
"""
💡 优化总结
核心策略
1. 选择合适的模型
└─ 简单任务用便宜的模型
2. 优化提示词
└─ 精简上下文,减少token消耗
3. 实施缓存
└─ 相同/相似请求直接返回缓存
4. 批量处理
└─ 并发处理多个请求
5. 监控使用量
└─ 追踪成本,持续优化
优化效果
优化前:
- 模型:统一使用 GPT-4 Turbo
- 缓存:无
- 并发:串行处理
- 成本:$100/天
优化后:
- 模型:根据任务选择(70% 使用 gpt-4o-mini)
- 缓存:45% 命中率
- 并发:批处理(10倍速度提升)
- 成本:$15/天
节省:85%
🎯 下一步
掌握了优化技巧后,可以:
- 📖 08 - 实战案例集 - 实践应用
- 📖 09 - 常见问题与故障排除 - 解决问题
- 返回 README - 回顾整个教程