Python os 模块,devnull() 实例源码

我们从Python开源项目中,提取了以下46个代码示例,用于说明如何使用os.devnull()

项目:alfred-mpd    作者:deanishe    | 项目源码 | 文件源码
def convert_image(inpath, outpath, size):
    """Convert an image file using `sips`.

    Args:
        inpath (str): Path of source file.
        outpath (str): Path to destination file.
        size (int): Width and height of destination image in pixels.

    Raises:
        RuntimeError: Raised if `sips` exits with non-zero status.
    """
    cmd = [
        b'sips',
        b'-z', b'{0}'.format(size), b'{0}'.format(size),
        inpath,
        b'--out', outpath]
    # log().debug(cmd)
    with open(os.devnull, 'w') as pipe:
        retcode = subprocess.call(cmd, stdout=pipe, stderr=subprocess.STDOUT)

    if retcode != 0:
        raise RuntimeError('sips exited with {0}'.format(retcode))
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def test_err_in_fun(self):
            # Test that the original signal this process was hit with
            # is not returned in case fun raise an exception. Instead,
            # we're supposed to see retsig = 1.
            ret = pyrun(textwrap.dedent(
                """
                import os, signal, imp, sys
                mod = imp.load_source("mod", r"{}")

                def foo():
                    sys.stderr = os.devnull
                    1 / 0

                sig = signal.SIGTERM if os.name == 'posix' else \
                    signal.CTRL_C_EVENT
                mod.register_exit_fun(foo)
                os.kill(os.getpid(), sig)
                """.format(os.path.abspath(__file__), TESTFN)
            ))
            if POSIX:
                self.assertEqual(ret, 1)
                assert ret != signal.SIGTERM, strfsig(ret)
项目:epubcheck    作者:titusz    | 项目源码 | 文件源码
def epubcheck_help():
    """Return epubcheck.jar commandline help text.

    :return unicode: helptext from epubcheck.jar
    """

    # tc = locale.getdefaultlocale()[1]

    with open(os.devnull, "w") as devnull:
        p = subprocess.Popen(
            [c.JAVA, '-Duser.language=en', '-jar', c.EPUBCHECK, '-h'],
            stdout=subprocess.PIPE,
            stderr=devnull,
        )
        result = p.communicate()[0]

    return result.decode()
项目:cellranger    作者:10XGenomics    | 项目源码 | 文件源码
def pipe_weighted_edgelist_to_convert(matrix, bin_filename, weight_filename):
    """ Pipe a weighted edgelist (COO sparse matrix) to Louvain's convert utility """
    raise ValueError('Unsupported method at the moment')

    devnull = open(os.devnull, 'w')

    proc = subprocess.Popen([LOUVAIN_CONVERT_BINPATH,
                           '-i', '/dev/stdin',
                           '-o', bin_filename,
                           '-w', weight_filename,
                         ], stdin=subprocess.PIPE, stdout=devnull, stderr=devnull)

    # Stream text triplets to 'convert'
    for ijx in itertools.izip(matrix.row, matrix.col, matrix.data):
        proc.stdin.write('%d\t%d\t%f\n' % ijx)

    proc.stdin.close()
    proc.wait()
    devnull.close()
项目:cellranger    作者:10XGenomics    | 项目源码 | 文件源码
def pipe_unweighted_edgelist_to_convert(matrix, bin_filename):
    """ Pipe an unweighted edgelist (COO sparse matrix) to Louvain's convert utility """
    devnull = open(os.devnull, 'w')

    proc = subprocess.Popen([LOUVAIN_CONVERT_BINPATH,
                           '-i', '/dev/stdin',
                           '-o', bin_filename,
                         ], stdin=subprocess.PIPE, stdout=devnull, stderr=devnull)

    # Stream text triplets to 'convert'
    for ij in itertools.izip(matrix.row, matrix.col):
        proc.stdin.write('%d\t%d\n' % ij)

    proc.stdin.close()
    proc.wait()
    devnull.close()
项目:conv2mp4-py    作者:Kameecoding    | 项目源码 | 文件源码
def get_audio_streams(self):
        with open(os.devnull, 'w') as DEV_NULL:
            #Get file info and Parse it
            try:
                proc = subprocess.Popen([
                    FFPROBE,
                    '-i', self.input_video,
                    '-of', 'json',
                    '-show_streams'
                ], stdout=subprocess.PIPE, stderr=DEV_NULL)
            except OSError as e:
                if e.errno == os.errno.ENOENT:
                    Logger.error("FFPROBE not found, install on your system to use this script")
                    sys.exit(0)
            output = proc.stdout.read()

            return get_audio_streams(json.loads(output))
