gunicorn “Green Unicorn”,脱胎于ruby社区的Unicorn,是一个 WSGI HTTP Server。学习gunicorn后,我们可以把之前的 Bottle 程序正式部署起来。老规矩,本文分下面几个部分:

  • gunicorn 项目结构简介
  • gunicorn 使用
  • gunicorn-application 实现
  • arbiter实现
  • sync-worker实现
  • 小结
  • 小技巧

gunicorn 项目结构简介

gunicorn 源码选择的版本是 20.0.0,主要的文件及包如下:

文件 描述
app包 guincorn 的 Application (不是wsgi定义的applicaton)
http包 gunicorn 对 http协议的一些处理
workers包 gunicorn 的工作类实现 ,包括同步sync实现,线程池版本实现gthread,以及异步版本实现 geventlet,gevent等
arbiter.py guicorn 的master实现

根据gunicorn的设计特点:

Gunicorn is based on the pre-fork worker model. This means that there is a central master process that manages a set of worker processes. The master never knows anything about individual clients. All requests and responses are handled completely by worker processes.

gunicorn使用pre-fork 工作模型,也就是master提前fork出预定数量的work,管理worker集合。所有的request和response都由worker进程处理。

我们重点放在:gunicorn的服务实现,master-worker如何实现和协作上。

gunicorn 使用

编写测试app,可以看到这是一个符合wsgi规范的application:

1
2
3
4
5
6
7
8
9
# myapp.py

def app(environ, start_response):  # env 和 http 状态及头设定回调
    data = b"Hello, World!\n"
    start_response("200 OK", [
            ("Content-Type", "text/plain"),
            ("Content-Length", str(len(data)))
    ])
    return iter([data])  # 返回数据

使用4个work节点,日志级别debug的方式启动服务,加载 myapp:app

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
# gunicorn -w 4 --log-level debug  myapp:app
[2021-02-23 17:58:57 +0800] [50258] [DEBUG] Current configuration:  # 准备配置
...
[2021-02-23 18:01:12 +0800] [50462] [INFO] Starting gunicorn 20.0.0  # 启动gunicorn
[2021-02-23 18:01:12 +0800] [50462] [DEBUG] Arbiter booted  # 启动master
[2021-02-23 18:01:12 +0800] [50462] [INFO] Listening at: http://127.0.0.1:8000 (50462)  # 监听端口
[2021-02-23 18:01:12 +0800] [50462] [INFO] Using worker: sync
[2021-02-23 18:01:12 +0800] [50464] [INFO] Booting worker with pid: 50464 # 启动worker
[2021-02-23 18:01:12 +0800] [50465] [INFO] Booting worker with pid: 50465
[2021-02-23 18:01:12 +0800] [50466] [INFO] Booting worker with pid: 50466
[2021-02-23 18:01:12 +0800] [50467] [INFO] Booting worker with pid: 50467
[2021-02-23 18:01:12 +0800] [50462] [DEBUG] 4 workers

使用 curl 测试服务

1
2
# curl http://127.0.0.1:8000
Hello, World!

同时gunicorn中可以看到 worker=50465 处理了这个http请求

1
[2021-02-24 16:09:39 +0800] [50465] [DEBUG] GET /

运行时候,还可以通过发送信号,手动扩充work节点数

1
# kill -TTIN 50462

观察服务日志,会发现 master=50462 进程处理了 ttin 信号,并且扩展worker节点数到5

1
2
3
4
...
[2021-02-24 18:02:56 +0800] [50462] [INFO] Handling signal: ttin
[2021-02-24 18:02:56 +0800] [75918] [INFO] Booting worker with pid: 75918
[2021-02-24 18:02:56 +0800] [50462] [DEBUG] 5 workers

使用 Ctrl+C 关闭服务,可以看到也是 master=50462 进程处理了 int 信号,并且在关闭worker节点后关闭自己

1
2
3
4
5
6
7
^C[2021-02-25 15:06:54 +0800] [50462] [INFO] Handling signal: int
[2021-02-25 15:06:54 +0800] [50464] [INFO] Worker exiting (pid: 50464)
[2021-02-25 15:06:54 +0800] [50465] [INFO] Worker exiting (pid: 50465)
[2021-02-25 15:06:54 +0800] [50466] [INFO] Worker exiting (pid: 50466)
[2021-02-25 15:06:54 +0800] [50467] [INFO] Worker exiting (pid: 50467)
[2021-02-25 15:06:54 +0800] [75918] [INFO] Worker exiting (pid: 75918)
[2021-02-25 15:06:54 +0800] [50462] [INFO] Shutting down: Master

