Hands-On AI Projects and Practical Case Studies

Ten hands-on projects from beginner to advanced: learn AI techniques by building

📋 Overview

This article presents ten hands-on AI projects, ordered from easy to hard, covering machine learning, deep learning, computer vision, and natural language processing. Each project comes with a complete code implementation, a way to obtain the dataset, and a detailed walkthrough, so you learn the techniques by practicing them.

Learning principles:

  • 💻 Hands-on first: theory must be paired with practice
  • 📈 Step by step: from simple to complex
  • 🎯 Clear goals: each project teaches one core skill
  • 🔄 Iterate: keep optimizing and improving

Part 1: Beginner Projects (Machine Learning Basics)

Project 1: Iris Classification

Difficulty: ⭐
Time: 2-3 hours
Skills: Scikit-learn, data visualization, classification algorithms

Project Goal

Use the classic iris dataset to learn the complete machine learning workflow.

Dataset

from sklearn.datasets import load_iris
iris = load_iris()

Complete Code

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
import seaborn as sns

# 1. Load the data
iris = load_iris()
X = iris.data
y = iris.target

print(f"数据形状: {X.shape}")
print(f"类别: {iris.target_names}")

# 2. 数据可视化
df = pd.DataFrame(X, columns=iris.feature_names)
df['species'] = y

# Feature distributions
fig, axes = plt.subplots(2, 2, figsize=(12, 10))
for i, feature in enumerate(iris.feature_names):
    ax = axes[i//2, i%2]
    for species in range(3):
        ax.hist(df[df['species']==species][feature], 
                alpha=0.5, label=iris.target_names[species])
    ax.set_xlabel(feature)
    ax.set_ylabel('Frequency')
    ax.legend()
plt.tight_layout()
plt.show()

# 3. Split the data
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

# 4. Scale the features
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

# 5. Train the model
model = LogisticRegression(max_iter=200)
model.fit(X_train_scaled, y_train)

# 6. Predict
y_pred = model.predict(X_test_scaled)

# 7. Evaluate
accuracy = accuracy_score(y_test, y_pred)
print(f"\nAccuracy: {accuracy:.4f}")
print("\nClassification report:")
print(classification_report(y_test, y_pred, target_names=iris.target_names))

# 8. Confusion matrix
cm = confusion_matrix(y_test, y_pred)
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
            xticklabels=iris.target_names,
            yticklabels=iris.target_names)
plt.xlabel('Predicted')
plt.ylabel('Actual')
plt.title('Confusion Matrix')
plt.show()

Learning Points

  1. The complete ML workflow: load data → visualize → split → train → evaluate
  2. Data preprocessing: why feature scaling matters
  3. Model evaluation: accuracy, confusion matrix, classification report
  4. Visualization: understanding the data distribution

Extension Exercises

  • Try other algorithms: decision trees, random forests, SVM
  • Run cross-validation (see the sketch below)
  • Analyze feature importances
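
For the first two bullets, a minimal sketch of how the comparison could look: 5-fold cross-validation over three candidate classifiers (the hyperparameters here are illustrative, not tuned):

from sklearn.datasets import load_iris
from sklearn.model_selection import cross_val_score
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC

X, y = load_iris(return_X_y=True)

# 5-fold cross-validation for each candidate algorithm
for clf in [DecisionTreeClassifier(random_state=42),
            RandomForestClassifier(n_estimators=100, random_state=42),
            SVC(kernel='rbf')]:
    scores = cross_val_score(clf, X, y, cv=5)
    print(f"{clf.__class__.__name__}: {scores.mean():.4f} (+/- {scores.std():.4f})")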

Project 2: House Price Prediction

Difficulty: ⭐⭐
Time: 4-5 hours
Skills: Regression analysis, feature engineering, model tuning

Project Goal

Predict California housing prices to learn regression problems and feature engineering.

数据集

# Use the California housing dataset (the Boston dataset has been deprecated)
from sklearn.datasets import fetch_california_housing
housing = fetch_california_housing()

Complete Code

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LinearRegression, Ridge, Lasso
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_error

# 1. Load the data
housing = fetch_california_housing()
X = housing.data
y = housing.target

df = pd.DataFrame(X, columns=housing.feature_names)
df['Price'] = y

print("数据集信息:")
print(df.info())
print("\n统计摘要:")
print(df.describe())

# 2. 探索性数据分析
# 相关性热图
plt.figure(figsize=(12, 10))
sns.heatmap(df.corr(), annot=True, cmap='coolwarm', center=0)
plt.title('Feature Correlation Heatmap')
plt.show()

# Relationship between each feature and the target
fig, axes = plt.subplots(2, 4, figsize=(16, 8))
for i, feature in enumerate(housing.feature_names):
    ax = axes[i//4, i%4]
    ax.scatter(df[feature], df['Price'], alpha=0.3)
    ax.set_xlabel(feature)
    ax.set_ylabel('Price')
plt.tight_layout()
plt.show()

# 3. Split the data
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

# 4. Scale the features
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

# 5. Train several models
models = {
    'Linear Regression': LinearRegression(),
    'Ridge': Ridge(alpha=1.0),
    'Lasso': Lasso(alpha=0.1),
    'Random Forest': RandomForestRegressor(n_estimators=100, random_state=42)
}

results = {}

for name, model in models.items():
    # Train
    model.fit(X_train_scaled, y_train)

    # Predict
    y_pred = model.predict(X_test_scaled)

    # Evaluate
    mse = mean_squared_error(y_test, y_pred)
    rmse = np.sqrt(mse)
    mae = mean_absolute_error(y_test, y_pred)
    r2 = r2_score(y_test, y_pred)
    
    results[name] = {
        'RMSE': rmse,
        'MAE': mae,
        'R2': r2
    }
    
    print(f"\n{name}:")
    print(f"  RMSE: {rmse:.4f}")
    print(f"  MAE: {mae:.4f}")
    print(f"  R²: {r2:.4f}")

# 6. Compare the results
results_df = pd.DataFrame(results).T
print("\nModel comparison:")
print(results_df)

# 7. Visualize the best model's predictions
best_model = models['Random Forest']
y_pred = best_model.predict(X_test_scaled)

plt.figure(figsize=(10, 6))
plt.scatter(y_test, y_pred, alpha=0.5)
plt.plot([y_test.min(), y_test.max()], [y_test.min(), y_test.max()], 'r--', lw=2)
plt.xlabel('Actual Price')
plt.ylabel('Predicted Price')
plt.title('Actual vs Predicted Prices (Random Forest)')
plt.show()

# 8. Feature importances
if hasattr(best_model, 'feature_importances_'):
    importances = best_model.feature_importances_
    indices = np.argsort(importances)[::-1]
    
    plt.figure(figsize=(10, 6))
    plt.bar(range(len(importances)), importances[indices])
    plt.xticks(range(len(importances)), 
               [housing.feature_names[i] for i in indices], 
               rotation=45)
    plt.xlabel('Feature')
    plt.ylabel('Importance')
    plt.title('Feature Importances')
    plt.tight_layout()
    plt.show()

Learning Points

  1. Exploratory data analysis: correlation analysis, data visualization
  2. Multi-model comparison: strengths and weaknesses of different algorithms
  3. Regression metrics: RMSE, MAE, R²
  4. Feature importance: understanding which features matter most

Extension Exercises

  • Feature engineering: create new features (e.g., rooms per person)
  • Hyperparameter tuning with GridSearchCV (see the sketch below)
  • Handle outliers and missing values
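
For the GridSearchCV bullet, a minimal sketch (the grid values are illustrative, and a full search on this dataset can take several minutes):

from sklearn.datasets import fetch_california_housing
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import GridSearchCV, train_test_split

X, y = fetch_california_housing(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# A deliberately small grid; widen it once the pipeline works
param_grid = {
    'n_estimators': [100, 200],
    'max_depth': [None, 10, 20],
}
search = GridSearchCV(RandomForestRegressor(random_state=42), param_grid,
                      cv=3, scoring='neg_root_mean_squared_error', n_jobs=-1)
search.fit(X_train, y_train)

print("Best parameters:", search.best_params_)
print("Best CV RMSE:", -search.best_score_)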

Project 3: Titanic Survival Prediction

Difficulty: ⭐⭐
Time: 5-6 hours
Skills: Data cleaning, feature engineering, classification models

Project Goal

Predict which Titanic passengers survived, and learn to work with real-world data.

Dataset

Download from Kaggle: https://www.kaggle.com/c/titanic

Complete Code

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix

# 1. Load the data
train_df = pd.read_csv('train.csv')
test_df = pd.read_csv('test.csv')

print("训练集形状:", train_df.shape)
print("\n前几行:")
print(train_df.head())
print("\n数据信息:")
print(train_df.info())

# 2. 探索性数据分析
# 生存率
print(f"\n总体生存率: {train_df['Survived'].mean():.2%}")

# Survival rate by feature
fig, axes = plt.subplots(2, 2, figsize=(14, 10))

# Gender
train_df.groupby('Sex')['Survived'].mean().plot(kind='bar', ax=axes[0,0])
axes[0,0].set_title('Survival Rate by Gender')

# Passenger class
train_df.groupby('Pclass')['Survived'].mean().plot(kind='bar', ax=axes[0,1])
axes[0,1].set_title('Survival Rate by Class')

# Age distribution
train_df[train_df['Survived']==1]['Age'].hist(bins=30, alpha=0.5, 
                                                label='Survived', ax=axes[1,0])
train_df[train_df['Survived']==0]['Age'].hist(bins=30, alpha=0.5, 
                                                label='Not Survived', ax=axes[1,0])
axes[1,0].set_xlabel('Age')
axes[1,0].legend()

# Fare distribution
train_df[train_df['Survived']==1]['Fare'].hist(bins=30, alpha=0.5, 
                                                 label='Survived', ax=axes[1,1])
train_df[train_df['Survived']==0]['Fare'].hist(bins=30, alpha=0.5, 
                                                 label='Not Survived', ax=axes[1,1])
axes[1,1].set_xlabel('Fare')
axes[1,1].legend()

plt.tight_layout()
plt.show()

# 3. Data cleaning and feature engineering
def preprocess_data(df):
    df = df.copy()

    # Fill missing values
    df['Age'] = df['Age'].fillna(df['Age'].median())
    df['Fare'] = df['Fare'].fillna(df['Fare'].median())
    df['Embarked'] = df['Embarked'].fillna(df['Embarked'].mode()[0])

    # Create new features
    df['FamilySize'] = df['SibSp'] + df['Parch'] + 1
    df['IsAlone'] = (df['FamilySize'] == 1).astype(int)

    # Extract the title from the name
    df['Title'] = df['Name'].str.extract(r' ([A-Za-z]+)\.', expand=False)
    df['Title'] = df['Title'].replace(['Lady', 'Countess', 'Capt', 'Col',
                                         'Don', 'Dr', 'Major', 'Rev', 'Sir',
                                         'Jonkheer', 'Dona'], 'Rare')
    df['Title'] = df['Title'].replace('Mlle', 'Miss')
    df['Title'] = df['Title'].replace('Ms', 'Miss')
    df['Title'] = df['Title'].replace('Mme', 'Mrs')

    # Encode categorical variables
    df['Sex'] = df['Sex'].map({'male': 0, 'female': 1})
    df['Embarked'] = df['Embarked'].map({'S': 0, 'C': 1, 'Q': 2})

    # Encode the title
    title_mapping = {"Mr": 1, "Miss": 2, "Mrs": 3, "Master": 4, "Rare": 5}
    df['Title'] = df['Title'].map(title_mapping)
    df['Title'] = df['Title'].fillna(0)

    # Age bins (cast to int so scikit-learn treats them as numeric features)
    df['AgeBand'] = pd.cut(df['Age'], 5, labels=[0, 1, 2, 3, 4]).astype(int)

    # Fare bins
    df['FareBand'] = pd.qcut(df['Fare'], 4, labels=[0, 1, 2, 3]).astype(int)
    
    return df

train_processed = preprocess_data(train_df)
test_processed = preprocess_data(test_df)

# 4. Select features
features = ['Pclass', 'Sex', 'Age', 'Fare', 'Embarked', 
            'FamilySize', 'IsAlone', 'Title', 'AgeBand', 'FareBand']

X = train_processed[features]
y = train_processed['Survived']

# 5. Split the data
X_train, X_val, y_train, y_val = train_test_split(
    X, y, test_size=0.2, random_state=42
)

# 6. Train the model
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)

# 7. Evaluate
y_pred = model.predict(X_val)
accuracy = accuracy_score(y_val, y_pred)

print(f"\nValidation accuracy: {accuracy:.4f}")
print("\nClassification report:")
print(classification_report(y_val, y_pred))

# Cross-validation
cv_scores = cross_val_score(model, X, y, cv=5)
print(f"\nCross-validation scores: {cv_scores}")
print(f"Mean score: {cv_scores.mean():.4f} (+/- {cv_scores.std():.4f})")

# 8. Feature importances
importances = pd.DataFrame({
    'feature': features,
    'importance': model.feature_importances_
}).sort_values('importance', ascending=False)

plt.figure(figsize=(10, 6))
plt.barh(importances['feature'], importances['importance'])
plt.xlabel('Importance')
plt.title('Feature Importances')
plt.gca().invert_yaxis()
plt.show()

# 9. Generate the submission file
test_X = test_processed[features]
predictions = model.predict(test_X)

submission = pd.DataFrame({
    'PassengerId': test_df['PassengerId'],
    'Survived': predictions
})
submission.to_csv('submission.csv', index=False)
print("\n提交文件已生成: submission.csv")

Learning Points

  1. Real-world data handling: missing values, outliers
  2. Feature engineering: creating new features, transforming features
  3. Data visualization: understanding distributions and relationships
  4. Model evaluation: cross-validation, feature importance

Extension Exercises

  • Try more feature engineering (e.g., ticket number, cabin)
  • Model ensembling (see the sketch below)
  • Hyperparameter tuning
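
For the ensembling bullet, one simple option is soft voting over a few base models. This sketch reuses the X_train/X_val split from the code above (the choice of base models and their hyperparameters is illustrative):

from sklearn.ensemble import GradientBoostingClassifier, RandomForestClassifier, VotingClassifier
from sklearn.linear_model import LogisticRegression

# Soft voting averages the predicted class probabilities of the base models
ensemble = VotingClassifier(
    estimators=[
        ('rf', RandomForestClassifier(n_estimators=100, random_state=42)),
        ('gb', GradientBoostingClassifier(random_state=42)),
        ('lr', LogisticRegression(max_iter=1000)),
    ],
    voting='soft'
)
ensemble.fit(X_train, y_train)
print(f"Ensemble validation accuracy: {ensemble.score(X_val, y_val):.4f}")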

Part 2: Intermediate Projects (Introduction to Deep Learning)

Project 4: Handwritten Digit Recognition (MNIST)

Difficulty: ⭐⭐
Time: 3-4 hours
Skills: Neural networks, PyTorch/TensorFlow, image processing

Project Goal

Recognize handwritten digits with a neural network: the "Hello World" of deep learning.

Dataset

MNIST: 28×28 images of handwritten digits

Complete Code (PyTorch version)

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
import matplotlib.pyplot as plt
import numpy as np

# 1. Set the device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

# 2. Data preprocessing
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.1307,), (0.3081,))
])

# 3. Load the data
train_dataset = datasets.MNIST(root='./data', train=True, 
                                download=True, transform=transform)
test_dataset = datasets.MNIST(root='./data', train=False, 
                               download=True, transform=transform)

train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=1000, shuffle=False)

# 4. Visualize some samples
fig, axes = plt.subplots(2, 5, figsize=(12, 5))
for i, (img, label) in enumerate(train_dataset):
    if i >= 10:
        break
    ax = axes[i//5, i%5]
    ax.imshow(img.squeeze(), cmap='gray')
    ax.set_title(f'Label: {label}')
    ax.axis('off')
plt.tight_layout()
plt.show()

# 5. Define the model
class SimpleNN(nn.Module):
    def __init__(self):
        super(SimpleNN, self).__init__()
        self.fc1 = nn.Linear(28*28, 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, 10)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(0.2)
    
    def forward(self, x):
        x = x.view(-1, 28*28)  # Flatten
        x = self.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.relu(self.fc2(x))
        x = self.dropout(x)
        x = self.fc3(x)
        return x

model = SimpleNN().to(device)
print(model)

# 6. Define the loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# 7. Training function
def train(model, device, train_loader, optimizer, criterion, epoch):
    model.train()
    train_loss = 0
    correct = 0
    
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        
        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()
        
        train_loss += loss.item()
        pred = output.argmax(dim=1)
        correct += pred.eq(target).sum().item()
        
        if batch_idx % 100 == 0:
            print(f'Epoch: {epoch} [{batch_idx * len(data)}/{len(train_loader.dataset)} '
                  f'({100. * batch_idx / len(train_loader):.0f}%)]\tLoss: {loss.item():.6f}')
    
    train_loss /= len(train_loader)
    accuracy = 100. * correct / len(train_loader.dataset)
    return train_loss, accuracy

# 8. Test function
def test(model, device, test_loader, criterion):
    model.eval()
    test_loss = 0
    correct = 0
    
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            test_loss += criterion(output, target).item()
            pred = output.argmax(dim=1)
            correct += pred.eq(target).sum().item()
    
    test_loss /= len(test_loader)
    accuracy = 100. * correct / len(test_loader.dataset)
    
    print(f'\nTest set: Average loss: {test_loss:.4f}, '
          f'Accuracy: {correct}/{len(test_loader.dataset)} ({accuracy:.2f}%)\n')
    
    return test_loss, accuracy

# 9. Train the model
epochs = 10
train_losses = []
train_accs = []
test_losses = []
test_accs = []

for epoch in range(1, epochs + 1):
    train_loss, train_acc = train(model, device, train_loader, optimizer, criterion, epoch)
    test_loss, test_acc = test(model, device, test_loader, criterion)
    
    train_losses.append(train_loss)
    train_accs.append(train_acc)
    test_losses.append(test_loss)
    test_accs.append(test_acc)

# 10. Visualize the training process
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 5))

ax1.plot(train_losses, label='Train Loss')
ax1.plot(test_losses, label='Test Loss')
ax1.set_xlabel('Epoch')
ax1.set_ylabel('Loss')
ax1.set_title('Training and Test Loss')
ax1.legend()

ax2.plot(train_accs, label='Train Accuracy')
ax2.plot(test_accs, label='Test Accuracy')
ax2.set_xlabel('Epoch')
ax2.set_ylabel('Accuracy (%)')
ax2.set_title('Training and Test Accuracy')
ax2.legend()

plt.tight_layout()
plt.show()

# 11. Example predictions
model.eval()
fig, axes = plt.subplots(2, 5, figsize=(12, 5))

with torch.no_grad():
    for i in range(10):
        img, label = test_dataset[i]
        img_tensor = img.unsqueeze(0).to(device)
        output = model(img_tensor)
        pred = output.argmax(dim=1).item()
        
        ax = axes[i//5, i%5]
        ax.imshow(img.squeeze(), cmap='gray')
        ax.set_title(f'True: {label}, Pred: {pred}')
        ax.axis('off')

plt.tight_layout()
plt.show()

# 12. Save the model
torch.save(model.state_dict(), 'mnist_model.pth')
print("Model saved: mnist_model.pth")

Learning Points

  1. PyTorch basics: defining models, the training loop
  2. Neural networks: fully connected layers, activation functions, Dropout
  3. Training techniques: batching, optimizers, learning rate
  4. Model evaluation: loss curves, accuracy

Extension Exercises

  • Use a convolutional neural network (CNN; see the sketch below)
  • Data augmentation
  • Learning rate scheduling
  • Model visualization (weights, activations)
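
For the CNN bullet, a minimal sketch of a convolutional replacement for SimpleNN; it drops into the same training loop unchanged (the layer sizes are illustrative):

class SimpleCNN(nn.Module):
    def __init__(self):
        super(SimpleCNN, self).__init__()
        self.features = nn.Sequential(
            nn.Conv2d(1, 32, kernel_size=3, padding=1),   # 28x28 -> 28x28
            nn.ReLU(),
            nn.MaxPool2d(2),                              # -> 14x14
            nn.Conv2d(32, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),                              # -> 7x7
        )
        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(64 * 7 * 7, 128),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(128, 10),
        )

    def forward(self, x):
        return self.classifier(self.features(x))

# model = SimpleCNN().to(device)  # then train exactly as before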

Project 5: Cats vs. Dogs Classification (CNN)

Difficulty: ⭐⭐⭐
Time: 6-8 hours
Skills: Convolutional neural networks, transfer learning, data augmentation

Project Goal

Classify images with a CNN, and learn convolutional networks and transfer learning.

Dataset

Kaggle Dogs vs Cats: https://www.kaggle.com/c/dogs-vs-cats

Complete Code

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, models
from PIL import Image
import os
import matplotlib.pyplot as plt
import numpy as np

# 1. Custom dataset
class DogsVsCatsDataset(Dataset):
    def __init__(self, root_dir, transform=None):
        self.root_dir = root_dir
        self.transform = transform
        self.images = []
        self.labels = []
        
        # Collect image paths and labels
        for filename in os.listdir(root_dir):
            if filename.endswith('.jpg'):
                self.images.append(os.path.join(root_dir, filename))
                # Infer the class from the filename
                if filename.startswith('dog'):
                    self.labels.append(1)
                else:
                    self.labels.append(0)
    
    def __len__(self):
        return len(self.images)
    
    def __getitem__(self, idx):
        img_path = self.images[idx]
        image = Image.open(img_path).convert('RGB')
        label = self.labels[idx]
        
        if self.transform:
            image = self.transform(image)
        
        return image, label

# 2. Preprocessing and data augmentation
train_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(10),
    transforms.ColorJitter(brightness=0.2, contrast=0.2),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

test_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

# 3. Load the data (note: Kaggle's official test set is unlabeled, so in
# practice you would split the labeled train folder into train/validation)
train_dataset = DogsVsCatsDataset('train', transform=train_transform)
test_dataset = DogsVsCatsDataset('test', transform=test_transform)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=4)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False, num_workers=4)

# 4. Define a CNN model (from scratch)
class SimpleCNN(nn.Module):
    def __init__(self):
        super(SimpleCNN, self).__init__()

        self.conv_layers = nn.Sequential(
            # First convolutional block
            nn.Conv2d(3, 32, kernel_size=3, padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),

            # Second convolutional block
            nn.Conv2d(32, 64, kernel_size=3, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),

            # Third convolutional block
            nn.Conv2d(64, 128, kernel_size=3, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),

            # Fourth convolutional block
            nn.Conv2d(128, 256, kernel_size=3, padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(),
            nn.MaxPool2d(2, 2)
        )
        
        self.fc_layers = nn.Sequential(
            nn.Flatten(),
            nn.Linear(256 * 14 * 14, 512),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(512, 2)
        )
    
    def forward(self, x):
        x = self.conv_layers(x)
        x = self.fc_layers(x)
        return x

# 5. Transfer learning (recommended)
def create_transfer_model():
    # Load a pretrained ResNet18
    # (newer torchvision versions prefer weights=models.ResNet18_Weights.DEFAULT)
    model = models.resnet18(pretrained=True)

    # Freeze the pretrained layers
    for param in model.parameters():
        param.requires_grad = False

    # Replace the final fully connected layer
    num_features = model.fc.in_features
    model.fc = nn.Sequential(
        nn.Linear(num_features, 512),
        nn.ReLU(),
        nn.Dropout(0.5),
        nn.Linear(512, 2)
    )
    
    return model

# Choose a model
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# model = SimpleCNN().to(device)  # from scratch
model = create_transfer_model().to(device)  # transfer learning
print(f"Using device: {device}")

# 6. Define the loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.1)

# 7. Training function
def train_epoch(model, device, train_loader, optimizer, criterion):
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0
    
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        
        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()
        
        running_loss += loss.item()
        _, predicted = output.max(1)
        total += target.size(0)
        correct += predicted.eq(target).sum().item()
        
        if batch_idx % 50 == 0:
            print(f'Batch: {batch_idx}/{len(train_loader)}, '
                  f'Loss: {loss.item():.4f}, '
                  f'Acc: {100.*correct/total:.2f}%')
    
    epoch_loss = running_loss / len(train_loader)
    epoch_acc = 100. * correct / total
    return epoch_loss, epoch_acc

# 8. Validation function
def validate(model, device, test_loader, criterion):
    model.eval()
    test_loss = 0
    correct = 0
    total = 0
    
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            test_loss += criterion(output, target).item()
            _, predicted = output.max(1)
            total += target.size(0)
            correct += predicted.eq(target).sum().item()
    
    test_loss /= len(test_loader)
    test_acc = 100. * correct / total
    
    print(f'Test Loss: {test_loss:.4f}, Test Acc: {test_acc:.2f}%')
    return test_loss, test_acc

# 9. Train the model
epochs = 15
train_losses = []
train_accs = []
test_losses = []
test_accs = []

best_acc = 0

for epoch in range(1, epochs + 1):
    print(f'\nEpoch {epoch}/{epochs}')
    print('-' * 50)
    
    train_loss, train_acc = train_epoch(model, device, train_loader, optimizer, criterion)
    test_loss, test_acc = validate(model, device, test_loader, criterion)
    
    train_losses.append(train_loss)
    train_accs.append(train_acc)
    test_losses.append(test_loss)
    test_accs.append(test_acc)
    
    # Save the best model
    if test_acc > best_acc:
        best_acc = test_acc
        torch.save(model.state_dict(), 'best_model.pth')
        print(f'Saved best model, accuracy: {best_acc:.2f}%')
    
    scheduler.step()

# 10. Visualize the training process
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 5))

ax1.plot(train_losses, label='Train Loss')
ax1.plot(test_losses, label='Test Loss')
ax1.set_xlabel('Epoch')
ax1.set_ylabel('Loss')
ax1.set_title('Training and Test Loss')
ax1.legend()

ax2.plot(train_accs, label='Train Accuracy')
ax2.plot(test_accs, label='Test Accuracy')
ax2.set_xlabel('Epoch')
ax2.set_ylabel('Accuracy (%)')
ax2.set_title('Training and Test Accuracy')
ax2.legend()

plt.tight_layout()
plt.show()

# 11. Example predictions
model.load_state_dict(torch.load('best_model.pth'))
model.eval()

fig, axes = plt.subplots(2, 5, figsize=(15, 6))
classes = ['Cat', 'Dog']

with torch.no_grad():
    for i in range(10):
        img, label = test_dataset[i]
        img_tensor = img.unsqueeze(0).to(device)
        output = model(img_tensor)
        _, pred = output.max(1)
        
        # De-normalize for display
        img_display = img.cpu().numpy().transpose(1, 2, 0)
        mean = np.array([0.485, 0.456, 0.406])
        std = np.array([0.229, 0.224, 0.225])
        img_display = std * img_display + mean
        img_display = np.clip(img_display, 0, 1)
        
        ax = axes[i//5, i%5]
        ax.imshow(img_display)
        ax.set_title(f'True: {classes[label]}, Pred: {classes[pred.item()]}')
        ax.axis('off')

plt.tight_layout()
plt.show()

Learning Points

  1. Convolutional networks: convolution, pooling, batch normalization
  2. Transfer learning: reuse a pretrained model to speed up training
  3. Data augmentation: improves generalization
  4. Training techniques: LR scheduling, early stopping, checkpointing

Extension Exercises

  • Try other pretrained backbones (ResNet50, VGG, EfficientNet)
  • Fine-tuning: unfreeze some layers (see the sketch below)
  • Use more data augmentation techniques
  • Implement Grad-CAM visualization
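
For the fine-tuning bullet, a minimal sketch: unfreeze only the last residual block of the ResNet18 from create_transfer_model() above and give it a smaller learning rate than the new head (the rates are illustrative):

# Unfreeze the last residual block
for param in model.layer4.parameters():
    param.requires_grad = True

# Discriminative learning rates: small for pretrained weights, larger for the new head
optimizer = optim.Adam([
    {'params': model.layer4.parameters(), 'lr': 1e-4},
    {'params': model.fc.parameters(), 'lr': 1e-3},
])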

Project 6: Sentiment Analysis (NLP)

Difficulty: ⭐⭐⭐
Time: 5-7 hours
Skills: Text processing, RNN/LSTM, word embeddings

Project Goal

Classify movie reviews as positive or negative, and learn the basics of natural language processing.

Dataset

The IMDB movie review dataset

Complete Code

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split

# 1. Load the data
# Expected format: review, sentiment
df = pd.read_csv('IMDB Dataset.csv')
print(df.head())
print(f"Dataset size: {len(df)}")
print(f"Positive reviews: {(df['sentiment']=='positive').sum()}")
print(f"Negative reviews: {(df['sentiment']=='negative').sum()}")

# 2. Preprocess the text
tokenizer = get_tokenizer('basic_english')

def yield_tokens(data_iter):
    for text in data_iter:
        yield tokenizer(text)

# Build the vocabulary
vocab = build_vocab_from_iterator(
    yield_tokens(df['review']),
    specials=["<unk>", "<pad>"],
    max_tokens=10000
)
vocab.set_default_index(vocab["<unk>"])

print(f"词汇表大小: {len(vocab)}")

# 文本转索引
def text_pipeline(text):
    return [vocab[token] for token in tokenizer(text)]

# 标签转索引
label_pipeline = lambda x: 1 if x == 'positive' else 0

# 3. Custom dataset
class IMDBDataset(Dataset):
    def __init__(self, texts, labels, max_len=200):
        self.texts = texts
        self.labels = labels
        self.max_len = max_len
    
    def __len__(self):
        return len(self.texts)
    
    def __getitem__(self, idx):
        text = self.texts[idx]
        label = self.labels[idx]
        
        # Convert to indices
        text_indices = text_pipeline(text)
        
        # Truncate or pad
        if len(text_indices) > self.max_len:
            text_indices = text_indices[:self.max_len]
        else:
            text_indices += [vocab["<pad>"]] * (self.max_len - len(text_indices))
        
        return torch.tensor(text_indices), torch.tensor(label_pipeline(label))

# 4. Split the data
train_texts, test_texts, train_labels, test_labels = train_test_split(
    df['review'].values, df['sentiment'].values, 
    test_size=0.2, random_state=42
)

train_dataset = IMDBDataset(train_texts, train_labels)
test_dataset = IMDBDataset(test_texts, test_labels)

train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

# 5. Define the LSTM model
class SentimentLSTM(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, 
                 n_layers=2, dropout=0.5):
        super(SentimentLSTM, self).__init__()
        
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=vocab["<pad>"])
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, num_layers=n_layers,
                            bidirectional=True, dropout=dropout, batch_first=True)
        self.fc = nn.Linear(hidden_dim * 2, output_dim)
        self.dropout = nn.Dropout(dropout)
    
    def forward(self, text):
        # text: [batch_size, seq_len]
        embedded = self.dropout(self.embedding(text))
        # embedded: [batch_size, seq_len, embedding_dim]
        
        output, (hidden, cell) = self.lstm(embedded)
        # output: [batch_size, seq_len, hidden_dim * 2]
        # hidden: [n_layers * 2, batch_size, hidden_dim]
        
        # Concatenate the last layer's forward and backward hidden states
        hidden = self.dropout(torch.cat((hidden[-2,:,:], hidden[-1,:,:]), dim=1))
        # hidden: [batch_size, hidden_dim * 2]
        
        return self.fc(hidden)

# 6. Initialize the model
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

VOCAB_SIZE = len(vocab)
EMBEDDING_DIM = 100
HIDDEN_DIM = 256
OUTPUT_DIM = 2
N_LAYERS = 2
DROPOUT = 0.5

model = SentimentLSTM(VOCAB_SIZE, EMBEDDING_DIM, HIDDEN_DIM, 
                       OUTPUT_DIM, N_LAYERS, DROPOUT).to(device)

print(model)
print(f"模型参数数量: {sum(p.numel() for p in model.parameters() if p.requires_grad)}")

# 7. Define the loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# 8. Training function
def train_epoch(model, iterator, optimizer, criterion):
    model.train()
    epoch_loss = 0
    epoch_acc = 0
    
    for batch_idx, (text, label) in enumerate(iterator):
        text, label = text.to(device), label.to(device)
        
        optimizer.zero_grad()
        predictions = model(text)
        loss = criterion(predictions, label)
        loss.backward()
        optimizer.step()
        
        epoch_loss += loss.item()
        acc = (predictions.argmax(1) == label).float().mean()
        epoch_acc += acc.item()
        
        if batch_idx % 100 == 0:
            print(f'Batch: {batch_idx}/{len(iterator)}, '
                  f'Loss: {loss.item():.4f}, Acc: {acc.item():.4f}')
    
    return epoch_loss / len(iterator), epoch_acc / len(iterator)

# 9. Evaluation function
def evaluate(model, iterator, criterion):
    model.eval()
    epoch_loss = 0
    epoch_acc = 0
    
    with torch.no_grad():
        for text, label in iterator:
            text, label = text.to(device), label.to(device)
            predictions = model(text)
            loss = criterion(predictions, label)
            
            epoch_loss += loss.item()
            acc = (predictions.argmax(1) == label).float().mean()
            epoch_acc += acc.item()
    
    return epoch_loss / len(iterator), epoch_acc / len(iterator)

# 10. Train the model
N_EPOCHS = 10
train_losses = []
train_accs = []
test_losses = []
test_accs = []

best_test_acc = 0

for epoch in range(N_EPOCHS):
    print(f'\nEpoch: {epoch+1}/{N_EPOCHS}')
    print('-' * 50)
    
    train_loss, train_acc = train_epoch(model, train_loader, optimizer, criterion)
    test_loss, test_acc = evaluate(model, test_loader, criterion)
    
    train_losses.append(train_loss)
    train_accs.append(train_acc)
    test_losses.append(test_loss)
    test_accs.append(test_acc)
    
    print(f'\nTrain Loss: {train_loss:.3f} | Train Acc: {train_acc*100:.2f}%')
    print(f'Test Loss: {test_loss:.3f} | Test Acc: {test_acc*100:.2f}%')
    
    if test_acc > best_test_acc:
        best_test_acc = test_acc
        torch.save(model.state_dict(), 'sentiment_model.pth')
        print(f'Saved best model, accuracy: {best_test_acc*100:.2f}%')

# 11. Visualize
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 5))

ax1.plot(train_losses, label='Train Loss')
ax1.plot(test_losses, label='Test Loss')
ax1.set_xlabel('Epoch')
ax1.set_ylabel('Loss')
ax1.set_title('Training and Test Loss')
ax1.legend()

ax2.plot([acc*100 for acc in train_accs], label='Train Accuracy')
ax2.plot([acc*100 for acc in test_accs], label='Test Accuracy')
ax2.set_xlabel('Epoch')
ax2.set_ylabel('Accuracy (%)')
ax2.set_title('Training and Test Accuracy')
ax2.legend()

plt.tight_layout()
plt.show()

# 12. Prediction function
def predict_sentiment(model, sentence, max_len=200):
    model.eval()
    tokens = text_pipeline(sentence)
    
    if len(tokens) > max_len:
        tokens = tokens[:max_len]
    else:
        tokens += [vocab["<pad>"]] * (max_len - len(tokens))
    
    tensor = torch.tensor(tokens).unsqueeze(0).to(device)
    
    with torch.no_grad():
        output = model(tensor)
        prediction = output.argmax(1).item()
    
    return "Positive" if prediction == 1 else "Negative"

# 13. Try some predictions
test_sentences = [
    "This movie was absolutely fantastic! I loved every minute of it.",
    "Terrible film. Complete waste of time and money.",
    "It was okay, nothing special but not terrible either.",
    "One of the best movies I've ever seen. Highly recommend!",
    "I fell asleep halfway through. So boring."
]

model.load_state_dict(torch.load('sentiment_model.pth'))

print("\n预测结果:")
print("=" * 70)
for sentence in test_sentences:
    sentiment = predict_sentiment(model, sentence)
    print(f"评论: {sentence}")
    print(f"情感: {sentiment}\n")

Learning Points

  1. Text preprocessing: tokenization, vocabulary building, text-to-index conversion
  2. Word embeddings: mapping words to vectors
  3. LSTM: handling sequences and capturing long-range dependencies
  4. Bidirectional LSTM: using both left and right context

Extension Exercises

  • Use pretrained word vectors (GloVe, Word2Vec; see the sketch below)
  • Try other models (CNN, Transformer)
  • Multi-class sentiment analysis (1-5 stars)
  • Visualize attention weights
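
For the pretrained embeddings bullet, a minimal sketch using torchtext's GloVe loader (the first call downloads the vectors; dim must match EMBEDDING_DIM, and note that torchtext is now in maintenance mode):

from torchtext.vocab import GloVe

glove = GloVe(name='6B', dim=100)

# Build an embedding matrix aligned with our vocabulary;
# words missing from GloVe get zero vectors
pretrained = glove.get_vecs_by_tokens(vocab.get_itos())
model.embedding.weight.data.copy_(pretrained)

# Optionally freeze the embeddings for the first few epochs
model.embedding.weight.requires_grad = False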

Part 3: Advanced Projects

Project 7: Object Detection (YOLO)

Difficulty: ⭐⭐⭐⭐
Time: 8-10 hours
Skills: Object detection, YOLO, real-time inference

Project Goal

Run real-time object detection with YOLO, an advanced computer vision application.

Simplified Code (using a pretrained model)

import torch
import cv2
import numpy as np
from PIL import Image
import matplotlib.pyplot as plt

# 1. Load a pretrained YOLOv5 model
model = torch.hub.load('ultralytics/yolov5', 'yolov5s', pretrained=True)
model.eval()

# 2. Detect objects in an image
def detect_image(image_path):
    # Read the image
    img = Image.open(image_path)

    # Run inference
    results = model(img)

    # Show the results
    results.show()

    # Get the detections as a DataFrame
    detections = results.pandas().xyxy[0]
    print(detections)
    
    return results

# 3. Detect objects in a video
def detect_video(video_path, output_path='output.mp4'):
    cap = cv2.VideoCapture(video_path)

    # Get the video properties
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = int(cap.get(cv2.CAP_PROP_FPS))

    # Create the video writer
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
    
    frame_count = 0
    
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        
        # Convert the color space
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

        # Detect
        results = model(frame_rgb)

        # Draw the results
        frame_with_detections = np.squeeze(results.render())
        frame_with_detections = cv2.cvtColor(frame_with_detections, cv2.COLOR_RGB2BGR)

        # Write the frame
        out.write(frame_with_detections)
        
        frame_count += 1
        if frame_count % 30 == 0:
            print(f'Processed {frame_count} frames')
    
    cap.release()
    out.release()
    print(f'Video saved to: {output_path}')

# 4. Real-time webcam detection
def detect_webcam():
    cap = cv2.VideoCapture(0)
    
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        
        # Convert the color space
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

        # Detect
        results = model(frame_rgb)

        # Draw the results
        frame_with_detections = np.squeeze(results.render())
        frame_with_detections = cv2.cvtColor(frame_with_detections, cv2.COLOR_RGB2BGR)

        # Display
        cv2.imshow('YOLOv5 Detection', frame_with_detections)

        # Press 'q' to quit
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
    
    cap.release()
    cv2.destroyAllWindows()

# 5. Batch detection
def batch_detect(image_folder, output_folder):
    import os
    os.makedirs(output_folder, exist_ok=True)
    
    for filename in os.listdir(image_folder):
        if filename.endswith(('.jpg', '.jpeg', '.png')):
            image_path = os.path.join(image_folder, filename)
            results = detect_image(image_path)
            
            # Save the results
            results.save(save_dir=output_folder)
            print(f'Done: {filename}')

# Usage examples
# detect_image('test.jpg')
# detect_video('test.mp4')
# detect_webcam()

Learning Points

  1. Object detection: classification and localization at once
  2. YOLO architecture: a fast single-stage detector
  3. Real-time inference: video and webcam detection (filtering low-confidence boxes is shown below)
  4. Pretrained models: transfer learning in practice
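
A small practical note on point 3: each detection comes back with a confidence score, so weak boxes can be filtered in one line. A minimal sketch (the 0.5 threshold and the 'test.jpg' path are illustrative):

# results.pandas().xyxy[0] has columns:
# xmin, ymin, xmax, ymax, confidence, class, name
detections = model('test.jpg').pandas().xyxy[0]
confident = detections[detections['confidence'] > 0.5]
print(confident[['name', 'confidence']])

# YOLOv5 hub models also expose a global threshold:
model.conf = 0.5  # applied to all subsequent inference calls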

Project 8: Image Generation (GAN)

Difficulty: ⭐⭐⭐⭐
Time: 10-12 hours
Skills: Generative adversarial networks, image generation

Project Goal

Generate handwritten digits with a GAN, and learn generative models.

Complete Code (a simple fully connected GAN)

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
import matplotlib.pyplot as plt
import numpy as np

# 1. Set the device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# 2. Hyperparameters
latent_dim = 100
image_size = 28
channels = 1
batch_size = 128
epochs = 50
lr = 0.0002

# 3. Load the data
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize([0.5], [0.5])
])

dataset = datasets.MNIST(root='./data', train=True, download=True, transform=transform)
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

# 4. Define the generator
class Generator(nn.Module):
    def __init__(self):
        super(Generator, self).__init__()

        self.model = nn.Sequential(
            # Input: latent_dim
            nn.Linear(latent_dim, 256),
            nn.LeakyReLU(0.2),
            nn.BatchNorm1d(256),
            
            nn.Linear(256, 512),
            nn.LeakyReLU(0.2),
            nn.BatchNorm1d(512),
            
            nn.Linear(512, 1024),
            nn.LeakyReLU(0.2),
            nn.BatchNorm1d(1024),
            
            nn.Linear(1024, image_size * image_size * channels),
            nn.Tanh()
        )
    
    def forward(self, z):
        img = self.model(z)
        img = img.view(img.size(0), channels, image_size, image_size)
        return img

# 5. Define the discriminator
class Discriminator(nn.Module):
    def __init__(self):
        super(Discriminator, self).__init__()
        
        self.model = nn.Sequential(
            nn.Linear(image_size * image_size * channels, 512),
            nn.LeakyReLU(0.2),
            nn.Dropout(0.3),
            
            nn.Linear(512, 256),
            nn.LeakyReLU(0.2),
            nn.Dropout(0.3),
            
            nn.Linear(256, 1),
            nn.Sigmoid()
        )
    
    def forward(self, img):
        img_flat = img.view(img.size(0), -1)
        validity = self.model(img_flat)
        return validity

# 6. Initialize the models
generator = Generator().to(device)
discriminator = Discriminator().to(device)

# 7. Loss function and optimizers
adversarial_loss = nn.BCELoss()

optimizer_G = optim.Adam(generator.parameters(), lr=lr, betas=(0.5, 0.999))
optimizer_D = optim.Adam(discriminator.parameters(), lr=lr, betas=(0.5, 0.999))

# 8. Training
G_losses = []
D_losses = []

fixed_noise = torch.randn(64, latent_dim, device=device)

for epoch in range(epochs):
    for i, (imgs, _) in enumerate(dataloader):
        batch_size = imgs.size(0)
        
        # Real and fake labels
        real_labels = torch.ones(batch_size, 1, device=device)
        fake_labels = torch.zeros(batch_size, 1, device=device)

        # Real images
        real_imgs = imgs.to(device)
        
        # ---------------------
        #  Train the discriminator
        # ---------------------
        optimizer_D.zero_grad()

        # Loss on real images
        real_loss = adversarial_loss(discriminator(real_imgs), real_labels)

        # Generate fake images
        z = torch.randn(batch_size, latent_dim, device=device)
        fake_imgs = generator(z)

        # Loss on fake images (detach so gradients don't flow into the generator)
        fake_loss = adversarial_loss(discriminator(fake_imgs.detach()), fake_labels)

        # Total discriminator loss
        d_loss = (real_loss + fake_loss) / 2
        d_loss.backward()
        optimizer_D.step()

        # ---------------------
        #  Train the generator
        # ---------------------
        optimizer_G.zero_grad()

        # The generator wants the discriminator to classify fakes as real
        g_loss = adversarial_loss(discriminator(fake_imgs), real_labels)
        g_loss.backward()
        optimizer_G.step()
        
        # Log the losses
        if i % 100 == 0:
            print(f'[Epoch {epoch}/{epochs}] [Batch {i}/{len(dataloader)}] '
                  f'[D loss: {d_loss.item():.4f}] [G loss: {g_loss.item():.4f}]')
    
    G_losses.append(g_loss.item())
    D_losses.append(d_loss.item())
    
    # Generate sample images
    if epoch % 5 == 0:
        with torch.no_grad():
            fake = generator(fixed_noise).detach().cpu()
        
        fig, axes = plt.subplots(8, 8, figsize=(10, 10))
        for i in range(64):
            ax = axes[i//8, i%8]
            ax.imshow(fake[i].squeeze(), cmap='gray')
            ax.axis('off')
        plt.suptitle(f'Epoch {epoch}')
        plt.tight_layout()
        plt.show()

# 9. Plot the losses
plt.figure(figsize=(10, 5))
plt.plot(G_losses, label='Generator Loss')
plt.plot(D_losses, label='Discriminator Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.title('GAN Training Loss')
plt.show()

# 10. Generate new images
with torch.no_grad():
    z = torch.randn(64, latent_dim, device=device)
    generated_imgs = generator(z).cpu()

fig, axes = plt.subplots(8, 8, figsize=(10, 10))
for i in range(64):
    ax = axes[i//8, i%8]
    ax.imshow(generated_imgs[i].squeeze(), cmap='gray')
    ax.axis('off')
plt.suptitle('Generated Images')
plt.tight_layout()
plt.show()

Learning Points

  1. How GANs work: adversarial training of a generator and a discriminator
  2. Training tricks: label smoothing (see the sketch below), learning rate tuning
  3. Mode collapse: a common failure mode of GAN training
  4. Image generation: producing images from noise
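
For the label smoothing trick in point 2, a minimal sketch: inside the discriminator step of the training loop above, replace the hard real labels with 0.9 (one-sided smoothing; 0.9 is the customary value but still a tunable):

# One-sided label smoothing: real targets become 0.9 instead of 1.0
real_labels = torch.full((batch_size, 1), 0.9, device=device)
fake_labels = torch.zeros(batch_size, 1, device=device)

real_loss = adversarial_loss(discriminator(real_imgs), real_labels)
fake_loss = adversarial_loss(discriminator(fake_imgs.detach()), fake_labels)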

Project 9: Chatbot (Transformer)

Difficulty: ⭐⭐⭐⭐⭐
Time: 12-15 hours
Skills: Transformer, Seq2Seq, dialogue systems

Project Goal

Build a simple chatbot, and learn Transformers and sequence-to-sequence models.

Simplified Code (using a pretrained model)

from transformers import AutoModelForCausalLM, AutoTokenizer
import torch

# 1. Load the pretrained model
model_name = "microsoft/DialoGPT-medium"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(model_name)

# 2. Chat function
def chat(user_input, chat_history_ids=None, max_length=1000):
    # Encode the user input
    new_input_ids = tokenizer.encode(user_input + tokenizer.eos_token, 
                                      return_tensors='pt')

    # Append the conversation history
    if chat_history_ids is not None:
        bot_input_ids = torch.cat([chat_history_ids, new_input_ids], dim=-1)
    else:
        bot_input_ids = new_input_ids
    
    # Generate a reply
    chat_history_ids = model.generate(
        bot_input_ids,
        max_length=max_length,
        pad_token_id=tokenizer.eos_token_id,
        no_repeat_ngram_size=3,
        do_sample=True,
        top_k=100,
        top_p=0.7,
        temperature=0.8
    )
    
    # Decode the reply
    bot_response = tokenizer.decode(
        chat_history_ids[:, bot_input_ids.shape[-1]:][0],
        skip_special_tokens=True
    )
    
    return bot_response, chat_history_ids

# 3. Interactive chat
def interactive_chat():
    print("Chatbot started! Type 'quit' to exit.")
    print("=" * 50)

    chat_history_ids = None

    while True:
        user_input = input("You: ")

        if user_input.lower() == 'quit':
            print("Goodbye!")
            break

        bot_response, chat_history_ids = chat(user_input, chat_history_ids)
        print(f"Bot: {bot_response}\n")

# Run the chat
# interactive_chat()

Learning Points

  1. Transformer architecture: self-attention
  2. Pretrained models: using HuggingFace
  3. Text generation: sampling strategies, the temperature parameter (see the sketch below)
  4. Dialogue systems: managing conversation context
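
To see what the temperature parameter in model.generate() does, a self-contained sketch: the logits are divided by the temperature before the softmax, so T < 1 sharpens the distribution and T > 1 flattens it:

import torch
import torch.nn.functional as F

logits = torch.tensor([2.0, 1.0, 0.5])

for temperature in [0.5, 1.0, 2.0]:
    probs = F.softmax(logits / temperature, dim=-1)
    print(f"T={temperature}: {probs.numpy().round(3)}")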

Project 10: Recommender System

Difficulty: ⭐⭐⭐
Time: 6-8 hours
Skills: Collaborative filtering, matrix factorization, deep learning for recommendation

Project Goal

Build a movie recommender system and learn recommendation algorithms.

Complete Code

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt

# 1. Load the data (MovieLens dataset)
# Download: https://grouplens.org/datasets/movielens/
ratings = pd.read_csv('ratings.csv')
movies = pd.read_csv('movies.csv')

print("评分数据:")
print(ratings.head())
print(f"\n用户数: {ratings['userId'].nunique()}")
print(f"电影数: {ratings['movieId'].nunique()}")
print(f"评分数: {len(ratings)}")

# 2. 数据预处理
# 重新编码用户和电影ID
user_ids = ratings['userId'].unique()
movie_ids = ratings['movieId'].unique()

user2idx = {user_id: idx for idx, user_id in enumerate(user_ids)}
movie2idx = {movie_id: idx for idx, movie_id in enumerate(movie_ids)}
idx2movie = {idx: movie_id for movie_id, idx in movie2idx.items()}

ratings['user_idx'] = ratings['userId'].map(user2idx)
ratings['movie_idx'] = ratings['movieId'].map(movie2idx)

# 3. Split the data
train_data, test_data = train_test_split(ratings, test_size=0.2, random_state=42)

# 4. Custom dataset
class RatingsDataset(Dataset):
    def __init__(self, data):
        self.users = torch.LongTensor(data['user_idx'].values)
        self.movies = torch.LongTensor(data['movie_idx'].values)
        self.ratings = torch.FloatTensor(data['rating'].values)
    
    def __len__(self):
        return len(self.users)
    
    def __getitem__(self, idx):
        return self.users[idx], self.movies[idx], self.ratings[idx]

train_dataset = RatingsDataset(train_data)
test_dataset = RatingsDataset(test_data)

train_loader = DataLoader(train_dataset, batch_size=256, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=256, shuffle=False)

# 5. Define the matrix factorization model
class MatrixFactorization(nn.Module):
    def __init__(self, n_users, n_movies, n_factors=50):
        super(MatrixFactorization, self).__init__()
        
        self.user_factors = nn.Embedding(n_users, n_factors)
        self.movie_factors = nn.Embedding(n_movies, n_factors)
        self.user_biases = nn.Embedding(n_users, 1)
        self.movie_biases = nn.Embedding(n_movies, 1)
        
        # Initialize the weights
        nn.init.normal_(self.user_factors.weight, std=0.01)
        nn.init.normal_(self.movie_factors.weight, std=0.01)
        nn.init.zeros_(self.user_biases.weight)
        nn.init.zeros_(self.movie_biases.weight)
    
    def forward(self, user, movie):
        # Look up the embeddings
        user_embedding = self.user_factors(user)
        movie_embedding = self.movie_factors(movie)
        user_bias = self.user_biases(user).squeeze()
        movie_bias = self.movie_biases(movie).squeeze()

        # Dot product + biases
        dot_product = (user_embedding * movie_embedding).sum(1)
        prediction = dot_product + user_bias + movie_bias
        
        return prediction

# 6. Define a deep learning recommendation model
class NeuralCollaborativeFiltering(nn.Module):
    def __init__(self, n_users, n_movies, n_factors=50):
        super(NeuralCollaborativeFiltering, self).__init__()
        
        self.user_embedding = nn.Embedding(n_users, n_factors)
        self.movie_embedding = nn.Embedding(n_movies, n_factors)
        
        self.fc_layers = nn.Sequential(
            nn.Linear(n_factors * 2, 128),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Linear(32, 1)
        )
        
        nn.init.normal_(self.user_embedding.weight, std=0.01)
        nn.init.normal_(self.movie_embedding.weight, std=0.01)
    
    def forward(self, user, movie):
        user_embedded = self.user_embedding(user)
        movie_embedded = self.movie_embedding(movie)
        
        # Concatenate
        vector = torch.cat([user_embedded, movie_embedded], dim=-1)
        
        # Pass through the fully connected layers
        prediction = self.fc_layers(vector).squeeze()
        
        return prediction

# 7. Initialize the model
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

n_users = len(user_ids)
n_movies = len(movie_ids)

# Choose a model
# model = MatrixFactorization(n_users, n_movies, n_factors=50).to(device)
model = NeuralCollaborativeFiltering(n_users, n_movies, n_factors=50).to(device)

print(f"用户数: {n_users}, 电影数: {n_movies}")
print(model)

# 8. Define the loss function and optimizer
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# 9. Training function
def train_epoch(model, train_loader, optimizer, criterion):
    model.train()
    total_loss = 0
    
    for user, movie, rating in train_loader:
        user, movie, rating = user.to(device), movie.to(device), rating.to(device)
        
        optimizer.zero_grad()
        prediction = model(user, movie)
        loss = criterion(prediction, rating)
        loss.backward()
        optimizer.step()
        
        total_loss += loss.item()
    
    return total_loss / len(train_loader)

# 10. Evaluation function
def evaluate(model, test_loader, criterion):
    model.eval()
    total_loss = 0
    
    with torch.no_grad():
        for user, movie, rating in test_loader:
            user, movie, rating = user.to(device), movie.to(device), rating.to(device)
            prediction = model(user, movie)
            loss = criterion(prediction, rating)
            total_loss += loss.item()
    
    return total_loss / len(test_loader)

# 11. Train the model
epochs = 20
train_losses = []
test_losses = []

for epoch in range(1, epochs + 1):
    train_loss = train_epoch(model, train_loader, optimizer, criterion)
    test_loss = evaluate(model, test_loader, criterion)
    
    train_losses.append(train_loss)
    test_losses.append(test_loss)
    
    print(f'Epoch {epoch}/{epochs}: Train Loss: {train_loss:.4f}, Test Loss: {test_loss:.4f}')

# 12. Visualize
plt.figure(figsize=(10, 6))
plt.plot(train_losses, label='Train Loss')
plt.plot(test_losses, label='Test Loss')
plt.xlabel('Epoch')
plt.ylabel('MSE Loss')
plt.title('Training and Test Loss')
plt.legend()
plt.show()

# 13. Recommendation function
def recommend_movies(model, user_id, n_recommendations=10):
    model.eval()

    # Get the user index
    user_idx = user2idx[user_id]

    # Movies the user has already rated
    rated_movies = set(ratings[ratings['userId'] == user_id]['movie_idx'].values)

    # Predict scores for all unrated movies
    all_movie_indices = list(range(n_movies))
    unrated_movies = [idx for idx in all_movie_indices if idx not in rated_movies]
    
    user_tensor = torch.LongTensor([user_idx] * len(unrated_movies)).to(device)
    movie_tensor = torch.LongTensor(unrated_movies).to(device)
    
    with torch.no_grad():
        predictions = model(user_tensor, movie_tensor).cpu().numpy()
    
    # Sort by predicted score
    top_indices = np.argsort(predictions)[::-1][:n_recommendations]
    recommended_movie_indices = [unrated_movies[i] for i in top_indices]
    recommended_movie_ids = [idx2movie[idx] for idx in recommended_movie_indices]

    # Look up the movie info; merging on movieId keeps each predicted
    # score aligned with the right title
    recommended = pd.DataFrame({
        'movieId': recommended_movie_ids,
        'predicted_rating': predictions[top_indices],
    }).merge(movies, on='movieId')

    return recommended[['movieId', 'title', 'genres', 'predicted_rating']]

# 14. Test the recommendations
user_id = user_ids[0]
recommendations = recommend_movies(model, user_id, n_recommendations=10)

print(f"\n为用户 {user_id} 推荐的电影:")
print(recommendations)

Learning Points

  1. Collaborative filtering: learning from user-item interactions
  2. Matrix factorization: decomposing the rating matrix into user and item embeddings
  3. Deep learning recommendation: modeling interactions with a neural network
  4. Evaluation metrics: MSE, RMSE

Part 4: Practical Advice

1. Project Selection Strategy

Beginner (0-3 months)

  • Projects 1-3: machine learning basics
  • Focus: understanding the complete workflow
  • Goal: finish 3 projects on your own

Intermediate (3-6 months)

  • Projects 4-6: introduction to deep learning
  • Focus: mastering PyTorch/TensorFlow
  • Goal: finish 2 projects on your own

Advanced (6-12 months)

  • Projects 7-10: advanced applications
  • Focus: solving real problems
  • Goal: finish 1-2 complete projects

2. Learning Method

First pass: follow along

  • Run the code end to end
  • Understand what every line does
  • Record the problems you hit

Second pass: modify

  • Change the hyperparameters
  • Try different algorithms
  • Improve the model's performance

Third pass: build it yourself

  • Implement it without looking at the code
  • Look things up only when you get stuck
  • Build your own code library

3. Solving Common Problems

Environment problems

  • Manage environments with Anaconda
  • Create a separate virtual environment per project
  • Pin your dependency versions

Data problems

  • Download datasets from Kaggle
  • Use the datasets built into torchvision
  • Build a small dataset of your own for testing

Performance problems

  • Start with a small dataset
  • Use a GPU for acceleration
  • Use the free GPU on Colab

Debugging problems

  • Print intermediate results
  • Visualize the data and the model
  • Debug step by step

4. Directions for Extending Projects

Data

  • Collect more data
  • Data augmentation
  • Handle class imbalance

Models

  • Try different architectures
  • Model ensembling
  • Hyperparameter tuning

Applications

  • Web deployment (Flask, Streamlit)
  • Mobile deployment (TFLite, ONNX)
  • API services (FastAPI; see the sketch below)
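
For the API service bullet, a minimal FastAPI sketch that serves the MNIST model from Project 4 (the model.py module holding SimpleNN is an assumption; start it with uvicorn serve:app):

import io

import torch
from fastapi import FastAPI, File, UploadFile
from PIL import Image
from torchvision import transforms

from model import SimpleNN  # assumption: SimpleNN from Project 4, saved in model.py

app = FastAPI()

model = SimpleNN()
model.load_state_dict(torch.load('mnist_model.pth', map_location='cpu'))
model.eval()

# Same normalization as in training
transform = transforms.Compose([
    transforms.Grayscale(),
    transforms.Resize((28, 28)),
    transforms.ToTensor(),
    transforms.Normalize((0.1307,), (0.3081,)),
])

@app.post("/predict")
async def predict(file: UploadFile = File(...)):
    image = Image.open(io.BytesIO(await file.read()))
    x = transform(image).unsqueeze(0)
    with torch.no_grad():
        digit = model(x).argmax(dim=1).item()
    return {"digit": digit}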

📚 Learning Resources

Dataset sources

  • Kaggle: https://www.kaggle.com/datasets
  • UCI ML Repository: https://archive.ics.uci.edu/ml
  • HuggingFace Datasets: https://huggingface.co/datasets

Code resources

  • GitHub: search for related projects
  • Papers with Code: papers plus code
  • Awesome Lists: curated resource lists

Learning platforms

  • Kaggle Learn: interactive tutorials
  • Fast.ai: practice-oriented courses
  • DeepLearning.AI: structured courses

🎯 Summary

Key points:

  1. 💻 Hands-on first: theory must be paired with practice
  2. 📈 Step by step: from simple to complex
  3. 🔄 Iterate: keep optimizing and improving
  4. 🎯 Clear goals: each project teaches one skill
  5. 📝 Record and summarize: write blog posts, take notes

Action plan:

  • Week 1: finish projects 1-2
  • Weeks 2-3: finish project 3
  • Weeks 4-6: finish projects 4-5
  • Weeks 7-10: finish project 6
  • Weeks 11-15: finish 1-2 of projects 7-10

Remember:

  • Don't chase perfection; finish first, then optimize
  • Hitting problems is normal; learn to look things up
  • Truly understand each project; don't just copy the code
  • Share your projects and get feedback

🔗 Related Articles

  • A Complete AI Learning Roadmap for Everyone
  • AI Fundamentals and Core Concepts
  • Math and Programming Foundations for Learning AI

Last updated: December 22, 2024
