Python pexpect 模块,run() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用pexpect.run()

项目:kernda    作者:maxpoint    | 项目源码 | 文件源码
def kernel():
    """Create a ipykernel conda environment separate from this test
    environment where jupyter console is installed.

    The environments must be separate otherwise we cannot easily check
    if kernel start is activating the environment or if it was already
    active when the test suite started.
    """
    # unique name for the kernel and environment
    name = str(uuid4())
    env_path = '{}/kernel-env-{name}'.format(gettempdir(), name=name)
    pexpect.run('/bin/bash -c "conda create -y -p {env_path} ipykernel && \
                source activate {env_path} && \
                python -m ipykernel install --user \
                --name {name}"'.format(env_path=env_path, name=name))
    # query jupyter for the user data directory in a separate command to
    # make parsing easier
    stdout = pexpect.run('jupyter --data-dir')
    user_path = stdout.decode('utf-8').strip()
    # the kernel spec resides in the jupyter user data path
    spec_path = os.path.join(user_path, 'kernels', name)
    yield Kernel(name, os.path.join(spec_path, 'kernel.json'), env_path)
    shutil.rmtree(env_path)
项目:envoy-perf    作者:envoyproxy    | 项目源码 | 文件源码
def GetGcloud(args, project=None, service=None):
  """Get gcloud command with arguments.

  Functionalities might be expanded later to run gcloud commands.
  Args:
    args: command with arguments as an array
    project: the project on which the glcoud compute will work
    service: the service on gcloud that you want to use. default: compute
  Returns:
    returns thr formatted command for gcloud compute
  """
  command = ["gcloud"]
  if service:
    command.append(service)

  if project:
    command.extend(["--project", project])

  command.extend(args)
  return command


# TODO(sohamcodes): pexpect.spawn can be changed to subprocess call
# However, timeout for subprocess is available only on python3
# So, we can implement it later.
项目:envoy-perf    作者:envoyproxy    | 项目源码 | 文件源码
def RunCommand(args, timeout=None, logfile=None):
  """Runs a given command through pexpect.run.

  This function acts as a wrapper over pxpect.run . You can have exception or
  return values based on the exitstatus of the command execution. If exitstatus
   is not zero, then it will return -1, unless you want RuntimeError. If there
  is TIMEOUT, then exception is raised. If events do not match, command's
  output is printed, and -1 is returned.
  Args:
    args: command with arguments as an array
    timeout: timeout for pexpect.run .
    logfile: an opened filestream to write the output
  Raises:
    RuntimeError: Command's exit status is not zero
  Returns:
    Returns -1, if bad exitstatus is not zero and when events do not match
    Otherwise returns 0, if everything is fine
  """
  child = pexpect.spawn(args[0], args=args[1:], timeout=timeout,
                        logfile=logfile)
  child.expect(pexpect.EOF)
  child.close()
  if child.exitstatus:
    print args
    raise RuntimeError(("Error: {}\nProblem running command. "
                        "Exit status: {}").format(child.before,
                                                  child.exitstatus))
  return 0
项目:aardvark    作者:Netflix-Skunkworks    | 项目源码 | 文件源码
def get_docker_volume_list(*expected_volumes):
        '''Get the output from "docker volume ls" for specified volumes.'''

        volume_listing_pattern = (
            r'(?P<driver>\S+)\s+'
            r'(?P<name>\S+)'
            # r'\s*$'
            )
        volume_listing_re = re.compile(volume_listing_pattern)

        docker_volumes_response = pexpect.run('docker volume ls')

        volume_list = []

        for line in docker_volumes_response.split('\n'):
            match = volume_listing_re.match(line)
            if match:
                volume_list.append(match.groupdict())

        return volume_list

    # - - - - - - - - - - - - - - - - - - - - - - - - - - - -
项目:leetcode    作者:thomasyimgit    | 项目源码 | 文件源码
def getoutput(self, cmd):
        """Run a command and return its stdout/stderr as a string.

        Parameters
        ----------
        cmd : str
          A command to be executed in the system shell.

        Returns
        -------
        output : str
          A string containing the combination of stdout and stderr from the
        subprocess, in whatever order the subprocess originally wrote to its
        file descriptors (so the order of the information in this string is the
        correct order as would be seen if running the command in a terminal).
        """
        try:
            return pexpect.run(self.sh, args=['-c', cmd]).replace('\r\n', '\n')
        except KeyboardInterrupt:
            print('^C', file=sys.stderr, end='')
