Python pipes 模块,quote() 实例源码

我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用pipes.quote()

项目:dingdang-robot    作者:wzpan    | 项目源码 | 文件源码
def get_speech(self, phrase):
        """Synthesize *phrase* via the peiyinge.com service and save the audio.

        A signing request is posted first; its ``ts`` and ``sign`` fields are
        then combined with ``self.vid`` and the URL-quoted phrase to form the
        synthesis URL. The body of that response is written to a temporary
        ``.mp3`` file which is left on disk.

        Returns:
            The path of the temporary file holding the downloaded audio.
        """
        sign_endpoint = 'http://www.peiyinge.com/make/getSynthSign'
        synth_prefix = 'http://proxy.peiyinge.com:17063/synth?ts='
        form = {'content': phrase.encode('utf8')}
        sign_reply = requests.post(sign_endpoint, data=form).json()
        quoted_phrase = urllib.quote(phrase.encode('utf8'))
        timestamp = sign_reply['ts']
        signature = sign_reply['sign']
        synth_url = (synth_prefix + timestamp + '&sign=' + signature +
                     '&vid=' + self.vid + '&volume=&speed=0&content=' +
                     quoted_phrase)
        audio = requests.get(synth_url)
        # delete=False: the caller receives the path, so the file must outlive
        # the handle.
        mp3_file = tempfile.NamedTemporaryFile(suffix='.mp3', delete=False)
        with mp3_file:
            mp3_file.write(audio.content)
            return mp3_file.name
项目:nojs    作者:chrisdickinson    | 项目源码 | 文件源码
def _RunGdb(device, package_name, output_directory, target_cpu, extra_args,
            verbose):
  """Replaces the current process with the adb_gdb helper script.

  The script is looked up next to this file. The full command line is logged
  (shell-quoted so it can be copy/pasted) before os.execv() is called, so this
  function does not return when the exec succeeds.

  Args:
    device: Object whose `serial` attribute identifies the target device.
    package_name: Value passed as --package-name.
    output_directory: Value passed as --output-directory.
    target_cpu: If truthy, converted with _TargetCpuToTargetArch() and passed
        as --target-arch.
    extra_args: Iterable of additional arguments appended verbatim.
    verbose: If truthy, --verbose is forwarded to adb_gdb.
  """
  # os.path.join keeps the path relative when dirname() is empty (script run
  # from its own directory) instead of producing the bogus '/adb_gdb'.
  gdb_script_path = os.path.join(os.path.dirname(__file__), 'adb_gdb')
  cmd = [
      gdb_script_path,
      '--package-name=%s' % package_name,
      '--output-directory=%s' % output_directory,
      '--adb=%s' % adb_wrapper.AdbWrapper.GetAdbPath(),
      '--device=%s' % device.serial,
      # Use one lib dir per device so that changing between devices does not
      # require refetching the device libs.
      '--pull-libs-dir=/tmp/adb-gdb-libs-%s' % device.serial,
  ]
  # Enable verbose output of adb_gdb if it's set for this script.
  if verbose:
    cmd.append('--verbose')
  if target_cpu:
    cmd.append('--target-arch=%s' % _TargetCpuToTargetArch(target_cpu))
  cmd.extend(extra_args)
  logging.warning('Running: %s', ' '.join(pipes.quote(x) for x in cmd))
  # Single-argument parenthesized form prints identically on Python 2 and 3.
  print(_Colorize('YELLOW', 'All subsequent output is from adb_gdb script.'))
  os.execv(gdb_script_path, cmd)
项目:bob    作者:BobBuildTool    | 项目源码 | 文件源码
def upload(self, step, buildIdFile, tgzFile):
        """Return the shell snippet that uploads an artifact from a Jenkins job.

        The snippet derives the upload URL from a hex dump of the build-id
        file, skips the transfer when a HEAD request shows the artifact is
        already on the server, and otherwise sends ``tgzFile`` with curl.
        Responses other than 2xx and 412 are reported and, unless errors are
        ignored, terminate the script with ``exit 1``.

        Returns an empty string when Jenkins uploads are not enabled.
        """
        # only upload if requested
        if not self.canUploadJenkins():
            return ""

        substitutions = dict(
            URL=self.__url.geturl(),
            BUILDID=quote(buildIdFile),
            RESULT=quote(tgzFile),
            FAIL="" if self._ignoreErrors() else "; exit 1",
            GEN=ARCHIVE_GENERATION,
            SUFFIX=ARTIFACT_SUFFIX)

        # upload with curl if file does not exist yet on server
        template = """\
            # upload artifact
            cd $WORKSPACE
            BOB_UPLOAD_BID="$(hexdump -ve '/1 "%02x"' {BUILDID}){GEN}"
            BOB_UPLOAD_URL="{URL}/${{BOB_UPLOAD_BID:0:2}}/${{BOB_UPLOAD_BID:2:2}}/${{BOB_UPLOAD_BID:4}}{SUFFIX}"
            if ! curl --output /dev/null --silent --head --fail "$BOB_UPLOAD_URL" ; then
                BOB_UPLOAD_RSP=$(curl -sSgf -w '%{{http_code}}' -H 'If-None-Match: *' -T {RESULT} "$BOB_UPLOAD_URL" || true)
                if [[ $BOB_UPLOAD_RSP != 2?? && $BOB_UPLOAD_RSP != 412 ]]; then
                    echo "Upload failed with code $BOB_UPLOAD_RSP"{FAIL}
                fi
            fi"""
        # Substitute first, then dedent, exactly like a literal-with-format.
        return "\n" + textwrap.dedent(template.format(**substitutions))
项目:bob    作者:BobBuildTool    | 项目源码 | 文件源码
def uploadJenkinsLiveBuildId(self, step, liveBuildId, buildId):
        """Return the shell snippet that uploads a live build-id from Jenkins.

        The upload URL is derived from a hex dump of the ``liveBuildId`` file
        and the ``buildId`` file is sent with curl. Responses other than 2xx
        and 412 are reported and, unless errors are ignored, terminate the
        script with ``exit 1``.

        Returns an empty string when Jenkins uploads are not enabled.
        """
        # only upload if requested
        if not self.canUploadJenkins():
            return ""

        substitutions = dict(
            URL=self.__url.geturl(),
            LIVEBUILDID=quote(liveBuildId),
            BUILDID=quote(buildId),
            FAIL="" if self._ignoreErrors() else "; exit 1",
            GEN=ARCHIVE_GENERATION,
            SUFFIX=BUILDID_SUFFIX)

        # upload with curl if file does not exist yet on server
        template = """\
            # upload live build-id
            cd $WORKSPACE
            BOB_UPLOAD_BID="$(hexdump -ve '/1 "%02x"' {LIVEBUILDID}){GEN}"
            BOB_UPLOAD_URL="{URL}/${{BOB_UPLOAD_BID:0:2}}/${{BOB_UPLOAD_BID:2:2}}/${{BOB_UPLOAD_BID:4}}{SUFFIX}"
            BOB_UPLOAD_RSP=$(curl -sSgf -w '%{{http_code}}' -H 'If-None-Match: *' -T {BUILDID} "$BOB_UPLOAD_URL" || true)
            if [[ $BOB_UPLOAD_RSP != 2?? && $BOB_UPLOAD_RSP != 412 ]]; then
                echo "Upload failed with code $BOB_UPLOAD_RSP"{FAIL}
            fi
            """
        # Substitute first, then dedent, exactly like a literal-with-format.
        return "\n" + textwrap.dedent(template.format(**substitutions))
