Python subprocess 模块,STDOUT 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用subprocess.STDOUT

项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def service_running(service_name):
    """Determine whether a system service is running"""
    if init_is_systemd():
        return service('is-active', service_name)
    else:
        try:
            output = subprocess.check_output(
                ['service', service_name, 'status'],
                stderr=subprocess.STDOUT).decode('UTF-8')
        except subprocess.CalledProcessError:
            return False
        else:
            # This works for upstart scripts where the 'service' command
            # returns a consistent string to represent running 'start/running'
            if ("start/running" in output or "is running" in output or
                    "up and running" in output):
                return True
            # Check System V scripts init script return codes
            if service_name in systemv_services_running():
                return True
            return False
项目:alfred-mpd    作者:deanishe    | 项目源码 | 文件源码
def convert_image(inpath, outpath, size):
    """Convert an image file using `sips`.

    Args:
        inpath (str): Path of source file.
        outpath (str): Path to destination file.
        size (int): Width and height of destination image in pixels.

    Raises:
        RuntimeError: Raised if `sips` exits with non-zero status.
    """
    cmd = [
        b'sips',
        b'-z', b'{0}'.format(size), b'{0}'.format(size),
        inpath,
        b'--out', outpath]
    # log().debug(cmd)
    with open(os.devnull, 'w') as pipe:
        retcode = subprocess.call(cmd, stdout=pipe, stderr=subprocess.STDOUT)

    if retcode != 0:
        raise RuntimeError('sips exited with {0}'.format(retcode))
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def is_crm_dc():
    """
    Determine leadership by querying the pacemaker Designated Controller
    """
    cmd = ['crm', 'status']
    try:
        status = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
        if not isinstance(status, six.text_type):
            status = six.text_type(status, "utf-8")
    except subprocess.CalledProcessError as ex:
        raise CRMDCNotFound(str(ex))

    current_dc = ''
    for line in status.split('\n'):
        if line.startswith('Current DC'):
            # Current DC: juju-lytrusty-machine-2 (168108163) - partition with quorum
            current_dc = line.split(':')[1].split()[0]
    if current_dc == get_unit_hostname():
        return True
    elif current_dc == 'NONE':
        raise CRMDCNotFound('Current DC: NONE')

    return False
项目:py_find_1st    作者:roebel    | 项目源码 | 文件源码
def compiler_is_clang(comp) :
    print("check for clang compiler ...", end=' ')
    try:
        cc_output = subprocess.check_output(comp+['--version'],
                                            stderr = subprocess.STDOUT, shell=False)
    except OSError as ex:
        print("compiler test call failed with error {0:d} msg: {1}".format(ex.errno, ex.strerror))
        print("no")
        return False

    ret = re.search(b'clang', cc_output) is not None
    if ret :
        print("yes")
    else:
        print("no")
    return ret
项目:docklet    作者:unias    | 项目源码 | 文件源码
def del_addr(linkname, address):
        try:
            subprocess.run(['ip', 'address', 'del', address, 'dev', str(linkname)], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False, check=True)
            return [True, str(linkname)]
        except subprocess.CalledProcessError as suberror:
            return [False, "delete address failed : %s" % suberror.stdout.decode('utf-8')]


# ovs-vsctl list-br
# ovs-vsctl br-exists <Bridge>
# ovs-vsctl add-br <Bridge>
# ovs-vsctl del-br <Bridge>
# ovs-vsctl list-ports <Bridge>
# ovs-vsctl del-port <Bridge> <Port>
# ovs-vsctl add-port <Bridge> <Port> -- set interface <Port> type=gre options:remote_ip=<RemoteIP>
# ovs-vsctl add-port <Bridge> <Port> tag=<ID> -- set interface <Port> type=internal
# ovs-vsctl port-to-br <Port>
# ovs-vsctl set Port <Port> tag=<ID>
# ovs-vsctl clear Port <Port> tag
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def is_crm_dc():
    """
    Determine leadership by querying the pacemaker Designated Controller
    """
    cmd = ['crm', 'status']
    try:
        status = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
        if not isinstance(status, six.text_type):
            status = six.text_type(status, "utf-8")
    except subprocess.CalledProcessError as ex:
        raise CRMDCNotFound(str(ex))

    current_dc = ''
    for line in status.split('\n'):
        if line.startswith('Current DC'):
            # Current DC: juju-lytrusty-machine-2 (168108163) - partition with quorum
            current_dc = line.split(':')[1].split()[0]
    if current_dc == get_unit_hostname():
        return True
    elif current_dc == 'NONE':
        raise CRMDCNotFound('Current DC: NONE')

    return False
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def is_crm_leader(resource, retry=False):
    """
    Returns True if the charm calling this is the elected corosync leader,
    as returned by calling the external "crm" command.

    We allow this operation to be retried to avoid the possibility of getting a
    false negative. See LP #1396246 for more info.
    """
    if resource == DC_RESOURCE_NAME:
        return is_crm_dc()
    cmd = ['crm', 'resource', 'show', resource]
    try:
        status = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
        if not isinstance(status, six.text_type):
            status = six.text_type(status, "utf-8")
    except subprocess.CalledProcessError:
        status = None

    if status and get_unit_hostname() in status:
        return True

    if status and "resource %s is NOT running" % (resource) in status:
        raise CRMResourceNotFound("CRM resource %s not found" % (resource))

    return False
