博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
集成RabbitMQ---MQ实战演练
阅读量:6192 次
发布时间:2019-06-21

本文共 8272 字,大约阅读时间需要 27 分钟。

hot3.png

RabbitMQ是一个在基础上完成的,可复用的企业消息系统。他遵循Mozilla Public License。RabbitMQ是流行的开源消息队列系统,用erlang语言开发。RabbitMQ是AMQP(高级消息队列协议)的标准实现。

消息中间件的工作过程可以用生产者消费者模型来表示.即,生产者不断的向消息队列发送信息,而消费者从消息队列中消费信息.

如果你还没有安装rabbitmq的,可以看看这篇

不说了不说了,来一张图直截了当的看看MQ工作的具体过程:

请叫我头头哥_RabbitMQ实战演练

开局一张图 故事全靠编.从上图可看出,对于消息队列来说,生产者,消息队列,消费者是最重要的三个概念,生产者发消息到消息队列中去,消费者监听指定的消息队列,并且当消息队列收到消息之后,接收消息队列传来的消息,并且给予相应的处理.消息队列常用于分布式系统之间互相信息的传递.

v基础概念

对于RabbitMQ来说,除了这三个基本模块以外,还添加了一个模块,即交换机(Exchange).它使得生产者和消息队列之间产生了隔离,生产者将消息发送给交换机,而交换机则根据调度策略把相应的消息转发给对应的消息队列.那么RabitMQ的工作流程如下所示:

请叫我头头哥_RabbitMQ实战演练

关于rabbitmq几个基础名词的介绍:

Broker: 简单来说就是消息队列服务器实体。

Exchange: 消息交换机,它指定消息按什么规则,路由到哪个队列。

Queue: 消息队列载体,每个消息都会被投入到一个或多个队列。

Binding: 绑定,它的作用就是把exchange和queue按照路由规则绑定起来。

Routing Key: 路由关键字,exchange根据这个关键字进行消息投递。

vhost: 虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。

producer: 消息生产者,就是投递消息的程序。

consumer: 消息消费者,就是接受消息的程序。

channel: 消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。

交换机的主要作用是接收相应的消息并且绑定到指定的队列.交换机有四种类型,分别为Direct,topic,headers,Fanout:

Direct: 处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “demo”,则只有被标记为“demo”的消息才被转发,不会转发demo.ooo,也不会转发test.123,只会转发demo。

Topic: 转发信息主要是依据通配符,将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*” 只会匹配到“audit.irs”。

Headers: 根据一个规则进行匹配,在消息队列和交换机绑定的时候会指定一组键值对规则,而发送消息的时候也会指定一组键值对规则,当两组键值对规则相匹配的时候,消息会被发送到匹配的消息队列中.

Fanout: 路由广播的形式,将会把消息发给绑定它的全部队列,即便设置了key,也会被忽略.

v实战演练

♛ 2.1 创建MQ

请叫我头头哥_RabbitMQ实战演练

注:若是现有工程引入MQ,则添加Maven引用。

org.springframework.boot
spring-boot-starter-amqp

这里我们延续之前中的例子hellospringboot,在已有项目中添加mq的Maven引用。

♛ 2.2 application.properties

在application.properties文件当中引入RabbitMQ基本的配置信息

# ----- MQ -------- #spring.rabbitmq.host=192.168.11.108spring.rabbitmq.port=5672spring.rabbitmq.username=guestspring.rabbitmq.password=guest

♛ 2.3 添加实体类MyModel

复制代码

package com.demo.mq.model;import java.io.Serializable;import java.util.UUID;/** * Created by toutou on 2019/1/1. */public class MyModel implements Serializable {    private static final long serialVersionUID = 1L;    private UUID id;    private String info;    public UUID getId() {        return id;    }    public void setId(UUID id) {        this.id = id;    }    public String getInfo() {        return info;    }    public void setInfo(String info) {        this.info = info;    }}

复制代码

♛ 2.4 添加RabbitConfig

复制代码

