SQLAlchemy是Python SQL工具箱和ORM框架,它为应用程序开发人员提供了全面而灵活的SQL功能。它提供了一整套企业级持久化方案,旨在高效,高性能地访问数据库,并符合Pythonic之禅。项目代码量比较大,接近200个文件,7万行代码, 我们一起来挑战一下。由于篇幅原因,分成上下两篇,上篇我们学习了core部分的engine,dialect,connection和pool等部分,下篇主要学习core部分剩余的sql表达式和orm部分,包括如下内容:

  • SQL-schema使用示例
  • DDL(Data Definition Language)创建table
  • DML(Data Manipulation Language)使用insert插入数据
  • DQL(Data Query Language)使用select查询数据
  • ORM示例
  • model核心功能
  • 小结
  • 小技巧
  • 一点感悟

SQL-schema使用示例

上篇中,我们使用的sql都是手工编写的语句,下面这样:

1
2
create table x (a integer, b integer)
insert into x (a, b) values (1, 1)

在sqlalchemy中可以通过定义schema的方式进行数据操作,完整的示例如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
from sqlalchemy import create_engine
from sqlalchemy import MetaData
from sqlalchemy import Table
from sqlalchemy import Column
from sqlalchemy import Integer
from sqlalchemy import String
from sqlalchemy.sql import select

engine = create_engine('sqlite:///:memory:', echo=True)

metadata = MetaData()
users = Table('users', metadata,
              Column('id', Integer, primary_key=True),
              Column('name', String),
              Column('fullname', String),
              )
metadata.create_all(engine)

ins = users.insert().values(name='jack', fullname='Jack Jones')
print(ins)
result = engine.execute(ins)
print(result, result.inserted_primary_key)
s = select([users])
result = conn.execute(s)
for row in result:
    print(row)

result = engine.execute("select * from users")
for row in result:
    print(row)

示例程序的执行过程:

  • 创建engine,用于数据库连接
  • 创建metadata,用于管理schema
  • 创建users表的Table,绑定到metadata;同时包括id,name和fullname三个column
  • 将metadata提交到engine(创建表)
  • 使用users插入数据
  • 查询users的数据
  • 使用普通sql的方式验证数据

下面是示例的执行日志,清晰展示了上面过程:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
...
2021-04-19 10:02:09,166 INFO sqlalchemy.engine.base.Engine 
CREATE TABLE users (
	id INTEGER NOT NULL, 
	name VARCHAR, 
	fullname VARCHAR, 
	PRIMARY KEY (id)
)
2021-04-19 10:02:09,166 INFO sqlalchemy.engine.base.Engine ()
2021-04-19 10:02:09,167 INFO sqlalchemy.engine.base.Engine COMMIT
INSERT INTO users (name, fullname) VALUES (:name, :fullname)
2021-04-19 10:02:09,167 INFO sqlalchemy.engine.base.Engine INSERT INTO users (name, fullname) VALUES (?, ?)
2021-04-19 10:02:09,168 INFO sqlalchemy.engine.base.Engine ('jack', 'Jack Jones')
2021-04-19 10:02:09,168 INFO sqlalchemy.engine.base.Engine COMMIT
<sqlalchemy.engine.result.ResultProxy object at 0x7ffca0607070> [1]
2021-04-27 11:38:19,134 INFO sqlalchemy.engine.base.Engine SELECT users.id, users.name, users.fullname 
FROM users
2021-04-27 11:38:19,134 INFO sqlalchemy.engine.base.Engine ()
(1, 'jack', 'Jack Jones')
2021-04-19 10:02:09,168 INFO sqlalchemy.engine.base.Engine select * from users
2021-04-19 10:02:09,168 INFO sqlalchemy.engine.base.Engine ()
(1, 'jack', 'Jack Jones')

在开始之前,我们需要简单了解一下SQL语句的分类: SQL

