litefs.handlers.request 源代码

#!/usr/bin/env python
# coding: utf-8

import importlib.util
import itertools
import json
import re
import sys
from collections import UserDict
from email.message import Message
from errno import EAGAIN, EWOULDBLOCK
from functools import lru_cache, partial
from hashlib import sha256
from http.cookies import SimpleCookie
from io import DEFAULT_BUFFER_SIZE, BufferedRWPair, BytesIO, RawIOBase, StringIO
from os import urandom
from posixpath import abspath as path_abspath
from posixpath import exists as path_exists
from posixpath import isdir as path_isdir
from posixpath import isfile as path_isfile
from posixpath import join as path_join
from posixpath import realpath as path_realpath
from posixpath import split as path_split
from posixpath import splitext as path_splitext
from subprocess import PIPE, Popen
from tempfile import NamedTemporaryFile, TemporaryFile
from time import time
from uuid import uuid4
from weakref import proxy


[文档] @lru_cache(maxsize=512) def parse_header(line): msg = Message() msg["content-type"] = line return msg.get_params()[0][0], dict(msg.get_params()[1:])
import socket from http.client import responses as http_status_codes from io import BytesIO as StringIO from urllib.parse import unquote_plus from ..exceptions import HttpError from ..session import Session from ..utils import gmt_date, log_debug, log_error, render_error from mako.lookup import TemplateLookup from ..cache import FormCache default_page = "index" default_404 = "not_found" # 会话配置默认值 default_content_type = "application/json; charset=utf-8" # 创建表单数据缓存实例 _form_cache = FormCache(max_size=1000, default_ttl=300) EOFS = ("", "\n", "\r\n") POSTS_HEADER_NAME = "litefs.posts" FILES_HEADER_NAME = "litefs.files" should_retry_error = (EWOULDBLOCK, EAGAIN) double_slash_sub = re.compile(r"\/{2,}").sub startswith_dot_sub = re.compile(r"\/\.+").sub suffixes = (".py", ".pyc", ".pyo", ".so") form_dict_match = re.compile(r"(.+)\[([^\[\]]+)\]").match server_info = "litefs/%s python/%s" % ("0.4.0", sys.version.split()[0]) DEFAULT_STATUS_MESSAGE = """\ <html> <head> <meta charset="utf-8"> <title>HTTP response</title> </head> <body> <h1>HTTP response</h1> <p>HTTP status %(code)d. <p>Message: %(message)s. <p>HTTP code explanation: %(code)s = %(explain)s. </body> </html>"""
[文档] class Response: """ 响应对象,提供更丰富的响应方法 """ def __init__(self, content=None, status_code=200, headers=None): self.content = content self.status_code = status_code self.headers = headers or []
[文档] @classmethod def json(cls, data, status_code=200, headers=None): """ 返回 JSON 响应 """ import json content = json.dumps(data, ensure_ascii=False) headers = headers or [] # 确保设置正确的 Content-Type has_content_type = any(h[0].lower() == 'content-type' for h in headers) if not has_content_type: headers.insert(0, ("Content-Type", "application/json; charset=utf-8")) return cls(content, status_code, headers)
[文档] @classmethod def html(cls, content, status_code=200, headers=None): """ 返回 HTML 响应 """ headers = headers or [] # 确保设置正确的 Content-Type has_content_type = any(h[0].lower() == 'content-type' for h in headers) if not has_content_type: headers.insert(0, ("Content-Type", "text/html; charset=utf-8")) return cls(content, status_code, headers)
[文档] @classmethod def text(cls, content, status_code=200, headers=None): """ 返回纯文本响应 """ headers = headers or [] # 确保设置正确的 Content-Type has_content_type = any(h[0].lower() == 'content-type' for h in headers) if not has_content_type: headers.insert(0, ("Content-Type", "text/plain; charset=utf-8")) return cls(content, status_code, headers)
[文档] @classmethod def file(cls, file_path, status_code=200, headers=None): """ 返回文件响应 """ import os from mimetypes import guess_type if not os.path.exists(file_path): return cls("File not found", 404) # 猜测文件的 MIME 类型 mime_type, encoding = guess_type(file_path) mime_type = mime_type or "application/octet-stream" # 读取文件内容 with open(file_path, 'rb') as f: content = f.read() headers = headers or [] # 确保设置正确的 Content-Type has_content_type = any(h[0].lower() == 'content-type' for h in headers) if not has_content_type: headers.insert(0, ("Content-Type", mime_type)) # 添加 Content-Disposition 头,使浏览器下载文件 headers.append(("Content-Disposition", f"attachment; filename={os.path.basename(file_path)}")) headers.append(("Content-Length", str(len(content)))) return cls(content, status_code, headers)
[文档] @classmethod def redirect(cls, url, status_code=302, headers=None): """ 返回重定向响应 """ headers = headers or [] headers.insert(0, ("Location", url)) headers.insert(0, ("Content-Type", "text/html; charset=utf-8")) return cls(f"Redirecting to {url}", status_code, headers)
[文档] @classmethod def error(cls, status_code, message=None, headers=None): """ 返回错误响应 """ from http.client import responses status_text = responses.get(status_code, "Unknown Error") message = message or status_text content = DEFAULT_STATUS_MESSAGE % { "code": status_code, "message": message, "explain": status_text, } headers = headers or [] headers.insert(0, ("Content-Type", "text/html; charset=utf-8")) return cls(content, status_code, headers)
[文档] def is_bytes(s): return isinstance(s, bytes)
[文档] def imap(func, iterable): return map(func, iterable)
[文档] def parse_form(query_string): """ 解析表单数据,支持缓存 Args: query_string: 查询字符串 Returns: 解析后的表单数据字典 """ # 生成缓存键 cache_key = f"form:{query_string}" # 尝试从缓存获取 cached = _form_cache.get(cache_key) if cached is not None: return cached # 解析表单数据 form = {} # 处理 bytes 类型输入 if isinstance(query_string, bytes): query_string = query_string.decode('utf-8') query_string = unquote_plus(query_string) for s in query_string.split("&"): if not s: continue kv = s.split("=", 1) if 2 == len(kv): k, v = kv else: k, v = kv[0], "" k, v = unquote_plus(k), unquote_plus(v) if k.endswith("[]"): k = k[:-2] if k in form: result = form[k] if isinstance(result, dict): raise ValueError("invalid form data %s" % query_string) if isinstance(result, list): form[k].append(v) else: form[k] = [result, v] else: form[k] = [v] continue matched = form_dict_match(k) if matched is None: if k in form: result = form[k] if isinstance(result, dict): raise ValueError("invalid form data %s" % query_string) if isinstance(result, list): form[k].append(v) else: form[k] = [result, v] else: form[k] = v else: key, prefix = matched.groups() if key in form: result = form[key] if not isinstance(result, dict): raise ValueError("invalid form data %s" % query_string) if prefix in result: result[prefix] = [result[prefix], v] else: result[prefix] = v else: form[key] = {prefix: v} # 存入缓存 _form_cache.set(cache_key, form) return form
[文档] class BaseRequestHandler(object): """ 请求处理器基类,提供通用的请求处理功能 """ def __init__(self, app, environ): self._app = app self._environ = environ self._headers_responsed = False self._status_code = 200 self._get = {} self._post = {} self._body = "" self._files = {} self._session_id = None self._session = None self._template_lookup = None # 初始化 _headers 属性,用于存储响应头 self._headers = []
[文档] def render_template(self, template_name, **kwargs): """ 渲染模板 Args: template_name: 模板文件名 **kwargs: 模板变量 Returns: 渲染后的 HTML 字符串 """ import os # 延迟初始化模板查找器 if self._template_lookup is None: # 获取模板目录路径 template_dir = getattr(self._app.config, 'template_dir', 'templates') if not os.path.isabs(template_dir): # 如果是相对路径,相对于当前工作目录 template_dir = os.path.join(os.getcwd(), template_dir) # 创建模板查找器 self._template_lookup = TemplateLookup( directories=[template_dir], input_encoding='utf-8', output_encoding='utf-8', default_filters=['decode.utf8'] ) try: # 获取模板 template = self._template_lookup.get_template(template_name) # 渲染模板 content = template.render(**kwargs) # 直接返回渲染后的内容,不设置 Content-Type # Content-Type 会在 _response 方法中设置 return content except Exception as e: # 模板渲染失败,返回错误信息 error_content = f"<h1>Template Error</h1><p>Failed to render template '{template_name}': {str(e)}</p>" # 直接返回错误内容,不设置 Content-Type # Content-Type 会在 _response 方法中设置 return error_content
[文档] def start_response(self, status_code=200, headers=None): if self._headers_responsed: raise ValueError("Http headers already responsed.") self._status_code = int(status_code) if headers is not None: for header in headers: if not isinstance(header, (list, tuple)): header = header.encode("utf-8") k, v = header.split(":") k, v = k.strip(), v.strip() else: k, v = header self._add_header(k, v) self._headers_responsed = True
def _add_header(self, key, value): raise NotImplementedError("Subclasses must implement _add_header") def _response(self, status_code, headers=None, content=None): status_code = int(status_code) status_text = http_status_codes.get(status_code, "Unknown") status = "%d %s" % (status_code, status_text) # 创建响应头列表 response_headers = [] response_headers.append(("Server", server_info)) response_headers.append(("X-Content-Type-Options", "nosniff")) response_headers.append(("X-Frame-Options", "SAMEORIGIN")) response_headers.append(("X-XSS-Protection", "1; mode=block")) # 添加 self._headers 中的内容 if hasattr(self, '_headers'): response_headers.extend(self._headers) # 添加传入的 headers 参数 if headers: response_headers.extend(headers) # 保存 Session 数据到 Session 存储 app = self._app session_key = self._session_id or self.session.id app.sessions.put(session_key, self.session) # 设置 session cookie(每次都设置,确保 cookie 不会丢失) session_name = self._app.config.session_name session_secure = self._app.config.session_secure session_http_only = self._app.config.session_http_only session_same_site = self._app.config.session_same_site cookie_header = SimpleCookie() cookie_header[session_name] = session_key cookie_header[session_name]['path'] = "/" if session_secure: cookie_header[session_name]['secure'] = True if session_http_only: cookie_header[session_name]['httponly'] = True if session_same_site: cookie_header[session_name]['samesite'] = session_same_site response_headers.append(("Set-Cookie", str(cookie_header[session_name]))) # 检查是否已经有 Content-Type 响应头 has_content_type = any(h[0].lower() == 'content-type' for h in response_headers) # 如果没有 Content-Type 响应头,根据状态码和内容类型设置默认的 Content-Type if not has_content_type: if status_code >= 400: response_headers.append(("Content-Type", "text/html; charset=utf-8")) else: from collections.abc import Iterable if not isinstance( content, (str, bytes, dict, list, tuple, type(None)) ) and isinstance(content, Iterable): response_headers.append(("Content-Type", "text/plain; charset=utf-8")) else: response_headers.append(("Content-Type", default_content_type)) # 标记响应头已发送 self._headers_responsed = True # 如果 content 为 None,设置默认的错误信息 if content is None: content = DEFAULT_STATUS_MESSAGE % { "code": status_code, "message": status_text, "explain": status_text, } # 检查是否已经有 Content-Length 或 Transfer-Encoding 头 has_content_length = any(h[0].lower() == 'content-length' for h in response_headers) has_transfer_encoding = any(h[0].lower() == 'transfer-encoding' for h in response_headers) # 处理不同类型的 content from collections.abc import Iterable if not has_content_length and not has_transfer_encoding: if isinstance(content, (str, bytes)): # 对于字符串或字节,计算长度并添加 Content-Length 头 if isinstance(content, str): content_bytes = content.encode('utf-8') content_length = len(content_bytes) else: content_length = len(content) response_headers.append(("Content-Length", str(content_length))) elif not isinstance(content, (dict, list, tuple, type(None))) and isinstance(content, Iterable): # 对于生成器等可迭代对象,使用 Transfer-Encoding: chunked response_headers.append(("Transfer-Encoding", "chunked")) # 返回响应 return status, response_headers, content def _add_response_headers(self, headers): raise NotImplementedError("Subclasses must implement _add_response_headers") @property def session_id(self): return self._session_id @property def session(self): return self._session @property def get(self): return self._get @property def post(self): return self._post @property def body(self): return self._body @property def files(self): return self._files @property def form(self): """ 表单数据,与 post 属性相同 Returns: 表单数据字典 """ return self._post @property def app(self): """ 获取应用实例 Returns: 应用实例 """ return self._app
[文档] class WSGIRequestHandler(BaseRequestHandler): """ WSGI 请求处理器,用于在 gunicorn、uWSGI 等 WSGI 服务器中运行 符合 PEP 3333 规范,处理 WSGI environ 并返回标准响应 """ def __init__(self, app, environ): super(WSGIRequestHandler, self).__init__(app, environ) self._environ = self._normalize_environ(environ) self._headers = [] self._get = parse_form(self._environ.get("QUERY_STRING", "")) content_type = self._environ.get("CONTENT_TYPE", "") if content_type: content_type_raw = content_type content_type, params = parse_header(content_type) content_length_str = self._environ.get("CONTENT_LENGTH") or "0" content_length = int(content_length_str) if content_length_str.strip() else 0 if content_length > 0: max_request_size = getattr(app.config, "max_request_size", 10485760) if content_length > max_request_size: raise HttpError( 413, f"Request body too large. Maximum size is {max_request_size} bytes" ) if content_type_raw == "application/x-www-form-urlencoded": if content_length > 0: wsgi_input = self._environ.get("wsgi.input") if wsgi_input: post_content = wsgi_input.read(content_length) post_content = post_content.decode("utf-8") self._post = parse_form(post_content) elif content_type_raw.startswith("multipart/form-data"): boundary = params.get("boundary") if boundary: wsgi_input = self._environ.get("wsgi.input") if wsgi_input: content_length_str = self._environ.get("CONTENT_LENGTH") or "0" content_length = ( int(content_length_str) if content_length_str.strip() else 0 ) if content_length > 0: self._parse_multipart(wsgi_input, boundary, content_length) else: if content_length > 0: wsgi_input = self._environ.get("wsgi.input") if wsgi_input: self._body = wsgi_input.read(content_length) self._body = self._body.decode("utf-8") self._session_id, self._session = self._get_session() self._middlewares = app._get_middleware_instances() def _normalize_environ(self, environ): normalized = dict(environ) if "PATH_INFO" not in normalized: normalized["PATH_INFO"] = "/" if "REQUEST_METHOD" not in normalized: normalized["REQUEST_METHOD"] = "GET" if "QUERY_STRING" not in normalized: normalized["QUERY_STRING"] = "" if "CONTENT_LENGTH" not in normalized: normalized["CONTENT_LENGTH"] = "0" else: content_length = normalized.get("CONTENT_LENGTH", "0") if content_length == "" or content_length is None: normalized["CONTENT_LENGTH"] = "0" if "CONTENT_TYPE" not in normalized: normalized["CONTENT_TYPE"] = "" if "HTTP_HOST" not in normalized: host = normalized.get("SERVER_NAME", "localhost") port = normalized.get("SERVER_PORT", "80") if port != "80": normalized["HTTP_HOST"] = "%s:%s" % (host, port) else: normalized["HTTP_HOST"] = host return normalized def _parse_multipart(self, wsgi_input, boundary, content_length): app = self._app max_upload_size = getattr(app.config, "max_upload_size", 52428800) if content_length > max_upload_size: raise HttpError(413, f"Request body too large. Maximum size is {max_upload_size} bytes") boundary = boundary.encode("utf-8") begin_boundary = b"--" + boundary end_boundary = b"--" + boundary + b"--" posts = {} files = {} data = wsgi_input.read(content_length) parts = data.split(begin_boundary) for part in parts[1:]: if part.strip() == end_boundary: break header_end = part.find(b"\r\n\r\n") if header_end == -1: continue headers_part = part[:header_end] content = part[header_end + 4 :] headers = {} for line in headers_part.split(b"\r\n"): if b":" in line: k, v = line.split(b":", 1) k = k.strip().upper() v = v.strip() k = k.decode("utf-8") v = v.decode("utf-8") headers[k] = v disposition = headers.get("CONTENT-DISPOSITION", "") disposition, params = parse_header(disposition) name = params.get("name", "") filename = params.get("filename") if filename: fp = TemporaryFile(mode="w+b") fp.write(content) fp.seek(0) files[name] = fp else: content = content.decode("utf-8") posts[name] = content.strip() self._post = posts self._files = files def _get_session(self): app = self._app sessions = app.sessions session_name = app.config.session_name cookie_str = self._environ.get("HTTP_COOKIE", "") cookie = SimpleCookie(cookie_str) morsel = cookie.get(session_name) if morsel is not None: session_id = morsel.value session = sessions.get(session_id) if session is not None: # 设置存储后端实例,确保数据修改时自动保存 session.store = sessions return session_id, session session_id = self._new_session_id() # 创建 Session 对象时传入存储后端实例 session = Session(session_id, store=sessions) sessions.put(session_id, session) return None, session def _new_session_id(self): app = self._app sessions = app.sessions while True: token = urandom(32) session_id = sha256(token).hexdigest() session = sessions.get(session_id) if session is None: break return session_id @property def config(self): return self._app.config @property def files(self): return self._files or {} @property def body(self): return self._body @property def json(self): body = self._body if not body: return {} content_type = self._environ.get("CONTENT_TYPE", "") content_type, _ = parse_header(content_type) content_type = content_type.lower() if content_type not in ("application/json", "application/json-rpc"): return {} return json.loads(body) @property def environ(self): return self._environ @property def params(self): return self._get @property def data(self): return self._post @property def session_id(self): return self._session_id @property def session(self): return self._session @property def request_method(self): return self._environ.get("REQUEST_METHOD", "GET") method = request_method @property def server_protocol(self): return self._environ.get("SERVER_PROTOCOL", "HTTP/1.1") @property def content_type(self): return self._environ.get("CONTENT_TYPE") @property def charset(self, default="UTF-8"): content_type = self.content_type if content_type: _, params = parse_header(content_type) return params.get("charset", default) return default @property def content_length(self): return int(self._environ.get("CONTENT_LENGTH", 0) or 0) @property def path_info(self): return self._environ.get("PATH_INFO", "/") @property def query_string(self): return self._environ.get("QUERY_STRING", "") @property def request_uri(self): path_info = self.path_info query_string = self.query_string if not query_string: return path_info return "?".join((path_info, query_string)) @property def referer(self): return self._environ.get("HTTP_REFERER") @property def cookie(self): cookie_str = self._environ.get("HTTP_COOKIE", "") cookie = SimpleCookie() cookie.load(cookie_str) return cookie def _add_header(self, key, value): self._headers.append((key, value)) def _add_response_headers(self, headers): headers.extend(self._headers)
[文档] def handler(self): app = self._app middleware_result = app.middleware_manager.process_request(self) if middleware_result is not None: return middleware_result try: environ = self._environ path_info = environ.get("PATH_INFO", "/") request_method = environ.get("REQUEST_METHOD", "GET") # 尝试使用路由系统处理请求 route_match = app.router.match(path_info, request_method) if route_match: handler, params = route_match try: # 将路由参数添加到请求对象 setattr(self, 'route_params', params) result = handler(self, **params) # 处理 Response 对象 if isinstance(result, Response): # 保存 Session 数据到 Session 存储 app = self._app session_key = self._session_id or self.session.id app.sessions.put(session_key, self.session) # 设置 session cookie(每次都设置,确保 cookie 不会丢失) session_name = app.config.session_name session_secure = app.config.session_secure session_http_only = app.config.session_http_only session_same_site = app.config.session_same_site cookie_header = SimpleCookie() cookie_header[session_name] = session_key cookie_header[session_name]['path'] = "/" if session_secure: cookie_header[session_name]['secure'] = True if session_http_only: cookie_header[session_name]['httponly'] = True if session_same_site: cookie_header[session_name]['samesite'] = session_same_site result.headers.append(("Set-Cookie", str(cookie_header[session_name]))) status_code = result.status_code status_text = http_status_codes.get(status_code, "Unknown") status = "%d %s" % (status_code, status_text) return app.middleware_manager.process_response( self, (status, result.headers, result.content) ) if self._headers_responsed: # 保存 Session 数据到 Session 存储 app = self._app session_key = self._session_id or self.session.id app.sessions.put(session_key, self.session) # 设置 session cookie(每次都设置,确保 cookie 不会丢失) session_name = app.config.session_name session_secure = app.config.session_secure session_http_only = app.config.session_http_only session_same_site = app.config.session_same_site self.set_cookie(session_name, session_key, path="/", secure=session_secure, httponly=session_http_only, samesite=session_same_site) status_code = int(self._status_code) status_text = http_status_codes.get(status_code, "Unknown") status = "%d %s" % (status_code, status_text) response_headers = [("Server", server_info)] response_headers.extend(self._headers) return app.middleware_manager.process_response( self, (status, response_headers, result) ) return app.middleware_manager.process_response( self, self._response(200, content=result) ) except Exception: log_error(app.logger) if app.config.debug: content = render_error() return app.middleware_manager.process_response( self, self._response(500, content=content) ) return app.middleware_manager.process_response(self, self._response(500)) # 路由未匹配,返回 404 return app.middleware_manager.process_response(self, self._response(404)) except Exception as e: middleware_result = app.middleware_manager.process_exception(self, e) if middleware_result is not None: return middleware_result raise
def _redirect(self, url): url = "/" if url is None else url headers = [("Content-Type", "text/html; charset=utf-8"), ("Location", url)] status_code = 302 status_text = http_status_codes.get(status_code, "Found") content = "%d %s" % (status_code, status_text) return self._response(status_code, headers=headers, content=content)
[文档] class RequestHandler(BaseRequestHandler): default_headers = { "Content-Type": default_content_type, "Server": "litefs/0.4.0", "X-Content-Type-Options": "nosniff", "X-Frame-Options": "SAMEORIGIN", "X-XSS-Protection": "1; mode=block" } def __init__(self, app, rw, environ, request): super(RequestHandler, self).__init__(app, environ) self._request = request self._rw = rw self._buffers = StringIO() self._response_headers = {} self._headers = [] # 显式初始化 _headers 属性,确保它存在 self._cookies = None self._get = parse_form(environ["QUERY_STRING"]) content_type = environ.get("CONTENT_TYPE", "") self.content_type_raw = content_type_raw = content_type content_type, content_type_params = parse_header(content_type) self._post = {} self._body = "" self._files = {} # 读取请求体并设置到 POST_CONTENT if content_type: content_length_value = environ.get("CONTENT_LENGTH") or 0 content_length = int(content_length_value) if content_length_value else 0 if content_length > 0: max_request_size = getattr(app.config, "max_request_size", 10485760) if content_length > max_request_size: raise HttpError( 413, f"Request body too large. Maximum size is {max_request_size} bytes" ) if content_length: if content_type_raw == "application/x-www-form-urlencoded": post_content = rw.read(int(content_length)) self._post = parse_form(post_content) self._body = post_content elif content_type_raw.startswith("multipart/form-data"): boundary = content_type_params.get("boundary", None) self._post, self._files = self._parse_multipart(rw, boundary) else: post_content = rw.read(int(content_length)) environ["POST_CONTENT"] \ = unquote_plus(post_content.decode("utf-8")) self._body = post_content self._session_id, self._session = self._get_session(environ) self._middlewares = app._get_middleware_instances() def _parse_multipart(self, rw, boundary): boundary = boundary.encode("utf-8") begin_boundary = b"--%s" % boundary end_boundary = b"--%s--" % boundary posts = {} files = {} s = rw.readline(DEFAULT_BUFFER_SIZE).strip() while True: if s.strip() != begin_boundary: assert s.strip() == end_boundary break headers = {} s = rw.readline(DEFAULT_BUFFER_SIZE).strip() while s: s = s.decode("utf-8") k, v = s.split(":", 1) headers[k.strip().upper()] = v.strip() s = rw.readline(DEFAULT_BUFFER_SIZE).strip() disposition = headers["CONTENT-DISPOSITION"] disposition, params = parse_header(disposition) name = params["name"] filename = params.get("filename") if filename: fp = TemporaryFile(mode="w+b") s = rw.readline(DEFAULT_BUFFER_SIZE) while s.strip() != begin_boundary and s.strip() != end_boundary: fp.write(s) s = rw.readline(DEFAULT_BUFFER_SIZE) fp.seek(0) files[name] = fp else: fp = StringIO() s = rw.readline(DEFAULT_BUFFER_SIZE) while s.strip() != begin_boundary and s.strip() != end_boundary: fp.write(s) s = rw.readline(DEFAULT_BUFFER_SIZE) fp.seek(0) posts[name] = fp.getvalue().strip().decode("utf-8") return posts, files def _get_session(self, environ): app = self._app sessions = app.sessions session_name = app.config.session_name cookie = environ.get("HTTP_COOKIE") cookie = SimpleCookie(cookie) morsel = cookie.get(session_name) if morsel is not None: session_id = morsel.value session = sessions.get(session_id) if session is not None: # 设置存储后端实例,确保数据修改时自动保存 session.store = sessions return session_id, session session_id = self._new_session_id() # 创建 Session 对象时传入存储后端实例 session = Session(session_id, store=sessions) sessions.put(session_id, session) return None, session def _new_session_id(self): app = self._app sessions = app.sessions while True: token = urandom(32) session_id = sha256(token).hexdigest() session = sessions.get(session_id) if session is None: break return session_id @property def config(self): return self._app.config @property def files(self): return self._files or {} @property def body(self): return self._body @property def json(self): body = self._body if not self._body: return {} content_type = self.content_type content_type, _ = parse_header(content_type) content_type = content_type.lower() if content_type not in ("application/json", "application/json-rpc"): return {} return json.loads(body) @property def environ(self): return self._environ @property def params(self): return self._get @property def data(self): return self._post @property def session_id(self): return self._session_id @property def session(self): return self._session @property def request_method(self): return self.environ["REQUEST_METHOD"] method = request_method @property def server_protocol(self): return self.environ["SERVER_PROTOCOL"] @property def content_type(self): return self.environ.get("CONTENT_TYPE") @property def charset(self, default="UTF-8"): _, params = parse_header(self.content_type) return params.get("charset", default) @property def content_length(self): return int(self.environ.get("CONTENT_LENGTH") or -1) @property def path_info(self): return self.environ["PATH_INFO"] @property def query_string(self): return self.environ["QUERY_STRING"] @property def request_uri(self): environ = self.environ path_info = environ["PATH_INFO"] query_string = environ["QUERY_STRING"] if not query_string: return path_info return "?".join((path_info, query_string)) @property def referer(self): return self.environ.get("HTTP_REFERER") @property def cookie(self): cookie_str = self.environ.get("HTTP_COOKIE", "") cookie = SimpleCookie() cookie.load(cookie_str) return cookie def _add_header(self, key, value): # 添加到 _response_headers 字典中 response_headers = self._response_headers response_headers[key] = value def _add_response_headers(self, headers): for header, value in headers: self._response_headers[header] = value
[文档] def redirect(self, url=None): if self._headers_responsed: raise ValueError("Http headers already responsed.") url = "/" if url is None else url response_headers = self._response_headers response_headers["Content-Type"] = "text/html;charset=utf-8" host = self._environ.get("HTTP_HOST") if not host: server_name = self._environ.get("SERVER_NAME", "localhost") server_port = self._environ.get("SERVER_PORT", "80") if server_port not in ("80", "443"): host = "%s:%s" % (server_name, server_port) else: host = server_name scheme = ( "https" if self._environ.get("HTTPS") == "on" or self._environ.get("SERVER_PORT") == "443" else "http" ) response_headers["Location"] = "%s://%s%s" % (scheme, host, url) status_code = 302 status_text = http_status_codes[status_code] content = "%d %s" % (status_code, status_text) headers = response_headers.items() self.start_response(status_code, headers=headers) return content
def _cast(self, s=None): response_headers = self._response_headers if not s: if "Content-Length" not in response_headers: response_headers["Content-Length"] = 0 return [] # 对于 3xx 重定向响应,设置 Content-Type 为 text/html; charset=utf-8 if 300 <= self._status_code < 400: response_headers["Content-Type"] = "text/html; charset=utf-8" # 直接返回空字符串,不需要转换为 JSON 格式 return [b""] content_type = response_headers.get("Content-Type", "") is_json_response = "application/json" in content_type if isinstance(s, (tuple, list)): if is_json_response: import json s = [json.dumps(item, ensure_ascii=False) for item in s] s = "[" + ",".join(s) + "]" else: if len(s) > 0 and (isinstance(s[0], str) or is_bytes(s[0])): first_type = type(s[0]) if isinstance(first_type, type) and issubclass(first_type, str): s = [item if isinstance(item, str) else str(item) for item in s] join_chr = s[0][:0] s = join_chr.join(s) else: s = [item if is_bytes(item) else str(item).encode("utf-8") for item in s] join_chr = s[0][:0] s = join_chr.join(s) else: s = [str(item) for item in s] s = "".join(s) elif not (isinstance(s, str) or is_bytes(s) or isinstance(s, (tuple, list))): if is_json_response: import json try: s = json.dumps(s, ensure_ascii=False) except (TypeError, ValueError) as e: raise TypeError("Response data cannot be JSON serialized: %s" % e) else: s = str(s) if isinstance(s, str): s = s.encode("utf-8") if is_bytes(s): if "Content-Length" not in response_headers: response_headers["Content-Length"] = len(s) return [s] try: iter_s = iter(s) first = next(iter_s) while not first: first = next(iter_s) except StopIteration: return self._cast() if is_bytes(first): new_iter_s = itertools.chain([first], iter_s) elif isinstance(first, str): encoder = lambda item: str(item).encode("utf-8") new_iter_s = itertools.chain([first], iter_s) new_iter_s = imap(encoder, new_iter_s) else: raise TypeError("response type is not allowd: %s" % type(first)) return new_iter_s
[文档] def handle_response(self, result): """ 处理响应结果,支持 Response 对象 """ if isinstance(result, Response): # 设置状态码 self._status_code = result.status_code # 添加响应头 for header, value in result.headers: self._add_header(header, value) # 返回响应内容 return result.content return result
[文档] def finish(self, content): try: rw = self._rw status_code = self._status_code status_text = http_status_codes[status_code] line = "HTTP/1.1 %d %s\r\n" % (status_code, status_text) line = line.encode("utf-8") rw.write(line) # 获取当前头部 headers = self._response_headers.copy() # 添加标准头部(如果不存在) standard_headers = { "Server": "litefs/0.4.0", "X-Content-Type-Options": "nosniff", "X-Frame-Options": "SAMEORIGIN", "X-XSS-Protection": "1; mode=block" } for header, value in standard_headers.items(): if header not in headers: headers[header] = value # 如果没有 Content-Type 头部,添加默认值 if "Content-Type" not in headers: # 对于 3xx 重定向响应,设置 Content-Type 为 text/html; charset=utf-8 if 300 <= status_code < 400: headers["Content-Type"] = "text/html; charset=utf-8" else: # 对于其他响应,检查是否是 HTML 内容 if isinstance(content, str) and content.startswith('<'): headers["Content-Type"] = "text/html; charset=utf-8" else: headers["Content-Type"] = default_content_type # 检查是否已经有 Content-Length 或 Transfer-Encoding 头 has_content_length = "Content-Length" in headers has_transfer_encoding = "Transfer-Encoding" in headers # 处理不同类型的 content from collections.abc import Iterable if not has_content_length and not has_transfer_encoding: if isinstance(content, (str, bytes)): # 对于字符串或字节,计算长度并添加 Content-Length 头 if isinstance(content, str): content_bytes = content.encode('utf-8') content_length = len(content_bytes) else: content_length = len(content) headers["Content-Length"] = str(content_length) elif not isinstance(content, (dict, list, tuple, type(None))) and isinstance(content, Iterable): # 对于生成器等可迭代对象,使用 Transfer-Encoding: chunked headers["Transfer-Encoding"] = "chunked" # 写入头部 for header, value in headers.items(): line = "%s: %s\r\n" % (header, value) line = line.encode("utf-8") rw.write(line) if self._cookies: for c in self._cookies.values(): # 直接使用 cookie 的输出,它已经包含了完整的 Set-Cookie 头部 line = str(c) + "\r\n" line = line.encode("utf-8") rw.write(line) rw.write("\r\n".encode("utf-8")) # 对于 3xx 重定向响应,直接返回空字符串,不需要转换为 JSON 格式 if 300 <= status_code < 400: rw.write(b"") else: for _ in self._cast(content): rw.write(_) rw.close() except Exception: if not self._headers_responsed: try: from ..utils import log_error, render_error log_error(self._app.logger) rw = self._rw status_code = 500 status_text = http_status_codes[status_code] line = "HTTP/1.1 %d %s\r\n" % (status_code, status_text) line = line.encode("utf-8") rw.write(line) rw.write("Content-Type: text/html; charset=utf-8\r\n".encode("utf-8")) rw.write("Server: litefs/0.4.0\r\n".encode("utf-8")) rw.write("X-Content-Type-Options: nosniff\r\n".encode("utf-8")) rw.write("X-Frame-Options: SAMEORIGIN\r\n".encode("utf-8")) rw.write("X-XSS-Protection: 1; mode=block\r\n".encode("utf-8")) rw.write("\r\n".encode("utf-8")) if self._app.config.debug: error_content = render_error() if isinstance(error_content, str): error_content = error_content.encode("utf-8") rw.write(error_content) else: rw.write(b"500 Internal Server Error") rw.close() except Exception: pass else: try: from ..utils import log_error log_error(self._app.logger) except Exception: pass raise
def __del__(self): files = self._environ.get(FILES_HEADER_NAME) if not files: return for fp in files.values(): fp.close() def _response(self, status_code, headers=None, content=None): if self._headers_responsed: raise ValueError("Http headers already responsed.") status_code = int(status_code) status_text = http_status_codes[status_code] if headers is None: headers = [] # 直接设置 Content-Type 为 text/html; charset=utf-8 # 这样可以确保无论 _response_headers 字典中是否有 Content-Type 响应头,Content-Type 都会被正确设置 headers.insert(0, ("Content-Type", "text/html; charset=utf-8")) # 添加 self._response_headers 中的响应头 existing_header_names = [h[0].lower() for h in headers] for header, value in self._response_headers.items(): if header.lower() not in existing_header_names: headers.append((header, value)) # 添加标准头部 standard_headers = [ ("Server", "litefs/0.4.0"), ("X-Content-Type-Options", "nosniff"), ("X-Frame-Options", "SAMEORIGIN"), ("X-XSS-Protection", "1; mode=block") ] # 检查并添加标准头部,避免重复 existing_header_names = [h[0].lower() for h in headers] for header in standard_headers: if header[0].lower() not in existing_header_names: headers.append(header) # 检查是否已经有 Content-Type 响应头 has_content_type = any(h[0].lower() == 'content-type' for h in headers) if status_code >= 400: if not has_content_type: html_headers = [("Content-Type", "text/html; charset=utf-8")] headers = html_headers + list(headers) elif not has_content_type: from collections.abc import Iterable if not isinstance(content, (str, bytes, dict, list, tuple, type(None))) and isinstance( content, Iterable ): headers = [("Content-Type", "text/plain; charset=utf-8")] + list(headers) else: headers = [("Content-Type", default_content_type)] + list(headers) # 设置 session cookie if self.session_id is None: session_name = self._app.config.session_name session_secure = self._app.config.session_secure session_http_only = self._app.config.session_http_only session_same_site = self._app.config.session_same_site self.set_cookie(session_name, self.session.id, path="/", secure=session_secure, httponly=session_http_only, samesite=session_same_site) # 保存 Session 数据到 Session 存储 app = self._app app.sessions.put(self.session.id, self.session) # 检查是否已经有 Content-Length 或 Transfer-Encoding 头 has_content_length = any(h[0].lower() == 'content-length' for h in headers) has_transfer_encoding = any(h[0].lower() == 'transfer-encoding' for h in headers) # 处理不同类型的 content from collections.abc import Iterable if not has_content_length and not has_transfer_encoding: if isinstance(content, (str, bytes)): # 对于字符串或字节,计算长度并添加 Content-Length 头 if isinstance(content, str): content_bytes = content.encode('utf-8') content_length = len(content_bytes) else: content_length = len(content) headers.append(("Content-Length", str(content_length))) elif not isinstance(content, (dict, list, tuple, type(None))) and isinstance(content, Iterable): # 对于生成器等可迭代对象,使用 Transfer-Encoding: chunked headers.append(("Transfer-Encoding", "chunked")) self.start_response(status_code, headers=headers) if content is None: content = DEFAULT_STATUS_MESSAGE % { "code": status_code, "message": status_text, "explain": status_text, } return content
[文档] def handler(self): app = self._app middleware_result = app.middleware_manager.process_request(self) if middleware_result is not None: return middleware_result try: environ = self.environ path_info = environ["PATH_INFO"] request_method = environ["REQUEST_METHOD"] # 尝试使用路由系统处理请求 route_match = app.router.match(path_info, request_method) if route_match: handler, params = route_match try: # 将路由参数添加到请求对象 setattr(self, 'route_params', params) result = handler(self, **params) # 处理 Response 对象 content = self.handle_response(result) if self._headers_responsed: return app.middleware_manager.process_response(self, content) return app.middleware_manager.process_response( self, self._response(200, content=content) ) except Exception: log_error(app.logger) if app.config.debug: content = render_error() return app.middleware_manager.process_response( self, self._response(500, content=content) ) return app.middleware_manager.process_response(self, self._response(500)) # 路由未匹配,返回 404 return app.middleware_manager.process_response(self, self._response(404)) except Exception as e: middleware_result = app.middleware_manager.process_exception(self, e) if middleware_result is not None: return middleware_result raise