进程的并行与并发
并行:指两者同时执行
并发 : 并发是指资源有限的情况下,两者交替轮流使用资源
区别:
并行是从微观上,也就是在一个精确的时间片刻,有不同的程序在执行,这就要求必须有多个处理器。
并发是从宏观上,在一个时间段上可以看出是同时执行的,比如一个服务器同时处理多个session。
同步异步与阻塞非阻塞
进程的三状态:
就绪 运行 阻塞
同步:
就是一个任务的完成需要依赖另一个任务,只能等待被依赖的任务执行完,依赖的任务才算执行完。是一种可靠的任务序列,两者的状态保持一致。
异步:
不需要等待被依赖的任务完成,只是通知被依赖的任务要完成什么工作,依赖的任务也立即执行,只要自己完成了整个任务就算完成了。至于被依赖的任务最终是否真正完成,依赖它的任务无法确认,所以不是可靠的任务序列。
阻塞与非阻塞:
两者只要是程序(线程)等待消息通知时的状态角度来说的
multiprocess模块
Process模块介绍

Process([group [, target [, name [, args [, kwargs]]]]]),由该类实例化得到的对象,表示一个子进程中的任务(尚未启动)
强调:
1. 需要使用关键字的方式来指定参数
2. args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号
参数介绍:
1 group参数未使用,值始终为None
2 target表示调用对象,即子进程要执行的任务
3 args表示调用对象的位置参数元组,args=(1,2,'egon',)
4 kwargs表示调用对象的字典,kwargs={'name':'egon','age':18}
5 name为子进程的名称

1 p.start():启动进程,并调用该子进程中的p.run()
2 p.run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法
3 p.terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁
4 p.is_alive():如果p仍然运行,返回True
5 p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间,需要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程

1 p.daemon:默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程,必须在p.start()之前设置
2 p.name:进程的名称
3 p.pid:进程的pid
4 p.exitcode:进程在运行时为None、如果为–N,表示被信号N结束(了解即可)
5 p.authkey:进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网络连接的底层进程间通信提供安全性,这类连接只有在具有相同的身份验证键时才能成功(了解即可)

在Windows操作系统中由于没有fork(linux操作系统中创建进程的机制),在创建子进程的时候会自动 import 启动它的这个文件,而在 import 的时候又执行了整个文件。因此如果将process()直接写在文件中就会无限递归创建子进程报错。所以必须把创建子进程的部分使用if __name__ ==‘__main__’ 判断保护起来,import 的时候 ,就不会递归运行了。
使用Process模块创建进程且使用

import time from multiprocessing import Process def func(num): print('子进程:%s' %num) if __name__ == '__main__': p = Process(target=func, args=(1,)) p.start() time.sleep(1) print('主进程')

import time from multiprocessing import Process def func(num): time.sleep(1) print('子进程:%s' %num) if __name__ == '__main__': p = Process(target=func, args=(1,)) p.start() p.join() # 阻塞 等待子线程执行完毕才结束阻塞 print('主进程')

import os import time from multiprocessing import Process def func(num): time.sleep(1) print('子进程id%s:' %num, os.getpid(), '主进程id:', os.getppid()) if __name__ == '__main__': print('主进程id:', os.getpid()) for i in range(5): Process(target=func, args=(i,)).start()

import time from multiprocessing import Process def func(num): print('子进程%s' %num) time.sleep(1) if __name__ == '__main__': for i in range(5): # 子进程的执行顺序不是根据启动顺序决定的 Process(target=func, args=(i,)).start()

from multiprocessing import Process def func(num): print('子进程%s' %num) if __name__ == '__main__': for i in range(10): p = Process(target=func, args=(i,)) p.start() p.join() # 通过join改变执行方式,按进程开启的顺序执行且等待所有子进程执行完毕主进程才执行结束 print('主进程')

from multiprocessing import Process def func(num): print('子进程%s' %num) if __name__ == '__main__': p_lst = [] for i in range(10): p = Process(target=func, args=(i,)) p.start() p_lst.append(p) for pp in p_lst: pp.join() # 通过join阻塞,等待所有子进程执行完毕主进程才结束,此时子进程执行顺序不是按开启顺序执行 print('主进程')

import os from multiprocessing import Process class MyProcess(Process): def __init__(self, name): super().__init__() self.name = name # 参数 def run(self): print(os.getpid(), self.name) if __name__ == '__main__': p = MyProcess('zhui') p.start() # 自动调用run方法

from multiprocessing import Process def func(): global n n = 0 print('子进程:', n) if __name__ == '__main__': n = 100 Process(target=func).start() print('主进程:', n)

import time from multiprocessing import Process def func1(): count = 1 while 1: time.sleep(0.5) print(count * '*') count += 1 def func2(): print('func2 start') time.sleep(5) print('func2 end') if __name__ == '__main__': p = Process(target=func1) p.daemon = True # 设置p为一个守护进程,必须在start之前完成 p.start() Process(target=func2).start() time.sleep(5) print('主进程') # 如果主进程代码已经执行完毕,但是子进程没还执行完,守护进程都不会继续执行 # 守护进程会随着主进程的代码执行完毕而结束 # 主进程会等待子进程结束,守护进程只等待主进程代码结束就结束
锁的使用--Lock

