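In the earlier introduction to Python web development we implemented a simple WSGI server, but production use also has to account for robustness, performance, concurrency, and similar concerns.
Gunicorn is a WSGI HTTP server implementation for Python. It supports several process models and manages its worker processes automatically. It is widely used in Python web development and is one of the most popular servers; many well-known Python web frameworks support running on Gunicorn.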
Start
$ pip install gunicorn
$ cat myapp.py
def app(environ, start_response):
    data = b"Hello, World!\n"
    start_response("200 OK", [
        ("Content-Type", "text/plain"),
        ("Content-Length", str(len(data)))
    ])
    return iter([data])
$ gunicorn -w 4 myapp:app
[2014-09-10 10:22:28 +0000] [30869] [INFO] Listening at: http://127.0.0.1:8000 (30869)
[2014-09-10 10:22:28 +0000] [30869] [INFO] Using worker: sync
[2014-09-10 10:22:28 +0000] [30874] [INFO] Booting worker with pid: 30874
[2014-09-10 10:22:28 +0000] [30875] [INFO] Booting worker with pid: 30875
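Before any server is involved, it helps to see that `app` is only a callable that follows the WSGI calling convention. The minimal sketch below calls it directly with a fake environ built by the standard library's `wsgiref.util.setup_testing_defaults`. The helper name `call_app` is hypothetical and is not part of Gunicorn.

```python
from wsgiref.util import setup_testing_defaults


def app(environ, start_response):
    # Same application as in myapp.py.
    data = b"Hello, World!\n"
    start_response("200 OK", [
        ("Content-Type", "text/plain"),
        ("Content-Length", str(len(data))),
    ])
    return iter([data])


def call_app(wsgi_app):
    """Invoke a WSGI callable once and return (status, headers, body)."""
    environ = {}
    setup_testing_defaults(environ)  # fills in REQUEST_METHOD, PATH_INFO, ...
    captured = {}

    def start_response(status, headers, exc_info=None):
        # A server would write the status line and headers to the socket;
        # here they are only recorded.
        captured["status"] = status
        captured["headers"] = headers

    body = b"".join(wsgi_app(environ, start_response))
    return captured["status"], captured["headers"], body


status, headers, body = call_app(app)
print(status, headers, body)
```

A server such as Gunicorn does essentially the same thing per request, except that the environ comes from a parsed HTTP request and the response is written back to the client socket.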
Installing gunicorn with pip provides a command-line entry point. Looking at setup.py shows that the entry function is gunicorn.app.wsgiapp:run.
# https://github.com/benoitc/gunicorn/blob/20.1.0/setup.py#L114
entry_points="""
[console_scripts]
gunicorn=gunicorn.app.wsgiapp:run
[paste.server_runner]
main=gunicorn.app.pasterapp:serve
"""
# https://github.com/benoitc/gunicorn/blob/20.1.0/gunicorn/app/wsgiapp.py#L61
def run():
    """\
    The ``gunicorn`` command line runner for launching Gunicorn with
    generic WSGI applications.
    """
    from gunicorn.app.wsgiapp import WSGIApplication
    WSGIApplication("%(prog)s [OPTIONS] [APP_MODULE]").run()
The function instantiates WSGIApplication and then calls its run method to start the server.
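An entry-point string such as `gunicorn.app.wsgiapp:run` names a module and an attribute inside it, separated by a colon. The sketch below shows one simple way such a string could be resolved by hand. It is an illustration only, not the mechanism pip or setuptools actually uses, and `resolve_entry_point` is a hypothetical helper name. A standard-library target is used so the example runs without Gunicorn installed.

```python
import importlib


def resolve_entry_point(spec):
    """Resolve a 'package.module:attr' string to the object it names."""
    module_name, _, attr_path = spec.partition(":")
    obj = importlib.import_module(module_name)
    # The attribute part may itself be dotted, e.g. "Class.method".
    for part in attr_path.split("."):
        obj = getattr(obj, part)
    return obj


join = resolve_entry_point("os.path:join")
print(join("a", "b"))
```

With Gunicorn installed, resolving `"gunicorn.app.wsgiapp:run"` the same way would be expected to return the `run` function shown above.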
WSGIApplication
# https://github.com/benoitc/gunicorn/blob/20.1.0/gunicorn/app/base.py#L70
class BaseApplication(object):
    ...
    def run(self):
        try:
            Arbiter(self).run()
        except RuntimeError as e:
            print("\nError: %s\n" % e, file=sys.stderr)
            sys.stderr.flush()
            sys.exit(1)
WSGIApplication ultimately relies on the run implementation inherited from BaseApplication, which instantiates Arbiter and calls its run method.
Arbiter
Arbiter is the core class of gunicorn. It manages the whole server at runtime, and its run method is the main entry point for server startup.
# https://github.com/benoitc/gunicorn/blob/20.1.0/gunicorn/arbiter.py#L196
def run(self):
    "Main master loop."
    # 1. Initialize the arbiter: set up signal handling and the listening sockets
    self.start()
    util._setproctitle("master [%s]" % self.proc_name)
    try:
        # 2. Spawn workers until the count matches the configured number;
        #    if there are more workers than configured, kill the oldest ones
        self.manage_workers()
        while True:
            # 3. Check whether this process is the real master and, if so,
            #    promote it (analyzed in the last part)
            self.maybe_promote_master()
            # 4. Fetch the next queued signal
            sig = self.SIG_QUEUE.pop(0) if self.SIG_QUEUE else None
            if sig is None:
                # 5. Sleep in select for up to 1 second
                self.sleep()
                # 6. Kill workers that have timed out (analyzed in the Worker section)
                self.murder_workers()
                self.manage_workers()
                continue
            if sig not in self.SIG_NAMES:
                self.log.info("Ignoring unknown signal: %s", sig)
                continue
            signame = self.SIG_NAMES.get(sig)
            # Dispatch to the matching signal handler
            handler = getattr(self, "handle_%s" % signame, None)
            if not handler:
                self.log.error("Unhandled signal: %s", signame)
                continue
            self.log.info("Handling signal: %s", signame)
            handler()
            # 7. Wake up so the next iteration does not wait out the one-second sleep
            self.wakeup()
    # 8. The server is stopping or has failed: shut down
    except (StopIteration, KeyboardInterrupt):
        # The user asked to quit (pressed CTRL+C)
        self.halt()
    except HaltServer as inst:
        # A worker failed in a way that requires halting the server
        self.halt(reason=inst.reason, exit_status=inst.exit_status)
    except SystemExit:
        raise
    except Exception:
        # Any other runtime error
        self.log.info("Unhandled exception in main loop",
                      exc_info=True)
        self.stop(False)
        if self.pidfile is not None:
            self.pidfile.unlink()
        sys.exit(-1)
Inside the loop of the run method, the core manage_workers logic is executed to manage the child processes.
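The combination of a signal queue, a timed sleep in select, and a wakeup call can be sketched with the self-pipe technique. The toy class below is an assumption-laden model, not Gunicorn's code: `MiniArbiter`, `signal`, `step`, and the 0.05 second timeout are all hypothetical, and real signal handlers are replaced by a plain method call so the example stays deterministic. It needs a Unix-like system for non-blocking pipes.

```python
import os
import select


class MiniArbiter:
    """Toy model of a master loop with a signal queue and a wakeup pipe."""

    def __init__(self):
        self.sig_queue = []
        self.reloaded = False
        self.pipe_r, self.pipe_w = os.pipe()
        os.set_blocking(self.pipe_r, False)
        os.set_blocking(self.pipe_w, False)

    def signal(self, signame):
        # Stand-in for a real signal handler: queue the name, wake the loop.
        self.sig_queue.append(signame)
        self.wakeup()

    def wakeup(self):
        try:
            os.write(self.pipe_w, b".")
        except BlockingIOError:
            pass  # pipe is full, so the loop will wake up anyway

    def sleep(self, timeout=1.0):
        """Block until woken or until timeout; return True if woken."""
        ready, _, _ = select.select([self.pipe_r], [], [], timeout)
        if not ready:
            return False
        try:
            while os.read(self.pipe_r, 1024):
                pass  # drain every pending wakeup byte
        except BlockingIOError:
            pass
        return True

    def step(self):
        """One loop iteration: dispatch a queued signal, or sleep."""
        sig = self.sig_queue.pop(0) if self.sig_queue else None
        if sig is None:
            self.sleep(timeout=0.05)
            return "idle"
        handler = getattr(self, "handle_%s" % sig, None)
        if handler is None:
            return "unhandled:%s" % sig
        handler()
        return "handled:%s" % sig

    def handle_hup(self):
        self.reloaded = True

    def close(self):
        os.close(self.pipe_r)
        os.close(self.pipe_w)


arbiter = MiniArbiter()
print(arbiter.step())
arbiter.signal("hup")
print(arbiter.step())
arbiter.close()
```

Writing a byte to the pipe is what lets a sleeping select return early, so a signal does not have to wait for the timeout to expire before it is handled.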
ManageWorkers
# https://github.com/benoitc/gunicorn/blob/20.1.0/gunicorn/arbiter.py#L545
class Arbiter(object):
    def manage_workers(self):
        if len(self.WORKERS) < self.num_workers:
            self.spawn_workers()
        ...

    def spawn_worker(self):
        self.worker_age += 1
        # Instantiate the worker class selected by the configuration
        worker = self.worker_class(self.worker_age, self.pid, self.LISTENERS,
                                   self.app, self.timeout / 2.0,
                                   self.cfg, self.log)
        self.cfg.pre_fork(self, worker)
        pid = os.fork()  # fork the child process
        if pid != 0:
            # Parent: record the child and return
            worker.pid = pid
            self.WORKERS[pid] = worker
            return pid
        ...
        # Process Child
        worker.pid = os.getpid()
        try:
            util._setproctitle("worker [%s]" % self.proc_name)
            self.log.info("Booting worker with pid: %s", worker.pid)
            self.cfg.post_fork(self, worker)
            worker.init_process()  # initialize and run the worker
            sys.exit(0)
        except SystemExit:
            raise

    def spawn_workers(self):
        for _ in range(self.num_workers - len(self.WORKERS)):
            self.spawn_worker()
            time.sleep(0.1 * random.random())
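manage_workers calls spawn_workers, which for each missing worker instantiates the configured worker class and then forks a child process that runs the worker's init_process.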
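The pre-fork pattern itself can be shown with the standard library alone. The following is a simplified sketch, assuming a Unix-like system where `os.fork` is available. The function names mirror the ones above for readability, but the signatures, the `task` callable, and `reap_workers` are hypothetical and much simpler than Gunicorn's.

```python
import os


def spawn_worker(workers, task):
    """Fork once; the parent records the child, the child runs task and exits."""
    pid = os.fork()
    if pid != 0:
        # Parent: remember the child and return immediately.
        workers[pid] = task
        return pid
    # Child: run the task and never return into the parent's code path.
    exit_code = 1
    try:
        task()
        exit_code = 0
    finally:
        os._exit(exit_code)


def manage_workers(workers, num_workers, task):
    """Spawn children until the pool reaches num_workers."""
    while len(workers) < num_workers:
        spawn_worker(workers, task)


def reap_workers(workers):
    """Wait for every child and return {pid: exit_code}."""
    results = {}
    for pid in list(workers):
        _, status = os.waitpid(pid, 0)
        results[pid] = os.WEXITSTATUS(status) if os.WIFEXITED(status) else -1
        del workers[pid]
    return results


workers = {}
manage_workers(workers, 2, lambda: None)
print(reap_workers(workers))
```

The child leaves through `os._exit` so that it cannot fall back into the parent's loop and start spawning workers of its own. In the Gunicorn excerpt above, `sys.exit(0)` after `init_process` serves the same purpose.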
Worker
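A Worker runs the application and handles requests inside one of gunicorn's pre-forked child processes. Most workers wrap gunicorn's own http and wsgi interfaces, but a custom protocol wrapper can be used to support TCP applications.
- If the worker uses gunicorn's http and wsgi interfaces, it only has to decide how to handle requests from clients. Gunicorn ships with workers based on thread, gevent, tornado, and sync. By default gunicorn uses the sync worker, which handles requests in a single-threaded, blocking, synchronous mode.
- If, after init_process, the worker no longer uses gunicorn's http and wsgi interfaces and only uses the sockets shared from the master process, it can implement a custom protocol for a TCP application. In this mode gunicorn acts as a process manager for TCP applications.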
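Which worker is used is a configuration choice. As an illustration, a Gunicorn configuration file (which is plain Python) might look like the fragment below. The values are arbitrary examples chosen for this sketch, not recommendations.

```python
# gunicorn.conf.py (illustrative values only)
bind = "127.0.0.1:8000"   # address the master listens on
workers = 4               # number of worker processes, same as -w 4
worker_class = "sync"     # the default; "gthread", "gevent" and "tornado" are alternatives
timeout = 30              # seconds a worker may stay silent before it is killed
```

Such a file would be passed on the command line with `gunicorn -c gunicorn.conf.py myapp:app`.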
# src/gunicorn/gunicorn/workers git:(master) $ ls -lh
-rw-rw-r-- 1 arvin arvin 5.6K Jul 11 2022 base_async.py
-rw-rw-r-- 1 arvin arvin 9.0K Mar 24 10:50 base.py
-rw-rw-r-- 1 arvin arvin 6.0K Mar 24 10:50 geventlet.py
-rw-rw-r-- 1 arvin arvin 5.7K Mar 24 10:50 ggevent.py
-rw-rw-r-- 1 arvin arvin 12K Jul 11 2022 gthread.py
-rw-rw-r-- 1 arvin arvin 5.9K Mar 24 10:50 gtornado.py
-rw-rw-r-- 1 arvin arvin 594 Jul 11 2022 __init__.py
-rw-rw-r-- 1 arvin arvin 7.2K Jul 11 2022 sync.py
-rw-rw-r-- 1 arvin arvin 1.7K Jul 11 2022 workertmp.py
SyncWorker
# https://github.com/benoitc/gunicorn/blob/20.1.0/gunicorn/workers/base.py#L29
class Worker(object):
    def init_process(self):
        ...
        self.load_wsgi()  # load the WSGI application
        ...
        self.run()
# https://github.com/benoitc/gunicorn/blob/20.1.0/gunicorn/workers/sync.py#L25
class SyncWorker(base.Worker):
    def accept(self, listener):
        client, addr = listener.accept()
        client.setblocking(1)
        util.close_on_exec(client)
        self.handle(listener, client, addr)  # handle the accepted connection

    def run_for_one(self, timeout):
        while self.alive:
            ...
            self.accept(listener)
            ...

    def run_for_multiple(self, timeout):
        while self.alive:
            ...
            self.accept(listener)
            ...

    def run(self):  # main loop
        ...
        if len(self.sockets) > 1:
            self.run_for_multiple(timeout)  # enter the request-handling loop
        else:
            self.run_for_one(timeout)  # enter the request-handling loop

    def handle(self, listener, client, addr):
        ...
        parser = http.RequestParser(self.cfg, client, addr)  # parse the HTTP request
        req = next(parser)
        self.handle_request(listener, req, client, addr)  # handle the WSGI request
        ...

    def handle_request(self, listener, req, client, addr):
        ...
        # Build the WSGI response object and environ
        resp, environ = wsgi.create(req, client, addr,
                                    listener.getsockname(), self.cfg)
        ...
        respiter = self.wsgi(environ, resp.start_response)  # call the WSGI application
        ...
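- The SyncWorker startup flow is: init_process (initialize) -> self.load_wsgi (load the WSGI application) -> run (enter the request-handling loop).
- The SyncWorker flow for one HTTP request is: accept (accept the connection) -> handle (parse HTTP) -> handle_request (build the WSGI objects) -> self.wsgi (call the WSGI application).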
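The request flow in the second bullet can be approximated with the standard library. The sketch below is a deliberately naive model, not Gunicorn's parser or response writer: `parse_request`, `build_environ`, and `handle` are hypothetical helpers, the environ contains only a subset of the keys WSGI requires, and the whole request is assumed to arrive in a single read. A `socket.socketpair` stands in for an accepted client connection.

```python
import io
import socket


def parse_request(raw):
    """Parse the request line and headers of a simple HTTP/1.x request."""
    head, _, body = raw.partition(b"\r\n\r\n")
    lines = head.decode("latin-1").split("\r\n")
    method, target, version = lines[0].split(" ", 2)
    headers = []
    for line in lines[1:]:
        name, _, value = line.partition(":")
        headers.append((name.strip(), value.strip()))
    return method, target, version, headers, body


def build_environ(method, target, version, headers, body):
    """Build a minimal WSGI environ (only a subset of the required keys)."""
    path, _, query = target.partition("?")
    environ = {
        "REQUEST_METHOD": method,
        "PATH_INFO": path,
        "QUERY_STRING": query,
        "SERVER_PROTOCOL": version,
        "wsgi.version": (1, 0),
        "wsgi.url_scheme": "http",
        "wsgi.input": io.BytesIO(body),
        "wsgi.errors": io.StringIO(),
    }
    for name, value in headers:
        environ["HTTP_" + name.upper().replace("-", "_")] = value
    return environ


def handle(client, wsgi_app):
    """Parse one request, call the WSGI app, write the response, close."""
    raw = client.recv(65536)  # assumption: the whole request fits in one read
    environ = build_environ(*parse_request(raw))
    response = {}

    def start_response(status, headers, exc_info=None):
        response["status"] = status
        response["headers"] = headers

    chunks = list(wsgi_app(environ, start_response))
    head = "HTTP/1.1 %s\r\n" % response["status"]
    head += "".join("%s: %s\r\n" % h for h in response["headers"])
    client.sendall(head.encode("latin-1") + b"\r\n" + b"".join(chunks))
    client.close()


def demo_app(environ, start_response):
    body = ("You asked for %s\n" % environ["PATH_INFO"]).encode()
    start_response("200 OK", [("Content-Length", str(len(body)))])
    return [body]


server_side, client_side = socket.socketpair()
client_side.sendall(b"GET /hello HTTP/1.1\r\nHost: localhost\r\n\r\n")
handle(server_side, demo_app)
print(client_side.recv(65536).decode("latin-1"))
client_side.close()
```

Because each call to `handle` blocks until the response has been written, one such worker serves one request at a time, which matches the single-threaded, blocking behavior described for the sync worker.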