RMQMetricsCollector.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */
package org.apache.rocketmq.exporter.collector;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import io.prometheus.client.Collector;
import io.prometheus.client.GaugeMetricFamily;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.exporter.model.BrokerRuntimeStats;
import org.apache.rocketmq.exporter.model.metrics.BrokerMetric;
import org.apache.rocketmq.exporter.model.metrics.ConsumerCountMetric;
import org.apache.rocketmq.exporter.model.metrics.ConsumerMetric;
import org.apache.rocketmq.exporter.model.metrics.ConsumerTopicDiffMetric;
import org.apache.rocketmq.exporter.model.metrics.DLQTopicOffsetMetric;
import org.apache.rocketmq.exporter.model.metrics.TopicPutNumMetric;
import org.apache.rocketmq.exporter.model.metrics.brokerruntime.BrokerRuntimeMetric;
import org.apache.rocketmq.exporter.model.metrics.clientrunime.ConsumerRuntimeConsumeFailedMsgsMetric;
import org.apache.rocketmq.exporter.model.metrics.clientrunime.ConsumerRuntimeConsumeFailedTPSMetric;
import org.apache.rocketmq.exporter.model.metrics.clientrunime.ConsumerRuntimeConsumeOKTPSMetric;
import org.apache.rocketmq.exporter.model.metrics.clientrunime.ConsumerRuntimeConsumeRTMetric;
import org.apache.rocketmq.exporter.model.metrics.clientrunime.ConsumerRuntimePullRTMetric;
import org.apache.rocketmq.exporter.model.metrics.clientrunime.ConsumerRuntimePullTPSMetric;
import org.apache.rocketmq.exporter.model.metrics.producer.ProducerCountMetric;
import org.apache.rocketmq.exporter.model.metrics.producer.ProducerMetric;
import org.apache.rocketmq.exporter.otlp.OtlpMetricsCollectorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

public class RMQMetricsCollector extends Collector {

    private Cache<ProducerMetric, Double> topicOffset;
    //max offset of retry topic consume queue
    private Cache<ProducerMetric, Double> topicRetryOffset;
    //max offset of dlq consume queue
    private Cache<DLQTopicOffsetMetric, Double> topicDLQOffset;
    // producer instance count
    private Cache<ProducerCountMetric, Integer> producerCounts;

    //total put numbers for topics
    private Cache<TopicPutNumMetric, Double> topicPutNums;
    //total get numbers for topics
    private Cache<TopicPutNumMetric, Double> topicPutSize;

    //diff for consumer group
    private Cache<ConsumerTopicDiffMetric, Long> consumerDiff;
    //retry diff for consumer group
    private Cache<ConsumerTopicDiffMetric, Long> consumerRetryDiff;
    //dlq diff for consumer group
    private Cache<ConsumerTopicDiffMetric, Long> consumerDLQDiff;
    //consumer count
    private Cache<ConsumerCountMetric, Integer> consumerCounts;

    //count of consume fail
    private Cache<ConsumerRuntimeConsumeFailedMsgsMetric, Long> consumerClientFailedMsgCounts;
    //TPS of consume fail
    private Cache<ConsumerRuntimeConsumeFailedTPSMetric, Double> consumerClientFailedTPS;
    //TPS of consume success
    private Cache<ConsumerRuntimeConsumeOKTPSMetric, Double> consumerClientOKTPS;
    //rt of consume
    private Cache<ConsumerRuntimeConsumeRTMetric, Double> consumerClientRT;
    //pull RT
    private Cache<ConsumerRuntimePullRTMetric, Double> consumerClientPullRT;
    //pull tps
    private Cache<ConsumerRuntimePullTPSMetric, Double> consumerClientPullTPS;

    //broker offset for consumer-topic
    private Cache<ConsumerMetric, Long> groupBrokerTotalOffset;
    //consumer offset for consumer-topic
    private Cache<ConsumerMetric, Long> groupConsumeTotalOffset;
    //consume tps
    private Cache<ConsumerMetric, Double> groupConsumeTPS;
    //consumed message count for consumer-topic
    private Cache<ConsumerMetric, Double> groupGetNums;
    //consumed message size(byte) for consumer-topic
    private Cache<ConsumerMetric, Double> groupGetSize;
    //re-consumed message count for consumer-topic
    private Cache<ConsumerMetric, Double> sendBackNums;
    // group latency time
    private Cache<ConsumerMetric, Long> groupLatencyByTime;

    //total put message count for one broker
    private Cache<BrokerMetric, Double> brokerPutNums;
    //total get message count for one broker
    private Cache<BrokerMetric, Double> brokerGetNums;
    private Cache<BrokerMetric, Double> brokerCommitLogDiff;

    private Cache<BrokerRuntimeMetric, Long> brokerRuntimeMsgPutTotalTodayNow;
    private Cache<BrokerRuntimeMetric, Long> brokerRuntimeMsgGetTotalTodayNow;

    private Cache<BrokerRuntimeMetric, Long> brokerRuntimeMsgGetTotalYesterdayMorning;
    private Cache<BrokerRuntimeMetric, Long> brokerRuntimeMsgPutTotalYesterdayMorning;
    private Cache<BrokerRuntimeMetric, Long> brokerRuntimeMsgGetTotalTodayMorning;
    private Cache<BrokerRuntimeMetric, Long> brokerRuntimeMsgPutTotalTodayMorning;

    private Cache<BrokerRuntimeMetric, Long> brokerRuntimeDispatchBehindBytes;
    private Cache<BrokerRuntimeMetric, Long> brokerRuntimePutMessageSizeTotal;

    private Cache<BrokerRuntimeMetric, Double> brokerRuntimePutMessageAverageSize;
    private Cache<BrokerRuntimeMetric, Long> brokerRuntimeQueryThreadPoolQueueCapacity;
    private Cache<BrokerRuntimeMetric, Long> brokerRuntimeRemainTransientStoreBufferNumbs;
    private Cache<BrokerRuntimeMetric, Long> brokerRuntimeEarliestMessageTimeStamp;
    private Cache<BrokerRuntimeMetric, Long> brokerRuntimePutMessageEntireTimeMax;
    private Cache<BrokerRuntimeMetric, Long> brokerRuntimeStartAcceptSendRequestTimeStamp;
    private Cache<BrokerRuntimeMetric, Long> brokerRuntimeSendThreadPoolQueueSize;
    private Cache<BrokerRuntimeMetric, Long> brokerRuntimePutMessageTimesTotal;
    private Cache<BrokerRuntimeMetric, Long> brokerRuntimeGetMessageEntireTimeMax;
    private Cache<BrokerRuntimeMetric, Long> brokerRuntimePageCacheLockTimeMills;
    private Cache<BrokerRuntimeMetric, Double> brokerRuntimeCommitLogDiskRatio;
    private Cache<BrokerRuntimeMetric, Double> brokerRuntimeConsumeQueueDiskRatio;

    private Cache<BrokerRuntimeMetric, Double> brokerRuntimeGetFoundTps600;
    private Cache<BrokerRuntimeMetric, Double> brokerRuntimeGetFoundTps60;
    private Cache<BrokerRuntimeMetric, Double> brokerRuntimeGetFoundTps10;
    private Cache<BrokerRuntimeMetric, Double> brokerRuntimeGetTotalTps600;
    private Cache<BrokerRuntimeMetric, Double> brokerRuntimeGetTotalTps60;
    private Cache<BrokerRuntimeMetric, Double> brokerRuntimeGetTotalTps10;
    private Cache<BrokerRuntimeMetric, Double> brokerRuntimeGetTransferedTps600;
    private Cache<BrokerRuntimeMetric, Double> brokerRuntimeGetTransferedTps60;
    private Cache<BrokerRuntimeMetric, Double> brokerRuntimeGetTransferedTps10;
    private Cache<BrokerRuntimeMetric, Double> brokerRuntimeGetMissTps600;
    private Cache<BrokerRuntimeMetric, Double> brokerRuntimeGetMissTps60;
    private Cache<BrokerRuntimeMetric, Double> brokerRuntimeGetMissTps10;
    private Cache<BrokerRuntimeMetric, Double> brokerRuntimePutTps600;
    private Cache<BrokerRuntimeMetric, Double> brokerRuntimePutTps60;
    private Cache<BrokerRuntimeMetric, Double> brokerRuntimePutTps10;
    private Cache<BrokerRuntimeMetric, Double> brokerRuntimePutLatency99;
    private Cache<BrokerRuntimeMetric, Double> brokerRuntimePutLatency999;

    private Cache<BrokerRuntimeMetric, Long> brokerRuntimeDispatchMaxBuffer;

    private Cache<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap10toMore;
    private Cache<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap5to10s;
    private Cache<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap4to5s;
    private Cache<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap3to4s;
    private Cache<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap2to3s;
    private Cache<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap1to2s;
    private Cache<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap500to1s;
    private Cache<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap200to500ms;
    private Cache<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap100to200ms;
    private Cache<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap50to100ms;
    private Cache<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap10to50ms;
    private Cache<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap0to10ms;
    private Cache<BrokerRuntimeMetric, Integer> brokerRuntimePutMessageDistributeTimeMap0ms;

    private Cache<BrokerRuntimeMetric, Long> brokerRuntimePullThreadPoolQueueCapacity;
    private Cache<BrokerRuntimeMetric, Long> brokerRuntimeSendThreadPoolQueueCapacity;
    private Cache<BrokerRuntimeMetric, Long> brokerRuntimePullThreadPoolQueueSize;
    private Cache<BrokerRuntimeMetric, Long> brokerRuntimeQueryThreadPoolQueueSize;

    private Cache<BrokerRuntimeMetric, Long> brokerRuntimePullThreadPoolQueueHeadWaitTimeMills;
    private Cache<BrokerRuntimeMetric, Long> brokerRuntimeQueryThreadPoolQueueHeadWaitTimeMills;
    private Cache<BrokerRuntimeMetric, Long> brokerRuntimeSendThreadPoolQueueHeadWaitTimeMills;
    private Cache<BrokerRuntimeMetric, Double> brokerRuntimeCommitLogDirCapacityFree;
    private Cache<BrokerRuntimeMetric, Double> brokerRuntimeCommitLogDirCapacityTotal;

    private Cache<BrokerRuntimeMetric, Long> brokerRuntimeCommitLogMaxOffset;
    private Cache<BrokerRuntimeMetric, Long> brokerRuntimeCommitLogMinOffset;
    private Cache<BrokerRuntimeMetric, Double> brokerRuntimeRemainHowManyDataToFlush;

