欢迎光临
我们一直在努力

记录一次在阿里云ECS服务器部署验证RocketMQ的经历

==背景==

购买了3台阿里云ECS服务器,上面部署了rocketmq,用来作为业务后台与平台之间的数据通讯中间件。

部署倒是异常顺利,不过在本地写程序,测试生产和消费数据的时候,出现了一些问题。

耗费了将近1天的时间,终于解决了,记录一下本次排查的经历。

 

==环境==

Linux:CentOS8(阿里云ECS服务器)

RocketMQ:4.6.1

 

==集群==

节点数:3个

节点1:broker-a(master)

节点2:broker-a(slave),broker-b(master)

节点3:broker-b(slave)

 

配置文件如下(IP地址省略了):

broker-a.properties

brokerClusterName=rexel brokerName=broker-a brokerId=0 deleteWhen=04 fileReservedTime=48 brokerRole=SYNC_MASTER flushDiskType=ASYNC_FLUSH listenPort=10921 brokerIP1=xx.xx.xx.01 namesrvAddr=xx.xx.xx.01:9876;xx.xx.xx.02:9876 autoCreateTopicEnable=false autoCreateSubscriptionGroup=true storePathRootDir=/home/radmin/data/rocketmq/rootdir-a-m storePathCommitLog=/home/radmin/data/rocketmq/commitlog-a-m storePathConsumerQueue=/home/radmin/data/rocketmq/consumequeue-a-m storePathIndex=/home/radmin/data/rocketmq/index-a-m storeCheckpoint=/home/radmin/data/rocketmq/checkpoint-a-m

 

broker-a-s.properties

brokerClusterName=rexel brokerName=broker-a brokerId=1 deleteWhen=04 fileReservedTime=48 brokerRole=SLAVE flushDiskType=ASYNC_FLUSH listenPort=10931 brokerIP1=xx.xx.xx.02 namesrvAddr=xx.xx.xx.01:9876;xx.xx.xx.02:9876 defaultTopicQueueNums=4 autoCreateTopicEnable=false autoCreateSubscriptionGroup=true storePathRootDir=/home/radmin/data/rocketmq/rootdir-a-s storePathCommitLog=/home/radmin/data/rocketmq/commitlog-a-s storePathConsumerQueue=/home/radmin/data/rocketmq/consumequeue-a-s storePathIndex=/home/radmin/data/rocketmq/index-a-s storeCheckpoint=/home/radmin/data/rocketmq/checkpoint-a-s

 

broker-b.properties

brokerClusterName=rexel brokerName=broker-b brokerId=0 deleteWhen=04 fileReservedTime=48 brokerRole=SYNC_MASTER flushDiskType=ASYNC_FLUSH listenPort=10921 brokerIP1=xx.xx.xx.02 namesrvAddr=xx.xx.xx.01:9876;xx.xx.xx.02:9876 defaultTopicQueueNums=4 autoCreateTopicEnable=false autoCreateSubscriptionGroup=true storePathRootDir=/home/radmin/data/rocketmq/rootdir-b-m storePathCommitLog=/home/radmin/data/rocketmq/commitlog-b-m storePathConsumerQueue=/home/radmin/data/rocketmq/consumequeue-b-m storePathIndex=/home/radmin/data/rocketmq/index-b-m storeCheckpoint=/home/radmin/data/rocketmq/checkpoint-b-m

 

broker-b-s.properties

brokerClusterName=rexel brokerName=broker-b brokerId=1 deleteWhen=04 fileReservedTime=48 brokerRole=SLAVE flushDiskType=ASYNC_FLUSH listenPort=10931 brokerIP1=xx.xx.xx.03 namesrvAddr=xx.xx.xx.01:9876;xx.xx.xx.02:9876 defaultTopicQueueNums=4 autoCreateTopicEnable=false autoCreateSubscriptionGroup=true storePathRootDir=/home/radmin/data/rocketmq/rootdir-b-s storePathCommitLog=/home/radmin/data/rocketmq/commitlog-b-s storePathConsumerQueue=/home/radmin/data/rocketmq/consumequeue-b-s storePathIndex=/home/radmin/data/rocketmq/index-b-s storeCheckpoint=/home/radmin/data/rocketmq/checkpoint-b-s

 

==最终代码== 

RocketUtils.java