项目:leetcode    作者:thomasyimgit    | 项目源码 | 文件源码
def getoutput_pexpect(self, cmd):
        """Run a command and return its stdout/stderr as a string.

        Parameters
        ----------
        cmd : str
          A command to be executed in the system shell.

        Returns
        -------
        output : str
          A string containing the combination of stdout and stderr from the
        subprocess, in whatever order the subprocess originally wrote to its
        file descriptors (so the order of the information in this string is the
        correct order as would be seen if running the command in a terminal).
        """
        try:
            return pexpect.run(self.sh, args=['-c', cmd]).replace('\r\n', '\n')
        except KeyboardInterrupt:
            print('^C', file=sys.stderr, end='')
项目:dynamic-firmware-analysis    作者:secjey    | 项目源码 | 文件源码
def get_architecture(firmware_id):
    """Gets the architecture of the given firmware image.""" 
    print(bcolors.OKBLUE + "[-] Getting the firmware architecture..." + bcolors.ENDC)
    command = GETARCH_COMMAND.format(FIRMADYNE_PATH, OUTPUT_DIR, firmware_id)
    print(bcolors.ITALIC + command + bcolors.ENDC)
    output = pexpect.run(command, events={'Password for user {}:'.format(USER):PASSWORD + '\n'})
    # extract the architecture info from the output
    arch = ""
    try:
        arch = output.split('\n')[0].split(':')[1]
    except:
        print(bcolors.FAIL + "[!] The firmware architecture couldn't be determined..." + bcolors.ENDC)
        print(bcolors.ITALIC + "[!] Please try manually with the file command and provide the correct architecture type with the --arch parameter..." + bcolors.ENDC)
    else:
        print(bcolors.OKGREEN + "[+] The architecture of your firmware image is:" + arch + bcolors.ENDC)
    return arch
项目:automatic-repo    作者:WZQ1397    | 项目源码 | 文件源码
def exec2():
    child.expect('#')
    if subprocess.getstatusoutput('id root >> /dev/null 2&1  && echo $?') != 0:
        __newpasswd = 'edong&1310'
        subprocess.getstatusoutput('useradd zach')
        run('passwd zach',events={'(?i)password:':__newpasswd})
        #TODO run EQUAL TO FOLLOW COMMIT!
        '''
        child.expect('password:')
        child.sendline()
        child.expect('password:')
        child.sendline(__newpasswd)
        '''
        child.expect('#')
        child.sendline('su - zach')
    child.expect('$')
    child.sendline('whomai')
项目:Repobot    作者:Desgard    | 项目源码 | 文件源码
def getoutput(self, cmd):
        """Run a command and return its stdout/stderr as a string.

        Parameters
        ----------
        cmd : str
          A command to be executed in the system shell.

        Returns
        -------
        output : str
          A string containing the combination of stdout and stderr from the
        subprocess, in whatever order the subprocess originally wrote to its
        file descriptors (so the order of the information in this string is the
        correct order as would be seen if running the command in a terminal).
        """
        try:
            return pexpect.run(self.sh, args=['-c', cmd]).replace('\r\n', '\n')
        except KeyboardInterrupt:
            print('^C', file=sys.stderr, end='')
项目:Repobot    作者:Desgard    | 项目源码 | 文件源码
def getoutput_pexpect(self, cmd):
        """Run a command and return its stdout/stderr as a string.

        Parameters
        ----------
        cmd : str
          A command to be executed in the system shell.

        Returns
        -------
        output : str
          A string containing the combination of stdout and stderr from the
        subprocess, in whatever order the subprocess originally wrote to its
        file descriptors (so the order of the information in this string is the
        correct order as would be seen if running the command in a terminal).
        """
        try:
            return pexpect.run(self.sh, args=['-c', cmd]).replace('\r\n', '\n')
        except KeyboardInterrupt:
            print('^C', file=sys.stderr, end='')
项目:blender    作者:gastrodia    | 项目源码 | 文件源码
def getoutput(self, cmd):
        """Run a command and return its stdout/stderr as a string.

        Parameters
        ----------
        cmd : str
          A command to be executed in the system shell.

        Returns
        -------
        output : str
          A string containing the combination of stdout and stderr from the
        subprocess, in whatever order the subprocess originally wrote to its
        file descriptors (so the order of the information in this string is the
        correct order as would be seen if running the command in a terminal).
        """
        try:
            return pexpect.run(self.sh, args=['-c', cmd]).replace('\r\n', '\n')
        except KeyboardInterrupt:
            print('^C', file=sys.stderr, end='')
