为了提高程序的运行效率,Python与其他语言一样,提供了多进程和多线程的开发方式,这篇文章我们来讲Python的多进程和多线程开发。
进程 Python提供了mutilprocessing
模块,为多进程编程提供了友好的API,并且提供了多进程之间信息同步和通信的相关组件,如Queue
、Event
、Pool
、Lock
、Pipe
、Semaphore
、Condition
等模块。
函数当做进程 Python中创建多进程的方式有2种:
逻辑简单的任务一般使用函数当做进程,逻辑较多或代码结构复杂的建议使用类当做进程。
首先来看函数当做进程的例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 import osimport timeimport randomfrom multiprocessing import Processdef task (name) : s = random.randint(1 , 10 ) print 'pid: %s, name: %s, sleep %s ...' % (os.getpid(), name, s) time.sleep(s) if __name__ == '__main__' : ps = [] for i in range(5 ): p = Process(target=task, args=('p%s' % i, )) ps.append(p) p.start() for p in ps: p.join()
使用p = Process(target=func, args=(arg1, arg2...))
即可创建一个进程,调用p.start()
启动一个进程,p.join()
使得主进程等待子进程执行结束后才退出。
当这个程序执行时,你可以ps
查看一下进程,会发现一共有6个进程在执行,其中包括1个主进程,5个子进程。
类当做进程 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 import osimport randomimport timefrom multiprocessing import Processclass P (Process) : def run (self) : s = random.randint(1 , 10 ) print 'pid: %s, name: %s, sleep %s...' % (os.getpid(), self.name, s) time.sleep(s) if __name__ == '__main__' : ps = [] for i in range(5 ): p = P() ps.append(p) p.start() for p in ps: p.join()
类P
继承了Process
,并重写了run
方法,在调用start
方法时会自动执行run
方法,执行效果与上面类似。
Queue 如果多个进程之间需要通信,可以使用队列,Python提供了Queue
模块,例子如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 import timeimport randomfrom multiprocessing import Process, Queueclass P1 (Process) : def __init__ (self, queue) : self.queue = queue super(P1, self).__init__() def run (self) : print 'P1 put ...' for i in range(5 ): time.sleep(random.randint(1 , 3 )) self.queue.put(i) print 'put: P1 -> %s' % i class P2 (Process) : def __init__ (self, queue) : self.queue = queue super(P2, self).__init__() def run (self) : print 'P2 read ...' while 1 : i = self.queue.get() print 'get: P2 -> %s' % i if __name__ == '__main__' : queue = Queue() p1 = P1(queue) p2 = P2(queue) p1.start() p2.start() p1.join() p2.terminate()
一共2个进程,一个进程使用queue.put()
负责向队列写入数据,另一个进程使用queue.get()
队列中读取数据,实现了2个进程之间的信息通信。
Pipe 上面提到队列的使用场景常用于一端写入数据,另一端读取数据进行操作。
如果进程两端在读取数据时同时也想写入数据要怎么做?
Python多进程模块中提供了Pipe
,意为管道的意思,两端都可以进行读写操作。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 import timeimport randomfrom multiprocessing import Process, Pipeclass P1 (Process) : def __init__ (self, pipe) : self.pipe = pipe super(P1, self).__init__() def run (self) : print 'P1 send ...' for i in range(3 ): time.sleep(random.randint(1 , 2 )) self.pipe.send(i) print 'send: P1 -> %s' % i print 'P1 recv ...' for i in range(3 ): i = self.pipe.recv() print 'recv: P1 -> %s' % i class P2 (Process) : def __init__ (self, pipe) : self.pipe = pipe super(P2, self).__init__() def run (self) : print 'P2 recv ...' for i in range(3 ): i = self.pipe.recv() print 'recv: P2 -> %s' % i print 'P2 send ...' for i in range(3 ): time.sleep(random.randint(1 , 2 )) self.pipe.send(i) print 'send: P2 -> %s' % i if __name__ == '__main__' : pipe1, pipe2 = Pipe() p1 = P1(pipe1) p2 = P2(pipe2) p1.start() p2.start() p1.join() p2.join()
创建一个Pipe
会返回2个管道,这2个管道分别交给2个进程,即可实现这2个进程之间的互相通信。
Event 如果需要在多进程之间控制某些事件的开始与停止,也就是在多进程进程保持同步信号信息,可使用Event
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 import timeimport randomfrom multiprocessing import Process, Queue, Eventclass P1 (Process) : def __init__ (self, queue, event) : self.queue = queue self.event = event super(P1, self).__init__() def run (self) : self.event.wait() print 'P1 put ...' for i in range(5 ): time.sleep(random.randint(1 , 3 )) self.queue.put(i) print 'put: P1 -> %s' % i class P2 (Process) : def __init__ (self, queue, event) : self.queue = queue self.event = event super(P2, self).__init__() def run (self) : self.event.wait() print 'P2 read ...' while 1 : i = self.queue.get() print 'get: P2 -> %s' % i if __name__ == '__main__' : queue = Queue() event = Event() p1 = P1(queue, event) p2 = P2(queue, event) p1.start() p2.start() print 'sleep 3s ...' time.sleep(3 ) event.set() p1.join() p2.terminate()
执行程序后,我们发现2个子进程在执行到event.wait()
时,阻塞在此,直到主进程休眠3秒后执行event.set()
时,子进程才得以向下执行。
使用Event
可以控制进程之间的同步问题。
Pool 多进程虽然可以提高运行效率,但同时也不建议无限制的创建进程,过多的进程会给操作系统的调度和上下文切换带来更大的负担,进程越来越多也有可能导致效率下降。
在multiiprocessing
模块中,提供了进程池模块Pool
,理论来说同时执行的进程数与CPU核心相等,才会保证最高效的运行效率。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 import osimport randomimport timefrom multiprocessing import Pooldef task (name) : s = random.randint(1 , 10 ) print 'pid: %s, name: %s, sleep %s ...' % (os.getpid(), name, s) time.sleep(s) if __name__ == '__main__' : pool = Pool(5 ) for i in range(10 ): pool.apply_async(task, ('p-%s' % i, )) pool.close() pool.join()
上面代码定义了大小为5的进程池,也就是说不管向进程池里放入多少个任务,同一时刻只有5个进程在执行。
我们在编写多进程程序时,一般使用进程池的方式执行多个任务,保证高效的同时也避免资源的浪费。
Lock 在执行多进程任务执行过程中,如果需要对同一资源(例如文件)进行访问时,为了防止一个进程操作的资源不被另一个进程篡改,可以使用Lock
对其进行加锁互斥。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 from multiprocessing import Process, Lockclass P1 (Process) : def __init__ (self, lock, fp) : self.lock = lock self.fp = fp super(P1, self).__init__() def run (self) : with self.lock: for i in range(5 ): f = open(self.fp, 'a+' ) f.write('p1 - %s\n' % i) f.close() class P2 (Process) : def __init__ (self, lock, fp) : self.lock = lock self.fp = fp super(P2, self).__init__() def run (self) : with self.lock: for i in range(5 ): f = open(self.fp, 'a+' ) f.write('p2 - %s\n' % i) f.close() if __name__ == '__main__' : lock = Lock() fp = 'test.txt' p1 = P1(lock, fp) p2 = P2(lock, fp) p1.start() p2.start() p1.join() p2.join()
上面代码对同一个文件进行操作时,如果不加锁,2个进程会同时向文件写入内容。如果想保证写入顺序,在写文件之前使用Lock
加锁,就能保证只有一个进程能进入操作文件。
Semaphore 如果有一种场景,需要多个进程同时执行一些任务或访问某个资源,但要限制最大参与的进程数量,那么就可以使用Semaphore
信号量来控制。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 import timeimport osfrom multiprocessing import Process, Semaphoresemaphore = Semaphore(4 ) def task (name) : if semaphore.acquire(): print 'pid: %s, name: %s, sleep 1 ...' % (os.getpid(), name) time.sleep(1 ) semaphore.release() if __name__ == '__main__' : ps = [] for i in range(10 ): p = Process(target=task, args=('p%s' % i, )) ps.append(p) p.start() for p in ps: p.join()
执行上面的代码,你会发现虽然创建了10个进程,但同一时刻只有4个进程能能够执行真正的逻辑。
Condition 如果你有使用Lock
+ Event
结合的场景,可以使用Condition
,它基本上包含了这2种特性,在加锁的同时,还可以根据逻辑条件让其他进程等待或重新唤醒。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 import timeimport randomfrom multiprocessing import Process, Queue, Conditiondef produer (queue, condition) : while 1 : if condition.acquire(): if not queue.empty(): condition.wait() i = random.randint(1 , 10 ) queue.put(i) print 'produer --> %s' % i condition.notify() condition.release() time.sleep(1 ) def consumer (queue, condition) : while 1 : if condition.acquire(): if queue.empty(): condition.wait() i = queue.get() print 'consumer --> %s' % i condition.notify() condition.release() time.sleep(1 ) if __name__ == '__main__' : queue = Queue() condition = Condition() p1 = Process(target=produer, args=(queue, condition)) p2 = Process(target=consumer, args=(queue, condition)) p1.start() p2.start() p1.join() p2.join()
Condition
是一种更高级的控制进程同步和资源控制的方式。
线程 线程是进程执行的最小单位,比进程更轻量,一个进程至少包含一个线程,一个进程中的所有线程共享这个进程的地址空间和资源句柄。
在Python代码执行中,默认是单进程单线程执行的。
如果想要编写多线程程序,Python也提供了threading
模块,同时也提供了线程之间信息同步和信号控制的组件。
函数当做线程 创建线程与创建进程类似,也有2种方式:
函数当做线程的例子如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 import threadingdef run (name) : for i in range(3 ): print '%s --> %s' % (name, i) if __name__ == '__main__' : t1 = threading.Thread(target=run, args=('t1' , )) t2 = threading.Thread(target=run, args=('t2' , )) t1.start() t2.start() t1.join() t2.join()
与进程很类似,t = threading.Thread(target=func, args=(arg1, arg2...))
创建一个线程,t.start()
开始执行线程,t.join()
使主线程等待其他线程执行结束。
类当做线程 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 import threadingclass A (threading.Thread) : def run (self) : for i in range(5 ): print self.name, i if __name__ == '__main__' : a1 = A() a2 = A() a1.start() a2.start() a1.join() a2.join()
只要继承threading.Thread
类,重写run
方法,这个类就会以多线程的方式执行run
方法里的逻辑。
Queue 多线程之间也可以使用队列进行数据传输:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 import timeimport randomfrom Queue import Queuefrom threading import Threadclass T1 (Thread) : def __init__ (self, queue) : self.queue = queue super(T1, self).__init__() def run (self) : print 'T1 put ...' for i in range(5 ): time.sleep(random.randint(1 , 3 )) self.queue.put(i) print 'put: T1 -> %s' % i class T2 (Thread) : def __init__ (self, queue) : self.queue = queue self._running = True super(T2, self).__init__() def stop (self) : self._running = False def run (self) : print 'T2 read ...' while self._running: i = self.queue.get() print 'get: T2 -> %s' % i if __name__ == '__main__' : queue = Queue() t1 = T1(queue) t2 = T2(queue) t1.start() t2.start() time.sleep(10 ) t2.stop() t1.join() t2.join()
Event 多线程的同步也有Event
可以控制:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 import timeimport randomfrom Queue import Queuefrom threading import Thread, Eventclass T1 (Thread) : def __init__ (self, queue, event) : self.queue = queue self.event = event super(T1, self).__init__() def run (self) : self.event.wait() print 'T1 put ...' for i in range(5 ): time.sleep(random.randint(1 , 3 )) self.queue.put(i) print 'put: T1 -> %s' % i class T2 (Thread) : def __init__ (self, queue, event) : self.queue = queue self.event = event self._running = True super(T2, self).__init__() def stop (self) : self._running = False def run (self) : self.event.wait() print 'T2 read ...' while self._running: i = self.queue.get() print 'get: T2 -> %s' % i if __name__ == '__main__' : queue = Queue() event = Event() t1 = T1(queue, event) t2 = T2(queue, event) t1.start() t2.start() print 'sleep 3s...' time.sleep(3 ) event.set() time.sleep(10 ) t2.stop() t1.join() t2.join()
Pool 避免无限制的创建线程,使用线程池执行任务:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 import timeimport randomfrom multiprocessing.pool import ThreadPooldef task (name) : s = random.randint(1 , 10 ) print 'name: %s, sleep %s ...' % (name, s) time.sleep(s) if __name__ == '__main__' : pool = ThreadPool(5 ) for i in range(10 ): pool.apply_async(task, ('t-%s' % i, )) pool.close() pool.join()
Semaphore 允许多个线程同时操作某个资源并限制最大线程数,使用Semaphore
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 import timeimport osfrom threading import Thread, Semaphoresemaphore = Semaphore(4 ) def task (name) : if semaphore.acquire(): print 'name: %s, sleep 1 ...' % name time.sleep(1 ) semaphore.release() if __name__ == '__main__' : ts = [] for i in range(10 ): t = Thread(target=task, args=('t%s' % i, )) ts.append(t) t.start() for t in ts: t.join()
Condition 与多进程类似,Condition
是Lock
+ Event
的结合:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 import timeimport randomfrom Queue import Queuefrom threading import Thread, Conditiondef produer (queue, condition) : for i in range(5 ): if condition.acquire(): if not queue.empty(): condition.wait() i = random.randint(1 , 10 ) queue.put(i) print 'produer --> %s' % i condition.notify() condition.release() time.sleep(1 ) def consumer (queue, condition) : for i in range(5 ): if condition.acquire(): if queue.empty(): condition.wait() i = queue.get() print 'consumer --> %s' % i condition.notify() condition.release() time.sleep(1 ) if __name__ == '__main__' : queue = Queue() condition = Condition() t1 = Thread(target=produer, args=(queue, condition)) t2 = Thread(target=consumer, args=(queue, condition)) t1.start() t2.start() t1.join() t2.join()
concurrent模块 上面介绍了很多进程、线程各种常用的开发方式,其实最常用的编程模式还是使用进程池或线程池来执行进程、线程。
这里有必要推荐一下concurrent
模块,这个模块非常友好的封装了进程和线程最常用的操作,使用起来更简单易用。
并且在Python3.2以后,已经是纳入官方标准模块。
Python3.2以下需要手动安装此模块:
多进程 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 from concurrent.futures import ProcessPoolExecutordef task (total) : """模拟CPU密集运算""" num = 0 for i in range(total): num += i return num if __name__ == '__main__' : pool = ProcessPoolExecutor(max_workers=5 ) result = pool.map(task, [100 , 1000 , 10000 , 100000 ]) for item in result: print item
使用ProcessPoolExecutor
创建进程池,调用pool.map
方法批量加入任务并执行,然后输出每个进程的执行结果。
也可以使用submit
提交单个任务在进程池中执行:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 from concurrent.futures import ProcessPoolExecutordef task (total) : """模拟CPU密集任务""" num = 0 for i in range(total): num += i return num if __name__ == '__main__' : pool = ProcessPoolExecutor(max_workers=5 ) results = [] results.append(pool.submit(task, 100 )) results.append(pool.submit(task, 1000 )) results.append(pool.submit(task, 10000 )) results.append(pool.submit(task, 10000 )) for future in results: print future.result()
注意,pool.submit
提交后返回的是Future
对象,它意味着在未来的某个时刻才会得到结果,所以在输出结果时,需要调用future.result()
方法拿到真正的执行结果。
多线程 线程池的方式与进程池类似,只需把ProcessPoolExecutor
换成ThreadPoolExecutor
即可:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 import requestsfrom concurrent.futures import ThreadPoolExecutordef task (url) : """模拟IO密集任务""" return requests.get(url).status_code if __name__ == '__main__' : pool = ThreadPoolExecutor(max_workers=5 ) urls = ['http://www.baidu.com' , 'http://www.163.com' , 'http://www.taobao.com' ] result = pool.map(task, urls) for item in result: print item
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 import requestsfrom concurrent.futures import ThreadPoolExecutordef task (url) : """模拟IO密集运算""" return requests.get(url).status_code if __name__ == '__main__' : pool = ThreadPoolExecutor(max_workers=5 ) results = [] results.append(pool.submit(task, 'http://www.baidu.com' )) results.append(pool.submit(task, 'http://www.163.com' )) results.append(pool.submit(task, 'http://www.taobao.com' )) for future in results: print future.result()