前言 在开始之前,大家可以先了解RocketMQ的一些特性。
RocketMQ简介
今天我们接上之前说的,对RocketMQ进行简单使用。主要的也是讲如何在SpringBoot项目中使用RocketMQ。
环境 RocketMQ安装 我们先在RocketMQ官网 上下载最新的MQ版本并进行安装。
可以通过镜像进行下载。
将压缩包解压并放在一个指定文件夹下。(这里要注意的是文件夹路径中尽量不要有空格,像Program Files这种,有可能导致mq无法正常启动)
RocketMQ启动 通过命令行进入到bin目录下,使用 mqnamesrv -n localhost:9876 (windows)可以启动mq的namesrv。如下图:
使用 mqbroker -n localhost:9876 (windows)可以启动mqbroker。如下图:
注意:上图表示RocketMQ的namesrv和broker启动成功,RocketMQ若正常使用应保证namesrv和broker均启动成功。
与Java集成使用 主要依赖于rocketmq-client的jar包,在与SpringBoot进行集成时,应当引入该jar包。
1 2 3 4 5 <dependency> <groupId>com.alibaba.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>${version.rocketmq}</version> </dependency>
例子 我们简单使用下该jar包创建消费者和生产者进行消费,来了解下它们的一些参数。
生产者:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public class ProducerTest { public static void main (String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("producer1" ); producer.setNamesrvAddr("127.0.0.1:9876" ); producer.setCreateTopicKey("AUTO_CREATE_TOPIC_KEY" ); producer.start(); System.out.println("Producer started" ); Message message=new Message(); message.setTopic("Test" ); message.setTags("123" ); message.setBody(new String("Hello" ).getBytes()); SendResult result=producer.send(message); System.out.println(result); } }
消费者:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 public class ConsumerTest { public static void main (String[] args) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer1" ); consumer.setNamesrvAddr("127.0.0.1:9876" ); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); consumer.subscribe("Test" , "*" ); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage (List<MessageExt> msgs, ConsumeConcurrentlyContext context) { msgs.forEach((a)->{ System.out.println(new String(a.getBody())); }); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("Consumer started" ); } }
运行这两个类进行测试。
生产者结果:
消费者结果:
备注: 如果启动中出现异常 com.alibaba.rocketmq.client.exception.MQClientException: No route info of this topic, TestTopic 可能是没有开启AUTO_CREATE_TOPIC,我们可以在启动broker的时候加上该参数,mqbroker -n localhost:9876&autoCreateTopicEnable=true 可以保证使用时自动创建Topic。
生产等环境一般需要什么Topic就配置什么,不会开启这个参数让程序自动创建。
简要说明 我们对上面例子里的一些参数等做些说明,以便于我们可以更好的封装功能。
可与之前RocketMQ的简介 结合理解。
DefaultMQProducer部分参数
producerGroup:生产者组名称。
namesrvAddr:生产者NameSrvAddr的服务地址。
createTopicKey:可以创建指定的Topic。AUTO_CREATE_TOPIC_KEY为自动创建Topic。
其它参数略。
发送Message时,需要设置Message的Topic和Tag,并能收到发送状态结果。
DefaultMQPushConsumer部分参数
consumerGroup:消费者组名称。
namesrvAddr:消费者NameSrvAddr的服务地址。
ConsumeFromWhere:消费策略。
---> CONSUME_FROM_LAST_OFFSET 默认策略,从该队列最尾开始消费,即跳过历史消息
---> CONSUME_FROM_FIRST_OFFSET 从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍
---> CONSUME_FROM_TIMESTAMP 从某个时间点开始消费,和setConsumeTimestamp()配合使用,默认是半个小时以前
subscribe方法:使消费者订阅指定的Topic和Tag。
registerMessageListener方法:注册消费者监听,用于消费消息,这里面也是我们业务逻辑的主要内容。有顺序消费和并行消费两种模式。
分别需要实现MessageListenerOrderly接口和MessageListenerConcurrently接口。
我们的例子是并行消费处理的。
小试牛刀 每次处理时都要这样写会有很多的与业务无关的代码,也不美观。
我们对其进行必要封装,之前集成了SpringBoot的一个自己封装的starter插件,今天我们把RocketMQ也与SpringBoot集成下。
A 首先新建SpringBoot项目,引入RocketMQ 的jar依赖,不在详述。
然后在项目下创建必要的package。
producer:用来存放我们构建producer的类的包。
consumer:用来存放我们构建consumer的类的包。
listener:用来存放我们构建listener的类的包
factory:用来构建生产者和消费者群组的包。
config:存放SpringBoot配置类的包。
autoware:存放启动配置生效的类的包。
annotation:用来存放注解的包。
先说下简单思路吧。
首先这个生产者和消费者是可以有多个的,然后我们怎么管理它们? 生产者可以发送顺序消息和并发消息,消费者可以处理顺序消息和并发消息,同时我们可能有两种业务要使用同一个Listener,如何解耦呢?
关于管理:我们可以管理生产者和消费者的一个集合来解决。
关于解耦:可以提供一个接口,业务类实现这个接口拿到Message,进行处理。那如何知道这个业务类需要哪个listener呢?自然需要customerId或者listenerId。
好了开始工作。
B 先从配置入手。
下面是生产者和消费者的配置Bean。
由于可以配置多个生产者或者消费者,故使用List处理它们。部分代码如下:
C 然后从消费者和生产者的提供入手。也是比较简单的,主要是根据参数生成一个生产者或者消费者,然后暴露一些方法,如start,stop等方法。
提供生产者的类,部分代码如下。
备注: 生产环境一般不设置AUTO_CREATE_TOPIC_KEY,需要什么Topic要手动创建加入管理。
创建消费者的类,部分代码如下。
我们认为两个consumerId相等则获取的是一个Consumer,因此需要重写equals方法。
D 创建消费者监听,为处理消息提供一个接口。
使用抽象类部分实现这个接口。
这一步的目的是由于不同的业务逻辑可能用到一个监听,这样可以两个业务逻辑写到两个不同的类中,只需实现IProcessor。
写两个并行处理监听和顺序处理监听,对其进行实现。
并行监听:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public class ConcurrentlyRocketMQMessageListener extends AbstractRocketMQMessageListener implements MessageListenerConcurrently { private static final transient Logger logger = LoggerFactory.getLogger(ConcurrentlyRocketMQMessageListener.class); @Override public ConsumeConcurrentlyStatus consumeMessage (List<MessageExt> msgs, ConsumeConcurrentlyContext context) { logger.debug("ConcurrentlyRocketMQMessageListener receive message begin,length:{}" , msgs.size()); for (MessageExt msg:msgs ) { for (IProcessor processor : processorList) { try { process(processor, msg); } catch (Exception ex) { logger.error("ConcurrentlyRocketMQMessageListener error" ,ex); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }
顺序监听:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public class OrderlyRocketMQMessageListener extends AbstractRocketMQMessageListener implements MessageListenerOrderly { private static final transient Logger logger = LoggerFactory.getLogger(OrderlyRocketMQMessageListener.class); @Override public ConsumeOrderlyStatus consumeMessage (List<MessageExt> msgs, ConsumeOrderlyContext context) { logger.debug("OrderlyRocketMQMessageListener receive message begin,length:{}" , msgs.size()); for (MessageExt msg:msgs ) { for (IProcessor processor : processorList) { try { process(processor, msg); } catch (Exception ex) { logger.error("OrderlyRocketMQMessageListener error" ,ex); return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; } } } return ConsumeOrderlyStatus.SUCCESS; } }
然后需要对实现IProcesser接口的类添加如下注解,用来查看调用的哪个Listener。
1 2 3 4 5 6 7 8 9 10 11 @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Documented @Component public @interface RocketMQProcessor { String consumerId () default "" ; }
E 好,我们开始构建factory。
使用ConcurrentHashMap存储生产者和消费者集合。
1 2 3 4 private static Map<String, RocketMQMessageProducer> producers=new ConcurrentHashMap<>();private static Map<String, RocketMQMessageConsumer> consumers=new ConcurrentHashMap<>();
创建生产者:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public RocketMQMessageProducer createProducer (MqProducerConfiguration configuration) { if (producers.get(configuration.getProducerId())!=null ){ return producers.get(configuration.getProducerId()); } RocketMQMessageProducer producer=new RocketMQMessageProducer(configuration.getGroupName(), configuration.getNamesrvAddr()); if (configuration.getSendMsgTimeout()!=null ){ producer.setSendMsgTimeout(configuration.getSendMsgTimeout()); } if (configuration.getMaxMessageSize()!=null ){ producer.setMaxMessageSize(configuration.getMaxMessageSize()); } try { producer.start(); producers.put(configuration.getProducerId(), producer); logger.info("MqProducer start success " +configuration.toString()); } catch (MQClientException e) { logger.error("MqProducer start error " +configuration.toString(),e); throw new RuntimeException(e); } return producer; }
创建消费者:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 public RocketMQMessageConsumer createConsumer (MqConsumerConfiguration mqConsumerConfiguration, List<IProcessor> list) { if (consumers.get(mqConsumerConfiguration.getConsumerId()) != null ) { return consumers.get(mqConsumerConfiguration.getConsumerId()); } try { RocketMQMessageConsumer consumer = new RocketMQMessageConsumer(mqConsumerConfiguration.getConsumerId(), mqConsumerConfiguration.getGroupName(), mqConsumerConfiguration.getNamesrvAddr()); consumer.subscribe(mqConsumerConfiguration.getTopicAndTagMap()); if (!CollectionUtils.isEmpty(mqConsumerConfiguration.getOptions())){ String consumeFromWhere = mqConsumerConfiguration.getOptions().get("consumeFromWhere" ); String consumeThreadMin = mqConsumerConfiguration.getOptions().get("consumeThreadMin" ); String consumeThreadMax = mqConsumerConfiguration.getOptions().get("consumeThreadMax" ); String pullThresholdForQueue = mqConsumerConfiguration.getOptions().get("pullThresholdForQueue" ); String consumeMessageBatchMaxSize = mqConsumerConfiguration.getOptions().get("consumeMessageBatchMaxSize" ); String pullBatchSize = mqConsumerConfiguration.getOptions().get("pullBatchSize" ); String pullInterval = mqConsumerConfiguration.getOptions().get("pullInterval" ); if (StringUtils.isNotBlank(consumeFromWhere)) { if (StringUtils.equals(consumeFromWhere, "CONSUME_FROM_LAST_OFFSET" )) { consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); } else if (StringUtils.equals(consumeFromWhere, "CONSUME_FROM_FIRST_OFFSET" )) { consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); } } if (StringUtils.isNotBlank(consumeThreadMin)) { consumer.setConsumeThreadMin(Integer.parseInt(consumeThreadMin)); } if (StringUtils.isNotBlank(consumeThreadMax)) { consumer.setConsumeThreadMax(Integer.parseInt(consumeThreadMax)); } if (StringUtils.isNotBlank(pullThresholdForQueue)) { consumer.setPullThresholdForQueue(Integer.parseInt(pullThresholdForQueue)); } if (StringUtils.isNotBlank(consumeMessageBatchMaxSize)) { consumer.setConsumeMessageBatchMaxSize(Integer.parseInt(consumeMessageBatchMaxSize)); } if (StringUtils.isNotBlank(pullBatchSize)) { consumer.setPullBatchSize(Integer.parseInt(pullBatchSize)); } if (StringUtils.isNotBlank(pullInterval)) { consumer.setPullInterval(Integer.parseInt(pullInterval)); } } if (mqConsumerConfiguration.isOrderly()){ OrderlyRocketMQMessageListener orderlyRocketMQMessageListener=new OrderlyRocketMQMessageListener(); orderlyRocketMQMessageListener.setProcessorList(list); consumer.registerMessageListener(orderlyRocketMQMessageListener); }else { ConcurrentlyRocketMQMessageListener concurrentlyRocketMQMessageListener=new ConcurrentlyRocketMQMessageListener(); concurrentlyRocketMQMessageListener.setProcessorList(list); consumer.registerMessageListener(concurrentlyRocketMQMessageListener); } consumer.start(); consumers.put(mqConsumerConfiguration.getConsumerId(), consumer); logger.info("MqConsumer start success " +mqConsumerConfiguration.toString()); logger.info("MqConsumer processors size " +list.size()); return consumer; } catch (Exception e) { logger.error("MqConsumer start error" , e); throw new RuntimeException(e); } }
它们的其它方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 public RocketMQMessageProducer getProducer (String producerId) { if (producers.get(producerId)!=null ){ return producers.get(producerId); } return null ; } public void stopProducer (String producerId) { if (producers.get(producerId)!=null ){ producers.get(producerId).shutdown(); producers.remove(producerId); logger.info("MqProducer " +producerId+" is shutdown!" ); } } public RocketMQMessageConsumer getConsumer (String customerId) { if (consumers.get(customerId)!=null ){ return consumers.get(customerId); } return null ; } public void stopConsumer (String customerId) { if (consumers.get(customerId)!=null ){ consumers.get(customerId).shutdown(); consumers.remove(customerId); logger.info("MqConsumer " +customerId+" is shutdown!" ); } }
F 然后到了我们关键的自动配置部分了。
让这个类实现ApplicationContextAware可以拿到applicationContext。
同时生成一个FactoryBean。
主要方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 @Bean @ConditionalOnMissingBean @ConditionalOnProperty(prefix = "spring.rocketmq.config", name = "enabled", havingValue = "true") RocketMQFactory rocketMQFactory () { RocketMQFactory rocketMQFactory=new RocketMQFactory(); if (mqConfigurations.getMqProducerConfigurations()!=null &&mqConfigurations.getMqProducerConfigurations().size()>0 ){ mqConfigurations.getMqProducerConfigurations().forEach(producerConfiguration ->{ rocketMQFactory.createProducer(producerConfiguration); }); } if (mqConfigurations.getMqConsumerConfigurations()!=null &&mqConfigurations.getMqConsumerConfigurations().size()>0 ){ mqConfigurations.getMqConsumerConfigurations().forEach(consumerConfiguration->{ final Map<String, Object> annotationMap = applicationContext.getBeansWithAnnotation(RocketMQProcessor.class); List<IProcessor> list = new ArrayList<>(); if (annotationMap!=null ){ annotationMap.forEach((key,value)->{ RocketMQProcessor annotation = value.getClass().getAnnotation(RocketMQProcessor.class); if (consumerConfiguration.getConsumerId().equals(annotation.consumerId())){ try { list.add((IProcessor) value); }catch (Exception e){ throw new RuntimeException(e); } } }); } rocketMQFactory.createConsumer(consumerConfiguration,list); }); } return rocketMQFactory; }
总的来说就是拿到配置生产生产者组,生成消费者组。
在生成消费者的时候需要注册监听,一个监听可以有很多业务类,通过注解拿到业务类,放到处理器列表里,再把该监听注册到指定的customerId上。
G 配置spring.factories。
1 org.springframework.boot.autoconfigure.EnableAutoConfiguration =com.zwt.rocketmqspringboot.autoware.RocketMqAutoConfig
H 然后我们添加application.properties进行测试。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 spring.rocketmq.config.enabled =true spring.rocketmq.config.mq-producer-configurations[0].group-name =rocketmq-producer spring.rocketmq.config.mq-producer-configurations[0].namesrv-addr =localhost:9876 spring.rocketmq.config.mq-producer-configurations[0].producer-id =001 spring.rocketmq.config.mq-consumer-configurations[0].namesrv-addr =localhost:9876 spring.rocketmq.config.mq-consumer-configurations[0].group-name =rocketmq-consumer spring.rocketmq.config.mq-consumer-configurations[0].consumer-id =001 spring.rocketmq.config.mq-consumer-configurations[0].topic-and-tag-map.123 =123 spring.rocketmq.config.mq-consumer-configurations[0].orderly =false server.port =8001 spring.rocketmq.config.mq-producer-configurations[1].group-name =rocketmq-producer1 spring.rocketmq.config.mq-producer-configurations[1].namesrv-addr =localhost:9876 spring.rocketmq.config.mq-producer-configurations[1].producer-id =002 spring.rocketmq.config.mq-consumer-configurations[1].namesrv-addr =localhost:9876 spring.rocketmq.config.mq-consumer-configurations[1].group-name =rocketmq-consumer1 spring.rocketmq.config.mq-consumer-configurations[1].consumer-id =002 spring.rocketmq.config.mq-consumer-configurations[1].topic-and-tag-map.1234 =1234 spring.rocketmq.config.mq-consumer-configurations[1].orderly =false
编写一些测试类进行测试。
1 2 3 4 5 6 7 8 9 @Service @RocketMQProcessor(consumerId = "001") public class TestConsumer implements IProcessor { private static final Logger logger = LoggerFactory.getLogger(TestConsumer.class); @Override public void handleMessage (MessageExt msg) throws Exception { System.out.println(new String(msg.getBody())+"TestConsumer" ); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 @Controller public class TestProducer { @Autowired RocketMQFactory rocketMQFactory; @RequestMapping("/test") public String doSomething () throws Exception { Message message=new Message(); message.setTopic("123" ); message.setTags("123" ); message.setBody(new String("Hello World" ).getBytes()); SendResult result=rocketMQFactory.getProducer("001" ).sendMessage(message); System.out.println(result); return result.toString(); } }
可以看到正常运行。
实战 我们按照上篇文章那样把它封装成jar包。
完成后进行测试。
新建一个test的SpringBoot项目。作为消费者。
引入我们的包。
PS:这儿没有命名为xxxx-spring-boot-starter的形式。
创建一个消费者。
再新建一个test1的SpringBoot项目。作为生产者。
引入我们的包并进行配置。
启动test消费者,同时使用test1生产者发送一条消息。
可以看到。
生产者发送成功:
消费者处理成功:
总结 通过对RocketMQ的集成封装使用,更好地学会了如何使用RocketMQ,及对其的更多理解。
消息中间件在我们软件开发中具有重要作用,应当好好理解。
如果觉得properties配置太繁琐可以改用yml配置,会更简介好看些。
如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 spring: rocketmq: config: enabled: true mq-producer-configurations: - producerId: TestProducer1 groupName: producer1 namesrvAddr: 127.0 .0 .1 :9876 - producerId: TestProducer2 groupName: producer2 namesrvAddr: 127.0 .0 .1 :9876 mq-consumer-configurations: - consumerId: TestConsumer1 groupName: consumer1 namesrvAddr: 127.0 .0 .1 :9876 topicAndTagMap: { TestTopic1:TestTag1 } orderly: false - consumerId: TestConsumer2 groupName: consumer2 namesrvAddr: 127.0 .0 .1 :9876 topicAndTagMap: { TestTopic2:TestTag2 } orderly: true
代码地址:rocketmq-spring-boot