项目:cellranger    作者:10XGenomics    | 项目源码 | 文件源码
def get_bcl2fastq_v2(hostname):
    try:
        subprocess.check_call(["which", "bcl2fastq"])
        # Restore the LD_LIBRARY_PATH set aside by sourceme.bash/shell10x.
        # Required for some installations of bcl2fastq.
        new_environ = dict(os.environ)
        new_environ['LD_LIBRARY_PATH'] = os.environ.get('_TENX_LD_LIBRARY_PATH', '')
        output = subprocess.check_output(["bcl2fastq", "--version"], env=new_environ, stderr=subprocess.STDOUT)
        match = None
        for l in output.split("\n"):
            match = re.match("bcl2fastq v([0-9.]+)", l)
            if match is not None:
                return (match.groups()[0], None)

        return (None, "bcl2fastq version not recognized -- please check the output of bcl2fastq --version")
    except subprocess.CalledProcessError:
        msg = "On machine: %s, bcl2fastq not found on PATH." % hostname
        return (None, msg)
项目:wpw-sdk-python    作者:WPTechInnovation    | 项目源码 | 文件源码
def launch(self, cfg, path, flags):
        logging.debug("Determine the OS and Architecture this application is currently running on")
        hostOS = platform.system().lower()
        logging.debug("hostOS: " + str(hostOS))
        is_64bits = sys.maxsize > 2 ** 32
        if is_64bits:
            hostArchitecture = 'x64'
        else:
            hostArchitecture = 'ia32'
        logging.debug("hostArchitecture: " + str(hostArchitecture))
        if(self.validateConfig(cfg, hostOS, hostArchitecture)):
            fnull = open(os.devnull, 'w')
            if os.environ.get("WPW_HOME") is not None:
                cmd = [os.environ["WPW_HOME"] + '/bin/rpc-agent-' + platform.system().lower() + '-' + self.detectHostArchitecture()]
            else:
                cmd = [path + '/wpwithinpy/iot-core-component/bin/rpc-agent-' + platform.system().lower() + '-' + self.detectHostArchitecture()]
            cmd.extend(flags)
            proc = subprocess.Popen(cmd, stdin=None, stdout=fnull, stderr=subprocess.STDOUT)
            return proc
        else:
            logging.debug("Invalid OS/Architecture combination detected")
项目:games_nebula    作者:yancharkin    | 项目源码 | 文件源码
def win64_available(self):

        wine_bin, \
        wineserver_bin, \
        wine_lib = self.get_wine_bin_path()

        dev_null = open(os.devnull, 'w')
        try:
            proc = subprocess.Popen([wine_bin + '64'], stdout=dev_null, \
                stdin=subprocess.PIPE, stderr=subprocess.STDOUT)
            dev_null.close()
            stdoutdata, stderrdata = proc.communicate()
            if proc.returncode == 1:
                self.combobox_winearch.set_visible(True)
                return True
            else:
                self.combobox_winearch.set_visible(False)
                self.winearch = 'win32'
                return False
        except:
            self.combobox_winearch.set_visible(False)
            self.winearch = 'win32'
            return False
项目:err-fabric    作者:netquity    | 项目源码 | 文件源码
def execute_task(host, tasks):
        """Call Fabric to execute tasks against a host

        Return: CompletedProcess instance
        """
        # TODO: add support for groups, multiple hosts
        # TODO: add support for providing input data from team files
        return subprocess.run(
            [
                PYTHON2_PATH,
                FABRIC_PATH,
                '--abort-on-prompts',
                '--hosts=%s' % host,
                '--fabfile=%s' % FABFILE_PATH,
                *tasks,
            ],
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,  # Combine out/err into stdout; stderr will be None
            universal_newlines=True,
            check=True,
        )
