Python os 模块,popen() 实例源码

我们从Python开源项目中,提取了以下48个代码示例,用于说明如何使用os.popen()

项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def popen(cmd, mode="r", buffering=-1):
    if not isinstance(cmd, str):
        raise TypeError("invalid cmd type (%s, expected string)" % type(cmd))
    if mode not in ("r", "w"):
        raise ValueError("invalid mode %r" % mode)
    if buffering == 0 or buffering is None:
        raise ValueError("popen() does not support unbuffered streams")
    import subprocess, io
    if mode == "r":
        proc = subprocess.Popen(cmd,
                                shell=True,
                                stdout=subprocess.PIPE,
                                bufsize=buffering)
        return _wrap_close(io.TextIOWrapper(proc.stdout), proc)
    else:
        proc = subprocess.Popen(cmd,
                                shell=True,
                                stdin=subprocess.PIPE,
                                bufsize=buffering)
        return _wrap_close(io.TextIOWrapper(proc.stdin), proc)

# Helper for popen() -- a proxy for a file whose close waits for the process
项目:kas    作者:siemens    | 项目源码 | 文件源码
def ssh_setup_agent(config, envkeys=None):
    """
        Starts the ssh-agent
    """
    envkeys = envkeys or ['SSH_PRIVATE_KEY']
    output = os.popen('ssh-agent -s').readlines()
    for line in output:
        matches = re.search(r"(\S+)\=(\S+)\;", line)
        if matches:
            config.environ[matches.group(1)] = matches.group(2)

    for envkey in envkeys:
        key = os.environ.get(envkey)
        if key:
            ssh_add_key(config.environ, key)
        else:
            logging.warning('%s is missing', envkey)
项目:docker-monitoring-zabbix-agent    作者:digiapulssi    | 项目源码 | 文件源码
def status(args):
    with os.popen("docker inspect -f '{{json .State}}' " + args.container + " 2>&1") as pipe:
        status = pipe.read().strip()

    if "No such image or container" in status:
        print "0"
    else:
        statusjs = json.loads(status)
        if statusjs["Running"]:
            print "1"
        elif statusjs["ExitCode"] == 0:
            print "2"
        else:
            print "3"

# get the uptime in seconds, if the container is running
项目:docker-monitoring-zabbix-agent    作者:digiapulssi    | 项目源码 | 文件源码
def multi_stat_update(args, container_dir, filename):
    dict = {}
    try:
        pipe = os.popen("docker exec " + args.container + " cat " + container_dir + "/" + filename  + " 2>&1")
        for line in pipe:
            m = _STAT_RE.match(line)
            if m:
                dict[m.group(1)] = m.group(2)
        pipe.close()
        f = open(args.container + "/" + filename,"w")

        for key in dict.keys():
            f.write(key + " " + dict[key] + "\n")
        f.close()
    except Exception, e:
        debug(args.container + ": could not update " + filename)
        debug(str(sys.exc_info()))
    return dict
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def get_numa_spec():
    maxnode=0
    maxcpu=1
    numasupport=False
    try:
        lines = [line.rstrip() for line in os.popen('numactl --show')]
        if len(lines) > 0 : numasupport=True
        for l in lines:
            if l.startswith('nodebind'):
                maxnode=int(l.split()[-1])
            if l.startswith('physcpubind'):
                maxcpu=int(l.split()[-1])
            if "NO NUMA" in l.upper():
                numasupport=False

    except:
        numasupport=False

    return numasupport,maxnode,maxcpu
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def open(self, url, new=0, autoraise=True):
            if self._name == 'default':
                script = 'open location "%s"' % url.replace('"', '%22') # opens in default browser
            else:
                script = '''
                   tell application "%s"
                       activate
                       open location "%s"
                   end
                   '''%(self._name, url.replace('"', '%22'))

            osapipe = os.popen("osascript", "w")
            if osapipe is None:
                return False

            osapipe.write(script)
            rc = osapipe.close()
            return not rc


    # Don't clear _tryorder or _browsers since OS X can use above Unix support
    # (but we prefer using the OS X specific stuff)
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def _find_mac(command, args, hw_identifiers, get_index):
    import os
    for dir in ['', '/sbin/', '/usr/sbin']:
        executable = os.path.join(dir, command)
        if not os.path.exists(executable):
            continue

        try:
            # LC_ALL to get English output, 2>/dev/null to
            # prevent output on stderr
            cmd = 'LC_ALL=C %s %s 2>/dev/null' % (executable, args)
            with os.popen(cmd) as pipe:
                for line in pipe:
                    words = line.lower().split()
                    for i in range(len(words)):
                        if words[i] in hw_identifiers:
                            return int(
                                words[get_index(i)].replace(':', ''), 16)
        except IOError:
            continue
    return None
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def _findLib_gcc(name):
        expr = r'[^\(\)\s]*lib%s\.[^\(\)\s]*' % re.escape(name)
        fdout, ccout = tempfile.mkstemp()
        os.close(fdout)
        cmd = 'if type gcc >/dev/null 2>&1; then CC=gcc; elif type cc >/dev/null 2>&1; then CC=cc;else exit 10; fi;' \
              '$CC -Wl,-t -o ' + ccout + ' 2>&1 -l' + name
        try:
            f = os.popen(cmd)
            try:
                trace = f.read()
            finally:
                rv = f.close()
        finally:
            try:
                os.unlink(ccout)
            except OSError, e:
                if e.errno != errno.ENOENT:
                    raise
        if rv == 10:
            raise OSError, 'gcc or cc command not found'
        res = re.search(expr, trace)
        if not res:
            return None
        return res.group(0)
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def _get_soname(f):
            # assuming GNU binutils / ELF
            if not f:
                return None
            cmd = 'if ! type objdump >/dev/null 2>&1; then exit 10; fi;' \
                  "objdump -p -j .dynamic 2>/dev/null " + f
            f = os.popen(cmd)
            dump = f.read()
            rv = f.close()
            if rv == 10:
                raise OSError, 'objdump command not found'
            f = os.popen(cmd)
            try:
                data = f.read()
            finally:
                f.close()
            res = re.search(r'\sSONAME\s+([^\s]+)', data)
            if not res:
                return None
            return res.group(1)