在我们的schema使用示例中,就包括了DDL,DML和DQL三种类型的语句,下面我们按照这3种类型,详细了解一下sqlalchemy的sql表达式部分。sql表达式主要在sql包中,部分文件的功能如下:

模块 描述
base.py 基础类
compiler.py sql编译
crud.py crud的参数处理
ddl.py DDL语句
default_comparator.py 比较
dml.py DML语句
elements.py 基本类型
operators.py sql操作符
schema.py schema定义
selectable.py DQL
sqltypes.py&&type_api.py sql数据类型
vistitors.py 递归算法

DDL(Data Definition Language)创建table

首先了解一下schema的基础实现visitable:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class VisitableType(type):

    def __init__(cls, clsname, bases, clsdict):
        if clsname != "Visitable" and hasattr(cls, "__visit_name__"):
            _generate_dispatch(cls)
        super(VisitableType, cls).__init__(clsname, bases, clsdict)

def _generate_dispatch(cls):
    if "__visit_name__" in cls.__dict__:
        visit_name = cls.__visit_name__
        if isinstance(visit_name, str):
            getter = operator.attrgetter("visit_%s" % visit_name)

            def _compiler_dispatch(self, visitor, **kw):
                try:
                    meth = getter(visitor)
                except AttributeError:
                    raise exc.UnsupportedCompilationError(visitor, cls)
                else:
                    return meth(self, **kw)
        
        cls._compiler_dispatch = _compiler_dispatch

class Visitable(util.with_metaclass(VisitableType, object)):
    pass

Visitable约定子类必须提供 visit_name 的类属性,用来绑定编译方法。参与sql的类都继承自Visitable:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
class SchemaItem(SchemaEventTarget, visitors.Visitable):
    __visit_name__ = "schema_item"
    
class MetaData(SchemaItem):
    __visit_name__ = "metadata"
    
class Table(DialectKWArgs, SchemaItem, TableClause):
    __visit_name__ = "table"

class Column(DialectKWArgs, SchemaItem, ColumnClause):
    __visit_name__ = "column"

class TypeEngine(Visitable):
    ...

class Integer(_LookupExpressionAdapter, TypeEngine):
    __visit_name__ = "integer"

MetaData是schema的集合,记录了所有的Table定义, 通过 _add_table 函数用来添加表:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
class MetaData(SchemaItem):
    def __init__(
        self,
        bind=None,
        reflect=False,
        schema=None,
        quote_schema=None,
        naming_convention=None,
        info=None,
    ):
        # table集合
        self.tables = util.immutabledict()
        
        self.schema = quoted_name(schema, quote_schema)
        self._schemas = set()
    
    def _add_table(self, name, schema, table):
        key = _get_table_key(name, schema)
        dict.__setitem__(self.tables, key, table)
        if schema:
            self._schemas.add(schema)

Table是column的集合,在创建table对象的时候,把自己添加到metadata中:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class Table(DialectKWArgs, SchemaItem, TableClause):
    
    def __new__(cls, *args, **kw):
        name, metadata, args = args[0], args[1], args[2:]
        schema = metadata.schema
        table = object.__new__(cls)
        # 添加到metadata
        metadata._add_table(name, schema, table)
        table._init(name, metadata, *args, **kw)
        return table
    
    def _init(self, name, metadata, *args, **kwargs):
        super(Table, self).__init__(
            quoted_name(name, kwargs.pop("quote", None))
        )
        self.metadata = metadata
        self.schema = metadata.schema
        # column集合
        self._columns = ColumnCollection()
        self._init_items(*args)
    
    def _init_items(self, *args):
        # column 
        for item in args:
            if item is not None:
                item._set_parent_with_dispatch(self)

Column是通过下面的方法将column添加到table的colummns中:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
class Column(DialectKWArgs, SchemaItem, ColumnClause):
    
    def __init__(self, *args, **kwargs):
        pass
        
    def _set_parent(self, table):
        table._columns.replace(self)

class ColumnCollection(util.OrderedProperties):
    def replace(self, column):
        ...
        self._data[column.key] = column
        ...