项目:PimuxBot    作者:Finn10111    | 项目源码 | 文件源码
def index():
    if request.args.get('code'):
        unlock_code = request.args.get('code')
        # unlock, new password
        re = s.query(RecoveryEmail).filter(RecoveryEmail.password_code==unlock_code).one_or_none()
        if re:
            jid = re.jid
            re.password_code = None
            s.merge(re)
            s.commit()
            # set new password and send email
            email_address = re.email
            password = ''.join(random.SystemRandom().choice(string.ascii_lowercase + string.ascii_uppercase + string.digits) for _ in range(10))
            p = subprocess.Popen(['/usr/bin/prosodyctl', 'passwd', jid], stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.STDOUT)
            args = bytes("%s\n%s\n" % (password, password), encoding='utf8')
            p.communicate(args)
            sendMail(email_address, 'new password', password)
            content = render_template('success.html', message='password was sent')
        else:
            content = render_template('error.html', message='link invalid')
    else:
        content = render_template('index.html')
    return content
项目:jx-sqlite    作者:mozilla    | 项目源码 | 文件源码
def run_app(please_stop, server_is_ready):
    proc = subprocess.Popen(
        ["python", "active_data\\app.py", "--settings", "tests/config/elasticsearch.json"],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        bufsize=-1
        #creationflags=CREATE_NEW_PROCESS_GROUP
    )

    while not please_stop:
        line = proc.stdout.readline()
        if not line:
            continue
        if line.find(" * Running on") >= 0:
            server_is_ready.go()
        Log.note("SERVER: {{line}}", {"line": line.strip()})

    proc.send_signal(signal.CTRL_C_EVENT)


# read_alternate_settings
项目:Gank-Alfred-Workflow    作者:hujiaweibujidao    | 项目源码 | 文件源码
def convert_image(inpath, outpath, size):
    """Convert an image file using `sips`.

    Args:
        inpath (str): Path of source file.
        outpath (str): Path to destination file.
        size (int): Width and height of destination image in pixels.

    Raises:
        RuntimeError: Raised if `sips` exits with non-zero status.
    """
    cmd = [
        b'sips',
        b'-z', b'{0}'.format(size), b'{0}'.format(size),
        inpath,
        b'--out', outpath]
    # log().debug(cmd)
    with open(os.devnull, 'w') as pipe:
        retcode = subprocess.call(cmd, stdout=pipe, stderr=subprocess.STDOUT)

    if retcode != 0:
        raise RuntimeError('sips exited with {0}'.format(retcode))
项目:Gank-Alfred-Workflow    作者:hujiaweibujidao    | 项目源码 | 文件源码
def convert_image(inpath, outpath, size):
    """Convert an image file using `sips`.

    Args:
        inpath (str): Path of source file.
        outpath (str): Path to destination file.
        size (int): Width and height of destination image in pixels.

    Raises:
        RuntimeError: Raised if `sips` exits with non-zero status.
    """
    cmd = [
        b'sips',
        b'-z', b'{0}'.format(size), b'{0}'.format(size),
        inpath,
        b'--out', outpath]
    # log().debug(cmd)
    with open(os.devnull, 'w') as pipe:
        retcode = subprocess.call(cmd, stdout=pipe, stderr=subprocess.STDOUT)

    if retcode != 0:
        raise RuntimeError('sips exited with {0}'.format(retcode))