项目:pscheduler    作者:perfsonar    | 项目源码 | 文件源码
def terminal_size():
    """
    Return the number of terminal columns and rows, defaulting to
    80x24 if the standard output is not a TTY.

    NOTE THAT THIS ONLY WORKS ON UNIX.
    """
    if sys.stdout.isatty():
        # TODO: This only works on Unix.
        rows, columns = [int(x) for x in
                         os.popen('stty size', 'r').read().split()]
    else:
        rows = 24
        columns = 80

    return rows, columns
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def do_osx_install(srcdir, targetdir):

    if os.path.exists(targetdir):
        print 'Target dir %s already exists! Removing...'
        shutil.rmtree(targetdir)

    install_script = os.popen('find '+ srcdir +' -iname install.sh').read().strip()
    print 'DBG install_script:', install_script
    os.popen('chmod +x "%s"' % install_script)
    cmd_install = '%s %s %s' % (pipes.quote(install_script), srcdir, targetdir)
    print 'DBG cmd: "%s"' % cmd_install
    cmd_chmod_chromium = 'find %s -name Chromium -exec chmod +x {} \;' % (targetdir)
    cmd_chmod_chromium_helper = 'find %s -name Chromium\ Helper -exec chmod +x {} \;' % (targetdir)
    for cmd in [cmd_install, cmd_chmod_chromium, cmd_chmod_chromium_helper]:

        proc = subprocess.Popen(cmd, shell=True)
        proc.wait()
        if proc.returncode:
            print "returncode " + str(proc.returncode)
项目:Harmonbot    作者:Harmon758    | 项目源码 | 文件源码
def about(self, ctx):
        '''About me'''
        from clients import application_info
        changes = os.popen(r'git show -s HEAD~3..HEAD --format="[`%h`](https://github.com/Harmon758/Harmonbot/commit/%H) %s (%cr)"').read().strip()
        embed = discord.Embed(title = "About Me", color = clients.bot_color)
        embed.description = "[Changelog (Harmonbot Server)]({})\n[Invite Link]({})".format(clients.changelog, discord.utils.oauth_url(application_info.id))
        # avatar = ctx.message.author.avatar_url or ctx.message.author.default_avatar_url
        # embed.set_author(name = ctx.message.author.display_name, icon_url = avatar)
        avatar = self.bot.user.avatar_url or self.bot.user.default_avatar_url
        # embed.set_thumbnail(url = avatar)
        embed.set_author(name = "Harmonbot (Discord ID: {})".format(self.bot.user.id), icon_url = avatar)
        if changes: embed.add_field(name = "Latest Changes:", value = changes, inline = False)
        embed.add_field(name = "Created on:", value = "February 10th, 2016")
        embed.add_field(name = "Version", value = clients.version)
        embed.add_field(name = "Library", value = "[discord.py](https://github.com/Rapptz/discord.py) v{0}\n([Python](https://www.python.org/) v{1.major}.{1.minor}.{1.micro})".format(discord.__version__, sys.version_info))
        me = discord.utils.get(self.bot.get_all_members(), id = clients.owner_id)
        avatar = me.default_avatar_url if not me.avatar else me.avatar_url
        embed.set_footer(text = "Developer/Owner: {0} (Discord ID: {0.id})".format(me), icon_url = avatar)
        await self.bot.reply("", embed = embed)
        await self.bot.say("Changelog (Harmonbot Server): {}".format(clients.changelog))
