Flask Source Code Analysis

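I have been building web applications with Flask for a long time without ever reading its source properly. I recently went through it carefully, and these are my notes.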

Introduction to Flask

The Python micro framework for building web applications.
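Flask positions itself as a micro web framework for Python: simple, extensible, flexible, and easy to use. It implements only the core features of a web framework, and its core module, app.py, is just over 2,000 lines long.
Flask has two core dependencies, werkzeug and jinja, and werkzeug is the more central of the two.
werkzeug handles the core logic: routing, requests, responses, WSGI, and so on; jinja handles template rendering. To understand werkzeug, there is one concept you must understand first: WSGI.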

WSGI

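Before getting to WSGI, let's look at what an HTTP-facing Python program has to care about:

  1. Request:
    • request method (method)
    • request address (url)
    • request content (body)
    • request headers (header)
    • request environment (environ)
  2. Response:
    • status code (status_code)
    • response data (data)
    • response headers (header)

WSGI is the standard interface between the application side and the server side: it passes the request received by the server program on to the Python program, and converts between the network byte stream and Python data structures.
It requires the Python program to be a callable object (a function, or an instance of a class that implements __call__) that accepts two parameters, environ (the WSGI environment information) and start_response (the function that begins the response), and returns an iterable. Here is the code for the simplest possible web program, which returns hello world: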

from werkzeug.serving import run_simple

class WebClass:
    def __init__(self):
        pass

    def __call__(self, environ, start_response):
        status = '200 OK'
        response_headers = [('Content-type', 'text/plain')]
        start_response(status, response_headers)
        yield str.encode("Hello World!\n")

if __name__ == "__main__":
    app = WebClass()
    run_simple("127.0.0.1", 5000, app)

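WebClass is exactly such a callable object: it implements __call__, accepts environ and start_response, and calls start_response before returning any data. start_response takes two required arguments, status (the HTTP status) and response_headers (the response headers), and the yielded hello world is the required iterable result. For now this class implements only the bare minimum; route registration, template rendering, and so on are missing. The example uses run_simple from werkzeug, which is in fact the same function that gets called when we create a Flask application and run it, as we will see later.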
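To see the protocol without any server involved, we can play the server's role by hand. The following is a minimal sketch, not how werkzeug does it: call_wsgi_app is a hypothetical helper, and the environ is filled in with the standard library's wsgiref.util.setup_testing_defaults.

```python
from wsgiref.util import setup_testing_defaults


class WebClass:
    """Same callable as in the example above."""

    def __call__(self, environ, start_response):
        start_response("200 OK", [("Content-type", "text/plain")])
        yield b"Hello World!\n"


def call_wsgi_app(app, path="/"):
    """Hypothetical helper: build environ, supply start_response, drain the iterable."""
    environ = {"PATH_INFO": path}
    setup_testing_defaults(environ)  # fills in the remaining WSGI keys
    captured = {}

    def start_response(status, response_headers, exc_info=None):
        captured["status"] = status
        captured["headers"] = response_headers

    # Joining the iterable runs the generator, which calls start_response first.
    body = b"".join(app(environ, start_response))
    return captured["status"], captured["headers"], body


status, headers, body = call_wsgi_app(WebClass())
print(status, headers, body)
```

Any WSGI server does essentially these three things, only with a real socket on the other end.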

Starting a Flask application

That covers the basic concepts of WSGI. The basic flow is shown below:

c               s   application_callable(environ,start_response)    
l   request     e   -------------->                                 a
i   ------->    r    start_response(status,headers,exc_info)        p(python application) 
e               v      <----------------                            p
n               e   return iterator
t               r       <--------------
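  • client: a browser or another client
  • server: an HTTP server such as apache, nginx, or gunicorn
  • app: the Python program

Now let's write a very simple Flask application, which will serve as the running example below: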
from flask import Flask, jsonify, request

app = Flask(__name__)

docs = modules = [
    {"name": "get_nodes", "desc": "List all node resources", "url": "/nodes", "method": "GET"},
    {"name": "node_create", "desc": "Create a node resource", "url": "/nodes", "method": "POST"},
]

rs_data = {
    "name": "node_info",
    "desc": "Node information management",
    "url": "/api",
    "contents": [
        {
            "version": "v1",
            "contents": docs
        }
    ]
}

@app.route("/hello", methods=["GET", "POST"])
def handle_hello():
    req = request.args
    name_param = req.get("name", "")
    return "hello, %s!" % (name_param)

@app.route("/api/v1/nodes", methods=["GET"])
def get_nodes():
    node = [{
        "_id": "5d7f5c6dfd2cf90018ba05e6",
        "created_at": "Mon, 16 Sep 2019 17:57:01 GMT",
        "node": {
            "ipaddress": "192.168.1.100",
            "nodes_limit": 5,
            "partner": "xxxxx Technology Co., Ltd.",
            "period": 6,
            "provider": "xxxxxx",
        }
    }]
    return jsonify({
        "value": node,
        "msg": "Nodes retrieved successfully",
        "errors": []
    })

@app.route("/api/v1/nodes", methods=["POST"])
def post_node():
    dat = request.json
    return jsonify({
        "value": dat,
        "msg": "Node added successfully",
        "errors": []
    })

@app.route("/docs", methods=["GET"])
def get_docs():
    return jsonify({
        "value": rs_data,
        "msg": "API information retrieved successfully",
        "errors": []
    })


if __name__ == "__main__":
    app.run()

Startup process

The entry point is app.run(). Its source is:

def run(self, host=None, port=None, debug=None, load_dotenv=True, **options):
        _host = "127.0.0.1"
        _port = 5000
        server_name = self.config.get("SERVER_NAME")
        sn_host, sn_port = None, None

        if server_name:
            sn_host, _, sn_port = server_name.partition(":")

        host = host or sn_host or _host
        port = int(next((p for p in (port, sn_port) if p is not None), _port))

        options.setdefault("use_reloader", self.debug)
        options.setdefault("use_debugger", self.debug)
        options.setdefault("threaded", True)

        cli.show_server_banner(self.env, self.debug, self.name, False)

        from werkzeug.serving import run_simple

        try:
            run_simple(host, port, self, **options)
        finally:
            self._got_first_request = False

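For readability I have removed the comments and many less important parts; the source excerpts below are trimmed the same way.
This function is simple: it only processes a few arguments and then calls werkzeug's run_simple, exactly as in our first example.
To follow how the program runs and how data is passed along, we need to look at a bit of werkzeug, so here is a brief walk through run_simple: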

# lib\python3.6\socketserver.py
class TCPServer(BaseServer):
    def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True):
        """Constructor.  May be extended, do not override."""
        # The constructor takes a RequestHandlerClass argument, the class that handles requests; for WSGI, WSGIRequestHandler is passed in
        BaseServer.__init__(self, server_address, RequestHandlerClass)
        self.socket = socket.socket(self.address_family,
                                    self.socket_type)
        if bind_and_activate:
            try:
                self.server_bind()
                self.server_activate()
            except:
                self.server_close()
                raise
    
    def serve_forever(self, poll_interval=0.5):
        self.__is_shut_down.clear()
        try:
            with _ServerSelector() as selector:
                selector.register(self, selectors.EVENT_READ)

                while not self.__shutdown_request:
                    ready = selector.select(poll_interval)
                    # bpo-35017: shutdown() called during select(), exit immediately.
                    if self.__shutdown_request:
                        break
                    if ready:
                        self._handle_request_noblock()

                    self.service_actions()
        finally:
            self.__shutdown_request = False
            self.__is_shut_down.set()

    def finish_request(self, request, client_address):
        # RequestHandlerClass is werkzeug's WSGIRequestHandler here
        self.RequestHandlerClass(request, client_address, self)

# The HTTPServer class is short; it mostly inherits from the socket server
# lib\python3.6\http\server.py (shown here in its type-stub form)
class HTTPServer(socketserver.TCPServer):
    server_name: str
    server_port: int
    def __init__(self, server_address: Tuple[str, int],
                 RequestHandlerClass: type) -> None: ...

# site-packages\werkzeug\serving.py
class BaseWSGIServer(HTTPServer, object):
    def __init__(self,host,port,app,handler=None,passthrough_errors=False,ssl_context=None,fd=None):
        # This class mainly inherits from HTTPServer in the http package
        pass

# site-packages\werkzeug\serving.py
def make_server(host=None,port=None,app=None,threaded=False,processes=1,request_handler=None,passthrough_errors=False,ssl_context=None,fd=None,):
        return BaseWSGIServer(
            host, port, app, request_handler, passthrough_errors, ssl_context, fd=fd
        )

# site-packages\werkzeug\serving.py
def inner():
    try:
        fd = int(os.environ["WERKZEUG_SERVER_FD"])
    except (LookupError, ValueError):
        fd = None
    srv = make_server(
        hostname,
        port,
        application,
        threaded,
        processes,
        request_handler,
        passthrough_errors,
        ssl_context,
        fd=fd,
    )
    if fd is None:
        log_startup(srv.socket)
    # Create the WSGI HTTP server, then run it
    srv.serve_forever()

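The whole flow above can be summarized as:

  1. TCPServer calls serve_forever to listen on the given address and port
  2. finish_request passes the incoming request (its content, source IP address, port, and so on) to WSGIRequestHandler
  3. WSGIRequestHandler converts the request information into environ
  4. WSGIRequestHandler then passes environ and start_response to Flask for processing

The corresponding logic lives in run_wsgi of werkzeug.serving:WSGIRequestHandler, which contains this piece of code: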

def execute(app):
    # This is where Flask is called to obtain the iterable result
    application_iter = app(environ, start_response)
    try:
        for data in application_iter:
            write(data)
        if not headers_sent:
            write(b"")
    finally:
        if hasattr(application_iter, "close"):
            application_iter.close()
        application_iter = None

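As mentioned earlier, WSGI requires the application to be callable, which for a class means implementing __call__. Here is the corresponding part of Flask: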

    def wsgi_app(self, environ, start_response):
        ctx = self.request_context(environ)
        error = None
        try:
            try:
                ctx.push()
                response = self.full_dispatch_request()
            except Exception as e:
                error = e
                response = self.handle_exception(e)
            except:  # noqa: B001
                error = sys.exc_info()[1]
                raise
            return response(environ, start_response)
        finally:
            if self.should_ignore_error(error):
                error = None
            ctx.auto_pop(error)

    def __call__(self, environ, start_response):
        return self.wsgi_app(environ, start_response)

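The purpose of this code is to find the handler function and call it, plus exception handling. Another important piece is ctx.push(), which we will come back to later. For now, let's continue with the code of full_dispatch_request: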

def full_dispatch_request(self):
    self.try_trigger_before_first_request_functions()
    try:
        request_started.send(self)
        rv = self.preprocess_request()
        if rv is None:
            rv = self.dispatch_request()
    except Exception as e:
        rv = self.handle_user_exception(e)
    return self.finalize_request(rv)

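The heart of this function is dispatch_request, which returns the result of executing the route function we registered. Before dispatch_request runs, two kinds of hook functions are processed:

  1. Hooks run before the first request is handled, registered with before_first_request (triggered by try_trigger_before_first_request_functions)
  2. Hooks run before every request is handled, registered with before_request (run by preprocess_request)

After dispatch_request comes finalize_request, which likewise passes the result of the request through hook functions:

  1. Hooks run after each request that was handled normally, registered with after_request
  2. teardown_request hooks, which run whether or not the request raised an exception (they are invoked when the request context is popped)

So the most important function above is dispatch_request: it finds the route function we registered and returns the result of calling it, which is the routing process. Now start the example application, request GET 127.0.0.1:5000/api/v1/nodes?value=30, and print the environ prepared in run_wsgi. We can see that werkzeug has already converted the request into a Python structure: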
    environ:
    {
     "wsgi.version": (1,0),
     "wsgi.url_scheme": "http",
     "wsgi.input": <_io.BufferedReader name=4>,
     "wsgi.errors": <_io.TextIOWrapper name="<stderr>" mode="w" encoding="UTF-8">,
     "wsgi.multithread": True,
     "wsgi.multiprocess": False,
     "wsgi.run_once": False,
     "werkzeug.server.shutdown": <function WSGIRequestHandler.make_environ.<locals>.shutdown_server at 0x7fb721605e18>,
     "SERVER_SOFTWARE": "Werkzeug/0.15.5",
     "REQUEST_METHOD": "GET",
     "SCRIPT_NAME": "",
     "PATH_INFO": "/api/v1/nodes",
     "QUERY_STRING": "value=30",
     "REQUEST_URI": "/api/v1/nodes?value=30",
     "RAW_URI": "/api/v1/nodes?value=30",
     "REMOTE_ADDR": "127.0.0.1",
     "REMOTE_PORT": 51932,
     "SERVER_NAME": "127.0.0.1",
     "SERVER_PORT": "5000",
     "SERVER_PROTOCOL": "HTTP/1.1",
     "HTTP_TEST": "test123",
     "HTTP_CACHE_CONTROL": "no-cache",
     "HTTP_POSTMAN_TOKEN": "941f2037-eaae-4d6d-8b08-31a7d98d4fba",
     "HTTP_USER_AGENT": "PostmanRuntime/7.6.0",
     "HTTP_ACCEPT": "*/*",
     "HTTP_HOST": "127.0.0.1:5000",
     "HTTP_ACCEPT_ENCODING": "gzip, deflate",
     "HTTP_CONNECTION": "keep-alive",
     "werkzeug.request": <Request "http://127.0.0.1:5000/api/v1/nodes?value=30" [GET]>
    }
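To make the link between this dictionary and the request fields listed in the WSGI section concrete, here is a small sketch that reads them back out of a trimmed copy of the environ. summarize is a hypothetical helper written for this post; the real conversion is done by werkzeug's Request class.

```python
from urllib.parse import parse_qs

# A trimmed copy of the environ printed above.
environ = {
    "REQUEST_METHOD": "GET",
    "PATH_INFO": "/api/v1/nodes",
    "QUERY_STRING": "value=30",
    "HTTP_HOST": "127.0.0.1:5000",
    "HTTP_TEST": "test123",
}


def summarize(environ):
    """Hypothetical helper: pull method, path, query args and headers out of environ."""
    # Request headers arrive as HTTP_* keys, e.g. HTTP_HOST for the Host header.
    headers = {
        key[5:].replace("_", "-").title(): value
        for key, value in environ.items()
        if key.startswith("HTTP_")
    }
    return {
        "method": environ["REQUEST_METHOD"],
        "path": environ["PATH_INFO"],
        "args": {k: v[0] for k, v in parse_qs(environ["QUERY_STRING"]).items()},
        "headers": headers,
    }


info = summarize(environ)
print(info)
```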
    

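Routing

A web application has different handler functions for different paths; routing is the process of finding the handler function for the requested URL.
When a request reaches the application, the application needs a table of rules that stores the mapping between URLs and handler functions. The most obvious approach is a dictionary whose keys are URLs and whose values are the handler functions.
That works in the ideal case where every URL is static; it cannot handle dynamic routes. So how does Flask's routing work?
In Flask there are two ways to build routing rules: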

  1. With the @app.route() decorator, which is what the example above uses
  2. With app.add_url_rule, whose signature is add_url_rule(self, rule, endpoint=None, view_func=None, **options). The parameters mean:
    • rule: the URL rule string
    • endpoint: the endpoint to register the rule under; optional, defaults to the name of view_func

The two methods are equivalent. Let's look at the route method first:

def route(self, rule, **options):
    def decorator(f):
        endpoint = options.pop("endpoint", None)
        self.add_url_rule(rule, endpoint, f, **options)
        return f
    return decorator

As we can see, it is just a decorator that ends up calling add_url_rule:

@setupmethod
def add_url_rule(self,rule,endpoint=None,view_func=None,provide_automatic_options=None,**options):
    if endpoint is None:
        endpoint = _endpoint_from_view_func(view_func)
    options["endpoint"] = endpoint
    methods = options.pop("methods", None)
    if methods is None:
        methods = getattr(view_func, "methods", None) or ("GET",)
    if isinstance(methods, string_types):
        raise TypeError(
            "Allowed methods have to be iterables of strings, "
            'for example: @app.route(..., methods=["POST"])'
        )
    methods = set(item.upper() for item in methods)

    required_methods = set(getattr(view_func, "required_methods", ()))
    if provide_automatic_options is None:
        provide_automatic_options = getattr(view_func, "provide_automatic_options", None)
    if provide_automatic_options is None:
        if "OPTIONS" not in methods:
            provide_automatic_options = True
            required_methods.add("OPTIONS")
        else:
            provide_automatic_options = False
    methods |= required_methods
    rule = self.url_rule_class(rule, methods=methods, **options)
    rule.provide_automatic_options = provide_automatic_options
    self.url_map.add(rule)
    if view_func is not None:
        old_func = self.view_functions.get(endpoint)
        if old_func is not None and old_func != view_func:
            raise AssertionError(
                "View function mapping is overwriting an "
                "existing endpoint function: %s" % endpoint
            )
        self.view_functions[endpoint] = view_func

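This function mainly updates two attributes of the app: url_map and view_functions. Looking up their definitions, url_map is an instance of werkzeug.routing's Map class, rule is an instance of werkzeug.routing's Rule class, and view_functions is a plain dictionary. From the code above we can also see that every view function must have a distinct endpoint, and that Flask's core routing logic is actually implemented in werkzeug.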
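The registration logic can be reduced to a toy version to show the two tables being filled and the endpoint conflict check. MiniApp is a made-up stand-in, not Flask's class: it keeps url_map as a plain list instead of a werkzeug Map and skips the OPTIONS handling.

```python
class MiniApp:
    """Toy stand-in for the registration part of Flask (an assumption-laden sketch)."""

    def __init__(self):
        self.url_map = []         # list of (rule, methods, endpoint)
        self.view_functions = {}  # endpoint -> view function

    def add_url_rule(self, rule, endpoint=None, view_func=None, methods=("GET",)):
        if endpoint is None:
            endpoint = view_func.__name__  # default endpoint: the function name
        self.url_map.append((rule, {m.upper() for m in methods}, endpoint))
        old_func = self.view_functions.get(endpoint)
        if old_func is not None and old_func != view_func:
            raise AssertionError("endpoint already registered: %s" % endpoint)
        self.view_functions[endpoint] = view_func

    def route(self, rule, **options):
        def decorator(f):
            endpoint = options.pop("endpoint", None)
            self.add_url_rule(rule, endpoint, f, **options)
            return f
        return decorator


app = MiniApp()


@app.route("/hello", methods=["GET", "POST"])
def handle_hello():
    return "hello"


try:
    # A different function under an existing endpoint name is rejected.
    app.add_url_rule("/other", "handle_hello", lambda: "other")
    conflict = False
except AssertionError:
    conflict = True

print(app.url_map[0], conflict)
```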

Werkzeug routing logic

Let's print url_map and view_functions for the example above with print(app.url_map) and print(app.view_functions):

url_map:
Map([<Rule '/api/v1/nodes' (GET, OPTIONS, HEAD) -> get_nodes>,
 <Rule '/api/v1/nodes' (POST, OPTIONS) -> post_node>,
 <Rule '/hello' (GET, OPTIONS, POST, HEAD) -> handle_hello>,
 <Rule '/docs' (GET, OPTIONS, HEAD) -> get_docs>,
 <Rule '/static/<filename>' (GET, OPTIONS, HEAD) -> static>])

view_functions:
{'static': <bound method _PackageBoundObject.send_static_file of <Flask 'flask_prac'>>, 'handle_hello': <function handle_hello at 0x7f5639807bf8>, 'get_nodes': <function get_nodes at 0x7f56365a98c8>, 'post_node': <function post_node at 0x7f56365b5bf8>, 'get_docs': <function get_docs at 0x7f56365b5d90>}

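From the output above we can see that what werkzeug provides is the mapping from a URL to an endpoint; it does not care about the relationship between the endpoint and the view function. view_functions shows that Flask keeps that relationship in a dictionary, so the view function can be found by its key, the endpoint. This also shows how important the endpoint is throughout the routing process.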
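The two-stage lookup visible in the output above can be sketched with two plain dictionaries. This is a deliberately simplified illustration that only handles static URLs; the view functions are placeholder lambdas and dispatch is a hypothetical name.

```python
# Stage 1 (werkzeug's job in the real stack): (path, method) -> endpoint
url_map = {
    ("/hello", "GET"): "handle_hello",
    ("/api/v1/nodes", "GET"): "get_nodes",
    ("/api/v1/nodes", "POST"): "post_node",
}

# Stage 2 (Flask's job): endpoint -> view function
view_functions = {
    "handle_hello": lambda: "hello",
    "get_nodes": lambda: "list nodes",
    "post_node": lambda: "create node",
}


def dispatch(path, method):
    """Resolve the endpoint first, then call the view registered under it."""
    endpoint = url_map.get((path, method))
    if endpoint is None:
        return "404 Not Found"
    return view_functions[endpoint]()


print(dispatch("/api/v1/nodes", "POST"))
print(dispatch("/missing", "GET"))
```

Keeping the endpoint as the middle layer is what lets the same path map to different views per method, as /api/v1/nodes does here.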
So the key capability is finding the right endpoint from a URL. Let's first look at the features demonstrated in werkzeug's own documentation comments:

>>> m = Map([
    ...     # Static URLs
    ...     Rule('/', endpoint='static/index'),
    ...     Rule('/about', endpoint='static/about'),
    ...     Rule('/help', endpoint='static/help'),
    ...     # Knowledge Base
    ...     Subdomain('kb', [
    ...         Rule('/', endpoint='kb/index'),
    ...         Rule('/browse/', endpoint='kb/browse'),
    ...         Rule('/browse/<int:id>/', endpoint='kb/browse'),
    ...         Rule('/browse/<int:id>/<int:page>', endpoint='kb/browse')
    ...     ])
    ... ], default_subdomain='www')
    >>> c = m.bind('example.com')
    >>> c.build("kb/browse", dict(id=42))
    'http://kb.example.com/browse/42/'
    >>> c.build("kb/browse", dict())
    'http://kb.example.com/browse/'
    >>> c.build("kb/browse", dict(id=42, page=3))
    'http://kb.example.com/browse/42/3'
    >>> c.build("static/about")
    '/about'
    >>> c.build("static/index", force_external=True)
    'http://www.example.com/'

    >>> c = m.bind('example.com', subdomain='kb')
    >>> c.build("static/about")
    'http://www.example.com/about'
    >>> c = m.bind('example.com', '/applications/example')
    >>> c = m.bind('example.com')
    >>> c.match("/")
    ('static/index', {})
    >>> c.match("/about")
    ('static/about', {})
    >>> c = m.bind('example.com', '/', 'kb')
    >>> c.match("/")
    ('kb/index', {})
    >>> c.match("/browse/42/23")
    ('kb/browse', {'id': 42, 'page': 23})

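The example above demonstrates the core routing features: binding the route table to a specific environment (m.bind) and matching a URL (c.match). A successful match returns the endpoint and a dictionary of arguments; a match may also raise a redirect or a 404.
Now let's go back to dispatch_request: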

def dispatch_request(self):
    req = _request_ctx_stack.top.request
    if req.routing_exception is not None:
        self.raise_routing_exception(req)
    rule = req.url_rule
    if (
        getattr(rule, "provide_automatic_options", False)
        and req.method == "OPTIONS"
    ):
        return self.make_default_options_response()
    return self.view_functions[rule.endpoint](**req.view_args)

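Here the request object is fetched first; once the endpoint is known, the corresponding view_func is looked up in view_functions and called with the URL arguments, and its response is returned.
When is req.url_rule stored, and what does it look like? The first thing to know is that _request_ctx_stack.top.request holds the information about the current request: each time a request arrives, Flask pushes it onto the top of the stack so that it can be used throughout the handling of the request.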

RequestContext is defined in flask's ctx.py, and _request_ctx_stack holds RequestContext objects. The whole routing-related logic is:

class Flask(_PackageBoundObject):
    # request_class is the Request class defined in flask.wrappers.py, which in turn inherits from the Request class in werkzeug.wrappers.py
    request_class = Request
    def wsgi_app(self, environ, start_response):
        ctx = self.request_context(environ)
        try:
            try:
                ctx.push()
                response = self.full_dispatch_request()

    def request_context(self, environ):
        return RequestContext(self, environ)
    
    def create_url_adapter(self, request):
        if request is not None:
            subdomain = (
                (self.url_map.default_subdomain or None)
                if not self.subdomain_matching
                else None
            )
            return self.url_map.bind_to_environ(
                request.environ,
                server_name=self.config["SERVER_NAME"],
                subdomain=subdomain,
            )
        if self.config["SERVER_NAME"] is not None:
            return self.url_map.bind(
                self.config["SERVER_NAME"],
                script_name=self.config["APPLICATION_ROOT"],
                url_scheme=self.config["PREFERRED_URL_SCHEME"],
            )

class RequestContext(object):
    def __init__(self, app, environ, request=None, session=None):
        self.app = app
        if request is None:
            request = app.request_class(environ)
        self.request = request
        self.url_adapter = None
        try:
            self.url_adapter = app.create_url_adapter(self.request)
        except HTTPException as e:
            self.request.routing_exception = e
    
    def push(self):
        if self.url_adapter is not None:
            self.match_request()

    def match_request(self):
        try:
            result = self.url_adapter.match(return_rule=True)
            self.request.url_rule, self.request.view_args = result
        except HTTPException as e:
            self.request.routing_exception = e

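To summarize the flow: a RequestContext is first created from the environ. During initialization, ctx calls the app's create_url_adapter, which returns a werkzeug.routing.MapAdapter, the routing adapter. When ctx.push puts the context on top of the stack, it calls match_request, which calls the adapter's match method and gets back the matching rule and its arguments. If we start the example, request /api/v1/nodes, and print the result of self.url_adapter.match(return_rule=True), we see: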

(<Rule '/api/v1/nodes' (OPTIONS, GET, HEAD) -> get_nodes>, {})

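Because return_rule=True was passed, a werkzeug.routing.Rule object is returned; otherwise the result would be ('get_nodes', {}), just like match in the official example. That is where req.url_rule in dispatch_request comes from. It is a Rule object, which carries the endpoint name; looking that name up in the view_functions dictionary yields the handler function, and calling it with the route arguments produces the result.
That completes Flask's routing flow, but one thing is still unclear: how werkzeug implements match. The call starts in MapAdapter's match method. The Map holds a list of Rule objects, and MapAdapter.match calls each Rule's match method in turn until one of them matches the path. Here is the relevant part of Rule, the compiled regex and the match method: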

regex = r'^%s%s$' % (
            u''.join(regex_parts),
            (not self.is_leaf or not self.strict_slashes) and
            '(?<!/)(?P<__suffix__>/?)' or ''
        )
self._regex = re.compile(regex, re.UNICODE)

def match(self, path, method=None):
    if not self.build_only:
        m = self._regex.search(path)
        if m is not None:
            groups = m.groupdict()
            if self.strict_slashes and not self.is_leaf and \
                    not groups.pop('__suffix__') and \
                    (method is None or self.methods is None or
                        method in self.methods):
                raise RequestSlash()
            elif not self.strict_slashes:
                del groups['__suffix__']

            result = {}
            for name, value in iteritems(groups):
                try:
                    value = self._converters[name].to_python(value)
                except ValidationError:
                    return
                result[str(name)] = value
            if self.defaults:
                result.update(self.defaults)
            if self.alias and self.map.redirect_defaults:
                raise RequestAliasRedirect(result)
            return result

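The logic is to match the real path against the compiled regular expression, run each matched argument through its converter, and return the arguments in a dictionary.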
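To make the idea concrete, here is a much reduced sketch of compiling a rule string into a regular expression with named groups and converters. It is not werkzeug's implementation: compile_rule, match, and the two-entry CONVERTERS table are assumptions made for illustration, and trailing-slash handling, methods, and defaults are all left out.

```python
import re

# Hypothetical converter table: name -> (regex fragment, Python conversion)
CONVERTERS = {"int": (r"\d+", int), "default": (r"[^/]+", str)}
PLACEHOLDER = re.compile(r"<(?:(?P<conv>\w+):)?(?P<name>\w+)>")


def compile_rule(rule):
    """Turn '/browse/<int:id>/' into a compiled regex plus per-argument converters."""
    pattern, converters, pos = "", {}, 0
    for m in PLACEHOLDER.finditer(rule):
        regex, to_python = CONVERTERS[m.group("conv") or "default"]
        pattern += re.escape(rule[pos:m.start()])           # static part
        pattern += "(?P<%s>%s)" % (m.group("name"), regex)  # dynamic part
        converters[m.group("name")] = to_python
        pos = m.end()
    pattern += re.escape(rule[pos:])
    return re.compile("^%s$" % pattern), converters


def match(rule, path):
    """Return the converted arguments, or None when the path does not match."""
    regex, converters = compile_rule(rule)
    m = regex.search(path)
    if m is None:
        return None
    return {name: converters[name](value) for name, value in m.groupdict().items()}


print(match("/browse/<int:id>/<int:page>", "/browse/42/23"))
print(match("/browse/<int:id>/", "/browse/abc/"))
```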

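Contexts (application context and request context)

What is a context? A highly upvoted answer on Zhihu explains it in plain terms:

Every piece of a program has many external variables. Only simple functions such as Add have none. Once a piece of your program has external variables, it is incomplete and cannot run on its own. To make it run, you have to supply a value for every one of those external variables. The set of those values is called the context.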

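As we saw above, once dispatch_request has found the view function, it passes only the most basic arguments to it. Sometimes that is far from enough: the view function may also need the headers or the data in the body to do its job. Perhaps the simplest approach would be to wrap all of this information in an object and pass it to the view function as a parameter, but then every view function would have to declare that parameter even when it does not use it.

Flask instead treats this information as context, something like a global variable that you obtain with from flask import request when you need it, as with the commonly used request.json and request.args. The crucial point is that these objects must be dynamic: with multiple threads or coroutines, each thread or coroutine must get its own object. If an import could return the contents of some other request, everything would fall apart.

So how does Flask make sure that each thread or coroutine gets its own context? Let's first look at how the two contexts are defined: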

def _lookup_req_object(name):
    top = _request_ctx_stack.top
    if top is None:
        raise RuntimeError(_request_ctx_err_msg)
    return getattr(top, name)

def _lookup_app_object(name):
    top = _app_ctx_stack.top
    if top is None:
        raise RuntimeError(_app_ctx_err_msg)
    return getattr(top, name)

def _find_app():
    top = _app_ctx_stack.top
    if top is None:
        raise RuntimeError(_app_ctx_err_msg)
    return top.app

# context locals
_request_ctx_stack = LocalStack()
_app_ctx_stack = LocalStack()
current_app = LocalProxy(_find_app)
request = LocalProxy(partial(_lookup_req_object, "request"))
session = LocalProxy(partial(_lookup_req_object, "session"))
g = LocalProxy(partial(_lookup_app_object, "g"))

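The application context gives rise to two variables, current_app and g; the request context gives rise to request and session. Two classes make this work, LocalStack and LocalProxy; they are what lets every view function in a concurrent program see its own context without confusion. Their isolation across threads or coroutines in turn relies on another base class, Local, which achieves an effect similar to threading.local: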

try:
    from greenlet import getcurrent as get_ident
except ImportError:
    try:
        from thread import get_ident
    except ImportError:
        from _thread import get_ident

class Local(object):
    # __slots__ restricts instance attributes: a Local instance may only have __storage__ and __ident_func__
    __slots__ = ('__storage__', '__ident_func__')

    def __init__(self):
        # Data is stored in __storage__; all later accesses operate on it
        object.__setattr__(self, '__storage__', {})
        # Function that returns the id of the current thread (or greenlet)
        object.__setattr__(self, '__ident_func__', get_ident)

    def __iter__(self):
        return iter(self.__storage__.items())

    def __call__(self, proxy):
        """Create a proxy for a name."""
        return LocalProxy(self, proxy)

    def __release_local__(self):
        self.__storage__.pop(self.__ident_func__(), None)

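    # The three methods below implement attribute get, set, and delete
    # Each calls get_ident internally to obtain the current thread or coroutine id, then uses it as the key
    # From the outside it looks like plain attribute access, but the per-thread or per-coroutine switch has already happened inside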
    def __getattr__(self, name):
        try:
            return self.__storage__[self.__ident_func__()][name]
        except KeyError:
            raise AttributeError(name)

    def __setattr__(self, name, value):
        ident = self.__ident_func__()
        storage = self.__storage__
        try:
            storage[ident][name] = value
        except KeyError:
            storage[ident] = {name: value}

    def __delattr__(self, name):
        try:
            del self.__storage__[self.__ident_func__()][name]
        except KeyError:
            raise AttributeError(name)
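The isolation provided by the class above can be tried out with a cut-down stand-in. MiniLocal below is an illustrative assumption, not werkzeug's class: it supports threads only (via threading.get_ident) and stores its data in a plain _storage attribute.

```python
import threading


class MiniLocal:
    """Simplified stand-in for Local: one inner dict per thread id."""

    def __init__(self):
        # Bypass our own __setattr__ so the storage dict lands on the instance itself.
        object.__setattr__(self, "_storage", {})

    def __getattr__(self, name):
        try:
            return self._storage[threading.get_ident()][name]
        except KeyError:
            raise AttributeError(name)

    def __setattr__(self, name, value):
        self._storage.setdefault(threading.get_ident(), {})[name] = value


local = MiniLocal()
seen = {}
barrier = threading.Barrier(2)


def worker(label):
    local.request = label        # same attribute name in every thread
    barrier.wait()               # wait until both threads have written
    seen[label] = local.request  # each thread reads back its own value


threads = [threading.Thread(target=worker, args=(name,)) for name in ("req-1", "req-2")]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(seen, hasattr(local, "request"))
```

The main thread never assigned local.request, so the attribute does not exist for it even though two other threads have set it.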

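All the data of a Local instance is stored in __storage__, which is really a nested dictionary: map[ident]map[key]value. The ident is associated automatically inside the class via self.__ident_func__ (that is, get_ident or getcurrent). Besides getting, setting, and deleting values in __storage__, the class also implements __release_local__, which clears the data of the current thread or coroutine by deleting its key-value pair from the dictionary. Now that it is clear what Local does, let's return to the two classes. Local provides attribute access that is isolated per thread or coroutine, and LocalStack provides stack access with the same isolation. It mainly offers the stack methods push, pop, and top: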

class LocalStack(object):
    def __init__(self):
        self._local = Local()

    def __release_local__(self):
        self._local.__release_local__()

    def _get__ident_func__(self):
        return self._local.__ident_func__

    def _set__ident_func__(self, value):
        object.__setattr__(self._local, '__ident_func__', value)
    __ident_func__ = property(_get__ident_func__, _set__ident_func__)
    del _get__ident_func__, _set__ident_func__

    def __call__(self):
        def _lookup():
            rv = self.top
            if rv is None:
                raise RuntimeError('object unbound')
            return rv
        return LocalProxy(_lookup)
    # push, pop, and top implement the stack operations
    def push(self, obj):
        """Pushes a new item to the stack"""
        rv = getattr(self._local, 'stack', None)
        if rv is None:
            self._local.stack = rv = []
        rv.append(obj)
        return rv

    def pop(self):
        """Removes the topmost item from the stack, will return the
        old value or `None` if the stack was already empty.
        """
        stack = getattr(self._local, 'stack', None)
        if stack is None:
            return None
        elif len(stack) == 1:
            release_local(self._local)
            return stack[-1]
        else:
            return stack.pop()

    @property
    def top(self):
        """The topmost item on the stack.  If the stack is empty,
        `None` is returned.
        """
        try:
            return self._local.stack[-1]
        except (AttributeError, IndexError):
            return None
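The stack operations above can be exercised in isolation with a small stand-in. MiniLocalStack is a sketch under a simplifying assumption: it builds on the standard library's threading.local rather than on werkzeug's Local, so it isolates threads but not greenlets.

```python
import threading


class MiniLocalStack:
    """Simplified stand-in for LocalStack, one stack per thread."""

    def __init__(self):
        self._local = threading.local()

    def push(self, obj):
        stack = getattr(self._local, "stack", None)
        if stack is None:
            self._local.stack = stack = []
        stack.append(obj)
        return stack

    def pop(self):
        stack = getattr(self._local, "stack", None)
        if not stack:
            return None
        obj = stack.pop()
        if not stack:
            del self._local.stack  # drop this thread's entry once the stack is empty
        return obj

    @property
    def top(self):
        stack = getattr(self._local, "stack", None)
        return stack[-1] if stack else None


stack = MiniLocalStack()
stack.push("outer ctx")
stack.push("inner ctx")

# Another thread sees an empty stack, not the two items pushed here.
other_thread_top = []
t = threading.Thread(target=lambda: other_thread_top.append(stack.top))
t.start()
t.join()

print(stack.top, other_thread_top)
```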

Earlier, in wsgi_app, we saw the call ctx.push(). ctx is a RequestContext, and RequestContext's push method contains this line:

def push(self):
    _request_ctx_stack.push(self)

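LocalStack's push method really operates on its _local attribute, the Local instance: it first creates a list, self._local.__storage__[ident]['stack'] = [] (ident being the current thread or coroutine id), and then simply appends the request context to it. That about covers Local and LocalStack, but request, g, and the others use LocalProxy. What is that for? As the name suggests, it is a proxy: it forwards every operation performed on it to the object it stands for. Its constructor accepts either a Local object or a callable; in the callable case, calling it returns the object for the current thread or coroutine, and all subsequent operations are applied to that object: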

@implements_bool
class LocalProxy(object):
    __slots__ = ('__local', '__dict__', '__name__', '__wrapped__')

    def __init__(self, local, name=None):
        object.__setattr__(self, '_LocalProxy__local', local)
        object.__setattr__(self, '__name__', name)
        if callable(local) and not hasattr(local, '__release_local__'):
            # "local" is a callable that is not an instance of Local or
            # LocalManager: mark it as a wrapped function.
            object.__setattr__(self, '__wrapped__', local)

    def _get_current_object(self):
        if not hasattr(self.__local, '__release_local__'):
            return self.__local()
        try:
            return getattr(self.__local, self.__name__)
        except AttributeError:
            raise RuntimeError('no object bound to %s' % self.__name__)

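Only the key code is listed above. The essential points are that the argument passed in is stored in the __local attribute (name-mangled to _LocalProxy__local) and that _get_current_object returns the object for the current thread or coroutine. LocalProxy then overrides all the magic methods so that each one delegates to that object. Let's use request, which derives from the request context, as an example: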

def _lookup_req_object(name):
    top = _request_ctx_stack.top
    if top is None:
        raise RuntimeError(_request_ctx_err_msg)
    return getattr(top, name)

_request_ctx_stack = LocalStack()
request = LocalProxy(partial(_lookup_req_object, "request"))
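The effect of the snippet above, a module-level request name that always resolves to whatever is on top of the stack, can be mimicked with a few lines. Everything here is a stand-in invented for illustration: MiniProxy forwards only attribute reads, ctx_stack is a plain list without thread isolation, and FakeRequest and FakeContext are placeholder classes.

```python
class MiniProxy:
    """Simplified stand-in for LocalProxy: forwards attribute reads to a looked-up object."""

    def __init__(self, lookup):
        object.__setattr__(self, "_lookup", lookup)

    def _get_current_object(self):
        return self._lookup()

    def __getattr__(self, name):
        # Resolved on every access, so the target can change between requests.
        return getattr(self._get_current_object(), name)


class FakeRequest:
    def __init__(self, args):
        self.args = args


class FakeContext:
    def __init__(self, request):
        self.request = request


ctx_stack = []  # plain list; the real stack is additionally isolated per thread


def lookup_request():
    if not ctx_stack:
        raise RuntimeError("working outside of request context")
    return ctx_stack[-1].request


request = MiniProxy(lookup_request)  # created once, like the module-level request

ctx_stack.append(FakeContext(FakeRequest({"name": "first"})))
first = request.args["name"]
ctx_stack.pop()

ctx_stack.append(FakeContext(FakeRequest({"name": "second"})))
second = request.args["name"]
ctx_stack.pop()

print(first, second)
```

The proxy object itself never changes; only the lookup result does, which is why importing request once at module level is safe.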

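We already know that the current thread's _request_ctx_stack is essentially a list of RequestContext objects. top returns the top of the stack, the context of the request being handled, and getattr fetches the attribute called name, so _lookup_req_object("request") returns the request attribute of the RequestContext of the current thread or coroutine. That attribute is an instance of the Request class in flask.wrappers, which inherits from several classes; the things we normally do after from flask import request, such as request.json and request.args, are all implemented by it. Now that we know ctx is a RequestContext whose push method is called when a request arrives, let's look at some of its methods: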

    def push(self):
        app_ctx = _app_ctx_stack.top
        if app_ctx is None or app_ctx.app != self.app:
            app_ctx = self.app.app_context()
            app_ctx.push()
            self._implicit_app_ctx_stack.append(app_ctx)
        else:
            self._implicit_app_ctx_stack.append(None)

        if hasattr(sys, "exc_clear"):
            sys.exc_clear()

        _request_ctx_stack.push(self)
        if self.session is None:
            session_interface = self.app.session_interface
            self.session = session_interface.open_session(self.app, self.request)

            if self.session is None:
                self.session = session_interface.make_null_session(self.app)

        if self.url_adapter is not None:
            self.match_request()

    def pop(self, exc=_sentinel):
        app_ctx = self._implicit_app_ctx_stack.pop()

        try:
            clear_request = False
            if not self._implicit_app_ctx_stack:
                self.app.do_teardown_request(exc)

                request_close = getattr(self.request, 'close', None)
                if request_close is not None:
                    request_close()
                clear_request = True
        finally:
            rv = _request_ctx_stack.pop()

            # get rid of circular dependencies at the end of the request
            # so that we don't require the GC to be active.
            if clear_request:
                rv.request.environ['werkzeug.request'] = None

            # Get rid of the app as well if necessary.
            if app_ctx is not None:
                app_ctx.pop(exc)

    def auto_pop(self, exc):
        if self.request.environ.get('flask._preserve_context') or \
           (exc is not None and self.app.preserve_context_on_exception):
            self.preserved = True
            self._preserved_exc = exc
        else:
            self.pop(exc)

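push stores both the application context for the request (a new app context has to be created if the top of _app_ctx_stack does not belong to the current request's app) and the request context on their respective stacks; pop does the reverse and performs the cleanup when they leave the stack. The picture is now fairly clear: each time a request arrives, Flask creates the two contexts that the current thread or coroutine needs and pushes them onto the isolated stacks, and the view function takes the information from the top of the stack whenever it needs it.
One more thing to know: the application context is tied to the Flask instance. There is only one app object, which all requests share, but as push shows, when the current thread's _app_ctx_stack has no context for this app, a new application context is created together with the request context, and pop removes it again along with the request context. So while serving requests both contexts live only as long as the request; an application context outlives a request only when it is pushed by hand, for example in a shell or a test. To verify that LocalStack is isolated per thread, let's run an experiment. In the handle_hello view function of the example above, add: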

print("view_func hello thread id: {}".format(get_ident()))
sleep(10)

The 10-second sleep slows the handler down. In the get_nodes function, add:

print("view_func get_nodes thread id: {}".format(get_ident()))

In the source of flask.app, right below the req assignment in dispatch_request, add:

req = _request_ctx_stack.top.request
print("request thread id: {}".format(_request_ctx_stack.__ident_func__()))
print(_request_ctx_stack._local.__storage__)

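We mainly want to check whether the data in the stack's __storage__ matches what was described above. Now send two /hello requests at the same time; because of the 10-second sleep they must be handled by two threads or coroutines. Then add one request to /api/v1/nodes. Here is the output: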

request thread id: 140734530717440
{140734530717440: {'stack': [<RequestContext 'http://127.0.0.1:5000/hello' [GET] of flask_prac>]}}
view_func hello thread id: 140734530717440
request thread id: 140734522263296
{140734530717440: {'stack': [<RequestContext 'http://127.0.0.1:5000/hello' [GET] of flask_prac>]}, 140734522263296: {'stack': [<RequestContext 'http://127.0.0.1:5000/hello' [GET] of flask_prac>]}}
view_func hello thread id: 140734522263296
request thread id: 140734513809152
{140734530717440: {'stack': [<RequestContext 'http://127.0.0.1:5000/hello' [GET] of flask_prac>]}, 140734522263296: {'stack': [<RequestContext 'http://127.0.0.1:5000/hello' [GET] of flask_prac>]}, 140734513809152: {'stack': [<RequestContext 'http://127.0.0.1:5000/api/v1/nodes?value=30' [GET] of flask_prac>]}}
view_func get_nodes thread id: 140734513809152

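As the number of threads grows, the LocalStack's __storage__ holds more key-value pairs. The keys (the idents) are the same thread ids that the view functions printed, and the top of each thread's stack holds that thread's own request; the requests of different threads are cleanly separated. Now wait for them to finish and send the get_nodes request once more: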

request thread id: 140734522263296
{140734522263296: {'stack': [<RequestContext 'http://127.0.0.1:5000/api/v1/nodes?value=30' [GET] of flask_prac>]}}
view_func get_nodes thread id: 140734522263296

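Now the LocalStack holds only one key-value pair, which shows that the stack is cleaned up at the end of every request and filled again when the next request arrives.
We now understand how the contexts are implemented. One question remains: when we started Flask with app.run(), we never saw where multiple threads were started. In theory a single-threaded server can only handle the next request after the current one is finished, so why could several requests be handled at once above, and where were the threads created?
Flask's startup function (run) contains this line: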

options.setdefault("threaded", True)

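Since Flask 1.0, the WSGI server that ships with Flask runs in threaded mode by default. With threaded=True, how many threads the server can run concurrently depends entirely on the operating system and its per-process thread limit. The implementation uses socketserver.ThreadingMixIn (SocketServer.ThreadingMixIn in Python 2). As described above, TCPServer's serve_forever listens on the given port; when a request comes in it is handed to process_request, which forwards it to the request handler and then closes the request. ThreadingMixIn overrides process_request; let's see how: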

class ThreadingMixIn:
    daemon_threads = False
    _block_on_close = False
    _threads = None

    def process_request_thread(self, request, client_address):
        try:
            # debug print added for this article, not part of the stdlib source
            from _thread import get_ident
            print(get_ident())
            self.finish_request(request, client_address)
        except Exception:
            self.handle_error(request, client_address)
        finally:
            self.shutdown_request(request)

    def process_request(self, request, client_address):
        """Start a new thread to process the request."""
        t = threading.Thread(target = self.process_request_thread,
                             args = (request, client_address))
        t.daemon = self.daemon_threads
        if not t.daemon and self._block_on_close:
            if self._threads is None:
                self._threads = []
            self._threads.append(t)
        t.start()

    def server_close(self):
        super().server_close()
        if self._block_on_close:
            threads = self._threads
            self._threads = None
            if threads:
                for thread in threads:
                    thread.join()

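The method is simple: it uses threading.Thread to start another thread. The target function, process_request_thread, does essentially what the original process_request did. The new thread is created with the relevant arguments and handles the request the same way as before, and it exits once the request has been shut down. With print(get_ident()) added to process_request_thread in the source above, the printed thread id matches the one printed in the view function, so it is the same thread, and the local's __storage__ gains one more key-value pair keyed by that thread id.

One more question: why are the two contexts implemented as stacks? The example above shows no obvious benefit. In normal operation each request has exactly one application context and one request context, but in tests or in a shell the user can create and push contexts by hand, which makes debugging and testing easier. The stack is mainly there for that extra flexibility.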
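To see the effect of the process_request override without opening a socket, the real socketserver.ThreadingMixIn can be layered over a stub server. This is a sketch under assumptions: `SequentialServer`, `ThreadedServer` and `handler_threads` are hypothetical names, and the stub only implements the hooks the mixin calls (finish_request, shutdown_request, handle_error, server_close).

```python
import queue
import socketserver
import threading


class SequentialServer:
    """Stub standing in for TCPServer: just the hooks the mixin relies on."""

    def __init__(self):
        self.handled = queue.Queue()

    def finish_request(self, request, client_address):
        # Record which thread actually ran the "handler".
        self.handled.put((request, threading.get_ident()))

    def shutdown_request(self, request):
        pass

    def handle_error(self, request, client_address):
        pass

    def process_request(self, request, client_address):
        # Non-threaded behaviour: handle the request inline, then close it.
        self.finish_request(request, client_address)
        self.shutdown_request(request)

    def server_close(self):
        pass


class ThreadedServer(socketserver.ThreadingMixIn, SequentialServer):
    """Same stub, but process_request now comes from ThreadingMixIn."""


def handler_threads(server, requests):
    """Feed requests to the server; return {request: handling thread id}."""
    for request in requests:
        server.process_request(request, ("127.0.0.1", 0))
    # Block until every request has been handled (at most 5 s each).
    results = dict(server.handled.get(timeout=5) for _ in requests)
    server.server_close()
    return results


main_id = threading.get_ident()
print(handler_threads(SequentialServer(), ["/hello"])[main_id == 0 or "/hello"] == main_id)
print(handler_threads(ThreadedServer(), ["/hello", "/api/v1/nodes"]))
```

With the plain stub the handler runs in the calling thread; with the mixin in front, every request is handled in a thread other than the caller's, which is what lets the dev server accept the next request while a slow view is still sleeping.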

Response

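As mentioned earlier, an HTTP response has three parts: the status code, the headers, and the data (body). Flask naturally supports working with all of them. When writing a view function we don't deal with the response directly; Flask builds it automatically and returns it to the client. A view function can return up to three values, (body, status_code, headers), for example: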

return jsonify({
    "value": node,
    "msg": "node fetched successfully",
    "errors": []
}), 200, {"Content-Type": "application/json"}

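As mentioned above, full_dispatch_request ends by calling finalize_request for the final processing. This method contains the logic that creates and post-processes the response object: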

def finalize_request(self, rv, from_error_handler=False):
    response = self.make_response(rv)
    try:
        response = self.process_response(response)
        request_finished.send(self, response=response)
    except Exception:
        if not from_error_handler:
            raise
        self.logger.exception(
            "Request finalizing failed with an error while handling an error"
        )
    return response
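The process_response step called above can be pictured roughly as follows. This is a simplified sketch, not Flask's code: `MiniApp` is a hypothetical class, a plain dict stands in for the response object, and blueprint-level hooks and session saving are left out. The one detail it does mirror is that app-level after_request functions run in reverse registration order.

```python
class MiniApp:
    """Hypothetical stand-in for the hook handling, not Flask's real class."""

    def __init__(self):
        self.after_request_funcs = []

    def after_request(self, func):
        # Decorator: remember the hook and hand the function back unchanged.
        self.after_request_funcs.append(func)
        return func

    def make_response(self, rv):
        # A dict stands in for a real response object in this sketch.
        return {"body": rv, "headers": {}, "trace": []}

    def process_response(self, response):
        # Hooks run in reverse registration order; each one returns
        # the (possibly replaced) response for the next hook.
        for func in reversed(self.after_request_funcs):
            response = func(response)
        return response

    def finalize_request(self, rv):
        response = self.make_response(rv)
        return self.process_response(response)


app = MiniApp()


@app.after_request
def add_header(response):
    response["headers"]["X-Demo"] = "1"
    response["trace"].append("registered first")
    return response


@app.after_request
def log_response(response):
    response["trace"].append("registered second")
    return response


print(app.finalize_request("hello"))
```

Every hook must return a response, otherwise the next hook (and the client) would receive None.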

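Two calls do the main work here. make_response builds a response object from the view function's return value, and process_response post-processes that response: it runs the registered after_request hooks and saves the session. (teardown_request hooks are not run here; they run later, when the request context is popped.) Let's look at make_response first: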

def make_response(self, rv):
    status = headers = None
    if isinstance(rv, tuple):
        len_rv = len(rv)
        if len_rv == 3:
            rv, status, headers = rv
        elif len_rv == 2:
            if isinstance(rv[1], (Headers, dict, tuple, list)):
                rv, headers = rv
            else:
                rv, status = rv
        else:
            raise TypeError(
                "The view function did not return a valid response tuple."
                " The tuple must have the form (body, status, headers),"
                " (body, status), or (body, headers)."
            )
    if rv is None:
        raise TypeError(
            "The view function did not return a valid response. The"
            " function either returned None or ended without a return"
            " statement."
        )
    if not isinstance(rv, self.response_class):
        if isinstance(rv, (text_type, bytes, bytearray)):
            # let the response class set the status and headers instead of
            # waiting to do it manually, so that the class can handle any
            # special logic
            rv = self.response_class(rv, status=status, headers=headers)
            status = headers = None
        elif isinstance(rv, dict):
            rv = jsonify(rv)
        elif isinstance(rv, BaseResponse) or callable(rv):
            # evaluate a WSGI callable, or coerce a different response
            # class to the correct type
            try:
                rv = self.response_class.force_type(rv, request.environ)
            except TypeError as e:
                new_error = TypeError(
                    "{e}\nThe view function did not return a valid"
                    " response. The return type must be a string, dict, tuple,"
                    " Response instance, or WSGI callable, but it was a"
                    " {rv.__class__.__name__}.".format(e=e, rv=rv)
                )
                reraise(TypeError, new_error, sys.exc_info()[2])
        else:
            raise TypeError(
                "The view function did not return a valid"
                " response. The return type must be a string, dict, tuple,"
                " Response instance, or WSGI callable, but it was a"
                " {rv.__class__.__name__}.".format(rv=rv)
            )
    if status is not None:
        if isinstance(status, (text_type, bytes, bytearray)):
            rv.status = status
        else:
            rv.status_code = status
    if headers:
        rv.headers.extend(headers)
    return rv
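The tuple-unpacking branch at the top of make_response can be isolated into a small function for experimenting. This is a sketch: `unpack_view_result` is a hypothetical helper, and werkzeug's Headers type is left out of the isinstance check so that it runs with the standard library alone.

```python
def unpack_view_result(rv):
    """Split a view's return value into (body, status, headers)."""
    status = headers = None
    if isinstance(rv, tuple):
        if len(rv) == 3:
            rv, status, headers = rv
        elif len(rv) == 2:
            # A dict/tuple/list in second position means headers;
            # anything else (int or str) is taken as the status.
            if isinstance(rv[1], (dict, tuple, list)):
                rv, headers = rv
            else:
                rv, status = rv
        else:
            raise TypeError(
                "expected (body, status, headers), (body, status),"
                " or (body, headers)"
            )
    if rv is None:
        raise TypeError("the view returned None")
    return rv, status, headers


print(unpack_view_result("ok"))
print(unpack_view_result(("created", 201)))
print(unpack_view_result(("ok", {"X-Demo": "1"})))
print(unpack_view_result(("missing", "404 NOT FOUND", [("X-Demo", "1")])))
```

The second element of a 2-tuple is disambiguated purely by its type, which is why a status can never be passed as a list or dict.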

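Its job is to turn whatever the view function returned, in its various shapes and types, into a single response object. If the value is already a response instance it is used as-is; a str or bytes body is wrapped in the response class; a dict is converted to JSON; a tuple is unpacked as (response, status, headers), (response, status), or (response, headers). So what does the Response class look like? Flask's Response class is very simple: it basically inherits from werkzeug.wrappers.Response and JSONMixin, and sets the default mimetype to text/html. So let's look directly at the Response class in werkzeug.wrappers: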

class Response(BaseResponse, ETagResponseMixin, ResponseStreamMixin,
               CommonResponseDescriptorsMixin,
               WWWAuthenticateMixin):

    """Full featured response object implementing the following mixins:

    - :class:`ETagResponseMixin` for etag and cache control handling
    - :class:`ResponseStreamMixin` to add support for the `stream` property
    - :class:`CommonResponseDescriptorsMixin` for various HTTP descriptors
    - :class:`WWWAuthenticateMixin` for HTTP authentication support
    """

It is assembled from mixins. Let's focus on a trimmed-down version of BaseResponse:

class BaseResponse(object):
    """Base response class.  The most important fact about a response object
    is that it's a regular WSGI application.  It's initialized with a couple
    of response parameters (headers, body, status code etc.) and will start a
    valid WSGI response when called with the environ and start response
    callable.
    """

    charset = 'utf-8'
    default_status = 200
    default_mimetype = 'text/plain'
    automatically_set_content_length = True

    def __init__(self, response=None, status=None, headers=None,
                 mimetype=None, content_type=None, direct_passthrough=False):
        pass

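It starts by setting defaults as class attributes: the default charset is utf-8 and the default status code is 200. Together with the other mixin classes it makes up the Response class, which provides methods such as get_data for reading the body and set_cookie for setting cookies. If those methods are not enough, you can replace Flask's built-in response class with your own: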

from flask import Flask, Response

class MyResponse(Response):
    pass

app = Flask(__name__)
app.response_class = MyResponse
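The BaseResponse docstring quoted earlier stresses that a response object is itself a regular WSGI application. A minimal sketch of that idea, using only the standard library, looks like this. `MiniResponse`, `HtmlResponse` and `call_wsgi` are hypothetical names and werkzeug's real class does far more; the subclass shows how overriding a class attribute changes the default, which is the same kind of customization a custom response class allows.

```python
class MiniResponse:
    """Toy response that is also a WSGI application (not werkzeug's class)."""

    charset = "utf-8"
    default_status = "200 OK"
    default_mimetype = "text/plain"

    def __init__(self, body="", status=None, mimetype=None):
        self.body = body.encode(self.charset) if isinstance(body, str) else body
        self.status = status or self.default_status
        self.mimetype = mimetype or self.default_mimetype

    def __call__(self, environ, start_response):
        headers = [
            ("Content-Type", "{}; charset={}".format(self.mimetype, self.charset)),
            ("Content-Length", str(len(self.body))),
        ]
        start_response(self.status, headers)
        return [self.body]  # WSGI wants an iterable of bytes


class HtmlResponse(MiniResponse):
    # Overriding the class attribute changes the default for every instance.
    default_mimetype = "text/html"


def call_wsgi(app):
    """Call a WSGI app with a fake start_response and collect the result."""
    captured = {}

    def start_response(status, headers, exc_info=None):
        captured["status"] = status
        captured["headers"] = dict(headers)

    body = b"".join(app({}, start_response))
    return captured["status"], captured["headers"], body


print(call_wsgi(MiniResponse("Hello World!\n")))
print(call_wsgi(HtmlResponse("<h1>hi</h1>")))
```

Because the response is callable with (environ, start_response), a framework can simply return `response(environ, start_response)` from its own WSGI entry point.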