项目:wpw-sdk-python    作者:WPTechInnovation    | 项目源码 | 文件源码
def launch(self, cfg, path, flags):
        logging.debug("Determine the OS and Architecture this application is currently running on")
        hostOS = platform.system().lower()
        logging.debug("hostOS: " + str(hostOS))
        is_64bits = sys.maxsize > 2 ** 32
        if is_64bits:
            hostArchitecture = 'x64'
        else:
            hostArchitecture = 'ia32'
        logging.debug("hostArchitecture: " + str(hostArchitecture))
        if(self.validateConfig(cfg, hostOS, hostArchitecture)):
            fnull = open(os.devnull, 'w')
            if os.environ.get("WPW_HOME") is not None:
                cmd = [os.environ["WPW_HOME"] + '/bin/rpc-agent-' + platform.system().lower() + '-' + self.detectHostArchitecture()]
            else:
                cmd = [path + '/wpwithinpy/iot-core-component/bin/rpc-agent-' + platform.system().lower() + '-' + self.detectHostArchitecture()]
            cmd.extend(flags)
            proc = subprocess.Popen(cmd, stdin=None, stdout=fnull, stderr=subprocess.STDOUT)
            return proc
        else:
            logging.debug("Invalid OS/Architecture combination detected")
项目:games_nebula    作者:yancharkin    | 项目源码 | 文件源码
def win64_available(self):

        wine_bin, \
        wineserver_bin, \
        wine_lib = self.get_wine_bin_path()

        dev_null = open(os.devnull, 'w')
        try:
            proc = subprocess.Popen([wine_bin + '64'], stdout=dev_null, \
                stdin=subprocess.PIPE, stderr=subprocess.STDOUT)
            dev_null.close()
            stdoutdata, stderrdata = proc.communicate()
            if proc.returncode == 1:
                self.combobox_winearch.set_visible(True)
                return True
            else:
                self.combobox_winearch.set_visible(False)
                self.winearch = 'win32'
                return False
        except:
            self.combobox_winearch.set_visible(False)
            self.winearch = 'win32'
            return False
项目:ipwb    作者:oduwsdl    | 项目源码 | 文件源码
def isDaemonAlive(hostAndPort="{0}:{1}".format(IPFSAPI_IP, IPFSAPI_PORT)):
    """Ensure that the IPFS daemon is running via HTTP before proceeding"""
    client = ipfsapi.Client(IPFSAPI_IP, IPFSAPI_PORT)

    try:
        # OSError if ipfs not installed, redundant of below
        # subprocess.call(['ipfs', '--version'], stdout=open(devnull, 'wb'))

        # ConnectionError/AttributeError if IPFS daemon not running
        client.id()
        return True
    except (ConnectionError, exceptions.AttributeError):
        logError("Daemon is not running at http://" + hostAndPort)
        return False
    except OSError:
        logError("IPFS is likely not installed. "
                 "See https://ipfs.io/docs/install/")
        sys.exit()
    except:
        logError('Unknown error in retrieving daemon status')
        logError(sys.exc_info()[0])
项目:DeepSea    作者:SUSE    | 项目源码 | 文件源码
def _query(self):
        """
        Returns the value of deepsea_minions
        """
        # When search matches no minions, salt prints to stdout.
        # Suppress stdout.
        _stdout = sys.stdout
        sys.stdout = open(os.devnull, 'w')

        # Relying on side effect - pylint: disable=unused-variable
        ret = self.local.cmd('*', 'saltutil.pillar_refresh')
        minions = self.local.cmd('*', 'pillar.get', ['deepsea_minions'],
                                 expr_form="compound")
        sys.stdout = _stdout
        for minion in minions:
            if minions[minion]:
                return minions[minion]

        log.error("deepsea_minions is not set")
        return []
项目:DeepSea    作者:SUSE    | 项目源码 | 文件源码
def _matches(self):
        """
        Returns the list of matched minions
        """
        if self.deepsea_minions:
            # When search matches no minions, salt prints to stdout.
            # Suppress stdout.
            _stdout = sys.stdout
            sys.stdout = open(os.devnull, 'w')
            result = self.local.cmd(self.deepsea_minions,
                                    'pillar.get',
                                    ['id'],
                                    expr_form="compound")
            sys.stdout = _stdout
            return result.keys()
        return []
项目:Gank-Alfred-Workflow    作者:hujiaweibujidao    | 项目源码 | 文件源码
def convert_image(inpath, outpath, size):
    """Convert an image file using `sips`.

    Args:
        inpath (str): Path of source file.
        outpath (str): Path to destination file.
        size (int): Width and height of destination image in pixels.

    Raises:
        RuntimeError: Raised if `sips` exits with non-zero status.
    """
    cmd = [
        b'sips',
        b'-z', b'{0}'.format(size), b'{0}'.format(size),
        inpath,
        b'--out', outpath]
    # log().debug(cmd)
    with open(os.devnull, 'w') as pipe:
        retcode = subprocess.call(cmd, stdout=pipe, stderr=subprocess.STDOUT)

    if retcode != 0:
        raise RuntimeError('sips exited with {0}'.format(retcode))