项目:SoCFoundationFlow    作者:mattaw    | 项目源码 | 文件源码
def __init__(self, prog, **kw):
        try:
            self.prog = prog
            self.kw = kw
            self.popen = None
            if Popen.verbose:
                sys.stdout.write("Popen created: %r, kw=%r..." % (prog, kw))

            do_delegate = kw.get('stdout', None) == -1 and kw.get('stderr', None) == -1
            if do_delegate:
                if Popen.verbose:
                    print("Delegating to real Popen")
                self.popen = self.real_Popen(prog, **kw)
            else:
                if Popen.verbose:
                    print("Emulating")
        except Exception as e:
            if Popen.verbose:
                print("Exception: %s" % e)
            raise
项目:SoCFoundationFlow    作者:mattaw    | 项目源码 | 文件源码
def __getattr__(self, name):
        if Popen.verbose:
            sys.stdout.write("Getattr: %s..." % name)
        if name in Popen.__slots__:
            if Popen.verbose:
                print("In slots!")
            return object.__getattr__(self, name)
        else:
            if self.popen is not None:
                if Popen.verbose:
                    print("from Popen")
                return getattr(self.popen, name)
            else:
                if name == "wait":
                    return self.emu_wait
                else:
                    raise Exception("subprocess emulation: not implemented: %s" % name)
项目:SoCFoundationFlow    作者:mattaw    | 项目源码 | 文件源码
def __init__(self, prog, **kw):
        try:
            self.prog = prog
            self.kw = kw
            self.popen = None
            if Popen.verbose:
                sys.stdout.write("Popen created: %r, kw=%r..." % (prog, kw))

            do_delegate = kw.get('stdout', None) == -1 and kw.get('stderr', None) == -1
            if do_delegate:
                if Popen.verbose:
                    print("Delegating to real Popen")
                self.popen = self.real_Popen(prog, **kw)
            else:
                if Popen.verbose:
                    print("Emulating")
        except Exception as e:
            if Popen.verbose:
                print("Exception: %s" % e)
            raise
项目:SoCFoundationFlow    作者:mattaw    | 项目源码 | 文件源码
def __getattr__(self, name):
        if Popen.verbose:
            sys.stdout.write("Getattr: %s..." % name)
        if name in Popen.__slots__:
            if Popen.verbose:
                print("In slots!")
            return object.__getattr__(self, name)
        else:
            if self.popen is not None:
                if Popen.verbose:
                    print("from Popen")
                return getattr(self.popen, name)
            else:
                if name == "wait":
                    return self.emu_wait
                else:
                    raise Exception("subprocess emulation: not implemented: %s" % name)
项目:SoCFoundationFlow    作者:mattaw    | 项目源码 | 文件源码
def __init__(self, prog, **kw):
        try:
            self.prog = prog
            self.kw = kw
            self.popen = None
            if Popen.verbose:
                sys.stdout.write("Popen created: %r, kw=%r..." % (prog, kw))

            do_delegate = kw.get('stdout', None) == -1 and kw.get('stderr', None) == -1
            if do_delegate:
                if Popen.verbose:
                    print("Delegating to real Popen")
                self.popen = self.real_Popen(prog, **kw)
            else:
                if Popen.verbose:
                    print("Emulating")
        except Exception as e:
            if Popen.verbose:
                print("Exception: %s" % e)
            raise
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def post(self, *args, **kwargs):
        try:
            servidor = self.get_argument('grupo')
        except:
            servidor ="all"
                item=""
                corpo="{\"hosts\":["
        import os
        hosts=os.popen("ansible  "+servidor+" --list-hosts").read()
            item=hosts.replace("\n","\",")  
        item=item.replace("    "," \"")
        #for options in hosts:
            #   item=item+" \""+options+"\","

                item=item[1:-1]
                corpo=corpo+item+"]}"
                self.set_header("Content-Type", "application/json")
                self.write(corpo);
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def post(self, *args, **kwargs):
        try:
            servidor = self.get_argument('grupo')
        except:
            servidor ="all"
                item=""
                corpo="{\"hosts\":["
        import os
        hosts=os.popen("ansible  "+servidor+" --list-hosts").read()
            item=hosts.replace("\n","\",")  
        item=item.replace("    "," \"")
        #for options in hosts:
            #   item=item+" \""+options+"\","

                item=item[1:-1]
                corpo=corpo+item+"]}"
                self.set_header("Content-Type", "application/json")
                self.write(corpo);
项目:DomainDependencyMemeJsai2017    作者:GINK03    | 项目源码 | 文件源码
def step3():
  key_vec = {}
  maxx    = 12505807
  size    = 10000
  for i in range(size, maxx, size):
    print(i, maxx)
    res = os.popen("head -n {i} ./dataset/bind.txt | tail -n {size} | ./fasttext print-sentence-vectors ./models/model.bin".format(i=i, size=size)).read()
    for line in res.split("\n"):
      if line == "":
        continue
      vec = list(map(float, line.split()[-100:]))
      txt = line.split()[:-100]
      key = " ".join(txt)
      if key_vec.get(key) is None:
        key_vec[key] = vec
  open("key_vec.pkl", "wb").write(pickle.dumps(key_vec))
