#39 Python 项目结构图
Python Pyreverse UML graphviz dot 2018-01-30常用的代码风格检查工具 pylint 中集成了 pyreverse, 这是一个用于生成 Python 的类图的工具。
coding in a complicated world
常用的代码风格检查工具 pylint 中集成了 pyreverse, 这是一个用于生成 Python 的类图的工具。
import socket
import signal
import sys
import os
def handle_connection(conn):
conn.close()
def worker(sock):
while True:
try:
conn, addr = sock.accept()
handle_connection(conn)
except OSError as e:
if e.errno == socket.ECONNABORTED:
# 忽略 ECONNABORTED 错误
pass
else:
raise
def main():
port = 8080
backlog = 10 # 连接队列长度(超出会拒绝或忽略)
num_workers = 4 # 子进程数
# 创建监听器
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
listener.bind(('localhost', port))
listener.listen(backlog)
sock.setblocking(False)
print(f"Listening on port {port}...")
# # “来一个连接,起一个进程”的模式
# def sig_handler(sig, frame):
# listener.close()
# sys.exit(0)
# signal.signal(signal.SIGINT, sig_handler)
# signal.signal(signal.SIGTERM, sig_handler)
# while True:
# conn, addr = listener.accept()
# pid = os.fork()
# if pid == 0:
# listener.close()
# handle_connection(conn)
# sys.exit(0)
# else:
# conn.close()
# 子进程放到进程组中
os.setpgrp()
# 多个 worker 子进程一同监听端口的模式
processes = []
for i in range(num_workers):
p = Process(target=worker, args=(sock,))
processes.append(p)
p.start()
# 通过 os.killpg 向进程组发送信号
signal.signal(signal.SIGINT, lambda signum, frame: os.killpg(0, signal.SIGINT))
signal.signal(signal.SIGTERM, lambda signum, frame: os.killpg(0, signal.SIGTERM))
signal.pause()
for p in processes:
p.terminate()
if __name__ == '__main__':
main()
惊群效应是指事件发生的时候,多个进程或线程竞争处理这个事件,导致系统负载出现一个尖峰。严重的情况下可能导致系统瘫痪。
虽然 accept 会阻塞住,只有一个抢到,但是惊群的问题应该还是存在。
os.setpgrp
设置进程组os.killpg
向进程组发送信号,如果没有设置进程组,这个操作没有意义os.set_inheritable
继承文件描述符,然后可以独立使用和关闭标记所有当前正在使用的对象,然后清除所有未被标记的对象来回收内存。
这个算法的缺点是当内存中有大量未被标记的垃圾时,清除过程可能会变得非常缓慢。
将对象分为三代:
不同代执行不同的标记策略。
Python 中有一些内置的工具可以用于检查内存使用、内存分配情况以及各类对象数量的统计,下面列举一些常用的方法:
@profile
装饰器 + -m memory_profiler
参数,输出每个代码行的内存占用情况,以及整个程序的内存使用情况。objgraph.show_refs()
对象引用图objgraph.show_most_common_types()
对象数量统计gc.collect()
手动触发垃圾回收gc.get_count()
垃圾回收相关信息手动释放内存:
如果命令不存在就会报:
FileNotFoundError: [Errno 2] No such file or directory: 'pythonjit'
import subprocess
subprocess.run('ls')
import subprocess
from subprocess import PIPE
result = subprocess.run(['touch', '/tmp/abc'])
print(result.stdout)
result = subprocess.run(['ls'], stdout=PIPE, text=True)
print(result.stdout)
cmd = ["python", "-c", "import time; time.sleep(3); print('hello')"]
result = subprocess.run(cmd, capture_output=True, text=True)
# capture_output=True => stdout = stderr = PIPE
print(result.stdout)
直接使用 Popen,灵活一些,也就复杂一些:
import sys
import subprocess
cmd = ["python", "-c", "import time; time.sleep(3); print('hello')"]
index = 0
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
# print(p.stdout.read()) # 这一句会阻塞,直到程序执行完成
while True:
index += 1
try:
output, errors = p.communicate(timeout=0.1)
print(output)
break
except subprocess.TimeoutExpired:
print(str(index), end=' ')
sys.stdout.flush()
只是一个演示,如果需要异步执行的话,Python 3 支持基于 asyncio 的 subprocess。
参见:AsyncIO 异步执行命令
import subprocess
output = subprocess.check_output(['ls'], text=True)
print(output)
import subprocess
from subprocess import DEVNULL
try:
subprocess.check_call(['ls', '/etc/abc'], stdout=DEVNULL, stderr=DEVNULL)
except subprocess.CalledProcessError as e:
print(f'命令返回代码: {e.returncode}')
import subprocess
from subprocess import PIPE, STDOUT
process = subprocess.Popen(['grep', 'o'], stdin=PIPE, stdout=PIPE, stderr=STDOUT)
process.stdin.write(b'Hello, World.\nWhats up?\nI am fine.\n3Q!\nAnd you?\n')
process.stdin.close()
return_code = process.wait()
print(process.stdout.read())
# b'Hello, World.\nAnd you?\n'
import subprocess
from subprocess import PIPE
result = subprocess.run(['ls', '-xyz'], stderr=PIPE)
print(repr(result.stderr.decode('utf-8')))
result = subprocess.run(['ls', '-xyz'], stderr=PIPE, text=True)
print(repr(result.stderr))
# 'ls: 不适用的选项 -- y\n请尝试执行 "ls --help" 来获取更多信息。\n'
import subprocess
cmd = [
['tail', '-n10000', '/var/log/server.log'],
['grep', 'upload file'],
['awk', '{print $3}'],
]
stdin = None
for _cmd in cmd:
proc = subprocess.Popen(_cmd, stdin=stdin, stdout=subprocess.PIPE, text=True)
stdin = proc.stdout
output, error = proc.communicate()
print(output.strip())
print(error)
PIPE
-1,管道,表示捕获标准输入,或标准输出,或标准错误STDOUT
-2,为标准错误准备,表示捕获标准错误,将其和标准输出混在一起,相当于 2>&1
DEVNULL
-3,空设备,表示丢弃标准输入,或标准输出,或标准错误(重定向到 /dev/null
)
Popen
进程的 Python 封装
CompletedProcess
程序执行结果,包含命令,参数,返回吗,标准输出,标准错误SubprocessError
CalledProcessError
如果得到错误状态码(非零),并且执行参数中 check=True
TimeoutExpired
等待子进程超时
list2cmdline
将命令列表组合一个完整命令
其实标准库中有另一个库可以做这个事情:
import shlex
cmd = 'grep -F "hello world" /tmp/abc.log'
args = shlex.split(cmd)
print(args)
# ['grep', '-F', 'hello world', '/tmp/abc.log']
print(shlex.join(args))
# grep -F 'hello world' /tmp/abc.log
call
极简版本def call(*popenargs, timeout=None, **kwargs):
with Popen(*popenargs, **kwargs) as p:
try:
return p.wait(timeout=timeout)
except:
p.kill()
raise
check_call
call 的封装,如果得到错误状态,就抛出 CalledProcessErrordef check_call(*popenargs, **kwargs):
retcode = call(*popenargs, **kwargs)
if retcode:
cmd = kwargs.get("args")
if cmd is None:
cmd = popenargs[0]
raise CalledProcessError(retcode, cmd)
return 0
check_output
运行程序,获取标准输出def check_output(*popenargs, timeout=None, **kwargs):
for kw in ('stdout', 'check'):
if kw in kwargs:
raise ValueError(f'{kw} argument not allowed, it will be overridden.')
if 'input' in kwargs and kwargs['input'] is None:
if kwargs.get('universal_newlines') or kwargs.get('text') or kwargs.get('encoding') or kwargs.get('errors'):
empty = ''
else:
empty = b''
kwargs['input'] = empty
return run(*popenargs, stdout=PIPE, timeout=timeout, check=True, **kwargs).stdout
getoutput
运行程序,获取输出def getoutput(cmd, *, encoding=None, errors=None):
return getstatusoutput(cmd, encoding=encoding, errors=errors)[1]
getstatusoutput
运行程序,获取状态码和输出(stdout + stderr)def getstatusoutput(cmd, *, encoding=None, errors=None):
try:
data = check_output(cmd, shell=True, text=True, stderr=STDOUT,
encoding=encoding, errors=errors)
exitcode = 0
except CalledProcessError as ex:
data = ex.output
exitcode = ex.returncode
if data[-1:] == '\n':
data = data[:-1]
return exitcode, data
run
运行程序def run(*popenargs, input=None, capture_output=False, timeout=None, check=False, **kwargs):
if input is not None:
if kwargs.get('stdin') is not None:
raise ValueError('stdin and input arguments may not both be used.')
kwargs['stdin'] = PIPE
if capture_output:
if kwargs.get('stdout') is not None or kwargs.get('stderr') is not None:
raise ValueError('stdout and stderr arguments may not be used with capture_output.')
kwargs['stdout'] = PIPE
kwargs['stderr'] = PIPE
with Popen(*popenargs, **kwargs) as process:
try:
stdout, stderr = process.communicate(input, timeout=timeout)
except TimeoutExpired as exc:
process.kill()
if _mswindows:
exc.stdout, exc.stderr = process.communicate()
else:
process.wait()
raise
except:
process.kill()
raise
retcode = process.poll()
if check and retcode:
raise CalledProcessError(retcode, process.args, output=stdout, stderr=stderr)
return CompletedProcess(process.args, retcode, stdout, stderr)
封装关系:
Popen -> call -> check_call
Popen -> run -> check_output -> getstatusoutput -> getoutput
https://github.com/python/cpython/blob/master/Lib/subprocess.py
class Popen:
def __init__(self, args, bufsize=-1, executable=None,
stdin=None, stdout=None, stderr=None,
preexec_fn=None, close_fds=True,
shell=False, cwd=None, env=None, universal_newlines=None,
startupinfo=None, creationflags=0,
restore_signals=True, start_new_session=False,
pass_fds=(), *, user=None, group=None, extra_groups=None,
encoding=None, errors=None, text=None, umask=-1, pipesize=-1,
process_group=None): pass
# 创建子进程,执行系统命令
# args 命令和参数
# bufsize
# executable 替代
# stdin
# stdout
# stderr
# preexec_fn
# close_fds
# shell
# cwd
# env
# universal_newlines
# startupinfo
# creationflags
# restore_signals
# start_new_session
# pass_fds
# user
# group
# extra_groups
# encoding
# errors
# text
# 如果指定不一样的 text 和 universal_newlines 值,则抛出 SubprocessError
# self.text_mode = encoding or errors or text or universal_newlines
# umask
# pipesize
# process_group
def __repr__(self): pass
@property
def universal_newlines(self):
return self.text_mode
@universal_newlines.setter
def universal_newlines(self, universal_newlines):
self.text_mode = bool(universal_newlines)
def __enter__(self):
return self
def __exit__(self, exc_type, value, traceback): pass
def __del__(self, _maxsize=sys.maxsize, _warn=warnings.warn): pass
def communicate(self, input=None, timeout=None) -> (stdout, stderr): pass
def poll(self) -> None|returncode: pass # 检查进程是否结束,返回状态码或 None(没有结束)
def wait(self, timeout=None) -> returncode: pass # 等待进程结束,返回状态码
def send_signal(self, sig):
self.poll()
if self.returncode is not None: # 程序已经执行结束
return
try: # 防止并发杀进程
os.kill(self.pid, sig)
except ProcessLookupError:
pass
def terminate(self): # 结束进程 -15
self.send_signal(signal.SIGTERM)
def kill(self): # 强杀进程 -9
self.send_signal(signal.SIGKILL)
这里是指在本地执行命令的方法,通过 SSH 的方式这里不做讨论。
其他项目:
PyCrypto Crypto
曾经非常著名的项目,后来没有维护了,推荐后面两个。
PyCryptodome Crypto
从 PyCrypto 分叉出来的项目,继续维护中。
Cryptodome
Cryptodome
做包名,方便和老的 PyCrypto 共存。Cryptography cryptography
算 Python 官方项目(PyCA Python Cryptographic Authority)。
pip install virtualenvwrapper
把以下内容加入 ~/.bashrc
,~/.zshrc
等 Shell 初始化代码中:
export WORKON_HOME="~/.virtualenvs"
export VIRTUALENVWRAPPER_PYTHON="/usr/bin/python3"
source virtualenvwrapper.sh
source virtualenvwrapper_lazy.sh
也可以,只是不会自动补全了。/usr/local/bin/
目录,或者 ~/.local/bin/
目录,所以可以直接 source 加载。# 创建并进入虚拟环境
mkvirtualenv myproject
# 进入指定虚拟环境
workon myproject
# 退出虚拟环境
deactivate
# 列出虚拟环境
workon
# 删除虚拟环境
rmvirtualenv myproject
virtualenvwrapper
virtualenvwrapper is a set of extensions to Ian Bicking's virtualenv
tool. The extensions include wrappers for creating and deleting
virtual environments and otherwise managing your development workflow,
making it easier to work on more than one project at a time without
introducing conflicts in their dependencies.
For more information please refer to the documentation:
http://virtualenvwrapper.readthedocs.org/en/latest/command_ref.html
Commands available:
add2virtualenv: add directory to the import path
allvirtualenv: run a command in all virtualenvs
cdproject: change directory to the active project
cdsitepackages: change to the site-packages directory
cdvirtualenv: change to the $VIRTUAL_ENV directory
cpvirtualenv: duplicate the named virtualenv to make a new one
lssitepackages: list contents of the site-packages directory
lsvirtualenv: list virtualenvs
mkproject: create a new project directory and its associated virtualenv
mktmpenv: create a temporary virtualenv
mkvirtualenv: Create a new virtualenv in $WORKON_HOME
rmvirtualenv: Remove a virtualenv
setvirtualenvproject: associate a project directory with a virtualenv
showvirtualenv: show details of a single virtualenv
toggleglobalsitepackages: turn access to global site-packages on/off
virtualenvwrapper: show this help message
wipeenv: remove all packages installed in the current virtualenv
workon: list or change working virtualenvs
import logging
LOG_LEVEL = logging.DEBUG
LOG_FORMAT = '%(asctime)s %(levelname)s %(message)s'
logging.basicConfig(level=LOG_LEVEL, format=LOG_FORMAT)
logging.info('hello world')
默认时间格式是 yyyy-mm-dd hh:mm:ss,xxx
,如果我们要改这个格式可以用 datefmt
参数,遵循 time.strftime
的格式化参数格式。
问题有一个,毫秒从哪里来?
最简单的办法:在格式字符串中使用 msecs 占位符,比如:
import logging
LOG_LEVEL = logging.DEBUG
LOG_FORMAT = '%(asctime)s.%(msecs)03d %(levelname)s %(message)s'
LOG_DATEFMT = '%H:%M:%S'
logging.basicConfig(level=LOG_LEVEL, format=LOG_FORMAT, datefmt=LOG_DATEFMT)
logging.info('hello world')
import logging
from datetime import datetime
class MyFormatter(logging.Formatter):
converter = datetime.fromtimestamp
def formatTime(self, record, datefmt=None):
ct = self.converter(record.created)
if datefmt:
# s = time.strftime(datefmt, ct)
s = ct.strftime(datefmt)
else:
# t = time.strftime("%Y-%m-%d %H:%M:%S", ct)
t = ct.strftime("%Y-%m-%d %H:%M:%S")
s = "%s,%03d" % (t, record.msecs)
return s
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
console = logging.StreamHandler()
logger.addHandler(console)
formatter = MyFormatter(fmt='%(asctime)s %(levelname)s %(message)s', datefmt='%H:%M:%S.%f')
console.setFormatter(formatter)
logging.info('hello world')
help
help <action>
add <name> […]
remove <name> […]
update
update all
update <gname> […]
clear <name>
clear <name> <name>
clear all
fg <process>
pid
pid <name>
pid all
reload
reread
restart <name>
不会重新加载配置文件restart <gname>:*
restart <name> <name>
restart all
signal
start <name>
start <gname>:*
start <name> <name>
start all
status
status <name>
status <name> <name>
stop <name>
stop <gname>:*
stop <name> <name>
stop all
tail [-f] <name> [stdout|stderr]
(default stdout)