#19 Tornado & HTTP 599
Python Tornado 2015-10-04Tornado POST 请求直接 599 了,经过 WireShark 抓包发现根本没有发出请求。
最后竟然发现和版本有关,4.1 下才有问题,4.2 以后就好了。
coding in a complicated world
Tornado POST 请求直接 599 了,经过 WireShark 抓包发现根本没有发出请求。
最后竟然发现和版本有关,4.1 下才有问题,4.2 以后就好了。
没有什么实际意义,只是玩玩而已。
突然来了兴致,想看看 Unicode 中有多少个中文,查了一下,很多人都是说 4e00 至 9fff 段1。
# -*- coding: utf-8 -*-
all_chinese_in_unicode = range(0x4e00, 0x9fff)
def transfer(u_char_num):
if isinstance(u_char_num, int):
u_char_hex = '%x' % u_char_num
u_char_str = '\u' + u_char_hex
else:
if isinstance(u_char_num, str) and len(u_char_num) == 4:
u_char_str = '\u' + u_char_num
else:
raise Exception
u_char = u_char_str.decode('raw_unicode_escape')
# print u_char_hex, u_char
return u_char
def test_transfer():
# repr(u"国") -> u'\u56fd' -> 22269
print transfer(0x56fd)
# 打印所有中文字符
# for i in all_chinese_in_unicode:
# print transfer(i),
# print
# 打印最后一个中文字符及前、后各一个字符
last_chinese_char = 0x9fbb
last_chinese_char_index = all_chinese_in_unicode.index(last_chinese_char)
start, end = last_chinese_char_index - 1, last_chinese_char_index + 2
for i in all_chinese_in_unicode[start:end]:
print transfer(i),
print
Ubuntu 下的 zsh 中运行,只能显示到这个字符:龻,后面的都是乱码,这个字符对应的十六进制数是 9fbb
。
结果又意外发现,最后一个字符似乎不是 9fbb,而是 9fcc(改 URL 一个一个试出来的)。
来源:https://www.fileformat.info/info/unicode/char/9fcc/index.htm
对应文字图片:https://www.fileformat.info/info/unicode/char/9fcc/sample.png
当然,这个来源并不保证权威,也可能有错误,涉及中文字符的范围还是使用 4e00 - 9fff 比较保险。
比如正则表达式:[\u4e00-\u9fff]
。
看来是需要抽半天空闲时间,仔细研究研究编码问题了。
51CTO 博客,lover007,正则匹配中文及常用正则表达式 ↩
我个人比较喜欢的方式,简单高效。
class Application:
pass
APP = Application()
import threading
class singleton:
_instance_lock = threading.Lock()
def __init__(self, decorated):
self._decorated = decorated
def instance(self):
if not hasattr(self, '_instance'):
with singleton._instance_lock:
if not hasattr(self, '_instance'):
self._instance = self._decorated()
return self._instance
def __call__(self):
raise TypeError('Singletons must be accessed through `instance()`.')
def __instancecheck__(self, inst):
return isinstance(inst, self._decorated)
def __getattr__(self, name):
if name == '_instance':
raise AttributeError("'singleton' object has no attribute '_instance'")
return getattr(self.instance(), name)
@singleton
class Application:
pass
Application.instance()
def singleton(cls):
_instances = {}
def _inner(*args, **kargs):
if cls not in _instances:
_instances[cls] = cls(*args, **kargs)
return _instances[cls]
return _inner
class Application:
_instance_lock = threading.Lock()
def __new__(cls, *args, **kwargs):
if not hasattr(cls, "_instance"):
with cls._instance_lock:
if not hasattr(cls, "_instance"):
cls._instance = object.__new__(cls)
return cls._instance
改成通用方式:
import threading
class SingletonMeta(type):
_instances = {}
_instance_lock = threading.Lock()
def __call__(cls, *args, **kwargs):
if cls not in cls._instances:
with cls._instance_lock:
if cls not in cls._instances:
instance = super().__call__(*args, **kwargs)
cls._instances[cls] = instance
return cls._instances[cls]
class Application(metaclass=SingletonMeta):
pass
例子:搜索用户表 user
中字段 phone
包含 520
的行。
和 pymysql 等库一样的用:
keyword = '520'
conn.execute('select * from user where phone like "%%%s%%";' % keyword)
conn.execute('select * from user where phone like "%%%s%%";', keyword)
.like
方法q = session.query(model.User.id, model.User.phone).filter(model.User.phone.like(f'%{keyword}%'))
qs = qs.all()
# print(qs.statement)
# SELECT "user".id, "user".phone
# FROM "user"
# WHERE "user".phone LIKE :phone_1
对应的大小写不敏感方法有 ilike
(lower("user".phone) LIKE lower(:phone_1)
)
还有:not_like
, not_ilike
.contains
方法print(session.query(model.User.id, model.User.phone).filter(model.User.phone.contains(keyword)).statement)
# SELECT "user".id, "user".phone
# FROM "user"
# WHERE ("user".phone LIKE '%' || :phone_1 || '%')
.regexp_match
方法(1.4 新增)对应的是 MySQL 支持的 REGEXP
操作符。
print(session.query(model.User.id, model.User.phone).filter(model.User.phone.regexp_match(keyword)).statement)
session.query(model.User.id, model.User.phone).filter(model.User.phone.regexp_match(keyword)).all()
# SELECT "user".id, "user".phone
# FROM "user"
# WHERE "user".phone <regexp> :phone_1
.startswith
和 .endswith
print(session.query(model.User.id, model.User.phone).filter(model.User.phone.startswith(keyword)).statement)
# SELECT "user".id, "user".phone
# FROM "user"
# WHERE ("user".phone LIKE :phone_1 || '%')
print(session.query(model.User.id, model.User.phone).filter(model.User.phone.endswith(keyword)).statement)
# SELECT "user".id, "user".phone
# FROM "user"
# WHERE ("user".phone LIKE '%' || :phone_1)
.match
方法对应的是数据库的 MATCH (col1,col2,...) AGAINST (expr [search_modifier])
全文索引方法。
对单字段同样可用,不过需要先建立 FULLTEXT 索引。
print(session.query(model.User.id, model.User.phone).filter(model.User.phone.match(keyword)).statement)
session.query(model.User.id, model.User.phone).filter(model.User.phone.match(keyword)).all()
# SELECT "user".id, "user".phone
# FROM "user"
# WHERE "user".phone MATCH :phone_1
创建 engine 相当于通过适配层对接了原生数据库接口。当 excute
方法和 connect
方法首次调用时,Engine 建立了一个正式的 DBAPI 连接到数据库,之后的所有数据库交互都是通过这个连接发出。
使用 ORM 时,Engine 是个幕后工作者,也就是说一旦创建之后,我们一般不会再直接接触到 Engine。
from sqlalchemy import create_engine
engine = create_engine('sqlite:///:memory:', echo=True) # echo => logging
http://sqlalchemy.readthedocs.org/en/latest/orm/session_basics.html
Session 是一个基于指定数据库连接的一个工作区。文档上有这么一个例子:如果将一个应用程序线程当作一场派对中的一个来宾,会话就是这个来宾手上的盘子,数据就是盘子中的食物,数据库就是厨房。
何时创建,何时提交,何时关闭?
Session = sessionmaker(bind=engine)
# 等同于:
# Session = sessionmaker()
# Session.configure(bind=engine)
session = Session()
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
from sqlalchemy import Column, Integer, String
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
name = Column(String)
fullname = Column(String)
password = Column(String)
def __repr__(self):
return "<User(name='%s', fullna
又出来一个幕后工作者:Mapper
,通过 模型类.__table__
访问。
print type(User.__table__) # <class 'sqlalchemy.sql.schema.Table'>
print repr(User.__table__)
# Table('users', MetaData(bind=None),
# Column('id', Integer(), table=<users>, primary_key=True, nullable=False),
# Column('name', String(), table=<users>),
# Column('fullname', String(), table=<users>),
# Column('password', String(), table=<users>), schema=None)
u = User(name='ed', fullname='Ed Jones', password='password')
session.add(u)
# 一次添加多条记录
session.add_all([
User(name='wendy', fullname='Wendy Williams', password='foobar'),
User(name='mary', fullname='Mary Contrary', password='p@55vv0rd'),
User(name='fred', fullname='Fred Green', password='123456')
])
print session.new
print session.query(User).all()
for name, fullname in session.query(User.name, User.fullname):
print name.ljust(10), fullname
for row in session.query(User, User.name):
print row.User, row.name
直接遍历 Query 对象就相当于按照默认顺序获取所有对象。
filte_by
qs = session.query(User).filter_by(name='ed')
users = qs.all()
our_user = qs.first()
允许使用 Python 表达式和类属性!
query = session.query(User.name)
queryset = query.filter(User.fullname == 'Ed Jones')
for name, in queryset:
print name
通用 Filter 操作符
query.filter(User.name == 'ed')
query.filter(User.name != 'ed')
query.filter(User.name.like('%ed%'))
query.filter(User.name.in_(['ed', 'wendy', 'jack']))
query.filter(User.name.in_(session.query(User.name).filter(User.name.like('%ed%'))))
query.filter(~User.name.in_(['ed', 'wendy', 'jack']))
query.filter(User.name == None)
query.filter(User.name.is_(None))
query.filter(User.name != None)
query.filter(User.name.isnot(None))
query.filter(User.name == 'ed').filter(User.fullname == 'Ed Jones')
query.filter(and_(User.name == 'ed', User.fullname == 'Ed Jones')) # from sqlalchemy import and_
query.filter(User.name == 'ed', User.fullname == 'Ed Jones')
query.filter(or_(User.name == 'ed', User.name == 'wendy')) # from sqlalchemy import or_
query.filter(User.name.match('wendy'))
for instance in session.query(User).order_by(User.id):
print instance.name.ljust(10), instance.fullname
label = User.name.label('name_label')
for row in session.query(label).all():
print(row.name_label)
from sqlalchemy.orm import aliased
user_alias = aliased(User, name='user_alias')
for row in session.query(user_alias, user_alias.name).all():
print row.user_alias
和 Django 一样,使用切片实现。
.all()
返回查找到的所有纪录组成的列表。.first()
返回查找到的第一条记录,没有找到返回 None。.one()
只查找到一条记录时,返回该条记录。否则抛出异常:sqlalchemy.orm.exc.NoResultFound: No row was found for one()
sqlalchemy.orm.exc.MultipleResultsFound: Multiple rows were found for one()
.scalar()
用 .one()
方法模拟出 .first()
方法的效果。不同的是,.first()
方法只获取那一条(LIMIT 1
),而调用 .one()
方法却获取了所有记录,不过只返回了第一条。这是不一样的!from sqlalchemy import text
# 例 1,简单用法
for user in session.query(User).filter(text("id<224")).order_by(text("id")).all():
print user.name
# 例 2,定义参数
session.query(User).filter(text("id<:value and name=:name")).params(value=224, name='fred').order_by(User.id).one()
# 例 3,此时 Query 几乎只检测字段是否合法~
sql = text("SELECT * FROM users where name=:name")
session.query(User).from_statement(sql).params(name='ed').all()
session.query("id", "name").from_statement(sql).params(name='ed').all()
u = session.query(User).filter_by(name='fred')[0]
u.password = '12345678'
session.commit()
print u in session
u = session.query(User).filter_by(name='fred')[0]
print u.password
jack = session.query(User).filter_by(name='jack').one()
session.delete(jack)
session.query(User).filter_by(name='jack').count()
session.query(User).filter(User.name.like('%ed')).count()
from sqlalchemy import func
session.query(func.count(User.name), User.name).group_by(User.name).all()
session.query(func.count('*')).select_from(User).scalar()
session.query(func.count(User.id)).scalar()
commit
u = session.query(User).all()[-1]
u.password = '123456'
print session.dirty # IdentitySet
# 此时 add_all 的新增操作实际上还没有提交
# session.commit() # 提交到数据库
rollback
u = session.query(User).all()[-1]
u.password = '123456'
session.rollback() # 撤销更新
print session.dirty
print 'After Rollback, PWD:'
print u.password # 会先查找一遍~
from sqlalchemy import ForeignKey
from sqlalchemy.orm import relationship, backref
class Address(Base):
__tablename__ = 'addresses'
id = Column(Integer, primary_key=True)
email_address = Column(String, nullable=False)
user_id = Column(Integer, ForeignKey('users.id'))
user = relationship("User", backref=backref('addresses', order_by=id))
def __repr__(self):
return "<Address(email_address='%s')>" % self.email_address
# 或者到 User 中声明,如下:
# class User(Base):
# addresses = relationship("Address", order_by="Address.id", backref="user")
jack = User(name='jack', fullname='Jack Bean', password='qwerty')
print jack.addresses
jack.addresses = [
Address(email_address='jack@google.com'),
Address(email_address='j25@yahoo.com'),
]
print jack.addresses[1]
print jack.addresses[1].user
session.add(jack)
print '-' * 70
session.commit() # 也会插入 Address 信息
print '-' * 70
jack = session.query(User).filter_by(name='jack').one()
print jack
print '-' * 70
print jack.addresses # lazy loading
for u, a in session.query(User, Address).\
filter(User.id==Address.user_id).\
filter(Address.email_address=='jack@google.com').\
all():
print u
print a
# <User(name='jack', fullname='Jack Bean', password='gjffdd')>
# <Address(email_address='jack@google.com')>
Join 也可以如下表示:session.query(User).join(Address)
如果两张表没有声明的关联,或者有多个关联,最好使用下面的形式:
query.join(Address, User.id==Address.user_id) # explicit condition
query.join(User.addresses) # specify relationship from left to right
query.join(Address, User.addresses) # same, with explicit target
query.join('addresses') # same, using a string
默认是 INNER JOIN:
session.query(User).join(Address). \
filter(Address.email_address == 'jack@google.com'). \
all()
# SELECT users.id AS users_id, users.name AS users_name, users.fullname AS users_fullname, users.password AS users_password
# FROM users
# JOIN addresses ON users.id = addresses.user_id
# WHERE addresses.email_address = 'jack@google.com'
使用其他连接方式:
query.outerjoin(User.addresses) # LEFT OUTER JOIN
或者使用 join 方法的其他参数,具体信息,在后面章节会涉及。
from sqlalchemy.orm import aliased
adalias1 = aliased(Address)
adalias2 = aliased(Address)
for username, email1, email2 in \
session.query(User.name, adalias1.email_address, adalias2.email_address).\
join(adalias1, User.addresses).\
join(adalias2, User.addresses).\
filter(adalias1.email_address=='jack@google.com').\
filter(adalias2.email_address=='j25@yahoo.com'):
print username, email1, email2
# jack jack@google.com j25@yahoo.com
from sqlalchemy.sql import func
stmt = session.query(Address.user_id, func.count('*').label('address_count')).\
group_by(Address.user_id).subquery()
for u, count in session.query(User, stmt.c.address_count).\
outerjoin(stmt, User.id==stmt.c.user_id).\
order_by(User.id):
print u, count
stmt = session.query(Address).\
filter(Address.email_address != 'j25@yahoo.com').subquery()
adalias = aliased(Address, stmt)
for user, address in session.query(User, adalias).\
join(adalias, User.addresses):
print user
print address
from sqlalchemy.sql import exists
stmt = exists().where(Address.user_id==User.id)
for name, in session.query(User.name).filter(stmt):
print name
for name, in session.query(User.name).filter(User.addresses.any()):
print name
for name, in session.query(User.name).\
filter(User.addresses.any(Address.email_address.like('%google%'))):
print name
session.query(Address).filter(~Address.user.has(User.name=='jack')).all()
query.filter(User.addresses.contains(someaddress))
query.filter(User.addresses.any(Address.email_address == 'bar'))
query.filter(User.addresses.any(email_address='bar'))
query.filter(Address.user.has(name='ed'))
session.query(Address).with_parent(someuser, 'addresses')
Eager Loading 直译过来,表示立即载入。
http://sqlalchemy.readthedocs.org/en/latest/orm/loading_relationships.html
默认是 Lazy Loading。
from sqlalchemy.orm import subqueryload
jack = session.query(User).\
options(subqueryload(User.addresses)).\
filter_by(name='jack').one()
subqueryload()
when used in conjunction with limiting such as Query.first()
, Query.limit()
or Query.offset()
should also include Query.order_by()
on a unique column in order to ensure correct results.
默认 LEFT OUTER JOIN。
from sqlalchemy.orm import joinedload
jack = session.query(User).\
options(joinedload(User.addresses)).\
filter_by(name='jack').one()
joinedload() is not a replacement for join()
The join created by joinedload() is anonymously aliased such that it does not affect the query results. An Query.order_by() or Query.filter() call cannot reference these aliased tables - so-called “user space” joins are constructed using Query.join(). The rationale for this is that joinedload() is only applied in order to affect how related objects or collections are loaded as an optimizing detail - it can be added or removed with no impact on actual results. See the section The Zen of Eager Loading for a detailed description of how this is used.
from sqlalchemy.orm import contains_eager
jacks_addresses = session.query(Address).\
join(Address.user).\
filter(User.name=='jack').\
options(contains_eager(Address.user)).\
all()
回到曾经的示例——删除 Jack 上来,如果 Jack 有几个地址,实际上 Jack 删除之后那几个地址的用户主键会被设置成 NULL,但是并没有删除。
一个小问题。
The Web Server Gateway Interface (WSGI, pronounced whiskey or WIZ-ghee) is a simple calling convention for web servers to forward requests to web applications or frameworks written in the Python programming language.
The current version of WSGI, version 1.0.1, is specified in Python Enhancement Proposal (PEP) 3333.
Web 服务器网关接口(WSGI,发音为威士忌或 WIZ-ghee)是一种简单的调用约定,用于将请求转发到用 Python 编写的 Web 应用程序或框架的 Web 服务器。
当前版本的 WSGI,即 1.0.1 版本,由 Python 增强提案(PEP)3333 指定。
WSGI was originally specified as PEP-333 in 2003.
PEP-3333, published in 2010, updates the specification for Python 3.
WSGI 最初是在 2003 年的 PEP-333 中指定的。
2010 年发布的 PEP-3333 更新了 Python 3 的规范。
In 2003, Python web frameworks were typically written against only CGI, FastCGI, mod_python, or some other custom API of a specific web server. To quote PEP 333:
2003 年,Python Web 框架通常只针对 CGI、FastCGI、mod_python 或某些特定 Web 服务器的其他自定义 API 编写。引用 PEP 333 的话:
Python currently boasts a wide variety of web application frameworks, such as Zope, Quixote, Webware, SkunkWeb, PSO, and Twisted Web -- to name just a few. This wide variety of choices can be a problem for new Python users, because generally speaking, their choice of web framework will limit their choice of usable web servers, and vice versa... By contrast, although Java has just as many web application frameworks available, Java's "servlet" API makes it possible for applications written with any Java web application framework to run in any web server that supports the servlet API.
Python 目前拥有众多 Web 应用程序框架,例如 Zope、Quixote、Webware、SkunkWeb、PSO 和 Twisted Web 等等。这种广泛的选择可能会成为新 Python 用户的问题,因为一般来说,他们选择的 Web 框架将限制他们可用的 Web 服务器的选择,反之亦然……相比之下,尽管 Java 也拥有同样多的 Web 应用程序框架,但 Java 的“servlet”API 使得使用任何 Java Web 应用程序框架编写的应用程序都能在支持 servlet API 的任何 Web 服务器中运行。
WSGI was thus created as an implementation-neutral interface between web servers and web applications or frameworks to promote common ground for portable web application development.
因此,WSGI 被创建为实现中立的 Web 服务器和 Web 应用程序或框架之间的接口,以促进可移植 Web 应用程序的开发和共同基础。
The WSGI has two sides:
WSGI 有两个方面:
Between the server and the application, there may be one or more WSGI middleware components, which implement both sides of the API, typically in Python code.
在服务器和应用程序之间,可能会有一个或多个WSGI 中间件组件,它们在 Python 代码中实现了 API 的两个方面。
WSGI does not specify how the Python interpreter should be started, nor how the application object should be loaded or configured, and different frameworks and webservers achieve this in different ways.
WSGI 不指定 Python 解释器应如何启动,也不指定应用程序对象如何加载或配置,不同的框架和 Web 服务器以不同的方式实现这些功能。
A WSGI middleware component is a Python callable that is itself a WSGI application, but may handle requests by delegating to other WSGI applications. These applications can themselves be WSGI middleware components.
一个 WSGI 中间件组件是一个 Python 可调用对象,它本身是一个 WSGI 应用程序,但可以通过委托给其他 WSGI 应用程序来处理请求。这些应用程序本身可以是 WSGI 中间件组件。
A middleware component can perform such functions as:
中间件组件可以执行以下功能:
A WSGI-compatible "Hello, World!" application written in Python:
def application(environ, start_response):
start_response('200 OK', [('Content-Type', 'text/plain')])
yield b'Hello, World!\n'
Where:
application
, which takes two parameters, environ
and start_response
. environ
is a dictionary containing CGI environment variables as well as other request parameters and metadata under well-defined keys. start_response
is a callable itself, taking two positional parameters, status
and response_headers
.start_response
, specifying "200 OK" as the HTTP status and a "Content-Type" response header.A full example of a WSGI network server is outside the scope of this article. Below is a sketch of how one would call a WSGI application and retrieve its HTTP status line, response headers, and response body, as Python objects. Details of how to construct the environ
dict have been omitted.
完整的 WSGI 网络服务器示例不再本文范围之内。以下只是一个 Demo,说明如何调用 WSGI 应用程序并检索其 HTTP 状态行、响应头和响应正文,作为 Python 对象。如何构造 environ 字典的详细信息已被省略。
from io import BytesIO
def call_application(app, environ):
status = None
headers = None
body = BytesIO()
def start_response(rstatus, rheaders):
nonlocal status, headers
status, headers = rstatus, rheaders
app_iter = app(environ, start_response)
try:
for data in app_iter:
assert status is not None and headers is not None, \
"start_response() was not called"
body.write(data)
finally:
if hasattr(app_iter, 'close'):
app_iter.close()
return status, headers, body.getvalue()
environ = {...} # "environ" dict
status, headers, body = call_application(app, environ)
PEP 3333 引入了一些新特性和更严格的规范,以提高 Python Web 应用程序的可移植性和互操作性。其中一些变化包括:
此外,PEP 3333 还引入了一些新的建议,包括使用可选的服务器和客户端请求信息,提供更好的错误处理和日志记录,以及更好地处理请求和响应的二进制数据。
time
库
本文主要是说 time
库。
参考文档:https://docs.python.org/2/library/time.html
a = 7
"{:04b}".format(a)
# 0111
f"{a:04b}"
# 0111
bin(a)
# 0b111