Reading the gunicorn Source Code

Preface

While playing with a Heroku app I noticed that Heroku's tutorial deploys applications with gunicorn, and when echo from Nanchang proposed reworking flup with FreeBSD's kqueue, I suggested comparing it against gunicorn first. I had only dabbled with it on Heroku, though, without studying it in depth. So for this first Nanchang pythoner meetup of 2012 I went through the gunicorn source properly. The code strikes me as small and well crafted; these notes are offered as a conversation starter.

Contents

Introduction

What is gunicorn?

gunicorn, short for "Green Unicorn", grew out of Unicorn from the Ruby community and was ported to Python as a WSGI HTTP server. WSGI, the "Web Server Gateway Interface", is Python's standard interface between web servers and web applications.

gunicorn's features

  • Supports Django, Paster, and plain WSGI applications
  • Very easy to configure (comparatively speaking)
  • Automatically manages multiple worker processes
  • Pluggable backend worker types (sync, gevent, tornado, etc.)

Design

This part follows gunicorn's own English design document, interleaved with my understanding, the source code, and reference material.

Server model

gunicorn is based on the "pre-fork worker" model, meaning a central master process manages a set of worker processes. The master never knows anything about individual clients; all requests and responses are handled entirely by the worker processes.

What pre-fork means

A pre-fork server resembles a fork server in that each request is handled by a separate process.

The difference is that a pre-fork server spawns a number of processes in advance, which then sit waiting for requests to arrive.

Because the processes already exist, the server never has to pause while a new process starts up, so it can serve a multi-user load faster.

A pre-fork server also holds up well under extreme peak load: whenever all of the pre-spawned processes are busy with requests, the server can still fork additional ones.

The downside is that starting those extra processes during a peak inevitably adds some response latency.
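To make the model concrete, here is a toy pre-fork server (my own minimal sketch, not gunicorn code): the master opens the listening socket, forks a fixed pool of workers up front, and each worker then accepts from the shared socket on its own.

import os
import socket

NUM_WORKERS = 3

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(("127.0.0.1", 8000))
sock.listen(128)

for i in range(NUM_WORKERS):
    if os.fork() == 0:
        # worker: inherits the listening socket and serves forever
        while True:
            client, addr = sock.accept()
            client.sendall("HTTP/1.0 200 OK\r\n\r\nhello\r\n")
            client.close()

# master: does nothing here but watch its children
for i in range(NUM_WORKERS):
    os.wait()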

The master process

The master process is simply a loop that keeps listening for process signals and reacting to them, nothing more. It manages the set of running workers through signals such as TTIN, TTOU, and CHLD.

TTIN and TTOU tell the master to increase or decrease the number of running workers.

CHLD is delivered when a child process terminates, whereupon the master restarts the dead worker.

Let's look at a few passages of code, from gunicorn/arbiter.py:

class Arbiter(object):
    """
    Arbiter maintain the workers processes alive. It launches or
    kills them if needed. It also manages application reloading
    via SIGHUP/USR2.
    """

    # A flag indicating if a worker failed to
    # to boot. If a worker process exist with
    # this error code, the arbiter will terminate.
    WORKER_BOOT_ERROR = 3

    START_CTX = {}

    LISTENER = None
    WORKERS = {}
    PIPE = []

    # I love dynamic languages
    SIG_QUEUE = []
    SIGNALS = map(
        lambda x: getattr(signal, "SIG%s" % x),
        "HUP QUIT INT TERM TTIN TTOU USR1 USR2 WINCH".split()
    )
    SIG_NAMES = dict(
        (getattr(signal, name), name[3:].lower()) for name in dir(signal)
        if name[:3] == "SIG" and name[3] != "_"
    )

    def __init__(self, app):
        os.environ["SERVER_SOFTWARE"] = SERVER_SOFTWARE

        self.setup(app)

        self.pidfile = None
        self.worker_age = 0
        self.reexec_pid = 0
        self.master_name = "Master"

The SIGNALS definition in the code above (the map over "HUP QUIT INT TERM TTIN TTOU USR1 USR2 WINCH") enumerates the signals handled:

  • HUP: reload the configuration and restart all worker processes
  • QUIT: graceful shutdown; wait for each worker to finish its current work before stopping
  • INT/TERM: quick shutdown; abort all processing immediately
  • TTIN: increment the number of workers by one
  • TTOU: decrement the number of workers by one
  • USR1: reopen the log files of the master and all workers
  • USR2: re-exec a new master and workers
  • WINCH: gracefully stop all workers but keep the master running

Below are the handlers for the individual signals:

def handle_chld(self, sig, frame):
    "SIGCHLD handling"
    self.wakeup()

def handle_hup(self):
    """\
    HUP handling.
    - Reload configuration
    - Start the new worker processes with a new configuration
    - Gracefully shutdown the old worker processes
    """
    self.log.info("Hang up: %s", self.master_name)
    self.reload()

def handle_quit(self):
    "SIGQUIT handling"
    raise StopIteration

def handle_int(self):
    "SIGINT handling"
    self.stop(False)
    raise StopIteration

def handle_term(self):
    "SIGTERM handling"
    self.stop(False)
    raise StopIteration

def handle_ttin(self):
    """\
    SIGTTIN handling.
    Increases the number of workers by one.
    """
    self.num_workers += 1
    self.manage_workers()

def handle_ttou(self):
    """\
    SIGTTOU handling.
    Decreases the number of workers by one.
    """
    if self.num_workers <= 1:
        return
    self.num_workers -= 1
    self.manage_workers()

def handle_usr1(self):
    """\
    SIGUSR1 handling.
    Kill all workers by sending them a SIGUSR1
    """
    self.kill_workers(signal.SIGUSR1)
    self.log.reopen_files()

def handle_usr2(self):
    """\
    SIGUSR2 handling.
    Creates a new master/worker set as a slave of the current
    master without affecting old workers. Use this to do live
    deployment with the ability to backout a change.
    """
    self.reexec()

def handle_winch(self):
    "SIGWINCH handling"
    if os.getppid() == 1 or os.getpgrp() != os.getpid():
        self.log.info("graceful stop of workers")
        self.num_workers = 0
        self.kill_workers(signal.SIGQUIT)
    else:
        self.log.info("SIGWINCH ignored. Not daemonized")

Sync workers

Most of the time the worker type used is the synchronous one, handling a single request at a time. This model is the simplest to reason about, since any error that occurs affects at most one request.

(As the design document notes, though, handling only one request at a time rests on some assumptions about how the application is written.)

Async workers

The available async workers are based on greenlets (via eventlet or gevent). Greenlets are a cooperative multi-threading (coroutine) implementation for Python. In general, application code needs no changes at all to take advantage of these async workers.
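Switching an unmodified application to an async worker is, in practice, just a command-line flag. A hypothetical invocation of the quickstart app from later in these notes, assuming gevent is installed (the egg:gunicorn#... naming also appears in the sample config near the end):

$ gunicorn --workers=2 --worker-class=egg:gunicorn#gevent myapp:app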

Tornado workers

There is also a tornado worker, meant for applications built on the Tornado framework. Although the tornado worker can also serve WSGI applications, this is not a recommended configuration.

Choosing a worker type

The default sync workers assume that your application is resource-bound in terms of CPU and bandwidth. This means it should not do anything that takes an undefined amount of time; a request out to the internet is a typical example of exactly such an unbounded operation.

This resource-bound assumption is also why a buffering proxy is expected in front of a default-configured gunicorn. If you exposed sync workers to the internet directly, a DoS (Denial of Service) attack could simply trickle useless data to the server endlessly and leave it unable to serve anyone else. Slowloris is an entertaining example of a tool built to do exactly that.

Some situations where async workers are worth considering:

  • Applications making long blocking calls (external web services, for example)
  • Serving requests directly to the internet
  • Streaming requests and responses (something like FLV streaming?)
  • Long polling
  • WebSockets (an important new feature of the HTML5 specification that gives the browser bidirectional communication and timely server push)
  • Comet (a web architecture built on long-lived HTTP connections, in which the server pushes data to the client asynchronously without an explicit client request; well suited to event-driven web applications with strong interactivity and real-time demands)

How many workers?

Do not scale the number of workers to the number of clients you expect to have. With only 4-12 workers, gunicorn can handle hundreds or even thousands of requests per second.

When handling requests, gunicorn relies on the operating system to provide the load balancing. The number of workers we usually recommend is (2 x $num_cores) + 1. The formula is simple: it assumes that, for a given core, one worker will be reading from or writing to the socket while another worker is processing a request.
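As a worked example, a 4-core machine gets (2 x 4) + 1 = 9 workers. In a config file the count can be computed; the same line appears, commented out, in the sample config at the end of these notes:

import multiprocessing

workers = multiprocessing.cpu_count() * 2 + 1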

Obviously your hardware and your application will influence the right number. Our recommendation is to start with the formula above and then, once the application is running, tune the worker count with the TTIN and TTOU signals.

Remember: too many workers will, at some point, drag the performance of the whole system down sharply. (loosely translated)

Quickstart

Installing gunicorn

Fetch the source:

git clone https://github.com/benoitc/gunicorn.git

Install it in development mode, which makes later debugging easier:

python setup.py develop

Installed this way, only a link is added to Python's site-packages; the source stays in your own directory, ready to be tweaked and tested at any time.

A minimal WSGI application

Throw together a myapp.py:

# -*- coding: utf8 -*-

def app(environ, start_response):
    data = "Hello, World!\n"
    start_response("200 OK", [
        ("Content-Type", "text/plain"),
        ("Content-Length", str(len(data)))
    ])
    return iter([data])

Run it:

$ gunicorn --workers=2 myapp:app

The output:

2012-02-13 10:48:02 [2481] [INFO] Starting gunicorn 0.13.4
2012-02-13 10:48:02 [2481] [INFO] Listening at: http://127.0.0.1:8000 (2481)
2012-02-13 10:48:02 [2481] [INFO] Using worker: sync
2012-02-13 10:48:02 [2484] [INFO] Booting worker with pid: 2484
2012-02-13 10:48:02 [2485] [INFO] Booting worker with pid: 2485

A minimal Django application

A quick taste:

$ django-admin.py startproject hello
$ cd hello
$ gunicorn_django --workers=2

The output:

2012-02-13 11:11:15 [2565] [INFO] Starting gunicorn 0.13.4
2012-02-13 11:11:15 [2565] [INFO] Listening at: http://127.0.0.1:8000 (2565)
2012-02-13 11:11:15 [2565] [INFO] Using worker: sync
2012-02-13 11:11:15 [2568] [INFO] Booting worker with pid: 2568
2012-02-13 11:11:15 [2569] [INFO] Booting worker with pid: 2569

Command-line options

Run gunicorn -h:

(mypy)hzg@gofast:~/gunicorn$ gunicorn -h
Usage: gunicorn [OPTIONS] APP_MODULE

Options:
  --version             show program's version number and exit
  -h, --help            show this help message and exit
  -c FILE, --config=FILE
                        The path to a Gunicorn config file. [None]
  --debug               Turn on debugging in the server. [False]
  --spew                Install a trace function that spews every line
                        executed by the server. [False]
  --access-logfile=FILE
                        The Access log file to write to. [None]
  --access-logformat=STRING
                        The Access log format . [%(h)s %(l)s %(u)s %(t)s
                        "%(r)s" %(s)s %(b)s "%(f)s" "%(a)s"]
  --error-logfile=FILE, --log-file=FILE
                        The Error log file to write to. [-]
  --log-level=LEVEL     The granularity of Error log outputs. [info]
  --logger-class=STRING
                        The logger you want to use to log events in gunicorn.
                        [simple]
  -n STRING, --name=STRING
                        A base to use with setproctitle for process naming.
                        [None]
  --preload             Load application code before the worker processes are
                        forked. [False]
  -D, --daemon          Daemonize the Gunicorn process. [False]
  -p FILE, --pid=FILE   A filename to use for the PID file. [None]
  -u USER, --user=USER  Switch worker processes to run as this user. [1000]
  -g GROUP, --group=GROUP
                        Switch worker process to run as this group. [1000]
  -m INT, --umask=INT   A bit mask for the file mode on files written by
                        Gunicorn. [0]
  -b ADDRESS, --bind=ADDRESS
                        The socket to bind. [127.0.0.1:8000]
  --backlog=INT         The maximum number of pending connections.     [2048]
  -w INT, --workers=INT
                        The number of worker process for handling requests.
                        [1]
  -k STRING, --worker-class=STRING
                        The type of workers to use. [sync]
  --worker-connections=INT
                        The maximum number of simultaneous clients. [1000]
  --max-requests=INT    The maximum number of requests a worker will process
                        before restarting. [0]
  -t INT, --timeout=INT
                        Workers silent for more than this many seconds are
                        killed and restarted. [30]
  --keep-alive=INT      The number of seconds to wait for requests on a Keep-
                        Alive connection. [2]

Where to start reading the source

The source tree

Run:

tree -I '*.pyc'

First, gunicorn's directory tree:

gunicorn
├── app
│   ├── base.py
│   ├── djangoapp.py
│   ├── __init__.py
│   ├── pasterapp.py
│   └── wsgiapp.py
├── arbiter.py
├── config.py
├── debug.py
├── errors.py
├── glogging.py
├── http
│   ├── body.py
│   ├── errors.py
│   ├── __init__.py
│   ├── message.py
│   ├── parser.py
│   ├── _sendfile.py
│   ├── unreader.py
│   └── wsgi.py
├── __init__.py
├── logging_config.py
├── management
│   ├── commands
│   │   ├── __init__.py
│   │   └── run_gunicorn.py
│   └── __init__.py
├── pidfile.py
├── sock.py
├── util.py
└── workers
    ├── async.py
    ├── base.py
    ├── geventlet.py
    ├── ggevent.py
    ├── ggevent_wsgi.py
    ├── gtornado.py
    ├── __init__.py
    ├── sync.py
    └── workertmp.py

Starting from the scripts

We start from the gunicorn and gunicorn_django scripts.

gunicorn

#!/home/hzg/mypy/bin/python
# EASY-INSTALL-ENTRY-SCRIPT: 'gunicorn==0.13.4','console_scripts','gunicorn'
__requires__ = 'gunicorn==0.13.4'
import sys
from pkg_resources import load_entry_point

sys.exit(
   load_entry_point('gunicorn==0.13.4', 'console_scripts', 'gunicorn')()
)

gunicorn_django

#!/home/hzg/mypy/bin/python
# EASY-INSTALL-ENTRY-SCRIPT: 'gunicorn==0.13.4','console_scripts','gunicorn_django'
__requires__ = 'gunicorn==0.13.4'
import sys
from pkg_resources import load_entry_point

sys.exit(
   load_entry_point('gunicorn==0.13.4', 'console_scripts', 'gunicorn_django')()
)

This is the standard egg entry-point mechanism; the actual call targets can be read off entry_points.txt:

entry_points.txt:

[console_scripts]
gunicorn=gunicorn.app.wsgiapp:run
gunicorn_django=gunicorn.app.djangoapp:run
gunicorn_paster=gunicorn.app.pasterapp:run

Note that the [console_scripts] section maps:

  • gunicorn <-> gunicorn.app.wsgiapp:run
  • gunicorn_django <-> gunicorn.app.djangoapp:run

So we begin under the gunicorn/app directory, with wsgiapp.py and djangoapp.py.
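As an aside, the [console_scripts] mapping above is declared in gunicorn's setup.py. Your own project would publish a console script the same way; a minimal sketch, with mypkg and mycmd as illustrative names:

# setup.py: installs a 'mycmd' script that calls mypkg.cli:main()
from setuptools import setup, find_packages

setup(
    name="mypkg",
    version="0.1",
    packages=find_packages(),
    entry_points={
        "console_scripts": [
            "mycmd = mypkg.cli:main",
        ],
    },
)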

wsgiapp.py
import os
import sys

from gunicorn import util
from gunicorn.app.base import Application

class WSGIApplication(Application):

    def init(self, parser, opts, args):
        if len(args) != 1:
            parser.error("No application module specified.")

        self.cfg.set("default_proc_name", args[0])
        self.app_uri = args[0]

        sys.path.insert(0, os.getcwd())

    def load(self):
        return util.import_app(self.app_uri)

def run():
    """\
    The ``gunicorn`` command line runner for launcing Gunicorn with
    generic WSGI applications.
    """
    from gunicorn.app.wsgiapp import WSGIApplication
    WSGIApplication("%prog [OPTIONS] APP_MODULE").run()

gunicorn.app.wsgiapp:run ultimately invokes the module's run() function; the focus is the last two lines above, where a WSGIApplication is created and its run() is called straight away.

djangoapp.py
ENVIRONMENT_VARIABLE = 'DJANGO_SETTINGS_MODULE'

class DjangoApplication(Application):

    def init(self, parser, opts, args):
        self.global_settings_path = None
        self.project_path = None
        if args:
            self.global_settings_path = args[0]
            if not os.path.exists(os.path.abspath(args[0])):
                self.no_settings(args[0])

    def get_settings_modname(self):
        from django.conf import ENVIRONMENT_VARIABLE

        # get settings module
        settings_modname = None
        if not self.global_settings_path:
            project_path = os.getcwd()

        # ......

    def setup_environ(self, settings_modname):
        from django.core.management import setup_environ

        # setup environ
        # ......

    def no_settings(self, path, import_error=False):
        if import_error:
            error = "Error: Can't find '%s' in your PYTHONPATH.\n" % path

        # .......

    def activate_translation(self):
        from django.conf import settings
        from django.utils import translation
        translation.activate(settings.LANGUAGE_CODE)

    def validate(self):
        """ Validate models. This also ensures that all models are
        imported in case of import-time side effects."""
        from django.core.management.base import CommandError
        from django.core.management.validation import get_validation_errors
        # ......

    def load(self):
        from django.core.handlers.wsgi import WSGIHandler

        self.setup_environ(self.get_settings_modname())
        self.validate()
        self.activate_translation()
        return WSGIHandler()

# ......

def run():
  """\
  The ``gunicorn_django`` command line runner for launching Django
  applications.
  """
  from gunicorn.app.djangoapp import DjangoApplication
  DjangoApplication("%prog [OPTIONS] [SETTINGS_PATH]").run()

gunicorn.app.djangoapp:run likewise invokes the module's run(); again the last two lines matter, where a DjangoApplication is created and run() is called straight away.

Although DjangoApplication defines quite a few methods, note that the skeleton matches WSGIApplication's: in effect it implements just two methods:

  • init()
  • load()

The crucial run() method is evidently inherited from the shared parent class Application.

Into Application

Recall from the previous section that WSGIApplication and DjangoApplication share the parent class Application and both call its run() method.

A look inside

Application is not long; here is the whole picture:

import errno
import os
import sys
import traceback


from gunicorn.glogging import Logger
from gunicorn import util
from gunicorn.arbiter import Arbiter
from gunicorn.config import Config
from gunicorn import debug

class Application(object):
    """\
    An application interface for configuring and loading
    the various necessities for any given web framework.
    """

    def __init__(self, usage=None):
        self.usage = usage
        self.cfg = None
        self.callable = None
        self.logger = None
        self.do_load_config()

    def do_load_config(self):
        try:
            self.load_config()
        except Exception, e:
            sys.stderr.write("\nError: %s\n" % str(e))
            sys.stderr.flush()
            sys.exit(1)

    def load_config(self):
        # init configuration
        self.cfg = Config(self.usage)

        # parse console args
        parser = self.cfg.parser()
        opts, args = parser.parse_args()

        # optional settings from apps
        cfg = self.init(parser, opts, args)

        # Load up the any app specific configuration
        if cfg and cfg is not None:
            for k, v in cfg.items():
                self.cfg.set(k.lower(), v)

        # Load up the config file if its found.
        if opts.config and os.path.exists(opts.config):
            cfg = {
                "__builtins__": __builtins__,
                "__name__": "__config__",
                "__file__": opts.config,
                "__doc__": None,
                "__package__": None
            }
            try:
                execfile(opts.config, cfg, cfg)
            except Exception:
                print "Failed to read config file: %s" % opts.config
                traceback.print_exc()
                sys.exit(1)

            for k, v in cfg.items():
                # Ignore unknown names
                if k not in self.cfg.settings:
                    continue
                try:
                    self.cfg.set(k.lower(), v)
                except:
                    sys.stderr.write("Invalid value for %s: %s\n\n" % (k, v))
                    raise

        # Lastly, update the configuration with any command line
        # settings.
        for k, v in opts.__dict__.items():
            if v is None:
                continue
            self.cfg.set(k.lower(), v)

    def init(self, parser, opts, args):
        raise NotImplementedError

    def load(self):
        raise NotImplementedError

    def reload(self):
        self.do_load_config()
        if self.cfg.spew:
            debug.spew()

    def wsgi(self):
        if self.callable is None:
            self.callable = self.load()
        return self.callable

    def run(self):
        if self.cfg.spew:
            debug.spew()
        if self.cfg.daemon:
            util.daemonize()
        else:
            try:
                os.setpgrp()
            except OSError, e:
                if e[0] != errno.EPERM:
                    raise
        try:
            Arbiter(self).run()
        except RuntimeError, e:
            sys.stderr.write("\nError: %s\n\n" % e)
            sys.stderr.flush()
            sys.exit(1)

Note two things:

  1. the way init() and load() are defined
  2. the core of run()

Since WSGIApplication and DjangoApplication inherit from Application, and given how init() and load() are defined, this is one of the most common design patterns, Template Method:

the parent class lays out a series of methods as the skeleton of an algorithm, and the different subclasses supply the concrete behaviour.

In Application, the skeleton sits in __init__() and run(), which call the two empty methods init() and load(); once a concrete subclass such as WSGIApplication or DjangoApplication implements them, it is the subclass's init() and load() that actually run.
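Stripped to the bone, the pattern looks like this (a toy sketch of my own, not gunicorn code):

class Base(object):
    def run(self):
        # the skeleton: the algorithm's steps are fixed here
        self.init()
        handler = self.load()
        print "serving %r" % handler

    def init(self):
        # hooks left empty for subclasses to fill in
        raise NotImplementedError

    def load(self):
        raise NotImplementedError

class MyWSGIApp(Base):
    def init(self):
        self.app_uri = "myapp:app"

    def load(self):
        return self.app_uri  # a real load() would import the module

MyWSGIApp().run()  # prints: serving 'myapp:app'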

Code flow

  1. Initialization flow: __init__() -> do_load_config() -> load_config() -> init(parser, opts, args)
  2. Run flow: run() -> Arbiter(self).run() -> wsgi() -> load()

Configuration settings

The initialization flow is mostly about loading configuration options and files; the key part is the load_config() method.

Configuration handling lives mainly in config.py, in three principal classes:

  • Config
  • Setting
  • SettingMeta

Let's pick the interesting code first:

class Setting(object):
  __metaclass__ = SettingMeta

class SettingMeta(type):
  def __new__(cls, name, bases, attrs):
      super_new = super(SettingMeta, cls).__new__

Metaclass programming

SettingMeta and Setting inherit in completely different ways, even though both are defined with class.

__metaclass__ = SettingMeta

A metaclass is the class (type) of a class. Just as a class can construct objects dynamically at runtime, a metaclass can generate classes dynamically at runtime.

Understanding type

Examples:

>>> type('abc')
<type 'str'>
>>> type('abc')('def')
'def'
>>> type(type(1))
<type 'type'>
>>> type(type(1))(1)
<type 'int'>

>>> Calculator = type('Calculator', (), {'add': lambda self, x, y: x+y, 'sub': lambda sef, x, y: x-y })
>>> calc = Calculator()
>>> type(calc)
<class '__main__.Calculator'>
>>>
>>> print calc.add(1, 2)
3
>>> print calc.sub(1, 2)
-1

type actually takes three arguments: the class name; the bases (a tuple, since multiple inheritance is allowed, with the empty tuple meaning the base is object); and a dict of class members. It returns a new-style type object, which is simply a dynamically generated class.

Tracking subclasses

The following is taken from the book Pro Django (my own translation from 2009-03-31, posted on my blog); it shows the metaclass usage pattern very clearly.

Consider an application that needs, at any moment, a list of all subclasses of a particular class. A metaclass is an excellent tool for this, but there is a catch. Remember that every class carrying the __metaclass__ attribute gets processed, including the new base class itself, which should not be registered (only its subclasses should).

Dealing with that requires a little extra handling, but it is straightforward and pays off.

Example:

>>> class SubclassTracker(type):
...     def __init__(cls, name, bases, attrs):
...         try:
...             if TrackedClass not in bases:
...                 return
...         except NameError:
...             return
...         TrackedClass._registry.append(cls)
...
>>> class TrackedClass(object):
...     __metaclass__ = SubclassTracker
...     _registry = []
...
>>> class ClassOne(TrackedClass):
...     pass
...
>>> TrackedClass._registry
[<class '__main__.ClassOne'>]
>>> class ClassTwo(TrackedClass):
...     pass
...
>>> TrackedClass._registry
[<class '__main__.ClassOne'>, <class '__main__.ClassTwo'>]

This metaclass performs two functions. First, the try block makes sure the parent class, TrackedClass, has already been defined; if it has not, a NameError tells us the metaclass is currently processing TrackedClass itself. TrackedClass could do more here, but the example keeps things simple and just skips registration in that case.

...

All subclasses of TrackedClass can be pulled from the registry at any time. Every subclass of TrackedClass will show up there, no matter where it is defined: merely executing the class definition registers it, so the application only has to import the modules containing these classes and the metaclass.

How gunicorn handles configuration

Setting and SettingMeta are, in essence, TrackedClass and SubclassTracker:

class SettingMeta(type):
    def __new__(cls, name, bases, attrs):
        super_new = super(SettingMeta, cls).__new__
        parents = [b for b in bases if isinstance(b, SettingMeta)]
        if not parents:
            return super_new(cls, name, bases, attrs)

        attrs["order"] = len(KNOWN_SETTINGS)
        attrs["validator"] = wrap_method(attrs["validator"])

        new_class = super_new(cls, name, bases, attrs)
        new_class.fmt_desc(attrs.get("desc", ""))
        KNOWN_SETTINGS.append(new_class)
        return new_class

    def fmt_desc(cls, desc):
        desc = textwrap.dedent(desc).strip()
        setattr(cls, "desc", desc)
        setattr(cls, "short", desc.splitlines()[0])
class Setting(object):
    __metaclass__ = SettingMeta

    name = None
    value = None
    section = None
    cli = None
    validator = None
    type = None
    meta = None
    action = None
    default = None
    short = None
    desc = None

    def __init__(self):
        if self.default is not None:
            self.set(self.default)

    def add_option(self, parser):
        if not self.cli:
            return
        args = tuple(self.cli)
        kwargs = {
            "dest": self.name,
            "metavar": self.meta or None,
            "action": self.action or "store",
            "type": self.type or "string",
            "default": None,
            "help": "%s [%s]" % (self.short, self.default)
        }
        if kwargs["action"] != "store":
            kwargs.pop("type")
        parser.add_option(*args, **kwargs)

    def copy(self):
        return copy.copy(self)

    def get(self):
        return self.value

    def set(self, val):
        assert callable(self.validator), "Invalid validator: %s" % self.name
        self.value = self.validator(val)
class Bind(Setting):
    name = "bind"
    section = "Server Socket"
    cli = ["-b", "--bind"]
    meta = "ADDRESS"
    validator = validate_string
    default = "127.0.0.1:8000"
    desc = """\
        The socket to bind.

        A string of the form: 'HOST', 'HOST:PORT', 'unix:PATH'. An IP is a valid
        HOST.
        """

class Workers(Setting):
    name = "workers"
    section = "Worker Processes"
    cli = ["-w", "--workers"]
    meta = "INT"
    validator = validate_pos_int
    type = "int"
    default = 1
    desc = """\
        The number of worker process for handling requests.

        A positive integer generally in the 2-4 x $(NUM_CORES) range. You'll
        want to vary this a bit to find the best for your particular
        application's work load.
        """

Note KNOWN_SETTINGS.append(new_class) in SettingMeta, the counterpart of TrackedClass._registry. By the time Bind and Workers are declared, SettingMeta's __new__() has already executed; mind the difference between __new__() and __init__().

>>> from gunicorn import config
>>> b = config.Bind()
>>> b.default
'127.0.0.1:8000'
>>> b.cli
['-b', '--bind']
>>> b.desc
"The socket to bind.\n\nA string of the form: 'HOST', 'HOST:PORT', 'unix:PATH'. An IP is a valid\nHOST."
>>> b.name
'bind'
>>> b.value
'127.0.0.1:8000'
>>> b.validator
<bound method Bind._wrapped of <gunicorn.config.Bind object at 0x8eb4eec>>
>>> b.order
1

The two attrs assignments in SettingMeta.__new__() (attrs["order"] and attrs["validator"]) are what dynamically generate the validator() method and the order attribute, and this has all happened by the time config is imported.

def wrap_method(func):
    def _wrapped(instance, *args, **kwargs):
        return func(*args, **kwargs)
    return _wrapped

This uses Python's function-wrapping (decorator) trick: after validate_string passes through wrap_method(), the wrapped function is installed under the new name validator and can be called as a bound method, with the instance argument silently dropped.
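A quick toy demonstration of the effect (wrap_method copied from above; the rest is my own example):

def wrap_method(func):
    def _wrapped(instance, *args, **kwargs):
        return func(*args, **kwargs)
    return _wrapped

def validate_string(val):
    return str(val).strip()

class Demo(object):
    # as a class attribute, _wrapped is called like a bound method;
    # the instance slot swallows 'self' and validate_string gets the rest
    validator = wrap_method(validate_string)

d = Demo()
print d.validator("  127.0.0.1:8000  ")   # prints 127.0.0.1:8000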

>>> from gunicorn import config
>>> c = config.Config()
>>> c.address
('127.0.0.1', 8000)
>>> c.workers
1
>>> c.worker_class
<class 'gunicorn.workers.sync.SyncWorker'>

The parser() method of Config then parses the command-line arguments.

The true master

The main loop

Having peeled back the layers above, we arrive at the Arbiter class: the true master. The design section already covered part of Arbiter's signal handling; here we walk through the run() method in detail.

class Arbiter(object):
    """
    Arbiter maintain the workers processes alive. It launches or
    kills them if needed. It also manages application reloading
    via SIGHUP/USR2.
    """

    # A flag indicating if a worker failed to
    # to boot. If a worker process exist with
    # this error code, the arbiter will terminate.
    WORKER_BOOT_ERROR = 3

    START_CTX = {}

    LISTENER = None
    WORKERS = {}
    PIPE = []

    # I love dynamic languages
    SIG_QUEUE = []
    SIGNALS = map(
        lambda x: getattr(signal, "SIG%s" % x),
        "HUP QUIT INT TERM TTIN TTOU USR1 USR2 WINCH".split()
    )
    SIG_NAMES = dict(
        (getattr(signal, name), name[3:].lower()) for name in dir(signal)
        if name[:3] == "SIG" and name[3] != "_"
    )

    def __init__(self, app):
        os.environ["SERVER_SOFTWARE"] = SERVER_SOFTWARE

        self.setup(app)

        self.pidfile = None
        self.worker_age = 0
        self.reexec_pid = 0
        self.master_name = "Master"

        # get current path, try to use PWD env first
        try:
            a = os.stat(os.environ['PWD'])
            b = os.stat(os.getcwd())
            if a.ino == b.ino and a.dev == b.dev:
                cwd = os.environ['PWD']
            else:
                cwd = os.getcwd()
        except:
            cwd = os.getcwd()

        args = sys.argv[:]
        args.insert(0, sys.executable)

        # init start context
        self.START_CTX = {
            "args": args,
            "cwd": cwd,
            0: sys.executable
        }

    def setup(self, app):
        self.app = app
        self.cfg = app.cfg
        self.log = self.cfg.logger_class(app.cfg)

        if 'GUNICORN_FD' in os.environ:
            self.log.reopen_files()

        self.address = self.cfg.address
        self.num_workers = self.cfg.workers
        self.debug = self.cfg.debug
        self.timeout = self.cfg.timeout
        self.proc_name = self.cfg.proc_name
        self.worker_class = self.cfg.worker_class

        if self.cfg.debug:
            self.log.debug("Current configuration:")
            for config, value in sorted(self.cfg.settings.iteritems()):
                self.log.debug("  %s: %s", config, value.value)

        if self.cfg.preload_app:
            if not self.cfg.debug:
                self.app.wsgi()
            else:
                self.log.warning("debug mode: app isn't preloaded.")

    def start(self):
        """\
        Initialize the arbiter. Start listening and set pidfile if needed.
        """
        self.log.info("Starting gunicorn %s", __version__)
        self.cfg.on_starting(self)
        self.pid = os.getpid()
        self.init_signals()
        if not self.LISTENER:
            self.LISTENER = create_socket(self.cfg, self.log)

        if self.cfg.pidfile is not None:
            self.pidfile = Pidfile(self.cfg.pidfile)
            self.pidfile.create(self.pid)
        self.log.debug("Arbiter booted")
        self.log.info("Listening at: %s (%s)", self.LISTENER,
            self.pid)
        self.log.info("Using worker: %s",
                self.cfg.settings['worker_class'].get())

        self.cfg.when_ready(self)

    def init_signals(self):
        """\
        Initialize master signal handling. Most of the signals
        are queued. Child signals only wake up the master.
        """
        if self.PIPE:
            map(os.close, self.PIPE)
        self.PIPE = pair = os.pipe()
        map(util.set_non_blocking, pair)
        map(util.close_on_exec, pair)
        self.log.close_on_exec()
        map(lambda s: signal.signal(s, self.signal), self.SIGNALS)
        signal.signal(signal.SIGCHLD, self.handle_chld)

    def signal(self, sig, frame):
        if len(self.SIG_QUEUE) < 5:
            self.SIG_QUEUE.append(sig)
            self.wakeup()

    def run(self):
        "Main master loop."
        self.start()
        util._setproctitle("master [%s]" % self.proc_name)

        self.manage_workers()
        while True:
            try:
                self.reap_workers()
                sig = self.SIG_QUEUE.pop(0) if len(self.SIG_QUEUE) else None
                if sig is None:
                    self.sleep()
                    self.murder_workers()
                    self.manage_workers()
                    continue

                if sig not in self.SIG_NAMES:
                    self.log.info("Ignoring unknown signal: %s", sig)
                    continue

                signame = self.SIG_NAMES.get(sig)
                handler = getattr(self, "handle_%s" % signame, None)
                if not handler:
                    self.log.error("Unhandled signal: %s", signame)
                    continue
                self.log.info("Handling signal: %s", signame)
                handler()
                self.wakeup()
            except StopIteration:
                self.halt()
            except KeyboardInterrupt:
                self.halt()
            except HaltServer, inst:
                self.halt(reason=inst.reason, exit_status=inst.exit_status)
            except SystemExit:
                raise
            except Exception:
                self.log.info("Unhandled exception in main loop:\n%s",
                            traceback.format_exc())
                self.stop(False)
                if self.pidfile is not None:
                    self.pidfile.unlink()
                sys.exit(-1)

As stated in the design section, the master process is simply a loop that listens for the various signals and reacts to them.

  • start() creates the listening socket and, if configured, the pidfile (the file recording the master's PID, landing under /tmp here)
  • manage_workers() brings up the initial set of workers
  • then the while True loop body is entered

The key statements

Naturally, what interests us next is the signal initialization. Looking closely inside start(), its main statements are:

self.pid = os.getpid()
self.init_signals()
if not self.LISTENER:
    self.LISTENER = create_socket(self.cfg, self.log)

if self.cfg.pidfile is not None:
    self.pidfile = Pidfile(self.cfg.pidfile)
    self.pidfile.create(self.pid)

self.init_signals():

# install a handler for each of the predefined signals
map(lambda s: signal.signal(s, self.signal), self.SIGNALS)
signal.signal(signal.SIGCHLD, self.handle_chld)

The signals themselves are the Arbiter class attribute:

SIGNALS = map(
      lambda x: getattr(signal, "SIG%s" % x),
      "HUP QUIT INT TERM TTIN TTOU USR1 USR2 WINCH".split()
  )

Here Python's dynamic-language toys, lambda and map, come into play. Where do the signals go once mapped? Look at self.signal():

def signal(self, sig, frame):
    if len(self.SIG_QUEUE) < 5:
        self.SIG_QUEUE.append(sig)
        self.wakeup()

Every incoming signal is pushed onto a signal queue, and then wakeup() runs.
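wakeup() is the classic self-pipe trick: the signal handler writes a single byte into PIPE, and the master's sleep() selects on the read end, so a signal arriving mid-sleep wakes the loop at once. A sketch of the idea (gunicorn's wakeup() and sleep() work along these lines; this is my paraphrase, not the verbatim source):

import os
import select

pipe_r, pipe_w = os.pipe()

def wakeup():
    os.write(pipe_w, ".")      # one byte is enough to wake the loop

def sleep(timeout=1.0):
    # block until woken or until the timeout elapses
    ready = select.select([pipe_r], [], [], timeout)
    if ready[0]:
        os.read(pipe_r, 1)     # drain the byte that woke us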

Once run()'s loop is entered, the actual dispatch picks the matching handle_xxx() handler for each signal. The real management of requests and responses is not done in the arbiter at all; it is handed to the workers. So look at manage_workers() and spawn_worker().

The manage_workers() code

def manage_workers(self):
        """\
        Maintain the number of workers by spawning or killing
        as required.
        """
        if len(self.WORKERS.keys()) < self.num_workers:
            self.spawn_workers()

        workers = self.WORKERS.items()
        workers.sort(key=lambda w: w[1].age)
        while len(workers) > self.num_workers:
            (pid, _) = workers.pop(0)
            self.kill_worker(pid, signal.SIGQUIT)

If the number of live workers is below the configured count, new workers are spawned; otherwise the surplus workers, oldest first, are told to quit.

The spawn_worker() code

def spawn_worker(self):
    self.worker_age += 1
    worker = self.worker_class(self.worker_age, self.pid, self.LISTENER,
                                self.app, self.timeout/2.0,
                                self.cfg, self.log)
    self.cfg.pre_fork(self, worker)
    pid = os.fork()
    if pid != 0:
        self.WORKERS[pid] = worker
        return pid

    # Process Child
    worker_pid = os.getpid()
    try:
        util._setproctitle("worker [%s]" % self.proc_name)
        self.log.info("Booting worker with pid: %s", worker_pid)
        self.cfg.post_fork(self, worker)
        worker.init_process()
        sys.exit(0)
    except SystemExit:
        raise
    except:
        self.log.exception("Exception in worker process:")
        if not worker.booted:
            sys.exit(self.WORKER_BOOT_ERROR)
        sys.exit(-1)
    finally:
        self.log.info("Worker exiting (pid: %s)", worker_pid)
        try:
            worker.tmp.close()
            self.cfg.worker_exit(self, worker)
        except:
            pass

In particular:

worker = self.worker_class(self.worker_age, self.pid, self.LISTENER,
                                self.app, self.timeout/2.0,
                                self.cfg, self.log)
......

worker.init_process()

This is the doorway into the worker machinery, to be covered shortly.

self.cfg.pre_fork(self, worker) is one of the server hooks. Where does it live? In config.py:

class Prefork(Setting):
    name = "pre_fork"
    section = "Server Hooks"
    validator = validate_callable(2)
    type = "callable"
    def pre_fork(server, worker):
        pass
    default = staticmethod(pre_fork)
    desc = """\
        Called just before a worker is forked.

        The callable needs to accept two instance variables for the Arbiter and
        new Worker.
        """

It can be overridden in a custom configuration file. The later self.cfg.post_fork(self, worker) and self.cfg.worker_exit(self, worker) are likewise server hooks in config.py:

pid = os.fork()
worker_pid = os.getpid()

This is the familiar Unix way of handling processes.
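In miniature:

import os

pid = os.fork()
if pid != 0:
    # parent (the master): fork() returned the child's pid
    print "master %s spawned worker %s" % (os.getpid(), pid)
    os.wait()
else:
    # child (the worker): fork() returned 0, getpid() is its own pid
    print "worker running as pid %s" % os.getpid()
    os._exit(0)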

Workers as building blocks

They are called *building blocks* because a worker can be swapped out at any time, which is one of gunicorn's most admired traits.

The sync worker

The default worker class is synchronous: it handles one request at a time and must wait for it to finish before returning. It is class SyncWorker(base.Worker); straight to the source:

import gunicorn.http as http
import gunicorn.http.wsgi as wsgi
import gunicorn.util as util
import gunicorn.workers.base as base

class SyncWorker(base.Worker):

    def run(self):
        # self.socket appears to lose its blocking status after
        # we fork in the arbiter. Reset it here.
        self.socket.setblocking(0)

        while self.alive:
            self.notify()

            # Accept a connection. If we get an error telling us
            # that no connection is waiting we fall down to the
            # select which is where we'll wait for a bit for new
            # workers to come give us some love.
            try:
                client, addr = self.socket.accept()
                client.setblocking(1)
                util.close_on_exec(client)
                self.handle(client, addr)

                # Keep processing clients until no one is waiting. This
                # prevents the need to select() for every client that we
                # process.
                continue

            except socket.error, e:
                if e[0] not in (errno.EAGAIN, errno.ECONNABORTED):
                    raise

            # If our parent changed then we shut down.
            if self.ppid != os.getppid():
                self.log.info("Parent changed, shutting down: %s", self)
                return

            try:
                self.notify()
                ret = select.select([self.socket], [], self.PIPE, self.timeout)
                if ret[0]:
                    continue
            except select.error, e:
                if e[0] == errno.EINTR:
                    continue
                if e[0] == errno.EBADF:
                    if self.nr < 0:
                        continue
                    else:
                        return
                raise

    def handle(self, client, addr):
        try:
            parser = http.RequestParser(client)
            req = parser.next()
            self.handle_request(req, client, addr)
        except StopIteration, e:
            self.log.debug("Closing connection. %s", e)
        except socket.error, e:
            if e[0] != errno.EPIPE:
                self.log.exception("Error processing request.")
            else:
                self.log.debug("Ignoring EPIPE")
        except Exception, e:
            self.handle_error(client, e)
        finally:
            util.close(client)

    def handle_request(self, req, client, addr):
        environ = {}
        try:
            self.cfg.pre_request(self, req)
            request_start = datetime.now()
            resp, environ = wsgi.create(req, client, addr,
                    self.address, self.cfg)
            # Force the connection closed until someone shows
            # a buffering proxy that supports Keep-Alive to
            # the backend.
            resp.force_close()
            self.nr += 1
            if self.nr >= self.max_requests:
                self.log.info("Autorestarting worker after current request.")
                self.alive = False
            respiter = self.wsgi(environ, resp.start_response)
            try:
                if isinstance(respiter, environ['wsgi.file_wrapper']):
                    resp.write_file(respiter)
                else:
                    for item in respiter:
                        resp.write(item)
                resp.close()
                request_time = datetime.now() - request_start
                self.log.access(resp, environ, request_time)
            finally:
                if hasattr(respiter, "close"):
                    respiter.close()
        except socket.error:
            raise
        except Exception, e:
            # Only send back traceback in HTTP in debug mode.
            self.handle_error(client, e)
            return
        finally:
            try:
                self.cfg.post_request(self, req, environ)
            except:
                pass

SyncWorker defines only three methods in total: run(), handle(), and handle_request(). The real processing happens inside run(), which fills in the stub inherited from the parent base.Worker. Naturally this recalls the app side, the wsgiapp/djangoapp design, with WSGIApplication and DjangoApplication inheriting from Application.

A look at the structure confirms that the same design pattern runs right through. In base.Worker, run() sits there as an empty shell:

class Worker(object):

    SIGNALS = map(
        lambda x: getattr(signal, "SIG%s" % x),
        "HUP QUIT INT TERM USR1 USR2 WINCH CHLD".split()
    )

    PIPE = []

    def __init__(self, age, ppid, socket, app, timeout, cfg, log):
        """\
        This is called pre-fork so it shouldn't do anything to the
        current process. If there's a need to make process wide
        changes you'll want to do that in ``self.init_process()``.
        """
        self.age = age
        self.ppid = ppid
        self.socket = socket
        self.app = app
        self.timeout = timeout
        self.cfg = cfg
        self.booted = False

        self.nr = 0
        self.max_requests = cfg.max_requests or sys.maxint
        self.alive = True
        self.log = log
        self.debug = cfg.debug
        self.address = self.socket.getsockname()
        self.tmp = WorkerTmp(cfg)

    def __str__(self):
        return "<Worker %s>" % self.pid

    @property
    def pid(self):
        return os.getpid()

    def notify(self):
        """\
        Your worker subclass must arrange to have this method called
        once every ``self.timeout`` seconds. If you fail in accomplishing
        this task, the master process will murder your workers.
        """
        self.tmp.notify()

    def run(self):
        """\
        This is the mainloop of a worker process. You should override
        this method in a subclass to provide the intended behaviour
        for your particular evil schemes.
        """
        raise NotImplementedError()

The worker.init_process() that the master launches is defined precisely in base.Worker:

class Worker(object):

    # ......

    def init_process(self):
        """\
        If you override this method in a subclass, the last statement
        in the function should be to call this method with
        super(MyWorkerClass, self).init_process() so that the ``run()``
        loop is initiated.
        """
        util.set_owner_process(self.cfg.uid, self.cfg.gid)

        # Reseed the random number generator
        util.seed()

        # For waking ourselves up
        self.PIPE = os.pipe()
        map(util.set_non_blocking, self.PIPE)
        map(util.close_on_exec, self.PIPE)

        # Prevent fd inherientence
        util.close_on_exec(self.socket)
        util.close_on_exec(self.tmp.fileno())

        self.log.close_on_exec()

        self.init_signals()

        self.wsgi = self.app.wsgi()

        # Enter main run loop
        self.booted = True
        self.run()

    def init_signals(self):
        map(lambda s: signal.signal(s, signal.SIG_DFL), self.SIGNALS)
        signal.signal(signal.SIGQUIT, self.handle_quit)
        signal.signal(signal.SIGTERM, self.handle_exit)
        signal.signal(signal.SIGINT, self.handle_exit)
        signal.signal(signal.SIGWINCH, self.handle_winch)
        signal.signal(signal.SIGUSR1, self.handle_usr1)
        # Don't let SIGQUIT and SIGUSR1 disturb active requests
        # by interrupting system calls
        if hasattr(signal, 'siginterrupt'):  # python >= 2.6
            signal.siginterrupt(signal.SIGQUIT, False)
            signal.siginterrupt(signal.SIGUSR1, False)

init_process() finishes by executing the subclass's run() method, SyncWorker's included.

Async workers

The async workers come in two flavours: one built on Eventlet, the other on the celebrated gevent. The overall design matches the sync worker, again using the template method pattern.

EventletWorker

EventletWorker's inheritance chain has three levels:

class EventletWorker(AsyncWorker)
class AsyncWorker(base.Worker)

First, EventletWorker:

from __future__ import with_statement


import os
try:
    import eventlet
except ImportError:
    raise RuntimeError("You need eventlet installed to use this worker.")
from eventlet import hubs
from eventlet.greenio import GreenSocket

from gunicorn.workers.async import AsyncWorker

class EventletWorker(AsyncWorker):

    @classmethod
    def setup(cls):
        import eventlet
        if eventlet.version_info < (0,9,7):
            raise RuntimeError("You need eventlet >= 0.9.7")
        eventlet.monkey_patch(os=False)

    def init_process(self):
        hubs.use_hub()
        super(EventletWorker, self).init_process()

    def timeout_ctx(self):
        return eventlet.Timeout(self.cfg.keepalive, False)

    def run(self):
        self.socket = GreenSocket(family_or_realsock=self.socket.sock)
        self.socket.setblocking(1)
        self.acceptor = eventlet.spawn(eventlet.serve, self.socket,
                self.handle, self.worker_connections)

        while self.alive:
            self.notify()
            if self.ppid != os.getppid():
                self.log.info("Parent changed, shutting down: %s", self)
                break

            eventlet.sleep(1.0)

        self.notify()
        with eventlet.Timeout(self.timeout, False):
            eventlet.kill(self.acceptor, eventlet.StopServe)

The statement in run():

self.acceptor = eventlet.spawn(eventlet.serve, self.socket,
                self.handle, self.worker_connections)

amounts to a call of the form:

eventlet.serve(sock, handle, concurrency=1000)
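As a standalone illustration of that API (assuming eventlet is installed; eventlet.listen() and eventlet.serve() are its documented entry points):

import eventlet

def handle(client, addr):
    # eventlet.serve() runs this in one green thread per connection
    client.sendall("HTTP/1.0 200 OK\r\n\r\nhello from eventlet\r\n")
    client.close()

sock = eventlet.listen(("127.0.0.1", 8000))
eventlet.serve(sock, handle, concurrency=1000)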

It passes in self.handle, a method declared and defined in AsyncWorker:

class AsyncWorker(base.Worker):

    def __init__(self, *args, **kwargs):
        super(AsyncWorker, self).__init__(*args, **kwargs)
        self.worker_connections = self.cfg.worker_connections

    def timeout_ctx(self):
        raise NotImplementedError()

    def handle(self, client, addr):
        try:
            parser = http.RequestParser(client)
            try:
                while True:
                    req = None
                    with self.timeout_ctx():
                        req = parser.next()
                    if not req:
                        break
                    self.handle_request(req, client, addr)
            except StopIteration, e:
                self.log.debug("Closing connection. %s", e)
        except socket.error, e:
            if e[0] not in (errno.EPIPE, errno.ECONNRESET):
                self.log.exception("Socket error processing request.")
            else:
                if e[0] == errno.ECONNRESET:
                    self.log.debug("Ignoring connection reset")
                else:
                    self.log.debug("Ignoring EPIPE")
        except Exception, e:
            self.handle_error(client, e)
        finally:
            util.close(client)

    def handle_request(self, req, sock, addr):
        try:
            self.cfg.pre_request(self, req)
            request_start = datetime.now()
            resp, environ = wsgi.create(req, sock, addr, self.address, self.cfg)
            self.nr += 1
            if self.alive and self.nr >= self.max_requests:
                self.log.info("Autorestarting worker after current request.")
                resp.force_close()
                self.alive = False
            respiter = self.wsgi(environ, resp.start_response)
            if respiter == ALREADY_HANDLED:
                return False
            try:
                for item in respiter:
                    resp.write(item)
                resp.close()
                request_time = datetime.now() - request_start
                self.log.access(resp, environ, request_time)
            finally:
                if hasattr(respiter, "close"):
                  respiter.close()
            if resp.should_close():
                raise StopIteration()
        finally:
            try:
                self.cfg.post_request(self, req, environ)
            except:
                pass
        return True

GeventWorker

The relationships here are somewhat more involved:

class GeventWorker(AsyncWorker):

    server_class = None
    wsgi_handler = None

    @classmethod
    def setup(cls):
        from gevent import monkey
        monkey.noisy = False
        monkey.patch_all()


    def timeout_ctx(self):
        return gevent.Timeout(self.cfg.keepalive, False)

    def run(self):
        self.socket.setblocking(1)

        pool = Pool(self.worker_connections)
        if self.server_class is not None:
            server = self.server_class(
                self.socket, application=self.wsgi, spawn=pool, log=self.log,
                handler_class=self.wsgi_handler)
        else:
            server = StreamServer(self.socket, handle=self.handle, spawn=pool)

        server.start()
        try:
            while self.alive:
                self.notify()
                if self.ppid != os.getppid():
                    self.log.info("Parent changed, shutting down: %s", self)
                    break

                gevent.sleep(1.0)

        except KeyboardInterrupt:
            pass

        try:
            # Try to stop connections until timeout
            self.notify()
            server.stop(timeout=self.timeout)
        except:
            pass

    def handle_request(self, *args):
        try:
            super(GeventWorker, self).handle_request(*args)
        except gevent.GreenletExit:
            pass

    if hasattr(gevent.core, 'dns_shutdown'):

        def init_process(self):
            #gevent 0.13 and older doesn't reinitialize dns for us after forking
            #here's the workaround
            gevent.core.dns_shutdown(fail_requests=1)
            gevent.core.dns_init()
            super(GeventWorker, self).init_process()

The main focus is here:

server = self.server_class(
                self.socket, application=self.wsgi, spawn=pool, log=self.log,
                handler_class=self.wsgi_handler)

But self.server_class and self.wsgi_handler are both None. How can that work?

Reading further, it turns out a subclass is derived that fills them in:

class GeventPyWSGIWorker(GeventWorker):
    "The Gevent StreamServer based workers."
    server_class = PyWSGIServer
    wsgi_handler = PyWSGIHandler

The only part of the gunicorn tree still unvisited is the http directory, so that's where we go next.

The built-in HTTP server

gunicorn carries its own Python WSGI HTTP server, designed for the default sync worker.

class SyncWorker(base.Worker):

    def run(self):
        # self.socket appears to lose its blocking status after
        # we fork in the arbiter. Reset it here.
        self.socket.setblocking(0)

        while self.alive:
            self.notify()

            # Accept a connection. If we get an error telling us
            # that no connection is waiting we fall down to the
            # select which is where we'll wait for a bit for new
            # workers to come give us some love.
            try:
                client, addr = self.socket.accept()
                client.setblocking(1)
                util.close_on_exec(client)
                self.handle(client, addr)

                # Keep processing clients until no one is waiting. This
                # prevents the need to select() for every client that we
                # process.
                continue

            except socket.error, e:
                if e[0] not in (errno.EAGAIN, errno.ECONNABORTED):
                    raise

Here, at last, is the long-awaited self.socket.accept(): a connection is accepted, and self.handle(client, addr) takes care of the client's request.

class SyncWorker(base.Worker):

    #......

    def handle(self, client, addr):
        try:
            parser = http.RequestParser(client)
            req = parser.next()
            self.handle_request(req, client, addr)
        except StopIteration, e:
            self.log.debug("Closing connection. %s", e)
        except socket.error, e:
            if e[0] != errno.EPIPE:
                self.log.exception("Error processing request.")
            else:
                self.log.debug("Ignoring EPIPE")
        except Exception, e:
            self.handle_error(client, e)
        finally:
            util.close(client)

http.RequestParser(client) takes us into the http package, and self.handle_request(req, client, addr) calls into the wsgi module:

class SyncWorker(base.Worker):

    #......

    def handle_request(self, req, client, addr):
        environ = {}
        try:
            self.cfg.pre_request(self, req)
            request_start = datetime.now()
            resp, environ = wsgi.create(req, client, addr,
                    self.address, self.cfg)
            # Force the connection closed until someone shows
            # a buffering proxy that supports Keep-Alive to
            # the backend.
            resp.force_close()
            self.nr += 1
            if self.nr >= self.max_requests:
                self.log.info("Autorestarting worker after current request.")
                self.alive = False
            respiter = self.wsgi(environ, resp.start_response)
            try:
                if isinstance(respiter, environ['wsgi.file_wrapper']):
                    resp.write_file(respiter)
                else:
                    for item in respiter:
                        resp.write(item)
                resp.close()
                request_time = datetime.now() - request_start
                self.log.access(resp, environ, request_time)
            finally:
                if hasattr(respiter, "close"):
                    respiter.close()
        except socket.error:
            raise
        except Exception, e:
            # Only send back traceback in HTTP in debug mode.
            self.handle_error(client, e)
            return
        finally:
            try:
                self.cfg.post_request(self, req, environ)
            except:
                pass

The parts that touch the wsgi module are:

resp, environ = wsgi.create(req, client, addr,
                    self.address, self.cfg)

and the final response is obtained with:

respiter = self.wsgi(environ, resp.start_response)

So where does this self.wsgi come from?

  1. It is assigned in base.Worker's init_process():
self.init_signals()

self.wsgi = self.app.wsgi()

# Enter main run loop
self.booted = True
self.run()
  2. On the app side (wsgiapp or djangoapp), in WSGIApplication -> Application, there is:
def wsgi(self):
    if self.callable is None:
        self.callable = self.load()
    return self.callable
  3. self.load() then dispatches back to load() in WSGIApplication:
def load(self):
    return util.import_app(self.app_uri)
  4. which calls import_app(self.app_uri) in the util module:
def import_app(module):
    parts = module.split(":", 1)
    if len(parts) == 1:
        module, obj = module, "application"
    else:
        module, obj = parts[0], parts[1]

    try:
        __import__(module)
    except ImportError:
        if module.endswith(".py") and os.path.exists(module):
            raise ImportError("Failed to find application, did "
                "you mean '%s:%s'?" % (module.rsplit(".",1)[0], obj))
        else:
            raise

    mod = sys.modules[module]
    app = eval(obj, mod.__dict__)
    if app is None:
        raise ImportError("Failed to find application object: %r" % obj)
    if not callable(app):
        raise TypeError("Application object must be callable.")
    return app

This is where the WSGI application is finally set in motion. Remember the simple WSGI program from the quickstart?

Let's add a print there and check:

print u'waiting for you ...(%s, %s)' % (module, obj)

Run the command again:

$ gunicorn --workers=2 myapp:app

The result:

(mypy)hzg@xubuntu:~/sample$ gunicorn --workers=2 myapp:app
2012-02-16 08:45:09 [2007] [INFO] Starting gunicorn 0.13.4
2012-02-16 08:45:09 [2007] [INFO] Listening at: http://127.0.0.1:8000 (2007)
2012-02-16 08:45:09 [2007] [INFO] Using worker: sync
2012-02-16 08:45:09 [2010] [INFO] Booting worker with pid: 2010
waiting for you ...(myapp, app)
2012-02-16 08:45:09 [2011] [INFO] Booting worker with pid: 2011
waiting for you ...(myapp, app)

Do it yourself

Now let's try a custom configuration file:

#import multiprocessing

bind = "127.0.0.1:8000"
#workers = multiprocessing.cpu_count() * 2 + 1

worker_class = "sync"
#worker_class = "gevent_pywsgi"
#worker_class = "egg:gunicorn#tornado"
#worker_class = "gunicorn.workers.ggevent.GeventWorker"

worker_connections = 100

timeout = 50

loglevel = 'debug'

proc_name = 'justforfun'

def post_fork(server, worker):
    #server.log.info("Worker spawned (pid: %s)", worker.pid)
    pass

def pre_fork(server, worker):
    #server.log.info("Pre fork ...")
    pass

Running it is simple:

gunicorn -c myconf.py myapp:app

The output:

(mypy)hzg@gofast:~/work/rdgunicorn/sample$ gunicorn -c myconf.py myapp:app
2012-02-19 20:05:43 [2139] [INFO] Starting gunicorn 0.13.4
2012-02-19 20:05:43 [2139] [DEBUG] Arbiter booted
2012-02-19 20:05:43 [2139] [INFO] Listening at: http://127.0.0.1:8000 (2139)
2012-02-19 20:05:43 [2139] [INFO] Using worker: sync
2012-02-19 20:05:43 [2142] [INFO] Booting worker with pid: 2142
2012-02-19 20:05:50 [2142] [DEBUG] GET /
2012-02-19 20:06:16 [2139] [INFO] Handling signal: winch
2012-02-19 20:06:16 [2139] [INFO] SIGWINCH ignored. Not daemonized
2012-02-19 20:07:37 [2139] [INFO] Handling signal: ttin
2012-02-19 20:07:37 [2207] [INFO] Booting worker with pid: 2207
2012-02-19 20:07:55 [2139] [INFO] Handling signal: ttou
2012-02-19 20:07:55 [2142] [INFO] Worker exiting (pid: 2142)

Note the ttin and ttou entries: they add and retire worker processes. How is that done?

$ kill -TTIN 2139
$ kill -TTOU 2139

Deploying applications

gunicorn's official site strongly recommends nginx as the proxying web server, with this advice: if you pick a different server, make sure it buffers slow clients whenever you use gunicorn's sync workers. Without that buffering, gunicorn is an easy victim of denial-of-service attacks. You can point slowloris at your web server proxy to see how it responds.

What is slowloris? Go have a look:

http://ha.ckers.org/slowloris/

slowloris is a script written in Perl. Good fun.

Back to the topic. Below is a sample nginx configuration; add this to the http section of nginx.conf:

upstream app_server {
    server localhost:8000 fail_timeout=0;
}

And in the default file under sites-available:

server {
    listen 80 default;
    client_max_body_size 4G;
    server_name _;

    keepalive_timeout 5;

    location / {
        # checks for static file, if not found proxy to app
        try_files $uri @proxy_to_app;
    }

    location @proxy_to_app {
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header Host $http_host;
        proxy_redirect off;

        proxy_pass   http://app_server;
    }
}

Point a browser at:

http://localhost

and the response comes back.
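Or, equivalently, from the command line:

$ curl http://localhost
Hello, World!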