#3 Python Popen Hangs

2023-09-27

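Sometimes a program cannot avoid running system commands to get a job done.
For example, one small internal feature of our system relies on rsync for file synchronization.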

We recently noticed that once the data volume gets a little larger, rsync hangs, and at first we did not know why.

After investigating, we found that the cause was incorrect use of Popen.

Preparing the environment

mkdir /tmp/aa /tmp/bb
# Create 10,000 files of 10K to 1000K each
for i in {00001..10000}; do
    file_size=$((1 + $RANDOM % 100))
    dd if=/dev/urandom of=/tmp/aa/file$i.txt bs=10K count=$file_size
done
# du -sh /tmp/aa/
# 4.9G    /tmp/aa/

Code

import logging
from subprocess import PIPE, STDOUT, Popen

LOG = logging.getLogger(__name__)

src_dir = '/tmp/aa/'
tgt_dir = '/tmp/bb/'

# --remove-source-files
command = 'rsync -av %s %s' % (src_dir, tgt_dir)
p = Popen(command, stdin=PIPE, stdout=PIPE, stderr=STDOUT, shell=True)
p.wait()
if p.returncode == 0:
    LOG.info('rsync success')
else:
    LOG.warning('rsync error %d', p.returncode)

The transfer stalled at file0670.txt, after 2.3G had been transferred in total.

Investigation

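The investigation confirmed that the bug was in our own code.
The code captures standard output and standard error, but we never read them. Eventually the pipe buffer fills up, rsync blocks on its next write, and p.wait() never returns.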

Popen's constructor has a pipesize parameter (added in Python 3.10). If it is set, Popen calls fcntl.fcntl(p2cwrite, fcntl.F_SETPIPE_SZ, self.pipesize) to set the pipe buffer size.
From man fcntl:

Changing the capacity of a pipe
    F_SETPIPE_SZ (int; since Linux 2.6.35)
            Change the capacity of the pipe referred to by fd to be at least arg bytes. An unprivileged process can adjust the pipe capacity to any value between the system page size and the limit defined in /proc/sys/fs/pipe-max-size (see proc(5)). Attempts to set the pipe capacity below the page size are silently rounded up to the page size. Attempts by an unprivileged process to set the pipe capacity above the limit in /proc/sys/fs/pipe-max-size yield the error EPERM; a privileged process (CAP_SYS_RESOURCE) can override the limit.

            When allocating the buffer for the pipe, the kernel may use a capacity larger than arg, if that is convenient for the implementation. (In the current implementation, the allocation is the next higher power-of-two page-size multiple of the requested size.) The actual capacity (in bytes) that is set is returned as the function result.

            Attempting to set the pipe capacity smaller than the amount of buffer space currently used to store data produces the error EBUSY.

            Note that because of the way the pages of the pipe buffer are employed when data is written to the pipe, the number of bytes that can be written may be less than the nominal size, depending on the size of the writes.

    F_GETPIPE_SZ (void; since Linux 2.6.35)
            Return (as the function result) the capacity of the pipe referred to by fd.

And from man 7 pipe | grep size -C10:

Pipe capacity
    A pipe has a limited capacity. If the pipe is full, then a write(2) will block or fail, depending on whether the O_NONBLOCK flag is set (see below). Different implementations have different limits for the pipe capacity.
    Applications should not rely on a particular capacity: an application should be designed so that a reading process consumes data as soon as it is available, so that a writing process does not remain blocked.

    In Linux versions before 2.6.11, the capacity of a pipe was the same as the system page size (e.g., 4096 bytes on i386).  Since Linux 2.6.11, the pipe capacity is 16 pages (i.e., 65,536 bytes in a system with a page size of 4096 bytes).  Since Linux 2.6.35, the default pipe capacity is 16 pages, but the capacity can be queried and set using the fcntl(2) F_GETPIPE_SZ and F_SETPIPE_SZ operations.  See fcntl(2) for more information.

    The following ioctl(2) operation, which can be applied to a file descriptor that refers to either end of a pipe, places a count of the number of unread bytes in the pipe in the int buffer pointed to by the final argument of the call:

        ioctl(fd, FIONREAD, &nbytes);

    The FIONREAD operation is not specified in any standard, but is provided on many implementations.

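In other words:

  1. An unprivileged process can set the pipe buffer size anywhere from the page size up to /proc/sys/fs/pipe-max-size.
  2. A request below the page size is silently rounded up to the page size.
  3. A request above pipe-max-size fails with EPERM.
  4. A privileged process (CAP_SYS_RESOURCE) can exceed that limit.
  5. The default pipe buffer size depends on the kernel version:
  6. Before 2.6.11: one system page.
  7. Since 2.6.11: 16 system pages.
  8. Since 2.6.35: still 16 pages by default, but adjustable through fcntl.
  9. When the pipe buffer is full, a write blocks, unless the writer has set non-blocking mode (O_NONBLOCK), in which case the write fails instead.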
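The capacity described above can also be queried from Python. The following is a minimal sketch, assuming Linux and Python 3.10+ (where the fcntl module exposes F_GETPIPE_SZ); the helper name pipe_capacity is made up for this illustration, and it returns None when the constant is unavailable.

```python
import fcntl
import os


def pipe_capacity():
    """Return the capacity in bytes of a freshly created pipe, or None if unsupported."""
    read_fd, write_fd = os.pipe()
    try:
        # F_GETPIPE_SZ is Linux-specific; Python exposes it from 3.10 on.
        op = getattr(fcntl, "F_GETPIPE_SZ", None)
        if op is None:
            return None
        return fcntl.fcntl(write_fd, op)
    finally:
        os.close(read_fd)
        os.close(write_fd)


if __name__ == "__main__":
    print(pipe_capacity())
```

On a system with 4K pages and default settings, this is expected to report the 16-page default described above.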

Check the current system page size

getconf PAGE_SIZE
4096
getconf PAGESIZE
4096

Verification

The pipe buffer size should therefore be 16 x 4K = 64K.

import subprocess

cmd = ['python', '-c', 'print("a" * 1024 * 64, end=".")']
# cmd = ['python', '-c', 'import time; time.sleep(10);']
print(1)
p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
print(2)
# stdout, stderr = p.communicate()
# print(repr([stdout, stderr]))
print(3)
p.wait()

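The child process runs a python command. Writing exactly 64KB does not hang; one byte more (as in the code above, where end="." appends the extra byte) hangs at wait.
Uncomment the two communicate lines and the program runs normally.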
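To reproduce the hang without freezing the terminal, wait can be given a timeout. This is only a sketch: the function name wait_without_reading and the 2-second timeout are arbitrary choices for the demonstration, and sys.executable stands in for the python command used above.

```python
import subprocess
import sys


def wait_without_reading(nbytes, timeout=2.0):
    """Start a child that writes nbytes to a pipe, then wait() without reading.

    Returns (timed_out, bytes_received).
    """
    code = "import sys; sys.stdout.write('a' * %d)" % nbytes
    p = subprocess.Popen([sys.executable, "-c", code], stdout=subprocess.PIPE)
    try:
        p.wait(timeout=timeout)
        timed_out = False
    except subprocess.TimeoutExpired:
        # The child is blocked in write() because nobody drains the pipe.
        timed_out = True
    # communicate() drains the pipe, which lets a blocked child finish.
    out, _ = p.communicate()
    return timed_out, len(out)


if __name__ == "__main__":
    print(wait_without_reading(10))
    print(wait_without_reading(1_000_000))
```

With an output far larger than the pipe buffer, the wait is expected to time out, and the following communicate call then unblocks the child and collects everything it wrote.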

Fixes

Let the child process use the system stdin/stdout/stderr

Popen(command, shell=True)

Redirect to DEVNULL

Popen(command, stdout=subprocess.DEVNULL, shell=True)

# Python 2 has no subprocess.DEVNULL
import os
devnull = os.open(os.devnull, os.O_RDWR)
Popen(command, stdout=devnull, shell=True)
os.close(devnull)  # os.open returns an int file descriptor, so close it with os.close

Read the child process's stdout

p = Popen(command, stdout=PIPE, stderr=STDOUT, shell=True)
# Blocks, continuously reading the child's stdout until it exits
p.communicate()
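communicate keeps the whole output in memory, which may be a lot for something like rsync -av over thousands of files. An alternative is to consume the pipe line by line as the data arrives. The sketch below is one way to do that; run_streaming and its on_line callback are hypothetical names, not part of subprocess.

```python
import subprocess
import sys


def run_streaming(cmd, on_line=print):
    """Run cmd and hand each output line to on_line as soon as it arrives."""
    with subprocess.Popen(cmd, stdout=subprocess.PIPE,
                          stderr=subprocess.STDOUT, text=True) as p:
        # Iterating over the pipe keeps draining it, so the child never
        # blocks on a full buffer.
        for line in p.stdout:
            on_line(line.rstrip("\n"))
    # Leaving the with-block closes the pipe and waits for the child.
    return p.returncode


if __name__ == "__main__":
    child = [sys.executable, "-c", "for i in range(5): print('line', i)"]
    print(run_streaming(child))
```

For the rsync case, on_line could simply forward each line to the logger.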

Use run

result = subprocess.run(command, shell=True)
print(result.returncode)
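print(result.stdout)  # None: output is not captured unless stdout=PIPE or capture_output=True
print(result.stderr)  # None, for the same reason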

#2 Subprocess Popen

2017-10-09

If the command does not exist, you get:

FileNotFoundError: [Errno 2] No such file or directory: 'pythonjit'

Examples

Running a command

import subprocess
subprocess.run('ls')

import subprocess
from subprocess import PIPE

result = subprocess.run(['touch', '/tmp/abc'])
print(result.stdout)  # None: stdout was not captured

result = subprocess.run(['ls'], stdout=PIPE, text=True)
print(result.stdout)

cmd = ["python", "-c", "import time; time.sleep(3); print('hello')"]
result = subprocess.run(cmd, capture_output=True, text=True)
# capture_output=True => stdout = stderr = PIPE
print(result.stdout)

Using Popen directly is more flexible, and correspondingly more complex:

import sys
import subprocess
cmd = ["python", "-c", "import time; time.sleep(3); print('hello')"]
index = 0
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
# print(p.stdout.read())  # this would block until the program finishes
while True:
    index += 1
    try:
        output, errors = p.communicate(timeout=0.1)
        print(output)
        break
    except subprocess.TimeoutExpired:
        print(str(index), end=' ')
        sys.stdout.flush()

This is only a demonstration. If you need asynchronous execution, Python 3 supports subprocesses built on asyncio.
See: Running Commands Asynchronously with AsyncIO
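For reference, a minimal asyncio version of the same idea might look like the sketch below. It uses asyncio.create_subprocess_exec from the standard library; the helper names run_async and main are chosen only for this illustration.

```python
import asyncio
import sys


async def run_async(*cmd):
    """Run one command and return (returncode, stdout, stderr) as text."""
    proc = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, stderr = await proc.communicate()
    return proc.returncode, stdout.decode(), stderr.decode()


async def main():
    cmd = [sys.executable, "-c", "import time; time.sleep(0.5); print('hello')"]
    # Both children run concurrently instead of one after the other.
    return await asyncio.gather(run_async(*cmd), run_async(*cmd))


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The event loop stays free while the children run, so no polling loop with a timeout is needed.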

Getting the output

import subprocess
output = subprocess.check_output(['ls'], text=True)
print(output)

Getting the return code

import subprocess
from subprocess import DEVNULL
try:
    subprocess.check_call(['ls', '/etc/abc'], stdout=DEVNULL, stderr=DEVNULL)
except subprocess.CalledProcessError as e:
    print(f'Command return code: {e.returncode}')

stdin/stdout/stderr

import subprocess
from subprocess import PIPE, STDOUT
process = subprocess.Popen(['grep', 'o'], stdin=PIPE, stdout=PIPE, stderr=STDOUT)
process.stdin.write(b'Hello, World.\nWhats up?\nI am fine.\n3Q!\nAnd you?\n')
process.stdin.close()
output = process.stdout.read()  # read before wait() so a full pipe cannot block the child
return_code = process.wait()
print(output)
# b'Hello, World.\nAnd you?\n'

import subprocess
from subprocess import PIPE

result = subprocess.run(['ls', '-xyz'], stderr=PIPE)
print(repr(result.stderr.decode('utf-8')))

result = subprocess.run(['ls', '-xyz'], stderr=PIPE, text=True)
print(repr(result.stderr))
# In an English locale: "ls: invalid option -- 'y'\nTry 'ls --help' for more information.\n"

Pipelines

import subprocess
cmd = [
    ['tail', '-n10000', '/var/log/server.log'],
    ['grep', 'upload file'],
    ['awk', '{print $3}'],
]
stdin = None
for _cmd in cmd:
    proc = subprocess.Popen(_cmd, stdin=stdin, stdout=subprocess.PIPE, text=True)
    stdin = proc.stdout
output, error = proc.communicate()
print(output.strip())
print(error)  # None: stderr was not captured
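A slightly more careful variant of the loop above closes the parent's copy of each intermediate pipe and collects every exit status. This is a sketch under the assumption that the command list is non-empty; run_pipeline is a name made up here.

```python
import subprocess
import sys


def run_pipeline(commands):
    """Chain commands like a shell pipeline; return (output, [returncodes])."""
    procs = []
    prev_stdout = None
    for argv in commands:
        p = subprocess.Popen(argv, stdin=prev_stdout,
                             stdout=subprocess.PIPE, text=True)
        if prev_stdout is not None:
            # Drop the parent's copy so the upstream process sees a broken
            # pipe if the downstream process exits early.
            prev_stdout.close()
        prev_stdout = p.stdout
        procs.append(p)
    output, _ = procs[-1].communicate()
    for p in procs[:-1]:
        p.wait()
    return output, [p.returncode for p in procs]


if __name__ == "__main__":
    gen = [sys.executable, "-c", "print('x 1'); print('upload 2'); print('y 3')"]
    flt = [sys.executable, "-c",
           "import sys\nfor l in sys.stdin:\n    if 'upload' in l: sys.stdout.write(l)"]
    print(run_pipeline([gen, flt]))
```

Only the last process's stdout is read by the parent; the intermediate pipes are drained by the next process in the chain.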

Attributes and methods

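  • PIPE (-1): a pipe; captures standard input, standard output, or standard error.
    By default stdin, stdout, and stderr are None, meaning nothing is redirected and the child uses the normal standard input, output, and error.
  • STDOUT (-2): meant for stderr; captures standard error and merges it into standard output, like 2>&1
  • DEVNULL (-3): the null device; discards standard input, standard output, or standard error (redirects to /dev/null)

  • Popen: Python wrapper around a process

  • CompletedProcess: result of a finished run; holds the command and arguments, return code, standard output, and standard error
  • SubprocessError: base class for the exceptions in this module

  • CalledProcessError: raised when the exit status is non-zero and check=True was passed

  • TimeoutExpired: raised when waiting for the child process times out

  • list2cmdline: joins an argument list into a single command-line string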

The standard library actually has another module that can do this:

import shlex
cmd = 'grep -F "hello world" /tmp/abc.log'
args = shlex.split(cmd)
print(args)
# ['grep', '-F', 'hello world', '/tmp/abc.log']
print(shlex.join(args))
# grep -F 'hello world' /tmp/abc.log
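shlex also helps when a command string has to go through shell=True, as in the rsync example from the #3 entry. The sketch below quotes each path with shlex.quote before formatting; build_rsync_command is a hypothetical helper, not something the original code contains.

```python
import shlex


def build_rsync_command(src_dir, tgt_dir):
    """Build a shell command string with both paths safely quoted."""
    return 'rsync -av %s %s' % (shlex.quote(src_dir), shlex.quote(tgt_dir))


if __name__ == "__main__":
    # Paths with spaces or shell metacharacters stay single arguments.
    print(build_rsync_command('/tmp/a a/', '/tmp/bb/'))
```

When shell=True is not required, passing the argument list directly to Popen or run avoids the quoting question altogether.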
  • call: the minimal version
def call(*popenargs, timeout=None, **kwargs):
    with Popen(*popenargs, **kwargs) as p:
        try:
            return p.wait(timeout=timeout)
        except:
            p.kill()
            raise
  • check_call: wrapper around call; raises CalledProcessError on a non-zero exit status
def check_call(*popenargs, **kwargs):
    retcode = call(*popenargs, **kwargs)
    if retcode:
        cmd = kwargs.get("args")
        if cmd is None:
            cmd = popenargs[0]
        raise CalledProcessError(retcode, cmd)
    return 0
  • check_output: runs the program and returns its standard output
def check_output(*popenargs, timeout=None, **kwargs):
    for kw in ('stdout', 'check'):
        if kw in kwargs:
            raise ValueError(f'{kw} argument not allowed, it will be overridden.')
    if 'input' in kwargs and kwargs['input'] is None:
        if kwargs.get('universal_newlines') or kwargs.get('text') or kwargs.get('encoding') or kwargs.get('errors'):
            empty = ''
        else:
            empty = b''
        kwargs['input'] = empty
    return run(*popenargs, stdout=PIPE, timeout=timeout, check=True, **kwargs).stdout
  • getoutput: runs the program and returns its output
def getoutput(cmd, *, encoding=None, errors=None):
    return getstatusoutput(cmd, encoding=encoding, errors=errors)[1]
  • getstatusoutput: runs the program and returns the exit status and the output (stdout + stderr)
def getstatusoutput(cmd, *, encoding=None, errors=None):
    try:
        data = check_output(cmd, shell=True, text=True, stderr=STDOUT,
                            encoding=encoding, errors=errors)
        exitcode = 0
    except CalledProcessError as ex:
        data = ex.output
        exitcode = ex.returncode
    if data[-1:] == '\n':
        data = data[:-1]
    return exitcode, data
  • run: runs the program
def run(*popenargs, input=None, capture_output=False, timeout=None, check=False, **kwargs):
    if input is not None:
        if kwargs.get('stdin') is not None:
            raise ValueError('stdin and input arguments may not both be used.')
        kwargs['stdin'] = PIPE
    if capture_output:
        if kwargs.get('stdout') is not None or kwargs.get('stderr') is not None:
            raise ValueError('stdout and stderr arguments may not be used with capture_output.')
        kwargs['stdout'] = PIPE
        kwargs['stderr'] = PIPE
    with Popen(*popenargs, **kwargs) as process:
        try:
            stdout, stderr = process.communicate(input, timeout=timeout)
        except TimeoutExpired as exc:
            process.kill()
            if _mswindows:
                exc.stdout, exc.stderr = process.communicate()
            else:
                process.wait()
            raise
        except:
            process.kill()
            raise
        retcode = process.poll()
        if check and retcode:
            raise CalledProcessError(retcode, process.args, output=stdout, stderr=stderr)
    return CompletedProcess(process.args, retcode, stdout, stderr)

Wrapper relationships:

Popen -> call -> check_call
Popen -> run -> check_output -> getstatusoutput -> getoutput
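The practical difference between the two chains shows up when a command fails. The sketch below runs the same failing child through call and through check_output; the FAILING command and the two function names are invented for this illustration.

```python
import subprocess
import sys

# A child that prints one line and then exits with status 3.
FAILING = [sys.executable, "-c", "import sys; print('partial'); sys.exit(3)"]


def exit_code_via_call():
    # call() only reports the exit status and never raises for it.
    return subprocess.call(FAILING, stdout=subprocess.DEVNULL)


def exit_code_via_check_output():
    # check_output() goes through run(check=True), so a non-zero status
    # raises CalledProcessError, which carries the captured output.
    try:
        subprocess.check_output(FAILING, text=True)
    except subprocess.CalledProcessError as e:
        return e.returncode, e.output
    return 0, None


if __name__ == "__main__":
    print(exit_code_via_call())
    print(exit_code_via_check_output())
```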

Popen

https://github.com/python/cpython/blob/master/Lib/subprocess.py

class Popen:
    def __init__(self, args, bufsize=-1, executable=None,
                 stdin=None, stdout=None, stderr=None,
                 preexec_fn=None, close_fds=True,
                 shell=False, cwd=None, env=None, universal_newlines=None,
                 startupinfo=None, creationflags=0,
                 restore_signals=True, start_new_session=False,
                 pass_fds=(), *, user=None, group=None, extra_groups=None,
                 encoding=None, errors=None, text=None, umask=-1, pipesize=-1,
                 process_group=None): pass
        # Creates a child process that runs a system command
        # args          the command and its arguments
        # bufsize
        # executable    replacement program to execute
        # stdin
        # stdout
        # stderr
        # preexec_fn
        # close_fds
        # shell
        # cwd
        # env
        # universal_newlines
        # startupinfo
        # creationflags
        # restore_signals
        # start_new_session
        # pass_fds
        # user
        # group
        # extra_groups
        # encoding
        # errors
        # text
        #   Raises SubprocessError if text and universal_newlines are given conflicting values
        #   self.text_mode = encoding or errors or text or universal_newlines
        # umask
        # pipesize
        # process_group

    def __repr__(self): pass

    @property
    def universal_newlines(self):
        return self.text_mode

    @universal_newlines.setter
    def universal_newlines(self, universal_newlines):
        self.text_mode = bool(universal_newlines)

    def __enter__(self):
        return self
    def __exit__(self, exc_type, value, traceback): pass
    def __del__(self, _maxsize=sys.maxsize, _warn=warnings.warn): pass

    def communicate(self, input=None, timeout=None) -> (stdout, stderr): pass
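    def poll(self) -> None|returncode: pass  # check whether the process has finished; returns the exit status, or None if it is still running
    def wait(self, timeout=None) -> returncode: pass  # wait for the process to finish; returns the exit status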
    def send_signal(self, sig):
        self.poll()
        if self.returncode is not None:  # the process has already finished
            return
        try:  # tolerate a race with a concurrent kill
            os.kill(self.pid, sig)
        except ProcessLookupError:
            pass
    def terminate(self):  # terminate the process, signal 15 (SIGTERM)
        self.send_signal(signal.SIGTERM)
    def kill(self):  # force-kill the process, signal 9 (SIGKILL)
        self.send_signal(signal.SIGKILL)
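A common way to combine terminate, wait, and kill is to ask politely first and force the issue only after a grace period. The sketch below assumes a POSIX system, where a child killed by a signal reports the negated signal number as its return code; stop_child and the 5-second grace period are illustrative choices.

```python
import signal
import subprocess
import sys


def stop_child(grace=5.0):
    """Start a long-running child, then stop it; return its return code."""
    p = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(60)"])
    p.terminate()  # SIGTERM: gives the child a chance to exit
    try:
        return p.wait(timeout=grace)
    except subprocess.TimeoutExpired:
        p.kill()  # SIGKILL: cannot be caught or ignored
        return p.wait()


if __name__ == "__main__":
    print(stop_child() == -signal.SIGTERM)
```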