项目:Gank-Alfred-Workflow    作者:hujiaweibujidao    | 项目源码 | 文件源码
def convert_image(inpath, outpath, size):
    """Convert an image file using `sips`.

    Args:
        inpath (str): Path of source file.
        outpath (str): Path to destination file.
        size (int): Width and height of destination image in pixels.

    Raises:
        RuntimeError: Raised if `sips` exits with non-zero status.
    """
    cmd = [
        b'sips',
        b'-z', b'{0}'.format(size), b'{0}'.format(size),
        inpath,
        b'--out', outpath]
    # log().debug(cmd)
    with open(os.devnull, 'w') as pipe:
        retcode = subprocess.call(cmd, stdout=pipe, stderr=subprocess.STDOUT)

    if retcode != 0:
        raise RuntimeError('sips exited with {0}'.format(retcode))
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def test_exception(self):
            # Make sure handler fun is called on exception.
            ret = pyrun(textwrap.dedent(
                """
                import os, imp, sys
                mod = imp.load_source("mod", r"{}")

                def foo():
                    with open(r"{}", "ab") as f:
                        f.write(b"1")

                mod.register_exit_fun(foo)
                sys.stderr = os.devnull
                1 / 0
                """.format(os.path.abspath(__file__), TESTFN)
            ))
            self.assertEqual(ret, 1)
            with open(TESTFN, "rb") as f:
                self.assertEqual(f.read(), b"1")
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def test_kinterrupt(self):
            # Simulates CTRL + C and make sure the exit function is called.
            ret = pyrun(textwrap.dedent(
                """
                import os, imp, sys
                mod = imp.load_source("mod", r"{}")

                def foo():
                    with open(r"{}", "ab") as f:
                        f.write(b"1")

                mod.register_exit_fun(foo)
                sys.stderr = os.devnull
                raise KeyboardInterrupt
                """.format(os.path.abspath(__file__), TESTFN)
            ))
            self.assertEqual(ret, 1)
            with open(TESTFN, "rb") as f:
                self.assertEqual(f.read(), b"1")
项目:PyCIRCLeanMail    作者:CIRCL    | 项目源码 | 文件源码
def __init__(self, raw_email, debug=False):
        '''
            Setup the base options of the copy/convert setup
        '''
        self.raw_email = raw_email
        self.log_processing = StringIO()
        self.log_content = StringIO()
        self.tree(self.raw_email)

        twiggy_out = outputs.StreamOutput(formats.shell_format, stream=self.log_processing)
        emitters['*'] = filters.Emitter(levels.DEBUG, True, twiggy_out)

        self.log_name = log.name('files')

        self.cur_attachment = None

        self.debug = debug
        if self.debug:
            if not os.path.exists('debug_logs'):
                os.makedirs('debug_logs')
            self.log_debug_err = os.path.join('debug_logs', 'debug_stderr.log')
            self.log_debug_out = os.path.join('debug_logs', 'debug_stdout.log')
        else:
            self.log_debug_err = os.devnull
            self.log_debug_out = os.devnull
项目:wib    作者:chengsoonong    | 项目源码 | 文件源码
def call(self, args, devnull=False):
        """Call other processes.
        args - list of command args
        devnull - whether to pipe stdout to /dev/null (or equivalent)
        """
        if self.debug:
            click.echo(subprocess.list2cmdline(args))
            click.confirm('Continue?', default=True, abort=True)
        try:
            kwargs = {}
            if devnull:
                # Pipe to /dev/null (or equivalent).
                kwargs['stderr'] = subprocess.STDOUT
                kwargs['stdout'] = self.FNULL
            ret_code = subprocess.call(args, **kwargs)
        except subprocess.CalledProcessError:
            return False
        return ret_code
项目:mobot    作者:JokerQyou    | 项目源码 | 文件源码
def _check_ssl_cert(self, cert, key):
        # Check SSL-Certificate with openssl, if possible
        try:
            exit_code = subprocess.call(
                ["openssl", "x509", "-text", "-noout", "-in", cert],
                stdout=open(os.devnull, 'wb'),
                stderr=subprocess.STDOUT)
        except OSError:
            exit_code = 0
        if exit_code is 0:
            try:
                self.httpd.socket = ssl.wrap_socket(
                    self.httpd.socket, certfile=cert, keyfile=key, server_side=True)
            except ssl.SSLError as error:
                self.logger.exception('Failed to init SSL socket')
                raise TelegramError(str(error))
        else:
            raise TelegramError('SSL Certificate invalid')
