返回顶部
首页 > 资讯 > 后端开发 > Python >Python分布式进程中会遇到的坑都有哪些呢
  • 291
分享到

Python分布式进程中会遇到的坑都有哪些呢

2023-06-16 20:06:05 291人浏览 独家记忆

Python 官方文档:入门教程 => 点击学习

摘要

python分布式进程中会遇到的坑都有哪些呢,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。小惊大怪你是不是在用python3或者在windows系统上编程?最重要的是你对进程和

python分布式进程中会遇到的坑都有哪些呢,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。

小惊大怪

你是不是在用python3或者在windows系统上编程?最重要的是你对进程和线程不是很清楚?那么恭喜你,在Python分布式进程中,会有坑等着你去挖。。。(hahahaha,此处允许我吓唬一下你)开玩笑的啦,不过,如果你知道序列中不支持匿名函数,那这个坑就和你say  byebye了。好了话不多数,直接进入正题。

分布式进程

正如大家所知道的Process比Thread更稳定,而且Process可以分布到多台机器上,而Thread最多只能分布到同一台机器的多个CPU上。Python的multiprocessing模块不但支持多进程,其中managers子模块还支持把多进程分布到多台机器上。一个服务进程可以作为调度者,将任务分布到其他多个进程中,依靠网络通信。由于managers模块封装很好,不必了解网络通信的细节,就可以很容易地编写分布式多进程程序。

代码记录

举个例子

如果我们已经有一个通过Queue通信的多进程程序在同一台机器上运行,现在,由于处理任务的进程任务繁重,希望把发送任务的进程和处理任务的进程分布到两台机器上,这应该怎么用分布式进程来实现呢?你已经知道了原有的Queue可以继续使用,而且通过managers模块把Queue通过网络暴露出去,就可以让其他机器的进程来访问Queue了。好,那我们就这么干!

写个task_master.py

我们先看服务进程。服务进程负责启动Queue,把Queue注册到网络上,然后往Queue里面写入任务。

#!/user/bin/pytthon # -*- coding:utf-8 -*- # @Time: 2018/3/3 16:46 # @Author: lichexo # @File: task_master.py import random, time, queue from multiprocessing.managers import BaseManager # 发送任务的队列: task_queue = queue.Queue() # 接收结果的队列: result_queue = queue.Queue() # 从BaseManager继承的QueueManager: class QueueManager(BaseManager):  pass # 把两个Queue都注册到网络上, callable参数关联了Queue对象: QueueManager.reGISter('get_task_queue', callable=lambda: task_queue) QueueManager.register('get_result_queue', callable=lambda: result_queue) # 绑定端口5000, 设置验证码'abc': manager = QueueManager(address=('', 5000), authkey=b'abc') # 启动Queue: manager.start() # 获得通过网络访问的Queue对象: task = manager.get_task_queue() result = manager.get_result_queue() # 放几个任务进去: for i in range(10):  n = random.randint(0, 10000)  print('Put task %d...' % n)  task.put(n) # 从result队列读取结果: print('Try get results...') for i in range(10):  r = result.get(timeout=10)  print('Result: %s' % r) # 关闭: manager.shutdown() print('master exit.')

请注意,当我们在一台机器上写多进程程序时,创建的Queue可以直接拿来用,但是,在分布式多进程环境下,添加任务到Queue不可以直接对原始的task_queue进行操作,那样就绕过了QueueManager的封装,必须通过manager.get_task_queue()获得的Queue接口添加。然后,在另一台机器上启动任务进程(本机上启动也可以)

写个task_worker.py

#!/user/bin/pytthon # -*- coding:utf-8 -*- # @Time: 2018/3/3 16:46 # @Author: lichexo # @File: task_worker.py import time, sys, queue from multiprocessing.managers import BaseManager # 创建类似的QueueManager: class QueueManager(BaseManager):  pass # 由于这个QueueManager只从网络上获取Queue,所以注册时只提供名字: QueueManager.register('get_task_queue') QueueManager.register('get_result_queue') # 连接到服务器,也就是运行task_master.py的机器: server_addr = '127.0.0.1' print('Connect to server %s...' % server_addr) # 端口和验证码注意保持与task_master.py设置的完全一致: m = QueueManager(address=(server_addr, 5000), authkey=b'abc') # 从网络连接: m.connect() # 获取Queue的对象: task = m.get_task_queue() result = m.get_result_queue() # 从task队列取任务,并把结果写入result队列: for i in range(10):  try:  n = task.get(timeout=1)  print('run task %d * %d...' % (n, n))  r = '%d * %d = %d' % (n, n, n*n)  time.sleep(1)  result.put(r)  except Queue.Empty:  print('task queue is empty.') # 处理结束: print('worker exit.')

