#5 CMPP 短信网关协议

2022-03-29

术语

  • ISMG: Internet Short Message Gateway
  • SMPP: Short Message Peer to Peer
    一个国际上比较通用的短信网关协议
    CMPP 可能兼容 SMPP,因为 Wireshark 会将 CMPP 会话识别成 SMPP
  • CMPP: China Mobile Peer to Peer
    中国移动开发的的短信网关协议
  • SMC: Short Message Center
  • GNS: Gateway Name Server
    相当于 CMPP 网络的路由器
  • SP: Service Provider
  • SMC: Short Message Control
  • ISMG_Id: 网关代码
  • SP_Id: SP 企业代码
  • SP_Code: SP 服务代码
  • Service_Id: SP 业务类型
  • MO: 手机发送 Originate
  • MT: 手机接收 Terminated

网络连接

CMPP 基于 TCP 协议。可以长连接,也可以短连接。
长连接支持一次连接发送多次 CMPP 消息,没有消息的时候需要维持心跳。
短连接则是一次 CMPP 通信之后就断开连接。

关于心跳:
建议每 3 分钟一次心跳,如果 60 秒内没有心跳响应,应该再次心跳,总共 3 次没有响应就断开连接。
关于重试(网关与 SP 之间,网关之间):
60 秒之后没有响应,立即重试,总共 3 次没有响应就停发。

消息采用并发方式发送,加以滑动窗口流量控制,窗口大小参数 W 可配置,现阶段建议为 16,即接收方在应答前一次收到的消息最多不超过 16 条。

关于短连接:
60 秒超时,重试 2 次。

端口

  • 7890 长连接(SP 与网关之间)
  • 7900 短连接(SP 与网关之间,网关之间)
  • 7930 长连接(网关之间)
  • 9168 短连接(网关与 GNS 之间)

数据类型

var Total_Length uint32 // 消息总长度
var Command_Id uint32   // 命令或响应类型
var Sequence_Id uint32  // 消息流水号, 递增, 步长为 1, 循环使用
命令 Command_Id
CMPP_CONNECT 0x00000001
CMPP_TERMINATE 0x00000002
CMPP_SUBMIT 0x00000004
CMPP_DELIVER 0x00000005
CMPP_QUERY 0x00000006
CMPP_CANCEL 0x00000007
CMPP_ACTIVE_TEST 0x00000008
CMPP_FWD 0x00000009
CMPP_MT_ROUTE 0x00000010
CMPP_MO_ROUTE 0x00000011
CMPP_GET_ROUTE 0x00000012
CMPP_MT_ROUTE_UPDATE 0x00000013
CMPP_MO_ROUTE_UPDATE 0x00000014
CMPP_PUSH_MT_ROUTE_UPDATE 0x00000015
CMPP_PUSH_MO_ROUTE_UPDATE 0x00000016

RESP Command_Id 则是对应的 Command_Id 最高 4 位为 8,即 0x8X

SP 与 ISMG 之间的通信

对于一般开发来说就这 7 个接口。

  1. 建立连接 CMPP_CONNECT
  2. 断开连接 CMPP_TERMINATE
  3. 提交信息 CMPP_SUBMIT
  4. 获取状态 CMPP_DELIVER
  5. 撤回信息 CMPP_CANCEL
  6. 查询信息 CMPP_QUERY
    只是一些统计信息,我想不到这个接口的应用场景
  7. 链路检测 CMPP_ACTIVE_TEST

连接 CMPP_CONNECT

var Source_Addr [6]byte // SP_Id
var AuthenticatorSource [16]byte // SP_Code
// md5(Source_Addr + 9 字节 0 + shared secret + timestamp)
// timestamp: MMDDHHMMSS, 月日时分秒, 补 0
var Version uint8 // 版本号, 高 4 位表示主版本号, 低 4 位表示次版本号, 最大 15.15
var Timestamp uint32 // 时间戳, 默认为 0

CMPP_CONNECT_RESP

var Status uint8 // 状态, 0 正确, 1 结构错误, 2 非法源地址, 3 认证错误, 4 版本错误, 5 其他错误
var AuthenticatorISMG [16]byte // ISMG 认证码
// md5(Status + AuthenticatorSource + shared secret)
// 如果认证错误, 此项为空
var Version uint8 // 服务器支持的最大版本号

断开连接 CMPP_TERMINATE

无请求消息体,无响应消息体

提交信息 CMPP_SUBMIT

