RabbitMQ的安装及使用

RabbitMQ

RabbitMQ简介

是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。

消息队列:是一种应用间的通信方式,消息发送后可以立即返回,由消息系统来确保消息的可靠传递。消息发布者只管把消息发布到 MQ 中而不用管谁来取,消息使用者只管从 MQ 中取消息而不管是谁发布的。

消息队列的主要作用:异步、解耦、削峰。

https://www.rabbitmq.com/networking.html RabbitMQ的服务文档信息

RabbitMQ的安装

一、Linux环境下

  1. 下载:https://www.rabbitmq.com/download.html
  2. 因为rabbitmq是基于erlang语言开发的,需要安装环境 ,查看环境的版本 https://www.rabbitmq.com/which-erlang.html
  3. erlang的下载 https://www.erlang-solutions.com/downloads/
  4. yum install -y erlang 安装erlang
  5. 安装socat yum install -y socat
  6. 安装rabbitmq yum install -y rabbitmq-serve

常用命令:systemctl start rabbitmq-serve 启动服务

systemctl enable rabbitmq-serve 开机自启动

7.安装图形化界面 rabbitmq-plugins enable rabbitmq_management

8.rabbitmq授权账户和密码 rabbitmqctl add_user {username} {passwd}

rabbitmqctl add_user root root123

  1. 设置用户角色 rabbitmqctl set_user_tags {username} {tag}

    rabbitmqctl set_user_tags root administrator

    docker run -d –name rabbitmq -p 15672:15672 -p 5672:5672 rabbitmq:3-management

RabbitMQ的工作模式

直连模式:一个队列只被一个consumer消费

工作模式:一个队列被多个consumer消费 (包括轮训模式(将channel.basicConsume(…, true, …)autoAck设为true)、公平模式(能者多劳,将autoAck设置为false))

发布订阅模式:Fanout,采用广播的机制

路由模式:direct,采用直连的形式

topic模式:可以通过routingkey将消息发送给复合定义的队列。

参数模式:header

RabbitMQ中各个组件

Producer生产消息(消息分为头和体,头中有许多的信息),发送给服务器端的Exchange
Exchange收到消息,根据routing key,将消息转发给匹配的Queue

Exchange与Queue之间是通过binding来进行绑定的

Queue收到消息,将消息发送给订阅者Consumer
Producer收到消息,发送ACK给队列确认收到消息
Queue收到ACK,删除队列中缓存的此条消息

Connection 连接通道

Virtual Host 虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制

Producer、Consumer与代理服务器(RabbitMQ)之间建立的是长链接,并且在连接中开辟出一个个的通道,来进行消息的传递。可以通过一个个的通道来确定某个服务是否还健在。

RabbitMQ的工作流程图

RabbitMQ的安装及使用

1、Direct exchange的特性是直接匹配,例:Routing key : name 创建队列name、name1、name2

我们在发送消息时带上我们设置的路由键name,消息会被交换机接受,由交换机通过name来唯一确定的将消息放入名为name的队列。

2、Fanout exchange:以扇形的形式向队列发送消息,例:Routing key : name 创建队列name、name1、name2

我们在发送消息时带上我们设置的路由键name(这里也可以不设置Routing key),消息会被交换机接受,交换机会将此消息保存到每一个队列中