项目:bob    作者:BobBuildTool    | 项目源码 | 文件源码
def jenkinsNamePersister(jenkins, wrapFmt, uuid):
    """Build the directory formatter used for steps of a Jenkins job.

    The returned callable ``fmt(step, mode, props)`` yields:

    * for ``mode == 'workspace'`` the persisted by-name directory,
    * for ``mode == 'exec'`` without a sandbox that directory, shell-quoted
      and prefixed with ``$PWD``,
    * for ``mode == 'exec'`` with a sandbox a fixed path below ``/bob``.

    When ``uuid`` is truthy it is appended to the persisted directory name,
    separated by a dash.
    """

    def persist(step, props):
        name = BobState().getJenkinsByNameDirectory(
            jenkins, wrapFmt(step, props), step.getVariantId())
        return (name + "-" + uuid) if uuid else name

    def fmt(step, mode, props):
        if mode == 'workspace':
            return persist(step, props)

        assert mode == 'exec'
        if step.getSandbox() is not None:
            return os.path.join("/bob", asHexStr(step.getVariantId()), "workspace")
        return os.path.join("$PWD", quote(persist(step, props)))

    return fmt
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def do_osx_install(srcdir, targetdir):
    """Run the ``install.sh`` found below *srcdir* to install into *targetdir*.

    An existing *targetdir* is removed first. The install script is located
    with ``find``, made executable and invoked as
    ``install.sh <srcdir> <targetdir>``. Afterwards every file named
    ``Chromium`` or ``Chromium Helper`` below *targetdir* is made executable.

    All paths are shell-quoted, so directories containing spaces or shell
    metacharacters are handled. A non-zero exit status of any command is
    printed but does not raise.
    """
    if os.path.exists(targetdir):
        print('Target dir %s already exists! Removing...' % targetdir)
        shutil.rmtree(targetdir)

    quoted_src = pipes.quote(srcdir)
    quoted_target = pipes.quote(targetdir)

    install_script = os.popen(
        'find ' + quoted_src + ' -iname install.sh').read().strip()
    print('DBG install_script: %s' % install_script)
    # Wait for chmod to finish; a fire-and-forget os.popen() here could race
    # with the execution of the script below.
    subprocess.call('chmod +x %s' % pipes.quote(install_script), shell=True)
    cmd_install = '%s %s %s' % (
        pipes.quote(install_script), quoted_src, quoted_target)
    print('DBG cmd: "%s"' % cmd_install)
    # Raw strings: the backslashes are meant for the shell, not for Python.
    cmd_chmod_chromium = (
        r'find %s -name Chromium -exec chmod +x {} \;' % quoted_target)
    cmd_chmod_chromium_helper = (
        r'find %s -name Chromium\ Helper -exec chmod +x {} \;' % quoted_target)
    for cmd in [cmd_install, cmd_chmod_chromium, cmd_chmod_chromium_helper]:
        proc = subprocess.Popen(cmd, shell=True)
        proc.wait()
        if proc.returncode:
            print("returncode " + str(proc.returncode))
项目:SoCFoundationFlow    作者:mattaw    | 项目源码 | 文件源码
def xcheck_envar(conf, name, wafname=None, cross=False):
    """Copy the environment variable *name* into ``conf.env`` and report it.

    Nothing happens when the variable is not set. Otherwise its value is
    converted with ``Utils.to_list`` (an empty value is kept as a list holding
    one empty string), stored under *wafname* (defaulting to *name*) and
    announced through ``conf.msg`` with every item shell-quoted.
    """
    target = wafname or name
    raw = os.environ.get(name)
    if raw is None:
        return

    # An empty variable is kept as one empty entry instead of being split.
    value = [raw] if raw == '' else Utils.to_list(raw)
    conf.env[target] = value

    label = ('cross-compilation %s' % target) if cross else target
    conf.msg('Will use %s' % label, " ".join(map(quote, value)))
项目:SoCFoundationFlow    作者:mattaw    | 项目源码 | 文件源码
def xcheck_envar(conf, name, wafname=None, cross=False):
    """Import the environment variable *name* into the waf configuration.

    Returns immediately when the variable is unset. A set variable is turned
    into a list via ``Utils.to_list`` (``''`` becomes ``['']``), assigned to
    ``conf.env[wafname or name]`` and reported with ``conf.msg``, each item
    shell-quoted.
    """
    env_key = wafname or name
    env_text = os.environ.get(name)
    if env_text is None:
        return

    if env_text != '':
        value = Utils.to_list(env_text)
    else:
        # Keep an explicitly empty variable as a single empty entry.
        value = [env_text]
    conf.env[env_key] = value

    if cross:
        description = 'cross-compilation %s' % env_key
    else:
        description = env_key
    conf.msg('Will use %s' % description, " ".join(map(quote, value)))
项目:temporal-segment-networks    作者:yjxiong    | 项目源码 | 文件源码
def run_optical_flow(vid_item, dev_id=0):
    """Extract frames and optical flow for one video with ``extract_gpu``.

    Args:
        vid_item: Sequence whose first item is the video path and whose second
            item is an identifier used only in the progress message.
        dev_id: GPU index used when the call does not run inside a
            multiprocessing pool worker. Inside a worker the index is derived
            from the worker's identity so that workers are spread over
            ``NUM_GPU`` devices.

    Returns:
        True, always; the exit status of the command is not checked.
    """
    vid_path = vid_item[0]
    vid_id = vid_item[1]
    vid_name = vid_path.split('/')[-1].split('.')[0]
    out_full_path = os.path.join(out_path, vid_name)
    try:
        os.mkdir(out_full_path)
    except OSError:
        # Best effort, e.g. the output directory already exists.
        pass

    # The main process has an empty identity; only pool workers override the
    # caller-supplied device id.
    identity = current_process()._identity
    if identity:
        dev_id = (int(identity[0]) - 1) % NUM_GPU
    image_path = '{}/img'.format(out_full_path)
    flow_x_path = '{}/flow_x'.format(out_full_path)
    flow_y_path = '{}/flow_y'.format(out_full_path)

    # Join the two components properly so df_path works with or without a
    # trailing slash.
    cmd = os.path.join(df_path, 'build/extract_gpu') + ' -f {} -x {} -y {} -i {} -b 20 -t 1 -d {} -s 1 -o {} -w {} -h {}'.format(
        quote(vid_path), quote(flow_x_path), quote(flow_y_path), quote(image_path), dev_id, out_format, new_size[0], new_size[1])

    os.system(cmd)
    # Single-argument parenthesized form prints identically on Python 2 and 3.
    print('{} {} done'.format(vid_id, vid_name))
    sys.stdout.flush()
    return True
项目:antenna    作者:mozilla-services    | 项目源码 | 文件源码
def main():
    """Download the pinned packages and install them with pip.

    Returns:
        0 after a successful install, 1 when a download fails its hash check.
        On a hash failure the error is printed and the temporary directory is
        left in place; on any other error the directory is removed and the
        exception propagates.
    """
    temp = mkdtemp(prefix='pipstrap-')
    try:
        downloads = []
        for url, digest in PACKAGES:
            downloads.append(hashed_download(url, temp, digest))
        install_cmd = ('pip install --no-index --no-deps -U ' +
                       ' '.join(map(quote, downloads)))
        check_output(install_cmd, shell=True)
    except HashError as exc:
        print(exc)
        return 1
    except Exception:
        rmtree(temp)
        raise
    rmtree(temp)
    return 0
项目:Qyoutube-dl    作者:lzambella    | 项目源码 | 文件源码
def _debug_cmd(self, args, exe=None):
        if not self.params.get('verbose', False):
            return

        str_args = [decodeArgument(a) for a in args]

        if exe is None:
            exe = os.path.basename(str_args[0])

        try:
            import pipes
            shell_quote = lambda args: ' '.join(map(pipes.quote, str_args))
        except ImportError:
            shell_quote = repr
        self.to_screen('[debug] %s command line: %s' % (
            exe, shell_quote(str_args)))
