返回顶部
首页 > 资讯 > 后端开发 > GO >Golang实现基于Redis的可靠延迟队列
  • 918
分享到

Golang实现基于Redis的可靠延迟队列

摘要

目录前言原理详解pending2ReadyScriptready2UnackScriptunack2RetryScriptackconsume前言 在之前探讨延时队列的文章中我们提到了 Redisson delayque

前言

在之前探讨延时队列的文章中我们提到了 Redisson delayqueue 使用 redis 的有序集合结构实现延时队列,遗憾的是 Go 语言社区中并无类似的库。不过问题不大,没有轮子我们自己造。

本文的完整代码实现在hdt3213/delayqueue,可以直接 go get 安装使用。

使用有序集合结构实现延时队列的方法已经广为人知,无非是将消息作为有序集合的 member, 投递时间戳作为 score 使用 zrangebyscore 命令搜索已到投递时间的消息然后将其发给消费者。

然而消息队列不是将消息发给消费者就万事大吉,它们还有一个重要职责是确保送达和消费。通常的实现方式是当消费者收到消息后向消息队列返回确认(ack),若消费者返回否定确认(nack)或超时未返回,消息队列则会按照预定规则重新发送,直到到达最大重试次数后停止。如何实现 ack 和重试机制是我们要重点考虑的问题。

我们的消息队列允许分布式地部署多个生产者和消费者,消费者实例定时执行 lua 脚本驱动消息在队列中的流转无需部署额外组件。由于 Redis 保证了 lua 脚本执行的原子性,整个流程无需加

消费者采用拉模式获得消息,保证每条消息至少投递一次,消息队列会重试超时或者被否定确认的消息(nack) 直至到达最大重试次数。一条消息最多有一个消费者正在处理,减少了要考虑的并发问题。

请注意:若消费时间超过了 MaxConsumeDuration 消息队列会认为消费超时并重新投递,此时可能有多个消费者同时消费。

具体使用也非常简单,只需要注册处理消息的回调函数并调用 start() 即可:

package main

import (
	"GitHub.com/go-redis/redis/v8"
	"github.com/hdt3213/delayqueue"
	"strconv"
	"time"
)

func main() {
	redisCli := redis.NewClient(&redis.Options{
		Addr: "127.0.0.1:6379",
	})
	queue := delayqueue.NewQueue("example-queue", redisCli, func(payload string) bool {
		// 注册处理消息的回调函数
        // 返回 true 表示已成功消费,返回 false 消息队列会重新投递次消息
		return true
	})
	// 发送延时消息
	for i := 0; i < 10; i++ {
		err := queue.SendDelayMsg(strconv.Itoa(i), time.Hour, delayqueue.WithRetryCount(3))
		if err != nil {
			panic(err)
		}
	}

	// start consume
	done := queue.StartConsume()
	<-done
}

由于数据存储在 redis 中所以我们最多能保证在 redis 无故障且消息队列相关 key 未被外部篡改的情况下不会丢失消息。

原理详解

消息队列涉及几个关键的 redis 数据结构

  • msgKey: 为了避免两条内容完全相同的消息造成意外的影响,我们将每条消息放到一个字符串类型的键中,并分配一个 UUID 作为它的唯一标识。其它数据结构中只存储 UUID 而不存储完整的消息内容。每个 msg 拥有一个独立的 key 而不是将所有消息放到一个哈希表是为了利用 TTL 机制避免
  • pendingKey: 有序集合类型,member 为消息 ID, score 为投递时间的 unix 时间戳。
  • readyKey: 列表类型,需要投递的消息 ID。
  • unAckKey: 有序集合类型,member 为消息 ID, score 为重试时间的 unix 时间戳。
  • retryKey: 列表类型,已到重试时间的消息 ID
  • garbageKey: 集合类型,用于暂存已达重试上线的消息 ID
  • retryCounTKEy: 哈希表类型,键为消息 ID, 值为剩余的重试次数

流程如下图所示:

Golang实现基于Redis的可靠延迟队列

由于我们允许分布式地部署多个消费者,每个消费者都在定时执行 lua 脚本,所以多个消费者可能处于上述流程中不同状态,我们无法预知(或控制)上图中五个操作发生的先后顺序,也无法控制有多少实例正在执行同一个操作。

因此我们需要保证上图中五个操作满足三个条件:

  • 都是原子性的
  • 不会重复处理同一条消息
  • 操作前后消息队列始终处于正确的状态

只要满足这三个条件,我们就可以部署多个实例且不需要使用分布式锁等技术来进行状态同步。

