#70 鲁迅:谈所谓大内档案

2014-11-21

所谓「大内档案」 这东西,在清朝的内阁里积存了三百多年,在孔庙里塞了十多年,谁也一声不响。自从历史博物馆将这残余卖给纸铺子,纸铺子转卖给罗振玉,罗振玉转卖给日本人,于是乎大有号啕之声,仿佛国宝已失,国脉随之似的。前几年,我也曾经过几个人的议论,所记得是的一个是金梁,登在《东方杂志》上;还有罗振玉和王国维,随时发感慨。最近的是《北新半月刊》上的《论档案的售出》蒋彝潜先生做的。

#69 韩寒:韩寒:杯中窥人

2014-10-22

第一次读到这篇文章的时候,应该还是 04 年左右,可能在读高一。
在我拿着一张试卷字数都难凑齐的时候,人家那时候也才高二,应试作文竟可以写到这么优秀。
我是很羞愧的。

#67 SQLAlchemy LIKE

2014-09-20

例子:搜索用户表 user 中字段 phone 包含 520 的行。

SQL

和 pymysql 等库一样的用:

keyword = '520'
conn.execute('select * from user where phone like "%%%s%%";' % keyword)
conn.execute('select * from user where phone like "%%%s%%";', keyword)

.like 方法

q = session.query(model.User.id, model.User.phone).filter(model.User.phone.like(f'%{keyword}%'))
qs = qs.all()
# print(qs.statement)
# SELECT "user".id, "user".phone
# FROM "user"
# WHERE "user".phone LIKE :phone_1

对应的大小写不敏感方法有 ilike (lower("user".phone) LIKE lower(:phone_1))
还有:not_like, not_ilike

.contains 方法

print(session.query(model.User.id, model.User.phone).filter(model.User.phone.contains(keyword)).statement)
# SELECT "user".id, "user".phone
# FROM "user"
# WHERE ("user".phone LIKE '%' || :phone_1 || '%')

.regexp_match 方法(1.4 新增)

对应的是 MySQL 支持的 REGEXP 操作符。

print(session.query(model.User.id, model.User.phone).filter(model.User.phone.regexp_match(keyword)).statement)
session.query(model.User.id, model.User.phone).filter(model.User.phone.regexp_match(keyword)).all()
# SELECT "user".id, "user".phone
# FROM "user"
# WHERE "user".phone <regexp> :phone_1

.startswith.endswith

print(session.query(model.User.id, model.User.phone).filter(model.User.phone.startswith(keyword)).statement)
# SELECT "user".id, "user".phone
# FROM "user"
# WHERE ("user".phone LIKE :phone_1 || '%')
print(session.query(model.User.id, model.User.phone).filter(model.User.phone.endswith(keyword)).statement)
# SELECT "user".id, "user".phone
# FROM "user"
# WHERE ("user".phone LIKE '%' || :phone_1)

.match 方法

对应的是数据库的 MATCH (col1,col2,...) AGAINST (expr [search_modifier]) 全文索引方法。
对单字段同样可用,不过需要先建立 FULLTEXT 索引。

print(session.query(model.User.id, model.User.phone).filter(model.User.phone.match(keyword)).statement)
session.query(model.User.id, model.User.phone).filter(model.User.phone.match(keyword)).all()
# SELECT "user".id, "user".phone
# FROM "user"
# WHERE "user".phone MATCH :phone_1

参考资料与拓展阅读

#65 HTTP 方法

2014-09-08

https://en.wikipedia.org/wiki/Hypertext_Transfer_Protocol#Request_methods

方法清单

  • GET R 查
  • POST C 增
  • PUT U 改
  • DELETE D 删
  • PATCH U 改
  • HEAD 和 GET 相同,不过只返回请求头,不返回请求体
  • OPTIONS 返回这个请求支持的 HTTP 方法
  • TRACE 返回服务器收到的请求,调试用
  • CONNECT 为代理服务器准备的 HTTP 隧道方法

