TOC

Subprocess Popen

如果命令不存在就会报:

FileNotFoundError: [Errno 2] No such file or directory: 'pythonjit'

示例

运行命令

import subprocess
subprocess.run('ls')
import subprocess
from subprocess import PIPE

result = subprocess.run(['touch', '/tmp/abc'])
print(result.stdout)

result = subprocess.run(['ls'], stdout=PIPE, text=True)
print(result.stdout)

cmd = ["python", "-c", "import time; time.sleep(3); print('hello')"]
result = subprocess.run(cmd, capture_output=True, text=True)
# capture_output=True => stdout = stderr = PIPE
print(result.stdout)

直接使用 Popen,灵活一些,也就复杂一些:

import sys
import subprocess
cmd = ["python", "-c", "import time; time.sleep(3); print('hello')"]
index = 0
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
# print(p.stdout.read())  # 这一句会阻塞,直到程序执行完成
while True:
    index += 1
    try:
        output, errors = p.communicate(timeout=0.1)
        print(output)
        break
    except subprocess.TimeoutExpired:
        print(str(index), end=' ')
        sys.stdout.flush()

只是一个演示,如果需要异步执行的话,Python 3 支持基于 asyncio 的 subprocess。
参见:AsyncIO 异步执行命令

获取输出

import subprocess
output = subprocess.check_output(['ls'], text=True)
print(output)

获取返回代码

import subprocess
from subprocess import DEVNULL
try:
    subprocess.check_call(['ls', '/etc/abc'], stdout=DEVNULL, stderr=DEVNULL)
except subprocess.CalledProcessError as e:
    print(f'命令返回代码: {e.returncode}')

stdin/stdout/stderr

import subprocess
from subprocess import PIPE, STDOUT
process = subprocess.Popen(['grep', 'o'], stdin=PIPE, stdout=PIPE, stderr=STDOUT)
process.stdin.write(b'Hello, World.\nWhats up?\nI am fine.\n3Q!\nAnd you?\n')
process.stdin.close()
return_code = process.wait()
print(process.stdout.read())
# b'Hello, World.\nAnd you?\n'
import subprocess
from subprocess import PIPE

result = subprocess.run(['ls', '-xyz'], stderr=PIPE)
print(repr(result.stderr.decode('utf-8')))

result = subprocess.run(['ls', '-xyz'], stderr=PIPE, text=True)
print(repr(result.stderr))
# 'ls: 不适用的选项 -- y\n请尝试执行 "ls --help" 来获取更多信息。\n'

管道

import subprocess
cmd = [
    ['tail', '-n10000', '/var/log/server.log'],
    ['grep', 'upload file'],
    ['awk', '{print $3}'],
]
stdin = None
for _cmd in cmd:
    proc = subprocess.Popen(_cmd, stdin=stdin, stdout=subprocess.PIPE, text=True)
    stdin = proc.stdout
output, error = proc.communicate()
print(output.strip())
print(error)