项目:DomainDependencyMemeJsai2017    作者:GINK03    | 项目源码 | 文件源码
def _step5(arr):
  kmeans = pickle.loads(open("kmeans.model", "rb").read())
  key, lines, tipe = arr
  print(key)
  open("./tmp/tmp.{tipe}.{key}.txt".format(tipe=tipe,key=key), "w").write("\n".join(lines))
  res  = os.popen("./fasttext print-sentence-vectors ./models/model.bin < tmp/tmp.{tipe}.{key}.txt".format(tipe=tipe, key=key)).read()
  w    = open("tmp/tmp.{tipe}.{key}.json".format(tipe=tipe,key=key), "w")
  for line in res.split("\n"):
    try:
      vec = list(map(float, line.split()[-100:]))
    except:
      print(line)
      print(res)
      continue
    x = np.array(vec)
    if np.isnan(x).any():
      continue
    cluster = kmeans.predict([vec])
    txt = line.split()[:-100]
    obj = {"txt": txt, "cluster": cluster.tolist()} 
    data = json.dumps(obj, ensure_ascii=False)
    w.write( data + "\n" )
项目:darkc0de-old-stuff    作者:tuwid    | 项目源码 | 文件源码
def main():
    file_name = "dnsbee.txt"
    dot = "."
    if (os.path.exists(file_name)):
        file = open(file_name,"a")
    else:
        file = open(file_name,"w")
    host = sys.argv[1]
    nameserver = sys.argv[2]
    print "Trying to query",
    for i in range(0,len(records)):
        string = "dig " + records[i] + " " + host + " @" + nameserver
        command = os.popen(string,"r")
        while(1):
            line = command.readline()
            #line = line.strip()
            if line:
                print ".",
                file.write("Query> " + string + ":\n")
                file.write(line)
            else:
                break
项目:darkc0de-old-stuff    作者:tuwid    | 项目源码 | 文件源码
def find_jmpreg(self,dir="c:\windows\system32"):
        k = os.listdir("c:\windows\system32")
        for z in k:
            if z.endswith(".dll"):
                data = []
                cmd = "%s -D C:\windows\system32\%s" % (self.objdump,z)
                fh = os.popen(cmd,"r")
                data = fh.read(100000).split("\n")
                fh.close()
                for r in data:
                    y = "eb\x20%s" % self.reg
                    if r.find("jmp") != -1 and r.find(y) != -1:
                        (addy,null) = r.split(":")
                        temp = "0x%s" % addy
                        print temp
                        self.addrs[z] = int(temp,0)
                        self.found = 1
        if self.found != 1:
            print "found no jmp [reg]!"
            return 0
        return 1

    # pre defined database of jmp's
项目:Keras_FB    作者:InvidHead    | 项目源码 | 文件源码
def gpu_status(self,av_type_list):
        for t in av_type_list:
            cmd='nvidia-smi -q --display='+t
            #print('\nCMD:',cmd,'\n')
            r=os.popen(cmd)
            info=r.readlines()
            r.close()
            content = " ".join(info)
            #print('\ncontent:',content,'\n')
            index=content.find('Attached GPUs')
            s=content[index:].replace(' ','').rstrip('\n')
            self.t_send(s)
            time.sleep(.5)
        #th.exit()
#==============================================================================
# 
#==============================================================================
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def WriteResult(s_file, s_string):
    LOCK.acquire()
    try:
        found = False
        datafile = file(s_file)
        for dataline in datafile:
            if s_string in dataline:
                found = True
                break

        if found == False:
            f = open(s_file, 'a')
            f.write(str(s_string) + "\n")
            f.close()
            s = str("pkill -SIGHUP hostapd")
            p = os.popen(s, "r")
            while 1:
                line = p.readline()
                if not line:
                    break
    except:
        PrintResult(1, "Error writing cracked user credentials to EAP users file. :( Sorz")
    LOCK.release()

