BrokerRuntimeStats.java
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.exporter.model;
import org.apache.rocketmq.common.protocol.body.KVTable;
import org.apache.rocketmq.exporter.util.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class BrokerRuntimeStats {
private long msgPutTotalTodayNow;
private long msgGetTotalTodayNow;
private long msgPutTotalTodayMorning;
private long msgGetTotalTodayMorning;
private long msgPutTotalYesterdayMorning;
private long msgGetTotalYesterdayMorning;
private List<ScheduleMessageOffsetTable> scheduleMessageOffsetTables = new ArrayList<>();
private long sendThreadPoolQueueHeadWaitTimeMills;
private long queryThreadPoolQueueHeadWaitTimeMills;
private long pullThreadPoolQueueHeadWaitTimeMills;
private long queryThreadPoolQueueSize;
private long pullThreadPoolQueueSize;
private long sendThreadPoolQueueCapacity;
private long pullThreadPoolQueueCapacity;
private Map<String, Integer> putMessageDistributeTimeMap = new HashMap<>();
private double remainHowManyDataToFlush;
private long commitLogMinOffset;
private long commitLogMaxOffset;
private String runtime;
private long bootTimestamp;
private double commitLogDirCapacityTotal;
private double commitLogDirCapacityFree;
private int brokerVersion;
private long dispatchMaxBuffer;
private PutTps putTps = new PutTps();
private GetMissTps getMissTps = new GetMissTps();
private GetTransferedTps getTransferedTps = new GetTransferedTps();
private GetTotalTps getTotalTps = new GetTotalTps();
private GetFoundTps getFoundTps = new GetFoundTps();
private double consumeQueueDiskRatio;
private double commitLogDiskRatio;
private long pageCacheLockTimeMills;
private long getMessageEntireTimeMax;
private long putMessageTimesTotal;
private String brokerVersionDesc;
private long sendThreadPoolQueueSize;
private long startAcceptSendRequestTimeStamp;
private long putMessageEntireTimeMax;
private long earliestMessageTimeStamp;
private long remainTransientStoreBufferNumbs;
private long queryThreadPoolQueueCapacity;
private double putMessageAverageSize;
private long putMessageSizeTotal;
private long dispatchBehindBytes;
private double putLatency99;
private double putLatency999;
private final static Logger log = LoggerFactory.getLogger(BrokerRuntimeStats.class);
public BrokerRuntimeStats(KVTable kvTable) {
this.msgPutTotalTodayNow = Long.parseLong(kvTable.getTable().get("msgPutTotalTodayNow"));
loadScheduleMessageOffsets(kvTable);
loadPutMessageDistributeTime(kvTable.getTable().get("putMessageDistributeTime"));
loadTps(this.putTps, kvTable.getTable().get("putTps"));
loadTps(this.getMissTps, kvTable.getTable().get("getMissTps"));
if (kvTable.getTable().containsKey("getTransferredTps")) {
loadTps(this.getTransferedTps, kvTable.getTable().get("getTransferredTps"));
} else {
loadTps(this.getTransferedTps, kvTable.getTable().get("getTransferedTps"));
}
loadTps(this.getTotalTps, kvTable.getTable().get("getTotalTps"));
loadTps(this.getFoundTps, kvTable.getTable().get("getFoundTps"));
loadCommitLogDirCapacity(kvTable.getTable().get("commitLogDirCapacity"));
this.sendThreadPoolQueueHeadWaitTimeMills = Long.parseLong(kvTable.getTable().get("sendThreadPoolQueueHeadWaitTimeMills"));
this.queryThreadPoolQueueHeadWaitTimeMills = Long.parseLong(kvTable.getTable().get("queryThreadPoolQueueHeadWaitTimeMills"));
this.remainHowManyDataToFlush = Double.parseDouble(kvTable.getTable().get("remainHowManyDataToFlush").split(" ")[0]);//byte
this.msgGetTotalTodayNow = Long.parseLong(kvTable.getTable().get("msgGetTotalTodayNow"));
this.queryThreadPoolQueueSize = Long.parseLong(kvTable.getTable().get("queryThreadPoolQueueSize"));
this.bootTimestamp = Long.parseLong(kvTable.getTable().get("bootTimestamp"));
this.msgPutTotalYesterdayMorning = Long.parseLong(kvTable.getTable().get("msgPutTotalYesterdayMorning"));
this.msgGetTotalYesterdayMorning = Long.parseLong(kvTable.getTable().get("msgGetTotalYesterdayMorning"));
this.pullThreadPoolQueueSize = Long.parseLong(kvTable.getTable().get("pullThreadPoolQueueSize"));
this.commitLogMinOffset = Long.parseLong(kvTable.getTable().get("commitLogMinOffset"));
this.pullThreadPoolQueueHeadWaitTimeMills = Long.parseLong(kvTable.getTable().get("pullThreadPoolQueueHeadWaitTimeMills"));
this.runtime = kvTable.getTable().get("runtime");
this.dispatchMaxBuffer = Long.parseLong(kvTable.getTable().get("dispatchMaxBuffer"));
this.brokerVersion = Integer.parseInt(kvTable.getTable().get("brokerVersion"));
this.consumeQueueDiskRatio = Double.parseDouble(kvTable.getTable().get("consumeQueueDiskRatio"));
this.pageCacheLockTimeMills = Long.parseLong(kvTable.getTable().get("pageCacheLockTimeMills"));
this.commitLogDiskRatio = Double.parseDouble(kvTable.getTable().get("commitLogDiskRatio"));
this.commitLogMaxOffset = Long.parseLong(kvTable.getTable().get("commitLogMaxOffset"));
this.getMessageEntireTimeMax = Long.parseLong(kvTable.getTable().get("getMessageEntireTimeMax"));
this.msgPutTotalTodayMorning = Long.parseLong(kvTable.getTable().get("msgPutTotalTodayMorning"));
this.putMessageTimesTotal = Long.parseLong(kvTable.getTable().get("putMessageTimesTotal"));
this.msgGetTotalTodayMorning = Long.parseLong(kvTable.getTable().get("msgGetTotalTodayMorning"));
this.brokerVersionDesc = kvTable.getTable().get("brokerVersionDesc");
this.sendThreadPoolQueueSize = Long.parseLong(kvTable.getTable().get("sendThreadPoolQueueSize"));
this.startAcceptSendRequestTimeStamp = Long.parseLong(kvTable.getTable().get("startAcceptSendRequestTimeStamp"));
this.putMessageEntireTimeMax = Long.parseLong(kvTable.getTable().get("putMessageEntireTimeMax"));
this.earliestMessageTimeStamp = Long.parseLong(kvTable.getTable().get("earliestMessageTimeStamp"));
this.remainTransientStoreBufferNumbs = Long.parseLong(kvTable.getTable().get("remainTransientStoreBufferNumbs"));
this.queryThreadPoolQueueCapacity = Long.parseLong(kvTable.getTable().get("queryThreadPoolQueueCapacity"));
this.putMessageAverageSize = Double.parseDouble(kvTable.getTable().get("putMessageAverageSize"));
this.dispatchBehindBytes = Long.parseLong(kvTable.getTable().get("dispatchBehindBytes"));
this.putMessageSizeTotal = Long.parseLong(kvTable.getTable().get("putMessageSizeTotal"));
this.sendThreadPoolQueueCapacity = Long.parseLong(kvTable.getTable().get("sendThreadPoolQueueCapacity"));
this.pullThreadPoolQueueCapacity = Long.parseLong(kvTable.getTable().get("pullThreadPoolQueueCapacity"));
this.putLatency99 = Double.parseDouble(kvTable.getTable().getOrDefault("putLatency99", "-1"));
this.putLatency999 = Double.parseDouble(kvTable.getTable().getOrDefault("putLatency999", "-1"));
}
private void loadCommitLogDirCapacity(String commitLogDirCapacity) {
String[] arr = commitLogDirCapacity.split(" ");
String total = String.format("%s %s", arr[2], arr[3].substring(0, arr[3].length() - 1));
String free = String.format("%s %s", arr[6], arr[7].substring(0, arr[7].length() - 1));
this.commitLogDirCapacityTotal = Utils.machineReadableByteCount(total);
this.commitLogDirCapacityFree = Utils.machineReadableByteCount(free);
}
private void loadTps(PutTps putTps, String value) {
String[] arr = value.split(" ");
if (arr.length >= 1) {
putTps.ten = Double.parseDouble(arr[0]);
}
if (arr.length >= 2) {
putTps.sixty = Double.parseDouble(arr[1]);
}
if (arr.length >= 3) {
putTps.sixHundred = Double.parseDouble(arr[2]);
}
}
private void loadPutMessageDistributeTime(String str) {
if ("null".equalsIgnoreCase(str)) {
log.warn("loadPutMessageDistributeTime WARN, value is null");
return;
}
String[] arr = str.split(" ");
String key = "", value = "";
for (String ar : arr) {
String[] tarr = ar.split(":");
if (tarr.length < 2) {
log.warn("loadPutMessageDistributeTime WARN, wrong value is {}, {}", ar, str);
continue;
}
key = tarr[0].replace("[", "").replace("]", "");
value = tarr[1];
this.putMessageDistributeTimeMap.put(key, Integer.parseInt(value));
}
}
public void loadScheduleMessageOffsets(KVTable kvTable) {
for (String key : kvTable.getTable().keySet()) {
if (key.startsWith("scheduleMessageOffset")) {
String[] arr = kvTable.getTable().get(key).split(",");
ScheduleMessageOffsetTable table = new ScheduleMessageOffsetTable(
Long.parseLong(arr[0]),
Long.parseLong(arr[1])
);
this.scheduleMessageOffsetTables.add(table);
}
}
}
public static class ScheduleMessageOffsetTable {
private long delayOffset;
private long maxOffset;
public ScheduleMessageOffsetTable(long first, long second) {
this.delayOffset = first;
this.maxOffset = second;
}
public long getDelayOffset() {
return delayOffset;
}
public void setDelayOffset(long delayOffset) {
this.delayOffset = delayOffset;
}
public long getMaxOffset() {
return maxOffset;
}
public void setMaxOffset(long maxOffset) {
this.maxOffset = maxOffset;
}
}
public class PutTps {
private double ten;
private double sixty;
private double sixHundred;
public double getTen() {
return ten;
}
public void setTen(double ten) {
this.ten = ten;
}
public double getSixty() {
return sixty;
}
public void setSixty(double sixty) {
this.sixty = sixty;
}
public double getSixHundred() {
return sixHundred;
}
public void setSixHundred(double sixHundred) {
this.sixHundred = sixHundred;
}
}
public class GetMissTps extends PutTps {
}
public class GetTransferedTps extends PutTps {
}
public class GetTotalTps extends PutTps {
}
public class GetFoundTps extends PutTps {
}
public long getMsgPutTotalTodayNow() {
return msgPutTotalTodayNow;
}
public void setMsgPutTotalTodayNow(long msgPutTotalTodayNow) {
this.msgPutTotalTodayNow = msgPutTotalTodayNow;
}
public long getMsgGetTotalTodayNow() {
return msgGetTotalTodayNow;
}
public void setMsgGetTotalTodayNow(long msgGetTotalTodayNow) {
this.msgGetTotalTodayNow = msgGetTotalTodayNow;
}
public long getMsgPutTotalTodayMorning() {
return msgPutTotalTodayMorning;
}
public void setMsgPutTotalTodayMorning(long msgPutTotalTodayMorning) {
this.msgPutTotalTodayMorning = msgPutTotalTodayMorning;
}
public long getMsgGetTotalTodayMorning() {
return msgGetTotalTodayMorning;
}
public void setMsgGetTotalTodayMorning(long msgGetTotalTodayMorning) {
this.msgGetTotalTodayMorning = msgGetTotalTodayMorning;
}
public long getMsgPutTotalYesterdayMorning() {
return msgPutTotalYesterdayMorning;
}
public void setMsgPutTotalYesterdayMorning(long msgPutTotalYesterdayMorning) {
this.msgPutTotalYesterdayMorning = msgPutTotalYesterdayMorning;
}
public long getMsgGetTotalYesterdayMorning() {
return msgGetTotalYesterdayMorning;
}
public void setMsgGetTotalYesterdayMorning(long msgGetTotalYesterdayMorning) {
this.msgGetTotalYesterdayMorning = msgGetTotalYesterdayMorning;
}
public List<ScheduleMessageOffsetTable> getScheduleMessageOffsetTables() {
return scheduleMessageOffsetTables;
}
public void setScheduleMessageOffsetTables(List<ScheduleMessageOffsetTable> scheduleMessageOffsetTables) {
this.scheduleMessageOffsetTables = scheduleMessageOffsetTables;
}
public long getSendThreadPoolQueueHeadWaitTimeMills() {
return sendThreadPoolQueueHeadWaitTimeMills;
}
public void setSendThreadPoolQueueHeadWaitTimeMills(long sendThreadPoolQueueHeadWaitTimeMills) {
this.sendThreadPoolQueueHeadWaitTimeMills = sendThreadPoolQueueHeadWaitTimeMills;
}
public long getQueryThreadPoolQueueHeadWaitTimeMills() {
return queryThreadPoolQueueHeadWaitTimeMills;
}
public void setQueryThreadPoolQueueHeadWaitTimeMills(long queryThreadPoolQueueHeadWaitTimeMills) {
this.queryThreadPoolQueueHeadWaitTimeMills = queryThreadPoolQueueHeadWaitTimeMills;
}
public long getPullThreadPoolQueueHeadWaitTimeMills() {
return pullThreadPoolQueueHeadWaitTimeMills;
}
public void setPullThreadPoolQueueHeadWaitTimeMills(long pullThreadPoolQueueHeadWaitTimeMills) {
this.pullThreadPoolQueueHeadWaitTimeMills = pullThreadPoolQueueHeadWaitTimeMills;
}
public long getQueryThreadPoolQueueSize() {
return queryThreadPoolQueueSize;
}
public void setQueryThreadPoolQueueSize(long queryThreadPoolQueueSize) {
this.queryThreadPoolQueueSize = queryThreadPoolQueueSize;
}
public long getPullThreadPoolQueueSize() {
return pullThreadPoolQueueSize;
}
public void setPullThreadPoolQueueSize(long pullThreadPoolQueueSize) {
this.pullThreadPoolQueueSize = pullThreadPoolQueueSize;
}
public long getSendThreadPoolQueueCapacity() {
return sendThreadPoolQueueCapacity;
}
public void setSendThreadPoolQueueCapacity(long sendThreadPoolQueueCapacity) {
this.sendThreadPoolQueueCapacity = sendThreadPoolQueueCapacity;
}
public long getPullThreadPoolQueueCapacity() {
return pullThreadPoolQueueCapacity;
}
public void setPullThreadPoolQueueCapacity(long pullThreadPoolQueueCapacity) {
this.pullThreadPoolQueueCapacity = pullThreadPoolQueueCapacity;
}
public Map<String, Integer> getPutMessageDistributeTimeMap() {
return putMessageDistributeTimeMap;
}
public void setPutMessageDistributeTimeMap(Map<String, Integer> putMessageDistributeTimeMap) {
this.putMessageDistributeTimeMap = putMessageDistributeTimeMap;
}
public double getRemainHowManyDataToFlush() {
return remainHowManyDataToFlush;
}
public void setRemainHowManyDataToFlush(double remainHowManyDataToFlush) {
this.remainHowManyDataToFlush = remainHowManyDataToFlush;
}
public long getCommitLogMinOffset() {
return commitLogMinOffset;
}
public void setCommitLogMinOffset(long commitLogMinOffset) {
this.commitLogMinOffset = commitLogMinOffset;
}
public long getCommitLogMaxOffset() {
return commitLogMaxOffset;
}
public void setCommitLogMaxOffset(long commitLogMaxOffset) {
this.commitLogMaxOffset = commitLogMaxOffset;
}
public String getRuntime() {
return runtime;
}
public void setRuntime(String runtime) {
this.runtime = runtime;
}
public long getBootTimestamp() {
return bootTimestamp;
}
public void setBootTimestamp(long bootTimestamp) {
this.bootTimestamp = bootTimestamp;
}
public double getCommitLogDirCapacityTotal() {
return commitLogDirCapacityTotal;
}
public void setCommitLogDirCapacityTotal(double commitLogDirCapacityTotal) {
this.commitLogDirCapacityTotal = commitLogDirCapacityTotal;
}
public double getCommitLogDirCapacityFree() {
return commitLogDirCapacityFree;
}
public void setCommitLogDirCapacityFree(double commitLogDirCapacityFree) {
this.commitLogDirCapacityFree = commitLogDirCapacityFree;
}
public int getBrokerVersion() {
return brokerVersion;
}
public void setBrokerVersion(int brokerVersion) {
this.brokerVersion = brokerVersion;
}
public long getDispatchMaxBuffer() {
return dispatchMaxBuffer;
}
public void setDispatchMaxBuffer(long dispatchMaxBuffer) {
this.dispatchMaxBuffer = dispatchMaxBuffer;
}
public PutTps getPutTps() {
return putTps;
}
public void setPutTps(PutTps putTps) {
this.putTps = putTps;
}
public GetMissTps getGetMissTps() {
return getMissTps;
}
public void setGetMissTps(GetMissTps getMissTps) {
this.getMissTps = getMissTps;
}
public GetTransferedTps getGetTransferedTps() {
return getTransferedTps;
}
public void setGetTransferedTps(GetTransferedTps getTransferedTps) {
this.getTransferedTps = getTransferedTps;
}
public GetTotalTps getGetTotalTps() {
return getTotalTps;
}
public void setGetTotalTps(GetTotalTps getTotalTps) {
this.getTotalTps = getTotalTps;
}
public GetFoundTps getGetFoundTps() {
return getFoundTps;
}
public void setGetFoundTps(GetFoundTps getFoundTps) {
this.getFoundTps = getFoundTps;
}
public double getConsumeQueueDiskRatio() {
return consumeQueueDiskRatio;
}
public void setConsumeQueueDiskRatio(double consumeQueueDiskRatio) {
this.consumeQueueDiskRatio = consumeQueueDiskRatio;
}
public double getCommitLogDiskRatio() {
return commitLogDiskRatio;
}
public void setCommitLogDiskRatio(double commitLogDiskRatio) {
this.commitLogDiskRatio = commitLogDiskRatio;
}
public long getPageCacheLockTimeMills() {
return pageCacheLockTimeMills;
}
public void setPageCacheLockTimeMills(long pageCacheLockTimeMills) {
this.pageCacheLockTimeMills = pageCacheLockTimeMills;
}
public long getGetMessageEntireTimeMax() {
return getMessageEntireTimeMax;
}
public void setGetMessageEntireTimeMax(long getMessageEntireTimeMax) {
this.getMessageEntireTimeMax = getMessageEntireTimeMax;
}
public long getPutMessageTimesTotal() {
return putMessageTimesTotal;
}
public void setPutMessageTimesTotal(long putMessageTimesTotal) {
this.putMessageTimesTotal = putMessageTimesTotal;
}
public String getBrokerVersionDesc() {
return brokerVersionDesc;
}
public void setBrokerVersionDesc(String brokerVersionDesc) {
this.brokerVersionDesc = brokerVersionDesc;
}
public long getSendThreadPoolQueueSize() {
return sendThreadPoolQueueSize;
}
public void setSendThreadPoolQueueSize(long sendThreadPoolQueueSize) {
this.sendThreadPoolQueueSize = sendThreadPoolQueueSize;
}
public long getStartAcceptSendRequestTimeStamp() {
return startAcceptSendRequestTimeStamp;
}
public void setStartAcceptSendRequestTimeStamp(long startAcceptSendRequestTimeStamp) {
this.startAcceptSendRequestTimeStamp = startAcceptSendRequestTimeStamp;
}
public long getPutMessageEntireTimeMax() {
return putMessageEntireTimeMax;
}
public void setPutMessageEntireTimeMax(long putMessageEntireTimeMax) {
this.putMessageEntireTimeMax = putMessageEntireTimeMax;
}
public long getEarliestMessageTimeStamp() {
return earliestMessageTimeStamp;
}
public void setEarliestMessageTimeStamp(long earliestMessageTimeStamp) {
this.earliestMessageTimeStamp = earliestMessageTimeStamp;
}
public long getRemainTransientStoreBufferNumbs() {
return remainTransientStoreBufferNumbs;
}
public void setRemainTransientStoreBufferNumbs(long remainTransientStoreBufferNumbs) {
this.remainTransientStoreBufferNumbs = remainTransientStoreBufferNumbs;
}
public long getQueryThreadPoolQueueCapacity() {
return queryThreadPoolQueueCapacity;
}
public void setQueryThreadPoolQueueCapacity(long queryThreadPoolQueueCapacity) {
this.queryThreadPoolQueueCapacity = queryThreadPoolQueueCapacity;
}
public double getPutMessageAverageSize() {
return putMessageAverageSize;
}
public void setPutMessageAverageSize(double putMessageAverageSize) {
this.putMessageAverageSize = putMessageAverageSize;
}
public long getPutMessageSizeTotal() {
return putMessageSizeTotal;
}
public void setPutMessageSizeTotal(long putMessageSizeTotal) {
this.putMessageSizeTotal = putMessageSizeTotal;
}
public long getDispatchBehindBytes() {
return dispatchBehindBytes;
}
public void setDispatchBehindBytes(long dispatchBehindBytes) {
this.dispatchBehindBytes = dispatchBehindBytes;
}
public double getPutLatency99() {
return putLatency99;
}
public void setPutLatency99(double putLatency99) {
this.putLatency99 = putLatency99;
}
public double getPutLatency999() {
return putLatency999;
}
public void setPutLatency999(double putLatency999) {
this.putLatency999 = putLatency999;
}
}