现阶段,我们大概厘清了metadata,table和column的数据结构:metadata持有table集合,table持有column集合。接下来我们看看这个数据结构如何转换成sql语句,API是通过 MetaData.create_all 函数实现:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
class MetaData(SchemaItem):
    def create_all(self, bind=None, tables=None, checkfirst=True):
        bind._run_visitor(
            ddl.SchemaGenerator, self, checkfirst=checkfirst, tables=tables
        )

class Engine(Connectable, log.Identified):
    def _run_visitor(
        self, visitorcallable, element, connection=None, **kwargs
    ):
        with self._optional_conn_ctx_manager(connection) as conn:
            conn._run_visitor(visitorcallable, element, **kwargs)
            
class Connection(Connectable):
    def _run_visitor(self, visitorcallable, element, **kwargs):
        visitorcallable(self.dialect, self, **kwargs).traverse_single(element)

create-table的sql编译主要由ddl中的SchemaGenerator实现, 下面是SchemaGenerator的继承关系和核心的traverse_single函数:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class ClauseVisitor(object):

    def traverse_single(self, obj, **kw):
        # 遍历所有的visit实现 
        for v in self.visitor_iterator:
            meth = getattr(v, "visit_%s" % obj.__visit_name__, None)
            if meth:
                return meth(obj, **kw)
    
    @property
    def visitor_iterator(self):
        v = self
        while v:
            yield v
            v = getattr(v, "_next", None)
            
class SchemaVisitor(ClauseVisitor):
    ...
    
class DDLBase(SchemaVisitor):
    ...

class SchemaGenerator(DDLBase):
    ...

创建meta,table和columun的过程:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
class SchemaGenerator(DDLBase):
    def visit_metadata(self, metadata):
        tables = list(metadata.tables.values())
        collection = sort_tables_and_constraints(
            [t for t in tables if self._can_create_table(t)]
        )
        for table, fkcs in collection:
            if table is not None:
                # 创建表 
                self.traverse_single(
                    table,
                    create_ok=True,
                    include_foreign_key_constraints=fkcs,
                    _is_metadata_operation=True,
                )
            
    def visit_table(
        self,
        table,
        create_ok=False,
        include_foreign_key_constraints=None,
        _is_metadata_operation=False,
    ):
        for column in table.columns:
            if column.default is not None:
                # 创建column-DDLElement
                self.traverse_single(column.default)
    
        self.connection.execute(
            # fmt: off
            # 创建create-table-DDLElement
            CreateTable(
                table,
                include_foreign_key_constraints=  # noqa
                    include_foreign_key_constraints,
            )
            # fmt: on
        )

CreateTableDDLElement和CreateColumnDDLElement的继承关系:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class _DDLCompiles(ClauseElement):
    def _compiler(self, dialect, **kw):
        return dialect.ddl_compiler(dialect, self, **kw)
        
class DDLElement(Executable, _DDLCompiles):
    ...

class _CreateDropBase(DDLElement):
    ...
    
class CreateTable(_CreateDropBase):

    __visit_name__ = "create_table"
    
    def __init__(
        self, element, on=None, bind=None, include_foreign_key_constraints=None
    ):
        super(CreateTable, self).__init__(element, on=on, bind=bind)
        self.columns = [CreateColumn(column) for column in element.columns]

class CreateColumn(_DDLCompiles):
    __visit_name__ = "create_column"

    def __init__(self, element):
        self.element = element