PATCH 和 PUT 的区别:PUT 是使用新版本来替换旧版本,PATCH 则是在旧版本的基础上修改。

  1. HTTP/1.0 只定义了 HEAD, GET, POST 三个方法,HTTP/1.1 增加了 PUT, DELETE, OPTIONS, TRACE, CONNECT 五个方法。
  2. PATCH 方法定义在 RFC 5789: PATCH Method for HTTP 中,目前还是一个草案,不在 HTTP/1.1 中。
  3. 除了 PATCH 方法之外,其实还出现过 LINK, UNLINK 两个方法,不过后来直接被无视了。
    甚至和 PUT, DELETE 一同列在 HTTP/1.0 标准的 Additional Request Methods 中
    而且还出现在了 HTTP/1.1 草案(RFC 2616)中
  4. TRACE, CONNECT 这两个方法很多 HTTP 服务都不支持,甚至有些服务都不支持 OPTIONS。
  5. Django 不支持 PUT, PATCH 方法(拿不到 body)。

示例

> OPTIONS / HTTP/1.1
> Host: www.markjour.com
> User-Agent: curl/7.74.0
> Accept: */*

< HTTP/1.1 200 OK
< Date: Sat, 11 Jan 2014 06:59:50 GMT
< Server: Apache
< Allow: OPTIONS,GET,HEAD,POST
< Vary: Accept-Encoding,User-Agent
< Content-Length: 0
< Content-Type: text/html
> GET / HTTP/1.1
> Host: www.baidu.com
> User-Agent: curl/7.74.0
> Accept: */*

< HTTP/1.1 200 OK
< Accept-Ranges: bytes
< Cache-Control: private, no-cache, no-store, proxy-revalidate, no-transform
< Connection: keep-alive
< Content-Length: 2443
< Content-Type: text/html
< Date: Sat, 11 Jan 2014 06:52:33 GMT
< Etag: "588603fd-98b"
< Last-Modified: Mon, 23 Jan 2017 13:24:13 GMT
< Pragma: no-cache
< Server: bfe/1.0.8.18
< Set-Cookie: BDORZ=27315; max-age=86400; domain=.baidu.com; path=/

分类

RFC 7231 中的 Safe Methods,Idempotent Methods,Cacheable Methods。

  1. 安全:不会对资源产生影响(副作用除外,比如:请求计数,计费,日志等):
  2. GET
  3. HEAD
  4. OPTIONS
  5. TRACE
  6. 幂等:重复请求也不会对资源产生影响:

  7. 上面四个安全方法自不用说

  8. PUT
  9. DELETE

为什么 XXX 不幂等

  • POST 可能多次创建资源
  • PATCH 根据语义,对计数 +1 这种场景使用 PATCH 方法,这样的话,自然不是幂等
    如果直接赋值修改原数据的部分属性,则是幂等的(可以看作是对属性的批量 PUT 替换)
  • CONNECT 隧道而已,里面的请求具体是什么都不知道

  • 可缓存(该小节在 RFC 2616 没有):

  • GET

  • HEAD
  • POST
  • PATCH

PUT,DELETE 可以导致之前的相关缓存失效。

为什么 POST/PATCH 方法 cacheable

RFC 2616:

Some HTTP methods MUST cause a cache to invalidate an entity. This is either the entity referred to by the Request-URI, or by the Location
or Content-Location headers (if present). These methods are:

  • PUT
  • DELETE
  • POST

RFC 7231:

this specification defines GET, HEAD, and POST as cacheable, although the overwhelming majority of cache implementations only support GET and HEAD.

Mozilla Developer Network:

Only if freshness information is included

根据相关 RFC,如果响应头中有 Expires, Cache-Control 头,可以缓存 POST/PATCH。
我想,这个可能是为了避免资源重复创建而设计?
不过现实是,没有浏览器或服务器支持缓存 POST/PATCH 请求。

