多进程
基础概念
什么是进程
进程(process)是正在运行的程序的实例,但一个程序可能会产生多个进程。
每个进程都有自己的地址空间,内存,数据栈以及其他记录其运行状态的辅助数据,不同的进程只能使用消息队列、共享内存等进程间通讯(IPC)方法进行通信,而不能直接共享信息。
虽然子进程复制了父进程的代码段和数据段等,但是一旦子进程开始运行,子进程和父进程就是相互独立的,它们之间不再共享任何数据。
基础使用
os.fork()
import os
pid = os.fork()
if pid < 0:
# 创建失败
print 'Fail to create process'
elif pid == 0:
# 主进程的代码
print 'I am child process (%s) and my parent is (%s).' % (os.getpid(), os.getppid())
else:
# 希望在子进程执行的代码
print 'I (%s) just created a child process (%s).' % (os.getpid(), pid)
windows 系统没有
fokr()
multiprocessing
模块
基础用例
import os
from multiprocessing import Process
# 子进程要执行的代码
def child_proc(name):
print(f"Run child process {name} ({os.getpid()})...")
if __name__ == "__main__":
# 首先主进程获取自己的pid
print(f"Parent process {os.getpid()}.")
# 创建一个子进程实例,
p = Process(target=child_proc, args=(["test"]))
print("Process will start.")
# 开启子进程
p.start()
# 让主进程阻塞,等待子进程执行
p.join()
print("Process end.")
Process
是一个可以创建进程的类,参数target
指定了进程要执行的函数,args
指定了对应的参数
在创建实例后,调用实例的start()
方法,
数据共享
使用 Manager 服务进程管理器
from multiprocessing import Process, Manager
config = {
"name":None,
"id":0,
}
def func(config_dict):
# 进程内可以修改共享的对象
if config["update"]:
config = Config(**config_dict)
if __name__ == '__main__':
# 开启一个服务进程
with Manager() as manager:
c = manager.dict()
p = Process(target=func, args=(c,))
p.start()
p.join()
进程池
multiprocessing
模块
import os, time
from multiprocessing import Pool
def foo(x):
print 'Run task %s (pid:%s)...' % (x, os.getpid())
time.sleep(2)
print 'Task %s result is: %s' % (x, x * x)
if __name__ == '__main__':
print 'Parent process %s.' % os.getpid()
# 设置进程数
p = Pool(4)
for i in range(5):
# 设置每个进程要执行的函数和参数
p.apply_async(foo, args=(i,))
print 'Waiting for all subprocesses done...'
p.close()
p.join()
print 'All subprocesses done.'
在上面的代码中,Pool 用于生成进程池,对 Pool 对象调用 apply_async 方法可以使每个进程异步执行任务,也就说不用等上一个任务执行完才执行下一个任务,close 方法用于关闭进程池,确保没有新的进程加入,join 方法会等待所有子进程执行完毕。
注意点
- close() 的调用必须在 join() 之前,这样表示不在添加新的进程了,可以运行了
- Pool 的数量决定了同时能执行的进程个数,超过数量的进程会等待新的空置进程再执行
进程通讯
进程间的通信可以通过管道(Pipe),队列(Queue)等多种方式来实现。
- 使用
Queue
,通过将一个Queue
实例以参数的形式传入进程中,各个子进程都可以获取到同一个Queue
# -*- coding: utf-8 -*-
from multiprocessing import Process, Queue
# 向队列中写入数据
def write_task(q):
try:
n = 1
while n < 5:
print "write, %d" % n
q.put(n)
time.sleep(1)
n += 1
except BaseException:
print "write_task error"
finally:
print "write_task end"
# 从队列读取数据
def read_task(q):
try:
n = 1
while n < 5:
print "read, %d" % q.get()
time.sleep(1)
n += 1
except BaseException:
print "read_task error"
finally:
print "read_task end"
if __name__ == "__main__":
q = Queue() # 父进程创建Queue,并传给各个子进程
pw = Process(target=write_task, args=(q,))
pr = Process(target=read_task, args=(q,))
pw.start() # 启动子进程 pw,写入
pr.start() # 启动子进程 pr,读取
pw.join() # 等待 pw 结束
pr.join() # 等待 pr 结束
print "DONE"
根据cpu数量创建进程
def main(input_pdf: str, output_pdf: str):
"""主处理流程"""
# 初始化环境
with tempfile.TemporaryDirectory() as tmpdir:
# 读取文档信息
src_doc = fitz.open(input_pdf)
total_pages = len(src_doc)
src_doc.close()
# 准备任务队列的入参
# tasks = [(input_pdf, p, TEMP_DIR) for p in range(total_pages)]
# pdf_path, page_num, out_put_name, dpi=300, paper_size="A4"
dpi = 300
page_size = "A3"
output_type = "pdf"
# tasks = [(input_pdf, page_num, output_name, dpi, page_size) for page_num in range(total_pages)]
param_list = []
jpg_list = []
for page_num in range(total_pages):
basename, _ = path.splitext(path.basename(input_pdf))
output_name = path.join(tmpdir, f"{basename}_{page_num}.{output_type}")
param_list.append((input_pdf, page_num, output_name, dpi, page_size, output_type))
jpg_list.append(output_name)
# 创建进程池,CPU核心数,保留1个核心给系统
with multiprocessing.Pool(processes=max(1, multiprocessing.cpu_count() - 1)) as pool:
# 使用imap_unordered并配合排序以获得最佳效率
results = list(tqdm(pool.imap_unordered(ss, param_list), total=total_pages, desc="页面处理进度"))
# results = list(tqdm(pool.imap_unordered(process_page, tasks), total=total_pages, desc="页面处理进度"))
# 校验处理结果
failed_pages = [p for p, success, output_path in results if not success]
if failed_pages:
print(f"警告:以下页面处理失败: {failed_pages}")
return
images_to_pdf(jpg_list, output_pdf)