项目:workflows.kyoyue    作者:wizyoung    | 项目源码 | 文件源码
def convert_image(inpath, outpath, size):
    """Convert an image file using `sips`.

    Args:
        inpath (str): Path of source file.
        outpath (str): Path to destination file.
        size (int): Width and height of destination image in pixels.

    Raises:
        RuntimeError: Raised if `sips` exits with non-zero status.
    """
    cmd = [
        b'sips',
        b'-z', b'{0}'.format(size), b'{0}'.format(size),
        inpath,
        b'--out', outpath]
    # log().debug(cmd)
    with open(os.devnull, 'w') as pipe:
        retcode = subprocess.call(cmd, stdout=pipe, stderr=subprocess.STDOUT)

    if retcode != 0:
        raise RuntimeError('sips exited with {0}'.format(retcode))
项目:alphy    作者:maximepeschard    | 项目源码 | 文件源码
def convert_image(inpath, outpath, size):
    """Convert an image file using `sips`.

    Args:
        inpath (str): Path of source file.
        outpath (str): Path to destination file.
        size (int): Width and height of destination image in pixels.

    Raises:
        RuntimeError: Raised if `sips` exits with non-zero status.
    """
    cmd = [
        b'sips',
        b'-z', b'{0}'.format(size), b'{0}'.format(size),
        inpath,
        b'--out', outpath]
    # log().debug(cmd)
    with open(os.devnull, 'w') as pipe:
        retcode = subprocess.call(cmd, stdout=pipe, stderr=subprocess.STDOUT)

    if retcode != 0:
        raise RuntimeError('sips exited with {0}'.format(retcode))
项目:pheweb    作者:statgen    | 项目源码 | 文件源码
def run_script(script):
    script = 'set -euo pipefail\n' + script
    try:
        with open(os.devnull) as devnull:
            # is this the right way to block stdin?
            data = subprocess.check_output(['bash', '-c', script], stderr=subprocess.STDOUT, stdin=devnull)
        status = 0
    except subprocess.CalledProcessError as ex:
        data = ex.output
        status = ex.returncode
    data = data.decode('utf8')
    if status != 0:
        raise PheWebError(
            'FAILED with status {}\n'.format(status) +
            'output was:\n' +
            data)
    return data
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def __init__(self, targetfd, tmpfile=None):
        self.targetfd = targetfd
        try:
            self.targetfd_save = os.dup(self.targetfd)
        except OSError:
            self.start = lambda: None
            self.done = lambda: None
        else:
            if targetfd == 0:
                assert not tmpfile, "cannot set tmpfile with stdin"
                tmpfile = open(os.devnull, "r")
                self.syscapture = SysCapture(targetfd)
            else:
                if tmpfile is None:
                    f = TemporaryFile()
                    with f:
                        tmpfile = safe_text_dupfile(f, mode="wb+")
                if targetfd in patchsysdict:
                    self.syscapture = SysCapture(targetfd, tmpfile)
                else:
                    self.syscapture = NoCapture()
            self.tmpfile = tmpfile
            self.tmpfile_fd = tmpfile.fileno()
项目:d-tailor    作者:jcg    | 项目源码 | 文件源码
def analyze_structure(seq,filename,ensemble=False):


    chdir(project_dir)

    system("echo '" + str(seq) + "' > " + filename + ".seq")        
    fnull = open(devnull, 'w')   #this line is necessary to omit output generated by UNAFOLD
    if ensemble:         
        call("./3rdParty/unafold/UNAFold.pl -n RNA " + filename + ".seq", shell = True, stdout = fnull, stderr = fnull)     #code is necessary to omit output generated by UNAFOLD
    else:
        call("./3rdParty/unafold/hybrid-ss-min -n RNA " + filename + ".seq", shell = True, stdout = fnull, stderr = fnull)     #code is necessary to omit output generated by UNAFOLD

    if os.path.isfile(filename+".ct"):        
        system("mv %s*.ct tmp/structures/" % filename)
        # remove tmp files

    system("rm %s*" % filename)
    #system("mv " + filename + "* tmp/unafold_files/")
    fnull.close()             

    return 1
项目:dartqc    作者:esteinig    | 项目源码 | 文件源码
def _run_cdhit(self, fasta_path, identity=0.95, word_size=5, description_length=0, cdhit_path=None):

        """ Run CDHIT-EST for sequences, install with sudo apt install cd-hit on Ubuntu """

        self.messages.get_cdhit_message(identity)

        if cdhit_path is None:
            cdhit_path = "cd-hit-est"

        file_name = self.project + "_IdentityClusters_" + str(identity)

        out_file = os.path.join(self.tmp_path, file_name)
        cluster_path = os.path.join(self.tmp_path, file_name + '.clstr')

        with open(os.devnull, "w") as devnull:
            call([cdhit_path, "-i", fasta_path, "-o", out_file, "-c", str(identity), "-n", str(word_size),
                  "-d", str(description_length)], stdout=devnull)

        return cluster_path
