TOC

Python 多进程共同监听同一个端口

import socket
import signal
import sys
import os

def handle_connection(conn):
    conn.close()

def worker(sock):
    while True:
        try:
            conn, addr = sock.accept()
            handle_connection(conn)
        except OSError as e:
            if e.errno == socket.ECONNABORTED:
                # 忽略 ECONNABORTED 错误
                pass
            else:
                raise

def main():
    port = 8080
    backlog = 10  # 连接队列长度(超出会拒绝或忽略)
    num_workers = 4 # 子进程数

    # 创建监听器
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    listener.bind(('localhost', port))
    listener.listen(backlog)
    sock.setblocking(False)
    print(f"Listening on port {port}...")

    # # “来一个连接,起一个进程”的模式
    # def sig_handler(sig, frame):
    #     listener.close()
    #     sys.exit(0)
    # signal.signal(signal.SIGINT, sig_handler)
    # signal.signal(signal.SIGTERM, sig_handler)
    # while True:
    #     conn, addr = listener.accept()
    #     pid = os.fork()
    #     if pid == 0:
    #         listener.close()
    #         handle_connection(conn)
    #         sys.exit(0)
    #     else:
    #         conn.close()

    # 子进程放到进程组中
    os.setpgrp()

    # 多个 worker 子进程一同监听端口的模式
    processes = []
    for i in range(num_workers):
        p = Process(target=worker, args=(sock,))
        processes.append(p)
        p.start()
    # 通过 os.killpg 向进程组发送信号
    signal.signal(signal.SIGINT, lambda signum, frame: os.killpg(0, signal.SIGINT))
    signal.signal(signal.SIGTERM, lambda signum, frame: os.killpg(0, signal.SIGTERM))
    signal.pause()
    for p in processes:
        p.terminate()

if __name__ == '__main__':
    main()

惊群效应是指事件发生的时候,多个进程或线程竞争处理这个事件,导致系统负载出现一个尖峰。严重的情况下可能导致系统瘫痪。
虽然 accept 会阻塞住,只有一个抢到,但是惊群的问题应该还是存在。

进程组

  • os.setpgrp 设置进程组
  • os.killpg 向进程组发送信号,如果没有设置进程组,这个操作没有意义
  • os.set_inheritable 继承文件描述符,然后可以独立使用和关闭