#3 Repost: Python Asynchronous Programming and Databases

2020-11-22

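This article, published in 2015 by the expert zzzeek, goes into detail about SQLAlchemy and asynchronous programming. It cleared up some of my questions about how asynchronous programming is implemented.
I have read it over and over, trying to grasp more precisely what the author meant. I think every Python user should give it a read.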

#2 SQLAlchemy LIKE

2014-09-20

Example: find the rows in the user table whose phone field contains 520.

SQL

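Used the same way as with libraries such as pymysql:

keyword = '520'
# String interpolation: works, but is open to SQL injection when keyword comes from user input
conn.execute('select * from user where phone like "%%%s%%";' % keyword)
# Bound parameter: the wildcards go into the value, not around the placeholder
conn.execute('select * from user where phone like %s;', (f'%{keyword}%',))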
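
With a bound parameter, the % wildcards belong in the value itself rather than around the placeholder. Here is a minimal sketch of that idea using the standard-library sqlite3 module instead of pymysql. That swap is an assumption made so the snippet runs without a MySQL server; sqlite3 uses ? as its placeholder where pymysql uses %s. The phone numbers are made up for illustration.

```python
import sqlite3

# In-memory database with a few made-up phone numbers
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE user (id INTEGER PRIMARY KEY, phone TEXT)")
conn.executemany(
    "INSERT INTO user (phone) VALUES (?)",
    [("13520001111",), ("13800138000",), ("52013145200",)],
)

keyword = "520"
# The driver binds the whole pattern, wildcards included, as one value
rows = conn.execute(
    "SELECT id, phone FROM user WHERE phone LIKE ? ORDER BY id",
    (f"%{keyword}%",),
).fetchall()
print(rows)
```

Because the keyword never becomes part of the SQL text, a value such as `'; drop table user; --` is simply treated as a string to search for.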

.like method

qs = session.query(model.User.id, model.User.phone).filter(model.User.phone.like(f'%{keyword}%'))
rows = qs.all()
# print(qs.statement)
# SELECT "user".id, "user".phone
# FROM "user"
# WHERE "user".phone LIKE :phone_1

The case-insensitive counterpart is ilike, rendered as lower("user".phone) LIKE lower(:phone_1).
There are also not_like and not_ilike.

.contains method

print(session.query(model.User.id, model.User.phone).filter(model.User.phone.contains(keyword)).statement)
# SELECT "user".id, "user".phone
# FROM "user"
# WHERE ("user".phone LIKE '%' || :phone_1 || '%')

.regexp_match method (new in 1.4)

Corresponds to the REGEXP operator supported by MySQL.

print(session.query(model.User.id, model.User.phone).filter(model.User.phone.regexp_match(keyword)).statement)
session.query(model.User.id, model.User.phone).filter(model.User.phone.regexp_match(keyword)).all()
# SELECT "user".id, "user".phone
# FROM "user"
# WHERE "user".phone <regexp> :phone_1

.startswith and .endswith methods

print(session.query(model.User.id, model.User.phone).filter(model.User.phone.startswith(keyword)).statement)
# SELECT "user".id, "user".phone
# FROM "user"
# WHERE ("user".phone LIKE :phone_1 || '%')
print(session.query(model.User.id, model.User.phone).filter(model.User.phone.endswith(keyword)).statement)
# SELECT "user".id, "user".phone
# FROM "user"
# WHERE ("user".phone LIKE '%' || :phone_1)
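
To check that these methods really select what the rendered SQL suggests, here is a small self-contained sketch. It assumes SQLAlchemy 1.4 or later and an in-memory SQLite database; the User model and the phone numbers are stand-ins invented for this example, not the model module used above.

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()


class User(Base):  # hypothetical stand-in for model.User
    __tablename__ = "user"
    id = Column(Integer, primary_key=True)
    phone = Column(String)


engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()
session.add_all([
    User(phone="52013800000"),  # starts with 520
    User(phone="13852000000"),  # 520 in the middle
    User(phone="13800000520"),  # ends with 520
])
session.commit()


def phones(criterion):
    """Return the phone numbers matching one filter criterion, ordered by id."""
    query = session.query(User.phone).filter(criterion).order_by(User.id)
    return [phone for (phone,) in query]


keyword = "520"
containing = phones(User.phone.contains(keyword))
starting = phones(User.phone.startswith(keyword))
ending = phones(User.phone.endswith(keyword))
with_like = phones(User.phone.like(f"%{keyword}%"))
print(containing, starting, ending, with_like)
```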

.match method

Corresponds to the database's MATCH (col1,col2,...) AGAINST (expr [search_modifier]) full-text search.
It also works on a single column, but a FULLTEXT index has to be created first.

print(session.query(model.User.id, model.User.phone).filter(model.User.phone.match(keyword)).statement)
session.query(model.User.id, model.User.phone).filter(model.User.phone.match(keyword)).all()
# SELECT "user".id, "user".phone
# FROM "user"
# WHERE "user".phone MATCH :phone_1

#1 SQLAlchemy

2014-08-21

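Database connection: Engine

Creating an engine hooks SQLAlchemy up to the native database interface through an adapter layer. The first time the execute or connect method is called, the Engine establishes a real DBAPI connection to the database, and all subsequent database interaction goes through that connection.
When using the ORM, the Engine works behind the scenes: once it has been created, we generally do not touch it directly again.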

from sqlalchemy import create_engine
engine = create_engine('sqlite:///:memory:', echo=True)  # echo => logging

Connection string (DB URL)
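
A DB URL generally has the form dialect+driver://username:password@host:port/database. As a rough sketch, make_url from sqlalchemy.engine.url parses such a string without connecting to anything or importing the driver. The credentials, host, and database name below are placeholders invented for the example.

```python
from sqlalchemy.engine.url import make_url

# Placeholder URL: MySQL dialect, pymysql driver, made-up credentials
url = make_url("mysql+pymysql://scott:tiger@localhost:3306/test")

print(url.drivername)  # dialect+driver part
print(url.username, url.host, url.port, url.database)
```

The same string is what gets passed to create_engine; for SQLite the host part is empty, as in sqlite:///:memory: above.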

Session

http://sqlalchemy.readthedocs.org/en/latest/orm/session_basics.html

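A Session is a workspace tied to a specific database connection. The documentation offers this analogy: if an application thread is a guest at a dinner party, the Session is the guest's plate, the data is the food on the plate, and the database is the kitchen.

When should it be created, when committed, and when closed?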

Session = sessionmaker(bind=engine)
# Equivalent to:
# Session = sessionmaker()
# Session.configure(bind=engine)
session = Session()
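
One common answer to the create/commit/close question is to wrap each unit of work in a context manager: create the session at the start, commit on success, roll back on error, and always close. The sketch below is one possible arrangement, not the only one. It assumes SQLAlchemy 1.4 or later and an in-memory SQLite database, and session_scope is a helper name chosen for this example.

```python
from contextlib import contextmanager

from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()


class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    name = Column(String)


engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)


@contextmanager
def session_scope():
    """Create a session, commit on success, roll back on error, always close."""
    session = Session()
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()


# Successful unit of work: committed when the block exits
with session_scope() as session:
    session.add(User(name="ed"))

# Failing unit of work: rolled back, so wendy is never stored
try:
    with session_scope() as session:
        session.add(User(name="wendy"))
        raise RuntimeError("simulated failure")
except RuntimeError:
    pass

with session_scope() as session:
    names = [name for (name,) in session.query(User.name).order_by(User.id)]
print(names)
```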

Defining models (database mapping)

Base class

from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()

Model definition

from sqlalchemy import Column, Integer, String


class User(Base):
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    name = Column(String)
    fullname = Column(String)
    password = Column(String)

    def __repr__(self):
        return "<User(name='%s', fullname='%s', password='%s')>" % (self.name, self.fullname, self.password)

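Table structure (Schema)

Another behind-the-scenes worker appears: the Table object, accessible as ModelClass.__table__. (The Mapper that links the class to this table is available as ModelClass.__mapper__.)

print(type(User.__table__))  # <class 'sqlalchemy.sql.schema.Table'>
print(repr(User.__table__))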
# Table('users', MetaData(bind=None),
#       Column('id', Integer(), table=<users>, primary_key=True, nullable=False),
#       Column('name', String(), table=<users>),
#       Column('fullname', String(), table=<users>),
#       Column('password', String(), table=<users>), schema=None)
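
Defining the model only describes the table; nothing exists in the database until the metadata is emitted. As a minimal sketch, assuming SQLAlchemy 1.4 or later and an in-memory SQLite database, Base.metadata.create_all creates the tables and inspect lets us look at the result.

```python
from sqlalchemy import Column, Integer, String, create_engine, inspect
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    name = Column(String)
    fullname = Column(String)
    password = Column(String)


engine = create_engine("sqlite:///:memory:")

tables_before = inspect(engine).get_table_names()
Base.metadata.create_all(engine)  # emits CREATE TABLE for every mapped model
tables_after = inspect(engine).get_table_names()

# Column names as recorded on the Table object
column_names = [column.name for column in User.__table__.columns]
print(tables_before, tables_after, column_names)
```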

Database operations

1. Insert

u = User(name='ed', fullname='Ed Jones', password='password')
session.add(u)

# Add several records at once
session.add_all([
    User(name='wendy', fullname='Wendy Williams', password='foobar'),
    User(name='mary', fullname='Mary Contrary', password='p@55vv0rd'),
    User(name='fred', fullname='Fred Green', password='123456')
])
print(session.new)
print(session.query(User).all())

2. Querying

2.1 The Query object

for name, fullname in session.query(User.name, User.fullname):
    print(name.ljust(10), fullname)

for row in session.query(User, User.name):
    print(row.User, row.name)

Iterating over a Query object directly is equivalent to fetching all objects in the default order.

2.2 Conditions: filter_by

qs = session.query(User).filter_by(name='ed')
users = qs.all()
our_user = qs.first()

2.3 The more flexible filter

It accepts Python expressions built from class attributes!

query = session.query(User.name)
queryset = query.filter(User.fullname == 'Ed Jones')
for name, in queryset:
    print(name)

Common filter operators

  • Equal
    query.filter(User.name == 'ed')
    query.filter(User.name != 'ed')
  • LIKE
    query.filter(User.name.like('%ed%'))
  • IN / NOT IN
    query.filter(User.name.in_(['ed', 'wendy', 'jack']))
    query.filter(User.name.in_(session.query(User.name).filter(User.name.like('%ed%'))))
    query.filter(~User.name.in_(['ed', 'wendy', 'jack']))
  • NULL
    query.filter(User.name == None)
    query.filter(User.name.is_(None))
    query.filter(User.name != None)
    query.filter(User.name.isnot(None))
  • AND
    query.filter(User.name == 'ed').filter(User.fullname == 'Ed Jones')
    query.filter(User.name == 'ed', User.fullname == 'Ed Jones')  # multiple criteria are joined with AND
    query.filter(and_(User.name == 'ed', User.fullname == 'Ed Jones'))  # from sqlalchemy import and_
  • OR
    query.filter(or_(User.name == 'ed', User.name == 'wendy'))  # from sqlalchemy import or_
  • MATCH
    query.filter(User.name.match('wendy'))
    Note: match relies on database-specific functionality, so it may behave differently across database backends, and some databases do not support it.
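
The difference between passing several criteria to filter (joined with AND) and using or_ is easy to get wrong, so here is a small runnable sketch. It assumes SQLAlchemy 1.4 or later and an in-memory SQLite database; the names helper and the NULL fullname for fred are additions made only for this example.

```python
from sqlalchemy import Column, Integer, String, and_, create_engine, or_
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()


class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    name = Column(String)
    fullname = Column(String)


engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()
session.add_all([
    User(name="ed", fullname="Ed Jones"),
    User(name="wendy", fullname="Wendy Williams"),
    User(name="mary", fullname="Mary Contrary"),
    User(name="fred", fullname=None),  # NULL fullname, for the is_(None) case
])
session.commit()


def names(*criteria):
    """Return the names of users matching all given criteria, ordered by id."""
    query = session.query(User.name).filter(*criteria).order_by(User.id)
    return [name for (name,) in query]


# Several criteria in one filter() call are combined with AND
comma_result = names(User.name == "ed", User.fullname == "Wendy Williams")
and_result = names(and_(User.name == "ed", User.fullname == "Ed Jones"))
or_result = names(or_(User.name == "ed", User.name == "wendy"))
not_in_result = names(~User.name.in_(["ed", "wendy"]))
null_result = names(User.fullname.is_(None))
print(comma_result, and_result, or_result, not_in_result, null_result)
```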

2.4 Ordering: order_by

for instance in session.query(User).order_by(User.id):
    print(instance.name.ljust(10), instance.fullname)

2.5 Aliases

label = User.name.label('name_label')
for row in session.query(label).all():
    print(row.name_label)

from sqlalchemy.orm import aliased
user_alias = aliased(User, name='user_alias')
for row in session.query(user_alias, user_alias.name).all():
    print(row.user_alias)

2.6 LIMIT and OFFSET

As in Django, these are expressed with slices.
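
A short sketch of the slicing form next to the explicit limit and offset calls, assuming SQLAlchemy 1.4 or later and an in-memory SQLite database with four made-up users:

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()


class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    name = Column(String)


engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()
session.add_all([User(name=n) for n in ("ed", "wendy", "mary", "fred")])
session.commit()

query = session.query(User.name).order_by(User.id)

# Slicing runs the query with LIMIT/OFFSET and returns a list
sliced = [name for (name,) in query[1:3]]

# The same window written out explicitly
explicit = [name for (name,) in query.limit(2).offset(1)]

# slice() builds the query without running it, so the SQL can be inspected
sql = str(query.slice(1, 3))
print(sliced, explicit)
print(sql)
```

Unlike in Django, slicing a Query executes it immediately; use slice, limit, and offset when the query still needs further chaining.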

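2.7 Other methods

  • .all() returns a list of all the records found.
  • .first() returns the first record found, or None if there is none.
  • .one() returns the record when exactly one is found. Otherwise it raises an exception:
    sqlalchemy.orm.exc.NoResultFound: No row was found for one()
    sqlalchemy.orm.exc.MultipleResultsFound: Multiple rows were found for one()
  • .scalar() calls .one() and returns the first column of that row, or None when no row is found. This is not the same as .first(): .first() applies LIMIT 1 and fetches only that row, whereas .one() runs the query without a limit so that it can detect extra rows, and raises MultipleResultsFound if there is more than one.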
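
The behaviours listed above can be compared side by side. This is a minimal sketch assuming SQLAlchemy 1.4 or later and an in-memory SQLite database; the two users are made up, and the outcomes are collected into plain variables so they are easy to print.

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker
from sqlalchemy.orm.exc import MultipleResultsFound, NoResultFound

Base = declarative_base()


class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    name = Column(String)


engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()
session.add_all([User(name="ed"), User(name="wendy")])
session.commit()

everyone = session.query(User.name).order_by(User.id)      # two rows
only_ed = session.query(User.name).filter(User.name == "ed")  # one row
nobody = session.query(User.name).filter(User.name == "nobody")  # no rows

first_name = everyone.first().name  # first row only
first_of_none = nobody.first()      # None when nothing matches

try:
    everyone.one()
    one_on_many = "returned a row"
except MultipleResultsFound:
    one_on_many = "MultipleResultsFound"

try:
    nobody.one()
    one_on_none = "returned a row"
except NoResultFound:
    one_on_none = "NoResultFound"

scalar_on_one = only_ed.scalar()   # first column of the single row
scalar_on_none = nobody.scalar()   # None instead of NoResultFound

print(first_name, first_of_none, one_on_many, one_on_none,
      scalar_on_one, scalar_on_none)
```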

2.8 Support for raw SQL

  • text
  • from_statement
  • params: supplies parameter values to a Query

from sqlalchemy import text

# Example 1: simple usage
for user in session.query(User).filter(text("id<224")).order_by(text("id")).all():
    print(user.name)

# Example 2: with bound parameters
session.query(User).filter(text("id<:value and name=:name")).params(value=224, name='fred').order_by(User.id).one()

# Example 3: here Query does little more than check that the columns are valid
sql = text("SELECT * FROM users where name=:name")
session.query(User).from_statement(sql).params(name='ed').all()
session.query("id", "name").from_statement(sql).params(name='ed').all()

3. Update

u = session.query(User).filter_by(name='fred')[0]
u.password = '12345678'
session.commit()
print(u in session)
u = session.query(User).filter_by(name='fred')[0]
print(u.password)

4. Delete

jack = session.query(User).filter_by(name='jack').one()
session.delete(jack)
session.query(User).filter_by(name='jack').count()

5. Count

session.query(User).filter(User.name.like('%ed')).count()
from sqlalchemy import func
session.query(func.count(User.name), User.name).group_by(User.name).all()
session.query(func.count('*')).select_from(User).scalar()
session.query(func.count(User.id)).scalar()

6. Transactions: commit and rollback

commit

u = session.query(User).all()[-1]
u.password = '123456'
print(session.dirty)  # IdentitySet
# At this point the inserts made with add_all have not actually been committed yet
# session.commit()  # commit to the database

rollback

u = session.query(User).all()[-1]
u.password = '123456'
session.rollback()  # undo the update
print(session.dirty)
print('After Rollback, PWD:')
print(u.password)  # triggers a fresh query first
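
The rollback behaviour can be verified end to end with a small sketch. It assumes SQLAlchemy 1.4 or later and an in-memory SQLite database; the single user and the password values are made up for the example.

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()


class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    name = Column(String)
    password = Column(String)


engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()
session.add(User(name="ed", password="secret"))
session.commit()

u = session.query(User).filter_by(name="ed").one()
u.password = "123456"
dirty_before = u in session.dirty  # the change is tracked but not yet flushed

session.rollback()                 # discard the pending change
dirty_after = u in session.dirty

# The object was expired by the rollback, so this access reloads it
password_after = u.password
print(dirty_before, dirty_after, password_after)
```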

Relationships

from sqlalchemy import ForeignKey
from sqlalchemy.orm import relationship, backref

class Address(Base):
    __tablename__ = 'addresses'

    id = Column(Integer, primary_key=True)
    email_address = Column(String, nullable=False)
    user_id = Column(Integer, ForeignKey('users.id'))

    user = relationship("User", backref=backref('addresses', order_by=id))

    def __repr__(self):
        return "<Address(email_address='%s')>" % self.email_address

# Or declare it on User instead, as follows:
# class User(Base):
#     addresses = relationship("Address", order_by="Address.id", backref="user")

Working with related objects

jack = User(name='jack', fullname='Jack Bean', password='qwerty')
print(jack.addresses)
jack.addresses = [
    Address(email_address='jack@google.com'),
    Address(email_address='j25@yahoo.com'),
]
print(jack.addresses[1])
print(jack.addresses[1].user)
session.add(jack)
print('-' * 70)
session.commit()  # also inserts the Address rows
print('-' * 70)
jack = session.query(User).filter_by(name='jack').one()
print(jack)
print('-' * 70)
print(jack.addresses)   # lazy loading

JOIN

for u, a in session.query(User, Address).\
        filter(User.id==Address.user_id).\
        filter(Address.email_address=='jack@google.com').\
        all():
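    print(u)
    print(a)
# <User(name='jack', fullname='Jack Bean', password='qwerty')>
# <Address(email_address='jack@google.com')>

A join can also be written as: session.query(User).join(Address)

If the two tables have no declared relationship, or have more than one, it is best to use one of the following forms: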

query.join(Address, User.id==Address.user_id)  # explicit condition
query.join(User.addresses)  # specify relationship from left to right
query.join(Address, User.addresses)  # same, with explicit target
query.join('addresses')  # same, using a string

The default is INNER JOIN:

session.query(User).join(Address). \
    filter(Address.email_address == 'jack@google.com'). \
    all()
# SELECT users.id AS users_id, users.name AS users_name, users.fullname AS users_fullname, users.password AS users_password
# FROM users
# JOIN addresses ON users.id = addresses.user_id
# WHERE addresses.email_address = 'jack@google.com'

Using other join types:

query.outerjoin(User.addresses)   # LEFT OUTER JOIN

Or use the other parameters of the join method; the details are covered in later sections.

aliased appears again

from sqlalchemy.orm import aliased
adalias1 = aliased(Address)
adalias2 = aliased(Address)
for username, email1, email2 in \
    session.query(User.name, adalias1.email_address, adalias2.email_address).\
    join(adalias1, User.addresses).\
    join(adalias2, User.addresses).\
    filter(adalias1.email_address=='jack@google.com').\
    filter(adalias2.email_address=='j25@yahoo.com'):
    print(username, email1, email2)
# jack jack@google.com j25@yahoo.com

Subqueries

from sqlalchemy.sql import func
stmt = session.query(Address.user_id, func.count('*').label('address_count')).\
    group_by(Address.user_id).subquery()
for u, count in session.query(User, stmt.c.address_count).\
        outerjoin(stmt, User.id==stmt.c.user_id).\
        order_by(User.id):
    print(u, count)

Selecting entities from a subquery

stmt = session.query(Address).\
    filter(Address.email_address != 'j25@yahoo.com').subquery()
adalias = aliased(Address, stmt)
for user, address in session.query(User, adalias).\
        join(adalias, User.addresses):
    print(user)
    print(address)

EXISTS

from sqlalchemy.sql import exists
stmt = exists().where(Address.user_id==User.id)
for name, in session.query(User.name).filter(stmt):
    print(name)

for name, in session.query(User.name).filter(User.addresses.any()):
    print(name)

for name, in session.query(User.name).\
    filter(User.addresses.any(Address.email_address.like('%google%'))):
    print(name)

session.query(Address).filter(~Address.user.has(User.name=='jack')).all()

Other methods

query.filter(User.addresses.contains(someaddress))
query.filter(User.addresses.any(Address.email_address == 'bar'))
query.filter(User.addresses.any(email_address='bar'))
query.filter(Address.user.has(name='ed'))
session.query(Address).with_parent(someuser, 'addresses')

Eager loading

Eager loading means related objects are loaded right away, together with the parent, instead of on first access.

http://sqlalchemy.readthedocs.org/en/latest/orm/loading_relationships.html

The default is lazy loading.

1. subqueryload

from sqlalchemy.orm import subqueryload
jack = session.query(User).\
    options(subqueryload(User.addresses)).\
    filter_by(name='jack').one()

subqueryload() when used in conjunction with limiting such as Query.first(), Query.limit() or Query.offset() should also include Query.order_by() on a unique column in order to ensure correct results.

2. joinedload

Uses LEFT OUTER JOIN by default.

from sqlalchemy.orm import joinedload
jack = session.query(User).\
    options(joinedload(User.addresses)).\
    filter_by(name='jack').one()

joinedload() is not a replacement for join()
The join created by joinedload() is anonymously aliased such that it does not affect the query results. An Query.order_by() or Query.filter() call cannot reference these aliased tables - so-called “user space” joins are constructed using Query.join(). The rationale for this is that joinedload() is only applied in order to affect how related objects or collections are loaded as an optimizing detail - it can be added or removed with no impact on actual results. See the section The Zen of Eager Loading for a detailed description of how this is used.

3. Join + Eagerload

from sqlalchemy.orm import contains_eager
jacks_addresses = session.query(Address).\
    join(Address.user).\
    filter(User.name=='jack').\
    options(contains_eager(Address.user)).\
    all()

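Deleting related objects

Returning to the earlier example of deleting Jack: if Jack has several addresses, then after Jack is deleted the user_id foreign key of those addresses is set to NULL, but the address rows themselves are not deleted.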
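
The usual way to have the addresses deleted along with the user is the cascade option on the relationship. The sketch below contrasts the default cascade with "all, delete-orphan". It assumes SQLAlchemy 1.4 or later and in-memory SQLite; for brevity the relationship is declared on User (the alternative form mentioned earlier), and remaining_user_ids is a helper invented for this example that rebuilds the models for each setting.

```python
from sqlalchemy import Column, ForeignKey, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, relationship, sessionmaker


def remaining_user_ids(cascade):
    """Delete a user owning two addresses; return the user_id values left behind."""
    Base = declarative_base()  # fresh registry so the models can be redefined

    class User(Base):
        __tablename__ = "users"
        id = Column(Integer, primary_key=True)
        name = Column(String)
        addresses = relationship("Address", backref="user", cascade=cascade)

    class Address(Base):
        __tablename__ = "addresses"
        id = Column(Integer, primary_key=True)
        email_address = Column(String, nullable=False)
        user_id = Column(Integer, ForeignKey("users.id"))

    engine = create_engine("sqlite:///:memory:")
    Base.metadata.create_all(engine)
    session = sessionmaker(bind=engine)()

    jack = User(name="jack", addresses=[
        Address(email_address="jack@google.com"),
        Address(email_address="j25@yahoo.com"),
    ])
    session.add(jack)
    session.commit()

    session.delete(jack)
    session.commit()

    query = session.query(Address.user_id).order_by(Address.id)
    return [user_id for (user_id,) in query]


# "save-update, merge" is the default cascade: addresses stay, user_id becomes NULL
default_result = remaining_user_ids("save-update, merge")
# With delete cascade the address rows are removed together with the user
cascade_result = remaining_user_ids("all, delete-orphan")
print(default_result, cascade_result)
```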