返回顶部
首页 > 资讯 > 前端开发 > node.js >Node.js + Redis Sorted Set实现任务队列
  • 527
分享到

Node.js + Redis Sorted Set实现任务队列

队列jsNode 2022-06-04 17:06:34 527人浏览 泡泡鱼
摘要

需求:功能 A 需要调用第三方 api 获取数据,而第三方 API 自身是异步处理方式,在调用后会返回数据与状态 { data: "查询结果", "status": "正在异步处理中" } ,这样就需要间隔

需求:功能 A 需要调用第三方 api 获取数据,而第三方 API 自身是异步处理方式,在调用后会返回数据与状态 { data: "查询结果", "status": "正在异步处理中" } ,这样就需要间隔一段时间后再去调用第三方 API 获取数据。为了用户在使用功能 A 时不会因为第三方 API 正在异步处理中而必须等待,将用户请求加入任务队列中,返回部分数据并关闭请求。然后定时从任务队列里中取出任务调用第三方 API,若返回状态为”异步处理中“,将该任务再次加入任务队列,若返回状态为”已处理完毕“,将返回数据入库。

根据以上问题,想到使用 node.js + Redis sorted set 来实现任务队列。node.js 实现自身应用 API 用来接受用户请求,合并数据库已存数据与 API 返回的部分数据返回给用户,并将任务加入到任务队列中。利用 Node.js child process 与 cron 定时从任务队列中取出任务执行。

在设计任务队列的过程中需要考虑到的几个问题

并行执行多个任务 任务唯一性 任务成功或失败后的处理

针对以上问题的解决方案

并行执行多个任务利用 Promise.all 来实现 任务唯一性利用 Redis sorted set 来实现。使用时间戳作为分值可以实现将 sorted set 作为 list 来使用,在加入任务时判断任务是否已经存在,在取出任务执行时将该任务分值设置为 0,每次取出分值大于 0 的任务来执行,可以避免重复执行任务。 执行任务成功后删除任务,执行任务失败后将任务分值更新为当前时间时间戳,这样就可以将失败的任务重新加入任务队列尾部

示例代码


// remote_api.js 模拟第三方 API
'use strict';

const app = require('express')();

app.get('/', (req, res) => {
  setTimeout(() => {
    let arr = [200, 300]; // 200 代表成功,300 代表失败需要重新请求
    res.status(200).send({ 'status': arr[parseInt(Math.random() * 2)] });
  }, 3000);
});

app.listen('9001', () => {
  console.log('API 服务监听端口:9001');
});
// producer.js 自身应用 API,用来接受用户请求并将任务加入任务队列
'use strict';

const app = require('express')();
const redisClient = require('redis').createClient();

const QUEUE_NAME = 'queue:example';

function addTaskToQueue(taskName, callback) {
  // 先判断任务是否已经存在,存在:跳过,不存在:加入任务队列
  redisClient.zscore(QUEUE_NAME, taskName, (error, task) => {
    if (error) {
      console.log(error);
    } else {
      if (task) {
        console.log('任务已存在,不新增相同任务');
        callback(null, task);
      } else {
        redisClient.zadd(QUEUE_NAME, new Date().getTime(), taskName, (error, result) => {
          if (error) {
            callback(error);
          } else {
            callback(null, result);
          }
        });
      }
    }
  });
}

app.get('/', (req, res) => {
  let taskName = req.query['task-name'];
  addTaskToQueue(taskName, (error, result) => {
    if (error) {
      console.log(error);
    } else {
      res.status(200).send('正在查询中......');
    }
  });
});

app.listen(9002, () => {
  console.log('生产者服务监听端口:9002');
});
// consumer.js 定时获取任务并执行
'use strict';

const redisClient = require('redis').createClient();
const request = require('request');
const schedule = require('node-schedule');

const QUEUE_NAME = 'queue:expmple';
const PARALLEL_TASK_NUMBER = 2; // 并行执行任务数量

function getTasksFroMQueue(callback) {
  // 获取多个任务
  redisClient.zrangebyscore([QUEUE_NAME, 1, new Date().getTime(), 'LIMIT', 0, PARALLEL_TASK_NUMBER], (error, tasks) => {
    if (error) {
      callback(error);
    } else {
      // 将任务分值设置为 0,表示正在处理
      if (tasks.length > 0) {
        let tmp = [];
        tasks.forEach((task) => {
          tmp.push(0);
          tmp.push(task);
        });
        redisClient.zadd([QUEUE_NAME].concat(tmp), (error, result) => {
          if (error) {
            callback(error);
          } else {
            callback(null, tasks)
          }
        });
      }
    }
  });
}