备注:最终可缓存状态还取决于 HTTP 状态码, 必须是 200、203、204、206、300、301 才可以缓存。

拓展

当然,只需要客户端和服务器端都能支持,请求方法可以自定义,如:

  • LIST 列出资源,和 GET 方法对应
  • UPLOAD 上传
  • DOWNLOAD 下载
  • EXIST 是否存在
  • COLLECT 收藏
  • STAR 星标
  • VOTEUP 赞
  • VOTEDOWN 踩
  • 等等

Update @ 2020-04-01:

WebDAV 协议就可以看作是一个 HTTP 拓展, 增加了以下方法的支持:

  • COPY 复制
  • LOCK 锁定
  • MKCOL 创建集合(目录)
  • MOVE 移动
  • PROPFIND 查询属性
  • PROPPATCH 修改属性
  • UNLOCK 解锁

附:相关 RFC

参考资料与拓展阅读

#64 SQLAlchemy

2014-08-21

数据库连接:Engine

创建 engine 相当于通过适配层对接了原生数据库接口。当 excute 方法和 connect 方法首次调用时,Engine 建立了一个正式的 DBAPI 连接到数据库,之后的所有数据库交互都是通过这个连接发出。
使用 ORM 时,Engine 是个幕后工作者,也就是说一旦创建之后,我们一般不会再直接接触到 Engine。

from sqlalchemy import create_engine
engine = create_engine('sqlite:///:memory:', echo=True)  # echo => logging

连接字符串(DB URL)

会话(Session)

http://sqlalchemy.readthedocs.org/en/latest/orm/session_basics.html

Session 是一个基于指定数据库连接的一个工作区。文档上有这么一个例子:如果将一个应用程序线程当作一场派对中的一个来宾,会话就是这个来宾手上的盘子,数据就是盘子中的食物,数据库就是厨房。

何时创建,何时提交,何时关闭?

Session = sessionmaker(bind=engine)
# 等同于:
# Session = sessionmaker()
# Session.configure(bind=engine)
session = Session()

定义模型(数据库映射)

基类

from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()

模型的定义

from sqlalchemy import Column, Integer, String


class User(Base):
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    name = Column(String)
    fullname = Column(String)
    password = Column(String)

    def __repr__(self):
        return "<User(name='%s', fullna

表结构(Schema)

又出来一个幕后工作者:Mapper,通过 模型类.__table__ 访问。

print type(User.__table__)  # <class 'sqlalchemy.sql.schema.Table'>
print repr(User.__table__)
# Table('users', MetaData(bind=None),
#       Column('id', Integer(), table=<users>, primary_key=True, nullable=False),
#       Column('name', String(), table=<users>),
#       Column('fullname', String(), table=<users>),
#       Column('password', String(), table=<users>), schema=None)

数据库操作

1. 添加

u = User(name='ed', fullname='Ed Jones', password='password')
session.add(u)

# 一次添加多条记录
session.add_all([
    User(name='wendy', fullname='Wendy Williams', password='foobar'),
    User(name='mary', fullname='Mary Contrary', password='p@55vv0rd'),
    User(name='fred', fullname='Fred Green', password='123456')
])
print session.new
print session.query(User).all()

2. 查找

2.1 查询对象 Query

for name, fullname in session.query(User.name, User.fullname):
    print name.ljust(10), fullname

for row in session.query(User, User.name):
    print row.User, row.name

直接遍历 Query 对象就相当于按照默认顺序获取所有对象。

2.2 条件:filte_by

qs = session.query(User).filter_by(name='ed')
users = qs.all()
our_user = qs.first()

2.3 更加灵活的 filter

允许使用 Python 表达式和类属性!

query = session.query(User.name)
queryset = query.filter(User.fullname == 'Ed Jones')
for name, in queryset:
    print name

