python logging 源码阅读
python logging 模块实现了灵活的日志系统。整个模块仅仅3个类,不到5000行代码的样子,学习它可以加深对程序日志的了解,本文分下面几个部分:
- logging 简介
- logging API 设计
- 记录器对象 Logger
- 日志记录对象 LogRecord
- 处理器对象 Hander
- 格式器对象 Formatter
- 滚动日志文件处理器
- 小结
- 小技巧
logging 简介
本次代码使用的是 python 3.8.5
的版本,官方中文文档 3.8.8
。参考链接中官方中文文档非常详细,建议先看一遍了解日志使用。
类 |
功能 |
logging-module |
logging的API |
Logger |
日志记录器对象类,可以创建一个对象用来记录日志 |
LogRecord |
日志记录对象,每条日志记录都封装成一个日志记录对象 |
Hander |
处理器对象,负责日志输出到流/文件的控制 |
Formatter |
格式器,负责日志记录的格式化 |
RotatingFileHandler |
按大小滚动的日志文件记录器 |
TimedRotatingFileHandler |
按时间滚动的日志文件处理器 |
我们主要研究日志如何输出到标准窗口这一主线;日志的配置,日志的线程安全及各种特别的Handler等支线可以先忽略。
logging API 设计
先看看日志使用:
1
2
3
4
5
6
7
8
9
10
|
import logging
logging.basicConfig(level=logging.INFO, format='%(levelname)-8s %(name)-10s %(asctime)s %(message)s')
lang = {"name": "python", "age":20}
logging.info('This is a info message %s', lang)
logging.debug('This is a debug message')
logging.warning('This is a warning message')
logger = logging.getLogger(__name__)
logger.warning('This is a warning')
|
输出内容如下:
1
2
3
|
INFO root 2021-03-04 00:03:53,473 This is a info message {'name': 'python', 'age': 20}
WARNING root 2021-03-04 00:03:53,473 This is a warning message
WARNING __main__ 2021-03-04 00:03:53,473 This is a warning
|
可以看到 logging 的使用非常方便,模块直接提供了一组API。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
root = RootLogger(WARNING) # 默认提供的logger
Logger.root = root
Logger.manager = Manager(Logger.root)
def debug(msg, *args, **kwargs): # info,warning等api类似
if len(root.handlers) == 0:
basicConfig() # 默认配置
root.debug(msg, *args, **kwargs)
def getLogger(name=None):
if name:
return Logger.manager.getLogger(name) # 创建特定的logger
else:
return root # 返回默认的logger
|
这种API的提供方式,我们在requests中也有看到。api中很重要的设置config的方式:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
def basicConfig(**kwargs):
...
if handlers is None:
filename = kwargs.pop("filename", None)
mode = kwargs.pop("filemode", 'a')
if filename:
h = FileHandler(filename, mode)
else:
stream = kwargs.pop("stream", None)
h = StreamHandler(stream) # 默认的handler
handlers = [h]
dfs = kwargs.pop("datefmt", None)
style = kwargs.pop("style", '%')
fs = kwargs.pop("format", _STYLES[style][1])
fmt = Formatter(fs, dfs, style) # 生成formatter
for h in handlers:
if h.formatter is None:
h.setFormatter(fmt)
root.addHandler(h) # 设置root的handler
level = kwargs.pop("level", None)
if level is not None:
root.setLevel(level) # 设置日志级别
|
可以看到,日志的配置主要包括下面几项:
- level 日志级别
- format 信息格式化模版
- filename 输出到文件
- datefmt
%Y-%m-%d %H:%M:%S,uuu
时间的格式模版
- style [
%
,{
,$] 格式样板
演示代码输出中,可以看到debug日志没有显示,是因为 debug < info
:
1
2
3
4
5
6
7
8
|
CRITICAL = 50
FATAL = CRITICAL
ERROR = 40
WARNING = 30
WARN = WARNING
INFO = 20
DEBUG = 10
NOTSET = 0
|
记录器对象 Logger
查看Logger之前,先看logger对象的管理类Manager
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
_loggerClass = Logger
class Manager(object):
def __init__(self, rootnode):
self.root = rootnode
self.disable = 0
self.loggerDict = {} # 所有日志记录对象的字典
...
def getLogger(self, name):
rv = None
if name in self.loggerDict:
rv = self.loggerDict[name] # 获取已经创建过的同名logger
...
else:
rv = (self.loggerClass or _loggerClass)(name) # 创建新的logger
rv.manager = self
self.loggerDict[name] = rv
...
return rv
|
日志过滤器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
class Filterer(object):
def __init__(self):
self.filters = []
def addFilter(self, filter):
self.filters.append(filter)
def removeFilter(self, filter):
self.filters.remove(filter)
def filter(self, record):
rv = True
for f in self.filters: # 过滤日志
if hasattr(f, 'filter'):
result = f.filter(record)
else:
result = f(record) # assume callable - will raise if not
if not result:
rv = False
break
return r
|
核心的 Logger
实际上只是一个控制中心:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
class Logger(Filterer): # logger可以过滤日志
def __init__(self, name, level=NOTSET):
Filterer.__init__(self)
self.name = name
self.level = _checkLevel(level)
self.parent = None # 日志可以有层级
self.propagate = True
self.handlers = [] # 可以输出到多个handler
self.disabled = False # 可以关闭
self._cache = {}
def debug(self, msg, *args, **kwargs): # 输出debug日志
if self.isEnabledFor(DEBUG):
self._log(DEBUG, msg, args, **kwargs)
|
logger可以判断日志级别:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
def isEnabledFor(self, level):
if self.disabled:
return False
try:
return self._cache[level]
except KeyError:
try:
if self.manager.disable >= level:
is_enabled = self._cache[level] = False
else:
is_enabled = self._cache[level] = (
level >= self.getEffectiveLevel()
)
return is_enabled
def getEffectiveLevel(self):
logger = self
while logger:
if logger.level:
return logger.level
logger = logger.parent
return NOTSET
|
日志输出:
1
2
3
4
5
6
7
8
9
10
|
def _log(self, level, msg, args, exc_info=None, extra=None, stack_info=False,
stacklevel=1):
...
fn, lno, func = "(unknown file)", 0, "(unknown function)"
...
# 生成日志记录
record = self.makeRecord(self.name, level, fn, lno, msg, args,
exc_info, func, extra, sinfo)
# 使用handler处理日志
self.handle(record)
|
日志记录的生产,就是创建一个LogRecord对象:
1
2
3
4
5
6
7
8
9
|
_logRecordFactory = LogRecord
def makeRecord(self, name, level, fn, lno, msg, args, exc_info,
func=None, extra=None, sinfo=None):
...
rv = _logRecordFactory(name, level, fn, lno, msg, args, exc_info, func,
sinfo)
...
return rv
|
使用logger对象的所有handler处理日志:
1
2
3
4
5
6
7
8
|
def handle(self, record):
c = self
found = 0
while c:
for hdlr in c.handlers: # 使用所有的handler处理日志
found = found + 1
if record.levelno >= hdlr.level:
hdlr.handle(record)
|
root-logger 的handler是在config中配置的:
1
2
3
|
def basicConfig(**kwargs):
...
root.addHandler(h) # 设置root的handler
|
日志记录对象 LogRecord
日志记录对象非常简单:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
class LogRecord(object):
def __init__(self, name, level, pathname, lineno,
msg, args, exc_info, func=None, sinfo=None, **kwargs):
ct = time.time()
self.name = name # logger名称
self.msg = msg # 日志标识信息
...
self.args = args # 变量
self.levelname = getLevelName(level)
...
def getMessage(self):
msg = str(self.msg)
if self.args:
msg = msg % self.args # 格式化消息
return msg
|
处理器对象 Hander
顶级Handler定义了Handler的模版方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
class Handler(Filterer): # 处理器也可以过滤日志
def __init__(self, level=NOTSET):
Filterer.__init__(self)
self._name = None
self.level = _checkLevel(level) # handler也有日志级别
self.formatter = None
_addHandlerRef(self)
self.createLock()
def handle(self, record): # 处理日志
rv = self.filter(record) # 过滤日志
if rv:
self.acquire() # 申请锁
try:
self.emit(record) # 提交记录,由不同子类实现
finally:
self.release() # 释放锁
return rv
|
默认的console流 StreamHandler
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
class StreamHandler(Handler):
terminator = '\n' # 自动换行
def __init__(self, stream=None):
Handler.__init__(self)
if stream is None:
stream = sys.stderr # 默认使用stderr输出
self.stream = stream
def emit(self, record):
try:
msg = self.format(record) # 格式化日志记录
stream = self.stream
stream.write(msg + self.terminator) # 写日志
self.flush() # 刷新写缓存
except Exception:
...
def format(self, record):
if self.formatter:
fmt = self.formatter
else:
fmt = _defaultFormatter
return fmt.format(record) # 使用格式化器格式化日志记录
|
为什么使用stderr,可以看下面的测试中的输出都是到console:
1
2
3
|
print("haha")
print("fatal error", file=sys.stderr)
sys.stderr.write("fatal error\n")
|
格式化器主要使用Formatter和Style实现
1
2
3
4
5
6
7
8
9
10
11
12
13
|
class Formatter(object):
def __init__(self, fmt=None, datefmt=None, style='%', validate=True):
self._style = _STYLES[style][0](fmt)
self._fmt = self._style._fmt
self.datefmt = datefmt
def format(self, record):
record.message = record.getMessage()
s = self.formatMessage(record)
return s
def formatMessage(self, record):
return self._style.format(record) # 格式化
|
Style类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
class PercentStyle(object):
default_format = '%(message)s'
asctime_format = '%(asctime)s'
asctime_search = '%(asctime)'
validation_pattern = re.compile(r'%\(\w+\)[#0+ -]*(\*|\d+)?(\.(\*|\d+))?[diouxefgcrsa%]', re.I)
def __init__(self, fmt):
self._fmt = fmt or self.default_format
def usesTime(self):
return self._fmt.find(self.asctime_search) >= 0
def validate(self):
"""Validate the input format, ensure it matches the correct style"""
if not self.validation_pattern.search(self._fmt):
raise ValueError("Invalid format '%s' for '%s' style" % (self._fmt, self.default_format[0]))
def _format(self, record):
return self._fmt % record.__dict__ # 格式化日志记录对象
def format(self, record):
try:
return self._format(record)
except KeyError as e:
raise ValueError('Formatting field not found in record: %s' % e)
|
滚动日志文件处理器
线上的日志持续输出到一个文件的话,会让文件巨大,即有加剧了丢失的风险,也难以处理。通常有按照大小滚动或者按照日期滚动的方法,这个功能非常重要。先看滚动日志处理器模版:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
class BaseRotatingHandler(logging.FileHandler):
def emit(self, record):
try:
if self.shouldRollover(record): # 判断是否需要滚动
self.doRollover() # 滚动日志
logging.FileHandler.emit(self, record) # 输出日志
except Exception:
self.handleError(record)
def rotate(self, source, dest):
if not callable(self.rotator):
if os.path.exists(source):
os.rename(source, dest) # 重命名日志文件
else:
self.rotator(source, dest)
|
按大小滚动 RotatingFileHandler
按照文件大小滚动的处理器:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
class RotatingFileHandler(BaseRotatingHandler):
def __init__(self, filename, mode='a', maxBytes=0, backupCount=0, encoding=None, delay=False):
if maxBytes > 0:
mode = 'a'
BaseRotatingHandler.__init__(self, filename, mode, encoding, delay)
self.maxBytes = maxBytes # 单个文件大小上限
self.backupCount = backupCount # 日志备份数量
def doRollover(self): # 执行滚动
if self.stream:
self.stream.close() # 关闭当前的流
self.stream = None
if self.backupCount > 0:
for i in range(self.backupCount - 1, 0, -1):
sfn = self.rotation_filename("%s.%d" % (self.baseFilename, i))
dfn = self.rotation_filename("%s.%d" % (self.baseFilename,
i + 1))
if os.path.exists(sfn):
if os.path.exists(dfn):
os.remove(dfn)
os.rename(sfn, dfn)
dfn = self.rotation_filename(self.baseFilename + ".1")
if os.path.exists(dfn):
os.remove(dfn)
self.rotate(self.baseFilename, dfn) # 重命名文件
if not self.delay:
self.stream = self._open() # 如果shouldRollover延迟,可以打开新的流
def shouldRollover(self, record): # 判断是否需要滚动
if self.stream is None: # 立即打开流
self.stream = self._open()
if self.maxBytes > 0:
msg = "%s\n" % self.format(record)
self.stream.seek(0, 2) #due to non-posix-compliant Windows feature
if self.stream.tell() + len(msg) >= self.maxBytes: # 判断大小
return 1
return 0
|
文件大小滚动就是在记录日志时候判断文档是否超过上限,超过则重命名旧日志,生成新日志。
按照日期滚动 TimedRotatingFileHandler
按照日期滚动的处理器:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
|
class TimedRotatingFileHandler(BaseRotatingHandler):
def __init__(self, filename, when='h', interval=1, backupCount=0, encoding=None, delay=False, utc=False, atTime=None):
BaseRotatingHandler.__init__(self, filename, 'a', encoding, delay)
self.when = when.upper()
self.backupCount = backupCount
self.utc = utc
self.atTime = atTime
# 日期设置,支持多种方式
if self.when == 'S':
self.interval = 1 # one second
self.suffix = "%Y-%m-%d_%H-%M-%S"
self.extMatch = r"^\d{4}-\d{2}-\d{2}_\d{2}-\d{2}-\d{2}(\.\w+)?$"
...
self.extMatch = re.compile(self.extMatch, re.ASCII)
self.interval = self.interval * interval # multiply by units requested
filename = self.baseFilename
if os.path.exists(filename):
t = os.stat(filename)[ST_MTIME] # 最后修改时间
else:
t = int(time.time())
self.rolloverAt = self.computeRollover(t) # 提前计算终止时间
def computeRollover(self, currentTime):
# 判断的方法还是很长很复杂的,先pass
def shouldRollover(self, record):
t = int(time.time())
if t >= self.rolloverAt: # 判断是否到期
return 1
return 0
def doRollover(self):
...
dfn = self.rotation_filename(self.baseFilename + "." +
time.strftime(self.suffix, timeTuple))
# 滚动日志文件
if os.path.exists(dfn):
os.remove(dfn)
self.rotate(self.baseFilename, dfn)
if self.backupCount > 0:
for s in self.getFilesToDelete():
os.remove(s)
...
# 计算下一个时间点
newRolloverAt = self.computeRollover(currentTime)
...
self.rolloverAt = newRolloverAt
|
日期滚动就是计算最后时间点,超过时间点则重新生成新的日志文件。
小结
logging的处理逻辑大概是这样的:
- 创建Logger对象,提供API,用来接收应用程序日志
- Logger对象包括多个Handler
- 每个Handler有一个Formatter对象
- 每条日志都会生成一个LogRecord对象
- 使用不同的Handler对象将LogRecored对象提交到不同的流
- 每个日志对象通过Formatter格式化输出
- 可以使用按日期/文件大小的方式进行日志文件的滚动记录
小技巧
覆盖对象的 __reduce__
方法,让对象支持reduce函数:
1
2
3
4
5
6
|
class RootLogger(Logger):
def __init__(self, level):
Logger.__init__(self, "root", level)
def __reduce__(self):
return getLogger, ()
|
线程锁的创建和释放:
1
2
3
4
5
6
7
8
9
|
_lock = threading.RLock()
def _acquireLock():
if _lock:
_lock.acquire()
def _releaseLock():
if _lock:
_lock.release()
|
线程锁的使用:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
def addHandler(self, hdlr):
_acquireLock()
try:
self.handlers.append(hdlr)
finally:
_releaseLock()
def removeHandler(self, hdlr):
_acquireLock()
try:
self.handlers.remove(hdlr)
finally:
_releaseLock()
|
参考链接