Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,7 @@ private synchronized RemotingCommand updateAndCreateTopic(ChannelHandlerContext
long startTime = System.currentTimeMillis();
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final CreateTopicRequestHeader requestHeader =
(CreateTopicRequestHeader) request.decodeCommandCustomHeader(CreateTopicRequestHeader.class);
request.decodeCommandCustomHeader(CreateTopicRequestHeader.class);

LOGGER.info("Broker receive request to update or create topic={}, caller address={}",
requestHeader.getTopic(), RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
Expand Down Expand Up @@ -709,7 +709,7 @@ private synchronized RemotingCommand updateAndCreateStaticTopic(ChannelHandlerCo
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final CreateTopicRequestHeader requestHeader =
(CreateTopicRequestHeader) request.decodeCommandCustomHeader(CreateTopicRequestHeader.class);
request.decodeCommandCustomHeader(CreateTopicRequestHeader.class);
LOGGER.info("Broker receive request to update or create static topic={}, caller address={}", requestHeader.getTopic(), RemotingHelper.parseChannelRemoteAddr(ctx.channel()));

final TopicQueueMappingDetail topicQueueMappingDetail = RemotingSerializable.decode(request.getBody(), TopicQueueMappingDetail.class);
Expand Down Expand Up @@ -757,7 +757,7 @@ private synchronized RemotingCommand deleteTopic(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
DeleteTopicRequestHeader requestHeader =
(DeleteTopicRequestHeader) request.decodeCommandCustomHeader(DeleteTopicRequestHeader.class);
request.decodeCommandCustomHeader(DeleteTopicRequestHeader.class);

LOGGER.info("AdminBrokerProcessor#deleteTopic: broker receive request to delete topic={}, caller={}",
requestHeader.getTopic(), RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
Expand Down Expand Up @@ -1168,7 +1168,7 @@ private RemotingCommand searchOffsetByTimestamp(ChannelHandlerContext ctx,
final RemotingCommand response = RemotingCommand.createResponseCommand(SearchOffsetResponseHeader.class);
final SearchOffsetResponseHeader responseHeader = (SearchOffsetResponseHeader) response.readCustomHeader();
final SearchOffsetRequestHeader requestHeader =
(SearchOffsetRequestHeader) request.decodeCommandCustomHeader(SearchOffsetRequestHeader.class);
request.decodeCommandCustomHeader(SearchOffsetRequestHeader.class);

TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader);

Expand Down Expand Up @@ -1314,7 +1314,7 @@ private CompletableFuture<RpcResponse> handleGetMinOffset(RpcRequest request) {
private RemotingCommand getMinOffset(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final GetMinOffsetRequestHeader requestHeader =
(GetMinOffsetRequestHeader) request.decodeCommandCustomHeader(GetMinOffsetRequestHeader.class);
request.decodeCommandCustomHeader(GetMinOffsetRequestHeader.class);
try {
CompletableFuture<RpcResponse> responseFuture = handleGetMinOffset(new RpcRequest(RequestCode.GET_MIN_OFFSET, requestHeader, null));
RpcResponse rpcResponse = responseFuture.get();
Expand Down Expand Up @@ -1363,7 +1363,7 @@ private RemotingCommand getEarliestMsgStoretime(ChannelHandlerContext ctx,
final RemotingCommand response = RemotingCommand.createResponseCommand(GetEarliestMsgStoretimeResponseHeader.class);
final GetEarliestMsgStoretimeResponseHeader responseHeader = (GetEarliestMsgStoretimeResponseHeader) response.readCustomHeader();
final GetEarliestMsgStoretimeRequestHeader requestHeader =
(GetEarliestMsgStoretimeRequestHeader) request.decodeCommandCustomHeader(GetEarliestMsgStoretimeRequestHeader.class);
request.decodeCommandCustomHeader(GetEarliestMsgStoretimeRequestHeader.class);

TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader, false);
RemotingCommand rewriteResult = rewriteRequestForStaticTopic(requestHeader, mappingContext);
Expand Down Expand Up @@ -1667,7 +1667,7 @@ private RemotingCommand deleteSubscriptionGroup(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
DeleteSubscriptionGroupRequestHeader requestHeader =
(DeleteSubscriptionGroupRequestHeader) request.decodeCommandCustomHeader(DeleteSubscriptionGroupRequestHeader.class);
request.decodeCommandCustomHeader(DeleteSubscriptionGroupRequestHeader.class);

LOGGER.info("AdminBrokerProcessor#deleteSubscriptionGroup, caller={}",
RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
Expand Down Expand Up @@ -1750,7 +1750,7 @@ private RemotingCommand getConsumerConnectionList(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final GetConsumerConnectionListRequestHeader requestHeader =
(GetConsumerConnectionListRequestHeader) request.decodeCommandCustomHeader(GetConsumerConnectionListRequestHeader.class);
request.decodeCommandCustomHeader(GetConsumerConnectionListRequestHeader.class);

ConsumerGroupInfo consumerGroupInfo =
this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup());
Expand Down Expand Up @@ -1790,7 +1790,7 @@ private RemotingCommand getAllProducerInfo(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final GetAllProducerInfoRequestHeader requestHeader =
(GetAllProducerInfoRequestHeader) request.decodeCommandCustomHeader(GetAllProducerInfoRequestHeader.class);
request.decodeCommandCustomHeader(GetAllProducerInfoRequestHeader.class);

ProducerTableInfo producerTable = this.brokerController.getProducerManager().getProducerTable();
if (producerTable != null) {
Expand All @@ -1809,7 +1809,7 @@ private RemotingCommand getProducerConnectionList(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final GetProducerConnectionListRequestHeader requestHeader =
(GetProducerConnectionListRequestHeader) request.decodeCommandCustomHeader(GetProducerConnectionListRequestHeader.class);
request.decodeCommandCustomHeader(GetProducerConnectionListRequestHeader.class);

ProducerConnection bodydata = new ProducerConnection();
Map<Channel, ClientChannelInfo> channelInfoHashMap =
Expand Down Expand Up @@ -2037,7 +2037,7 @@ private RemotingCommand getAllMessageRequestMode(ChannelHandlerContext ctx, Remo
public RemotingCommand resetOffset(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final ResetOffsetRequestHeader requestHeader =
(ResetOffsetRequestHeader) request.decodeCommandCustomHeader(ResetOffsetRequestHeader.class);
request.decodeCommandCustomHeader(ResetOffsetRequestHeader.class);
LOGGER.info("[reset-offset] reset offset started by {}. topic={}, group={}, timestamp={}, isForce={}",
RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getTopic(), requestHeader.getGroup(),
requestHeader.getTimestamp(), requestHeader.isForce());
Expand Down Expand Up @@ -2170,7 +2170,7 @@ private RemotingCommand resetOffsetInner(String topic, String group, int queueId
public RemotingCommand getConsumerStatus(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final GetConsumerStatusRequestHeader requestHeader =
(GetConsumerStatusRequestHeader) request.decodeCommandCustomHeader(GetConsumerStatusRequestHeader.class);
request.decodeCommandCustomHeader(GetConsumerStatusRequestHeader.class);

LOGGER.info("[get-consumer-status] get consumer status by {}. topic={}, group={}",
RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getTopic(), requestHeader.getGroup());
Expand All @@ -2183,7 +2183,7 @@ private RemotingCommand queryTopicConsumeByWho(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
QueryTopicConsumeByWhoRequestHeader requestHeader =
(QueryTopicConsumeByWhoRequestHeader) request.decodeCommandCustomHeader(QueryTopicConsumeByWhoRequestHeader.class);
request.decodeCommandCustomHeader(QueryTopicConsumeByWhoRequestHeader.class);

HashSet<String> groups = this.brokerController.getConsumerManager().queryTopicConsumeByWho(requestHeader.getTopic());

Expand All @@ -2206,7 +2206,7 @@ private RemotingCommand queryTopicsByConsumer(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
QueryTopicsByConsumerRequestHeader requestHeader =
(QueryTopicsByConsumerRequestHeader) request.decodeCommandCustomHeader(QueryTopicsByConsumerRequestHeader.class);
request.decodeCommandCustomHeader(QueryTopicsByConsumerRequestHeader.class);

Set<String> topics = this.brokerController.getConsumerOffsetManager().whichTopicByConsumer(requestHeader.getGroup());

Expand All @@ -2225,7 +2225,7 @@ private RemotingCommand querySubscriptionByConsumer(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
QuerySubscriptionByConsumerRequestHeader requestHeader =
(QuerySubscriptionByConsumerRequestHeader) request.decodeCommandCustomHeader(QuerySubscriptionByConsumerRequestHeader.class);
request.decodeCommandCustomHeader(QuerySubscriptionByConsumerRequestHeader.class);

SubscriptionData subscriptionData = this.brokerController.getConsumerManager()
.findSubscriptionData(requestHeader.getGroup(), requestHeader.getTopic());
Expand Down Expand Up @@ -2358,7 +2358,7 @@ public RemotingCommand cleanUnusedTopic() {
private RemotingCommand getConsumerRunningInfo(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final GetConsumerRunningInfoRequestHeader requestHeader =
(GetConsumerRunningInfoRequestHeader) request.decodeCommandCustomHeader(GetConsumerRunningInfoRequestHeader.class);
request.decodeCommandCustomHeader(GetConsumerRunningInfoRequestHeader.class);

return this.callConsumer(RequestCode.GET_CONSUMER_RUNNING_INFO, request, requestHeader.getConsumerGroup(),
requestHeader.getClientId());
Expand All @@ -2368,7 +2368,7 @@ private RemotingCommand queryCorrectionOffset(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
QueryCorrectionOffsetHeader requestHeader =
(QueryCorrectionOffsetHeader) request.decodeCommandCustomHeader(QueryCorrectionOffsetHeader.class);
request.decodeCommandCustomHeader(QueryCorrectionOffsetHeader.class);

Map<Integer, Long> correctionOffset = this.brokerController.getConsumerOffsetManager()
.queryMinOffsetInAllGroup(requestHeader.getTopic(), requestHeader.getFilterGroups());
Expand Down Expand Up @@ -2436,7 +2436,7 @@ private RemotingCommand cloneGroupOffset(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
CloneGroupOffsetRequestHeader requestHeader =
(CloneGroupOffsetRequestHeader) request.decodeCommandCustomHeader(CloneGroupOffsetRequestHeader.class);
request.decodeCommandCustomHeader(CloneGroupOffsetRequestHeader.class);

Set<String> topics;
if (UtilAll.isBlank(requestHeader.getTopic())) {
Expand Down Expand Up @@ -2478,7 +2478,7 @@ private RemotingCommand cloneGroupOffset(ChannelHandlerContext ctx,
private RemotingCommand ViewBrokerStatsData(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final ViewBrokerStatsDataRequestHeader requestHeader =
(ViewBrokerStatsDataRequestHeader) request.decodeCommandCustomHeader(ViewBrokerStatsDataRequestHeader.class);
request.decodeCommandCustomHeader(ViewBrokerStatsDataRequestHeader.class);
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
MessageStore messageStore = this.brokerController.getMessageStore();

Expand Down Expand Up @@ -2757,7 +2757,7 @@ private RemotingCommand callConsumer(
private RemotingCommand queryConsumeQueue(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
QueryConsumeQueueRequestHeader requestHeader =
(QueryConsumeQueueRequestHeader) request.decodeCommandCustomHeader(QueryConsumeQueueRequestHeader.class);
request.decodeCommandCustomHeader(QueryConsumeQueueRequestHeader.class);

RemotingCommand response = RemotingCommand.createResponseCommand(null);

Expand Down Expand Up @@ -2898,7 +2898,7 @@ private MessageExtBrokerInner toMessageExtBrokerInner(MessageExt msgExt) {

private RemotingCommand getTopicConfig(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
GetTopicConfigRequestHeader requestHeader = (GetTopicConfigRequestHeader) request.decodeCommandCustomHeader(GetTopicConfigRequestHeader.class);
GetTopicConfigRequestHeader requestHeader = request.decodeCommandCustomHeader(GetTopicConfigRequestHeader.class);
final RemotingCommand response = RemotingCommand.createResponseCommand(null);

TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
Expand Down Expand Up @@ -2931,7 +2931,7 @@ private RemotingCommand getTopicConfig(ChannelHandlerContext ctx,

private RemotingCommand notifyMinBrokerIdChange(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
NotifyMinBrokerIdChangeRequestHeader requestHeader = (NotifyMinBrokerIdChangeRequestHeader) request.decodeCommandCustomHeader(NotifyMinBrokerIdChangeRequestHeader.class);
NotifyMinBrokerIdChangeRequestHeader requestHeader = request.decodeCommandCustomHeader(NotifyMinBrokerIdChangeRequestHeader.class);

RemotingCommand response = RemotingCommand.createResponseCommand(null);

Expand All @@ -2951,7 +2951,7 @@ private RemotingCommand updateBrokerHaInfo(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
RemotingCommand response = RemotingCommand.createResponseCommand(ExchangeHAInfoResponseHeader.class);

ExchangeHAInfoRequestHeader requestHeader = (ExchangeHAInfoRequestHeader) request.decodeCommandCustomHeader(ExchangeHAInfoRequestHeader.class);
ExchangeHAInfoRequestHeader requestHeader = request.decodeCommandCustomHeader(ExchangeHAInfoRequestHeader.class);
if (requestHeader.getMasterHaAddress() != null) {
this.brokerController.getMessageStore().updateHaMasterAddress(requestHeader.getMasterHaAddress());
this.brokerController.getMessageStore().updateMasterAddress(requestHeader.getMasterAddress());
Expand Down Expand Up @@ -3017,7 +3017,7 @@ private RemotingCommand resetMasterFlushOffset(ChannelHandlerContext ctx,

if (this.brokerController.getBrokerConfig().getBrokerId() != MixAll.MASTER_ID) {

ResetMasterFlushOffsetHeader requestHeader = (ResetMasterFlushOffsetHeader) request.decodeCommandCustomHeader(ResetMasterFlushOffsetHeader.class);
ResetMasterFlushOffsetHeader requestHeader = request.decodeCommandCustomHeader(ResetMasterFlushOffsetHeader.class);

if (requestHeader.getMasterFlushOffset() != null) {
this.brokerController.getMessageStore().setMasterFlushedOffset(requestHeader.getMasterFlushOffset());
Expand All @@ -3031,7 +3031,7 @@ private RemotingCommand resetMasterFlushOffset(ChannelHandlerContext ctx,

private RemotingCommand notifyBrokerRoleChanged(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
NotifyBrokerRoleChangedRequestHeader requestHeader = (NotifyBrokerRoleChangedRequestHeader) request.decodeCommandCustomHeader(NotifyBrokerRoleChangedRequestHeader.class);
NotifyBrokerRoleChangedRequestHeader requestHeader = request.decodeCommandCustomHeader(NotifyBrokerRoleChangedRequestHeader.class);
SyncStateSet syncStateSetInfo = RemotingSerializable.decode(request.getBody(), SyncStateSet.class);

RemotingCommand response = RemotingCommand.createResponseCommand(null);
Expand Down
Loading
Loading