你在实际业务中是否有遇到过下面这样的场景:

  1. 订单十分钟没有支付,自动取消
  2. 用户注册成功后无任何操作,一天后自动提醒
  3. 预定票之后,在出发之前的一天自动提醒用户

这样类似的场景经常会发生在实际的业务中,它们总有一个共性,就是当前并不是马上触发,而是需要过一段时间才进行触发,当触发时间到达时才进行具体的执行。那么问题就来了,为了实现这样的功能,我们如何更加灵活的实现呢?

为什么使用延迟队列

我们以 订单十分钟过期 场景举例:

  • 方案 1:为当前订单创建一个定时器,定时器时间到了之后自动去查询当前订单状态,如果没有支付,则进行取消操作

  • 方案 2:设定一个总的定时器,每一分钟检查一次,当检查发现过期的订单就直接进行取消操作

  • 方案 3:如果你有一个延迟队列,你只需将任务丢进去,等到了对应的时间,这个任务会出队,然后出队的时候进行订单过期时间判断

方案比较

正所谓抛弃场景谈方案都是耍流氓:我的观点也很明确,这三种方案都有自己所试用的场景。

方案 1

如果全局只有一个用户,并且这个订单又是那种量比较小的,可能每天有个 30 个已经撑死了,这样的后台的系统,可能都谈不上需要高可用的情况,那么方案一的简单实现就足够了。

方案 1 的优点是时间肯定是准的,问题也很明显,使用了过多的定时器会使得系统压力变大,并且肯定有单点问题;当然可以搞个分布式定时任务调度….搞个 cronjob 也能做,

方案 2

这个方案可谓是经常被使用了,虽然看着不太优雅,但是实现简单,大多数简单场景其实用它也够了;但是它也有一个最大的问题,就是时间不准,很有可能到了对应的时间还没有轮到检查,就还没有过期,所以对于时间要求比较高的情况就不能使用了。当然你也可以缩小检查的时间间隔,但是同样的就会增加 系统的压力。

方案 3

看似这个方案是一个最优雅的解决方案,确实,不得不承认,如何有这样一个队列的话,那么不仅可以解决时间不准的问题,也可能解决压力的问题。但是这个方案的问题就是需要单独维护这个队列了,如果这个队列是个单点,那么出问题一样凉凉。

如何使用延迟队列

分析完了使用场景,进入我们今天的主角,我们在 golang 里面如何使用 rabbitmq 构建这样的一个延迟队列

如果让你来实现

首先考虑一下如果让你自己来实现你会怎么做?下面是个人的想法:

那么首先这个队列并不是一个简单的队列了,应该是一个以时间为 key 的小顶堆,每一个任务来了之后都按时间排序入堆。

然后不停的判断堆顶元素是否满足条件,如果满足条件则出堆。

这样的设计就好像 golang timer 的旧版本设计类似(挖个坑有机会写一篇 golang timer 分析)

rabbitmq 要如何使用

我们知道 mq 可不就是消息从一端发送,另一端进行接收嘛,那要如何实现延迟呢?

首先要引入一个概念:死信队列,当我们的发送的消息被接收端nck或reject,消息在队列的存活时间超过设定的 TTL,消息数量超过最大队列长度,这样的消息会被认为是死信(“dead letter”)通过配置的死信交换机这样的死信可以被投递到对应的死信队列中

没错,你会发现第二个条件就是实现一个延迟队列的关键。

image-20210919173419547

  • 我们将需要延迟的消息设定需要延迟的时间,也就是这个消息的最大存活时间(TTL),然后发送到普通队列中
  • 然后因为普通队列中没有消费者,所以只有静静的等待消息超时
  • 消息超时后,经过死信交换机,发送到对应的死信队列中
  • 我们只需要消费对应死信队列中的消息即可认为当前消息对应的任务已经到了应该执行的时间了

坑点

我一开始也是这样想的,一切看起来很完美对不对?然后现实并不是那么简单。

举例来说,如果当前队列中为 A -> B -> C

  • A:过期时间为 1 分钟
  • B:过期时间为 5 分钟
  • C:过期时间为 10 分钟

而如果只是按照上面的方式实现,那么因为它毕竟还是一个队列,只有当 C 过期了之后,出队了之后才轮到 B 和 A

也就是说,即使你已经过期了,但是因为你排在后面,还是轮不到你先出队,也就没有办法到死信队列了。

所以这也就是为什么我一开始想着实现的时候,这并不是一个队列,而是一个堆的实现,因为过期早的其实应该排到前面去才对。

那咋办

别慌,有插件的支持 https://www.rabbitmq.com/community-plugins.html rabbitmq_delayed_message_exchange