var Msg_Id uint64       // 消息标识, 网关负责自主生成,SP 留空
var Pk_total uint8      // 消息总条数, 从 1 开始
var Pk_number uint8     // 消息序号, 从 1 开始
var Registered_Delivery uint8 // 是否要求返回状态确认报告, 0 不需要, 1 需要, 2 产生 SMC 话单 (该类型短信仅供网关计费使用,不发送给目的终端)
var Msg_level uint8     // 信息级别, 0 低, 1 中, 2 高
var Service_Id [10]byte // 业务类型
var Fee_UserType uint8  // 计费用户类型, 0 普通用户, 1 行业用户, 2 用户组
var Fee_terminal_Id [32]byte // 被计费用户的号码
var TP_pid uint8        // GSM协议类型, 0 GSM 03.40, 1 CDMA, 2 WCDMA, 3 CDMA2000, 4 联通移动TD专用协议
var TP_udhi uint8       // GSM协议类型
var Msg_Fmt uint8       // 信息格式, 0 ASCII 串, 3 短信写卡操作, 4 二进制, 8 UCS2 编码, 15 含 GB 汉字
var Msg_src [6]byte     // 消息发送者, SP_Id
var FeeType [2]byte     // 资费类别
var FeeCode [6]byte     // 资费代码, 以分为单位
var ValId_Time [17]byte // 存活有效期,格式遵循 SMPP3.3 协议
var At_Time [17]byte    // 定时发送时间, YYMMDDhhmmsstnnp, t 是 1/10 秒,nn 是与 UTS 时间的差值 00-48,p `+/-`
var Src_Id [21]byte     // 源号码
// SP 的服务代码或前缀为服务代码的长号码,
// 网关将该号码完整的填到 SMPP 协议 Submit_SM 消息相应的 source_addr 字段
// 该号码最终在用户手机上显示为短消息的主叫号码
var DestUsr_tl uint8    // 接收信息的用户数量, 小于 100
var Dest_terminal_Id [][21]byte // 接收短信的用户号码 (MSISDN), 21 * DestUsr_tl
var Msg_Length uint8    // 短消息长度
var Msg_Content []byte  // 短消息内容
var Reserve [8]byte     // 保留

注意:关于短信群发的问题,若 SP 对于群发消息不要求状态报告的回送时,才可以考虑群发,否则必须逐条发送。

关于 TP_udhi 的解释:

如果 = 1,则需要在 Msg_Content 中加入一个 udhi 头,定义如下:

  1. 六字节
    05 00 03 开头,分别表示剩余 5 字节,xxx,剩余标识长度为 3
    第四字节,唯一标识
    第五字节,短信条数
    第六字节,序号
  2. 七字节
    06 08 04 开头,分别表示剩余 6 字节,xxx,剩余标识长度为 4
    第四五字节,唯一标识
    第六字节,短信条数
    第七字节,序号

CMPP_SUBMIT_RESP

var Msg_Id uint64       // 消息表示, SP 自主生成
var Result uint8        // 结果, 0 正确, 1 消息结构错, 2 命令字错, 3 消息序号重复, 4 消息长度错,
                        //       5 资费代码错, 6 超过最大信息长, 7 业务代码错, 8 流量控制错, 9~ 其他错误
  • MMDDHHMMSS,26 位
    月份 4 位, 1-12
    日期 5 位, 1-31
    小时 5 位, 0-23
    分钟 6 位, 0-59
    秒钟 6 位, 0-59
  • 网关代码,22 位
  • 序列号,16 位,递增, 步长为 1, 循环使用

查询 CMPP_QUERY

var Time [8]byte // YYYYMMDD
var Query_Type uint8 // 查询类型, 0 总数查询, 1 按业务类型查询
var Query_Code [10]byte // 查询码, 查询类型为 0 时无效, 查询类型为 1 时此处为业务类型
var Reserve [8]byte // 保留

CMPP_QUERY_RESP

var Time [8]byte
var Query_Type uint8
var Query_Code [10]byte
var MT_TLMsg uint32 // 接收消息总数
var MT_TLusr uint32 // 接收用户总数
var MT_Scs uint32   // 转发成功总数
var MT_WT uint32    // 待转发总数
var MT_FL uint32    // 转发失败总数
var MO_Scs uint32   // 发送成功总数
var MO_WT uint32    // 待发送总数
var MO_FL uint32    // 发送失败总数

送交短信 CMPP_DELIVER

从网关发送出来的消息