属性和方法

  • PIPE -1,管道,表示捕获标准输入,或标准输出,或标准错误
    默认 stdin,stdout,stderr 是 None,也就是说没有配置,使用正常的标准输入,标准输出,标准错误。
  • STDOUT -2,为标准错误准备,表示捕获标准错误,将其和标准输出混在一起,相当于 2>&1
  • DEVNULL -3,空设备,表示丢弃标准输入,或标准输出,或标准错误(重定向到 /dev/null

  • Popen 进程的 Python 封装

  • CompletedProcess 程序执行结果,包含命令,参数,返回吗,标准输出,标准错误
  • SubprocessError

  • CalledProcessError 如果得到错误状态码(非零),并且执行参数中 check=True

  • TimeoutExpired 等待子进程超时

  • list2cmdline 将命令列表组合一个完整命令

其实标准库中有另一个库可以做这个事情:

import shlex
cmd = 'grep -F "hello world" /tmp/abc.log'
args = shlex.split(cmd)
print(args)
# ['grep', '-F', 'hello world', '/tmp/abc.log']
print(shlex.join(args))
# grep -F 'hello world' /tmp/abc.log
  • call 极简版本
def call(*popenargs, timeout=None, **kwargs):
    with Popen(*popenargs, **kwargs) as p:
        try:
            return p.wait(timeout=timeout)
        except:
            p.kill()
            raise
  • check_call call 的封装,如果得到错误状态,就抛出 CalledProcessError
def check_call(*popenargs, **kwargs):
    retcode = call(*popenargs, **kwargs)
    if retcode:
        cmd = kwargs.get("args")
        if cmd is None:
            cmd = popenargs[0]
        raise CalledProcessError(retcode, cmd)
    return 0
  • check_output 运行程序,获取标准输出
def check_output(*popenargs, timeout=None, **kwargs):
    for kw in ('stdout', 'check'):
        if kw in kwargs:
            raise ValueError(f'{kw} argument not allowed, it will be overridden.')
    if 'input' in kwargs and kwargs['input'] is None:
        if kwargs.get('universal_newlines') or kwargs.get('text') or kwargs.get('encoding') or kwargs.get('errors'):
            empty = ''
        else:
            empty = b''
        kwargs['input'] = empty
    return run(*popenargs, stdout=PIPE, timeout=timeout, check=True, **kwargs).stdout
  • getoutput 运行程序,获取输出
def getoutput(cmd, *, encoding=None, errors=None):
    return getstatusoutput(cmd, encoding=encoding, errors=errors)[1]
  • getstatusoutput 运行程序,获取状态码和输出(stdout + stderr)
def getstatusoutput(cmd, *, encoding=None, errors=None):
    try:
        data = check_output(cmd, shell=True, text=True, stderr=STDOUT,
                            encoding=encoding, errors=errors)
        exitcode = 0
    except CalledProcessError as ex:
        data = ex.output
        exitcode = ex.returncode
    if data[-1:] == '\n':
        data = data[:-1]
    return exitcode, data
  • run 运行程序
def run(*popenargs, input=None, capture_output=False, timeout=None, check=False, **kwargs):
    if input is not None:
        if kwargs.get('stdin') is not None:
            raise ValueError('stdin and input arguments may not both be used.')
        kwargs['stdin'] = PIPE
    if capture_output:
        if kwargs.get('stdout') is not None or kwargs.get('stderr') is not None:
            raise ValueError('stdout and stderr arguments may not be used with capture_output.')
        kwargs['stdout'] = PIPE
        kwargs['stderr'] = PIPE
    with Popen(*popenargs, **kwargs) as process:
        try:
            stdout, stderr = process.communicate(input, timeout=timeout)
        except TimeoutExpired as exc:
            process.kill()
            if _mswindows:
                exc.stdout, exc.stderr = process.communicate()
            else:
                process.wait()
            raise
        except:
            process.kill()
            raise
        retcode = process.poll()
        if check and retcode:
            raise CalledProcessError(retcode, process.args, output=stdout, stderr=stderr)
    return CompletedProcess(process.args, retcode, stdout, stderr)

封装关系:

Popen -> call -> check_call
Popen -> run -> check_output -> getstatusoutput -> getoutput

Popen

https://github.com/python/cpython/blob/master/Lib/subprocess.py

class Popen:
    def __init__(self, args, bufsize=-1, executable=None,
                 stdin=None, stdout=None, stderr=None,
                 preexec_fn=None, close_fds=True,
                 shell=False, cwd=None, env=None, universal_newlines=None,
                 startupinfo=None, creationflags=0,
                 restore_signals=True, start_new_session=False,
                 pass_fds=(), *, user=None, group=None, extra_groups=None,
                 encoding=None, errors=None, text=None, umask=-1, pipesize=-1,
                 process_group=None): pass
        # 创建子进程,执行系统命令
        # args          命令和参数
        # bufsize
        # executable    替代
        # stdin
        # stdout
        # stderr
        # preexec_fn
        # close_fds
        # shell
        # cwd
        # env
        # universal_newlines
        # startupinfo
        # creationflags
        # restore_signals
        # start_new_session
        # pass_fds
        # user
        # group
        # extra_groups
        # encoding
        # errors
        # text
        #   如果指定不一样的 text 和 universal_newlines 值,则抛出 SubprocessError
        #   self.text_mode = encoding or errors or text or universal_newlines
        # umask
        # pipesize
        # process_group

    def __repr__(self): pass

    @property
    def universal_newlines(self):
        return self.text_mode

    @universal_newlines.setter
    def universal_newlines(self, universal_newlines):
        self.text_mode = bool(universal_newlines)

    def __enter__(self):
        return self
    def __exit__(self, exc_type, value, traceback): pass
    def __del__(self, _maxsize=sys.maxsize, _warn=warnings.warn): pass

    def communicate(self, input=None, timeout=None) -> (stdout, stderr): pass
    def poll(self) -> None|returncode: pass  # 检查进程是否结束,返回状态码或 None(没有结束)
    def wait(self, timeout=None) -> returncode: pass  # 等待进程结束,返回状态码
    def send_signal(self, sig):
        self.poll()
        if self.returncode is not None:  # 程序已经执行结束
            return
        try:  # 防止并发杀进程
            os.kill(self.pid, sig)
        except ProcessLookupError:
            pass
    def terminate(self):  # 结束进程 -15
        self.send_signal(signal.SIGTERM)
    def kill(self):  # 强杀进程 -9
        self.send_signal(signal.SIGKILL)