项目:blender    作者:gastrodia    | 项目源码 | 文件源码
def getoutput_pexpect(self, cmd):
        """Run a command and return its stdout/stderr as a string.

        Parameters
        ----------
        cmd : str
          A command to be executed in the system shell.

        Returns
        -------
        output : str
          A string containing the combination of stdout and stderr from the
        subprocess, in whatever order the subprocess originally wrote to its
        file descriptors (so the order of the information in this string is the
        correct order as would be seen if running the command in a terminal).
        """
        try:
            return pexpect.run(self.sh, args=['-c', cmd]).replace('\r\n', '\n')
        except KeyboardInterrupt:
            print('^C', file=sys.stderr, end='')
项目:yatta_reader    作者:sound88    | 项目源码 | 文件源码
def getoutput(self, cmd):
        """Run a command and return its stdout/stderr as a string.

        Parameters
        ----------
        cmd : str
          A command to be executed in the system shell.

        Returns
        -------
        output : str
          A string containing the combination of stdout and stderr from the
        subprocess, in whatever order the subprocess originally wrote to its
        file descriptors (so the order of the information in this string is the
        correct order as would be seen if running the command in a terminal).
        """
        try:
            return pexpect.run(self.sh, args=['-c', cmd]).replace('\r\n', '\n')
        except KeyboardInterrupt:
            print('^C', file=sys.stderr, end='')
项目:yatta_reader    作者:sound88    | 项目源码 | 文件源码
def getoutput_pexpect(self, cmd):
        """Run a command and return its stdout/stderr as a string.

        Parameters
        ----------
        cmd : str
          A command to be executed in the system shell.

        Returns
        -------
        output : str
          A string containing the combination of stdout and stderr from the
        subprocess, in whatever order the subprocess originally wrote to its
        file descriptors (so the order of the information in this string is the
        correct order as would be seen if running the command in a terminal).
        """
        try:
            return pexpect.run(self.sh, args=['-c', cmd]).replace('\r\n', '\n')
        except KeyboardInterrupt:
            print('^C', file=sys.stderr, end='')
项目:fluxgui    作者:xflux-gui    | 项目源码 | 文件源码
def _start(self, startup_args=None):
        if not startup_args:
            startup_args = self._create_startup_arg_list(self._current_color,
                **self.init_kwargs)
        try:
            previous_instances = pexpect.run('pgrep -d, -u %s xflux' % pexpect.run('whoami')).strip()
            if previous_instances != "":
                for process in previous_instances.split(","):
                    pexpect.run('kill -9 %s' % process)

            self._xflux = pexpect.spawn("xflux", startup_args)
                    #logfile=file("tmp/xfluxout.txt",'w'))

        except pexpect.ExceptionPexpect:
            raise FileNotFoundError(
                    "\nError: Please install xflux in the PATH \n")
项目:emscripten-docker    作者:apiaryio    | 项目源码 | 文件源码
def run(step, cmd, expect, bail, timeout=10):
    print_local_step(step)
    res = pexpect.run(cmd, timeout=timeout).decode('utf-8')
    log(res)
    res = ansi.sub('', res)
    if expect not in res:
        bail_out(bail, res)

    return res
项目:kernda    作者:maxpoint    | 项目源码 | 文件源码
def test_original_spec(kernel):
    """The kernel should output a conda path in the test suite environment.

    More of a test of the complicated logic in the test fixture than
    anything, but it ensures a a good test baseline.
    """
    stdout = pexpect.run('which conda')
    conda_path = stdout.decode('utf-8').strip()
    assert conda_path.startswith(sys.prefix)
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def _stop_tunnel(cmd):
    pexpect.run(cmd)
项目:envoy-perf    作者:envoyproxy    | 项目源码 | 文件源码
def RunGCloudService(args, project, service, logfile=None):
  """This function runs a gcloud `service` command.

  Args:
    args: command with arguments as an array
    project: the project in which the remotehost belongs to
    service: the service user wants to run on gcloud
    logfile: an opened filestream to write log
  Returns:
    Returns the return value of RunCommand
  """
  return RunCommand(GetGcloud(args, project=project,
                              service=service), logfile=logfile)
