Java essay Java essay
首页
  • Java基础
  • Java进阶
  • 设计模式
  • Java你不知道的小事
  • Spring初识
  • Spring进阶
  • SpringBoot基础
  • SpringBoot进阶
  • 什么是微服务
  • SpringCloud全家桶
  • Dubbo
  • SpringCloud Alibaba
  • Zookeeper
  • Nginx
  • RabbitMQ
  • RocketMQ
  • Docker入门到精通
  • 性能分析工具
  • 数据库性能优化
  • 性能优化
  • Java基础面试必问
  • JavaWeb面试必问
  • Java框架面试必问
  • 数据库面试必问
  • 中间件面试必问
  • 分布式微服务面试必问
  • Linux面试必问
  • 计算机网络面试必问
  • 开放性问题面试问必问
  • 简介
  • 联系我

Mr.Fire

后端程序员一枚
首页
  • Java基础
  • Java进阶
  • 设计模式
  • Java你不知道的小事
  • Spring初识
  • Spring进阶
  • SpringBoot基础
  • SpringBoot进阶
  • 什么是微服务
  • SpringCloud全家桶
  • Dubbo
  • SpringCloud Alibaba
  • Zookeeper
  • Nginx
  • RabbitMQ
  • RocketMQ
  • Docker入门到精通
  • 性能分析工具
  • 数据库性能优化
  • 性能优化
  • Java基础面试必问
  • JavaWeb面试必问
  • Java框架面试必问
  • 数据库面试必问
  • 中间件面试必问
  • 分布式微服务面试必问
  • Linux面试必问
  • 计算机网络面试必问
  • 开放性问题面试问必问
  • 简介
  • 联系我
  • Zookeeper

  • RabbitMq

    • 高级消费队列RabbitMq入门
      • 前言
      • 1.什么是消息队列
      • 2.下载与安装
        • 2.1安装 Erlang(由于 RabbitMQ 是基于 Erlang 的)
        • 2.2 安装 RabbitMQ
      • 3.RabbitMQ 的工作原理
      • 4.六种消息模型
        • 4.1简单模式(simple)
        • 4.2工作模式(work)
        • 4.3 发布订阅模式(publish/subscribe)
        • 4.4 路由模式(routing)
        • 4.5 主题模式(topic)
        • 4.6 RPC 模式
      • 5.消息确认机制(ACK)
        • 5.1消息发送确认
        • 5.2.消息接收确认
      • 6.Spring Boot 整合 RabbitMQ
      • 7. 扩展面试思考题
  • RocketMQ

  • 中间件
  • RabbitMq
Mr.Fire
2022-11-05
目录

高级消费队列RabbitMq入门

# 前言

说到消息队列,现在主流的很多,今天的主角 RabbitMQ,是一套基于 AMQP 协议的开源消息代理软件,编写语言是 Erlang。本文将讲解 RabbitMQ 入门,从安装到使用到代码。

# 1.什么是消息队列

消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题。实现高性能,高可用,可伸缩和最终一致性架构。是大型分布式系统不可缺少的中间件。通俗的来讲,消息队列就是生产者生产消息,消费者监听到消息做各自的业务操作,也就是消费消息的过程。

# 2.下载与安装

# 2.1安装 Erlang(由于 RabbitMQ 是基于 Erlang 的)

RabbitMQ 和 Erlang 的对应关系:

Erlang 下载地址:https://www.erlang.org/downloads

安装过程简单粗暴,一直 next 就行。

# 2.2 安装 RabbitMQ

  1. 下载地址:https://www.rabbitmq.com/install-windows-manual.html (opens new window)

  1. 安装:点 next 就行。

  2. 安装完成之后,配置 RabbitMQ,执行以下命令,启用 Web 管理插件:

rabbitmq-plugins enable rabbitmq_management
1
  1. 访问 http://localhost:15672/ (opens new window)

  1. 进入首页,用户名密码 guest/guest:

# 3.RabbitMQ 的工作原理

  • 生产者发送/发布消息到代理。

  • 消费者从代理那里接收消息。RabbitMQ 扮演代理中间件的角色。

  • 当生产者发送消息时,它并不是直接把消息发送到队列里的,而是使用交换机来发送。

  • 交换机把消息分发到不同的队列里,消费者就能从监听的队列中消费消息。