项目:polichombr    作者:ANSSI-FR    | 项目源码 | 文件源码
def disassemble_it(filename, address=None):
    """
        Wrapper for the ruby disassembler script.
    """
    FNULL = open(os.devnull, 'w')

    if address is not None:
        outfile = filename + "_disass_" + hex(address)
    else:
        outfile = filename + "_disass_None"
    args = ['ruby', 'analysis_tools/disassfunc.rb',
            "-graph", "-svg", "-o", outfile, filename]
    if address is not None:
        args.append(hex(address))
    proc = Popen(args, stdin=FNULL, stdout=FNULL, stderr=FNULL)
    proc.wait()
    FNULL.close()
    app.logger.debug("Disassembly just finished!")
    return outfile
项目:polichombr    作者:ANSSI-FR    | 项目源码 | 文件源码
def analyze_it(self):
        """
        Wrapper for the ruby analysis script.
        Executes and get results from files.
        """
        FNULL = open(os.devnull, 'w')
        args = ['ruby', 'analysis_tools/AnalyzeIt.rb', self.storage_file]
        proc = Popen(args, stdin=FNULL, stdout=FNULL, stderr=FNULL)
        proc.wait()
        FNULL.close()

        # TEXT report, just UTF-8 decode/parsing
        fname = self.storage_file + '.txt'
        if os.path.exists(fname):
            data = open(fname, 'rb').read()
            self.txt_report = re.sub(r'[^\x00-\x7F]', '', data).decode('utf-8')

        return True
项目:q2-vsearch    作者:qiime2    | 项目源码 | 文件源码
def test_no_clustering(self):
        with redirected_stdio(stderr=os.devnull):
            obs_table, obs_sequences = cluster_features_de_novo(
                sequences=self.input_sequences, table=self.input_table,
                perc_identity=1.0)
        # order of identifiers is important for biom.Table equality
        obs_table = \
            obs_table.sort_order(self.input_table.ids(axis='observation'),
                                 axis='observation')
        self.assertEqual(obs_table, self.input_table)

        obs_seqs = _read_seqs(obs_sequences)

        # sequences are reverse-sorted by abundance in output
        exp_seqs = [self.input_sequences_list[0], self.input_sequences_list[3],
                    self.input_sequences_list[2], self.input_sequences_list[1]]
        self.assertEqual(obs_seqs, exp_seqs)
项目:q2-vsearch    作者:qiime2    | 项目源码 | 文件源码
def test_99_percent_clustering(self):
        exp_table = biom.Table(np.array([[104, 106, 109],
                                         [1, 1, 2],
                                         [7, 8, 9]]),
                               ['feature1', 'feature2',
                                'feature4'],
                               ['sample1', 'sample2', 'sample3'])

        with redirected_stdio(stderr=os.devnull):
            obs_table, obs_sequences = cluster_features_de_novo(
                sequences=self.input_sequences, table=self.input_table,
                perc_identity=0.99)
        # order of identifiers is important for biom.Table equality
        obs_table = \
            obs_table.sort_order(exp_table.ids(axis='observation'),
                                 axis='observation')
        self.assertEqual(obs_table, exp_table)

        # sequences are reverse-sorted by abundance in output
        obs_seqs = _read_seqs(obs_sequences)
        exp_seqs = [self.input_sequences_list[0], self.input_sequences_list[3],
                    self.input_sequences_list[1]]
        self.assertEqual(obs_seqs, exp_seqs)
项目:q2-vsearch    作者:qiime2    | 项目源码 | 文件源码
def test_97_percent_clustering_feature1_most_abundant(self):
        exp_table = biom.Table(np.array([[111, 114, 118],
                                         [1, 1, 2]]),
                               ['feature1', 'feature2'],
                               ['sample1', 'sample2', 'sample3'])

        with redirected_stdio(stderr=os.devnull):
            obs_table, obs_sequences = cluster_features_de_novo(
                sequences=self.input_sequences, table=self.input_table,
                perc_identity=0.97)
        # order of identifiers is important for biom.Table equality
        obs_table = \
            obs_table.sort_order(exp_table.ids(axis='observation'),
                                 axis='observation')
        self.assertEqual(obs_table, exp_table)

        # sequences are reverse-sorted by abundance in output
        obs_seqs = _read_seqs(obs_sequences)
        exp_seqs = [self.input_sequences_list[0], self.input_sequences_list[1]]
        self.assertEqual(obs_seqs, exp_seqs)