是不是听起来有点吓人?其实简单的很,让我们一起来详细看看吧~

pending2ReadyScript

pending2ReadyScript 使用 zrangebyscore 扫描已到投递时间的消息ID并把它们移动到 ready 中:

-- keys: pendingKey, readyKey
-- argv: currentTime
local msgs = redis.call('ZRangeByScore', KEYS[1], '0', ARGV[1])  -- 从 pending key 中找出已到投递时间的消息
if (#msgs == 0) then return end
local args2 = {'LPush', KEYS[2]} -- 将他们放入 ready key 中
for _,v in ipairs(msgs) do
	table.insert(args2, v) 
end
redis.call(unpack(args2))
redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1])  -- 从 pending key 中删除已投递的消息

ready2UnackScript

ready2UnackScript 从 ready 或者 retry 中取出一条消息发送给消费者并放入 unack 中,类似于 RPopLPush:

-- keys: readyKey/retryKey, unackKey
-- argv: retryTime
local msg = redis.call('RPop', KEYS[1])
if (not msg) then return end
redis.call('ZAdd', KEYS[2], ARGV[1], msg)
return msg

unack2RetryScript

unack2RetryScript 从 retry 中找出所有已到重试时间的消息并把它们移动到 unack 中:

-- keys: unackKey, retryCountKey, retryKey, garbageKey
-- argv: currentTime
local msgs = redis.call('ZRangeByScore', KEYS[1], '0', ARGV[1])  -- 找到已到重试时间的消息
if (#msgs == 0) then return end
local retryCounts = redis.call('HMGet', KEYS[2], unpack(msgs)) -- 查询剩余重试次数
for i,v in ipairs(retryCounts) do
	local k = msgs[i]
	if tonumber(v) > 0 then -- 剩余次数大于 0
		redis.call("HIncrBy", KEYS[2], k, -1) -- 减少剩余重试次数
		redis.call("LPush", KEYS[3], k) -- 添加到 retry key 中
	else -- 剩余重试次数为 0
		redis.call("HDel", KEYS[2], k) -- 删除重试次数记录
		redis.call("SAdd", KEYS[4], k) -- 添加到垃圾桶,等待后续删除
	end
end
redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1])  -- 将已处理的消息从 unack key 中删除

因为 redis 要求 lua 脚本必须在执行前在 KEYS 参数中声明自己要访问的 key, 而我们将每个 msg 有一个独立的 key,我们在执行 unack2RetryScript 之前是不知道哪些 msg key 需要被删除。所以 lua 脚本只将需要删除的消息记在 garbage key 中,脚本执行完后再通过 del 命令将他们删除:

func (q *DelayQueue) garbageCollect() error {
	ctx := context.Background()
	msgIds, err := q.redisCli.SMembers(ctx, q.garbageKey).Result()
	if err != nil {
		return fmt.Errorf("smembers failed: %v", err)
	}
	if len(msgIds) == 0 {
		return nil
	}
	// allow concurrent clean
	msgKeys := make([]string, 0, len(msgIds))
	for _, idStr := range msgIds {
		msgKeys = append(msgKeys, q.genMsgKey(idStr))
	}
	err = q.redisCli.Del(ctx, msgKeys...).Err()
	if err != nil && err != redis.Nil {
		return fmt.Errorf("del msgs failed: %v", err)
	}
	err = q.redisCli.SRem(ctx, q.garbageKey, msgIds).Err()
	if err != nil && err != redis.Nil {
		return fmt.Errorf("remove from garbage key failed: %v", err)
	}
	return nil
}

之前提到的 lua 脚本都是原子性执行的,不会有其它命令插入其中。 GC 函数由 3 条 redis 命令组成,在执行过程中可能会有其它命令插入执行过程中,不过考虑到一条消息进入垃圾回收流程之后不会复活所以不需要保证 3 条命令原子性。

ack

ack 只需要将消息彻底删除即可:

func (q *DelayQueue) ack(idStr string) error {
	ctx := context.Background()
	err := q.redisCli.ZRem(ctx, q.unAckKey, idStr).Err()
	if err != nil {
		return fmt.Errorf("remove from unack failed: %v", err)
	}
	// msg key has ttl, ignore result of delete
	_ = q.redisCli.Del(ctx, q.genMsgKey(idStr)).Err()
	q.redisCli.HDel(ctx, q.retryCountKey, idStr)
	return nil
}

否定确认只需要将 unack key 中消息的重试时间改为现在,随后执行的 unack2RetryScript 会立即将它移动到 retry key