var Msg_Id uint64
var Dest_Id [21]byte            // 目的号码
var Service_Id [10]byte
var TP_pid uint8
var TP_udhi uint8
var Msg_Fmt uint8
var Src_terminal_Id [21]byte    // 源号码
var Registered_Delivery uint8
var Msg_Length uint8
var Msg_Content []byte          // Msg_Length
var Reserved [8]byte
var Msg_Id uint64
var Stat [7]byte
// DELIVRD 送达
// EXPIRED 过期
// DELETED 删除
// UNDELIV 未送达
// ACCEPTD 接收
// UNKNOWN 非法
// REJECTD 拒绝
var Submit_time [10]byte
var Done_time [10]byte
var Dest_terminal_Id [21]byte
var SMSC_sequence uint32

SP 等待状态报告 48 小时。

CMPP_DELIVER_RESP

var Msg_Id uint64
var Result uint8

删除短信 CMPP_CANCEL

var Msg_Id uint64

CMPP_CANCEL_RESP

var Success_Id uint8 // 0 成功, 1 失败

链路检测 CMPP_ACTIVE_TEST

无消息体。

CMPP_ACTIVE_TEST_RESP

var Reserved [8]byte

#3 Node Socket 编程 EMFILE 错误的一种情况

2021-01-31

本月,听了同事做的一次技术分享,觉得很有意思的问题,我在这里隐去业务细节,只保留技术部分,做个总结归纳,最后用一个业务无关的脚本来模拟一下这个过程。
同事的分享就好比是先逐步实验发现一个现象,然后研究出他背后的原理是什么。我在这里作为事后的归纳总结,就直接冲着背后的原理说了。

#2 ASN.1

2020-01-30

我印象中曾在某个项目中接触到了这种格式,但是一时间竟也想不起来。
PS: 可能是有一次涉及 LDAP 协议的时候。

概念

ASN 全名 Abstract Syntax Notation, 翻译过来就是:抽象语法标记。
ASN.1 可能是第一版的意思(?)。

asn.1 是一套国际标准,用来定义一种通用的、严谨的数据表示(标记)方法,以及对应的数据编码格式。
PS:对数据 Scheme 的定义独立于硬件架构和编程语言。

  • ITU-T Rec. X.680 (2015) | ISO/IEC 8824-1:2015
    Specification of basic notation
  • ITU-T Rec. X.681 (2015) | ISO/IEC 8824-2:2015
    Information object specification
  • ITU-T Rec. X.682 (2015) | ISO/IEC 8824-3:2015
    Constraint specification
  • ITU-T Rec. X.683 (2015) | ISO/IEC 8824-4:2015
    Parameterization of ASN.1 specifications
  • ITU-T Rec. X.690 (2015) | ISO/IEC 8825-1:2015
    BER, CER and DER
    PS:常见证书格式 der 就是来自这个 DER。
  • ITU-T Rec. X.691 (2015) | ISO/IEC 8825-2:2015
    PER (Packed Encoding Rules)
  • ITU-T Rec. X.692 (2015) | ISO/IEC 8825-3:2015
    ECN (Extended Component Notation)
  • ITU-T Rec. X.693 (2015) | ISO/IEC 8825-4:2015
    XER (XML Encoding Rules)
  • ITU-T Rec. X.694 (2015) | ISO/IEC 8825-5:2015
    Mapping W3C XML schema definitions into ASN.1
  • ITU-T Rec. X.695 (2015) | ISO/IEC 8825-6:2015
    Registration and application of PER encoding instructions
  • ITU-T Rec. X.696 (2015) | ISO/IEC 8825-7:2015
    OER (Octet Encoding Rules)
  • ITU-T Rec. X.697 (2017) | ISO/IEC 8825-8:2018
    JER (JSON Encoding Rules)

一般又被称之为 X.680 系列,最早是 1995 年出第一版。最新的是 2018 年出的 5.4 版(X.680 (2015) Amd. 1)
PS:2021 年 X.680 出了第六版。

部分应用层的网络协议就使用了 ASN.1 格式,比如 X.500 Directory Services,LDAP,VoIP,PKCS,Kerberos,移动通信(2G/GSM,GRPS,一直到 5G)。

它和 JSON 这种通用数据交换格式完全不同,更加类似与 protobuf,msgpack,thrift 这样,提供一个完备的数据定义语法用来声明 Schema(ASN.1 称之为模块),然后基于二进制紧凑地表示数据。所以非常适合用在 C/S 架构的网络编程上,作为服务通讯协议的一部分,负责内外数据交换,也就是 TCP/UDP 服务的接口部分。