# This is the worker thread section.
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def _syscmd_uname(option,default=''):

    """ Interface to the system's uname command.
    """
    if sys.platform in ('dos','win32','win16','os2'):
        # XXX Others too ?
        return default
    try:
        f = os.popen('uname %s 2> %s' % (option, DEV_NULL))
    except (AttributeError,os.error):
        return default
    output = string.strip(f.read())
    rc = f.close()
    if not output or rc:
        return default
    else:
        return output
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def _popen(command, args):
    import os
    path = os.environ.get("PATH", os.defpath).split(os.pathsep)
    path.extend(('/sbin', '/usr/sbin'))
    for dir in path:
        executable = os.path.join(dir, command)
        if (os.path.exists(executable) and
            os.access(executable, os.F_OK | os.X_OK) and
            not os.path.isdir(executable)):
            break
    else:
        return None
    # LC_ALL to ensure English output, 2>/dev/null to prevent output on
    # stderr (Note: we don't have an example where the words we search for
    # are actually localized, but in theory some system could do so.)
    cmd = 'LC_ALL=C %s %s 2>/dev/null' % (executable, args)
    return os.popen(cmd)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def _ipconfig_getnode():
    """Get the hardware address on Windows by running ipconfig.exe."""
    import os, re
    dirs = ['', r'c:\windows\system32', r'c:\winnt\system32']
    try:
        import ctypes
        buffer = ctypes.create_string_buffer(300)
        ctypes.windll.kernel32.GetSystemDirectoryA(buffer, 300)
        dirs.insert(0, buffer.value.decode('mbcs'))
    except:
        pass
    for dir in dirs:
        try:
            pipe = os.popen(os.path.join(dir, 'ipconfig') + ' /all')
        except IOError:
            continue
        with pipe:
            for line in pipe:
                value = line.split(':')[-1].strip().lower()
                if re.match('([0-9a-f][0-9a-f]-){5}[0-9a-f][0-9a-f]', value):
                    return int(value.replace('-', ''), 16)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def _read_output(commandstring):
    """Output from successful command execution or None"""
    # Similar to os.popen(commandstring, "r").read(),
    # but without actually using os.popen because that
    # function is not usable during python bootstrap.
    # tempfile is also not available then.
    import contextlib
    try:
        import tempfile
        fp = tempfile.NamedTemporaryFile()
    except ImportError:
        fp = open("/tmp/_osx_support.%s"%(
            os.getpid(),), "w+b")

    with contextlib.closing(fp) as fp:
        cmd = "%s 2>/dev/null >'%s'" % (commandstring, fp.name)
        return fp.read().strip() if not os.system(cmd) else None
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def _findLib_gcc(name):
        expr = r'[^\(\)\s]*lib%s\.[^\(\)\s]*' % re.escape(name)
        fdout, ccout = tempfile.mkstemp()
        os.close(fdout)
        cmd = 'if type gcc >/dev/null 2>&1; then CC=gcc; elif type cc >/dev/null 2>&1; then CC=cc;else exit 10; fi;' \
              'LANG=C LC_ALL=C $CC -Wl,-t -o ' + ccout + ' 2>&1 -l' + name
        try:
            f = os.popen(cmd)
            try:
                trace = f.read()
            finally:
                rv = f.close()
        finally:
            try:
                os.unlink(ccout)
            except OSError, e:
                if e.errno != errno.ENOENT:
                    raise
        if rv == 10:
            raise OSError, 'gcc or cc command not found'
        res = re.search(expr, trace)
        if not res:
            return None
        return res.group(0)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def _get_soname(f):
            # assuming GNU binutils / ELF
            if not f:
                return None
            cmd = 'if ! type objdump >/dev/null 2>&1; then exit 10; fi;' \
                  "objdump -p -j .dynamic 2>/dev/null " + f
            f = os.popen(cmd)
            dump = f.read()
            rv = f.close()
            if rv == 10:
                raise OSError, 'objdump command not found'
            f = os.popen(cmd)
            try:
                data = f.read()
            finally:
                f.close()
            res = re.search(r'\sSONAME\s+([^\s]+)', data)
            if not res:
                return None
            return res.group(1)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def _findLib_crle(name, is64):
            if not os.path.exists('/usr/bin/crle'):
                return None

            if is64:
                cmd = 'env LC_ALL=C /usr/bin/crle -64 2>/dev/null'
            else:
                cmd = 'env LC_ALL=C /usr/bin/crle 2>/dev/null'

            for line in os.popen(cmd).readlines():
                line = line.strip()
                if line.startswith('Default Library Path (ELF):'):
                    paths = line.split()[4]

            if not paths:
                return None

            for dir in paths.split(":"):
                libfile = os.path.join(dir, "lib%s.so" % name)
                if os.path.exists(libfile):
                    return libfile

            return None