任务进程要通过网络连接到服务进程,所以要指定服务进程的IP。

运行结果

现在,可以试试分布式进程的工作效果了。先启动task_master.py服务进程:

Traceback (most recent call last):  File "F:/Python/untitled/xianchengjincheng/master.py", line 25, in <module>  manager.start()  File "F:Pythonpystalllibmultiprocessingmanagers.py", line 513, in start  self._process.start()  File "F:Pythonpystalllibmultiprocessingprocess.py", line 105, in start  self._popen = self._Popen(self)  File "F:PythonpystalllibmultiprocessinGContext.py", line 322, in _Popen  return Popen(process_obj)  File "F:Pythonpystalllibmultiprocessingpopen_spawn_win32.py", line 65, in __init__  reduction.dump(process_obj, to_child)  File "F:Pythonpystalllibmultiprocessing eduction.py", line 60, in dump  ForkingPickler(file, protocol).dump(obj) _pickle.PicklingError: Can't pickle <function <lambda> at 0x00000202D1921E18>: attribute lookup <lambda> on __main__ failed

task_master.py进程发送完任务后,开始等待result队列的结果。现在启动task_worker.py进程:

Connect to server 127.0.0.1... Traceback (most recent call last):  File "F:/Python/untitled/xianchengjincheng/work.py", line 24, in <module>  m.connect()  File "F:Pythonpystalllibmultiprocessingmanagers.py", line 489, in connect  conn = Client(self._address, authkey=self._authkey)  File "F:Pythonpystalllibmultiprocessingconnection.py", line 487, in Client  c = SocketClient(address)  File "F:Pythonpystalllibmultiprocessingconnection.py", line 614, in SocketClient  s.connect(address) ConnectionRefusedError: [WinError 10061] 由于目标计算机积极拒绝,无法连接。

看到没,结果都出错了,我们好好分析一下到底哪出错了。。。

错误分析

在task_master.py的报错提示中,我们知道它说lambda错误,这是因为序列化不支持匿名函数,所以我们得修改代码,重新对queue用QueueManager进行封装放到网络中。

# 把两个Queue都注册到网络上, callable参数关联了Queue对象 QueueManager.register('get_task_queue',callable=return_task_queue)  QueueManager.register('get_result_queue',callable=return_result_queue)

其中task_queue和result_queue是两个队列,分别存放任务和结果。它们用来进行进程间通信,交换对象。

因为是分布式的环境,放入queue中的数据需要等待Workers机器运算处理后再进行读取,这样就需要对queue用QueueManager进行封装放到网络中,这是通过上面的2行代码来实现的。我们给return_task_queue的网络调用接口取了一个名get_task_queue,而return_result_queue的名字是get_result_queue,方便区分对哪个queue进行操作。task.put(n)即是对task_queue进行写入数据,相当于分配任务。而result.get()即是等待workers机器处理后返回的结果。

值得注意 在windows系统中你必须要写IP地址,而其他操作系统比如linux操作系统则就不要了。

# windows需要写ip地址 manager = QueueManager(address=('127.0.0.1', 5000), authkey=b'abc')

修改后的代码

在task_master.py中修改如下:

#!/user/bin/pytthon # -*- coding:utf-8 -*- # @Time: 2018/3/3 16:46 # @Author: lichexo # @File: task_master.py # task_master.py import random,time,queue from multiprocessing.managers import BaseManager from multiprocessing import freeze_support task_queue = queue.Queue() # 发送任务的队列: result_queue = queue.Queue() # 接收结果的队列: class QueueManager(BaseManager): # 从BaseManager继承的QueueManager:  pass # windows下运行 def return_task_queue():  global task_queue  return task_queue # 返回发送任务队列 def return_result_queue ():  global result_queue  return result_queue # 返回接收结果队列 def test():  # 把两个Queue都注册到网络上, callable参数关联了Queue对象,它们用来进行进程间通信,交换对象  #QueueManager.register('get_task_queue', callable=lambda: task_queue)  #QueueManager.register('get_result_queue', callable=lambda: result_queue)  QueueManager.register('get_task_queue', callable=return_task_queue)  QueueManager.register('get_result_queue', callable=return_result_queue)  # 绑定端口5000, 设置验证码'abc':  #manager = QueueManager(address=('', 5000), authkey=b'abc')  # windows需要写ip地址  manager = QueueManager(address=('127.0.0.1', 5000), authkey=b'abc')  manager.start() # 启动Queue:  # 获得通过网络访问的Queue对象:  task = manager.get_task_queue()  result = manager.get_result_queue()  for i in range(10): # 放几个任务进去:  n = random.randint(0, 10000)  print('Put task %d...' % n)  task.put(n)  # 从result队列读取结果:  print('Try get results...')  for i in range(10):  # 这里加了异常捕获  try:  r = result.get(timeout=5)  print('Result: %s' % r)  except queue.Empty:  print('result queue is empty.')  # 关闭:  manager.shutdown()  print('master exit.') if __name__=='__main__':  freeze_support()  print('start!')  test()

