Evenlet的基础greenlet

Eventlet的基础


用官方的开场白介绍eventlet。eventlet是一个python并发(concurrent)网络库,能够在不改变编程方式的情况下,改变程序的运行方式。

  • 使用epoll\kequeue\libevent实现高伸缩性的非堵塞I/O。
  • 依靠协程(coroutines),可以类似threading库那样采用堵塞式的编程模型,反而获得非堵塞I/O的收益。
  • 事件的分发(dispatch)是隐含式的,因此可以容易从底层使用Eventlet库,或者是作为大型程序的一个部分。

在我的理解中,Eventlet的神奇之处在于两点:

  • 不改变编程模型,虽然底层是异步I/O,但是可以像堵塞式那样正常的编程,没有大量的嵌套回调。
  • 对于I/O事件是隐式分发的,就像使用threading库那样、甚至说比之还要方便;也无需大量的显式的协程调用,这点可以和tornado对比。

eventlet的底层greenlet

说起eventlet总也离不开其底层依赖的greenlet这个Python的协程库。这里不讨论greenlet的实现,只强调greenlet的几个特性。

协程的意思就是相互协助的程序。它与线程的区别是,协程之间的运行顺序是由程序本身确定的,而不是由内核决定;协程拥有自己独立的栈,可以随时终止和继续运行。

下面先提供一个实际的例子,然后分析greenlet的几个接口,再回过头说明这个例子的输出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import time
import greenlet
all_g = {}
def format_print(*args): # 打印当前协程、父协程、输出
global all_g
current = greenlet.getcurrent()
print 'greenlet(%s), parent(%s): %s' % (
all_g[id(current)],
all_g[id(current.parent)],
' '.join(map(str, args)))
def run1(start):
format_print('g1 begin, start', start)
for i in range(2): # 注意这里是循环2次
time.sleep(1)
start += 1
start = g2.switch(start)
format_print('start', start)
format_print('g1 end, start', start)
def run2(start):
format_print('g2 begin, start', start)
for i in range(3): # 注意这里循环3次
time.sleep(1)
start += 1
start = g1.switch(start)
format_print('start', start)
format_print('g2 end, start', start)
def run3(num):
format_print('g3 begin, num', num)
for i in range(num):
format_print('i', i)
time.sleep(1)
end = g2.switch(num+100)
format_print('g3 end', end)
g1 = greenlet.greenlet(run=run1, parent=greenlet.getcurrent())
g2 = greenlet.greenlet(run=run2, parent=greenlet.getcurrent())
g3 = greenlet.greenlet(run=run3, parent=g2)
all_g[id(None)] = 'main'
all_g[id(greenlet.getcurrent())] = 'main'
all_g[id(g1)] = 'g1'
all_g[id(g2)] = 'g2'
all_g[id(g3)] = 'g3'
g1.switch(0)
format_print('back main after g1.switch')
g3.switch(3)
format_print('back main after g3.switch')
format_print(g1.switch('back g1 from main'))
format_print(g3.switch('back g3 from main'))

greenlet.greenlet

1
2
3
4
greenlet.greenlet(run=None, parent=None) => greenlet object
run: 传入需要运行的函数,与`Thread`中的`target`相同
parent: 转入协程的父协程,默认为当前协程

父协程有什么用?

父协程可以通过调用greenlet.getcurrent().parent得到,那么这里传入父协程有什么用处?

简单的来说,父协程最大的用处在于,当前协程运行退出后,自动切换到父协程运行。换句话说,如果parent=None那么创建的协程退出后将自动切回到创建该协程的调用者中运行。

1
2
3
4
5
6
7
8
9
10
11
12
def run(*args, **kwargs):
print args, kwargs
print 'run over'
parent = greenlet.greenlet(run=run)
child = greenlet.greenlet(run=run, parent=parent)
child.switch()
>>
(), {}
run over
(None,), {}
run over

我们可以看到child的参数和运行结束。然后自动切换到了父协程中运行。需要注意的是,当子线程运行结束自动切换到父协程时,会自动专递一个None参数。

为什么子协程退出切换到父协程时专递一个None参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def run_child(*args):
print args
print 'child over'
def run_parent(*args):
print args
rv = child.switch('hi, child')
print 'Parent continue with:', rv
parent = greenlet.greenlet(run=run_parent)
child = greenlet.greenlet(run=run_child, parent=parent)
parent.switch()
>>>
()
('hi, child',)
child over
Parent continue with: None

我们可以看到,当子协程自然退出后,父协程通过switch获得了None参数。想一想,如果子协程不传递None参数,那么父协程就需要在switch处报错了。

如果一个协程永远不被调用switch会怎么样