项目:Video-Classification-Action-Recognition    作者:qijiezhao    | 项目源码 | 文件源码
def run_optical_flow(vid_item, dev_id=0):
    """Run ``extract_gpu`` on one video to dump frames and optical flow.

    Args:
        vid_item: Sequence holding the video path first and an identifier
            (used only in the progress message) second.
        dev_id: GPU index to use when not running inside a multiprocessing
            pool worker. A pool worker instead picks its device from its own
            identity, modulo ``NUM_GPU``.

    Returns:
        True, always; the exit status of the command is not checked.
    """
    vid_path = vid_item[0]
    vid_id = vid_item[1]
    vid_name = vid_path.split('/')[-1].split('.')[0]
    out_full_path = os.path.join(out_path, vid_name)
    try:
        os.mkdir(out_full_path)
    except OSError:
        # Best effort, e.g. the output directory already exists.
        pass

    # Only pool workers carry a non-empty identity; elsewhere keep the
    # caller-supplied device id instead of failing on an empty tuple.
    identity = current_process()._identity
    if identity:
        dev_id = (int(identity[0]) - 1) % NUM_GPU
    image_path = '{}/img'.format(out_full_path)
    flow_x_path = '{}/flow_x'.format(out_full_path)
    flow_y_path = '{}/flow_y'.format(out_full_path)

    # Join the two components properly so df_path works with or without a
    # trailing slash.
    cmd = os.path.join(df_path, 'build/extract_gpu') + ' -f {} -x {} -y {} -i {} -b 20 -t 1 -d {} -s 1 -o {} -w {} -h {}'.format(
        quote(vid_path), quote(flow_x_path), quote(flow_y_path), quote(image_path), dev_id, out_format, new_size[0], new_size[1])

    os.system(cmd)
    # Single-argument parenthesized form prints identically on Python 2 and 3.
    print('{} {} done'.format(vid_id, vid_name))
    sys.stdout.flush()
    return True
项目:kuberdock-platform    作者:cloudlinux    | 项目源码 | 文件源码
def _set_delayed(self):
        """Schedule the delayed iptables update (postprocess method).

        After a pod is deleted or created the iptables rules have to change,
        but it is not known whether the operation has actually been
        performed. So an ``at`` job is queued that calls the postprocess
        method as superuser (via the suid binary 'suidwrap') two minutes
        from now.

        Failures to queue the job are ignored.
        """
        token = getattr(self, 'token', None)
        if not token or token == 'None':
            token = self.query.get(AUTH_TOKEN_PATH)['token']

        # NOTE(review): the token is only wrapped in double quotes, not
        # shell-quoted like the name -- confirm it can never contain shell
        # metacharacters.
        at_pipeline = ('echo /usr/libexec/suidwrap "{0}" {1} '
                       '|at now + 2 minute > /dev/null 2>&1')
        try:
            subprocess.check_call(
                [at_pipeline.format(token, quote(self.name))], shell=True)
        except (KeyError, TypeError, subprocess.CalledProcessError):
            return
项目:optimalvibes    作者:littlemika    | 项目源码 | 文件源码
def _debug_cmd(self, args, exe=None):
        if not self.params.get('verbose', False):
            return

        str_args = [decodeArgument(a) for a in args]

        if exe is None:
            exe = os.path.basename(str_args[0])

        try:
            import pipes
            shell_quote = lambda args: ' '.join(map(pipes.quote, str_args))
        except ImportError:
            shell_quote = repr
        self.to_screen('[debug] %s command line: %s' % (
            exe, shell_quote(str_args)))
项目:sublimeplugins    作者:ktuan89    | 项目源码 | 文件源码
def search(self, query):
        """Grep for *query* and pass the output lines to ``show_results``.

        The shell command comes from ``self.grep_command`` when one is
        configured; otherwise from the regular or the quick grep format
        string, depending on ``self.show_in_view``. The query is shell-quoted
        wherever it is inserted.
        """
        if self.grep_command is not None:
            command = self.grep_command.format(pipes.quote(query))
        else:
            if self.show_in_view:
                template = grepFormatStr()
            else:
                # we need quick results
                template = quickGrepFormatStr()
            command = template.format(grepPath(self.window), pipes.quote(query))

        sublime.status_message("grepping {0} ...".format(pipes.quote(query)))

        output, _ = run_bash_for_output(command)
        self.show_results(query, output.split('\n'))
项目:DevOps    作者:YoLoveLife    | 项目源码 | 文件源码
def files_in_archive(self, force_refresh=False):
        """Return the names of the files contained in the archive.

        The listing is cached in ``self._files_in_archive``. A non-empty
        cache is returned as is unless *force_refresh* is true, in which case
        the archive is listed again and the cache is replaced (not appended
        to, which would duplicate entries).

        Raises:
            UnarchiveError: if the listing command exits with a non-zero
                status.
        """
        if self._files_in_archive and not force_refresh:
            return self._files_in_archive

        cmd = [ self.cmd_path, '--list', '-C', self.dest ]
        if self.zipflag:
            cmd.append(self.zipflag)
        if self.opts:
            cmd.extend([ '--show-transformed-names' ] + self.opts)
        if self.excludes:
            cmd.extend([ '--exclude=' + quote(f) for f in self.excludes ])
        cmd.extend([ '-f', self.src ])
        rc, out, err = self.module.run_command(cmd, cwd=self.dest, environ_update=dict(LANG='C', LC_ALL='C', LC_MESSAGES='C'))
        if rc != 0:
            raise UnarchiveError('Unable to list files in the archive')

        # Collect into a fresh list so a forced refresh does not pile new
        # entries on top of the previous listing.
        listed = []
        for filename in out.splitlines():
            # Compensate for locale-related problems in gtar output (octal unicode representation) #11348
            filename = codecs.escape_decode(filename)[0]
            if filename and filename not in self.excludes:
                listed.append(to_native(filename))
        self._files_in_archive = listed
        return self._files_in_archive
项目:DevOps    作者:YoLoveLife    | 项目源码 | 文件源码
def unarchive(self):
        """Extract the archive into ``self.dest``.

        Builds the extraction command from the configured compression flag,
        extra options, owner/group, ``keep_newer`` setting and excludes, runs
        it through ``self.module.run_command`` and returns a dict with the
        keys ``cmd``, ``rc``, ``out`` and ``err``.
        """
        cmd = [self.cmd_path, '--extract', '-C', self.dest]
        if self.zipflag:
            cmd += [self.zipflag]
        if self.opts:
            cmd += ['--show-transformed-names'] + self.opts
        for option, key in (('--owner=', 'owner'), ('--group=', 'group')):
            if self.file_args[key]:
                cmd.append(option + quote(self.file_args[key]))
        if self.module.params['keep_newer']:
            cmd.append('--keep-newer-files')
        if self.excludes:
            cmd += ['--exclude=' + quote(pattern) for pattern in self.excludes]
        cmd += ['-f', self.src]
        # Force the C locale for the command's output.
        locale_env = dict(LANG='C', LC_ALL='C', LC_MESSAGES='C')
        rc, out, err = self.module.run_command(
            cmd, cwd=self.dest, environ_update=locale_env)
        return dict(cmd=cmd, rc=rc, out=out, err=err)
项目:DevOps    作者:YoLoveLife    | 项目源码 | 文件源码
def _read_user_execute(self):
        """
        Returns the command line for reading a crontab

        Without ``self.user`` this is ``CRONCMD`` with ``-l``. With a user the
        form depends on ``platform.system()``: SunOS goes through ``su``, AIX
        and HP-UX take the user as a positional argument, and every other
        system gets ``-u <user>``. The user name is always shell-quoted.
        """
        user_option = ''

        if self.user:
            system = platform.system()
            if system == 'SunOS':
                return "su %s -c '%s -l'" % (pipes.quote(self.user), pipes.quote(CRONCMD))
            if system == 'AIX':
                return "%s -l %s" % (pipes.quote(CRONCMD), pipes.quote(self.user))
            if system == 'HP-UX':
                return "%s %s %s" % (CRONCMD , '-l', pipes.quote(self.user))
            user_option = '-u %s' % pipes.quote(self.user)
        return "%s %s %s" % (CRONCMD , user_option, '-l')