# 4.六种消息模型

# 4.1简单模式(simple)

  • 消息产生着将消息放入队列。

  • 消息的消费者监听消息队列,如果队列中有消息,就消费掉,消息被消费后,自动从队列中删除。

缺点: 这种模式下消息可能没有被消费者正确处理,已经从队列中消失了,造成消息的丢失。

应用场景: 客户端服务端模式的聊天程序。

代码:

(1)先定义一个简单的队列存储消息:

/**
* @author :Mr.Fire
* @date :Created in 2021/4/25 16:25
* @description:
* @modified By:
* @version: $
  */
  @Configuration
  public class SimpleQueue {

  /**
    * 创建一个简单的队列,叫 hello
    * @return
      */
      @Bean
      public Queue queue() {
          return new Queue("hello");
      }
  }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

(2) 定义消费者:

/**
* @author :Mr.Fire
* @date :Created in 2021/4/25 16:25
* @description:消费者,监听 hello 队列
* @modified By:
* @version: $
  */
@Component
public class Consumer1 {

  @RabbitListener(queues = "hello")
  @RabbitHandler
  public void receive(String msg){
    System.out.println("Consumer1 收到消息:"+msg);
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

(3)定义生产者:

/**
* @author :Mr.Fire
* @date :Created in 2021/4/25 16:27
* @description:这里直接用 rest 接口来做生产者
* @modified By:
* @version: $
  */
@RestController
@RequestMapping("/")
public class RabbitRestController {

  @Autowired
  @Qualifier("fireRabbitTemplate")
  private RabbitTemplate rabbitTemplate;

  private final Logger log =LoggerFactory.getLogger(getClass());

  @RequestMapping("/send")
  public String send() {
    String context = "hello==========" + new Date();
    log.info("发送消息 : " + context);
    //生产者,正在往 hello 这个路由规则中发送,由于没有交换机,所以路由规则就是队列名称
    this.rabbitTemplate.convertAndSend("hello", context);
    return "success";
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

(4) 前端发送 Rest 请求,看控制台效果

http://localhost:8090/send (opens new window)

# 4.2工作模式(work)

  • 消息产生者将消息放入队列。生产者系统不需知道哪一个任务执行系统在空闲,直接将任务扔到消息队列中,空闲的系统自动争抢。

  • 消费者 A,消费者 B,当然可以更多,同时监听同一个队列,消费者共同争抢当前的消息队列内容,谁先拿到谁负责消费消息。

缺点: 高并发情况下,会产生某一个消息被多个消费者共同消费。

应用场景: 发红包。

代码:

work 模式我们只需要在简单模式的基础上添加一个消费者,也监听 hello 这个队列。

(1)添加消费者2:

/**
* @author :Mr.Fire
* @date :Created in 2021/4/25 16:29
* @description:
* @modified By:
* @version: $
  */
@Component
public class Consumer2 {

  @RabbitHandler
  @RabbitListener(queues = "hello")
  public void receive(String msg){
        System.out.println("Consumer2 收到消息:"+msg);
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

(2)前端发送 Rest 请求,看控制台效果

http://localhost:8090/send (opens new window)

发送四次请求,消费者 1 和 2 分别接收到两次。

# 4.3 发布订阅模式(publish/subscribe)

  • X 代表交换机 rabbitMQ 内部组件,消息产生者将消息放入交换机,交换机发布订阅把消息发送到所有消息队列中。

  • 消费者监听队列,对应消息队列的消费者拿到消息进行消费。

相关场景: 邮件群发、群聊天。

(1)定义交换机与队列:

/**
* @author :Mr.Fire
* @date :Created in 2021/4/25 16:47
* @description:交换机-发布订阅模式
* @modified By:
* @version: $
  */
@Configuration
public class QueueExchange {

      @Bean
      public Queue queueA() {
      return new Queue("queueA", true);
      }
    
      @Bean
      public Queue queueB() {
      return new Queue("queueB", true);
      }
    
      /**
       * 创建一个 fanoutExchange 交换机
       */
      @Bean
      FanoutExchange fanoutExchange() {
      return new FanoutExchange("fanoutExchange");
      }

  /**
    * 将 queueA 队列绑定到 fanoutExchange 交换机上面
      */
      @Bean
      Binding bindingExchangeMessageFanoutA(Queue queueA, FanoutExchange fanoutExchange) {
      return BindingBuilder.bind(queueA).to(fanoutExchange);
      }

  /**
    * 将 queueB 队列绑定到 fanoutExchange 交换机上面
      */
      @Bean
      Binding bindingExchangeMessageFanoutB(Queue queueB, FanoutExchange fanoutExchange) {
      return BindingBuilder.bind(queueB).to(fanoutExchange);
      }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45

(2)定义消费者:

/**
* @author :Mr.Fire
* @date :Created in 2021/4/25 16:15
* @description:消费者 3
* @modified By:
* @version: $
  */
@Component
public class Consumer3 {

  @RabbitHandler
  @RabbitListener(queues = "queueA")
  public void receive(String msg){
      System.out.println("Consumer3 收到消息:"+msg);
  }
}

/**
* @author :Mr.Fire
* @date :Created in 2021/4/25 16:15
* @description:消费者 4
* @modified By:
* @version: $
  */
@Component
public class Consumer4 {

  @RabbitHandler
  @RabbitListener(queues = "queueB")
  public void receive(String msg){
    System.out.println("Consumer4 收到消息:"+msg);
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

(3)定义生产者:

@RequestMapping("/sendExchange")
public String sendToExchange(){
  String context = "exchange=======" + new Date();
  log.info("发送消息 : " + context);
  //生产者,正在往交换机发送消息,交换机会根据绑定的队列来发送(如果多个客户端监听同一个队列,只有一个能收到消息)
  this.rabbitTemplate.convertAndSend("fanoutExchange","", context);
  return "success";
}

1
2
3
4
5
6
7
8
9

(4) 前端发送 Rest 请求,看控制台效果

http://localhost:8090/sendExchange (opens new window)

这里消费者 3 和 4 都收到了消息,因为他们分别监听不同的两个队列。

# 4.4 路由模式(routing)

消息生产者将消息发送给交换机按照路由判断,路由是字符串,交换机根据路由的 key 去匹配,只有匹配上路由 key 对应的消息队列,对应的消费者才能消费消息。

根据业务功能定义路由字符串。

从系统的代码逻辑中获取对应的功能字符串,将消息任务扔到对应的队列中。

业务场景: 统一门户和子系统交互,每个子系统对应不同的业务处理,通过路由 key 分发不同的消息到对应子系统队列,完成消息消费。

(1)定义交换机与队列,绑定路由 key:

/**
* @author :Mr.Fire
* @date :Created in 2021/4/25 17:06
* @description:路由模式
* @modified By:
* @version: $
 */
@Configuration
public class QueueRouter {

  public static final String DIRECT_EXCHANGE = "directExchange";
  public static final String QUEUE_DIRECT_A = "direct.A";

  public static final String QUEUE_DIRECT_B = "direct.B";

  /**
   * 创建一个 direct 交换机
   * @return
   */
  @Bean
  DirectExchange directExchange() {
    return new DirectExchange(DIRECT_EXCHANGE);
  }

  @Bean
  Queue queueDirectNameA() {
    return new Queue(QUEUE_DIRECT_A);
  }


  /**
   * 创建队列
   * @return
   */
  @Bean
  Queue queueDirectNameB() {
    return new Queue(QUEUE_DIRECT_B);
  }


  /**
   * 将 direct.A 队列绑定到 directExchange 交换机中,使用 a.key 作为路由规则
   * @param queueDirectNameA
   * @param directExchange
   * @return
   */
  @Bean
  Binding bindingExchangeMessageDirectA(Queue queueDirectNameA, DirectExchange directExchange) {
    return BindingBuilder.bind(queueDirectNameA).to(directExchange).with("a.key");
  }

  /**
   * 将 direct.B 队列绑定到 directExchange 交换机中,使用 b.key 作为路由规则
   * @param queueDirectNameB
   * @param directExchange
   * @return
   */
  @Bean
  Binding bindingExchangeMessageDirectB(Queue queueDirectNameB, DirectExchange directExchange) {
    return BindingBuilder.bind(queueDirectNameB).to(directExchange).with("b.key");
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62

(2)定义消费者

/**
* @author :Mr.Fire
* @date :Created in 2021/4/25 16:15
* @description:
* @modified By:
* @version: $
*/
@Component
public class Consumer5 {

      @RabbitListener(queues = QueueRouter.QUEUE_DIRECT_A)
      @RabbitHandler
      public void receiveA(String msg){
          System.out.println("Consumer5-direct-A 收到路由消息:"+msg);
      }

      @RabbitListener(queues = QueueRouter.QUEUE_DIRECT_B)
      @RabbitHandler
      public void receiveB(String msg){
          System.out.println("Consumer5-direct-B 收到路由消息:"+msg);
      }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

(3)定义生产者

@RequestMapping("/sendRouter")
public String sendToExchangeByRouter(){
  String context = "exchange=======" + new Date();
  log.info("发送路由消息 : " + context);
  //生产者,正在往交换机发送消息,队列绑定了不同路由规则,交换机会使用 a.key 作为路由规则来发送
  this.rabbitTemplate.convertAndSend(QueueRouter.DIRECT_EXCHANGE,"a.key", context);
  return "success";
}
1
2
3
4
5
6
7
8

(4)发送前端请求,看效果

http://localhost:8090/sendRouter (opens new window)

可以看到只有监听了 QUEUE_DIRECT_A 的消费者能收到消息,因为队列 A 使用的路由 key 为 a.key。

# 4.5 主题模式(topic)

注意:与路由模式的区别就是路由 key 可以是通配符,模糊匹配。交换机类型为 topic。

  • 星号井号代表通配符。

  • 星号代表多个单词,井号代表一个单词。

  • 路由功能添加模糊匹配。

  • 消息产生者产生消息,把消息交给交换机。

  • 交换机根据 key 的规则模糊匹配到对应的队列,由队列的监听消费者接收消息消费。

(1) 定义队列与 topic 交换机:

/**
* @author :Mr.Fire
* @date :Created in 2021/4/25 17:06
* @description:主题模式
* @modified By:
* @version: $
  */
  @Configuration
  public class QueueTopic {

  public static final String TOPIC_EXCHANGE = "topicExchange";

  public static final String DIRECT_REGXA = "fire.topic.#";
  public static final String DIRECT_REGXB = "fire.topic.b";
  public static final String DIRECT_REGXC = "fire.topic.c";

  public static final String QUEUE_TOPIC_A = "topic.A";

  public static final String QUEUE_TOPIC_B = "topic.B";

  public static final String QUEUE_TOPIC_C = "topic.C";

  /**
    * 创建一个 topic 交换机
    * @return
      */
      @Bean
      TopicExchange topicExchange() {
      return new TopicExchange(TOPIC_EXCHANGE);
      }

  /**
    * 创建队列
    * @return
      */
      @Bean
      Queue queueTopicNameA() {
      return new Queue(QUEUE_TOPIC_A);
      }


    @Bean
    Queue queueTopicNameB() {
        return new Queue(QUEUE_TOPIC_B);
    }

    @Bean
    Queue queueTopicNameC() {
        return new Queue(QUEUE_TOPIC_C);
    }


    /**
     * 将 direct.A 队列绑定到 topicExchange 交换机中,使用 nr.topic.#作为路由规则
     * @param queueTopicNameA
     * @param topicExchange
     * @return
     */
    @Bean
    Binding bindingExchangeMessageTopicA(Queue queueTopicNameA, TopicExchange topicExchange) {
        return BindingBuilder.bind(queueTopicNameA).to(topicExchange).with(DIRECT_REGXA);
    }

    /**
     * 将 direct.B 队列绑定到 topicExchange 交换机中,使用 nr.topic.b 作为路由规则
     * @param queueTopicNameB
     * @param topicExchange
     * @return
     */
    @Bean
    Binding bindingExchangeMessageTopicB(Queue queueTopicNameB, TopicExchange topicExchange) {
        return BindingBuilder.bind(queueTopicNameB).to(topicExchange).with(DIRECT_REGXB);
    }

    /**
     * 将 direct.B 队列绑定到 topicExchange 交换机中,使用 nr.topic.c 作为路由规则
     * @param queueTopicNameC
     * @param topicExchange
     * @return
     */
    @Bean
    Binding bindingExchangeMessageTopicC(Queue queueTopicNameC, TopicExchange topicExchange) {
        return BindingBuilder.bind(queueTopicNameC).to(topicExchange).with(DIRECT_REGXC);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85

(2)定义消费者

/**
* @author :Mr.Fire
* @date :Created in 2021/4/25 16:15
* @description:
* @modified By:
* @version: $
  */
  @Component
  public class Consumer6 {

  @RabbitListener(queues = QueueTopic.QUEUE_TOPIC_A)
  @RabbitHandler
  public void receiveA(String msg){
    System.out.println("Consumer6-topic-A 收到路由消息:"+msg);
  }

  @RabbitListener(queues = QueueTopic.QUEUE_TOPIC_B)
  @RabbitHandler
  public void receiveB(String msg){
     System.out.println("Consumer6-topic-B 收到路由消息:"+msg);
  }

  @RabbitListener(queues = QueueTopic.QUEUE_TOPIC_C)
  @RabbitHandler
  public void receiveC(String msg){
    System.out.println("Consumer6-topic-C 收到路由消息:"+msg);
  }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

(3)定义生产者:

@RequestMapping("/sendTopic")
public String sendToExchangeByTopic(){
  String context = "topic=======" + new Date();
  log.info("发送 topic 消息 : " + context);
  //生产者,正在往 topic 交换机发送消息,队列绑定了不同路由规则,交换机会使用 fire.topic.b 作为路由规则来发送
  // 用 fire.topic.b 和 fire.topic.#作为路由 key 的队列都能收到消息
  this.rabbitTemplate.convertAndSend(QueueTopic.TOPIC_EXCHANGE,"fire.topic.b", context);
  return "success";
}
1
2
3
4
5
6
7
8
9

(4)前端发送 Rest 请求,看效果

和预期一样,A 和 B 都收到了消息。

# 4.6 RPC 模式

  • 客户端启动时,它将创建一个匿名排他回调队列。

  • 对于 RPC 请求,客户端发送一条消息,该消息具有两个属性:reply_to(设置为回调队列)和 correlation_id(设置为每个请求的唯一值)。

  • 求被发送到 rpc_queue 队列。

  • RPC 工作程序(又名:服务器)正在等待该队列上的请求。出现请求时,它将使用 reply_to 字段中的队列来完成工作,并将消息和结果发送回客户端。

  • 客户端等待回调队列上的数据。当出现一条消息时,它将检查 correlation_id 属性。如果它与请求中的值匹配,则将响应返回给应用程序。

注:此处来源于官网,这里只做简单介绍,详情可看官网。

https://www.rabbitmq.com/tutorials/tutorial-six-python.html (opens new window)

# 5.消息确认机制(ACK)

业务系统中,消息丢了怎么办,消息发送到哪了?我们通常需要一些消息补偿机制去处理这些问题。

消息确认分为两种,发送确认和接收确认

# 5.1消息发送确认

确认生产者将消息发送给交换机,交换机传递给队列的过程中,消息是否成功投递。发送确认分为两步,一是确认是否到达交换机,二是确认是否到达队列。

(1)通过实现 ConfirmCallBack 接口确认消息发送到交换机

代码:

/**
* 如果消息到达交换机, 则 confirm 回调, ack = true
* 如果消息不到达交换机, 则 confirm 回调, ack = false
* 需要设置 spring.rabbitmq.publisher-confirm-type=correlated
*/
rabbitTemplate.setConfirmCallback((correlationData,ack,cause)->{
    log.info("收到回调:{}", ack == true ? "消息成功到达交换机" : "消息到达交换机失败");
    if (!ack) {
    log.info("correlationData:{}", correlationData.getId());
    log.info("消息到达交换机失败原因:{}", cause);
    // 根据业务逻辑实现消息补偿机制
    }
});
1
2
3
4
5
6
7
8
9
10
11
12
13

(2)通过实现 ReturnCallback 接口确认消息从交换机发送到队列。


/**
* 消息从交换机到达队列成功, 则 returnedMessage 不回调
* 消息从交换机到达队列失败, 则 returnedMessage 回调
* 需要设置 spring.rabbitmq.publisher-returns=true
  */
rabbitTemplate.setReturnsCallback(returnedMessage->{
  log.info("消息未到达队列,setReturnsCallback 回调");
  log.info("消息报文:{}", new String(returnedMessage.getMessage().getBody()));
  log.info("消息编号:{}", returnedMessage.getReplyCode());
  log.info("描述:{}", returnedMessage.getReplyText());
  log.info("交换机名称:{}", returnedMessage.getExchange());
  log.info("路由名称:{}", returnedMessage.getRoutingKey());
  // 根据业务逻辑实现消息补偿机制
});
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

# 5.2.消息接收确认

(1) 确认模式

AcknowledgeMode.NONE:不确认

AcknowledgeMode.AUTO:自动确认

AcknowledgeMode.MANUAL:手动确认
1
2
3
4
5

需要配置:

spring.rabbitmq.listener.simple.acknowledge-mode = manual
1

(2)代码

消费者确认:

/**
* @author :Mr.Fire
* @date :Created in 2021/4/25 16:15
* @description:
* @modified By:
* @version: $
  */
@Component
public class Consumer8 {

    @RabbitListener(queues = QueueRouter.QUEUE_DIRECT_A)
    @RabbitHandler
    public void receiveA(Message msg, Channel channel) throws IOException {
          try {
              //消息确认机制还可以起到限流作用,比如在接收到此条消息时休眠几秒钟
              Thread.sleep(3000);
              // 确认收到消息,消息将被队列移除
              // false 只确认当前 consumer 一个消息收到,true 确认所有 consumer 获得的消息。
              channel.basicAck(msg.getMessageProperties().getDeliveryTag(),false);
          } catch (InterruptedException e) {
              e.printStackTrace();
          }
          System.out.println("Consumer8-direct-A 收到确认消息:"+msg);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

生产者:

@RequestMapping("/sendAck")
@ResponseBody
public String sendAck() {
    String context = "exchange=======" + new Date();
    log.info("发送确认消息 : " + context);
    //生产者,正在往交换机发送消息,队列绑定了不同路由规则,交换机会使用 a.key 作为路由规则来发送
    this.rabbitTemplate.convertAndSend(QueueRouter.DIRECT_EXCHANGE,"a.key", context);
    return "success";
}
1
2
3
4
5
6
7
8
9

前端请求看效果:

http://localhost:8090/sendAck (opens new window)

当然还有其他,比如失败确认 basicNack、拒绝 basicReject、basicPublish 重新发布等。

# 6.Spring Boot 整合 RabbitMQ

(1)新建一个 Maven 工程:

勾选 Spring Web 和 RabbitMQ 的依赖,也可以建好工程自己添加,创建完成,来看看 POM 文件依赖:

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

(2)配置 yml 或者 properties,我这里使用 properties。

server.port= 8090
spring.rabbitmq.addresses=localhost:5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
#消息确认需要配置
spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true
#手动确认消息
spring.rabbitmq.listener.direct.acknowledge-mode=manual
1
2
3
4
5
6
7
8
9

(3)启动项目

到此,已经可以开始开发你自己的业务逻辑了。

# 7. 扩展面试思考题

  • 消息基于什么传输?

  • 如何避免消息重复投递或重复消费?

  • 如何保证消息不丢失?

  • 手动确认模式中,消息手动拒绝中如果 requeue 为 true 会重新放入队列,消费者处理过程中一直有异常情况下会导致入队—拒绝—入队的死循环,怎么处理?

参考:

https://www.rabbitmq.com/documentation.html (opens new window)

http://rabbitmq.mr-ping.com/ (opens new window)

最后更新时间: 2022/11/20, 20:59:47
Zookeeper如何保证数据一致性
RocketMQ单机和集群搭建

← Zookeeper如何保证数据一致性 RocketMQ单机和集群搭建→

最近更新
01
分布式系统核心理论CAP和BASE理论
03-05
02
分布式锁的几种实现方式
03-05
03
一文详解ThreadLocal是什么
03-01
更多文章>
Theme by Vdoing | Copyright © 2021-2023 Mr.Fire | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式