There are two common ways to make code run in parallel: multithreading and multiprocessing. Because the CPython interpreter implementation is not thread-safe, it uses a GIL (Global Interpreter Lock): at any given moment, only one thread can hold the interpreter lock. Consequently, Python can only exploit multiple CPU cores through multiprocessing; multithreading is suitable only for I/O-bound programs.
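A minimal sketch (function names are illustrative) contrasting the two models: a CPU-bound task is serialized by the GIL across threads but runs truly in parallel across processes, while an I/O-bound task releases the GIL while waiting, so threads can overlap.

```python
from multiprocessing import Pool
from threading import Thread
import time

def cpu_task(n):
    # CPU-bound: the GIL serializes this across threads,
    # but separate processes run it truly in parallel
    return sum(i * i for i in range(n))

def io_task(results, idx):
    # I/O-bound: sleeping releases the GIL, so threads overlap
    time.sleep(0.01)
    results[idx] = idx

if __name__ == "__main__":
    # CPU-bound work: use processes
    with Pool(2) as pool:
        print(pool.map(cpu_task, [10, 20]))  # [285, 2470]

    # I/O-bound work: threads are enough
    results = [None] * 2
    threads = [Thread(target=io_task, args=(results, i)) for i in range(2)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print(results)  # [0, 1]
```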
Multiprocessing basics

Creating and starting processes
A process can be created with `multiprocessing.Process()`, which accepts two arguments:

- `target`: a callable that is executed when the process starts
- `args`: a tuple of arguments for the target function

Call `process.start()` to start the process, and `process.join()` to make the program wait for the process to finish before running any subsequent code; the main process is blocked until then.
```python
from multiprocessing import Process
import os

def square_numbers():
    for i in range(1000):
        result = i * i

if __name__ == "__main__":
    processes = []
    # the number of CPUs on the machine is usually a good choice
    # for the number of processes
    num_processes = os.cpu_count()

    # create processes and assign a function to each process
    for i in range(num_processes):
        process = Process(target=square_numbers)
        processes.append(process)

    # start all processes
    for process in processes:
        process.start()

    # wait for all processes to finish
    # block the main program until these processes are finished
    for process in processes:
        process.join()
```
Sharing data between processes
Because each process has its own memory space, special shared-memory objects are needed to share data. Data can be stored in shared-memory variables created with `Value` or `Array`:

- `Value(type, value)` creates a single ctypes object
- `Array(type, values)` creates an array of ctypes objects

The program below demonstrates a race condition, so the result differs on every run: for example, when two processes read the same value, each adds 1 to it and then writes it back to the original address, the final result is not the expected +2.
```python
from multiprocessing import Process, Value, Array
import time

def add_100(number):
    for _ in range(100):
        time.sleep(0.001)
        number.value += 1

def add_100_array(numbers):
    for _ in range(100):
        time.sleep(0.01)
        for i in range(len(numbers)):
            numbers[i] += 1

if __name__ == "__main__":
    shared_number = Value('i', 0)
    print('Value at beginning:', shared_number.value)

    shared_array = Array('d', [0.0, 100.0, 200.0])
    print('Array at beginning:', shared_array[:])

    process1 = Process(target=add_100, args=(shared_number,))
    process2 = Process(target=add_100, args=(shared_number,))
    process3 = Process(target=add_100_array, args=(shared_array,))
    process4 = Process(target=add_100_array, args=(shared_array,))

    process1.start()
    process2.start()
    process3.start()
    process4.start()

    process1.join()
    process2.join()
    process3.join()
    process4.join()

    print('Value at end:', shared_number.value)
    print('Array at end:', shared_array[:])
    print('end main')

"""
Value at beginning: 0
Array at beginning: [0.0, 100.0, 200.0]
Value at end: 144
Array at end: [134.0, 237.0, 339.0]
end main
"""
```
A lock can be used to avoid the race condition. A lock (also called a mutex) is a synchronization mechanism that enforces limits on access to a resource in an environment with many executing processes/threads. A lock has two states: locked and unlocked. While the state is locked, no other concurrent process/thread may enter this code section until the state is unlocked again.
```python
# import Lock
from multiprocessing import Process, Value, Array, Lock
import time

def add_100(number, lock):
    for _ in range(100):
        time.sleep(0.001)
        # lock the state
        lock.acquire()
        number.value += 1
        # unlock the state
        lock.release()

def add_100_array(numbers, lock):
    for _ in range(100):
        time.sleep(0.01)
        for i in range(len(numbers)):
            lock.acquire()
            numbers[i] += 1
            lock.release()

if __name__ == "__main__":
    # create the locks
    lock1 = Lock()
    lock2 = Lock()

    shared_number = Value('i', 0)
    print('Value at beginning:', shared_number.value)

    shared_array = Array('d', [0.0, 100.0, 200.0])
    print('Array at beginning:', shared_array[:])

    # pass the lock to the target function
    process1 = Process(target=add_100, args=(shared_number, lock1))
    process2 = Process(target=add_100, args=(shared_number, lock1))
    process3 = Process(target=add_100_array, args=(shared_array, lock2))
    process4 = Process(target=add_100_array, args=(shared_array, lock2))

    process1.start()
    process2.start()
    process3.start()
    process4.start()

    process1.join()
    process2.join()
    process3.join()
    process4.join()

    print('Value at end:', shared_number.value)
    print('Array at end:', shared_array[:])
    print('end main')

"""
Value at beginning: 0
Array at beginning: [0.0, 100.0, 200.0]
Value at end: 200
Array at end: [200.0, 300.0, 400.0]
end main
"""
```
Using a lock as a context manager

Managing lock acquisition and release with a context manager is safer, because the lock is released even if the code inside raises an exception:
```python
def add_100(number, lock):
    for _ in range(100):
        time.sleep(0.01)
        with lock:
            number.value += 1
```
Inter-process communication with queues
Queue operations are process-safe. The multiprocessing `Queue` implements all the methods of `queue.Queue` except `task_done()` and `join()`.

- `q.get()`: removes and returns the first item from the queue; by default, blocks until an item is available
- `q.put(item)`: puts an item at the end of the queue; by default, blocks until a free slot is available
- `q.empty()`: returns `True` if the queue is empty
- `q.close()`: indicates that no more data will be put on the queue by the current process
```python
# communicate between processes with the multiprocessing Queue
# Queues are thread and process safe
from multiprocessing import Process, Queue
import time

def square(numbers, queue):
    for i in numbers:
        time.sleep(0.01)
        queue.put(i * i)

def make_negative(numbers, queue):
    for i in numbers:
        time.sleep(0.01)
        queue.put(i * -1)

if __name__ == "__main__":
    numbers = range(1, 6)
    q = Queue()

    p1 = Process(target=square, args=(numbers, q))
    p2 = Process(target=make_negative, args=(numbers, q))

    p1.start()
    p2.start()

    p1.join()
    p2.join()

    # order might not be sequential
    while not q.empty():
        print(q.get())

    print('end main')

"""
1
-1
4
-2
9
-3
16
-4
25
-5
end main
"""
```
Process pools

A process pool object controls a pool of worker processes. It supports timeouts and callbacks for asynchronous processing and provides parallel `map` implementations. It automatically manages the available processors and splits the data into smaller chunks that are processed in parallel.

Important methods:

- `map(func, iterable[, chunksize])`: chops the iterable into chunks and submits them to the pool as separate tasks to be processed in parallel; blocks until the result is ready
- `close()`: prevents any more tasks from being submitted to the pool; the worker processes exit once all pending tasks are done
- `join()`: waits for the worker processes to exit; `close()` or `terminate()` must be called before `join()`
- `apply(func, args)`: calls `func` with arguments `args` and blocks until the result is ready; `func` is executed in only one of the pool's workers
- `map_async()` and `apply_async()` are the non-blocking, asynchronous variants
```python
from multiprocessing import Pool
import random
import time

def cube(number):
    print("Hi")
    time.sleep(random.randint(1, 2))
    return number * number * number

if __name__ == "__main__":
    numbers = range(10)
    # by default this allocates the maximum number of available
    # processors for this task --> os.cpu_count()
    p = Pool()

    result = p.map(cube, numbers)
    # or
    # result = [p.apply(cube, args=(i,)) for i in numbers]

    p.close()
    p.join()
    print(result)
```
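The asynchronous variants mentioned above can be sketched as follows (a minimal example; `cube` is redefined here so the snippet is self-contained): `map_async()` and `apply_async()` return immediately with an `AsyncResult`, and calling `get()` on it blocks until the result is ready.

```python
from multiprocessing import Pool

def cube(number):
    return number * number * number

if __name__ == "__main__":
    with Pool() as pool:
        # map_async returns an AsyncResult immediately instead of blocking;
        # .get() blocks until all results are ready
        async_result = pool.map_async(cube, range(5))
        print(async_result.get())  # [0, 1, 8, 27, 64]

        # apply_async schedules a single call asynchronously
        single = pool.apply_async(cube, args=(3,))
        print(single.get())  # 27
```

Between submitting the work and calling `get()`, the main process is free to do other work, which is the main advantage over the blocking `map()`/`apply()`.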
Multithreading basics

Python multithreading is of limited use for CPU-bound work because of the GIL; its API is similar to that of multiprocessing.

Creating and starting threads

Threads are created with the threading module:
```python
from threading import Thread

def square_numbers():
    for i in range(1000):
        result = i * i

if __name__ == "__main__":
    threads = []
    num_threads = 10

    # create threads and assign a function to each thread
    for i in range(num_threads):
        thread = Thread(target=square_numbers)
        threads.append(thread)

    # start all threads
    for thread in threads:
        thread.start()

    # wait for all threads to finish
    # block the main thread until these threads are finished
    for thread in threads:
        thread.join()
```
Sharing data between threads

Threads can share data through global variables, because threads share the same memory space. The program below again demonstrates a race condition: both threads read the initial value 0, so the final value is 1 instead of the expected 2.
```python
from threading import Thread
import time

# all threads can access this global variable
database_value = 0

def increase():
    global database_value  # needed to modify the global value

    # get a local copy (simulate data retrieving)
    local_copy = database_value

    # simulate some modifying operation
    local_copy += 1
    time.sleep(0.1)

    # write the calculated new value into the global variable
    database_value = local_copy

if __name__ == "__main__":
    print('Start value: ', database_value)

    t1 = Thread(target=increase)
    t2 = Thread(target=increase)

    t1.start()
    t2.start()

    t1.join()
    t2.join()

    print('End value:', database_value)
    print('end main')

"""
Start value:  0
End value: 1
end main
"""
```
Using a lock to handle the race condition
```python
# import Lock
from threading import Thread, Lock
import time

database_value = 0

def increase(lock):
    global database_value

    # lock the state
    lock.acquire()

    local_copy = database_value
    local_copy += 1
    time.sleep(0.1)
    database_value = local_copy

    # unlock the state
    lock.release()

if __name__ == "__main__":
    # create a lock
    lock = Lock()

    print('Start value: ', database_value)

    # pass the lock to the target function
    # notice the comma after lock since args must be a tuple
    t1 = Thread(target=increase, args=(lock,))
    t2 = Thread(target=increase, args=(lock,))

    t1.start()
    t2.start()

    t1.join()
    t2.join()

    print('End value:', database_value)
    print('end main')
```
Using a context manager
```python
def increase(lock):
    global database_value

    with lock:
        local_copy = database_value
        local_copy += 1
        time.sleep(0.1)
        database_value = local_copy
```
Inter-thread communication with queues

Operations on a `queue.Queue` are thread-safe:
```python
from threading import Thread, Lock, current_thread
from queue import Queue

def worker(q, lock):
    while True:
        value = q.get()  # blocks until an item is available
        # do stuff...
        with lock:
            # prevent printing at the same time with this lock
            print(f"in {current_thread().name} got {value}")
        # ...
        # For each get(), a subsequent call to task_done() tells the queue
        # that the processing of this item is complete.
        # If all tasks are done, q.join() can unblock
        q.task_done()

if __name__ == '__main__':
    q = Queue()
    num_threads = 10
    lock = Lock()

    for i in range(num_threads):
        t = Thread(name=f"Thread{i+1}", target=worker, args=(q, lock))
        t.daemon = True  # dies when the main thread dies
        t.start()

    # fill the queue with items
    for x in range(20):
        q.put(x)

    # blocks until all items in the queue have been gotten and processed
    q.join()
    print('main done')

"""
in Thread1 got 0
in Thread2 got 1
in Thread2 got 11
in Thread2 got 12
in Thread2 got 13
in Thread2 got 14
in Thread2 got 15
in Thread2 got 16
in Thread2 got 17
in Thread2 got 18
in Thread2 got 19
in Thread8 got 5
in Thread4 got 9
in Thread1 got 10
in Thread5 got 2
in Thread6 got 3
in Thread9 got 6
in Thread7 got 4
in Thread10 got 7
in Thread3 got 8
main done
"""
```
Handy command-line tools in the Python standard library

```shell
# show package installation paths
python -m site

# start a simple HTTP server
python -m http.server

# base64 encode and decode
echo "hello" | python -m base64

# REPL with top-level await support
python -m asyncio

# show tokenize and ast results for a file
python -m tokenize cgi.py
python -m ast cgi.py

# pretty-print JSON
echo '{"foo": "bar"}' | python -m json.tool

# show a calendar
python -m calendar
```
References

- Python 多线程和多进程
- CLI tools hidden in the Python standard library | Simon Willison's TILs