Python多线程和线程同步
Mar 4, 2014
本文内容适用于 python 2.x 版本
Python中提供了两个实现多线程的模块,一个是底层(low-level)的thread模块,一个是高层(high-level)的threading. 两者都可实现多线程,thread模块使用的是低级的原语,threading模块基于thread模块,提供了高层的API,更简单易用,所以推荐使用threading模块。
注意:在Python中,如果试图通过多线程来提高程序的速度是不合理的。因为Python的主流版本CPython中,存在一个
Global Interpreter Lock(GIL)
的全局解释锁,这个锁的作用是保证在一个时刻只有一个线程能执行python字节码.一个线程必须先持有这个锁,才能执行字节码。这就表示,同一个时刻不会有多个线程并行执行。
thread模块 #
thread模块使用低级的原语实现线程同步,通过提供 锁(lock object)机制,同一个时刻,只能有一个线程占用锁,利用锁的获取(lock.acquire)和锁的释放(lock.release)实现对象的互斥访问。
thread模块的主要函数有:
thread.start_new_thread(function, args[, kwargs])
: 启动一个新线程,并返回线程的标识。function是线程的回调函数;args是传递给回调函数的一个tuple;kwargs是可选参数,代表命令行参数的一个字典(a dictionary of keyword arguments)。thread.exit()
: 抛出一个SystemExit异常,如果该异常没被捕获,线程就自动静默退出。thread.exit_thread()
: 退出线程thread.get_ident()
: 返回线程的标识(非0整数)thread.allocate_lock()
: 返回一个新建的锁。初始化状态为unlock。lock.acquire([waitflag])
: 请求获取锁,成功返回True,否则返回False. waitflag省略或非0值,则无条件等待,直到其它线程释放了锁,本线程能够获取锁,waitflag=0表示无论是否获取成功,函数马上返回,不等待。lock.release()
: 释放锁。lock.locked()
: 查看锁当前是否处于锁定状态,是则返回True,否则返回False.
示例 - 售票 #
下面是经典的售票问题的Python多线程实现。总共有 total_ticket = 10 张票,三个窗口同步出售这10张票。使用thread模块实现。
#coding=utf-8
#author: JarvisChu
#date: 2014-3-3
#blog:zhujiangtao.com
import thread,time
#总票数
total_ticket = 10
#售票程序
def sell(name,ticket_lock):
'''启动一个售票窗口,开始售票,每次售出1张。name 售票窗口的名字;ticket_lock 同步锁'''
#print name,'启动,id=',thread.get_ident()
while True:
if ticket_lock.acquire():
#print id,ticket_lock.locked()
try:
global total_ticket #声明这是全局变量的
if total_ticket <= 0:
#print '票已售完'
break
else:
print name,'售出车票:',total_ticket
total_ticket = total_ticket – 1
finally:
ticket_lock.release()
thread.exit_thread()
#新建一个锁对象,用于同步售票
lock = thread.allocate_lock()
#print lock.locked() #查看锁的状态
#启动三个窗口,同时售票
id1 = thread.start_new_thread(sell,('售票窗口1', lock))
id2 = thread.start_new_thread(sell,('售票窗口2', lock))
id3 = thread.start_new_thread(sell,('售票窗口3', lock))
#print id1,id2,id3
#主线程等待2秒
time.sleep(2)
threading模块 #
基于thread模块,提供了更多的功能。包含的内容有:一个线程类Thread,几个用于同步的对象Lock、RLock、Condition、Event、Semaphore,几个辅助的函数和对象。
threading模块提供了一个 threading.Thread
类,表示一个线程。用户可以通过两种方式获得自己的线程:
1. 设置Thread类的构造函数的参数 #
给Thread类的构造函数的参数target传递一个可调用的对象,这个可调用对象会被线程的run()函数调用
#coding=utf-8
import threading,time
def foo(no,workname):
print '%d 号售票员%s 为您服务..' % (no,workname)
thread1 = threading.Thread(target=foo,args=(1,'Seller1'))
thread2 = threading.Thread(target=foo,args=(2,'Seller2'))
thread1.start()
thread2.start()
time.sleep(3)
2. 继承Thread类,覆写run方法 #
继承Thread类,覆写(overwrite)run()方法
#coding=utf-8
import threading,time
class Sellman(threading.Thread):#1.继承Thread
def __init__(self,no,workname):
threading.Thread.__init__(self)#2.调用__init__
self.no = no
self.workname = workname
def run(self):#3.覆写run
print '%d 号售票员%s 为您服务..' % (self.no,self.workname)
seller1 = Sellman(1,'Seller1')
seller2 = Sellman(1,'Seller2')
seller1.start()
seller2.start()
time.sleep(3)
threading.Thread 类 #
- 构造函数:group为保留参数,暂时无用;target为可调用的对象;name为线程的名字,注意多个线程可以有相同的名字;args是一个参数的tuple;kwargs为命令行参数的字典。
threading.Thread(group=None, target=None, name=None, args=(), kwargs={})
- name,getName(),setName(): 获取和设置线程的名字
- ident: 线程的唯一标识。线程启动后就会有,线程退出后被回收
- start(): 启动线程
- join(): 等待另一个线程结束后再运行。 join把指定的线程加入到当前线程,可以将两个交替执行的线程合并为顺序执行的线程。比如在线程B中调用了线程A的join()方法,直到线程A执行完毕后,才会继续执行线程B。
- is_alive(): 返回线程是否还存活着。
- daemon, isDaemon,setDaemon: 判断线程是否是守护线程,设置为守护线程(没有用户线程时自动退出的线程,如垃圾回收线程)。
threading.Lock() #
生成一个Lock锁
对象,同thread模块中的thread.allocate_lock,锁的用法也相同,使用acquire()获取,release()释放。
threading.RLock() #
生成一个可重入(reentrant)的RLock锁对象。RLock可以被同一个线程连续多次acquire,线程acquire的次数要和release次数相同。(Lock锁,一个线程如果acquire后没有释放,再acquire则会使该线程挂起)
RLock内部使用 recursion level,每次acquire,recursion level会++,released则会–。一旦recursion level 等于0,则表示该线程释放了该RLock锁。
threading.Condition() #
生成一个条件变量(Condition Variable) 对象。Condition Variable对象总是和锁一起工作,这个锁可以是你传递给它的或者它默认创建的。Condition Variable对象 通过acqure()和release()两个方法操作它内部的锁。当Condition Variable对象acquire()得到一个锁的占用权后,可以使用wait()、notify()和notifyall()方法。(如果没有占用锁就调用这个三个方法,会抛出RuntimeError异常)。
- wait(): 释放锁,然后挂起(block),直到另一个线程对这个相同的Condition Variable对象调用了notify()或notifyall()时,才被唤醒。唤醒后,它重新获得锁的占有权并返回。
- notify() 和 notifyall(): 唤醒一个或所有的因等待这个Condition Variable对象而挂起的线程。这两个方法不会release锁,这意味着,唤醒的线程不会立即从它(们)的wait()方法返回,直到调用notify()或notifyall()的线程放弃了锁的占有权后,才能获取占有权并返回。
Condition Variable 的典型应用场景是使用锁对共享状态(shared state)的访问进行同步。一个线程重复调用wait() 直到 共享状态变化为它需要的状态。另一个线程修改完状态后,调用notify()通知其它等待该状态的线程。
典型的应用如生产者-消费者模型。
# 消费一个物品 – 消费者线程
cv.acquire() #获取锁
while not an_item_is_available():
cv.wait() #如果没有物品可用,则调用wait()释放锁,然后挂起等待
get_an_available_item() #消费一个物品
cv.release() #释放锁
# 生成一个物品 – 生产者线程
cv.acquire() #获取锁
make_an_item_available() #生产一个物品
cv.notify() #唤醒一个等待线程,这个等待线程醒了,但此时还不能获取锁,因为生产者线程还没有释放锁
cv.release() #释放锁,此时消费者线程wait()函数可以获取锁,并返回
threading.Event() #
生成一个 Event 对象。Event 是最简单的线程通信机制之一,通过维护一个内部的flag的状态实现线程同步。通过set()方法将Event对象的flag设置为True,通过clear()方法将Event对象reset为Fasle. wait()方法将线程挂起,直到Event为True.
threading.Semaphore([value]) #
生成一个Semaphore对象(信号量对象)。Semaphore对象维护一个内部的计数器。计数器的初始值为value,如果没给定value值,则默认为1. 调用release()方法,计时器值+1,调用acquire()方法,计数器值-1. (其实就是PV操作)。 如果计时器值为0,acquire()方法将线程挂起,直到其它线程release使得计时器值+1了。
Semaphore可以实现数据的共享访问(value>1),不仅仅是互斥访问(value=1)。Event只能实现互斥访问。
threading模块其它重要类和函数 #
threading.active_count()
: 目前存活的线程数量。threading.current_thread()
: 返回当前的threading.Thread对象threading.enumerate()
: 返回当前存活的所有线程的一张列表 list.threading.Timer
: 定时器类。类的构造函数为threading.Timer(interval, function, args=[], kwargs={}) interval是定时间隔,单位为秒,function是回调函数,args是传给function的参数列表。即,interval秒后,调用一次function,function的参数为args. 注意function只会调用一次,不是每隔interval秒。