/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.zto.zms.collector.report;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.zto.zms.stats.*;
import com.zto.zms.collector.kafka.mbean.MBeanRateInfo;
import com.zto.zms.collector.model.KafkaMetrics;
import com.zto.zms.collector.model.*;
import com.zto.zms.service.domain.MetricsDo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import java.math.BigDecimal;
import java.util.List;
import java.util.Map;
/**
* Created by liangyong on 2018/9/17.
*/
@Service
public class MetricsTransformService {
private static final Logger logger = LoggerFactory.getLogger(MetricsTransformService.class);
private Map<String, Long> cacheMap = Maps.newHashMap();
/**
 * Converts the timing entries of a {@link ClusterRtTime} into metric records,
 * one record per element of {@code info.getTimes()}.
 *
 * <p>Each record is tagged with {@code clusterName} and {@code brokerName} and
 * carries the entry's {@code result} and {@code rt} values as doubles.
 *
 * @param info the cluster timing payload; may be {@code null}
 * @return the generated records; an empty list when {@code info} or its
 *         timing collection is {@code null}
 */
public List<MetricsDo> transformMqRtInfo(ClusterRtTime info) {
    List<MetricsDo> metricsDoList = Lists.newArrayList();
    // Guard the payload itself as well as its collection, so a null payload
    // yields an empty result instead of a NullPointerException.
    if (null == info || null == info.getTimes()) {
        return metricsDoList;
    }
    info.getTimes().forEach((timer) -> {
        MetricsDo<Double> metricsDo = new MetricsDo<>();
        Map<String, String> tagOptions = metricsDo.getTagOptions();
        Map<String, Double> segmentMap = metricsDo.getSegmentMap();
        tagOptions.put("clusterName", info.getCluster());
        tagOptions.put("brokerName", timer.getBrokerName());
        segmentMap.put("result", (double) timer.getResult());
        segmentMap.put("rt", (double) timer.getRt());
        metricsDo.setTagOptions(tagOptions);
        metricsDo.setSegmentMap(segmentMap);
        metricsDoList.add(metricsDo);
    });
    return metricsDoList;
}
/**
 * Produces the string-valued metric record of a Kafka cluster.
 *
 * <p>The single record is tagged with {@code clusterName} and holds the
 * {@code controllerHost} and {@code controllerId} (rendered via
 * {@link String#valueOf}) as string fields.
 *
 * @param info the Kafka cluster metrics; may be {@code null}
 * @return a list with exactly one record, or an empty list when
 *         {@code info} is {@code null}
 */
public List<MetricsDo> transformStrDataKafkaCluster(KafkaMetrics info) {
    List<MetricsDo> records = Lists.newArrayList();
    if (info != null) {
        MetricsDo<String> clusterRecord = new MetricsDo<>();
        Map<String, String> tags = clusterRecord.getTagOptions();
        Map<String, String> fields = clusterRecord.getSegmentMap();

        tags.put("clusterName", info.getClusterName());

        fields.put("controllerHost", info.getControllerHost());
        fields.put("controllerId", String.valueOf(info.getControllerId()));

        clusterRecord.setTagOptions(tags);
        clusterRecord.setSegmentMap(fields);
        records.add(clusterRecord);
    }
    return records;
}
/**
 * Sums the one-minute inbound message rate of every broker in a Kafka
 * cluster and reports it as a single {@code totalTps} metric record.
 *
 * <p>Brokers whose entry is {@code null}, or that expose no
 * {@code messagesInPerSec} rate, are skipped. When the broker map is present
 * but contributes nothing, a record with {@code totalTps} of {@code 0} is
 * still emitted.
 *
 * @param info the Kafka cluster metrics; may be {@code null}
 * @return a list with one cluster-level record, or an empty list when
 *         {@code info} or its broker map is {@code null}
 */
public List<MetricsDo> transformDoubleDataKafkaCluster(KafkaMetrics info) {
    List<MetricsDo> metricsDoList = Lists.newArrayList();
    if (null == info || null == info.getBrokersMetrics()) {
        return metricsDoList;
    }
    double totalTps = 0d;
    // Only the broker payloads are needed, so iterate the values directly
    // instead of the key/value entries.
    for (KafkaBrokerInfo kafkaBrokerInfo : info.getBrokersMetrics().values()) {
        // A null map value would otherwise abort the whole aggregation.
        if (null == kafkaBrokerInfo || null == kafkaBrokerInfo.getMessagesInPerSec()) {
            continue;
        }
        totalTps = addCalc(totalTps, kafkaBrokerInfo.getMessagesInPerSec().getOneMinuteRate());
    }
    return clusterTpsMetrics(metricsDoList, info.getClusterName(), totalTps);
}
/**
 * Adds two doubles through {@link BigDecimal} so that the sum is computed on
 * the decimal representations of the operands rather than on their binary
 * floating-point values.
 *
 * @param value1 the first addend
 * @param value2 the second addend
 * @return the decimal sum converted back to {@code double}
 * @throws NumberFormatException if either argument is NaN or infinite
 *         (raised by {@link BigDecimal#valueOf(double)})
 */
private double addCalc(double value1, double value2) {
    return BigDecimal.valueOf(value1)
            .add(BigDecimal.valueOf(value2))
            .doubleValue();
}
/**
 * Reports the total TPS of a RocketMQ cluster as a single metric record.
 *
 * @param info the RocketMQ cluster status; may be {@code null}
 * @return a list with one record tagged by cluster name and carrying
 *         {@code totalTps}, or an empty list when {@code info} is {@code null}
 */
public List<MetricsDo> transformMqCluster(RocketmqStatus info) {
    List<MetricsDo> records = Lists.newArrayList();
    if (info != null) {
        return clusterTpsMetrics(records, info.getClusterName(), info.getTotalTps());
    }
    return records;
}
/**
 * Appends a cluster-level TPS record to the given list.
 *
 * <p>The record is tagged with {@code clusterName} and stores the value under
 * the {@code totalTps} field.
 *
 * @param metricsDoList the list to append to; it is mutated and returned
 * @param clusterName   the value for the {@code clusterName} tag
 * @param totalTps      the value for the {@code totalTps} field
 * @return the same {@code metricsDoList} instance, now containing the new record
 */
private List<MetricsDo> clusterTpsMetrics(List<MetricsDo> metricsDoList, String clusterName, double totalTps) {
    MetricsDo<Double> tpsRecord = new MetricsDo<>();
    Map<String, Double> fields = tpsRecord.getSegmentMap();
    Map<String, String> tags = tpsRecord.getTagOptions();

    fields.put("totalTps", totalTps);
    tags.put("clusterName", clusterName);

    tpsRecord.setTagOptions(tags);
    tpsRecord.setSegmentMap(fields);

    metricsDoList.add(tpsRecord);
    return metricsDoList;
}
/**
 * Builds one numeric metric record per broker of a RocketMQ cluster.
 *
 * <p>Each record is tagged with {@code clusterName} and {@code brokerName};
 * the broker's counters, rates and thread-pool queue figures are stored as
 * double-valued fields.
 *
 * @param info the RocketMQ cluster status; may be {@code null}
 * @return the per-broker records; an empty list when {@code info} or its
 *         broker collection is {@code null}
 */
public List<MetricsDo> transformMqBroker(RocketmqStatus info) {
    List<MetricsDo> metricsDoList = Lists.newArrayList();
    // Guard the payload itself as well as its broker collection, so a null
    // payload yields an empty result instead of a NullPointerException.
    if (null == info || null == info.getBrokers()) {
        return metricsDoList;
    }
    info.getBrokers().forEach((broker) -> {
        MetricsDo<Double> metricsDo = new MetricsDo<>();
        Map<String, String> tagOptions = metricsDo.getTagOptions();
        Map<String, Double> segmentMap = metricsDo.getSegmentMap();
        tagOptions.put("clusterName", info.getClusterName());
        tagOptions.put("brokerName", broker.getBrokerName());
        segmentMap.put("bootTime", broker.getBootTime());
        segmentMap.put("brokerId", (double) broker.getBrokerId());
        segmentMap.put("getTotalTps", broker.getGetTotalTps());
        segmentMap.put("msgGetTotalTodayNow", (double) broker.getMsgGetTotalTodayNow());
        segmentMap.put("msgPutTotalTodayNow", (double) broker.getMsgPutTotalTodayNow());
        segmentMap.put("pageCacheLockTimeMillis", (double) broker.getPageCacheLockTimeMillis());
        segmentMap.put("pullThreadPoolQueueCapacity", (double) broker.getPullThreadPoolQueueCapacity());
        segmentMap.put("pullThreadPoolQueueHeadWaitTimeMills", (double) broker.getPullThreadPoolQueueHeadWaitTimeMills());
        segmentMap.put("pullThreadPoolQueueSize", (double) broker.getPullThreadPoolQueueSize());
        segmentMap.put("putTps", broker.getPutTps());
        segmentMap.put("sendThreadPoolQueueCapacity", (double) broker.getSendThreadPoolQueueCapacity());
        segmentMap.put("sendThreadPoolQueueSize", (double) broker.getSendThreadPoolQueueSize());
        metricsDo.setTagOptions(tagOptions);
        metricsDo.setSegmentMap(segmentMap);
        metricsDoList.add(metricsDo);
    });
    return metricsDoList;
}
/**
 * Builds one string-valued metric record per broker of a RocketMQ cluster.
 *
 * <p>Each record is tagged with {@code clusterName} and {@code brokerName} and
 * stores the broker's {@code ip} and {@code putMessageDistributeTime} as
 * string fields.
 *
 * @param info the RocketMQ cluster status; may be {@code null}
 * @return the per-broker records; an empty list when {@code info} or its
 *         broker collection is {@code null}
 */
public List<MetricsDo> transformStringDataMqBroker(RocketmqStatus info) {
    List<MetricsDo> metricsDoList = Lists.newArrayList();
    // Guard the payload itself as well as its broker collection, so a null
    // payload yields an empty result instead of a NullPointerException.
    if (null == info || null == info.getBrokers()) {
        return metricsDoList;
    }
    info.getBrokers().forEach((broker) -> {
        MetricsDo<String> metricsStrDo = new MetricsDo<>();
        Map<String, String> tagOptions = metricsStrDo.getTagOptions();
        Map<String, String> segmentStrMap = metricsStrDo.getSegmentMap();
        tagOptions.put("clusterName", info.getClusterName());
        tagOptions.put("brokerName", broker.getBrokerName());
        segmentStrMap.put("ip", broker.getIp());
        segmentStrMap.put("putMessageDistributeTime", broker.getPutMessageDistributeTime());
        metricsStrDo.setTagOptions(tagOptions);
        metricsStrDo.setSegmentMap(segmentStrMap);
        metricsDoList.add(metricsStrDo);
    });
    return metricsDoList;
}
public List<MetricsDo> transformMqTopic(RocketmqStatus info) {
List<MetricsDo> metricsDoList = Lists.newArrayLis