项目:Bella    作者:Trietptm-on-Security    | 项目源码 | 文件源码
def iTunes_backup_looker():
    backupPath = globber("/Users/*/Library/Application Support/MobileSync/Backup/*/Info.plist")
    if len(backupPath) > 0:
        backups = True
        returnable = "%sLooking for backups! %s\n" % (blue_star, blue_star)
        for z, y in enumerate(backupPath):
            returnable += "\n----- Device " + str(z + 1) + " -----\n\n"
            returnable += "Product Name: %s\n" % os.popen("/usr/libexec/PlistBuddy -c 'Print :\"Product Name\"' '%s'" % y).read().replace("\n", "")
            returnable += "Product Version: %s\n" % os.popen("/usr/libexec/PlistBuddy -c 'Print :\"Product Version\"' '%s'" % y).read().replace("\n", "")
            returnable += "Last Backup Date: %s\n" % os.popen("/usr/libexec/PlistBuddy -c 'Print :\"Last Backup Date\"' '%s'" % y).read().replace("\n", "")
            returnable += "Device Name: %s\n" % os.popen("/usr/libexec/PlistBuddy -c 'Print :\"Device Name\"' '%s'" % y).read().replace("\n", "")
            returnable += "Phone Number: %s\n" % os.popen("/usr/libexec/PlistBuddy -c 'Print :\"Phone Number\"' '%s'" % y).read().replace("\n", "")
            returnable += "Serial Number: %s\n" % os.popen("/usr/libexec/PlistBuddy -c 'Print :\"Serial Number\"' '%s'" % y).read().replace("\n", "")
            returnable += "IMEI/MEID: %s\n" % os.popen("/usr/libexec/PlistBuddy -c 'Print :\"IMEI\"' '%s'" % y).read().replace("\n", "")
            returnable += "UDID: %s\n" % os.popen("/usr/libexec/PlistBuddy -c 'Print :\"Target Identifier\"' '%s'" % y).read().replace("\n", "")
            returnable += "iTunes Version: %s\n" % os.popen("/usr/libexec/PlistBuddy -c 'Print :\"iTunes Version\"' '%s'" % y).read().replace("\n", "")
            #iTunesBackupString += "Installed Apps: %s\n" % os.popen("/usr/libexec/PlistBuddy -c 'Print :\"Installed Applications\"' '%s'" % y).read().replace("\n", "")
    else:
        backups = False
        returnable = "%sNo local backups found %s\n" % (blue_star, blue_star)
    return (returnable, backups, len(backupPath))
项目:aws-config-rdk    作者:awslabs    | 项目源码 | 文件源码
def __print_log_event(self, event):
        time_string = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(event['timestamp']/1000))

        rows, columns = os.popen('stty size', 'r').read().split()
        line_wrap = int(columns) - 22
        message_lines = str(event['message']).splitlines()
        formatted_lines = []

        for line in message_lines:
            line = line.replace('\t','    ')
            formatted_lines.append('\n'.join(line[i:i+line_wrap] for i in range(0, len(line), line_wrap)))

        message_string = '\n'.join(formatted_lines)
        message_string = message_string.replace('\n','\n                      ')

        print(time_string + " - " + message_string)
项目:purelove    作者:hucmosin    | 项目源码 | 文件源码
def __init__(self):
        self.client = socket(AF_INET, SOCK_STREAM)
        self.client.connect(self.ADDR)
        while True:
            data = self.client.recv(BUFSIZ)
            if data:
                try:
                    if data == "exit":
                        self.client.close()
                        break
                    ME = os.popen(data.decode('utf8'))
                    os_result = ME.read()
                    #print(os_result)
                    self.client.sendall(os_result)

                except:
                    self.client.close()
项目:human-name-py    作者:djudd    | 项目源码 | 文件源码
def test_no_memory_leak():
    import gc
    import os

    def rss():
        gc.collect()
        out = os.popen("ps -o rss= -p %d" % os.getpid()).read()
        return int(out.strip())

    before = rss()

    for _ in range(100000):
        n = Name.parse("Reallyverylongfirstname Reallyverylonglastname")
        n.given_name
        n.surname

    after = rss()

    assert after < 1.25 * before
项目:Universal-Linux-Script    作者:CYRO4S    | 项目源码 | 文件源码
def Update():

    # Check for ROOT
    # If "uls --update" is not run as ROOT, notify user & exit.
    print('Checking for ROOT...')
    if os.geteuid() != 0:
        print("ERR_1005: 'uls --update' must be run as ROOT. Use 'sudo uls --update' instead.")
        exit(1005)

    # Check for Internet connection before update
    print('Checking for Internet connection...')
    rPing = os.popen("ping -c 3 raw.githubusercontent.com | grep '0 received' | wc -l")
    strPing = rPing.read().strip('\n')
    rPing.close()

    # If Internet is unavailable, exit.
    if strPing == '1':
        print('ERR_1003: Internet is unavailable. Check your connection before running update.')
        exit(1003)

    # Now, do the update 
    os.system("wget --no-check-certificate -O /usr/share/uls/uls_update.sh https://raw.githubusercontent.com/CYRO4S/Universal-Linux-Script/master/uls_update.sh && bash /usr/share/uls/uls_update.sh")
    exit(0)