3、Topic exchange:可以在设置Routing key时,依通配符的形式来设置(# *)name.#表示以name开始,后面只能有一个单词。name. *表示后面可以有多个单词。例:Routing key : name# 创建队列name.zs、name.lisi、name2

我们在发送消息时带上我们设置的路由键name.*,消息会被交换机接受,由交换机通过Routing key确定的将消息放入名为name.zs和name.lisi的队列中。

4、Headers exchange 它是根据Message的一些头部信息来分发过滤Message,忽略routing key的属性,如果Header信息和message消息的头信息相匹配,那么这条消息就匹配上了。在绑定Queue与Exchange时指定一组键值对以及x-match参数,x-match参数是字符串类型,可以设置为any或者all。如果设置为any,意思就是只要匹配到了headers表中的任何一对键值即可,all则代表需要全部匹配。

整合SpringBoot

1、导入依赖

引入依赖后RabbitAutoConfiguration就会工作,来进行一些自动配置,

CachingConnectionFactory 相当于是一个工厂,用来创建rabbitmq

RabbitTemplate 相当于是一个实例,用来操作rabbitmq消息

AmqpAdmin 相当于是管理组件,可以用来创建交换机、队列,并管理它们

RabbitMessagingTemplate 实现了RabbitMessageOperations接口

2、还需要进行配置文件的配置

3、@EnableRabbit

消息的发送

消息的接收

RabbitMQ的消息确认机制-可靠抵达

保证消息不丢失,可靠抵达,使用事务消息,性能低下

一、服务收到消息回调

二、消息正确抵达回调

三、消费端确认

publisher: confirmCallback 消息被broker接收就会触发回调

publisher: returnCallback 消息被queue接收就会触发回调

consumer: ack机制

RabbitMQ的安装及使用

三、消费端确认(保证每个消息都被消费)

消息端的确认默认是自动确认的,只要消息接受到,服务端默认确认,并且服务端移除消息

我们可以手动进行确认

可靠消费-重试机制

解决消息重试的方案:1、控制重试的次数

2、try catch +手动ack 在catch中要将requeue设置为false

如果为true会一直重试,就算设置了重试次数也没用

3、try catch +手动ack+死信队列

死信队列

死信的概念

无法被消费的消息被称为死信,而为了保证消息的不丢失,就产生了死信队列,将无法消费的消息放入死信队列。例:当用户下完单并没有支付时,可以将其放入死信队列,设置自动过期时间。

死信来源

  • 消息被拒绝(basic.reject/ basic.nack)并且requeue=false
  • 消息TTL过期(参考:[RabbitMQ之TTL(Time-To-Live 过期时间)])
  • 队列达到最大长度

延迟队列

1、本质是 消息TTL过期 过期时间有生产者设置

2、使用:订单在十分钟内未支付被自动取消

例:延迟队列会出现死信问题,因为其只会检测第一条的过期时间,如果第一条的过期时间太长 ,第二条的过期时间很短,必须得等待第一条执行完,才能执行第二条,第二条就是死信

3、rabbitmq的插件可以解决此问题

插件安装完成后会在图形化界面exchange中新出现一个类型:x-delayed-message

rabbitmq会出现幂等性问题

幂等性就是重复提交表单,造成多次消费。

解决幂等性的方法:

1、唯一id+指纹码机制

2、redis原子性 setnx操作天然原子性

设置队列优先级

rabbitmq中的队列可以设置优先级,来先消费某些消息(0-255数字越大,优先级越高)

惰性队列

消息是被保存在磁盘上的,例:消费者下线,不能消费消息时,为了减少mq的压力,应当将消息磁盘化。

rabbitmq的集群

一、集群的搭建:http://www.rabbitmq.com/install-rpm.html https://www.cnblogs.com/knowledgesea/p/6535766.html

  1. RabbitMQ的集群是依赖erlang集群,而erlang集群是通过这个cookie进行通信认证的,因此我们做集群的第一步就是干cookie。必须使集群中也就是F,G这两台机器的.erlang.cookie文件中cookie值一致,且权限为owner只读 (.erlang.cookie 存在于/var/lib/rabbitmq/.erlang.cookie 和~/.erlang.cookie中)

    保持服务器中的/var/lib/rabbitmq/.erlang.cookie和~/.erlang.cookie一致

  2. 后台启动结点 rabbitmq-server -detached

  3. 查看结点状态 rabbitmqctl status

  4. 启动第一个服务 rabbitmq-server

  5. rabbitmqctl stop_app 先停止应用

    rabbitmqctl reset //可以不做

    rabbitmqctl join_cluster rabbit@名称1

    rabbitmqctl start_app

    rabbitmqctl cluster_status 查看集群状态信息

二、镜像队列,保证数据不丢失,进行数据的备份

RabbitMQ的安装及使用

三、实现高可用的负载均衡

使用Nginx来实现

四、federation exchange 来实现不同地区之间服务器数据的同步 (联盟交换机)是通过交换机之间进行连接

五、federation queue 来实现不同地区之间服务器数据的同步,是通过队列之间进行连接

rabbitmq实现分布式事务

分布式事务会出现数据不一致的情况,两个服务运行在不同的机器上,通过远程调用的方式来调用,因为两个服务都不在一台机器上,所以事务是不能控制的。如:订单服务(加上事务)有自己的数据库,(在数据库的内存中生成一条记录,因为有事务的原因,不会第一时间跟新数据库),向用户服务发起请求,用户服务也有自己的数据库,并在数据库中生成数据,因为某些原因,该服务不可用,订单服务就会出现异常,并回滚事务,那么就会出现数据不一致的情况。

利用RabbitMQ的消息确认机制-可靠抵达作为中间件

文章知识点与官方知识档案匹配,可进一步学习相关知识Java技能树首页概览91390 人正在系统学习中

来源:取次花丛懒回顾!!

声明:本站部分文章及图片转载于互联网,内容版权归原作者所有,如本站任何资料有侵权请您尽早请联系jinwei@zod.com.cn进行处理,非常感谢!

上一篇 2021年6月4日
下一篇 2021年6月4日

相关推荐