package com.rexel.stream.common.utils; import java.io.Serializable; import java.util.HashMap; import java.util.Map; import java.util.UUID; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.apache.rocketmq.remoting.exception.RemotingException; public class RocketUtils implements Serializable{ private static RocketUtils rocketUtils = null; private static Map<String, DefaultMQProducer> nameSrvMap = null; private RocketUtils() { } public synchronized static RocketUtils getInstance() { if (rocketUtils == null) { synchronized (RocketUtils.class) { rocketUtils = new RocketUtils(); } } nameSrvMap = new HashMap<>(); return rocketUtils; } public DefaultMQPushConsumer createConsumer(String namesrvAddr, String topic, String group) { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group); consumer.setNamesrvAddr(namesrvAddr); consumer.setInstanceName(UUID.randomUUID().toString()); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.setMessageModel(MessageModel.CLUSTERING); consumer.setVipChannelEnabled(false); consumer.setConsumeThreadMax(1); consumer.setConsumeThreadMin(1); consumer.setConsumeMessageBatchMaxSize(1); try { consumer.subscribe(topic, "*"); } catch (MQClientException e) { e.printStackTrace(); return null; } return consumer; } public DefaultMQProducer createProducer(String nameSrvAddr, String group) { if (nameSrvMap == null) { return null; } if (nameSrvMap.containsKey(nameSrvAddr)) { return nameSrvMap.get(nameSrvAddr); } DefaultMQProducer producer = new DefaultMQProducer(group); producer.setNamesrvAddr(nameSrvAddr); producer.setSendMessageWithVIPChannel(false); producer.setSendMsgTimeout(5000); producer.setInstanceName(UUID.randomUUID().toString()); try { producer.start(); } catch (MQClientException e) { e.printStackTrace(); return null; } nameSrvMap.put(nameSrvAddr, producer); return producer; } public boolean sendOr(DefaultMQProducer producer, Message msg, boolean async) { if (async) { return sendAsync(producer, msg); } else { return send(producer, msg); } } public boolean sendAsync(DefaultMQProducer producer, Message msg) { try { producer.send(msg, new CallBack()); return true; } catch (MQClientException | RemotingException | InterruptedException e) { e.printStackTrace(); return false; } } public boolean send(DefaultMQProducer producer, Message msg) { try { producer.send(msg); return true; } catch (MQClientException | RemotingException | InterruptedException | MQBrokerException e) { e.printStackTrace(); return false; } } private class CallBack implements SendCallback,Serializable{ @Override public void onSuccess(SendResult sendResult) { System.out.println("[------]onSuccess"); } @Override public void onException(Throwable throwable) { System.out.println("[------]onException. " + throwable.getMessage()); } } }

 

RmqProducer.java