项目:envoy-perf    作者:envoyproxy    | 项目源码 | 文件源码
def TryFunctionWithTimeout(func, error_handler, num_tries,
                           sleep_between_attempt_secs, *args, **kwargs):
  """The function tries to run a function without any exception.

  The function tries for a certain number of tries. If it cannot succeed,
  it raises BenchmarkError.
  Args:
    func: the function to try running without exception
    error_handler: the exception that the function should catch and keep trying.
    num_tries: number of tries it should make before raising the final
    exception
    sleep_between_attempt_secs: number of seconds to sleep between each retry
    *args: arguments to the function
    **kwargs: named arguments to the function
  Raises:
    BenchmarkError: When all tries are failed.
  """
  count = num_tries
  while count > 0:
    try:
      count -= 1
      ret_val = func(*args, **kwargs)
      if not ret_val:
        return
      else:
        print ret_val
    except error_handler as e:
      print e
    print ("Problem running function, {}. Trying again after"
           " {}s. Total tries left: {}.").format(func,
                                                 sleep_between_attempt_secs,
                                                 count)
    time.sleep(sleep_between_attempt_secs)
  raise BenchmarkError("All tries failed.")
项目:aardvark    作者:Netflix-Skunkworks    | 项目源码 | 文件源码
def tearDown(self):
        '''Test case common fixture teardown.'''

        logger.info("======= tearDown: %s", self.delete_artifacts)

        self.log_docker_constructs()

        new_containers = self.new_constructs('container')
        new_images = self.new_constructs('image')
        new_volumes = self.new_constructs('volume')

        if self.delete_artifacts:
            # Every case should clean up its docker images and containers.

            for container in new_containers:
                # These should have been launched with the --rm flag,
                # so they should be removed once stopped.
                logger.info("REMOVING %s", container['id'])
                pexpect.run('docker stop {}'.format(container['id']))

            for image in new_images:
                logger.info("REMOVING %s", image['id'])
                pexpect.run('docker rmi {}'.format(image['id']))

            for volume in new_volumes:
                logger.info("REMOVING %s", volume['name'])
                pexpect.run('docker volume rm {}'.format(volume['name']))

        else:
            # We'll leave behind any new docker constructs, so we need
            # to update the "original docker volumes".
            self.constructs['current']['container'].extend(new_containers)
            self.constructs['current']['image'].extend(new_images)
            self.constructs['current']['volume'].extend(new_volumes)

    # - - - - - - - - - - - - - - - - - - - - - - - - - - - -
项目:aardvark    作者:Netflix-Skunkworks    | 项目源码 | 文件源码
def get_docker_image_list(*expected_images):
        '''Get the output from "docker image ls" for specified image names.'''

        image_listing_pattern = (
            r'(?P<name>[^\s]+)\s+'
            r'(?P<tag>[^\s]+)\s+'
            r'(?P<id>[0-9a-f]+)\s+'
            r'(?P<created>.+ago)\s+'
            r'(?P<size>[^\s]+)'
            r'\s*$'
            )
        image_listing_re = re.compile(image_listing_pattern)

        docker_images_response = pexpect.run('docker image ls')

        image_list = []
        expected_image_nametag_pairs = [
            (x.split(':') + ['latest'])[0:2] for x in expected_images
            ] if expected_images else None

        for line in docker_images_response.split('\n'):
            match = image_listing_re.match(line)
            if (
                    match and (
                        not expected_images or [
                            match.groupdict()['name'], match.groupdict()['tag']
                            ] in expected_image_nametag_pairs
                        )
                    ):
                image_list.append(match.groupdict())

        return image_list

    # - - - - - - - - - - - - - - - - - - - - - - - - - - - -
项目:aardvark    作者:Netflix-Skunkworks    | 项目源码 | 文件源码
def get_docker_container_list(*expected_containers):
        '''Get the output from "docker ps -a" for specified container names.'''

        container_listing_pattern = (
            r'(?P<id>[0-9a-f]+)\s+'
            r'(?P<image>[^\s]+)\s+'
            r'(?P<command>"[^"]+")\s+'
            r'(?P<created>.+ago)\s+'
            r'(?P<status>(Created|Exited.*ago|Up \d+ \S+))\s+'
            r'(?P<ports>[^\s]+)?\s+'
            r'(?P<name>[a-z]+_[a-z]+)'
            # r'\s*$'
            )
        container_listing_re = re.compile(container_listing_pattern)

        docker_containers_response = pexpect.run('docker ps -a')

        container_list = []
        # expected_container_nametag_pairs = [
        #     (x.split(':') + ['latest'])[0:2] for x in expected_containers
        #     ] if expected_containers else []

        for line in docker_containers_response.split('\n'):
            match = container_listing_re.match(line)
            if match:
                container_list.append(match.groupdict())

        return container_list

    # - - - - - - - - - - - - - - - - - - - - - - - - - - - -