通用 Filter 操作符

  • Equal
    query.filter(User.name == 'ed')
    query.filter(User.name != 'ed')
  • LIKE
    query.filter(User.name.like('%ed%'))
  • IN / NOT IN
    query.filter(User.name.in_(['ed', 'wendy', 'jack']))
    query.filter(User.name.in_(session.query(User.name).filter(User.name.like('%ed%'))))
    query.filter(~User.name.in_(['ed', 'wendy', 'jack']))
  • NULL
    query.filter(User.name == None)
    query.filter(User.name.is_(None))
    query.filter(User.name != None)
    query.filter(User.name.isnot(None))
  • AND
    query.filter(User.name == 'ed').filter(User.fullname == 'Ed Jones')
    query.filter(and_(User.name == 'ed', User.fullname == 'Ed Jones')) # from sqlalchemy import and_
  • OR
    query.filter(User.name == 'ed', User.fullname == 'Ed Jones')
    query.filter(or_(User.name == 'ed', User.name == 'wendy')) # from sqlalchemy import or_
  • MATCH:
    query.filter(User.name.match('wendy'))
    注意:match 使用了数据库相关的内容,可能在不同数据库后端上有不同的效果,而且部分数据库不支持。

2.4 排序:order_by

for instance in session.query(User).order_by(User.id):
    print instance.name.ljust(10), instance.fullname

2.5 别名

label = User.name.label('name_label')
for row in session.query(label).all():
    print(row.name_label)

from sqlalchemy.orm import aliased
user_alias = aliased(User, name='user_alias')
for row in session.query(user_alias, user_alias.name).all():
    print row.user_alias

2.6 LIMIT 和 OFFSET

和 Django 一样,使用切片实现。

2.7 其他方法

  • .all() 返回查找到的所有纪录组成的列表。
  • .first() 返回查找到的第一条记录,没有找到返回 None。
  • .one() 只查找到一条记录时,返回该条记录。否则抛出异常:
    sqlalchemy.orm.exc.NoResultFound: No row was found for one()
    sqlalchemy.orm.exc.MultipleResultsFound: Multiple rows were found for one()
  • .scalar().one() 方法模拟出 .first() 方法的效果。不同的是,.first() 方法只获取那一条(LIMIT 1),而调用 .one() 方法却获取了所有记录,不过只返回了第一条。这是不一样的!

2.8 对原生 SQL 的支持

  • text
  • from_statement
  • params 为 Query 增加参数
from sqlalchemy import text

# 例 1,简单用法
for user in session.query(User).filter(text("id<224")).order_by(text("id")).all():
    print user.name

# 例 2,定义参数
session.query(User).filter(text("id<:value and name=:name")).params(value=224, name='fred').order_by(User.id).one()

# 例 3,此时 Query 几乎只检测字段是否合法~
sql = text("SELECT * FROM users where name=:name")
session.query(User).from_statement(sql).params(name='ed').all()
session.query("id", "name").from_statement(sql).params(name='ed').all()

3. 更新

u = session.query(User).filter_by(name='fred')[0]
u.password = '12345678'
session.commit()
print u in session
u = session.query(User).filter_by(name='fred')[0]
print u.password

4. 删除

jack = session.query(User).filter_by(name='jack').one()
session.delete(jack)
session.query(User).filter_by(name='jack').count()

5. Count

session.query(User).filter(User.name.like('%ed')).count()
from sqlalchemy import func
session.query(func.count(User.name), User.name).group_by(User.name).all()
session.query(func.count('*')).select_from(User).scalar()
session.query(func.count(User.id)).scalar()

6. 事务:commit 和 rollback

commit

u = session.query(User).all()[-1]
u.password = '123456'
print session.dirty  # IdentitySet
# 此时 add_all 的新增操作实际上还没有提交
# session.commit()  # 提交到数据库

rollback

u = session.query(User).all()[-1]
u.password = '123456'
session.rollback()  # 撤销更新
print session.dirty
print 'After Rollback, PWD:'
print u.password  # 会先查找一遍~

关系

from sqlalchemy import ForeignKey
from sqlalchemy.orm import relationship, backref