package com.rexel.stream.tools; import com.alibaba.fastjson.JSONObject; import com.rexel.stream.common.utils.RocketUtils; import java.nio.charset.StandardCharsets; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.common.message.Message; public class RmqProducer { public static void main(String[] args) { System.out.println("[------]start."); RocketUtils rocketUtils = RocketUtils.getInstance(); DefaultMQProducer producer = rocketUtils.createProducer("xx.xx.xx.01:9876;xx.xx.xx.02:9876", "pro_test3"); for (int i = 0; i < 10; i++) { JSONObject jsonObject = new JSONObject(); jsonObject.put("name", "VA_2YC_VAL"); jsonObject.put("judge", "≥"); jsonObject.put("value", "100"); rocketUtils.sendAsync(producer, new Message( "app_notice", jsonObject.toJSONString().getBytes(StandardCharsets.UTF_8))); } //如果使用异步发送,这里不要shutdown // producer.shutdown(); System.out.println("[------]end."); } }

 

RmqConsumer.java

package com.rexel.stream.tools; import com.rexel.stream.common.utils.RocketUtils; import java.nio.charset.StandardCharsets; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.Message; public class RmqConsumer { public static void main(String[] args) throws MQClientException { System.out.println("[------]start."); RocketUtils rocketUtils = RocketUtils.getInstance(); DefaultMQPushConsumer consumer = rocketUtils.createConsumer( "xx.xx.xx.01:9876;xx.xx.xx.02:9876", "app_notice", "rexel_stream3"); consumer.registerMessageListener( (MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> { for (Message msg : list) { try { byte[] body = msg.getBody(); String message = new String(body, StandardCharsets.UTF_8); System.out.println("[------]rmq message= " + message); } catch (Exception e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); consumer.start(); System.out.println("[------]end."); } }

 

==问题1==

配置完成之后,尝试在客户端编写生产者代码,结果生产数据的时候报错。

org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to [xxxx] failed

 

尝试1:

把生产者和消费者的代码中增加setSendMessageWithVIPChannel(false)。

结果:依然报错,错误没有改变

 

尝试2:

 

在配置文件中增加brokerIP1=xx.xx.xx.xx的配置。

结果:依然报错,错误没有改变

 

尝试3:

网上说是防火墙的问题,服务器本身的防火墙很早就已经被关闭了。尝试去设置阿里云ECS服务器产品的端口。

一次性的把一个10900/10999的端口全部开放

记录一次在阿里云ECS服务器部署验证RocketMQ的经历

结果:测试同步生产数据正常。

 

尝试4:

测试异步生产数据。调用RocketUtils中的sendAsync方法。结果报错:

[------]onException. org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to [101.132.242.90:9876, 47.116.50.192:9876] failed [------]onException. The producer service state not OK, SHUTDOWN_ALREADY See http://rocketmq.apache.org/docs/faq/ for further details. [------]onException. The producer service state not OK, SHUTDOWN_ALREADY See http://rocketmq.apache.org/docs/faq/ for further details. [------]onException. The producer service state not OK, SHUTDOWN_ALREADY See http://rocketmq.apache.org/docs/faq/ for further details. [------]onException. The producer service state not OK, SHUTDOWN_ALREADY See http://rocketmq.apache.org/docs/faq/ for further details. [------]onException. The producer service state not OK, SHUTDOWN_ALREADY See http://rocketmq.apache.org/docs/faq/ for further details. [------]onException. The producer service state not OK, SHUTDOWN_ALREADY See http://rocketmq.apache.org/docs/faq/ for further details. [------]onException. org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to [101.132.242.90:9876, 47.116.50.192:9876] failed [------]onException. org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to [101.132.242.90:9876, 47.116.50.192:9876] failed [------]onException. org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to [101.132.242.90:9876, 47.116.50.192:9876] failed

 

原因是使用异步生产数据的时候,我的程序里调用了shutdown方法,

导致后续的异步线程无法正常执行。注释掉shutdown处理之后,异步生产正常。

记录一次在阿里云ECS服务器部署验证RocketMQ的经历

 

结论:

如果出现connect to [xxxx] failed的问题,不外乎尝试以下几种办法:

1、程序中:生产者或者消费者:setSendMessageWithVIPChannel(false)

2、配置文件:如果是阿里云ECS服务器,以下两个配置使用外网地址:

brokerIP1=xx.xx.xx.01 namesrvAddr=xx.xx.xx.01:9876;xx.xx.xx.02:9876

3、防火墙:关闭服务器本身的防火墙。

4、安全组:阿里云服务器本身的网络安全组中需要开通端口。

 

==问题2==

生产者已经没有问题了,但是消费者一直消费不到数据。程序不报任何错误,就是消费不到数据。

在网上找了一些到有的博客,其中这篇给了我一些方向,

http://www.jiangxinlingdu.com/rocketmq/2019/08/06/noconsumer.html

初步怀疑是消费者的偏移量有问题。

 

解决办法:

我这个环境由于是新搭环境,目前还不是生产环境,所以我直接采用的方式是:

1、停止rocketmq集群

2、删除所有rocketmq的文件

3、重启集群

4、重新创建topic

 

一套暴利连招之后,消费者果然可以消费到数据了。

索然没有真正的找到问题的原因,不过基本上可以确定是rocketmq的元数据出现了问题,

这个问题的产生可能是我最近不断的调试配置文件,修改内外网地址,重启引起的。

 

--END--

  • 海报
海报图正在生成中...
赞(0) 打赏
声明:
1、本博客不从事任何主机及服务器租赁业务,不参与任何交易,也绝非中介。博客内容仅记录博主个人感兴趣的服务器测评结果及一些服务器相关的优惠活动,信息均摘自网络或来自服务商主动提供;所以对本博客提及的内容不作直接、间接、法定、约定的保证,博客内容也不具备任何参考价值及引导作用,访问者需自行甄别。
2、访问本博客请务必遵守有关互联网的相关法律、规定与规则;不能利用本博客所提及的内容从事任何违法、违规操作;否则造成的一切后果由访问者自行承担。
3、未成年人及不能独立承担法律责任的个人及群体请勿访问本博客。
4、一旦您访问本博客,即表示您已经知晓并接受了以上声明通告。
文章名称:《记录一次在阿里云ECS服务器部署验证RocketMQ的经历》
文章链接:https://www.456zj.com/4768.html
本站资源仅供个人学习交流,请于下载后24小时内删除,不允许用于商业用途,否则法律问题自行承担。

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址