# Echo
项目:SwiftKitten    作者:johncsnyder    | 项目源码 | 文件源码
def test_no_unknown_exported_symbols():
    if not hasattr(_cffi_backend, '__file__'):
        py.test.skip("_cffi_backend module is built-in")
    if not sys.platform.startswith('linux'):
        py.test.skip("linux-only")
    g = os.popen("objdump -T '%s'" % _cffi_backend.__file__, 'r')
    for line in g:
        if not line.startswith('0'):
            continue
        if '*UND*' in line:
            continue
        name = line.split()[-1]
        if name.startswith('_') or name.startswith('.'):
            continue
        if name not in ('init_cffi_backend', 'PyInit__cffi_backend'):
            raise Exception("Unexpected exported name %r" % (name,))
    g.close()
项目:deep-text-corrector    作者:andabi    | 项目源码 | 文件源码
def parse_batch(self, sentenceDumpedFileName, parsingDumpedFileName):

        if os.path.exists('../stanford-parser-2012-03-09') == False:
            print >> sys.stderr, 'can not find Stanford parser directory'
            sys.exit(1)

        # tokenized
        cmd = r'java -server -mx4096m -cp "../stanford-parser-2012-03-09/*:" edu.stanford.nlp.parser.lexparser.LexicalizedParser  -retainTMPSubcategories -sentences newline -tokenized -escaper edu.stanford.nlp.process.PTBEscapingProcessor  -outputFormat "wordsAndTags, penn, typedDependencies" -outputFormatOptions "basicDependencies" edu/stanford/nlp/models/lexparser/englishPCFG.ser.gz ' + sentenceDumpedFileName

        r = os.popen(cmd).read().strip().decode('utf-8')
        f = open(parsingDumpedFileName, 'w')
        f.write(r.encode('utf-8'))
        f.close()

        rlist = r.replace('\n\n\n', '\n\n\n\n').split('\n\n')
        return rlist
项目:deep-text-corrector    作者:andabi    | 项目源码 | 文件源码
def parse_batch(self, sentenceDumpedFileName, parsingDumpedFileName):

        if os.path.exists('../stanford-parser-2012-03-09') == False:
            print >> sys.stderr, 'can not find Stanford parser directory'
            sys.exit(1)

        # tokenized
        cmd = r'java -server -mx4096m -cp "../stanford-parser-2012-03-09/*:" edu.stanford.nlp.parser.lexparser.LexicalizedParser  -retainTMPSubcategories -sentences newline -tokenized -escaper edu.stanford.nlp.process.PTBEscapingProcessor  -outputFormat "wordsAndTags, penn, typedDependencies" -outputFormatOptions "basicDependencies" edu/stanford/nlp/models/lexparser/englishPCFG.ser.gz ' + sentenceDumpedFileName

        r = os.popen(cmd).read().strip().decode('utf-8')
        f = open(parsingDumpedFileName, 'w')
        f.write(r.encode('utf-8'))
        f.close()

        rlist = r.replace('\n\n\n', '\n\n\n\n').split('\n\n')
        return rlist
项目:GoToMeetingTools    作者:plongitudes    | 项目源码 | 文件源码
def monkey_func(self, url, new=0, autoraise=True):
        if self._name == 'default':
            script = 'do shell script "open %s"' % url.replace('"', '%22')  # opens in default browser
        else:
            script = '''
               tell application "%s"
                   activate
                   open location "%s"
               end
               ''' % (self._name, url.replace('"', '%22'))
        osapipe = os.popen("osascript", "w")
        if osapipe is None:
            return False
        osapipe.write(script)
        rc = osapipe.close()
        return not rc
项目:FileBot    作者:Searinox    | 项目源码 | 文件源码
def console_setup():
    global COLOR_WINDOW_TX
    global COLOR_WINDOW_BG
    global WINDOW_ROWS
    global WINDOW_COLS
    global WINDOW_HANDLE

    OUTPUT_ROW_BUFFER=WINDOW_ROWS

    os.system("@title FileBot v"+__version__)
    os.system("color "+str(hex(COLOR_WINDOW_TX+COLOR_WINDOW_BG*16))[2:])
    os.popen("mode con: lines="+str(WINDOW_ROWS)+" cols="+str(WINDOW_COLS))
    WINDOW_HANDLE=ctypes.windll.kernel32.GetStdHandle(-12)
    ctypes.windll.kernel32.SetConsoleScreenBufferSize(WINDOW_HANDLE,OUTPUT_ROW_BUFFER*(2**16)+WINDOW_COLS)
    cursor_visibility(False)
    return
项目:breadAI    作者:ideamark    | 项目源码 | 文件源码
def translate(word):
    if not word:
        return
    if re.match(u'.*[\u4e00-\u9fa5].*', word) or ' ' in word:
        p = {'wd': word}
        return "http://dict.baidu.com/s?" + urllib.parse.urlencode(p)
    reses = os.popen('sdcv -n ' + word).readlines()
    if not re.match(u'^Found 1 items.*', reses[0]):
        p = {'wd': word}
        return "http://dict.baidu.com/s?" + urllib.parse.urlencode(p)
    res = ''
    for i in range(4, len(reses)):
        res += reses[i]
    res = re.sub(u'\[.+\]', '', res)
    res = res.replace('\n', '')
    res = res.replace('//', '\r')
    return res
