Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public RemotingCommand handle(final GetMessageResult getMessageResult,
this.brokerController.getBrokerStatsManager().incGroupGetSize(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
getMessageResult.getBufferTotalSize());

this.brokerController.getBrokerStatsManager().incBrokerGetNums(requestHeader.getTopic(), getMessageResult.getMessageCount());
this.brokerController.getBrokerStatsManager().incBrokerGetNums(requestHeader.getTopic(), requestHeader.getConsumerGroup(), getMessageResult.getMessageCount());

if (!BrokerMetricsManager.isRetryOrDlqTopic(requestHeader.getTopic())) {
Attributes attributes = BrokerMetricsManager.newAttributesBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re
this.brokerController.getBrokerStatsManager().incGroupGetSize(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
getMessageResult.getBufferTotalSize());

this.brokerController.getBrokerStatsManager().incBrokerGetNums(requestHeader.getTopic(), getMessageResult.getMessageCount());
this.brokerController.getBrokerStatsManager().incBrokerGetNums(requestHeader.getTopic(), requestHeader.getConsumerGroup(), getMessageResult.getMessageCount());

if (this.brokerController.getBrokerConfig().isTransferMsgByHeap()) {
final byte[] r = this.readGetMessageResult(getMessageResult, requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -773,7 +773,7 @@ private CompletableFuture<Long> popMsgFromQueue(String topic, String attemptId,
return atomicRestNum.get();
}
if (!result.getMessageMapedList().isEmpty()) {
this.brokerController.getBrokerStatsManager().incBrokerGetNums(requestHeader.getTopic(), result.getMessageCount());
this.brokerController.getBrokerStatsManager().incBrokerGetNums(requestHeader.getTopic(), requestHeader.getConsumerGroup(), result.getMessageCount());
this.brokerController.getBrokerStatsManager().incGroupGetNums(requestHeader.getConsumerGroup(), topic,
result.getMessageCount());
this.brokerController.getBrokerStatsManager().incGroupGetSize(requestHeader.getConsumerGroup(), topic,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ public PullResult getMessage(String group, String topic, int queueId, long offse
foundList = decodeMsgList(getMessageResult, deCompressBody);
brokerController.getBrokerStatsManager().incGroupGetNums(group, topic, getMessageResult.getMessageCount());
brokerController.getBrokerStatsManager().incGroupGetSize(group, topic, getMessageResult.getBufferTotalSize());
brokerController.getBrokerStatsManager().incBrokerGetNums(topic, getMessageResult.getMessageCount());
brokerController.getBrokerStatsManager().incBrokerGetNums(topic, group, getMessageResult.getMessageCount());
brokerController.getBrokerStatsManager().recordDiskFallBehindTime(group, topic, queueId,
brokerController.getMessageStore().now() - foundList.get(foundList.size() - 1).getStoreTimestamp());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ private PullResult getMessage(String group, String topic, int queueId, long offs
getMessageResult.getMessageCount());
this.brokerController.getBrokerStatsManager().incGroupGetSize(group, topic,
getMessageResult.getBufferTotalSize());
this.brokerController.getBrokerStatsManager().incBrokerGetNums(topic, getMessageResult.getMessageCount());
this.brokerController.getBrokerStatsManager().incBrokerGetNums(topic, group, getMessageResult.getMessageCount());
if (foundList == null || foundList.size() == 0) {
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public void record() {
this.msgPutTotalTodayMorning =
this.defaultMessageStore.getBrokerStatsManager().getBrokerPutNumsWithoutSystemTopic();
this.msgGetTotalTodayMorning =
this.defaultMessageStore.getBrokerStatsManager().getBrokerGetNumsWithoutSystemTopic();
this.defaultMessageStore.getBrokerStatsManager().getBrokerGetNumsWithoutSystemTopicAndSystemGroup();

log.info("yesterday put message total: {}", msgPutTotalTodayMorning - msgPutTotalYesterdayMorning);
log.info("yesterday get message total: {}", msgGetTotalTodayMorning - msgGetTotalYesterdayMorning);
Expand Down Expand Up @@ -88,6 +88,6 @@ public long getMsgPutTotalTodayNow() {
}

public long getMsgGetTotalTodayNow() {
return this.defaultMessageStore.getBrokerStatsManager().getBrokerGetNumsWithoutSystemTopic();
return this.defaultMessageStore.getBrokerStatsManager().getBrokerGetNumsWithoutSystemTopicAndSystemGroup();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import org.apache.commons.lang3.tuple.Pair;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
Expand Down Expand Up @@ -77,7 +78,7 @@ public class BrokerStatsManager {
public static final String DLQ_PUT_NUMS = "DLQ_PUT_NUMS";
public static final String BROKER_ACK_NUMS = "BROKER_ACK_NUMS";
public static final String BROKER_CK_NUMS = "BROKER_CK_NUMS";
public static final String BROKER_GET_NUMS_WITHOUT_SYSTEM_TOPIC = "BROKER_GET_NUMS_WITHOUT_SYSTEM_TOPIC";
public static final String BROKER_GET_NUMS_WITHOUT_SYSTEM_TOPIC_AND_SYSTEM_GROUP = "BROKER_GET_NUMS_WITHOUT_SYSTEM_TOPIC_AND_SYSTEM_GROUP";
public static final String BROKER_PUT_NUMS_WITHOUT_SYSTEM_TOPIC = "BROKER_PUT_NUMS_WITHOUT_SYSTEM_TOPIC";
public static final String SNDBCK2DLQ_TIMES = "SNDBCK2DLQ_TIMES";

Expand Down Expand Up @@ -194,8 +195,8 @@ public void init() {
this.statsTable.put(Stats.BROKER_GET_NUMS, new StatsItemSet(Stats.BROKER_GET_NUMS, this.scheduledExecutorService, log));
this.statsTable.put(BROKER_ACK_NUMS, new StatsItemSet(BROKER_ACK_NUMS, this.scheduledExecutorService, log));
this.statsTable.put(BROKER_CK_NUMS, new StatsItemSet(BROKER_CK_NUMS, this.scheduledExecutorService, log));
this.statsTable.put(BROKER_GET_NUMS_WITHOUT_SYSTEM_TOPIC,
new StatsItemSet(BROKER_GET_NUMS_WITHOUT_SYSTEM_TOPIC, this.scheduledExecutorService, log));
this.statsTable.put(BROKER_GET_NUMS_WITHOUT_SYSTEM_TOPIC_AND_SYSTEM_GROUP,
new StatsItemSet(BROKER_GET_NUMS_WITHOUT_SYSTEM_TOPIC_AND_SYSTEM_GROUP, this.scheduledExecutorService, log));
this.statsTable.put(BROKER_PUT_NUMS_WITHOUT_SYSTEM_TOPIC,
new StatsItemSet(BROKER_PUT_NUMS_WITHOUT_SYSTEM_TOPIC, this.scheduledExecutorService, log));
this.statsTable.put(Stats.GROUP_GET_FROM_DISK_NUMS,
Expand Down Expand Up @@ -539,9 +540,9 @@ public void incBrokerPutNums(final String topic, final int incValue) {
incBrokerPutNumsWithoutSystemTopic(topic, incValue);
}

public void incBrokerGetNums(final String topic, final int incValue) {
public void incBrokerGetNums(final String topic, final String group, final int incValue) {
this.statsTable.get(Stats.BROKER_GET_NUMS).getAndCreateStatsItem(this.clusterName).getValue().add(incValue);
this.incBrokerGetNumsWithoutSystemTopic(topic, incValue);
this.incBrokerGetNumsWithoutSystemTopicAndSystemGroup(topic, group, incValue);
}

public void incBrokerAckNums(final int incValue) {
Expand All @@ -552,11 +553,11 @@ public void incBrokerCkNums(final int incValue) {
this.statsTable.get(BROKER_CK_NUMS).getAndCreateStatsItem(this.clusterName).getValue().add(incValue);
}

public void incBrokerGetNumsWithoutSystemTopic(final String topic, final int incValue) {
if (TopicValidator.isSystemTopic(topic)) {
public void incBrokerGetNumsWithoutSystemTopicAndSystemGroup(final String topic, final String group, final int incValue) {
if (TopicValidator.isSystemTopic(topic) || MixAll.isSysConsumerGroupPullMessage(group)) {
return;
}
this.statsTable.get(BROKER_GET_NUMS_WITHOUT_SYSTEM_TOPIC).getAndCreateStatsItem(this.clusterName).getValue().add(incValue);
this.statsTable.get(BROKER_GET_NUMS_WITHOUT_SYSTEM_TOPIC_AND_SYSTEM_GROUP).getAndCreateStatsItem(this.clusterName).getValue().add(incValue);
}

public void incBrokerPutNumsWithoutSystemTopic(final String topic, final int incValue) {
Expand All @@ -566,8 +567,8 @@ public void incBrokerPutNumsWithoutSystemTopic(final String topic, final int inc
this.statsTable.get(BROKER_PUT_NUMS_WITHOUT_SYSTEM_TOPIC).getAndCreateStatsItem(this.clusterName).getValue().add(incValue);
}

public long getBrokerGetNumsWithoutSystemTopic() {
final StatsItemSet statsItemSet = this.statsTable.get(BROKER_GET_NUMS_WITHOUT_SYSTEM_TOPIC);
public long getBrokerGetNumsWithoutSystemTopicAndSystemGroup() {
final StatsItemSet statsItemSet = this.statsTable.get(BROKER_GET_NUMS_WITHOUT_SYSTEM_TOPIC_AND_SYSTEM_GROUP);
if (statsItemSet == null) {
return 0;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,16 +201,16 @@ public void testOnGroupDeleted() {
}

@Test
public void testIncBrokerGetNumsWithoutSystemTopic() {
brokerStatsManager.incBrokerGetNumsWithoutSystemTopic(TOPIC, 1);
assertThat(brokerStatsManager.getStatsItem(BrokerStatsManager.BROKER_GET_NUMS_WITHOUT_SYSTEM_TOPIC, CLUSTER_NAME)
public void testIncBrokerGetNumsWithoutSystemTopicAndSystemGroup() {
brokerStatsManager.incBrokerGetNumsWithoutSystemTopicAndSystemGroup(TOPIC, GROUP_NAME, 1);
assertThat(brokerStatsManager.getStatsItem(BrokerStatsManager.BROKER_GET_NUMS_WITHOUT_SYSTEM_TOPIC_AND_SYSTEM_GROUP, CLUSTER_NAME)
.getValue().doubleValue()).isEqualTo(1L);
assertThat(brokerStatsManager.getBrokerGetNumsWithoutSystemTopic()).isEqualTo(1L);
assertThat(brokerStatsManager.getBrokerGetNumsWithoutSystemTopicAndSystemGroup()).isEqualTo(1L);

brokerStatsManager.incBrokerGetNumsWithoutSystemTopic(TopicValidator.RMQ_SYS_TRACE_TOPIC, 1);
assertThat(brokerStatsManager.getStatsItem(BrokerStatsManager.BROKER_GET_NUMS_WITHOUT_SYSTEM_TOPIC, CLUSTER_NAME)
brokerStatsManager.incBrokerGetNumsWithoutSystemTopicAndSystemGroup(TopicValidator.RMQ_SYS_TRACE_TOPIC, GROUP_NAME, 1);
assertThat(brokerStatsManager.getStatsItem(BrokerStatsManager.BROKER_GET_NUMS_WITHOUT_SYSTEM_TOPIC_AND_SYSTEM_GROUP, CLUSTER_NAME)
.getValue().doubleValue()).isEqualTo(1L);
assertThat(brokerStatsManager.getBrokerGetNumsWithoutSystemTopic()).isEqualTo(1L);
assertThat(brokerStatsManager.getBrokerGetNumsWithoutSystemTopicAndSystemGroup()).isEqualTo(1L);
}

@Test
Expand Down
Loading