Skip to main content

多进程

基础概念

官方文档

什么是进程

进程(process)是正在运行的程序的实例,但一个程序可能会产生多个进程

每个进程都有自己的地址空间,内存,数据栈以及其他记录其运行状态的辅助数据,不同的进程只能使用消息队列、共享内存等进程间通讯(IPC)方法进行通信,而不能直接共享信息。

虽然子进程复制了父进程的代码段和数据段等,但是一旦子进程开始运行,子进程和父进程就是相互独立的,它们之间不再共享任何数据。

基础使用

os.fork()

import os
pid = os.fork()

if pid < 0:
# 创建失败
print 'Fail to create process'
elif pid == 0:
# 主进程的代码
print 'I am child process (%s) and my parent is (%s).' % (os.getpid(), os.getppid())
else:
# 希望在子进程执行的代码
print 'I (%s) just created a child process (%s).' % (os.getpid(), pid)

windows 系统没有fokr()

multiprocessing 模块

基础用例

import os
from multiprocessing import Process

# 子进程要执行的代码
def child_proc(name):
print(f"Run child process {name} ({os.getpid()})...")


if __name__ == "__main__":
# 首先主进程获取自己的pid
print(f"Parent process {os.getpid()}.")

# 创建一个子进程实例,
p = Process(target=child_proc, args=(["test"]))
print("Process will start.")

# 开启子进程
p.start()

# 让主进程阻塞,等待子进程执行
p.join()

print("Process end.")

Process是一个可以创建进程的类,参数target指定了进程要执行的函数,args指定了对应的参数

在创建实例后,调用实例的start()方法,

数据共享

使用 Manager 服务进程管理器

from multiprocessing import Process, Manager

config = {
"name":None,
"id":0,
}

def func(config_dict):
# 进程内可以修改共享的对象
if config["update"]:
config = Config(**config_dict)

if __name__ == '__main__':
# 开启一个服务进程
with Manager() as manager:
c = manager.dict()

p = Process(target=func, args=(c,))
p.start()
p.join()

进程池

multiprocessing 模块

import os, time
from multiprocessing import Pool

def foo(x):
print 'Run task %s (pid:%s)...' % (x, os.getpid())
time.sleep(2)
print 'Task %s result is: %s' % (x, x * x)

if __name__ == '__main__':
print 'Parent process %s.' % os.getpid()
# 设置进程数
p = Pool(4)

for i in range(5):
# 设置每个进程要执行的函数和参数
p.apply_async(foo, args=(i,))

print 'Waiting for all subprocesses done...'
p.close()
p.join()
print 'All subprocesses done.'

在上面的代码中,Pool 用于生成进程池,对 Pool 对象调用 apply_async 方法可以使每个进程异步执行任务,也就说不用等上一个任务执行完才执行下一个任务,close 方法用于关闭进程池,确保没有新的进程加入,join 方法会等待所有子进程执行完毕。

注意点

  • close() 的调用必须在 join() 之前,这样表示不在添加新的进程了,可以运行了
  • Pool 的数量决定了同时能执行的进程个数,超过数量的进程会等待新的空置进程再执行

进程通讯

进程间的通信可以通过管道(Pipe),队列(Queue)等多种方式来实现。

  • 使用Queue,通过将一个Queue实例以参数的形式传入进程中,各个子进程都可以获取到同一个Queue
# -*- coding: utf-8 -*-
from multiprocessing import Process, Queue

# 向队列中写入数据
def write_task(q):
try:
n = 1
while n < 5:
print "write, %d" % n
q.put(n)
time.sleep(1)
n += 1
except BaseException:
print "write_task error"
finally:
print "write_task end"

# 从队列读取数据
def read_task(q):
try:
n = 1
while n < 5:
print "read, %d" % q.get()
time.sleep(1)
n += 1
except BaseException:
print "read_task error"
finally:
print "read_task end"

if __name__ == "__main__":
q = Queue() # 父进程创建Queue,并传给各个子进程
pw = Process(target=write_task, args=(q,))
pr = Process(target=read_task, args=(q,))
pw.start() # 启动子进程 pw,写入
pr.start() # 启动子进程 pr,读取
pw.join() # 等待 pw 结束
pr.join() # 等待 pr 结束
print "DONE"

根据cpu数量创建进程

def main(input_pdf: str, output_pdf: str):
"""主处理流程"""

# 初始化环境
with tempfile.TemporaryDirectory() as tmpdir:
# 读取文档信息
src_doc = fitz.open(input_pdf)
total_pages = len(src_doc)
src_doc.close()

# 准备任务队列的入参
# tasks = [(input_pdf, p, TEMP_DIR) for p in range(total_pages)]

# pdf_path, page_num, out_put_name, dpi=300, paper_size="A4"

dpi = 300
page_size = "A3"
output_type = "pdf"
# tasks = [(input_pdf, page_num, output_name, dpi, page_size) for page_num in range(total_pages)]

param_list = []
jpg_list = []
for page_num in range(total_pages):
basename, _ = path.splitext(path.basename(input_pdf))

output_name = path.join(tmpdir, f"{basename}_{page_num}.{output_type}")
param_list.append((input_pdf, page_num, output_name, dpi, page_size, output_type))
jpg_list.append(output_name)

# 创建进程池,CPU核心数,保留1个核心给系统
with multiprocessing.Pool(processes=max(1, multiprocessing.cpu_count() - 1)) as pool:
# 使用imap_unordered并配合排序以获得最佳效率
results = list(tqdm(pool.imap_unordered(ss, param_list), total=total_pages, desc="页面处理进度"))
# results = list(tqdm(pool.imap_unordered(process_page, tasks), total=total_pages, desc="页面处理进度"))

# 校验处理结果
failed_pages = [p for p, success, output_path in results if not success]
if failed_pages:
print(f"警告:以下页面处理失败: {failed_pages}")
return

images_to_pdf(jpg_list, output_pdf)