只需要安装了插件,这个功能就能实现了,有关这个插件的安装比较简单,这里不多做介绍。(我采用的是 docker 部署,所以直接挂载到对应的目录,并指定启用对应的插件并重启就可以了)

golang 实现连接

发送者

发送者的实现就很简单了,就和普通的发送实现几乎一致,因为反正就是投递到对应的队列中就可以了,只需要将发送消息的部分,在消息的 header 中加入 x-delay 字段表示当前消息的 TTL 就可以了,也就是设定延迟时间,注意单位为毫秒

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package producer

import (
"encoding/json"
"errors"
"time"

"github.com/streadway/amqp"
)

// Config 链接配置
type Config struct {
Addr, Exchange, Queue, RoutingKey string
AutoDelete bool
}

// Producer rabbitmq 生产者
type Producer struct {
conn *amqp.Connection
Channel *amqp.Channel
Queue amqp.Queue
config Config
done chan bool
connErr chan error
channelErr chan *amqp.Error
}

// NewProducer 创建生产者
func NewProducer(config Config) *Producer {
return &Producer{
config: config,
done: make(chan bool),
connErr: make(chan error),
channelErr: make(chan *amqp.Error),
}
}

// Connect 链接到 MQ 服务器
func (c *Producer) Connect() error {
var err error
if c.conn, err = amqp.Dial(c.config.Addr); err != nil {
return err
}

if c.Channel, err = c.conn.Channel(); err != nil {
_ = c.Close()
return err
}

// watching tcp connect
go c.WatchConnect()
return nil
}

// Close to close remote mq server connection
func (c *Producer) Close() error {
close(c.done)

if !c.conn.IsClosed() {
if err := c.conn.Close(); err != nil {
logger.Error("rabbitmq producer - connection close failed: ", err)
return err
}
}
return nil
}

// Publish 发送消息至mq
func (c *Producer) Publish(body []byte, delay int64) error {
publishing := amqp.Publishing{
ContentType: "text/plain",
Body: body,
}
if delay >= 0 {
publishing.Headers = amqp.Table{
"x-delay": delay,
}
}
err := c.Channel.Publish(c.config.Exchange, c.config.RoutingKey, true, false, publishing)
if err != nil {
target := &amqp.Error{}
if errors.As(err, target) {
c.channelErr <- target
} else {
c.connErr <- err
}
}
return err
}

// PublishJSON 将对象JSON格式化后发送消息
func (c *Producer) PublishJSON(body interface{}, delay int64) error {
if data, err := json.Marshal(body); err != nil {
return err
} else {
return c.Publish(data, delay)
}
}

// WatchConnect 监控 MQ 的链接状态
func (c *Producer) WatchConnect() {
ticker := time.NewTicker(30 * time.Second) // every 30 second
defer ticker.Stop()

for {
select {
case err := <-c.connErr:
logger.Errorf("rabbitmq producer - connection notify close: %s", err.Error())
c.ReConnect()

case err := <-c.channelErr:
logger.Errorf("rabbitmq producer - channel notify close: %s", err.Error())
c.ReConnect()

case <-ticker.C:
c.ReConnect()

case <-c.done:
logger.Debug("auto detect connection is done")
return
}
}
}

// ReConnect 根据当前链接状态判断是否需要重新连接,如果连接异常则尝试重新连接
func (c *Producer) ReConnect() {
if c.conn == nil || (c.conn != nil && c.conn.IsClosed()) {
logger.Errorf("rabbitmq connection is closed try to reconnect")
if err := c.Connect(); err != nil {
logger.Errorf("rabbitmq reconnect failed: %s", err.Error())
} else {
logger.Infof("rabbitmq reconnect succeeded")
}
}
}

消费者

消费者部分主要是需要声明正确的交换机类型和对应的队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
package consumer

import (
"os"
"time"

"github.com/streadway/amqp"
)

// RabbitConsumer rabbitmq 消费者
type RabbitConsumer struct {
conn *amqp.Connection
channel *amqp.Channel
connNotify chan *amqp.Error
channelNotify chan *amqp.Error
done chan struct{}
addr string
exchange string
queue string
routingKey string
consumerTag string
autoDelete bool
handler func([]byte) error
delivery <-chan amqp.Delivery
}

// NewConsumer 创建消费者
func NewConsumer(addr, exchange, queue, routingKey string, autoDelete bool, handler func([]byte) error) *RabbitConsumer {
hostname, _ := os.Hostname()
return &RabbitConsumer{
addr: addr,
exchange: exchange,
queue: queue,
routingKey: routingKey,
consumerTag: hostname,
autoDelete: autoDelete,
handler: handler,
done: make(chan struct{}),
}
}

func (c *RabbitConsumer) Start() error {
if err := c.Run(); err != nil {
return err
}
go c.ReConnect()
return nil
}