项目:DevOps    作者:YoLoveLife    | 项目源码 | 文件源码
def query_package(module, port_path, name, state="present"):
    """ Returns whether a package is installed or not.

    For ``state="present"`` the output of ``port installed`` is grepped for
    *name*; for ``state="active"`` the output of ``port installed <name>`` is
    grepped for ``active``. The result is True when the pipeline exits with
    status 0 and False otherwise. Any other *state* yields None.
    """
    if state == "present":
        pipeline = "%s installed | grep -q ^.*%s"
    elif state == "active":
        pipeline = "%s installed %s | grep -q active"
    else:
        return None

    command = pipeline % (pipes.quote(port_path), pipes.quote(name))
    rc, _out, _err = module.run_command(command, use_unsafe_shell=True)
    return rc == 0
项目:DevOps    作者:YoLoveLife    | 项目源码 | 文件源码
def build_module_command(self, env_string, shebang, cmd, arg_path=None, rm_tmp=None):
        """Assemble the shell command line that runs a module.

        The result is the stripped *env_string*, the interpreter taken from
        *shebang* (without its ``#!``), the shell-quoted *cmd* and, if given,
        *arg_path*, joined by single spaces. When *rm_tmp* is truthy an
        ``rm -rf`` of that path, with its output redirected, is appended.
        """
        # don't quote the cmd if it's an empty string, because this will break pipelining mode
        quoted_cmd = cmd if cmd.strip() == '' else pipes.quote(cmd)
        interpreter = shebang.replace("#!", "").strip() if shebang else ""

        pieces = [env_string.strip(), interpreter, quoted_cmd]
        if arg_path is not None:
            pieces.append(arg_path)
        assembled = " ".join(pieces)

        if not rm_tmp:
            return assembled
        return '%s; rm -rf "%s" %s' % (assembled, rm_tmp, self._SHELL_REDIRECT_ALLNULL)
项目:DevOps    作者:YoLoveLife    | 项目源码 | 文件源码
def put_file(self, in_path, out_path):
        ''' transfer a file from local to lxc

        The local file is streamed to a ``dd`` process started through
        ``self._buffered_exec_command``; the destination path is shell-quoted
        before it is placed on the ``dd`` command line.

        Raises AnsibleError when the local file cannot be opened, when ``dd``
        cannot be started, or when the transfer fails.
        '''
        super(Connection, self).put_file(in_path, out_path)
        display.vvv("PUT %s TO %s" % (in_path, out_path), host=self.lxc)

        out_path = pipes.quote(self._prefix_login_path(out_path))
        try:
            with open(to_bytes(in_path, errors='surrogate_or_strict'), 'rb') as in_file:
                try:
                    p = self._buffered_exec_command('dd of=%s bs=%s' % (out_path, BUFSIZE), stdin=in_file)
                except OSError:
                    # This is the lxc plugin; name it correctly instead of
                    # the message copied from the chroot plugin.
                    raise AnsibleError("lxc connection requires dd command in the container")
                try:
                    stdout, stderr = p.communicate()
                except:
                    traceback.print_exc()
                    raise AnsibleError("failed to transfer file %s to %s" % (in_path, out_path))
                if p.returncode != 0:
                    raise AnsibleError("failed to transfer file %s to %s:\n%s\n%s" % (in_path, out_path, stdout, stderr))
        except IOError:
            raise AnsibleError("file or module does not exist at: %s" % in_path)
项目:DevOps    作者:YoLoveLife    | 项目源码 | 文件源码
def fetch_file(self, in_path, out_path):
        ''' fetch a file from lxc to local

        A ``dd`` process started through ``self._buffered_exec_command``
        reads the (shell-quoted) source path; its standard output is copied
        to the local file in chunks of ``BUFSIZE``.

        Raises AnsibleError when ``dd`` cannot be started or the transfer
        fails.
        '''
        super(Connection, self).fetch_file(in_path, out_path)
        display.vvv("FETCH %s TO %s" % (in_path, out_path), host=self.lxc)

        in_path = pipes.quote(self._prefix_login_path(in_path))
        try:
            p = self._buffered_exec_command('dd if=%s bs=%s' % (in_path, BUFSIZE))
        except OSError:
            # This is the lxc plugin; name it correctly instead of the
            # message copied from the chroot plugin.
            raise AnsibleError("lxc connection requires dd command in the container")

        with open(to_bytes(out_path, errors='surrogate_or_strict'), 'wb+') as out_file:
            try:
                chunk = p.stdout.read(BUFSIZE)
                while chunk:
                    out_file.write(chunk)
                    chunk = p.stdout.read(BUFSIZE)
            except:
                traceback.print_exc()
                raise AnsibleError("failed to transfer file %s to %s" % (in_path, out_path))
            stdout, stderr = p.communicate()
            if p.returncode != 0:
                raise AnsibleError("failed to transfer file %s to %s:\n%s\n%s" % (in_path, out_path, stdout, stderr))
项目:DevOps    作者:YoLoveLife    | 项目源码 | 文件源码
def put_file(self, in_path, out_path):
        ''' transfer a file from local to chroot

        Streams the local file into a ``dd`` process started through
        ``self._buffered_exec_command``. Raises AnsibleError when the local
        file cannot be opened, ``dd`` cannot be started, or the copy fails.
        '''
        super(Connection, self).put_file(in_path, out_path)
        display.vvv("PUT %s TO %s" % (in_path, out_path), host=self.chroot)

        # The destination ends up on a command line, hence the quoting.
        out_path = pipes.quote(self._prefix_login_path(out_path))
        try:
            with open(to_bytes(in_path, errors='surrogate_or_strict'), 'rb') as source:
                dd_cmd = 'dd of=%s bs=%s' % (out_path, BUFSIZE)
                try:
                    proc = self._buffered_exec_command(dd_cmd, stdin=source)
                except OSError:
                    raise AnsibleError("chroot connection requires dd command in the chroot")
                try:
                    stdout, stderr = proc.communicate()
                except:
                    traceback.print_exc()
                    raise AnsibleError("failed to transfer file %s to %s" % (in_path, out_path))
                if proc.returncode != 0:
                    raise AnsibleError("failed to transfer file %s to %s:\n%s\n%s" % (in_path, out_path, stdout, stderr))
        except IOError:
            raise AnsibleError("file or module does not exist at: %s" % in_path)
项目:DevOps    作者:YoLoveLife    | 项目源码 | 文件源码
def put_file(self, in_path, out_path):
        ''' transfer a file from local to jail

        Streams the local file into a ``dd`` process started through
        ``self._buffered_exec_command``. Raises AnsibleError when the local
        file cannot be opened, ``dd`` cannot be started, or the copy fails.
        '''
        super(Connection, self).put_file(in_path, out_path)
        display.vvv("PUT %s TO %s" % (in_path, out_path), host=self.jail)

        # The destination ends up on a command line, hence the quoting.
        out_path = pipes.quote(self._prefix_login_path(out_path))
        try:
            with open(to_bytes(in_path, errors='surrogate_or_strict'), 'rb') as source:
                dd_cmd = 'dd of=%s bs=%s' % (out_path, BUFSIZE)
                try:
                    proc = self._buffered_exec_command(dd_cmd, stdin=source)
                except OSError:
                    raise AnsibleError("jail connection requires dd command in the jail")
                try:
                    stdout, stderr = proc.communicate()
                except:
                    traceback.print_exc()
                    raise AnsibleError("failed to transfer file %s to %s" % (in_path, out_path))
                if proc.returncode != 0:
                    raise AnsibleError("failed to transfer file %s to %s:\n%s\n%s" % (in_path, out_path, stdout, stderr))
        except IOError:
            raise AnsibleError("file or module does not exist at: %s" % in_path)
