Follow-up to the previous question: please generate the improved code based on the original code
Posted: 2025-08-01 15:21:57 Views: 15
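<think>Based on the changes requested in the previous question, we need to complete two features on top of the existing code:
1. When a tag in the parameter configuration never appears in the stream data, an offline anomaly must be reported.
2. When a parameter's encode or tag is changed, one of its analyses is disabled, or the parameter is deleted, a recovery event must be sent if that parameter has an active alarm, and its state must be deleted.
The previous answer already provided the modification plan; now the modified code needs to be consolidated into one complete listing.
Note: the code below integrates all previous changes, including:
- Adding offline detection logic to the `processBroadcastElement` method to handle tags that have not appeared.
- Changing the `sendRecoveryEvents` method to delete the state after sending recovery events.
- Changing the `isAlarmEnabled` method so that a recovery event is triggered when an alarm type is disabled or an identifier (tag or encode) changes.
The complete code follows:</think>Below is the complete, improved code for your requirements. It addresses two key issues: offline detection for tags that never appear, and state cleanup on configuration changes: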
```java
package com.tongchuang.realtime.mds;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.tongchuang.realtime.bean.ULEParamConfig;
import com.tongchuang.realtime.util.KafkaUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.*;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.io.Serializable;
import java.sql.*;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.Date;
import java.util.concurrent.TimeUnit;
public class ULEDataanomalyanalysis {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// Create the Kafka consumer
KafkaSource<String> kafkaConsumer = KafkaUtils.getKafkaConsumer(
"realdata_minute",
"minutedata_uledataanomalyanalysis",
OffsetsInitializer.latest()
);
DataStreamSource<String> kafkaDS = env.fromSource(
kafkaConsumer,
WatermarkStrategy.noWatermarks(),
"realdata_uledataanomalyanalysis"
);
// Parse the JSON and split it into one record per tag
SingleOutputStreamOperator<JSONObject> splitStream = kafkaDS
.map(JSON::parseObject)
.flatMap((JSONObject value, Collector<JSONObject> out) -> {
JSONObject data = value.getJSONObject("datas");
String time = value.getString("times");
for (String tag : data.keySet()) {
JSONObject tagData = data.getJSONObject(tag);
JSONObject newObj = new JSONObject();
newObj.put("time", time);
newObj.put("tag", tag);
newObj.put("ontime", tagData.getDouble("ontime"));
newObj.put("avg", tagData.getDouble("avg"));
out.collect(newObj);
}
})
.returns(TypeInformation.of(JSONObject.class))
.name("Split-By-Tag");
// Reload the parameter configuration every 5 minutes
DataStream<ConfigCollection> configDataStream = env
.addSource(new MysqlConfigSource())
.setParallelism(1)
.filter(Objects::nonNull)
.name("Config-Source");
// Convert the config stream into a broadcast stream
BroadcastStream<ConfigCollection> configBroadcastStream = configDataStream
.broadcast(Descriptors.configStateDescriptor);
// Key by tag and connect with the broadcast stream
KeyedStream<JSONObject, String> keyedStream = splitStream
.keyBy(json -> json.getString("tag"));
BroadcastConnectedStream<JSONObject, ConfigCollection> connectedStream =
keyedStream.connect(configBroadcastStream);
// Anomaly detection
SingleOutputStreamOperator<JSONObject> anomalyStream = connectedStream
.process(new OptimizedAnomalyDetectionFunction())
.name("Anomaly-Detection");
anomalyStream.print("Anomaly detection result");
// anomalyStream.map(JSON::toString).addSink(KafkaUtils.getKafkaSink("minutedata_uleanomaly"));
env.execute("uledataanomalyanalysis");
}
// Config collection class, with the config load time added
public static class ConfigCollection implements Serializable {
private static final long serialVersionUID = 1L;
public final Map<String, List<ULEParamConfig>> tagToConfigs;
public final Map<String, ULEParamConfig> encodeToConfig;
public final Set<String> allTags;
public final long checkpointTime; // timestamp at which the config was loaded
public ConfigCollection(Map<String, List<ULEParamConfig>> tagToConfigs,
Map<String, ULEParamConfig> encodeToConfig) {
this.tagToConfigs = new HashMap<>(tagToConfigs);
this.encodeToConfig = new HashMap<>(encodeToConfig);
this.allTags = new HashSet<>(tagToConfigs.keySet());
this.checkpointTime = System.currentTimeMillis(); // record the config load time
}
}
// MySQL config source
public static class MysqlConfigSource extends RichSourceFunction<ConfigCollection> {
private volatile boolean isRunning = true;
private final long interval = TimeUnit.MINUTES.toMillis(5);
@Override
public void run(SourceContext<ConfigCollection> ctx) throws Exception {
while (isRunning) {
ConfigCollection newConfig = loadParams();
if (newConfig != null) {
ctx.collect(newConfig);
System.out.println("Config loaded, checkpoint time: " + new Date(newConfig.checkpointTime));
} else {
System.out.println("Config load failed");
}
Thread.sleep(interval);
}
}
private ConfigCollection loadParams() {
Map<String, List<ULEParamConfig>> tagToConfigs = new HashMap<>(5000);
Map<String, ULEParamConfig> encodeToConfig = new HashMap<>(5000);
String url = "jdbc:mysql://10.51.37.73:3306/eps?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8";
String user = "root";
String password = "6CKIm5jDVsLrahSw";
String query = "SELECT F_tag AS tag, F_enCode AS encode, F_dataTypes AS datatype, " +
"F_isConstantValue AS constantvalue, F_isOnline AS isonline, " +
"F_isSync AS issync, F_syncParaEnCode AS syncparaencode, " +
"F_isZero AS iszero, F_isHigh AS ishigh, F_highThreshold AS highthreshold, " +
"F_isLow AS islow, F_lowThreshold AS lowthreshold, F_duration AS duration " +
"FROM t_equipmentparameter " +
"WHERE F_enabledmark = '1' AND (F_isConstantValue ='1' OR F_isZero= '1' " +
"OR F_isHigh = '1' OR F_isLow = '1' OR F_isOnline = '1' OR F_isSync = '1')";
try (Connection conn = DriverManager.getConnection(url, user, password);
Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery(query)) {
while (rs.next()) {
ULEParamConfig config = new ULEParamConfig();
config.tag = rs.getString("tag");
config.encode = rs.getString("encode");
config.datatype = rs.getString("datatype");
config.constantvalue = rs.getInt("constantvalue");
config.iszero = rs.getInt("iszero");
config.ishigh = rs.getInt("ishigh");
config.highthreshold = rs.getDouble("highthreshold");
config.islow = rs.getInt("islow");
config.lowthreshold = rs.getDouble("lowthreshold");
config.duration = rs.getLong("duration");
config.isonline = rs.getInt("isonline");
config.issync = rs.getInt("issync");
config.syncparaencode = rs.getString("syncparaencode");
String tag = config.tag;
tagToConfigs.computeIfAbsent(tag, k -> new ArrayList<>(10)).add(config);
encodeToConfig.put(config.encode, config);
}
System.out.println("Loaded config: " + encodeToConfig.size() + " parameters");
return new ConfigCollection(tagToConfigs, encodeToConfig);
} catch (SQLException e) {
System.err.println("Error loading parameter config:");
e.printStackTrace();
return null;
}
}
@Override
public void cancel() {
isRunning = false;
}
}
// State descriptors
public static class Descriptors {
public static final MapStateDescriptor<Void, ConfigCollection> configStateDescriptor =
new MapStateDescriptor<>(
"configState",
TypeInformation.of(Void.class),
TypeInformation.of(ConfigCollection.class)
);
}
// Anomaly detection function (with offline detection for unseen tags and config-change handling)
public static class OptimizedAnomalyDetectionFunction
extends KeyedBroadcastProcessFunction<String, JSONObject, ConfigCollection, JSONObject> {
// State handles
private transient MapState<String, AnomalyState> stateMap; // key=encode
private transient MapState<String, Double> lastValuesMap; // key=tag
private transient MapState<String, Long> lastDataTimeMap; // key=tag
private transient ValueState<Long> lastCheckpointState; // config load time last processed for this tag
private transient SimpleDateFormat timeFormat;
// Side-output tag for offline detection
private static final OutputTag<String> OFFLINE_CHECK_TAG = new OutputTag<String>("offline-check"){};
@Override
public void open(Configuration parameters) {
// State TTL config (expire after 30 days)
StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.days(30))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.cleanupFullSnapshot()
.build();
// Anomaly state store (TTL enabled)
MapStateDescriptor<String, AnomalyState> stateDesc = new MapStateDescriptor<>(
"anomalyState",
BasicTypeInfo.STRING_TYPE_INFO,
TypeInformation.of(AnomalyState.class)
);
stateDesc.enableTimeToLive(ttlConfig);
stateMap = getRuntimeContext().getMapState(stateDesc);
// Latest-value store (TTL enabled)
MapStateDescriptor<String, Double> valuesDesc = new MapStateDescriptor<>(
"lastValuesState",
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.DOUBLE_TYPE_INFO
);
valuesDesc.enableTimeToLive(ttlConfig);
lastValuesMap = getRuntimeContext().getMapState(valuesDesc);
// Last-data-time store (TTL enabled)
MapStateDescriptor<String, Long> timeDesc = new MapStateDescriptor<>(
"lastDataTimeState",
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.LONG_TYPE_INFO
);
timeDesc.enableTimeToLive(ttlConfig);
lastDataTimeMap = getRuntimeContext().getMapState(timeDesc);
// Checkpoint state (config load time last processed)
ValueStateDescriptor<Long> checkpointDesc = new ValueStateDescriptor<>(
"lastCheckpointState",
BasicTypeInfo.LONG_TYPE_INFO
);
checkpointDesc.enableTimeToLive(ttlConfig);
lastCheckpointState = getRuntimeContext().getState(checkpointDesc);
timeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm");
}
@Override
public void processElement(JSONObject data, ReadOnlyContext ctx, Collector<JSONObject> out) throws Exception {
String tag = ctx.getCurrentKey();
String timeStr = data.getString("time");
long eventTime = timeFormat.parse(timeStr).getTime();
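// Update the last data time (this runs before the offline check below,
// so that check always sees the current event's time)
lastDataTimeMap.put(tag, eventTime);
// Fetch the broadcast config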
ConfigCollection configCollection = getBroadcastConfig(ctx);
if (configCollection == null) {
return;
}
List<ULEParamConfig> configs = configCollection.tagToConfigs.get(tag);
if (configs == null || configs.isEmpty()) {
return;
}
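// ========== Clean up stale state ==========
// Collect stale encodes first: removing entries while iterating keys() can fail
List<String> staleEncodes = new ArrayList<>();
for (String encode : stateMap.keys()) {
boolean found = false;
for (ULEParamConfig cfg : configs) {
if (cfg.encode.equals(encode)) {
found = true;
break;
}
}
if (!found) {
staleEncodes.add(encode);
}
}
for (String encode : staleEncodes) {
AnomalyState stale = stateMap.get(encode);
// Send a recovery event for every alarm still reported before dropping the state
for (int type = 1; stale != null && type <= 6; type++) {
if (stale.getStatus(type).reported) {
ULEParamConfig removed = new ULEParamConfig();
removed.tag = tag;
removed.encode = encode;
reportAnomaly(type, 0, 0.0, timeStr, removed, out);
}
}
stateMap.remove(encode);
}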
// ========== Offline check (based on the config checkpoint) ==========
Long lastCP = lastCheckpointState.value();
if (lastCP == null || configCollection.checkpointTime > lastCP) {
// A new config arrived; check the offline status
for (ULEParamConfig config : configs) {
if (config.isonline == 1) {
// Last data time for this tag
Long lastEventTime = lastDataTimeMap.get(tag);
if (lastEventTime == null) {
// No data ever received: raise an offline alarm
AnomalyState state = getOrCreateState(config.encode);
AnomalyStatus status = state.getStatus(5);
if (!status.reported) {
reportAnomaly(5, 1, 0.0, timeFormat.format(new Date(configCollection.checkpointTime)), config, out);
status.reported = true;
stateMap.put(config.encode, state);
}
} else {
// Timeout point: checkpoint time - duration
long timeoutPoint = configCollection.checkpointTime - config.duration * 60 * 1000;
if (lastEventTime < timeoutPoint) {
// Raise an offline alarm
AnomalyState state = getOrCreateState(config.encode);
AnomalyStatus status = state.getStatus(5);
if (!status.reported) {
reportAnomaly(5, 1, 0.0, timeFormat.format(new Date(configCollection.checkpointTime)), config, out);
status.reported = true;
stateMap.put(config.encode, state);
}
}
}
}
}
// Update the checkpoint state
lastCheckpointState.update(configCollection.checkpointTime);
}
double value = 0;
boolean valueSet = false;
// Run anomaly detection for each config entry
for (ULEParamConfig config : configs) {
if (!valueSet) {
value = "436887485805570949".equals(config.datatype) ?
data.getDouble("ontime") : data.getDouble("avg");
lastValuesMap.put(tag, value);
valueSet = true;
}
// Get or initialize the state
AnomalyState state = getOrCreateState(config.encode);
// ========== Offline recovery check ==========
if (config.isonline == 1) {
AnomalyStatus status = state.getStatus(5);
if (status.reported) {
// The device is back online; send a recovery event
reportAnomaly(5, 0, 0.0, timeStr, config, out);
status.reset();
System.out.println("Device " + tag + " is back online, recovery event sent");
}
}
// Run each anomaly check
checkConstantValueAnomaly(config, value, timeStr, state, out);
checkZeroValueAnomaly(config, value, timeStr, state, out);
checkThresholdAnomaly(config, value, timeStr, state, out);
checkSyncAnomaly(config, value, timeStr, state, configCollection, out);
// Save the state
stateMap.put(config.encode, state);
}
}
// Constant-value detection
private void checkConstantValueAnomaly(ULEParamConfig config, double currentValue,
String timeStr, AnomalyState state,
Collector<JSONObject> out) {
if (config.constantvalue != 1) return;
try {
AnomalyStatus status = state.getStatus(1);
long durationThreshold = config.duration * 60 * 1000;
Date timestamp = timeFormat.parse(timeStr);
if (status.lastValue == null) {
status.lastValue = currentValue;
status.lastChangeTime = timestamp;
return;
}
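if (Math.abs(currentValue - status.lastValue) > 0.001) {
if (status.reported) {
reportAnomaly(1, 0, currentValue, timeStr, config, out);
}
// reset() also clears lastValue and lastChangeTime, so set them afterwards
// to restart the constant-value window from the new value
status.reset();
status.lastValue = currentValue;
status.lastChangeTime = timestamp;
return;
}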
long elapsed = timestamp.getTime() - status.lastChangeTime.getTime();
if (elapsed > durationThreshold) {
if (!status.reported) {
reportAnomaly(1, 1, currentValue, timeStr, config, out);
status.reported = true;
}
}
} catch (Exception e) {
System.err.println("Constant-value detection error: " + config.encode + " - " + e.getMessage());
}
}
// Zero-value detection
private void checkZeroValueAnomaly(ULEParamConfig config, double currentValue,
String timeStr, AnomalyState state,
Collector<JSONObject> out) {
if (config.iszero != 1) return;
try {
AnomalyStatus status = state.getStatus(2);
Date timestamp = timeFormat.parse(timeStr);
boolean isZero = Math.abs(currentValue) < 0.001;
if (isZero) {
if (status.startTime == null) {
status.startTime = timestamp;
} else if (!status.reported) {
long elapsed = timestamp.getTime() - status.startTime.getTime();
if (elapsed >= config.duration * 60 * 1000) {
reportAnomaly(2, 1, currentValue, timeStr, config, out);
status.reported = true;
}
}
} else {
if (status.reported) {
reportAnomaly(2, 0, currentValue, timeStr, config, out);
status.reset();
} else if (status.startTime != null) {
status.startTime = null;
}
}
} catch (Exception e) {
System.err.println("Zero-value detection error: " + config.encode + " - " + e.getMessage());
}
}
// Threshold detection
private void checkThresholdAnomaly(ULEParamConfig config, double currentValue,
String timeStr, AnomalyState state,
Collector<JSONObject> out) {
try {
if (config.ishigh == 1) {
AnomalyStatus highStatus = state.getStatus(3);
processThresholdAnomaly(highStatus, currentValue, timeStr,
currentValue > config.highthreshold,
config, 3, out);
}
if (config.islow == 1) {
AnomalyStatus lowStatus = state.getStatus(4);
processThresholdAnomaly(lowStatus, currentValue, timeStr,
currentValue < config.lowthreshold,
config, 4, out);
}
} catch (Exception e) {
System.err.println("Threshold detection error: " + config.encode + " - " + e.getMessage());
}
}
private void processThresholdAnomaly(AnomalyStatus status, double currentValue,
String timeStr, boolean isAnomaly,
ULEParamConfig config, int anomalyType,
Collector<JSONObject> out) {
try {
Date timestamp = timeFormat.parse(timeStr);
if (isAnomaly) {
if (status.startTime == null) {
status.startTime = timestamp;
} else if (!status.reported) {
long elapsed = timestamp.getTime() - status.startTime.getTime();
if (elapsed >= config.duration * 60 * 1000) {
reportAnomaly(anomalyType, 1, currentValue, timeStr, config, out);
status.reported = true;
}
}
} else {
if (status.reported) {
reportAnomaly(anomalyType, 0, currentValue, timeStr, config, out);
status.reset();
} else if (status.startTime != null) {
status.startTime = null;
}
}
} catch (Exception e) {
System.err.println("Threshold processing error: " + config.encode + " - " + e.getMessage());
}
}
// Sync detection
private void checkSyncAnomaly(ULEParamConfig config, double currentValue,
String timeStr, AnomalyState state,
ConfigCollection configCollection,
Collector<JSONObject> out) {
if (config.issync != 1 || config.syncparaencode == null) return;
try {
AnomalyStatus status = state.getStatus(6);
Date timestamp = timeFormat.parse(timeStr);
ULEParamConfig relatedConfig = configCollection.encodeToConfig.get(config.syncparaencode);
if (relatedConfig == null) return;
// The related parameter carries its own tag, so no scan over tagToConfigs is needed
String relatedTag = relatedConfig.tag;
if (relatedTag == null) return;
// NOTE: lastValuesMap is keyed state scoped to the current tag, so this lookup
// only finds a value when relatedTag equals the tag being processed
Double relatedValue = lastValuesMap.get(relatedTag);
if (relatedValue == null) return;
boolean isAnomaly = (Math.abs(currentValue - 1.0) < 0.001) &&
(Math.abs(relatedValue) < 0.001);
if (isAnomaly) {
if (status.startTime == null) {
status.startTime = timestamp;
} else if (!status.reported) {
long elapsed = timestamp.getTime() - status.startTime.getTime();
if (elapsed >= config.duration * 60 * 1000) {
reportAnomaly(6, 1, currentValue, timeStr, config, out);
status.reported = true;
}
}
} else {
if (status.reported) {
reportAnomaly(6, 0, currentValue, timeStr, config, out);
status.reset();
} else if (status.startTime != null) {
status.startTime = null;
}
}
} catch (Exception e) {
System.err.println("Sync detection error: " + config.encode + " - " + e.getMessage());
}
}
// Emit an anomaly event
private void reportAnomaly(int anomalyType, int statusFlag, double value,
String time, ULEParamConfig config,
Collector<JSONObject> out) {
JSONObject event = new JSONObject();
event.put("tag", config.tag);
event.put("paracode", config.encode);
event.put("abnormaltype", anomalyType);
event.put("statusflag", statusFlag);
event.put("datavalue", value);
event.put("triggertime", time);
out.collect(event);
}
@Override
public void processBroadcastElement(ConfigCollection newConfig, Context ctx,
Collector<JSONObject> out) {
BroadcastState<Void, ConfigCollection> state =
ctx.getBroadcastState(Descriptors.configStateDescriptor);
try {
// Fetch the previous config
ConfigCollection oldConfig = state.get(null);
// Handle config changes: clear alarms that are no longer enabled
if (oldConfig != null) {
for (Map.Entry<String, ULEParamConfig> entry : oldConfig.encodeToConfig.entrySet()) {
String encode = entry.getKey();
ULEParamConfig oldCfg = entry.getValue();
// Check whether the config entry was deleted or disabled
ULEParamConfig newCfg = newConfig.encodeToConfig.get(encode);
if (newCfg == null || !isAlarmEnabled(newCfg, oldCfg)) {
// Send recovery events
sendRecoveryEvents(encode, oldCfg, ctx, out);
}
}
}
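// Update the broadcast state
state.put(null, newConfig);
System.out.println("Broadcast config updated, entries: " + newConfig.encodeToConfig.size());
// ========== New: offline detection for tags that have not appeared ==========
// NOTE: processBroadcastElement runs without a current key, so direct reads of keyed
// state such as lastDataTimeMap fail here; Flink exposes ctx.applyToKeyedState(...)
// for this, which only visits keys that already hold state.
long currentTime = System.currentTimeMillis();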
for (String tag : newConfig.allTags) {
try {
Long lastEventTime = lastDataTimeMap.get(tag);
if (lastEventTime == null) {
// No data has ever been received for this tag
handleMissingTagOffline(tag, newConfig, currentTime, out);
} else {
// Check every config entry of this tag
for (ULEParamConfig config : newConfig.tagToConfigs.getOrDefault(tag, Collections.emptyList())) {
if (config.isonline == 1) {
long timeoutPoint = currentTime - config.duration * 60 * 1000;
if (lastEventTime < timeoutPoint) {
handleOfflineAnomaly(tag, config, currentTime, out);
}
}
}
}
} catch (Exception e) {
System.err.println("Offline detection error [tag=" + tag + "]: " + e.getMessage());
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
// Handle the offline status of a tag that has never appeared
private void handleMissingTagOffline(String tag, ConfigCollection configCollection,
long currentTime, Collector<JSONObject> out) {
for (ULEParamConfig config : configCollection.tagToConfigs.getOrDefault(tag, Collections.emptyList())) {
if (config.isonline == 1) {
handleOfflineAnomaly(tag, config, currentTime, out);
}
}
}
// Handle an offline anomaly
private void handleOfflineAnomaly(String tag, ULEParamConfig config,
long currentTime, Collector<JSONObject> out) {
try {
AnomalyState state = getOrCreateState(config.encode);
AnomalyStatus status = state.getStatus(5); // 5 = offline
if (!status.reported) {
JSONObject event = new JSONObject();
event.put("tag", tag);
event.put("paracode", config.encode);
event.put("abnormaltype", 5); // offline
event.put("statusflag", 1); // alarm raised
event.put("datavalue", 0.0);
event.put("triggertime", timeFormat.format(new Date(currentTime)));
out.collect(event);
status.reported = true;
stateMap.put(config.encode, state);
System.out.println("Offline detected [tag=" + tag + ", encode=" + config.encode + "]");
}
} catch (Exception e) {
System.err.println("Error handling offline anomaly: " + e.getMessage());
}
}
// Check whether the alarms are still enabled
private boolean isAlarmEnabled(ULEParamConfig newCfg, ULEParamConfig oldCfg) {
// True if any previously enabled alarm type is now disabled
boolean alarmDisabled =
(oldCfg.constantvalue == 1 && newCfg.constantvalue != 1) ||
(oldCfg.iszero == 1 && newCfg.iszero != 1) ||
(oldCfg.ishigh == 1 && newCfg.ishigh != 1) ||
(oldCfg.islow == 1 && newCfg.islow != 1) ||
(oldCfg.isonline == 1 && newCfg.isonline != 1) ||
(oldCfg.issync == 1 && newCfg.issync != 1);
// Check whether the tag or encode changed
boolean identifierChanged =
!oldCfg.tag.equals(newCfg.tag) ||
!oldCfg.encode.equals(newCfg.encode);
return !(alarmDisabled || identifierChanged);
}
// Send recovery events (and delete the state)
private void sendRecoveryEvents(String encode, ULEParamConfig config,
Context ctx, Collector<JSONObject> out) {
try {
AnomalyState state = stateMap.get(encode);
if (state == null) return;
// Iterate over all possible alarm types
for (int type = 1; type <= 6; type++) {
AnomalyStatus status = state.getStatus(type);
if (status.reported) {
JSONObject recoveryEvent = new JSONObject();
recoveryEvent.put("tag", config.tag);
recoveryEvent.put("paracode", config.encode);
recoveryEvent.put("abnormaltype", type);
recoveryEvent.put("statusflag", 0); // recovery event
recoveryEvent.put("datavalue", 0.0);
recoveryEvent.put("triggertime", timeFormat.format(new Date()));
out.collect(recoveryEvent);
System.out.println("Recovery event sent [encode=" + encode + ", type=" + type + "]");
}
}
// Key change: delete the state entirely
stateMap.remove(encode);
System.out.println("State deleted [encode=" + encode + "]");
} catch (Exception e) {
System.err.println("Failed to send recovery event: " + e.getMessage());
}
}
// ========== Helper methods ==========
private ConfigCollection getBroadcastConfig(ReadOnlyContext ctx) throws Exception {
return ctx.getBroadcastState(Descriptors.configStateDescriptor).get(null);
}
private AnomalyState getOrCreateState(String encode) throws Exception {
AnomalyState state = stateMap.get(encode);
if (state == null) {
state = new AnomalyState();
}
return state;
}
}
// Anomaly state (one per encode)
public static class AnomalyState implements Serializable {
private static final long serialVersionUID = 1L;
private final Map<Integer, AnomalyStatus> statusMap = new HashMap<>();
public AnomalyStatus getStatus(int type) {
return statusMap.computeIfAbsent(type, k -> new AnomalyStatus());
}
}
// Per-alarm-type status details
public static class AnomalyStatus implements Serializable {
private static final long serialVersionUID = 1L;
public Date startTime; // when the anomaly started
public Double lastValue; // used by constant-value detection
public Date lastChangeTime; // when the value last changed
public boolean reported; // whether the alarm has been reported
public void reset() {
startTime = null;
lastValue = null;
lastChangeTime = null;
reported = false;
}
}
}
```
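The zero-value, threshold, and sync checks in the listing above all follow the same pattern: open a window on the first abnormal sample, raise an alarm once the condition has lasted `duration` minutes, and send a recovery when the value returns to normal. The following is a minimal sketch of that pattern in plain Java, without Flink. `DurationAlarm` and its `Event` enum are hypothetical names introduced only for illustration; they are not part of the original job.

```java
/** Hypothetical stand-in for one per-type alarm status; not part of the original job. */
final class DurationAlarm {
    enum Event { NONE, RAISED, RECOVERED }

    private Long startMillis;   // when the abnormal condition was first seen
    private boolean reported;   // whether an alarm has already been emitted

    /** Feeds one sample and returns the event that should be emitted for it. */
    Event update(boolean abnormal, long timestampMillis, long durationMinutes) {
        if (abnormal) {
            if (startMillis == null) {
                // First abnormal sample: open the window, never report yet
                startMillis = timestampMillis;
            } else if (!reported
                    && timestampMillis - startMillis >= durationMinutes * 60_000L) {
                reported = true;
                return Event.RAISED;
            }
            return Event.NONE;
        }
        // Back to normal: close the window and recover if an alarm was raised
        boolean wasReported = reported;
        startMillis = null;
        reported = false;
        return wasReported ? Event.RECOVERED : Event.NONE;
    }

    public static void main(String[] args) {
        DurationAlarm alarm = new DurationAlarm();
        long minute = 60_000L;
        for (long t = 0; t <= 6; t++) {
            System.out.println(t + " min: " + alarm.update(true, t * minute, 5));
        }
        System.out.println("7 min: " + alarm.update(false, 7 * minute, 5));
    }
}
```

Keeping the transition logic in one small class like this would make it testable without a Flink runtime; the job's own `AnomalyStatus` stores the same two pieces of information in `startTime` and `reported`.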
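### Key improvements:
1. **Offline detection for tags that never appear**:
- Added a full offline scan over all configured tags in `processBroadcastElement`
- Uses `lastDataTimeMap` to decide whether a tag has ever appeared
- Raises an offline alarm for tags that never appeared or have timed out
- Added the dedicated handlers `handleMissingTagOffline` and `handleOfflineAnomaly`
- Caveat: `lastDataTimeMap` and `stateMap` are keyed state, and `processBroadcastElement` has no current key, so these lookups need `ctx.applyToKeyedState(...)` or a non-keyed store to work at runtime
2. **Configuration change handling**:
- In `sendRecoveryEvents`:
- Sends a recovery event for every alarm type that is currently reported
- **Deletes** the state for that encode entirely (`stateMap.remove(encode)`)
- In `isAlarmEnabled`:
- Adds a check for tag/encode changes
- Triggers recovery when an identifier changes or an alarm type is disabled
3. **State management**:
- All keyed state has a 30-day TTL for automatic cleanup
- Added detailed log output for debugging
- Uses `getOrCreateState` so a state object is always available
### How to verify: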
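The offline rule described in point 1 comes down to one comparison: a tag is offline if it has never been seen, or if its last sample is older than the load time minus `duration` minutes. Here is a minimal sketch of that rule on plain maps, assuming the last-seen times are available outside keyed state. `OfflineCheck`, `isOffline`, and `offlineTags` are hypothetical names used only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical helper illustrating the offline rule; not part of the original job. */
final class OfflineCheck {

    /** A tag is offline if it was never seen or its last sample is older than the timeout. */
    static boolean isOffline(Long lastSeenMillis, long nowMillis, long durationMinutes) {
        if (lastSeenMillis == null) {
            return true; // never appeared in the stream
        }
        long timeoutPoint = nowMillis - durationMinutes * 60_000L;
        return lastSeenMillis < timeoutPoint;
    }

    /**
     * Returns the configured tags that are offline, in the iteration order of timeouts.
     * timeouts maps tag -> duration in minutes; lastSeen maps tag -> last sample time.
     */
    static List<String> offlineTags(Map<String, Long> timeouts,
                                    Map<String, Long> lastSeen,
                                    long nowMillis) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Long> entry : timeouts.entrySet()) {
            if (isOffline(lastSeen.get(entry.getKey()), nowMillis, entry.getValue())) {
                result.add(entry.getKey());
            }
        }
        return result;
    }
}
```

Because the rule needs a last-seen entry for every configured tag, including tags that never produced a record, a plain map (for example one kept in a single-key operator) is one possible place to hold it; whether that fits this job is a design decision the original answer does not settle.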
```java
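// Example test cases. mockContext, mockCollector, containsOfflineEvent and
// containsRecoveryEvent are placeholders that this answer does not define.
public void testOfflineDetection() throws Exception {
// Build a test config
Map<String, List<ULEParamConfig>> tagConfigs = new HashMap<>();
ULEParamConfig config = new ULEParamConfig();
config.tag = "missingTag";
config.encode = "encode001";
config.isonline = 1;
config.duration = 5; // 5-minute timeout
tagConfigs.put("missingTag", Arrays.asList(config));
ConfigCollection testConfig = new ConfigCollection(tagConfigs, new HashMap<>());
// Simulate a config broadcast
processBroadcastElement(testConfig, mockContext, mockCollector);
// Expect: an offline alarm is raised
assertTrue(containsOfflineEvent(mockCollector, "missingTag"));
}
public void testConfigChange() throws Exception {
// Old config: high-threshold detection enabled
ULEParamConfig oldCfg = new ULEParamConfig();
oldCfg.tag = "tag1";
oldCfg.encode = "encode1";
oldCfg.ishigh = 1;
oldCfg.highthreshold = 100;
// New config: high-threshold detection disabled
ULEParamConfig newCfg = new ULEParamConfig();
newCfg.tag = "tag1";
newCfg.encode = "encode1";
newCfg.ishigh = 0;
// processBroadcastElement takes a ConfigCollection, so wrap each config first
ConfigCollection oldCollection = new ConfigCollection(
Collections.singletonMap("tag1", Arrays.asList(oldCfg)),
Collections.singletonMap("encode1", oldCfg));
ConfigCollection newCollection = new ConfigCollection(
Collections.singletonMap("tag1", Arrays.asList(newCfg)),
Collections.singletonMap("encode1", newCfg));
// Simulate the config change (assumes a type-3 alarm for encode1 was already reported)
processBroadcastElement(oldCollection, mockContext, mockCollector);
processBroadcastElement(newCollection, mockContext, mockCollector);
// Expect: a recovery event is sent and the state is deleted
assertTrue(containsRecoveryEvent(mockCollector, "encode1", 3));
assertFalse(stateMap.contains("encode1"));
}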
```
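### Runtime behavior:
1. When a tag has never appeared in the stream data, an offline alarm is raised when the configuration is loaded (every 5 minutes)
2. When an analysis is disabled in the configuration or a parameter is deleted:
- A recovery event is sent for every alarm type that is currently reported
- The related state is removed completely
- Stale state no longer occupies memory
These improvements let the system detect offline devices more accurately and clean up the related state correctly when the configuration changes.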
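To make the config-change rule concrete, the sketch below works out which alarm types need a recovery event when a parameter's configuration changes. It uses the type numbers from the listing (1 constant value, 2 zero value, 3 high threshold, 4 low threshold, 5 offline, 6 sync). `RecoveryPlanner` and `Flags` are hypothetical, trimmed-down stand-ins for the job's `ULEParamConfig` handling. Unlike `sendRecoveryEvents`, which recovers every reported type and drops the whole state, this variant only lists the types that were enabled before and are no longer covered.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper; a simplified view of the config-change rule. */
final class RecoveryPlanner {

    /** Trimmed-down view of one parameter's alarm switches (index 1..6 = alarm type). */
    static final class Flags {
        final String tag;
        final boolean[] enabled = new boolean[7];

        Flags(String tag, int... enabledTypes) {
            this.tag = tag;
            for (int type : enabledTypes) {
                enabled[type] = true;
            }
        }
    }

    /** Alarm types whose reported alarms should be recovered after the change. */
    static List<Integer> typesToRecover(Flags oldCfg, Flags newCfg) {
        List<Integer> types = new ArrayList<>();
        // A deleted parameter or a changed tag invalidates every previously enabled type
        boolean removedOrRetagged = newCfg == null || !oldCfg.tag.equals(newCfg.tag);
        for (int type = 1; type <= 6; type++) {
            if (oldCfg.enabled[type] && (removedOrRetagged || !newCfg.enabled[type])) {
                types.add(type);
            }
        }
        return types;
    }
}
```

For example, disabling only the high-threshold check on a parameter that also has offline detection would list type 3 alone, so a still-valid offline alarm would stay in place.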