如果对gunicon的参数不了解,可以使用下面命令查看帮助

1
2
3
4
5
6
7
8
# gunicorn -h
usage: gunicorn [OPTIONS] [APP_MODULE]

optional arguments:
  -h, --help            show this help message and exit
  ...
  -w INT, --workers INT
                        The number of worker processes for handling requests. [1]

帮助使用我们熟悉的 argparse 实现。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
class Setting(object):
			
	def add_option(self, parser):
        args = tuple(self.cli)

        help_txt = "%s [%s]" % (self.short, self.default)
        help_txt = help_txt.replace("%", "%%")

        kwargs = {
            "dest": self.name,
            "action": self.action or "store",
            "type": self.type or str,
            "default": None,
            "help": help_txt
        }
        ...
        parser.add_argument(*args, **kwargs)  # 添加选项

class Workers(Setting):  # --workers 的选项类
    name = "workers"
    section = "Worker Processes"
    cli = ["-w", "--workers"]
    meta = "INT"
    validator = validate_pos_int
    type = int
    default = int(os.environ.get("WEB_CONCURRENCY", 1))
    desc = """\
        The number of worker processes for handling requests.

        A positive integer generally in the ``2-4 x $(NUM_CORES)`` range.
        You'll want to vary this a bit to find the best for your particular
        application's work load.

        By default, the value of the ``WEB_CONCURRENCY`` environment variable.
        If it is not defined, the default is ``1``.
        """

def parser(self):
    kwargs = {
        "usage": self.usage,
        "prog": self.prog
    }
    parser = argparse.ArgumentParser(**kwargs)
    parser.add_argument("-v", "--version",
            action="version", default=argparse.SUPPRESS,
            version="%(prog)s (version " + __version__ + ")\n",
            help="show program's version number and exit")
    parser.add_argument("args", nargs="*", help=argparse.SUPPRESS)

    keys = sorted(self.settings, key=self.settings.__getitem__)  # 动态添加参数选项
    for k in keys:
        self.settings[k].add_option(parser)

    return parser

gunicorn-application 实现

gunicorn的application主要是下面三个类实现。需要注意的是这里的application可以理解为web-server的application;bottle/flask/django等实现的是web-framework的applicaiton。前者动态加载后者,前者处理http服务,后者处理单次的http请求。

  • BaseApplication
    • Application
      • WSGIApplication

3个Application梳理后,大概的代码模版如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
class WSGIApplication(Application)
	
	def __init__(self, usage=None, prog=None):
		self.do_load_config()  # 加载配置
			
	def do_load_config():
		...
		cfg = self.init(parser, args, args.args)  # 初始化配置
		...
	
	def init(...):
	    ...
	    self.app_uri = args[0]  # 获取wsgi-application参数
	
  def load(...):
  		util.import_app(self.app_uri)  # 动态加载wsgi-application
  		...
	
	def run(...):
		self.load()
		Arbiter(self).run()  # 启动master,也就是Arbiter

def run():  # 运行服务
    """\
    The ``gunicorn`` command line runner for launching Gunicorn with
    generic WSGI applications.
    """
    from gunicorn.app.wsgiapp import WSGIApplication
    WSGIApplication("%(prog)s [OPTIONS] [APP_MODULE]").run()

if __name__ == '__main__':
    run()

application部分的实现,相对比较简单,就不再赘述。

arbiter实现

Arbiter 仲裁者,事实上的master进程核心,整理后代码模版如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
class Arbiter(object):
	def __init__(self, app):
		self.worker_class = self.cfg.worker_class  # worker类
		self.num_workers = self.cfg.worker  # worker数量
        ...
    
    def start():
		self.init_signals()  # 初始化信号监听
		...
		sock.create_socket(...) # 创建socket服务
    
    def run(self):  
		self.start()
		try:
            self.manage_workers() # 启动节点

            while True: #  无限循环
                ...
                sig = self.SIG_QUEUE.pop(0) if self.SIG_QUEUE else None
                if sig is None:
                    self.sleep()  # 持续休眠
                    self.murder_workers()
                    self.manage_workers()
                    continue

                if sig not in self.SIG_NAMES:
                    self.log.info("Ignoring unknown signal: %s", sig)
                    continue
    							# 处理信号
                signame = self.SIG_NAMES.get(sig)
                handler = getattr(self, "handle_%s" % signame, None)
                ...
                handler()
                self.wakeup()  # 唤醒
        except (StopIteration, KeyboardInterrupt):
           ...
    		

