生命完美的答案,无非走过没有遗憾 —《天蓝》
写在前面
对于自动化运维来讲Python
是一个利器
常用的自动化运维工具Ansible
就是通过python
编写
博文为《Python Cookbook》
读书笔记整理而来
涉及的内容都是编写python运维脚本常用的一些知识点及Demo
理解不足小伙伴帮忙指正
生命完美的答案,无非走过没有遗憾 —《天蓝》
脚本编程与系统管理 解析命令行选项 如何能够解析脚本运行命令行选项(位于 sys.argv 中)
argparse
模块可被用来解析命令行选项
常用来定义一个脚本的说明文档,一般我们写python脚本会通过if..else
的方式来提供一个脚本说明文档,python不支持switch。所有很麻烦,其实,我们可以通过argparse
来编写说明文档。
我们来看看执行一个python脚本
对于熟悉Linux的小伙伴下面的文档在熟悉不过了,这个一个标准Linxu软件包的说明文档,文档中定义是软件包的说明
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 ┌──[root@liruilongs.github.io]-[~/python_demo] └─$./demo.py -h usage: demo.py [-h] -p pattern [-v] [-o OUTFILE] [--speed {slow,fast}] [filename [filename ...]] Search some files positional arguments: filename optional arguments: -h, --help show this help message and exit -p pattern, --pat pattern text pattern to search for -v verbose mode -o OUTFILE output file --speed {slow,fast} search speed ┌──[root@liruilongs.github.io]-[~/python_demo] └─$
来看看这个脚本是如何编写的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 import argparseparser = argparse.ArgumentParser(description='Search some files' ) parser.add_argument(dest='filenames' , metavar='filename' , nargs='*' ) parser.add_argument('-p' , '--pat' , metavar='pattern' , required=True , dest='patterns' , action='append' , help ='text pattern to search for' ) parser.add_argument('-v' , dest='verbose' , action='store_true' , help ='verbose mode' ) parser.add_argument('-o' , dest='outfile' , action='store' , help ='output file' ) parser.add_argument('--speed' , dest='speed' , action='store' , choices={'slow' , 'fast' }, default='slow' , help ='search speed' ) args = parser.parse_args() print (args.filenames)print (args.patterns)print (args.verbose)print (args.outfile)print (args.speed)
为了解析命令行选项, 首先要创建一个ArgumentParser
实例, 并使用add_argument()
方法声明你想要支持的选项。在每个add-argument()
调用中:
dest
参数指定解析结果被指派给属性的名字。 metavar
参数被用来生成帮助信息。
action 参数
指定跟属性对应的处理逻辑,通常的值为 store
, 被用来存储某个值
或将多个参数值收集到一个列表中
。
nargs 参数收集
所有剩余的命令行参数到一个列表中。在本例中它被用来构造一个文件名列表
1 parser.add_argument(dest='filenames' ,metavar='filename' , nargs='*' )
1 2 3 4 5 6 7 8 9 ┌──[root@liruilongs.github.io]-[~/python_demo] └─$python3 demo.py -p spam --pat=eggs foo.txt bar.txt ['foo.txt' , 'bar.txt' ] ['spam' , 'eggs' ] False None slow ┌──[root@liruilongs.github.io]-[~/python_demo] └─$
action='store_true'
根据参数是否存在来设置一个 Boolean 标志:
1 parser.add_argument('-v' , dest='verbose' , action='store_true' , help ='verbose mode' )
1 2 3 4 5 6 7 ┌──[root@liruilongs.github.io]-[~/python_demo] └─$python3 demo.py -v -p spam --pat=eggs foo.txt bar.txt ['foo.txt' , 'bar.txt' ] ['spam' , 'eggs' ] True None slow
action='store'
参数接受一个单独值并将其存储为一个字符串
1 parser.add_argument('-o' , dest='outfile' , action='store' , help ='output file' )
1 2 3 4 5 6 7 ┌──[root@liruilongs.github.io]-[~/python_demo] └─$python3 demo.py -v -p spam --pat=eggs -o liruilong foo.txt bar.txt ['foo.txt' , 'bar.txt' ] ['spam' , 'eggs' ] True liruilong slow
action='append'
参数说明允许某个参数重复出现多次
,并将它们追加到一个列表
中去。
required 标志
表示该参数至少要有一个
。-p
和 --pat
表示两个参数名形式
都可使用。 1 2 3 parser.add_argument('-p' , '--pat' , metavar='pattern' , required=True , dest='patterns' , action='append' , help ='text pattern to search for' )
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 ┌──[root@liruilongs.github.io]-[~/python_demo] └─$python3 demo.py -p spam foo.txt bar.txt ['foo.txt' , 'bar.txt' ] ['spam' ] False None slow ┌──[root@liruilongs.github.io]-[~/python_demo] └─$python3 demo.py --pat=eggs foo.txt bar.txt ['foo.txt' , 'bar.txt' ] ['eggs' ] False None slow ┌──[root@liruilongs.github.io]-[~/python_demo] └─$python3 demo.py -p spam --pat=eggs foo.txt bar.txt ['foo.txt' , 'bar.txt' ] ['spam' , 'eggs' ] False None slow
如果一个都没有,会提示缺少参数 -p/--pat
1 2 3 4 5 6 7 ┌──[root@liruilongs.github.io]-[~/python_demo] └─$python3 demo.py foo.txt bar.txt usage: demo.py [-h] -p pattern [-v] [-o OUTFILE] [--speed {fast,slow}] [filename [filename ...]] demo.py: error: the following arguments are required: -p/--pat ┌──[root@liruilongs.github.io]-[~/python_demo] └─$
choices={'slow', 'fast'},
参数说明接受一个值,但是会将其和可能的选择值做比较,以检测其合法性:
1 2 3 parser.add_argument('--speed' , dest='speed' , action='store' , choices={'slow' , 'fast' }, default='slow' , help ='search speed' )
1 2 3 4 5 6 7 8 9 10 11 12 13 14 ┌──[root@liruilongs.github.io]-[~/python_demo] └─$python3 demo.py --pat=eggs --speed 123 foo.txt bar.txt usage: demo.py [-h] -p pattern [-v] [-o OUTFILE] [--speed {slow,fast}] [filename [filename ...]] demo.py: error: argument --speed: invalid choice: '123' (choose from 'slow' , 'fast' ) ┌──[root@liruilongs.github.io]-[~/python_demo] └─$python3 demo.py --pat=eggs --speed fast foo.txt bar.txt ['foo.txt' , 'bar.txt' ] ['eggs' ] False None fast ┌──[root@liruilongs.github.io]-[~/python_demo] └─$
一旦参数选项被指定,你就可以执行parser.parse()
方法了。它会处理sys.argv
的值并返回一个结果实例。每个参数值会被设置成该实例中add_argument()
方法的 dest
参数指定的属性值。
还很多种其他方法解析命令行选项。可以会手动的处理 sys.argv
或者使用 getopt 模块
。但是,如果你采用本节的方式,将会减少很多冗余代码,底层细节argparse 模块
已经帮你处理了。你可能还会碰到使用optparse
库解析选项的代码。尽管 optparse 和 argparse 很像
,但是后者更先进,因此在新的程序中你应该使用它。
运行时弹出密码输入提示 你写了个脚本,运行时需要一个密码。此脚本是交互式的,因此不能将密码在脚本中硬编码,而是需要弹出一个密码输入提示,让用户自己输入。
Python 的 getpass 模块
正是你所需要的。你可以让你很轻松的弹出密码输入提示,并且不会在用户终端回显密码。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 import getpassdef svc_login (user, passwd ): return user == passwd user = getpass.getuser() passwd = getpass.getpass() if svc_login(user, passwd): print ('Yay!' ) else : print ('Boo!' )
代码中getpass.getuser()
不会弹出用户名的输入提示。它会根据该用户的 shell 环境
或者会依据本地系统的密码库
(支持 pwd 模块的平台)来使用当前用户的登录名
1 2 3 4 ┌──[root@liruilongs.github.io]-[~/python_demo] └─$./pass.py Password: Yay!
通过重定向/管道/文件接受输入 在bash中编写pytohn脚本接收外部数据的方式,一般情况下,对于一般变量,我们用命令行变量的方式比较多(手动的处理 sys.argv
),对于文件内容或者bash命令输出
直接通过脚本内部获取需要的数据。
其实python 脚本也可以用其他方式来接收 传递给他的文件数据或者bash命令输出
,包括将命令行的输出
通过管道传递
给该脚本、重定向文件到该脚本
,或在命令行中传递一个文件名
或文件名列表
给该脚本。
这里通过 Python 内置的 fileinput 模块
,可以实现重定向,管道,文佳输出
的方式传递数据到脚本内部
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 """ @File : filein.py @Time : 2022/05/01 06:05:43 @Author : Li Ruilong @Version : 1.0 @Contact : 1224965096@qq.com @Desc : None """ import fileinputwith fileinput.input () as f_input: for line in f_input: print ("脚本输出" , line, end='' )
使用fileinput.input()
方法可以获取当前输入脚本的数据,脚本里面用一个FileInput
迭代器接收
1 2 3 4 ┌──[root@liruilongs.github.io]-[~/python_demo] └─$vim filein.py ┌──[root@liruilongs.github.io]-[~/python_demo] └─$chmod +x filein.py
文件直接接收
1 2 3 4 5 6 ┌──[root@liruilongs.github.io]-[~/python_demo] └─$./filein.py /etc/passwd 脚本输出 root:x:0:0:root:/root:/bin/bash 脚本输出 bin:x:1:1:bin:/bin:/sbin/nologin 脚本输出 daemon:x:2:2:daemon:/sbin:/sbin/nol 。。。。
重定向接收
1 2 3 4 5 ┌──[root@liruilongs.github.io]-[~/python_demo] └─$./filein.py < /etc/passwd 脚本输出 root:x:0:0:root:/root:/bin/bash 脚本输出 bin:x:1:1:bin:/bin:/sbin/nologin 。。。。。。
管道方式接收
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 ┌──[root@liruilongs.github.io]-[~/python_demo] └─$df -h 文件系统 容量 已用 可用 已用% 挂载点 /dev/sda1 150G 22G 129G 15% / devtmpfs 983M 0 983M 0% /dev tmpfs 993M 0 993M 0% /dev/shm tmpfs 993M 17M 976M 2% /run tmpfs 993M 0 993M 0% /sys/fs/cgroup overlay 150G 22G 129G 15% /var/lib/docker/overlay2/9fbd33d3485f02eadef6907a5b4eaead4a384684b66c572d822a2942a82ca0d5/merged overlay 150G 22G 129G 15% /var/lib/docker/overlay2/85ff22ccaf2db68a0a863bc404d79d72fa6c8744424f50ba8fb6bfa83d56b56a/merged tmpfs 199M 0 199M 0% /run/user/0 ┌──[root@liruilongs.github.io]-[~/python_demo] └─$df -h | ./filein.py 脚本输出 文件系统 容量 已用 可用 已用% 挂载点 脚本输出 /dev/sda1 150G 22G 129G 15% / 脚本输出 devtmpfs 983M 0 983M 0% /dev 脚本输出 tmpfs 993M 0 993M 0% /dev/shm 脚本输出 tmpfs 993M 17M 976M 2% /run 脚本输出 tmpfs 993M 0 993M 0% /sys/fs/cgroup 脚本输出 overlay 150G 22G 129G 15% /var/lib/docker/overlay2/9fbd33d3485f02eadef6907a5b4eaead4a384684b66c572d822a2942a82ca0d5/merged 脚本输出 overlay 150G 22G 129G 15% /var/lib/docker/overlay2/85ff22ccaf2db68a0a863bc404d79d72fa6c8744424f50ba8fb6bfa83d56b56a/merged 脚本输出 tmpfs 199M 0 199M 0% /run/user/0 ┌──[root@liruilongs.github.io]-[~/python_demo] └─$
fileinput.input()
创建并返回一个FileInput
类的实例,该实例可以被当做一个上下文管理器
使用。因此,整合起来,如果我们要写一个打印多个文件输出的脚本,那么我们需要在输出中包含文件名和行号
1 2 3 4 5 6 7 8 9 10 11 >>> import fileinput>>> with fileinput.input ("/etc/passwd" ) as f:... for line in f:... print (f.filename(),f.fileno(),f.lineno(),line,end='' )... /etc/passwd 3 1 root:x:0 :0 :root:/root:/bin /bash /etc/passwd 3 2 bin :x:1 :1 :bin :/bin :/sbin/nologin /etc/passwd 3 3 daemon:x:2 :2 :daemon:/sbin:/sbin/nologin /etc/passwd 3 4 adm:x:3 :4 :adm:/var/adm:/sbin/nologin /etc/passwd 3 5 lp:x:4 :7 :lp:/var/spool/lpd:/sbin/nologin /etc/passwd 3 6 sync:x:5 :0 :sync:/sbin:/bin /sync
执行外部命令并获取它的输出 你想执行一个外部命令并以 Python 字符串的形式获取执行结果。
使用subprocess.check_output()
函数。
1 2 3 4 5 6 7 8 9 import subprocessout_bytes = subprocess.check_output(['netstat' ,'-a' ]) out_text = out_bytes.decode('utf-8' ) print (out_text)
执行下试试
1 2 3 4 5 6 7 8 9 10 ┌──[root@liruilongs.github.io]-[~/python_demo] └─$./py_sh.py Active Internet connections (servers and established) Proto Recv-Q Send-Q Local Address Foreign Address State tcp 0 0 localhost:2379 0.0.0.0:* LISTEN tcp 0 0 vms55.rhce.cc:2379 0.0.0.0:* LISTEN tcp 0 0 localhost:2380 0.0.0.0:* LISTEN tcp 0 0 vms55.rhce.cc:2380 0.0.0.0:* LISTEN tcp 0 0 0.0.0.0:webcache 0.0.0.0:* LISTEN tcp 0 0 0.0.0.0:http 0.0.0.0:* LISTEN
如果被执行的命令以非零码返回,就会抛出异常。下面的例子捕获到错误并获取返回码:
1 2 3 4 5 try : out_bytes = subprocess.check_output(['cmd' ,'arg1' ,'arg2' ]) except subprocess.CalledProcessError as e: out_bytes = e.output code = e.returncode
默认情况下,check_output()
仅仅返回输入到标准输出的值。如果你需要同时收集标准输出和错误输出
,使用stderr
参数:
1 out_bytes = subprocess.check_output(['cmd' ,'arg1' ,'arg2' ],stderr=subprocess.STDOUT)
如果你需要用一个超时机制来执行命令,使用 timeout 参数:
1 2 3 4 try : out_bytes = subprocess.check_output(['cmd' ,'arg1' ,'arg2' ], timeout=5 ) except subprocess.TimeoutExpired as e: ....
通常来讲,命令的执行不需要
使用到底层 shell 环境(比如 sh、bash)
。一个字符串列表会被传递给一个低级系统命令
,比如 os.execve()
。
如果你想让命令被一个shell 执行
,传递一个字符串参数,并设置参数 shell=True
. 有时候你想要Python
去执行一个复杂的 shell 命令
的时候这个就很有用了,比如管道流、I/O 重定向和其他特性。例如:
1 out_bytes = subprocess.check_output('grep python | wc > out' , shell=True )
是在 shell 中执行命令会存在一定的安全风险,特别是当参数来自于用户输入时。这时候可以使用 shlex.quote() 函数
来将参数正确的用双引用引起来。
使用 check_output() 函数
是执行外部命令
并获取其返回值
的最简单方式。但是,如果你需要对子进程做更复杂的交互
,比如给它发送输入,你得采用另外一种方法。这时候可直接使用subprocess.Popen
类。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 import subprocesstext = b''' hello world this is a test goodbye ''' p = subprocess.Popen(['wc' ], stdout=subprocess.PIPE, stdin=subprocess.PIPE) stdout, stderr = p.communicate(text) out = stdout.decode('utf-8' ) err = stderr.decode('utf-8' )
关于子进程,简单来看下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 ┌──[root@liruilongs.github.io]-[~/python_demo] └─$(pwd ;echo $BASH_SUBSHELL ;ps --forest) /root/python_demo 1 PID TTY TIME CMD 9324 pts/0 00:00:00 bash 49906 pts/0 00:00:00 \_ bash 49907 pts/0 00:00:00 \_ ps ┌──[root@liruilongs.github.io]-[~/python_demo] └─$pwd ;echo $BASH_SUBSHELL ;ps --forest /root/python_demo 0 PID TTY TIME CMD 9324 pts/0 00:00:00 bash 49908 pts/0 00:00:00 \_ ps ┌──[root@liruilongs.github.io]-[~/python_demo] └─$
也可以进程列表同协程结合的方式。你既可以在子shell中 进行繁重的处理工作,同时也不会让子shell的I/O受制于终端。
1 2 3 4 5 6 ┌──[root@liruilongs.github.io]-[~/python_demo] └─$coproc (sleep 10;ps --forest;sleep 10;ps --forest) [1] 50326 ┌──[root@liruilongs.github.io]-[~/python_demo] └─$jobs [1]+ 运行中 coproc COPROC ( sleep 10; ps --forest; sleep 10; ps --forest ) &
如果直接丢到后台会自动在终端输出IO
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 ┌──[root@liruilongs.github.io]-[~/python_demo] └─$( sleep 10; ps --forest; sleep 10; ps --forest ) & [1] 50335 ┌──[root@liruilongs.github.io]-[~/python_demo] └─$ps --forest PID TTY TIME CMD 9324 pts/0 00:00:00 bash 50335 pts/0 00:00:00 \_ bash 50336 pts/0 00:00:00 | \_ sleep 50337 pts/0 00:00:00 \_ ps ┌──[root@liruilongs.github.io]-[~/python_demo] └─$ PID TTY TIME CMD 9324 pts/0 00:00:00 bash 50335 pts/0 00:00:00 \_ bash 50340 pts/0 00:00:00 \_ ps [1]+ 完成 ( sleep 10; ps --forest; sleep 10; ps --forest ) ┌──[root@liruilongs.github.io]-[~/python_demo] └─$
subprocess 模块对于依赖 TTY 的外部命令不合适用
。例如,你不能使用它来自动化一个用户输入密码的任务(比如一个 ssh 会话)。这时候,你需要使用到第三方模块了,比如基于著名的 expect 家族的工具(pexpect 或类似的)(pexpect可以理解为Linux下的expect的Python封装、通过pexpect可以实现对ssh、ftp、passwd、telnet等命令行进行自动交互,而无需人工干涉来达到自动化的目的。比如我们可以模拟一个FTP登录时所有交互,包括输入主机地址、用户名、密码、上传文件等,待出现异常还可以进行尝试自动处理。)
终止程序并给出错误信息 你想向标准错误打印一条消息并返回某个非零状态码来终止程序运行
通过 python
的raise SystemExit(3)
命令可以主动抛出一个错误,通过sys.stderr.write
将命令写到标准的输出端
1 2 3 4 5 import syssys.stderr.write('It failed!\n' ) raise SystemExit(3 )
1 2 3 4 5 6 7 8 ┌──[root@liruilongs.github.io]-[~/python_demo] └─$vim err.py ┌──[root@liruilongs.github.io]-[~/python_demo] └─$./err.py It failed! ┌──[root@liruilongs.github.io]-[~/python_demo] └─$echo $? 3
直接将消息作为参数传给SystemExit()
,那么你可以省略其他步骤
1 2 3 raise SystemExit('It failed!' )
抛出一个 SystemExit
异常,使用错误消息作为参数,它会将消息在sys.stderr
中打印,然后程序以状态码1
退出
1 2 3 4 5 6 ┌──[root@liruilongs.github.io]-[~/python_demo] └─$./err.py It failed! ┌──[root@liruilongs.github.io]-[~/python_demo] └─$echo $? 1
获取终端的大小 你需要知道当前终端的大小以便正确的格式化输出。
使用 os.get terminal size() 函数
来做到这一点。
1 2 3 4 5 6 7 8 import ossz = os.get_terminal_size() print (sz)print (sz.columns)print (sz.lines)
1 2 3 4 5 6 7 8 9 10 11 12 ┌──[root@liruilongs.github.io]-[~/python_demo] └─$./tim.py os.terminal_size(columns=99, lines=30) 99 30 ┌──[root@liruilongs.github.io]-[~/python_demo] └─$./tim.py os.terminal_size(columns=165, lines=30) 165 30 ┌──[root@liruilongs.github.io]-[~/python_demo] └─$
复制或者移动文件和目录 复制或移动文件和目录,但是又不想调用 shell 命令。
shutil 模块
有很多便捷的函数可以复制文件和目录。使用起来非常简单
1 2 3 4 5 6 7 8 9 10 11 12 13 import shutilshutil.copy(src, dst) shutil.copy2(src, dst) shutil.copytree(src, dst) shutil.move(src, dst)
这里不多讲,熟悉Linux的小伙伴应该不陌生。
默认情况下,对于符号链接
这些命令处理的是它指向的东西文件。例如,如果源文件
是一个符号链接
,那么目标文件将会是符号链接
指向的文件。如果你只想复制符号链接本身
,那么需要指定关键字
参数 follow_symlinks
1 shutil.copytree(src, dst, symlinks=True )
copytree() 可以让你在复制过程中选择性的忽略某些文件或目录。你可以提供一个忽略函数,接受一个目录名和文件名列表作为输入,返回一个忽略的名称列表。例如:
1 2 3 def ignore_pyc_files (dirname, filenames ): return [name in filenames if name.endswith('.pyc' )] shutil.copytree(src, dst, ignore=ignore_pyc_files)
对于文件元数据信息,copy2()
这样的函数只能尽自己最大能力来保留它。访问时间、创建时间和权限
这些基本信息会被保留,但是对于所有者、ACLs、资源 fork
和其他更深层次的文件元信息就说不准了
通常不会去使用 shutil.copytree() 函数
来执行系统备份
。当处理文件名的时候,最好使用os.path
中的函数来确保最大的可移植性
1 2 3 4 5 6 7 8 9 10 11 12 13 >>> filename = '/etc/docker/daemon.json' >>> import os.path>>> os.path.basename(filename)'daemon.json' >>> os.path.dirname(filename)'/etc/docker' >>> os.path.split(filename)('/etc/docker' , 'daemon.json' ) >>> os.path.join('/new/dir' , os.path.basename(filename))'/new/dir/daemon.json' >>> os.path.expanduser('~/guido/programs/daemon.json' )'/root/guido/programs/daemon.json' >>>
使用copytree()
复制文件夹的一个棘手的问题是对于错误的处理,可以使用异常块处理,或者通过 参数 ignore dangling symlinks=True
忽略掉无效符号链接。
1 2 3 4 5 6 7 8 try : shutil.copytree(src, dst) except shutil.Error as e: for src, dst, msg in e.args[0 ]: print (dst, src, msg)
创建和解压归档文件 创建或解压常见格式的归档文件(比如.tar, .tgz 或.zip)
shutil 模块拥有两个函数—— make archive() 和 unpack archive() 可派上用场,
1 2 3 4 5 >>> import shutil>>> shutil.unpack_archive('Python-3.3.0.tgz' )>>> shutil.make_archive('py33' ,'zip' ,'Python-3.3.0' )'/Users/beazley/Downloads/py33.zip'
make archive()
的第二个参数是期望的输出格式。可以使用get archive formats()
获取所有支持的归档格式列表。
1 2 3 4 >>> import shutil>>> shutil.get_archive_formats()[('bztar' , "bzip2'ed tar-file" ), ('gztar' , "gzip'ed tar-file" ), ('tar' , 'uncompressed tar file' ), ('xztar' , "xz'ed tar-file" ), ('zip' , 'ZIP file' )] >>>
通过文件名查找文件 你需要写一个涉及到文件查找操作的脚本,比如对日志归档文件的重命名工具,你不想在 Python 脚本中调用 shell,或者你要实现一些 shell 不能做的功能。
查找文件,可使用 os.walk() 函数
,传一个顶级目录名给它
1 2 3 4 5 6 7 8 9 10 11 12 import os,sysdef findfile (start, name ): for relpath, dirs, files in os.walk(start): if name in files: full_path = os.path.join(start, relpath, name) print (os.path.normpath(os.path.abspath(full_path))) if __name__ == '__main__' : findfile(sys.argv[1 ], sys.argv[2 ])
os.walk() 方法
为我们遍历目录树
,每次进入一个目录,它会返回一个三元组
,包含相对于查找目录的相对路径,一个该目录下的目录名列表,以及那个目录下面的文件名列表。
1 2 3 4 5 6 ┌──[root@liruilongs.github.io]-[~/python_demo] └─$./find.py /etc/ passwd /etc/passwd /etc/pam.d/passwd ┌──[root@liruilongs.github.io]-[~/python_demo] └─$
对于每个元组,只需检测一下目标文件名是否在文件列表中。如果是就使用os.path.join()
合并路径。为了避免奇怪的路径名比如 ././foo//bar
,使用了另外两个函数来修正结果
第一个是os.path.abspath()
, 它接受一个路径,可能是相对路径,最后返回绝对路径。
第二个是os.path.normpath()
,用来返回正常路径,可以解决双斜杆、对目录的多重引用的问题等。
os.walk(start)
还有跨平台的优势。并且,还能很轻松的加入其他的功能。我们再演示一个例子,下面的函数打印所有最近被修改过的文件:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 import osimport timeimport sysdef modified_within (top, seconds ): now = time.time() for path, dirs, files in os.walk(top): for name in files: fullpath = os.path.join(path, name) if os.path.exists(fullpath): mtime = os.path.getmtime(fullpath) if mtime > (now - seconds): print (fullpath) if __name__ == '__main__' : if len (sys.argv) != 3 : print ('Usage: {} dir seconds' .format (sys.argv[0 ])) raise SystemExit(1 ) modified_within(sys.argv[1 ], float (sys.argv[2 ]))
打印10分钟之前被修改的数据
1 2 3 4 5 6 7 8 9 10 11 ┌──[root@liruilongs.github.io]-[~/python_demo] └─$./find.py /etc/ 10 /etc/mtab ┌──[root@liruilongs.github.io]-[~/python_demo] └─$ll /etc/mtab lrwxrwxrwx. 1 root root 17 10月 18 2018 /etc/mtab -> /proc/self/mounts ┌──[root@liruilongs.github.io]-[~/python_demo] └─$ll /proc/self/mounts -r--r--r-- 1 root root 0 5月 2 01:18 /proc/self/mounts ┌──[root@liruilongs.github.io]-[~/python_demo] └─$
读取配置文件 怎样读取普通.ini 格式的配置文件?
configparser 模块
能被用来读取配置文件
编写配置文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 [installation] library =%(prefix)s/libinclude =%(prefix)s/includebin =%(prefix)s/binprefix =/usr/local[debug] log_errors =true show_warnings =False [server] port: 8080 nworkers: 32 pid-file =/tmp/spam.pidroot =/www/rootsignature: ================================= Brought to you by the Python Cookbook =================================
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 >>> from configparser import ConfigParser>>> cfg = ConfigParser()>>> cfg.read('config.ini' )['config.ini' ] >>> cfg.sections()['installation' , 'debug' , 'server' ] >>> cfg.get('installation' ,'library' )'/usr/local/lib' >>> cfg.getboolean('debug' ,'log_errors' )True >>> cfg.getint('server' ,'port' )8080 >>> cfg.getint('server' ,'nworkers' )32 >>> print (cfg.get('server' ,'signature' ))================================= Brought to you by the Python Cookbook ================================= >>>
如果有需要,你还能修改配置并使用cfg.write()
方法将其写回到文件中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 >>> from configparser import ConfigParser>>> cfg = ConfigParser()>>> cfg.read('config.ini' )['config.ini' ] >>> cfg.set ('server' ,'port' ,'9000' )>>> cfg.set ('debug' ,'log_errors' ,'False' )>>> import sys>>> cfg.write(sys.stdout)[installation] library = %(prefix)s/lib include = %(prefix)s/include bin = %(prefix)s/bin prefix = /usr/local [debug] log_errors = False show_warnings = False [server] port = 9000 nworkers = 32 pid-file = /tmp/spam.pid root = /www/root signature = ================================= Brought to you by the Python Cookbook ================================= >>>
配置文件中的名字是不区分大小写
解析值的时候,getboolean() 方法
查找任何可行的值。
ConfigParser
能一次读取多个配置文件然后合并成一个配置。后面读取的配置文件会覆盖前面的配置文件
给简单脚本增加日志功能 你希望在脚本和程序中将诊断信息写入日志文件。
python 脚本打印日志最简单方式是使用 logging 模块
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 import loggingdef main (): logging.basicConfig(filename='app.log' , level=logging.ERROR) hostname = 'www.python.org' item = 'spam' filename = 'data.csv' mode = 'r' logging.critical('Host %s unknown' , hostname) logging.error("Couldn't find %r" , item) logging.warning('Feature is deprecated' ) logging.info('Opening file %r, mode=%r' , filename, mode) logging.debug('Got here' ) if __name__ == '__main__' : main()
五个日志调用(critical(), error(), warning(), info(), debug()
)以降序方式表示不同的严重级别。 basicConfig()
的level
参数是一个过滤器
。所有级别低于此级别的日志消息都会被忽略掉。每个logging
操作的参数是一个消息字符串,后面再跟一个或多个参数。构造最终的日志消息的时候我们使用了%
操作符来格式化消息字符串。
1 2 3 4 5 6 7 8 ┌──[root@liruilongs.github.io]-[~/python_demo] └─$./logger.py ┌──[root@liruilongs.github.io]-[~/python_demo] └─$cat app.log CRITICAL:root:Host www.python.org unknown ERROR:root:Couldn't find ' spam' ┌──[root@liruilongs.github.io]-[~/python_demo] └─$
如果你想使用配置文件,可以像下面这样修改basicConfig()
调用:
1 2 3 4 5 import logging import logging.config def main(): logging.config.fileConfig('logconfig.ini' )
logconfig.ini
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 [loggers] keys =root[handlers] keys =defaultHandler[formatters] keys =defaultFormatter[logger_root] level =INFOhandlers =defaultHandlerqualname =root[handler_defaultHandler] class =FileHandlerformatter =defaultFormatterargs =('app.log' , 'a' )[formatter_defaultFormatter] format =%(levelname)s:%(name)s:%(message)s
在调用日志操作前先执行下 basicConfig() 函数方法
,可以找标准输出或者文件中输出
basicConfig()
在程序中只能被执行一次。如果你稍后想改变日志配置,就需要先获取 root logger
,然后直接修改它。
1 logging.getLogger().level = logging.DEBUG
更多见日志模块文档https://docs.python.org/3/howto/logging-cookbook.html
给函数库增加日志功能 你想给某个函数库增加日志功能,但是又不能影响到那些不使用日志功能的程序。
对于想要执行日志操作的函数库,你应该创建一个专属的logger
对象,并且像下面这样初始化配置:
1 2 3 4 5 6 7 8 9 10 11 12 13 import logginglog = logging.getLogger(__name__) log.addHandler(logging.NullHandler()) def func (): log.critical('A Critical Error!' ) log.debug('A debug message' ) func()
使用这个配置,默认情况下不会打印日志,只有配置过日志系统,那么日志消息打印就开始生效
1 2 3 ┌──[root@liruilongs.github.io]-[~/python_demo] └─$./logg.py CRITICAL:__main__:A Critical Error!
通常来讲,不应该在函数库代码中自己配置日志系统
,或者是已经有个已经存在的日志配置了。调用getLogger( name )
创建一个和调用模块同名的 logger 模块
。由于模块
都是唯一的,因此创建的 logger 也将是唯一
的。所以当前进程中只有一个logging会生效。
log.addHandler(logging.NullHandler())
操作将一个空处理器
绑定到刚刚已经创建好的 logger 对象
上。一个空处理器默认会忽略调用所有的日志消息。因此,如果使用该函数库的时候还没有配置日志,那么将不会有消息或警告出现。
在这里,根日志被配置成仅仅输出 ERROR 或更高级别的消息
。不过,somelib 的日志级别被单独配置成可以输出 debug 级别的消息,
它的优先级比全局配置高。像这样更改单独模块的日志配置对于调试来讲是很方便的,因为你无需去更改任何的全局日志配置——只需要修改你想要更多输出的模块的日志等级。(这个还有待研究)
实现一个计时器 你想记录程序执行多个任务所花费的时间
time 模块
包含很多函数来执行跟时间有关的函数。尽管如此,通常我们会在此基础之上构造一个更高级的接口来模拟一个计时器。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 import timeclass Timer : def __init__ (self, func=time.perf_counter ): self.elapsed = 0.0 self._func = func self._start = None def start (self ): if self._start is not None : raise RuntimeError('Already started' ) self._start = self._func() def stop (self ): if self._start is None : raise RuntimeError('Not started' ) end = self._func() self.elapsed += end - self._start self._start = None def reset (self ): self.elapsed = 0.0 @property def running (self ): return self._start is not None def __enter__ (self ): self.start() return self def __exit__ (self, *args ): self.stop()
这个类定义了一个可以被用户根据需要启动、停止和重置的计时器。它会在elapsed 属性中记录整个消耗时间。下面是一个例子来演示怎样使用它:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 import timeclass Timer : def __init__ (self, func=time.perf_counter ): self.elapsed = 0.0 self._func = func self._start = None def start (self ): if self._start is not None : raise RuntimeError('Already started' ) self._start = self._func() def stop (self ): if self._start is None : raise RuntimeError('Not started' ) end = self._func() self.elapsed += end - self._start self._start = None def reset (self ): self.elapsed = 0.0 @property def running (self ): return self._start is not None def __enter__ (self ): self.start() return self def __exit__ (self, *args ): self.stop() def countdown (n ): while n > 0 : n -= 1 t = Timer() t.start() countdown(1000000 ) t.stop() print (t.elapsed)with t: countdown(1000000 ) print (t.elapsed)with Timer() as t2: countdown(1000000 ) print (t2.elapsed)
这里通过__enter__,__exit__
,使用with 语句
以及上下文管理器协议可以省略计时器打开和关闭操作。(关于上下文管理协议,即with语句,为了让一个对象兼容with语句,必须在这个对象的类中声明__enter__和__exit__方法,
,__enter__
在出现with语句被调用,__exit__
在代码执行完毕被调用,可以参考open()方法)
1 2 3 4 5 ┌──[root@liruilongs.github.io]-[~/python_demo] └─$./times.py 0.05191648800246185 0.12038616700374405 0.06592946800083155
在计时中要考虑一个底层的时间函数问题
。 一般来说, 使用 time.time()
或time.clock()
计算的时间精度因操作系统的不同会有所不同。而使用time.perf_counter()
函数可以确保使用系统上面最精确的计时器
。
限制脚本的内存和CPU的使用量 你想对在 Unix 系统上面运行的程序设置内存或 CPU 的使用限制。
cpu 限制 resource 模块
能同时执行这两个任务。例如,要限制 CPU 时间,下面的代码在windows平台执行不了,但是Linux是可以的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 import signalimport resource import osdef time_exceeded (signo, frame ): print ("Time's up!" ) raise SystemExit(1 ) def set_max_runtime (seconds ): soft, hard = resource.getrlimit(resource.RLIMIT_CPU) resource.setrlimit(resource.RLIMIT_CPU, (seconds, hard)) signal.signal(signal.SIGXCPU, time_exceeded) if __name__ == '__main__' : set_max_runtime(15 ) while True : pass
程序运行时,SIGXCPU 信号
在时间过期时被生成,然后执行清理并退出。
1 2 3 4 5 6 7 ┌──[root@liruilongs.github.io]-[~/python_demo] └─$vim cpu.py ┌──[root@liruilongs.github.io]-[~/python_demo] └─$chmod +x cpu.py ┌──[root@liruilongs.github.io]-[~/python_demo] └─$./cpu.py Time's up!
内存限制 这暂时没有好的Demo…
1 2 3 4 5 6 7 8 9 10 11 12 #!/usr/bin/env python3 import resource def limit_memory(maxsize): soft, hard = resource.getrlimit(resource.RLIMIT_AS) resource.setrlimit(resource.RLIMIT_AS, (maxsize, hard)) limit_memory(576460752303423488)
程序运行到没有多余内存时会抛出 MemoryError 异常。
setrlimit() 函数
被用来设置特定资源上面的软限制和硬限制
。
软限制是一个值
,当超过这个值的时候操作系统通常会发送一个信号来限制或通知该进程.
1 2 >>> resource.RLIMIT_AS9
硬限制
是用来指定软限制能设定的最大值。通常来讲,这个由系统管理员通过设置系统级参数来决定。尽管硬限制可以改小一点,但是最好不要使用用户进程去修改。1 2 >>> resource.getrlimit(resource.RLIMIT_AS)(-1 , -1 )
setrlimit() 函数
还能被用来设置子进程数量、打开文件数以及类似系统资源的限制(cgroup)
。
启动一个WEB浏览器 通过脚本启动浏览器并打开指定的 URL 网页
webbrowser 模块
能被用来启动一个浏览器,并且与平台无关
1 2 3 4 5 6 7 8 9 10 11 12 13 14 Windows PowerShell 版权所有 (C) Microsoft Corporation。保留所有权利。 尝试新的跨平台 PowerShell https://aka.ms/pscore6 PS E:\docker> python Python 3.9.0 (tags/v3.9.0:9cf6752, Oct 5 2020, 15:23:07) [MSC v.1927 32 bit (Intel)] on win32 Type "help" , "copyright" , "credits" or "license" for more information. >>> >>> import webbrowser >>> webbrowser.open('http://www.python.org' ) True >>>
新窗口打卡网站
1 webbrowser.open_new('http://www.python.org' )
当前窗口打开一个tab页
1 webbrowser.open_new_tab('http://www.python.org' )
指定浏览器类型,可以使用 webbrowser.get() 函数
1 2 3 4 5 6 >>> c = webbrowser.get('firefox' )>>> c.open ('http://www.python.org' )True >>> c.open_new_tab('http://docs.python.org' )True >>>