function addFailedTaskToQueue(taskName, callback) {
  redisClient.zadd(QUEUE_NAME, new Date().getTime(), taskName, (error, result) => {
    if (error) {
      callback(error);
    } else {
      callback(null, result);
    }
  });
}

function removeSucceedTaskFromQueue(taskName, callback) {
  redisClient.zrem(QUEUE_NAME, taskName, (error, result) => {
    if (error) {
      callback(error);
    } else {
      callback(null, result);
    }
  })
}

function execTask(taskName) {
  return new Promise((resolve, reject) => {
    let requestOptions = {
      'url': 'Http://127.0.0.1:9001',
      'method': 'GET',
      'timeout': 5000
    };
    request(requestOptions, (error, response, body) => {
      if (error) {
        resolve('failed');
        console.log(error);
        addFailedTaskToQueue(taskName, (error) => {
          if (error) {
            console.log(error);
          } else {

          }
        });
      } else {
        try {
          body = typeof body !== 'object' ? JSON.parse(body) : body;
        } catch (error) {
          resolve('failed');
          console.log(error);
          addFailedTaskToQueue(taskName, (error, result) => {
            if (error) {
              console.log(error);
            } else {

            }
          });
          return;
        }
        if (body.status !== 200) {
          resolve('failed');
          addFailedTaskToQueue(taskName, (error, result) => {
            if (error) {
              console.log(error);
            } else {

            }
          });
        } else {
          resolve('succeed');
          removeSucceedTaskFromQueue(taskName, (error, result) => {
            if (error) {
              console.log(error);
            } else {

            }
          });
        }
      }
    });
  });
}

// 定时,每隔 5 秒获取新的任务来执行
let job = schedule.scheduleJob('*/5 * * * * *', () => {
  console.log('获取新任务');
  getTasksFromQueue((error, tasks) => {
    if (error) {
      console.log(error);
    } else {
      if (tasks.length > 0) {
        console.log(tasks);

        Promise.all(tasks.map(execTask))
        .then((results) => {
          console.log(results);
        })
        .catch((error) => {
          console.log(error);
        });
        
      }
    }
  });
});

--结束END--

本文标题: Node.js + Redis Sorted Set实现任务队列

本文链接: https://lsjlt.com/news/12955.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