func (q *DelayQueue) nack(idStr string) error {
	ctx := context.Background()
	// update retry time as now, unack2Retry will move it to retry immediately
	err := q.redisCli.ZAdd(ctx, q.unAckKey, &redis.Z{
		Member: idStr,
		Score:  float64(time.Now().Unix()),
	}).Err()
	if err != nil {
		return fmt.Errorf("negative ack failed: %v", err)
	}
	return nil
}

consume

消息队列的核心逻辑是每秒执行一次的 consume 函数,它负责调用上述脚本将消息转移到正确的集合中并回调 consumer 来消费消息:

func (q *DelayQueue) consume() error {
	// 执行 pending2ready,将已到时间的消息转移到 ready
	err := q.pending2Ready()
	if err != nil {
		return err
	}
	// 循环调用 ready2Unack 拉取消息进行消费
	var fetchCount uint
	for {
		idStr, err := q.ready2Unack()
		if err == redis.Nil { // consumed all
			break
		}
		if err != nil {
			return err
		}
		fetchCount++
		ack, err := q.callback(idStr)
		if err != nil {
			return err
		}
		if ack {
			err = q.ack(idStr)
		} else {
			err = q.nack(idStr)
		}
		if err != nil {
			return err
		}
		if fetchCount >= q.fetchLimit {
			break
		}
	}
	// 将 nack 或超时的消息放入重试队列
	err = q.unack2Retry()
	if err != nil {
		return err
	}
    // 清理已达到最大重试次数的消息
	err = q.garbageCollect()
	if err != nil {
		return err
	}
	// 消费重试队列
	fetchCount = 0
	for {
		idStr, err := q.retry2Unack()
		if err == redis.Nil { // consumed all
			break
		}
		if err != nil {
			return err
		}
		fetchCount++
		ack, err := q.callback(idStr)
		if err != nil {
			return err
		}
		if ack {
			err = q.ack(idStr)
		} else {
			err = q.nack(idStr)
		}
		if err != nil {
			return err
		}
		if fetchCount >= q.fetchLimit {
			break
		}
	}
	return nil
}

至此一个简单可靠的延时队列就做好了,何不赶紧开始试用呢?

以上就是golang实现基于Redis的可靠延迟队列的详细内容,更多关于Golang Redis可靠延迟队列的资料请关注我们其它相关文章!

您可能感兴趣的文档:

--结束END--

本文标题: Golang实现基于Redis的可靠延迟队列

本文链接: https://lsjlt.com/news/33016.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