在了解Arbiter工作前先了解一下信号, linux 系统可以使用下面命令查看信号清单

1
2
3
4
5
# kill -l
 1) SIGHUP	 2) SIGINT	 3) SIGQUIT	 4) SIGILL	 5) SIGTRAP
 6) SIGABRT	 7) SIGBUS	 8) SIGFPE	 9) SIGKILL	10) SIGUSR1
 11) SIGSEGV	12) SIGUSR2	13) SIGPIPE	14) SIGALRM	15) SIGTERM
...
  • 1 (SIGHUP): terminate a connection, or reload the configuration for daemons
  • 2 (SIGINT): interrupt the session from the dialogue station
  • 3 (SIGQUIT): terminate the session from the dialogue station
  • 4 (SIGILL): illegal instruction was executed
  • 5 (SIGTRAP): do a single instruction (trap)
  • 6 (SIGABRT): abnormal termination
  • 7 (SIGBUS): error on the system bus
  • 8 (SIGFPE): floating point error
  • 9 (SIGKILL): immmediately terminate the process
  • 10 (SIGUSR1): user-defined signal
  • 11 (SIGSEGV): segmentation fault due to illegal access of a memory segment
  • 12 (SIGUSR2): user-defined signal
  • 13 (SIGPIPE): writing into a pipe, and nobody is reading from it
  • 14 (SIGALRM): the timer terminated (alarm)
  • 15 (SIGTERM): terminate the process in a soft way

信号是操作系统提供的事件,可以用来进行跨进程的通信。Arbiter.init_signals 做的工作如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
SIGNALS = [getattr(signal, "SIG%s" % x)
               for x in "HUP QUIT INT TERM TTIN TTOU USR1 USR2 WINCH".split()]
               
def init_signals(self):
    ...

    # initialize all signals
    for s in self.SIGNALS:
        signal.signal(s, self.signal)
    signal.signal(signal.SIGCHLD, self.handle_chld)  # 添加信号监听器

def signal(self, sig, frame):
	if len(self.SIG_QUEUE) < 5:
			self.SIG_QUEUE.append(sig)
			self.wakeup()

之前演示的扩容信号 TTIN 是这样处理的 :

1
2
3
4
5
6
7
def handle_ttin(self):
    """\
    SIGTTIN handling.
    Increases the number of workers by one.
    """
    self.num_workers += 1  # 扩容 
    self.manage_workers()  # 管理worker 

Arbiter的sleep和warkeup是这样实现的:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
self.PIPE = pair = os.pipe()  # 创建管道
	
def sleep(self):
	"""\
    Sleep until PIPE is readable or we timeout.
    A readable PIPE means a signal occurred.
    """
    try:
        ready = select.select([self.PIPE[0]], [], [], 1.0)  # 使用select监听管道的数据变化
        if not ready[0]:
            return
        while os.read(self.PIPE[0], 1):  # 读取管道数据
            pass
    except (select.error, OSError) as e:
        ...
        
def wakeup(self):
    """\
    Wake up the arbiter by writing to the PIPE
    """
    try:
        os.write(self.PIPE[1], b'.')  # 管道写入
    except IOError as e:
        ...
    

需要说明的是Arbiter通过 sock.create_sockets 创建了socket,并绑定端口和监听,然后在fork-worker的时候,将socket传递给了子进程。

1
2
3
4
5
6
7
8
9
worker = self.worker_class(self.worker_age, self.pid, self.LISTENERS,
                                   self.app, self.timeout / 2.0,
                                   self.cfg, self.log)
self.cfg.pre_fork(self, worker)
pid = os.fork()
if pid != 0:
		worker.pid = pid  # 记录worker的pid
    self.WORKERS[pid] = worker # 添加到worker集合
    return pid

销毁worker是使用信号:

1
2
3
4
5
6
7
8
def kill_workers(self, sig):
    """\
    Kill all workers with the signal `sig`
    :attr sig: `signal.SIG*` value
    """
    worker_pids = list(self.WORKERS.keys())
    for pid in worker_pids:
        os.kill(pid, sig)