最终这些DDLElement在compiler中被DDLCompiler编译成sql语句, CREATE TABLE是这样被编译的:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
def visit_create_table(self, create):
    table = create.element
    preparer = self.preparer

    text = "\nCREATE "
    if table._prefixes:
        text += " ".join(table._prefixes) + " "
    text += "TABLE " + preparer.format_table(table) + " "

    create_table_suffix = self.create_table_suffix(table)
    if create_table_suffix:
        text += create_table_suffix + " "

    text += "("

    separator = "\n"

    # if only one primary key, specify it along with the column
    first_pk = False
    for create_column in create.columns:
        column = create_column.element
        try:
            processed = self.process(
                create_column, first_pk=column.primary_key and not first_pk
            )
            if processed is not None:
                text += separator
                separator = ", \n"
                text += "\t" + processed
            if column.primary_key:
                first_pk = True
        except exc.CompileError as ce:
            ...

    const = self.create_table_constraints(
        table,
        _include_foreign_key_constraints=create.include_foreign_key_constraints,  # noqa
    )
    if const:
        text += separator + "\t" + const

    text += "\n)%s\n\n" % self.post_create_table(table)
    return text

def visit_create_column(self, create, first_pk=False):
    column = create.element

    text = self.get_column_specification(column, first_pk=first_pk)
    const = " ".join(
        self.process(constraint) for constraint in column.constraints
    )
    if const:
        text += " " + const

    return text

在前面column介绍中,我们略过了数据类型。大家都知道sql的数据类型和python数据类型有差异, 下面是一些常见的SQL数据类型:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
class TypeEngine(Visitable):
    ...
    
class Integer(_LookupExpressionAdapter, TypeEngine):
    __visit_name__ = "integer"
    ...

class String(Concatenable, TypeEngine):
    __visit_name__ = "string"
    ...

class CHAR(String):
    __visit_name__ = "CHAR"
    ...

class VARCHAR(String):
    __visit_name__ = "VARCHAR"
    ...

数据类型由GenericTypeCompiler进行编译:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class TypeCompiler(util.with_metaclass(util.EnsureKWArgType, object)):
    
    def process(self, type_, **kw):
        return type_._compiler_dispatch(self, **kw)
    
class GenericTypeCompiler(TypeCompiler):
    
    def visit_INTEGER(self, type_, **kw):
        return "INTEGER"
        
    def visit_string(self, type_, **kw):
        return self.visit_VARCHAR(type_, **kw)
    
    def visit_VARCHAR(self, type_, **kw):
        return self._render_string_type(type_, "VARCHAR")
    
    def _render_string_type(self, type_, name):
        text = name
        if type_.length:
            text += "(%d)" % type_.length
        if type_.collation:
            text += ' COLLATE "%s"' % type_.collation
        return text

DML(Data Manipulation Language)使用insert插入数据

数据插入的API由TableClause提供的insert函数:

1
2
3
4
5
class TableClause(Immutable, FromClause):
    
    @util.dependencies("sqlalchemy.sql.dml")
    def insert(self, dml, values=None, inline=False, **kwargs):
        return dml.Insert(self, values=values, inline=inline, **kwargs)

dml中提供了Insert类的实现:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
class UpdateBase(
    HasCTE, DialectKWArgs, HasPrefixes, Executable, ClauseElement
):
    ...
    
class ValuesBase(UpdateBase):
    ...
    
class Insert(ValuesBase):
    __visit_name__ = "insert"
    ...

按照ddl的经验,我们查找insert语句的编译方法,在SQLCompiler中:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
class SQLCompiler(Compiled):
    
    def visit_insert(self, insert_stmt, asfrom=False, **kw):

        crud_params = crud._setup_crud_params(
            self, insert_stmt, crud.ISINSERT, **kw
        )

        if insert_stmt._has_multi_parameters:
            crud_params_single = crud_params[0]
        else:
            crud_params_single = crud_params

        preparer = self.preparer
        supports_default_values = self.dialect.supports_default_values

        text = "INSERT "

        text += "INTO "
        table_text = preparer.format_table(insert_stmt.table)

        if crud_params_single or not supports_default_values:
            text += " (%s)" % ", ".join(
                [preparer.format_column(c[0]) for c in crud_params_single]
            )
        ...
        if insert_stmt.select is not None:
            select_text = self.process(self._insert_from_select, **kw)

            if self.ctes and toplevel and self.dialect.cte_follows_insert:
                text += " %s%s" % (self._render_cte_clause(), select_text)
            else:
                text += " %s" % select_text
        elif not crud_params and supports_default_values:
            text += " DEFAULT VALUES"
        elif insert_stmt._has_multi_parameters:
            text += " VALUES %s" % (
                ", ".join(
                    "(%s)" % (", ".join(c[1] for c in crud_param_set))
                    for crud_param_set in crud_params
                )
            )
        else:
            text += " VALUES (%s)" % ", ".join([c[1] for c in crud_params])

        return text