如果要将 ASN.1 归类的话,更贴切的应该是接口定义语言,或者叫协议定义语言。

要是了解到 ASN.1 出现的年份(1984)的话,对照它的竞争者出现的时间,会发现它的设计确实比较超前。不管怎么说,这些晚辈确实更加流行,作为国际标准的 ASN.1 不够卖座,肯定是也有不好的地方。
PS:可能是 ASN.1 历史包袱太重, 不够轻便 (我看到的一些评论和我的猜想比较符合)。

数据定义

先来一个示例(维基上找来的,感觉没啥意义):

FooProtocol DEFINITIONS ::= BEGIN

    FooQuestion ::= SEQUENCE {
        -- 跟踪编号,后面括号是限制值的范围
        trackingNumber INTEGER(0..199),
        -- 问题内容,字符串
        question       IA5String
    }

    FooAnswer ::= SEQUENCE {
        -- 问题编号
        questionNumber INTEGER(10..20),
        -- 答案内容
        answer         BOOLEAN
    }

    FooHistory ::= SEQUENCE {
        -- 问题数组
        questions SEQUENCE(SIZE(0..10)) OF FooQuestion,
        -- 答案数组
        answers   SEQUENCE(SIZE(1..10)) OF FooAnswer,
        -- 一个整型数组
        anArray   SEQUENCE(SIZE(100))  OF INTEGER(0..1000),
        ...
    }

END

基本语法

  1. 大小写字母,数字,短横杠,空格
    标识符:小写字母开头
    类型名称:大写字母开头
  2. 多个空白符号(空格、换行)会当作一个空格
  3. 数据类型都有一个 TagNumber
  4. -- 注释

数据类型

简单类型
结构化类型
标记类型
其他类型:CHOICEANY

类别:

  • 0 Universal 通用类型
  • 1 Application 应用协议相关类型
  • 2 Context-specific
  • 3 Private 自定义

结构化:

原始类型:

Type Tag number 备注
INTEGER 2 整型
BIT STRING 3
OCTET STRING 4
NULL 5 NULL
OBJECT IDENTIFIER 6 对象
SEQUENCE and
SEQUENCE OF
16 数组
SET and SET OF 17 集合
PrintableString 19 字符串
T61String 20
IA5String 22
UTCTime 23 时间

示例:


编码规则

  • 基本编码规则(BER,Basic Encoding Rules)
  • 规范编码规则(CER,Canonical Encoding Rules)
  • 唯一编码规则(DER,Distinguished Encoding Rules)
  • 压缩编码规则(PER,Packed Encoding Rules)
  • XML 编码规则(XER,XML Encoding Rules)

Python

https://www.cnblogs.com/20175211lyz/p/12769883.html
https://github.com/etingof/pyasn1

上面的示例通过 asn1ate /tmp/foo.asn > /tmp/foo.py 生成 Python 代码:
PS:并不是一定需要定义成这样类的结构,只是 pyasn1 库适合这样用而已。

from pyasn1.type import univ, char, namedtype, namedval, tag, constraint, useful


class FooAnswer(univ.Sequence):
    pass


FooAnswer.componentType = namedtype.NamedTypes(
    namedtype.NamedType('questionNumber', univ.Integer().subtype(subtypeSpec=constraint.ValueRangeConstraint(10, 20))),
    namedtype.NamedType('answer', univ.Boolean())
)


class FooQuestion(univ.Sequence):
    pass


FooQuestion.componentType = namedtype.NamedTypes(
    namedtype.NamedType('trackingNumber', univ.Integer().subtype(subtypeSpec=constraint.ValueRangeConstraint(0, 199))),
    namedtype.NamedType('question', char.IA5String())
)


class FooHistory(univ.Sequence):
    pass


FooHistory.componentType = namedtype.NamedTypes(
    namedtype.NamedType('questions', univ.SequenceOf(componentType=FooQuestion()).subtype(subtypeSpec=constraint.ValueSizeConstraint(0, 10))),
    namedtype.NamedType('answers', univ.SequenceOf(componentType=FooAnswer()).subtype(subtypeSpec=constraint.ValueSizeConstraint(1, 10))),
    namedtype.NamedType('anArray', univ.SequenceOf(componentType=univ.Integer().subtype(subtypeSpec=constraint.ValueRangeConstraint(0, 1000))).subtype(subtypeSpec=constraint.ValueSizeConstraint(100, 100)))
)

