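The earlier article "深入浅出Python开发Web" implemented a simple WSGI server, but a production deployment also has to address robustness, performance, concurrency, and similar concerns.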

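Gunicorn is a WSGI HTTP server implemented in Python. It supports several worker (process) models and manages its worker processes automatically. It is widely used in Python web development and is one of the most popular servers; many well-known Python web frameworks can run on Gunicorn.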

Start

$ pip install gunicorn
$ cat myapp.py
  def app(environ, start_response):
      data = b"Hello, World!\n"
      start_response("200 OK", [
          ("Content-Type", "text/plain"),
          ("Content-Length", str(len(data)))
      ])
      return iter([data])
$ gunicorn -w 4 myapp:app
[2014-09-10 10:22:28 +0000] [30869] [INFO] Listening at: http://127.0.0.1:8000 (30869)
[2014-09-10 10:22:28 +0000] [30869] [INFO] Using worker: sync
[2014-09-10 10:22:28 +0000] [30874] [INFO] Booting worker with pid: 30874
[2014-09-10 10:22:28 +0000] [30875] [INFO] Booting worker with pid: 30875
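
Because a WSGI application is just a callable, the app above can also be exercised without any server. The following is a minimal sketch, assuming the same application body as myapp.py; call_app and the captured dictionary are hypothetical helpers introduced here for illustration, and the standard library's wsgiref.util.setup_testing_defaults fills in a placeholder environ.

```python
from wsgiref.util import setup_testing_defaults


def app(environ, start_response):
    # Same application as myapp.py above.
    data = b"Hello, World!\n"
    start_response("200 OK", [
        ("Content-Type", "text/plain"),
        ("Content-Length", str(len(data))),
    ])
    return iter([data])


def call_app(wsgi_app):
    """Hypothetical helper: invoke a WSGI app once and collect its output."""
    environ = {}
    setup_testing_defaults(environ)  # fills in REQUEST_METHOD, PATH_INFO, etc.
    captured = {}

    def start_response(status, headers, exc_info=None):
        # Record what the application passes to the server side.
        captured["status"] = status
        captured["headers"] = headers

    body = b"".join(wsgi_app(environ, start_response))
    return captured["status"], captured["headers"], body
```

Calling call_app(app) returns the status string, the header list, and the response body, which is roughly what a server such as gunicorn has to collect before writing the HTTP response.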

Installing gunicorn with pip provides a CLI entry point. setup.py shows that the entry function is gunicorn.app.wsgiapp:run.

# https://github.com/benoitc/gunicorn/blob/20.1.0/setup.py#L114
entry_points="""
[console_scripts]
gunicorn=gunicorn.app.wsgiapp:run


[paste.server_runner]
main=gunicorn.app.pasterapp:serve
"""

# https://github.com/benoitc/gunicorn/blob/20.1.0/gunicorn/app/wsgiapp.py#L61
def run():
    """\
    The ``gunicorn`` command line runner for launching Gunicorn with
    generic WSGI applications.
    """
    from gunicorn.app.wsgiapp import WSGIApplication
    WSGIApplication("%(prog)s [OPTIONS] [APP_MODULE]").run()

run instantiates WSGIApplication and then calls its run method to start the server.
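
Both the console_scripts entry (gunicorn.app.wsgiapp:run) and the command-line argument (myapp:app) use the "module:attribute" notation. The sketch below shows one plausible way such a string can be resolved with importlib; resolve is a hypothetical helper written for this article, not gunicorn's or pip's actual loader, which handle more cases.

```python
import importlib


def resolve(spec):
    """Resolve a "module:attr" string to the object it names.

    Hypothetical helper for illustration only.
    """
    module_name, _, attr_path = spec.partition(":")
    obj = importlib.import_module(module_name)
    # Walk dotted attribute paths such as "Class.method".
    for part in attr_path.split("."):
        obj = getattr(obj, part)
    return obj
```

For example, resolve("json:dumps") returns the standard library's json.dumps function. With gunicorn installed, resolve("gunicorn.app.wsgiapp:run") would return the run function shown above.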

WSGIApplication

# https://github.com/benoitc/gunicorn/blob/20.1.0/gunicorn/app/base.py#L70

class BaseApplication(object):
    ...
    def run(self):
        try:
            Arbiter(self).run()
        except RuntimeError as e:
            print("\nError: %s\n" % e, file=sys.stderr)
            sys.stderr.flush()
            sys.exit(1)

WSGIApplication inherits the run implementation from BaseApplication, which ultimately instantiates Arbiter and calls its run method.
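
The delegation and its error path can be mimicked with toy classes. ToyArbiter and ToyApplication below are hypothetical stand-ins, not gunicorn classes; only the try/except shape mirrors the BaseApplication.run excerpt above.

```python
import sys


class ToyArbiter:
    """Hypothetical stand-in for Arbiter: it either starts or fails."""

    def __init__(self, app):
        self.app = app

    def run(self):
        if self.app.fail:
            raise RuntimeError("could not start")
        self.app.started = True


class ToyApplication:
    """Hypothetical stand-in mirroring the shape of BaseApplication.run."""

    def __init__(self, fail=False):
        self.fail = fail
        self.started = False

    def run(self):
        try:
            ToyArbiter(self).run()
        except RuntimeError as e:
            # Startup errors are reported on stderr and become exit status 1.
            print("\nError: %s\n" % e, file=sys.stderr)
            sys.stderr.flush()
            sys.exit(1)
```

ToyApplication().run() returns normally, while ToyApplication(fail=True).run() prints the error and raises SystemExit with code 1, so a RuntimeError raised during startup surfaces as a non-zero process exit status rather than a traceback.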

Arbiter

Arbiter is gunicorn's core class. It manages the running of the whole server, and its run method is the core entry point for server startup.

# https://github.com/benoitc/gunicorn/blob/20.1.0/gunicorn/arbiter.py#L196

def run(self):
    "Main master loop."
    # 1. Initialize: set up signal handling and the listening sockets
    self.start()
    util._setproctitle("master [%s]" % self.proc_name)

    try:
        # 2. Spawn workers until the count matches the configured number; if there are too many, kill the oldest workers
        self.manage_workers()

        while True:
            # 3. Check whether this process should become the real master and, if so, promote it (analyzed in the final part)
            self.maybe_promote_master()

            # 4. Fetch the next queued signal
            sig = self.SIG_QUEUE.pop(0) if self.SIG_QUEUE else None
            if sig is None:
                # 5. Sleep in select for up to 1 second
                self.sleep()
                # 6. Kill workers that have timed out (analyzed in the Worker section)
                self.murder_workers()
                self.manage_workers()
                continue

            if sig not in self.SIG_NAMES:
                self.log.info("Ignoring unknown signal: %s", sig)
                continue

            signame = self.SIG_NAMES.get(sig)
            # Call the matching signal handler
            handler = getattr(self, "handle_%s" % signame, None)
            if not handler:
                self.log.error("Unhandled signal: %s", signame)
                continue
            self.log.info("Handling signal: %s", signame)
            handler()
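            # 7. Wake the loop so the next iteration does not wait the full second
            self.wakeup()
    # 8. The server is stopping or has failed: handle shutdown
    except (StopIteration, KeyboardInterrupt):
        # Received the user's exit signal (CTRL+C was pressed)
        self.halt()
    except HaltServer as inst:
        # Raised when a worker fails abnormally
        self.halt(reason=inst.reason, exit_status=inst.exit_status)
    except SystemExit:
        raise
    except Exception:
        # Any other runtime exception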
        self.log.info("Unhandled exception in main loop",
                      exc_info=True)
        self.stop(False)
        if self.pidfile is not None:
            self.pidfile.unlink()
        sys.exit(-1)

Inside its loop, run repeatedly calls manage_workers, the core logic for managing child processes.
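
Two mechanisms in this loop are worth isolating: signals are queued and dispatched by name to handle_<signame> methods, and sleep can be interrupted early through a pipe that wakeup writes to. The sketch below is a heavily simplified model under stated assumptions: MiniArbiter, step, handled, and close are hypothetical names, it assumes a POSIX system (select on a pipe), and real signal handlers are replaced by a plain signal method that can be called directly.

```python
import os
import select
import signal


class MiniArbiter:
    """Hypothetical, heavily simplified model of the master loop."""

    # Map signal numbers to lowercase names, e.g. SIGTERM -> "term".
    SIG_NAMES = {
        getattr(signal, name): name[3:].lower()
        for name in dir(signal)
        if name.startswith("SIG") and not name.startswith("SIG_")
    }

    def __init__(self):
        self.SIG_QUEUE = []
        self.PIPE = os.pipe()   # self-pipe used only to interrupt sleep()
        self.handled = []

    def signal(self, sig):
        # What a real signal handler would do: enqueue, then wake the loop.
        self.SIG_QUEUE.append(sig)
        self.wakeup()

    def wakeup(self):
        os.write(self.PIPE[1], b".")

    def sleep(self, timeout=1.0):
        # Block until the pipe is readable or the timeout expires.
        ready, _, _ = select.select([self.PIPE[0]], [], [], timeout)
        if ready:
            os.read(self.PIPE[0], 1)   # consume one wakeup byte
            return True
        return False

    def handle_term(self):
        self.handled.append("term")

    def handle_int(self):
        self.handled.append("int")

    def step(self):
        """Run one iteration of the loop body and report what happened."""
        sig = self.SIG_QUEUE.pop(0) if self.SIG_QUEUE else None
        if sig is None:
            self.sleep()
            return "idle"
        signame = self.SIG_NAMES.get(sig)
        handler = getattr(self, "handle_%s" % signame, None)
        if handler is None:
            return "unhandled"
        handler()
        return "handled"

    def close(self):
        for fd in self.PIPE:
            os.close(fd)
```

Because signal leaves a byte in the pipe, the next sleep returns immediately instead of waiting for the timeout, which is the effect the wakeup call in step 7 is after.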

ManageWorkers

# https://github.com/benoitc/gunicorn/blob/20.1.0/gunicorn/arbiter.py#L545

class Arbiter(object):

    def manage_workers(self):
        if len(self.WORKERS) < self.num_workers:
            self.spawn_workers()
        ...

    def spawn_worker(self):
        self.worker_age += 1
        worker = self.worker_class(self.worker_age, self.pid, self.LISTENERS,
                                   self.app, self.timeout / 2.0,
                                   self.cfg, self.log)  # instantiate the worker class chosen in the configuration

        self.cfg.pre_fork(self, worker)
        pid = os.fork()      # fork the child process
        if pid != 0:
            worker.pid = pid
            self.WORKERS[pid] = worker
            return pid

        ...

        # Process Child
        worker.pid = os.getpid()
        try:
            util._setproctitle("worker [%s]" % self.proc_name)
            self.log.info("Booting worker with pid: %s", worker.pid)
            self.cfg.post_fork(self, worker)
            worker.init_process()        # run the worker's initialization
            sys.exit(0)
        except SystemExit:
            raise


    def spawn_workers(self):

        for _ in range(self.num_workers - len(self.WORKERS)):
            self.spawn_worker()
            time.sleep(0.1 * random.random())

manage_workers calls spawn_workers, and spawn_worker instantiates the configured worker class, then forks a child process that runs the worker's init_process.
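
The pre-fork pattern itself fits in a few lines. The following sketch assumes a POSIX system with os.fork; spawn_worker, manage_workers, and reap_workers here are simplified hypothetical functions that only borrow the names of the Arbiter methods, and task stands in for worker.init_process.

```python
import os


def spawn_worker(workers, age, task):
    """Fork one child; the parent records it, the child runs task and exits."""
    pid = os.fork()
    if pid != 0:
        workers[pid] = age      # parent: remember the child by pid
        return pid
    # Child process: never return into the parent's code path.
    status = 1
    try:
        task(age)
        status = 0
    finally:
        os._exit(status)


def manage_workers(workers, num_workers, task):
    """Spawn children until the requested count is reached."""
    age = max(workers.values(), default=0)
    while len(workers) < num_workers:
        age += 1
        spawn_worker(workers, age, task)


def reap_workers(workers):
    """Wait for every recorded child and return {pid: exit_status}."""
    statuses = {}
    for pid in list(workers):
        _, raw = os.waitpid(pid, 0)
        statuses[pid] = os.WEXITSTATUS(raw) if os.WIFEXITED(raw) else None
        del workers[pid]
    return statuses
```

The key point is that os.fork returns twice: the parent sees the child's pid and goes back to its management loop, while the child sees 0, does its work, and must exit without ever running the parent's code.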

Worker

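A worker runs the application and handles requests inside a gunicorn pre-forked child process. Most workers wrap gunicorn's HTTP and WSGI interfaces, but a worker can also wrap a custom protocol to support TCP applications.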

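  • If a worker uses gunicorn's HTTP and WSGI implementation, it only has to decide how to handle client requests. Gunicorn ships built-in workers based on threads, gevent, eventlet, tornado, and sync; the default is the sync worker, which handles requests in a single-threaded, blocking, synchronous fashion.
  • If, after init_process, a worker stops using gunicorn's HTTP and WSGI implementation and only uses the listening sockets inherited from the master process, it can implement a custom protocol for a TCP application. In this mode gunicorn acts as a process manager for TCP applications.

# src/gunicorn/gunicorn/workers git:(master) $ ls -lh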

-rw-rw-r-- 1 arvin arvin 5.6K Jul 11  2022 base_async.py
-rw-rw-r-- 1 arvin arvin 9.0K Mar 24 10:50 base.py
-rw-rw-r-- 1 arvin arvin 6.0K Mar 24 10:50 geventlet.py
-rw-rw-r-- 1 arvin arvin 5.7K Mar 24 10:50 ggevent.py
-rw-rw-r-- 1 arvin arvin  12K Jul 11  2022 gthread.py
-rw-rw-r-- 1 arvin arvin 5.9K Mar 24 10:50 gtornado.py
-rw-rw-r-- 1 arvin arvin  594 Jul 11  2022 __init__.py
-rw-rw-r-- 1 arvin arvin 7.2K Jul 11  2022 sync.py
-rw-rw-r-- 1 arvin arvin 1.7K Jul 11  2022 workertmp.py
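
Which of these worker implementations is used is a configuration choice. As a sketch, a gunicorn.conf.py file might look like the following; the values are illustrative assumptions rather than recommendations, and the gevent, eventlet, and tornado workers additionally require their respective packages to be installed.

```python
# gunicorn.conf.py (hypothetical values; adjust for the deployment)
bind = "127.0.0.1:8000"
workers = 4                # number of pre-forked worker processes
worker_class = "gthread"   # alternatives include "sync", "gevent", "eventlet", "tornado"
threads = 2                # threads per worker; only affects the gthread worker
timeout = 30               # seconds of silence before the master kills a worker
```

It would be loaded with gunicorn -c gunicorn.conf.py myapp:app. Leaving worker_class unset keeps the default sync worker examined next.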

SyncWorker

# https://github.com/benoitc/gunicorn/blob/20.1.0/gunicorn/workers/base.py#L29
class Worker(object):

    def init_process(self):
        ...
        self.load_wsgi()  # load the WSGI application
        ...
        self.run()


# https://github.com/benoitc/gunicorn/blob/20.1.0/gunicorn/workers/sync.py#L25
class SyncWorker(base.Worker):

    def accept(self, listener):
        client, addr = listener.accept()
        client.setblocking(1)
        util.close_on_exec(client)
        self.handle(listener, client, addr)  # handle the accepted connection

    def run_for_one(self, timeout):
        while self.alive:
            ...
            self.accept(listener)
            ...


    def run_for_multiple(self, timeout):
        while self.alive:
            ...
            self.accept(listener)
            ...

    def run(self):  # main loop
        ...
        if len(self.sockets) > 1:
            self.run_for_multiple(timeout)  # enter the request-handling loop
        else:
            self.run_for_one(timeout)  # enter the request-handling loop

    def handle(self, listener, client, addr):
        ...
        parser = http.RequestParser(self.cfg, client, addr)  # parse the HTTP request
        req = next(parser)
        self.handle_request(listener, req, client, addr)  # handle the WSGI request
        ...


    def handle_request(self, listener, req, client, addr):
        ...
        resp, environ = wsgi.create(req, client, addr,
                                    listener.getsockname(), self.cfg)  # build the WSGI response and environ objects
        ...
        respiter = self.wsgi(environ, resp.start_response)  # call the WSGI application
        ...
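
  • SyncWorker startup flow: init_process (initialize) -> self.load_wsgi (load the WSGI application) -> run (enter the request-handling loop).
  • SyncWorker flow for one HTTP request: accept (accept the connection) -> handle (parse HTTP) -> handle_request (build the WSGI objects) -> self.wsgi (call the WSGI application).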
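
The request flow can be imitated end to end with nothing but the standard library. The sketch below is a toy under stated assumptions, not gunicorn's implementation: handle collapses the parse, environ-building, and application-call steps into one hypothetical function, it only understands a request without a body, and roundtrip is a hypothetical helper that uses socket.socketpair in place of a real accepted connection.

```python
import socket


def app(environ, start_response):
    body = b"Hello, World!\n"
    start_response("200 OK", [
        ("Content-Type", "text/plain"),
        ("Content-Length", str(len(body))),
    ])
    return [body]


def handle(client, wsgi_app):
    """Toy version of handle + handle_request for a body-less request."""
    raw = b""
    while b"\r\n\r\n" not in raw:
        chunk = client.recv(1024)
        if not chunk:
            break
        raw += chunk

    # "Parse" only the request line; a real parser also validates headers.
    request_line = raw.split(b"\r\n", 1)[0].decode("latin-1")
    method, target, version = request_line.split(" ")
    path, _, query = target.partition("?")
    environ = {
        "REQUEST_METHOD": method,
        "PATH_INFO": path,
        "QUERY_STRING": query,
        "SERVER_PROTOCOL": version,
        "wsgi.url_scheme": "http",
    }

    response = {}

    def start_response(status, headers, exc_info=None):
        response["status"] = status
        response["headers"] = headers

    # Call the WSGI application and serialize what it produced.
    body = b"".join(wsgi_app(environ, start_response))
    head = "HTTP/1.1 %s\r\n" % response["status"]
    head += "".join("%s: %s\r\n" % pair for pair in response["headers"])
    client.sendall(head.encode("latin-1") + b"\r\n" + body)
    client.close()


def roundtrip(request, wsgi_app=app):
    """Feed one raw request through handle() over a connected socket pair."""
    ours, theirs = socket.socketpair()
    try:
        ours.sendall(request)
        handle(theirs, wsgi_app)
        chunks = []
        while True:
            chunk = ours.recv(4096)
            if not chunk:
                break
            chunks.append(chunk)
        return b"".join(chunks)
    finally:
        ours.close()
```

Calling roundtrip(b"GET / HTTP/1.1\r\nHost: example\r\n\r\n") returns the raw HTTP response bytes. Everything happens sequentially in one thread, which is also why a sync worker can serve only one request at a time.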