项目:DevOps    作者:YoLoveLife    | 项目源码 | 文件源码
def fetch_file(self, in_path, out_path):
        ''' fetch a file from jail to local

        Starts ``dd`` through ``self._buffered_exec_command`` and copies its
        standard output to the local file in chunks of ``BUFSIZE``. Raises
        AnsibleError when ``dd`` cannot be started or the copy fails.
        '''
        super(Connection, self).fetch_file(in_path, out_path)
        display.vvv("FETCH %s TO %s" % (in_path, out_path), host=self.jail)

        # The source ends up on a command line, hence the quoting.
        in_path = pipes.quote(self._prefix_login_path(in_path))
        dd_cmd = 'dd if=%s bs=%s' % (in_path, BUFSIZE)
        try:
            proc = self._buffered_exec_command(dd_cmd)
        except OSError:
            raise AnsibleError("jail connection requires dd command in the jail")

        with open(to_bytes(out_path, errors='surrogate_or_strict'), 'wb+') as sink:
            try:
                while True:
                    chunk = proc.stdout.read(BUFSIZE)
                    if not chunk:
                        break
                    sink.write(chunk)
            except:
                traceback.print_exc()
                raise AnsibleError("failed to transfer file %s to %s" % (in_path, out_path))
            stdout, stderr = proc.communicate()
            if proc.returncode != 0:
                raise AnsibleError("failed to transfer file %s to %s:\n%s\n%s" % (in_path, out_path, stdout, stderr))
项目:DevOps    作者:YoLoveLife    | 项目源码 | 文件源码
def put_file(self, in_path, out_path):
        ''' transfer a file from local to zone

        The local file is streamed to a ``dd`` process started through
        ``self._buffered_exec_command``; the destination path is shell-quoted
        before it is placed on the ``dd`` command line.

        Raises AnsibleError when the local file cannot be opened, when ``dd``
        cannot be started, or when the transfer fails.
        '''
        super(Connection, self).put_file(in_path, out_path)
        display.vvv("PUT %s TO %s" % (in_path, out_path), host=self.zone)

        out_path = pipes.quote(self._prefix_login_path(out_path))
        try:
            with open(in_path, 'rb') as in_file:
                try:
                    p = self._buffered_exec_command('dd of=%s bs=%s' % (out_path, BUFSIZE), stdin=in_file)
                except OSError:
                    # This is the zone plugin; name it correctly instead of
                    # the message copied from the jail plugin.
                    raise AnsibleError("zone connection requires dd command in the zone")
                try:
                    stdout, stderr = p.communicate()
                except:
                    traceback.print_exc()
                    raise AnsibleError("failed to transfer file %s to %s" % (in_path, out_path))
                if p.returncode != 0:
                    raise AnsibleError("failed to transfer file %s to %s:\n%s\n%s" % (in_path, out_path, stdout, stderr))
        except IOError:
            raise AnsibleError("file or module does not exist at: %s" % in_path)
项目:build-calibre    作者:kovidgoyal    | 项目源码 | 文件源码
def run(*args, **kw):
    """Echo and run a command, dropping into a shell if it fails.

    A single string argument is split with ``split``; otherwise the positional
    arguments are the command. Recognised keyword arguments: ``library_path``
    (passed to ``current_env`` and ``run_shell``), ``cwd`` and ``no_check``.

    With ``no_check`` the exit status is returned. Without it a non-zero
    status starts an interactive shell and then raises ``SystemExit(1)``.
    A missing program raises ``SystemExit`` with a message.
    """
    single_string = len(args) == 1 and isinstance(args[0], type(''))
    cmd = split(args[0]) if single_string else args
    printable = ' '.join(pipes.quote(x) for x in cmd)
    print(printable)
    sys.stdout.flush()
    env = current_env(library_path=kw.get('library_path'))
    try:
        child = subprocess.Popen(cmd, env=env, cwd=kw.get('cwd'))
    except EnvironmentError as err:
        if err.errno != errno.ENOENT:
            raise
        raise SystemExit('Could not find the program: %s' % cmd[0])
    rc = child.wait()
    if kw.get('no_check'):
        return rc
    if rc == 0:
        return
    print('The following command failed, with return code:', rc)
    print(printable)
    print('Dropping you into a shell')
    sys.stdout.flush()
    run_shell(library_path=kw.get('library_path'))
    raise SystemExit(1)
项目:isar    作者:ilbers    | 项目源码 | 文件源码
def gitpkgv_revision(self, ud, d, name):
        """
        Return a sortable revision number by counting commits in the history
        Based on gitpkgv.bblass in meta-openembedded

        The commit count is cached in a file named ``oe-gitpkgv_<rev>`` below
        ``ud.localpath``; it is computed with ``git rev-list`` only when that
        file is missing or empty.

        Returns a tuple ``(False, "<count>+<first 7 chars of rev>")`` when a
        commit count is available, otherwise ``(True, str(rev))``.
        """
        rev = self._build_revision(ud, d, name)
        localpath = ud.localpath
        rev_file = os.path.join(localpath, "oe-gitpkgv_" + rev)
        if not os.path.exists(localpath):
            commits = None
        else:
            if not os.path.exists(rev_file) or not os.path.getsize(rev_file):
                # pipes is gone from recent Python 3 releases; prefer shlex.
                try:
                    from shlex import quote
                except ImportError:
                    from pipes import quote
                commits = bb.fetch2.runfetchcmd(
                        "git rev-list %s -- | wc -l" % (quote(rev)),
                        d, quiet=True).strip().lstrip('0')
                if commits:
                    # Close the cache file deterministically.
                    with open(rev_file, "w") as cache:
                        cache.write("%d\n" % int(commits))
            else:
                with open(rev_file, "r") as cache:
                    commits = cache.readline(128).strip()
        if commits:
            return False, "%s+%s" % (commits, rev[:7])
        else:
            return True, str(rev)
项目:certbot    作者:nikoloskii    | 项目源码 | 文件源码
def main():
    """Fetch the pinned packages into a temp dir and pip-install them.

    Returns:
        0 when the install succeeded, 1 when a download did not match its
        expected hash (the error is printed and the temporary directory is
        kept). Any other exception removes the temporary directory and is
        re-raised.
    """
    temp = mkdtemp(prefix='pipstrap-')
    try:
        downloads = [hashed_download(url, temp, digest)
                     for url, digest in PACKAGES]
        quoted_paths = ' '.join(map(quote, downloads))
        check_output('pip install --no-index --no-deps -U ' + quoted_paths,
                     shell=True)
    except HashError as exc:
        print(exc)
        return 1
    except Exception:
        rmtree(temp)
        raise
    rmtree(temp)
    return 0
项目:unionfs_cleaner    作者:l3uddz    | 项目源码 | 文件源码
def opened_files(path, excludes):
    """List the files that ``lsof`` reports as open below *path*.

    Output lines that are empty, at most two characters long, directories,
    purely numeric, or matched by ``file_excluded`` are dropped.

    Returns:
        The list of remaining entries, or None when anything goes wrong (the
        exception is logged).
    """
    try:
        process = os.popen('lsof -wFn +D %s | tail -n +2 | cut -c2-' % cmd_quote(path))
        data = process.read()
        process.close()
        return [
            item for item in data.split('\n')
            if not (not item or len(item) <= 2 or os.path.isdir(item)
                    or item.isdigit() or file_excluded(item, excludes))
        ]
    except Exception:
        logger.exception("Exception checking %r: ", path)
        return None