在task_worker.py中修改如下:

#!/user/bin/pytthon # -*- coding:utf-8 -*- # @Time: 2018/3/3 16:46 # @Author: lichexo # @File: task_worker.py # task_worker.py import time, sys, queue from multiprocessing.managers import BaseManager # 创建类似的QueueManager: class QueueManager(BaseManager):  pass # 由于这个QueueManager只从网络上获取Queue,所以注册时只提供名字: QueueManager.register('get_task_queue') QueueManager.register('get_result_queue') # 连接到服务器,也就是运行task_master.py的机器: server_addr = '127.0.0.1' print('Connect to server %s...' % server_addr) # 端口和验证码注意保持与task_master.py设置的完全一致: m = QueueManager(address=(server_addr, 5000), authkey=b'abc') # 从网络连接: m.connect() # 获取Queue的对象: task = m.get_task_queue() result = m.get_result_queue() # 从task队列取任务,并把结果写入result队列: for i in range(10):  try:  n = task.get(timeout=1)  print('run task %d * %d...' % (n, n))  r = '%d * %d = %d' % (n, n, n*n)  time.sleep(1)  result.put(r)  except queue.Empty:  print('task queue is empty.') # 处理结束: print('worker exit.')

先运行task_master.py,然后再运行task_worker.py

(1)task_master.py运行结果如下

start! Put task 7872... Put task 6931... Put task 1395... Put task 8477... Put task 8300... Put task 1597... Put task 8738... Put task 8627... Put task 1884... Put task 2561... Try get results... Result: 7872 * 7872 = 61968384 Result: 6931 * 6931 = 48038761 Result: 1395 * 1395 = 1946025 Result: 8477 * 8477 = 71859529 Result: 8300 * 8300 = 68890000 Result: 1597 * 1597 = 2550409 Result: 8738 * 8738 = 76352644 Result: 8627 * 8627 = 74425129 Result: 1884 * 1884 = 3549456 Result: 2561 * 2561 = 6558721 master exit.

(2)task_worker.py运行结果如下

Connect to server 127.0.0.1... run task 8640 * 8640... run task 7418 * 7418... run task 9303 * 9303... run task 568 * 568... run task 1633 * 1633... run task 3583 * 3583... run task 3293 * 3293... run task 8975 * 8975... run task 8189 * 8189... run task 731 * 731... worker exit.

知识补充

这个简单的Master/Worker模型有什么用?其实这就是一个简单但真正的分布式计算,把代码稍加改造,启动多个worker,就可以把任务分布到几台甚至几十台机器上,比如把计算n*n的代码换成发送邮件,就实现了邮件队列的异步发送。

Queue对象存储在哪?注意到task_worker.py中根本没有创建Queue的代码,所以,Queue对象存储在task_master.py进程中:

Python分布式进程中会遇到的坑都有哪些呢

而Queue之所以能通过网络访问,就是通过QueueManager实现的。由于QueueManager管理的不止一个Queue,所以,要给每个Queue的网络调用接口起个名字,比如get_task_queue。task_worker这里的QueueManager注册的名字必须和task_manager中的一样。对比上面的例子,可以看出Queue对象从另一个进程通过网络传递了过来。只不过这里的传递和网络通信由QueueManager完成。

authkey有什么用?这是为了保证两台机器正常通信,不被其他机器恶意干扰。如果task_worker.py的authkey和task_master.py的authkey不一致,肯定连接不上。

看完上述内容,你们掌握Python分布式进程中会遇到的坑都有哪些呢的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注编程网Python频道,感谢各位的阅读!

--结束END--

本文标题: Python分布式进程中会遇到的坑都有哪些呢

本文链接: https://lsjlt.com/news/285316.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

