ClientMetricTaskRunnable.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */
package org.apache.rocketmq.exporter.task;

import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.protocol.body.Connection;
import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.exporter.service.RMQMetricsService;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.tools.admin.MQAdminExt;
import org.slf4j.Logger;

public class ClientMetricTaskRunnable implements Runnable {
    private String consumerGroup;
    private ConsumerConnection connection;
    private boolean enableCollectJStack;
    private MQAdminExt mqAdmin;
    private Logger logger;
    private RMQMetricsService metricsService;

    public ClientMetricTaskRunnable(String consumerGroup, ConsumerConnection connection,
                                    boolean enableCollectJStack, MQAdminExt mqAdmin, Logger logger,
                                    RMQMetricsService metricsService) {
        this.consumerGroup = consumerGroup;
        this.connection = connection;
        this.enableCollectJStack = enableCollectJStack;
        this.mqAdmin = mqAdmin;
        this.logger = logger;
        this.metricsService = metricsService;
    }

    @Override

    public void run() {
        if (this.connection == null || this.connection.getConnectionSet() == null ||
                this.connection.getConnectionSet().isEmpty()) {
            return;
        }
        logger.debug(String.format("ClientMetricTask-group=%s,enable jstack=%s",
                consumerGroup,
                this.enableCollectJStack

        ));
        long start = System.currentTimeMillis();
        ConsumerRunningInfo runningInfo = null;
        for (Connection conn : this.connection.getConnectionSet()) {
            try {
                runningInfo = mqAdmin.getConsumerRunningInfo(this.consumerGroup, conn.getClientId(), this.enableCollectJStack);
            } catch (InterruptedException | RemotingException e) {
                logger.warn(String.format("ClientMetricTask-exception.ignore. group=%s,client id=%s, client addr=%s, language=%s,version=%d",
                        consumerGroup,
                        conn.getClientId(),
                        conn.getClientAddr(),
                        conn.getLanguage(),
                        conn.getVersion()
                        ),
                        e);
                runningInfo = null;
            } catch (MQClientException e) {
                logger.warn(String.format("ClientMetricTask-exception.ignore. group=%s,client id=%s, client addr=%s, language=%s,version=%d, error code=%d, error msg=%s",
                        consumerGroup,
                        conn.getClientId(),
                        conn.getClientAddr(),
                        conn.getLanguage(),
                        conn.getVersion(),
                        e.getResponseCode(),
                        e.getErrorMessage())
                );
                runningInfo = null;
            }
            if (runningInfo == null) {
                continue;
            }
            if (!StringUtils.isBlank(runningInfo.getJstack())) {
                logger.error(String.format("group=%s, jstack=%s", consumerGroup, runningInfo.getJstack()));
            }
            if (runningInfo.getStatusTable() != null && !runningInfo.getStatusTable().isEmpty()) {
                for (String topic : runningInfo.getStatusTable().keySet()) {
                    metricsService.getCollector().addConsumerClientFailedMsgCountsMetric(
                            this.consumerGroup,
                            topic,
                            conn.getClientAddr(),
                            conn.getClientId(),
                            runningInfo.getStatusTable().get(topic).getConsumeFailedMsgs());
                    metricsService.getCollector().addConsumerClientFailedTPSMetric(
                            this.consumerGroup,
                            topic,
                            conn.getClientAddr(),
                            conn.getClientId(),
                            runningInfo.getStatusTable().get(topic).getConsumeFailedTPS());
                    metricsService.getCollector().addConsumerClientOKTPSMetric(
                            this.consumerGroup,
                            topic,
                            conn.getClientAddr(),
                            conn.getClientId(),
                            runningInfo.getStatusTable().get(topic).getConsumeOKTPS());
                    metricsService.getCollector().addConsumeRTMetricMetric(
                            this.consumerGroup,
                            topic,
                            conn.getClientAddr(),
                            conn.getClientId(),
                            runningInfo.getStatusTable().get(topic).getConsumeRT());
                    metricsService.getCollector().addPullRTMetric(
                            this.consumerGroup,
                            topic,
                            conn.getClientAddr(),
                            conn.getClientId(),
                            runningInfo.getStatusTable().get(topic).getPullRT());
                    metricsService.getCollector().addPullTPSMetric(
                            this.consumerGroup,
                            topic,
                            conn.getClientAddr(),
                            conn.getClientId(),
                            runningInfo.getStatusTable().get(topic).getPullTPS());


                }

            }
        }
        long cost = System.currentTimeMillis() - start;
        logger.debug(String.format("one-ClientMetricTask-group=%s, cost=%d, online-instance count=%d", this.consumerGroup, cost, this.connection.getConnectionSet().size()));
    }
}