项目:unionfs_cleaner    作者:l3uddz    | 项目源码 | 文件源码
def rclone_move_command(local, remote, transfers, checkers, bwlimit, excludes, chunk_size, dry_run):
    """Build the ``rclone move`` command line as a single string.

    *local* and *remote* are shell-quoted. ``--bwlimit`` is added only for a
    non-empty *bwlimit*, one ``--exclude`` per item of *excludes*, and
    ``--dry-run`` when *dry_run* is truthy.
    """
    pieces = [
        'rclone move %s %s' % (cmd_quote(local), cmd_quote(remote)),
        ' --delete-after',
        ' --no-traverse',
        ' --stats=60s',
        ' -v',
        ' --transfers=%d' % transfers,
        ' --checkers=%d' % checkers,
        ' --drive-chunk-size=%s' % chunk_size,
    ]
    if bwlimit and len(bwlimit):
        pieces.append(' --bwlimit="%s"' % bwlimit)
    pieces.extend(' --exclude="%s"' % item for item in excludes)
    if dry_run:
        pieces.append(' --dry-run')
    return ''.join(pieces)
项目:unionfs_cleaner    作者:l3uddz    | 项目源码 | 文件源码
def remove_empty_directories(config, force_dry_run=False):
    """Delete empty directories below the configured upload folders.

    Nothing is removed while files below ``config['local_folder']`` are open,
    or when the open-file check itself failed (``opened_files`` returns None
    in that case). Otherwise ``find ... -type d -empty`` is run for every
    existing directory in ``config['rclone_remove_empty_on_upload']`` with its
    configured minimum depth; ``-delete`` is appended unless
    ``config['dry_run']`` or *force_dry_run* is set.
    """
    open_files = opened_files(config['local_folder'], config['lsof_excludes'])
    if open_files is None:
        # A failed check must not be treated as "no open files", and len(None)
        # would raise a TypeError.
        logger.debug("Skipped removing empty directories because open files could not be determined")
        return
    if open_files:
        logger.debug("Skipped removing empty directories because %d files are currently open: %r", len(open_files),
                     open_files)
        return

    clearing = False
    for directory, depth in config['rclone_remove_empty_on_upload'].items():
        if not os.path.exists(directory):
            continue
        clearing = True
        logger.debug("Removing empty directories from %r with mindepth %r", directory, depth)
        cmd = 'find %s -mindepth %d -type d -empty' % (cmd_quote(directory), depth)
        if not config['dry_run'] and not force_dry_run:
            cmd += ' -delete'
        run_command(cmd)
    if clearing:
        logger.debug("Finished clearing empty directories")


############################################################
# CONFIG STUFF
############################################################
项目:kapsel    作者:conda    | 项目源码 | 文件源码
def test_activate(monkeypatch):
    """activate() exports PROJECT_DIR and REDIS_URL for a redis service."""
    can_connect_args = _monkeypatch_can_connect_to_socket_to_succeed(monkeypatch)

    def activate_redis_url(dirname):
        project_dir_disable_dedicated_env(dirname)
        result = activate(dirname, UI_MODE_TEXT_ASSUME_YES_DEVELOPMENT, conda_environment=None, command_name=None)
        assert can_connect_args['port'] == 6379
        assert result is not None
        if platform.system() == 'Windows':
            result = [entry for entry in result if not entry.startswith("export PATH")]
            print("activate changed PATH on Windows and ideally it would not.")
        if len(result) > 2:
            # Dump some context to help diagnose the unexpected extra lines.
            import os
            print("os.environ=" + repr(os.environ))
            print("result=" + repr(result))
        expected = ['export PROJECT_DIR=' + quote(dirname), 'export REDIS_URL=redis://localhost:6379']
        assert expected == result

    project_files = {DEFAULT_PROJECT_FILENAME: """
services:
  REDIS_URL: redis
    """}
    with_directory_contents_completing_project_file(project_files, activate_redis_url)
项目:kapsel    作者:conda    | 项目源码 | 文件源码
def test_activate_quoting(monkeypatch):
    """activate() shell-quotes variable values that contain special chars."""

    def activate_foo(dirname):
        project_dir_disable_dedicated_env(dirname)
        result = activate(dirname, UI_MODE_TEXT_ASSUME_YES_DEVELOPMENT, conda_environment=None, command_name=None)
        assert result is not None
        if platform.system() == 'Windows':
            result = [entry for entry in result if not entry.startswith("export PATH")]
            print("activate changed PATH on Windows and ideally it would not.")
        expected = ["export FOO='$! boo'", 'export PROJECT_DIR=' + quote(dirname)]
        assert expected == result

    project_files = {
        DEFAULT_PROJECT_FILENAME: """
variables:
  FOO: {}
    """,
        DEFAULT_LOCAL_STATE_FILENAME: """
variables:
  FOO: $! boo
"""
    }
    with_directory_contents_completing_project_file(project_files, activate_foo)
项目:kapsel    作者:conda    | 项目源码 | 文件源码
def activate(dirname, ui_mode, conda_environment, command_name):
    """Prepare project and return lines to be sourced.

    Every variable of the prepared environment that is missing from, or
    different in, ``os.environ`` yields one ``export KEY=value`` line with the
    value shell-quoted. Lines are ordered by variable name.

    Future direction: should also activate the proper conda env.

    Returns:
        None on failure or a list of lines to print.
    """
    project = load_project(dirname)
    result = prepare_with_ui_mode_printing_errors(project,
                                                  ui_mode=ui_mode,
                                                  env_spec_name=conda_environment,
                                                  command_name=command_name)
    if result.failed:
        return None

    exports = []
    # sort so we have deterministic output order for tests
    for key in sorted(result.environ.keys()):
        value = result.environ[key]
        if key in os.environ and os.environ[key] == value:
            # Already set to this value; nothing to export.
            continue
        exports.append("export {key}={value}".format(key=key, value=quote(value)))
    return exports
项目:kodi-plugin.video.ted-talks-chinese    作者:daineseh    | 项目源码 | 文件源码
def _debug_cmd(self, args, exe=None):
        if not self.params.get('verbose', False):
            return

        str_args = [decodeArgument(a) for a in args]

        if exe is None:
            exe = os.path.basename(str_args[0])

        try:
            import pipes
            shell_quote = lambda args: ' '.join(map(pipes.quote, str_args))
        except ImportError:
            shell_quote = repr
        self.to_screen('[debug] %s command line: %s' % (
            exe, shell_quote(str_args)))
项目:jessy    作者:jessy-project    | 项目源码 | 文件源码
def is_available(cls):
        """Tell whether this festival-based engine can be used.

        The parent class availability check must pass and both the
        ``text2wave`` and ``festival`` executables must be reported by
        ``diagnose.check_executable``; otherwise False is returned.  When
        they are, ``festival --pipe`` is run with an empty stdin and the
        result is False only if its combined stdout/stderr contains
        'No default voice found'.
        """
        prerequisites_ok = (super(cls, cls).is_available() and
                            diagnose.check_executable('text2wave') and
                            diagnose.check_executable('festival'))
        if not prerequisites_ok:
            return False

        log = logging.getLogger(__name__)
        festival_cmd = ['festival', '--pipe']
        with tempfile.SpooledTemporaryFile() as captured, \
                tempfile.SpooledTemporaryFile() as empty_stdin:
            log.debug('Executing %s',
                      ' '.join(map(pipes.quote, festival_cmd)))
            # stdout and stderr share one file so either stream can carry
            # the "no voice" message.
            subprocess.call(festival_cmd, stdin=empty_stdin,
                            stdout=captured, stderr=captured)
            captured.seek(0)
            text = captured.read().strip()
            if text:
                log.debug("Output was: '%s'", text)
            return 'No default voice found' not in text