猜你喜欢
  • Node.js + Redis Sorted Set实现任务队列
    需求:功能 A 需要调用第三方 API 获取数据,而第三方 API 自身是异步处理方式,在调用后会返回数据与状态 { data: "查询结果", "status": "正在异步处理中" } ,这样就需要间隔...
    99+
    2022-06-04
    队列 js Node
  • 如何在Redis中实现延迟任务队列
    在Redis中实现延迟任务队列可以使用有序集合(Sorted Set)和定时任务的方式来实现。以下是一个基本的实现方法: 将任务存...
    99+
    2024-04-09
    Redis
  • Flutter 队列任务的实现
    目录前言队列添加任务进队列移除队列指定任务判断是否包含对应任务执行队列任务任务条件添加任务时加入条件执行任务前判断条件是否满足使用和总结前言 在电商的应用中,最常见的就是在首页或完成...
    99+
    2024-04-02
  • 如何用Go语言和Redis实现任务队列
    如何用Go语言和Redis实现任务队列引言:在实际的软件开发中,经常会遇到需要处理大量任务的场景。为了提高处理效率和可靠性,我们可以使用任务队列来分发和执行这些任务。本文将介绍如何使用Go语言和Redis实现一个简单的任务队列,以及具体的代...
    99+
    2023-10-26
    Go语言 redis 任务队列
  • 如何利用Redis实现分布式任务队列
    如何利用Redis实现分布式任务队列引言:随着互联网应用的快速发展,分布式系统成为了企业追求高性能和高可扩展性的重要选择。而在分布式系统中,任务队列被广泛应用于各种场景,例如消息发布、数据同步、任务调度等。Redis作为一款快速的内存数据库...
    99+
    2023-11-07
    分布式 redis 任务队列
  • Redis数据库队列怎么实现异步任务
    在Redis中实现异步任务可以通过Redis的列表数据结构来实现队列。下面是一种常见的实现方式: 生产者将需要执行的任务加入到Re...
    99+
    2024-04-22
    Redis
  • Flutter队列任务如何实现
    本篇内容介绍了“Flutter队列任务如何实现”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!队列任务队列,那当然要有个队列。这个队列的任务内...
    99+
    2023-07-02
  • 基于Redis实现分布式锁以及任务队列
    一、前言   双十一刚过不久,大家都知道在天猫、京东、苏宁等等电商网站上有很多秒杀活动,例如在某一个时刻抢购一个原价1999现在秒杀价只要999的手机时,会迎来一个用户请求的高峰期,可能会有几十万几百万的并...
    99+
    2022-06-04
    队列 分布式 Redis
  • redis实现简单队列
    在工作中,时常会有用到队列的场景,比较常见的用rabbitMQ这些专业的组件,官网地址是:http://www.rabbitmq.com,重要的是官方有.net的客户端,但是如果对rabbitMQ不熟悉的话...
    99+
    2022-06-04
    队列 简单 redis
  • Redis+Node.js如何实现一个能处理海量数据的异步任务队列系统
    这篇文章主要介绍了Redis+Node.js如何实现一个能处理海量数据的异步任务队列系统,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。在最近...
    99+
    2024-04-02
  • JS怎么实现异步任务队列
    本篇内容主要讲解“JS怎么实现异步任务队列”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“JS怎么实现异步任务队列”吧!问题有个需求,需要实现一个异步任务队列,并...
    99+
    2024-04-02
  • golang异步任务队列怎么实现
    在Go语言中,可以使用goroutine和channel来实现异步任务队列。下面是一个简单的示例代码: package main ...
    99+
    2023-10-27
    golang
  • PHP redis Sorted Set实现字符串去重代码示例
    可以使用 Redis 的 Sorted Set 有序集合来实现字符串去重的功能。 具体步骤如下: 首先将要去重的字符串作为 Sorted Set 的 member 值,可以考虑把相同...
    99+
    2023-05-20
    redis如何去重 php redis redis sorted set
  • Python实现简单多线程任务队列
    最近我在用梯度下降算法绘制神经网络的数据时,遇到了一些算法性能的问题。梯度下降算法的代码如下(伪代码): def gradient_descent(): # the gradient descent...
    99+
    2022-06-04
    队列 多线程 简单
  • Java实现FIFO任务调度队列策略
    目录前言FIFO任务调度器架构示例代码前言 在工作中,很多高并发的场景中,我们会用到队列来实现大量的任务请求。当任务需要某些特殊资源的时候,我们还需要合理的分配资源,让队列中的任务高...
    99+
    2024-04-02
  • Laravel怎么实现队列和任务调度
    本文小编为大家详细介绍“Laravel怎么实现队列和任务调度”,内容详细,步骤清晰,细节处理妥当,希望这篇“Laravel怎么实现队列和任务调度”文章能帮助大家解决疑惑,下面跟着小编的思路慢慢深入,一起来学习新知识吧。一、 我们首先准备一下...
    99+
    2023-07-04
  • 如何利用Redis和Rust语言实现异步任务队列功能
    如何利用Redis和Rust语言实现异步任务队列功能引言:在当今高并发的互联网应用中,异步任务队列是非常常见和实用的功能。它可以将耗时较长的任务从主线程异步处理,提高系统的吞吐能力和响应速度。本文将介绍如何利用Redis和Rust语言实现一...
    99+
    2023-10-22
    Rust redis 异步任务队列
  • 如何实现Redis延迟队列
    这期内容当中小编将会给大家带来有关如何实现Redis延迟队列,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。延迟队列,顾名思义它是一种带有延迟功能的消息队列。那么,是在什么...
    99+
    2024-04-02
  • redis如何实现异步队列
    redis实现异步队列的方法:一般使用list结构作为队列,rpush生产消息,lpop消费消息,当lpop没有消息时,需适当sleep一会再重试。示例:public class RedisClient {@R...
    99+
    2024-04-02
  • redis怎么实现消息队列
    Redis可以通过以下几种方式实现消息队列:1. List数据结构:使用Redis的List数据结构实现简单的消息队列。生产者将消息...
    99+
    2023-09-14
    redis
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作