返回顶部
首页 > 资讯 > 后端开发 > Python >Python实现线程池之线程安全队列
  • 400
分享到

Python实现线程池之线程安全队列

2024-04-02 19:04:59 400人浏览 泡泡鱼

Python 官方文档:入门教程 => 点击学习

摘要

目录一、线程池组成二、线程安全队列的实现三、测试逻辑3.1、测试阻塞逻辑3.2、测试读写加锁逻辑本文实例为大家分享了python实现线程池之线程安全队列的具体代码,供大家参考,具体内

本文实例为大家分享了python实现线程池之线程安全队列的具体代码,供大家参考,具体内容如下

一、线程池组成

一个完整的线程池由下面几部分组成,线程安全队列、任务对象、线程处理对象、线程池对象。其中一个线程安全的队列是实现线程池和任务队列的基础,本节我们通过threading包中的互斥量threading.Lock()和条件变量threading.Condition()来实现一个简单的、读取安全的线程队列。

二、线程安全队列的实现

包括put、pop、get等方法,为保证线程安全,读写操作时要添加互斥锁;并且pop操作可以设置等待时间以阻塞当前获取元素的线程,当新元素写入队列时通过条件变量通知解除等待操作。

class ThreadSafeQueue(object):

    def __init__(self, max_size=0):
        self.queue = []
        self.max_size = max_size  # max_size为0表示无限大
        self.lock = threading.Lock()  # 互斥量
        self.condition = threading.Condition()  # 条件变量

    def size(self):
        """
        获取当前队列的大小
        :return: 队列长度
        """
        # 加锁
        self.lock.acquire()
        size = len(self.queue)
        self.lock.release()
        return size

    def put(self, item):
        """
        将单个元素放入队列
        :param item:
        :return:
        """
        # 队列已满 max_size为0表示无限大
        if self.max_size != 0 and self.size() >= self.max_size:
            return ThreadSafeException()

        # 加锁
        self.lock.acquire()
        self.queue.append(item)
        self.lock.release()
        self.condition.acquire()
        # 通知等待读取的线程
        self.condition.notify()
        self.condition.release()

        return item

    def batch_put(self, item_list):
        """
        批量添加元素
        :param item_list:
        :return:
        """
        if not isinstance(item_list, list):
            item_list = list(item_list)

        res = [self.put(item) for item in item_list]

        return res

    def pop(self, block=False, timeout=0):
        """
        从队列头部取出元素
        :param block: 是否阻塞线程
        :param timeout: 等待时间
        :return:
        """
        if self.size() == 0:
            if block:
                self.condition.acquire()
                self.condition.wait(timeout)
                self.condition.release()
            else:
                return None

        # 加锁
        self.lock.acquire()
        item = None
        if len(self.queue):
            item = self.queue.pop()
        self.lock.release()

        return item

    def get(self, index):
        """
        获取指定位置的元素
        :param index:
        :return:
        """
        if self.size() == 0 or index >= self.size():
            return None

        # 加锁
        self.lock.acquire()
        item = self.queue[index]
        self.lock.release()

        return item


class ThreadSafeException(Exception):
    pass

三、测试逻辑

3.1、测试阻塞逻辑

def thread_queue_test_1():
    thread_queue = ThreadSafeQueue(10)

    def producer():
        while True:
            thread_queue.put(random.randint(0, 10))
            time.sleep(2)

    def consumer():
        while True:
            print('current time before pop is %d' % time.time())
            item = thread_queue.pop(block=True, timeout=3)
            # item = thread_queue.get(2)
            if item is not None:
                print('get value from queue is %s' % item)
            else:
                print(item)
            print('current time after pop is %d' % time.time())

    t1 = threading.Thread(target=producer)
    t2 = threading.Thread(target=consumer)
    t1.start()
    t2.start()
    t1.join()
    t2.join()

测试结果:

我们可以看到生产者线程每隔2s向队列写入一个元素,消费者线程当无数据时默认阻塞3s。通过执行时间发现消费者线程确实发生了阻塞,当生产者写入数据时结束当前等待操作。

3.2、测试读写加锁逻辑

def thread_queue_test_2():
    thread_queue = ThreadSafeQueue(10)

    def producer():
        while True:
            thread_queue.put(random.randint(0, 10))
            time.sleep(2)

    def consumer(name):
        while True:
            item = thread_queue.pop(block=True, timeout=1)
            # item = thread_queue.get(2)
            if item is not None:
                print('%s get value from queue is %s' % (name, item))
            else:
                print('%s get value from queue is None' % name)

    t1 = threading.Thread(target=producer)
    t2 = threading.Thread(target=consumer, args=('thread1',))
    t3 = threading.Thread(target=consumer, args=('thread2',))
    t1.start()
    t2.start()
    t3.start()
    t1.join()
    t2.join()
    t3.join()

测试结果:

生产者还是每2s生成一个元素写入队列,消费者开启两个线程进行消费,默认阻塞时间为1s,打印结果显示通过加锁确保每次只有一个线程能获取数据,保证了线程读写的安全。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持编程网。

--结束END--

本文标题: Python实现线程池之线程安全队列

本文链接: https://lsjlt.com/news/118354.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