package com.demo.mq.common;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.DirectExchange;import org.springframework.amqp.core.Queue;import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Value;import org.springframework.beans.factory.config.ConfigurableBeanFactory;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.context.annotation.Scope;/** * Created by toutou on 2019/1/1. */@Configurationpublic class RabbitConfig {    @Value("${spring.rabbitmq.host}")    private String host;    @Value("${spring.rabbitmq.port}")    private int port;    @Value("${spring.rabbitmq.username}")    private String username;    @Value("${spring.rabbitmq.password}")    private String password;    public static final String EXCHANGE_A = "my-mq-exchange_A";    public static final String EXCHANGE_B = "my-mq-exchange_B";    public static final String QUEUE_A = "QUEUE_A";    public static final String QUEUE_B = "QUEUE_B";    public static final String ROUTINGKEY_A = "spring-boot-routingKey_A";    public static final String ROUTINGKEY_B = "spring-boot-routingKey_B";    @Bean    public CachingConnectionFactory connectionFactory() {        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host,port);        connectionFactory.setUsername(username);        connectionFactory.setPassword(password);        connectionFactory.setVirtualHost("/");        connectionFactory.setPublisherConfirms(true);        return connectionFactory;    }    @Bean    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)    public RabbitTemplate rabbitTemplate() {        RabbitTemplate template = new RabbitTemplate(connectionFactory());        return template;    }    /**     * 针对消费者配置     * 1. 设置交换机类型     * 2. 将队列绑定到交换机     FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念     HeadersExchange :通过添加属性key-value匹配     DirectExchange:按照routingkey分发到指定队列     TopicExchange:多关键字匹配     */    @Bean    public DirectExchange defaultExchange() {        return new DirectExchange(EXCHANGE_A);    }    /**     * 获取队列A     * @return     */    @Bean    public Queue queueA() {        return new Queue(QUEUE_A, true); //队列持久    }    /**     * 获取队列B     * @return     */    @Bean    public Queue queueB() {        return new Queue(QUEUE_B, true); //队列持久    }    /**     * 把交换机,队列,通过路由关键字进行绑定     * @return     */    @Bean    public Binding binding() {        return BindingBuilder.bind(queueA()).to(defaultExchange()).with(RabbitConfig.ROUTINGKEY_A);    }    /**     * 一个交换机可以绑定多个消息队列,也就是消息通过一个交换机,可以分发到不同的队列当中去。     * @return     */    @Bean    public Binding bindingB(){        return BindingBuilder.bind(queueB()).to(defaultExchange()).with(RabbitConfig.ROUTINGKEY_B);    }}

复制代码

♛ 2.5 添加消息的生产者MyProducer

复制代码

package com.demo.mq.producer;import com.demo.mq.common.RabbitConfig;import com.demo.mq.model.MyModel;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.amqp.rabbit.support.CorrelationData;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;/** * Created by toutou on 2019/1/1. */@Componentpublic class MyProducer implements RabbitTemplate.ConfirmCallback {    private final Logger logger = LoggerFactory.getLogger(this.getClass());    //由于rabbitTemplate的scope属性设置为ConfigurableBeanFactory.SCOPE_PROTOTYPE,所以不能自动注入    private RabbitTemplate rabbitTemplate;    /**     * 构造方法注入rabbitTemplate     */    @Autowired    public MyProducer(RabbitTemplate rabbitTemplate) {        this.rabbitTemplate = rabbitTemplate;        rabbitTemplate.setConfirmCallback(this); //rabbitTemplate如果为单例的话,那回调就是最后设置的内容    }    public void sendMsg(MyModel model) {        //把消息放入ROUTINGKEY_A对应的队列当中去,对应的是队列A        rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_A, RabbitConfig.ROUTINGKEY_A, model);    }    /**     * 回调     */    @Override    public void confirm(CorrelationData correlationData, boolean ack, String cause) {        logger.info(" 回调id:" + correlationData);        if (ack) {            logger.info("消息成功消费");        } else {            logger.info("消息消费失败:" + cause);        }    }}

复制代码

♛ 2.6 添加消息的消费者MyReceiver

复制代码

package com.demo.mq.receiver;import com.demo.mq.common.RabbitConfig;import com.demo.mq.model.MyModel;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;/** * Created by toutou on 2019/1/1. */@Component@RabbitListener(queues = RabbitConfig.QUEUE_A)public class MyReceiver {    @RabbitHandler    public void process(MyModel model) {        System.out.println("接收处理队列A当中的消息: " + model.getInfo());    }}

复制代码

♛ 2.7 添加MyMQController

复制代码

package com.demo.controller;import com.demo.mq.model.MyModel;import com.demo.mq.producer.MyProducer;import lombok.extern.slf4j.Slf4j;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RestController;import java.util.UUID;/** * Created by toutou on 2019/1/1. */@RestController@Slf4jpublic class MyMQController {    @Autowired    MyProducer myProducers;    @GetMapping("/mq/producer")    public String myProducer(String content){        MyModel model = new MyModel();        model.setId(UUID.randomUUID());        model.setInfo(content);        myProducers.sendMsg(model);        return "已发送:" + content;    }}

复制代码

♛ 2.8 项目整体目录

请叫我头头哥_RabbitMQ实战演练

 

♛ 2.9 调试

2.9.1 在页面中请求http://localhost:8081/mq/producer?content=hello rabbitmq

请叫我头头哥_RabbitMQ实战演练

2.9.2 查看http://ip:15672/#/queues的变化

关于RabbitMQ Management有疑问的,可以看上篇博文。。

 

2.9.3 查看消费者日志记录

请叫我头头哥_RabbitMQ实战演练

这样一个完整的rabbitmq实例就有了。

转载于:https://my.oschina.net/u/4052893/blog/2996193

你可能感兴趣的文章
Safari/Chrome中placeholder属性实现不完整
查看>>
转载 - 18个最佳代码编辑器/IDE推荐
查看>>
用Opencv保存视频文件avi(转)
查看>>
几条常见的数据库分页 SQL 语句
查看>>
XCode最佳实践之最佳数据类型
查看>>
asp.net 中sender 的理解
查看>>
RSS文章订阅及生成RSS格式的xml
查看>>
你自认为理解了JavaScript?
查看>>
读《程序员的SQL金典》[4]--SQL调优
查看>>
死锁产生的原因及四个必要条件
查看>>
CSS3----background:-webkit-gradient()渐变效果
查看>>
RTP协议分析
查看>>
Android代码中动态设置图片的大小(自动缩放),位置
查看>>
前后端分离了,然后呢?(转)
查看>>
AutoMapper queryable extensions 只找需要的字段
查看>>
linux自定义脚本添加到rc.local脚本无法正常运行的问题
查看>>
【BZOJ】3526: [Poi2014]Card
查看>>
mode(思维,注意内存)
查看>>
GDC2016【全境封锁(Tom Clancy's The Division)】对为何对应Eye Tracked System,以及各种优点的演讲报告...
查看>>
B-树和B+树的应用:数据搜索和数据库索引
查看>>