项目:dotbot-yaourt    作者:niklas-heer    | 项目源码 | 文件源码
def _install(self, pkg):
        # to have a unified string which we can query
        # we need to execute the command with LANG=en_US.UTF-8
        cmd = 'LANG=en_US.UTF-8 yaourt --needed --noconfirm -S {}'.format(pkg)

        self._log.info("Installing \"{}\". Please wait...".format(pkg))

        # needed to avoid conflicts due to locking
        time.sleep(1)

        proc = subprocess.Popen(cmd, shell=True,
                stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        out = proc.stdout.read()
        proc.stdout.close()

        for item in self._strings.keys():
            if out.decode("utf-8").find(self._strings[item]) >= 0:
                return item

        self._log.warning("Could not determine what happened with package {}".format(pkg))
        return PkgStatus.NOT_SURE
项目:detox    作者:tox-dev    | 项目源码 | 文件源码
def invoke(command, success_codes=(0,)):
    try:
        output = subprocess.check_output(command, stderr=subprocess.STDOUT)
        status = 0
    except subprocess.CalledProcessError as error:
        output = error.output
        status = error.returncode
    output = output.decode('utf-8')
    if status not in success_codes:
        raise Exception(
            'Command %r return exit code %d and output: """%s""".' % (
                command,
                status,
                output,
            )
        )
    return status, output
项目:charm-heat    作者:openstack    | 项目源码 | 文件源码
def is_crm_dc():
    """
    Determine leadership by querying the pacemaker Designated Controller
    """
    cmd = ['crm', 'status']
    try:
        status = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
        if not isinstance(status, six.text_type):
            status = six.text_type(status, "utf-8")
    except subprocess.CalledProcessError as ex:
        raise CRMDCNotFound(str(ex))

    current_dc = ''
    for line in status.split('\n'):
        if line.startswith('Current DC'):
            # Current DC: juju-lytrusty-machine-2 (168108163) - partition with quorum
            current_dc = line.split(':')[1].split()[0]
    if current_dc == get_unit_hostname():
        return True
    elif current_dc == 'NONE':
        raise CRMDCNotFound('Current DC: NONE')

    return False
项目:charm-heat    作者:openstack    | 项目源码 | 文件源码
def is_crm_leader(resource, retry=False):
    """
    Returns True if the charm calling this is the elected corosync leader,
    as returned by calling the external "crm" command.

    We allow this operation to be retried to avoid the possibility of getting a
    false negative. See LP #1396246 for more info.
    """
    if resource == DC_RESOURCE_NAME:
        return is_crm_dc()
    cmd = ['crm', 'resource', 'show', resource]
    try:
        status = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
        if not isinstance(status, six.text_type):
            status = six.text_type(status, "utf-8")
    except subprocess.CalledProcessError:
        status = None

    if status and get_unit_hostname() in status:
        return True

    if status and "resource %s is NOT running" % (resource) in status:
        raise CRMResourceNotFound("CRM resource %s not found" % (resource))

    return False
项目:functest    作者:opnfv    | 项目源码 | 文件源码
def get_verifier_id():
    """
    Returns verifier id for current Tempest
    """
    create_rally_deployment()
    create_verifier()
    cmd = ("rally verify list-verifiers | awk '/" +
           CONST.__getattribute__('tempest_verifier_name') +
           "/ {print $2}'")
    p = subprocess.Popen(cmd, shell=True,
                         stdout=subprocess.PIPE,
                         stderr=subprocess.STDOUT)
    deployment_uuid = p.stdout.readline().rstrip()
    if deployment_uuid == "":
        logger.error("Tempest verifier not found.")
        raise Exception('Error with command:%s' % cmd)
    return deployment_uuid
项目:functest    作者:opnfv    | 项目源码 | 文件源码
def run_defcore_default(self):
        """Run default defcore sys command."""
        options = ["-v"] if not self.insecure else ["-v", self.insecure]
        cmd = (["refstack-client", "test", "-c", self.confpath] +
               options + ["--test-list", self.defcorelist])
        LOGGER.info("Starting Refstack_defcore test case: '%s'.", cmd)

        with open(os.path.join(conf_utils.REFSTACK_RESULTS_DIR,
                               "environment.log"), 'w+') as f_env:
            f_env.write(
                ("Refstack environment:\n"
                 "  SUT: {}\n  Scenario: {}\n  Node: {}\n  Date: {}\n").format(
                    CONST.__getattribute__('INSTALLER_TYPE'),
                    CONST.__getattribute__('DEPLOY_SCENARIO'),
                    CONST.__getattribute__('NODE_NAME'),
                    time.strftime("%a %b %d %H:%M:%S %Z %Y")))

        with open(os.path.join(conf_utils.REFSTACK_RESULTS_DIR,
                               "refstack.log"), 'w+') as f_stdout:
            subprocess.call(cmd, shell=False, stdout=f_stdout,
                            stderr=subprocess.STDOUT)
项目:charm-keystone    作者:openstack    | 项目源码 | 文件源码
def is_crm_dc():
    """
    Determine leadership by querying the pacemaker Designated Controller
    """
    cmd = ['crm', 'status']
    try:
        status = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
        if not isinstance(status, six.text_type):
            status = six.text_type(status, "utf-8")
    except subprocess.CalledProcessError as ex:
        raise CRMDCNotFound(str(ex))

    current_dc = ''
    for line in status.split('\n'):
        if line.startswith('Current DC'):
            # Current DC: juju-lytrusty-machine-2 (168108163) - partition with quorum
            current_dc = line.split(':')[1].split()[0]
    if current_dc == get_unit_hostname():
        return True
    elif current_dc == 'NONE':
        raise CRMDCNotFound('Current DC: NONE')

    return False
项目:charm-keystone    作者:openstack    | 项目源码 | 文件源码
def is_crm_dc():
    """
    Determine leadership by querying the pacemaker Designated Controller
    """
    cmd = ['crm', 'status']
    try:
        status = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
        if not isinstance(status, six.text_type):
            status = six.text_type(status, "utf-8")
    except subprocess.CalledProcessError as ex:
        raise CRMDCNotFound(str(ex))

    current_dc = ''
    for line in status.split('\n'):
        if line.startswith('Current DC'):
            # Current DC: juju-lytrusty-machine-2 (168108163) - partition with quorum
            current_dc = line.split(':')[1].split()[0]
    if current_dc == get_unit_hostname():
        return True
    elif current_dc == 'NONE':
        raise CRMDCNotFound('Current DC: NONE')

    return False
项目:charm-keystone    作者:openstack    | 项目源码 | 文件源码
def is_crm_leader(resource, retry=False):
    """
    Returns True if the charm calling this is the elected corosync leader,
    as returned by calling the external "crm" command.

    We allow this operation to be retried to avoid the possibility of getting a
    false negative. See LP #1396246 for more info.
    """
    if resource == DC_RESOURCE_NAME:
        return is_crm_dc()
    cmd = ['crm', 'resource', 'show', resource]
    try:
        status = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
        if not isinstance(status, six.text_type):
            status = six.text_type(status, "utf-8")
    except subprocess.CalledProcessError:
        status = None

    if status and get_unit_hostname() in status:
        return True

    if status and "resource %s is NOT running" % (resource) in status:
        raise CRMResourceNotFound("CRM resource %s not found" % (resource))

    return False
项目:charm-keystone    作者:openstack    | 项目源码 | 文件源码
def is_crm_dc():
    """
    Determine leadership by querying the pacemaker Designated Controller
    """
    cmd = ['crm', 'status']
    try:
        status = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
        if not isinstance(status, six.text_type):
            status = six.text_type(status, "utf-8")
    except subprocess.CalledProcessError as ex:
        raise CRMDCNotFound(str(ex))

    current_dc = ''
    for line in status.split('\n'):
        if line.startswith('Current DC'):
            # Current DC: juju-lytrusty-machine-2 (168108163) - partition with quorum
            current_dc = line.split(':')[1].split()[0]
    if current_dc == get_unit_hostname():
        return True
    elif current_dc == 'NONE':
        raise CRMDCNotFound('Current DC: NONE')

    return False
项目:charm-keystone    作者:openstack    | 项目源码 | 文件源码
def is_crm_leader(resource, retry=False):
    """
    Returns True if the charm calling this is the elected corosync leader,
    as returned by calling the external "crm" command.

    We allow this operation to be retried to avoid the possibility of getting a
    false negative. See LP #1396246 for more info.
    """
    if resource == DC_RESOURCE_NAME:
        return is_crm_dc()
    cmd = ['crm', 'resource', 'show', resource]
    try:
        status = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
        if not isinstance(status, six.text_type):
            status = six.text_type(status, "utf-8")
    except subprocess.CalledProcessError:
        status = None

    if status and get_unit_hostname() in status:
        return True

    if status and "resource %s is NOT running" % (resource) in status:
        raise CRMResourceNotFound("CRM resource %s not found" % (resource))

    return False
项目:charm-keystone    作者:openstack    | 项目源码 | 文件源码
def is_crm_leader(resource, retry=False):
    """
    Returns True if the charm calling this is the elected corosync leader,
    as returned by calling the external "crm" command.

    We allow this operation to be retried to avoid the possibility of getting a
    false negative. See LP #1396246 for more info.
    """
    if resource == DC_RESOURCE_NAME:
        return is_crm_dc()
    cmd = ['crm', 'resource', 'show', resource]
    try:
        status = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
        if not isinstance(status, six.text_type):
            status = six.text_type(status, "utf-8")
    except subprocess.CalledProcessError:
        status = None

    if status and get_unit_hostname() in status:
        return True

    if status and "resource %s is NOT running" % (resource) in status:
        raise CRMResourceNotFound("CRM resource %s not found" % (resource))

    return False
项目:swjtu-pyscraper    作者:Desgard    | 项目源码 | 文件源码
def popen4(cmd, mode="t", bufsize=-1):
            """Execute the shell command 'cmd' in a sub-process.  On UNIX, 'cmd'
            may be a sequence, in which case arguments will be passed directly to
            the program without shell intervention (as with os.spawnv()).  If 'cmd'
            is a string it will be passed to the shell (as with os.system()). If
            'bufsize' is specified, it sets the buffer size for the I/O pipes.  The
            file objects (child_stdin, child_stdout_stderr) are returned."""
            import warnings
            msg = "os.popen4 is deprecated.  Use the subprocess module."
            warnings.warn(msg, DeprecationWarning, stacklevel=2)

            import subprocess
            PIPE = subprocess.PIPE
            p = subprocess.Popen(cmd, shell=isinstance(cmd, basestring),
                                 bufsize=bufsize, stdin=PIPE, stdout=PIPE,
                                 stderr=subprocess.STDOUT, close_fds=True)
            return p.stdin, p.stdout
项目:charm-nova-cloud-controller    作者:openstack    | 项目源码 | 文件源码
def is_crm_dc():
    """
    Determine leadership by querying the pacemaker Designated Controller
    """
    cmd = ['crm', 'status']
    try:
        status = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
        if not isinstance(status, six.text_type):
            status = six.text_type(status, "utf-8")
    except subprocess.CalledProcessError as ex:
        raise CRMDCNotFound(str(ex))

    current_dc = ''
    for line in status.split('\n'):
        if line.startswith('Current DC'):
            # Current DC: juju-lytrusty-machine-2 (168108163) - partition with quorum
            current_dc = line.split(':')[1].split()[0]
    if current_dc == get_unit_hostname():
        return True
    elif current_dc == 'NONE':
        raise CRMDCNotFound('Current DC: NONE')

    return False
项目:python-keylime    作者:mit-ll    | 项目源码 | 文件源码
def start_cfssl(cmdline=""):
    global cfsslproc
    cmd = "cfssl serve -loglevel=1 %s "%cmdline
    env = os.environ.copy()
    env['PATH']=env['PATH']+":/usr/local/bin"

    # make sure cfssl isn't running
    os.system('pkill -f cfssl')

    cfsslproc = subprocess.Popen(cmd,env=env,shell=True,stdout=subprocess.PIPE,stderr=subprocess.STDOUT,universal_newlines=True)
    if cfsslproc.returncode is not None:
        raise Exception("Unable to launch %: failed with code "%(cmd,cfsslproc.returncode))

    logger.debug("Waiting for cfssl to start...")
    while True:
        line = cfsslproc.stdout.readline()
        if "Now listening on" in line:
            break
    time.sleep(0.2)# give cfssl a little more time to get started
    logger.debug("cfssl started successfully")
项目:krait    作者:lmdu    | 项目源码 | 文件源码
def designPrimers(p3_args, input_log=None, output_log=None, err_log=None):
    ''' Return the raw primer3_core output for the provided primer3 args.

    Returns an ordered dict of the boulderIO-format primer3 output file
    '''
    sp = subprocess.Popen([pjoin(PRIMER3_HOME, 'primer3_core')],
                          stdout=subprocess.PIPE, stdin=subprocess.PIPE,
                          stderr=subprocess.STDOUT)
    p3_args.setdefault('PRIMER_THERMODYNAMIC_PARAMETERS_PATH',
                       pjoin(PRIMER3_HOME, 'primer3_config/'))
    in_str = _formatBoulderIO(p3_args)
    if input_log:
        input_log.write(in_str)
        input_log.flush()
    out_str, err_str = sp.communicate(input=in_str)
    if output_log:
        output_log.write(out_str)
        output_log.flush()
    if err_log and err_str is not None:
        err_log.write(err_str)
        err_log.flush()
    return _parseBoulderIO(out_str)
项目:wib    作者:chengsoonong    | 项目源码 | 文件源码
def call(self, args, devnull=False):
        """Call other processes.
        args - list of command args
        devnull - whether to pipe stdout to /dev/null (or equivalent)
        """
        if self.debug:
            click.echo(subprocess.list2cmdline(args))
            click.confirm('Continue?', default=True, abort=True)
        try:
            kwargs = {}
            if devnull:
                # Pipe to /dev/null (or equivalent).
                kwargs['stderr'] = subprocess.STDOUT
                kwargs['stdout'] = self.FNULL
            ret_code = subprocess.call(args, **kwargs)
        except subprocess.CalledProcessError:
            return False
        return ret_code
项目:mos-horizon    作者:Mirantis    | 项目源码 | 文件源码
def _start_from_profile_path(self, path):
        self._firefox_env["XRE_PROFILE_PATH"] = path

        if platform.system().lower() == 'linux':
            self._modify_link_library_path()
        command = [self._start_cmd, "-silent"]
        if self.command_line is not None:
            for cli in self.command_line:
                command.append(cli)

# The following exists upstream and is known to create hanging
# firefoxes, leading to zombies.
#        subprocess.Popen(command, stdout=self._log_file,
#              stderr=subprocess.STDOUT,
#              env=self._firefox_env).communicate()
        command[1] = '-foreground'
        self.process = subprocess.Popen(
            command, stdout=self._log_file, stderr=subprocess.STDOUT,
            env=self._firefox_env)
项目:darkc0de-old-stuff    作者:tuwid    | 项目源码 | 文件源码
def __init__(self, project, arguments=None, stdout="file", stdin=None, timeout=10.0, name=None):
        if not name:
            name = "process:%s" % basename(arguments[0])
        ProjectAgent.__init__(self, project, name)
        self.env = Environment(self)
        if arguments is None:
            arguments = []
        self.cmdline = CommandLine(self, arguments)
        self.timeout = timeout
        self.max_memory = 100*1024*1024
        self.stdout = stdout
        self.popen_args = {
            'stderr': STDOUT,
        }
        if stdin is not None:
            if stdin == "null":
                self.popen_args['stdin'] = open('/dev/null', 'r')
            else:
                raise ValueError("Invalid stdin value: %r" % stdin)
项目:graphql-python-subscriptions    作者:hballard    | 项目源码 | 文件源码
def test_should_trigger_on_connect_if_client_connect_valid(server_with_mocks):
    node_script = '''
        module.paths.push('{0}')
        WebSocket = require('ws')
        const SubscriptionClient =
        require('subscriptions-transport-ws').SubscriptionClient
        new SubscriptionClient('ws://localhost:{1}/socket')
    '''.format(
        os.path.join(os.path.dirname(__file__), 'node_modules'), TEST_PORT)
    try:
        subprocess.check_output(
            ['node', '-e', node_script], stderr=subprocess.STDOUT, timeout=.2)
    except:
        mock = server_with_mocks.get_nowait()
        assert mock.name == 'on_connect'
        mock.assert_called_once()
项目:graphql-python-subscriptions    作者:hballard    | 项目源码 | 文件源码
def test_should_trigger_on_connect_with_correct_cxn_params(server_with_mocks):
    node_script = '''
        module.paths.push('{0}')
        WebSocket = require('ws')
        const SubscriptionClient =
        require('subscriptions-transport-ws').SubscriptionClient
        const connectionParams = {{test: true}}
        new SubscriptionClient('ws://localhost:{1}/socket', {{
        connectionParams,
        }})
    '''.format(
        os.path.join(os.path.dirname(__file__), 'node_modules'), TEST_PORT)
    try:
        subprocess.check_output(
            ['node', '-e', node_script], stderr=subprocess.STDOUT, timeout=.2)
    except:
        mock = server_with_mocks.get_nowait()
        assert mock.name == 'on_connect'
        mock.assert_called_once()
        mock.assert_called_with({'test': True})
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def interface_exists(interface):
    '''
    Checks if interface exists on node.
    '''
    try:
        subprocess.check_call(['ip', 'link', 'show', interface],
                              stdout=open(os.devnull, 'w'),
                              stderr=subprocess.STDOUT)
    except subprocess.CalledProcessError:
        return False
    return True
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def systemv_services_running():
    output = subprocess.check_output(
        ['service', '--status-all'],
        stderr=subprocess.STDOUT).decode('UTF-8')
    return [row.split()[-1] for row in output.split('\n') if '[ + ]' in row]
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def service_available(service_name):
    """Determine whether a system service is available"""
    try:
        subprocess.check_output(
            ['service', service_name, 'status'],
            stderr=subprocess.STDOUT).decode('UTF-8')
    except subprocess.CalledProcessError as e:
        return b'unrecognized service' not in e.output
    else:
        return True
项目:alfred-mpd    作者:deanishe    | 项目源码 | 文件源码
def _call_security(self, action, service, account, *args):
        """Call ``security`` CLI program that provides access to keychains.

        May raise `PasswordNotFound`, `PasswordExists` or `KeychainError`
        exceptions (the first two are subclasses of `KeychainError`).

        :param action: The ``security`` action to call, e.g.
                           ``add-generic-password``
        :type action: ``unicode``
        :param service: Name of the service.
        :type service: ``unicode``
        :param account: name of the account the password is for, e.g.
            "Pinboard"
        :type account: ``unicode``
        :param password: the password to secure
        :type password: ``unicode``
        :param *args: list of command line arguments to be passed to
                      ``security``
        :type *args: `list` or `tuple`
        :returns: ``(retcode, output)``. ``retcode`` is an `int`, ``output`` a
                  ``unicode`` string.
        :rtype: `tuple` (`int`, ``unicode``)

        """
        cmd = ['security', action, '-s', service, '-a', account] + list(args)
        p = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                             stderr=subprocess.STDOUT)
        stdout, _ = p.communicate()
        if p.returncode == 44:  # password does not exist
            raise PasswordNotFound()
        elif p.returncode == 45:  # password already exists
            raise PasswordExists()
        elif p.returncode > 0:
            err = KeychainError('Unknown Keychain error : %s' % stdout)
            err.retcode = p.returncode
            raise err
        return stdout.strip().decode('utf-8')