项目:aardvark    作者:Netflix-Skunkworks    | 项目源码 | 文件源码
def get_container_details(self, image):
        '''Run a docker container shell and retrieve several details.'''

        # (detail name, command, result filter) for extracting details
        # from container command lines.
        shell_commands = (
            ('pwd', 'pwd', None),
            ('phantomjs_path', 'which phantomjs', None),
            ('phantomjs_perms', 'ls -l $(which phantomjs)', lambda x: x[1:10]),
            ('config_file', 'ls {}'.format(CONTAINER_CONFIG_PATH), None),
            ('config_contents', 'cat {}'.format(CONTAINER_CONFIG_PATH), None),
            )

        command = "docker run --rm -it {} bash".format(image)
        logger.info('IMAGE: %s', image)
        logger.info('CONTAINER LAUNCH COMMAND: %s', command)
        spawn = pexpect.spawn(command)

        container_details = {}

        for field, shell_command, response_filter in shell_commands:
            container_details[field] = interact(
                spawn, shell_command, response_filter
                )

        # Exit the container.
        spawn.sendcontrol('d')

        # "Expand" the config records if we found a config file.
        if container_details['config_file'] == CONTAINER_CONFIG_PATH:
            try:
                exec(container_details['config_contents'], container_details)
            except SyntaxError:
                pass
            # The '__builtins__' are noise:
            if '__builtins__' in container_details:
                del container_details['__builtins__']

        return container_details

    # - - - - - - - - - - - - - - - - - - - - - - - - - - - -
项目:aardvark    作者:Netflix-Skunkworks    | 项目源码 | 文件源码
def test_make_aardvark_sqlite_no_images_tag(self):
        '''Test "make aardvark-sqlite" without specifying the images tag.'''

        self.case_worker(
            target='aardvark-sqlite',
            expected_docker_images=[
                'aardvark-base',
                'aardvark-data-init',
                'aardvark-collector',
                'aardvark-apiserver',
                ],
            expected_details={
                '_common': {
                    'pwd': '/usr/share/aardvark-data',
                    'NUM_THREADS': 5,
                    'PHANTOMJS': EXPECTED_PHANTOMJS_PATH,
                    'ROLENAME': 'Aardvark',
                    'SQLALCHEMY_DATABASE_URI': EXPECTED_SQLITE_DB_URI,
                    'SQLALCHEMY_TRACK_MODIFICATIONS': EXPECTED_SQL_TRACK_MODS,
                    },
                'aardvark-base': {
                    'pwd': '/etc/aardvark',
                    },
                },
            expected_artifacts=[
                'aardvark-base-docker-build',
                'aardvark-data-docker-build',
                'aardvark-data-docker-run',
                'aardvark-apiserver-docker-build',
                'aardvark-collector-docker-build',
                ],
            set_images_tag=False
            )


# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# Define test suites.
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
项目:Skynet2.0    作者:Skynet2-0    | 项目源码 | 文件源码
def __init__(self):
        """ Constructor. """
        output = pexpect.run('electrum listaddresses')
        print(output)
        pattern = re.compile(r'\[\W*"[A-z0-9]+"\W*\]') #the specific output for electrum if 1 adress exists

        print(pattern.search(output))

        if(pattern.search(output)):
            #if a wallet exists, initialize that one
            print('using already existing wallet')
        else:
            self._create_wallet()
        subprocess.call(['electrum', 'daemon', 'start'])