项目:CustomContextMenu    作者:bigsinger    | 项目源码 | 文件源码
def getXmlInfo(self):
        xml_cmd = ''
        self.lastError = ''

        if 'Windows' in platform.system():
            xml_cmd = "%s d xmltree \"%s\" AndroidManifest.xml " % (self.aapt_path, self.apk_path)
            # xml_cmd = "%s%saapt.exe d xmltree \"%s\" AndroidManifest.xml "%(self.aapt_path,os.sep, self.apk_path)

        if 'Linux' in platform.system():
            xml_cmd = "%s%saapt d xmltree %s AndroidManifest.xml " % (self.aapt_path, os.sep, self.apk_path)

        try:
            strxml = os.popen(xml_cmd)
            self.xmltree = strxml.read()
        except Exception as e:
            # print "aapt Mainfestxml error"
            self.lastError = 'aapt get AndroidManifest.xml error'
            return False
        return True

    # ?xml???????
项目:OSPTF    作者:xSploited    | 项目源码 | 文件源码
def _popen(command, args):
    import os
    path = os.environ.get("PATH", os.defpath).split(os.pathsep)
    path.extend(('/sbin', '/usr/sbin'))
    for dir in path:
        executable = os.path.join(dir, command)
        if (os.path.exists(executable) and
            os.access(executable, os.F_OK | os.X_OK) and
            not os.path.isdir(executable)):
            break
    else:
        return None
    # LC_ALL to ensure English output, 2>/dev/null to prevent output on
    # stderr (Note: we don't have an example where the words we search for
    # are actually localized, but in theory some system could do so.)
    cmd = 'LC_ALL=C %s %s 2>/dev/null' % (executable, args)
    return os.popen(cmd)
项目:OSPTF    作者:xSploited    | 项目源码 | 文件源码
def _ipconfig_getnode():
    """Get the hardware address on Windows by running ipconfig.exe."""
    import os, re
    dirs = ['', r'c:\windows\system32', r'c:\winnt\system32']
    try:
        import ctypes
        buffer = ctypes.create_string_buffer(300)
        ctypes.windll.kernel32.GetSystemDirectoryA(buffer, 300)
        dirs.insert(0, buffer.value.decode('mbcs'))
    except:
        pass
    for dir in dirs:
        try:
            pipe = os.popen(os.path.join(dir, 'ipconfig') + ' /all')
        except IOError:
            continue
        with pipe:
            for line in pipe:
                value = line.split(':')[-1].strip().lower()
                if re.match('([0-9a-f][0-9a-f]-){5}[0-9a-f][0-9a-f]', value):
                    return int(value.replace('-', ''), 16)
项目:OSPTF    作者:xSploited    | 项目源码 | 文件源码
def _popen(command, args):
    import os
    path = os.environ.get("PATH", os.defpath).split(os.pathsep)
    path.extend(('/sbin', '/usr/sbin'))
    for dir in path:
        executable = os.path.join(dir, command)
        if (os.path.exists(executable) and
            os.access(executable, os.F_OK | os.X_OK) and
            not os.path.isdir(executable)):
            break
    else:
        return None
    # LC_ALL to ensure English output, 2>/dev/null to prevent output on
    # stderr (Note: we don't have an example where the words we search for
    # are actually localized, but in theory some system could do so.)
    cmd = 'LC_ALL=C %s %s 2>/dev/null' % (executable, args)
    return os.popen(cmd)
项目:ApkParser    作者:yigitozgumus    | 项目源码 | 文件源码
def parse_icon(self, icon_path=None):
        """
        parse icon.
        :param icon_path: icon storage path
        """
        if not icon_path:
            icon_path = os.path.dirname(os.path.abspath(__file__))

        pkg_name_path = os.path.join(icon_path, self.package)
        if not os.path.exists(pkg_name_path):
            os.mkdir(pkg_name_path)

        aapt_line = "aapt dump badging %s | grep 'application-icon' | awk -F ':' '{print $2}'" % self.get_filename()
        parse_icon_rt = os.popen(aapt_line).read()
        icon_paths = [icon.replace("'", '') for icon in parse_icon_rt.split('\n') if icon]

        zfile = zipfile.ZipFile(StringIO.StringIO(self.__raw), mode='r')
        for icon in icon_paths:
            icon_name = icon.replace('/', '_')
            data = zfile.read(icon)
            with open(os.path.join(pkg_name_path, icon_name), 'w+b') as icon_file:
                icon_file.write(data)
        print "APK ICON in: %s" % pkg_name_path