真正的主宰者

循环主体

通过上面的抽丝剥茧,我们进入到Arbiter类,它才是真正的主宰者。在前面介绍gunicorn设计的一节中,已经讲到了Arbiter的部分信号处理,这里将详细介绍run()方法。

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
class Arbiter(object):
    """
    Arbiter keeps the worker processes alive. It launches or
    kills them if needed. It also manages application reloading
    via SIGHUP/USR2.
    """

    # A flag indicating if a worker failed to
    # boot. If a worker process exits with
    # this error code, the arbiter will terminate.
    WORKER_BOOT_ERROR = 3

    START_CTX = {}

    LISTENER = None
    WORKERS = {}  # pid -> worker instance
    PIPE = []  # (read_fd, write_fd) from os.pipe(), set in init_signals()

    # Signal queue, plus signal tables built via getattr on the signal module
    SIG_QUEUE = []
    SIGNALS = list(map(  # list(): map() is a one-shot iterator on Py3
        lambda x: getattr(signal, "SIG%s" % x),
        "HUP QUIT INT TERM TTIN TTOU USR1 USR2 WINCH".split()
    ))
    SIG_NAMES = dict(
        (getattr(signal, name), name[3:].lower()) for name in dir(signal)
        if name[:3] == "SIG" and name[3] != "_"
    )

    def __init__(self, app):
        """Set up the arbiter for *app* and record its start context."""
        os.environ["SERVER_SOFTWARE"] = SERVER_SOFTWARE

        self.setup(app)

        self.pidfile = None
        self.worker_age = 0
        self.reexec_pid = 0
        self.master_name = "Master"

        # get current path, preferring $PWD when it is the same directory
        cwd = self._get_cwd()

        args = sys.argv[:]
        args.insert(0, sys.executable)

        # init start context
        self.START_CTX = {
            "args": args,
            "cwd": cwd,
            0: sys.executable
        }

    def setup(self, app):
        """Bind *app* and copy the settings the arbiter needs from its cfg."""
        self.app = app
        self.cfg = app.cfg
        self.log = self.cfg.logger_class(app.cfg)

        if 'GUNICORN_FD' in os.environ:
            self.log.reopen_files()

        self.address = self.cfg.address
        self.num_workers = self.cfg.workers
        self.debug = self.cfg.debug
        self.timeout = self.cfg.timeout
        self.proc_name = self.cfg.proc_name
        self.worker_class = self.cfg.worker_class

        if self.cfg.debug:
            self.log.debug("Current configuration:")
            for config, value in sorted(self.cfg.settings.items()):
                self.log.debug("  %s: %s", config, value.value)

        if self.cfg.preload_app:
            if not self.cfg.debug:
                self.app.wsgi()
            else:
                self.log.warning("debug mode: app isn't preloaded.")

    def start(self):
        """\
        Initialize the arbiter. Start listening and set pidfile if needed.
        """
        self.log.info("Starting gunicorn %s", __version__)
        self.cfg.on_starting(self)
        self.pid = os.getpid()
        self.init_signals()
        if not self.LISTENER:
            self.LISTENER = create_socket(self.cfg, self.log)

        if self.cfg.pidfile is not None:
            self.pidfile = Pidfile(self.cfg.pidfile)
            self.pidfile.create(self.pid)
        self.log.debug("Arbiter booted")
        self.log.info("Listening at: %s (%s)", self.LISTENER,
                      self.pid)
        self.log.info("Using worker: %s",
                      self.cfg.settings['worker_class'].get())

        self.cfg.when_ready(self)

    def init_signals(self):
        """\
        Initialize master signal handling. Most of the signals
        are queued. Child signals only wake up the master.
        """
        # Explicit loops, not map(): on Python 3 map() is lazy, so calling it
        # only for its side effects would silently do nothing.
        if self.PIPE:
            for fd in self.PIPE:
                os.close(fd)
        self.PIPE = pair = os.pipe()
        for fd in pair:
            util.set_non_blocking(fd)
            util.close_on_exec(fd)
        self.log.close_on_exec()
        for sig in self.SIGNALS:
            signal.signal(sig, self.signal)
        signal.signal(signal.SIGCHLD, self.handle_chld)

    def signal(self, sig, frame):
        """Queue *sig* and call wakeup(); drop it if 5 are already pending."""
        if len(self.SIG_QUEUE) < 5:
            self.SIG_QUEUE.append(sig)
            self.wakeup()

    def run(self):
        "Main master loop."
        self.start()
        util._setproctitle("master [%s]" % self.proc_name)

        self.manage_workers()
        while True:
            try:
                self.reap_workers()
                # Handle one queued signal per iteration, oldest first.
                sig = self.SIG_QUEUE.pop(0) if self.SIG_QUEUE else None
                if sig is None:
                    # Nothing pending: sleep, then do worker housekeeping.
                    self.sleep()
                    self.murder_workers()
                    self.manage_workers()
                    continue

                if sig not in self.SIG_NAMES:
                    self.log.info("Ignoring unknown signal: %s", sig)
                    continue

                # Dispatch by name, e.g. SIGHUP -> self.handle_hup().
                signame = self.SIG_NAMES.get(sig)
                handler = getattr(self, "handle_%s" % signame, None)
                if not handler:
                    self.log.error("Unhandled signal: %s", signame)
                    continue
                self.log.info("Handling signal: %s", signame)
                handler()
                self.wakeup()
            except (StopIteration, KeyboardInterrupt):
                self.halt()
            except HaltServer as inst:
                self.halt(reason=inst.reason, exit_status=inst.exit_status)
            except SystemExit:
                raise
            except Exception:
                # Last-resort handler: the master exits with status -1, so
                # this is logged at error level rather than info.
                self.log.error("Unhandled exception in main loop:\n%s",
                               traceback.format_exc())
                self.stop(False)
                if self.pidfile is not None:
                    self.pidfile.unlink()
                sys.exit(-1)

    @staticmethod
    def _get_cwd():
        """\
        Return the current working directory, preferring $PWD.

        $PWD may name the directory through a symlink (os.getcwd() returns
        the resolved path), so it is used when it refers to the same
        directory as os.getcwd(). If $PWD is unset, cannot be stat'ed, or
        points elsewhere, os.getcwd() is returned instead.
        """
        try:
            a = os.stat(os.environ['PWD'])
            b = os.stat(os.getcwd())
        except (KeyError, OSError):
            return os.getcwd()
        # os.stat() results expose st_ino/st_dev (there are no ino/dev
        # attributes), identifying the same directory regardless of path.
        if a.st_ino == b.st_ino and a.st_dev == b.st_dev:
            return os.environ['PWD']
        return os.getcwd()

正如在设计一节中说到的,主控的master进程就是一个简单的循环:不断检查是否收到信号,收到时调用对应的处理方法;没有信号时就休眠,并维护worker进程。

  • 130行(即run()中的self.start()):注册信号处理函数,创建用于侦听的socket(若尚未创建);如果配置了pidfile,还会创建pidfile(记录master进程号的文件)
  • 133行(self.manage_workers()):按配置的数量启动worker进程
  • 134行(while True:):正式进入循环体

核心语句

接下来我们感兴趣的,自然是这些信号是如何初始化的。仔细看看start(),其中主要的语句是:

# Core of Arbiter.start(): record the master's pid, install the signal
# handlers, create the listening socket unless one already exists, and
# write the pidfile only when cfg.pidfile is configured.
self.pid = os.getpid()
self.init_signals()
if not self.LISTENER:
    self.LISTENER = create_socket(self.cfg, self.log)

if self.cfg.pidfile is not None:
    self.pidfile = Pidfile(self.cfg.pidfile)
    self.pidfile.create(self.pid)

init_signals()中的核心语句:

# Register self.signal() as the handler for every signal in SIGNALS.
# NOTE: this relies on Python 2, where map() runs eagerly; on Python 3 map()
# is lazy and, used only for its side effects, would install nothing.
map(lambda s: signal.signal(s, self.signal), self.SIGNALS)
# SIGCHLD gets its own handler, handle_chld().
signal.signal(signal.SIGCHLD, self.handle_chld)

各种信号就是Arbiter的类变量:

# Arbiter class attribute: the signal numbers the master handles, looked up
# by name with getattr() (e.g. "HUP" -> signal.SIGHUP).
# NOTE: on Python 2 map() returns a list; on Python 3 it would be a one-shot
# iterator, so it would need wrapping in list(...) to be reused.
SIGNALS = map(
      lambda x: getattr(signal, "SIG%s" % x),
      "HUP QUIT INT TERM TTIN TTOU USR1 USR2 WINCH".split()
  )

这里利用了Python的动态特性:通过getattr按名字(如SIGHUP)取出signal模块中的信号常量,再配合lambda和map得到信号列表。init_signals()把这些信号的处理函数统一注册为self.signal(),那么信号到来之后会发生什么呢?注意看self.signal():

def signal(self, sig, frame):
    """Remember *sig* for the main loop and call wakeup().

    At most five signals are kept pending; any that arrive while the
    queue is full are silently discarded. *frame* is unused.
    """
    pending = self.SIG_QUEUE
    if len(pending) >= 5:
        return
    pending.append(sig)
    self.wakeup()

self.signal()只是把收到的信号追加到信号队列SIG_QUEUE中(若队列中已有5个待处理的信号,新信号会被直接丢弃),然后调用wakeup()唤醒主循环。

进入run()的循环体之后,主循环每次从SIG_QUEUE中取出最早的一个信号,根据SIG_NAMES查到信号名,再找到对应的handle_xxx()方法(例如SIGHUP对应handle_hup())并调用它。真正处理用户请求和响应的并不是arbiter,而是worker。所以接下来重点看manage_workers()和spawn_worker()。

manage_workers()代码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
def manage_workers(self):
    """\
    Maintain the number of workers by spawning or killing
    as required.

    If fewer than ``self.num_workers`` workers are registered in
    ``self.WORKERS`` (a pid -> worker mapping), ``spawn_workers()`` is
    called to start more. If there are too many, the surplus workers with
    the lowest ``age`` (presumably the earliest spawned, since
    ``spawn_worker()`` increments ``worker_age``) are sent SIGQUIT via
    ``kill_worker()``.
    """
    if len(self.WORKERS) < self.num_workers:
        self.spawn_workers()

    # sorted() instead of items().sort(): works whether items() returns a
    # list (Python 2) or a view (Python 3).
    workers = sorted(self.WORKERS.items(), key=lambda w: w[1].age)
    while len(workers) > self.num_workers:
        (pid, _) = workers.pop(0)
        self.kill_worker(pid, signal.SIGQUIT)

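manage_workers()比较当前worker数量与配置中的数量:少于配置时,调用spawn_workers()补足worker进程;多于配置时,按age从小到大(age应来自spawn_worker()中递增的worker_age,越小表示越早创建)依次向多余的worker发送SIGQUIT,让它们退出。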

spawn_worker()代码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
def spawn_worker(self):
    """\
    Create a worker object and fork a process to run it.

    In the parent (the arbiter) the new worker is registered in
    ``self.WORKERS`` under the child's pid, and that pid is returned.
    The child does not return: every path through its body ends in
    ``sys.exit()`` (0 on a clean finish, ``WORKER_BOOT_ERROR`` if the
    worker never booted, -1 otherwise).
    """
    self.worker_age += 1
    worker = self.worker_class(self.worker_age, self.pid, self.LISTENER,
                               self.app, self.timeout/2.0,
                               self.cfg, self.log)
    self.cfg.pre_fork(self, worker)
    pid = os.fork()
    if pid != 0:
        # Parent (arbiter): remember the child and go back to the main loop.
        self.WORKERS[pid] = worker
        return pid

    # Process Child
    worker_pid = os.getpid()
    try:
        util._setproctitle("worker [%s]" % self.proc_name)
        self.log.info("Booting worker with pid: %s", worker_pid)
        self.cfg.post_fork(self, worker)
        worker.init_process()
        sys.exit(0)
    except SystemExit:
        raise
    except:
        # Deliberately bare: no exception (KeyboardInterrupt included) may
        # propagate out of the child back into the arbiter code that
        # forked it; the child always leaves through sys.exit().
        self.log.exception("Exception in worker process:")
        if not worker.booted:
            sys.exit(self.WORKER_BOOT_ERROR)
        sys.exit(-1)
    finally:
        self.log.info("Worker exiting (pid: %s)", worker_pid)
        # Best-effort cleanup. Failures are logged instead of silently
        # swallowed, and the worker_exit hook runs even if closing tmp
        # fails. Bare excepts for the same reason as above.
        try:
            worker.tmp.close()
        except:
            self.log.exception("Exception while closing worker tmp file")
        try:
            self.cfg.worker_exit(self, worker)
        except:
            self.log.exception("Exception in worker_exit hook")

其中:

# Create the worker object from the configured worker class (this happens
# in the arbiter, before the fork).
worker = self.worker_class(self.worker_age, self.pid, self.LISTENER,
                                self.app, self.timeout/2.0,
                                self.cfg, self.log)
......
# (code in between omitted; see the full spawn_worker() listing)
# In the forked child process: hand control over to the worker.
worker.init_process()

由此进入worker这一枢纽环节,后续章节将详细介绍。

self.cfg.pre_fork(self, worker)是server hook之一。它的代码在哪里呢?在config.py中:

# The "pre_fork" server-hook setting. Its default is a no-op; per the
# surrounding text it can be overridden in a user config file.
class Prefork(Setting):
    name = "pre_fork"
    section = "Server Hooks"
    # Presumably checks that the value is a callable accepting two
    # arguments (server, worker) -- TODO confirm in config.py.
    validator = validate_callable(2)
    type = "callable"
    # Default hook: does nothing.
    def pre_fork(server, worker):
        pass
    # staticmethod so the default is not turned into a bound method when
    # accessed through an instance.
    default = staticmethod(pre_fork)
    desc = """\
        Called just before a worker is forked.

        The callable needs to accept two instance variables for the Arbiter and
        new Worker.
        """

pre_fork可以在自定义的配置文件中重新定义。spawn_worker()中随后调用的self.cfg.post_fork(self, worker)和self.cfg.worker_exit(self, worker),同样是config.py中定义的server hook。接下来再看spawn_worker()中fork的部分:

# os.fork() returns the child's pid in the parent and 0 in the child.
pid = os.fork()
# Only the child reaches this point in spawn_worker(); os.getpid() now
# returns the new worker process's own pid.
worker_pid = os.getpid()

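这就是我们熟知的fork进程模型:os.fork()在父进程(master)中返回子进程的pid,在子进程中返回0。因此pid != 0的分支由master执行,它把新worker记录到WORKERS中后直接返回;子进程则继续往下执行,用os.getpid()取得自己的pid,进入worker的初始化流程。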