核桃干货 | 快速掌握用python写并行程序,干货满满
一、大数据时代的现状
当前我们正处于大数据时代,每天我们会通过手机、电脑等设备不断的将自己的数据传到互联网上。据统计,YouTube上每分钟就会增加500多小时的视频,面对如此海量的数据,如何高效的存储与处理它们就成了当前最大的挑战。
但在这个对硬件要求越来越高的时代,CPU却似乎并不这么给力了。自2013年以来,处理器频率的增长速度逐渐放缓了,目前CPU的频率主要分布在3~4GHz。
这个也是可以理解的,毕竟摩尔定律都生效了50年了,如果它老人家还如此给力,那我们以后就只要静等处理器频率提升,什么计算问题在未来那都不是话下了。
实际上CPU与频率是于能耗密切相关的,我们之前可以通过加电压来提升频率,但当能耗太大,散热问题就无法解决了,所以频率就逐渐稳定下来了,而Intel与AMD等大制造商也将目标转向了多核芯片,目前普通桌面PC也达到了4~8核。
二、面对挑战的方法
咱们有了多核CPU,以及大量计算设备,那我们怎么来用它们应对大数据时代的挑战了。那就要提到下面的方法了。
2.1 并行计算
并行(parallelism)是指程序运行时的状态,如果在同时刻有多个“工作单位”运行,则所运行的程序处于并行状态。图一是并行程序的示例,开始并行后,程序从主线程分出许多小的线程并同步执行,此时每个线程在各个独立的CPU进行运行,在所有线程都运行完成之后,它们会重新合并为主线程,而运行结果也会进行合并,并交给主线程继续处理。
图一、多线程并行
图二是一个多线程的任务(沿线为线程时间),但它不是并行任务。这是因为task1与task2总是不在同一时刻执行,这个情况下单核CPU完全可以同时执行task1与task2。
方法是在task1不执行的时候立即将CPU资源给task2用,task2空闲的时候CPU给task1用,这样通过时间窗调整任务,即可实现多线程程序,但task1与task2并没有同时执行过,所以不能称为并行。我们可以称它为并发(concurrency)程序,这个程序一定意义上提升了单个CPU的使用率,所以效率也相对较高。
数据并行(Data Parallel)模型:将相同的操作同时作用于不同数据,只需要简单地指明执行什么并行操作以及并行操作对象。该模型反映在图一中即是,并行同时在主线程中拿取数据进行处理,并线程执行相同的操作,然后计算完成后合并结果。各个并行线程在执行时互不干扰。 消息传递(Message Passing)模型:各个并行执行部分之间传递消息,相互通讯。消息传递模型的并行线程在执行时会传递数据,可能一个线程运行到一半的时候,它所占用的数据或处理结果就要交给另一个线程处理,这样,在设计并行程序时会给我们带来一定麻烦。该模型一般是分布式内存并行计算机所采用方法,但是也可以适用于共享式内存的并行计算机。
多核CPU——计算密集型任务。尽量使用并行计算,可以提高任务执行效率。计算密集型任务会持续地将CPU占满,此时有越多CPU来分担任务,计算速度就会越快,这种情况才是并行程序的用武之地。 单核CPU——计算密集型任务。此时的任务已经把CPU资源100%消耗了,就没必要使用并行计算,毕竟硬件障碍摆在那里。 单核CPU——I/O密集型任务。I/O密集型任务在任务执行时需要经常调用磁盘、屏幕、键盘等外设,由于调用外设时CPU会空闲,所以CPU的利用率并不高,此时使用多线程程序,只是便于人机交互。计算效率提升不大。 多核CPU——I/O密集型任务。同单核CPU——I/O密集型任务。
强大的浮点数计算速度。 大量的计算核心,可以进行大型并行计算。一个普通的GPU也有数千个计算核心。 强大的数据吞吐量,GPU的吞吐量是CPU的数十倍,这意味着GPU有适合的处理大数据。
GFS(The Google File System) :解决数据存储的问题。采用N多台廉价的电脑,使用冗余的方式,来取得读写速度与数据安全并存的结果。 MapReduce(Simplified Data Processing on Large Clusters) :函数式编程,把所有的操作都分成两类,map与reduce,map用来将数据分成多份,分开处理,reduce将处理后的结果进行归并,得到最终的结果。 BigTable(Bigtable: A Distributed Storage System for Structured Data) :在分布式系统上存储结构化数据的一个解决方案,解决了巨大的Table的管理、负载均衡的问题.
在面向线程设计的系统(如当代多数操作系统、Linux 2.6及更新的版本)中,进程本身不是基本运行单位,而是线程的容器。 进程拥有自己独立的内存空间,所属线程可以访问进程的空间。 程序本身只是指令、数据及其组织形式的描述,进程才是程序的真正运行实例。例如,Visual Studio开发环境就是利用一个进程编辑源文件,并利用另一个进程完成编译工作的应用程序。
线程有自己的一组CPU指令、寄存器与私有数据区,线程的数据可以与同一进程的线程共享。 当前的操作系统是面向线程的,即以线程为基本运行单位,并按线程分配CPU。
multiprocessing.Process(target=None, args=())
target: 可以被run()调用的函数,简单来说就是进程中运行的函数
args: 是target的参数
process的方法:
start(): 开始启动进程,在创建process之后执行
join([timeout]):阻塞目前父进程,直到调用join方法的进程执行完或超时(timeout),才继续执行父进程
terminate():终止进程,不论进程有没有执行完,尽量少用。
示例1
from multiprocessing import Process
def f(name):
print 'hello', name
if __name__ == '__main__':
p = Process(target=f, args=('bob',)) # p进程执行f函数,参数为'bob',注意后面的“,”
p.start() # 进程开始
p.join() # 阻塞主线程,直至p进程执行结束
class multiprocessing.Pool([processes])
processes是进程池中的进程数,默认是本机的cpu数量
方法:
apply(func[, args[, kwds]])进程池中的进程进行func函数操作,操作时会阻塞进程,直至生成结果。
apply_async(func[, args[, kwds[, callback]]])与apply类似,但是不会阻塞进程
map(func, iterable[, chunksize])进程池中的进程进行映射操作
map_async(func, iterable[, chunksize[, callback]])
imap(func, iterable[, chunksize]):返回有序迭代器
imap_unordered(func, iterable[, chunsize]):返回无序迭代器
close():禁止进程池再接收任务
terminate():强行终止进程池,不论是否有任务在执行
join():在close()或terminate()之后进行,等待进程退出
示例2
from multiprocessing import Pool
def f(x):
return x*x
if __name__ == '__main__':
p = Pool(5) # 创建有5个进程的进程池
print(p.map(f, [1, 2, 3])) # 将f函数的操作给进程池
3.3.3 Pipes & Queues
multiprocessing.Pipe([duplex])
返回两个连接对象(conn1, conn2),两个连接对象分别访问pipe的头和尾,进行读写操作
Duplex: True(default),创建的pipe是双向的,也即两端都可以进行读写;若为False,则pipe是单向的,仅可以在一端读,另一端写,此时与Queue类似。
multiprocessing.Queue([maxsize])
qsize():返回queue中member数量
empty():如果queue是空的,则返回true
full():如果queue中member数量达到maxsize,则返回true
put(obj):将一个object放入到queue中
get():从队列中取出一个object并将它从queue中移除,FIFO原则
close():关闭队列,并将缓存的object写入pipe
示例
from multiprocessing import Pool
import time
def f(x):
return x*x
if __name__ == '__main__':
pool = Pool(processes=4) # start 4 worker processes
result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously in a single process
print result.get(timeout=1) # prints "100" unless your computer is *very* slow
print pool.map(f, range(10)) # prints "[0, 1, 4,..., 81]"
it = pool.imap(f, range(10))
print it.next() # prints "0"
print it.next() # prints "1"
print it.next(timeout=1) # prints "4" unless your computer is *very* slow
result = pool.apply_async(time.sleep, (10,))
print result.get(timeout=1) # raises multiprocessing.TimeoutError
3.3.4 进程锁multiprocessing.Lock
当一个进程获得(acquire)锁之后,其它进程在想获得锁就会被禁止,可以保护数据,进行同步处理。
acquire(block=True, timeout=None):尝试获取一个锁,如果block为true,则会在获得锁之后阻止其它进程再获取锁。
release():释放锁
multiprocessing.Value(typecode_or_type, *args[, lock])
返回一个ctype对象,
创建c = Value(‘d’, 3.14),调用c.value()
multiprocessing.Array(typecode_or_type, size_or_initializer, *, lock=True)
返回一个ctype数组,只能是一维的
Array(‘i’, [1, 2, 3, 4])
3.3.6 其它方法
multiprocessing.active_children():返回当前进程的所有子进程
multiprocessing.cpu_count():返回本计算机的cpu数量
multiprocessing.current_process():返回当前进程
尽量避免共享数据 所有对象都尽量是可以pickle的 避免使用terminate强行终止进程,以造成不可预料的后果 有队列的进程在终止前队列中的数据需要清空,join操作应放到queue清空后 明确给子进程传递资源、参数
注意跨模块全局变量的使用,可能被各个进程修改造成结果不统一 主模块需要加上if name == ' main ':来提高它的安全性,如果有交互界面,需要加上freeze_support()
import multiprocessing as mp
import time
def job(v, num, l):
l.acquire() # 锁住
for _ in range(5):
time.sleep(0.1)
v.value += num # 获取共享内存
print(v.value)
l.release() # 释放
def multicore():
l = mp.Lock() # 定义一个进程锁
#l = 1
v = mp.Value('i', 0) # 定义共享内存
p1 = mp.Process(target=job, args=(v,1,l)) # 需要将lock传入
p2 = mp.Process(target=job, args=(v,3,l))
p1.start()
p2.start()
p1.join()
p2.join()
if __name__=='__main__':
multicore()
直接运行上述代码输出[1, 2, 3, 4, 5, 8, 11, 14, 17, 20],运行时间为1.037s 在1的基础上注释掉锁(上述注释了三行),在没有锁的情况下,输出[1, 4, 5, 8, 9, 12, 13, 15, 14, 16],运行时间为0.53s 在2的基础上将p1.join()调到p2.start()前面,输出为[1, 2, 3, 4, 5, 8, 11, 14, 17, 20],运行时间为1.042s.
import multiprocessing as mp
#import pdb
def job(i):
return i*i
def multicore():
pool = mp.Pool()
#pdb.set_trace()
res = pool.map(job, range(10))
print(res)
res = pool.apply_async(job, (2,))
# 用get获得结果
print(res.get())
# 迭代器,i=0时apply一次,i=1时apply一次等等
multi_res = [pool.apply_async(job, (i,)) for i in range(10)]
# 从迭代器中取出
print([res.get() for res in multi_res])
multicore()