可以看到insert语句就是对Insert对象,通过字符串模版拼接而来。

DQL(Data Query Language)使用select查询数据

数据查询select语句也都有特定的数据结构Select,继承关系如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class SelectBase(HasCTE, Executable, FromClause):
    ...
    
class GenerativeSelect(SelectBase):
    ...
    
class Select(HasPrefixes, HasSuffixes, GenerativeSelect):
    __visit_name__ = "select"
    
    def __init__(
        self,
        columns=None,
        whereclause=None,
        from_obj=None,
        distinct=False,
        having=None,
        correlate=True,
        prefixes=None,
        suffixes=None,
        **kwargs
    ):
        GenerativeSelect.__init__(self, **kwargs)
        ...

select的编译语句也在SQLCompiler中:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
class SQLCompiler(Compiled):
    
    def visit_select(
        self,
        select,
        asfrom=False,
        parens=True,
        fromhints=None,
        compound_index=0,
        nested_join_translation=False,
        select_wraps_for=None,
        lateral=False,
        **kwargs
    ):
        ...
        
        froms = self._setup_select_stack(select, entry, asfrom, lateral)

        column_clause_args = kwargs.copy()
        column_clause_args.update(
            {"within_label_clause": False, "within_columns_clause": False}
        )

        text = "SELECT "  # we're off to a good start !

        text += self.get_select_precolumns(select, **kwargs)
        # the actual list of columns to print in the SELECT column list.
        inner_columns = [
            c
            for c in [
                self._label_select_column(
                    select,
                    column,
                    populate_result_map,
                    asfrom,
                    column_clause_args,
                    name=name,
                )
                for name, column in select._columns_plus_names
            ]
            if c is not None
        ]
        
        ...
        
        text = self._compose_select_body(
            text, select, inner_columns, froms, byfrom, kwargs
        )

        if select._statement_hints:
            per_dialect = [
                ht
                for (dialect_name, ht) in select._statement_hints
                if dialect_name in ("*", self.dialect.name)
            ]
            if per_dialect:
                text += " " + self.get_statement_hint_text(per_dialect)

        if self.ctes and toplevel:
            text = self._render_cte_clause() + text

        if select._suffixes:
            text += " " + self._generate_prefixes(
                select, select._suffixes, **kwargs
            )

        self.stack.pop(-1)

        if (asfrom or lateral) and parens:
            return "(" + text + ")"
        else:
            return text

select语句一样是采用字符串拼接得到。

ORM 示例

orm的使用和schema使用方式略有不同, 下面是orm的示例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import sessionmaker

engine = create_engine('sqlite:///:memory:', echo=True)
Model = declarative_base()

class User(Model):
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    name = Column(String)
    fullname = Column(String)
    nickname = Column(String)

    def __repr__(self):
        return "<User(name='%s', fullname='%s', nickname='%s')>" % (
            self.name, self.fullname, self.nickname)

Model.metadata.create_all(engine)
print("="*10)
Session = sessionmaker(bind=engine)
session = Session()

ed_user = User(name='ed', fullname='Ed Jones', nickname='edsnickname')
session.add(ed_user)
session.commit()
print(ed_user.id)
result = engine.execute("select * from users")
for row in result:
    print(row)

对比schema和orm的差异,可以得到下表:

schema方式 orm方式
创建engine,用于数据库连接 -
创建metadata,用于管理schema 创建Model
创建users表的Table 创建User模型
将metadata提交到engine(创建表) -
- 创建session
使用users插入数据 使用session插入数据

