#multiprocessing# Python多进程与多线程
共 13583字,需浏览 28分钟
·
2023-09-18 23:50
“ 文章所涉及内容更多来自网络,在此声明,并感谢知识的贡献者!”
Python的多线程效率问题
—
Python的多线程效率问题
尽管Python完全支持多线程编程, 但是解释器的C语言实现部分在完全并行执行时并不是线程安全的。实际上,解释器被一个全局解释器锁保护着,它确保任何时候都只有一个Python线程执行。GIL最大的问题就是Python的多线程程序并不能利用多核CPU的优势 (比如一个使用了多个线程的计算密集型程序只会在一个单CPU上面运行)。
在讨论普通的GIL之前,有一点要强调的是GIL只会影响到那些严重依赖CPU的程序(比如计算型的)。如果你的程序大部分只会涉及到I/O,比如网络交互,那么使用多线程就很合适, 因为它们大部分时间都在等待。实际上,你完全可以放心的创建几千个Python线程, 现代操作系统运行这么多线程没有任何压力,没啥可担心的。
Python的多线程与多进程
—
Python的多线程与多进程分析
-多进程是多核运算
-效率:运行耗时最少是:多进程 < 普通 < 多线程
Python的多线程与多进程示例
—
Python的多线程与多进程示例
import multiprocessing as mp
import threading as td
def joba(a, d):
print('job-a')
for i in range(20):
pass
print('job-a-finish')
def jobb(a, d):
print('job-b')
while 1:
pass
print('job-b-finish')
if __name__ == "__main__":
t1 = td.Thread(target=jobb, args=(1, 2))
p1 = mp.Process(target=joba, args=(1, 2))
t1.start()
p1.start()
t1.join()
p1.join()
Python多线程或多进程的输出存储
Queue的功能是将每个核或线程的运算结果放在队里中, 等到每个线程或核运行完毕后再从队列中取出结果, 继续加载运算。原因很简单, 多线程调用的函数不能有返回值, 所以使用Queue存储多个线程运算的结果
Python的多进程池示例
—
Python的多进程池示例
import multiprocessing as mp
def job(x):
return x * x
def multicore():
pool = mp.Pool()
res = pool.map(job, range(10))
print(res)
if __name__ == '__main__':
multicore()
import multiprocessing as mp
def job(x):
return x * x
def multicore():
pool = mp.Pool()
res = pool.map(job, range(10))
print(res)
res = pool.apply_async(job, (2,))
# 用get获得结果
print(res.get())
# 迭代器,i=0时apply一次,i=1时apply一次等等
multi_res = [pool.apply_async(job, (i,)) for i in range(10)]
# 从迭代器中取出
print([res.get() for res in multi_res])
if __name__ == '__main__':
multicore()
总结
Pool默认调用是CPU的核数,传入processes参数可自定义CPU核数
map() 放入迭代参数,返回多个结果
apply_async()只能放入一组参数,并返回一个结果,如果想得到map()的效果需要通过迭代
Python的共享内存
—
共享内存
只有用共享内存才能让CPU之间有交流
import multiprocessing as mp
value1 = mp.Value('i', 0)
value2 = mp.Value('d', 3.14)
array = mp.Array('i', [1, 2, 3, 4])
其中d和i参数用来设置数据类型的,d表示一个双精浮点类型,i表示一个带符号的整型。
在Python的mutiprocessing中,有还有一个Array类,可以和共享内存交互,来实现在进程之间共享数据。
| Type code | C Type | Python Type | Minimum size in bytes |
| --------- | ------------------ | ----------------- | --------------------- |
| `'b'` | signed char | int | 1 |
| `'B'` | unsigned char | int | 1 |
| `'u'` | Py_UNICODE | Unicode character | 2 |
| `'h'` | signed short | int | 2 |
| `'H'` | unsigned short | int | 2 |
| `'i'` | signed int | int | 2 |
| `'I'` | unsigned int | int | 2 |
| `'l'` | signed long | int | 4 |
| `'L'` | unsigned long | int | 4 |
| `'q'` | signed long long | int | 8 |
| `'Q'` | unsigned long long | int | 8 |
| `'f'` | float | float | 4 |
| `'d'` | double | float | 8 |
Python共享内存变量的锁
import multiprocessing as mp
import time
def job(v, num, l):
l.acquire() # 锁住
for _ in range(5):
time.sleep(0.1)
v.value += num # 获取共享内存
print(v.value)
l.release() # 释放
def multicore():
l = mp.Lock() # 定义一个进程锁
v = mp.Value('i', 0) # 定义共享内存
p1 = mp.Process(target=job, args=(v, 1, l)) # 需要将lock传入
p2 = mp.Process(target=job, args=(v, 3, l))
p1.start()
p2.start()
p1.join()
p2.join()
if __name__ == '__main__':
multicore()
进程锁保证了进程p1的完整运行,然后才进行了进程p2的运行
Python的多进程的multiprocessing模块
—
Python多进程multiprocessing模块
1、multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
target 是函数名字,需要调用的函数
args 函数需要的参数,以 tuple 的形式传入
将daemon设置为True时,则主线程不必等待子进程,主线程结束则所有结束
2、相关方法
star() 方法启动进程,
join() 方法实现进程间的同步,等待所有进程退出。
close() 用来阻止多余的进程涌入进程池 Pool 造成进程阻塞。
如果要启动大量的子进程,可以用进程池的方式批量创建子进程
Pool提供了一种快捷的方法,赋予函数并行化处理一系列输入值的能力,可以将输入数据分配给不同进程处理(数据并行)。
from multiprocessing import Pool
def f(x):
return x*x
if __name__ == '__main__':
with Pool(5) as p:
print(p.map(f, [1, 2, 3]))
(1)p.apply(func [, args [, kwargs]]):在一个池工作进程中执行func(args,kwargs),然后返回结果。需要强调的是:此操作并不会在所有池工作进程中并执行func函数。如果要通过不同参数并发地执行func函数,必须从不同线程调用p.apply()函数或者使用p.apply_async()
(2)p.apply_async(func [, args [, kwargs]]):在一个池工作进程中执行func(args,**kwargs),然后返回结果。此方法的结果是 AsyncResult类的实例,callback是可调用对象,接收输入参数。当func的结果变为可用时,将理解传递给callback。callback禁止执行任何阻塞操作,否则将接收其他异步操作中的结果。多进程并发!
(3)p.close():关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成
(4)p.jion():等待所有工作进程退出。此方法只能在close()或teminate()之后调用
import multiprocessing
def worker(num):
"""该函数将在子进程中执行"""
print('Worker %d' % num)
if __name__ == '__main__':
# 创建进程池
pool = multiprocessing.Pool(4)
# 启动进程池中的进程
pool.map(worker, range(10))
# 关闭进程池
pool.close()
# 等待进程池中的进程结束
pool.join()
Python的多进程间通讯
—
Python进程间的通信
多进程编程中,不同的进程之间需要进行通信。multiprocessing模块提供了多种进程间通信的方式,例如使用队列、管道、共享内存等。
(1)队列
队列是一种常用的进程间通信方式。multiprocessing模块中提供了Queue类,可以用来创建队列。下面是一个简单的示例:
import multiprocessing
def producer(q):
"""该函数将在生产者进程中执行"""
for i in range(10):
q.put(i)
def consumer(q):
"""该函数将在消费者进程中执行"""
while True:
item = q.get()
if item is None:
break
print(item)
if __name__ == '__main__':
# 创建队列
q = multiprocessing.Queue()
# 创建生产者进程
p1 = multiprocessing.Process(target=producer, args=(q,))
# 创建消费者进程
p2 = multiprocessing.Process(target=consumer, args=(q,))
# 启动进程
p1.start()
p2.start()
# 等待进程结束
p1.join()
# 发送结束信号
q.put(None)
p2.join()
在上面的代码中,首先创建了一个Queue对象,然后创建了一个生产者进程和一个消费者进程。生产者进程通过调用put方法将0~9的数字放入队列中,消费者进程通过调用get方法从队列中获取数据,并将其打印出来。最后,调用put方法发送结束信号,然后等待两个进程结束。
(2)管道
管道是另一种常用的进程间通信方式。multiprocessing模块中提供了Pipe类,可以用来创建管道。下面是一个简单的示例:
import multiprocessing
def producer(conn):
"""该函数将在生产者进程中执行"""
for i in range(10):
conn.send(i)
conn.close()
def consumer(conn):
"""该函数将在消费者进程中执行"""
while True:
item = conn.recv()
if item is None:
break
print(item)
if __name__ == '__main__':
# 创建管道
conn1, conn2 = multiprocessing.Pipe()
# 创建生产者进程
p1 = multiprocessing.Process(target=producer, args=(conn1,))
# 创建消费者进程
p2 = multiprocessing.Process(target=consumer, args=(conn2,))
# 启动进程
p1.start()
p2.start()
# 等待进程结束
p1.join()
# 发送结束信号
conn1.send(None)
p2.join()
在上面的代码中,首先创建了一个管道,然后创建了一个生产者进程和一个消费者进程。生产者进程通过调用send方法将0~9的数字发送到管道中,消费者进程通过调用recv方法从管道中获取数据,并将其打印出来。最后,调用send方法发送结束信号,然后等待两个进程结束。
(3)共享内存
共享内存是一种高效的进程间通信方式,它允许多个进程共享同一块内存区域。multiprocessing模块中提供了Value和Array类,可以用来创建共享内存。下面是一个简单的示例:
import multiprocessing
def worker1(n):
"""该函数将在进程1中执行"""
n.value += 1
print('worker1:', n.value)
def worker2(n):
"""该函数将在进程2中执行"""
n.value += 1
print('worker2:', n.value)
if __name__ == '__main__':
# 创建共享内存
n = multiprocessing.Value('i', 0)
# 创建进程1
p1 = multiprocessing.Process(target=worker1, args=(n,))
# 创建进程2
p2 = multiprocessing.Process(target=worker2, args=(n,))
# 启动进程
p1.start()
p2.start()
# 等待进程结束
p1.join()
p2.join()
在上面的代码中,首先创建了一个Value对象,用于存储一个整数值。然后创建了两个进程,每个进程都会将共享内存中的值加1,并将其打印出来。最后,等待两个进程结束。
除了Value类之外,multiprocessing模块还提供了Array类,用于创建共享内存数组。
参考资料
—
GIL 不一定有效率
https://yulizi123.github.io/tutorials/python-basic/threading/5-GIL/
The "freeze_support()" line can be omitted if the program is not going to be frozen to produce an executable. raise RuntimeError
https://blog.csdn.net/qq_34905587/article/details/112020651
Python Multiprocessing(多进程)
https://blog.csdn.net/sikh_0529/article/details/126728914
Python多进程multiprocessing模块介绍
https://www.jianshu.com/p/3ff7d04a39bf
Python编程之多进程(multiprocessing)详解
http://stack.itcast.cn/news/20230404/10534657760.shtml
Python multiprocessing进程池使用详解
https://blog.csdn.net/weixin_39253570/article/details/130817783
multiprocessing --- 基于进程的并行
https://docs.python.org/zh-cn/3/library/multiprocessing.html?highlight=multi
Python 异步编程 多进程
https://www.cjavapy.com/article/2427/
python 中的进程池 -- multiprocessing.pool.Pool
http://www.manongjc.com/detail/63-pndlbcnipsucczm.html
Python 异步 IO(asyncio)、多进程(multiprocessing)、多线程(multithreading)性能对比
https://www.jianshu.com/p/cac56b3d9a18