项目:Skynet2.0    作者:Skynet2-0    | 项目源码 | 文件源码
def _create_wallet(self):
        print('did not find an existing wallet, creating a new one')
        #ensure the daemon is stopped, as this causes path errors (mostly usefull for development)
        pexpect.run('electrum daemon stop')
        #build a new wallet if no wallet yet exists
        walletpair=str(subprocess.check_output('python addrgen/addrgen.py',shell=True))
        walletpair = re.split('\W+', walletpair)

        self.address = walletpair[1]
        self.privkey = walletpair[2]
        print('created a wallet with address \''+self.address+'\' and privatekey \''+self.privkey+'\'')
        child = pexpect.spawn('electrum', ['restore', self.privkey])
        #respectively: use default password, use default fee (0.002), use default gap limit and give seed
        self._answer_prompt(child, '')

        #check if wallet was created succesfulyl
        command = """electrum listaddresses"""
        output = pexpect.run(command)

        walletFinder = re.compile(r'\[\W*"([A-z0-9]+)"\W*\]')

        result = walletFinder.search(output)

        #This horrible feedback loop is here due to a quirk of electrum.
        #Needs refactoring, but do not refactor without extensive testing (i.e. multiple vps all from clean install)
        #Because electrum behaviour right after startup tends to differ from server to server (i suspect something to do wtih specs)
        try:
            print result.group(1)
            return result.group(1)
        except:
            return self._create_wallet()



    # def __del__(self):
    #     '''
    #     clear up the electrum service
    #     '''
    #     subprocess.call(['electrum', 'daemon', 'stop'])
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def _stop_tunnel(cmd):
    pexpect.run(cmd)
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def _stop_tunnel(cmd):
    pexpect.run(cmd)
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def _stop_tunnel(cmd):
    pexpect.run(cmd)
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def _stop_tunnel(cmd):
    pexpect.run(cmd)
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def _stop_tunnel(cmd):
    pexpect.run(cmd)
项目:automatic-repo    作者:WZQ1397    | 项目源码 | 文件源码
def get_process_info ():
    ps = pexpect.run ('ps ax -O ppid')
    pass
项目:obsoleted-vpduserv    作者:InfraSIM    | 项目源码 | 文件源码
def runu(command, timeout=-1, withexitstatus=False, events=None,
        extra_args=None, logfile=None, cwd=None, env=None, **kwargs):
    """This offers the same interface as :func:`run`, but using unicode.

    Like :class:`spawnu`, you can pass ``encoding`` and ``errors`` parameters,
    which will be used for both input and output.
    """
    return _run(command, timeout=timeout, withexitstatus=withexitstatus,
                events=events, extra_args=extra_args, logfile=logfile, cwd=cwd,
                env=env, _spawn=spawnu, **kwargs)
项目:obsoleted-vpduserv    作者:InfraSIM    | 项目源码 | 文件源码
def readlines(self, sizehint=-1):
        '''This reads until EOF using readline() and returns a list containing
        the lines thus read. The optional 'sizehint' argument is ignored.
        Remember, because this reads until EOF that means the child
        process should have closed its stdout. If you run this method on
        a child that is still running with its stdout open then this
        method will block until it timesout.'''

        lines = []
        while True:
            line = self.readline()
            if not line:
                break
            lines.append(line)
        return lines
项目:obsoleted-vpduserv    作者:InfraSIM    | 项目源码 | 文件源码
def get_process_info ():

    # This seems to work on both Linux and BSD, but should otherwise be considered highly UNportable.

    ps = pexpect.run ('ps ax -O ppid')
    pass
项目:obsoleted-vpduserv    作者:InfraSIM    | 项目源码 | 文件源码
def test_expect_eof (self):
        the_old_way = subprocess.Popen(args=['/bin/ls', '-l', '/bin'],
                stdout=subprocess.PIPE).communicate()[0].rstrip()
        p = pexpect.spawn('/bin/ls -l /bin')
        p.expect(pexpect.EOF) # This basically tells it to read everything. Same as pexpect.run() function.
        the_new_way = p.before
        the_new_way = the_new_way.replace(b'\r\n', b'\n'
                ).replace(b'\r', b'\n').replace(b'\n\n', b'\n').rstrip()
        the_old_way = the_old_way.replace(b'\r\n', b'\n'
                ).replace(b'\r', b'\n').replace(b'\n\n', b'\n').rstrip()
        assert the_old_way == the_new_way, hex_diff(the_old_way, the_new_way)
项目:obsoleted-vpduserv    作者:InfraSIM    | 项目源码 | 文件源码
def test_env(self):
        default = pexpect.run('env')
        userenv = pexpect.run('env', env={'foo':'pexpect'})
        assert default!=userenv, "'default' and 'userenv' should be different"
        assert b'foo' in userenv and b'pexpect' in userenv, "'foo' and 'pexpect' should be in 'userenv'"
项目:obsoleted-vpduserv    作者:InfraSIM    | 项目源码 | 文件源码
def test_cwd (self): # This assumes 'pwd' and '/tmp' exist on this platform.
        default = pexpect.run('pwd')
        tmpdir =  pexpect.run('pwd', cwd='/tmp')
        assert default!=tmpdir, "'default' and 'tmpdir' should be different"
        assert (b'tmp' in tmpdir), "'tmp' should be returned by 'pwd' command"
