A 10x Improvement in glance Image Download Speed

(Revisiting a problem I solved about a year ago.)

Background

Glance is the OpenStack service that manages images. It is mainly used to upload and download image files, and it can be backed by various storage services such as Ceph, sheepdog, or a local disk. The rough data flow is shown in the figure below.

[Figure: glance image download data path, with data flowing through a pipe between glance and the storage backend]

With sheepdog as the storage backend, downloads turned out to be extremely slow, only 10~20 MB/s, while they should normally be able to saturate the internal gigabit NIC at 100+ MB/s. After some digging, the root cause turned out to be a mismatch between oslo_concurrency and the eventlet library, which made data move very slowly through the pipe shown in the figure above.

A Simple Example

To strip away the rest of the glance service code, the problem can be reduced to reading a local file through oslo_concurrency. The experiment code is as follows.

import time
import eventlet
# eventlet.monkey_patch()  # [1] keep commented out: without eventlet; [2] uncomment: with eventlet
from oslo_concurrency import processutils
start = time.time()
data = processutils.execute('cat /home/datafile', shell=True)[0]
end = time.time()
print 'time: %f, data: %dM, speed: %d M/s' % (end-start, len(data)/1024/1024, len(data)/1024/1024/(end-start))

Run [1], [2], and the optimized code [3] respectively:

[root@test]# python load.py # [1]
time: 2.553961, data: 200M, speed: 78 M/s
[root@test]# python load.py # [2]
time: 15.442971, data: 200M, speed: 12 M/s
[root@test]# python load.py # [3] with eventlet, after the code optimization
time: 2.685989, data: 200M, speed: 74 M/s

With eventlet enabled, throughput drops to roughly 1/6~1/7 of the original. After the optimization, still with eventlet enabled, throughput is essentially on par with the non-eventlet case, with no measurable loss.

Before analyzing the code, keep one thing in mind: with eventlet enabled, the relevant modules are monkey-patched to eventlet's corresponding green modules, and the classes in those green modules inherit from the classes in the original Python modules. So when looking up a method of one of these classes, first check the class in the eventlet module; if it is not there, fall back to the original Python module. The subprocess module is a good example.
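
As a quick illustration of this lookup rule, here is a minimal sketch (assuming Python 2.7 with eventlet installed; the expected values in the comments are illustrative): after monkey_patch(), the name subprocess.Popen resolves to eventlet's green class, and anything the green class does not override falls back to the stdlib base class.

import eventlet
eventlet.monkey_patch()

import subprocess

print eventlet.is_monkey_patched('subprocess')  # expected: True
print subprocess.Popen                          # expected: eventlet.green.subprocess.Popen
print subprocess.Popen.__bases__                # expected: (the original subprocess.Popen,)
# communicate() is not overridden by eventlet, so the lookup falls through
# to the stdlib base class:
print subprocess.Popen.communicate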

# eventlet/green/subprocess.py -- the subprocess module installed by monkey patching.
# The class in eventlet inherits from the original subprocess.Popen class
# and overrides/wraps the relevant methods and attributes.
class Popen(subprocess_orig.Popen):
    """eventlet-friendly version of subprocess.Popen"""
    # We do not believe that Windows pipes support non-blocking I/O. At least,
    # the Python file objects stored on our base-class object have no
    # setblocking() method, and the Python fcntl module doesn't exist on
    # Windows. (see eventlet.greenio.set_nonblocking()) As the sole purpose of
    # this __init__() override is to wrap the pipes for eventlet-friendly
    # non-blocking I/O, don't even bother overriding it on Windows.
    if not subprocess_orig.mswindows:
        def __init__(self, args, bufsize=0, *argss, **kwds):
            self.args = args
            # Forward the call to base-class constructor
            # (this is the original __init__; self.stdout becomes the parent's
            # read end of the pipe in the figure above, self.stdin the
            # parent's write end)
            subprocess_orig.Popen.__init__(self, args, 0, *argss, **kwds)
            # Now wrap the pipes, if any. This logic is loosely borrowed from
            # eventlet.processes.Process.run() method.
            for attr in "stdin", "stdout", "stderr":
                pipe = getattr(self, attr)
                # Wrap the original pipe once more with eventlet's GreenPipe
                if pipe is not None and not type(pipe) == greenio.GreenPipe:
                    wrapped_pipe = greenio.GreenPipe(pipe, pipe.mode, bufsize)
                    setattr(self, attr, wrapped_pipe)
        __init__.__doc__ = subprocess_orig.Popen.__init__.__doc__

    def wait(self, timeout=None, check_interval=0.01):
        # Instead of a blocking OS call, this version of wait() uses logic
        # borrowed from the eventlet 0.2 processes.Process.wait() method.
        if timeout is not None:
            endtime = time.time() + timeout
        try:
            while True:
                status = self.poll()
                if status is not None:
                    return status
                if timeout is not None and time.time() > endtime:
                    raise TimeoutExpired(self.args, timeout)
                eventlet.sleep(check_interval)
        except OSError as e:
            if e.errno == errno.ECHILD:
                # no child process, this happens if the child process
                # already died and has been cleaned up
                return -1
            else:
                raise
    wait.__doc__ = subprocess_orig.Popen.wait.__doc__

    if not subprocess_orig.mswindows:
        # don't want to rewrite the original _communicate() method, we
        # just want a version that uses eventlet.green.select.select()
        # instead of select.select().
        _communicate = FunctionType(
            six.get_function_code(six.get_unbound_function(
                subprocess_orig.Popen._communicate)),
            globals())
        try:
            _communicate_with_select = FunctionType(
                six.get_function_code(six.get_unbound_function(
                    subprocess_orig.Popen._communicate_with_select)),
                globals())
            _communicate_with_poll = FunctionType(
                six.get_function_code(six.get_unbound_function(
                    subprocess_orig.Popen._communicate_with_poll)),
                globals())
        except AttributeError:
            pass

# Borrow subprocess.call() and check_call(), but patch them so they reference
# OUR Popen class rather than subprocess.Popen.
call = FunctionType(six.get_function_code(subprocess_orig.call), globals())
check_call = FunctionType(six.get_function_code(subprocess_orig.check_call), globals())

Now let's go back and walk through the code path glance uses to pull data.

subprocess Is the Culprit


oslo_concurrency uses subprocess.Popen and talks to the child over the pipes through its communicate method. The rough logic is:

from oslo_concurrency import processutils
processutils.execute('cat datafile', shell=True)

# Inside execute(); by this point the module has already been monkey-patched
# by eventlet, so this Popen is the one from eventlet's subprocess module.
# (Creating the pipes, forking the child, and having the child run the shell
# command are omitted here.)
obj = subprocess.Popen(cmd,
                       stdin=_PIPE,
                       stdout=_PIPE,
                       stderr=_PIPE,
                       close_fds=close_fds,
                       preexec_fn=on_preexec_fn,
                       shell=shell,
                       cwd=cwd,
                       env=env_variables)
# Talk to the child over the pipes and collect the returned data
return obj.communicate()[0]
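
For completeness, the same code path can be reproduced without oslo_concurrency at all, since execute() essentially boils down to the Popen-plus-communicate calls above. The following sketch is my own reduction (reusing the /home/datafile test file from the earlier example), not glance code:

import time

import eventlet
# eventlet.monkey_patch()  # uncomment to reproduce the slow case

import subprocess

start = time.time()
# Same shape as the oslo_concurrency call: all three stdio streams are pipes,
# so communicate() takes the select()-based path shown below.
proc = subprocess.Popen('cat /home/datafile', shell=True,
                        stdin=subprocess.PIPE,
                        stdout=subprocess.PIPE,
                        stderr=subprocess.PIPE)
data = proc.communicate()[0]
elapsed = time.time() - start
print 'time: %f, data: %dM' % (elapsed, len(data) / 1024 / 1024)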

From the eventlet subprocess Popen code above, we know that communicate is the original subprocess communicate method.

# The communicate() method of the original subprocess.Popen class
class Popen(object):
    ....
    def communicate(self, input=None):
        """Interact with process: Send data to stdin.  Read data from
        stdout and stderr, until end-of-file is reached.  Wait for
        process to terminate.  The optional input argument should be a
        string to be sent to the child process, or None, if no data
        should be sent to the child.

        communicate() returns a tuple (stdout, stderr)."""

        # Optimization: If we are only using one pipe, or no pipe at
        # all, using select() or threads is unnecessary.
        # stdin, stdout and stderr are all pipes here, so this branch
        # is skipped.
        if [self.stdin, self.stdout, self.stderr].count(None) >= 2:
            stdout = None
            stderr = None
            if self.stdin:
                if input:
                    try:
                        self.stdin.write(input)
                    except IOError as e:
                        if e.errno != errno.EPIPE and e.errno != errno.EINVAL:
                            raise
                self.stdin.close()
            # self.stdout has already been wrapped into eventlet's GreenPipe,
            # so reading here means calling self.stdout.read()
            elif self.stdout:
                stdout = _eintr_retry_call(self.stdout.read)
                self.stdout.close()
            elif self.stderr:
                stderr = _eintr_retry_call(self.stderr.read)
                self.stderr.close()
            self.wait()
            return (stdout, stderr)

        # Call _communicate instead
        return self._communicate(input)

    def _communicate_with_select(self, input):
        read_set = []
        write_set = []
        stdout = None # Return
        stderr = None # Return

        if self.stdin and input:
            write_set.append(self.stdin)
        if self.stdout:
            read_set.append(self.stdout)
            stdout = []
        if self.stderr:
            read_set.append(self.stderr)
            stderr = []

        input_offset = 0
        while read_set or write_set:
            try:
                rlist, wlist, xlist = select.select(read_set, write_set, [])
            except select.error, e:
                if e.args[0] == errno.EINTR:
                    continue
                raise

            if self.stdin in wlist:
                chunk = input[input_offset : input_offset + _PIPE_BUF]
                try:
                    bytes_written = os.write(self.stdin.fileno(), chunk)
                except OSError as e:
                    if e.errno == errno.EPIPE:
                        self.stdin.close()
                        write_set.remove(self.stdin)
                    else:
                        raise
                else:
                    input_offset += bytes_written
                    if input_offset >= len(input):
                        self.stdin.close()
                        write_set.remove(self.stdin)

            if self.stdout in rlist:
                # Read with os.read, 1024 bytes at a time,
                # until an empty string comes back
                data = os.read(self.stdout.fileno(), 1024)
                if data == "":
                    self.stdout.close()
                    read_set.remove(self.stdout)
                stdout.append(data)

            if self.stderr in rlist:
                data = os.read(self.stderr.fileno(), 1024)
                if data == "":
                    self.stderr.close()
                    read_set.remove(self.stderr)
                stderr.append(data)

        return (stdout, stderr)

Since self.stdout has been wrapped by eventlet into a GreenPipe, its read method has to be looked up in GreenPipe. From GreenPipe's initialization we can see that the original pipe file object passed in gets wrapped into a _SocketDuckForFd, while GreenPipe itself inherits from _fileobject, so its read method is also inherited from _fileobject.

# eventlet/greenio/py2.py
class GreenPipe(_fileobject):

    __doc__ = greenpipe_doc

    def __init__(self, f, mode='r', bufsize=-1):
        if not isinstance(f, six.string_types + (int, file)):
            raise TypeError('f(ile) should be int, str, unicode or file, not %r' % f)

        if isinstance(f, six.string_types):
            f = open(f, mode, 0)

        if isinstance(f, int):
            fileno = f
            self._name = "<fd:%d>" % fileno
        else:
            fileno = os.dup(f.fileno())
            self._name = f.name
            if f.mode != mode:
                raise ValueError('file.mode %r does not match mode parameter %r' % (f.mode, mode))
            self._name = f.name
            f.close()

        super(GreenPipe, self).__init__(_SocketDuckForFd(fileno), mode)
        set_nonblocking(self)
        self.softspace = 0

Ultimately the call ends up in os.read. The logic is fairly involved, but it can be boiled down to the following call chain.

Popen -> eventlet: subprocess.Popen -> python: subprocess.Popen -> stdout = GreenPipe(pipe)
communicate -> eventlet: communicate -> python: communicate ->
eventlet: _communicate -> python: _communicate -> python: _communicate_with_select ->
call select.select -> call os.read(self.stdout.fileno(), 1024)

The key point is that select and os are also replaced by eventlet, so what actually gets called are eventlet's select and os.
The read size in os.read is hard-coded to 1024 (the chunk size passed in by glance_store no longer has any effect), and select.select hands every subsequent request over to eventlet's Hub, which waits on epoll. So the logic becomes:

select.select submits the request to the Hub ----> epoll sees stdout is readable ----> os.read reads 1024 bytes ->
^
^--------------------------<---------------------------<-------------------<-

After every 1024-byte read, the request is submitted to the Hub for epoll again; when epoll sees another read event on the pipe, another 1024 bytes are read. This loop of 1024-byte reads plus epoll round trips is where the bulk of the time goes.
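
To get a feeling for how much the chunk size alone costs, the standalone sketch below (my own illustration, not the glance code path, and without eventlet, so it only measures the raw per-read overhead; with eventlet every small read additionally pays a Hub/epoll round trip) drains a pipe with different read sizes:

import os
import subprocess
import time

def drain(chunk_size, path='/home/datafile'):
    # Start `cat` with its stdout connected to a pipe and drain the pipe
    # chunk_size bytes at a time.
    proc = subprocess.Popen(['cat', path], stdout=subprocess.PIPE)
    fd = proc.stdout.fileno()
    total = 0
    start = time.time()
    while True:
        data = os.read(fd, chunk_size)
        if not data:  # empty read: the writer closed its end of the pipe
            break
        total += len(data)
    proc.wait()
    return total, time.time() - start

for size in (1024, 64 * 1024, 1024 * 1024):
    total, elapsed = drain(size)
    print 'chunk=%-8d read %dMB in %.2fs' % (size, total / 1024 / 1024, elapsed)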

Verifying the Logic

We can verify this by printing a log line right after data = os.read(self.stdout.fileno(), 1024).

# python/subprocess.py
class Popen(object):
    ....
    def _communicate_with_select(self, input):
            # ... (earlier part of the function unchanged) ...
            if self.stdout in rlist:
                #data = os.read(self.stdout.fileno(), 104857600)
                data = os.read(self.stdout.fileno(), 1024)
                if data == "":
                    self.stdout.close()
                    read_set.remove(self.stdout)
                stdout.append(data)
                print 'read stdout: 1024'
                print 'path:', select.__file__
[root@test]# python load.py
read stdout: 1024
path: /usr/local/lib/python2.7/site-packages/eventlet-0.17.4-py2.7.egg/eventlet/green/select.pyc
read stdout: 1024
path: /usr/local/lib/python2.7/site-packages/eventlet-0.17.4-py2.7.egg/eventlet/green/select.pyc

The output confirms that select has indeed been replaced by eventlet's select, and that a huge number of 1024-byte reads take place; for 1 GB of data this loop would run over a million times (1024*1024 iterations).
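
The iteration count is easy to sanity-check. The small helper below (pure arithmetic, illustrative only) shows how quickly the number of read/epoll round trips falls as the chunk size grows:

def round_trips(total_bytes, chunk_size):
    # number of os.read() calls (and Hub round trips) needed to drain total_bytes
    return (total_bytes + chunk_size - 1) // chunk_size

GB = 1024 ** 3
for chunk in (1024, 64 * 1024, 100 * 1024 * 1024):
    print '%9d bytes per read -> %8d iterations for a 1GB image' % (
        chunk, round_trips(GB, chunk))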

Optimizing the Code


The huge number of 1024-byte read iterations is the root cause of the slowdown, so increasing the amount of data read per call, and thereby cutting down the number of iterations, improves performance [3].

# python/subprocess.py
class Popen(object):
    ....
    def _communicate_with_select(self, input):
            # ... (earlier part of the function unchanged) ...
            if self.stdout in rlist:
                # Read up to 100MB per call instead of 1024 bytes
                data = os.read(self.stdout.fileno(), 104857600)
                #data = os.read(self.stdout.fileno(), 1024)
                if data == "":
                    self.stdout.close()
                    read_set.remove(self.stdout)
                stdout.append(data)

This is the optimized program behind result [3] at the beginning, and it does indeed raise the read throughput.

Patching glance


Since _communicate_with_select lives in the Python standard library, we should not modify it directly. But we can patch it the same way eventlet does.

# glance_store/_drivers/sheepdog.py
import eventlet

# Define our own _communicate_with_select function.
# Except for one line, the body is copied from the standard library version.
def _communicate_with_select(self, input):
        # ... (copied from the standard library, unchanged) ...
        if self.stdout in rlist:
            # Read sheepdog_store_chunk_size bytes per call (the glance_store
            # config option) instead of 1024
            data = os.read(self.stdout.fileno(), sheepdog_store_chunk_size)
            #data = os.read(self.stdout.fileno(), 1024)
            if data == "":
                self.stdout.close()
                read_set.remove(self.stdout)
            stdout.append(data)
        # ... (rest copied from the standard library, unchanged) ...

if eventlet.is_monkey_patched('subprocess'):
    subprocess.Popen._communicate_with_select = _communicate_with_select

With this patch in place, glance image downloads can saturate the gigabit NIC at 100+ MB/s, 5~10 times faster than before.