    public RMQMetricsCollector(long outOfTimeSeconds) {
        this.topicOffset = initCache(outOfTimeSeconds);
        this.topicRetryOffset = initCache(outOfTimeSeconds);
        this.topicDLQOffset = initCache(outOfTimeSeconds);
        this.producerCounts = initCache(outOfTimeSeconds);
        this.topicPutNums = initCache(outOfTimeSeconds);
        this.topicPutSize = initCache(outOfTimeSeconds);

        this.consumerDiff = initCache(outOfTimeSeconds);
        this.consumerRetryDiff = initCache(outOfTimeSeconds);
        this.consumerDLQDiff = initCache(outOfTimeSeconds);
        this.consumerCounts = initCache(outOfTimeSeconds);
        this.consumerClientFailedMsgCounts = initCache(outOfTimeSeconds);
        this.consumerClientFailedTPS = initCache(outOfTimeSeconds);
        this.consumerClientOKTPS = initCache(outOfTimeSeconds);
        this.consumerClientRT = initCache(outOfTimeSeconds);
        this.consumerClientPullRT = initCache(outOfTimeSeconds);
        this.consumerClientPullTPS = initCache(outOfTimeSeconds);
        this.groupBrokerTotalOffset = initCache(outOfTimeSeconds);
        this.groupConsumeTotalOffset = initCache(outOfTimeSeconds);
        this.groupConsumeTPS = initCache(outOfTimeSeconds);
        this.groupGetNums = initCache(outOfTimeSeconds);
        this.groupGetSize = initCache(outOfTimeSeconds);
        this.sendBackNums = initCache(outOfTimeSeconds);
        this.groupLatencyByTime = initCache(outOfTimeSeconds);
        this.brokerPutNums = initCache(outOfTimeSeconds);
        this.brokerGetNums = initCache(outOfTimeSeconds);
        this.brokerCommitLogDiff = initCache(outOfTimeSeconds);
        this.brokerRuntimeMsgPutTotalTodayNow = initCache(outOfTimeSeconds);
        this.brokerRuntimeMsgGetTotalTodayNow = initCache(outOfTimeSeconds);
        this.brokerRuntimeMsgGetTotalYesterdayMorning = initCache(outOfTimeSeconds);
        this.brokerRuntimeMsgPutTotalYesterdayMorning = initCache(outOfTimeSeconds);
        this.brokerRuntimeMsgGetTotalTodayMorning = initCache(outOfTimeSeconds);
        this.brokerRuntimeMsgPutTotalTodayMorning = initCache(outOfTimeSeconds);
        this.brokerRuntimeDispatchBehindBytes = initCache(outOfTimeSeconds);
        this.brokerRuntimePutMessageSizeTotal = initCache(outOfTimeSeconds);
        this.brokerRuntimePutMessageAverageSize = initCache(outOfTimeSeconds);
        this.brokerRuntimeQueryThreadPoolQueueCapacity = initCache(outOfTimeSeconds);
        this.brokerRuntimeRemainTransientStoreBufferNumbs = initCache(outOfTimeSeconds);
        this.brokerRuntimeEarliestMessageTimeStamp = initCache(outOfTimeSeconds);
        this.brokerRuntimePutMessageEntireTimeMax = initCache(outOfTimeSeconds);
        this.brokerRuntimeStartAcceptSendRequestTimeStamp = initCache(outOfTimeSeconds);
        this.brokerRuntimeSendThreadPoolQueueSize = initCache(outOfTimeSeconds);
        this.brokerRuntimePutMessageTimesTotal = initCache(outOfTimeSeconds);
        this.brokerRuntimeGetMessageEntireTimeMax = initCache(outOfTimeSeconds);
        this.brokerRuntimePageCacheLockTimeMills = initCache(outOfTimeSeconds);
        this.brokerRuntimeCommitLogDiskRatio = initCache(outOfTimeSeconds);
        this.brokerRuntimeConsumeQueueDiskRatio = initCache(outOfTimeSeconds);
        this.brokerRuntimeGetFoundTps600 = initCache(outOfTimeSeconds);
        this.brokerRuntimeGetFoundTps60 = initCache(outOfTimeSeconds);
        this.brokerRuntimeGetFoundTps10 = initCache(outOfTimeSeconds);
        this.brokerRuntimeGetTotalTps600 = initCache(outOfTimeSeconds);
        this.brokerRuntimeGetTotalTps60 = initCache(outOfTimeSeconds);
        this.brokerRuntimeGetTotalTps10 = initCache(outOfTimeSeconds);
        this.brokerRuntimeGetTransferedTps600 = initCache(outOfTimeSeconds);
        this.brokerRuntimeGetTransferedTps60 = initCache(outOfTimeSeconds);
        this.brokerRuntimeGetTransferedTps10 = initCache(outOfTimeSeconds);
        this.brokerRuntimeGetMissTps600 = initCache(outOfTimeSeconds);
        this.brokerRuntimeGetMissTps60 = initCache(outOfTimeSeconds);
        this.brokerRuntimeGetMissTps10 = initCache(outOfTimeSeconds);
        this.brokerRuntimePutTps600 = initCache(outOfTimeSeconds);
        this.brokerRuntimePutTps60 = initCache(outOfTimeSeconds);
        this.brokerRuntimePutTps10 = initCache(outOfTimeSeconds);
        this.brokerRuntimePutLatency99 = initCache(outOfTimeSeconds);
        this.brokerRuntimePutLatency999 = initCache(outOfTimeSeconds);
        this.brokerRuntimeDispatchMaxBuffer = initCache(outOfTimeSeconds);
        this.brokerRuntimePutMessageDistributeTimeMap10toMore = initCache(outOfTimeSeconds);
        this.brokerRuntimePutMessageDistributeTimeMap5to10s = initCache(outOfTimeSeconds);
        this.brokerRuntimePutMessageDistributeTimeMap4to5s = initCache(outOfTimeSeconds);
        this.brokerRuntimePutMessageDistributeTimeMap3to4s = initCache(outOfTimeSeconds);
        this.brokerRuntimePutMessageDistributeTimeMap2to3s = initCache(outOfTimeSeconds);
        this.brokerRuntimePutMessageDistributeTimeMap1to2s = initCache(outOfTimeSeconds);
        this.brokerRuntimePutMessageDistributeTimeMap500to1s = initCache(outOfTimeSeconds);
        this.brokerRuntimePutMessageDistributeTimeMap200to500ms = initCache(outOfTimeSeconds);
        this.brokerRuntimePutMessageDistributeTimeMap100to200ms = initCache(outOfTimeSeconds);
        this.brokerRuntimePutMessageDistributeTimeMap50to100ms = initCache(outOfTimeSeconds);
        this.brokerRuntimePutMessageDistributeTimeMap10to50ms = initCache(outOfTimeSeconds);
        this.brokerRuntimePutMessageDistributeTimeMap0to10ms = initCache(outOfTimeSeconds);
        this.brokerRuntimePutMessageDistributeTimeMap0ms = initCache(outOfTimeSeconds);
        this.brokerRuntimePullThreadPoolQueueCapacity = initCache(outOfTimeSeconds);
        this.brokerRuntimeSendThreadPoolQueueCapacity = initCache(outOfTimeSeconds);
        this.brokerRuntimePullThreadPoolQueueSize = initCache(outOfTimeSeconds);
        this.brokerRuntimeQueryThreadPoolQueueSize = initCache(outOfTimeSeconds);
        this.brokerRuntimePullThreadPoolQueueHeadWaitTimeMills = initCache(outOfTimeSeconds);
        this.brokerRuntimeQueryThreadPoolQueueHeadWaitTimeMills = initCache(outOfTimeSeconds);
        this.brokerRuntimeSendThreadPoolQueueHeadWaitTimeMills = initCache(outOfTimeSeconds);
        this.brokerRuntimeCommitLogDirCapacityFree = initCache(outOfTimeSeconds);
        this.brokerRuntimeCommitLogDirCapacityTotal = initCache(outOfTimeSeconds);
        this.brokerRuntimeCommitLogMaxOffset = initCache(outOfTimeSeconds);
        this.brokerRuntimeCommitLogMinOffset = initCache(outOfTimeSeconds);
        this.brokerRuntimeRemainHowManyDataToFlush = initCache(outOfTimeSeconds);
    }

    private <T extends Cache> T initCache(long outOfTimeSeconds) {
        return (T) CacheBuilder.newBuilder().expireAfterWrite(outOfTimeSeconds, TimeUnit.SECONDS).build();
    }

    private final static Logger log = LoggerFactory.getLogger(RMQMetricsCollector.class);

    private static final List<String> GROUP_DIFF_LABEL_NAMES = Arrays.asList("group", "topic", "countOfOnlineConsumers", "msgModel");

    private static <T extends Number> void loadGroupDiffMetric(GaugeMetricFamily family, Map.Entry<ConsumerTopicDiffMetric, T> entry) {
        family.addMetric(
            Arrays.asList(
                entry.getKey().getGroup(),
                entry.getKey().getTopic(),
                entry.getKey().getCountOfOnlineConsumers(),
                entry.getKey().getMsgModel()
            ),
            entry.getValue().doubleValue());
    }

    private static final List<String> GROUP_COUNT_LABEL_NAMES = Arrays.asList("caddr", "localaddr", "group");

    private void collectConsumerMetric(List<MetricFamilySamples> mfs) {
        GaugeMetricFamily groupGetLatencyByConsumerDiff = new GaugeMetricFamily("rocketmq_group_diff", "GroupDiff", GROUP_DIFF_LABEL_NAMES);
        for (Map.Entry<ConsumerTopicDiffMetric, Long> entry : consumerDiff.asMap().entrySet()) {
            loadGroupDiffMetric(groupGetLatencyByConsumerDiff, entry);
        }
        mfs.add(groupGetLatencyByConsumerDiff);

        GaugeMetricFamily groupGetLatencyByConsumerRetryDiff = new GaugeMetricFamily("rocketmq_group_retrydiff", "GroupRetryDiff", GROUP_DIFF_LABEL_NAMES);
        for (Map.Entry<ConsumerTopicDiffMetric, Long> entry : consumerRetryDiff.asMap().entrySet()) {
            loadGroupDiffMetric(groupGetLatencyByConsumerRetryDiff, entry);
        }
        mfs.add(groupGetLatencyByConsumerRetryDiff);

        GaugeMetricFamily groupGetLatencyByConsumerDLQDiff = new GaugeMetricFamily("rocketmq_group_dlqdiff", "GroupDLQDiff", GROUP_DIFF_LABEL_NAMES);
        for (Map.Entry<ConsumerTopicDiffMetric, Long> entry : consumerDLQDiff.asMap().entrySet()) {
            loadGroupDiffMetric(groupGetLatencyByConsumerDLQDiff, entry);
        }
        mfs.add(groupGetLatencyByConsumerDLQDiff);

        GaugeMetricFamily consumerCountsF = new GaugeMetricFamily("rocketmq_group_count", "GroupCount", GROUP_COUNT_LABEL_NAMES);
        for (Map.Entry<ConsumerCountMetric, Integer> entry : consumerCounts.asMap().entrySet()) {
            consumerCountsF.addMetric(
                Arrays.asList(
                    entry.getKey().getCaddrs(),
                    entry.getKey().getLocaladdrs(),
                    entry.getKey().getGroup()
                ),
                entry.getValue().doubleValue());
        }
        mfs.add(consumerCountsF);

    }


    private static final List<String> TOPIC_OFFSET_LABEL_NAMES = Arrays.asList(
        "cluster", "broker", "topic"
    );

    private static final List<String> DLQ_TOPIC_OFFSET_LABEL_NAMES = Arrays.asList(
        "cluster", "broker", "group"
    );

    private void loadTopicOffsetMetric(GaugeMetricFamily family, Map.Entry<ProducerMetric, Double> entry) {
        family.addMetric(
            Arrays.asList(
                entry.getKey().getClusterName(),
                entry.getKey().getBrokerName(),
                entry.getKey().getTopicName()
            ),
            entry.getValue());
    }

    private static final List<String> PRODUCER_GROUP_CLIENT_METRIC_LABEL_NAMES = Arrays.asList(
            "cluster", "broker", "group"
    );

    private void collectProducerMetric(List<MetricFamilySamples> mfs) {
        GaugeMetricFamily producerCount = new GaugeMetricFamily("rocketmq_producer_count", "producer instance counter", PRODUCER_GROUP_CLIENT_METRIC_LABEL_NAMES);
        for (Map.Entry<ProducerCountMetric, Integer> entry : producerCounts.asMap().entrySet()) {
            producerCount.addMetric(
                    Arrays.asList(
                            entry.getKey().getClusterName(),
                            entry.getKey().getBrokerName(),
                            entry.getKey().getGroup()
                    ),
                    entry.getValue().doubleValue());
        }
        mfs.add(producerCount);
    }

    private void collectTopicOffsetMetric(List<MetricFamilySamples> mfs) {
        GaugeMetricFamily topicOffsetF = new GaugeMetricFamily("rocketmq_producer_offset", "TopicOffset", TOPIC_OFFSET_LABEL_NAMES);
        for (Map.Entry<ProducerMetric, Double> entry : topicOffset.asMap().entrySet()) {
            loadTopicOffsetMetric(topicOffsetF, entry);
        }
        mfs.add(topicOffsetF);

        GaugeMetricFamily topicRetryOffsetF = new GaugeMetricFamily("rocketmq_topic_retry_offset", "TopicRetryOffset", TOPIC_OFFSET_LABEL_NAMES);
        for (Map.Entry<ProducerMetric, Double> entry : topicRetryOffset.asMap().entrySet()) {
            loadTopicOffsetMetric(topicRetryOffsetF, entry);
        }
        mfs.add(topicRetryOffsetF);

        GaugeMetricFamily topicDLQOffsetF = new GaugeMetricFamily("rocketmq_topic_dlq_offset", "TopicRetryOffset", DLQ_TOPIC_OFFSET_LABEL_NAMES);
        for (Map.Entry<DLQTopicOffsetMetric, Double> entry : topicDLQOffset.asMap().entrySet()) {
            topicDLQOffsetF.addMetric(
                Arrays.asList(
                    entry.getKey().getClusterName(),
                    entry.getKey().getBrokerName(),
                    entry.getKey().getGroup()
                ),
                entry.getValue());
        }
        mfs.add(topicDLQOffsetF);

    }

