MQAdminInstance.java
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.exporter.service.client;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.MQClientAPIImpl;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.exporter.config.RMQConfigure;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.RemotingClient;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
import org.joor.Reflect;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Service;
import static org.apache.rocketmq.common.MixAll.TOOLS_CONSUMER_GROUP;
@Service
public class MQAdminInstance {
private final static Logger log = LoggerFactory.getLogger(MQAdminInstance.class);
@Autowired
private RMQConfigure configure;
private RPCHook aclHook;
private MQAdminInstance(RMQConfigure configure) {
this.configure = configure;
aclHook = getAclRPCHook();
}
private RPCHook getAclRPCHook() {
if (configure.enableACL()) {
if (StringUtils.isAllBlank(configure.getAccessKey())) {
throw new RuntimeException("acl config error: accessKey is empty");
}
if (StringUtils.isAllBlank(configure.getSecretKey())) {
throw new RuntimeException("acl config error: secretKey is empty");
}
return new AclClientRPCHook(new SessionCredentials(configure.getAccessKey(), configure.getSecretKey()));
}
return null;
}
@Bean(destroyMethod = "shutdown", name = "defaultMQAdminExt")
private DefaultMQAdminExt buildDefaultMQAdminExt() throws Exception {
String namesrvAddress = configure.getNamesrvAddr();
if (StringUtils.isBlank(namesrvAddress)) {
log.error("Build DefaultMQAdminExt error, namesrv is null");
throw new Exception("Build DefaultMQAdminExt error, namesrv is null", null);
}
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(this.aclHook,5000L);
defaultMQAdminExt.setInstanceName("admin-" + System.currentTimeMillis());
defaultMQAdminExt.setNamesrvAddr(namesrvAddress);
try {
defaultMQAdminExt.start();
} catch (MQClientException ex) {
log.error(String.format("init default admin error, namesrv=%s", System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY)), ex);
}
return defaultMQAdminExt;
}
@Bean(destroyMethod = "shutdown")
private DefaultMQPullConsumer buildPullConsumer() throws Exception {
String namesrvAddress = configure.getNamesrvAddr();
if (StringUtils.isBlank(namesrvAddress)) {
log.error("init default pull consumer error, namesrv is null");
throw new Exception("init default pull consumer error, namesrv is null", null);
}
DefaultMQPullConsumer pullConsumer = new DefaultMQPullConsumer(TOOLS_CONSUMER_GROUP, this.aclHook);
pullConsumer.setInstanceName("consumer-" + System.currentTimeMillis());
pullConsumer.setNamesrvAddr(namesrvAddress);
try {
pullConsumer.start();
pullConsumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper().setConnectBrokerByUser(true);
} catch (MQClientException ex) {
log.error(String.format("init default pull consumer error, namesrv=%s", System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY)), ex);
}
return pullConsumer;
}
@Bean(destroyMethod = "shutdown")
private MQClientInstance buildInstance(@Qualifier("defaultMQAdminExt") DefaultMQAdminExt defaultMQAdminExt) {
DefaultMQAdminExtImpl defaultMQAdminExtImpl = Reflect.on(defaultMQAdminExt).get("defaultMQAdminExtImpl");
return Reflect.on(defaultMQAdminExtImpl).get("mqClientInstance");
}
@Bean
private RemotingClient client(MQClientInstance instance) {
MQClientAPIImpl mQClientAPIImpl = Reflect.on(instance).get("mQClientAPIImpl");
return Reflect.on(mQClientAPIImpl).get("remotingClient");
}
}