语法结构: Process(group=None, target, name, args, kwargs)
参数说明:
pythonfrom multiprocessing import Process
import os, time
def test():
print(f'我是子进程,我的PID是:{os.getpid()},我的父进程是:{os.getppid()}')
time.sleep(1)
if __name__ == '__main__':
print('主进程开始执行')
lst = []
# 创建五个子进程
for i in range(5):
# 创建子进程
p = Process(target=test)
# 启动子进程
p.start()
# 启动中的进程添加到列表
lst.append(p)
# 遍历列表
for item in lst: # item是数据类型 Process类型
item.join() # 阻塞主进程 所有的子进程执行结束后,主进程才会执行结束
print('主进程执行结束')
Process类中常用的属性和方法
方法/属性名称 | 功能描述 |
---|---|
name | 当前进程实例别名,默认为Process-N |
pid | 当前进程对象的PID值 |
is_alive() | 进程是否执行完,没执行完结果为True,否则为False |
join(timeout) | 等待结束或等待timeout秒 |
start() | 启动进程 |
run() | 如果没有指定target参数,则启动进程后,会调用父类中的run方法 |
terminate() | 强制终止进程 |
Process类中属性和方法示例代码1
pythonfrom multiprocessing import Process
import os, time
# 函数式方式创建子进程
def sub_precoess(name):
print(f'子进程PID:{os.getpid()},父进程PID:{os.getppid()},-------------{name}')
time.sleep(1)
def sub_precoess2(name):
print(f'子进程PID:{os.getpid()},父进程PID:{os.getppid()},-------------{name}')
time.sleep(1)
if __name__ == '__main__':
# 主进程
print('主进程开始执行')
for i in range(5):
# 创建第一个子进程
p1 = Process(target=sub_precoess, args=('ysj',))
# 创建第二个子进程
p2 = Process(target=sub_precoess2, args=(18,))
# 调用start()启动子进程
p1.start()
p2.start()
print(p1.name, '是否执行完毕:', p1.is_alive())
print(p2.name, '是否执行完毕:', p2.is_alive())
print(p1.name, 'PID是:', p1.pid)
print(p2.name, 'PID是:', p2.pid)
p1.join() # 主进程要等待p1执行结束,阻塞主进程
p2.join()
print(p1.name, '是否执行完毕:', p1.is_alive())
print(p2.name, '是否执行完毕:', p2.is_alive())
print('你的父进程执行完毕')
Process类中属性和方法示例代码2
pythonfrom multiprocessing import Process
import os, time
# 函数式方式创建子进程
def sub_precoess(name):
print(f'子进程PID:{os.getpid()},父进程PID:{os.getppid()},-------------{name}')
time.sleep(1)
def sub_precoess2(name):
print(f'子进程PID:{os.getpid()},父进程PID:{os.getppid()},-------------{name}')
time.sleep(1)
if __name__ == '__main__':
# 主进程
print('主进程开始执行')
for i in range(5):
# 创建第一个子进程
p1 = Process(target=sub_precoess, args=('ysj',)) # 没有给定target参数,不会执行自己编写的函数中的代码,会调用执行Process类中的run方法
# 创建第二个子进程
p2 = Process(target=sub_precoess, args=(18,))
# 启动子进程
p1.start() # 如果Process类创建对象时没有指定target参数,会调用执行Process类中的run方法执行
p2.start() # 如果Process类创建对象时指定了target参数,start()调用target指定的函数去执行
# 强制终止进程
p1.terminate()
p2.terminate()
print('你的父进程执行完毕')
语法结构: class 子进程(Process): pass
pythonfrom multiprocessing import Process
import os, time
# 自定义一个类
class SubProcess(Process):
# 编写一个初始化方法
def __init__(self, name):
super().__init__()
self.name = name
# 重写父类中的run方法
def run(self):
print(f'子进程的名称{self.name},PID:{os.getpid()},父进程的PID:{os.getppid()}')
if __name__ == '__main__':
print('父进程开始执行')
lst = []
for i in range(1,6):
p1 = SubProcess(f'进程{i}')
# 启动进程
p1.start()
lst.append(p1)
# 阻塞主进程
for item in lst:
item.join()
print('父进程执行结束')
进程池原理:创建一个进程池,并设置进程池中最大的进程数量。假设进程池中最大的进程数为3,现在有10个任务需要执行,那么进程池次可以执行3个任务,4次即可完成全部任务的执行。语法结构: 进程池对象 = Pool(N)
方法名 | 功能描述 |
---|---|
apply_async(func, args, kwargs) | 使用非阻塞方式调用函数func |
apply(func, args, kwargs) | 使用阻塞方式调用函数func |
close() | 关闭进程池,不再接收新任务 |
terminate() | 不管任务是否完成,立即终止 |
join() | 阻塞主进程,必须在 terminate() 或 close() 之后使用,否则不起作用 |
pythonfrom multiprocessing import Pool
import os, time
# 编写任务
def task(name):
print(f'子进程的PID:{os.getpid()},执行的任务:{name}')
time.sleep(1)
if __name__ == '__main__':
# 主进程
start = time.time()
print('父进程开始执行')
# 创建进程池
p = Pool(3)
# 创建任务
for i in range(10):
#p.apply_async(func=task, args=(i,)) # 以非阻塞的方式
p.apply(func=task, args=(i,)) # 以阻塞的方式
p.close() # 关闭进程池,不再接收新任务
p.join() # 阻塞父进程
print('所有的子进程执行完毕,父进程执行结束')
print(time.time()-start) # 执行完10个任务,所需要的时间
并发: 是指两个或多个事件同一时间间隔发生,多个任务被交替轮换着执行,比如A事件是吃苹果,在吃苹果的过程中有快递员敲门让你收下快递,收快递就是B事件,那么收完快递继续吃没吃完的苹果。这就是并发。
并行: 指两个或多个事件在同一时刻发生,多个任务在同一时刻在多个处理器上同时执行。比如A事件是泡脚,B事件是打电话,C事件是记录电话内容,这三件时则可以在同一时刻发生,这就是并行。
多个进程之间数据可以共享吗?通过一下代码可以查看变量a的值,多个进程之间数据不是共享的。
pythonfrom multiprocessing import Process
import os, time
a = 100
def add():
print('子进程1开始执行')
global a
a += 30
print('a=', a)
print('子进程1执行完毕')
def sub():
print('子进程2开始执行')
global a
a -= 50
print('a=', a)
print('子进程2执行完毕')
if __name__ == '__main__':
# 父进程
print('父进程开始执行')
print('a=', a)
# 创建加的子进程
p1 = Process(target=add)
# 创建减的子进程
p2 = Process(target=sub)
# 启动子进程
p1.start()
p2.start()
# 阻塞主进程
p1.join()
p2.join()
print('父进程执行结束')
print('a=', a)
进程之间可以通过 队列(Queue) 进行通信,队列是一种先进先出(First In First Out)的数据结构。语法结构: 队列对象 = Queue(N)
方法名称 | 功能描述 |
---|---|
qsize() | 获取当前队列包含的消息数量 |
empty() | 判断队列是否为空,为空结果为True,否则为False |
full() | 判断队列是否满了,满结果为True,否则为False |
get(block=True) | 获取队列中的一条消息,然后从队列中移除,block默认值为True |
get_nowait() | 相当于get(block=False),消息队列为空时,抛出异常 |
put(item,block= True) | 将item消息放入队列block默认为True |
put_nowait(item) | 相当于put(item, block=False) |
pythonfrom multiprocessing import Queue
if __name__ == '__main__':
# 创建一个队列
q = Queue(3) # 最多可以接收3条消息
print('队列是否为空:', q.empty())
print('队列是否为满:', q.full())
# 向队列中添加消息
q.put('hell')
q.put('word')
print('队列是否为空:', q.empty()) # False
print('队列是否为满:', q.full()) # False
q.put('python')
print('队列是否为空:', q.empty()) # False
print('队列是否为满:', q.full()) # True
print('队列中的信息的个数:', q.qsize())
# 出队
print(q.get())
print('队列中的信息的个数:', q.qsize())
# 入队
q.put_nowait('html')
# q.put_nowait('sql') # 队列满了会报错 queue.Full
# q.put('sql') # 队列满了不报错,会一直等待,等到队列中有空位置
# 遍历
if not q.empty():
for i in range(q.qsize()):
print(q.get_nowait()) # nowait() 不等待
print('队列是否为空:', q.empty())
print('队列是否为满:', q.full())
print('队列中的信息的个数:', q.qsize())
向已满队列添加元素
pythonfrom multiprocessing import Queue
if __name__ == '__main__':
# 创建一个队列
q = Queue(3) # 最多可以接收3条消息
# 向队列中添加元素
q.put('hell') # block=True
q.put('word')
q.put('python')
q.put('html', block=True, timeout=2) # 等待2秒后,队列还没有空位置,抛出异常
使用队列实现进程之间的通讯,队列是共享的
pythonfrom multiprocessing import Queue, Process
import time
a = 100
def write_msg(q): # 队列
global a
if not q.full():
for i in range(6):
a -= 10
q.put(a) # 入队
print('a入队时的值是:', a)
# 出队
def read_msg(q):
time.sleep(1)
while not q.empty():
print('出队时a的值是:', q.get())
if __name__ == '__main__':
print('父进程开始执行')
q = Queue() # 由父进程创建队列,没有指定参数,说明可接收的消息个数没有上线
# 创建两个子进程
p1 = Process(target=write_msg, args=(q,))
p2 = Process(target=read_msg, args=(q,))
# 启动两个子进程
p1.start()
p2.start()
# 等待写的进程执行结束,再去执行主进程
p1.join()
p2.join()
print('--------父进程执行完毕--------')
函数式创建线程,语法结构: t = Thread(group, target, name, args, kwargs)
参数说明:
pythonimport threading
from threading import Thread
import time
# 编写函数
def test():
for i in range(3):
time.sleep(1)
print(f'线程:{threading.current_thread().name}正在执行{i}')
if __name__ == '__main__':
start = time.time()
print('主线程开始执行')
# 线程
lst = [Thread(target=test) for i in range(2)]
for item in lst: # item类型是 Thread类型
# 启动进程
item.start()
for item in lst:
item.join()
print(f'一共耗时:{time.time()-start}')
# 三个线程并行执行的任务是什么? 主线程负责执行 main中的代码,Thread-1线程 执行三次循环 Thread-2线程执行三次循环
# 三个线程又并发执行 谁先执行不一定 Thread-1 和 Thread-2
使用Thread子类创建线程的操作步骤:
pythonimport threading
from threading import Thread
import time
# 自定义类
class SubThread(Thread):
def run(self):
for i in range(3):
print(f'线程:{threading.current_thread().name}正在执行{i}')
if __name__ == '__main__':
print('主线程开始执行')
lst = [SubThread() for i in range(2)]
for item in lst: # item类型是 Thread类型
# 启动进程
item.start()
for item in lst:
item.join() # 阻塞主进程
print('主线程执行完毕')
线程之间的数据可以共享吗?线程之间数据是可以共享的
pythonfrom threading import Thread
a = 100
def add():
print('加线程开始执行')
global a
a += 30
print(f'a的值为:{a}')
print('加的线程执行结束')
def sub():
print('减线程开始执行')
global a
a -= 50
print(f'a的值为:{a}')
print('减的线程执行结束')
if __name__ == '__main__':
print('主线程开始执行')
print(f'-----------全局变量a的值为:{a}')
# 线程
add = Thread(target=add)
sub = Thread(target=sub)
add.start()
sub.start()
add.join()
sub.join()
print('---主线程执行结束')
线程操作共享数据安全性问题,多进程操作共享数据,多个线程可以同时出售同一张票,这个时候就需要使用Lock了
pythonimport threading
from threading import Thread, Lock
import time
ticket = 50 # 50张票
lock_obj = Lock() # 创建锁对象
def sale_ticket():
global ticket
# 每个排队窗口假设有100人
lock_obj.acquire() # 上锁
for i in range(100):
if ticket > 0:
print(f'{threading.current_thread().name}正在出售第{ticket}张票')
ticket -= 1
time.sleep(1)
lock_obj.release() # 释放锁
if __name__ == '__main__':
for i in range(3): # 创建3个线程,分别代表3个售票窗口
t = Thread(target=sale_ticket)
t.start()
是线程模型中的经典问题,与编程语言无关。当程序中出现了明确的两类任务,一个任务负责生产数据,一个任务负责处理生产的数据时就可以使用该模式。
python内置模块queue中的Queue类
方法名称 | 功能描述 |
---|---|
put(item) | 向队列中放置数据,如果队列为满,则阻塞 |
get() | 从队列中取走数据,如果队列为空,则阻塞 |
join() | 如果队列不为空,则等待队列变为空 |
task_done() | 消费者从队列中取走一项数据,当队列变为空时,唤醒调用join()的线程 |
pythonfrom queue import Queue
from threading import Thread
import time
# 创建一个生产者类
class Producer(Thread):
def __init__(self, name, queue):
Thread.__init__(self, name=name)
self.queue = queue
def run(self):
for i in range(1, 6):
print(f'{self.name}将产品{i}放入队列')
self.queue.put(i)
time.sleep(1)
print('生产者已经完成了所有数据的存放')
# 创建一个消费者类
class Consumer(Thread):
def __init__(self, name, queue):
Thread.__init__(self, name=name)
self.queue = queue
def run(self):
for _ in range(5):
value = self.queue.get()
print(f'消费者线程:{self.name}取出了{value}')
time.sleep(1)
print('--------消费者线程已经完成了所有数据的取出--------')
if __name__ == '__main__':
# 创建队列
queue = Queue()
# 创建生产者线程
p = Producer('Process', queue)
# 创建消费者线程
c = Consumer('Consumer', queue)
# 启动线程
p.start()
c.start()
# 阻塞主线程
p.join()
c.join()
print('主线程执行结束')
Process(group=None, target, name, args, kwargs)
进程池对象 = Pool(N)
本文作者:柯南
本文链接:
版权声明:©2024 柯南 All rights reserved.