返回顶部
首页 > 资讯 > 后端开发 > 其他教程 >C#实现万物皆可排序的队列方法详解
  • 138
分享到

C#实现万物皆可排序的队列方法详解

2024-04-02 19:04:59 138人浏览 独家记忆
摘要

需求 产品中需要向不同的客户推送数据,原来的实现是每条数据产生后就立即向客户推送数据,走的的是Http协议。因为每条数据都比较小,而数据生成的频次也比较高,这就会频繁的建立HTTP连

需求

产品中需要向不同的客户推送数据,原来的实现是每条数据产生后就立即向客户推送数据,走的的是Http协议。因为每条数据都比较小,而数据生成的频次也比较高,这就会频繁的建立HTTP连接,而且每次HTTP传输中携带的业务数据都很小,对网络的实际利用率不高。希望能够提高网络的利用率,并降低系统的负载。

分析

一个很自然的想法就是将多条数据一起发送,这里有几个关键点:

1、多条数据的聚合逻辑: 是攒够几条发送,还是按照时间周期发送。如果是攒够几条发送,在数据比较稀疏或者产生频率不那么稳定的时候,攒够需要的数据条数可能比较困难,这时候还得需要一个过期时间,因为客户可能接受不了太多的延迟。既然不管怎样都需要使用时间进行控制,我这里索性就选择按照时间周期发送了。思路是:自上次发送时间起,经过了某个时长之后,就发送客户在这段时间内产生的所有数据。

2、数据到期判断方法:既然选择了按照时间周期发送,那么就必须有办法判断是否到了发送时间。一个很简单的想法就是轮询,把所有客户轮询一遍,看看谁的数据到期了,就发送谁的。这个算法的时间复杂度是O(N),如果客户比较多,就会消耗过多的时间在这上边。还有一个办法:如果客户按照时间排序好了,那么只需要取时间最早的客户的数据时间判断就好了,满足就发送,一直向后找,直到获取的客户数据时间不符合条件,则退出处理,然后等一会再进行判断处理。这就需要有一个支持排序的数据结构,写入数据时自动排序,这种数据结构的时间复杂度一般可以做到O(log(n))。对于这个数据结构的读写操作原理上就是队列的操作方式,只不过是个可排序的队列。

3、区分客户:不同客户的数据接收地址不同,向具体某个客户发送数据时,应该能比较方便的聚合他的数据,最好是直接就能拿到需要发送的数据。可以使用字典数据结构来满足这个需求,取某个客户数据的时间复杂度可以降低到O(1)。

4、数据的安全性问题:如果程序在数据发送成功之前退出了,未发送的数据怎么办?是还能继续发送,还是就丢掉不管了。如果要在程序重启后恢复未发送成功的数据,则必须将数据同步到别的地方,比如持久化到磁盘。因为我这里的数据安全性要求不高,丢失一些数据也是允许的,所以要发送的数据收到之后放到内存就行了。

实现

上文提到可排序的数据结构,可以使用SortedList<TKEy,TValue>,键是时间,值是这个时间产生了数据的客户标识列表。不过它的读写操作不是线程安全的,需要自己做同步,这里简单点就使用lock了。

对于不同客户的数据,为了方便获取,使用Dictionary<TKey,TValue>来满足,键是客户的标识,值是累积的未发送客户数据。这个数据读写也不是线程安全的,可以和SortedList的读写放到同一个lock中。

下边是它们的定义:

SortedList<DateTime, List<TKey>> _queue = new SortedList<DateTime, List<TKey>>();
Dictionary<TKey, List<TValue>> _data = new Dictionary<TKey, List<TValue>>();
readonly object _lock = new object();

插入数据的时候,需要先写入SortedList,然后再写入Dictionary。代码逻辑比较简单,请看:

    public void Publish(TKey key, TValue value)
    {
        DateTime now = DateTime.Now;
        lock (_lock)
        {
            if (_queue.TryGetValue(now, out List<TKey>? keys))
            {
                if (!keys!.Contains(key))
                {
                    keys.Add(key);
                }
            }
            else
            {
                _queue.Add(now, new List<TKey> { key });
            }

            if (_data.TryGetValue(key, out List<TValue>? values))
            {
                values.Add(value);
            }
            else
            {
                _data.Add(key, new List<TValue> { value });
            }
        }
    }

