Python Windows下分布式进程的坑(分布式进程的一个简单例子)
时间:2022-04-26
本文章向大家介绍Python Windows下分布式进程的坑(分布式进程的一个简单例子),主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
下面这个例子基于”廖雪峰的Python教程:分布式进程”原例在Linux上运行,直接在Windows上运行会出现错误,下面是针对原例进行的改进,使之能成功运行。 https://www.liaoxuefeng.com/wiki/0014316089557264a6b348958f449949df42a6d3a2e542c000/001431929340191970154d52b9d484b88a7b343708fcc60000#0 博主也对代码注释作了更精确的改进。 原例在Windows下你会遇到的问题:
_pickle.PicklingError: Can't pickle <function <lambda> at 0x000001940C172EA0>: attribute lookup <lambda> on __main__ failed
#原因 Win下callable不能以 lambda表达式赋值
This probably means that you are not using fork to start your child processes and you have forgotten to use the proper idiom in the main module:
#需要使用Python常用的 if __name__ == '__main__':来进行是不是主module的判断
[WinError 10061] No connection could be made because the target machine actively refused it
#使用的主机地址和端口号有错误 需要修正 windows下address不能为空
OSError: [WinError 10049] The requested address is not valid in its context
#使用的主机地址和端口号有错误 需要修正 发送QueueManager 和 接收QueueManager没有使用相同的地址或端口号
This probably means that you are on Windows and you have
forgotten to use the proper idiom in the main module:
if __name__ == '__main__':
freeze_support()
...
The "freeze_support()" line can be omitted if the program
is not going to be frozen to produce a Windows executable.
#windows 运行分布式进程需要先启动 freeze_support()
#freeze_support()"冻结"为时生成 Windows 可执行文件
#原因是Windows没有直接的fork()
#Window是通过创建一个新的过程代码,在子进程运行来模拟fork()
#由于代码是在技术无关的进程中运行的,所以它必须在运行之前交付
#它传递的方式首先是被pickle,然后通过管道从原始进程发送到新进程
#另外,这个新进程被告知它必须运行通过管道传递的代码通过传递
#freeze_support() 函数的任务是检查它正在运行的进程是否应该通过管道或不运行代码。
Traceback (most recent call last):
File ...
raise convert_to_error(kind, result)
queue.Empty
#task_worker开始工作时 task_master还没有在队列里添加任务
正确而完整的改进例子
#task_master.py
import random, time, queue
from multiprocessing.managers import BaseManager
from multiprocessing import freeze_support
#send queue 发送队列
task_queue = queue.Queue()
#receiver queue 接收队列
result_queue = queue.Queue()
class QueueManager(BaseManager):
pass
#注册2个queue到网络上 使用callable和匿名函数关联了Queue对象
'''仅适用Linux Windows下callable不能使用lambda表达式赋值
QueueManager.register('get_task_queue', callable=lambda: task_queue)
QueueManager.register('get_result_queue', callable=lambda: result_queue)
'''
def return_task_queue():
global task_queue
return task_queue
def return_result_queue():
global result_queue
return result_queue
def runf():
QueueManager.register('get_task_queue', callable=return_task_queue)
QueueManager.register('get_result_queue', callable=return_result_queue)
#绑定端口5000,设置验证密码'abc'
manager = QueueManager(address=('127.0.0.1', 5000), authkey=b'abc')
#Linux下address留空等于本机 Windows下不能留空 127.0.0.0即本机的地址
#启动Queue
manager.start()
#通过网络获取Queue对象
task = manager.get_task_queue()
result = manager.get_result_queue()
#开启示例任务
for i in range(10):
n = random.randint(0, 10000)
print('Put task %d to run...' %n)
task.put(n)
#读取任务结果
print('Try to get results...')
for i in range(10):
r = result.get(timeout=10)
print('Results: %s' %r)
manager.shutdown()
print('master has been shoutdown')
if __name__ == '__main__':
freeze_support()
runf()
#task_worker.py
import time, sys, queue
from multiprocessing.managers import BaseManager
#创建QueueManager
class QueueManager(BaseManager):
pass
#该Manager从网络上获取Queue 因此只使用名字注册
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')
#连接到运行task_master的机器 Server (127.0.0.0是本机地址 使用时应改掉)
server_addr = '127.0.0.1'
print('Try to connect to server %s...' %server_addr)
m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
m.connect()
#获取Queue对象
task = m.get_task_queue()
result = m.get_result_queue()
#从task_queue获取任务 记过写入result_queue
for i in range(10):
try:
n = task.get(timeout=1)
print('we get %s' %n)
print('now we calculate %s * %s' % (n, n))
r = n * n
time.sleep(1)
result.put(r)
except Queue.Empty:
print('task_queue is empty, all work have done')
print('worker quit')
在2个CMD窗口下同时运行2个脚本(间隔不能超过10秒),结果如下:
C:Usershongze>python task_master.py
Put task 5495 to run...
Put task 1701 to run...
Put task 7110 to run...
Put task 5385 to run...
Put task 6355 to run...
Put task 7144 to run...
Put task 5190 to run...
Put task 6259 to run...
Put task 5349 to run...
Put task 2931 to run...
Try to get results...
Results: 30195025
Results: 2893401
Results: 50552100
Results: 28998225
Results: 40386025
Results: 51036736
Results: 26936100
Results: 39175081
Results: 28611801
Results: 8590761
master has been shoutdown
C:Usershongze>
C:Usershongze>python task_worker.py
Try to connect to server 127.0.0.1...
we get 5495
now we calculate 5495 * 5495
we get 1701
now we calculate 1701 * 1701
we get 7110
now we calculate 7110 * 7110
we get 5385
now we calculate 5385 * 5385
we get 6355
now we calculate 6355 * 6355
we get 7144
now we calculate 7144 * 7144
we get 5190
now we calculate 5190 * 5190
we get 6259
now we calculate 6259 * 6259
we get 5349
now we calculate 5349 * 5349
we get 2931
now we calculate 2931 * 2931
worker quit
C:Usershongze>
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- PHP+MySQL实现在线测试答题实例
- Python异常处理机制结构实例解析
- PHP字符串与数组处理函数用法小结
- 详解Flask前后端分离项目案例
- Laravel5.1 框架表单验证操作实例详解
- 通过实例了解Python异常处理机制底层实现
- header函数设置响应头解决php跨域问题实例详解
- Linux采用双网卡bond、起子接口的方式
- PHP实现字母数字混合验证码功能
- php+pdo实现的购物车类完整示例
- CentOS7怎么执行PHP定时任务详解
- Linux下PHP+Apache的26个必知的安全设置
- linux中ssh免密通信的实现
- 怎么修改CentOS服务器时间为北京时间
- Laravel5.1 框架控制器基础用法实例分析