class Address(Base):
    __tablename__ = 'addresses'

    id = Column(Integer, primary_key=True)
    email_address = Column(String, nullable=False)
    user_id = Column(Integer, ForeignKey('users.id'))

    user = relationship("User", backref=backref('addresses', order_by=id))

    def __repr__(self):
        return "<Address(email_address='%s')>" % self.email_address

# 或者到 User 中声明,如下:
# class User(Base):
#     addresses = relationship("Address", order_by="Address.id", backref="user")

关联操作

jack = User(name='jack', fullname='Jack Bean', password='qwerty')
print jack.addresses
jack.addresses = [
    Address(email_address='jack@google.com'),
    Address(email_address='j25@yahoo.com'),
]
print jack.addresses[1]
print jack.addresses[1].user
session.add(jack)
print '-' * 70
session.commit()  # 也会插入 Address 信息
print '-' * 70
jack = session.query(User).filter_by(name='jack').one()
print jack
print '-' * 70
print jack.addresses   # lazy loading

JOIN

for u, a in session.query(User, Address).\
        filter(User.id==Address.user_id).\
        filter(Address.email_address=='jack@google.com').\
        all():
    print u
    print a
# <User(name='jack', fullname='Jack Bean', password='gjffdd')>
# <Address(email_address='jack@google.com')>

Join 也可以如下表示:session.query(User).join(Address)

如果两张表没有声明的关联,或者有多个关联,最好使用下面的形式:

query.join(Address, User.id==Address.user_id)  # explicit condition
query.join(User.addresses)  # specify relationship from left to right
query.join(Address, User.addresses)  # same, with explicit target
query.join('addresses')  # same, using a string

默认是 INNER JOIN:

session.query(User).join(Address). \
    filter(Address.email_address == 'jack@google.com'). \
    all()
# SELECT users.id AS users_id, users.name AS users_name, users.fullname AS users_fullname, users.password AS users_password
# FROM users
# JOIN addresses ON users.id = addresses.user_id
# WHERE addresses.email_address = 'jack@google.com'

使用其他连接方式:

query.outerjoin(User.addresses)   # LEFT OUTER JOIN

或者使用 join 方法的其他参数,具体信息,在后面章节会涉及。

aliased 又出现了

from sqlalchemy.orm import aliased
adalias1 = aliased(Address)
adalias2 = aliased(Address)
for username, email1, email2 in \
    session.query(User.name, adalias1.email_address, adalias2.email_address).\
    join(adalias1, User.addresses).\
    join(adalias2, User.addresses).\
    filter(adalias1.email_address=='jack@google.com').\
    filter(adalias2.email_address=='j25@yahoo.com'):
    print username, email1, email2
# jack jack@google.com j25@yahoo.com

子查询

from sqlalchemy.sql import func
stmt = session.query(Address.user_id, func.count('*').label('address_count')).\
    group_by(Address.user_id).subquery()
for u, count in session.query(User, stmt.c.address_count).\
        outerjoin(stmt, User.id==stmt.c.user_id).\
        order_by(User.id):
    print u, count

使用子查询

stmt = session.query(Address).\
    filter(Address.email_address != 'j25@yahoo.com').subquery()
adalias = aliased(Address, stmt)
for user, address in session.query(User, adalias).\
        join(adalias, User.addresses):
    print user
    print address

EXIST

from sqlalchemy.sql import exists
stmt = exists().where(Address.user_id==User.id)
for name, in session.query(User.name).filter(stmt):
    print name

for name, in session.query(User.name).filter(User.addresses.any()):
    print name

for name, in session.query(User.name).\
    filter(User.addresses.any(Address.email_address.like('%google%'))):
    print name

session.query(Address).filter(~Address.user.has(User.name=='jack')).all()

其他方法

