#222 Linux 检查表达式
Linux 2017-12-06比较独特的命令 [
coding in a complicated world
比较独特的命令 [
几处小的知识点
<!doctype html>
<html lang="en" xmlns="https://www.w3.org/1999/xhtml">
<head>
<meta charset="utf-8" />
<title></title>
</head>
<body style="width:1000px;margin:10px auto;">
<svg
id="example"
width="1000"
height="800"
xmlns="https://www.w3.org/2000/svg"
>
<!-- 画个框框当边界 -->
<rect
x="0"
y="0"
width="100%"
height="100%"
fill="none"
stroke="rgb(99,99,99)"
stroke-width="2"
/>
<circle
id="circle1"
class="dragable"
cx="100"
cy="100"
r="30"
fill="red"
fill-opacity="0.7"
/>
<circle
id="circle2"
class="dragable"
cx="200"
cy="100"
r="30"
fill="yellow"
fill-opacity="0.7"
/>
<circle
id="circle3"
class="dragable"
cx="300"
cy="100"
r="30"
fill="blue"
fill-opacity="0.7"
/>
</svg>
</body>
<script>
var svg = document.getElementById("example"),
target = null,
startPoint = svg.createSVGPoint(),
curPoint = svg.createSVGPoint();
function refreshPoint(event) {
curPoint.x = event.clientX;
curPoint.y = event.clientY;
}
var dragables = document.getElementsByClassName("dragable");
for (var i = 0; i < dragables.length; i++) {
var dragable = dragables[i];
dragable.addEventListener("mousedown", function (event) {
target = event.target;
refreshPoint(event);
var matrix = target.getCTM();
startPoint.x = curPoint.x - matrix.e;
startPoint.y = curPoint.y - matrix.f;
});
}
svg.addEventListener("mousemove", function (event) {
if (!target) return;
refreshPoint(event);
var newX = curPoint.x - startPoint.x;
var newY = curPoint.y - startPoint.y;
target.setAttributeNS(
null,
"transform",
"translate(" + newX + "," + newY + ")",
);
});
svg.addEventListener("mouseup", function (event) {
if (!target) return;
target = null;
});
</script>
</html>
xmlnsviewBoxrectcircleellipselinepolygonpolylinepathstroke: 边框颜色stroke-width: 边框宽度fill: 填充颜色style<!doctype html>
<html lang="en">
<head>
<meta charset="UTF-8" />
<title>Document</title>
</head>
<body style="background-color:beige;width:800px;margin:0 auto;">
<svg id="example" xmlns="https://www.w3.org/2000/svg" viewBox="0 0 800 800">
<!--
-->
<rect
x="0"
y="0"
width="100%"
height="100%"
fill="none"
stroke="rgb(99,99,99)"
stroke-width="2"
/>
<line
x1="800"
y1="0"
x2="0"
y2="800"
style="stroke:rgb(99,99,99);stroke-width:2"
/>
<line
x1="0"
y1="0"
x2="800"
y2="800"
style="stroke:rgb(99,99,99);stroke-width:2"
/>
<circle cx="400" cy="400" r="100" />
<ellipse cx="400" cy="150" rx="120" ry="60" />
<polygon points="250,300 130,520 50,350" />
<polygon points="80,40 120,60 80,80 60,120 40,80 0,60 40,40 60,0 80,40" />
<polyline
points="150,750 170,800 190,750 210,800 230,750"
fill="none"
stroke="rgb(99,99,99)"
stroke-width="2"
/>
<rect x="550" y="325" width="200" height="150" />
<text
xml:space="preserve"
text-anchor="start"
font-family="Helvetica, Arial, sans-serif"
font-size="24"
x="400"
y="600"
fill-opacity="null"
stroke-opacity="null"
stroke-width="0"
stroke="#000"
fill="#000000"
>
@
</text>
<!--
https://editor.method.ac/#move_front
M = moveto
L = lineto
H = horizontal lineto
V = vertical lineto
C = curveto
S = smooth curveto
Q = quadratic Belzier curve
T = smooth quadratic Belzier curveto
A = elliptical Arc
Z = closepath
-->
<path
d="m395.5,698c0,0 1,0 4,0c4,0 8.29361,2.7265 12,7c3.276,3.77728 6.54135,7.7027 8,11c1.66803,3.77063 1.79395,8.08746 4,13c1.83203,4.07965 3,6 3,8c0,2 0,3 0,5c0,2 0.70709,3.29291 0,4c-0.70709,0.70709 -3,1 -4,1c-1,0 -3,0 -4,0c-1,0 -3,0 -4,0c-1,0 -2,0 -3,0c-1,0 -2.186,0.30743 -4,-1c-1.14728,-0.8269 -2,-2 -2,-3c0,-1 -0.22977,-3.02673 0,-4c0.51373,-2.17627 2.07339,-2.93164 3,-4c3.276,-3.77728 6,-7 9,-10c3,-3 4.79489,-5.22021 8,-8c3.77728,-3.276 6.21167,-5.71411 9,-8c2.18735,-1.79321 4.186,-2.69257 6,-4c2.29453,-1.65381 3,-2 4,-2l1,0l2,-1l1,0"
fill-opacity="null"
stroke-opacity="null"
stroke-width="2"
stroke="#000"
fill="none"
/>
</svg>
</body>
</html>
jQuery 上传数组时会在字段名后自动加上 [],太挫。
/tmp 目录下的文件失败
项目中有一个下载日志文件的功能,大致流程是 WEB 后端调用底层方法收集并压缩一些 .log 文件生成一个 zip 压缩包,放在 /tmp 目录下,前端访问指定路径下载。
之前系统环境用的 CentOS 6.5,现在升级到了 CentOS 7,结果测试时发现下载文件下载失败。
awk 的文档写出来可能有一本很厚的书,里面甚至有一种内嵌的解释性编程语言在里面。但是我们普通人就把他当一个小工具,了解一下基础用法就好了,不用深入研究。它的基本功能是将字符串切割之后按照 $1 ... $n 来处理,$0 表示整个字符串(整行)。
egrep = grep -Efgrep = grep -F# -G, --basic-regexp 基本正则, 默认
# -E, --extended-regexp 拓展正则
# -P, --perl-regexp Perl 正则
# -w 完全匹配字词
# -x 完全匹配整行
grep markjour /var/log/auth.log
grep -E markjour /var/log/auth.log
# -F, --fixed-strings
grep -F markjour /var/log/auth.log
tail -1000 /var/log/auth.log | grep -Ev 'gnome-keyring-daemon|CRON'
-r 目录-R 目录,处理软链-v 排除-i 忽略大小写 (ignore-case)-m 控制匹配次数-a 包含二进制内容的文件当作纯文本处理-I 包含二进制内容的文件跳过
-b 输出命中内容的偏移量
-n 输出行号-o 仅输出匹配部分-h 不输出文件名(匹配多个文件时默认输出文件名 -H)-L, --files-without-match 仅输出没有匹配的文件名-l, --files-with-matches 仅输出匹配文件名-c, --count 仅输出文件名和匹配行数
--include=GLOB 只查找匹配 GLOB(文件模式)的文件
--exclude=GLOB 跳过匹配 GLOB 的文件--exclude-from=FILE 跳过所有匹配给定文件内容中任意模式的文件--exclude-dir=GLOB 跳过所有匹配 GLOB 的目录
-B, --before-context=NUM 打印文本及其前面NUM 行
-A, --after-context=NUM 打印文本及其后面NUM 行-C, --context=NUM 打印NUM 行输出文本-NUM 等同于 --context=NUM如果命令不存在就会报:
FileNotFoundError: [Errno 2] No such file or directory: 'pythonjit'
import subprocess
subprocess.run('ls')
import subprocess
from subprocess import PIPE
result = subprocess.run(['touch', '/tmp/abc'])
print(result.stdout)
result = subprocess.run(['ls'], stdout=PIPE, text=True)
print(result.stdout)
cmd = ["python", "-c", "import time; time.sleep(3); print('hello')"]
result = subprocess.run(cmd, capture_output=True, text=True)
# capture_output=True => stdout = stderr = PIPE
print(result.stdout)
直接使用 Popen,灵活一些,也就复杂一些:
import sys
import subprocess
cmd = ["python", "-c", "import time; time.sleep(3); print('hello')"]
index = 0
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
# print(p.stdout.read()) # 这一句会阻塞,直到程序执行完成
while True:
index += 1
try:
output, errors = p.communicate(timeout=0.1)
print(output)
break
except subprocess.TimeoutExpired:
print(str(index), end=' ')
sys.stdout.flush()
只是一个演示,如果需要异步执行的话,Python 3 支持基于 asyncio 的 subprocess。
参见:AsyncIO 异步执行命令
import subprocess
output = subprocess.check_output(['ls'], text=True)
print(output)
import subprocess
from subprocess import DEVNULL
try:
subprocess.check_call(['ls', '/etc/abc'], stdout=DEVNULL, stderr=DEVNULL)
except subprocess.CalledProcessError as e:
print(f'命令返回代码: {e.returncode}')
import subprocess
from subprocess import PIPE, STDOUT
process = subprocess.Popen(['grep', 'o'], stdin=PIPE, stdout=PIPE, stderr=STDOUT)
process.stdin.write(b'Hello, World.\nWhats up?\nI am fine.\n3Q!\nAnd you?\n')
process.stdin.close()
return_code = process.wait()
print(process.stdout.read())
# b'Hello, World.\nAnd you?\n'
import subprocess
from subprocess import PIPE
result = subprocess.run(['ls', '-xyz'], stderr=PIPE)
print(repr(result.stderr.decode('utf-8')))
result = subprocess.run(['ls', '-xyz'], stderr=PIPE, text=True)
print(repr(result.stderr))
# 'ls: 不适用的选项 -- y\n请尝试执行 "ls --help" 来获取更多信息。\n'
import subprocess
cmd = [
['tail', '-n10000', '/var/log/server.log'],
['grep', 'upload file'],
['awk', '{print $3}'],
]
stdin = None
for _cmd in cmd:
proc = subprocess.Popen(_cmd, stdin=stdin, stdout=subprocess.PIPE, text=True)
stdin = proc.stdout
output, error = proc.communicate()
print(output.strip())
print(error)
PIPE -1,管道,表示捕获标准输入,或标准输出,或标准错误STDOUT -2,为标准错误准备,表示捕获标准错误,将其和标准输出混在一起,相当于 2>&1DEVNULL -3,空设备,表示丢弃标准输入,或标准输出,或标准错误(重定向到 /dev/null)
Popen 进程的 Python 封装
CompletedProcess 程序执行结果,包含命令,参数,返回吗,标准输出,标准错误SubprocessError
CalledProcessError 如果得到错误状态码(非零),并且执行参数中 check=True
TimeoutExpired 等待子进程超时
list2cmdline 将命令列表组合一个完整命令
其实标准库中有另一个库可以做这个事情:
import shlex
cmd = 'grep -F "hello world" /tmp/abc.log'
args = shlex.split(cmd)
print(args)
# ['grep', '-F', 'hello world', '/tmp/abc.log']
print(shlex.join(args))
# grep -F 'hello world' /tmp/abc.log
call 极简版本def call(*popenargs, timeout=None, **kwargs):
with Popen(*popenargs, **kwargs) as p:
try:
return p.wait(timeout=timeout)
except:
p.kill()
raise
check_call call 的封装,如果得到错误状态,就抛出 CalledProcessErrordef check_call(*popenargs, **kwargs):
retcode = call(*popenargs, **kwargs)
if retcode:
cmd = kwargs.get("args")
if cmd is None:
cmd = popenargs[0]
raise CalledProcessError(retcode, cmd)
return 0
check_output 运行程序,获取标准输出def check_output(*popenargs, timeout=None, **kwargs):
for kw in ('stdout', 'check'):
if kw in kwargs:
raise ValueError(f'{kw} argument not allowed, it will be overridden.')
if 'input' in kwargs and kwargs['input'] is None:
if kwargs.get('universal_newlines') or kwargs.get('text') or kwargs.get('encoding') or kwargs.get('errors'):
empty = ''
else:
empty = b''
kwargs['input'] = empty
return run(*popenargs, stdout=PIPE, timeout=timeout, check=True, **kwargs).stdout
getoutput 运行程序,获取输出def getoutput(cmd, *, encoding=None, errors=None):
return getstatusoutput(cmd, encoding=encoding, errors=errors)[1]
getstatusoutput 运行程序,获取状态码和输出(stdout + stderr)def getstatusoutput(cmd, *, encoding=None, errors=None):
try:
data = check_output(cmd, shell=True, text=True, stderr=STDOUT,
encoding=encoding, errors=errors)
exitcode = 0
except CalledProcessError as ex:
data = ex.output
exitcode = ex.returncode
if data[-1:] == '\n':
data = data[:-1]
return exitcode, data
run 运行程序def run(*popenargs, input=None, capture_output=False, timeout=None, check=False, **kwargs):
if input is not None:
if kwargs.get('stdin') is not None:
raise ValueError('stdin and input arguments may not both be used.')
kwargs['stdin'] = PIPE
if capture_output:
if kwargs.get('stdout') is not None or kwargs.get('stderr') is not None:
raise ValueError('stdout and stderr arguments may not be used with capture_output.')
kwargs['stdout'] = PIPE
kwargs['stderr'] = PIPE
with Popen(*popenargs, **kwargs) as process:
try:
stdout, stderr = process.communicate(input, timeout=timeout)
except TimeoutExpired as exc:
process.kill()
if _mswindows:
exc.stdout, exc.stderr = process.communicate()
else:
process.wait()
raise
except:
process.kill()
raise
retcode = process.poll()
if check and retcode:
raise CalledProcessError(retcode, process.args, output=stdout, stderr=stderr)
return CompletedProcess(process.args, retcode, stdout, stderr)
封装关系:
Popen -> call -> check_call
Popen -> run -> check_output -> getstatusoutput -> getoutput
https://github.com/python/cpython/blob/master/Lib/subprocess.py
class Popen:
def __init__(self, args, bufsize=-1, executable=None,
stdin=None, stdout=None, stderr=None,
preexec_fn=None, close_fds=True,
shell=False, cwd=None, env=None, universal_newlines=None,
startupinfo=None, creationflags=0,
restore_signals=True, start_new_session=False,
pass_fds=(), *, user=None, group=None, extra_groups=None,
encoding=None, errors=None, text=None, umask=-1, pipesize=-1,
process_group=None): pass
# 创建子进程,执行系统命令
# args 命令和参数
# bufsize
# executable 替代
# stdin
# stdout
# stderr
# preexec_fn
# close_fds
# shell
# cwd
# env
# universal_newlines
# startupinfo
# creationflags
# restore_signals
# start_new_session
# pass_fds
# user
# group
# extra_groups
# encoding
# errors
# text
# 如果指定不一样的 text 和 universal_newlines 值,则抛出 SubprocessError
# self.text_mode = encoding or errors or text or universal_newlines
# umask
# pipesize
# process_group
def __repr__(self): pass
@property
def universal_newlines(self):
return self.text_mode
@universal_newlines.setter
def universal_newlines(self, universal_newlines):
self.text_mode = bool(universal_newlines)
def __enter__(self):
return self
def __exit__(self, exc_type, value, traceback): pass
def __del__(self, _maxsize=sys.maxsize, _warn=warnings.warn): pass
def communicate(self, input=None, timeout=None) -> (stdout, stderr): pass
def poll(self) -> None|returncode: pass # 检查进程是否结束,返回状态码或 None(没有结束)
def wait(self, timeout=None) -> returncode: pass # 等待进程结束,返回状态码
def send_signal(self, sig):
self.poll()
if self.returncode is not None: # 程序已经执行结束
return
try: # 防止并发杀进程
os.kill(self.pid, sig)
except ProcessLookupError:
pass
def terminate(self): # 结束进程 -15
self.send_signal(signal.SIGTERM)
def kill(self): # 强杀进程 -9
self.send_signal(signal.SIGKILL)
这里是指在本地执行命令的方法,通过 SSH 的方式这里不做讨论。