Eventlet in OpenStack

The Eventlet Hub


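Every framework built on epoll (I/O multiplexing) has an event-loop processing center, usually called the Hub. In Eventlet the Hub is the center where all events are listened for and handled. It is also the center of greenlet scheduling. Let's look at the Hub's event-handling logic.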

# eventlet/hubs/hub.py
# BaseHub
def run(self, *a, **kw):
    """Run the runloop until abort is called.
    """
    # accept and discard variable arguments because they will be
    # supplied if other greenlets have run and exited before the
    # hub's greenlet gets a chance to run
    if self.running:
        raise RuntimeError("Already running!")
    try:
        self.running = True
        self.stopping = False
        while not self.stopping:
            # Because of garbage collection, an fd number may be reused. When that happens,
            # `mark_as_reopened` -> `_obsolete` removes any listeners still registered for the
            # old fd and sets their throwback (tb) to raise IOClosed.
            while self.closed:
                # We ditch all of these first.
                self.close_one()
            # Move newly added timers into the timer heap, ordered by expiry time
            self.prepare_timers()
            if self.debug_blocking:
                self.block_detect_pre()
            # Fire the timers that have expired
            self.fire_timers(self.clock())
            if self.debug_blocking:
                self.block_detect_post()
            # Order the timers that are still pending, mainly so we can work out how long to sleep
            self.prepare_timers()
            # The wait time is based on the timestamp of the earliest-expiring timer
            wakeup_when = self.sleep_until()
            if wakeup_when is None:
                sleep_time = self.default_sleep()
            else:
                sleep_time = wakeup_when - self.clock()
            if sleep_time > 0:
                # Implemented by the platform-specific hub (e.g. epoll)
                self.wait(sleep_time)
            else:
                self.wait(0)
        else:
            # Runs when the loop ends because stopping was set: discard pending timers
            self.timers_canceled = 0
            del self.timers[:]
            del self.next_timers[:]
    finally:
        self.running = False
        self.stopping = False
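To make the timer and sleep logic of run() concrete, here is a minimal sketch of one simplified loop iteration. It assumes a plain heap of (deadline, seq, callback) tuples. `push_timer`, `run_once` and `DEFAULT_SLEEP` are hypothetical names for illustration and are not eventlet's API. Cancellation, debug hooks and the actual `wait()` call are left out.

```python
import heapq
import itertools

DEFAULT_SLEEP = 60.0          # hypothetical stand-in for BaseHub.default_sleep()
_seq = itertools.count()      # tie-breaker so callbacks are never compared


def push_timer(timers, deadline, cb):
    """Add a callback with an absolute deadline to the timer heap."""
    heapq.heappush(timers, (deadline, next(_seq), cb))


def run_once(timers, now):
    """One simplified loop pass: fire expired timers, then return how long
    the loop may block in wait() before the next timer is due."""
    # Fire every timer whose deadline has passed (the role of fire_timers())
    while timers and timers[0][0] <= now:
        _, _, cb = heapq.heappop(timers)
        cb()
    # The earliest remaining deadline decides the sleep (the role of sleep_until())
    if not timers:
        return DEFAULT_SLEEP
    # A deadline already in the past means "poll without blocking", i.e. wait(0)
    return max(timers[0][0] - now, 0)
```

For example, with timers due at t=1.0 and t=3.0, a pass at t=2.0 fires the first one and returns 1.0, which is the time left until the second timer.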

Timer events

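Broadly speaking, the Hub handles two kinds of events. The first kind is timer events, and the loop processes the expired ones on each iteration. As the code above shows, on every iteration the Hub orders the timers in next_timers and then fires the expired ones. So how do timers get added to the Hub?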

# eventlet/hubs/hub.py
class BaseHub(object):
    def add_timer(self, timer):
        scheduled_time = self.clock() + timer.seconds
        self.next_timers.append((scheduled_time, timer))
        return scheduled_time


# eventlet/hubs/timer.py
class Timer(object):
    def __init__(self, seconds, cb, *args, **kw):
        """Create a timer.
            seconds: The minimum number of seconds to wait before calling
            cb: The callback to call when the timer has expired
            *args: The arguments to pass to cb
            **kw: The keyword arguments to pass to cb

        This timer will not be run unless it is scheduled in a runloop by
        calling timer.schedule() or runloop.add_timer(timer).
        """
        self.seconds = seconds
        self.tpl = cb, args, kw
        self.called = False
        if _g_debug:
            self.traceback = six.StringIO()
            traceback.print_stack(file=self.traceback)

    # Order by id(); used to break ties between timers with the same scheduled time
    def __lt__(self, other):
        return id(self) < id(other)

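add_timer appends a Timer instance to the next_timers list. The time stored in a Timer is a relative duration until it expires, but the Hub records absolute timestamps, so add_timer converts one into the other. Timers are processed as follows: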

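  • Expired timers are processed on every loop iteration
  • They are processed in ascending order of expiry time
  • Timers with the same timestamp are ordered by Timer.__lt__, that is, by id() (memory address) from smallest to largest. This is not guaranteed to match the order in which they were added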
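These ordering rules follow from storing (scheduled_time, timer) tuples in a heap, which is what eventlet's prepare_timers does with heapq. Python compares the tuples by time first. Only on a tie does it fall back to `Timer.__lt__`, which compares id(). The sketch below uses a toy `Timer` class that keeps only the fields needed for ordering and a fixed `now`. Both are assumptions for illustration.

```python
import heapq


class Timer:
    """Toy timer that copies eventlet's tie-breaking rule (hypothetical class)."""

    def __init__(self, seconds, name):
        self.seconds = seconds  # relative delay, as in eventlet's Timer
        self.name = name

    def __lt__(self, other):
        # Only consulted when two scheduled times are equal
        return id(self) < id(other)


def schedule(heap, now, timer):
    # Convert the relative delay into an absolute deadline, like add_timer()
    heapq.heappush(heap, (now + timer.seconds, timer))


heap = []
late, x, y = Timer(2, "late"), Timer(0, "x"), Timer(0, "y")
for t in (late, x, y):
    schedule(heap, 100.0, t)

# Pop everything: earlier deadlines first, equal deadlines by ascending id()
order = [heapq.heappop(heap)[1] for _ in range(len(heap))]
```

Because id() reflects a memory address, "x" and "y" come out in address order. That order is not necessarily the order in which they were scheduled.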

Listener events

# eventlet/hubs/poll.py
def wait(self, seconds=None):
    readers = self.listeners[READ]
    writers = self.listeners[WRITE]
    # If nothing is being listened for, just sleep until the first timer expires
    if not readers and not writers:
        if seconds:
            sleep(seconds)
        return
    try:
        # Poll; corresponds to Python's epoll.poll and C's epoll_wait
        presult = self.do_poll(seconds)
    except (IOError, select.error) as e:
        if get_errno(e) == errno.EINTR:
            return
        raise
    SYSTEM_EXCEPTIONS = self.SYSTEM_EXCEPTIONS
    if self.debug_blocking:
        self.block_detect_pre()
    # Accumulate the listeners to call back to prior to
    # triggering any of them. This is to keep the set
    # of callbacks in sync with the events we've just
    # polled for. It prevents one handler from invalidating
    # another.
    # Collect the listeners whose conditions are met, then run their callbacks.
    # If a condition is not met (e.g. we listen for reads but the socket is not
    # readable), that (fileno, event) pair simply does not appear in presult.
    callbacks = set()
    for fileno, event in presult:
        if event & READ_MASK:
            callbacks.add((readers.get(fileno, noop), fileno))
        if event & WRITE_MASK:
            callbacks.add((writers.get(fileno, noop), fileno))
        if event & select.POLLNVAL:
            self.remove_descriptor(fileno)
            continue
        if event & EXC_MASK:
            callbacks.add((readers.get(fileno, noop), fileno))
            callbacks.add((writers.get(fileno, noop), fileno))
    # Run the callbacks one by one. callbacks is a set, so the execution order is
    # not guaranteed to follow poll's return order or fileno order.
    for listener, fileno in callbacks:
        try:
            listener.cb(fileno)
        except SYSTEM_EXCEPTIONS:
            raise
        except:
            self.squelch_exception(fileno, sys.exc_info())
            clear_sys_exc_info()
    if self.debug_blocking:
        self.block_detect_post()

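The second kind, listener events, is handled through epoll. Each iteration returns a batch of events whose conditions are met, and the Hub then runs their callbacks one after another. So how are listeners added to the Hub?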

# eventlet/hubs/epoll.py
class Hub(poll.Hub):
    def __init__(self, clock=time.time):
        BaseHub.__init__(self, clock)
        self.poll = epoll()
        try:
            # modify is required by select.epoll
            self.modify = self.poll.modify
        except AttributeError:
            self.modify = self.poll.register

    def add(self, evtype, fileno, cb, tb, mac):
        oldlisteners = bool(self.listeners[READ].get(fileno) or
                            self.listeners[WRITE].get(fileno))
        # Add the listener to the READ or WRITE listeners
        # Note: this calls BaseHub.add directly, bypassing Hub.add in poll.py
        listener = BaseHub.add(self, evtype, fileno, cb, tb, mac)
        try:
            # Register the fd's events with epoll (epoll_ctl under the hood):
            # new=True adds a registration, new=False modifies the existing one
            if not oldlisteners:
                # Means we've added a new listener
                self.register(fileno, new=True)
            else:
                self.register(fileno, new=False)
        except IOError as ex:    # ignore EEXIST, #80
            if get_errno(ex) != errno.EEXIST:
                raise
        return listener


# eventlet/hubs/hub.py
class BaseHub(object):
    def add(self, evtype, fileno, cb, tb, mark_as_closed):
        """ Signals an intent to read or write a particular file descriptor.

        The *evtype* argument is either the constant READ or WRITE.

        The *fileno* argument is the file number of the file of interest.

        The *cb* argument is the callback which will be called when the file
        is ready for reading/writing.

        The *tb* argument is the throwback used to signal (into the greenlet)
        that the file was closed.

        The *mark_as_closed* is used in the context of the event hub to
        prepare a Python object as being closed, pre-empting further
        close operations from accidentally shutting down the wrong OS thread.
        """
        # Create a listener that stores the event type evtype, the watched fd,
        # the callback cb, the throwback tb, etc., then file it in self.listeners by type:
        # self.listeners = {READ: {1: listener1, 2: listener2}, WRITE: {3: listener3, ...}}
        listener = self.lclass(evtype, fileno, cb, tb, mark_as_closed)
        bucket = self.listeners[evtype]
        if fileno in bucket:
            if g_prevent_multiple_readers:
                raise RuntimeError(
                    "Second simultaneous %s on fileno %s "
                    "detected.  Unless you really know what you're doing, "
                    "make sure that only one greenthread can %s any "
                    "particular socket.  Consider using a pools.Pool. "
                    "If you do know what you're doing and want to disable "
                    "this error, call "
                    "eventlet.debug.hub_prevent_multiple_readers(False) - MY THREAD=%s; "
                    "THAT THREAD=%s" % (
                        evtype, fileno, evtype, cb, bucket[fileno]))
            # store off the second listener in another structure
            self.secondaries[evtype].setdefault(fileno, []).append(listener)
        else:
            bucket[fileno] = listener
        return listener

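Through the inheritance chain Hub -> BaseHub, listeners are registered with epoll. On each loop iteration the Hub handles the events whose conditions are met by running their callbacks. greenlet itself does no scheduling and leaves all of it to the application. So how does eventlet schedule its code? Let's start with the coroutine type eventlet defines.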
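The registration and dispatch path can be sketched with Python's standard `selectors` module standing in for epoll. `MiniHub` is a hypothetical class. It keeps only three things from eventlet: the `listeners` dict, the register-versus-modify decision made by the epoll Hub's `add`, and the collect-then-run pattern of poll's `wait`. Secondaries, throwbacks and error handling are omitted.

```python
import selectors
import socket

READ, WRITE = "read", "write"  # stand-ins for eventlet's READ/WRITE constants


class MiniHub:
    """Hypothetical sketch of BaseHub.add() + register() + poll.Hub.wait()."""

    def __init__(self):
        self.sel = selectors.DefaultSelector()   # epoll on Linux
        self.listeners = {READ: {}, WRITE: {}}

    def _mask(self, fileno):
        # Combine the events currently wanted for this fd
        mask = 0
        if fileno in self.listeners[READ]:
            mask |= selectors.EVENT_READ
        if fileno in self.listeners[WRITE]:
            mask |= selectors.EVENT_WRITE
        return mask

    def add(self, evtype, fileno, cb):
        # Was this fd already registered for anything? (cf. oldlisteners)
        old = fileno in self.listeners[READ] or fileno in self.listeners[WRITE]
        self.listeners[evtype][fileno] = cb
        if not old:
            self.sel.register(fileno, self._mask(fileno))  # like new=True
        else:
            self.sel.modify(fileno, self._mask(fileno))    # like new=False

    def wait(self, seconds):
        # Collect all ready callbacks first, then run them
        callbacks = []
        for key, events in self.sel.select(seconds):
            fd = key.fd
            if events & selectors.EVENT_READ and fd in self.listeners[READ]:
                callbacks.append((self.listeners[READ][fd], fd))
            if events & selectors.EVENT_WRITE and fd in self.listeners[WRITE]:
                callbacks.append((self.listeners[WRITE][fd], fd))
        for cb, fd in callbacks:
            cb(fd)
```

Usage: add a WRITE listener and then a READ listener on the same socket. The second `add` modifies the existing registration instead of registering the fd twice.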

Eventlet's GreenThread

Eventlet defines its own green thread by subclassing greenlet.greenlet, and it is really a coroutine. It extends the semantics of greenlet.greenlet with:

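  • wait: returns the result of the GreenThread's main function (or re-raises its exception). If the thread has not finished yet, the caller switches to other coroutines and is switched back once the result is ready
  • link: registers a function to call when the coroutine exits. The function receives the GreenThread as its first argument
  • kill: kills the coroutine (by default by raising greenlet.GreenletExit inside it), going through one round of scheduling before it is killed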
# eventlet/greenthread.py
class GreenThread(greenlet.greenlet):
    """The GreenThread class is a type of Greenlet which has the additional
    property of being able to retrieve the return value of the main function.
    Do not construct GreenThread objects directly; call :func:`spawn` to get one.
    """

    def __init__(self, parent):
        greenlet.greenlet.__init__(self, self.main, parent)
        self._exit_event = event.Event()
        self._resolving_links = False

    def wait(self):
        """ Returns the result of the main function of this GreenThread.  If the
        result is a normal return value, :meth:`wait` returns it.  If it raised
        an exception, :meth:`wait` will raise the same exception (though the
        stack trace will unavoidably contain some frames from within the
        greenthread module)."""
        return self._exit_event.wait()

    def link(self, func, *curried_args, **curried_kwargs):
        """ Set up a function to be called with the results of the GreenThread.

        The function must have the following signature::

            def func(gt, [curried args/kwargs]):

        When the GreenThread finishes its run, it calls *func* with itself
        and with the `curried arguments <http://en.wikipedia.org/wiki/Currying>`_ supplied
        at link-time.  If the function wants to retrieve the result of the GreenThread,
        it should call wait() on its first argument.

        Note that *func* is called within execution context of
        the GreenThread, so it is possible to interfere with other linked
        functions by doing things like switching explicitly to another
        greenthread.
        """
        self._exit_funcs = getattr(self, '_exit_funcs', deque())
        self._exit_funcs.append((func, curried_args, curried_kwargs))
        if self._exit_event.ready():
            self._resolve_links()

    def unlink(self, func, *curried_args, **curried_kwargs):
        """ remove linked function set by :meth:`link`

        Remove successfully return True, otherwise False
        """
        if not getattr(self, '_exit_funcs', None):
            return False
        try:
            self._exit_funcs.remove((func, curried_args, curried_kwargs))
            return True
        except ValueError:
            return False

    def main(self, function, args, kwargs):
        try:
            result = function(*args, **kwargs)
        except:
            self._exit_event.send_exception(*sys.exc_info())
            self._resolve_links()
            raise
        else:
            self._exit_event.send(result)
            self._resolve_links()

    def _resolve_links(self):
        # ca and ckw are the curried function arguments
        if self._resolving_links:
            return
        self._resolving_links = True
        try:
            exit_funcs = getattr(self, '_exit_funcs', deque())
            while exit_funcs:
                f, ca, ckw = exit_funcs.popleft()
                f(self, *ca, **ckw)
        finally:
            self._resolving_links = False

    def kill(self, *throw_args):
        """Kills the greenthread using :func:`kill`.  After being killed
        all calls to :meth:`wait` will raise *throw_args* (which default
        to :class:`greenlet.GreenletExit`)."""
        return kill(self, *throw_args)

    def cancel(self, *throw_args):
        """Kills the greenthread using :func:`kill`, but only if it hasn't
        already started running.  After being canceled,
        all calls to :meth:`wait` will raise *throw_args* (which default
        to :class:`greenlet.GreenletExit`)."""
        return cancel(self, *throw_args)
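The result and link bookkeeping can be shown without greenlet at all. The sketch below is hypothetical (`ResultThread` is not an eventlet class) and runs the function synchronously. It covers three things only: how the result or exception is stored for wait(), how linked functions run in FIFO order with the thread as their first argument, and how linking after exit runs the function immediately. It does not model coroutine switching or the `_resolving_links` re-entrancy guard.

```python
from collections import deque


class ResultThread:
    """Hypothetical, greenlet-free sketch of GreenThread's wait()/link() bookkeeping."""

    def __init__(self, function, *args, **kwargs):
        self._call = (function, args, kwargs)
        self._done = False
        self._result = None
        self._exc = None
        self._exit_funcs = deque()

    def run(self):
        function, args, kwargs = self._call
        try:
            self._result = function(*args, **kwargs)
        except Exception as exc:
            # Store the exception for wait(); GreenThread.main also re-raises it,
            # which this sketch omits
            self._exc = exc
        self._done = True
        self._resolve_links()

    def wait(self):
        if not self._done:
            # A real GreenThread would switch to other coroutines here
            raise RuntimeError("not finished yet")
        if self._exc is not None:
            raise self._exc
        return self._result

    def link(self, func, *curried_args, **curried_kwargs):
        self._exit_funcs.append((func, curried_args, curried_kwargs))
        if self._done:
            # Linking after exit runs the function right away, as GreenThread.link does
            self._resolve_links()

    def _resolve_links(self):
        # Exit functions run in FIFO order and receive the thread first
        while self._exit_funcs:
            f, ca, ckw = self._exit_funcs.popleft()
            f(self, *ca, **ckw)
```

A linked function can call `wait()` on its first argument to read the result, as the `link` docstring above recommends.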

Coroutine entry points: spawn and spawn_n

Code that uses eventlet usually starts from one of two entry-point functions:

  • spawn: eventlet.spawn -> eventlet.greenthread.spawn creates a green thread, returns that GreenThread, and schedules it to run.
  • spawn_n: eventlet.spawn_n -> eventlet.greenthread.spawn_n creates a plain greenlet, returns it, and schedules it to run.

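spawn uses eventlet's own GreenThread, while spawn_n uses a bare greenlet, so spawn_n is somewhat faster. The trade-off is that you cannot retrieve the return value or any exception of a spawn_n greenlet.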

# eventlet/greenthread.py
def spawn(func, *args, **kwargs):
    """Create a greenthread to run ``func(*args, **kwargs)``.  Returns a
    :class:`GreenThread` object which you can use to get the results of the
    call.

    Execution control returns immediately to the caller; the created greenthread
    is merely scheduled to be run at the next available opportunity.
    Use :func:`spawn_after` to  arrange for greenthreads to be spawned
    after a finite delay.
    """
    hub = hubs.get_hub()
    g = GreenThread(hub.greenlet)
    hub.schedule_call_global(0, g.switch, func, args, kwargs)
    return g


def spawn_n(func, *args, **kwargs):
    """Same as :func:`spawn`, but returns a ``greenlet`` object from
    which it is not possible to retrieve either a return value or
    whether it raised any exceptions.  This is faster than
    :func:`spawn`; it is fastest if there are no keyword arguments.

    If an exception is raised in the function, spawn_n prints a stack
    trace; the print can be disabled by calling
    :func:`eventlet.debug.hub_exceptions` with False.
    """
    return _spawn_n(0, func, args, kwargs)[1]


def _spawn_n(seconds, func, args, kwargs):
    hub = hubs.get_hub()
    g = greenlet.greenlet(func, parent=hub.greenlet)
    t = hub.schedule_call_global(seconds, g.switch, *args, **kwargs)
    return t, g

Both functions call schedule_call_global and return right away. That call is what arranges for the green thread to be scheduled.

How green threads are scheduled

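When a GreenThread is created, it is scheduled by calling a Hub method. On Linux the two variants used for this behave the same, so we take schedule_call_global, mentioned above, as the example.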

# eventlet/hubs/hub.py
# BaseHub
def schedule_call_global(self, seconds, cb, *args, **kw):
    """Schedule a callable to be called after 'seconds' seconds have
    elapsed. The timer will NOT be canceled if the current greenlet has
    exited before the timer fires.
        seconds: The number of seconds to wait.
        cb: The callable to call after the given time.
        *args: Arguments to pass to the callable when called.
        **kw: Keyword arguments to pass to the callable when called.
    """
    t = timer.Timer(seconds, cb, *args, **kw)
    self.add_timer(t)
    return t

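So when a green thread is created, schedule_call_global adds it to the Hub as a timer. Since seconds=0, it runs the next time the loop processes timers. We have not yet covered when listeners get registered with epoll. The next section follows the whole process, using a socket being created and listened on as the example.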
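The effect of scheduling with seconds=0 can be simulated with generators standing in for greenlets. In the hypothetical `ToyHub` below, `spawn()` only queues a zero-delay timer. A task that yields a number is rescheduled that many simulated seconds later. This loosely mirrors how eventlet.sleep() parks the current greenlet on a timer and switches to the Hub. Unlike eventlet, ties are broken here by a sequence counter rather than by id().

```python
import heapq
import itertools


class ToyHub:
    """Hypothetical timer-only hub; generators stand in for greenlets."""

    def __init__(self):
        self.clock = 0.0                 # simulated clock, advanced by run()
        self.timers = []                 # heap of (deadline, seq, callback)
        self._seq = itertools.count()

    def schedule_call_global(self, seconds, cb):
        heapq.heappush(self.timers, (self.clock + seconds, next(self._seq), cb))

    def spawn(self, gen_func, *args):
        task = gen_func(*args)
        # Like greenthread.spawn: nothing runs yet, a 0-second timer is queued
        self.schedule_call_global(0, lambda: self._step(task))
        return task

    def _step(self, task):
        try:
            delay = next(task)           # resume the "green thread" (cf. g.switch)
        except StopIteration:
            return                       # the task has finished
        # The task asked to sleep: reschedule it with a new timer
        self.schedule_call_global(delay, lambda: self._step(task))

    def run(self):
        while self.timers:
            deadline, _, cb = heapq.heappop(self.timers)
            self.clock = max(self.clock, deadline)  # "sleep" until the timer is due
            cb()
```

Usage: two workers spawned back to back both start on the first pass, and then finish according to their sleep durations.

```python
log = []

def worker(name, delay):
    log.append((name, "start"))
    yield delay
    log.append((name, "end"))

hub = ToyHub()
hub.spawn(worker, "a", 2)
hub.spawn(worker, "b", 1)
hub.run()   # log: a start, b start, b end, a end
```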

How a socket is created, configured, registered and listened on

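The native implementations are not compatible with eventlet, so eventlet "greens" part of the standard library by replacing the native implementations with its own. The incompatibility comes down to two points: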

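  • The socket must be switched from blocking to non-blocking mode; otherwise multiplexing is pointless
  • The fd must be added to epoll for monitoring, and the waiting green thread must be scheduled to run once the condition is met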
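Both points can be observed with the standard library alone. This sketch uses `socket.socketpair()` and a hypothetical helper `try_recv`, which turns the would-block error into `None` the way eventlet's socket_accept helper does for accept. Here `select.select` stands in for the Hub's wait.

```python
import select
import socket


def try_recv(sock, n=1024):
    """Non-blocking recv: return data, or None when the call would block."""
    try:
        return sock.recv(n)
    except BlockingIOError:        # EWOULDBLOCK / EAGAIN
        return None


a, b = socket.socketpair()
a.setblocking(False)               # the effect eventlet's set_nonblocking(fd) aims for
first = try_recv(a)                # nothing sent yet: returns None at once instead of hanging
b.sendall(b"hi")
select.select([a], [], [], 1.0)    # wait for readability, the job eventlet hands to the Hub
second = try_recv(a)               # the data is ready, so the retry succeeds
a.close()
b.close()
```

With a blocking socket the first `recv` would hang the whole thread. In non-blocking mode, control comes back immediately, so a loop can serve other work until the fd is ready.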

Let's look at the GreenSocket implementation to see how these two problems are solved:

eventlet.green.socket -> eventlet._socket_nodns -> eventlet.greenio.base -> GreenSocket.
# eventlet/greenio/base.py
class GreenSocket(object):
    """
    Green version of socket.socket class, that is intended to be 100%
    API-compatible.

    It also recognizes the keyword parameter, 'set_nonblocking=True'.
    Pass False to indicate that socket is already in non-blocking mode
    to save syscalls.
    """

    # This placeholder is to prevent __getattr__ from creating an infinite call loop
    fd = None

    def __init__(self, family_or_realsock=socket.AF_INET, *args, **kwargs):
        should_set_nonblocking = kwargs.pop('set_nonblocking', True)
        if isinstance(family_or_realsock, six.integer_types):
            # Create a native socket (the else branch wraps an existing one)
            fd = _original_socket(family_or_realsock, *args, **kwargs)
            # Notify the hub that this is a newly-opened socket.
            # The fd number is new, so drop any listeners left over from its previous use
            notify_opened(fd.fileno())
        else:
            fd = family_or_realsock

        # import timeout from other socket, if it was there
        try:
            self._timeout = fd.gettimeout() or socket.getdefaulttimeout()
        except AttributeError:
            self._timeout = socket.getdefaulttimeout()

        # Switch to non-blocking mode. Passing set_nonblocking=False means the socket
        # is already non-blocking, so no extra syscall is needed.
        if should_set_nonblocking:
            set_nonblocking(fd)
        self.fd = fd
        # when client calls setblocking(0) or settimeout(0) the socket must
        # act non-blocking
        self.act_non_blocking = False

        # Copy some attributes from underlying real socket.
        # This is the easiest way that i found to fix
        # https://bitbucket.org/eventlet/eventlet/issue/136
        # Only `getsockopt` is required to fix that issue, others
        # are just premature optimization to save __getattr__ call.
        self.bind = fd.bind
        self.close = fd.close
        self.fileno = fd.fileno
        self.getsockname = fd.getsockname
        self.getsockopt = fd.getsockopt
        self.listen = fd.listen
        self.setsockopt = fd.setsockopt
        self.shutdown = fd.shutdown
        self._closed = False

    def _trampoline(self, fd, read=False, write=False, timeout=None, timeout_exc=None):
        """ We need to trampoline via the event hub.
            We catch any signal back from the hub indicating that the operation we
            were waiting on was associated with a filehandle that's since been
            invalidated.
        """
        # The socket has already been closed
        if self._closed:
            # If we did any logging, alerting to a second trampoline attempt on a closed
            # socket here would be useful.
            raise IOClosed()
        try:
            return trampoline(fd, read=read, write=write, timeout=timeout,
                              timeout_exc=timeout_exc,
                              mark_as_closed=self._mark_as_closed)
        except IOClosed:
            # This socket's been obsoleted. De-fang it.
            self._mark_as_closed()
            raise

    def accept(self):
        if self.act_non_blocking:
            return self.fd.accept()
        fd = self.fd
        while True:
            # Non-blocking accept; returns None on errno.EWOULDBLOCK
            res = socket_accept(fd)
            # A connection was accepted: make the client socket non-blocking and green it
            if res is not None:
                client, addr = res
                set_nonblocking(client)
                return type(self)(client), addr
            # No pending connection yet (accept would block), so "trampoline"
            # (literally a springboard) until the fd is readable, then retry
            self._trampoline(fd, read=True, timeout=self.gettimeout(),
                             timeout_exc=socket.timeout("timed out"))


def socket_accept(descriptor):
    """
    Attempts to accept() on the descriptor, returns a client,address tuple
    if it succeeds; returns None if it needs to trampoline, and raises
    any exceptions.
    """
    try:
        return descriptor.accept()
    except socket.error as e:
        if get_errno(e) == errno.EWOULDBLOCK:
            return None
        raise


# eventlet/hubs/__init__.py
def trampoline(fd, read=None, write=None, timeout=None,
               timeout_exc=timeout.Timeout,
               mark_as_closed=None):
    """Suspend the current coroutine until the given socket object or file
    descriptor is ready to *read*, ready to *write*, or the specified
    *timeout* elapses, depending on arguments specified.

    To wait for *fd* to be ready to read, pass *read* ``=True``; ready to
    write, pass *write* ``=True``. To specify a timeout, pass the *timeout*
    argument in seconds.

    If the specified *timeout* elapses before the socket is ready to read or
    write, *timeout_exc* will be raised instead of ``trampoline()``
    returning normally.

    .. note :: |internal|
    """
    t = None
    hub = get_hub()
    current = greenlet.getcurrent()
    assert hub.greenlet is not current, 'do not call blocking functions from the mainloop'
    assert not (
        read and write), 'not allowed to trampoline for reading and writing'
    try:
        fileno = fd.fileno()
    except AttributeError:
        fileno = fd
    # If a timeout is given, schedule a timer that raises the timeout
    # exception inside this greenlet via current.throw
    if timeout is not None:
        def _timeout(exc):
            # This is only useful to insert debugging
            current.throw(exc)
        t = hub.schedule_call_global(timeout, _timeout, timeout_exc)
    # Register with the Hub for reading or writing. The callback is current.switch:
    # once the event this green thread waits for is ready, a Hub loop iteration
    # switches back into it, and it runs until it finishes or yields again
    try:
        if read:
            listener = hub.add(hub.READ, fileno, current.switch, current.throw, mark_as_closed)
        elif write:
            listener = hub.add(hub.WRITE, fileno, current.switch, current.throw, mark_as_closed)
        try:
            # The listener is registered; switch to the Hub's greenlet
            return hub.switch()
        finally:
            # We get here once the Hub switches back (fd ready) or an exception is
            # thrown in (timeout or IOClosed); either way, stop listening
            hub.remove(listener)
    finally:
        if t is not None:
            t.cancel()

From the code above, the execution steps when a green thread is used are:

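start -> create the native socket -> set it to non-blocking mode -> accept -> would-block error -> trampoline registers the socket with the Hub -> switch to the Hub greenlet
-> each Hub loop iteration -> epoll reports the condition is met, and current.switch switches back to this green thread -> accept succeeds -> green the returned client socket -> end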

So eventlet solves the two problems above as follows:

  • It wraps the native socket and puts it into non-blocking mode
  • When accept would block, it calls trampoline to register the fd with the Hub for event listening

Eventlet's greening logic

Eventlet's overall processing logic can be summarized as:

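  1. Calling a spawn-style function creates a green thread and submits it to the Hub as a Timer whose callback is the thread's switch method. When a Hub loop iteration processes that Timer, switch is called and the green thread runs;
  2. The greened standard library puts fds into non-blocking mode. When a non-blocking accept, read, write, etc. would block, the green thread trampolines and is registered with the Hub for event listening. On a later Hub iteration epoll checks whether the condition is met, and if it is, execution switches back to the green thread to perform the operation.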
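Both steps can be combined into a rough, hypothetical model of the whole flow. Generators play the green threads, a deque of runnable tasks stands in for zero-second timers, and `selectors` stands in for epoll. Yielding a `WaitRead` object plays the role of trampoline(fd, read=True). `WaitRead`, `GreenishHub`, `accept_task`, `client_task` and `demo` are illustrative names, not eventlet APIs.

```python
import selectors
import socket
from collections import deque


class WaitRead:
    """Yielded by a task that must wait until `sock` is readable
    (plays the role of trampoline(fd, read=True))."""

    def __init__(self, sock):
        self.sock = sock


class GreenishHub:
    """Hypothetical toy hub: generators stand in for green threads."""

    def __init__(self):
        self.sel = selectors.DefaultSelector()  # stands in for epoll
        self.ready = deque()                    # stands in for 0-second timers

    def spawn(self, task):
        # Step 1: a new task is only queued; it first runs on the next loop pass
        self.ready.append(task)

    def _resume(self, task):
        try:
            request = next(task)                # cf. current.switch()
        except StopIteration:
            return                              # the task has finished
        # Step 2: the task would block, so listen on its fd; the task is the callback
        self.sel.register(request.sock, selectors.EVENT_READ, task)

    def run(self):
        while self.ready or self.sel.get_map():
            while self.ready:                   # like fire_timers()
                self._resume(self.ready.popleft())
            if self.sel.get_map():              # like wait(): poll registered fds
                for key, _ in self.sel.select(timeout=1.0):
                    self.sel.unregister(key.fileobj)  # like hub.remove(listener)
                    self._resume(key.data)            # switch back into the task


def accept_task(listener, results):
    listener.setblocking(False)                 # the "greened" socket is non-blocking
    while True:
        try:
            conn, _ = listener.accept()
            break
        except BlockingIOError:                 # would block: trampoline
            yield WaitRead(listener)
    conn.setblocking(False)                     # like set_nonblocking(client)
    while True:
        try:
            results.append(conn.recv(1024))
            break
        except BlockingIOError:
            yield WaitRead(conn)
    conn.close()


def client_task(port):
    # A blocking client is fine here: a loopback connect completes via the backlog
    with socket.create_connection(("127.0.0.1", port)) as c:
        c.sendall(b"hello")
    yield from ()                               # makes this function a generator


def demo():
    results = []
    with socket.socket() as srv:
        srv.bind(("127.0.0.1", 0))
        srv.listen()
        hub = GreenishHub()
        hub.spawn(accept_task(srv, results))
        hub.spawn(client_task(srv.getsockname()[1]))
        hub.run()
    return results
```

In `demo()`, the accept task runs first, finds no pending connection and parks itself on the listening socket. The client task then connects and sends. On the next poll the listening socket becomes readable and the accept task resumes where it left off, which is the same sequence as the accept flow described above.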