项目:jessy    作者:jessy-project    | 项目源码 | 文件源码
def say(self, phrase, *args):
        """Synthesize ``phrase`` with ``text2wave`` and play the result.

        The phrase is fed to ``text2wave`` on stdin, the audio is captured
        in a temporary ``.wav`` file and anything written to stderr is
        logged at debug level.  The wave file is handed to ``self.play``
        before the temporary file is closed.
        """
        self._logger.debug("Saying '%s' with '%s'", phrase, self.SLUG)
        command = ['text2wave']
        with tempfile.NamedTemporaryFile(suffix='.wav') as wav_file:
            with tempfile.SpooledTemporaryFile() as text_in:
                text_in.write(phrase)
                # Rewind so the child process reads the phrase from the start.
                text_in.seek(0)
                with tempfile.SpooledTemporaryFile() as errors:
                    quoted = ' '.join(map(pipes.quote, command))
                    self._logger.debug('Executing %s', quoted)
                    subprocess.call(command, stdin=text_in, stdout=wav_file,
                                    stderr=errors)
                    errors.seek(0)
                    stderr_text = errors.read()
                    if stderr_text:
                        self._logger.debug("Output was: '%s'", stderr_text)
            self.play(wav_file.name)
项目:jessy    作者:jessy-project    | 项目源码 | 文件源码
def say(self, phrase, *args):
        """Synthesize ``phrase`` with ``flite`` and play the resulting file.

        ``flite`` is invoked with ``-voice self.voice`` (only when a voice is
        configured), ``-t phrase`` and the path of a temporary ``.wav`` file.
        Its combined stdout/stderr is logged at debug level, then the file is
        passed to ``self.play``.

        The temporary file is removed even when running ``flite`` or playing
        the file raises, so failures no longer leave stray ``.wav`` files.
        """
        self._logger.debug("Saying '%s' with '%s'", phrase, self.SLUG)
        cmd = ['flite']
        if self.voice:
            cmd.extend(['-voice', self.voice])
        cmd.extend(['-t', phrase])
        # delete=False: only the name is reserved here; flite writes the file
        # and play() reads it after this handle has been closed.
        with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as f:
            fname = f.name
        try:
            cmd.append(fname)
            with tempfile.SpooledTemporaryFile() as out_f:
                self._logger.debug('Executing %s',
                                   ' '.join([pipes.quote(arg)
                                             for arg in cmd]))
                subprocess.call(cmd, stdout=out_f, stderr=out_f)
                out_f.seek(0)
                output = out_f.read().strip()
            if output:
                self._logger.debug("Output was: '%s'", output)
            self.play(fname)
        finally:
            os.remove(fname)
项目:jessy    作者:jessy-project    | 项目源码 | 文件源码
def say(self, phrase, *args):
        """Synthesize ``phrase`` with ``espeak`` and play the resulting file.

        ``espeak`` is called with the configured voice (``-v``), pitch
        adjustment (``-p``) and words per minute (``-s``) and writes to a
        temporary ``.wav`` file (``-w``).  Its combined stdout/stderr is
        logged at debug level, then the file is passed to ``self.play``.

        The temporary file is removed even when running ``espeak`` or playing
        the file raises, so failures no longer leave stray ``.wav`` files.
        """
        self._logger.debug("Saying '%s' with '%s'", phrase, self.SLUG)
        # delete=False: only the name is reserved here; espeak writes the
        # file and play() reads it after this handle has been closed.
        with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as f:
            fname = f.name
        try:
            cmd = ['espeak', '-v', self.voice,
                             '-p', self.pitch_adjustment,
                             '-s', self.words_per_minute,
                             '-w', fname,
                             phrase]
            # The settings may be numbers; subprocess needs strings.
            cmd = [str(x) for x in cmd]
            self._logger.debug('Executing %s', ' '.join([pipes.quote(arg)
                                                         for arg in cmd]))
            with tempfile.TemporaryFile() as f:
                subprocess.call(cmd, stdout=f, stderr=f)
                f.seek(0)
                output = f.read()
                if output:
                    self._logger.debug("Output was: '%s'", output)
            self.play(fname)
        finally:
            os.remove(fname)
项目:jessy    作者:jessy-project    | 项目源码 | 文件源码
def say(self, phrase, *args):
        """Synthesize ``phrase`` with ``pico2wave`` and play the resulting file.

        ``pico2wave`` is called with ``--wave <tempfile>``, ``-l
        self.language`` and the phrase.  Its combined stdout/stderr is logged
        at debug level, then the file is passed to ``self.play``.

        Raises:
            ValueError: if ``self.language`` is not in ``self.languages``.
                The check runs before the temporary file is created so that
                nothing is left behind, and the message is fully formatted.
        """
        self._logger.debug("Saying '%s' with '%s'", phrase, self.SLUG)
        if self.language not in self.languages:
            # ValueError does not interpolate logging-style arguments, so the
            # message has to be formatted explicitly.
            raise ValueError("Language '%s' not supported by '%s'"
                             % (self.language, self.SLUG))
        # delete=False: only the name is reserved here; pico2wave writes the
        # file and play() reads it after this handle has been closed.
        with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as f:
            fname = f.name
        try:
            cmd = ['pico2wave', '--wave', fname]
            cmd.extend(['-l', self.language])
            cmd.append(phrase)
            self._logger.debug('Executing %s', ' '.join([pipes.quote(arg)
                                                         for arg in cmd]))
            with tempfile.TemporaryFile() as f:
                subprocess.call(cmd, stdout=f, stderr=f)
                f.seek(0)
                output = f.read()
                if output:
                    self._logger.debug("Output was: '%s'", output)
            self.play(fname)
        finally:
            # Clean up even if pico2wave could not be run or playback failed.
            os.remove(fname)
项目:imagepaste    作者:robinchenyu    | 项目源码 | 文件源码
def get_command(self, file, **options):
            """Return the shell command that shows ``file`` in Preview.app.

            The command opens the file, sleeps 20 seconds, removes the file
            and runs the whole sequence in the background.
            """
            # `open` on darwin returns immediately; without the sleep the
            # temp file would be removed while the app is still opening it.
            viewer = "open -a /Applications/Preview.app"
            target = quote(file)
            return "(%s %s; sleep 20; rm -f %s)&" % (viewer, target, target)
项目:imagepaste    作者:robinchenyu    | 项目源码 | 文件源码
def show_file(self, file, **options):
            """Show ``file`` with the viewer command, then delete the file.

            The viewer command comes from ``self.get_command_ex``; the
            viewer invocation and the following ``rm -f`` are run through
            ``os.system`` as one background shell job.  Always returns 1.
            """
            viewer_command, _executable = self.get_command_ex(file, **options)
            target = quote(file)
            os.system("(%s %s; rm -f %s)&" % (viewer_command, target, target))
            return 1

    # implementations
项目:imagepaste    作者:robinchenyu    | 项目源码 | 文件源码
def get_command_ex(self, file, title=None, **options):
            """Return ``(command, executable)`` for the ``xv`` viewer.

            When ``title`` is truthy it is appended to the command as a
            quoted ``-name`` argument; the executable is always ``"xv"``.
            """
            # note: xv is pretty outdated.  most modern systems ship
            # imagemagick's display command instead.
            executable = "xv"
            if not title:
                return executable, executable
            return "%s -name %s" % (executable, quote(title)), executable