query.filter(User.addresses.contains(someaddress))
query.filter(User.addresses.any(Address.email_address == 'bar'))
query.filter(User.addresses.any(email_address='bar'))
query.filter(Address.user.has(name='ed'))
session.query(Address).with_parent(someuser, 'addresses')

预加载

Eager Loading 直译过来,表示立即载入。

http://sqlalchemy.readthedocs.org/en/latest/orm/loading_relationships.html

默认是 Lazy Loading。

1. subqueryload

from sqlalchemy.orm import subqueryload
jack = session.query(User).\
    options(subqueryload(User.addresses)).\
    filter_by(name='jack').one()

subqueryload() when used in conjunction with limiting such as Query.first(), Query.limit() or Query.offset() should also include Query.order_by() on a unique column in order to ensure correct results.

2. joinedload

默认 LEFT OUTER JOIN。

from sqlalchemy.orm import joinedload
jack = session.query(User).\
    options(joinedload(User.addresses)).\
    filter_by(name='jack').one()

joinedload() is not a replacement for join()
The join created by joinedload() is anonymously aliased such that it does not affect the query results. An Query.order_by() or Query.filter() call cannot reference these aliased tables - so-called “user space” joins are constructed using Query.join(). The rationale for this is that joinedload() is only applied in order to affect how related objects or collections are loaded as an optimizing detail - it can be added or removed with no impact on actual results. See the section The Zen of Eager Loading for a detailed description of how this is used.

3. Join + Eagerload

from sqlalchemy.orm import contains_eager
jacks_addresses = session.query(Address).\
    join(Address.user).\
    filter(User.name=='jack').\
    options(contains_eager(Address.user)).\
    all()

关联对象的删除

回到曾经的示例——删除 Jack 上来,如果 Jack 有几个地址,实际上 Jack 删除之后那几个地址的用户主键会被设置成 NULL,但是并没有删除。

#63 MySQL set 类型

2014-08-20

顾名思义,就是集合类型。

set('a', 'b', ...)

每个字段可以是指定选项中的若干个(包含 0 个)。

#61 RabbitMQ & AMQP

2014-07-11

AMQP

Advanced Message Queuing Protocol, 高级消息队列协议

久负盛名的投资公司摩根大通(JPMorgan Chase)在 2005 年前后设计了 AMQP,并和红帽一同采用 Java 实现了这个协议(没过多久就改用 C++ 重构了一遍),这就是后来的 Apache Qpid。后来的一些消息队列也都支持 AMQP 协议,比如 RabbitMQ(采用 Erlang 开发)、Apache ActiveMQ(Java)。

其他常见的 MQ 协议还有:STOMP 1, MQTT 2,有时也会和 XMPP 3 做对比。

  • 2006/06 版本 0-8
  • 2006/12 版本 0-9
  • 2008/11 版本 0-9-1 RabbitMQ实现
  • 2011/10 版本 1.0
  • AMQP 移交给 OASIS 组织之后发布的第一个版本
  • 2014/04 该版本成为 ISO 国际标准。

基本概念

AMQP Arch

  • Message
  • DeliveryTag 这个标记十分重要
  • Properties
  • Header
  • Body
  • Content Type
  • Content Encoding
  • Message Queue
  • Message Broker
  • Message-Oriented Middleware 消息中间件,有时简写 MOM
  • Connection TCP 连接, 服务器永远不会主动关闭连接
  • Channel 通道,或者叫信道,逻辑连接,不同通道之间是完全隔离的 (ChannelID)
  • 通道的打开关闭
  • 多线程可以使用各自的通道
  • Server 消息队列服务器,就是指 Broker
  • Virtual Host 虚拟主机,消息队列中的逻辑隔离
  • Publisher 消息生产者
  • Exchange 交换机
  • Queue 队列
  • Binding 绑定,指定了队列和交换机之间的关系
  • RoutingKey 路由键,Binding 的附加参数,对消息进行过滤
    注意:有些地方将绑定时指定的 RoutingKey 叫做 BindingKey
  • Comsumer 消息消费者

