#!/usr/bin/env python
# coding: utf-8
import os
import sqlite3
import time
from collections import OrderedDict, deque
from gzip import GzipFile
from hashlib import sha1
from io import BytesIO
from mimetypes import guess_type
from os import stat
from posixpath import splitext as path_splitext
from weakref import proxy
from zlib import compress
from watchdog.events import FileSystemEventHandler
from ..utils import gmt_date, log_info
suffixes = (".py", ".pyc", ".pyo", ".so")
[文档]
class FileEventHandler(FileSystemEventHandler):
def __init__(self, app):
FileSystemEventHandler.__init__(self)
self._app = proxy(app)
self._monitored_files = set() # 记录已监控的文件
self._last_reload_time = 0 # 上次重载时间
self._reload_cooldown = 1.0 # 重载冷却时间(秒)
def _should_reload(self, path):
"""
判断文件变化是否需要重新加载应用
Args:
path: 文件路径
Returns:
True 表示需要重新加载,False 表示不需要
"""
# 排除不需要触发重载的文件
# 日志文件
if path.endswith('.log'):
return False
# Python 编译文件
if path.endswith(('.pyc', '.pyo', '.so')):
return False
# 临时文件和备份文件(包括 ~, .bak, .tmp, .swp 等)
if path.endswith(('~', '.bak', '.tmp', '.swp', '.orig')):
return False
# 检查文件名是否包含波浪号(某些编辑器创建的备份文件)
if '~' in os.path.basename(path):
return False
# __pycache__ 目录
if '/__pycache__/' in path or path.startswith('__pycache__/'):
return False
# .git 目录
if '/.git/' in path or path.startswith('.git/'):
return False
# .svn 目录
if '/.svn/' in path or path.startswith('.svn/'):
return False
# .hg 目录
if '/.hg/' in path or path.startswith('.hg/'):
return False
# node_modules 目录
if '/node_modules/' in path:
return False
# Python 源文件变化需要重新加载
if path.endswith('.py'):
return True
# 配置文件变化需要重新加载
if path.endswith(('.yaml', '.yml', '.json', '.ini', '.conf')):
return True
return False
def _can_reload_now(self):
"""
检查现在是否可以触发重载(防止频繁重载)
Returns:
True 表示可以触发重载,False 表示需要等待
"""
import time
current_time = time.time()
if current_time - self._last_reload_time >= self._reload_cooldown:
self._last_reload_time = current_time
return True
return False
[文档]
def add_monitored_file(self, path):
"""
添加需要监控的文件
Args:
path: 文件路径
"""
self._monitored_files.add(path)
def _reload_app(self, event_type, path):
"""
重新加载应用
Args:
event_type: 事件类型
path: 文件路径
"""
# 检查是否可以触发重载(防止频繁重载)
if not self._can_reload_now():
return
log_info(self._app.logger, "File %s: %s, reloading application..." % (event_type, path))
# 标记需要重新加载
self._app._need_reload = True
# 清除所有缓存
self._app.caches.data.clear()
# 关闭服务器,触发重新加载
if hasattr(self._app, 'server') and self._app.server:
try:
self._app.server.server_close()
except Exception:
pass
[文档]
def on_moved(self, event):
src_path = event.src_path
dest_path = event.dest_path
# 对于移动事件,需要特殊处理
# 如果是 .py 文件移动到备份文件(如 quickstart.py -> quickstart.py~),不触发重载
# 这是编辑器保存文件时的常见行为
src_is_backup = src_path.endswith('~') or '~' in os.path.basename(src_path)
dest_is_backup = dest_path.endswith('~') or '~' in os.path.basename(dest_path)
# 如果源文件或目标文件是备份文件,不触发重载
if src_is_backup or dest_is_backup:
return
# 对于不需要触发重载的文件,不记录日志,避免循环触发
if not self._should_reload(src_path) and not self._should_reload(dest_path):
return
log_info(self._app.logger, "%s has been moved to %s" % (src_path, dest_path))
# 判断是否需要重新加载应用
if self._should_reload(src_path) or self._should_reload(dest_path):
self._reload_app('moved', dest_path)
else:
# 只清空缓存
self._app.caches.data.clear()
[文档]
def on_created(self, event):
src_path = event.src_path
# 对于不需要触发重载的文件,不记录日志,避免循环触发
if not self._should_reload(src_path):
return
log_info(self._app.logger, "%s has been created" % src_path)
# 判断是否需要重新加载应用
if self._should_reload(src_path):
self._reload_app('created', src_path)
else:
# 只清空缓存
self._app.caches.data.clear()
[文档]
def on_modified(self, event):
src_path = event.src_path
# 对于不需要触发重载的文件,不记录日志,避免循环触发
if not self._should_reload(src_path):
return
log_info(self._app.logger, "%s has been modified" % src_path)
# 判断是否需要重新加载应用
if self._should_reload(src_path):
self._reload_app('modified', src_path)
else:
# 只清空缓存
self._app.caches.data.clear()
[文档]
def on_deleted(self, event):
src_path = event.src_path
# 对于不需要触发重载的文件,不记录日志,避免循环触发
if not self._should_reload(src_path):
return
log_info(self._app.logger, "%s has been deleted" % src_path)
# 判断是否需要重新加载应用
if self._should_reload(src_path):
self._reload_app('deleted', src_path)
else:
# 只清空缓存
self._app.caches.data.clear()
[文档]
class LiteFile(object):
def __init__(self, path, base, name, text, status_code=200):
self.status_code = int(status_code)
self.path = path
self.text = text
self.etag = etag = sha1(text).hexdigest()
self.zlib_text = zlib_text = compress(text, 9)[2:-4]
self.zlib_etag = sha1(zlib_text).hexdigest()
stream = BytesIO()
with GzipFile(fileobj=stream, mode="wb") as f:
f.write(text)
self.gzip_text = gzip_text = stream.getvalue()
self.gzip_etag = sha1(gzip_text).hexdigest()
self.last_modified = gmt_date(stat(path).st_mtime)
mimetype, coding = guess_type(name)
headers = [("Content-Type", "text/html;charset=utf-8")]
if mimetype is not None:
headers = [("Content-Type", "%s;charset=utf-8" % mimetype)]
headers.append(("Last-Modified", self.last_modified))
self.headers = headers
[文档]
def handler(self, request):
environ = request.environ
if_modified_since = environ.get("HTTP_IF_MODIFIED_SINCE")
if if_modified_since == self.last_modified:
return request._response(304)
if_none_match = environ.get("HTTP_IF_NONE_MATCH")
accept_encodings = environ.get("HTTP_ACCEPT_ENCODING", "").split(",")
accept_encodings = [s.strip().lower() for s in accept_encodings]
headers = list(self.headers)
if "gzip" in accept_encodings:
if if_none_match == self.gzip_etag:
return request._response(304)
headers.append(("Etag", self.gzip_etag))
headers.append(("Content-Encoding", "gzip"))
text = self.gzip_text
elif "deflate" in accept_encodings:
if if_none_match == self.zlib_etag:
return request._response(304)
headers.append(("Etag", self.zlib_etag))
headers.append(("Content-Encoding", "deflate"))
text = self.zlib_text
else:
if if_none_match == self.etag:
return request._response(304)
headers.append(("Etag", self.etag))
text = self.text
headers.append(("Content-Length", "%d" % len(text)))
return request._response(self.status_code, headers=headers, content=text)
[文档]
class TreeCache(object):
def __init__(self, clean_period=60, expiration_time=3600):
self.data = {}
self.clean_time = time.time()
self.clean_period = clean_period
self.expiration_time = expiration_time
self.conn = sqlite3.connect(":memory:", check_same_thread=False)
self.conn.text_factory = str
self.conn.executescript("""
CREATE TABLE IF NOT EXISTS cache (
key VARCHAR PRIMARY KEY,
timestamp INTEGER
);
CREATE INDEX idx_cache ON cache (key);
""")
def __len__(self):
return len(self.data)
[文档]
def put(self, key, val):
current_time = time.time()
# 检查是否需要清理
if current_time - self.clean_time >= self.clean_period:
self.auto_clean()
timestamp = int(current_time)
if key not in self.data:
self.conn.execute(
"""
INSERT INTO cache (key, timestamp) VALUES (?, ?);
""",
(key, timestamp),
)
else:
self.conn.execute(
"""
UPDATE cache SET timestamp=? WHERE key=?;
""",
(timestamp, key),
)
self.data[key] = [val, timestamp]
[文档]
def get(self, key):
current_time = time.time()
# 检查是否需要清理
if current_time - self.clean_time >= self.clean_period:
self.auto_clean()
ret = self.data.get(key)
if ret is None:
return None
val, timestamp = ret
if int(current_time - timestamp) > self.expiration_time:
del self.data[key]
return None
return val
[文档]
def delete(self, key):
current_time = time.time()
# 检查是否需要清理
if current_time - self.clean_time >= self.clean_period:
self.auto_clean()
curr = self.conn.execute(
"""\
SELECT key FROM cache WHERE key=? OR key LIKE ?;
""",
(key, key + "/%"),
)
keys = curr.fetchall()
self.conn.executemany(
"""\
DELETE FROM cache WHERE key=?;
""",
keys,
)
for item in keys:
item_key = item[0]
if item_key in self.data:
del self.data[item_key]
[文档]
def auto_clean(self):
"""优化的清理机制
避免每次清理都查询数据库,只在清理周期到达时执行清理
"""
current_time = time.time()
# 检查是否达到清理周期,避免频繁清理
if current_time - self.clean_time < self.clean_period:
return
# 使用批量删除,避免先查询再删除
self.conn.execute(
"DELETE FROM cache WHERE timestamp < ?;",
(int(current_time - self.expiration_time),)
)
# 清理内存缓存,使用列表推导式提高效率
expired_keys = [
k for k, v in self.data.items()
if current_time - v[1] > self.expiration_time
]
for key in expired_keys:
del self.data[key]
# 更新清理时间
self.clean_time = current_time
[文档]
class MemoryCache(object):
def __init__(self, max_size=10000):
self._max_size = int(max_size)
self._cache = OrderedDict()
def __str__(self):
return str(self._cache)
def __len__(self):
return len(self._cache)
[文档]
def put(self, key, val):
if key in self._cache:
del self._cache[key]
elif len(self._cache) >= self._max_size:
self._cache.popitem(last=False)
self._cache[key] = val
[文档]
def get(self, key):
val = self._cache.get(key)
if val is None:
return None
self._cache.move_to_end(key)
return val
[文档]
def delete(self, key):
if key not in self._cache:
return
del self._cache[key]