总结一下主要就2点差异:

  1. orm时候不用显示的创建表的schema
  2. orm的数据处理都使用session来操作,而不是使用connection

model核心功能

Model类使用declarative_base动态创建:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
class DeclarativeMeta(type):
    def __init__(cls, classname, bases, dict_):
        if "_decl_class_registry" not in cls.__dict__:
            _as_declarative(cls, classname, cls.__dict__)
        type.__init__(cls, classname, bases, dict_)

    def __setattr__(cls, key, value):
        _add_attribute(cls, key, value)

    def __delattr__(cls, key):
        _del_attribute(cls, key)

def declarative_base(
    bind=None,
    metadata=None,
    mapper=None,
    cls=object,
    name="Base",
    constructor=_declarative_constructor,
    class_registry=None,
    metaclass=DeclarativeMeta,
):
    # 创建metadata
    lcl_metadata = metadata or MetaData()

    if class_registry is None:
        class_registry = weakref.WeakValueDictionary()

    bases = not isinstance(cls, tuple) and (cls,) or cls
    class_dict = dict(
        _decl_class_registry=class_registry, metadata=lcl_metadata
    )

    # 构造函数
    if constructor:
        class_dict["__init__"] = constructor
    if mapper:
        class_dict["__mapper_cls__"] = mapper
    
    # class-meta
    return metaclass(name, bases, class_dict)

关于如何动态创建类,在小技巧中进行介绍。declarative_base主要定义了Model类的几个特性:

  • Model类的构造函数__init__使用_declarative_constructor
  • Model类的子类在构造的时候会调用_as_declarative
  • model对象会使用_add_attribute进行赋值

先从构造函数_declarative_constructor开始:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
def _declarative_constructor(self, **kwargs):
    cls_ = type(self)
    for k in kwargs:
        if not hasattr(cls_, k):
            raise TypeError(
                "%r is an invalid keyword argument for %s" % (k, cls_.__name__)
            )
        setattr(self, k, kwargs[k])


_declarative_constructor.__name__ = "__init__"

看起来非常简单,但是这里做了一个类和对象实例之间的校验转换。我们先看一段演示代码:

1
2
3
4
5
6
7
8
class DummyModel(object):
    name = ["dummy_model"]  # 引用类型

a = DummyModel()
b = DummyModel()
assert id(a.name) == id(b.name) == id(DummyModel.name)
a.name.append("a")
assert id(a.name) == id(b.name) == id(DummyModel.name)

DummyModel的类属性name和a对象的name属性都是同一个引用。如果使用Model类:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
Model = declarative_base()

class UserModel(Model):
    __tablename__ = 'user'  # 必须字段
    id = Column(Integer, primary_key=True)  # 必须字段
    name = Column(String)

c = UserModel()
c.name = "c"
d = UserModel()
d.name = "d"
# 注意并不是Column
assert isinstance(UserModel.name, InstrumentedAttribute)
assert isinstance(c.name, str)
assert d.name == "d"
assert id(c.name) != id(d.name) != id(UserModel.name)

可以发现UserModel的类属性name和d对象的name属性完全不一样,类定义的是Cloumn(InstrumentedAttribute),对象变成了str。这个就是orm模型的特性之一,Model是定义格式模版,对象实例化后转化为普通数据。

Model的另外一个功能是隐式创建Table对象,在_as_declarative函数中通过_MapperConfig实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
class _MapperConfig(object):
    def setup_mapping(cls, cls_, classname, dict_):
        cfg_cls = _MapperConfig
        cfg_cls(cls_, classname, dict_)
    
    def __init__(self, cls_, classname, dict_):
        ...
        self._setup_table()
        ...
    
    def _setup_table(self):
        ...
        table_cls = Table
        args, table_kw = (), {}
        if table_args:
            if isinstance(table_args, dict):
                table_kw = table_args
            elif isinstance(table_args, tuple):
                if isinstance(table_args[-1], dict):
                    args, table_kw = table_args[0:-1], table_args[-1]
                else:
                    args = table_args

        autoload = dict_.get("__autoload__")
        if autoload:
            table_kw["autoload"] = True

        cls.__table__ = table = table_cls(
            tablename,
            cls.metadata,
            *(tuple(declared_columns) + tuple(args)),
            **table_kw
        )
        ...