猜你喜欢
  • Golang实现基于Redis的可靠延迟队列
    目录前言原理详解pending2ReadyScriptready2UnackScriptunack2RetryScriptackconsume前言 在之前探讨延时队列的文章中我们提到了 redisson delayque...
    99+
    2022-06-22
    Golang Redis可靠延迟队列 Golang Redis 延迟队列 Golang 延迟队列
  • 百行代码实现基于Redis的可靠延迟队列
    目录原理详解pending2ReadyScriptready2UnackScriptunack2RetryScriptackconsume在之前探讨延时队列的文章中我们提到了 redisson delayqueue 使用...
    99+
    2022-06-23
    Redis可靠延迟队列 Redis延迟队列
  • 基于Golang实现延迟队列(DelayQueue)
    目录背景原理堆随机删除重置元素到期时间Golang实现数据结构实现原理添加元素阻塞获取元素Channel方式阻塞读取性能测试总结背景 延迟队列是一种特殊的队列,元素入队时需要指定到期...
    99+
    2024-04-02
  • 基于Redis延迟队列的实现代码
    使用场景 工作中大家往往会遇到类似的场景: 1.对于红包场景,账户 A 对账户 B 发出红包通常在 1 天后会自动归还到原账户。 2.对于实时支付场景,如果账户 A 对商户 S 付款...
    99+
    2024-04-02
  • Redis延迟队列和分布式延迟队列的简答实现
            最近,又重新学习了下Redis,Redis不仅能快还能慢,简直利器,今天就为大家介绍一下Redi...
    99+
    2024-04-02
  • 如何实现Redis延迟队列
    这期内容当中小编将会给大家带来有关如何实现Redis延迟队列,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。延迟队列,顾名思义它是一种带有延迟功能的消息队列。那么,是在什么...
    99+
    2024-04-02
  • Redis如何实现延迟队列
    目录Redis实现延迟队列Redis延迟队列Redis实现延时队列的优化方案延时队列的应用延时队列的实现总结Redis实现延迟队列 Redis延迟队列 Redis 是通过有序集合(ZSet)的方式来实现延迟消息队列的,Z...
    99+
    2023-04-28
    Redis延迟队列 Redis实现延迟队列 Redis队列
  • redis延迟队列如何实现
    redis 延迟队列的实现采用有序集合,将任务以分数(时间戳)存储,定期检索已到期的任务,删除并执行。步骤如下:创建有序集合 delayed_queue,将任务以分数(时间戳)存储。检索...
    99+
    2024-06-12
    redis 键值对
  • Go+Redis实现延迟队列实操
    目录前言简单的实现定义消息PushConsume存在的问题多消费者实现定义消息PushConsume存在的问题总结前言 延迟队列是一种非常使用的数据结构,我们经常有需要延迟推送处理消...
    99+
    2024-04-02
  • 怎么在Redis中实现延迟队列和分布式延迟队列
    这篇文章给大家介绍怎么在Redis中实现延迟队列和分布式延迟队列,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。1. 实现一个简单的延迟队列。  我们知道目前JAVA可以有DelayedQueue,我们首先开一个Dela...
    99+
    2023-06-15
  • 使用Redis怎么实现延迟队列
    本篇文章给大家分享的是有关使用Redis怎么实现延迟队列,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。方案一:采用通过定时任务采用数据库/非关系型数据库轮询方案。优点: 实现简...
    99+
    2023-06-15
  • Redis实现延迟队列方法介绍
    延迟队列,顾名思义它是一种带有延迟功能的消息队列。那么,是在什么场景下我才需要这样的队列呢? 1. 背景 我们先看看以下业务场景: 当订单一直处于未支付状态时,如何及时的关闭订单如何定期检查处于退款状态的订单是否已经退款成功在订单长时间没有...
    99+
    2023-09-17
    redis java java-rabbitmq
  • .Net实现延迟队列
    目录介绍使用场景方案Redis过期事件配置控制台订阅WebApi中订阅RabbitMq延迟队列生产消息消费消息其他方案介绍 具有队列的特性,再给它附加一个延迟消费队列消息的功能,也就...
    99+
    2024-04-02
  • Redis实现延迟队列的全流程详解
    目录1、前言1.1、什么是延迟队列1.2、应用场景1.3、为什么要使用延迟队列2、Redis sorted set3、Redis 过期键监听回调4、Quartz定时任务5、Delay...
    99+
    2023-03-14
    Redis延迟队列实现 Redis延迟队列原理
  • Redis实现延迟队列的方法是什么
    这篇文章主要介绍“Redis实现延迟队列的方法是什么”,在日常操作中,相信很多人在Redis实现延迟队列的方法是什么问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Redis实现延迟队列的方法是什么”的疑惑有所...
    99+
    2023-07-05
  • 如何在Redis中实现延迟任务队列
    在Redis中实现延迟任务队列可以使用有序集合(Sorted Set)和定时任务的方式来实现。以下是一个基本的实现方法: 将任务存...
    99+
    2024-04-09
    Redis
  • 基于Redis实现延时队列的优化方案小结
    目录一、延时队列的应用二、延时队列的实现三、总结一、延时队列的应用 近期在开发部门的新项目,其中有个关键功能就是智能推送,即根据用户行为在特定的时间点向用户推送相应的提醒消息,比如以下业务场景: 在用户点击充值项后,半小...
    99+
    2022-07-05
    Redis延时队列 Redis延时队列优化
  • Redis优雅地实现延迟队列的方法分享
    目录前言使用依赖配置配置文件demo代码执行效果原理分析队列创建生产者消费者整个流程总结思考前言 工作中常常会遇到这样的场景,如订单到期未支付取消,到期自动续费等,我们发现延迟队列非常适合在这样的场景中使用。常见的延迟队...
    99+
    2023-02-26
    Redis实现延迟队列 Redis延迟队列
  • Redis在PHP应用中的延迟队列
    随着PHP应用的不断发展,延迟队列的应用变得越来越普遍。而在PHP应用中,一个可靠的延迟队列方案是非常必要的。本文将介绍Redis在PHP应用中的延迟队列,着重讨论Redis的数据结构、使用场景以及一些最佳实践。一、Redis数据结构在理解...
    99+
    2023-05-16
    redis PHP应用 延迟队列
  • 如何实现一个延迟队列
    本篇内容介绍了“如何实现一个延迟队列”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!延迟队列定义首先,队列这...
    99+
    2024-04-02
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作