    private OtlpMetricsCollectorService otlpMetricsCollectorService;

    public void setOtlpMetricsCollectorService(
        OtlpMetricsCollectorService otlpMetricsCollectorService) {
        log.info("init RMQMetricsService add opentelemetry collector...");
        this.otlpMetricsCollectorService = otlpMetricsCollectorService;
    }

    @Override
    public List<MetricFamilySamples> collect() {

        List<MetricFamilySamples> mfs = new ArrayList<MetricFamilySamples>();

        collectConsumerMetric(mfs);

        collectProducerMetric(mfs);

        collectTopicOffsetMetric(mfs);

        collectTopicNums(mfs);

        collectGroupNums(mfs);

        collectClientGroupMetric(mfs);

        collectBrokerNums(mfs);

        collectBrokerRuntimeStats(mfs);

        // v2 opentelemetry metrics
        otlpMetricsCollectorService.collectOtlpMetrics(mfs);

        return mfs;
    }

    private static final List<String> GROUP_CLIENT_METRIC_LABEL_NAMES = Arrays.asList(
        "clientAddr", "clientId", "group", "topic"
    );

    private void collectClientGroupMetric(List<MetricFamilySamples> mfs) {
        GaugeMetricFamily consumerClientFailedMsgCountsF = new GaugeMetricFamily("rocketmq_client_consume_fail_msg_count", "consumerClientFailedMsgCounts", GROUP_CLIENT_METRIC_LABEL_NAMES);
        for (Map.Entry<ConsumerRuntimeConsumeFailedMsgsMetric, Long> entry : consumerClientFailedMsgCounts.asMap().entrySet()) {
            loadClientRuntimeStatsMetric(consumerClientFailedMsgCountsF, entry);
        }
        mfs.add(consumerClientFailedMsgCountsF);

        GaugeMetricFamily consumerClientFailedTPSF = new GaugeMetricFamily("rocketmq_client_consume_fail_msg_tps", "consumerClientFailedTPS", GROUP_CLIENT_METRIC_LABEL_NAMES);
        for (Map.Entry<ConsumerRuntimeConsumeFailedTPSMetric, Double> entry : consumerClientFailedTPS.asMap().entrySet()) {
            loadClientRuntimeStatsMetric(consumerClientFailedTPSF, entry);
        }
        mfs.add(consumerClientFailedTPSF);

        GaugeMetricFamily consumerClientOKTPSF = new GaugeMetricFamily("rocketmq_client_consume_ok_msg_tps", "consumerClientOKTPS", GROUP_CLIENT_METRIC_LABEL_NAMES);
        for (Map.Entry<ConsumerRuntimeConsumeOKTPSMetric, Double> entry : consumerClientOKTPS.asMap().entrySet()) {
            loadClientRuntimeStatsMetric(consumerClientOKTPSF, entry);
        }
        mfs.add(consumerClientOKTPSF);

        GaugeMetricFamily consumerClientRTF = new GaugeMetricFamily("rocketmq_client_consume_rt", "consumerClientRT", GROUP_CLIENT_METRIC_LABEL_NAMES);
        for (Map.Entry<ConsumerRuntimeConsumeRTMetric, Double> entry : consumerClientRT.asMap().entrySet()) {
            loadClientRuntimeStatsMetric(consumerClientRTF, entry);
        }
        mfs.add(consumerClientRTF);

        GaugeMetricFamily consumerClientPullRTF = new GaugeMetricFamily("rocketmq_client_consumer_pull_rt", "consumerClientPullRT", GROUP_CLIENT_METRIC_LABEL_NAMES);
        for (Map.Entry<ConsumerRuntimePullRTMetric, Double> entry : consumerClientPullRT.asMap().entrySet()) {
            loadClientRuntimeStatsMetric(consumerClientPullRTF, entry);
        }
        mfs.add(consumerClientPullRTF);

        GaugeMetricFamily consumerClientPullTPSF = new GaugeMetricFamily("rocketmq_client_consumer_pull_tps", "consumerClientPullTPS", GROUP_CLIENT_METRIC_LABEL_NAMES);
        for (Map.Entry<ConsumerRuntimePullTPSMetric, Double> entry : consumerClientPullTPS.asMap().entrySet()) {
            loadClientRuntimeStatsMetric(consumerClientPullTPSF, entry);
        }
        mfs.add(consumerClientPullTPSF);
    }


    private <T2 extends Number, T1 extends ConsumerRuntimeConsumeFailedMsgsMetric> void loadClientRuntimeStatsMetric(GaugeMetricFamily family, Map.Entry<T1, T2> entry) {
        family.addMetric(Arrays.asList(
            entry.getKey().getCaddrs(),
            entry.getKey().getLocaladdrs(),
            entry.getKey().getGroup(),
            entry.getKey().getTopic()
        ), entry.getValue().doubleValue());
    }

    private static final List<String> GROUP_PULL_LATENCY_LABEL_NAMES = Arrays.asList(
        "cluster", "broker", "topic", "group", "queueid"
    );
    private static final List<String> GROUP_LATENCY_BY_STORETIME_LABEL_NAMES = Arrays.asList(
        "cluster", "broker", "topic", "group"
    );

    private static final List<String> BROKER_NUMS_LABEL_NAMES = Arrays.asList("cluster", "brokerIP", "broker");

    private static void loadBrokerNums(GaugeMetricFamily family, Map.Entry<BrokerMetric, Double> entry) {
        family.addMetric(Arrays.asList(
            entry.getKey().getClusterName(),
            entry.getKey().getBrokerIP(),
            entry.getKey().getBrokerName()),
            entry.getValue()
        );
    }

    private void collectBrokerNums(List<MetricFamilySamples> mfs) {
        GaugeMetricFamily brokerPutNumsGauge = new GaugeMetricFamily("rocketmq_broker_tps", "BrokerPutNums", BROKER_NUMS_LABEL_NAMES);
        for (Map.Entry<BrokerMetric, Double> entry : brokerPutNums.asMap().entrySet()) {
            loadBrokerNums(brokerPutNumsGauge, entry);
        }
        mfs.add(brokerPutNumsGauge);

        GaugeMetricFamily brokerGetNumsGauge = new GaugeMetricFamily("rocketmq_broker_qps", "BrokerGetNums", BROKER_NUMS_LABEL_NAMES);
        for (Map.Entry<BrokerMetric, Double> entry : brokerGetNums.asMap().entrySet()) {
            loadBrokerNums(brokerGetNumsGauge, entry);
        }
        mfs.add(brokerGetNumsGauge);

        GaugeMetricFamily brokerCommitLogDiffGauge = new GaugeMetricFamily("rocketmq_broker_commitlog_diff", "brokerCommitLogDiffGauge", BROKER_NUMS_LABEL_NAMES);
        for (Map.Entry<BrokerMetric, Double> entry : brokerCommitLogDiff.asMap().entrySet()) {
            loadBrokerNums(brokerCommitLogDiffGauge, entry);
        }
        mfs.add(brokerCommitLogDiffGauge);
    }


    private static final List<String> GROUP_NUMS_LABEL_NAMES = Arrays.asList(
        "cluster", "broker", "topic", "group"
    );

    private static <T extends Number> void loadGroupNumsMetric(GaugeMetricFamily family, Map.Entry<ConsumerMetric, T> entry) {
        family.addMetric(Arrays.asList(
            entry.getKey().getClusterName(),
            entry.getKey().getBrokerName(),
            entry.getKey().getTopicName(),
            entry.getKey().getConsumerGroupName()),
            entry.getValue().doubleValue()
        );
    }

    private void collectBrokerRuntimeStatsPutMessageDistributeTime(List<MetricFamilySamples> mfs) {
        GaugeMetricFamily pmdt0 = new GaugeMetricFamily("rocketmq_brokeruntime_pmdt_0ms", "PutMessageDistributeTimeMap0ms", BROKER_RUNTIME_METRIC_LABEL_NAMES);
        for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap0ms.asMap().entrySet()) {
            loadBrokerRuntimeStatsMetric(pmdt0, entry);
        }
        mfs.add(pmdt0);