import time import json from multiprocessing import Process, Lock def search(person): # 查询余票 with open('ticket') as f: dic = json.load(f) time.sleep(0.2) print('%s查询余票:' % person, dic['count']) def get_ticket(person): # 抢票 with open('ticket') as f: dic = json.load(f) time.sleep(0.2) if dic['count'] > 0: print('%s买到票了' % person) dic['count'] -= 1 time.sleep(0.2) with open('ticket', 'w') as f: json.dump(dic, f) else: print('%s没买到票' % person) def ticket(person, lock): search(person) lock.acquire() # 上锁 get_ticket(person) lock.release() # 解锁 if __name__ == '__main__': lock = Lock() for i in range(5): p = Process(target=ticket, args=('person%s' % i, lock)) p.start() # 异步的情况下,多个进程有可能同时修改同一份资源 # 加锁可以保证多个进程同时修改同一份资源时,同一时间只有一个进程可以对此操作 # 降低效率 # 保证数据安全
信号量--Semaphore

import time import random from multiprocessing import Process, Semaphore def ktv(person, sem): sem.acquire() print('%s走进ktv' % person) time.sleep(random.randint(1, 3)) print('%s走出ktv' % person) sem.release() if __name__ == '__main__': sem = Semaphore(4) for i in range(10): p = Process(target=ktv, args=('person%s' % i, sem)) p.start() # 信号量的实现机制 # 计数器 + 锁
事件--Event
# 阻塞事件 : wait()方法 # wait是否阻塞是看event对象内部的一个属性 # 控制这个属性的值 # set() 将这个属性的值改成True # clear() 将这个属性的值改成False # is_set() 判断当前的属性是否为True

import time import random from multiprocessing import Process, Event def traffic_light(e): print('\033[31m红灯亮\033[0m') while 1: if e.is_set(): # 判断event对象内部属性是否为True time.sleep(2) print('\033[31m红灯亮\033[0m') e.clear() # 将event对象内部属性改成False else: time.sleep(2) print('\033[32m绿灯亮\033[0m') e.set() # 将event对象内部属性改成True def car(e, i): if not e.is_set(): print('car %s 在等待' % i) e.wait() # 是否阻塞看event对象内部属性 print('car %s 通过了' % i) if __name__ == '__main__': e = Event() p = Process(target=traffic_light, args=(e,)) p.daemon = True p.start() p_lst = [] for i in range(5): time.sleep(random.randrange(0, 3, 2)) p = Process(target=car, args=(e, i)) p.start() p_lst.append(p) for p in p_lst: p.join()
进程间通信(IPC)
队列--Queue

Queue([maxsize])
创建共享的进程队列。maxsize是队列中允许的最大项数。如果省略此参数,则无大小限制。底层队列使用管道和锁定实现。另外,还需要运行支持线程以便队列中的数据传输到底层管道中。
Queue的实例q具有以下方法:
q.get( [ block [ ,timeout ] ] )
返回q中的一个项目。如果q为空,此方法将阻塞,直到队列中有项目可用为止。block用于控制阻塞行为,默认为True. 如果设置为False,将引发Queue.Empty异常(定义在Queue模块中)。timeout是可选超时时间,用在阻塞模式中。如果在制定的时间间隔内没有项目变为可用,将引发Queue.Empty异常。
q.get_nowait( )
同q.get(False)方法。
q.put(item [, block [,timeout ] ] )
将item放入队列。如果队列已满,此方法将阻塞至有空间可用为止。block控制阻塞行为,默认为True。如果设置为False,将引发Queue.Empty异常(定义在Queue库模块中)。timeout指定在阻塞模式中等待可用空间的时间长短。超时后将引发Queue.Full异常。
q.qsize()
返回队列中目前项目的正确数量。此函数的结果并不可靠,因为在返回结果和在稍后程序中使用结果之间,队列中可能添加或删除了项目。在某些系统上,此方法可能引发NotImplementedError异常。
q.empty()
如果调用此方法时 q为空,返回True。如果其他进程或线程正在往队列中添加项目,结果是不可靠的。也就是说,在返回和使用结果之间,队列中可能已经加入新的项目。
q.full()
如果q已满,返回为True. 由于线程的存在,结果也可能是不可靠的(参考q.empty()方法)
from multiprocessing import Queue q = Queue() q.put(1) # 队列满了,会发生阻塞 print(q.get()) # 队列为空,会发生阻塞 q.put_nowait(1) # 队列满了,会报错 queue.Full 模块为 import queue print(q.get_nowait()) # 队列为空,会报错 queue.Empty 模块为 import queue print(q.empty()) # 判断当前队列是否为空,多进程情况下判断不准确 print(q.full()) # 判断当前队列是否已满,多进程情况下判断不准确

