Python Subprocess
2023-09-27
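Sometimes a program cannot avoid running system commands to get a job done.
For example, one of our internal features relies on rsync for file synchronization.
We recently noticed that rsync would hang once the data volume grew a little, for no obvious reason.
After some investigation, the cause turned out to be incorrect use of Popen.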
Preparing the environment
mkdir /tmp/aa /tmp/bb
# Create 10,000 files of 10K to 1000K each
for i in {00001..10000}; do
    file_size=$((1 + $RANDOM % 100))
    dd if=/dev/urandom of=/tmp/aa/file$i.txt bs=10K count=$file_size
done
# du -sh /tmp/aa/
# 4.9G /tmp/aa/
Code
import logging
from subprocess import PIPE, STDOUT, Popen
logging.basicConfig(level=logging.INFO)
LOG = logging.getLogger(__name__)
src_dir = '/tmp/aa/'
tgt_dir = '/tmp/bb/'
# --remove-source-files
command = 'rsync -av %s %s' % (src_dir, tgt_dir)
# Problematic version: stdout/stderr go to a pipe that is never read
p = Popen(command, stdin=PIPE, stdout=PIPE, stderr=STDOUT, shell=True)
p.wait()
if p.returncode == 0:
    LOG.info('rsync success')
else:
    LOG.warning('rsync error %d', p.returncode)
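The transfer hung at file0670.txt, after 2.3G had been transferred in total.
Investigation
The investigation confirmed that the problem was in our own code.
The code captures standard output and standard error, but we never read that data. The pipe buffer eventually filled up, rsync blocked on its next write, and our program waited in wait() forever.
Popen accepts a pipesize argument. If it is set, Popen calls fcntl.fcntl(p2cwrite, fcntl.F_SETPIPE_SZ, self.pipesize) to set the buffer size.
From man fcntl: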
Changing the capacity of a pipe
F_SETPIPE_SZ (int; since Linux 2.6.35)
Change the capacity of the pipe referred to by fd to be at least arg bytes. An unprivileged process can adjust the pipe capacity to any value between the system page size and the limit defined in /proc/sys/fs/pipe-max-size (see proc(5)). Attempts to set the pipe capacity below the page size are silently rounded up to the page size. Attempts by an unprivileged process to set the pipe capacity above the limit in /proc/sys/fs/pipe-max-size yield the error EPERM; a privileged process (CAP_SYS_RESOURCE) can override the limit.
When allocating the buffer for the pipe, the kernel may use a capacity larger than arg, if that is convenient for the implementation. (In the current implementation, the allocation is the next higher power-of-two page-size multiple of the requested size.) The actual capacity (in bytes) that is set is returned as the function result.
Attempting to set the pipe capacity smaller than the amount of buffer space currently used to store data produces the error EBUSY.
Note that because of the way the pages of the pipe buffer are employed when data is written to the pipe, the number of bytes that can be written may be less than the nominal size, depending on the size of the writes.
F_GETPIPE_SZ (void; since Linux 2.6.35)
Return (as the function result) the capacity of the pipe referred to by fd.
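As a small illustration of the two fcntl operations quoted above, the following sketch queries and resizes a pipe from Python. It assumes Linux. The constants fcntl.F_GETPIPE_SZ and fcntl.F_SETPIPE_SZ are only exposed by the fcntl module from Python 3.10 on, so the sketch falls back to the Linux numeric values 1032 and 1031 otherwise. The 128 KiB request is an arbitrary example value, not something taken from the incident.

```python
import fcntl
import os

# Linux-only constants; the numeric fallbacks are the Linux values
F_SETPIPE_SZ = getattr(fcntl, "F_SETPIPE_SZ", 1031)
F_GETPIPE_SZ = getattr(fcntl, "F_GETPIPE_SZ", 1032)

read_fd, write_fd = os.pipe()
try:
    # Capacity the kernel gave the pipe by default
    default_size = fcntl.fcntl(write_fd, F_GETPIPE_SZ)
    # The kernel may round the request up; the call returns the actual capacity
    new_size = fcntl.fcntl(write_fd, F_SETPIPE_SZ, 128 * 1024)
    print(default_size, new_size)
finally:
    os.close(read_fd)
    os.close(write_fd)
```

A larger pipe only postpones the hang. It does not replace reading the child's output.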
And from man 7 pipe | grep size -C10:
Pipe capacity
A pipe has a limited capacity. If the pipe is full, then a write(2) will block or fail, depending on whether the O_NONBLOCK flag is set (see below). Different implementations have different limits for the pipe capacity.
Applications should not rely on a particular capacity: an application should be designed so that a reading process consumes data as soon as it is available, so that a writing process does not remain blocked.
In Linux versions before 2.6.11, the capacity of a pipe was the same as the system page size (e.g., 4096 bytes on i386). Since Linux 2.6.11, the pipe capacity is 16 pages (i.e., 65,536 bytes in a system with a page size of 4096 bytes). Since Linux 2.6.35, the default pipe capacity is 16 pages, but the capacity can be queried and set using the fcntl(2) F_GETPIPE_SZ and F_SETPIPE_SZ operations. See fcntl(2) for more information.
The following ioctl(2) operation, which can be applied to a file descriptor that refers to either end of a pipe, places a count of the number of unread bytes in the pipe in the int buffer pointed to by the final argument of the call:
ioctl(fd, FIONREAD, &nbytes);
The FIONREAD operation is not specified in any standard, but is provided on many implementations.
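In other words:
- An unprivileged process can resize the pipe buffer to any value between the page size and /proc/sys/fs/pipe-max-size
  - Values below the page size are rounded up to the page size
  - Values above pipe-max-size fail with EPERM
- A privileged process is not bound by that limit
- Pipe buffer size
  - Before 2.6.11: one system page
  - Since 2.6.11: 16 system pages
  - Since 2.6.35: adjustable manually through fcntl
- When the pipe buffer is full, writes block, unless the program has set non-blocking mode (O_NONBLOCK), in which case the write fails instead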
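The last point can be observed directly. The sketch below, which is only an illustration and not part of the original investigation, puts the write end of a fresh pipe into non-blocking mode and writes until the kernel refuses more data. The helper name measure_pipe_capacity and the 4096-byte chunk size are my own choices.

```python
import os


def measure_pipe_capacity(chunk_size=4096):
    """Fill a fresh pipe in non-blocking mode and return the bytes accepted."""
    read_fd, write_fd = os.pipe()
    os.set_blocking(write_fd, False)  # sets O_NONBLOCK on the write end
    chunk = b"x" * chunk_size
    total = 0
    try:
        while True:
            total += os.write(write_fd, chunk)
    except BlockingIOError:
        # The pipe is full: a blocking writer would be stuck at this point
        pass
    finally:
        os.close(read_fd)
        os.close(write_fd)
    return total


capacity = measure_pipe_capacity()
print(capacity)
```

On a Linux system with 4 KiB pages and default settings this should report the 16-page capacity described above, but the exact number depends on the kernel and its configuration.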
Check the page size of the current system
getconf PAGE_SIZE
4096
getconf PAGESIZE
4096
Verification
The default pipe capacity should be 16 x 4K = 64K.
import subprocess
cmd = ['python', '-c', 'print("a" * 1024 * 64, end=".")']
# cmd = ['python', '-c', 'import time; time.sleep(10);']
print(1)
p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
print(2)
# stdout, stderr = p.communicate()
# print(repr([stdout, stderr]))
print(3)
p.wait()
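The child runs a python command that writes to its stdout. Writing exactly 64KB does not hang; with one more byte the parent hangs at wait(). In the code above, the trailing "." passed as end is that extra byte.
Uncomment the communicate line and the program runs normally.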
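The experiment can also be written so that it never hangs the terminal, by giving wait() a timeout. This is a sketch under a few assumptions: the helper name child_blocks, the 2 second timeout, and the two sizes are arbitrary, and sys.executable is used instead of a bare python so that the child is the same interpreter.

```python
import subprocess
import sys


def child_blocks(n_bytes, timeout=2.0):
    """Return True if a child writing n_bytes to an unread pipe fails to exit in time."""
    code = f"import sys; sys.stdout.write('a' * {n_bytes})"
    p = subprocess.Popen([sys.executable, "-c", code], stdout=subprocess.PIPE)
    try:
        p.wait(timeout=timeout)
        return False
    except subprocess.TimeoutExpired:
        return True
    finally:
        p.kill()         # does nothing if the child already exited
        p.communicate()  # drain the pipe and reap the child


small = child_blocks(1024)         # fits easily in the pipe buffer
large = child_blocks(1024 * 1024)  # 1 MiB is far beyond the default capacity
print(small, large)
```

The second call is expected to time out because nobody reads the pipe while the parent waits, which is the same situation as the rsync hang.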
Fixes
Let the child use the system stdin/stdout/stderr
Popen(command, shell=True)
Redirect to DEVNULL
Popen(command, stdout=subprocess.DEVNULL, shell=True)
# Python 2 has no DEVNULL
import os
devnull = os.open(os.devnull, os.O_RDWR)
Popen(command, stdout=devnull, shell=True)
os.close(devnull)  # os.open returns an integer fd, so close it with os.close
Read the child's stdout
p = Popen(command, stdout=PIPE, stderr=STDOUT, shell=True)
# Blocks here, continuously reading the child's standard output until it exits
p.communicate()
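communicate() keeps the whole output in memory until the child exits. For a long-running command such as rsync -av it may be preferable to consume the output line by line as it arrives. The following is a minimal sketch of that idea; run_and_stream is a hypothetical helper name, and the demo command is a Python one-liner standing in for rsync.

```python
import subprocess
import sys


def run_and_stream(args):
    """Run args, consume merged stdout/stderr line by line, return (returncode, lines)."""
    lines = []
    with subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True) as p:
        for line in p.stdout:
            # A real program might log each line here instead of storing it
            lines.append(line.rstrip("\n"))
    # Leaving the with block closes the pipe and waits for the child
    return p.returncode, lines


demo = [sys.executable, "-c", "print('x' * 200000); print('done')"]
returncode, lines = run_and_stream(demo)
print(returncode, len(lines), lines[-1])
```

Because the parent reads continuously, the pipe never stays full and the child is never blocked for long, regardless of how much it prints.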
Use run
result = subprocess.run(command, shell=True)
print(result.returncode)
print(result.stdout)  # None, because stdout was not captured
print(result.stderr)  # None, because stderr was not captured
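When the output is actually needed, run can capture it safely, since it calls communicate() internally. A hedged sketch follows: run_checked is a hypothetical wrapper, the 10 second timeout is an arbitrary example, and the two demo commands are Python one-liners rather than rsync.

```python
import subprocess
import sys


def run_checked(args, timeout=10):
    """Return (ok, text): captured stdout on success, a short reason on failure."""
    try:
        result = subprocess.run(args, capture_output=True, text=True, check=True, timeout=timeout)
    except subprocess.CalledProcessError as e:
        # check=True turns a non-zero exit status into an exception
        return False, f"exit {e.returncode}: {e.stderr.strip()}"
    except subprocess.TimeoutExpired:
        return False, "timed out"
    return True, result.stdout


ok_result = run_checked([sys.executable, "-c", "print('hello')"])
fail_result = run_checked([sys.executable, "-c", "import sys; sys.exit(3)"])
print(ok_result, fail_result)
```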
Python Subprocess
2017-10-09
If the command does not exist, you get:
FileNotFoundError: [Errno 2] No such file or directory: 'pythonjit'
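This exception is raised by Popen itself when the command is given as a list (shell=False), so it can be caught like any other. A minimal sketch, where try_run is a hypothetical helper and the command name is deliberately one that should not exist:

```python
import subprocess


def try_run(args):
    """Return the exit status, or None when the executable cannot be found."""
    try:
        return subprocess.run(args).returncode
    except FileNotFoundError:
        return None


status = try_run(["pythonjit-does-not-exist", "--version"])
print(status)
```

With shell=True the behaviour differs: the shell is found and started, so no FileNotFoundError is raised and the failure shows up as a non-zero return code instead.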
Examples
Running commands
import subprocess
subprocess.run('ls')
import subprocess
from subprocess import PIPE
result = subprocess.run(['touch', '/tmp/abc'])
print(result.stdout)  # None, because stdout was not captured
result = subprocess.run(['ls'], stdout=PIPE, text=True)
print(result.stdout)
cmd = ["python", "-c", "import time; time.sleep(3); print('hello')"]
result = subprocess.run(cmd, capture_output=True, text=True)
# capture_output=True => stdout = stderr = PIPE
print(result.stdout)
Using Popen directly is more flexible, and correspondingly more complex:
import sys
import subprocess
cmd = ["python", "-c", "import time; time.sleep(3); print('hello')"]
index = 0
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
# print(p.stdout.read())  # this line would block until the program finishes
while True:
    index += 1
    try:
        output, errors = p.communicate(timeout=0.1)
        print(output)
        break
    except subprocess.TimeoutExpired:
        print(str(index), end=' ')
        sys.stdout.flush()
This is only a demonstration. For truly asynchronous execution, Python 3 supports subprocesses on top of asyncio.
See also: AsyncIO, running commands asynchronously
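For reference, a minimal asyncio version might look like the sketch below. It uses asyncio.create_subprocess_exec from the standard library; the helper names run_async and main, and the idea of starting two children at once, are illustrative assumptions rather than code from the linked post.

```python
import asyncio
import sys


async def run_async(*args):
    """Run a command without blocking the event loop; return (returncode, stdout, stderr)."""
    proc = await asyncio.create_subprocess_exec(
        *args,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, stderr = await proc.communicate()
    return proc.returncode, stdout.decode(), stderr.decode()


async def main():
    cmd = [sys.executable, "-c", "import time; time.sleep(0.5); print('hello')"]
    # Both children run concurrently
    return await asyncio.gather(run_async(*cmd), run_async(*cmd))


results = asyncio.run(main())
print(results)
```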
Getting the output
import subprocess
output = subprocess.check_output(['ls'], text=True)
print(output)
Getting the return code
import subprocess
from subprocess import DEVNULL
try:
    subprocess.check_call(['ls', '/etc/abc'], stdout=DEVNULL, stderr=DEVNULL)
except subprocess.CalledProcessError as e:
    print(f'Command return code: {e.returncode}')
stdin/stdout/stderr
import subprocess
from subprocess import PIPE, STDOUT
process = subprocess.Popen(['grep', 'o'], stdin=PIPE, stdout=PIPE, stderr=STDOUT)
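process.stdin.write(b'Hello, World.\nWhats up?\nI am fine.\n3Q!\nAnd you?\n')
process.stdin.close()
# Read before wait(): waiting first can deadlock once the output exceeds the pipe capacity
output = process.stdout.read()
return_code = process.wait()
print(output)
# b'Hello, World.\nAnd you?\n'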
import subprocess
from subprocess import PIPE
result = subprocess.run(['ls', '-xyz'], stderr=PIPE)
print(repr(result.stderr.decode('utf-8')))
result = subprocess.run(['ls', '-xyz'], stderr=PIPE, text=True)
print(repr(result.stderr))
# "ls: invalid option -- 'y'\nTry 'ls --help' for more information.\n"
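The grep example earlier in this section writes to stdin by hand. The same exchange can be handed to run through its input argument, which feeds stdin and reads stdout through communicate(). In this sketch the child is a Python one-liner that acts as a stand-in for grep 'o', so that it does not depend on grep being installed.

```python
import subprocess
import sys

# Stand-in for grep: echo only the lines that contain "o"
child = [sys.executable, "-c", "import sys; sys.stdout.writelines(l for l in sys.stdin if 'o' in l)"]
text = "Hello, World.\nWhats up?\nI am fine.\n3Q!\nAnd you?\n"

# input= implies stdin=PIPE; capture_output=True sets stdout and stderr to PIPE
result = subprocess.run(child, input=text, capture_output=True, text=True)
print(repr(result.stdout))
```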
Pipelines
import subprocess
cmd = [
    ['tail', '-n10000', '/var/log/server.log'],
    ['grep', 'upload file'],
    ['awk', '{print $3}'],
]
stdin = None
for _cmd in cmd:
    proc = subprocess.Popen(_cmd, stdin=stdin, stdout=subprocess.PIPE, text=True)
    stdin = proc.stdout
output, error = proc.communicate()
print(output.strip())
print(error)  # None, because stderr was not captured
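A slightly more careful variant of the loop above closes the parent's copy of each intermediate pipe and collects every exit status. This is a sketch, not the original author's code: run_pipeline is a hypothetical helper, and the two demo stages are Python one-liners standing in for tail and grep.

```python
import subprocess
import sys


def run_pipeline(commands):
    """Chain commands like a shell pipeline; return (final_output, exit_codes)."""
    procs = []
    prev_stdout = None
    for args in commands:
        p = subprocess.Popen(args, stdin=prev_stdout, stdout=subprocess.PIPE)
        if prev_stdout is not None:
            # The child holds its own copy; closing ours lets the upstream
            # process see a broken pipe if the downstream one exits early
            prev_stdout.close()
        prev_stdout = p.stdout
        procs.append(p)
    output, _ = procs[-1].communicate()
    exit_codes = [p.wait() for p in procs]
    return output, exit_codes


producer = [sys.executable, "-c", "print('apple'); print('banana'); print('cherry')"]
keep_an = [sys.executable, "-c", "import sys; sys.stdout.writelines(l for l in sys.stdin if 'an' in l)"]
output, exit_codes = run_pipeline([producer, keep_an])
print(output, exit_codes)
```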
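Attributes and methods
PIPE
Value -1. A pipe: connects the child's standard input, standard output, or standard error to the parent so it can be written or captured.
stdin, stdout and stderr default to None, meaning nothing is redirected and the child uses the normal standard input, standard output and standard error.
STDOUT
Value -2. Meant for stderr: captures standard error and merges it into standard output, like 2>&1.
DEVNULL
Value -3. The null device: discards standard input, standard output, or standard error (redirects to /dev/null).
Popen
Python wrapper around a process.
CompletedProcess
Result of a finished run: the command and its arguments, the return code, standard output and standard error.
SubprocessError
Base class for the other exceptions in this module.
CalledProcessError
Raised when the command exits with a non-zero status and check=True was passed.
TimeoutExpired
Raised when waiting for the child process times out.
list2cmdline
Joins an argument list into a single command string.
The standard library has another module that can do this: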
import shlex
cmd = 'grep -F "hello world" /tmp/abc.log'
args = shlex.split(cmd)
print(args)
# ['grep', '-F', 'hello world', '/tmp/abc.log']
print(shlex.join(args))
# grep -F 'hello world' /tmp/abc.log
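shlex also helps with the kind of command string used in the rsync example, where paths are interpolated into a string and run with shell=True. The sketch below is an illustration only; the source path containing a space is a hypothetical value chosen to show why quoting matters.

```python
import shlex

src_dir = "/tmp/my files/"  # hypothetical path containing a space
tgt_dir = "/tmp/bb/"

# As a string for shell=True: quote every interpolated value
command = "rsync -av %s %s" % (shlex.quote(src_dir), shlex.quote(tgt_dir))

# As a list for shell=False: no quoting is needed at all
args = ["rsync", "-av", src_dir, tgt_dir]

print(command)
print(shlex.split(command) == args)
```

Passing the list form to Popen or run avoids the shell entirely, which is usually the simpler choice when no shell features are needed.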
call
Runs the command, waits for it to finish, and returns the return code
def call(*popenargs, timeout=None, **kwargs):
    with Popen(*popenargs, **kwargs) as p:
        try:
            return p.wait(timeout=timeout)
        except:
            p.kill()
            raise
check_call
A wrapper around call that raises CalledProcessError on a non-zero status
def check_call(*popenargs, **kwargs):
    retcode = call(*popenargs, **kwargs)
    if retcode:
        cmd = kwargs.get("args")
        if cmd is None:
            cmd = popenargs[0]
        raise CalledProcessError(retcode, cmd)
    return 0
check_output
A wrapper around run that returns stdout and raises CalledProcessError on a non-zero status
def check_output(*popenargs, timeout=None, **kwargs):
    for kw in ('stdout', 'check'):
        if kw in kwargs:
            raise ValueError(f'{kw} argument not allowed, it will be overridden.')
    if 'input' in kwargs and kwargs['input'] is None:
        if kwargs.get('universal_newlines') or kwargs.get('text') or kwargs.get('encoding') or kwargs.get('errors'):
            empty = ''
        else:
            empty = b''
        kwargs['input'] = empty
    return run(*popenargs, stdout=PIPE, timeout=timeout, check=True, **kwargs).stdout
getoutput
A wrapper around getstatusoutput that returns only the output
def getoutput(cmd, *, encoding=None, errors=None):
    return getstatusoutput(cmd, encoding=encoding, errors=errors)[1]
getstatusoutput
Runs the program and returns the status code and the output (stdout + stderr)
def getstatusoutput(cmd, *, encoding=None, errors=None):
    try:
        data = check_output(cmd, shell=True, text=True, stderr=STDOUT,
                            encoding=encoding, errors=errors)
        exitcode = 0
    except CalledProcessError as ex:
        data = ex.output
        exitcode = ex.returncode
    if data[-1:] == '\n':
        data = data[:-1]
    return exitcode, data
run
Runs the command through Popen.communicate and returns a CompletedProcess
def run(*popenargs, input=None, capture_output=False, timeout=None, check=False, **kwargs):
    if input is not None:
        if kwargs.get('stdin') is not None:
            raise ValueError('stdin and input arguments may not both be used.')
        kwargs['stdin'] = PIPE
    if capture_output:
        if kwargs.get('stdout') is not None or kwargs.get('stderr') is not None:
            raise ValueError('stdout and stderr arguments may not be used with capture_output.')
        kwargs['stdout'] = PIPE
        kwargs['stderr'] = PIPE
    with Popen(*popenargs, **kwargs) as process:
        try:
            stdout, stderr = process.communicate(input, timeout=timeout)
        except TimeoutExpired as exc:
            process.kill()
            if _mswindows:
                exc.stdout, exc.stderr = process.communicate()
            else:
                process.wait()
            raise
        except:
            process.kill()
            raise
        retcode = process.poll()
        if check and retcode:
            raise CalledProcessError(retcode, process.args, output=stdout, stderr=stderr)
    return CompletedProcess(process.args, retcode, stdout, stderr)
Wrapping relationships:
Popen -> call -> check_call
Popen -> run -> check_output -> getstatusoutput -> getoutput
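The two chains can be checked with a quick experiment: helpers on the same chain should agree with each other for the same command. A small sketch, using a Python one-liner as the command and variable names of my own choosing:

```python
import subprocess
import sys
from subprocess import DEVNULL, PIPE

args = [sys.executable, "-c", "print('hi')"]

# Popen -> run -> check_output
via_run = subprocess.run(args, stdout=PIPE, check=True, text=True).stdout
via_check_output = subprocess.check_output(args, text=True)

# Popen -> call -> check_call
status_via_call = subprocess.call(args, stdout=DEVNULL)
status_via_check_call = subprocess.check_call(args, stdout=DEVNULL)

print(repr(via_run), repr(via_check_output), status_via_call, status_via_check_call)
```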
Popen
https://github.com/python/cpython/blob/master/Lib/subprocess.py
import os
import signal
import sys
import warnings
class Popen:
    def __init__(self, args, bufsize=-1, executable=None,
                 stdin=None, stdout=None, stderr=None,
                 preexec_fn=None, close_fds=True,
                 shell=False, cwd=None, env=None, universal_newlines=None,
                 startupinfo=None, creationflags=0,
                 restore_signals=True, start_new_session=False,
                 pass_fds=(), *, user=None, group=None, extra_groups=None,
                 encoding=None, errors=None, text=None, umask=-1, pipesize=-1,
                 process_group=None):
        # Creates a child process and runs a system command
        # args: the command and its arguments
        # bufsize
        # executable: replacement program to execute
        # stdin
        # stdout
        # stderr
        # preexec_fn
        # close_fds
        # shell
        # cwd
        # env
        # universal_newlines
        # startupinfo
        # creationflags
        # restore_signals
        # start_new_session
        # pass_fds
        # user
        # group
        # extra_groups
        # encoding
        # errors
        # text
        #   If text and universal_newlines are both given with different values, SubprocessError is raised
        #   self.text_mode = encoding or errors or text or universal_newlines
        # umask
        # pipesize
        # process_group
        pass
    def __repr__(self): pass
    @property
    def universal_newlines(self):
        return self.text_mode
    @universal_newlines.setter
    def universal_newlines(self, universal_newlines):
        self.text_mode = bool(universal_newlines)
    def __enter__(self):
        return self
    def __exit__(self, exc_type, value, traceback): pass
    def __del__(self, _maxsize=sys.maxsize, _warn=warnings.warn): pass
    def communicate(self, input=None, timeout=None): pass  # returns (stdout, stderr)
    def poll(self): pass  # checks whether the process has ended; returns the status code, or None if still running
    def wait(self, timeout=None): pass  # waits for the process to end; returns the status code
    def send_signal(self, sig):
        self.poll()
        if self.returncode is not None:  # the program has already finished
            return
        try:  # guard against the process disappearing concurrently
            os.kill(self.pid, sig)
        except ProcessLookupError:
            pass
    def terminate(self):  # ask the process to stop, -15
        self.send_signal(signal.SIGTERM)
    def kill(self):  # force-kill the process, -9
        self.send_signal(signal.SIGKILL)
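terminate and kill are often combined: ask politely first, then force the issue if the child does not exit. A minimal sketch of that pattern, assuming a POSIX system; stop_process and the 2 second grace period are illustrative choices, and the sleeping child is just a stand-in for a long-running command.

```python
import subprocess
import sys


def stop_process(p, grace=2.0):
    """Send SIGTERM, then escalate to SIGKILL if the child is still alive after grace seconds."""
    p.terminate()
    try:
        return p.wait(timeout=grace)
    except subprocess.TimeoutExpired:
        p.kill()
        return p.wait()


child = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(60)"])
returncode = stop_process(child)
print(returncode)
```

On POSIX a negative return code means the child was ended by that signal number, which matches the -15 and -9 noted in the comments above.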