一个协程如果没有被调用switch,那么他就永远不能运行。这点和线程完全不同,线程由内核调度,如果主线程自然退出,那么其他线程依然运行(非daemon的线程)。但是,在协程里如果主协程退出,或者进程在其他协程中退出,那么整个程序退出,其他协程没有运行的机会了。看一个官方的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def run_g1():
print 1, 2
g2.switch()
print 5, 6
def run_g2():
print 3, 4
g1.switch()
print 7, 8
g1 = greenlet.greenlet(run=run_g1)
g2 = greenlet.greenlet(run=run_g2)
g1.switch()
>>>
1, 2
3, 4
5, 6 # 结束

我们看到7, 8没有输出,因为g1的父协程是主协程,当g1运行结束后,自动切换到主协程,而主协程没有做任何动作就退出了。

greenlet.switch

1
2
3
greenlet.switch(*args, **kargs)
args: 不定参数
kwargs: 位置参数

greenlet.switch会导致程序切换到被调用的协程中运行。例如前几个例子中的child.switch()会切换到child协程中运行。

协程如何继续运行?

g.switch被调用时,g在何处继续运行有三种情况:

  • 第一种情况,g没有运行,那么g在run入口处运行。传递的参数作为run的参数。
  • 第二种情况,g已经运行过,那么g在上次运行停止,也就是调用了otehr.switch而被切换出去的地方继续运行。
  • 第三种情况,g已经运行退出。那么g会直接返回,如果有参数传递,则直接返回参数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def run():
print 'over'
g = greenlet.greenlet(run=run)
print g.dead
print g.switch()
print g.dead
print g.switch(1, 2)
print g.switch()
>>>
False
over
True
(1, 2)
()

这里需要注意,当调用已经结束的协程时,会直接返回传递的参数或者是(),而不是之前的None

greenlet.throw

1
2
3
4
5
greenlet.throw([type, [val, [tb]]])
type: 异常类型,例如TypeError 之类
val: 传递给type的参数,例如‘type is not correct’
tb: 传递给type的参数,异常的栈。

throw调用类似switch调用,会立即转到调用的线程运行,并立即抛出异常,类似:

1
2
3
4
def raiser():
raise typ, val, tb
g_raiser = greenlet(raiser, parent=g)
g_raiser.switch()

如果throw没有参数,那么协程会抛出greenlet.GreenletExit。这个异常不会向上传递给父协程,相当于正常的退出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def run():
try:
greenlet.getcurrent().parent.switch()
except greenlet.GreenletExit:
print 'exit'
raise # raise IOError 加入抛出其他异常,对比抛出GreenletExit
g = greenlet.greenlet(run=run)
g.switch()
ge = g.throw() #
print type(ge)
>>>
exit
greenlet.GreenletExit

需要注意到,子协程内部产生异常,从而打印exit。父协程获取到了子协程抛出的异常作为返回值,而不是继续抛出异常。所以,调用throw()是安全中断协程的方法。

第一个例子的输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
greenlet(g1), parent(main): g1 begin, start 0
greenlet(g2), parent(main): g2 begin, start 1
greenlet(g1), parent(main): start 2
greenlet(g2), parent(main): start 3
greenlet(g1), parent(main): start 4
greenlet(g1), parent(main): g1 end, start 4 # g1运行结束,转到父协程(main)中运行
greenlet(main), parent(main): back main after g1.switch
greenlet(g3), parent(g2): g3 begin, num 3
greenlet(g3), parent(g2): i 0
greenlet(g3), parent(g2): i 1
greenlet(g3), parent(g2): i 2
greenlet(g2), parent(main): start 103 # g3循环3次转到g2运行
greenlet(main), parent(main): back main after g3.switch
greenlet(main), parent(main): back g1 from main # g1运行已经结束,因此g1.switch()直接返回传递的参数
greenlet(g3), parent(g2): g3 end back g3 from main
greenlet(g2), parent(main): start None # g3运行结束,转回到父协程g2,因此获取的值为None
greenlet(g2), parent(main): g2 end, start None
greenlet(main), parent(main): None # g2运行结束,因此获取的参数是None

从结果可知道,调用g.switch后返回的参数,是下次切换到本协程(其他协程调用本协程的switch或者throww)传递的参数,而与协程g没有关系。

协程与线程

假如在一个协程,切换到另一个线程中的协程会如何?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import threading
import greenlet
import time
def run():
print 'run in thread'
g = greenlet.greenlet(run=run)
th = threaing.Thread(target=g.switch)
th.start()
>>>
Traceback ...
...
error: cannot switch to a different thread

每个协程是依赖于栈空间的,而线程拥有独立的空间,当切换过去必然引起错误。greenlet也不允许这种切换