sync-worker实现

接下来,我们看看worker,主要是sync-worker的实现。worker的关系主要如下:

  • Worker 处理信号
    • SyncWorker 同步处理http请求
    • ThreadWorker 使用线程处理http请求

接之前Arbiter中fork-worker的代码,创建完成的work进入 init_process

1
2
3
4
5
6
7
8
# Process Child
worker.pid = os.getpid()
try:
    util._setproctitle("worker [%s]" % self.proc_name)
    self.log.info("Booting worker with pid: %s", worker.pid)
    self.cfg.post_fork(self, worker)
    worker.init_process()
    sys.exit(0)

work的init_process模版如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
def init_process(self):
    """\
    If you override this method in a subclass, the last statement
    in the function should be to call this method with
    super().init_process() so that the ``run()`` loop is initiated.
    """
    # For waking ourselves up
    self.PIPE = os.pipe()  # 创建管道
    ...
    self.wait_fds = self.sockets + [self.PIPE[0]]  # 监听管道和socket
			...
    self.init_signals()  # 初始化信号监听
			...
    self.load_wsgi()  # 加载wsgi的应用
    ...
    # Enter main run loop
    self.booted = True
    self.run()  # 工作循环

work一样的进行信号监听:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
SIGNALS = [getattr(signal, "SIG%s" % x)
            for x in "ABRT HUP QUIT INT TERM USR1 USR2 WINCH CHLD".split()]
def init_signals(self):
    # reset signaling
    for s in self.SIGNALS:
    		signal.signal(s, signal.SIG_DFL)
    # init new signaling
    signal.signal(signal.SIGQUIT, self.handle_quit)
    signal.signal(signal.SIGTERM, self.handle_exit)
    signal.signal(signal.SIGINT, self.handle_quit)
    ...

    if hasattr(signal, 'set_wakeup_fd'):
    		signal.set_wakeup_fd(self.PIPE[1])  # 等待select唤醒

work最重要的run循环:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
 def run(self, timeout):
    listener = self.sockets[0]
    while self.alive:
					...
        # Accept a connection. If we get an error telling us
        # that no connection is waiting we fall down to the
        # select which is where we'll wait for a bit for new
        # workers to come give us some love.
        try:
            self.accept(listener)  # 接受客户端链接
            # Keep processing clients until no one is waiting. This
            # prevents the need to select() for every client that we
            # process.
            continue

        except EnvironmentError as e:
              ...

        try:
            self.wait(timeout)  # 休眠等待 
        except StopWaiting:
            return

处理客户端连接,这一部分和之前介绍http比较类似,也不再赘述。

1
2
3
4
5
def accept(self, listener):
    client, addr = listener.accept()
    client.setblocking(1)
    util.close_on_exec(client)
    self.handle(listener, client, addr)

work处理完成请求后进入等待

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
def wait(self, timeout):
    try:
        ret = select.select(self.wait_fds, [], [], timeout)
        if ret[0]:
            if self.PIPE[0] in ret[0]:
                os.read(self.PIPE[0], 1)
            return ret[0]

    except select.error as e:
        if e.args[0] == errno.EINTR:
            return self.sockets
        if e.args[0] == errno.EBADF:
            if self.nr < 0:
                return self.sockets
            else:
                raise StopWaiting
        raise

小结

可以用下面一张图展示gunicorn的工作流程,作为我们的小结论

Request flow of Django with Gunicorn and Nginx as a reverse proxy.

小技巧

可以使用thread,实现一个定时器

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# reloader.py

class Reloader(threading.Thread):
    def __init__(self, extra_files=None, interval=1, callback=None):
        super().__init__()
        self.setDaemon(True)
        self._interval = interval
        self._callback = callback

    def run(self):
        mtimes = {}
        while True:
            for filename in self.get_files():
                try:
                    mtime = os.stat(filename).st_mtime
                except OSError:
                    continue
                old_time = mtimes.get(filename)
                if old_time is None:
                    mtimes[filename] = mtime
                    continue
                elif mtime > old_time:
                    if self._callback:
                        self._callback(filename)
            time.sleep(self._interval)

在使用 gunicorn myapp:app 命令的时候, myapp:app 没有静态的 import ,而是这样动态加载的:

1
2
3
4
5
6
7
# util.py

klass = components.pop(-1)

mod = importlib.import_module('.'.join(components))

return getattr(mod, klass)

参考链接