项目:spoonybard    作者:notnownikki    | 项目源码 | 文件源码
def run_script(self, script):
        fd, self.tmp_script_filename = tempfile.mkstemp()
        os.close(fd)
        f = open(self.tmp_script_filename, 'w')
        f.write(script)
        f.close()
        os.chmod(self.tmp_script_filename, 0o744)
        p = subprocess.Popen(
            self.tmp_script_filename, shell=True, stdin=subprocess.PIPE,
            stdout=subprocess.PIPE, stderr=subprocess.STDOUT, close_fds=True)
        self.process = p
        self.stream = p.stdout
项目:scibot    作者:SciCrunch    | 项目源码 | 文件源码
def scrapeDoi(url):
    env = os.environ.copy()
    cmd_line = ['timeout', '30s', 'google-chrome-unstable', '--headless', '--dump-dom', url]
    p = subprocess.Popen(cmd_line, stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
                         env=env)
    out, err = p.communicate()
    if p.returncode:
        print('UTOH')
        return None
    elif b'ERROR:headless_shell.cc' in out:
        print(out)
        raise IOError('Something is wrong...')
    qurl = quote(url, '')
    if len(qurl) > 200:
        qurl = qurl[:200]
    with open(os.path.expanduser(f'~/files/scibot/{qurl}'), 'wb') as f:
        f.write(out)
    both = BeautifulSoup(out, 'lxml')
    doi = getDoi(both, both)
    return doi