猜你喜欢
  • Python分布式进程中会遇到的坑都有哪些呢
    Python分布式进程中会遇到的坑都有哪些呢,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。小惊大怪你是不是在用Python3或者在windows系统上编程最重要的是你对进程和线...
    99+
    2023-06-16
  • redis分布式锁的坑有哪些
    这篇“redis分布式锁的坑有哪些”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇“redis分布式锁的坑有哪些”文章吧。1 非...
    99+
    2023-07-02
  • Redis分布式锁遇到的序列化问题有哪些
    这篇文章主要介绍Redis分布式锁遇到的序列化问题有哪些,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!问题排查既然是释放锁有问题,那就先看看释放锁的代码吧。释放锁释放锁使用了 Lua 脚本,代码逻辑和 Lua 脚本如...
    99+
    2023-06-14
  • pytorch中DataLoader()过程中会遇到的问题有哪些
    这篇文章将为大家详细讲解有关pytorch中DataLoader()过程中会遇到的问题有哪些,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。如下所示:RuntimeError: stack expects ...
    99+
    2023-06-15
  • mysql中timestamp比较查询遇到的坑有哪些
    这篇文章主要介绍mysql中timestamp比较查询遇到的坑有哪些,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!timestamp比较查询遇到的坑要求mysql建表的时候update_time 为timestamp...
    99+
    2023-06-21
  • 微信小程序开发时遇到的坑有哪些
    这篇文章主要讲解了“微信小程序开发时遇到的坑有哪些”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“微信小程序开发时遇到的坑有哪些”吧!最近参与开发了公司的第一款小程序,开发体验基本类似于基于w...
    99+
    2023-06-13
  • 分区表进行alter-switch时遇到的错误有哪些
    本篇内容介绍了“分区表进行alter-switch时遇到的错误有哪些”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学...
    99+
    2024-04-02
  • python中进程有哪些交流方式
    python中进程有哪些交流方式?相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。python的五大特点是什么python的五大特点:1.简单易学,开发程序时,专注的是解决问题,而...
    99+
    2023-06-14
  • Spring中会用到的设计模式有哪些
    Spring中会用到的设计模式有哪些,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。JDK 中用到了那些设计模式Spring 中用到了那些设计模式这两个问题,在面试中比较常见...
    99+
    2023-06-16
  • 分布式系统中的 Python 算法实现方式有哪些?
    分布式系统是指由多个独立的计算机节点组成的系统,它们之间通过网络进行通信,共同完成一个任务。Python 是一种高级编程语言,它在分布式系统中的应用越来越广泛。本文将介绍分布式系统中的 Python 算法实现方式。 一、MapReduce ...
    99+
    2023-09-16
    编程算法 分布式 linux
  • Python爬虫遇到验证码的处理方式有哪些
    这篇文章主要介绍“Python爬虫遇到验证码的处理方式有哪些”,在日常操作中,相信很多人在Python爬虫遇到验证码的处理方式有哪些问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Python爬虫遇到验证码的处...
    99+
    2023-06-16
  • 使用DBLink过程中遇到的问题有哪些
    这篇文章给大家分享的是有关使用DBLink过程中遇到的问题有哪些的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。1.     &n...
    99+
    2024-04-02
  • 小程序开发中遇到的问题有哪些
    这篇文章主要介绍小程序开发中遇到的问题有哪些,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!小程序面试题bindtap和catchtap的区别是什么?bind事件绑定不会阻止冒泡事件向上冒泡,catch事件绑定可以阻止...
    99+
    2023-06-14
  • 从Excel到Python中最常用到的Pandas函数都有哪些
    本篇文章为大家展示了从Excel到Python中最常用到的Pandas函数都有哪些,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。数据预处理本章主要讲的是数据的预处理,对清洗完的数据进行整理以便后期的...
    99+
    2023-06-02
  • redis中的分布式锁有哪些特点
    本篇内容主要讲解“redis中的分布式锁有哪些特点”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“redis中的分布式锁有哪些特点”吧! ...
    99+
    2023-04-14
    redis
  • Python shell 在分布式编程算法中的应用:有哪些挑战?
    Python shell 是一个非常强大的工具,它可以在命令行中运行 Python 代码,以及与操作系统进行交互。在分布式编程算法中,Python shell 也发挥了重要的作用。本文将介绍 Python shell 在分布式编程算法中的...
    99+
    2023-10-08
    shell 分布式 编程算法
  • Linux实践中使用重定向和管道符遇到的坑有哪些
    这篇文章给大家介绍Linux实践中使用重定向和管道符遇到的坑有哪些,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。我很喜欢 Linux 系统,尤其是 Linux  的一些设计很漂亮,比如可以将一些复杂的问题分解...
    99+
    2023-06-15
  • 小程序开发过程中遇到的问题有哪些
    这篇文章主要介绍了小程序开发过程中遇到的问题有哪些的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇小程序开发过程中遇到的问题有哪些文章都会有所收获,下面我们一起来看看吧。1、确定需求问题虽然说小程序是可以作为服务...
    99+
    2023-06-27
  • linux中的进程分类有哪些
    本篇内容介绍了“linux中的进程分类有哪些”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!linux中的进程一般分为“交互进程”、“批处理进...
    99+
    2023-07-02
  • Python中所涉及到的软件有哪些呢
    Python中所涉及到的软件有哪些呢,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。面对现在的python环境版本越来越多的情况下,你对python环境版本的开发...
    99+
    2023-06-17
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作