对于消费数据,这里采用拉数据的模式。最开始写的方法逻辑是:读取一条数据,处理它,然后从队列中删除。但是这个逻辑需要对队列进行读写,所以必须加。一般处理数据比较耗时,比如这里要通过HTTP发送数据,加锁的话就可能导致写数据到队列时阻塞的时间比较长。所以这里实现的是把可以发送的数据全部提取出来,然后就释放锁,数据的处理放到锁的外部实现,这样队列的读写性能就比较好了。

    public List<(TKey key, List<TValue> value)> Pull(int maxNumberOfMessages)
    {
        List<(TKey, List<TValue>)> result = new List<(TKey, List<TValue>)>();
        DateTime now = DateTime.Now;

        lock (_lock)
        {
            int messageCount = 0;
            while (true)
            {
                if (!_queue.Any())
                {
                    break;
                }

                var first = _queue.First();
                var diffMillseconds = now.Subtract(first.Key).TotalMilliseconds;
                if (diffMillseconds < _valueDequeueMillseconds)
                {
                    break;
                }

                var keys = first.Value;
                foreach (var key in keys)
                {
                    if (_data.TryGetValue(key, out List<TValue>? keyValues))
                    {
                        result.Add((key, keyValues));
                        _data.Remove(key);
                        messageCount += keyValues!.Count;
                    }
                }
                _queue.RemoveAt(0);

                if (messageCount >= maxNumberOfMessages)
                {
                    break;
                }
            }
        }

        return result;
    }

这段代码比较长一些,我梳理下逻辑:取队列的第一条数据,判断时间是否达到发送周期,未达到则直接退出,方法返回空列表。如果达到发送周期,则取出第一条数据中存储的客户标识,然后根据这些标识获取对应的客户未发送数据,将这些数据按照客户维度添加到返回列表中,将这些客户及其数据从队列中移除,返回有数据的列表。这里还增加了一个拉取数据的条数限制,方便根据业务实际情况进行控制。

再来看一下怎么使用这个队列,这里模拟多个生产者加一个消费者,其实可以任意多个生产者和消费者:

TimeSortedQueue<string, string> queue = new TimeSortedQueue<string, string>(3000);

List<Task> publishTasks = new List<Task>();

for (int i = 0; i < 4; i++)
{
    var j = i;
    publishTasks.Add(Task.Factory.StartNew(() =>
    {
        int k = 0;
        while (true)
        {
            queue.Publish($"key_{k}", $"value_{j}_{k}");
            Thread.Sleep(15);
            k++;
        }
    }, TaskCreationOptions.LongRunning));
}

Task.Factory.StartNew(() =>
{
    while (true)
    {
        var list = queue.Pull(100);
        if (list.Count <= 0)
        {
            Thread.Sleep(100);
            continue;
        }

        foreach (var item in list)
        {
            Console.WriteLine($"{DateTime.Now.ToString("mmss.fff")}:{item.key}, {string.Join(",", item.value)}");
        }
    }

}, TaskCreationOptions.LongRunning);

Task.WaitAll(publishTasks.ToArray());

以上就是针对这个特定需求实现的一个按照时间进行排序的队列。

万物皆可排序的队列
我们很容易想到,既然可以按照时间排序,那么按照别的数据类型排序也是可以的。这个数据结构可以应用的场景很多,比如按照权重排序的队列、按照优先级排序的队列、按照年龄排序的队列、按照银行存款排序的队列,等等。这就是一个万物皆可排序的队列。

我这里把主要代码贴出来(完整代码和示例请看文末):