项目:ssh-tunnel    作者:aalku    | 项目源码 | 文件源码
def runu(command, timeout=-1, withexitstatus=False, events=None,
        extra_args=None, logfile=None, cwd=None, env=None, **kwargs):
    """This offers the same interface as :func:`run`, but using unicode.

    Like :class:`spawnu`, you can pass ``encoding`` and ``errors`` parameters,
    which will be used for both input and output.
    """
    return _run(command, timeout=timeout, withexitstatus=withexitstatus,
                events=events, extra_args=extra_args, logfile=logfile, cwd=cwd,
                env=env, _spawn=spawnu, **kwargs)
项目:ssh-tunnel    作者:aalku    | 项目源码 | 文件源码
def readlines(self, sizehint=-1):
        '''This reads until EOF using readline() and returns a list containing
        the lines thus read. The optional 'sizehint' argument is ignored.
        Remember, because this reads until EOF that means the child
        process should have closed its stdout. If you run this method on
        a child that is still running with its stdout open then this
        method will block until it timesout.'''

        lines = []
        while True:
            line = self.readline()
            if not line:
                break
            lines.append(line)
        return lines
项目:docker-ida    作者:thawsystems    | 项目源码 | 文件源码
def execute_command():
    logger.info('Got incoming request')
    if 'command' not in request.form:
        return jsonify(error="Missing parameter 'command'"), 422

    command = request.form['command']
    file_name = _extract_filename_from_command(command)
    if file_name is not None and not os.path.isfile(file_name):
        logger.warn("Couldn't find file %s", file_name)

    if not command.startswith('idaw ') and not command.startswith('idaw64 '):
        return jsonify(error="'idaw' and 'idaw64' are the only valid commands"), 422

    try:
        logger.info('Executing %s', command)
        timeout = None if 'timeout' not in request.form else int(request.form['timeout'])
        _, exit_code = pexpect.run(command, timeout=timeout, withexitstatus=True)
    except pexpect.TIMEOUT:
        return jsonify(error='request to ida timed out'), 408
    finally:
        if file_name is not None:
            _remove_ida_created_files(file_name)
            logger.info('Removed ida leftover files')

    if exit_code == 0:
        logger.info('Command %s finished executing successfully', command)
    else:
        logger.warn("Command %s didn't finish correctly, IDA returned exit code %s", command, exit_code)

    if exit_code != 0:
        return jsonify(error='ida finish with status code %s' % exit_code), 500
    else:
        return jsonify(message='OK'), 200
项目:alicloud-duplicity    作者:aliyun    | 项目源码 | 文件源码
def transfer(self, method, source_path, remote_filename):
        """create Par2 files and transfer the given file and the Par2 files
        with the wrapped backend.

        Par2 must run on the real filename or it would restore the
        temp-filename later on. So first of all create a tempdir and symlink
        the soure_path with remote_filename into this.
        """
        import pexpect

        par2temp = source_path.get_temp_in_same_dir()
        par2temp.mkdir()
        source_symlink = par2temp.append(remote_filename)
        source_target = source_path.get_canonical()
        if not os.path.isabs(source_target):
            source_target = os.path.join(os.getcwd(), source_target)
        os.symlink(source_target, source_symlink.get_canonical())
        source_symlink.setdata()

        log.Info("Create Par2 recovery files")
        par2create = 'par2 c -r%d -n1 %s %s' % (self.redundancy, self.common_options, source_symlink.get_canonical())
        out, returncode = pexpect.run(par2create, None, True)

        source_symlink.delete()
        files_to_transfer = []
        if not returncode:
            for file in par2temp.listdir():
                files_to_transfer.append(par2temp.append(file))

        method(source_path, remote_filename)
        for file in files_to_transfer:
            method(file, file.get_filename())

        par2temp.deltree()
项目:hammercloud    作者:gtmanfred    | 项目源码 | 文件源码
def script(self, logininfo, filepath):
        '''
        run script on managed cloud server using /usr/bin/env expect
        '''
        if not logininfo.admin_password:
            raise Exception('Unmanaged Cloud Server: no rack password')

        if '/' in filepath:
            logininfo.script = filepath.split('/')[-1]
        else:
            logininfo.script = filepath

        if filepath.startswith('https://'):
            newpath = os.path.expanduser(
                '~/.cache/hammercloud/{login.script}'.format(login=logininfo)
            )
            if not os.path.exists(newpath):
                with open(newpath, 'w') as newfile:
                    resp = requests.get(filepath)
                    print(resp.content, file=newfile)
            filepath = newpath

        sftp(
            logininfo, 'put', filepath, logininfo.script,
            quiet=True, executable=True
        )

        command = '/home/{login.ssh_user}/{login.script} {login.extraargs}; '

        if not logininfo.no_clean:
            command += 'rm /home/{login.ssh_user}/{login.script}'

        logininfo.command = command
        cmd(logininfo)
