在RabbitMQ的UI界面或命令行上创建新的Virtual Host,取名为vhtest02,如下图所示:
使用Idea的Spring Initializr创建生产者工程springrabbitmqtest,坐标如下:
配置application.properties,可参考添加如下内容:
spring.rabbitmq.host=192.168.36.132spring.rabbitmq.port=5672spring.rabbitmq.virtual-host=vhtest02spring.rabbitmq.username=sujiangmingspring.rabbitmq.password=openGauss@1234根据你自己的环境改成你自己的ip、port、virtual-host、用户名和密码。
编写生产者配置类,用于创建Exchange、Queue以及将两者绑定在一起,代码如下: 类名为:com.rabbitmq.springboot.config.RabbitMqConfig,代码如下所示:
package com.rabbitmq.springboot.config;import org.springframework.amqp.core.*;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class RabbitMqConfig {//创建交换机@Bean(name = "topicExchange")public TopicExchange topicExchange(){return new TopicExchange("springboot_topic_exchange");}//创建队列@Bean(name = "topicQueueSpringBoot")public Queue topicQueue(){return QueueBuilder.durable("springboot_topic_queue").build();}//队列绑定交换机@Beanpublic Binding bindingExchangeTopicQueue(@Qualifier("topicQueueSpringBoot") Queue queue,@Qualifier("topicExchange")Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with("log.#").noargs();}}修改com.rabbitmq.springboot.SpringRabbitMqTestApplicationTests类,添加注解和测试方法,具体代码如下:
package com.rabbitmq.springboot;import org.junit.jupiter.api.Test;import org.junit.runner.RunWith;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class)@SpringBootTestclass SpringRabbitMqTestApplicationTests {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSendMessage() throws InterruptedException {//convertAndSend(交换机名称,路由key,消息内容)int temp = 0;while (true){rabbitTemplate.convertAndSend("springboot_topic_exchange","log.info","发送了info消息" + temp);rabbitTemplate.convertAndSend("springboot_topic_exchange","log.error","发送了error消息"+ temp);rabbitTemplate.convertAndSend("springboot_topic_exchange","log.warning","发送了warning消息"+ temp);temp++;Thread.sleep(2000);}}@Testvoid contextLoads() {}}该程序会一直运行,因为我加了while(true),模拟用户一直产生数据。
运行测试:运行com.rabbitmq.springboot.SpringRabbitMqTestApplicationTests类中的方法testSendMessage(),正常运行会看到如下内容:
编写消费者工程,具体创建工程如步骤2所示;
修改application.properties,如步骤3所示,可直接复步骤3内容即可;
创建监听类:com.rabbitmq.consumer.listener.MessageListener,用于监听某个队列的消息,一旦监听到有数据,立马进行消费,代码如下:
package com.rabbitmq.consumer.listener;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;/*** 消息监听器*/@Componentpublic class MessageListener {/*** 监听某个队列的消息* @param msg 接收到的消息*/@RabbitListener(queues = "springboot_topic_queue")public void topicListener(String msg){System.out.println("接收到消息:" + msg);}}修改启动类:SpringbootRabbitmqConsumerApplication,在其类上面添加注解@ComponentScan("com.rabbitmq.consumer.*"),如不添加该注解运行会自动退出,修改好如下图所示:
运行测试:运行SpringbootRabbitmqConsumerApplication类,正常情况下会看到如下内容:
三、小结本文参考了来自网络上的资料,如有侵权,请及时联系博主进行删除。本文仅是博主本人在学习过程中作为学习笔记使用,常言道:好记性不如烂笔头。如本文对您有所帮助,请您动动发财的手指给博主点个赞,谢谢您的阅读~~~