竞赛背景介绍
1.1 新能源功率预测的重要性
风电和光伏发电作为新能源的重要组成部分,由于其受天气和环境因素的强烈影响,具有明显的间歇性和波动性特征。准确的功率预测对于电力系统的稳定运行至关重要:
- 电网调度优化: 准确的预测能够有效应对新能源发电的波动性和不确定性,确保电网频率、电压、负荷平衡等关键指标的稳定
- 降低运营风险: 减少停电风险,提高电力系统的经济性和可靠性
- 合规要求: 电网公司通过调度中心对新能源发电场的功率预测进行考核,不达标将面临罚款
随着新能源比例的不断增加和智能电网技术的快速发展,发电功率预测在电网管理中将扮演越来越重要的角色。
1.2 竞赛概况
本次技术博客涵盖了多个新能源功率预测竞赛的经验,主要包括:
科大讯飞AI开发者大赛 - 新能源发电功率预测
- 数据规模: 2个场站(1个风电场、1个光伏场站)
- 预测目标: 未来2个月的15分钟级功率预测(每天96个时间点)
- 气象数据: 3个不同气象源(NWP_1, NWP_2, NWP_3)
第三届世界科学智能大赛新能源赛道
- 数据规模: 10个场站(5个风电场、5个光伏场站)
- 预测目标: 次日零时起未来24小时逐15分钟级功率预测
- 空间数据: 场站周边11×11个格点的气象预报数据
百度KDD Cup 2022 - 风电功率预测
- 数据规模: 134台风力发电机组
- 预测目标: 未来48小时10分钟级功率预测
DataCastle国能日新光伏功率预测
- 数据规模: 4个光伏站点
- 关键特征: 实发辐照度(训练集缺失,需预测)
数据特征分析
2.1 数据来源与结构
2.1.1 气象预报数据特征
气象数据通常来自多个预报源,每个源提供不同的气象变量。以科大讯飞竞赛为例:
气象变量说明:
| 变量名 | 描述 | 单位 |
|---|---|---|
| direct_radiation | 预测辐射 | W/m² |
| wind_direction_80m | 预测80m风向 | 度(°) |
| wind_speed_80m | 预测80m风速 | m/s |
| temperature_2m | 预测2m温度 | ℃ |
| relative_humidity_2m | 预测2m湿度 | % |
| precipitation | 预测降水量 | mm |
第三届世界科学智能大赛的气象变量更加丰富:
| 变量 | 描述 | 单位 |
|---|---|---|
| u100 | 100米高度纬向风 | m/s |
| v100 | 100米高度经向风 | m/s |
| t2m | 2米气温 | K |
| tp | 总降水量 | m |
| tcc | 总云量 | (0-1) |
| sp | 地面气压 | Pa |
| poai | 光伏面板辐照度 | W/m² |
| ghi | 水平面总辐照度 | W/m² |
关键发现:
- 不同气象源的变量可能存在差异(如NWP_2使用msl而非sp)
- 气象数据时间分辨率从15分钟到1小时不等
- 部分竞赛提供空间维度数据(11×11格点),可用于空间相关性建模
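针对上面第一条(气象源变量命名不一致),可以在合并多源数据前先统一列名。下面是一个极简示意(假设NWP_2以msl表示气压、目标统一命名为sp;映射表为示例假设,需按实际数据核对):
import polars as pl

# 示例假设: 各气象源气压列名不同,统一映射为 "sp"
PRESSURE_ALIAS = {"NWP_1": "sp", "NWP_2": "msl", "NWP_3": "sp"}

def unify_pressure(df: pl.DataFrame, source: str) -> pl.DataFrame:
    """将指定气象源的气压列重命名为sp,便于多源拼接"""
    col = PRESSURE_ALIAS.get(source, "sp")
    if col != "sp" and col in df.columns:
        df = df.rename({col: "sp"})
    return df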
2.1.2 功率数据特征
功率数据通常是15分钟或10分钟分辨率的历史实发功率数据。主要特征包括:
时间周期性:
- 日周期: 光伏发电呈现明显的日变化,白天发电,夜晚为0
- 季节周期: 风电和光伏发电受季节变化影响显著
数据质量问题:
- 缺失值: 部分时段数据缺失
- 死值: 数据长时间保持不变
- 异常值: 负值、超限值等
功率上限:
- 风电场站: 通常有额定功率上限
- 光伏电场: 受辐照度影响,实际功率通常在白天达到峰值
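上述周期性与数据质量特点可以用一个简短的探查脚本快速验证(示意,假设列名为"时间"与"实际功率",按实际数据调整):
import pandas as pd

def quick_profile(data: pd.DataFrame) -> None:
    """快速检查日周期与常见数据质量问题"""
    t = pd.to_datetime(data["时间"])
    # 日周期: 逐小时平均功率(光伏应呈白天高、夜间为0的形态)
    print(data.groupby(t.dt.hour)["实际功率"].mean())
    # 缺失值与负值
    print("缺失:", data["实际功率"].isna().sum(),
          "负值:", (data["实际功率"] < 0).sum())
    # 疑似死值: 连续8个点(2小时)取值完全不变
    run = data["实际功率"].rolling(8).apply(lambda s: float(s.nunique() == 1))
    print("疑似死值点:", int(run.fillna(0).sum()))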
2.2 数据质量分析与处理
2.2.1 异常值检测与处理
通过数据分析,我们发现以下常见的异常模式:
1. 时间格式异常:
# 国能日新竞赛中发现的时间问题
# CSV打开后时间看似整点,实际存储值并非整点(如显示0:00,实际为23:59:99)
# 这会导致提取的小时信息完全错误
2. 功率异常值:
# 站点1: 删除实际功率 > 10.4853的离群点
data = data[data["实际功率"] < 10.4853]
# 站点3: 特殊处理(实发辐照度<600且实际功率>30.13125)
data = data[~((data["实发辐照度"] < 600) & (data["实际功率"] >= 30.13125))]
3. 死值处理:
# 站点2和站点3的死值特征
# 某些时段功率恒定为4.4MW或29.667MW
if name == "station_2":
exclude_hours = [0,1,2,3,4,5,20,21,22,23]
    data = data[~((data.hour.isin(exclude_hours)) & (data.实际功率 == 4.4))]
2.2.2 数据标准化与归一化
第三届世界科学智能大赛提供了归一化后的功率数据,但气象数据需要自行处理:
# 风速合成: 从u100和v100计算真实风速和风向
df = df.with_columns([
((pl.col("v100")**2 + pl.col("u100")**2)**0.5).alias("wind_speed"),
((270 - (pl.arctan2(pl.col("v100"), pl.col("u100")) * 180 / np.pi)) % 360)
.alias("wind_direction"),
])
# 风功率密度计算(理论最大功率)
df = df.with_columns([
(0.5 * 1.225 * pl.col("wind_speed")**3).alias("wind_power"),
])
2.3 特征工程实践
2.3.1 时间相关特征
时间特征是新能源功率预测中最基础也最重要的特征:
def add_time_features(data):
"""添加时间相关特征"""
data["month"] = data["时间"].apply(
lambda x: int(str(x).split(" ")[0].split("-")[1])
)
data["hour"] = data["时间"].apply(
lambda x: int(str(x).split(" ")[1].split(":")[0])
)
data["day"] = data["时间"].apply(
lambda x: int(str(x).split(" ")[0].replace("-", ""))
)
# 光伏发电关键特征: 区分白天和夜晚
data["daytime"] = data["hour"].apply(
lambda x: 1 if x in range(7, 20) else 0
)
    return data
2.3.2 统计聚合特征
基于时间窗口的统计特征能有效捕捉数据的波动性和趋势:
def add_statistical_features(data, columns):
"""为指定列添加统计聚合特征"""
for col in columns:
# 每日统计特征
data[f"{col}_mean"] = data.groupby('day')[col].transform('mean')
data[f"{col}_std"] = data.groupby('day')[col].transform('std')
data[f"{col}_max"] = data.groupby(["day", "白天"])[col].transform("max")
data[f"{col}_min"] = data.groupby(["day", "白天"])[col].transform("min")
data[f"{col}_range"] = data[f"{col}_max"] - data[f"{col}_min"]
    return data
应用示例:
# 对辐照度、温度、湿度、压强四个关键变量进行统计聚合
columns = ['辐照度', '温度', '湿度', "压强"]
data = add_statistical_features(data, columns)
2.3.3 风速风向特征工程
对于风电预测,风速和风向的变换至关重要:
def add_wind_features(data):
"""添加风速风向特征"""
# 将圆形风向转换为正弦和余弦分量
data["sin_wind_d"] = np.sin(data.风向.values * np.pi / 180.) * data.风速.values
data['cos_wind_d'] = np.cos(data.风向.values * np.pi / 180.) * data.风速.values
# 温度压力交互特征
data['temp_pressure'] = (data.压强.values + 1.1) / (data.温度.values + 1.1)
data['temp_humi'] = (data.温度.values + 1.1) / (data.湿度.values + 1.1)
    return data
2.3.4 滞后特征
历史功率和气象信息对预测具有重要指导意义:
def add_lag_features(data, lag_cols, lags=[1, 2, 3]):
    """添加滞后特征"""
    for col in lag_cols:
        for lag in lags:
            # 第i行取第i-lag行的值,序列开头不足处用-1.0填充
            data[f"{col}_lag_{lag}"] = data[col].shift(lag, fill_value=-1.0)
    return data
# 应用示例
lag_cols = ['实发辐照度', '温度', '湿度', '辐照度', '风速']
data = add_lag_features(data, lag_cols, lags=[1, 2, 3])
2.3.5 分箱特征
将连续变量离散化可以帮助模型捕捉非线性关系:
def add_binned_features(data):
"""添加分箱特征"""
# 温度分箱
temp_bins = pd.cut(data['温度'], 3, labels=False)
data["temp_split"] = temp_bins
# 压强分箱
pres_bins = pd.cut(data['压强'], 3, labels=False)
data["pres_split"] = pres_bins
# 湿度分箱
humi_bins = pd.cut(data['湿度'], 3, labels=False)
data["humi_split"] = humi_bins
# 辐照度分箱(光伏重要特征)
irr_bins = pd.cut(data['辐照度'], 5, labels=False)
data["irr_split"] = irr_bins
    return data
2.3.6 距离特征
对于光伏预测,与峰值时段的距离对预测具有重要意义:
def add_peak_distance_features(data):
"""添加距离峰值的特征"""
# 定义峰值时段的ID列表
peak_IDs_unchecked = [
90, 309, 466, 686, 844, 1046, 1209, 1402, 1597, 1775,
# ... 更多的峰值ID
]
# 计算每个样本距离最近峰值的距离
dis2peak = []
peak_value = []
mean_power = data['平均功率']
for id in data['ID']:
min_dist = np.inf
closest_peak_value = None
for peak_id in peak_IDs_unchecked:
dist = abs(id - peak_id)
if dist < min_dist:
min_dist = dist
closest_peak_value = mean_power[peak_id]
dis2peak.append(min_dist)
peak_value.append(closest_peak_value)
data['dis2peak'] = dis2peak
data['peak_value'] = peak_value
    return data
技术方案设计
3.1 整体框架设计
基于多个竞赛的经验,我们总结出一套完整的新能源功率预测技术框架:
┌─────────────────────────────────────────────────────────────┐
│ 数据层 │
│ ├─ 气象预报数据(NWP_1, NWP_2, NWP_3) │
│ ├─ 历史功率数据(实发功率) │
│ └─ 场站元数据(地理位置、容量等) │
└─────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────┐
│ 数据预处理层 │
│ ├─ 数据清洗(缺失值、异常值处理) │
│ ├─ 时间对齐(气象预报与功率数据对齐) │
│ └─ 数据标准化/归一化 │
└─────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────┐
│ 特征工程层 │
│ ├─ 时间特征(hour, month, daytime) │
│ ├─ 统计特征(mean, std, max, min, range) │
│ ├─ 物理特征(风速合成、辐照度转换) │
│ ├─ 滞后特征(lag features) │
│ └─ 交互特征(变量间乘除运算) │
└─────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────┐
│ 模型层 │
│ ├─ 基线模型(GBDT, XGBoost, LightGBM) │
│ ├─ 深度模型(LSTM, Transformer, GNN) │
│ ├─ 融合策略(Multi-stage, Ensemble) │
│ └─ 空间模型(Spatial Correlation) │
└─────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────┐
│ 评估与优化层 │
│ ├─ 准确率评估(竞赛特定指标) │
│ ├─ 交叉验证 │
│ ├─ 超参数调优 │
│ └─ 后处理(边界约束、平滑处理) │
└─────────────────────────────────────────────────────────────┘
3.2 分阶段预测策略
3.2.1 两阶段预测策略(实发辐照度)
在国能日新光伏功率预测竞赛中,我们发现了一个关键问题:
问题: 测试集包含"实发辐照度"特征,但训练集缺失
解决方案: 采用两阶段预测策略
# 第一阶段: 预测实发辐照度
def predict_actual_irradiance(train, test):
"""第一阶段: 预测实发辐照度"""
# 构建训练数据
train_shifa_x = train[['辐照度', '风速', '风向', '温度', '压强', '湿度', '时间']]
train_shifa_x["month"] = train_shifa_x["时间"].apply(
lambda x: int(str(x).split(" ")[0].split("-")[1])
)
train_shifa_x["hour"] = train_shifa_x["时间"].apply(
lambda x: int(str(x).split(" ")[1].split(":")[0])
)
train_shifa_x = train_shifa_x.drop(["时间"], axis=1)
train_shifa_y = train["实发辐照度"]
# 构建测试数据
test_shifa_x = test[['辐照度', '风速', '风向', '温度', '压强', '湿度', '时间']]
test_shifa_x["month"] = test_shifa_x["时间"].apply(
lambda x: int(x.split(" ")[0].split("-")[1])
)
test_shifa_x["hour"] = test_shifa_x["时间"].apply(
lambda x: int(x.split(" ")[1].split(":")[0])
)
test_shifa_x = test_shifa_x.drop(["时间"], axis=1)
    # 训练LightGBM模型(测试集无实发辐照度标签,不能作为验证集)
    lgb_train = lgb.Dataset(train_shifa_x, train_shifa_y)
params = {
'task': 'train',
'boosting_type': 'gbdt',
'objective': 'regression_l1',
'metric': 'l1',
'num_leaves': 23,
'learning_rate': 0.05,
'feature_fraction': 0.9,
'bagging_fraction': 0.8,
'bagging_freq': 10,
'num_threads': 1
}
    gbm = lgb.train(params, lgb_train, num_boost_round=3500)
# 预测实发辐照度
predicted_irradiance = gbm.predict(test_shifa_x)
return predicted_irradiance
# 第二阶段: 使用预测的实发辐照度预测实际功率
def predict_actual_power(train, test, predicted_irradiance):
"""第二阶段: 预测实际功率"""
# 将预测的实发辐照度加入测试集
test["实发辐照度"] = predicted_irradiance
# 特征工程
train_features = create_features(train)
test_features = create_features(test)
# 训练XGBoost模型
model = XGBRegressor(
n_estimators=100,
learning_rate=0.1,
max_depth=7,
min_child_weight=5,
gamma=0.4,
subsample=0.9,
colsample_bytree=0.8,
reg_alpha=0.1
)
model.fit(train_features, train["实际功率"])
# 预测
predictions = model.predict(test_features)
    return predictions
关键发现:
- 实发辐照度是最重要的预测特征之一
- 人工预测的辐照度存在误差,通过模型预测可以改进
- 两阶段策略显著提升了模型性能
3.2.2 短期-长期分离策略
百度KDD Cup 2022的方案采用了短期(0-3小时)和长期(3-48小时)分离策略:
# 伪代码示例
def short_long_term_forecasting(data):
"""短期和长期分离预测"""
# 短期预测(0-3小时): 利用风惯性
short_term_data = data[:, :, :18] # 18个10分钟时段 = 3小时
short_term_model = MDLinear() # 或RNN/LSTM
short_term_pred = short_term_model.predict(short_term_data)
# 长期预测(3-48小时): 依赖气象预报
long_term_data = data[:, :, 18:] # 剩余时段
long_term_model = XGTN() # 空间时间图网络
long_term_pred = long_term_model.predict(long_term_data)
# 融合
predictions = np.concatenate([short_term_pred, long_term_pred], axis=2)
    return predictions
设计思想:
- 短期预测: 风具有惯性,当前风速对未来几小时的预测影响大
- 长期预测: 更依赖气象预报,需要充分利用空间相关性
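对"风惯性"最直观的对照是持续性(persistence)基线:直接把最近一次实发功率外推到未来若干时段,短期模型的收益应以它为参照。下面是一个最小示意(函数名与输入形状均为示例假设):
import numpy as np

def persistence_forecast(last_power: np.ndarray, horizon: int = 18) -> np.ndarray:
    """持续性基线: 将最近一次实发功率外推horizon个时段
    last_power: shape (num_turbines,); 返回 (num_turbines, horizon)
    horizon=18 对应10分钟分辨率下的0-3小时"""
    return np.repeat(last_power[:, None], horizon, axis=1)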
3.3 模型选择策略
3.3.1 树模型(基线方案)
LightGBM和XGBoost是新能源功率预测的基线模型,具有以下优势:
- 训练速度快: 适合快速迭代和特征工程验证
- 特征重要性可解释: 便于理解关键影响因素
- 对缺失值鲁棒: 处理数据质量问题
# LightGBM训练示例
def train_lightgbm(X_train, y_train, X_val, y_val):
"""训练LightGBM模型"""
lgb_train = lgb.Dataset(X_train, y_train)
lgb_eval = lgb.Dataset(X_val, y_val, reference=lgb_train)
params = {
'task': 'train',
'boosting_type': 'gbdt',
'objective': 'regression_l1', # MAE损失
'metric': 'l1',
'num_leaves': 31, # 叶子节点数
'learning_rate': 0.05,
'feature_fraction': 0.9,
'bagging_fraction': 0.8,
'bagging_freq': 10,
'verbose': -1
}
    gbm = lgb.train(
        params,
        lgb_train,
        num_boost_round=5000,
        valid_sets=[lgb_train, lgb_eval],
        # 新版LightGBM中早停与日志通过callbacks传入
        callbacks=[lgb.early_stopping(100), lgb.log_evaluation(100)]
    )
return gbm
# XGBoost训练示例
def train_xgboost(X_train, y_train):
"""训练XGBoost模型"""
model = XGBRegressor(
n_estimators=100,
learning_rate=0.1,
max_depth=7,
min_child_weight=5,
gamma=0.4,
subsample=0.9,
colsample_bytree=0.8,
reg_alpha=0.1, # L1正则化
reg_lambda=1, # L2正则化
n_jobs=-1
)
model.fit(X_train, y_train)
    return model
3.3.2 深度学习模型(进阶方案)
1. LSTM/GRU模型
适用于时间序列预测,能捕捉长期依赖关系:
import torch
import torch.nn as nn
class LSTMPredictor(nn.Module):
"""LSTM预测模型"""
def __init__(self, input_dim, hidden_dim, output_dim, num_layers=2):
super(LSTMPredictor, self).__init__()
self.lstm = nn.LSTM(
input_dim,
hidden_dim,
num_layers=num_layers,
batch_first=True,
dropout=0.2
)
self.fc = nn.Linear(hidden_dim, output_dim)
def forward(self, x):
# x shape: (batch_size, seq_len, input_dim)
lstm_out, _ = self.lstm(x)
# 取最后一个时间步的输出
output = lstm_out[:, -1, :]
output = self.fc(output)
return output
# 训练代码
def train_lstm(model, train_loader, val_loader, epochs=100, lr=0.001):
"""训练LSTM模型"""
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=lr)
for epoch in range(epochs):
model.train()
train_loss = 0
for X_batch, y_batch in train_loader:
optimizer.zero_grad()
outputs = model(X_batch)
loss = criterion(outputs, y_batch)
loss.backward()
optimizer.step()
train_loss += loss.item()
# 验证
model.eval()
val_loss = 0
with torch.no_grad():
for X_batch, y_batch in val_loader:
outputs = model(X_batch)
loss = criterion(outputs, y_batch)
val_loss += loss.item()
print(f'Epoch {epoch+1}/{epochs}, '
f'Train Loss: {train_loss/len(train_loader):.4f}, '
              f'Val Loss: {val_loss/len(val_loader):.4f}')
2. Transformer模型
基于自注意力机制,能同时捕捉多个时间步的依赖关系:
import math

class TransformerPredictor(nn.Module):
"""Transformer预测模型"""
def __init__(self, input_dim, d_model, nhead, num_layers, output_dim):
super(TransformerPredictor, self).__init__()
self.embedding = nn.Linear(input_dim, d_model)
self.pos_encoder = PositionalEncoding(d_model)
encoder_layer = nn.TransformerEncoderLayer(
d_model=d_model,
nhead=nhead,
dim_feedforward=256,
dropout=0.1
)
self.transformer_encoder = nn.TransformerEncoder(
encoder_layer,
num_layers=num_layers
)
self.fc = nn.Linear(d_model, output_dim)
def forward(self, x):
# x shape: (batch_size, seq_len, input_dim)
x = self.embedding(x)
x = self.pos_encoder(x)
# Transformer expects (seq_len, batch_size, d_model)
x = x.transpose(0, 1)
x = self.transformer_encoder(x)
x = x.transpose(0, 1)
# 取最后一个时间步
output = x[:, -1, :]
output = self.fc(output)
return output
class PositionalEncoding(nn.Module):
"""位置编码"""
def __init__(self, d_model, max_len=5000):
super(PositionalEncoding, self).__init__()
pe = torch.zeros(max_len, d_model)
position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
div_term = torch.exp(torch.arange(0, d_model, 2).float() *
(-math.log(10000.0) / d_model))
pe[:, 0::2] = torch.sin(position * div_term)
pe[:, 1::2] = torch.cos(position * div_term)
self.register_buffer('pe', pe)
def forward(self, x):
        return x + self.pe[:x.size(1), :]
3. 空间时间图网络(XGTN)
百度KDD Cup 2022前6名使用的模型,利用风机之间的空间相关性:
class XGTN(nn.Module):
"""时空图网络"""
def __init__(self, num_nodes, feature_dim, embed_dim, adj_matrix):
super(XGTN, self).__init__()
self.num_nodes = num_nodes
self.embed_dim = embed_dim
# 节点嵌入
self.node_embedding = nn.Parameter(torch.randn(num_nodes, embed_dim))
# 图卷积层
self.gconv1 = GraphConv(feature_dim + embed_dim, 64, adj_matrix)
self.gconv2 = GraphConv(64, 32, adj_matrix)
# 时间建模(使用TCN或LSTM)
self.temporal_model = nn.LSTM(32, 32, batch_first=True)
# 输出层
        self.fc = nn.Linear(32, 288)  # 288 = 48小时 × 每小时6个10分钟时段
def forward(self, x):
# x shape: (batch_size, seq_len, num_nodes, feature_dim)
batch_size, seq_len, num_nodes, feature_dim = x.size()
# 添加节点嵌入
node_emb = self.node_embedding.expand(batch_size, seq_len, -1, -1)
x = torch.cat([x, node_emb], dim=-1)
# 图卷积
x = x.view(batch_size * seq_len, num_nodes, -1)
x = F.relu(self.gconv1(x))
x = F.relu(self.gconv2(x))
x = x.view(batch_size, seq_len, num_nodes, -1)
# 时间建模
x = x.transpose(1, 2) # (batch_size, num_nodes, seq_len, feature_dim)
x = x.reshape(batch_size * num_nodes, seq_len, -1)
x, _ = self.temporal_model(x)
# 输出
x = x[:, -1, :]
x = self.fc(x)
x = x.view(batch_size, num_nodes, -1)
return x
class GraphConv(nn.Module):
"""图卷积层"""
def __init__(self, in_features, out_features, adj_matrix):
super(GraphConv, self).__init__()
self.weight = nn.Parameter(torch.FloatTensor(in_features, out_features))
self.adj_matrix = adj_matrix
nn.init.xavier_uniform_(self.weight)
def forward(self, x):
# x shape: (batch_size, num_nodes, in_features)
support = torch.matmul(x, self.weight)
output = torch.matmul(self.adj_matrix, support)
        return output
模型构建过程
4.1 数据准备流程
4.1.1 数据加载
import polars as pl
from pathlib import Path
class DataLoader:
"""统一数据加载器"""
def __init__(self, data_root):
self.data_root = Path(data_root)
def load_weather_data(self, mode="train", plant_ids=None):
"""加载气象数据"""
weather_data = {}
path = self.data_root / mode
for plant_folder in path.iterdir():
if plant_folder.is_dir():
plant_id = int(plant_folder.name)
if plant_ids and plant_id not in plant_ids:
continue
dfs = []
for file in plant_folder.glob("*.nc"):
df = self.read_netcdf(file)
df = df.with_columns(pl.lit(file.stem).alias("source"))
dfs.append(df)
# 合并多个气象源
weather_data[plant_id] = self.merge_nwp_sources(dfs)
return weather_data
def read_netcdf(self, file_path):
"""读取NetCDF格式的气象数据"""
import xarray as xr
ds = xr.open_dataset(file_path, engine='h5netcdf')
# 转换为DataFrame
df = ds.to_dataframe().reset_index()
return pl.from_pandas(df)
def merge_nwp_sources(self, dfs):
"""合并多个气象源数据"""
# 实现多源数据的合并逻辑
merged = pl.concat(dfs, how="diagonal")
return merged
def load_power_data(self, mode="train", plant_ids=None):
"""加载功率数据"""
power_data = {}
path = f"{self.data_root}/{mode}/fact_data_{mode}"
for plant_id in range(1, 11):
if plant_ids and plant_id not in plant_ids:
continue
file_path = f"{path}/{plant_id}_normalization_{mode}.csv"
df = pl.read_csv(file_path)
# 时间格式转换
df = df.with_columns([
pl.col("时间").str.to_datetime("%Y-%m-%d %H:%M:%S")
.alias("time")
])
# 重命名和选择列
df = df.rename({"功率(MW)": "power"})
df = df.select(["time", "power"])
power_data[plant_id] = df
        return power_data
4.1.2 数据预处理
class DataPreprocessor:
"""数据预处理器"""
def __init__(self):
self.scalers = {}
def clean_data(self, df):
"""数据清洗"""
# 1. 处理缺失值
df = df.fill_null(0)
# 2. 处理异常值
# 功率不能为负(某些特殊场站除外)
df = df.with_columns([
pl.when(pl.col("power") < 0)
.then(0)
.otherwise(pl.col("power"))
.alias("power")
])
# 3. 去除重复值
df = df.unique()
return df
def normalize_data(self, df, columns, mode="standard"):
"""数据标准化"""
normalized = df.clone()
for col in columns:
if mode == "standard":
# Z-score标准化
mean = df[col].mean()
std = df[col].std()
normalized = normalized.with_columns([
((pl.col(col) - mean) / std).alias(f"{col}_norm")
])
self.scalers[col] = {"mean": mean, "std": std}
elif mode == "minmax":
# Min-Max归一化
min_val = df[col].min()
max_val = df[col].max()
normalized = normalized.with_columns([
((pl.col(col) - min_val) / (max_val - min_val))
.alias(f"{col}_norm")
])
self.scalers[col] = {"min": min_val, "max": max_val}
return normalized
def align_data(self, weather_data, power_data):
"""气象数据与功率数据对齐"""
aligned_data = {}
for plant_id in power_data.keys():
weather_df = weather_data[plant_id]
power_df = power_data[plant_id]
# 按时间合并
merged = power_df.join(
weather_df,
on="time",
how="left"
)
aligned_data[plant_id] = merged
return aligned_data
def create_sequences(self, df, seq_len=24, pred_len=96):
"""创建时间序列样本"""
sequences = []
targets = []
data = df.to_numpy()
for i in range(len(data) - seq_len - pred_len):
seq = data[i:i+seq_len]
target = data[i+seq_len:i+seq_len+pred_len]
sequences.append(seq)
targets.append(target)
        return np.array(sequences), np.array(targets)
4.2 特征工程实现
class FeatureEngineer:
"""特征工程"""
def __init__(self):
pass
def add_all_features(self, df):
"""添加所有特征"""
df = self.add_time_features(df)
df = self.add_weather_features(df)
df = self.add_statistical_features(df)
df = self.add_lag_features(df)
df = self.add_interaction_features(df)
return df
def add_time_features(self, df):
"""时间特征"""
df = df.with_columns([
pl.col("time").dt.year().alias("year"),
pl.col("time").dt.month().alias("month"),
pl.col("time").dt.day().alias("day"),
pl.col("time").dt.hour().alias("hour"),
pl.col("time").dt.minute().alias("minute"),
pl.col("time").dt.weekday().alias("weekday"),
])
# 周期性编码
df = df.with_columns([
(2 * np.pi * pl.col("month") / 12).sin().alias("month_sin"),
(2 * np.pi * pl.col("month") / 12).cos().alias("month_cos"),
(2 * np.pi * pl.col("hour") / 24).sin().alias("hour_sin"),
(2 * np.pi * pl.col("hour") / 24).cos().alias("hour_cos"),
])
# 白天标识
df = df.with_columns([
pl.when((pl.col("hour") >= 7) & (pl.col("hour") < 19))
.then(1)
.otherwise(0)
.alias("is_daytime")
])
return df
def add_weather_features(self, df):
"""气象特征"""
# 风速合成
if "u100" in df.columns and "v100" in df.columns:
df = df.with_columns([
((pl.col("u100")**2 + pl.col("v100")**2)**0.5)
.alias("wind_speed"),
])
df = df.with_columns([
((270 - (pl.arctan2(pl.col("v100"), pl.col("u100")) *
180 / np.pi)) % 360)
.alias("wind_direction"),
])
# 风功率密度
if "wind_speed" in df.columns:
df = df.with_columns([
(0.5 * 1.225 * pl.col("wind_speed")**3)
.alias("wind_power_density"),
])
# 温湿比
if "temperature" in df.columns and "humidity" in df.columns:
df = df.with_columns([
(pl.col("temperature") / pl.col("humidity"))
.alias("temp_humidity_ratio"),
])
return df
def add_statistical_features(self, df, windows=[1, 4, 24, 96]):
"""统计特征(滑动窗口)"""
for window in windows:
# 滚动统计
df = df.with_columns([
pl.col("power")
.rolling_mean(window_size=window, center=False)
.alias(f"power_rolling_mean_{window}"),
pl.col("power")
.rolling_std(window_size=window, center=False)
.alias(f"power_rolling_std_{window}"),
pl.col("power")
.rolling_max(window_size=window, center=False)
.alias(f"power_rolling_max_{window}"),
pl.col("power")
.rolling_min(window_size=window, center=False)
.alias(f"power_rolling_min_{window}"),
])
return df
def add_lag_features(self, df, lags=[1, 2, 3, 4, 6, 12, 24, 48]):
"""滞后特征"""
for lag in lags:
df = df.with_columns([
pl.col("power").shift(lag).alias(f"power_lag_{lag}")
])
return df
def add_interaction_features(self, df):
"""交互特征"""
# 辐照度和温度交互
if "poai" in df.columns and "temperature" in df.columns:
df = df.with_columns([
(pl.col("poai") * pl.col("temperature"))
.alias("irr_temp_interaction"),
])
# 风速和风向交互
if "wind_speed" in df.columns and "wind_direction" in df.columns:
df = df.with_columns([
(pl.col("wind_speed") *
pl.col("wind_direction").radians().sin())
.alias("wind_u_component"),
(pl.col("wind_speed") *
pl.col("wind_direction").radians().cos())
.alias("wind_v_component"),
])
        return df
4.3 模型训练流程
4.3.1 树模型训练
class TreeModelTrainer:
"""树模型训练器"""
def __init__(self, model_type="lgbm"):
self.model_type = model_type
def train(self, X_train, y_train, X_val, y_val, params=None):
"""训练模型"""
if self.model_type == "lgbm":
return self._train_lightgbm(X_train, y_train, X_val, y_val, params)
elif self.model_type == "xgboost":
return self._train_xgboost(X_train, y_train, X_val, y_val, params)
else:
raise ValueError(f"Unknown model type: {self.model_type}")
def _train_lightgbm(self, X_train, y_train, X_val, y_val, params=None):
"""训练LightGBM"""
if params is None:
params = {
'objective': 'regression',
'metric': 'rmse',
'num_leaves': 31,
'learning_rate': 0.05,
'feature_fraction': 0.9,
'bagging_fraction': 0.8,
'bagging_freq': 5,
'verbose': -1
}
train_data = lgb.Dataset(X_train, label=y_train)
val_data = lgb.Dataset(X_val, label=y_val, reference=train_data)
        model = lgb.train(
            params,
            train_data,
            num_boost_round=10000,
            valid_sets=[train_data, val_data],
            # 新版LightGBM中早停与日志通过callbacks传入
            callbacks=[lgb.early_stopping(100), lgb.log_evaluation(100)]
        )
return model
def _train_xgboost(self, X_train, y_train, X_val, y_val, params=None):
"""训练XGBoost"""
if params is None:
params = {
'max_depth': 7,
'learning_rate': 0.1,
'n_estimators': 1000,
'min_child_weight': 5,
'subsample': 0.9,
'colsample_bytree': 0.8,
'reg_alpha': 0.1,
'reg_lambda': 1,
'eval_metric': 'rmse'
}
        # 新版xgboost中early_stopping_rounds在构造函数中指定
        model = xgb.XGBRegressor(early_stopping_rounds=100, **params)
        model.fit(
            X_train, y_train,
            eval_set=[(X_train, y_train), (X_val, y_val)],
            verbose=100
        )
return model
def hyperparameter_tuning(self, X_train, y_train, param_grid):
"""超参数调优"""
from sklearn.model_selection import GridSearchCV
if self.model_type == "xgboost":
model = xgb.XGBRegressor()
elif self.model_type == "lgbm":
model = lgb.LGBMRegressor()
grid_search = GridSearchCV(
model,
param_grid,
cv=5,
scoring='neg_mean_absolute_error',
n_jobs=-1,
verbose=1
)
grid_search.fit(X_train, y_train)
        return grid_search.best_estimator_, grid_search.best_params_
4.3.2 深度学习模型训练
class DeepModelTrainer:
"""深度学习模型训练器"""
def __init__(self, model, device='cuda' if torch.cuda.is_available() else 'cpu'):
self.model = model.to(device)
self.device = device
def train(self, train_loader, val_loader, epochs=100, lr=0.001):
"""训练模型"""
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(self.model.parameters(), lr=lr)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
optimizer, mode='min', patience=5, factor=0.5
)
best_val_loss = float('inf')
early_stopping_counter = 0
early_stopping_patience = 10
for epoch in range(epochs):
# 训练阶段
self.model.train()
train_loss = 0
for X_batch, y_batch in train_loader:
X_batch = X_batch.to(self.device)
y_batch = y_batch.to(self.device)
optimizer.zero_grad()
outputs = self.model(X_batch)
loss = criterion(outputs, y_batch)
loss.backward()
# 梯度裁剪
torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_norm=1.0)
optimizer.step()
train_loss += loss.item()
# 验证阶段
self.model.eval()
val_loss = 0
with torch.no_grad():
for X_batch, y_batch in val_loader:
X_batch = X_batch.to(self.device)
y_batch = y_batch.to(self.device)
outputs = self.model(X_batch)
loss = criterion(outputs, y_batch)
val_loss += loss.item()
avg_train_loss = train_loss / len(train_loader)
avg_val_loss = val_loss / len(val_loader)
print(f'Epoch {epoch+1}/{epochs}, '
f'Train Loss: {avg_train_loss:.6f}, '
f'Val Loss: {avg_val_loss:.6f}')
# 学习率调度
scheduler.step(avg_val_loss)
# Early Stopping
if avg_val_loss < best_val_loss:
best_val_loss = avg_val_loss
torch.save(self.model.state_dict(), 'best_model.pth')
early_stopping_counter = 0
else:
early_stopping_counter += 1
if early_stopping_counter >= early_stopping_patience:
print('Early stopping triggered!')
break
# 加载最佳模型
self.model.load_state_dict(torch.load('best_model.pth'))
return self.model
def predict(self, data_loader):
"""预测"""
self.model.eval()
predictions = []
with torch.no_grad():
for X_batch in data_loader:
X_batch = X_batch.to(self.device)
outputs = self.model(X_batch)
predictions.append(outputs.cpu().numpy())
        return np.concatenate(predictions, axis=0)
实验结果展示
5.1 评估指标
5.1.1 竞赛评估指标
不同竞赛使用不同的评估指标,我们需要实现统一的评估函数:
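以科大讯飞竞赛为例,其准确率本质上是"1减去归一化均方根误差",与下方代码的计算逻辑一致:

$$\text{Acc} = \max\left(1 - \frac{\sqrt{\frac{1}{n}\sum_{i=1}^{n}\left(P_i^{\text{real}} - P_i^{\text{pred}}\right)^2}}{\sqrt{\frac{1}{n}\sum_{i=1}^{n}\left(P_i^{\text{real}}\right)^2}},\ 0\right)$$

其中n为按场站类型过滤后参与考核的时间点数。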
科大讯飞竞赛评估指标:
def compute_accuracy_xunfei(predictions, targets, station_type="wind"):
"""
科大讯飞竞赛准确率计算
Parameters:
-----------
    predictions : pd.Series
        预测功率值, 索引为时间戳
    targets : pd.Series
        实际功率值, 与predictions按时间对齐
    station_type : str
        场站类型: "wind" 或 "pv"
Returns:
--------
float
准确率
"""
# 准备数据
df = pd.DataFrame({
'time': predictions.index,
'predict_power': predictions.values,
'real_power': targets.values
})
# 提取小时信息
df['hour'] = df['time'].dt.hour
if station_type == "wind":
# 风电: 只计算实发功率>10MW的时间点
mask = df['real_power'] > 10
elif station_type == "pv":
# 光伏: 计算大发时段[11:00~14:00)或其他时段实发功率>10MW的点
mask = ((df['hour'] >= 11) & (df['hour'] < 14)) | (df['real_power'] > 10)
else:
raise ValueError("station_type must be 'wind' or 'pv'")
# 过滤数据
df_filtered = df[mask].copy()
# 处理光伏大发时段零值
if station_type == "pv":
df_filtered.loc[(df_filtered['real_power'] == 0) &
(df_filtered['hour'] >= 11) &
(df_filtered['hour'] < 14), 'real_power'] = 0.01
# 计算误差
df_filtered['error'] = (df_filtered['real_power'] - df_filtered['predict_power'])**2
df_filtered['denominator'] = df_filtered['real_power']**2
# 计算准确率
numerator = np.sqrt(df_filtered['error'].sum() / len(df_filtered))
denominator = np.sqrt(df_filtered['denominator'].sum() / len(df_filtered))
accuracy = 1 - (numerator / denominator)
# 准确率小于0取0
accuracy = max(accuracy, 0)
    return accuracy
第三届世界科学智能大赛评估指标:
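该指标按"日-场站"两级取平均;单日准确率由归一化均方根误差给出(实发功率以0.2为归一化下限),与下方代码一致:

$$C_R = \max\left(1 - \sqrt{\frac{1}{n}\sum_{i=1}^{n}\left(\frac{P_i^{\text{pred}} - P_i^{\text{real}}}{\max\left(P_i^{\text{real}},\ 0.2\right)}\right)^2},\ 0\right),\qquad C_f = \frac{1}{D}\sum_{d=1}^{D} C_R^{(d)},\qquad C = \frac{1}{F}\sum_{f=1}^{F} C_f^{(f)}$$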
def compute_accuracy_world(predict, target):
"""
世界科学智能大赛准确率计算
Parameters:
-----------
predict : np.ndarray
预测功率, shape (num_plants, num_days, 96)
target : np.ndarray
实际功率, shape (num_plants, num_days, 96)
Returns:
--------
float
最终准确率
"""
num_plants = predict.shape[0]
num_days = predict.shape[1]
num_points = predict.shape[2]
C_f_list = []
for plant_id in range(num_plants):
C_R_list = []
for day in range(num_days):
# 计算当天的准确率
pred_day = predict[plant_id, day, :]
target_day = target[plant_id, day, :]
# 过滤异常值(根据竞赛规则处理)
valid_mask = ~(np.isnan(target_day) | np.isnan(pred_day))
pred_day_valid = pred_day[valid_mask]
target_day_valid = target_day[valid_mask]
            if len(pred_day_valid) == 0:
                C_R = 0
            else:
                # 以0.2为下限对实发功率归一化,计算归一化均方根误差
                norm_target = np.maximum(target_day_valid, 0.2)
                rmse_norm = np.sqrt(
                    np.sum(((pred_day_valid - target_day_valid) / norm_target)**2) /
                    len(pred_day_valid)
                )
                C_R = max(1 - rmse_norm, 0)
C_R_list.append(C_R)
# 场站准确率
C_f = np.mean(C_R_list)
C_f_list.append(C_f)
# 最终准确率
C = np.mean(C_f_list)
    return C
5.1.2 标准回归指标
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score

def compute_regression_metrics(y_true, y_pred):
"""计算标准回归指标"""
# MAE
mae = mean_absolute_error(y_true, y_pred)
# MSE
mse = mean_squared_error(y_true, y_pred)
# RMSE
rmse = np.sqrt(mse)
# MAPE
mape = np.mean(np.abs((y_true - y_pred) / (y_true + 1e-8))) * 100
# R²
r2 = r2_score(y_true, y_pred)
metrics = {
'MAE': mae,
'MSE': mse,
'RMSE': rmse,
'MAPE': mape,
'R²': r2
}
    return metrics
5.2 实验结果对比
5.2.1 模型性能对比
- 空间模型(XGTN)在风电预测中表现最佳,充分验证了风机间空间相关性的重要性
- Transformer在光伏预测中表现良好,能有效捕捉长时序依赖
- 树模型具有很好的性价比,训练快速且性能不差
5.2.2 特征工程效果对比
- 时间特征和统计特征是最基础也最重要的特征
- 滞后特征对短期预测贡献显著
- 两阶段策略(先预测实发辐照度)在光伏预测中效果明显
性能优化策略
6.1 模型融合策略
6.1.1 加权平均融合
class WeightedEnsemble:
"""加权平均融合"""
def __init__(self, models, weights=None):
self.models = models
self.num_models = len(models)
if weights is None:
# 默认等权重
self.weights = np.ones(self.num_models) / self.num_models
else:
# 归一化权重
self.weights = np.array(weights)
self.weights = self.weights / self.weights.sum()
def predict(self, X):
"""预测"""
predictions = []
for i, model in enumerate(self.models):
pred = model.predict(X)
predictions.append(pred)
predictions = np.array(predictions)
# 加权平均
weighted_pred = np.average(predictions, axis=0, weights=self.weights)
return weighted_pred
def optimize_weights(self, X_val, y_val):
"""优化权重(使用贝叶斯优化或网格搜索)"""
from scipy.optimize import minimize
        # 预先计算各模型在验证集上的预测,避免在优化循环中重复推理
        base_preds = np.array([model.predict(X_val) for model in self.models])
        def objective(weights):
            weights = weights / weights.sum()
            weighted_pred = np.average(base_preds, axis=0, weights=weights)
            # 最小化MAE
            return np.mean(np.abs(weighted_pred - y_val))
# 初始权重
initial_weights = np.ones(self.num_models) / self.num_models
# 权重约束: 非负且和为1
constraints = {'type': 'eq', 'fun': lambda w: np.sum(w) - 1}
bounds = [(0, 1) for _ in range(self.num_models)]
# 优化
result = minimize(
objective,
initial_weights,
method='SLSQP',
bounds=bounds,
constraints=constraints
)
optimal_weights = result.x / result.x.sum()
self.weights = optimal_weights
print(f"Optimal weights: {optimal_weights}")
print(f"Optimization result: {result}")
        return optimal_weights
6.1.2 Stacking融合
from sklearn.model_selection import KFold
from sklearn.linear_model import Ridge
from sklearn.base import clone
class StackingEnsemble:
"""Stacking融合"""
def __init__(self, base_models, meta_model=None):
self.base_models = base_models
self.meta_model = meta_model if meta_model else Ridge()
def fit(self, X, y, cv=5):
"""训练模型"""
# 第一层: 训练基模型
self.fitted_base_models = []
for model in self.base_models:
model.fit(X, y)
self.fitted_base_models.append(model)
# 第二层: 生成元特征
meta_features = self._generate_meta_features(X, y, cv)
# 训练元模型
self.meta_model.fit(meta_features, y)
def _generate_meta_features(self, X, y, cv):
"""生成元特征"""
kf = KFold(n_splits=cv, shuffle=True, random_state=42)
meta_features = np.zeros((len(X), len(self.base_models)))
for train_idx, val_idx in kf.split(X):
X_train, X_val = X[train_idx], X[val_idx]
for i, model in enumerate(self.base_models):
# 在训练集上训练
model_clone = clone(model)
model_clone.fit(X_train, y[train_idx])
# 在验证集上预测
pred = model_clone.predict(X_val)
meta_features[val_idx, i] = pred
return meta_features
def predict(self, X):
"""预测"""
# 第一层预测
meta_features = np.zeros((len(X), len(self.base_models)))
for i, model in enumerate(self.fitted_base_models):
pred = model.predict(X)
meta_features[:, i] = pred
# 第二层预测
final_pred = self.meta_model.predict(meta_features)
        return final_pred
6.2 超参数优化
6.2.1 网格搜索
from sklearn.model_selection import GridSearchCV
def grid_search_optimization(model, param_grid, X_train, y_train):
"""网格搜索优化"""
grid_search = GridSearchCV(
estimator=model,
param_grid=param_grid,
cv=5,
scoring='neg_mean_absolute_error',
n_jobs=-1,
verbose=1
)
grid_search.fit(X_train, y_train)
print(f"Best parameters: {grid_search.best_params_}")
print(f"Best score: {grid_search.best_score_}")
return grid_search.best_estimator_
# 示例: XGBoost超参数优化
xgb_param_grid = {
'n_estimators': [100, 200, 500, 1000],
'max_depth': [3, 5, 7, 9],
'learning_rate': [0.01, 0.05, 0.1, 0.2],
'min_child_weight': [1, 3, 5],
'subsample': [0.6, 0.8, 1.0],
'colsample_bytree': [0.6, 0.8, 1.0],
'reg_alpha': [0, 0.01, 0.1, 1],
'reg_lambda': [0, 0.1, 1, 10]
}
best_xgb = grid_search_optimization(
XGBRegressor(),
xgb_param_grid,
X_train,
y_train
)
6.2.2 贝叶斯优化
from skopt import BayesSearchCV
from skopt.space import Real, Categorical, Integer
def bayesian_optimization(model, search_space, X_train, y_train):
"""贝叶斯优化"""
opt = BayesSearchCV(
estimator=model,
search_spaces=search_space,
n_iter=50,
cv=5,
scoring='neg_mean_absolute_error',
n_jobs=-1,
random_state=42
)
opt.fit(X_train, y_train)
print(f"Best parameters: {opt.best_params_}")
print(f"Best score: {opt.best_score_}")
return opt.best_estimator_
# 示例: LightGBM超参数优化
lgb_search_space = {
'num_leaves': Integer(20, 100),
'learning_rate': Real(0.001, 0.3, prior='log-uniform'),
'n_estimators': Integer(100, 5000),
'feature_fraction': Real(0.4, 1.0),
'bagging_fraction': Real(0.4, 1.0),
'bagging_freq': Integer(1, 10),
'min_child_samples': Integer(5, 50),
'reg_alpha': Real(0.0, 1.0),
'reg_lambda': Real(0.0, 1.0)
}
best_lgb = bayesian_optimization(
lgb.LGBMRegressor(),
lgb_search_space,
X_train,
y_train
)
6.3 后处理策略
6.3.1 边界约束
def apply_boundary_constraints(predictions, station_type="wind", capacity=None):
"""应用边界约束"""
predictions = predictions.copy()
if station_type == "wind":
# 风电: 功率在0到容量之间
if capacity:
predictions = np.clip(predictions, 0, capacity)
else:
predictions = np.maximum(predictions, 0)
elif station_type == "pv":
# 光伏: 夜晚功率应为0
# 假设predictions包含时间信息
if hasattr(predictions, 'index'):
hours = predictions.index.hour
predictions[hours < 6] = 0
predictions[hours > 20] = 0
predictions = np.maximum(predictions, 0)
else:
predictions = np.maximum(predictions, 0)
    return predictions
6.3.2 平滑处理
from scipy.signal import savgol_filter
def smooth_predictions(predictions, window_length=5, polyorder=2):
"""平滑预测结果"""
smoothed = savgol_filter(
predictions,
window_length=window_length,
polyorder=polyorder
)
return smoothed
# 指数移动平均平滑
def ema_smooth(predictions, alpha=0.3):
"""指数移动平均平滑"""
smoothed = predictions.copy()
for i in range(1, len(predictions)):
smoothed[i] = alpha * predictions[i] + (1 - alpha) * smoothed[i-1]
    return smoothed
6.3.3 基于气象的修正
def weather_based_correction(predictions, weather_data):
"""基于气象数据修正预测"""
corrected = predictions.copy()
# 光伏: 根据辐照度修正
if 'poai' in weather_data.columns or 'direct_radiation' in weather_data.columns:
# 获取辐照度列名
irr_col = 'poai' if 'poai' in weather_data.columns else 'direct_radiation'
# 夜晚辐照度接近0,功率应为0
night_mask = weather_data[irr_col] < 10
corrected[night_mask] = 0
# 白天: 确保功率和辐照度正相关
day_mask = weather_data[irr_col] >= 10
# 计算功率-辐照度比率
    if day_mask.any():
power_irr_ratio = predictions[day_mask] / (weather_data.loc[day_mask, irr_col] + 1e-8)
# 限制比率在合理范围内
median_ratio = np.median(power_irr_ratio)
corrected[day_mask] = np.minimum(
predictions[day_mask],
weather_data.loc[day_mask, irr_col] * median_ratio * 2
)
# 风电: 根据风速修正
if 'wind_speed' in weather_data.columns:
# 风速过低,功率应为0
low_wind_mask = weather_data['wind_speed'] < 3
corrected[low_wind_mask] = 0
    return corrected
遇到的问题及解决方案
7.1 数据问题
问题1: 时间格式不一致
现象:
- 国能日新竞赛中,CSV文件里时间显示为整点,但底层存储值并非整点
- 例如显示"0:00",实际为"23:59:99"
- 导致提取的小时信息完全错误
解决方案:
import datetime
def fix_time_format(data):
"""修正时间格式"""
# 原始时间是datetime格式,需要修正
data['时间'] = pd.to_datetime(data['时间'])
# 检查是否有边界时间点
# 如果时间接近下一个小时的开始,向前调整1秒
data['时间'] = data['时间'].apply(
lambda x: x - datetime.timedelta(seconds=1)
if x.minute == 0 and x.second == 0 and x.microsecond > 990000
else x
)
    return data
经验教训:
- 在数据探索阶段就要仔细检查时间格式
- 打印样本数据时不要只看字符串表示,要看实际值
- 处理时间数据时使用datetime类型而非字符串
问题2: 实发辐照度缺失
现象:
- 测试集包含"实发辐照度"特征
- 训练集缺少该特征
- 实发辐照度与实际功率高度相关
解决方案:
采用两阶段预测策略:
- 第一阶段: 预测实发辐照度
- 第二阶段: 使用预测的实发辐照度预测实际功率
def two_stage_prediction(train, test):
"""两阶段预测"""
# 第一阶段: 预测实发辐照度
irradiance_model = train_irradiance_model(train)
predicted_irradiance = irradiance_model.predict(test)
# 将预测的实发辐照度加入测试集
test['实发辐照度'] = predicted_irradiance
# 第二阶段: 预测实际功率
power_model = train_power_model(train)
predictions = power_model.predict(test)
    return predictions
经验教训:
- 测试集包含而训练集缺失的特征通常需要额外处理
- 考虑是否可以预测该特征,或寻找替代方案
- 两阶段策略是处理此类问题的有效方法
问题3: 数据质量问题
现象:
- 缺失值
- 死值(长时间保持不变的值)
- 异常值(负值、超限值)
解决方案:
def handle_data_quality_issues(data, station_id):
"""处理数据质量问题"""
    # 1. 缺失值处理: 先前向填充再后向填充(新版pandas已弃用fillna(method=...))
    data = data.ffill().bfill()
# 2. 死值处理
# 识别死值: 连续相同值的时段超过阈值
def detect_dead_values(series, threshold=10):
dead_mask = pd.Series(False, index=series.index)
for i in range(len(series) - threshold):
if (series.iloc[i:i+threshold] == series.iloc[i]).all():
dead_mask.iloc[i:i+threshold] = True
        return dead_mask
    # 可按需调用: data = data[~detect_dead_values(data['实际功率'])]
# 根据站点特点处理死值
if station_id == 2:
# 站点2: 特定时段的死值
exclude_hours = [0, 1, 2, 3, 4, 5, 20, 21, 22, 23]
data = data[~((data.hour.isin(exclude_hours)) & (data.实际功率 == 4.4))]
# 3. 异常值处理
# 负值处理
data.loc[data.实际功率 < 0, '实际功率'] = 0
# 超限值处理
capacity = data.实际功率.quantile(0.99) # 使用99分位数作为容量参考
data.loc[data.实际功率 > capacity, '实际功率'] = capacity
    return data
经验教训:
- 数据清洗是模型成功的关键
- 不同站点可能有不同的数据特点,需要分别处理
- 可视化有助于发现数据质量问题
7.2 模型问题
问题1: 模型过拟合
现象:
- 训练集MAE很低,验证集MAE很高
- 模型在训练集上表现很好,但泛化能力差
解决方案:
def prevent_overfitting():
"""防止过拟合的策略"""
# 1. 正则化
xgb_params = {
'reg_alpha': 0.1, # L1正则化
'reg_lambda': 1.0, # L2正则化
}
lgb_params = {
'feature_fraction': 0.8, # 特征采样
'bagging_fraction': 0.8, # 样本采样
'bagging_freq': 5,
}
# 2. 早停
model = xgb.XGBRegressor(
n_estimators=10000,
early_stopping_rounds=100
)
model.fit(X_train, y_train,
eval_set=[(X_val, y_val)],
verbose=100)
# 3. 交叉验证
from sklearn.model_selection import cross_val_score
cv_scores = cross_val_score(
model, X, y,
cv=5,
scoring='neg_mean_absolute_error'
)
# 4. 降低模型复杂度
xgb_params = {
'max_depth': 5, # 减小树深度
'min_child_weight': 5, # 增加最小叶子权重
'learning_rate': 0.05, # 降低学习率
}
    return model
经验教训:
- 交叉验证是评估模型泛化能力的关键
- 正则化和早停是防止过拟合的有效手段
- 复杂模型不一定比简单模型好
问题2: 深度学习模型训练不稳定
现象:
- 损失不收敛
- 梯度爆炸或消失
- 验证集性能波动大
解决方案:
def stabilize_training():
"""稳定训练的策略"""
# 1. 梯度裁剪
optimizer.zero_grad()
loss.backward()
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
optimizer.step()
# 2. 学习率调度
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
optimizer, mode='min', patience=5, factor=0.5
)
# 或使用余弦退火
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(
optimizer, T_max=50, eta_min=1e-6
)
# 3. Batch Normalization
model = nn.Sequential(
nn.Linear(input_dim, 128),
nn.BatchNorm1d(128),
nn.ReLU(),
nn.Dropout(0.2),
nn.Linear(128, 64),
nn.BatchNorm1d(64),
nn.ReLU(),
nn.Dropout(0.2),
nn.Linear(64, output_dim)
)
# 4. 权重初始化
def init_weights(m):
if isinstance(m, nn.Linear):
nn.init.xavier_uniform_(m.weight)
nn.init.constant_(m.bias, 0)
model.apply(init_weights)
# 5. 数据增强
def augment_data(X, y):
# 时间抖动
noise = np.random.normal(0, 0.01, X.shape)
X_noisy = X + noise
return X_noisy, y
    return model, scheduler
经验教训:
- 深度学习模型的训练需要更多调参技巧
- 监控训练过程,及时发现异常
- 数据标准化很重要
问题3: 特征冗余
现象:
- 特征数量过多(100+)
- 模型训练慢
- 特征重要性分布不均
解决方案:
def feature_selection(X, y, method='importance'):
"""特征选择"""
if method == 'importance':
# 基于特征重要性
model = XGBRegressor(n_estimators=100, random_state=42)
model.fit(X, y)
importance = model.feature_importances_
# 选择重要性前N的特征
n_features = 50
selected_indices = np.argsort(importance)[-n_features:]
return X.iloc[:, selected_indices]
elif method == 'correlation':
# 基于相关性
corr_matrix = X.corr().abs()
# 高相关性特征对
high_corr = np.where(corr_matrix > 0.95)
features_to_drop = set()
for i, j in zip(*high_corr):
if i < j:
features_to_drop.add(X.columns[j])
X_selected = X.drop(columns=features_to_drop)
return X_selected
elif method == 'rfe':
# 递归特征消除
from sklearn.feature_selection import RFE
model = XGBRegressor(n_estimators=50, random_state=42)
rfe = RFE(model, n_features_to_select=50)
rfe.fit(X, y)
selected_features = X.columns[rfe.support_]
        return X[selected_features]
经验教训:
- 特征选择可以提升模型性能并加速训练
- 基于特征重要性的方法简单有效
- 特征数量不是越多越好
7.3 部署问题
问题1: 推理速度慢
现象:
- 单次预测耗时过长
- 无法满足实时性要求
解决方案:
def accelerate_inference():
"""加速推理"""
# 1. 模型量化
import torch.quantization as quantization
# 动态量化
model_quantized = quantization.quantize_dynamic(
model, {nn.Linear}, dtype=torch.qint8
)
# 2. ONNX导出
import torch.onnx
dummy_input = torch.randn(1, seq_len, input_dim)
torch.onnx.export(
model,
dummy_input,
"model.onnx",
opset_version=11,
input_names=['input'],
output_names=['output']
)
# 3. 批处理
def batch_predict(model, data, batch_size=64):
predictions = []
for i in range(0, len(data), batch_size):
batch = data[i:i+batch_size]
batch_pred = model.predict(batch)
predictions.append(batch_pred)
return np.concatenate(predictions)
# 4. LightGBM加速
model = lgb.Booster(model_file='model.txt')
# 禁用预测的额外检查
model.reset_parameter({
'predict_disable_shape_check': True
})
    return model_quantized
经验教训:
- 模型量化可以显著提升推理速度
- 批处理可以充分利用计算资源
- 考虑使用更高效的推理框架(如ONNX Runtime)
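针对最后一条,补充一个用ONNX Runtime加载上文导出的model.onnx做推理的最小示意(输入名与形状需与导出时一致,此处数值仅为示例):
import numpy as np
import onnxruntime as ort

# 加载导出的ONNX模型并在CPU上推理
sess = ort.InferenceSession("model.onnx", providers=["CPUExecutionProvider"])
x = np.random.randn(1, 24, 16).astype(np.float32)  # 示例: seq_len=24, input_dim=16
outputs = sess.run(None, {"input": x})  # "input"对应导出时的input_names
print(outputs[0].shape)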
问题2: Docker部署问题
现象:
- 容器内无法运行
- 文件读取错误
- 网络连接问题
解决方案:
# Dockerfile最佳实践
FROM registry.cn-shanghai.cr.aliyuncs.com/tcc_public/python:3.10
# 设置环境变量
ENV HDF5_USE_FILE_LOCKING=FALSE
ENV PYTHONUNBUFFERED=1
# 安装依赖
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# 复制代码
COPY code/ /app/code
COPY model/ /app/model
# 设置工作目录
WORKDIR /app
# 入口脚本
COPY run.sh /app/
RUN chmod +x /app/run.sh
#!/bin/bash
# run.sh
cd /app/code
# 运行推理
python inference.py \
--input_dir /saisdata/POWER_TEST_ENV \
--output_dir /saisresult \
--model_path /app/model
# 创建输出压缩包
cd /saisresult
zip -r output.zip output*.csv
经验教训:
- 容器内无法访问网络,需预装所有依赖
- 使用绝对路径
- 环境变量设置很重要
- 测试容器是否能正常运行再提交
竞赛经验总结
8.1 核心经验总结
经验1: 特征为王
核心观点: 特征工程的质量决定了模型性能的上限
实践证明:
- 在国能日新竞赛中,通过精心设计的特征工程(实发辐照度预测、统计特征、滞后特征),最终排名21
- 即使简单的模型(如线性回归),配合好的特征也能取得不错的效果
特征工程原则:
领域知识驱动: 深入理解新能源发电的物理原理
- 光伏: 辐照度、温度、面板效率
- 风电: 风速、风向、空气密度
时间特征优先: hour、month、day、weekday等基础特征
统计特征补充: 滚动统计(window mean/std/max/min)
滞后特征重要: 历史1-48小时的功率和气象数据
交互特征创新: 变量之间的乘除运算
# 特征优先级排序(基于实际效果)
FEATURE_PRIORITY = {
"时间特征": 10, # hour, month, daytime
"统计特征": 9, # rolling mean, std
"滞后特征": 8, # lag features
"气象特征": 8, # wind_speed, irradiance
"交互特征": 7, # variable interactions
"分箱特征": 6, # binned features
"空间特征": 7, # for wind farms with spatial correlation
}
经验2: 数据预处理比模型更重要
核心观点: 高质量的数据输入比复杂的模型更有效
关键点:
- 时间对齐: 气象预报时间和功率时间必须精确对齐
- 异常值处理: 缺失值、死值、异常值的合理处理
- 数据标准化: 合理的归一化/标准化策略
数据质量检查清单:
□ 时间戳是否连续?是否有跳变?
□ 是否有缺失值?如何填充?
□ 是否有异常值(负值、超限值)?
□ 是否有死值(长时间不变的值)?
□ 数据分布是否合理?
□ 训练集和测试集分布是否一致?
经验3: 从简单到复杂
核心观点: 先建立基线,逐步优化
推荐流程:
1. 基线模型(2-3天)
└─ LightGBM + 基础特征
2. 特征优化(5-7天)
└─ 添加统计特征、滞后特征
3. 模型优化(5-7天)
└─ 尝试XGBoost、CatBoost
4. 进阶模型(可选,5-7天)
└─ LSTM、Transformer
5. 模型融合(3-5天)
└─ 加权平均、Stacking
6. 后处理(2-3天)
   └─ 边界约束、平滑处理
经验4: 交叉验证至关重要
核心观点: 本地验证分数与线上分数的一致性是成功的关键
验证策略:
- 时间序列交叉验证: 不打乱时间顺序
- 分段验证: 按月或季度划分
- 线下线上对比: 监控gap
def time_series_cv(X, y, n_splits=5):
"""时间序列交叉验证"""
n_samples = len(X)
fold_size = n_samples // n_splits
cv_scores = []
for i in range(n_splits):
train_start = 0
train_end = (i + 1) * fold_size
test_start = train_end
        test_end = test_start + fold_size if i < n_splits - 1 else n_samples
X_train, X_val = X[train_start:train_end], X[test_start:test_end]
y_train, y_val = y[train_start:train_end], y[test_start:test_end]
model = train_model(X_train, y_train)
score = evaluate_model(model, X_val, y_val)
cv_scores.append(score)
    return cv_scores
经验5: 模型融合提升显著
核心观点: 多模型融合往往能取得比单模型更好的效果
融合原则:
- 多样性: 使用不同类型的模型(树模型、深度学习)
- 相关性: 模型预测的相关性不宜过高
- 权重优化: 基于验证集优化融合权重
有效融合组合:
- LightGBM + XGBoost + CatBoost(树模型)
- LightGBM + LSTM(树模型+深度学习)
- 短期模型 + 长期模型(时间尺度分离)
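其中"模型预测的相关性不宜过高"可以直接量化检查,示意如下:
import numpy as np

def prediction_correlation(preds):
    """计算各模型验证集预测之间的相关系数矩阵
    preds: 列表, 每个元素为一个模型的预测向量 shape (n_samples,)"""
    return np.corrcoef(np.vstack(preds))

# 经验上, 若两两相关系数普遍高于0.98, 融合收益通常有限(经验性参考值)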
8.2 不同场站的特点
风电场站特点
关键特征:
- 风速(80m或100m高度)
- 风向(经向和纬向风分量)
- 气温(影响空气密度)
- 气压(影响空气密度)
- 空间相关性(多风机之间的相关性)
建模建议:
- 空间模型: XGTN等图网络模型
- 风功率密度: 0.5 * ρ * v³
- 短期-长期分离: 短期依赖风惯性,长期依赖气象预报
- 风向处理: 转换为sin/cos分量
# 风电特征工程示例
def wind_feature_engineering(data):
# 1. 风速合成
data['wind_speed'] = np.sqrt(data['u100']**2 + data['v100']**2)
# 2. 风向转换
data['wind_dir_sin'] = np.sin(data['wind_direction'] * np.pi / 180)
data['wind_dir_cos'] = np.cos(data['wind_direction'] * np.pi / 180)
# 3. 空气密度(理想气体定律)
# ρ = P / (R * T)
R = 287.05 # 空气气体常数
data['air_density'] = data['pressure'] / (R * (data['temperature'] + 273.15))
# 4. 风功率密度
data['wind_power_density'] = 0.5 * data['air_density'] * data['wind_speed']**3
    return data
光伏场站特点
关键特征:
- 辐照度(总辐照度、直接辐照度、散射辐照度)
- 实发辐照度(面板辐照度,非常重要)
- 温度(影响光伏效率)
- 时间特征(白天/夜晚、季节变化)
建模建议:
- 两阶段预测: 先预测实发辐照度,再预测功率
- 温度修正: 光伏效率随温度升高而降低
- 时间特征: 区分白天和夜晚
- 非线性变换: 辐照度和功率的关系非线性
# 光伏特征工程示例
def pv_feature_engineering(data):
# 1. 白天标识
data['is_daytime'] = ((data['hour'] >= 6) & (data['hour'] <= 19)).astype(int)
# 2. 温度修正
# 光伏效率温度系数约 -0.004/℃
temp_coefficient = -0.004
ref_temp = 25 # 标准测试温度(℃)
data['efficiency_factor'] = 1 + temp_coefficient * (data['temperature'] - ref_temp)
# 3. 辐照度归一化
data['irradiance_norm'] = data['direct_radiation'] / (data['direct_radiation'].max() + 1e-8)
# 4. 理论功率
panel_efficiency = 0.18 # 面板效率
area = 1.0 # 假设单位面积
data['theoretical_power'] = (data['direct_radiation'] *
panel_efficiency * area *
data['efficiency_factor'])
    return data
8.3 竞赛技巧
技巧1: 理解评估指标
关键: 准确理解竞赛评估指标,针对性地优化
科大讯飞竞赛评估:
- 风电: 只计算实发功率>10MW的点
- 光伏: 计算大发时段[11:00~14:00)或其他时段>10MW的点
优化策略:
def competition_aware_loss(y_true, y_pred, station_type):
"""竞赛感知的损失函数"""
if station_type == "wind":
# 风电: 重点关注功率>10MW的时刻
mask = y_true > 10
loss = np.mean(np.abs(y_true[mask] - y_pred[mask]))
elif station_type == "pv":
# 光伏: 重点关注大发时段
hour = pd.to_datetime(y_true.index).hour
mask = ((hour >= 11) & (hour < 14)) | (y_true > 10)
loss = np.mean(np.abs(y_true[mask] - y_pred[mask]))
    return loss
技巧2: 时间序列的特殊性
注意: 时间序列不能简单随机划分训练集和验证集
正确做法:
# 错误: 随机划分
X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2)
# 正确: 按时间划分
split_idx = int(len(X) * 0.8)
X_train, X_val = X[:split_idx], X[split_idx:]
y_train, y_val = y[:split_idx], y[split_idx:]
技巧3: 提交前的检查
检查清单:
□ 文件格式是否正确?(csv, zip)
□ 文件名是否正确?(output1.csv, output2.csv等)
□ 列名是否正确?(time, power, predict_power等)
□ 数据长度是否正确?(预测天数 × 每天96点)
□ 时间范围是否正确?(2025/1/1~2025/2/28)
□ 数据类型是否正确?(float64)
□ 是否有空值?
□ 是否有负值或超限值?
def submission_check(predictions, submission_format):
"""提交前检查"""
# 1. 检查长度
assert len(predictions) == len(submission_format), "长度不匹配"
# 2. 检查空值
assert not predictions.isnull().any().any(), "存在空值"
# 3. 检查负值
assert (predictions['predict_power'] >= 0).all(), "存在负值"
# 4. 检查时间连续性
time_diff = predictions['time'].diff()[1:]
expected_diff = pd.Timedelta(minutes=15)
assert (time_diff == expected_diff).all(), "时间不连续"
print("提交检查通过!")技巧4: 多提交策略
策略: 利用每日多次提交机会,尝试不同方案
实施方案:
- 早提交: 快速验证基线方案
- 中期提交: 测试特征工程和模型改进
- 后期提交: 融合最优模型,精细调参
- 最后提交: 稳定性最高的方案
def multi_submission_strategy():
"""多提交策略"""
submissions = {
"submission_1": "基线方案(LightGBM + 基础特征)",
"submission_2": "优化方案(XGBoost + 完整特征)",
"submission_3": "融合方案(多模型加权)",
"submission_4": "最终方案(融合 + 后处理)",
}
for name, desc in submissions.items():
print(f"{name}: {desc}")
# 生成提交文件
    return submissions
8.4 避坑指南
坑1: 忽视数据分布
现象: 训练集和测试集分布不一致
解决: 分析数据分布,必要时进行分布适配
import matplotlib.pyplot as plt

def check_data_distribution(train, test):
"""检查数据分布"""
# 数值特征分布对比
numeric_cols = train.select_dtypes(include=[np.number]).columns
for col in numeric_cols:
plt.figure(figsize=(10, 4))
plt.subplot(1, 2, 1)
plt.hist(train[col], bins=50, alpha=0.5, label='Train')
plt.hist(test[col], bins=50, alpha=0.5, label='Test')
plt.legend()
plt.title(f'{col} Distribution')
plt.subplot(1, 2, 2)
plt.boxplot([train[col].dropna(), test[col].dropna()],
labels=['Train', 'Test'])
plt.title(f'{col} Boxplot')
        plt.show()
坑2: 过度拟合验证集
现象: 验证集分数很高,但线上分数很差
原因: 在验证集上调参过多次
解决: 保留独立的测试集,减少验证集使用频率
# 正确的验证策略
# 1. 训练集: 用于模型训练
# 2. 验证集: 用于早停和超参数选择
# 3. 测试集: 只用于最终评估,不参与调参
# 数据划分
train_idx = int(len(data) * 0.6)
val_idx = int(len(data) * 0.8)
train_data = data[:train_idx]
val_data = data[train_idx:val_idx]
test_data = data[val_idx:]  # 独立测试集
坑3: 数据泄露
现象: 模型性能异常好,但实际无效
原因: 使用了未来的信息
常见数据泄露:
- 使用了测试集的统计信息(如mean, std)
- 滞后特征使用了未来数据
- 标准化使用了全量数据而非仅训练集
正确做法:
from sklearn.preprocessing import StandardScaler

# 错误: 使用全量数据标准化
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X_all)
# 正确: 仅用训练集标准化
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_val_scaled = scaler.transform(X_val)  # 注意: 使用transform,不是fit_transform
坑4: 时间处理错误
现象: 提取的时间特征错误
原因: 时区、时间格式、夏令时等问题
正确做法:
# 1. 明确时区
data['time'] = pd.to_datetime(data['time'], utc=True)
data['time'] = data['time'].dt.tz_convert('Asia/Shanghai')
# 2. 提取特征
data['hour'] = data['time'].dt.hour
data['month'] = data['time'].dt.month
data['weekday'] = data['time'].dt.weekday
# 3. 检查
print(f"Time range: {data['time'].min()} to {data['time'].max()}")
print(f"Unique hours: {sorted(data['hour'].unique())}")8.5 团队协作建议
建议1: 明确分工
角色分配:
- 数据工程师: 负责数据加载、清洗、预处理
- 特征工程师: 负责特征设计和实验
- 模型工程师: 负责模型训练、调参、融合
- 系统工程师: 负责代码管理、部署、提交
建议2: 版本控制
Git最佳实践:
# 分支策略
main # 稳定版本,用于最终提交
feature/xxx # 功能开发分支
experiment/xxx # 实验分支
代码提交规范:
feat: 添加新的特征工程模块
fix: 修复时间格式处理bug
perf: 优化模型推理速度
refactor: 重构数据加载逻辑
test: 添加单元测试
建议3: 实验记录
实验记录模板:
## 实验记录: 2024-01-15
### 目标
验证滞后特征对模型性能的影响
### 方案
- 模型: LightGBM
- 基础特征: 时间特征 + 统计特征
- 新增特征: 1, 2, 4, 6, 12, 24小时滞后
### 结果
| 滞后特征 | 训练MAE | 验证MAE | 线上准确率 |
|----------|---------|---------|-----------|
| 无 | 0.156 | 0.168 | 84.5% |
| 1,2,4 | 0.148 | 0.159 | 85.2% |
| 1,2,4,6,12,24 | 0.145 | 0.158 | 85.7% |
### 结论
滞后特征显著提升模型性能,最佳配置为1,2,4,6,12,24小时
8.6 持续学习
8.6.1 竞赛链接
- 新能源发电功率预测挑战赛:https://challenge.xfyun.cn/topic/info?type=new-energy-power-forecast
- DCIC2024光伏发电出力预测:https://www.dcic-china.com/competitions/10097
- DCIC2024海上风电出力预测:https://www.dcic-china.com/competitions/10098
- 国能日新光伏竞赛:https://www.datafountain.cn/competitions/553
- 百度KDD杯2022:https://aistudio.baidu.com/aistudio/competition/detail/452
8.6.2 技术文档引用
- 万灿, 宋永华. 新能源电力系统概率预测理论与方法[M]. 科学出版社, 2022.
- 风力发电和光伏发电预测技术[M]. 中国电力出版社, 2020.
- 数值天气预报产品在新能源功率预测中的释用[J]. 电力系统自动化, 2021.
- https://www.zhihu.com/people/qin-zheng-kai-89
- https://blog.csdn.net/qq_45832050?spm=1000.2115.3001.5343
- 使用人工智能技术进行可再生能源预测的竞赛解决方案综合集合:https://zread.ai/QInzhengk/Wind-and-PV-AI-competitions/1-overview
结语
新能源功率预测是一项富有挑战性但也极具价值的任务。通过参与多个竞赛,我们总结出以下核心要点:
- 特征工程是核心: 好的特征是模型成功的基石
- 数据质量是关键: 仔细的数据预处理比复杂的模型更重要
- 从简单到复杂: 先建立基线,逐步优化
- 验证很重要: 确保本地验证与线上评分一致
- 融合提升显著: 多模型融合往往能取得更好效果
- 避免常见陷阱: 注意数据泄露、过拟合等问题
希望这份技术博客能为参与新能源功率预测竞赛的选手提供有价值的参考。竞赛不仅是技术的较量,更是思维和策略的比拼。保持耐心,持续迭代,相信你一定能取得优异的成绩!
最后,祝各位参赛选手在新能源功率预测竞赛中取得好成绩!
附录:完整代码示例
A.1 完整训练流程
#!/usr/bin/env python3
"""
新能源功率预测 - 完整训练流程
"""
import warnings
warnings.filterwarnings("ignore")
import numpy as np
import pandas as pd
import polars as pl
import lightgbm as lgb
import xgboost as xgb
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from datetime import datetime
import os
import joblib
# 导入自定义模块
from data_loader import DataLoader
from data_preprocessor import DataPreprocessor
from feature_engineer import FeatureEngineer
from model_trainer import TreeModelTrainer
from evaluator import compute_competition_accuracy
class PowerPredictionPipeline:
"""功率预测完整流程"""
def __init__(self, config):
self.config = config
self.data_loader = DataLoader(config['data_root'])
self.preprocessor = DataPreprocessor()
self.feature_engineer = FeatureEngineer()
self.model_trainer = TreeModelTrainer()
def run(self):
"""运行完整流程"""
print("=" * 80)
print("新能源功率预测训练流程")
print("=" * 80)
# 1. 加载数据
print("\n[1/7] 加载数据...")
weather_data = self.data_loader.load_weather_data(
mode="train",
plant_ids=self.config['plant_ids']
)
power_data = self.data_loader.load_power_data(
mode="train",
plant_ids=self.config['plant_ids']
)
# 2. 数据预处理
print("\n[2/7] 数据预处理...")
processed_data = {}
for plant_id in self.config['plant_ids']:
# 清洗
power_df = self.preprocessor.clean_data(power_data[plant_id])
            # 对齐: 按时间将气象数据左连接到功率数据
            aligned = power_df.join(
                weather_data[plant_id],
                on="time",
                how="left"
            )
processed_data[plant_id] = aligned
# 3. 特征工程
print("\n[3/7] 特征工程...")
featured_data = {}
for plant_id in self.config['plant_ids']:
df = self.feature_engineer.add_all_features(processed_data[plant_id])
featured_data[plant_id] = df
# 4. 训练验证集划分
print("\n[4/7] 划分训练验证集...")
split_date = datetime(2024, 11, 1)
for plant_id in self.config['plant_ids']:
df = featured_data[plant_id]
train = df.filter(pl.col("time") < split_date)
val = df.filter(pl.col("time") >= split_date)
self.config[f'train_{plant_id}'] = train
self.config[f'val_{plant_id}'] = val
# 5. 模型训练
print("\n[5/7] 训练模型...")
models = {}
for plant_id in self.config['plant_ids']:
print(f"\n训练场站 {plant_id}...")
train = self.config[f'train_{plant_id}']
val = self.config[f'val_{plant_id}']
# 准备特征和标签
feature_cols = self._get_feature_columns(train)
X_train = train.select(feature_cols).to_numpy()
y_train = train['power'].to_numpy()
X_val = val.select(feature_cols).to_numpy()
y_val = val['power'].to_numpy()
# 训练模型
model = self.model_trainer.train(
X_train, y_train,
X_val, y_val,
params=self.config['model_params']
)
models[plant_id] = model
# 评估
train_pred = model.predict(X_train)
val_pred = model.predict(X_val)
train_mae = mean_absolute_error(y_train, train_pred)
val_mae = mean_absolute_error(y_val, val_pred)
print(f" 训练MAE: {train_mae:.6f}")
print(f" 验证MAE: {val_mae:.6f}")
self.config['models'] = models
# 6. 模型评估
print("\n[6/7] 模型评估...")
for plant_id in self.config['plant_ids']:
val = self.config[f'val_{plant_id}']
model = models[plant_id]
# 准备数据
feature_cols = self._get_feature_columns(val)
X_val = val.select(feature_cols).to_numpy()
y_val = val['power'].to_numpy()
# 预测
pred = model.predict(X_val)
# 评估
metrics = self._compute_metrics(y_val, pred)
print(f"\n场站 {plant_id}:")
for name, value in metrics.items():
print(f" {name}: {value:.6f}")
# 7. 保存模型
print("\n[7/7] 保存模型...")
for plant_id, model in models.items():
model_path = f"models/model_plant_{plant_id}.pkl"
joblib.dump(model, model_path)
print(f" 已保存: {model_path}")
# 保存配置
config_path = "models/config.pkl"
joblib.dump(self.config, config_path)
print("\n" + "=" * 80)
print("训练完成!")
print("=" * 80)
def _get_feature_columns(self, df):
"""获取特征列名"""
exclude_cols = ['time', 'power']
feature_cols = [col for col in df.columns if col not in exclude_cols]
return feature_cols
def _compute_metrics(self, y_true, y_pred):
"""计算评估指标"""
mae = mean_absolute_error(y_true, y_pred)
mse = mean_squared_error(y_true, y_pred)
rmse = np.sqrt(mse)
r2 = r2_score(y_true, y_pred)
metrics = {
'MAE': mae,
'MSE': mse,
'RMSE': rmse,
'R²': r2
}
return metrics
if __name__ == "__main__":
# 配置
config = {
'data_root': './data',
'plant_ids': list(range(1, 11)),
'model_params': {
'objective': 'regression',
'metric': 'mae',
'num_leaves': 31,
'learning_rate': 0.05,
'feature_fraction': 0.9,
'bagging_fraction': 0.8,
'bagging_freq': 5,
'verbose': -1
}
}
# 运行流程
pipeline = PowerPredictionPipeline(config)
    pipeline.run()
A.2 完整预测流程
#!/usr/bin/env python3
"""
新能源功率预测 - 完整预测流程
"""
import warnings
warnings.filterwarnings("ignore")
import numpy as np
import pandas as pd
import polars as pl
import joblib
import os
from datetime import datetime
# 导入自定义模块
from data_loader import DataLoader
from data_preprocessor import DataPreprocessor
from feature_engineer import FeatureEngineer
class PowerPredictionPipeline:
"""功率预测完整流程"""
def __init__(self, config_path, model_dir):
# 加载配置
self.config = joblib.load(config_path)
self.model_dir = model_dir
# 初始化模块
self.data_loader = DataLoader(self.config['data_root'])
self.preprocessor = DataPreprocessor()
self.feature_engineer = FeatureEngineer()
# 加载模型
self.models = {}
for plant_id in self.config['plant_ids']:
model_path = os.path.join(model_dir, f"model_plant_{plant_id}.pkl")
self.models[plant_id] = joblib.load(model_path)
def predict(self):
"""运行预测流程"""
print("=" * 80)
print("新能源功率预测流程")
print("=" * 80)
# 1. 加载数据
print("\n[1/5] 加载测试数据...")
weather_data = self.data_loader.load_weather_data(
mode="test",
plant_ids=self.config['plant_ids']
)
# 2. 数据预处理
print("\n[2/5] 数据预处理...")
processed_data = {}
for plant_id in self.config['plant_ids']:
# 测试数据只需要清洗,不需要功率数据
df = weather_data[plant_id]
processed_data[plant_id] = df
# 3. 特征工程
print("\n[3/5] 特征工程...")
featured_data = {}
for plant_id in self.config['plant_ids']:
df = self.feature_engineer.add_all_features(processed_data[plant_id])
featured_data[plant_id] = df
# 4. 模型预测
print("\n[4/5] 模型预测...")
predictions = {}
for plant_id in self.config['plant_ids']:
print(f"\n预测场站 {plant_id}...")
df = featured_data[plant_id]
model = self.models[plant_id]
# 准备特征
feature_cols = self._get_feature_columns(df)
X_test = df.select(feature_cols).to_numpy()
# 预测
pred = model.predict(X_test)
# 后处理
pred = self._postprocess(pred)
predictions[plant_id] = pred
print(f" 预测完成,共 {len(pred)} 个点")
# 5. 生成提交文件
print("\n[5/5] 生成提交文件...")
self._generate_submission(predictions)
print("\n" + "=" * 80)
print("预测完成!")
print("=" * 80)
def _get_feature_columns(self, df):
"""获取特征列名"""
exclude_cols = ['time', 'power']
feature_cols = [col for col in df.columns if col not in exclude_cols]
return feature_cols
def _postprocess(self, predictions):
"""后处理"""
# 边界约束
predictions = np.maximum(predictions, 0)
# 平滑(可选)
from scipy.signal import savgol_filter
predictions = savgol_filter(predictions, window_length=5, polyorder=2)
return predictions
def _generate_submission(self, predictions):
"""生成提交文件"""
output_dir = "output"
os.makedirs(output_dir, exist_ok=True)
for plant_id, pred in predictions.items():
# 构建提交DataFrame
# 注意: 需要根据实际的时间信息构建
time_index = pd.date_range(
start="2025-01-01 00:00:00",
periods=len(pred),
freq="15min"
)
submission_df = pd.DataFrame({
'time': time_index,
'power': pred
})
# 保存
output_path = os.path.join(output_dir, f"output{plant_id}.csv")
submission_df.to_csv(output_path, index=False)
print(f" 已保存: {output_path}")
# 打包
import zipfile
zip_path = "output.zip"
with zipfile.ZipFile(zip_path, 'w') as zipf:
for plant_id in self.config['plant_ids']:
file_path = os.path.join(output_dir, f"output{plant_id}.csv")
zipf.write(file_path, f"output{plant_id}.csv")
print(f"\n提交文件已生成: {zip_path}")
if __name__ == "__main__":
# 配置
config_path = "models/config.pkl"
model_dir = "models"
# 运行流程
pipeline = PowerPredictionPipeline(config_path, model_dir)
pipeline.predict()