猜你喜欢
  • Python实现线程池之线程安全队列
    目录一、线程池组成二、线程安全队列的实现三、测试逻辑3.1、测试阻塞逻辑3.2、测试读写加锁逻辑本文实例为大家分享了Python实现线程池之线程安全队列的具体代码,供大家参考,具体内...
    99+
    2024-04-02
  • Python线程之线程安全的队列Queue
    目录一、什么是队列?二、队列基操 入队/出队/查队列状态三、Queue是一个线程安全的类一、什么是队列? 像排队一样,从头到尾排成一排,还可以有人继续往后排队,这就是队列。 这里学委...
    99+
    2024-04-02
  • JavaEE线程安全实现线程池方法
    前言: 线程虽然比进程更轻量,但是如果创建销毁的频率进一步增加,开销还是很大 解决方案:线程池or协程 线程池:把线程提前创建好放到池子里,后续用到线程直接从池子里取不必这边申请了。...
    99+
    2024-04-02
  • C#多线程系列之线程池
    目录线程池ThreadPool 常用属性和方法线程池说明和示例线程池线程数线程池线程数说明不支持的线程池异步委托任务取消功能计时器线程池 线程池全称为托管线程池,线程池受 .NET ...
    99+
    2024-04-02
  • 线程池02-LinkedBlockingQueue 阻塞队列
    首先,我们先了解一下什么是阻塞队列: 当队列满了时,队列会阻塞插入元素的线程,直到队列不满; 当队列为空时,获取元素的线程会等待队列变成非空。 常用到的方法 上面是对阻塞队列的简单了解,下面重点分析一下LinkedBlocki...
    99+
    2021-10-24
    线程池02-LinkedBlockingQueue 阻塞队列 数据库入门 数据库基础教程 数据库 mysql
  • python实现线程池
    什么是线程池?     诸如web服务器、数据库服务器、文件服务器和邮件服务器等许多服务器应用都面向处理来自某些远程来源的大量短小的任务。构建服务器应用程序的一个过于简单的模型是:每当一个请求到达就创建一个新的服务对象,然后在新的服务对象中...
    99+
    2023-01-31
    线程 python
  • Python线程之认识线程安全
    目录一、什么是线程安全?二、在Python中有哪些类是线程安全的?三、如何做到真正线程安全?1.无状态函数2.另一种 化繁为简一、什么是线程安全? 线程安全,名字就非常直接,在多线程...
    99+
    2024-04-02
  • python线程池队列满了怎么解决
    当线程池的任务队列满了,有几种可能的解决方法: 增加队列的大小:可以通过调整线程池的任务队列的大小,来增加队列的容量。可以使用Th...
    99+
    2023-10-24
    python
  • 浅谈python 线程池threadpool之实现
    首先介绍一下自己使用到的名词: 工作线程(worker):创建线程池时,按照指定的线程数量,创建工作线程,等待从任务队列中get任务; 任务(requests):即工作线程处理的任务,任务可能成千上万个,但...
    99+
    2022-06-04
    浅谈 线程 python
  • C++线程安全的队列是什么
    这篇文章将为大家详细讲解有关C++线程安全的队列是什么,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。无界队列#include<queue>#include<mutex>#inclu...
    99+
    2023-06-29
  • 线程池之newFixedThreadPool定长线程池的实例
    newFixedThreadPool定长线程池的实例 newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。newFixedThr...
    99+
    2024-04-02
  • Python中线程安全队列Queue的示例分析
    小编给大家分享一下Python中线程安全队列Queue的示例分析,希望大家阅读完这篇文章之后都有所收获,下面让我们一起去探讨吧!一、什么是队列?像排队一样,从头到尾排成一排,还可以有人继续往后排队,这就是队列。这里学委想说的是Queue这个...
    99+
    2023-06-29
  • 详解Java线程池队列中的延迟队列DelayQueue
    目录DelayQueue延迟队列DelayQueue使用场景DelayQueue属性DelayQueue构造方法实现Delayed接口使用示例DelayQueue总结在阻塞队里中,除...
    99+
    2022-12-08
    Java延迟队列DelayQueue Java延迟队列 Java DelayQueue
  • python多线程的线程如何安全实现
    1、引言 当前随着计算机硬件的快速发展,个人电脑上的 CPU 也是多核的,现在普遍的 CUP 核数都是 4 核或者 8 核的。因此,在编写程序时,需要为了提高效率,充分发挥硬件的能力,则需要编写并行的程序。Java ...
    99+
    2022-06-02
    python 多线程 线程安全
  • Python实现简单多线程任务队列
    最近我在用梯度下降算法绘制神经网络的数据时,遇到了一些算法性能的问题。梯度下降算法的代码如下(伪代码): def gradient_descent(): # the gradient descent...
    99+
    2022-06-04
    队列 多线程 简单
  • C++线程安全的队列你了解嘛
    目录无界队列有界队列总结 无界队列 #include<queue> #include<mutex> #include<condition_variabl...
    99+
    2024-04-02
  • 线程池之newCachedThreadPool可缓存线程池的实例
    java线程池: Java通过Executors提供四种线程池,分别为: newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,...
    99+
    2024-04-02
  • Python并发编程之线程池/进程池
    原文来自开源中国前言python标准库提供线程和多处理模块来编写相应的多线程/多进程代码,但当项目达到一定规模时,频繁地创建/销毁进程或线程是非常消耗资源的,此时我们必须编写自己的线程池/进程池来交换时间空间。但是从Python3.2开始,...
    99+
    2023-06-02
  • Python+多线程+队列爬虫
    Python+多线程+队列,爬虫例子 # -*- coding: utf-8-*- import urllib2 import urllib import json import time import datetime import t...
    99+
    2023-01-31
    爬虫 队列 多线程
  • 用 Python 实现的线程池
    为了提高程序的效率,经常要用到多线程,尤其是IO等需要等待外部响应的部分。线程的创建、销毁和调度本身是有代价的,如果一个线程的任务相对简单,那这些时间和空间开销就不容忽视了,此时用线程池就是更好的选择,即创建一些线程然后反复利用它们,而不...
    99+
    2023-01-31
    线程 Python
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作