本文主要介绍Python中,可以直接使用的执行Linux中shell命令,或者程序的通用工具函数方法。

1、通用代码

def get_error_info():
    import traceback
    errorMsg = traceback.format_exc()
    return errorMsg
def get_preexec_fn(run_user):
    '''
        @name 获取指定执行用户预处理函数
        @param run_user<string> 运行用户
        @return 预处理函数
    '''
    import pwd
    pid = pwd.getpwnam(run_user)
    uid = pid.pw_uid
    gid = pid.pw_gid

    def _exec_rn():
        os.setgid(gid)
        os.setuid(uid)
    return _exec_rn
def ExecShell(cmdstring, timeout=None, shell=True,cwd=None,env=None,user = None):
    '''
        @name 执行命令
        @param cmdstring 命令
        @param timeout 超时时间
        @param shell 是否通过shell运行
        @param cwd 进入的目录
        @param env 环境变量
        @param user 执行用户名
        @return 命令执行结果
    '''
    a = ''
    e = ''
    import subprocess,tempfile
    preexec_fn = None
    tmp_dir = '/dev/shm'
    if user:
        preexec_fn = get_preexec_fn(user)
        tmp_dir = '/tmp'
    try:
        rx = md5(cmdstring)
        succ_f = tempfile.SpooledTemporaryFile(max_size=4096,mode='wb+',suffix='_succ',prefix='btex_' + rx ,dir=tmp_dir)
        err_f = tempfile.SpooledTemporaryFile(max_size=4096,mode='wb+',suffix='_err',prefix='btex_' + rx ,dir=tmp_dir)
        sub = subprocess.Popen(cmdstring, close_fds=True, shell=shell,bufsize=128,stdout=succ_f,stderr=err_f,cwd=cwd,env=env,preexec_fn=preexec_fn)
        if timeout:
            s = 0
            d = 0.01
            while sub.poll() is None:
                time.sleep(d)
                s += d
                if s >= timeout:
                    if not err_f.closed: err_f.close()
                    if not succ_f.closed: succ_f.close()
                    return 'Timed out'
        else:
            sub.wait()
        err_f.seek(0)
        succ_f.seek(0)
        a = succ_f.read()
        e = err_f.read()
        if not err_f.closed: err_f.close()
        if not succ_f.closed: succ_f.close()
    except:
        return '',get_error_info()
    try:
        #编码修正
        if type(a) == bytes: a = a.decode('utf-8')
        if type(e) == bytes: e = e.decode('utf-8')
    except:pass
    return

2、使用示例

ExecShell("wget -O {} {} --no-check-certificate".format(filename,url))

推荐文档