from multiprocessing import Queue def consume(q): print(q.get()) if __name__ == '__main__': q = Queue() p = Process(target=consume, args=(q,)) p.start() q.put({'name': 'zhui'})
一般情况下,进程使用队列是一个进程存,一个进程取

import time import random from multiprocessing import Queue, Process # 消费者 def consumer(q, name): # 处理数据 while 1: food = q.get() if food is None: # 根据标识判断是否结束 break print('%s吃了一个%s' %(name, food)) time.sleep(random.uniform(0.5, 1)) # 生产者 def producer(q, name, food): # 获取数据 for i in range(10): time.sleep(random.uniform(0.3, 0.8)) print('%s生产了%s%s' % (name, food, i)) q.put(food + str(i)) if __name__ == '__main__': q = Queue() c1 = Process(target=consumer, args=(q, 'zhui')) c2 = Process(target=consumer, args=(q, 'xiao')) c1.start() c2.start() p1 = Process(target=producer, args=(q, '筱筱', '冰糖葫芦')) p2 = Process(target=producer, args=(q, '小筱', '姜糖')) p1.start() p2.start() p1.join() p2.join() q.put(None) # 有几个consumer就需要放几个None标识 q.put(None)
JoinableQueue
task_done()
使用者使用此方法发出信号,表示q.get()返回的项目已经被处理。如果调用此方法的次数大于从队列中删除的项目数量,将引发ValueError异常。
join()
生产者将使用此方法进行阻塞,直到队列中所有项目均被处理。阻塞将持续到为队列中的每个项目均调用q.task_done()方法为止。
下面的例子说明如何建立永远运行的进程,使用和处理队列上的项目。生产者将项目放入队列,并等待它们被处理。

import time import random from multiprocessing import Process, JoinableQueue def consumer(q, name): while 1: food = q.get() time.sleep(random.uniform(0.5, 1)) print('%s吃了一个%s' % (name, food)) q.task_done() # 通知队列已经有一个数据被处理了 def producer(q, name, food): for i in range(10): time.sleep(random.uniform(0.3, 0.8)) print('%s生产了%s%s' %(name, food, i)) q.put(food + str(i)) if __name__ == '__main__': jq = JoinableQueue() c1 = Process(target=consumer, args=(jq, 'zhui')) c2 = Process(target=consumer, args=(jq, 'xiao')) c1.daemon = True c2.daemon = True c1.start() c2.start() p1 = Process(target=producer, args=(jq, '筱筱', '冰糖葫芦')) p2 = Process(target=producer, args=(jq, '小筱', '姜糖')) p1.start() p2.start() p1.join() p2.join() jq.join() # 阻塞直到计数为 0
管道--Pipe
# 队列是基于管道实现的 # 管道是基于socket实现的 # 队列在进程之间数据安全 队列 + 锁 简便的IPC机制 # 管道在进程之间数据不安全的 且存取数据复杂 socket + pickle

from multiprocessing import Pipe, Process def consumer(left, right): left.close() while 1: try: print(right.recv()) except EOFError: break if __name__ == '__main__': left, right = Pipe() Process(target=consumer, args=(left, right)).start() right.close() for i in range(10): left.send('冰糖葫芦%s' % i) left.close() # pipe的端口管理不会随着某一个进程的关闭而关闭 # 操作系统管理进程对这些端口的使用
进程池--Pool

import os from multiprocessing import Pool def task(n): time.sleep(1) print('%s run' % os.getpid()) return n**2 if __name__ == '__main__': p = Pool(4) # 实例化,参数代表创建几个进程,默认系统cpu个数 for i in range(10): res = p.apply(task, args=(i,)) # 同步调用 print('-->', res) # 子进程对应函数的返回值

import os from multiprocessing import Pool def task(n): time.sleep(1) print('%s : %s run' % (n, os.getpid())) return n**2 if __name__ == '__main__': p = Pool(4) # 实例化,参数代表创建几个进程,默认系统cpu个数 for i in range(10): p.apply_async(task, args=(i,)) # 异步调用 # 没返回值,需要调用close和join p.close() p.join()

import os from multiprocessing import Pool def task(n): time.sleep(1) print('%s : %s run' % (n, os.getpid())) return n**2 if __name__ == '__main__': p = Pool(4) # 实例化,参数代表创建几个进程,默认系统cpu个数 res_lst = [] for i in range(10): res = p.apply_async(task, args=(i,)) # 异步调用 # print(res.get()) # 直接获取会导致变成同步 res_lst.append(res) for res in res_lst: print(res.get())

import os import time from multiprocessing import Pool def task(n): time.sleep(1) print('%s : %s run' % (n, os.getpid())) return n**2 if __name__ == '__main__': p = Pool(4) # 异步调用的简化版本 # 自带close和join res_lst = p.map(task, range(20)) for res in res_lst: print(res)