1、通用代码
def get_error_info():
    """Return the formatted traceback of the exception being handled.

    @name   Get error information
    @return text produced by ``traceback.format_exc()``
    """
    # Imported locally so the helper does not depend on file-level imports.
    import traceback

    return traceback.format_exc()
def get_preexec_fn(run_user):
    """Build a pre-exec hook that switches the calling process to ``run_user``.

    @name   Get the pre-exec function for a given run-as user
    @param  run_user<string>  name of the system account to run as
    @return zero-argument callable that sets the group ID and then the
            user ID of the calling process to those of ``run_user``
    @raise  KeyError when ``run_user`` is not in the password database
    """
    import os
    import pwd

    entry = pwd.getpwnam(run_user)
    uid = entry.pw_uid
    gid = entry.pw_gid

    def _exec_rn():
        # A privileged parent would otherwise hand its own supplementary
        # groups to the child; replace them with the target user's groups.
        # Skipped for unprivileged callers, which are not allowed to do it.
        if os.geteuid() == 0:
            os.initgroups(run_user, gid)
        # The group must be changed first: once the user ID has been
        # dropped the process may no longer be permitted to change groups.
        os.setgid(gid)
        os.setuid(uid)

    return _exec_rn
def ExecShell(cmdstring, timeout=None, shell=True, cwd=None, env=None, user=None):
    """Run a command and capture its standard output and standard error.

    The child's stdout and stderr are redirected into temporary files and
    read back once the command has finished.

    @name   Execute a command
    @param  cmdstring  command to run (a shell string when ``shell`` is true)
    @param  timeout    seconds to wait before giving up; ``None`` or ``0``
                       waits until the command exits
    @param  shell      whether to run the command through the shell
    @param  cwd        working directory for the child process
    @param  env        environment variables for the child process
    @param  user       name of the system user to run the command as
    @return ``(stdout, stderr)`` as text when the command completes;
            the string ``'Timed out'`` when ``timeout`` elapses first;
            ``('', traceback_text)`` when starting or waiting for the
            command raises an exception
    @raise  KeyError when ``user`` is given but does not exist

    NOTE(review): on timeout the child process is not terminated; it is
    left running, matching the previous behaviour. Confirm whether callers
    rely on that before adding a kill.
    """
    import hashlib
    import os
    import subprocess
    import tempfile

    preexec_fn = None
    tmp_dir = '/dev/shm'
    if user:
        preexec_fn = get_preexec_fn(user)
        tmp_dir = '/tmp'
    # Fall back to the platform default temp directory when the preferred
    # one is missing, instead of failing every call.
    if not os.path.isdir(tmp_dir):
        tmp_dir = None

    try:
        # The digest only makes the temp-file names recognisable per command.
        if isinstance(cmdstring, bytes):
            raw_cmd = cmdstring
        else:
            raw_cmd = str(cmdstring).encode('utf-8')
        rx = hashlib.md5(raw_cmd).hexdigest()

        # Context managers guarantee both files are closed on every path
        # (success, timeout, or exception).
        with tempfile.SpooledTemporaryFile(
                max_size=4096, mode='wb+', suffix='_succ',
                prefix='btex_' + rx, dir=tmp_dir) as succ_f, \
            tempfile.SpooledTemporaryFile(
                max_size=4096, mode='wb+', suffix='_err',
                prefix='btex_' + rx, dir=tmp_dir) as err_f:
            sub = subprocess.Popen(
                cmdstring, close_fds=True, shell=shell, bufsize=128,
                stdout=succ_f, stderr=err_f, cwd=cwd, env=env,
                preexec_fn=preexec_fn)
            try:
                # A falsy timeout (None or 0) means "wait indefinitely".
                sub.wait(timeout=timeout or None)
            except subprocess.TimeoutExpired:
                return 'Timed out'

            succ_f.seek(0)
            err_f.seek(0)
            out = succ_f.read()
            err = err_f.read()
    except Exception:
        return '', get_error_info()

    # Undecodable bytes are replaced rather than raising, so callers
    # always receive text.
    return (out.decode('utf-8', errors='replace'),
            err.decode('utf-8', errors='replace'))
2、使用示例
# Example: download `url` to the local path `filename` with wget, skipping
# TLS certificate verification (--no-check-certificate).
# Assumes `filename` and `url` are defined by the surrounding code.
# NOTE(review): both values are interpolated into the shell command
# unquoted -- confirm they are trusted, or quote them before formatting.
ExecShell("wget -O {} {} --no-check-certificate".format(filename,url))