假设我们有这样一个场景,系统会在某个特定的情况下给用户推送一条消息,可能是短信、邮件或站内信,由于该场景消息可能会在某一个时刻同时推送大量的消息,且主进程不希望会阻塞。该场景对实时性要求不高,允许消息被延时送达。在系统的构建初期,使用专业的消息队列中间件Rabbitmq和Kafka来实现消息的异步推送就显得不是很方便,此时我们可以考虑使用Redis来实现简单的消息队列。当然,假如我们对消息的实时性以及可靠性要求非常高,可能就需要使用MQ或kafka来实现了。
实现的原理是生产者将数据push到Redis的list里,消费者轮训去pop数据,如果能取到数据,则直接消费数据,如果等不到则继续循环pop数据。
以上示意图中的红色箭头就是模拟的一个生产者从左部添加数据,消费者在右侧获取数据的场景。反之一样。
但是这是就有个问题,假如队列空了怎么办?当列表为空时,消费者就会不断的轮训来获取数据,但是每次都获取不到数据,就会陷入一个取不到数据的死循环里,这不仅拉高了客户端的CPU,还拉高了Redis的QPS,并且这些访问都是无效的。
这时我们可以使用sleep(1)的方式去延时1秒,也可以使用Redis提供的阻塞式访问,BRPP和BLPOP命令,消费者可以在获取不到数据的时候指定一个如果数据不存在的阻塞的超时时间,如果在这个时间内能取到数据,则会立即返回,否则会返回null,当这个超时时间设置为0的时候,表示会一直阻塞,但我们通常并不建议如此。如果都有多个客户端同时在阻塞等待消息,则会按照先后顺序排序。
首先继续先看一下redis client。
import (
"log"
"github.com/go-redis/redis"
)
// 队列的key
var queueKey = "queue:message"
var rdb *redis.Client
func NewRedis() {
rdb = redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
Password: "",
})
pong, err := rdb.Ping().Result()
if err != nil {
log.Fatalln(err)
}
log.Println(pong,"redis success.")
}
生产者的具体实现代码,模拟一个随机生成消息的生产者,使用lpush将数据添加到list里。
// 使用list生产消息
func ProducerMessageList(){
rand.Seed(time.Now().UnixNano())
log.Println("开启生产者。。。。")
for i := 0;i < 10;i++ {
score := time.Now().Unix()
log.Println("正在生产一条消息...", score, i)
_,err := rdb.LPush(queueListKey,i).Result()
if err != nil {
log.Println(err)
}
time.Sleep(time.Duration(rand.Intn(3)) * time.Second)
}
}
消费者则使用rpop来从队列里pop出一条消息进行消费,但我们前面讲过,如果队列空了,则会不断的轮训pop消息,会造成大量的资源的浪费,因此我们此处使用brpop命令来实现阻塞的读,阻塞读在队列没有数据时会立即进入休眠状态,一旦有数据了,则会立即被唤醒并弹出消息,延迟可以忽略不计。
// 使用list格式消费消息
func ConsumerMessageList() {
for {
// 设置一个5秒的超时时间
value, err := rdb.BRPop(5 *time.Second,"queue:list").Result()
if err == redis.Nil{
// 查询不到数据
time.Sleep(1 * time.Second)
continue
}
if err != nil {
// 查询出错
time.Sleep(1 * time.Second)
continue
}
log.Println("消费到数据:", value, "当前时间是:", time.Now().Unix())
time.Sleep(time.Second)
}
}
从代码里可以看到,我们设置了一个5秒的阻塞读超时时间,是因为阻塞读也不能一直阻塞,长时间的阻塞可能会被服务器端主动断开链接,然后会抛出异常,所以这里需要设置一个不是很长的阻塞超时时间。
实现的原理是将生产的数据使用zadd命令存入Redis,将field的的score设置成需要延时的时间戳,如果需要立即执行,不需要延迟的话,则将score设置成当前时间戳即可。同时单独运行一个goroutine,使用zrangebyscore 命令去截取数据,数据的score为当前时间戳,这样一来,到达执行时间的数据将被取出,然后我们取第一个数据,使用zrem命令将数据移除队列,移除成功则表示该条消息允许被发送,否则,重新执行以上流程。
我们先来看一下使用go-redis实现的Redis客户端链接。
import (
"log"
"github.com/go-redis/redis"
)
// 队列的key
var queueKey = "queue:message"
var rdb *redis.Client
func NewRedis() {
rdb = redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
Password: "",
})
pong, err := rdb.Ping().Result()
if err != nil {
log.Fatalln(err)
}
log.Println(pong,"redis success.")
}
注意:这里demo环境下,我使用了一个var rdb *redis.Client,后面的代码均使用rdb直接来调用Redis,实际项目中根据项目的情况来。
这里我模拟了一个随机产生消息的情况, 我们使用随机产生一个数字来延时一段时间,用来观察消费者是否能实时对数据的消费。
// 生产消息
func ProducerMessage() {
rand.Seed(time.Now().UnixNano())
log.Println("开启生产者。。。。")
for i := 0;i < 5;i++ {
score := time.Now().Unix()
log.Println("正在生产一条消息...", score, i)
rdb.ZAdd("queue:message", redis.Z{
Score: float64(score + 1),// 秒级时间戳+1,表示延时1秒
Member: i,
})
time.Sleep(time.Duration(rand.Intn(3)) * time.Second)
}
}
产生的数据将使用zadd命令存入Redis,并添加score为当前的时间戳,并给score添加一个延时时间,由于示例中使用的秒级时间戳作为score,则我们延时1秒就直接score+1即可。member里可以存我们要延时的数据,比如将要发送的消息。
使用zrangebyscore截取截止到当前时间戳的消息,如果消息延时到当前时间,则可以直接被截取出来,并使用zrem移除,目的是为了防止同一条数据被重复消费,移除成功的消息才可进行后续的消费过程。
func ConsumerMessage() {
log.Println("正在启动消费者...")
for {
// score := time.Now().UnixNano()
values, err := rdb.ZRangeByScore("queue:message", redis.ZRangeBy{
Min: "0",
Max: fmt.Sprint(time.Now().Unix()),
Offset: 0,
Count: 1,
}).Result()
if err != nil {
log.Fatalln(err)
// Redis查询出错,延迟1秒继续
time.Sleep(time.Second)
continue
}
if len(values) == 0 {
// 没有数据,延迟1秒继续
// time.Sleep(time.Second)
continue
}
// 由于使用zrangebyscore的时候指定了count=1,因此此处理论上只会有一条数据
value := values[0]
num, err := rdb.ZRem("queue:message", value).Result()
if err != redis.Nil && err != nil {
log.Println(err)
time.Sleep(time.Second)
continue
}
if num == 1 {
log.Println("消费到数据:", value, "当前时间是:", time.Now().Unix())
// 模拟一个耗时的操作
time.Sleep(2 * time.Second)
}
}
}
由于我们使用秒级时间戳作为score,因此我们在查询无数据时可以采用sleep(1)的操作减少对Redis的重复请求。
Redis里有一个单独的模块来针对一对多的单向通信方案,这就是PubSub模块,PubSub模块可以用作广播模式,即一个发布者多个订阅者。
发布者订阅者模式可以理解为:订阅者(Subscriber)会在Redis上订阅一个通道,生产者在该通道上发布一条消息,订阅者就会立刻收到该消息,如果我们有多个订阅者,那么发布者发布的消息会被多个订阅者同时收到一模一样的消息。
我们可以基于此模型来实现一个简单的消息队列,使用一个发布者和一个订阅者来实现消息的发布和订阅。
实战代码里的Redis client和上面一样,就不再列出了。
发布者使用publish命令想频道里发布数据,订阅者即可以订阅到消息
func ProducerMessagePubSub() {
rand.Seed(time.Now().UnixNano())
for i := 0; i < 10; i++ {
log.Println("正在生产一条消息...", i)
r, err := rdb.Publish("queue:pubsub", i).Result()
if err != nil {
log.Println(err,r)
}
time.Sleep(time.Duration(rand.Intn(3)) * time.Second)
}
}
订阅者订阅指定的频道,然后使用管道来接收消息,并处理消息。
func ConsumerMessagePubSub(node int){
//订阅频道
pubsub := rdb.Subscribe("queue:pubsub")
// 用管道来接收消息
ch := pubsub.Channel()
// 处理消息
for msg := range ch {
log.Printf("当前节点:%d,消费到数据,channel:%s;message:%s\n", node, msg.Channel, msg.Payload)
}
}
当然如果我们会发布大量的消息, 同时会有多个消费者去消费,也可以将通道分成多个,每个通道有自己的订阅者订阅,然后发布者在发布消息的时候根据节点ID或随机分配的方式分配到每个通道上来实现。
不过我们也需要注意,以上的示例代码里,如果同时开启两个goroutine的话,发布者立刻Publish消息,而订阅者不能在第一时间订阅到消息,因为相对于用go这个关键字去开启两个goroutine的话,几乎是瞬间的,因此一定要先使订阅者订阅到频道后,再有发布操作。