基本流程

AMQP 协议中,基本数据单位是帧。有 9 种帧结构用来开启、控制、关闭两点之间的信息传输链路:

  1. 打开(连接)open
  2. 开始(会话)begin
  3. 附加(链路)attach
  4. 传输 transfer
  5. 流量控制 flow
  6. 状态通信 disposition
  7. 分离(链路)detach
  8. 结束(会话)end
  9. 关闭(连接)close

连接 Conection,会话 Session,链路 Link。
链路是单向的数据传输通道,消息(Transfer 帧)就在链路上传输。

+ OpenConnection
|   + StartSession
|   |   + AttachLink
|   |   |     Transfer
|   |   |     Flow
|   |   |     Disposition
|   |   + DetachLink
|   + EndSession
+ CloseConnection
  1. 定义 Exchange、Queue、Binding
  2. 生产者将消息投递给 Exchange
  3. Exchange 根据实现定义的路由策略(Binding)将消息转发到 Queue
  4. 消费者从 Queue 中拿到消息

RabbitMQ

实现了 AMQP 0-9-1。

端口

ps -ef | grep rabbitmq | grep -v grep
rabbitmq  966640       1  5 11:09 ?        00:00:12 /usr/lib/erlang/erts-11.1.8/bin/beam.smp -W w -K true -A 64 -MBas ageffcbf -MHas ageffcbf -MBlmbcs 512 -MHlmbcs 512 -MMmcs 30 -P 1048576 -t 5000000 -stbt db -zdbbl 128000 -- -root /usr/lib/erlang -progname erl -- -home /var/lib/rabbitmq -- -pa  -noshell -noinput -s rabbit boot -boot start_sasl -lager crash_log false -lager handlers []
rabbitmq  966744  966640  0 11:09 ?        00:00:00 erl_child_setup 65536
rabbitmq  966775       1  0 11:09 ?        00:00:00 /usr/lib/erlang/erts-11.1.8/bin/epmd -daemon
rabbitmq  966802  966744  0 11:09 ?        00:00:00 inet_gethost 4
rabbitmq  966803  966802  0 11:09 ?        00:00:00 inet_gethost 4

sudo nmap -p 1-65535 localhost
4369/tcp  open  epmd
5672/tcp  open  amqp
15672/tcp open  unknown
25672/tcp open  unknown
  • 4369 Erlang 端口映射器守护程序 (epmd)
  • 5672 服务端口
  • 15672 Web 接口
  • 25672 不知道干嘛的,来自 inet_dist_listen_min - inet_dist_listen_max, 我本机默认配置

Exchange 类型

AMQP 中定义的 4 种 Exchange 类型:

  1. direct 需要符合 RoutingKey 的完全匹配
  2. topic 支持模糊匹配:# 代表任意个单词,* 代表一个单词
  3. RoutingKey 应该是小数点隔开的单词,另外,不可超过 255 字节。
  4. BindingKey 可以包含上面说的模糊匹配字符。
  5. fanout 将消息广播给所有绑定到该 Exchange 的 Queue,忽略 RoutingKey
  6. headers 采用消息的 Header 与 Binding 的属性来进行匹配,忽略 RoutingKey
  7. Binding 中 x- 开头的属性不会参与匹配。
  8. Binding 可以定义 x-match 属性,any (默认值) 表示匹配中一个字段就行,all 表示所有字段都匹配。
  9. 奇怪的是官网教程中没有这种类型的示例。

预置 Exchange:

  • (AMQP default) direct
  • amq.direct direct
  • amq.fanout fanout
  • amq.headers headers
  • amq.match headers
  • amq.rabbitmq.log topic
  • amq.rabbitmq.trace topic
  • amq.topic topic

默认 Exchange (AMQP default) 有点特殊,所有的队列都自动绑定在上面,然后又是 direct 类型,这样一来,生产者 publish 时如果将消息投递到名字为空字符串的这个 Exchange,RoutingKey 填写队列名字,就可以直接将消息投递到队列中。

