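Gunicorn ("Green Unicorn"), derived from the Ruby community's Unicorn, is a WSGI HTTP server. Once we understand gunicorn, we can properly deploy the Bottle application from the earlier articles. As usual, this article is organized as follows:
- Overview of the gunicorn project structure
- Using gunicorn
- The gunicorn application implementation
- The arbiter implementation
- The sync worker implementation
- Summary
- Tips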
Overview of the gunicorn project structure
The gunicorn source code examined here is version 20.0.0. Its main files and packages are:
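File | Description
app package | gunicorn's Application (not the application defined by WSGI)
http package | gunicorn's handling of the HTTP protocol
workers package | gunicorn's worker classes: the synchronous sync worker, the thread-pool gthread worker, and asynchronous workers such as eventlet (geventlet.py) and gevent (ggevent.py)
arbiter.py | gunicorn's master implementation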
The gunicorn design documentation describes its model as follows:
Gunicorn is based on the pre-fork worker model. This means that there is a central master process that manages a set of worker processes. The master never knows anything about individual clients. All requests and responses are handled completely by worker processes.
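In other words, the master forks a predetermined number of workers in advance and manages that set of workers, while every request and response is handled by a worker process.
We will focus on how gunicorn implements its service, and on how the master and the workers are implemented and cooperate.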
Using gunicorn
Write a test app. As you can see, it is a WSGI-compliant application:
# myapp.py
def app(environ, start_response):  # the request environ, and the callback that sets the HTTP status and headers
    data = b"Hello, World!\n"
    start_response("200 OK", [
        ("Content-Type", "text/plain"),
        ("Content-Length", str(len(data)))
    ])
    return iter([data])  # the response body
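To make the WSGI contract concrete, the sketch below calls such an application by hand, roughly the way a server would. The `call_wsgi` helper and the deliberately tiny `environ` dict are illustrative assumptions, not part of gunicorn; a real server fills `environ` with many more keys.

```python
def app(environ, start_response):
    """Same shape as myapp.app: a WSGI callable."""
    data = b"Hello, World!\n"
    start_response("200 OK", [
        ("Content-Type", "text/plain"),
        ("Content-Length", str(len(data))),
    ])
    return iter([data])


def call_wsgi(application, path="/"):
    """Hypothetical helper: invoke a WSGI app and collect its response."""
    captured = {}

    def start_response(status, headers, exc_info=None):
        # The server-side callback records the status line and the headers.
        captured["status"] = status
        captured["headers"] = headers

    # Deliberately minimal environ; real servers provide many more keys.
    environ = {"REQUEST_METHOD": "GET", "PATH_INFO": path}
    body = b"".join(application(environ, start_response))
    return captured["status"], dict(captured["headers"]), body


status, headers, body = call_wsgi(app)
print(status, headers, body)
```

The point of the sketch is that the application never touches a socket: it only receives a dict and a callback, and returns an iterable of bytes.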
Start the service with 4 workers and the debug log level, loading myapp:app:
# gunicorn -w 4 --log-level debug myapp:app
[2021-02-23 17:58:57 +0800] [50258] [DEBUG] Current configuration: # configuration is prepared
...
[2021-02-23 18:01:12 +0800] [50462] [INFO] Starting gunicorn 20.0.0 # gunicorn starts
[2021-02-23 18:01:12 +0800] [50462] [DEBUG] Arbiter booted # the master starts
[2021-02-23 18:01:12 +0800] [50462] [INFO] Listening at: http://127.0.0.1:8000 (50462) # listening on the port
[2021-02-23 18:01:12 +0800] [50462] [INFO] Using worker: sync
[2021-02-23 18:01:12 +0800] [50464] [INFO] Booting worker with pid: 50464 # workers start
[2021-02-23 18:01:12 +0800] [50465] [INFO] Booting worker with pid: 50465
[2021-02-23 18:01:12 +0800] [50466] [INFO] Booting worker with pid: 50466
[2021-02-23 18:01:12 +0800] [50467] [INFO] Booting worker with pid: 50467
[2021-02-23 18:01:12 +0800] [50462] [DEBUG] 4 workers
Test the service with curl:
# curl http://127.0.0.1:8000
Hello, World!
At the same time, the gunicorn log shows that the worker with pid 50465 handled this HTTP request:
[2021-02-24 16:09:39 +0800] [50465] [DEBUG] GET /
While the service is running, you can also add workers manually by sending a signal. The service log shows that the master process (pid 50462) handled the ttin signal and increased the number of workers to 5:
...
[2021-02-24 18:02:56 +0800] [50462] [INFO] Handling signal: ttin
[2021-02-24 18:02:56 +0800] [75918] [INFO] Booting worker with pid: 75918
[2021-02-24 18:02:56 +0800] [50462] [DEBUG] 5 workers
Press Ctrl+C to stop the service. Again it is the master process (pid 50462) that handles the int signal; it shuts itself down after stopping the workers:
^C[2021-02-25 15:06:54 +0800] [50462] [INFO] Handling signal: int
[2021-02-25 15:06:54 +0800] [50464] [INFO] Worker exiting (pid: 50464)
[2021-02-25 15:06:54 +0800] [50465] [INFO] Worker exiting (pid: 50465)
[2021-02-25 15:06:54 +0800] [50466] [INFO] Worker exiting (pid: 50466)
[2021-02-25 15:06:54 +0800] [50467] [INFO] Worker exiting (pid: 50467)
[2021-02-25 15:06:54 +0800] [75918] [INFO] Worker exiting (pid: 75918)
[2021-02-25 15:06:54 +0800] [50462] [INFO] Shutting down: Master
If you are not familiar with gunicorn's options, the following command prints the help:
# gunicorn -h
usage: gunicorn [OPTIONS] [APP_MODULE]

optional arguments:
  -h, --help            show this help message and exit
  ...
  -w INT, --workers INT
                        The number of worker processes for handling requests. [1]
The help is implemented with the familiar argparse module. Each option is a Setting subclass that registers itself with the parser:
class Setting(object):
    def add_option(self, parser):
        args = tuple(self.cli)
        help_txt = "%s [%s]" % (self.short, self.default)
        help_txt = help_txt.replace("%", "%%")
        kwargs = {
            "dest": self.name,
            "action": self.action or "store",
            "type": self.type or str,
            "default": None,
            "help": help_txt
        }
        ...
        parser.add_argument(*args, **kwargs)  # register the option

class Workers(Setting):  # the option class behind --workers
    name = "workers"
    section = "Worker Processes"
    cli = ["-w", "--workers"]
    meta = "INT"
    validator = validate_pos_int
    type = int
    default = int(os.environ.get("WEB_CONCURRENCY", 1))
    desc = """\
        The number of worker processes for handling requests.
        A positive integer generally in the ``2-4 x $(NUM_CORES)`` range.
        You'll want to vary this a bit to find the best for your particular
        application's work load.
        By default, the value of the ``WEB_CONCURRENCY`` environment variable.
        If it is not defined, the default is ``1``.
        """

class Config(object):
    ...
    def parser(self):
        kwargs = {
            "usage": self.usage,
            "prog": self.prog
        }
        parser = argparse.ArgumentParser(**kwargs)
        parser.add_argument("-v", "--version",
                            action="version", default=argparse.SUPPRESS,
                            version="%(prog)s (version " + __version__ + ")\n",
                            help="show program's version number and exit")
        parser.add_argument("args", nargs="*", help=argparse.SUPPRESS)
        keys = sorted(self.settings, key=self.settings.__getitem__)
        for k in keys:  # add every option dynamically
            self.settings[k].add_option(parser)
        return parser
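The pattern is easier to see in isolation. Below is a minimal sketch of declarative option classes that each register themselves with an argparse parser. The `Setting`, `Workers`, `LogLevel` and `build_parser` names here are simplified stand-ins written for this illustration, not gunicorn's actual classes, and the help texts are placeholders.

```python
import argparse


class Setting:
    """Simplified stand-in for a declarative option class."""
    name = None
    cli = ()
    type = str
    default = None
    short = ""

    def add_option(self, parser):
        # Escape '%' because argparse applies %-formatting to help strings.
        help_txt = ("%s [%s]" % (self.short, self.default)).replace("%", "%%")
        parser.add_argument(*self.cli, dest=self.name, type=self.type,
                            default=None, help=help_txt)


class Workers(Setting):
    name = "workers"
    cli = ("-w", "--workers")
    type = int
    default = 1
    short = "The number of worker processes for handling requests."


class LogLevel(Setting):
    name = "loglevel"
    cli = ("--log-level",)
    default = "info"
    short = "The granularity of log output."


def build_parser(settings):
    """Create a parser and let every setting register its own option."""
    parser = argparse.ArgumentParser(prog="demo")
    parser.add_argument("args", nargs="*", help=argparse.SUPPRESS)
    for setting in settings:
        setting.add_option(parser)
    return parser


parser = build_parser([Workers(), LogLevel()])
opts = parser.parse_args(["-w", "4", "--log-level", "debug", "myapp:app"])
print(opts.workers, opts.loglevel, opts.args)
```

Adding a new command-line option then only requires a new subclass; the parser-building code stays untouched.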
The gunicorn application implementation
gunicorn's application is implemented mainly by three classes: BaseApplication, Application and WSGIApplication. Note that "application" here means the web server's application, whereas Bottle, Flask, Django and similar frameworks implement the web framework's application. The former loads the latter dynamically; the former handles the HTTP service, while the latter handles individual HTTP requests.
Condensed into one, the three Application classes look roughly like this template:
class WSGIApplication(Application):
    def __init__(self, usage=None, prog=None):
        self.do_load_config()  # load the configuration
    def do_load_config(self):
        ...
        cfg = self.init(parser, args, args.args)  # initialize the configuration
        ...
    def init(self, parser, opts, args):
        ...
        self.app_uri = args[0]  # the WSGI application argument, e.g. myapp:app
    def load(self):
        util.import_app(self.app_uri)  # load the WSGI application dynamically
        ...
    def run(self):
        self.load()
        Arbiter(self).run()  # start the master, i.e. the Arbiter

def run():  # run the service
    """\
    The ``gunicorn`` command line runner for launching Gunicorn with
    generic WSGI applications.
    """
    from gunicorn.app.wsgiapp import WSGIApplication
    WSGIApplication("%(prog)s [OPTIONS] [APP_MODULE]").run()

if __name__ == '__main__':
    run()
The application part is fairly simple, so we will not go into more detail.
The arbiter implementation
The Arbiter is effectively the core of the master process. A condensed template of its code:
class Arbiter(object):
    def __init__(self, app):
        self.worker_class = self.cfg.worker_class  # worker class
        self.num_workers = self.cfg.workers  # number of workers
        ...
    def start(self):
        self.init_signals()  # set up signal handling
        ...
        sock.create_sockets(...)  # create the listening sockets
    def run(self):
        self.start()
        try:
            self.manage_workers()  # start the workers
            while True:  # loop forever
                ...
                sig = self.SIG_QUEUE.pop(0) if self.SIG_QUEUE else None
                if sig is None:
                    self.sleep()  # sleep until woken up or timed out
                    self.murder_workers()
                    self.manage_workers()
                    continue
                if sig not in self.SIG_NAMES:
                    self.log.info("Ignoring unknown signal: %s", sig)
                    continue
                # dispatch the signal to its handler
                signame = self.SIG_NAMES.get(sig)
                handler = getattr(self, "handle_%s" % signame, None)
                ...
                handler()
                self.wakeup()  # wake up
        except (StopIteration, KeyboardInterrupt):
            ...
Before looking at how the Arbiter works, a quick word on signals. On Linux, the following command lists the available signals:
# kill -l
 1) SIGHUP   2) SIGINT   3) SIGQUIT  4) SIGILL   5) SIGTRAP
 6) SIGABRT  7) SIGBUS   8) SIGFPE   9) SIGKILL 10) SIGUSR1
11) SIGSEGV 12) SIGUSR2 13) SIGPIPE 14) SIGALRM 15) SIGTERM
...
- 1 (SIGHUP): terminate a connection, or reload the configuration for daemons
- 2 (SIGINT): interrupt the session from the dialogue station
- 3 (SIGQUIT): terminate the session from the dialogue station
- 4 (SIGILL): illegal instruction was executed
- 5 (SIGTRAP): do a single instruction (trap)
- 6 (SIGABRT): abnormal termination
- 7 (SIGBUS): error on the system bus
- 8 (SIGFPE): floating point error
- 9 (SIGKILL): immediately terminate the process
- 10 (SIGUSR1): user-defined signal
- 11 (SIGSEGV): segmentation fault due to illegal access of a memory segment
- 12 (SIGUSR2): user-defined signal
- 13 (SIGPIPE): writing into a pipe, and nobody is reading from it
- 14 (SIGALRM): the timer terminated (alarm)
- 15 (SIGTERM): terminate the process in a soft way
Signals are events provided by the operating system and can be used for inter-process communication. Arbiter.init_signals does the following:
class Arbiter(object):
    SIGNALS = [getattr(signal, "SIG%s" % x)
               for x in "HUP QUIT INT TERM TTIN TTOU USR1 USR2 WINCH".split()]
    def init_signals(self):
        ...
        # initialize all signals
        for s in self.SIGNALS:
            signal.signal(s, self.signal)
        signal.signal(signal.SIGCHLD, self.handle_chld)  # register the handler
    def signal(self, sig, frame):
        if len(self.SIG_QUEUE) < 5:  # queue at most 5 pending signals
            self.SIG_QUEUE.append(sig)
            self.wakeup()
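The two halves of this design (a handler that only enqueues, and a main loop that dispatches by name) can be sketched without touching real OS signals. `MiniArbiter` and its method names are hypothetical, written for this illustration; the handler is called directly rather than registered with `signal.signal`, so the example runs deterministically. The `handle_ttou` lower bound of one worker is an assumption of this sketch. It assumes a Unix platform, where `SIGTTIN` and `SIGTTOU` exist.

```python
import signal


class MiniArbiter:
    """Illustrative only: queues signals, then dispatches them by name."""

    # Map signal numbers to lowercase short names, e.g. SIGTTIN -> "ttin".
    SIG_NAMES = {
        getattr(signal, "SIG%s" % name): name.lower()
        for name in "HUP INT TERM TTIN TTOU".split()
    }

    def __init__(self, num_workers=4):
        self.num_workers = num_workers
        self.sig_queue = []

    def on_signal(self, sig, frame):
        # Handler side: do as little as possible, just enqueue (bounded).
        if len(self.sig_queue) < 5:
            self.sig_queue.append(sig)

    def handle_ttin(self):
        self.num_workers += 1

    def handle_ttou(self):
        # Assumption of this sketch: never drop below one worker.
        if self.num_workers > 1:
            self.num_workers -= 1

    def process_queue(self):
        # Main-loop side: pop queued signals and call handle_<name>().
        while self.sig_queue:
            sig = self.sig_queue.pop(0)
            name = self.SIG_NAMES.get(sig)
            handler = getattr(self, "handle_%s" % name, None)
            if handler is not None:
                handler()


arbiter = MiniArbiter(num_workers=4)
# Simulate signal delivery by calling the handler directly.
arbiter.on_signal(signal.SIGTTIN, None)
arbiter.on_signal(signal.SIGTTIN, None)
arbiter.on_signal(signal.SIGTTOU, None)
arbiter.process_queue()
print(arbiter.num_workers)
```

Keeping the handler this small matters because a signal can interrupt the main loop at almost any point; the real work happens later, in ordinary code.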
The TTIN scale-up signal demonstrated earlier is handled like this:
def handle_ttin(self):
    """\
    SIGTTIN handling.
    Increases the number of workers by one.
    """
    self.num_workers += 1  # scale up
    self.manage_workers()  # reconcile the worker set
The Arbiter's sleep and wakeup are implemented like this:
self.PIPE = pair = os.pipe()  # create the pipe (done during signal setup)

def sleep(self):
    """\
    Sleep until PIPE is readable or we timeout.
    A readable PIPE means a signal occurred.
    """
    try:
        ready = select.select([self.PIPE[0]], [], [], 1.0)  # wait with select for data on the pipe
        if not ready[0]:
            return
        while os.read(self.PIPE[0], 1):  # drain the pipe
            pass
    except (select.error, OSError) as e:
        ...

def wakeup(self):
    """\
    Wake up the arbiter by writing to the PIPE
    """
    try:
        os.write(self.PIPE[1], b'.')  # write to the pipe
    except IOError as e:
        ...
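This is the classic self-pipe technique: the sleeper blocks in `select` on the read end of a pipe, and anyone who wants to wake it writes a byte to the write end. A minimal, self-contained sketch follows. The `Sleeper` class is hypothetical, and setting both ends non-blocking is a choice made for this sketch so that draining the pipe can never hang. It assumes a Unix platform.

```python
import os
import select
import threading


class Sleeper:
    """Illustrative self-pipe: sleep() blocks in select, wakeup() writes a byte."""

    def __init__(self):
        self.read_fd, self.write_fd = os.pipe()
        os.set_blocking(self.read_fd, False)   # draining must never block
        os.set_blocking(self.write_fd, False)

    def wakeup(self):
        try:
            os.write(self.write_fd, b".")
        except BlockingIOError:
            pass  # pipe is full, so a wakeup is already pending

    def sleep(self, timeout=1.0):
        """Return True if woken through the pipe, False on timeout."""
        ready, _, _ = select.select([self.read_fd], [], [], timeout)
        if not ready:
            return False
        try:
            while os.read(self.read_fd, 1024):  # drain everything queued
                pass
        except BlockingIOError:
            pass  # pipe is empty again
        return True

    def close(self):
        os.close(self.read_fd)
        os.close(self.write_fd)


sleeper = Sleeper()
first = sleeper.sleep(timeout=0.05)             # nothing written: times out
threading.Timer(0.05, sleeper.wakeup).start()   # wake up from another thread
second = sleeper.sleep(timeout=5.0)             # returns as soon as the byte arrives
sleeper.close()
print(first, second)
```

The advantage over a plain `time.sleep` is that the sleeper reacts immediately to an event while still getting a periodic timeout for housekeeping.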
Note that the Arbiter creates the sockets through sock.create_sockets, which binds the port and starts listening. When it forks a worker, the listening sockets are handed to the child process:
def spawn_worker(self):
    ...
    worker = self.worker_class(self.worker_age, self.pid, self.LISTENERS,
                               self.app, self.timeout / 2.0,
                               self.cfg, self.log)
    self.cfg.pre_fork(self, worker)
    pid = os.fork()
    if pid != 0:  # parent (master) branch
        worker.pid = pid  # record the worker's pid
        self.WORKERS[pid] = worker  # add it to the worker set
        return pid
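The key point is that a listening socket created before `fork()` remains usable in the child. The following is a minimal sketch of that idea, not gunicorn's code: the parent binds and listens, a forked child accepts a single connection on the inherited socket, and the parent then plays the client. `serve_once_in_child` is a hypothetical helper, and the example assumes a Unix platform where `os.fork` is available.

```python
import os
import socket


def serve_once_in_child(listener, payload):
    """Fork a child that accepts one connection on the inherited listener."""
    pid = os.fork()
    if pid == 0:
        # Child: the listening socket was inherited through fork().
        status = 1
        try:
            conn, _ = listener.accept()
            conn.sendall(payload)
            conn.close()
            status = 0
        finally:
            os._exit(status)  # never fall back into the parent's code path
    return pid


listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.bind(("127.0.0.1", 0))   # port 0: let the OS pick a free port
listener.listen(5)
port = listener.getsockname()[1]

child_pid = serve_once_in_child(listener, b"hello from worker\n")

# The parent (acting as the client here) never calls accept() itself.
client = socket.create_connection(("127.0.0.1", port), timeout=5)
chunks = []
while True:
    chunk = client.recv(1024)
    if not chunk:          # the child closed the connection
        break
    chunks.append(chunk)
client.close()
reply = b"".join(chunks)

_, exit_status = os.waitpid(child_pid, 0)
listener.close()
print(reply, exit_status)
```

Because every worker holds the same listening socket, the kernel decides which worker's `accept()` receives each incoming connection, which is why the master never needs to see client traffic.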
Workers are shut down by sending them signals:
def kill_workers(self, sig):
    """\
    Kill all workers with the signal `sig`
    :attr sig: `signal.SIG*` value
    """
    worker_pids = list(self.WORKERS.keys())
    for pid in worker_pids:
        os.kill(pid, sig)
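As a rough, standalone illustration of the same idea, the sketch below forks a few idle children, sends each one a signal with `os.kill`, and then reaps them with `os.waitpid`. The helper names `spawn_sleeper` and `kill_and_reap` are hypothetical, and the example assumes a Unix platform on which `SIGTERM` keeps its default disposition (terminate the process).

```python
import os
import signal
import time


def spawn_sleeper():
    """Fork a child that idles until a signal terminates it."""
    pid = os.fork()
    if pid == 0:
        try:
            while True:
                time.sleep(1)
        finally:
            os._exit(0)  # safety net: never return into the parent's code
    return pid


def kill_and_reap(pids, sig=signal.SIGTERM):
    """Send `sig` to every pid, then wait for each child to exit."""
    for pid in pids:
        os.kill(pid, sig)
    results = {}
    for pid in pids:
        _, status = os.waitpid(pid, 0)
        # WTERMSIG is meaningful only if the child was killed by a signal.
        results[pid] = os.WTERMSIG(status) if os.WIFSIGNALED(status) else None
    return results


pids = [spawn_sleeper() for _ in range(3)]
results = kill_and_reap(pids)
print(results)
```

Reaping with `waitpid` after the kill is what prevents terminated children from lingering as zombie processes.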
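The sync worker implementation
Next, let's look at the workers, mainly the sync worker. The main worker classes are:
- Worker: handles signals
- SyncWorker: handles HTTP requests synchronously
- ThreadWorker: handles HTTP requests using threads
Continuing from the fork code in the Arbiter above, the newly created worker enters init_process in the child branch:
# Process Child
worker.pid = os.getpid()
try:
    util._setproctitle("worker [%s]" % self.proc_name)
    self.log.info("Booting worker with pid: %s", worker.pid)
    self.cfg.post_fork(self, worker)
    worker.init_process()
    sys.exit(0)
finally:
    ...  # error handling and cleanup elided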
The template of the worker's init_process is as follows:
def init_process(self):
    """\
    If you override this method in a subclass, the last statement
    in the function should be to call this method with
    super().init_process() so that the ``run()`` loop is initiated.
    """
    # For waking ourselves up
    self.PIPE = os.pipe()  # create the pipe
    ...
    self.wait_fds = self.sockets + [self.PIPE[0]]  # watch both the sockets and the pipe
    ...
    self.init_signals()  # set up signal handling
    ...
    self.load_wsgi()  # load the WSGI application
    ...
    # Enter main run loop
    self.booted = True
    self.run()  # the work loop
The worker sets up signal handling in a similar way:
class Worker(object):
    SIGNALS = [getattr(signal, "SIG%s" % x)
               for x in "ABRT HUP QUIT INT TERM USR1 USR2 WINCH CHLD".split()]
    def init_signals(self):
        # reset signaling
        for s in self.SIGNALS:
            signal.signal(s, signal.SIG_DFL)
        # init new signaling
        signal.signal(signal.SIGQUIT, self.handle_quit)
        signal.signal(signal.SIGTERM, self.handle_exit)
        signal.signal(signal.SIGINT, self.handle_quit)
        ...
        if hasattr(signal, 'set_wakeup_fd'):
            signal.set_wakeup_fd(self.PIPE[1])  # a signal writes to the pipe, which wakes up select
The worker's most important part is its run loop. With a single listener, the sync worker's run() delegates to run_for_one:
def run_for_one(self, timeout):
    listener = self.sockets[0]
    while self.alive:
        ...
        # Accept a connection. If we get an error telling us
        # that no connection is waiting we fall down to the
        # select which is where we'll wait for a bit for new
        # workers to come give us some love.
        try:
            self.accept(listener)  # accept a client connection
            # Keep processing clients until no one is waiting. This
            # prevents the need to select() for every client that we
            # process.
            continue
        except EnvironmentError as e:
            ...
        try:
            self.wait(timeout)  # nothing to accept: sleep and wait
        except StopWaiting:
            return
Handling the client connection is similar to the HTTP handling covered in earlier articles, so we will not repeat it here:
def accept(self, listener):
    client, addr = listener.accept()
    client.setblocking(1)
    util.close_on_exec(client)
    self.handle(listener, client, addr)
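For readers who have not seen the earlier articles, here is a heavily simplified sketch of what "accept, then handle" amounts to for a synchronous worker: accept one client, parse the request line, call the WSGI application, write the response, and close. `handle_one` is a hypothetical function written for this illustration; it ignores request bodies, keep-alive, error handling and most of `environ`, all of which a real server must deal with.

```python
import socket


def app(environ, start_response):
    data = b"Hello, World!\n"
    start_response("200 OK", [("Content-Type", "text/plain"),
                              ("Content-Length", str(len(data)))])
    return [data]


def handle_one(listener, application):
    """Accept one client, run the WSGI app, write the response, close."""
    client, addr = listener.accept()
    try:
        request = client.recv(65536).decode("latin-1")
        method, path, _ = request.split("\r\n", 1)[0].split(" ", 2)
        response = []

        def start_response(status, headers, exc_info=None):
            # Build the status line and the header block.
            lines = ["HTTP/1.1 %s" % status]
            lines += ["%s: %s" % header for header in headers]
            lines.append("Connection: close")
            response.append(("\r\n".join(lines) + "\r\n\r\n").encode("latin-1"))

        environ = {"REQUEST_METHOD": method, "PATH_INFO": path}
        for chunk in application(environ, start_response):
            response.append(chunk)
        client.sendall(b"".join(response))
    finally:
        client.close()


listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.bind(("127.0.0.1", 0))
listener.listen(1)

# Single-threaded demo: the kernel queues the connection and the request
# bytes, so the client can send before the server side calls accept().
client = socket.create_connection(listener.getsockname(), timeout=5)
client.sendall(b"GET / HTTP/1.1\r\nHost: localhost\r\n\r\n")
handle_one(listener, app)

raw = b""
while True:
    chunk = client.recv(4096)
    if not chunk:
        break
    raw += chunk
client.close()
listener.close()
print(raw)
```

Since the whole request is processed before the next `accept()`, one such worker serves exactly one client at a time, which is the defining property of the sync model.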
When there is nothing left to accept, the worker goes back to waiting:
def wait(self, timeout):
    try:
        ret = select.select(self.wait_fds, [], [], timeout)
        if ret[0]:
            if self.PIPE[0] in ret[0]:
                os.read(self.PIPE[0], 1)
            return ret[0]
    except select.error as e:
        if e.args[0] == errno.EINTR:
            return self.sockets
        if e.args[0] == errno.EBADF:
            if self.nr < 0:
                return self.sockets
            else:
                raise StopWaiting
        raise
Summary
The following diagram shows gunicorn's workflow and serves as our summary.
Tips
A thread can be used to implement a timer:
# reloader.py
import os
import threading
import time

class Reloader(threading.Thread):
    def __init__(self, extra_files=None, interval=1, callback=None):
        super().__init__()
        self.daemon = True  # do not keep the process alive for this thread
        self._interval = interval
        self._callback = callback
    def run(self):
        mtimes = {}
        while True:
            for filename in self.get_files():  # get_files() is not shown in this excerpt
                try:
                    mtime = os.stat(filename).st_mtime
                except OSError:
                    continue
                old_time = mtimes.get(filename)
                if old_time is None:
                    mtimes[filename] = mtime
                    continue
                elif mtime > old_time:
                    if self._callback:
                        self._callback(filename)
            time.sleep(self._interval)
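A self-contained variant of this polling idea is sketched below. `FileWatcher` is a hypothetical class written for this illustration, not gunicorn's Reloader: it takes an explicit list of paths instead of discovering files, remembers the newest mtime after each change (so the callback fires once per change), and exposes a `primed` event so the demo knows when the first scan has completed.

```python
import os
import tempfile
import threading
import time


class FileWatcher(threading.Thread):
    """Illustrative polling watcher: calls callback(path) when mtime grows."""

    def __init__(self, paths, interval=0.05, callback=None):
        super().__init__(daemon=True)
        self._paths = list(paths)
        self._interval = interval
        self._callback = callback
        self.primed = threading.Event()  # set after the first full scan

    def run(self):
        mtimes = {}
        while True:
            for path in self._paths:
                try:
                    mtime = os.stat(path).st_mtime
                except OSError:
                    continue  # file missing or unreadable: skip it
                old = mtimes.get(path)
                mtimes[path] = mtime
                if old is not None and mtime > old and self._callback:
                    self._callback(path)
            self.primed.set()
            time.sleep(self._interval)


changed = threading.Event()
with tempfile.NamedTemporaryFile(delete=False) as handle:
    watched = handle.name

watcher = FileWatcher([watched], callback=lambda path: changed.set())
watcher.start()
watcher.primed.wait(timeout=5)        # initial mtime has been recorded

future = time.time() + 10
os.utime(watched, (future, future))   # bump the mtime explicitly
detected = changed.wait(timeout=5)
os.unlink(watched)
print(detected)
```

Polling is simple and portable; its cost is a detection delay of up to one interval and a `stat` call per file per cycle.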
When running gunicorn myapp:app, there is no static import of myapp:app; instead it is loaded dynamically, along these lines:
# util.py
klass = components.pop(-1)
mod = importlib.import_module('.'.join(components))
return getattr(mod, klass)
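The general technique (import a module by name at runtime, then look up an attribute on it) can be sketched as follows. `load_object` is a hypothetical helper for illustration only and is not gunicorn's loader; it accepts a `module:attribute` string and, as an assumption of this sketch, falls back to an attribute named `application` when none is given.

```python
import importlib


def load_object(spec, default_attr="application"):
    """Hypothetical helper: resolve 'package.module:attribute' to an object."""
    module_name, _, attr = spec.partition(":")
    module = importlib.import_module(module_name)   # import by dotted name
    return getattr(module, attr or default_attr)


# Standard-library targets are used here so the example runs anywhere.
dumps = load_object("json:dumps")
join = load_object("os.path:join")
print(dumps({"ok": True}))
```

Resolving the target from a string is what lets a single server binary host any application without knowing about it at build time.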
References