然后就可以使用了:

import foo
from pyasn1.codec.der.encoder import encode
fa = foo.FooAnswer()
fa['questionNumber'] = 10
fa['answer'] = False

fa_encoded = encode(fa)
print(fa_encoded)  # b'0\x06\x02\x01\n\x01\x01\x00'
print(binascii.b2a_hex(fa_encoded).decode())  # 300602010a010100

from pyasn1.codec.der.decoder import decode
obj, rest = decode(fa_encoded)
print(obj)
# Sequence:
#  field-0=10
#  field-1=False
for k, v in obj.items():
    print([k, v])
    # ['field-0', <Integer value object, tagSet <TagSet object, tags 0:0:2>, payload [10]>]
    # ['field-1', <Boolean value object, tagSet <TagSet object, tags 0:0:1>, subtypeSpec <ConstraintsIntersection object, consts <SingleValueConstraint object, consts 0, 1>>, namedValues <NamedValues object, enums False=0, True=1>, payload [False]>]

obj, rest = decode(fa_encoded, asn1Spec=foo.FooAnswer())
print(obj)
# FooAnswer:
#  questionNumber=10
#  answer=False
# print(dict(obj.items()))
print(dict([(k, str(v)) for k, v in obj.items()]))
# {'questionNumber': '10', 'answer': 'False'}

print(obj['questionNumber'].__dict__)
print(obj['questionNumber']._value)  # 10
print(obj['answer'].__dict__)
print(obj['answer']._value)  # 0
print([int(obj['questionNumber']), bool(obj['answer'])])

GitHub 找到的几个相关库:

  • wbond/asn1crypto stars Python ASN.1 library with a focus on performance and a pythonic API
  • etingof/pyasn1 stars Generic ASN.1 library for Python
  • eerimoq/asn1tools stars ASN.1 parsing, encoding and decoding.
  • P1sec/pycrate stars A Python library to ease the development of encoders and decoders for various protocols and file formats; contains ASN.1

参考资料与拓展阅读

#1 Python 多进程共同监听同一个端口

2018-01-23
import socket
import signal
import sys
import os

def handle_connection(conn):
    conn.close()

def worker(sock):
    while True:
        try:
            conn, addr = sock.accept()
            handle_connection(conn)
        except OSError as e:
            if e.errno == socket.ECONNABORTED:
                # 忽略 ECONNABORTED 错误
                pass
            else:
                raise

def main():
    port = 8080
    backlog = 10  # 连接队列长度(超出会拒绝或忽略)
    num_workers = 4 # 子进程数

    # 创建监听器
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    listener.bind(('localhost', port))
    listener.listen(backlog)
    sock.setblocking(False)
    print(f"Listening on port {port}...")

    # # “来一个连接,起一个进程”的模式
    # def sig_handler(sig, frame):
    #     listener.close()
    #     sys.exit(0)
    # signal.signal(signal.SIGINT, sig_handler)
    # signal.signal(signal.SIGTERM, sig_handler)
    # while True:
    #     conn, addr = listener.accept()
    #     pid = os.fork()
    #     if pid == 0:
    #         listener.close()
    #         handle_connection(conn)
    #         sys.exit(0)
    #     else:
    #         conn.close()

    # 子进程放到进程组中
    os.setpgrp()

    # 多个 worker 子进程一同监听端口的模式
    processes = []
    for i in range(num_workers):
        p = Process(target=worker, args=(sock,))
        processes.append(p)
        p.start()
    # 通过 os.killpg 向进程组发送信号
    signal.signal(signal.SIGINT, lambda signum, frame: os.killpg(0, signal.SIGINT))
    signal.signal(signal.SIGTERM, lambda signum, frame: os.killpg(0, signal.SIGTERM))
    signal.pause()
    for p in processes:
        p.terminate()

if __name__ == '__main__':
    main()

惊群效应是指事件发生的时候,多个进程或线程竞争处理这个事件,导致系统负载出现一个尖峰。严重的情况下可能导致系统瘫痪。
虽然 accept 会阻塞住,只有一个抢到,但是惊群的问题应该还是存在。

进程组

  • os.setpgrp 设置进程组
  • os.killpg 向进程组发送信号,如果没有设置进程组,这个操作没有意义
  • os.set_inheritable 继承文件描述符,然后可以独立使用和关闭