func (c *RabbitConsumer) Stop() {
close(c.done)

if !c.conn.IsClosed() {
// 关闭 SubMsg message delivery
if err := c.channel.Cancel(c.consumerTag, true); err != nil {
logger.Error("rabbitmq consumer - channel cancel failed: ", err)
}

if err := c.conn.Close(); err != nil {
logger.Error("rabbitmq consumer - connection close failed: ", err)
}
}
}

func (c *RabbitConsumer) Run() (err error) {
if c.conn, err = amqp.Dial(c.addr); err != nil {
return err
}

if c.channel, err = c.conn.Channel(); err != nil {
c.conn.Close()
return err
}

defer func() {
if err != nil {
c.channel.Close()
c.conn.Close()
}
}()

// 声明一个主要使用的 exchange
err = c.channel.ExchangeDeclare(
c.exchange, "x-delayed-message", true, c.autoDelete, false, false, amqp.Table{
"x-delayed-type": "fanout",
})
if err != nil {
return err
}

// 声明一个延时队列, 延时消息就是要发送到这里
q, err := c.channel.QueueDeclare(c.queue, false, c.autoDelete, false, false, nil)
if err != nil {
return err
}

err = c.channel.QueueBind(q.Name, "", c.exchange, false, nil)
if err != nil {
return err
}

c.delivery, err = c.channel.Consume(
q.Name, c.consumerTag, false, false, false, false, nil)
if err != nil {
return err
}

go c.Handle()

c.connNotify = c.conn.NotifyClose(make(chan *amqp.Error))
c.channelNotify = c.channel.NotifyClose(make(chan *amqp.Error))
return
}

func (c *RabbitConsumer) ReConnect() {
for {
select {
case err := <-c.connNotify:
if err != nil {
logger.Error("rabbitmq consumer - connection NotifyClose: ", err)
}
case err := <-c.channelNotify:
if err != nil {
logger.Error("rabbitmq consumer - channel NotifyClose: ", err)
}
case <-c.done:
return
}

// backstop
if !c.conn.IsClosed() {
// close message delivery
if err := c.channel.Cancel(c.consumerTag, true); err != nil {
logger.Error("rabbitmq consumer - channel cancel failed: ", err)
}

if err := c.conn.Close(); err != nil {
logger.Error("rabbitmq consumer - channel cancel failed: ", err)
}
}

// IMPORTANT: 必须清空 Notify,否则死连接不会释放
for err := range c.channelNotify {
logger.Error(err)
}
for err := range c.connNotify {
logger.Error(err)
}

quit:
for {
select {
case <-c.done:
return
default:
logger.Error("rabbitmq consumer - reconnect")

if err := c.Run(); err != nil {
logger.Error("rabbitmq consumer - failCheck: ", err)
// sleep 15s reconnect
time.Sleep(time.Second * 15)
continue
}
break quit
}
}
}
}

func (c *RabbitConsumer) Handle() {
for d := range c.delivery {
go func(delivery amqp.Delivery) {
if err := c.handler(delivery.Body); err != nil {
// 重新入队,否则未确认的消息会持续占用内存,这里的操作取决于你的实现,你可以当出错之后并直接丢弃也是可以的
_ = delivery.Reject(true)
} else {
_ = delivery.Ack(false)
}
}(d)
}
}
  • 交换机必须设定为 x-delayed-message
  • 需要设定 x-delayed-type 根据你所需要的路由方式 topic\fanout\direct….

问题

其实 rabbitmq 的这个延迟队列也是有一些问题的:

  1. 延迟队列插件的实现是先将消息存到一个 Mnesia 一个分布式数据库管理系统,所以消息有没有落盘 Mnesia 重启之后能否存在也就会影响消息的延迟触发了
  2. 并且插件官方也说了不支持 RAM 节点
  3. 100s of thousands or millions 也就是 数十万或数百万 消息的场景也会有问题,毕竟定时还是有瓶颈的 https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/issues/72

总之对于我当前业务场景的使用确实是绰绰有余了,但是你在使用之前还是需要提前考虑好这些问题。

总结

如果你需要使用 rabbitmq 实现一个延迟队列,就需要看你的使用场景了,如果你的使用场景,延迟时间相同,可以直接使用 TTL + 死信交换机来实现,如果延迟时间不确定,则需要安装插件来满足实现。

参考链接

插件官方地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

直接参考这个 golang 代码进行实现即可:https://github.com/ghigt/rabbitmq-delayed

非插件的版本可以参考这个实现:https://blog.justwe.site/post/go-rabbitmq-delay-queue/

https://mp.weixin.qq.com/s/T6CKU3m8Xv7XYGbquz-04w

https://www.cnblogs.com/mfrank/p/11260355.html#autoid-0-7-0