public class SortedQueue<TSortKey, TKey, TValue>
where TSortKey : notnull, IComparable
where TKey : notnull
where TValue : notnull
{
    Dictionary<TKey, List<TValue>> _data = new Dictionary<TKey, List<TValue>>();

    SortedList<TSortKey, List<TKey>> _queue = new SortedList<TSortKey, List<TKey>>();

    readonly object _lock = new object();

    /// <summary>
    /// Create a new instance of SortedQueue
    /// </summary>
    public SortedQueue(int maxNumberOfMessageConsumedOnce)
    {
    }

    /// <summary>
    /// Publish a message to queue
    /// </summary>
    /// <param name="sortKey">The key in the queue for sorting. Different messages can use the same key.</param>
    /// <param name="key">The message key.</param>
    /// <param name="value">The message value.</param>
    public void Publish(TSortKey sortKey, TKey key, TValue value)
    {
        lock (_lock)
        {
            if (_queue.TryGetValue(sortKey, out List<TKey>? keys))
            {
                keys.Add(key);
            }
            else
            {
                _queue.Add(sortKey, new List<TKey> { key });
            }

            if (_data.TryGetValue(key, out List<TValue>? values))
            {
                values.Add(value);
            }
            else
            {
                _data.Add(key, new List<TValue> { value });
            }
        }
    }


    /// <summary>
    /// Pull a batch of messages.
    /// </summary>
    /// <param name="maxNumberOfMessages">The maximum number of pull messages.</param>
    /// <returns></returns>
    public List<(TKey Key, List<TValue> Value)> Pull(int maxNumberOfMessages)
    {
        List<(TKey, List<TValue>)> result = new List<(TKey, List<TValue>)>();
        lock (_lock)
        {
            int messageCount = 0;
            while (true)
            {
                if (!_queue.Any())
                {
                    break;
                }

                var keys = _queue.First().Value;
                foreach (var key in keys)
                {
                    if (_data.TryGetValue(key, out List<TValue>? keyValues))
                    {
                        result.Add((key, keyValues));
                        _data.Remove(key);
                        messageCount += keyValues!.Count;
                    }
                }
                _queue.RemoveAt(0);

                if (messageCount >= maxNumberOfMessages)
                {
                    break;
                }
            }
        }

        return result;
    }
}

代码逻辑还是比较简单的,就不罗嗦了,如有问题欢迎留言交流。

再说数据安全

因为在这个实现中所有待处理的数据都在内存中,丢失数据会带来一定的风险,因为我这个程序前边还有一个队列,即使程序崩溃了,也只损失没处理的一小部分数据,业务上可以接受,所以这样做没有问题。如果你对这个程序感兴趣,需要慎重考虑你的应用场景。

来看看数据丢失可能发生的两种情况:

一是数据还在队列中时程序重启了:对于这种情况,前文提到将数据同步到其它地方,比如写入Redis、写入数据库、写入磁盘等等。不过因为网络IO、磁盘IO较慢,这往往会带来吞吐量的大幅下降,想要保证一定的吞吐量,还得引入一些分片机制,又因为分布式的不可靠,可能还得增加一些容错容灾机制,比较复杂,可以参考kafka

二是数据处理的时候失败了:对于这种情况,可以让程序重试;但是如果异常导致程序崩溃了,数据已经从内存或者其它存储中移除了,数据还是会发生丢失。这时候可以采用一个ACK机制,处理成功后向队列发送一个ACK,携带已经处理的数据标识,队列根据标识删除数据。否则消费者还能消费到这些数据。

这些问题并不一定要完全解决,还是得看业务场景,有可能你把数据持久化到Redis就够了,或者你也不用引入ACK机制,记录下处理到哪一条了就行了。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持编程网。

--结束END--

本文标题: C#实现万物皆可排序的队列方法详解

本文链接: https://lsjlt.com/news/153301.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