项目:q2-vsearch    作者:qiime2    | 项目源码 | 文件源码
def test_uchime_denovo(self):
        with redirected_stdio(stderr=os.devnull):
            chime, nonchime, stats = uchime_denovo(
                sequences=self.input_sequences, table=self.input_table)

        obs_chime = _read_seqs(chime)
        exp_chime = [self.input_sequences_list[3]]
        self.assertEqual(obs_chime, exp_chime)

        # sequences are reverse-sorted by abundance in output
        obs_nonchime = _read_seqs(nonchime)
        exp_nonchime = [self.input_sequences_list[0],
                        self.input_sequences_list[1],
                        self.input_sequences_list[2]]
        self.assertEqual(obs_nonchime, exp_nonchime)

        with stats.open() as stats_fh:
            stats_text = stats_fh.read()
        self.assertTrue('feature1' in stats_text)
        self.assertTrue('feature2' in stats_text)
        self.assertTrue('feature3' in stats_text)
        self.assertTrue('feature4' in stats_text)
        stats_lines = [e for e in stats_text.split('\n')
                       if len(e) > 0]
        self.assertEqual(len(stats_lines), 4)
项目:q2-vsearch    作者:qiime2    | 项目源码 | 文件源码
def test_join_pairs_all_samples_w_no_joined_seqs(self):
        # minmergelen is set very high here, resulting in no sequences
        # being joined across the three samples.
        with redirected_stdio(stderr=os.devnull):
            obs = join_pairs(self.input_seqs, minmergelen=500)

        # manifest is as expected
        self._test_manifest(obs)

        # expected number of fastq files are created
        output_fastqs = list(obs.sequences.iter_views(FastqGzFormat))
        self.assertEqual(len(output_fastqs), 3)

        for fastq_name, fastq_path in output_fastqs:
            with redirected_stdio(stderr=os.devnull):
                seqs = skbio.io.read(str(fastq_path), format='fastq',
                                     compression='gzip', constructor=skbio.DNA)
            seqs = list(seqs)
            seq_lengths = np.asarray([len(s) for s in seqs])

            self.assertEqual(len(seq_lengths), 0)
项目:azure-cli-shell    作者:Azure    | 项目源码 | 文件源码
def get_parsed_args(self, comp_words):
        """ gets the parsed args from a patched parser """
        active_parsers = self._patch_argument_parser()

        parsed_args = argparse.Namespace()

        self.completing = True
        if USING_PYTHON2:
            # Python 2 argparse only properly works with byte strings.
            comp_words = [ensure_bytes(word) for word in comp_words]

        try:
            stderr = sys.stderr
            sys.stderr = io.open(os.devnull, "w")

            active_parsers[0].parse_known_args(comp_words, namespace=parsed_args)

            sys.stderr.close()
            sys.stderr = stderr
        except BaseException:
            pass

        self.completing = False
        return parsed_args
项目:GoToMeetingTools    作者:plongitudes    | 项目源码 | 文件源码
def convert_image(inpath, outpath, size):
    """Convert an image file using `sips`.

    Args:
        inpath (str): Path of source file.
        outpath (str): Path to destination file.
        size (int): Width and height of destination image in pixels.

    Raises:
        RuntimeError: Raised if `sips` exits with non-zero status.
    """
    cmd = [
        b'sips',
        b'-z', b'{0}'.format(size), b'{0}'.format(size),
        inpath,
        b'--out', outpath]
    # log().debug(cmd)
    with open(os.devnull, 'w') as pipe:
        retcode = subprocess.call(cmd, stdout=pipe, stderr=subprocess.STDOUT)

    if retcode != 0:
        raise RuntimeError('sips exited with {0}'.format(retcode))
项目:behelper    作者:istommao    | 项目源码 | 文件源码
def convert_image(inpath, outpath, size):
    """Convert an image file using `sips`.

    Args:
        inpath (str): Path of source file.
        outpath (str): Path to destination file.
        size (int): Width and height of destination image in pixels.

    Raises:
        RuntimeError: Raised if `sips` exits with non-zero status.
    """
    cmd = [
        b'sips',
        b'-z', b'{0}'.format(size), b'{0}'.format(size),
        inpath,
        b'--out', outpath]
    # log().debug(cmd)
    with open(os.devnull, 'w') as pipe:
        retcode = subprocess.call(cmd, stdout=pipe, stderr=subprocess.STDOUT)

    if retcode != 0:
        raise RuntimeError('sips exited with {0}'.format(retcode))