        GaugeMetricFamily pmdt0to10ms = new GaugeMetricFamily("rocketmq_brokeruntime_pmdt_0to10ms", "PutMessageDistributeTimeMap0to10ms", BROKER_RUNTIME_METRIC_LABEL_NAMES);
        for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap0to10ms.asMap().entrySet()) {
            loadBrokerRuntimeStatsMetric(pmdt0to10ms, entry);
        }
        mfs.add(pmdt0to10ms);

        GaugeMetricFamily pmdt10to50ms = new GaugeMetricFamily("rocketmq_brokeruntime_pmdt_10to50ms", "PutMessageDistributeTimeMap10to50ms", BROKER_RUNTIME_METRIC_LABEL_NAMES);
        for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap10to50ms.asMap().entrySet()) {
            loadBrokerRuntimeStatsMetric(pmdt10to50ms, entry);
        }
        mfs.add(pmdt10to50ms);

        GaugeMetricFamily pmdt50to100ms = new GaugeMetricFamily("rocketmq_brokeruntime_pmdt_50to100ms", "PutMessageDistributeTimeMap50to100ms", BROKER_RUNTIME_METRIC_LABEL_NAMES);
        for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap50to100ms.asMap().entrySet()) {
            loadBrokerRuntimeStatsMetric(pmdt50to100ms, entry);
        }
        mfs.add(pmdt50to100ms);

        GaugeMetricFamily pmdt100to200ms = new GaugeMetricFamily("rocketmq_brokeruntime_pmdt_100to200ms", "PutMessageDistributeTimeMap100to200ms", BROKER_RUNTIME_METRIC_LABEL_NAMES);
        for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap100to200ms.asMap().entrySet()) {
            loadBrokerRuntimeStatsMetric(pmdt100to200ms, entry);
        }
        mfs.add(pmdt100to200ms);

        GaugeMetricFamily pmdt200to500ms = new GaugeMetricFamily("rocketmq_brokeruntime_pmdt_200to500ms", "PutMessageDistributeTimeMap200to500ms", BROKER_RUNTIME_METRIC_LABEL_NAMES);
        for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap200to500ms.asMap().entrySet()) {
            loadBrokerRuntimeStatsMetric(pmdt200to500ms, entry);
        }
        mfs.add(pmdt200to500ms);

        GaugeMetricFamily pmdt500to1s = new GaugeMetricFamily("rocketmq_brokeruntime_pmdt_500to1s", "PutMessageDistributeTimeMap500to1s", BROKER_RUNTIME_METRIC_LABEL_NAMES);
        for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap500to1s.asMap().entrySet()) {
            loadBrokerRuntimeStatsMetric(pmdt500to1s, entry);
        }
        mfs.add(pmdt500to1s);

        GaugeMetricFamily pmdt1to2s = new GaugeMetricFamily("rocketmq_brokeruntime_pmdt_1to2s", "PutMessageDistributeTimeMap1to2s", BROKER_RUNTIME_METRIC_LABEL_NAMES);
        for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap1to2s.asMap().entrySet()) {
            loadBrokerRuntimeStatsMetric(pmdt1to2s, entry);
        }
        mfs.add(pmdt1to2s);

        GaugeMetricFamily pmdt2to3s = new GaugeMetricFamily("rocketmq_brokeruntime_pmdt_2to3s", "PutMessageDistributeTimeMap2to3s", BROKER_RUNTIME_METRIC_LABEL_NAMES);
        for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap2to3s.asMap().entrySet()) {
            loadBrokerRuntimeStatsMetric(pmdt2to3s, entry);
        }
        mfs.add(pmdt2to3s);

        GaugeMetricFamily pmdt3to4s = new GaugeMetricFamily("rocketmq_brokeruntime_pmdt_3to4s", "PutMessageDistributeTimeMap3to4s", BROKER_RUNTIME_METRIC_LABEL_NAMES);
        for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap3to4s.asMap().entrySet()) {
            loadBrokerRuntimeStatsMetric(pmdt3to4s, entry);
        }
        mfs.add(pmdt3to4s);

        GaugeMetricFamily pmdt4to5s = new GaugeMetricFamily("rocketmq_brokeruntime_pmdt_4to5s", "PutMessageDistributeTimeMap4to5s", BROKER_RUNTIME_METRIC_LABEL_NAMES);
        for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap4to5s.asMap().entrySet()) {
            loadBrokerRuntimeStatsMetric(pmdt4to5s, entry);
        }
        mfs.add(pmdt4to5s);

        GaugeMetricFamily pmdt5to10s = new GaugeMetricFamily("rocketmq_brokeruntime_pmdt_5to10s", "PutMessageDistributeTimeMap5to10s", BROKER_RUNTIME_METRIC_LABEL_NAMES);
        for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap5to10s.asMap().entrySet()) {
            loadBrokerRuntimeStatsMetric(pmdt5to10s, entry);
        }
        mfs.add(pmdt5to10s);

        GaugeMetricFamily pmdt10stoMore = new GaugeMetricFamily("rocketmq_brokeruntime_pmdt_10stomore", "PutMessageDistributeTimeMap10toMore", BROKER_RUNTIME_METRIC_LABEL_NAMES);
        for (Map.Entry<BrokerRuntimeMetric, Integer> entry : brokerRuntimePutMessageDistributeTimeMap10toMore.asMap().entrySet()) {
            loadBrokerRuntimeStatsMetric(pmdt10stoMore, entry);
        }
        mfs.add(pmdt10stoMore);
    }

    private void collectGroupNums(List<MetricFamilySamples> mfs) {
        GaugeMetricFamily groupGetNumsGauge = new GaugeMetricFamily("rocketmq_consumer_tps", "GroupGetNums", GROUP_NUMS_LABEL_NAMES);
        for (Map.Entry<ConsumerMetric, Double> entry : groupGetNums.asMap().entrySet()) {
            loadGroupNumsMetric(groupGetNumsGauge, entry);
        }
        mfs.add(groupGetNumsGauge);

        GaugeMetricFamily groupConsumeTPSF = new GaugeMetricFamily("rocketmq_group_consume_tps", "GroupConsumeTPS", GROUP_NUMS_LABEL_NAMES);
        for (Map.Entry<ConsumerMetric, Double> entry : groupConsumeTPS.asMap().entrySet()) {
            loadGroupNumsMetric(groupConsumeTPSF, entry);
        }
        mfs.add(groupConsumeTPSF);

        GaugeMetricFamily groupBrokerTotalOffsetF = new GaugeMetricFamily("rocketmq_consumer_offset", "GroupBrokerTotalOffset", GROUP_NUMS_LABEL_NAMES);
        for (Map.Entry<ConsumerMetric, Long> entry : groupBrokerTotalOffset.asMap().entrySet()) {
            loadGroupNumsMetric(groupBrokerTotalOffsetF, entry);
        }
        mfs.add(groupBrokerTotalOffsetF);

        GaugeMetricFamily groupConsumeTotalOffsetF = new GaugeMetricFamily("rocketmq_group_consume_total_offset", "GroupConsumeTotalOffset", GROUP_NUMS_LABEL_NAMES);
        for (Map.Entry<ConsumerMetric, Long> entry : groupConsumeTotalOffset.asMap().entrySet()) {
            loadGroupNumsMetric(groupConsumeTotalOffsetF, entry);
        }
        mfs.add(groupConsumeTotalOffsetF);

        GaugeMetricFamily groupGetSizeGauge = new GaugeMetricFamily("rocketmq_consumer_message_size", "GroupGetMessageSize", GROUP_NUMS_LABEL_NAMES);
        for (Map.Entry<ConsumerMetric, Double> entry : groupGetSize.asMap().entrySet()) {
            loadGroupNumsMetric(groupGetSizeGauge, entry);
        }
        mfs.add(groupGetSizeGauge);

        GaugeMetricFamily sendBackNumsGauge = new GaugeMetricFamily("rocketmq_send_back_nums", "SendBackNums", GROUP_NUMS_LABEL_NAMES);
        for (Map.Entry<ConsumerMetric, Double> entry : sendBackNums.asMap().entrySet()) {
            loadGroupNumsMetric(sendBackNumsGauge, entry);
        }
        mfs.add(sendBackNumsGauge);

        GaugeMetricFamily groupLatencyByTimeF = new GaugeMetricFamily("rocketmq_group_get_latency_by_storetime",
            "GroupGetLatencyByStoreTime", GROUP_LATENCY_BY_STORETIME_LABEL_NAMES);
        for (Map.Entry<ConsumerMetric, Long> entry : groupLatencyByTime.asMap().entrySet()) {
            loadGroupNumsMetric(groupLatencyByTimeF, entry);
        }
        mfs.add(groupLatencyByTimeF);

    }

    private void collectTopicNums(List<MetricFamilySamples> mfs) {
        GaugeMetricFamily topicPutNumsGauge = new GaugeMetricFamily("rocketmq_producer_tps", "TopicPutNums", TOPIC_NUMS_LABEL_NAMES);
        for (Map.Entry<TopicPutNumMetric, Double> entry : topicPutNums.asMap().entrySet()) {
            loadTopicNumsMetric(topicPutNumsGauge, entry);
        }
        mfs.add(topicPutNumsGauge);

        GaugeMetricFamily topicPutSizeGauge = new GaugeMetricFamily("rocketmq_producer_message_size", "TopicPutMessageSize", TOPIC_NUMS_LABEL_NAMES);
        for (Map.Entry<TopicPutNumMetric, Double> entry : topicPutSize.asMap().entrySet()) {
            loadTopicNumsMetric(topicPutSizeGauge, entry);
        }
        mfs.add(topicPutSizeGauge);
    }

    private static final List<String> TOPIC_NUMS_LABEL_NAMES = Arrays.asList("cluster", "broker", "topic");

    private void loadTopicNumsMetric(GaugeMetricFamily family, Map.Entry<TopicPutNumMetric, Double> entry) {
        family.addMetric(
            Arrays.asList(
                entry.getKey().getClusterName(),
                entry.getKey().getBrokerName(),
                entry.getKey().getTopicName()
            ),
            entry.getValue()
        );
    }

    public void addTopicOffsetMetric(String clusterName, String brokerName, String topic, long lastUpdateTimestamp, double value) {
        if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
            topicRetryOffset.put(new ProducerMetric(clusterName, brokerName, topic, lastUpdateTimestamp), value);
        } else if (topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX)) {
            topicDLQOffset.put(new DLQTopicOffsetMetric(clusterName, brokerName, topic.replace(MixAll.DLQ_GROUP_TOPIC_PREFIX, ""), lastUpdateTimestamp), value);
        } else {
            topicOffset.put(new ProducerMetric(clusterName, brokerName, topic, lastUpdateTimestamp), value);
        }
    }

    public void addProducerCountMetric(String clusterName, String brokerName, String groupName, int value) {
        producerCounts.put(new ProducerCountMetric(clusterName, brokerName, groupName), value);
    }

    public void addGroupCountMetric(String group, String caddrs, String localaddrs, int count) {
        this.consumerCounts.put(new ConsumerCountMetric(group, caddrs, localaddrs), count);
    }

    public void addGroupDiffMetric(String countOfOnlineConsumers, String group, String topic, String msgModel, long value) {
        if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
            this.consumerRetryDiff.put(new ConsumerTopicDiffMetric(group, topic, countOfOnlineConsumers, msgModel), value);
        } else if (topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX)) {
            this.consumerDLQDiff.put(new ConsumerTopicDiffMetric(group, topic, countOfOnlineConsumers, msgModel), value);
        } else {
            this.consumerDiff.put(new ConsumerTopicDiffMetric(group, topic, countOfOnlineConsumers, msgModel), value);
        }
    }

    public void addTopicPutNumsMetric(String cluster, String brokerName, String brokerIP, String topic, double value) {
        topicPutNums.put(new TopicPutNumMetric(cluster, brokerName, brokerIP, topic), value);
    }

    public void addTopicPutSizeMetric(String cluster, String brokerName, String brokerIP, String topic, double value) {
        topicPutSize.put(new TopicPutNumMetric(cluster, brokerName, brokerIP, topic), value);
    }

    public void addGroupBrokerTotalOffsetMetric(String clusterName, String brokerName,String topic, String group, long value) {
        groupBrokerTotalOffset.put(new ConsumerMetric(clusterName, brokerName, topic, group), value);
    }

    public void addGroupGetLatencyByStoreTimeMetric(String clusterName, String brokerName, String topic, String group, long value) {
        groupLatencyByTime.put(new ConsumerMetric(clusterName, brokerName, topic, group), value);
    }

    public void addGroupConsumerTotalOffsetMetric(String topic, String group, long value) {
        //groupConsumeTotalOffset.put(new ConsumerMetric(topic, group), value);
    }

    public void addGroupConsumeTPSMetric(String clusterName, String brokerName, String topic, String group, double value) {
        groupConsumeTPS.put(new ConsumerMetric(clusterName, brokerName, topic, group), value);
    }

    public void addGroupGetNumsMetric(String clusterName, String brokerName, String topic, String group, double value) {
        groupGetNums.put(new ConsumerMetric(clusterName, brokerName, topic, group), value);
    }

    public void addGroupGetSizeMetric(String clusterName, String brokerName, String topic, String group, double value) {
        groupGetSize.put(new ConsumerMetric(clusterName, brokerName, topic, group), value);
    }

    public void addSendBackNumsMetric(String clusterName, String brokerName, String topic, String group, double value) {
        sendBackNums.put(new ConsumerMetric(clusterName, brokerName, topic, group), value);
    }

    public void addConsumerClientFailedMsgCountsMetric(String group, String topic, String clientAddr, String clientId, long value) {
        consumerClientFailedMsgCounts.put(new ConsumerRuntimeConsumeFailedMsgsMetric(group, topic, clientAddr, clientId), value);
    }

    public void addConsumerClientFailedTPSMetric(String group, String topic, String clientAddr, String clientId, double value) {
        consumerClientFailedTPS.put(new ConsumerRuntimeConsumeFailedTPSMetric(group, topic, clientAddr, clientId), value);
    }

    public void addConsumerClientOKTPSMetric(String group, String topic, String clientAddr, String clientId, double value) {
        consumerClientOKTPS.put(new ConsumerRuntimeConsumeOKTPSMetric(group, topic, clientAddr, clientId), value);
    }

    public void addConsumeRTMetricMetric(String group, String topic, String clientAddr, String clientId, double value) {
        consumerClientRT.put(new ConsumerRuntimeConsumeRTMetric(group, topic, clientAddr, clientId), value);
    }

    public void addPullRTMetric(String group, String topic, String clientAddr, String clientId, double value) {
        consumerClientPullRT.put(new ConsumerRuntimePullRTMetric(group, topic, clientAddr, clientId), value);
    }

    public void addPullTPSMetric(String group, String topic, String clientAddr, String clientId, double value) {
        consumerClientPullTPS.put(new ConsumerRuntimePullTPSMetric(group, topic, clientAddr, clientId), value);
    }

    public void addBrokerPutNumsMetric(String clusterName, String brokerIP, String brokerName, double value) {
        brokerPutNums.put(new BrokerMetric(clusterName, brokerIP, brokerName), value);
    }

    public void addBrokerGetNumsMetric(String clusterName, String brokerIP, String brokerName, double value) {
        brokerGetNums.put(new BrokerMetric(clusterName, brokerIP, brokerName), value);
    }

    public void addBrokerCommitLogDiffMetric(String clusterName, String brokerIP, String brokerName, double value) {
        brokerCommitLogDiff.put(new BrokerMetric(clusterName, brokerIP, brokerName), value);
    }

    public void addBrokerRuntimeStatsMetric(BrokerRuntimeStats stats, String clusterName, String brokerAddress, String brokerHost) {
        addBrokerRuntimePutMessageDistributeTimeMap(
            clusterName, brokerAddress, brokerHost,
            stats.getBrokerVersionDesc(),
            stats.getBootTimestamp(),
            stats.getBrokerVersion(), stats);
        addCommitLogDirCapacity(clusterName, brokerAddress, brokerHost, stats);
        addAllKindOfTps(clusterName, brokerAddress, brokerHost, stats);

        brokerRuntimePutLatency99.put(new BrokerRuntimeMetric(
                clusterName, brokerAddress, brokerHost,
                stats.getBrokerVersionDesc(),
                stats.getBootTimestamp(),
                stats.getBrokerVersion()), stats.getPutLatency99());

        brokerRuntimePutLatency999.put(new BrokerRuntimeMetric(
                clusterName, brokerAddress, brokerHost,
                stats.getBrokerVersionDesc(),
                stats.getBootTimestamp(),
                stats.getBrokerVersion()), stats.getPutLatency999());

        brokerRuntimeMsgPutTotalTodayNow.put(new BrokerRuntimeMetric(
            clusterName, brokerAddress, brokerHost,
            stats.getBrokerVersionDesc(),
            stats.getBootTimestamp(),
            stats.getBrokerVersion()), stats.getMsgPutTotalTodayNow());

        brokerRuntimeMsgGetTotalTodayNow.put(new BrokerRuntimeMetric(
            clusterName, brokerAddress, brokerHost,
            stats.getBrokerVersionDesc(),
            stats.getBootTimestamp(),
            stats.getBrokerVersion()), stats.getMsgGetTotalTodayNow());

        brokerRuntimeMsgPutTotalTodayMorning.put(new BrokerRuntimeMetric(
            clusterName, brokerAddress, brokerHost,
            stats.getBrokerVersionDesc(),
            stats.getBootTimestamp(),
            stats.getBrokerVersion()), stats.getMsgPutTotalTodayMorning());
        brokerRuntimeMsgGetTotalTodayMorning.put(new BrokerRuntimeMetric(
            clusterName, brokerAddress, brokerHost,
            stats.getBrokerVersionDesc(),
            stats.getBootTimestamp(),
            stats.getBrokerVersion()), stats.getMsgGetTotalTodayMorning());
        brokerRuntimeMsgPutTotalYesterdayMorning.put(new BrokerRuntimeMetric(
            clusterName, brokerAddress, brokerHost,
            stats.getBrokerVersionDesc(),
            stats.getBootTimestamp(),
            stats.getBrokerVersion()), stats.getMsgPutTotalYesterdayMorning());
        brokerRuntimeMsgGetTotalYesterdayMorning.put(new BrokerRuntimeMetric(
            clusterName, brokerAddress, brokerHost,
            stats.getBrokerVersionDesc(),
            stats.getBootTimestamp(),
            stats.getBrokerVersion()), stats.getMsgGetTotalYesterdayMorning());
        brokerRuntimeSendThreadPoolQueueHeadWaitTimeMills.put(new BrokerRuntimeMetric(
            clusterName, brokerAddress, brokerHost,
            stats.getBrokerVersionDesc(),
            stats.getBootTimestamp(),
            stats.getBrokerVersion()), stats.getSendThreadPoolQueueHeadWaitTimeMills());
        brokerRuntimeQueryThreadPoolQueueHeadWaitTimeMills.put(new BrokerRuntimeMetric(
            clusterName, brokerAddress, brokerHost,
            stats.getBrokerVersionDesc(),
            stats.getBootTimestamp(),
            stats.getBrokerVersion()), stats.getQueryThreadPoolQueueHeadWaitTimeMills());
        brokerRuntimePullThreadPoolQueueHeadWaitTimeMills.put(new BrokerRuntimeMetric(
            clusterName, brokerAddress, brokerHost,
            stats.getBrokerVersionDesc(),
            stats.getBootTimestamp(),
            stats.getBrokerVersion()), stats.getPullThreadPoolQueueHeadWaitTimeMills());
        brokerRuntimeQueryThreadPoolQueueSize.put(new BrokerRuntimeMetric(
            clusterName, brokerAddress, brokerHost,
            stats.getBrokerVersionDesc(),
            stats.getBootTimestamp(),
            stats.getBrokerVersion()), stats.getQueryThreadPoolQueueSize());
        brokerRuntimePullThreadPoolQueueSize.put(new BrokerRuntimeMetric(
            clusterName, brokerAddress, brokerHost,
            stats.getBrokerVersionDesc(),
            stats.getBootTimestamp(),
            stats.getBrokerVersion()), stats.getPullThreadPoolQueueSize());
        brokerRuntimeSendThreadPoolQueueCapacity.put(new BrokerRuntimeMetric(
            clusterName, brokerAddress, brokerHost,
            stats.getBrokerVersionDesc(),
            stats.getBootTimestamp(),
            stats.getBrokerVersion()), stats.getSendThreadPoolQueueCapacity());
        brokerRuntimePullThreadPoolQueueCapacity.put(new BrokerRuntimeMetric(
            clusterName, brokerAddress, brokerHost,
            stats.getBrokerVersionDesc(),
            stats.getBootTimestamp(),
            stats.getBrokerVersion()), stats.getPullThreadPoolQueueCapacity());

        brokerRuntimeRemainHowManyDataToFlush.put(new BrokerRuntimeMetric(
            clusterName, brokerAddress, brokerHost,
            stats.getBrokerVersionDesc(),
            stats.getBootTimestamp(),
            stats.getBrokerVersion()), stats.getRemainHowManyDataToFlush());
        brokerRuntimeCommitLogMinOffset.put(new BrokerRuntimeMetric(
            clusterName, brokerAddress, brokerHost,
            stats.getBrokerVersionDesc(),
            stats.getBootTimestamp(),
            stats.getBrokerVersion()), stats.getCommitLogMinOffset());
        brokerRuntimeCommitLogMaxOffset.put(new BrokerRuntimeMetric(
            clusterName, brokerAddress, brokerHost,
            stats.getBrokerVersionDesc(),
            stats.getBootTimestamp(),
            stats.getBrokerVersion()), stats.getCommitLogMaxOffset());


        brokerRuntimeDispatchMaxBuffer.put(new BrokerRuntimeMetric(
            clusterName, brokerAddress, brokerHost,
            stats.getBrokerVersionDesc(),
            stats.getBootTimestamp(),
            stats.getBrokerVersion()), stats.getDispatchMaxBuffer());
        brokerRuntimeConsumeQueueDiskRatio.put(new BrokerRuntimeMetric(
            clusterName, brokerAddress, brokerHost,
            stats.getBrokerVersionDesc(),
            stats.getBootTimestamp(),
            stats.getBrokerVersion()), stats.getConsumeQueueDiskRatio());
        brokerRuntimeCommitLogDiskRatio.put(new BrokerRuntimeMetric(
            clusterName, brokerAddress, brokerHost,
            stats.getBrokerVersionDesc(),
            stats.getBootTimestamp(),
            stats.getBrokerVersion()), stats.getCommitLogDiskRatio());
        brokerRuntimePageCacheLockTimeMills.put(new BrokerRuntimeMetric(
            clusterName, brokerAddress, brokerHost,
            stats.getBrokerVersionDesc(),
            stats.getBootTimestamp(),
            stats.getBrokerVersion()), stats.getPageCacheLockTimeMills());
        brokerRuntimeGetMessageEntireTimeMax.put(new BrokerRuntimeMetric(
            clusterName, brokerAddress, brokerHost,
            stats.getBrokerVersionDesc(),
            stats.getBootTimestamp(),
            stats.getBrokerVersion()), stats.getGetMessageEntireTimeMax());
        brokerRuntimePutMessageTimesTotal.put(new BrokerRuntimeMetric(
            clusterName, brokerAddress, brokerHost,
            stats.getBrokerVersionDesc(),
            stats.getBootTimestamp(),
            stats.getBrokerVersion()), stats.getPutMessageTimesTotal());
        brokerRuntimeSendThreadPoolQueueSize.put(new BrokerRuntimeMetric(
            clusterName, brokerAddress, brokerHost,
            stats.getBrokerVersionDesc(),
            stats.getBootTimestamp(),
            stats.getBrokerVersion()), stats.getSendThreadPoolQueueSize());
        brokerRuntimeStartAcceptSendRequestTimeStamp.put(new BrokerRuntimeMetric(
            clusterName, brokerAddress, brokerHost,
            stats.getBrokerVersionDesc(),
            stats.getBootTimestamp(),
            stats.getBrokerVersion()), stats.getStartAcceptSendRequestTimeStamp());
        brokerRuntimePutMessageEntireTimeMax.put(new BrokerRuntimeMetric(
            clusterName, brokerAddress, brokerHost,
            stats.getBrokerVersionDesc(),
            stats.getBootTimestamp(),
            stats.getBrokerVersion()), stats.getPutMessageEntireTimeMax());
        brokerRuntimeEarliestMessageTimeStamp.put(new BrokerRuntimeMetric(
            clusterName, brokerAddress, brokerHost,
            stats.getBrokerVersionDesc(),
            stats.getBootTimestamp(),
            stats.getBrokerVersion()), stats.getEarliestMessageTimeStamp());
        brokerRuntimeRemainTransientStoreBufferNumbs.put(new BrokerRuntimeMetric(
            clusterName, brokerAddress, brokerHost,
            stats.getBrokerVersionDesc(),
            stats.getBootTimestamp(),
            stats.getBrokerVersion()), stats.getRemainTransientStoreBufferNumbs());
        brokerRuntimeQueryThreadPoolQueueCapacity.put(new BrokerRuntimeMetric(
            clusterName, brokerAddress, brokerHost,
            stats.getBrokerVersionDesc(),
            stats.getBootTimestamp(),
            stats.getBrokerVersion()), stats.getQueryThreadPoolQueueCapacity());
        brokerRuntimePutMessageAverageSize.put(new BrokerRuntimeMetric(
            clusterName, brokerAddress, brokerHost,
            stats.getBrokerVersionDesc(),
            stats.getBootTimestamp(),
            stats.getBrokerVersion()), stats.getPutMessageAverageSize());
        brokerRuntimePutMessageSizeTotal.put(new BrokerRuntimeMetric(
            clusterName, brokerAddress, brokerHost,
            stats.getBrokerVersionDesc(),
            stats.getBootTimestamp(),
            stats.getBrokerVersion()), stats.getPutMessageSizeTotal());
        brokerRuntimeDispatchBehindBytes.put(new BrokerRuntimeMetric(
            clusterName, brokerAddress, brokerHost,
            stats.getBrokerVersionDesc(),
            stats.getBootTimestamp(),
            stats.getBrokerVersion()), stats.getDispatchBehindBytes());
    }

    private void addAllKindOfTps(String clusterName, String brokerAddress, String brokerHost, BrokerRuntimeStats stats) {
        brokerRuntimePutTps10.put(new BrokerRuntimeMetric(
            clusterName,
            brokerAddress, brokerHost,
            stats.getBrokerVersionDesc(),
            stats.getBootTimestamp(),
            stats.getBrokerVersion()), stats.getPutTps().getTen());
        brokerRuntimePutTps60.put(new BrokerRuntimeMetric(
            clusterName,
            brokerAddress, brokerHost,
            stats.getBrokerVersionDesc(),
            stats.getBootTimestamp(),
            stats.getBrokerVersion()), stats.getPutTps().getSixty());
        brokerRuntimePutTps600.put(new BrokerRuntimeMetric(
            clusterName,
            brokerAddress, brokerHost,
            stats.getBrokerVersionDesc(),
            stats.getBootTimestamp(),
            stats.getBrokerVersion()), stats.getPutTps().getSixHundred());
        brokerRuntimeGetMissTps10.put(new BrokerRuntimeMetric(
            clusterName,
            brokerAddress, brokerHost,
            stats.getBrokerVersionDesc(),
            stats.getBootTimestamp(),
            stats.getBrokerVersion()), stats.getGetMissTps().getTen());
        brokerRuntimeGetMissTps60.put(new BrokerRuntimeMetric(
            clusterName,
            brokerAddress, brokerHost,
            stats.getBrokerVersionDesc(),
            stats.getBootTimestamp(),
            stats.getBrokerVersion()), stats.getGetMissTps().getSixty());
        brokerRuntimeGetMissTps600.put(new BrokerRuntimeMetric(
            clusterName,
            brokerAddress, brokerHost,
            stats.getBrokerVersionDesc(),
            stats.getBootTimestamp(),
            stats.getBrokerVersion()), stats.getGetMissTps().getSixHundred());
        brokerRuntimeGetTransferedTps10.put(new BrokerRuntimeMetric(
            clusterName,
            brokerAddress, brokerHost,
            stats.getBrokerVersionDesc(),
            stats.getBootTimestamp(),
            stats.getBrokerVersion()), stats.getGetTransferedTps().getTen());
        brokerRuntimeGetTransferedTps60.put(new BrokerRuntimeMetric(
            clusterName,
            brokerAddress, brokerHost,
            stats.getBrokerVersionDesc(),
            stats.getBootTimestamp(),
            stats.getBrokerVersion()), stats.getGetTransferedTps().getSixty());
        brokerRuntimeGetTransferedTps600.put(new BrokerRuntimeMetric(
            clusterName,
            brokerAddress, brokerHost,
            stats.getBrokerVersionDesc(),
            stats.getBootTimestamp(),
            stats.getBrokerVersion()), stats.getGetTransferedTps().getSixHundred());
        brokerRuntimeGetTotalTps10.put(new BrokerRuntimeMetric(
            clusterName,
            brokerAddress, brokerHost,
            stats.getBrokerVersionDesc(),
            stats.getBootTimestamp(),
            stats.getBrokerVersion()), stats.getGetTotalTps().getTen());
        brokerRuntimeGetTotalTps60.put(new BrokerRuntimeMetric(
            clusterName,
            brokerAddress, brokerHost,
            stats.getBrokerVersionDesc(),
            stats.getBootTimestamp(),
            stats.getBrokerVersion()), stats.getGetTotalTps().getSixty());
        brokerRuntimeGetTotalTps600.put(new BrokerRuntimeMetric(
            clusterName,
            brokerAddress, brokerHost,
            stats.getBrokerVersionDesc(),
            stats.getBootTimestamp(),
            stats.getBrokerVersion()), stats.getGetTotalTps().getSixHundred());
        brokerRuntimeGetFoundTps10.put(new BrokerRuntimeMetric(
            clusterName,
            brokerAddress, brokerHost,
            stats.getBrokerVersionDesc(),
            stats.getBootTimestamp(),
            stats.getBrokerVersion()), stats.getGetFoundTps().getTen());
        brokerRuntimeGetFoundTps60.put(new BrokerRuntimeMetric(
            clusterName,
            brokerAddress, brokerHost,
            stats.getBrokerVersionDesc(),
            stats.getBootTimestamp(),
            stats.getBrokerVersion()), stats.getGetFoundTps().getSixty());
        brokerRuntimeGetFoundTps600.put(new BrokerRuntimeMetric(
            clusterName,
            brokerAddress, brokerHost,
            stats.getBrokerVersionDesc(),
            stats.getBootTimestamp(),
            stats.getBrokerVersion()), stats.getGetFoundTps().getSixHundred());
        brokerRuntimeGetFoundTps600.put(new BrokerRuntimeMetric(
            clusterName,
            brokerAddress, brokerHost,
            stats.getBrokerVersionDesc(),
            stats.getBootTimestamp(),
            stats.getBrokerVersion()), stats.getGetFoundTps().getSixHundred());
    }

    private void addCommitLogDirCapacity(String clusterName, String brokerAddress, String brokerHost, BrokerRuntimeStats stats) {
        brokerRuntimeCommitLogDirCapacityTotal.put(new BrokerRuntimeMetric(
            clusterName,
            brokerAddress, brokerHost,
            stats.getBrokerVersionDesc(),
            stats.getBootTimestamp(),
            stats.getBrokerVersion()), stats.getCommitLogDirCapacityTotal());
        brokerRuntimeCommitLogDirCapacityFree.put(new BrokerRuntimeMetric(
            clusterName,
            brokerAddress, brokerHost,
            stats.getBrokerVersionDesc(),
            stats.getBootTimestamp(),
            stats.getBrokerVersion()), stats.getCommitLogDirCapacityFree());
    }

    private void addBrokerRuntimePutMessageDistributeTimeMap(
        String clusterName, String brokerAddress, String brokerHost,
        String brokerDes, long bootTimestamp, int brokerVersion,
        BrokerRuntimeStats stats) {
        if (stats.getPutMessageDistributeTimeMap() == null || stats.getPutMessageDistributeTimeMap().isEmpty()) {
            log.warn("WARN putMessageDistributeTime is null or empty");
            return;
        }
        brokerRuntimePutMessageDistributeTimeMap0ms.put(new BrokerRuntimeMetric(
            clusterName,
            brokerAddress, brokerHost,
            brokerDes,
            bootTimestamp,
            brokerVersion), stats.getPutMessageDistributeTimeMap().get("<=0ms"));
        brokerRuntimePutMessageDistributeTimeMap0to10ms.put(new BrokerRuntimeMetric(
            clusterName,
            brokerAddress, brokerHost,
            brokerDes,
            bootTimestamp,
            brokerVersion), stats.getPutMessageDistributeTimeMap().get("0~10ms"));
        brokerRuntimePutMessageDistributeTimeMap10to50ms.put(new BrokerRuntimeMetric(
            clusterName,
            brokerAddress, brokerHost,
            brokerDes,
            bootTimestamp,
            brokerVersion), stats.getPutMessageDistributeTimeMap().get("10~50ms"));
        brokerRuntimePutMessageDistributeTimeMap50to100ms.put(new BrokerRuntimeMetric(
            clusterName,
            brokerAddress, brokerHost,
            brokerDes,
            bootTimestamp,
            brokerVersion), stats.getPutMessageDistributeTimeMap().get("50~100ms"));
        brokerRuntimePutMessageDistributeTimeMap100to200ms.put(new BrokerRuntimeMetric(
            clusterName,
            brokerAddress, brokerHost,
            brokerDes,
            bootTimestamp,
            brokerVersion), stats.getPutMessageDistributeTimeMap().get("100~200ms"));
        brokerRuntimePutMessageDistributeTimeMap200to500ms.put(new BrokerRuntimeMetric(
            clusterName,
            brokerAddress, brokerHost,
            brokerDes,
            bootTimestamp,
            brokerVersion), stats.getPutMessageDistributeTimeMap().get("200~500ms"));
        brokerRuntimePutMessageDistributeTimeMap500to1s.put(new BrokerRuntimeMetric(
            clusterName,
            brokerAddress, brokerHost,
            brokerDes,
            bootTimestamp,
            brokerVersion), stats.getPutMessageDistributeTimeMap().get("500ms~1s"));
        brokerRuntimePutMessageDistributeTimeMap1to2s.put(new BrokerRuntimeMetric(
            clusterName,
            brokerAddress, brokerHost,
            brokerDes,
            bootTimestamp,
            brokerVersion), stats.getPutMessageDistributeTimeMap().get("1~2s"));
        brokerRuntimePutMessageDistributeTimeMap2to3s.put(new BrokerRuntimeMetric(
            clusterName,
            brokerAddress, brokerHost,
            brokerDes,
            bootTimestamp,
            brokerVersion), stats.getPutMessageDistributeTimeMap().get("2~3s"));
        brokerRuntimePutMessageDistributeTimeMap3to4s.put(new BrokerRuntimeMetric(
            clusterName,
            brokerAddress, brokerHost,
            brokerDes,
            bootTimestamp,
            brokerVersion), stats.getPutMessageDistributeTimeMap().get("3~4s"));
        brokerRuntimePutMessageDistributeTimeMap4to5s.put(new BrokerRuntimeMetric(
            clusterName,
            brokerAddress, brokerHost,
            brokerDes,
            bootTimestamp,
            brokerVersion), stats.getPutMessageDistributeTimeMap().get("4~5s"));
        brokerRuntimePutMessageDistributeTimeMap5to10s.put(new BrokerRuntimeMetric(
            clusterName,
            brokerAddress, brokerHost,
            brokerDes,
            bootTimestamp,
            brokerVersion), stats.getPutMessageDistributeTimeMap().get("5~10s"));
        brokerRuntimePutMessageDistributeTimeMap10toMore.put(new BrokerRuntimeMetric(
            clusterName,
            brokerAddress, brokerHost,
            brokerDes,
            bootTimestamp,
            brokerVersion), stats.getPutMessageDistributeTimeMap().get("10s~"));
    }

    private static <T extends Number> void loadBrokerRuntimeStatsMetric(GaugeMetricFamily family, Map.Entry<BrokerRuntimeMetric, T> entry) {
        family.addMetric(Arrays.asList(
            entry.getKey().getClusterName(),
            entry.getKey().getBrokerAddress(),
            entry.getKey().getBrokerHost(),
            entry.getKey().getBrokerDes(),
            String.valueOf(entry.getKey().getBootTimestamp()),
            String.valueOf(entry.getKey().getBrokerVersion())
        ), entry.getValue().doubleValue());
    }

    private static final List<String> BROKER_RUNTIME_METRIC_LABEL_NAMES = Arrays.asList("cluster", "brokerIP", "brokerHost", "des", "boottime", "broker_version");

    private void collectBrokerRuntimeStats(List<MetricFamilySamples> mfs) {
        collectBrokerRuntimeStatsPutMessageDistributeTime(mfs);

        GaugeMetricFamily brokerRuntimeMsgPutTotalTodayNowF = new GaugeMetricFamily("rocketmq_brokeruntime_msg_put_total_today_now", "brokerRuntimeMsgPutTotalTodayNow", BROKER_RUNTIME_METRIC_LABEL_NAMES);
        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeMsgPutTotalTodayNow.asMap().entrySet()) {
            loadBrokerRuntimeStatsMetric(brokerRuntimeMsgPutTotalTodayNowF, entry);
        }
        mfs.add(brokerRuntimeMsgPutTotalTodayNowF);

        GaugeMetricFamily brokerRuntimeMsgGetTotalTodayNowF = new GaugeMetricFamily("rocketmq_brokeruntime_msg_gettotal_today_now", "brokerRuntimeMsgGetTotalTodayNow", BROKER_RUNTIME_METRIC_LABEL_NAMES);
        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeMsgGetTotalTodayNow.asMap().entrySet()) {
            loadBrokerRuntimeStatsMetric(brokerRuntimeMsgGetTotalTodayNowF, entry);
        }
        mfs.add(brokerRuntimeMsgGetTotalTodayNowF);

        GaugeMetricFamily brokerRuntimeDispatchBehindBytesF = new GaugeMetricFamily("rocketmq_brokeruntime_dispatch_behind_bytes", "brokerRuntimeDispatchBehindBytes", BROKER_RUNTIME_METRIC_LABEL_NAMES);
        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeDispatchBehindBytes.asMap().entrySet()) {
            loadBrokerRuntimeStatsMetric(brokerRuntimeDispatchBehindBytesF, entry);
        }
        mfs.add(brokerRuntimeDispatchBehindBytesF);

        GaugeMetricFamily brokerRuntimePutMessageSizeTotalF = new GaugeMetricFamily("rocketmq_brokeruntime_put_message_size_total", "brokerRuntimePutMessageSizeTotal", BROKER_RUNTIME_METRIC_LABEL_NAMES);
        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimePutMessageSizeTotal.asMap().entrySet()) {
            loadBrokerRuntimeStatsMetric(brokerRuntimePutMessageSizeTotalF, entry);
        }
        mfs.add(brokerRuntimePutMessageSizeTotalF);

        GaugeMetricFamily brokerRuntimePutMessageAverageSizeF = new GaugeMetricFamily("rocketmq_brokeruntime_put_message_average_size", "brokerRuntimePutMessageAverageSize", BROKER_RUNTIME_METRIC_LABEL_NAMES);
        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimePutMessageAverageSize.asMap().entrySet()) {
            loadBrokerRuntimeStatsMetric(brokerRuntimePutMessageAverageSizeF, entry);
        }
        mfs.add(brokerRuntimePutMessageAverageSizeF);

        GaugeMetricFamily brokerRuntimeQueryThreadPoolQueueCapacityF = new GaugeMetricFamily("rocketmq_brokeruntime_query_threadpool_queue_capacity", "brokerRuntimeQueryThreadPoolQueueCapacity", BROKER_RUNTIME_METRIC_LABEL_NAMES);
        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeQueryThreadPoolQueueCapacity.asMap().entrySet()) {
            loadBrokerRuntimeStatsMetric(brokerRuntimeQueryThreadPoolQueueCapacityF, entry);
        }
        mfs.add(brokerRuntimeQueryThreadPoolQueueCapacityF);

        GaugeMetricFamily brokerRuntimeRemainTransientStoreBufferNumbsF = new GaugeMetricFamily("rocketmq_brokeruntime_remain_transientstore_buffer_numbs", "brokerRuntimeRemainTransientStoreBufferNumbs", BROKER_RUNTIME_METRIC_LABEL_NAMES);
        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeRemainTransientStoreBufferNumbs.asMap().entrySet()) {
            loadBrokerRuntimeStatsMetric(brokerRuntimeRemainTransientStoreBufferNumbsF, entry);
        }
        mfs.add(brokerRuntimeRemainTransientStoreBufferNumbsF);

        GaugeMetricFamily brokerRuntimeEarliestMessageTimeStampF = new GaugeMetricFamily("rocketmq_brokeruntime_earliest_message_timestamp", "brokerRuntimeEarliestMessageTimeStamp", BROKER_RUNTIME_METRIC_LABEL_NAMES);
        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeEarliestMessageTimeStamp.asMap().entrySet()) {
            loadBrokerRuntimeStatsMetric(brokerRuntimeEarliestMessageTimeStampF, entry);
        }
        mfs.add(brokerRuntimeEarliestMessageTimeStampF);

        GaugeMetricFamily brokerRuntimePutMessageEntireTimeMaxF = new GaugeMetricFamily("rocketmq_brokeruntime_putmessage_entire_time_max", "brokerRuntimePutMessageEntireTimeMax", BROKER_RUNTIME_METRIC_LABEL_NAMES);
        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimePutMessageEntireTimeMax.asMap().entrySet()) {
            loadBrokerRuntimeStatsMetric(brokerRuntimePutMessageEntireTimeMaxF, entry);
        }
        mfs.add(brokerRuntimePutMessageEntireTimeMaxF);

        GaugeMetricFamily brokerRuntimeStartAcceptSendRequestTimeStampF = new GaugeMetricFamily("rocketmq_brokeruntime_start_accept_sendrequest_time", "brokerRuntimeStartAcceptSendRequestTimeStamp", BROKER_RUNTIME_METRIC_LABEL_NAMES);
        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeStartAcceptSendRequestTimeStamp.asMap().entrySet()) {
            loadBrokerRuntimeStatsMetric(brokerRuntimeStartAcceptSendRequestTimeStampF, entry);
        }
        mfs.add(brokerRuntimeStartAcceptSendRequestTimeStampF);

        GaugeMetricFamily brokerRuntimeSendThreadPoolQueueSizeF = new GaugeMetricFamily("rocketmq_brokeruntime_send_threadpool_queue_size", "brokerRuntimeSendThreadPoolQueueSize", BROKER_RUNTIME_METRIC_LABEL_NAMES);
        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeSendThreadPoolQueueSize.asMap().entrySet()) {
            loadBrokerRuntimeStatsMetric(brokerRuntimeSendThreadPoolQueueSizeF, entry);
        }
        mfs.add(brokerRuntimeSendThreadPoolQueueSizeF);

        GaugeMetricFamily brokerRuntimePutMessageTimesTotalF = new GaugeMetricFamily("rocketmq_brokeruntime_putmessage_times_total", "brokerRuntimePutMessageTimesTotal", BROKER_RUNTIME_METRIC_LABEL_NAMES);
        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimePutMessageTimesTotal.asMap().entrySet()) {
            loadBrokerRuntimeStatsMetric(brokerRuntimePutMessageTimesTotalF, entry);
        }
        mfs.add(brokerRuntimePutMessageTimesTotalF);

        GaugeMetricFamily brokerRuntimeGetMessageEntireTimeMaxF = new GaugeMetricFamily("rocketmq_brokeruntime_getmessage_entire_time_max", "brokerRuntimeGetMessageEntireTimeMax", BROKER_RUNTIME_METRIC_LABEL_NAMES);
        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeGetMessageEntireTimeMax.asMap().entrySet()) {
            loadBrokerRuntimeStatsMetric(brokerRuntimeGetMessageEntireTimeMaxF, entry);
        }
        mfs.add(brokerRuntimeGetMessageEntireTimeMaxF);

        GaugeMetricFamily brokerRuntimePageCacheLockTimeMillsF = new GaugeMetricFamily("rocketmq_brokeruntime_pagecache_lock_time_mills", "brokerRuntimePageCacheLockTimeMills", BROKER_RUNTIME_METRIC_LABEL_NAMES);
        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimePageCacheLockTimeMills.asMap().entrySet()) {
            loadBrokerRuntimeStatsMetric(brokerRuntimePageCacheLockTimeMillsF, entry);
        }
        mfs.add(brokerRuntimePageCacheLockTimeMillsF);

        GaugeMetricFamily brokerRuntimeCommitLogDiskRatioF = new GaugeMetricFamily("rocketmq_brokeruntime_commitlog_disk_ratio", "brokerRuntimeCommitLogDiskRatio", BROKER_RUNTIME_METRIC_LABEL_NAMES);
        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeCommitLogDiskRatio.asMap().entrySet()) {
            loadBrokerRuntimeStatsMetric(brokerRuntimeCommitLogDiskRatioF, entry);
        }
        mfs.add(brokerRuntimeCommitLogDiskRatioF);

        GaugeMetricFamily brokerRuntimeConsumeQueueDiskRatioF = new GaugeMetricFamily("rocketmq_brokeruntime_consumequeue_disk_ratio", "brokerRuntimeConsumeQueueDiskRatio", BROKER_RUNTIME_METRIC_LABEL_NAMES);
        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeConsumeQueueDiskRatio.asMap().entrySet()) {
            loadBrokerRuntimeStatsMetric(brokerRuntimeConsumeQueueDiskRatioF, entry);
        }
        mfs.add(brokerRuntimeConsumeQueueDiskRatioF);

        GaugeMetricFamily brokerRuntimeGetFoundTps600F = new GaugeMetricFamily("rocketmq_brokeruntime_getfound_tps600", "brokerRuntimeGetFoundTps600", BROKER_RUNTIME_METRIC_LABEL_NAMES);
        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetFoundTps600.asMap().entrySet()) {
            loadBrokerRuntimeStatsMetric(brokerRuntimeGetFoundTps600F, entry);
        }
        mfs.add(brokerRuntimeGetFoundTps600F);

        GaugeMetricFamily brokerRuntimeGetFoundTps60F = new GaugeMetricFamily("rocketmq_brokeruntime_getfound_tps60", "brokerRuntimeGetFoundTps60", BROKER_RUNTIME_METRIC_LABEL_NAMES);
        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetFoundTps60.asMap().entrySet()) {
            loadBrokerRuntimeStatsMetric(brokerRuntimeGetFoundTps60F, entry);
        }
        mfs.add(brokerRuntimeGetFoundTps60F);

        GaugeMetricFamily brokerRuntimeGetFoundTps10F = new GaugeMetricFamily("rocketmq_brokeruntime_getfound_tps10", "brokerRuntimeGetFoundTps10", BROKER_RUNTIME_METRIC_LABEL_NAMES);
        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetFoundTps10.asMap().entrySet()) {
            loadBrokerRuntimeStatsMetric(brokerRuntimeGetFoundTps10F, entry);
        }
        mfs.add(brokerRuntimeGetFoundTps10F);

        GaugeMetricFamily brokerRuntimeGetTotalTps600F = new GaugeMetricFamily("rocketmq_brokeruntime_gettotal_tps600", "brokerRuntimeGetTotalTps600", BROKER_RUNTIME_METRIC_LABEL_NAMES);
        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetTotalTps600.asMap().entrySet()) {
            loadBrokerRuntimeStatsMetric(brokerRuntimeGetTotalTps600F, entry);
        }
        mfs.add(brokerRuntimeGetTotalTps600F);

        GaugeMetricFamily brokerRuntimeGetTotalTps60F = new GaugeMetricFamily("rocketmq_brokeruntime_gettotal_tps60", "brokerRuntimeGetTotalTps60", BROKER_RUNTIME_METRIC_LABEL_NAMES);
        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetTotalTps60.asMap().entrySet()) {
            loadBrokerRuntimeStatsMetric(brokerRuntimeGetTotalTps60F, entry);
        }
        mfs.add(brokerRuntimeGetTotalTps60F);

        GaugeMetricFamily brokerRuntimeGetTotalTps10F = new GaugeMetricFamily("rocketmq_brokeruntime_gettotal_tps10", "brokerRuntimeGetTotalTps10", BROKER_RUNTIME_METRIC_LABEL_NAMES);
        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetTotalTps10.asMap().entrySet()) {
            loadBrokerRuntimeStatsMetric(brokerRuntimeGetTotalTps10F, entry);
        }
        mfs.add(brokerRuntimeGetTotalTps10F);

        GaugeMetricFamily brokerRuntimeGetTransferedTps600F = new GaugeMetricFamily("rocketmq_brokeruntime_gettransfered_tps600", "brokerRuntimeGetTransferedTps600", BROKER_RUNTIME_METRIC_LABEL_NAMES);
        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetTransferedTps600.asMap().entrySet()) {
            loadBrokerRuntimeStatsMetric(brokerRuntimeGetTransferedTps600F, entry);
        }
        mfs.add(brokerRuntimeGetTransferedTps600F);

        GaugeMetricFamily brokerRuntimeGetTransferedTps60F = new GaugeMetricFamily("rocketmq_brokeruntime_gettransfered_tps60", "brokerRuntimeGetTransferedTps60", BROKER_RUNTIME_METRIC_LABEL_NAMES);
        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetTransferedTps60.asMap().entrySet()) {
            loadBrokerRuntimeStatsMetric(brokerRuntimeGetTransferedTps60F, entry);
        }
        mfs.add(brokerRuntimeGetTransferedTps60F);

        GaugeMetricFamily brokerRuntimeGetTransferedTps10F = new GaugeMetricFamily("rocketmq_brokeruntime_gettransfered_tps10", "brokerRuntimeGetTransferedTps10", BROKER_RUNTIME_METRIC_LABEL_NAMES);
        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetTransferedTps10.asMap().entrySet()) {
            loadBrokerRuntimeStatsMetric(brokerRuntimeGetTransferedTps10F, entry);
        }
        mfs.add(brokerRuntimeGetTransferedTps10F);

        GaugeMetricFamily brokerRuntimeGetMissTps600F = new GaugeMetricFamily("rocketmq_brokeruntime_getmiss_tps600", "brokerRuntimeGetMissTps600", BROKER_RUNTIME_METRIC_LABEL_NAMES);
        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetMissTps600.asMap().entrySet()) {
            loadBrokerRuntimeStatsMetric(brokerRuntimeGetMissTps600F, entry);
        }
        mfs.add(brokerRuntimeGetMissTps600F);

        GaugeMetricFamily brokerRuntimeGetMissTps60F = new GaugeMetricFamily("rocketmq_brokeruntime_getmiss_tps60", "brokerRuntimeGetMissTps60", BROKER_RUNTIME_METRIC_LABEL_NAMES);
        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetMissTps60.asMap().entrySet()) {
            loadBrokerRuntimeStatsMetric(brokerRuntimeGetMissTps60F, entry);
        }
        mfs.add(brokerRuntimeGetMissTps60F);

        GaugeMetricFamily brokerRuntimeGetMissTps10F = new GaugeMetricFamily("rocketmq_brokeruntime_getmiss_tps10", "brokerRuntimeGetMissTps10", BROKER_RUNTIME_METRIC_LABEL_NAMES);
        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeGetMissTps10.asMap().entrySet()) {
            loadBrokerRuntimeStatsMetric(brokerRuntimeGetMissTps10F, entry);
        }
        mfs.add(brokerRuntimeGetMissTps10F);

        GaugeMetricFamily brokerRuntimePutTps600F = new GaugeMetricFamily("rocketmq_brokeruntime_put_tps600", "brokerRuntimePutTps600", BROKER_RUNTIME_METRIC_LABEL_NAMES);
        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimePutTps600.asMap().entrySet()) {
            loadBrokerRuntimeStatsMetric(brokerRuntimePutTps600F, entry);
        }
        mfs.add(brokerRuntimePutTps600F);

        GaugeMetricFamily brokerRuntimePutTps60F = new GaugeMetricFamily("rocketmq_brokeruntime_put_tps60", "brokerRuntimePutTps60", BROKER_RUNTIME_METRIC_LABEL_NAMES);
        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimePutTps60.asMap().entrySet()) {
            loadBrokerRuntimeStatsMetric(brokerRuntimePutTps60F, entry);
        }
        mfs.add(brokerRuntimePutTps60F);

        GaugeMetricFamily brokerRuntimePutTps10F = new GaugeMetricFamily("rocketmq_brokeruntime_put_tps10", "brokerRuntimePutTps10", BROKER_RUNTIME_METRIC_LABEL_NAMES);
        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimePutTps10.asMap().entrySet()) {
            loadBrokerRuntimeStatsMetric(brokerRuntimePutTps10F, entry);
        }
        mfs.add(brokerRuntimePutTps10F);

        GaugeMetricFamily brokerRuntimeDispatchMaxBufferF = new GaugeMetricFamily("rocketmq_brokeruntime_dispatch_maxbuffer", "brokerRuntimeDispatchMaxBuffer", BROKER_RUNTIME_METRIC_LABEL_NAMES);
        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeDispatchMaxBuffer.asMap().entrySet()) {
            loadBrokerRuntimeStatsMetric(brokerRuntimeDispatchMaxBufferF, entry);
        }
        mfs.add(brokerRuntimeDispatchMaxBufferF);

        GaugeMetricFamily brokerRuntimePullThreadPoolQueueCapacityF = new GaugeMetricFamily("rocketmq_brokeruntime_pull_threadpoolqueue_capacity", "brokerRuntimePullThreadPoolQueueCapacity", BROKER_RUNTIME_METRIC_LABEL_NAMES);
        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimePullThreadPoolQueueCapacity.asMap().entrySet()) {
            loadBrokerRuntimeStatsMetric(brokerRuntimePullThreadPoolQueueCapacityF, entry);
        }
        mfs.add(brokerRuntimePullThreadPoolQueueCapacityF);

        GaugeMetricFamily brokerRuntimeSendThreadPoolQueueCapacityF = new GaugeMetricFamily("rocketmq_brokeruntime_send_threadpoolqueue_capacity", "brokerRuntimeSendThreadPoolQueueCapacity", BROKER_RUNTIME_METRIC_LABEL_NAMES);
        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeSendThreadPoolQueueCapacity.asMap().entrySet()) {
            loadBrokerRuntimeStatsMetric(brokerRuntimeSendThreadPoolQueueCapacityF, entry);
        }
        mfs.add(brokerRuntimeSendThreadPoolQueueCapacityF);

        GaugeMetricFamily brokerRuntimePullThreadPoolQueueSizeF = new GaugeMetricFamily("rocketmq_brokeruntime_pull_threadpoolqueue_size", "brokerRuntimePullThreadPoolQueueSizeF", BROKER_RUNTIME_METRIC_LABEL_NAMES);
        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimePullThreadPoolQueueSize.asMap().entrySet()) {
            loadBrokerRuntimeStatsMetric(brokerRuntimePullThreadPoolQueueSizeF, entry);
        }
        mfs.add(brokerRuntimePullThreadPoolQueueSizeF);

        GaugeMetricFamily brokerRuntimeQueryThreadPoolQueueSizeF = new GaugeMetricFamily("rocketmq_brokeruntime_query_threadpoolqueue_size", "brokerRuntimeQueryThreadPoolQueueSize", BROKER_RUNTIME_METRIC_LABEL_NAMES);
        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeQueryThreadPoolQueueSize.asMap().entrySet()) {
            loadBrokerRuntimeStatsMetric(brokerRuntimeQueryThreadPoolQueueSizeF, entry);
        }
        mfs.add(brokerRuntimeQueryThreadPoolQueueSizeF);

        GaugeMetricFamily brokerRuntimePullThreadPoolQueueHeadWaitTimeMillsF = new GaugeMetricFamily("rocketmq_brokeruntime_pull_threadpoolqueue_headwait_timemills", "brokerRuntimePullThreadPoolQueueHeadWaitTimeMills", BROKER_RUNTIME_METRIC_LABEL_NAMES);
        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimePullThreadPoolQueueHeadWaitTimeMills.asMap().entrySet()) {
            loadBrokerRuntimeStatsMetric(brokerRuntimePullThreadPoolQueueHeadWaitTimeMillsF, entry);
        }
        mfs.add(brokerRuntimePullThreadPoolQueueHeadWaitTimeMillsF);

        GaugeMetricFamily brokerRuntimeQueryThreadPoolQueueHeadWaitTimeMillsF = new GaugeMetricFamily("rocketmq_brokeruntime_query_threadpoolqueue_headwait_timemills", "brokerRuntimeQueryThreadPoolQueueHeadWaitTimeMills", BROKER_RUNTIME_METRIC_LABEL_NAMES);
        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeQueryThreadPoolQueueHeadWaitTimeMills.asMap().entrySet()) {
            loadBrokerRuntimeStatsMetric(brokerRuntimeQueryThreadPoolQueueHeadWaitTimeMillsF, entry);
        }
        mfs.add(brokerRuntimeQueryThreadPoolQueueHeadWaitTimeMillsF);

        GaugeMetricFamily brokerRuntimeSendThreadPoolQueueHeadWaitTimeMillsF = new GaugeMetricFamily("rocketmq_brokeruntime_send_threadpoolqueue_headwait_timemills", "brokerRuntimeSendThreadPoolQueueHeadWaitTimeMills", BROKER_RUNTIME_METRIC_LABEL_NAMES);
        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeSendThreadPoolQueueHeadWaitTimeMills.asMap().entrySet()) {
            loadBrokerRuntimeStatsMetric(brokerRuntimeSendThreadPoolQueueHeadWaitTimeMillsF, entry);
        }
        mfs.add(brokerRuntimeSendThreadPoolQueueHeadWaitTimeMillsF);

        GaugeMetricFamily brokerRuntimeMsgGetTotalYesterdayMorningF = new GaugeMetricFamily("rocketmq_brokeruntime_msg_gettotal_yesterdaymorning", "brokerRuntimeMsgGetTotalYesterdayMorning", BROKER_RUNTIME_METRIC_LABEL_NAMES);
        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeMsgGetTotalYesterdayMorning.asMap().entrySet()) {
            loadBrokerRuntimeStatsMetric(brokerRuntimeMsgGetTotalYesterdayMorningF, entry);
        }
        mfs.add(brokerRuntimeMsgGetTotalYesterdayMorningF);

        GaugeMetricFamily brokerRuntimeMsgPutTotalYesterdayMorningF = new GaugeMetricFamily("rocketmq_brokeruntime_msg_puttotal_yesterdaymorning", "brokerRuntimeMsgPutTotalYesterdayMorning", BROKER_RUNTIME_METRIC_LABEL_NAMES);
        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeMsgPutTotalYesterdayMorning.asMap().entrySet()) {
            loadBrokerRuntimeStatsMetric(brokerRuntimeMsgPutTotalYesterdayMorningF, entry);
        }
        mfs.add(brokerRuntimeMsgPutTotalYesterdayMorningF);

        GaugeMetricFamily brokerRuntimeMsgGetTotalTodayMorningF = new GaugeMetricFamily("rocketmq_brokeruntime_msg_gettotal_todaymorning", "brokerRuntimeMsgGetTotalTodayMorning", BROKER_RUNTIME_METRIC_LABEL_NAMES);
        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeMsgGetTotalTodayMorning.asMap().entrySet()) {
            loadBrokerRuntimeStatsMetric(brokerRuntimeMsgGetTotalTodayMorningF, entry);
        }
        mfs.add(brokerRuntimeMsgGetTotalTodayMorningF);

        GaugeMetricFamily brokerRuntimeMsgPutTotalTodayMorningF = new GaugeMetricFamily("rocketmq_brokeruntime_msg_puttotal_todaymorning", "brokerRuntimeMsgPutTotalTodayMorning", BROKER_RUNTIME_METRIC_LABEL_NAMES);
        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeMsgPutTotalTodayMorning.asMap().entrySet()) {
            loadBrokerRuntimeStatsMetric(brokerRuntimeMsgPutTotalTodayMorningF, entry);
        }
        mfs.add(brokerRuntimeMsgPutTotalTodayMorningF);

        GaugeMetricFamily brokerRuntimeCommitLogDirCapacityFreeF = new GaugeMetricFamily("rocketmq_brokeruntime_commitlogdir_capacity_free", "brokerRuntimeCommitLogDirCapacityFree", BROKER_RUNTIME_METRIC_LABEL_NAMES);
        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeCommitLogDirCapacityFree.asMap().entrySet()) {
            loadBrokerRuntimeStatsMetric(brokerRuntimeCommitLogDirCapacityFreeF, entry);
        }
        mfs.add(brokerRuntimeCommitLogDirCapacityFreeF);

        GaugeMetricFamily brokerRuntimeCommitLogDirCapacityTotalF = new GaugeMetricFamily("rocketmq_brokeruntime_commitlogdir_capacity_total", "brokerRuntimeCommitLogDirCapacityTotal", BROKER_RUNTIME_METRIC_LABEL_NAMES);
        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeCommitLogDirCapacityTotal.asMap().entrySet()) {
            loadBrokerRuntimeStatsMetric(brokerRuntimeCommitLogDirCapacityTotalF, entry);
        }
        mfs.add(brokerRuntimeCommitLogDirCapacityTotalF);

        GaugeMetricFamily brokerRuntimeCommitLogMaxOffsetF = new GaugeMetricFamily("rocketmq_brokeruntime_commitlog_maxoffset", "brokerRuntimeCommitLogMaxOffset", BROKER_RUNTIME_METRIC_LABEL_NAMES);
        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeCommitLogMaxOffset.asMap().entrySet()) {
            loadBrokerRuntimeStatsMetric(brokerRuntimeCommitLogMaxOffsetF, entry);
        }
        mfs.add(brokerRuntimeCommitLogMaxOffsetF);

        GaugeMetricFamily brokerRuntimeCommitLogMinOffsetF = new GaugeMetricFamily("rocketmq_brokeruntime_commitlog_minoffset", "brokerRuntimeCommitLogMinOffset", BROKER_RUNTIME_METRIC_LABEL_NAMES);
        for (Map.Entry<BrokerRuntimeMetric, Long> entry : brokerRuntimeCommitLogMinOffset.asMap().entrySet()) {
            loadBrokerRuntimeStatsMetric(brokerRuntimeCommitLogMinOffsetF, entry);
        }
        mfs.add(brokerRuntimeCommitLogMinOffsetF);

        GaugeMetricFamily brokerRuntimeRemainHowManyDataToFlushF = new GaugeMetricFamily("rocketmq_brokeruntime_remain_howmanydata_toflush", "brokerRuntimeRemainHowManyDataToFlush", BROKER_RUNTIME_METRIC_LABEL_NAMES);
        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimeRemainHowManyDataToFlush.asMap().entrySet()) {
            loadBrokerRuntimeStatsMetric(brokerRuntimeRemainHowManyDataToFlushF, entry);
        }
        mfs.add(brokerRuntimeRemainHowManyDataToFlushF);

        GaugeMetricFamily brokerRuntimePutLatency99F = new GaugeMetricFamily("rocketmq_brokeruntime_put_latency_99", "brokerRuntimePutLatency99", BROKER_RUNTIME_METRIC_LABEL_NAMES);
        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimePutLatency99.asMap().entrySet()) {
            loadBrokerRuntimeStatsMetric(brokerRuntimePutLatency99F, entry);
        }
        mfs.add(brokerRuntimePutLatency99F);

        GaugeMetricFamily brokerRuntimePutLatency999F = new GaugeMetricFamily("rocketmq_brokeruntime_put_latency_999", "brokerRuntimePutLatency999", BROKER_RUNTIME_METRIC_LABEL_NAMES);
        for (Map.Entry<BrokerRuntimeMetric, Double> entry : brokerRuntimePutLatency999.asMap().entrySet()) {
            loadBrokerRuntimeStatsMetric(brokerRuntimePutLatency999F, entry);
        }
        mfs.add(brokerRuntimePutLatency999F);
    }
}