确认机制

  1. 来自 TCP 的启发。
  2. 分成消费者确认和生产者确认两部分。
  3. DeliveryTag 对于确认机制至关重要。

具体下来就是:

  • Basic.Ack basic_ack(delivery_tag=0, multiple=False) 可以一次确认多个消息
  • Basic.Nack basic_nack(delivery_tag=None, multiple=False, requeue=True)
  • Basic.Reject basic.reject(delivery_tag=None, requeue=True)

Nack 是 RabbitMQ 对 AMQP 的拓展,和 Reject 不同的是,Nack 可以一次拒绝该通道所有没有 ACK 的消息。

关于自动确认模式

命令

Connection

  1. Connection.Start
  2. Connection.StartOk
  3. Connection.Secure
  4. Connection.SecureOk
  5. Connection.Tune
  6. Connection.TuneOk
  7. Connection.Open
  8. Connection.OpenOk
  9. Connection.Close
  10. Connection.CloseOk
  11. Connection.Blocked
  12. Connection.Unblocked

Channel

  1. Channel.Open
  2. Channel.OpenOk
  3. Channel.Flow
  4. Channel.FlowOk
  5. Channel.Close
  6. Channel.CloseOk

Access

  1. Access.Request
  2. Access.RequestOk

Exchange 交换器

  1. Exchange.Declare
  2. Exchange.DeclareOk
  3. Exchange.Delete
  4. Exchange.DeleteOk
  5. Exchange.Bind 交换器绑定 (RabbitMQ 拓展)
  6. Exchange.BindOk
  7. Exchange.Unbind 交换器解绑 (RabbitMQ 拓展)
  8. Exchange.UnbindOk

Queue 队列

  1. Queue.Declare 队列声明
  2. Queue.DeclareOk
  3. Queue.Bind 队列绑定 (到交换器)
  4. Queue.BindOk
  5. Queue.Purge 队列清空 (没分配的,也就是 unack 不会被清空)
  6. Queue.PurgeOk
  7. Queue.Delete 队列删除
  8. Queue.DeleteOk
  9. Queue.Unbind 队列解绑
  10. Queue.UnbindOk

Basic

  1. Basic.Qos Quality of Service 设置:
  2. prefetch_size
  3. prefetch_count
  4. Basic.QosOk
  5. Basic.Consume 消费者开始消费
  6. Basic.ConsumeOk
  7. Basic.Cancel 取消消费者订阅
  8. Basic.CancelOk
  9. Basic.Publish 发布消息
  10. Basic.Return
  11. Basic.Deliver
  12. Basic.Get 直接从指定队列获取消息
  13. Basic.GetOk
  14. Basic.GetEmpty
  15. Basic.Ack 确认
  16. Basic.Reject
  17. Basic.RecoverAsync
  18. Basic.Recover
  19. Basic.RecoverOk
  20. Basic.Nack 负确认 (RabbitMQ 拓展)

Tx 事务

  1. Tx.Select
  2. Tx.SelectOk
  3. Tx.Commit
  4. Tx.CommitOk
  5. Tx.Rollback
  6. Tx.RollbackOk

Confirm

  1. Confirm.Select
  2. Confirm.SelectOk

话题 1:持久化

  1. 交换机持久化 durable=true
  2. 队列持久化 durable=true
  3. 绑定持久化
  4. 消息持久化 properties=pika.BasicProperties(delivery_mode = 2)
    basic_publish(exchange, routing_key, body, properties=None,
                  mandatory=False, immediate=False)
    

话题 2:可靠性的保障

  1. 事务
  2. 持久化
  3. 确认模式

参考资料与拓展阅读


  1. Streaming Text Oriented Messaging Protocol 

  2. 曾经是 Message Queuing Telemetry Transport 的简称, 后来不代表任何意义了
    img 

  3. Extensible Messaging and Presence Protocol, 一种通讯协议