而Column是通过下面的函数实现:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
def _add_attribute(cls, key, value):

    if "__mapper__" in cls.__dict__:
        if isinstance(value, Column):
            _undefer_column_name(key, value)
            cls.__table__.append_column(value)
            cls.__mapper__.add_property(key, value)
        ...
    else:
        type.__setattr__(cls, key, value)

Model通过上面的方式,隐式创建了Schema(Table),实际使用过程中只需要使用Model类,不用关注Schema的定义。

session的源码由于篇幅和时间有限,留待以后再行分析

小结

sqlalchemy可以在低层次上提供了sql语句的方式使用;在次层次上提供定义schema方式使用;在高层次上提供orm的实现,让应用可以根据项目的特点自主选择不同层级的API。

使用schema时候,主要使用Metadata,Table和Column等定义Schema数据结构,使用编译器自动将schema转换成合法的sql语句。

使用orm的时候,则是创建特定的数据模型,模型对象会隐式创建schema,通过session方式进行数据访问。

最后再回顾一下sqlalchemy的架构图:

architecture

小技巧

sqlalchemy中提供了一个动态创建类的方式,主要在declarative_base和DeclarativeMeta中实现。我参考这个实现方式做了一个类工厂:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
class DeclarativeMeta(type):
    def __init__(cls, klass_name, bases, dict_):
        print("class_init", klass_name, bases, dict_)
        type.__init__(cls, klass_name, bases, dict_)

def get_attr(self, key):
    print("getattr", self, key)
    return self.__dict__[key]

def constructor(self, *args, **kwargs):
    print("constructor", self, args, kwargs)
    for k, v in kwargs.items():
        setattr(self, k, v)

def dynamic_class(name):
    class_dict = {
        "__init__": constructor,
        "__getattr__": get_attr
    }

    return DeclarativeMeta(name, (object,), class_dict)

DummyModel = dynamic_class("Dummy")
dummy = DummyModel(1, name="hello", age=18)
print(dummy, type(dummy), dummy.name, dummy.age)


# class_init Dummy (<class 'object'>,) {'__init__': <function test_dynamic_class.<locals>.constructor at 0x7f898827ef70>, '__getattr__': <function test_dynamic_class.<locals>.get_attr at 0x7f89882105e0>}
# constructor <sample.Dummy object at 0x7f89882a5820> (1,) {'name': 'hello', 'age': 18}
# <sample.Dummy object at 0x7f89882a5820> <class 'sample.Dummy'> hello 18

示例中我动态创建了一个DummyModel类,type(dummy)可以看到,这个类名是 Dummy。这个类可以的构造函数可以接受name和age两个属性。这种创建方式和collections.namedtuple有点类似。

一点感悟

sqlalchemy的源码非常复杂,前前后后一共准备了一个月,形成的2篇文档仅仅涉及核心流程和用法,细节部分缺失较多,以后有机会还需要继续阅读。在这一个月中,克服了工作较忙,没有时间写稿的烦躁;克服了阅读进入困境,一度想放弃的心理障碍;克服了deadline临近,文稿还只是一个雏形,使用存稿顶替的羞愧;克服了笔记软件故障,写完的文稿丢失,完全重写的懊恼。战胜这些困难,最终还是得以完成,心理上有大满足。当然最大的收获还是对ORM中间件有了初步的了解,也希望梳理的ORM流程对大家有一定的帮助,如果获得大家的支持会更加满意♥️。

最后,欢迎加下面的微信和我互动交流,一起进阶: wx