RabbitMQ
RabbitMQ基本介绍
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件),RabbitMQ服务器是用Erlang语言编写的。
RabitMQ官方网站:
https://www.rabbitmq.com/
- 1.点对点(简单)的队列
- 2.工作(公平性)队列模式
- 3.发布订阅模式
- 4.路由模式Routing
- 5.通配符模式Topics
- 6.RPC
https://www.rabbitmq.com/getstarted.html
RabbitMQ环境的基本安装
启动Rabbitmq常见问题
如果rabbitmq 启动成功无法访问 管理平台页面
进入到F:\path\rabbitmq\rabbitmq\rabbitmq_server-3.6.9\sbin> 执行
rabbitmq-plugins enable rabbitmq_management
rabbitmqctl start_app
Rabbitmq管理平台中心
RabbitMQ 管理平台地址 http://127.0.0.1:15672
默认账号:guest/guest 用户可以自己创建新的账号
Virtual Hosts:
像mysql有数据库的概念并且可以指定用户对库和表等操作的权限。那RabbitMQ呢?
RabbitMQ也有类似的权限管理。在RabbitMQ中可以虚拟消息服务器VirtualHost,每个VirtualHost相当月一个相对独立的RabbitMQ服务器,每个VirtualHost之间是相互隔离的。exchange、queue、message不能互通
默认的端口15672:rabbitmq管理平台端口号
默认的端口5672: rabbitmq消息中间内部通讯的端口
默认的端口号25672 rabbitmq集群的端口号
RabbitMQ常见名词
/Virtual Hosts---分类
/队列 存放我们消息
Exchange 分派我们消息在那个队列存放起来 类似于nginx
15672---rabbitmq控制台管理平台 http协议
25672---rabbitmq 集群通信端口号
Amqp 5672---rabbitmq内部通信的一个端口号
快速入门RabbitMQ简单队列
首先需要再RabbitMQ平台创建Virtual Hosts 和队列。
/meiteVirtualHosts
----订单队列
----支付队列
- 在RabbitMQ平台创建一个队列:
- 在编写生产者代码
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection;
import java.io.IOException; import java.util.concurrent.TimeoutException;
public class Producer { private static final String QUEUE_NAME = "mayikt--queue"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = RabbitMQConnection.getConnection(); Channel channel = connection.createChannel(); String msg = "每特教育666"; channel.basicPublish(null,QUEUE_NAME,null,msg.getBytes()); channel.close(); connection.close(); } }
|
- 在编写消费者代码
import com.rabbitmq.client.*;
import java.io.IOException; import java.util.concurrent.TimeoutException;
public class Consumer { private static final String QUEUE_NAME = "mayikt--queue";
public static void main(String[] args) throws IOException, TimeoutException { Connection connection = RabbitMQConnection.getConnection(); Channel channel = connection.createChannel(); DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body)throws IOException{ String msg = new String(body,"UTF-8"); System.out.println("消费者获取消息:" + msg); } }; channel.basicConsume(QUEUE_NAME,true,defaultConsumer); } }
|
- 连接代码
import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException; import java.util.concurrent.TimeoutException;
public class RabbitMQConnection {
public static Connection getConnection() throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setVirtualHost("/meiteVirtualHosts"); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); return connectionFactory.newConnection(); } }
|
RabbitMQ如何保证消息不丢失
Mq如何保证消息不丢失:
- 生产者角色
- 确保生产者投递消息到MQ服务器端成功。
- Ack 消息确认机制
- 同步或者异步的形式
- 方式1:Confirms
- 方式2:事务消息
- 消费者模式
- 在rabbitmq情况下:
- 必须要将消息消费成功之后,才会将该消息从mq服务器端中移除。
- 在kafka中的情况下:
- 不管是消费成功还是消费失败,该消息都不会立即从mq服务器端移除。
RabitMQ工作队列
默认的传统队列是为均摊消费,存在不公平性;如果每个消费者速度不一样的情况下,均摊消费是不公平的,应该是能者多劳。
在通道中只需要设置basicQos为1即可,表示MQ服务器每次只会给消费者推送1条消息必须手动ack确认之后才会继续发送。channel.basicQos(1);
import com.rabbitmq.client.*; import com.st.mq.RabbitMQConnection;
import java.io.IOException; import java.util.concurrent.TimeoutException;
public class Consumer02 { private static final String QUEUE_NAME = "mayikt-queue";
public static void main(String[] args) throws IOException, TimeoutException, IOException, TimeoutException { Connection connection = RabbitMQConnection.getConnection(); Channel channel = connection.createChannel(); channel.basicQos(1); DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { try { Thread.sleep(100); }catch (Exception e){
} String msg = new String(body, "UTF-8"); System.out.println("消费者获取消息:" + msg); channel.basicAck(envelope.getDeliveryTag(),false); } }; channel.basicConsume(QUEUE_NAME, false, defaultConsumer); } }
|
import com.rabbitmq.client.*; import com.st.mq.RabbitMQConnection;
import java.io.IOException; import java.util.concurrent.TimeoutException;
public class Consumer01 { private static final String QUEUE_NAME = "mayikt-queue";
public static void main(String[] args) throws IOException, TimeoutException, IOException, TimeoutException { Connection connection = RabbitMQConnection.getConnection(); Channel channel = connection.createChannel(); channel.basicQos(2); DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "UTF-8"); System.out.println("消费者获取消息:" + msg); channel.basicAck(envelope.getDeliveryTag(),false); } }; channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
} }
|
RabbitMQ交换机类型
Direct exchange(直连交换机)
Fanout exchange(扇型交换机)
Topic exchange(主题交换机)
Headers exchange(头交换机)
/Virtual Hosts---区分不同的团队
----队列 存放消息
----交换机 路由消息存放在那个队列中 类似于nginx
---路由key 分发规则
RabbitMQ Fanout 发布订阅
生产者发送一条消息,经过交换机转发到多个不同的队列,多个不同的队列就多个不同的消费者。
原理:
- 需要创建两个队列 ,每个队列对应一个消费者;
- 队列需要绑定我们交换机
- 生产者投递消息到交换机中,交换机在将消息分配给两个队列中都存放起来;
- 消费者从队列中获取这个消息。
生产者代码
import com.mayikt.rabbitmq.RabbitMQConnection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection;
import java.io.IOException; import java.util.concurrent.TimeoutException;
public class ProducerFanout {
private static final String EXCHANGE_NAME = "fanout_exchange";
public static void main(String[] args) throws IOException, TimeoutException { Connection connection = RabbitMQConnection.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true); String msg = "每特教育6666"; channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes()); channel.close(); connection.close(); }
}
|
消费者代码
邮件消费者
import com.mayikt.rabbitmq.RabbitMQConnection; import com.rabbitmq.client.*;
import java.io.IOException; import java.util.concurrent.TimeoutException;
public class MailConsumer {
private static final String QUEUE_NAME = "fanout_email_queue";
private static final String EXCHANGE_NAME = "fanout_exchange";
public static void main(String[] args) throws IOException, TimeoutException { System.out.println("邮件消费者..."); Connection connection = RabbitMQConnection.getConnection(); final Channel channel = connection.createChannel(); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "UTF-8"); System.out.println("邮件消费者获取消息:" + msg); } }; channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
} }
|
短信消费者
public class SmsConsumer {
private static final String QUEUE_NAME = "fanout_email_sms";
private static final String EXCHANGE_NAME = "fanout_exchange";
public static void main(String[] args) throws IOException, TimeoutException { System.out.println("短信消费者..."); Connection connection = RabbitMQConnection.getConnection(); final Channel channel = connection.createChannel(); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "UTF-8"); System.out.println("短信消费者获取消息:" + msg); } }; channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
} }
|
Direct路由模式
当交换机类型为direct类型时,根据队列绑定的路由建转发到具体的队列中存放消息
Topic主题模式
当交换机类型为topic类型时,根据队列绑定的路由建模糊转发到具体的队列中存放。
#号表示支持匹配多个词;
*号表示只能匹配一个词
SpringBoot整合RabbitMQ
Maven依赖
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.0.0.RELEASE</version> </parent> <dependencies>
<!-- springboot-web组件 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- 添加springboot对amqp的支持 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> </dependency> <!--fastjson --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.49</version> </dependency>
<dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> </dependencies>
|
配置类:
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component;
@Component public class RabbitMQConfig {
private String EXCHANGE_SPRINGBOOT_NAME = "/mayikt_ex";
private String FANOUT_SMS_QUEUE = "fanout_sms_queue";
private String FANOUT_EMAIL_QUEUE = "fanout_email_queue";
@Bean public Queue smsQueue() { return new Queue(FANOUT_SMS_QUEUE); }
@Bean public Queue emailQueue() { return new Queue(FANOUT_EMAIL_QUEUE); }
@Bean public FanoutExchange fanoutExchange() { return new FanoutExchange(EXCHANGE_SPRINGBOOT_NAME); }
@Bean public Binding bindingSmsFanoutExchange(Queue smsQueue, FanoutExchange fanoutExchange) { return BindingBuilder.bind(smsQueue).to(fanoutExchange); }
@Bean public Binding bindingEmailFanoutExchange(Queue emailQueue, FanoutExchange fanoutExchange) { return BindingBuilder.bind(emailQueue).to(fanoutExchange); } }
|
配置文件
application.yml
spring: rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest virtual-host: /meiteVirtualHosts
|
生产者
import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController;
@RestController public class FanoutProducer {
@Autowired private AmqpTemplate amqpTemplate;
@RequestMapping("/sendMsg") public String sendMsg(String msg) {
amqpTemplate.convertAndSend("/mayikt_ex", "", msg); return "success"; } }
|
消费者
import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;
@Slf4j @Component @RabbitListener(queues = "fanout_email_queue") public class FanoutEmailConsumer {
@RabbitHandler public void process(String msg) { log.info(">>邮件消费者消息msg:{}<<", msg); } }
|
import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;
@Slf4j @Component @RabbitListener(queues = "fanout_sms_queue") public class FanoutSmsConsumer {
@RabbitHandler public void process(String msg) { log.info(">>短信消费者消息msg:{}<<", msg); } }
|
生产者如何获取消费结果
1. 根据业务来定
消费者消费成功结果:
1.能够在数据库中插入一条数据
2.Rocketmq 自带全局消息id,能够根据该全局消息获取消费结果
原理: 生产者投递消息到mq服务器,mq服务器端在这时候返回一个全局的消息id,
当我们消费者消费该消息成功之后,消费者会给我们mq服务器端发送通知标记该消息消费成功。
生产者获取到该消息全局id,每隔2s时间调用mq服务器端接口查询该消息是否有被消费成功。
- 异步返回一个全局id,前端使用ajax定时主动查询;
- 在rocketmq中,自带根据消息id查询是否消费成功
RabbitMQ死信队列
死信队列产生的背景
RabbitMQ死信队列俗称,备胎队列;消息中间件因为某种原因拒收该消息后,可以转移到死信队列中存放,死信队列也可以有交换机和路由key等。
产生死信队列的原因
- 消息投递到MQ中存放 消息已经过期 消费者没有及时的获取到我们消息,消息如果存放到mq服务器中过期之后,会转移到备胎死信队列存放。
- 队列达到最大的长度 (队列容器已经满了)
- 消费者消费多次消息失败,就会转移存放到死信队列中
死信队列的架构原理
死信队列和普通队列区别不是很大
普通与死信队列都有自己独立的交换机和路由key、队列和消费者。
区别:
- 生产者投递消息先投递到我们普通交换机中,普通交换机在将该消息投到普通队列中缓存起来,普通队列对应有自己独立普通消费者。
- 如果生产者投递消息到普通队列中,普通队列发现该消息一直没有被消费者消费的情况下,在这时候会将该消息转移到死信(备胎)交换机中,死信(备胎)交换机对应有自己独立的 死信(备胎)队列 对应独立死信(备胎)消费者。
死信队列应用场景
- 30分钟订单超时设计
A. Redis过期key
B. 死信延迟队列实现:
采用死信队列,创建一个普通队列没有对应的消费者消费消息,在30分钟过后就会将该消息转移到死信备胎消费者实现消费。
备胎死信消费者会根据该订单号码查询是否已经支付过,如果没有支付的情况下则会开始回滚库存操作。
RabbitMQ消息幂等问题
RabbitMQ消息自动重试机制
- 当我们消费者处理执行我们业务代码的时候,如果抛出异常的情况下
在这时候mq会自动触发重试机制,默认的情况下rabbitmq是无限次数的重试。需要人为指定重试次数限制问题
- 在什么情况下消费者需要实现重试策略?
A. 消费者获取消息后,调用第三方接口,但是调用第三方接口失败呢?是否需要重试?
该情况下需要实现重试策略,网络延迟只是暂时调用不通,重试多次有可能会调用通。
B. 消费者获取消息后,因为代码问题抛出数据异常,是否需要重试?
该情况下是不需要实现重试策略,就算重试多次,最终还是失败的。
可以将日志存放起来,后期通过定时任务或者人工补偿形式。
如果是重试多次还是失败消息,需要重新发布消费者版本实现消费
可以使用死信队列
Mq在重试的过程中,有可能会引发消费者重复消费的问题。
Mq消费者需要解决 幂等性问题
幂等性 保证数据唯一
方式1:
生产者在投递消息的时候,生成一个全局唯一id,放在我们消息中。
Msg id=123456
Msg id=123456
Msg id=123456
消费者获取到我们该消息,可以根据该全局唯一id实现去重复。
全局唯一id 根据业务来定的 订单号码作为全局的id
实际上还是需要再db层面解决数据防重复。
业务逻辑是在做insert操作 使用唯一主键约束
业务逻辑是在做update操作 使用乐观锁
- 当消费者业务逻辑代码中,抛出异常自动实现重试 (默认是无数次重试)
- 应该对RabbitMQ重试次数实现限制,比如最多重试5次,每次间隔3s;重试多次还是失败的情况下,存放到死信队列或者存放到数据库表中记录后期人工补偿
如何合理选择消息重试
总结:如果消费者处理消息时,因为代码原因抛出异常是需要从新发布版本才能解决的,那么就不需要重试,重试也解决不了该问题的。存放到死信队列或者是数据库表记录、后期人工实现补偿。
Rabbitmq如何开启重试策略
spring: rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest virtual-host: /meite_rabbitmq listener: simple: retry: enabled: true max-attempts: 5 initial-interval: 3000
|
消费者重试过程中,如何避免幂等性问题
重试的过程中,为了避免业务逻辑重复执行,建议提前全局id提前查询,如果存在
的情况下,就无需再继续做该流程。
重试的次数最好有一定间隔次数,在数据库底层层面保证数据唯一性,比如加上唯一id。
SpringBoot开启消息确认机制
配置文件新增
spring: rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest virtual-host: /meiteVirtualHosts listener: simple: retry: enabled: true max-attempts: 5 initial-interval: 3000 acknowledge-mode: manual datasource: url: jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8 username: root password: root driver-class-name: com.mysql.jdbc.Driver
|
消费者ack代码
@Slf4j @Component @RabbitListener(queues = "fanout_order_queue") public class FanoutOrderConsumer {
@Autowired private OrderManager orderManager; @Autowired private OrderMapper orderMapper;
@RabbitHandler public void process(OrderEntity orderEntity, Message message, Channel channel) throws IOException {
log.info(">>orderEntity:{}<<", orderEntity.toString()); String orderId = orderEntity.getOrderId(); if (StringUtils.isEmpty(orderId)) { log.error(">>orderId is null<<"); return; } OrderEntity dbOrderEntity = orderMapper.getOrder(orderId); if (dbOrderEntity != null) { log.info(">>该订单已经被消费过,无需重复消费!<<"); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); return; } int result = orderManager.addOrder(orderEntity);
log.info(">>插入数据库中数据成功<<"); if (result >= 0) { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); }
} }
|