项目:cloudml-samples    作者:GoogleCloudPlatform    | 项目源码 | 文件源码
def get_cloud_project():
  cmd = [
      'gcloud', '-q', 'config', 'list', 'project',
      '--format=value(core.project)'
  ]
  with open(os.devnull, 'w') as dev_null:
    try:
      res = subprocess.check_output(cmd, stderr=dev_null).strip()
      if not res:
        raise Exception('--cloud specified but no Google Cloud Platform '
                        'project found.\n'
                        'Please specify your project name with the --project '
                        'flag or set a default project: '
                        'gcloud config set project YOUR_PROJECT_NAME')
      return res
    except OSError as e:
      if e.errno == errno.ENOENT:
        raise Exception('gcloud is not installed. The Google Cloud SDK is '
                        'necessary to communicate with the Cloud ML service. '
                        'Please install and set up gcloud.')
      raise
项目:radar    作者:amoose136    | 项目源码 | 文件源码
def convert_image(inpath, outpath, size):
    """Convert an image file using ``sips``.

    Args:
        inpath (str): Path of source file.
        outpath (str): Path to destination file.
        size (int): Width and height of destination image in pixels.

    Raises:
        RuntimeError: Raised if ``sips`` exits with non-zero status.
    """
    cmd = [
        b'sips',
        b'-z', str(size), str(size),
        inpath,
        b'--out', outpath]
    # log().debug(cmd)
    with open(os.devnull, 'w') as pipe:
        retcode = subprocess.call(cmd, stdout=pipe, stderr=subprocess.STDOUT)

    if retcode != 0:
        raise RuntimeError('sips exited with %d' % retcode)
项目:ParlAI    作者:facebookresearch    | 项目源码 | 文件源码
def validate(opt, agent):
    old_datatype = agent.opt['datatype']
    agent.opt['datatype'] = 'valid'

    opt = deepcopy(opt)
    opt['datatype'] = 'valid'
    opt['terminate'] = True
    opt['batchsize'] = 1

    old_stdout = sys.stdout
    sys.stdout = open(os.devnull, 'w')
    valid_world = create_task(opt, agent)
    sys.stdout = old_stdout

    for _ in valid_world:
        valid_world.parley()

    stats = valid_world.report()
    agent.opt['datatype'] = old_datatype
    return stats
项目:pytzdata    作者:sdispater    | 项目源码 | 文件源码
def build(self):
        self.line('[<comment>Building tzdata</>]')
        dest_path = os.path.join(self.path, 'tz')

        # Getting VERSION
        with open(os.path.join(dest_path, 'version')) as f:
            version = f.read().strip()

        self.write('<comment>Building</> version <fg=cyan>{}</>'.format(version))
        os.chdir(dest_path)

        with open(os.devnull, 'w') as temp:
            subprocess.call(
                ['make', 'TOPDIR={}'.format(dest_path), 'install'],
                stdout=temp,
                stderr=temp
            )

        self.overwrite('<info>Built</> version <fg=cyan>{}</>'.format(version))
        self.line('')
项目:alfred-workflows    作者:arthurhammer    | 项目源码 | 文件源码
def convert_image(inpath, outpath, size):
    """Convert an image file using `sips`.

    Args:
        inpath (str): Path of source file.
        outpath (str): Path to destination file.
        size (int): Width and height of destination image in pixels.

    Raises:
        RuntimeError: Raised if `sips` exits with non-zero status.
    """
    cmd = [
        b'sips',
        b'-z', b'{0}'.format(size), b'{0}'.format(size),
        inpath,
        b'--out', outpath]
    # log().debug(cmd)
    with open(os.devnull, 'w') as pipe:
        retcode = subprocess.call(cmd, stdout=pipe, stderr=subprocess.STDOUT)

    if retcode != 0:
        raise RuntimeError('sips exited with {0}'.format(retcode))
项目:alfred-zebra    作者:r0x73    | 项目源码 | 文件源码
def convert_image(inpath, outpath, size):
    """Convert an image file using `sips`.

    Args:
        inpath (str): Path of source file.
        outpath (str): Path to destination file.
        size (int): Width and height of destination image in pixels.

    Raises:
        RuntimeError: Raised if `sips` exits with non-zero status.
    """
    cmd = [
        b'sips',
        b'-z', b'{0}'.format(size), b'{0}'.format(size),
        inpath,
        b'--out', outpath]
    # log().debug(cmd)
    with open(os.devnull, 'w') as pipe:
        retcode = subprocess.call(cmd, stdout=pipe, stderr=subprocess.STDOUT)

    if retcode != 0:
        raise RuntimeError('sips exited with {0}'.format(retcode))