项目:bob    作者:BobBuildTool    | 项目源码 | 文件源码
def __uploadJenkins(self, step, buildIdFile, resultFile, suffix):
        """Return the shell script fragment that uploads an artifact.

        Copying the artifact straight to its final location would not be
        atomic.  The generated script therefore copies it to a temporary
        file created in the repository root and then hard-links that file to
        the final location.  A failing link is usually the sign of a
        concurrent upload, so the script checks that the artifact is
        readable in that case to tell it apart from other fatal errors.

        Returns an empty string when ``canUploadJenkins()`` is false.
        """
        if not self.canUploadJenkins():
            return ""

        # Doubled braces are literal bash braces; single-brace names are
        # filled in by format() before the common indentation is stripped.
        template = """\
            # upload artifact
            cd $WORKSPACE
            BOB_UPLOAD_BID="$(hexdump -ve '/1 "%02x"' {BUILDID}){GEN}"
            BOB_UPLOAD_FILE="{DIR}/${{BOB_UPLOAD_BID:0:2}}/${{BOB_UPLOAD_BID:2:2}}/${{BOB_UPLOAD_BID:4}}{SUFFIX}"
            if [[ ! -e ${{BOB_UPLOAD_FILE}} ]] ; then
                (
                    set -eE
                    T="$(mktemp -p {DIR})"
                    trap 'rm -f $T' EXIT
                    cp {RESULT} "$T"
                    mkdir -p "${{BOB_UPLOAD_FILE%/*}}"
                    if ! ln -T "$T" "$BOB_UPLOAD_FILE" ; then
                        [[ -r "$BOB_UPLOAD_FILE" ]] || exit 2
                    fi
                ){FIXUP}
            fi"""
        substitutions = dict(
            DIR=self.__basePath,
            BUILDID=quote(buildIdFile),
            RESULT=quote(resultFile),
            # Only report a failed upload instead of aborting when errors
            # are to be ignored.
            FIXUP=" || echo Upload failed: $?" if self._ignoreErrors() else "",
            GEN=ARCHIVE_GENERATION,
            SUFFIX=suffix)
        return "\n" + textwrap.dedent(template.format(**substitutions))
项目:bob    作者:BobBuildTool    | 项目源码 | 文件源码
def download(self, step, buildIdFile, tgzFile):
        """Return the shell script fragment that copies an artifact locally.

        The generated script does nothing when ``tgzFile`` already exists.
        Otherwise it derives the artifact path below the base path from the
        hex dump of ``buildIdFile`` and copies it to ``tgzFile``; a failed
        copy is only reported.

        Returns an empty string when ``canDownloadJenkins()`` is false.
        """
        if not self.canDownloadJenkins():
            return ""

        # Doubled braces are literal bash braces; single-brace names are
        # filled in by format() before the common indentation is stripped.
        template = """\
            if [[ ! -e {RESULT} ]] ; then
                BOB_DOWNLOAD_BID="$(hexdump -ve '/1 "%02x"' {BUILDID}){GEN}"
                BOB_DOWNLOAD_FILE="{DIR}/${{BOB_DOWNLOAD_BID:0:2}}/${{BOB_DOWNLOAD_BID:2:2}}/${{BOB_DOWNLOAD_BID:4}}{SUFFIX}"
                cp "$BOB_DOWNLOAD_FILE" {RESULT} || echo Download failed: $?
            fi
            """
        substitutions = dict(
            DIR=self.__basePath,
            BUILDID=quote(buildIdFile),
            RESULT=quote(tgzFile),
            GEN=ARCHIVE_GENERATION,
            SUFFIX=ARTIFACT_SUFFIX)
        return "\n" + textwrap.dedent(template.format(**substitutions))
项目:bob    作者:BobBuildTool    | 项目源码 | 文件源码
def download(self, step, buildIdFile, tgzFile):
        """Return the shell script fragment that fetches an artifact via curl.

        The generated script does nothing when ``tgzFile`` already exists.
        Otherwise it derives the artifact URL below the configured URL from
        the hex dump of ``buildIdFile`` and downloads it to ``tgzFile``; a
        failed download is only reported.

        Returns an empty string when ``canDownloadJenkins()`` is false, i.e.
        the script is only produced if downloading was requested.
        """
        if not self.canDownloadJenkins():
            return ""

        # Doubled braces are literal bash braces; single-brace names are
        # filled in by format() before the common indentation is stripped.
        template = """\
            if [[ ! -e {RESULT} ]] ; then
                BOB_DOWNLOAD_BID="$(hexdump -ve '/1 "%02x"' {BUILDID}){GEN}"
                BOB_DOWNLOAD_URL="{URL}/${{BOB_DOWNLOAD_BID:0:2}}/${{BOB_DOWNLOAD_BID:2:2}}/${{BOB_DOWNLOAD_BID:4}}{SUFFIX}"
                curl -sSg --fail -o {RESULT} "$BOB_DOWNLOAD_URL" || echo Download failed: $?
            fi
            """
        substitutions = dict(
            URL=self.__url.geturl(),
            BUILDID=quote(buildIdFile),
            RESULT=quote(tgzFile),
            GEN=ARCHIVE_GENERATION,
            SUFFIX=ARTIFACT_SUFFIX)
        return "\n" + textwrap.dedent(template.format(**substitutions))
项目:bob    作者:BobBuildTool    | 项目源码 | 文件源码
def download(self, step, buildIdFile, tgzFile):
        """Return the shell script fragment that runs the download command.

        The generated script does nothing when ``tgzFile`` already exists.
        Otherwise it sets ``BOB_LOCAL_ARTIFACT`` to ``tgzFile`` and
        ``BOB_REMOTE_ARTIFACT`` to the path derived from the hex dump of
        ``buildIdFile`` and then runs the configured download command.

        Returns an empty string when ``canDownloadJenkins()`` is false, i.e.
        the script is only produced if downloading was requested.
        """
        if not self.canDownloadJenkins():
            return ""

        # Doubled braces are literal bash braces; single-brace names are
        # filled in by format().
        template = """
if [[ ! -e {RESULT} ]] ; then
    BOB_DOWNLOAD_BID="$(hexdump -ve '/1 "%02x"' {BUILDID}){GEN}"
    BOB_LOCAL_ARTIFACT={RESULT}
    BOB_REMOTE_ARTIFACT="${{BOB_DOWNLOAD_BID:0:2}}/${{BOB_DOWNLOAD_BID:2:2}}/${{BOB_DOWNLOAD_BID:4}}{SUFFIX}"
    {CMD}
fi
"""
        substitutions = dict(
            CMD=self.__downloadCmd,
            BUILDID=quote(buildIdFile),
            RESULT=quote(tgzFile),
            GEN=ARCHIVE_GENERATION,
            SUFFIX=ARTIFACT_SUFFIX)
        return template.format(**substitutions)
项目:bob    作者:BobBuildTool    | 项目源码 | 文件源码
def __getitem__(self, item):
            """Resolve an include expression to its shell replacement text.

            The first character of ``item`` is the mode and the rest is a
            glob pattern relative to ``self.baseDir``.  All matching files
            are loaded in sorted order through ``self.fileLoader`` and
            concatenated, and the SHA-1 of the result (passed through
            ``asHexStr``) is appended to ``self.incDigests``.

            Mode ``<`` adds prolog lines that create a temporary file and
            fill it with the base64-decoded content, and returns a ``${...}``
            reference to the variable that holds the file name.  Mode ``'``
            returns the content decoded as UTF-8 and passed through
            ``quote``.

            Raises:
                ParseError: if no file matches the pattern or an OSError
                    occurs while the files are included.
            """
            mode, pattern = item[0], item[1:]
            try:
                matches = sorted(glob(os.path.join(self.baseDir, pattern)))
                if not matches:
                    raise ParseError("No files matched in include pattern '{}'!"
                        .format(pattern))
                chunks = [self.fileLoader(match) for match in matches]
            except OSError as e:
                raise ParseError("Error including '"+pattern+"': " + str(e))
            content = b''.join(chunks)

            digest = hashlib.sha1(content).digest()
            self.incDigests.append(asHexStr(digest))

            if mode != '<':
                assert mode == "'"
                return quote(content.decode('utf8'))

            # Each '<' include gets its own numbered shell variable.
            var = "_{}{}".format(self.varBase, self.count)
            self.count += 1
            self.prolog.extend([
                "{VAR}=$(mktemp)".format(VAR=var),
                "_BOB_TMP_CLEANUP+=( ${VAR} )".format(VAR=var),
                "base64 -d > ${VAR} <<EOF".format(VAR=var)])
            encoded = b64encode(content).decode("ascii")
            self.prolog.extend(sliceString(encoded, 76))
            self.prolog.append("EOF")
            return "${" + var + "}"