两个概念:
- 并发:假同时,一段时间内同时处理多个任务,单核都可以;
- 并行:真同时,同时处理多个任务,必须多核。
主流操作系统上完成并发的手段有进程和线程,主流的编程语言提供了用户空间的调度:协程。python 也不例外。
(想自学习编程的小伙伴请搜索圈T社区,更多行业相关资讯更有行业相关免费视频教程。完全免费哦!)
由于现在的操作系统上的进程越来越轻量,导致进程和线程之间的区别越来越少。事实上,linux 并没有原生的线程,线程是通过进程实现的。
Python 中每一个进程会启动一个解释器,而线程会共享一个解释器。
Python 中的线程是通过标准库 threading 实现的。而启动一个线程就是让这个线程执行一些逻辑,这些逻辑就对应一个函数。
>>> import threading
>>> def worker(): # 让多个线程来执行它
... print('work')
...
>>> thread = threading.Thread(target=worker) # 创建了一个线程对象,target 参数是一个函数,即线程要执行的逻辑
>>> thread.start() # start 启动一个线程,执行完毕后,自动退出,Python 没有提供主动退出线程的方法
work
由于 python 没有提供退出线程的方法,因此我们一定不能在逻辑中定义死循环,不然线程无法退出。当然直接 kill -9 和刻意为之的另说。而像那种监听某个端口提供服务的进程,为了保证不退出,通常都会有一个 while True 的死循环。
上面只是启动了一个线程,很显然没什么屌用。启动多个线程的方式非常简单,就是在它的外面套一个 for 循环就可以了:
import time
import threading
def worker(num):
time.sleep(1)
print('work-{}'.fORMat(num))
for i in range(5):
t = threading.Thread(taret=worker, args=(i, )) # 启动了五个线程,要启动几个就循环几次
t.start()
通过 args 给函数传递参数,也可以使用 kwargs 通过字典传递。结果是在等待一秒之后,所有线程同时输出了,并且在一个线程的换行符还没有打印出来的时候,下一个线程就输出了,这就涉及到线程安全的问题了。很显然,print 并不是线程安全的。
线程相比于进程更轻量,上下文切换的代价没有进程那么大,但即使如此,线程数量也不宜过多。
标识一个线程
threading.current_thread()
可以返回当前的线程对象。
>>> threading.Thread(target=lambda: print(threading.current_thread())).start()
<Thread(Thread-13, started 140007299499776)>
返回的线程对象我们可以通过一个变量进行接收:
thread = threading.current_thread()
它有很多属性和方法:
-
name
:返回线程的名字; -
ident
:返回该线程的唯一标识符; -
is_alive
:告知该线程是否存活; -
enumerate
:可以通过循环它打印出所有的线程;
我们创建线程对象的时候是可以给它取名字的:
t = threading.Thread(target=worker, name='thread1')
这个 name 可以通过 logging 的 threadName 获得。
logging
前面提到过,print 并不是线程安全的,而 logging 模块线程安全。
>>> import logging
>>> logging.warning('hehe')
WARNING:root:hehe
>>> logging.info('hehe') # 默认只输出 warning 以上级别
我们可以对其进行一些基础的配置,让其记录 DEBUG 以上的级别,以及记录线程名:
>>> logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(threadName)s %(message)s')
>>> logging.info('hehe')
2017-09-23 15:41:36,868 INFO MainThread hehe
知道了它的简单用法之后,我们就可以使用多线程了:
import logging
import threading
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(threadName)s %(lineno)d %(message)s')
def worker():
logging.info(logging.info('work'))
for i in range(5):
t = threading.Thread(target=worker)
t.start()
使用 logging 就没有问题了,因此我们通常使用它来替代 print。
logging 还可以将异常的栈追踪信息记录下来,这在排查错误的时候非常方便:
import logging
try:
config['DE']['xxx']
except Exception as e:
logging.exception(e)
print('xxx')
daemon 与 non-daemon
daemon 在 linux 上是守护进程的意思,它始终在后台运行。而在 Python 中的 daemon 线程会在主线程退出之后退出。也就是说,如果不是 daemon 线程,主线程退出之后,非 daemon 线程还会继续执行,直到结束退出。
线程默认不是 daemon,如果想要设置为 daemon,那就在创建线程对象的时候,给它传递 daemon=True 即可。
>>> t = threading.Thread(target=worker, daemon=True)
>>> t.daemon
Out[20]: True
通过下面的例子证明之前的说法:
import time
import logging
import threading
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(threadName)s %(message)s')
def worker():
logging.info('start')
time.sleep(2)
logging.info('end')
if __name__ == '__main__':
logging.info('start')
t1 = threading.Thread(target=worker, name='non-daemon')
t1.start()
t2 = threading.Thread(target=worker, name='daemon', daemon=True)
t2.start()
logging.info('end')
# 执行结果
2017-09-24 04:08:49,027 INFO MainThread start
2017-09-24 04:08:49,028 INFO non-daemon start
2017-09-24 04:08:49,028 INFO daemon start
2017-09-24 04:08:49,028 INFO MainThread end
2017-09-24 04:08:51,031 INFO non-daemon end
执行上面的代码你会发现有的时候主线程退出了,但是 daemon 线程还会执行完成。这是因为虽然从日志中看到主线程退出,但是事实上主线程是没有退出的,它会等待非 daemon 线程执行完毕后才会退出,这样就给了 daemon 线程的执行时间了。当我们将 t1 给注释掉之后,就不可能出现主线程退出后,daemon 线程仍然执行的情况了。
如果我们在 t2.start() 之后增加一行 t2.join(),那即使它是 daemon 线程,主线程依然会等待它执行完毕后再退出。因为 join 会阻塞直到线程执行完毕。join 支持一个参数,那就是阻塞的秒数。t2.join(1) 表示只阻塞一秒,这个时候即使 t2 没有执行完成,主线程依然会退出。join 用的比较多,它并不占用 CPU 时间。
创建线程的另一种方法
上面创建线程的方法是通过实例化 Thread,我们还可以通过下面这种方式:
import logging
import threading
class Mythread(threading.Thread):
def run(self):
logging.warning('worker')
t = Mythread()
t.start()
通过继承 + 重写 run 方法来到达启动多线程的效果,run 等同于之前 target 指定的函数。但是 Python 中这种方法使用的很少。
当我们创建一个线程对象的时候,除了可以使用 start 启动它之外,还可以通过 run 来启动。如果不是以继承的方式创建线程,一个线程对象的 run 和 start 只能执行其中一个。
thread local
定义一个 thread local 对象。
ctx = threading.local()
这时的 ctx 没有任何属性,我们可以给它增加属性:
>>> ctx.data = 5
>>> ctx.data
Out[25]: 5
继续:
>>> data = 'abc' # 定义一个变量
>>> def worker():
... logging.warning(data)
... logging.warning(ctx.data)
...
>>> worker() # 执行没什么问题
WARNING:root:abc
WARNING:root:5
>>> threading.Thread(target=worker).start() # 但是通过线程执行就不行了
WARNING:root:abc # data 可以直接打出来
Exception in thread Thread-9:
Traceback (most recent call last):
File "/usr/local/python3/lib/python3.6/threading.py", line 916, in _bootstrap_inner
self.run()
File "/usr/local/python3/lib/python3.6/threading.py", line 864, in run
self._target(*self._args, **self._kwargs)
File "<ipython-input-32-2e99199c517b>", line 3, in worker
logging.warning(ctx.data)
AttributeError: '_thread._local' object has no attribute 'data' # 但是 ctx.data 提示没有
这是因为 ctx.data 是一个 thread local 的变量,我们可以给它赋值任意属性,但是只对当前线程可见。线程独享!
使用 run 方法,它会将 target 放在主线程中;start 则会将其放到子线程中,二者只能执行一个。
定时器
也可以称为延时执行。Python 中存在一种特殊的线程,可用于延迟执行。它继承自 Thread 类,因此它也是 Thread 对象。
>>> def worker():
... logging.warning('worker')
...
>>> t = threading.Timer(interval=5, function=worker)
>>> t.start()
- interval:延时多少秒执行,默认为 30;
- function:等同于 target。
可以看到执行 start 方法后,五秒后才有输出。在等待的过程中,它可以通过 cancel()
终止。
它也可以设置线程名,只不过要这样:
>>> t = threading.Timer(interval=5, function=worker)
>>> t.name = 'Timer'
>>> t.deamon = True # 设置是否为 daemon
当 function 指定的函数开始执行的时候,无法通过 cancel() 终止。
Timer 的定时执行功能很弱,如果真的有这方面的需要,可以使用 APSchedule。
event
第一种线程同步的方式。同步意味着阻塞,如果线程之间没有联系,完全没有必要使用同步。有这么一种需求:worker 线程做一些事情,当它完成之后,通知 boss 线程,由 boss 完成处理后续工作。这可能并不难实现,但是 boss 线程要统计 worker 线程的执行时间呢?
这就要用到线程间通信的机制了,最简单的是 event:
>>> event = threading.Event()
>>> event.set()
>>> event.wait()
Out[8]: True
它是一个 threading.Event 的对象,有 set 和 wait 这两个方法。wait 会阻塞线程直到 set 方法被调用。
有了这两种方法之后,我们就可以完成上面的需求了:
import time
import random
import logging
import datetime
import threading
def worker(event: threading.Event):
time.sleep(random.randint(1, 5))
event.set()
def boss(event: threading.Event):
start = datetime.datetime.now()
event.wait()
logging.warning('worker exit {}'.format(datetime.datetime.now() - start))
def start():
event = threading.Event()
b = threading.Thread(target=boss, args=(event,), name='boss')
b.start()
for x in range(5):
threading.Thread(target=worker, args=(event,), name='worker').start()
start()
五个 worker 线程,谁先执行完成就谁执行 event.set(),一旦 event.set 被执行,boss 线程也就会继续执行并输出日志了。但是会有一个问题,由于是随机 sleep 时间,也就是说最快 boss 线程可以一秒就退出,但是还有四个 worker 线程还在执行,这四个线程拉长了整个脚本的执行时间。
再做修改:
import time
import random
import logging
import datetime
import threading
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(threadName)s %(message)s')
def worker(event: threading.Event):
s = random.randint(1, 5)
event.wait(s) # 先阻塞
event.set() # 一下全放开了
logging.info('sleep {}'.format(s))
def boss(event: threading.Event):
start = datetime.datetime.now()
event.wait()
logging.info('worker exit {}'.format(datetime.datetime.now() - start))
def start():
event = threading.Event()
threading.Thread(target=boss, args=(event,), name='boss').start()
for x in range(5):
threading.Thread(target=worker, args=(event,), name='worker-{}'.format(x)).start()
start()
# 执行结果
2017-09-25 06:15:42,114 INFO worker-0 sleep 2
2017-09-25 06:15:42,115 INFO boss worker exit 0:00:02.004014
2017-09-25 06:15:42,116 INFO worker-1 sleep 5
2017-09-25 06:15:42,116 INFO worker-2 sleep 4
2017-09-25 06:15:42,116 INFO worker-3 sleep 3
2017-09-25 06:15:42,117 INFO worker-4 sleep 2
可以看到都在同一秒退出了,这是因为 wait 可以指定超时时间,时间一到它就不再阻塞。这样阻塞时间最短的那个线程就会执行 set,这样一来所有阻塞的线程同时放开了,于是同一时间都执行完成了。因此,wait 会阻塞线程直到 set 方法被调用,或者超时时间到。
event 可以被多个线程所持有,多个线程可以同时被阻塞,一旦其中一个线程执行了 set,那么所有的线程都不再阻塞。event 可以在线程之间发送信号,通常用于某个线程需要其他线程处理某些动作之后才能启动。
event 还有一个特性,如果先 set 然后 wait,不管有没有指定超时,它都瞬间返回 True(因为阻塞被放开,所以无法再阻塞);而如果直接 wait,且给它一个超时时间,那么超时完成之后,它会返回 False。我们可以根据这个特点来完成定时的操作。
import logging
import threading
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(threadName)s %(message)s')
def worker(event: threading.Event):
while not event.wait(3):
logging.info('running')
event = threading.Event()
threading.Thread(target=worker, args=(event,)).start()
每三秒会输出一次日志,会无限输出下去。但是如果执行 event.set() 就会终止死循环。
event 还有一些方法:
-
is_set
:用来判断有没有 set 过; -
clean
:清除 set 标志,通常用来做线程退出的条件。def worker(event):
while not event.is_set(): pass
wait 会主动让出 CPU 时间片,time.sleep 却不会。假如它们分到了 10ms 的 CPU 时间,都使用了 5ms,那么剩余的 5ms wait 会让给别人,而 sleep 会自己用完。因此我们会使用 wait 而不是 sleep。
实现定时器
延时执行。
import logging
import threading
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(threadName)s %(message)s')
class Timer:
def __init__(self, interval, function, *args, **kwargs):
self.interval = interval
self.function = function
self.args = args
self.kwargs = kwargs
self.event = threading.Event()
self.thread = threading.Thread(target=self.__thread)
def __thread(self):
if not self.event.wait(self.interval):
self.function(*self.args, **self.kwargs)
def start(self):
self.thread.start()
def cancel(self):
self.event.set()
def worker():
logging.info('running')
t = Timer(interval=2, function=worker)
t.start()
Lock
第二种线程同步的方式。lock 用来保护共享资源,其余几种线程同步的方式都是用了它。
import random
import logging
import threading
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(threadName)s %(message)s')
class Counter:
def __init__(self):
self.__val = 0
@property
def value(self):
return self.__val
def inc(self):
self.__val += 1
def dec(self):
self.__val -= 1
counter = Counter()
def fn():
if random.choice([-1, 1]) > 0:
logging.info('inc')
counter.inc()
else:
logging.info('dec')
counter.dec()
for x in range(10):
threading.Thread(target=fn).start()
print(counter.value)
上面的代码即使你知道它加了多少次减了多少次,但你不能肯定它的结果,这是因为资源的争用。Lock 对象可用于解决这种问题:
>>> lock = threading.Lock()
>>> lock.acquire()
Out[4]: True
对于 lock 实例,只能调用一次 acquire 方法,再次调用会被阻塞,直到 release 方法被调用。根据它的这种特性,可用来改造之前的 Counter。
class Counter:
def __init__(self):
self.__val = 0
self._lock = threading.Lock()
@property
def value(self):
return self.__val
def inc(self):
self._lock.acquire()
self.__val += 1
self._lock.release()
def dec(self):
self._lock.acquire()
self.__val -= 1
self._lock.release()
这样一来,不管有多少个线程,同一时间只会有一个线程能够修改 __val。但是这样会有一个问题,如果执行加减的时候发生了异常(虽然这里不会),那么 release 永远就不会执行,那么就会形成死锁,因此我们要使用 try finally。
def inc(self):
try:
self._lock.acquire()
self.__val += 1
finally:
self._lock.release()
从上面这种结构中我们可以联想到 with,事实上它是支持 with 的,因此我们可以定义的更为简单:
def inc(self):
with self._lock:
self.__val += 1
凡是用锁的地方,一定要在 finally 中使用 release,否则就会有锁死的可能性。
而对于读来说,如果不加锁,就会存在脏读的可能性,就看能不能忍受了。通过加锁之后,Counter 类就变成线程安全了,我们可以放心的使用。
锁是并发的难点,它会将并发变为串行,掌握了锁,并发就没有丝毫问题了。那么何时需要加锁?凡是有共享资源的地方都要加锁。
lock 对象可以接收两个参数:
-
blocking
:当再次加锁时,如果它为 False,那么不会阻塞,而是返回 False;
-
timeout
:如果 blocking 为 True,timeout 大于等于 0 会阻塞到超时,并返回 False。
预先启动 10 个线程处理一些任务,当其中一个线程在处理其中一个任务时,其他线程可以处理其他任务,这时候就可以用到非阻塞锁。第一个线程对该任务加非阻塞锁,由于之前没有加过锁,因此可以加上。第二个线程再加的时候就加不上了,并且返回 False,这时就可以让它跳过这个任务去执行下一个任务了。
import logging
import threading
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(threadName)s %(message)s')
def worker(tasks):
for task in tasks:
# 第一个执行加锁的线程可以锁,它的值为 True。由于锁住了,剩下的九个线程执行的时候它的值都为 False
# 因此 loggi.info 语句只会执行 10 次
if task.lock.acquire(False):
logging.info(task.name)
class Task:
def __init__(self, name):
self.name = name
self.lock = threading.Lock()
tasks = [Task(x) for x in range(10)]
for i in range(5):
threading.Thread(target=worker, args=(tasks,), name='work-{}'.format(i)).start()
如果任务有先后顺序的话,就只能串行了。
RLock
可重入锁在同一个线程内可多次加锁,但是只能有一个线程成功,并且 acquire 几次,就需要 release 几次。
>>> rlock = threading.RLock()
>>> rlock.acquire()
Out[13]: True
>>> rlock.acquire()
Out[14]: True
>>> rlock.release()
>>> rlock.release()
condition
第三种线程同步的方式。通常用于生产者消费者模式,生产者生产消息之后,使用 notify 和 notify_all 通知消费者进行消费。而消费者使用 wait 方法阻塞等待生产者的通知。
import random
import logging
import threading
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(threadName)s %(message)s')
class Dispatcher:
def __init__(self):
self.data = None
self.event = threading.Event()
self.cond = threading.Condition()
def consumer(self):
while not self.event.wait(1):
with self.cond:
self.cond.wait() # 会阻塞,直到 notifyAll 被执行
logging.info(self.data)
def producer(self):
for _ in range(10):
data = random.randint(0, 100)
logging.info(data)
self.data = data
with self.cond:
self.cond.notify_all()
self.event.wait(1)
self.event.set()
d = Dispatcher()
p = threading.Thread(target=d.producer, name='producer')
c = threading.Thread(target=d.consumer, name='consumer')
p.start()
c.start()
有生产者修改共享资源,然后通知消费者进行消费。
-
wait
:会阻塞,直到被 notify 唤醒; -
notifyAll
:老版的驼峰写法,现已改为下面的,但为了兼容仍然存在; -
notify_all
:用于通知所有 wait 的线程,可以理解为广播; -
notify
:接收一个数字,表示唤醒多少个 wait 线程,默认为 1。可以理解为单播。
比如下面的示例中,虽然启动了四个消费者进程,但是只允许两个同时消费,至于是哪两个就不得而知了。
import random
import logging
import threading
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(threadName)s %(message)s')
class Dispatcher:
def __init__(self):
self.data = None
self.event = threading.Event()
self.cond = threading.Condition()
def consumer(self):
while not self.event.is_set():
with self.cond:
self.cond.wait()
logging.info(self.data)
def producer(self):
for _ in range(10):
data = random.randint(0, 100)
logging.info(data)
self.data = data
with self.cond:
self.cond.notify(2)
self.event.wait(1)
self.event.set()
d = Dispatcher()
p = threading.Thread(target=d.producer, name='producer')
for i in range(4):
threading.Thread(target=d.consumer, name='consumer-{}'.format(i)).start()
p.start()
按理来说,因为有锁的存在,所以只有在消费者的 with 代码块执行完毕,锁释放之后,生产者才能进入自己的 with 代码块。这样就能够保证,消费者只有在消费完毕之后生产者才能继续生产。但是我在运行过程中生产者根本不会等待消费者消费,它自己一个劲的跑。
无论 notify、notify_all 还是 wait,都必须先 acquire,完成之后必须确保 release,因此通常使用 with 语法。
barrier
第四种线程同步的方式,栅栏的意思,只有凑齐一拨人之后才往下走。从下面这段代码中就能理解它的作用:
import logging
import threading
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(threadName)s %(message)s')
def worker(barrier: threading.Barrier):
logging.info('waiting for {} threads'.format(barrier.n_waiting))
try:
# 上面的代码各个线程什么时候执行,怎么执行都无所谓
# 但是所有线程都会在这里同时等待,只有所有线程都执行到这了,才同时执行下面的代码
worker_id = barrier.wait()
except threading.BrokenBarrierError:
logging.warning('aborting')
else:
logging.info('after barrier {}'.format(worker_id))
# 实例化的时候指定拦多少个线程,如果启动了四个线程,只要三个到齐了就可以同时往下走了
barrier = threading.Barrier(3)
for i in range(3):
threading.Thread(target=worker, args=(barrier,), name='worker-{}'.format(i)).start()
logging.info('start')
barrier 对象的一些属性和方法:
-
wait
:阻塞线程,它可以指定超时时间,超时时间一到抛出 BrokenBarrierError 异常。如果执行过 abort 方法,那么再执行 wait 也会抛出 BrokenBarrierError 异常; -
reset
:清除对象执行 abort 的痕迹。执行 abort 后执行 rest,接着执行 wait 就不会抛异常了; -
n_waiting
:当前有多少个线程在等待; -
abort
:通知已经在等待的线程不必再等了,不能因为它一个而让其他线程在那傻等。而一旦执行了这个方法, wait 就会抛出 BrokenBarrierError 异常,因此不处于 wait 状态的线程是不会抛出这个异常的。
适用场景:比如有十种工作,每个线程负责一种,只有这十个线程都初始化完成后才能工作。
semaphore
最后五种线程同步的方式。信号量和锁很像,锁是为 1 的信号量。
# 创建一个为 3 的信号量
>>> s = threading.Semaphore(3)
>>> s.acquire()
Out[84]: True
>>> s.acquire(False)
Out[85]: True
>>> s.acquire(False)
Out[86]: True
>>> s.acquire(False)
Out[87]: False
它可以锁多次,上面锁了三次都没有问题,等到第四次的时候就不行了。由于锁只能锁一次,所以它是为 1 的信号量。RLock 也能锁多次,它是它只能用在同一个线程上,信号量却可以在多个线程中使用。
创建一个连接池的时候可以用到它:
import time
import random
import logging
import threading
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(threadName)s %(message)s')
class Pool:
def __init__(self, num):
self.num = num # 指定池子的连接数
self.conns = [self._make_connect(x) for x in range(num)]
self.s = threading.Semaphore(num)
# 这个函数是拿到连接之后做的操作
def _make_connect(self, name):
return name
# 从池子中取出一个连接
def get(self):
self.s.acquire()
return self.conns.pop()
def return_resource(self, conn):
# 执行完毕后,将连接放回池子中
self.conns.insert(0, conn)
self.s.release()
def worker(pool):
logging.info('started')
name = pool.get()
logging.info('get connect {}'.format(name))
time.sleep(random.randint(1, 3))
pool.return_resource(name)
logging.info('return resource {}'.format(name))
pool = Pool(3)
for i in range(5):
threading.Thread(target=worker, args=(pool,), name='worker-{}'.format(i)).start()
如果不使用信号量的话,我们还需要对池子是否为空进行判断。为什么将连接放回连接池中的 insert 操作不需要加锁呢?这是因为 GIL 的影响。
信号量也是对资源的保护,但是和锁不一样的地方在于,锁限制只有一个线程可以访问共享资源,而信号量限制指定个线程可以访问共享资源。事实上我们只需要使用信号量就可以了,因为锁本身就是信号量的一种。
queue
队列,它是进程间通信的一种方式,队列有三种:
-
FIFO
:Queue.Queue(maxsize=0),先进先出,线程安全; -
LIFO
:Queue.LifoQueue(maxsize=0),后进先出; -
Priority
:Queue.PriorityQueue(maxsize=0),优先队列。
创建一个先进先出队列:
>>> import queue
>>> q = queue.Queue() # 队列长度无限
对象的属性和方法:
-
empty()
:判断队列是否为空(不可靠)。因为等你获取队列的长度时,可能已经有人往里面放入了数据; -
full()
:队列是否满了(不可靠); -
maxsize
:查看队列的最大长度; -
qsize()
:看到队列当前长度(不可靠); -
clear()
:清空队列; -
join()
:等到队列为空的时候,才进行操作; -
put()
:往队列里面添加内容,可以为任意数据结构。put(self, item, block=True, timeout=None),block 表示是否为队列是否为阻塞状态。队列满了,再往里面加内容,队列会阻塞。如果不阻塞会返回一个异常,默认为阻塞状态;timeout 是阻塞的时间,如果队列满了,再往队列里面添加数据时,timeout 时间后会抛出异常。如果为 None(默认),它会一直阻塞,直到有线程从队列中取出数据; -
get()
:从队列中取内容。如果是先进先出队列,它会取出最先存进去的数据。get(self, block=True, timeout=None),如果队列是空的,并且 timeout 为 None,它会一直阻塞,直到有线程往队列里面存入数据; -
put_nowait(item)
:等效于 put(item, block=False); - g
et_nowait()
:等效 get(item, block=False)。
我们可以通过它来重写生产者消费者模型:
#!/usr/local/python3/bin/python3
import queue
import random
import logging
import threading
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(threadName)s %(message)s')
def producer(queue: queue.Queue, event: threading.Event):
while not event.wait(3):
data = random.randint(0, 100)
logging.info(data)
queue.put(data)
def consumer(queue: queue.Queue, event: threading.Event):
while not event.is_set():
logging.info(queue.get())
q = queue.Queue()
e = threading.Event()
threading.Thread(target=consumer, args=(q, e), name='consumer').start()
threading.Thread(target=producer, args=(q, e), name='producer').start()
通过 e.set() 就能停止它。相对 condition 实现的生产者消费者模型,它的优势在于可以暂存数据,这在生产者和消费者速率不一致的时候很好用;而它的缺陷在于无法广播,无法通知多个线程同时消费一条消息。因为我们通常可以将它们结合起来使用。
取出队列中所有数据:
while not q.enpty():
q.get()
GIL
全局解释器锁,这是 Python 争议很大的一个点。正是由于它的存在,在操作内置容器时,解释器会在解释器级别增加一个锁,因此 Python 所有内置容器(字典、列表等)都是线程安全的,多线程环境下使用没有丝毫问题。而导致的后果就是 Python 的并发性能很差。
Python 中 collection, logging 等标准库都是线程安全的。
concurrent.futures
官网地址,Python3.2 引入的异步模块。
创建一个线程池:
from concurrent import futures
pool = futures.ThreadPoolExecutor(max_workers=5)
pool 对象有三个方法。submit 用于执行一个函数:
>>> fut = pool.submit(lambda: 1+1) # 执行一段逻辑,也就是一个函数
>>> fut.result() # 获取执行结果
Out[116]: 2
>>> fut.done() # 查看函数是否执行完成
Out[117]: True
>>> fut.running() # 是否处于运行状态
Out[118]: False
>>> fut.cancel() # 一个已经开始运行的线程是无法结束的,没开始的(比如 pool 满了在阻塞)可以
Out[119]: False
>>> fut.exception() # 如果函数中产生了异常,可以通过它来获取异常的实例
传递参数:
pool.submit(self.create_vm, vm_attributes, extra_attributes, conns)
通过这种方式使用线程,不需要将数据发送到队列中。
进程池由 ProcessPoolExecutor 实现,它们简化了进程和线程的操作,并且对返回值和异常进行了处理。
建议使用 futures,虽然它无法设置线程名(3.6 之后可以)、daemon 等属性,但是问题不大。
0