项目:docklet    作者:unias    | 项目源码 | 文件源码
def sys_run(command,check=False):
    Ret = subprocess.run(command, stdout = subprocess.PIPE, stderr = subprocess.STDOUT, shell=True, check=check)
    return Ret
项目:docklet    作者:unias    | 项目源码 | 文件源码
def list_links():
        try:
            ret = subprocess.run(['ip', 'link', 'show'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False, check=True)
            links = ipcontrol.parse(ret.stdout.decode('utf-8'))
            return [True, list(links.keys())]
        except subprocess.CalledProcessError as suberror:
            return [False, "list links failed : %s" % suberror.stdout.decode('utf-8')]
项目:docklet    作者:unias    | 项目源码 | 文件源码
def link_exist(linkname):
        try:
            subprocess.run(['ip', 'link', 'show', 'dev', str(linkname)], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False, check=True)
            return True
        except subprocess.CalledProcessError:
            return False
项目:docklet    作者:unias    | 项目源码 | 文件源码
def link_info(linkname):
        try:
            ret = subprocess.run(['ip', 'address', 'show', 'dev', str(linkname)], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False, check=True)
            return [True, ipcontrol.parse(ret.stdout.decode('utf-8'))[str(linkname)]]
        except subprocess.CalledProcessError as suberror:
            return [False, "get link info failed : %s" % suberror.stdout.decode('utf-8')]
项目:docklet    作者:unias    | 项目源码 | 文件源码
def link_state(linkname):
        try:
            ret = subprocess.run(['ip', 'link', 'show', 'dev', str(linkname)], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False, check=True)
            return [True, ipcontrol.parse(ret.stdout.decode('utf-8'))[str(linkname)]['state']]
        except subprocess.CalledProcessError as suberror:
            return [False, "get link state failed : %s" % suberror.stdout.decode('utf-8')]
项目:docklet    作者:unias    | 项目源码 | 文件源码
def down_link(linkname):
        try:
            subprocess.run(['ip', 'link', 'set', 'dev', str(linkname), 'down'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False, check=True)
            return [True, str(linkname)]
        except subprocess.CalledProcessError as suberror:
            return [False, "set link down failed : %s" % suberror.stdout.decode('utf-8')]
项目:docklet    作者:unias    | 项目源码 | 文件源码
def add_addr(linkname, address):
        try:
            subprocess.run(['ip', 'address', 'add', address, 'dev', str(linkname)], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False, check=True)
            return [True, str(linkname)]
        except subprocess.CalledProcessError as suberror:
            return [False, "add address failed : %s" % suberror.stdout.decode('utf-8')]
项目:docklet    作者:unias    | 项目源码 | 文件源码
def list_bridges():
        try:
            ret = subprocess.run(['ovs-vsctl', 'list-br'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False, check=True)
            return [True, ret.stdout.decode('utf-8').split()]
        except subprocess.CalledProcessError as suberror:
            return [False, "list bridges failed : %s" % suberror.stdout.decode('utf-8')]