使用XGBoost进行销售预测并集成到Airflow自动化流水线
1. 业务场景与目标
零售企业需每周预测未来7天的销售额,以优化库存和营销策略。手动运行模型耗时且易出错,目标是构建自动化流水线:每周一自动拉取最新数据,训练XGBoost模型,预测未来销售额,并将结果写入数据库供业务系统查询。任务类型为回归预测,预测连续销售额数值。
2. 环境准备
使用uv管理Python环境,确保依赖一致。创建requirements.txt文件并安装:
# 创建虚拟环境
uv venv .venv
source .venv/bin/activate # Linux/Mac
# .venv\Scripts\activate # Windows
# 安装依赖
uv pip install xgboost==2.1.0 pandas==2.2.0 scikit-learn==1.5.0 apache-airflow==2.10.0 sqlalchemy==2.0.30
3. 数据说明
使用模拟数据模拟零售销售记录,包含日期、店铺ID、产品类别、促销标志和历史销售额等特征。数据生成逻辑:基于时间序列和随机因素生成过去365天的每日销售数据,未来7天作为预测目标。
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
# 生成模拟数据
np.random.seed(42)
dates = pd.date_range(start='2023-01-01', end='2023-12-31', freq='D')
data = []
for date in dates:
for store_id in range(1, 6):
for category in ['A', 'B', 'C']:
base_sales = 100 + 10 * store_id + np.random.normal(0, 20)
promotion = np.random.choice([0, 1], p=[0.7, 0.3])
if promotion:
base_sales *= 1.5
sales = max(0, base_sales + np.random.normal(0, 10))
data.append({
'date': date,
'store_id': store_id,
'category': category,
'promotion': promotion,
'sales': sales
})
df = pd.DataFrame(data)
print(f"数据形状: {df.shape}")
print(df.head())
4. 训练/实现步骤
完整代码包括数据预处理、特征工程、XGBoost模型训练和评估。
import xgboost as xgb
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_absolute_error, mean_squared_error
from sklearn.preprocessing import LabelEncoder
# 数据预处理
df['date'] = pd.to_datetime(df['date'])
df['day_of_week'] = df['date'].dt.dayofweek
df['month'] = df['date'].dt.month
le = LabelEncoder()
df['category_encoded'] = le.fit_transform(df['category'])
# 特征和目标
features = ['store_id', 'category_encoded', 'promotion', 'day_of_week', 'month']
X = df[features]
y = df['sales']
# 划分训练集和测试集
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# 训练XGBoost模型
model = xgb.XGBRegressor(n_estimators=100, learning_rate=0.1, max_depth=6, random_state=42)
model.fit(X_train, y_train)
# 预测和评估
y_pred = model.predict(X_test)
mae = mean_absolute_error(y_test, y_pred)
rmse = np.sqrt(mean_squared_error(y_test, y_pred))
print(f"MAE: {mae:.2f}, RMSE: {rmse:.2f}")
5. 调用方式
支持离线批量预测和单条示例调用。
- 离线批量:读取新数据文件,批量预测并保存结果。
# 假设有新数据文件 new_data.csv new_df = pd.read_csv('new_data.csv') new_df['date'] = pd.to_datetime(new_df['date']) new_df['day_of_week'] = new_df['date'].dt.dayofweek new_df['month'] = new_df['date'].dt.month new_df['category_encoded'] = le.transform(new_df['category']) X_new = new_df[features] predictions = model.predict(X_new) new_df['predicted_sales'] = predictions new_df.to_csv('predictions.csv', index=False) print("批量预测完成,结果保存到 predictions.csv") - 单条示例:输入单条数据字典,返回预测值。
single_data = {'store_id': 3, 'category': 'B', 'promotion': 1, 'date': '2024-01-01'} single_df = pd.DataFrame([single_data]) single_df['date'] = pd.to_datetime(single_df['date']) single_df['day_of_week'] = single_df['date'].dt.dayofweek single_df['month'] = single_df['date'].dt.month single_df['category_encoded'] = le.transform(single_df['category']) X_single = single_df[features] pred = model.predict(X_single) print(f"预测销售额: {pred[0]:.2f}")
6. 指标小白说明
本任务为回归预测,使用MAE(平均绝对误差)和RMSE(均方根误差)评估模型性能。
- MAE:预测值与真实值绝对差的平均值,单位与销售额相同(如元),直观反映平均误差大小,对异常值不敏感。例如MAE=15表示平均预测偏差15元。
- RMSE:预测误差平方的平均值的平方根,同样单位,但更惩罚大误差,适用于业务中需避免严重偏差的场景。例如RMSE=20表示误差分布更广。
- 适用场景:回归任务如销售额、房价预测,AUC/F1用于分类任务不适用。
7. 上线后评估
- 离线监控:每周运行流水线后,记录MAE和RMSE到日志,设置阈值(如MAE>30触发告警)。
- 线上指标:对比预测销售额与实际销售额的偏差率,业务定义可接受范围(如±10%)。
- 重训触发条件:当连续3周MAE上升超过5%或数据分布显著变化(如新增店铺)时,触发模型重训。
8. 常见坑与排查
- 数据源变更导致流水线失败:在Airflow DAG中添加数据校验步骤,检查列名和数据类型,失败时发送告警。
- 模型过拟合预测不准:使用交叉验证调整超参数(如
max_depth减少树深度),添加正则化项(reg_alpha,reg_lambda)。 - Airflow任务依赖管理复杂:使用
ExternalTaskSensor或明确设置depends_on_past,简化DAG结构,避免环形依赖。 - 类别特征编码不一致:保存LabelEncoder对象或使用One-Hot编码,确保训练和预测时处理方式相同。