项目:docker-ida    作者:intezer    | 项目源码 | 文件源码
def execute_command():
    logger.info('Got incoming request')
    if 'command' not in request.form:
        return jsonify(error="Missing parameter 'command'"), 422

    command = request.form['command']
    file_name = _extract_filename_from_command(command)
    if file_name is not None and not os.path.isfile(file_name):
        logger.warn("Couldn't find file %s", file_name)

    if not command.startswith('idal ') and not command.startswith('idal64 '):
        return jsonify(error="'idal' and 'idal64' are the only valid commands"), 422

    try:
        logger.info('Executing %s', command)
        timeout = None if 'timeout' not in request.form else int(request.form['timeout'])
        _, exit_code = pexpect.run(command, timeout=timeout, withexitstatus=True)
    except pexpect.TIMEOUT:
        return jsonify(error='request to ida timed out'), 408
    finally:
        if file_name is not None:
            _remove_ida_created_files(file_name)
            logger.info('Removed ida leftover files')

    if exit_code == 0:
        logger.info('Command %s finished executing successfully', command)
    else:
        logger.warn("Command %s didn't finish correctly, IDA returned exit code %s", command, exit_code)

    if exit_code != 0:
        return jsonify(error='ida finish with status code %s' % exit_code), 500
    else:
        return jsonify(message='OK'), 200
项目:tools    作者:InfraSIM    | 项目源码 | 文件源码
def runu(command, timeout=-1, withexitstatus=False, events=None,
        extra_args=None, logfile=None, cwd=None, env=None, **kwargs):
    """This offers the same interface as :func:`run`, but using unicode.

    Like :class:`spawnu`, you can pass ``encoding`` and ``errors`` parameters,
    which will be used for both input and output.
    """
    return _run(command, timeout=timeout, withexitstatus=withexitstatus,
                events=events, extra_args=extra_args, logfile=logfile, cwd=cwd,
                env=env, _spawn=spawnu, **kwargs)
项目:tools    作者:InfraSIM    | 项目源码 | 文件源码
def readlines(self, sizehint=-1):
        '''This reads until EOF using readline() and returns a list containing
        the lines thus read. The optional 'sizehint' argument is ignored.
        Remember, because this reads until EOF that means the child
        process should have closed its stdout. If you run this method on
        a child that is still running with its stdout open then this
        method will block until it timesout.'''

        lines = []
        while True:
            line = self.readline()
            if not line:
                break
            lines.append(line)
        return lines
项目:tools    作者:InfraSIM    | 项目源码 | 文件源码
def get_process_info ():

    # This seems to work on both Linux and BSD, but should otherwise be considered highly UNportable.

    ps = pexpect.run ('ps ax -O ppid')
    pass
项目:tools    作者:InfraSIM    | 项目源码 | 文件源码
def test_expect_eof (self):
        the_old_way = subprocess.Popen(args=['/bin/ls', '-l', '/bin'],
                stdout=subprocess.PIPE).communicate()[0].rstrip()
        p = pexpect.spawn('/bin/ls -l /bin')
        p.expect(pexpect.EOF) # This basically tells it to read everything. Same as pexpect.run() function.
        the_new_way = p.before
        the_new_way = the_new_way.replace(b'\r\n', b'\n'
                ).replace(b'\r', b'\n').replace(b'\n\n', b'\n').rstrip()
        the_old_way = the_old_way.replace(b'\r\n', b'\n'
                ).replace(b'\r', b'\n').replace(b'\n\n', b'\n').rstrip()
        assert the_old_way == the_new_way, hex_diff(the_old_way, the_new_way)
项目:tools    作者:InfraSIM    | 项目源码 | 文件源码
def test_env(self):
        default = pexpect.run('env')
        userenv = pexpect.run('env', env={'foo':'pexpect'})
        assert default!=userenv, "'default' and 'userenv' should be different"
        assert b'foo' in userenv and b'pexpect' in userenv, "'foo' and 'pexpect' should be in 'userenv'"