SMS 网络编程 CMPP
2022-03-29
术语
- ISMG: Internet Short Message Gateway
- SMPP: Short Message Peer to Peer
一个国际上比较通用的短信网关协议
CMPP 可能兼容 SMPP,因为 Wireshark 会将 CMPP 会话识别成 SMPP
- CMPP: China Mobile Peer to Peer
中国移动开发的的短信网关协议
- SMC: Short Message Center
- GNS: Gateway Name Server
相当于 CMPP 网络的路由器
- SP: Service Provider
- SMC: Short Message Control
- ISMG_Id: 网关代码
- SP_Id: SP 企业代码
- SP_Code: SP 服务代码
- Service_Id: SP 业务类型
- MO: 手机发送 Originate
- MT: 手机接收 Terminated
网络连接
CMPP 基于 TCP 协议。可以长连接,也可以短连接。
长连接支持一次连接发送多次 CMPP 消息,没有消息的时候需要维持心跳。
短连接则是一次 CMPP 通信之后就断开连接。
关于心跳:
建议每 3 分钟一次心跳,如果 60 秒内没有心跳响应,应该再次心跳,总共 3 次没有响应就断开连接。
关于重试(网关与 SP 之间,网关之间):
60 秒之后没有响应,立即重试,总共 3 次没有响应就停发。
消息采用并发方式发送,加以滑动窗口流量控制,窗口大小参数 W 可配置,现阶段建议为 16,即接收方在应答前一次收到的消息最多不超过 16 条。
关于短连接:
60 秒超时,重试 2 次。
端口
- 7890 长连接(SP 与网关之间)
- 7900 短连接(SP 与网关之间,网关之间)
- 7930 长连接(网关之间)
- 9168 短连接(网关与 GNS 之间)
数据类型
var Total_Length uint32 // 消息总长度
var Command_Id uint32 // 命令或响应类型
var Sequence_Id uint32 // 消息流水号, 递增, 步长为 1, 循环使用
命令 |
Command_Id |
CMPP_CONNECT |
0x00000001 |
CMPP_TERMINATE |
0x00000002 |
CMPP_SUBMIT |
0x00000004 |
CMPP_DELIVER |
0x00000005 |
CMPP_QUERY |
0x00000006 |
CMPP_CANCEL |
0x00000007 |
CMPP_ACTIVE_TEST |
0x00000008 |
CMPP_FWD |
0x00000009 |
CMPP_MT_ROUTE |
0x00000010 |
CMPP_MO_ROUTE |
0x00000011 |
CMPP_GET_ROUTE |
0x00000012 |
CMPP_MT_ROUTE_UPDATE |
0x00000013 |
CMPP_MO_ROUTE_UPDATE |
0x00000014 |
CMPP_PUSH_MT_ROUTE_UPDATE |
0x00000015 |
CMPP_PUSH_MO_ROUTE_UPDATE |
0x00000016 |
RESP Command_Id 则是对应的 Command_Id 最高 4 位为 8,即 0x8X
。
SP 与 ISMG 之间的通信
对于一般开发来说就这 7 个接口。
- 建立连接 CMPP_CONNECT
- 断开连接 CMPP_TERMINATE
- 提交信息 CMPP_SUBMIT
- 获取状态 CMPP_DELIVER
- 撤回信息 CMPP_CANCEL
- 查询信息 CMPP_QUERY
只是一些统计信息,我想不到这个接口的应用场景
- 链路检测 CMPP_ACTIVE_TEST
连接 CMPP_CONNECT
var Source_Addr [6]byte // SP_Id
var AuthenticatorSource [16]byte // SP_Code
// md5(Source_Addr + 9 字节 0 + shared secret + timestamp)
// timestamp: MMDDHHMMSS, 月日时分秒, 补 0
var Version uint8 // 版本号, 高 4 位表示主版本号, 低 4 位表示次版本号, 最大 15.15
var Timestamp uint32 // 时间戳, 默认为 0
CMPP_CONNECT_RESP
var Status uint8 // 状态, 0 正确, 1 结构错误, 2 非法源地址, 3 认证错误, 4 版本错误, 5 其他错误
var AuthenticatorISMG [16]byte // ISMG 认证码
// md5(Status + AuthenticatorSource + shared secret)
// 如果认证错误, 此项为空
var Version uint8 // 服务器支持的最大版本号
断开连接 CMPP_TERMINATE
无请求消息体,无响应消息体
提交信息 CMPP_SUBMIT
var Msg_Id uint64 // 消息标识, 网关负责自主生成,SP 留空
var Pk_total uint8 // 消息总条数, 从 1 开始
var Pk_number uint8 // 消息序号, 从 1 开始
var Registered_Delivery uint8 // 是否要求返回状态确认报告, 0 不需要, 1 需要, 2 产生 SMC 话单 (该类型短信仅供网关计费使用,不发送给目的终端)
var Msg_level uint8 // 信息级别, 0 低, 1 中, 2 高
var Service_Id [10]byte // 业务类型
var Fee_UserType uint8 // 计费用户类型, 0 普通用户, 1 行业用户, 2 用户组
var Fee_terminal_Id [32]byte // 被计费用户的号码
var TP_pid uint8 // GSM协议类型, 0 GSM 03.40, 1 CDMA, 2 WCDMA, 3 CDMA2000, 4 联通移动TD专用协议
var TP_udhi uint8 // GSM协议类型
var Msg_Fmt uint8 // 信息格式, 0 ASCII 串, 3 短信写卡操作, 4 二进制, 8 UCS2 编码, 15 含 GB 汉字
var Msg_src [6]byte // 消息发送者, SP_Id
var FeeType [2]byte // 资费类别
var FeeCode [6]byte // 资费代码, 以分为单位
var ValId_Time [17]byte // 存活有效期,格式遵循 SMPP3.3 协议
var At_Time [17]byte // 定时发送时间, YYMMDDhhmmsstnnp, t 是 1/10 秒,nn 是与 UTS 时间的差值 00-48,p `+/-`
var Src_Id [21]byte // 源号码
// SP 的服务代码或前缀为服务代码的长号码,
// 网关将该号码完整的填到 SMPP 协议 Submit_SM 消息相应的 source_addr 字段
// 该号码最终在用户手机上显示为短消息的主叫号码
var DestUsr_tl uint8 // 接收信息的用户数量, 小于 100
var Dest_terminal_Id [][21]byte // 接收短信的用户号码 (MSISDN), 21 * DestUsr_tl
var Msg_Length uint8 // 短消息长度
var Msg_Content []byte // 短消息内容
var Reserve [8]byte // 保留
注意:关于短信群发的问题,若 SP 对于群发消息不要求状态报告的回送时,才可以考虑群发,否则必须逐条发送。
关于 TP_udhi 的解释:
如果 = 1,则需要在 Msg_Content 中加入一个 udhi 头,定义如下:
- 六字节
05 00 03 开头,分别表示剩余 5 字节,xxx,剩余标识长度为 3
第四字节,唯一标识
第五字节,短信条数
第六字节,序号
- 七字节
06 08 04 开头,分别表示剩余 6 字节,xxx,剩余标识长度为 4
第四五字节,唯一标识
第六字节,短信条数
第七字节,序号
CMPP_SUBMIT_RESP
var Msg_Id uint64 // 消息表示, SP 自主生成
var Result uint8 // 结果, 0 正确, 1 消息结构错, 2 命令字错, 3 消息序号重复, 4 消息长度错,
// 5 资费代码错, 6 超过最大信息长, 7 业务代码错, 8 流量控制错, 9~ 其他错误
- MMDDHHMMSS,26 位
月份 4 位, 1-12
日期 5 位, 1-31
小时 5 位, 0-23
分钟 6 位, 0-59
秒钟 6 位, 0-59
- 网关代码,22 位
- 序列号,16 位,递增, 步长为 1, 循环使用
查询 CMPP_QUERY
var Time [8]byte // YYYYMMDD
var Query_Type uint8 // 查询类型, 0 总数查询, 1 按业务类型查询
var Query_Code [10]byte // 查询码, 查询类型为 0 时无效, 查询类型为 1 时此处为业务类型
var Reserve [8]byte // 保留
CMPP_QUERY_RESP
var Time [8]byte
var Query_Type uint8
var Query_Code [10]byte
var MT_TLMsg uint32 // 接收消息总数
var MT_TLusr uint32 // 接收用户总数
var MT_Scs uint32 // 转发成功总数
var MT_WT uint32 // 待转发总数
var MT_FL uint32 // 转发失败总数
var MO_Scs uint32 // 发送成功总数
var MO_WT uint32 // 待发送总数
var MO_FL uint32 // 发送失败总数
送交短信 CMPP_DELIVER
从网关发送出来的消息
var Msg_Id uint64
var Dest_Id [21]byte // 目的号码
var Service_Id [10]byte
var TP_pid uint8
var TP_udhi uint8
var Msg_Fmt uint8
var Src_terminal_Id [21]byte // 源号码
var Registered_Delivery uint8
var Msg_Length uint8
var Msg_Content []byte // Msg_Length
var Reserved [8]byte
var Msg_Id uint64
var Stat [7]byte
// DELIVRD 送达
// EXPIRED 过期
// DELETED 删除
// UNDELIV 未送达
// ACCEPTD 接收
// UNKNOWN 非法
// REJECTD 拒绝
var Submit_time [10]byte
var Done_time [10]byte
var Dest_terminal_Id [21]byte
var SMSC_sequence uint32
SP 等待状态报告 48 小时。
CMPP_DELIVER_RESP
var Msg_Id uint64
var Result uint8
删除短信 CMPP_CANCEL
var Msg_Id uint64
CMPP_CANCEL_RESP
var Success_Id uint8 // 0 成功, 1 失败
链路检测 CMPP_ACTIVE_TEST
无消息体。
CMPP_ACTIVE_TEST_RESP
var Reserved [8]byte
负载均衡 网络编程
2022-03-10
现象:SYN 包会丢,可能有 1.5% 的连接丢了一次,0.1% 的可能性会丢失两次。
最后反馈是 SLB 实例规格太低, 升级 SLB 解决问题。
NodeJS 网络编程 Socket
2021-01-31
本月,听了同事做的一次技术分享,觉得很有意思的问题,我在这里隐去业务细节,只保留技术部分,做个总结归纳,最后用一个业务无关的脚本来模拟一下这个过程。
同事的分享就好比是先逐步实验发现一个现象,然后研究出他背后的原理是什么。我在这里作为事后的归纳总结,就直接冲着背后的原理说了。
开发者 ASN1 网络编程
2020-01-30
我印象中曾在某个项目中接触到了这种格式,但是一时间竟也想不起来。
PS: 可能是有一次涉及 LDAP 协议的时候。
概念
ASN 全名 Abstract Syntax Notation, 翻译过来就是:抽象语法标记。
ASN.1 可能是第一版的意思(?)。
asn.1 是一套国际标准,用来定义一种通用的、严谨的数据表示(标记)方法,以及对应的数据编码格式。
PS:对数据 Scheme 的定义独立于硬件架构和编程语言。
- ITU-T Rec. X.680 (2015) | ISO/IEC 8824-1:2015
Specification of basic notation
- ITU-T Rec. X.681 (2015) | ISO/IEC 8824-2:2015
Information object specification
- ITU-T Rec. X.682 (2015) | ISO/IEC 8824-3:2015
Constraint specification
- ITU-T Rec. X.683 (2015) | ISO/IEC 8824-4:2015
Parameterization of ASN.1 specifications
- ITU-T Rec. X.690 (2015) | ISO/IEC 8825-1:2015
BER, CER and DER
PS:常见证书格式 der 就是来自这个 DER。
- ITU-T Rec. X.691 (2015) | ISO/IEC 8825-2:2015
PER (Packed Encoding Rules)
- ITU-T Rec. X.692 (2015) | ISO/IEC 8825-3:2015
ECN (Extended Component Notation)
- ITU-T Rec. X.693 (2015) | ISO/IEC 8825-4:2015
XER (XML Encoding Rules)
- ITU-T Rec. X.694 (2015) | ISO/IEC 8825-5:2015
Mapping W3C XML schema definitions into ASN.1
- ITU-T Rec. X.695 (2015) | ISO/IEC 8825-6:2015
Registration and application of PER encoding instructions
- ITU-T Rec. X.696 (2015) | ISO/IEC 8825-7:2015
OER (Octet Encoding Rules)
- ITU-T Rec. X.697 (2017) | ISO/IEC 8825-8:2018
JER (JSON Encoding Rules)
一般又被称之为 X.680 系列,最早是 1995 年出第一版。最新的是 2018 年出的 5.4 版(X.680 (2015) Amd. 1)
PS:2021 年 X.680 出了第六版。
部分应用层的网络协议就使用了 ASN.1 格式,比如 X.500 Directory Services,LDAP,VoIP,PKCS,Kerberos,移动通信(2G/GSM,GRPS,一直到 5G)。
它和 JSON 这种通用数据交换格式完全不同,更加类似与 protobuf,msgpack,thrift 这样,提供一个完备的数据定义语法用来声明 Schema(ASN.1 称之为模块),然后基于二进制紧凑地表示数据。所以非常适合用在 C/S 架构的网络编程上,作为服务通讯协议的一部分,负责内外数据交换,也就是 TCP/UDP 服务的接口部分。
如果要将 ASN.1 归类的话,更贴切的应该是接口定义语言,或者叫协议定义语言。
要是了解到 ASN.1 出现的年份(1984)的话,对照它的竞争者出现的时间,会发现它的设计确实比较超前。不管怎么说,这些晚辈确实更加流行,作为国际标准的 ASN.1 不够卖座,肯定是也有不好的地方。
PS:可能是 ASN.1 历史包袱太重, 不够轻便 (我看到的一些评论和我的猜想比较符合)。
数据定义
先来一个示例(维基上找来的,感觉没啥意义):
FooProtocol DEFINITIONS ::= BEGIN
FooQuestion ::= SEQUENCE {
-- 跟踪编号,后面括号是限制值的范围
trackingNumber INTEGER(0..199),
-- 问题内容,字符串
question IA5String
}
FooAnswer ::= SEQUENCE {
-- 问题编号
questionNumber INTEGER(10..20),
-- 答案内容
answer BOOLEAN
}
FooHistory ::= SEQUENCE {
-- 问题数组
questions SEQUENCE(SIZE(0..10)) OF FooQuestion,
-- 答案数组
answers SEQUENCE(SIZE(1..10)) OF FooAnswer,
-- 一个整型数组
anArray SEQUENCE(SIZE(100)) OF INTEGER(0..1000),
...
}
END
基本语法
- 大小写字母,数字,短横杠,空格
标识符:小写字母开头
类型名称:大写字母开头
- 多个空白符号(空格、换行)会当作一个空格
- 数据类型都有一个 TagNumber
--
注释
数据类型
简单类型
结构化类型
标记类型
其他类型:CHOICE
,ANY
类别:
- 0 Universal 通用类型
- 1 Application 应用协议相关类型
- 2 Context-specific
- 3 Private 自定义
结构化:
原始类型:
Type |
Tag number |
备注 |
INTEGER |
2 |
整型 |
BIT STRING |
3 |
|
OCTET STRING |
4 |
|
NULL |
5 |
NULL |
OBJECT IDENTIFIER |
6 |
对象 |
SEQUENCE and
SEQUENCE OF |
16 |
数组 |
SET and SET OF |
17 |
集合 |
PrintableString |
19 |
字符串 |
T61String |
20 |
|
IA5String |
22 |
|
UTCTime |
23 |
时间 |
示例:
编码规则
- 基本编码规则(BER,Basic Encoding Rules)
- 规范编码规则(CER,Canonical Encoding Rules)
- 唯一编码规则(DER,Distinguished Encoding Rules)
- 压缩编码规则(PER,Packed Encoding Rules)
- XML 编码规则(XER,XML Encoding Rules)
Python
https://www.cnblogs.com/20175211lyz/p/12769883.html
https://github.com/etingof/pyasn1
上面的示例通过 asn1ate /tmp/foo.asn > /tmp/foo.py
生成 Python 代码:
PS:并不是一定需要定义成这样类的结构,只是 pyasn1 库适合这样用而已。
from pyasn1.type import univ, char, namedtype, namedval, tag, constraint, useful
class FooAnswer(univ.Sequence):
pass
FooAnswer.componentType = namedtype.NamedTypes(
namedtype.NamedType('questionNumber', univ.Integer().subtype(subtypeSpec=constraint.ValueRangeConstraint(10, 20))),
namedtype.NamedType('answer', univ.Boolean())
)
class FooQuestion(univ.Sequence):
pass
FooQuestion.componentType = namedtype.NamedTypes(
namedtype.NamedType('trackingNumber', univ.Integer().subtype(subtypeSpec=constraint.ValueRangeConstraint(0, 199))),
namedtype.NamedType('question', char.IA5String())
)
class FooHistory(univ.Sequence):
pass
FooHistory.componentType = namedtype.NamedTypes(
namedtype.NamedType('questions', univ.SequenceOf(componentType=FooQuestion()).subtype(subtypeSpec=constraint.ValueSizeConstraint(0, 10))),
namedtype.NamedType('answers', univ.SequenceOf(componentType=FooAnswer()).subtype(subtypeSpec=constraint.ValueSizeConstraint(1, 10))),
namedtype.NamedType('anArray', univ.SequenceOf(componentType=univ.Integer().subtype(subtypeSpec=constraint.ValueRangeConstraint(0, 1000))).subtype(subtypeSpec=constraint.ValueSizeConstraint(100, 100)))
)
然后就可以使用了:
import foo
from pyasn1.codec.der.encoder import encode
fa = foo.FooAnswer()
fa['questionNumber'] = 10
fa['answer'] = False
fa_encoded = encode(fa)
print(fa_encoded) # b'0\x06\x02\x01\n\x01\x01\x00'
print(binascii.b2a_hex(fa_encoded).decode()) # 300602010a010100
from pyasn1.codec.der.decoder import decode
obj, rest = decode(fa_encoded)
print(obj)
# Sequence:
# field-0=10
# field-1=False
for k, v in obj.items():
print([k, v])
# ['field-0', <Integer value object, tagSet <TagSet object, tags 0:0:2>, payload [10]>]
# ['field-1', <Boolean value object, tagSet <TagSet object, tags 0:0:1>, subtypeSpec <ConstraintsIntersection object, consts <SingleValueConstraint object, consts 0, 1>>, namedValues <NamedValues object, enums False=0, True=1>, payload [False]>]
obj, rest = decode(fa_encoded, asn1Spec=foo.FooAnswer())
print(obj)
# FooAnswer:
# questionNumber=10
# answer=False
# print(dict(obj.items()))
print(dict([(k, str(v)) for k, v in obj.items()]))
# {'questionNumber': '10', 'answer': 'False'}
print(obj['questionNumber'].__dict__)
print(obj['questionNumber']._value) # 10
print(obj['answer'].__dict__)
print(obj['answer']._value) # 0
print([int(obj['questionNumber']), bool(obj['answer'])])
GitHub 找到的几个相关库:
- wbond/asn1crypto
Python ASN.1 library with a focus on performance and a pythonic API
- etingof/pyasn1
Generic ASN.1 library for Python
- eerimoq/asn1tools
ASN.1 parsing, encoding and decoding.
- P1sec/pycrate
A Python library to ease the development of encoders and decoders for various protocols and file formats; contains ASN.1
参考资料与拓展阅读
Python 网络编程
2018-01-23
import socket
import signal
import sys
import os
def handle_connection(conn):
conn.close()
def worker(sock):
while True:
try:
conn, addr = sock.accept()
handle_connection(conn)
except OSError as e:
if e.errno == socket.ECONNABORTED:
# 忽略 ECONNABORTED 错误
pass
else:
raise
def main():
port = 8080
backlog = 10 # 连接队列长度(超出会拒绝或忽略)
num_workers = 4 # 子进程数
# 创建监听器
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
listener.bind(('localhost', port))
listener.listen(backlog)
sock.setblocking(False)
print(f"Listening on port {port}...")
# # “来一个连接,起一个进程”的模式
# def sig_handler(sig, frame):
# listener.close()
# sys.exit(0)
# signal.signal(signal.SIGINT, sig_handler)
# signal.signal(signal.SIGTERM, sig_handler)
# while True:
# conn, addr = listener.accept()
# pid = os.fork()
# if pid == 0:
# listener.close()
# handle_connection(conn)
# sys.exit(0)
# else:
# conn.close()
# 子进程放到进程组中
os.setpgrp()
# 多个 worker 子进程一同监听端口的模式
processes = []
for i in range(num_workers):
p = Process(target=worker, args=(sock,))
processes.append(p)
p.start()
# 通过 os.killpg 向进程组发送信号
signal.signal(signal.SIGINT, lambda signum, frame: os.killpg(0, signal.SIGINT))
signal.signal(signal.SIGTERM, lambda signum, frame: os.killpg(0, signal.SIGTERM))
signal.pause()
for p in processes:
p.terminate()
if __name__ == '__main__':
main()
惊群效应是指事件发生的时候,多个进程或线程竞争处理这个事件,导致系统负载出现一个尖峰。严重的情况下可能导致系统瘫痪。
虽然 accept 会阻塞住,只有一个抢到,但是惊群的问题应该还是存在。
进程组
os.setpgrp
设置进程组
os.killpg
向进程组发送信号,如果没有设置进程组,这个操作没有意义
os.set_inheritable
继承文件描述符,然后可以独立使用和关闭