项目:nojs    作者:chrisdickinson    | 项目源码 | 文件源码
def TeeCmd(cmd, logfile, fail_hard=True):
  """Runs cmd and writes the output to both stdout and logfile."""
  # Reading from PIPE can deadlock if one buffer is full but we wait on a
  # different one.  To work around this, pipe the subprocess's stderr to
  # its stdout buffer and don't give it a stdin.
  # shell=True is required in cmd.exe since depot_tools has an svn.bat, and
  # bat files only work with shell=True set.
  proc = subprocess.Popen(cmd, bufsize=1, shell=sys.platform == 'win32',
                          stdin=open(os.devnull), stdout=subprocess.PIPE,
                          stderr=subprocess.STDOUT)
  for line in iter(proc.stdout.readline,''):
    Tee(line, logfile)
    if proc.poll() is not None:
      break
  exit_code = proc.wait()
  if exit_code != 0 and fail_hard:
    print 'Failed:', cmd
    sys.exit(1)
项目:nojs    作者:chrisdickinson    | 项目源码 | 文件源码
def TeeCmd(cmd, logfile, fail_hard=True):
  """Runs cmd and writes the output to both stdout and logfile."""
  # Reading from PIPE can deadlock if one buffer is full but we wait on a
  # different one.  To work around this, pipe the subprocess's stderr to
  # its stdout buffer and don't give it a stdin.
  # shell=True is required in cmd.exe since depot_tools has an svn.bat, and
  # bat files only work with shell=True set.
  proc = subprocess.Popen(cmd, bufsize=1, shell=sys.platform == 'win32',
                          stdin=open(os.devnull), stdout=subprocess.PIPE,
                          stderr=subprocess.STDOUT)
  for line in iter(proc.stdout.readline,''):
    Tee(line, logfile)
    if proc.poll() is not None:
      break
  exit_code = proc.wait()
  if exit_code != 0 and fail_hard:
    print 'Failed:', cmd
    sys.exit(1)
项目:swiftbackmeup    作者:redhat-cip    | 项目源码 | 文件源码
def run(self, with_intermediate_file=False, cwd=None):
        """Method to run the backup command where it applies."""

        command = self.build_dump_command()

        if with_intermediate_file:
            try:
                backup_file_f = open('%s/%s' % (self.output_directory,
                                                self.backup_file), 'w')
            except IOError as exc:
                raise

            p = subprocess.Popen(command.split(), stdout=backup_file_f,
                                 env=self.env, cwd=cwd)
            p.wait()
            backup_file_f.flush()
        else:
            FNULL = open(os.devnull, 'w')
            p = subprocess.Popen(command.split(), env=self.env, cwd=cwd,
                                 stdout=FNULL, stderr=subprocess.STDOUT)
项目:swiftbackmeup    作者:redhat-cip    | 项目源码 | 文件源码
def restore(self, backup_filename,with_intermediate_file=False):
        """Method to restore the backup."""

        self.store.get(self.swift_container, backup_filename,
                       self.output_directory)
        command = self.build_restore_command(backup_filename)


        if with_intermediate_file:
            file_path = '%s/%s' % (self.output_directory, backup_filename)
            backup_file_content = open(file_path, 'r').read()

            p = subprocess.Popen(command.split(), stdin=subprocess.PIPE)
            p.communicate(backup_file_content)
        else:
            FNULL = open(os.devnull, 'w')
            p = subprocess.Popen(command.split(), stdout=FNULL,
                                 stderr=subprocess.STDOUT)
            p.wait()

        if self.clean_local_copy:
            self._clean_local_copy(backup_filename)
项目:Dallinger    作者:Dallinger    | 项目源码 | 文件源码
def dump_database(id):
    """Dump the database to a temporary directory."""

    tmp_dir = tempfile.mkdtemp()
    current_dir = os.getcwd()
    os.chdir(tmp_dir)

    FNULL = open(os.devnull, 'w')
    heroku_app = HerokuApp(dallinger_uid=id, output=FNULL)
    heroku_app.backup_capture()
    heroku_app.backup_download()

    for filename in os.listdir(tmp_dir):
        if filename.startswith("latest.dump"):
            os.rename(filename, "database.dump")

    os.chdir(current_dir)

    return os.path.join(tmp_dir, "database.dump")
项目:TCP-IP    作者:JackZ0    | 项目源码 | 文件源码
def setUp(self):
        super(PostArgParseSetupTest, self).setUp()
        self.config.debug = False
        self.config.max_log_backups = 1000
        self.config.quiet = False
        self.config.verbose_count = constants.CLI_DEFAULTS['verbose_count']
        self.devnull = open(os.devnull, 'w')

        from certbot.log import ColoredStreamHandler
        self.stream_handler = ColoredStreamHandler(six.StringIO())
        from certbot.log import MemoryHandler, TempHandler
        self.temp_handler = TempHandler()
        self.temp_path = self.temp_handler.path
        self.memory_handler = MemoryHandler(self.temp_handler)
        self.root_logger = mock.MagicMock(
            handlers=[self.memory_handler, self.stream_handler])