猜你喜欢
  • C#实现万物皆可排序的队列方法详解
    需求 产品中需要向不同的客户推送数据,原来的实现是每条数据产生后就立即向客户推送数据,走的的是HTTP协议。因为每条数据都比较小,而数据生成的频次也比较高,这就会频繁的建立HTTP连...
    99+
    2024-04-02
  • C++实现可排序最大块数的方法
    这篇“C++实现可排序最大块数的方法”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇“C++实现可排序最大块数的方法”文章吧。M...
    99+
    2023-06-19
  • C语言详解链式队列与循环队列的实现
    目录队列的实现链式队列链式队列的定义链式队列的实现循环队列循环队列的定义循环队列的实现队列的实现 队列是一种先进先出(First in First Out)的线性表,简称FIFO。与...
    99+
    2024-04-02
  • C++实现可排序的最大块数的方法
    这篇文章主要介绍“C++实现可排序的最大块数的方法”,在日常操作中,相信很多人在C++实现可排序的最大块数的方法问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”C++实现可排序的最大块数的方法”的疑惑有所帮助!...
    99+
    2023-06-20
  • 详解C++实现链表的排序算法
    目录一、链表排序二、另外一种链表排序方式三、比较两种排序的效率四、下面通过交换结点实现链表的排序一、链表排序 最简单、直接的方式(直接采用冒泡或者选择排序,而且不是交换结点,只交换数...
    99+
    2024-04-02
  • 详解C++实现拓扑排序算法
    目录一、拓扑排序的介绍二、拓扑排序的实现步骤三、拓扑排序示例手动实现四、拓扑排序的代码实现五、完整的代码和输出展示一、拓扑排序的介绍 拓扑排序对应施工的流程图具有特别重要的作用,它可...
    99+
    2024-04-02
  • Python实现堆排序的方法详解
    本文实例讲述了Python实现堆排序的方法。分享给大家供大家参考,具体如下: 堆排序作是基本排序方法的一种,类似于合并排序而不像插入排序,它的运行时间为O(nlogn),像插入排序而不像合并排序,它是一种原...
    99+
    2022-06-04
    详解 方法 Python
  • Java实现ArrayList排序的方法详解
    目录简介法1:JDK8的stream法2:Comparator#compare()法3:Comparable#compareTo()简介 说明 本文用示例介绍Java的ArrayLi...
    99+
    2024-04-02
  • C++实现优先队列的示例详解
    目录前言堆的存储方式维护堆的方法1、上浮操作2、下沉操作附上代码前言 首先,啊,先简单介绍一下优先队列的概念,学数据结构以及出入算法竞赛的相信都对队列这一数据结构十分熟悉,这是一个线...
    99+
    2024-04-02
  • C语言实现队列的示例详解
    目录前言一. 什么是队列二. 使用什么来实现栈三. 队列的实现3.1头文件3.2 函数的实现四.完整代码前言 前一段时间,我们试着用C语言实现了数据结构中的顺序表,单链表,双向循环链...
    99+
    2024-04-02
  • C++中队列queue的用法实例详解
    目录一、定义一、queue初始化二、queue常用函数补充:queue 的基本操作举例如下总结一、定义 queue是一种容器转换器模板,调用#include< queue>...
    99+
    2024-04-02
  • C语言简明讲解队列的实现方法
    目录前言队列的表示和实现队列的概念及结构代码实现束语前言 大家好啊,我又双叒叕来水博客了,道路是曲折的,前途是光明的,事物是呈螺旋式上升的,事物最终的发展结果还是我们多多少少能够决定...
    99+
    2024-04-02
  • 超详细解析C++实现快速排序算法的方法
    目录一、前言1.分治算法2.分治算法解题方法二、快速排序1.问题分析2.算法设计3.算法分析三、AC代码一、前言 1.分治算法 快速排序,其实是一种分治算法,那么在了解快速排序之前,...
    99+
    2024-04-02
  • Java实现异步延迟队列的方法详解
    目录1.应用场景2.延时处理方式调研1.DelayQueue2.延迟队列mq3.定时任务4.redis5. 时间轮3.实现目标4.架构设计5.延迟组件实现方式1.实现原理2.消息结构...
    99+
    2023-03-22
    Java异步延迟队列 Java延迟队列
  • C语言实现数组元素排序方法详解
    目录前言算法总结及实现优化算法前言 在实际开发中,有很多场景需要我们将数组元素按照从大到小(或者从小到大)的顺序排列,这样在查阅数据时会更加直观,例如: 一个保存了班级学号的数组,排...
    99+
    2023-02-11
    C语言数组元素排序 C语言数组元素 C语言数组排序
  • JavaScript实现拖拽排序的方法详解
    目录实现原理概述代码实现完整代码实现可拖拽排序的菜单效果大家想必都很熟悉,本次我们通过一个可拖拽排序的九宫格案例来演示其实现原理。 先看一下完成效果: 实现原理概述 拖拽原理 当鼠...
    99+
    2024-04-02
  • C++ 栈和队列的实现超详细解析
    目录1、栈的介绍:2、栈的常用接口实现 3、队列的介绍4、队列的常用接口实现 可算是把链表给结束了,很多小伙伴已经迫不及待想看到栈和队列了,那么它来了!相信有了顺...
    99+
    2024-04-02
  • Python实现优先级队列结构的方法详解
    最简单的实现 一个队列至少满足2个方法,put和get. 借助最小堆来实现. 这里按"值越大优先级越高"的顺序. #coding=utf-8 from heapq import heappush, h...
    99+
    2022-06-04
    优先级 队列 详解
  • php实现归并排序算法的方法详解
    目录php实现归并排序算法归并排序原理总结php实现归并排序算法 归并排序算法的复杂度是O(nlogn)。 代码如下,只需要clone下来执行composer install然后执行...
    99+
    2024-04-02
  • Java实现HashMap排序方法的示例详解
    目录简介排序已有数据按key排序按value排序按插入顺序存放HashMap不按插入顺序存放LinkedHashMap会按照插入顺序存放简介 本文用示例介绍HashMap排序的方法。...
    99+
    2024-04-02
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作