Python os module: getcwd() example source code

The following code examples, extracted from open-source Python projects, illustrate how to use os.getcwd().
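
Before the project examples, here is a minimal, self-contained sketch (not taken from any of the projects below) of the two patterns that recur throughout this page: building an absolute path under the directory returned by os.getcwd(), and saving that directory so it can be restored after a temporary os.chdir(). The pushd helper and the 'work' path name are illustrative only.

import os
import tempfile
from contextlib import contextmanager

@contextmanager
def pushd(path):
    """Temporarily change into `path`, restoring the previous cwd on exit."""
    old_cwd = os.getcwd()      # remember where we started
    os.chdir(path)
    try:
        yield path
    finally:
        os.chdir(old_cwd)      # always change back, even on error

# Build an absolute path relative to the current working directory.
target = os.path.join(os.getcwd(), 'work')
print('target path:', target)

# Temporarily work inside a scratch directory, then return automatically.
with tempfile.TemporaryDirectory() as tmpdir:
    with pushd(tmpdir):
        print('inside:', os.getcwd())
print('back in:', os.getcwd())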

Project: charm-plumgrid-gateway    Author: openstack    | Project source | File source
def _git_update_requirements(venv, package_dir, reqs_dir):
    """
    Update from global requirements.

    Update an OpenStack git directory's requirements.txt and
    test-requirements.txt from global-requirements.txt.
    """
    orig_dir = os.getcwd()
    os.chdir(reqs_dir)
    python = os.path.join(venv, 'bin/python')
    cmd = [python, 'update.py', package_dir]
    try:
        subprocess.check_call(cmd)
    except subprocess.CalledProcessError:
        package = os.path.basename(package_dir)
        error_out("Error updating {} from "
                  "global-requirements.txt".format(package))
    os.chdir(orig_dir)
Project: pycos    Author: pgiri    | Project source | File source
def client_proc(job_id, data_file, rtask, task=None):
    # send input file to rtask.location; this will be saved to dispycos process's
    # working directory
    if (yield pycos.Pycos().send_file(rtask.location, data_file, timeout=10)) < 0:
        print('Could not send input data to %s' % rtask.location)
        # terminate remote task
        rtask.send(None)
        raise StopIteration(-1)
    # send info about input
    obj = C(job_id, data_file, random.uniform(5, 8), task)
    if (yield rtask.deliver(obj)) != 1:
        print('Could not send input to %s' % rtask.location)
        raise StopIteration(-1)
    # rtask sends result to this task as message
    result = yield task.receive()
    if not result.result_file:
        print('Processing %s failed' % obj.i)
        raise StopIteration(-1)
    # rtask saves results file at this client, which is saved in pycos's
    # dest_path, not current working directory!
    result_file = os.path.join(pycos.Pycos().dest_path, result.result_file)
    # move file to cwd
    target = os.path.join(os.getcwd(), os.path.basename(result_file))
    os.rename(result_file, target)
    print('    job %s output is in %s' % (obj.i, target))
Project: rpwng    Author: MrNbaYoh    | Project source | File source
def build(self, file):
        if self.built:
            raise PermissionError("You cannot build multiple times!")

        if not self.loaded:
            self.load(file)

        old = os.getcwd()
        sys.path.append(os.path.dirname(os.path.abspath(file)))  # for module imports that aren't "include" calls
        try:
            content = open(file, "rb").read()
            os.chdir(os.path.dirname(os.path.abspath(file)))  # set the current working directory, for open() etc.
            exec(compile(content, file, 'exec'), self.user_functions)
        except Exception as err:
            print("An exception occured while building: ", file=sys.stderr)
            lines = traceback.format_exc(None, err).splitlines()
            print("  " + lines[-1], file=sys.stderr)
            for l in lines[3:-1]:
                print(l, file=sys.stderr)
            exit(1)

        os.chdir(old)
        sys.path.remove(os.path.dirname(os.path.abspath(file)))
        self.built = True
Project: rpwng    Author: MrNbaYoh    | Project source | File source
def load(self, file):
        if self.loaded:
            return

        sys.path.append(os.path.dirname(os.path.abspath(file)))  # for module imports that aren't "include" calls
        old = os.getcwd()
        try:
            content = open(file, "rb").read()
            os.chdir(os.path.dirname(os.path.abspath(file)))  # set the current working directory, for open() etc.
            exec(compile(content, file, 'exec'), self.user_functions)
        except Exception as err:
            print("An exception occured while loading: ", file=sys.stderr)
            lines = traceback.format_exc(None, err).splitlines()
            print("  " + lines[-1], file=sys.stderr)
            for l in lines[3:-1]:
                print(l, file=sys.stderr)
            exit(1)

        os.chdir(old)
        sys.path.remove(os.path.dirname(os.path.abspath(file)))
        self.loaded = True
        self.mem_offset = 0
Project: Adafruit_Python_PureIO    Author: adafruit    | Project source | File source
def archive_context(filename):
    # extracting the archive
    tmpdir = tempfile.mkdtemp()
    log.warn('Extracting in %s', tmpdir)
    old_wd = os.getcwd()
    try:
        os.chdir(tmpdir)
        with get_zip_class()(filename) as archive:
            archive.extractall()

        # going in the directory
        subdir = os.path.join(tmpdir, os.listdir(tmpdir)[0])
        os.chdir(subdir)
        log.warn('Now working in %s', subdir)
        yield

    finally:
        os.chdir(old_wd)
        shutil.rmtree(tmpdir)
Project: pyupdater-wx-demo    Author: wettenhj    | Project source | File source
def StartFileServer(fileServerDir):
    """
    Start file server.
    """
    if not fileServerDir:
        message = \
            "The PYUPDATER_FILESERVER_DIR environment variable is not set."
        if hasattr(sys, "frozen"):
            logger.error(message)
            return None
        else:
            fileServerDir = os.path.join(os.getcwd(), 'pyu-data', 'deploy')
            message += "\n\tSetting fileServerDir to: %s\n" % fileServerDir
            logger.warning(message)
    fileServerPort = GetEphemeralPort()
    thread = threading.Thread(target=RunFileServer,
                              args=(fileServerDir, fileServerPort))
    thread.start()
    WaitForFileServerToStart(fileServerPort)
    return fileServerPort
Project: safetyculture-sdk-python    Author: SafetyCulture    | Project source | File source
def save_users_and_groups_to_csv(user_data, csv_output_filepath):
    """
    Creates a CSV file with exported user data
    :param user_data: The exported user data
    :param csv_output_filepath: The output file to save
    :return: None
    """
    full_output_path = os.path.join(os.getcwd(), csv_output_filepath)
    with open(full_output_path, 'wb') as f:
        fields = ['email', 'lastname', 'firstname', 'groups']
        w = csv.DictWriter(f, fields)
        w.writeheader()
        for key, val in sorted(user_data.items()):
            val['groups'] = ", ".join(val['groups'][0::2])
            row = {'email': key}
            row.update(val)
            w.writerow(row)
Project: safetyculture-sdk-python    Author: SafetyCulture    | Project source | File source
def configure(logger, path_to_config_file, export_formats):
    """
    instantiate and configure logger, load config settings from file, instantiate SafetyCulture SDK
    :param logger:              the logger
    :param path_to_config_file: path to config file
    :param export_formats:      desired export formats
    :return:                    instance of SafetyCulture SDK object, config settings
    """

    config_settings = load_config_settings(logger, path_to_config_file)
    config_settings[EXPORT_FORMATS] = export_formats
    sc_client = sp.SafetyCulture(config_settings[API_TOKEN])

    if config_settings[EXPORT_PATH] is not None:
        create_directory_if_not_exists(logger, config_settings[EXPORT_PATH])
    else:
        logger.info('Invalid export path was found in ' + path_to_config_file + ', defaulting to /exports')
        config_settings[EXPORT_PATH] = os.path.join(os.getcwd(), 'exports')
        create_directory_if_not_exists(logger, config_settings[EXPORT_PATH])

    return sc_client, config_settings
Project: python-    Author: secondtonone1    | Project source | File source
def abspath(path):
        """Return the absolute version of a path."""

        if path: # Empty path must return current working directory.
            path = os.fspath(path)
            try:
                path = _getfullpathname(path)
            except OSError:
                pass # Bad path - return unchanged.
        elif isinstance(path, bytes):
            path = os.getcwdb()
        else:
            path = os.getcwd()
        return normpath(path)

# realpath is a no-op on systems without islink support
Project: python-    Author: secondtonone1    | Project source | File source
def _candidate_tempdir_list():
    """Generate a list of candidate temporary directories which
    _get_default_tempdir will try."""

    dirlist = []

    # First, try the environment.
    for envname in 'TMPDIR', 'TEMP', 'TMP':
        dirname = _os.getenv(envname)
        if dirname: dirlist.append(dirname)

    # Failing that, try OS-specific locations.
    if _os.name == 'nt':
        dirlist.extend([ r'c:\temp', r'c:\tmp', r'\temp', r'\tmp' ])
    else:
        dirlist.extend([ '/tmp', '/var/tmp', '/usr/tmp' ])

    # As a last resort, the current directory.
    try:
        dirlist.append(_os.getcwd())
    except (AttributeError, OSError):
        dirlist.append(_os.curdir)

    return dirlist
Project: py_find_1st    Author: roebel    | Project source | File source
def archive_context(filename):
    # extracting the archive
    tmpdir = tempfile.mkdtemp()
    log.warn('Extracting in %s', tmpdir)
    old_wd = os.getcwd()
    try:
        os.chdir(tmpdir)
        with get_zip_class()(filename) as archive:
            archive.extractall()

        # going in the directory
        subdir = os.path.join(tmpdir, os.listdir(tmpdir)[0])
        os.chdir(subdir)
        log.warn('Now working in %s', subdir)
        yield

    finally:
        os.chdir(old_wd)
        shutil.rmtree(tmpdir)
Project: Adafruit_Python_PCA9685    Author: adafruit    | Project source | File source
def archive_context(filename):
    # extracting the archive
    tmpdir = tempfile.mkdtemp()
    log.warn('Extracting in %s', tmpdir)
    old_wd = os.getcwd()
    try:
        os.chdir(tmpdir)
        with get_zip_class()(filename) as archive:
            archive.extractall()

        # going in the directory
        subdir = os.path.join(tmpdir, os.listdir(tmpdir)[0])
        os.chdir(subdir)
        log.warn('Now working in %s', subdir)
        yield

    finally:
        os.chdir(old_wd)
        shutil.rmtree(tmpdir)
Project: Python    Author: Guzi219    | Project source | File source
def init_work_dir(self):
        retval = os.getcwd()
        print '#current dir is : ' + retval
        store_dir = retval + os.sep + 'tmp'
        print '#all imgs are going to be stored in dir :' + store_dir

        if not os.path.exists(store_dir):
            print '#tmp dir does not exist, attempting to mkdir'
            os.mkdir(store_dir)
            print '#mkdir succeeded'
        else:
            print '#tmp dir already exists'

        self.store_dir = store_dir

        # print '#now change current dir to tmp'
        # os.chdir(store_dir)  # not necessary
        # print os.getcwd()
Project: ardy    Author: avara1986    | Project source | File source
def _run_local_lambda(self, lambda_config):
        prev_folder = os.getcwd()
        os.chdir(self.config.get_projectdir())
        sys.path.append(self.config.get_projectdir())
        lambda_name = lambda_config["FunctionName"]
        lambda_handler = self.import_function(lambda_config["Handler"])

        # Run and set a counter
        start = time.time()
        results = lambda_handler({}, MockContext(lambda_name))
        end = time.time()

        # restore folder
        os.chdir(prev_folder)

        # Print results
        logger.info("{0}".format(results))
        logger.info("\nexecution time: {:.8f}s\nfunction execution "
                    "timeout: {:2}s".format(end - start, lambda_config["Timeout"]))
Project: Dockerfiles    Author: appscode    | Project source | File source
def find_dotenv(filename='.env', raise_error_if_not_found=False, usecwd=False):
    """
    Search in increasingly higher folders for the given file

    Returns path to the file if found, or an empty string otherwise
    """
    if usecwd or '__file__' not in globals():
        # should work without __file__, e.g. in REPL or IPython notebook
        path = os.getcwd()
    else:
        # will work for .py files
        frame_filename = sys._getframe().f_back.f_code.co_filename
        path = os.path.dirname(os.path.abspath(frame_filename))

    for dirname in _walk_to_root(path):
        check_path = os.path.join(dirname, filename)
        if os.path.exists(check_path):
            return check_path

    if raise_error_if_not_found:
        raise IOError('File not found')

    return ''
Project: NeoVintageous    Author: NeoVintageous    | Project source | File source
def _changing_cd(f, *args, **kwargs):
    def inner(*args, **kwargs):
        try:
            state = State(args[0].view)
        except AttributeError:
            state = State(args[0].window.active_view())

        old = os.getcwd()
        try:
            # FIXME: Under some circumstances, like when switching projects to
            # a file whose _cmdline_cd has not been set, _cmdline_cd might
            # return 'None'. In such cases, change to the actual current
            # directory as a last measure. (We should probably fix this anyway).
            os.chdir(state.settings.vi['_cmdline_cd'] or old)
            f(*args, **kwargs)
        finally:
            os.chdir(old)

    return inner
Project: core-framework    Author: RedhawkSDR    | Project source | File source
def test_comp_macro_directories_config_python(self):
        file_loc = os.getcwd()
        self.comp = sb.launch(self.cname, impl="python", execparams={'LOGGING_CONFIG_URI':'file://'+os.getcwd()+'/logconfig.cfg'} )
        fp = None
        try:
            fp = open('foo/bar/test.log','r')
        except:
            pass
        try:
            os.remove('foo/bar/test.log')
        except:
            pass
        try:
            os.rmdir('foo/bar')
        except:
            pass
        try:
            os.rmdir('foo')
        except:
            pass
        self.assertNotEquals(fp, None)
Project: core-framework    Author: RedhawkSDR    | Project source | File source
def test_comp_macro_directories_config_cpp(self):
        file_loc = os.getcwd()
        self.comp = sb.launch(self.cname, impl="cpp", execparams={'LOGGING_CONFIG_URI':'file://'+os.getcwd()+'/logconfig.cfg'} )
        fp = None
        try:
            fp = open('foo/bar/test.log','r')
        except:
            pass
        try:
            os.remove('foo/bar/test.log')
        except:
            pass
        try:
            os.rmdir('foo/bar')
        except:
            pass
        try:
            os.rmdir('foo')
        except:
            pass
        self.assertNotEquals(fp, None)
Project: core-framework    Author: RedhawkSDR    | Project source | File source
def test_comp_macro_directories_config_java(self):
        file_loc = os.getcwd()
        self.comp = sb.launch(self.cname, impl="java", execparams={'LOGGING_CONFIG_URI':'file://'+os.getcwd()+'/logconfig.cfg'} )
        fp = None
        try:
            fp = open('foo/bar/test.log','r')
        except:
            pass
        try:
            os.remove('foo/bar/test.log')
        except:
            pass
        try:
            os.rmdir('foo/bar')
        except:
            pass
        try:
            os.rmdir('foo')
        except:
            pass
        self.assertNotEquals(fp, None)
Project: core-framework    Author: RedhawkSDR    | Project source | File source
def setUp(self):

        cfg = "log4j.rootLogger=TRACE,CONSOLE,FILE\n" + \
            "log4j.debug=false\n" + \
            "# Direct log messages to FILE\n" + \
            "log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender\n" + \
            "log4j.appender.CONSOLE.File=stdout\n" + \
            "log4j.appender.FILE=org.apache.log4j.FileAppender\n" + \
            "log4j.appender.FILE.File="+os.getcwd()+"/tmp_logfile.log\n" + \
            "log4j.appender.CONSOLE.threshold=TRACE\n" + \
            "log4j.appender.FILE.threshold=TRACE\n" + \
            "log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout\n" + \
            "log4j.appender.CONSOLE.layout.ConversionPattern=%p:%c - %m [%F:%L]%n\n" + \
            "log4j.appender.FILE.layout=org.apache.log4j.PatternLayout\n" + \
            "log4j.appender.FILE.layout.ConversionPattern=%d %p:%c - %m [%F:%L]%n\n"

        fp = open('tmp_logfile.config','w')
        fp.write(cfg)
        fp.close()

        nodebooter, self._domMgr = self.launchDomainManager()
        self._domBooter = nodebooter
Project: core-framework    Author: RedhawkSDR    | Project source | File source
def setUp(self):
        cfg = "log4j.rootLogger=DEBUG,STDOUT,FILE\n " + \
            "# Direct log messages to FILE\n" + \
            "log4j.appender.STDOUT=org.apache.log4j.ConsoleAppender\n" + \
            "log4j.appender.STDOUT.layout=org.apache.log4j.PatternLayout\n" + \
            "log4j.appender.FILE=org.apache.log4j.FileAppender\n" + \
            "log4j.appender.FILE.File=tmp_logfile.log\n" + \
            "log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout\n" + \
            "log4j.appender.CONSOLE.layout.ConversionPattern=%p:%c - %m [%F:%L]%n\n" + \
            "log4j.appender.FILE.layout=org.apache.log4j.PatternLayout\n" + \
            "log4j.appender.FILE.layout.ConversionPattern=%d %p:%c - %m [%F:%L]%n\n"

        fp = open('tmp_logfile.config','w')
        fp.write(cfg)
        fp.close()
        self.domBooter, self._domMgr = self.launchDomainManager(loggingURI=os.getcwd()+'/tmp_logfile.config')
        self.devBooter, self._devMgr = self.launchDeviceManager("/nodes/test_ExecutableDevice_node/DeviceManager.dcd.xml")
        self._app = None
Project: charm-swift-proxy    Author: openstack    | Project source | File source
def _git_update_requirements(venv, package_dir, reqs_dir):
    """
    Update from global requirements.

    Update an OpenStack git directory's requirements.txt and
    test-requirements.txt from global-requirements.txt.
    """
    orig_dir = os.getcwd()
    os.chdir(reqs_dir)
    python = os.path.join(venv, 'bin/python')
    cmd = [python, 'update.py', package_dir]
    try:
        subprocess.check_call(cmd)
    except subprocess.CalledProcessError:
        package = os.path.basename(package_dir)
        error_out("Error updating {} from "
                  "global-requirements.txt".format(package))
    os.chdir(orig_dir)
Project: stalkerGKSU    Author: zense    | Project source | File source
def get_json(org):
    d = {"nodes":[],"links":[]}
    for i in graph:
        dt = {}
        dt["id"]=i
        dt["group"]=1
        d["nodes"].append(dt)
    for i in graph:
        for j in graph[i]:
            for k in j:
                dt={}
                dt["source"]=i
                dt["target"]=k[1::]
                dt["value"]=10
                d["links"].append(dt)
    string_json = json.dumps(d)
    filename = org + ".json"
    f = open(os.path.join(os.getcwd(), "static", filename), "w")
    f.write(string_json)
    f.close()
Project: libbuild    Author: appscode    | Project source | File source
def find_dotenv(filename='.env', raise_error_if_not_found=False, usecwd=False):
    """
    Search in increasingly higher folders for the given file

    Returns path to the file if found, or an empty string otherwise
    """
    if usecwd or '__file__' not in globals():
        # should work without __file__, e.g. in REPL or IPython notebook
        path = os.getcwd()
    else:
        # will work for .py files
        frame_filename = sys._getframe().f_back.f_code.co_filename
        path = os.path.dirname(os.path.abspath(frame_filename))

    for dirname in _walk_to_root(path):
        check_path = os.path.join(dirname, filename)
        if os.path.exists(check_path):
            return check_path

    if raise_error_if_not_found:
        raise IOError('File not found')

    return ''
Project: data_pipeline    Author: Yelp    | Project source | File source
def __init__(self, additional_compose_file=None, additional_services=None):
        # To resolve docker client server version mismatch issue.
        os.environ["COMPOSE_API_VERSION"] = "auto"
        dir_name = os.path.split(os.getcwd())[-1]
        self.project = "{}{}".format(
            re.sub(r'[^a-z0-9]', '', dir_name.lower()),
            getpass.getuser()
        )
        self.additional_compose_file = additional_compose_file

        self.services = ["zookeeper", "schematizer", "kafka"]

        if additional_services is not None:
            self.services.extend(additional_services)

        # This variable is meant to capture the running/not-running state of
        # the dependent testing containers when tests start running.  The idea
        # is, we'll only start and stop containers if they aren't already
        # running.  If they are running, we'll just use the ones that exist.
        # It takes a while to start all the containers, so when running lots of
        # tests, it's best to start them out-of-band and leave them up for the
        # duration of the session.
        self.containers_already_running = self._are_containers_already_running()
Project: BackManager    Author: linuxyan    | Project source | File source
def archive_context(filename):
    """
    Unzip filename to a temporary directory, set to the cwd.

    The unzipped target is cleaned up after.
    """
    tmpdir = tempfile.mkdtemp()
    log.warn('Extracting in %s', tmpdir)
    old_wd = os.getcwd()
    try:
        os.chdir(tmpdir)
        with ContextualZipFile(filename) as archive:
            archive.extractall()

        # going in the directory
        subdir = os.path.join(tmpdir, os.listdir(tmpdir)[0])
        os.chdir(subdir)
        log.warn('Now working in %s', subdir)
        yield

    finally:
        os.chdir(old_wd)
        shutil.rmtree(tmpdir)
Project: wpw-sdk-python    Author: WPTechInnovation    | Project source | File source
def startRPC(self, port, eventListenerPort):

    logging.basicConfig(filename='worldpay-within-wrapper.log', level=logging.DEBUG)
    reqOS = ["darwin", "win32", "windows", "linux"]
    reqArch = ["x64", "ia32"]
    cfg = launcher.Config(reqOS, reqArch)
    launcherLocal = launcher.launcher()
    # define log file name for rpc agent, so e.g
    # for "runConsumerOWP.py" it will be: "rpc-wpwithin-runConsumerOWP.log"
    logfilename = os.path.basename(sys.argv[0])
    logfilename = "rpc-wpwithin-" + logfilename.rsplit(".", 1)[0] + ".log"

    args = []
    if eventListenerPort > 0:
        logging.debug(str(os.getcwd()) + "" + "-port " + str(port) + " -logfile " + logfilename + " -loglevel debug,warn,error,fatal,info" + " -callbackport " + str(eventListenerPort))
        args = ['-port', str(port), '-logfile', logfilename, '-loglevel', 'debug,warn,error,fatal,info', '-callbackport', str(eventListenerPort)]
    else:
        logging.debug(str(os.getcwd()) + "" + "-port " + str(port) + " -logfile " + logfilename + " -loglevel debug,warn,error,fatal,info")
        args = ['-port', str(port), '-logfile', logfilename, '-loglevel', 'debug,warn,error,fatal,info']

    process = launcherLocal.launch(cfg, os.getcwd() + "", args)

    return process
Project: freeradius    Author: epiphyte    | Project source | File source
def compose(env):
    """Compose the configuration."""
    offset = _get_utils(env)
    rsync = ["rsync",
             "-aczv",
             USER_FOLDER,
             os.path.join(offset, USER_FOLDER),
             "--delete-after",
             _get_exclude("*.pyc"),
             _get_exclude("README.md"),
             _get_exclude("__init__.py"),
             _get_exclude("__config__.py")]
    call(rsync, "rsync user definitions")
    here = os.getcwd()
    composition = ["python2.7",
                   "config_compose.py",
                   "--output", os.path.join(here, FILE_NAME)]
    call(composition, "compose configuration", working_dir=offset)
Project: cmake.nvim    Author: phillipbonhomme    | Project source | File source
def test_RTagsClientUpdateBuffers(self):
        try:
            os.chdir("dirty")
        except OSError:
            print("Test Error: Couldn't cd into 'dirty' test directory.")
            raise
        self.assertTrue(self.cmake_build_info["build_dir"].is_dir())
        self.assertTrue(self.cmake_build_info["comp_data_cmake"].is_file())
        self.plugin.setup_rtags_daemon()
        self.plugin.connect_rtags_client()
        self.plugin.update_rtags_buffers(
            [str(src_info["test_cpp"]),
             str(src_info["cpp"])])
        try:
            rtags_client_status = subprocess.check_output(
                self.cmake_cmd_info["rtags_buffers"])
        except subprocess.CalledProcessError as e:
            print(e.output)
        filepath = os.getcwd() + str(src_info["test_cpp"])
        self.assertTrue(str(rtags_client_status).find(filepath))
Project: Adafruit_Python_ADS1x15    Author: adafruit    | Project source | File source
def archive_context(filename):
    # extracting the archive
    tmpdir = tempfile.mkdtemp()
    log.warn('Extracting in %s', tmpdir)
    old_wd = os.getcwd()
    try:
        os.chdir(tmpdir)
        with get_zip_class()(filename) as archive:
            archive.extractall()

        # going in the directory
        subdir = os.path.join(tmpdir, os.listdir(tmpdir)[0])
        os.chdir(subdir)
        log.warn('Now working in %s', subdir)
        yield

    finally:
        os.chdir(old_wd)
        shutil.rmtree(tmpdir)
Project: foremast    Author: gogoair    | Project source | File source
def load_dynamic_config(configurations, config_dir=getcwd()):
    """Load and parse dynamic config"""
    # Create full path of config
    config_file = '{path}/config.py'.format(path=config_dir)

    # Insert config path so we can import it
    sys.path.insert(0, path.dirname(path.abspath(config_file)))
    try:
        config_module = __import__('config')

        for key, value in config_module.CONFIG.items():
            LOG.debug('Importing %s with key %s', key, value)
            # Update configparser object
            configurations.update({key: value})
    except ImportError:
        # Provide a default if config not found
        configurations = {}
Project: ccu_and_eccu_publish    Author: gaofubin    | Project source | File source
def archive_context(filename):
    # extracting the archive
    tmpdir = tempfile.mkdtemp()
    log.warn('Extracting in %s', tmpdir)
    old_wd = os.getcwd()
    try:
        os.chdir(tmpdir)
        with ContextualZipFile(filename) as archive:
            archive.extractall()

        # going in the directory
        subdir = os.path.join(tmpdir, os.listdir(tmpdir)[0])
        os.chdir(subdir)
        log.warn('Now working in %s', subdir)
        yield

    finally:
        os.chdir(old_wd)
        shutil.rmtree(tmpdir)
Project: mbed-cli    Author: ARMmbed    | Project source | File source
def pquery(command, stdin=None, **kwargs):
    if very_verbose:
        info('Query "'+' '.join(command)+'" in '+getcwd())
    try:
        proc = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, **kwargs)
    except OSError as e:
        if e[0] == errno.ENOENT:
            error(
                "Could not execute \"%s\".\n"
                "Please verify that it's installed and accessible from your current path by executing \"%s\".\n" % (command[0], command[0]), e[0])
        else:
            raise e

    stdout, _ = proc.communicate(stdin)

    if very_verbose:
        log(str(stdout).strip()+"\n")

    if proc.returncode != 0:
        raise ProcessException(proc.returncode, command[0], ' '.join(command), getcwd())

    return stdout
Project: mbed-cli    Author: ARMmbed    | Project source | File source
def seturl(url):
        info("Setting url to \"%s\" in %s" % (url, getcwd()))
        hgrc = os.path.join('.hg', 'hgrc')
        tagpaths = '[paths]'
        remote = 'default'
        lines = []

        try:
            with open(hgrc) as f:
                lines = f.read().splitlines()
        except IOError:
            pass

        if tagpaths in lines:
            idx = lines.index(tagpaths)
            m = re.match(r'^([\w_]+)\s*=\s*(.*)$', lines[idx+1])
            if m:
                remote = m.group(1)
                del lines[idx+1]
            lines.insert(idx, remote+' = '+url)
        else:
            lines.append(tagpaths)
            lines.append(remote+' = '+url)
Project: mbed-cli    Author: ARMmbed    | Project source | File source
def unignore(dest):
        Hg.ignore_file = os.path.join('.hg', 'hgignore')
        try:
            with open(Hg.ignore_file) as f:
                lines = f.read().splitlines()
        except IOError:
            lines = []

        if dest in lines:
            lines.remove(dest)
            try:
                with open(Hg.ignore_file, 'w') as f:
                    f.write('\n'.join(lines) + '\n')
            except IOError:
                error("Unable to write ignore file in \"%s\"" % os.path.join(getcwd(), Hg.ignore_file), 1)

# pylint: disable=no-self-argument, no-method-argument, no-member, no-self-use, unused-argument
Project: mbed-cli    Author: ARMmbed    | Project source | File source
def checkout(rev, clean=False):
        if not rev:
            return
        info("Checkout \"%s\" in %s" % (rev, os.path.basename(getcwd())))
        branch = None
        refs = Git.getbranches(rev)
        for ref in refs: # re-associate with a local or remote branch (rev is the same)
            m = re.match(r'^(.*?)\/(.*?)$', ref)
            if m and m.group(2) != "HEAD": # matches origin/<branch> and isn't HEAD ref
                if not os.path.exists(os.path.join('.git', 'refs', 'heads', m.group(2))): # okay only if local branch with that name doesn't exist (git will checkout the origin/<branch> in that case)
                    branch = m.group(2)
            elif ref != "HEAD":
                branch = ref # matches local branch and isn't HEAD ref

            if branch:
                info("Revision \"%s\" matches a branch \"%s\" reference. Re-associating with branch" % (rev, branch))
                popen([git_cmd, 'checkout', branch] + ([] if very_verbose else ['-q']))
                break

        if not branch:
            popen([git_cmd, 'checkout', rev] + (['-f'] if clean else []) + ([] if very_verbose else ['-q']))
Project: mbed-cli    Author: ARMmbed    | Project source | File source
def update(rev=None, clean=False, clean_files=False, is_local=False):
        if not is_local:
            Git.fetch()
        if clean:
            Git.discard(clean_files)
        if rev:
            Git.checkout(rev, clean)
        else:
            remote = Git.getremote()
            branch = Git.getbranch()
            if remote and branch:
                try:
                    Git.merge('%s/%s' % (remote, branch))
                except ProcessException:
                    pass
            else:
                err = "Unable to update \"%s\" in \"%s\"." % (os.path.basename(getcwd()), getcwd())
                if not remote:
                    info(err+" The local repository is not associated with a remote one.")
                if not branch:
                    info(err+" Working set is not on a branch.")
Project: mbed-cli    Author: ARMmbed    | Project source | File source
def ignore(dest):
        try:
            with open(Git.ignore_file) as f:
                exists = dest in f.read().splitlines()
        except IOError:
            exists = False

        if not exists:
            try:
                ignore_file_parent_directory = os.path.dirname(Git.ignore_file)
                if not os.path.exists(ignore_file_parent_directory):
                    os.mkdir(ignore_file_parent_directory)

                with open(Git.ignore_file, 'a') as f:
                    f.write(dest.replace("\\", "/") + '\n')
            except IOError:
                error("Unable to write ignore file in \"%s\"" % os.path.join(getcwd(), Git.ignore_file), 1)
Project: mbed-cli    Author: ARMmbed    | Project source | File source
def fromrepo(cls, path=None):
        repo = cls()
        if path is None:
            path = Repo.findparent(getcwd())
            if path is None:
                error(
                    "Could not find mbed program in current path \"%s\".\n"
                    "You can fix this by calling \"mbed new .\" or \"mbed config root .\" in the root of your program." % getcwd())

        repo.path = os.path.abspath(path)
        repo.name = os.path.basename(repo.path)

        cache_cfg = Global().get_cfg('CACHE', '')
        if cache_repositories and cache_cfg and cache_cfg != 'none' and cache_cfg != 'off' and cache_cfg != 'disabled':
            loc = cache_cfg if (cache_cfg and cache_cfg != 'on' and cache_cfg != 'enabled') else None
            repo.cache = loc or os.path.join(tempfile.gettempdir(), 'mbed-repo-cache')

        repo.sync()

        if repo.scm is None:
            warning(
                "Program \"%s\" in \"%s\" does not use source control management.\n"
                "To fix this you should use \"mbed new .\" in the root of your program." % (repo.name, repo.path))

        return repo
Project: mbed-cli    Author: ARMmbed    | Project source | File source
def pathtype(cls, path=None):
        path = os.path.abspath(path or getcwd())

        depth = 0
        while cd(path):
            tpath = path
            path = Repo.findparent(path)
            if path:
                depth += 1
                path = os.path.split(path)[0]
                if tpath == path:       # Reached root.
                    break
            else:
                break

        return "directory" if depth == 0 else ("program" if depth == 1 else "library")
Project: mbed-cli    Author: ARMmbed    | Project source | File source
def __init__(self, path=None, print_warning=False):
        path = os.path.abspath(path or getcwd())
        self.path = path
        self.is_cwd = True

        while cd(path):
            tpath = path
            if os.path.isfile(os.path.join(path, Cfg.file)):
                self.path = path
                self.is_cwd = False
                break

            path = os.path.split(path)[0]
            if tpath == path:       # Reached root.
                break

        self.name = os.path.basename(self.path)
        self.is_classic = os.path.isfile(os.path.join(self.path, 'mbed.bld'))

        # is_cwd flag indicates that current dir is assumed to be root, not root repo
        if self.is_cwd and print_warning:
            warning(
                "Could not find mbed program in current path \"%s\".\n"
                "You can fix this by calling \"mbed new .\" in the root of your program." % self.path)
Project: simple_rl    Author: david-abel    | Project source | File source
def main():
    # Add examples to path.
    parent_dir = os.path.abspath(os.path.join(os.getcwd(), os.pardir))
    sys.path.insert(0, parent_dir)

    # Grab all example files.
    example_dir = os.path.join(os.getcwd(), "..", "examples")
    example_files = [f for f in os.listdir(example_dir) if os.path.isfile(os.path.join(example_dir, f)) and "py" == f.split(".")[-1] and "init" not in f and "viz_exam" not in f]

    print("\n" + "="*32)
    print("== Running", len(example_files), "simple_rl tests ==")
    print("="*32 + "\n")
    total_passed = 0

    for i, ex in enumerate(example_files):
        print("\t [Test", str(i + 1) + "] ", ex + ": ",)
        result = run_example(os.path.join(example_dir, ex))
        if result:
            total_passed += 1
            print("\t\tPASS.")
        else:
            print("\t\tFAIL.")
    print("\nResults:", total_passed, "/", len(example_files), "passed.")
Project: ptm    Author: GrivIN    | Project source | File source
def create(ctx, maintype, subtype, app_name, factory, args):
    args = args or []
    maintype = maintype or ctx.obj['SETTINGS'].get(
        'default_maintype', 'python')
    subtype = subtype or ctx.obj['SETTINGS'].get(
        'default_subtype', 'app')
    click.echo('Type: {};\t Subtype: {};\t App name: {};'.format(
        maintype, subtype, app_name))
    current_dir = os.getcwd()
    additional_dirs = ctx.obj['SETTINGS'].get('templates', [])

    factory_module, path = get_factory(maintype, factory, additional_dirs)
    if not factory_module:
        click.echo('ERROR: factory not found:{}'.format(maintype), err=True)
        exit(1)

    app_factory = factory_module.AppFactory(path, args)
    app_factory.setup(subtype, app_name, current_dir)
    app_factory.set_context(ctx.obj['SETTINGS'].get('context', {}))
    app_factory.run()
    click.echo('Done!')
Project: charm-plumgrid-gateway    Author: openstack    | Project source | File source
def chdir(directory):
    """Change the current working directory to a different directory for a code
    block and return the previous directory after the block exits. Useful to
    run commands from a specificed directory.

    :param str directory: The directory path to change to for this context.
    """
    cur = os.getcwd()
    try:
        yield os.chdir(directory)
    finally:
        os.chdir(cur)
Project: charm-plumgrid-gateway    Author: openstack    | Project source | File source
def _add_services(self, this_service, other_services):
        """Add services.

           Add services to the deployment where this_service is the local charm
           that we're testing and other_services are the other services that
           are being used in the local amulet tests.
           """
        if this_service['name'] != os.path.basename(os.getcwd()):
            s = this_service['name']
            msg = "The charm's root directory name needs to be {}".format(s)
            amulet.raise_status(amulet.FAIL, msg=msg)

        if 'units' not in this_service:
            this_service['units'] = 1

        self.d.add(this_service['name'], units=this_service['units'],
                   constraints=this_service.get('constraints'))

        for svc in other_services:
            if 'location' in svc:
                branch_location = svc['location']
            elif self.series:
                branch_location = 'cs:{}/{}'.format(self.series, svc['name'])
            else:
                branch_location = None

            if 'units' not in svc:
                svc['units'] = 1

            self.d.add(svc['name'], charm=branch_location, units=svc['units'],
                       constraints=svc.get('constraints'))
Project: AutoML5    Author: djajetic    | Project source | File source
def show_io(input_dir, output_dir):  
    ''' show directory structure and inputs and outputs of the scoring program'''
    swrite('\n=== DIRECTORIES ===\n\n')
    # Show this directory
    swrite("-- Current directory " + pwd() + ":\n")
    write_list(ls('.'))
    write_list(ls('./*'))
    write_list(ls('./*/*'))
    swrite("\n")

    # List input and output directories
    swrite("-- Input directory " + input_dir + ":\n")
    write_list(ls(input_dir))
    write_list(ls(input_dir + '/*'))
    write_list(ls(input_dir + '/*/*'))
    write_list(ls(input_dir + '/*/*/*'))
    swrite("\n")
    swrite("-- Output directory  " + output_dir + ":\n")
    write_list(ls(output_dir))
    write_list(ls(output_dir + '/*'))
    swrite("\n")

    # write metadata to stderr
    swrite('\n=== METADATA ===\n\n')
    swrite("-- Current directory " + pwd() + ":\n")
    try:
        metadata = yaml.load(open('metadata', 'r'))
        for key,value in metadata.items():
            swrite(key + ': ')
            swrite(str(value) + '\n')
    except:
        swrite("none\n");
    swrite("-- Input directory " + input_dir + ":\n")
    try:
        metadata = yaml.load(open(os.path.join(input_dir, 'metadata'), 'r'))
        for key,value in metadata.items():
            swrite(key + ': ')
            swrite(str(value) + '\n')
        swrite("\n")
    except:
        swrite("none\n");