我们从 Python 开源项目中提取了以下 43 个代码示例,用于说明如何使用 pathlib.PurePosixPath()。
def open(self, path, cwd=None, edit_check=True, allow_students=True):
    """Walk ``path`` component by component and return the node reached.

    :param path: POSIX-style path (string or path object) to resolve.
    :param cwd: node to start from; ``self.root`` is used when omitted.
        The root anchor of an absolute ``path`` is skipped, so the walk
        starts at ``cwd`` whenever one is given.
    :param edit_check: when true and ``self._edit`` exists, pending edits
        are applied with ``self._do_edit()`` before resolving.
    :param allow_students: when false, descending below
        ``self.root.students`` is refused.
    :returns: the resolved node, or ``False`` when the walk was refused
        because of ``allow_students``.
    """
    if edit_check and hasattr(self, '_edit'):
        self._do_edit()

    work = self.root if cwd is None else cwd
    for item in PurePosixPath(path).parts:
        # Only the root anchor ('/') can contain a slash; it is not a child name.
        if '/' in item:
            continue
        if not allow_students and work is self.root.students:
            return False
        work = work.access(item)
        # Nodes are populated lazily the first time they are visited.
        if not work.ready:
            work.fetch()
    return work
def print_file(self, output, path, recursive):
    """Print the node at ``path`` and, for directories, its children.

    Internal links and regular files are printed through their dedicated
    helpers. A directory is printed, then every child is visited: child
    directories are only descended into when ``recursive`` is true,
    otherwise just their own entry is printed.
    """
    vfs = self.vfs
    node = vfs.open(path)

    if vfs.is_internal_link(node):
        self.print_internal_link(output, path, node)
        return
    if vfs.is_regular(node):
        self.print_regular(output, path)
        return
    if not vfs.is_directory(node):
        assert False, '?????????'

    self.print_directory(output, path)
    base = pathlib.PurePosixPath(path)
    for child_name, child_node in node.list():
        child_path = (base / child_name).as_posix()
        descend = recursive or not self.vfs.is_directory(child_node)
        if descend:
            self.print_file(output, child_path, recursive)
        else:
            self.print_directory(output, child_path)
def subindex(self, *path):
    """
    Returns an `IndexFile` object listing only those files in or below the
    directory given by ``*path``.  The path keys in the resulting object will
    be relative to ``*path``.

    ``*path`` may be a string (e.g., ``"main"`` or ``"main/binary-amd64"``),
    a sequence of strings (e.g., ``"main", "binary-amd64"``), or a
    `PurePosixPath` object.  The pieces are joined into one base path before
    comparing, because passing several positional arguments to
    `PurePath.relative_to` is deprecated in recent Python versions.

    :raises TypeError: if no path component is given
    """
    ### TODO: Add an option for also updating the `filename` attributes of
    ### the IndexEntries?
    ### TODO: Add an option for controlling whether to keep `fields`?
    if not path:
        # Same failure mode as calling ``relative_to()`` without arguments
        raise TypeError("need at least one argument")
    base = PurePosixPath(*path)
    subfiles = {}
    for p, entry in self.files.items():
        pathobj = PurePosixPath(p)
        ### TODO: Handle absolute paths and paths beginning with .. (or .?)
        try:
            subpath = pathobj.relative_to(base)
        except ValueError:
            # Not located under the requested directory
            continue
        subfiles[str(subpath)] = entry
    return type(self)(files=subfiles, fields=self.fields.copy())
def _get_entry(self, path_str: str) -> Entry:
    """
    Map a path onto the entry it denotes, or raise ENOENT.

    The number of path components selects the entry kind: one component
    is the root directory, two a table directory, three a row directory
    and four a synthesized column file. The first component is ignored
    (presumably the root anchor -- confirm against callers).
    """
    _, *names = PurePosixPath(path_str).parts or (None,)
    depth = len(PurePosixPath(path_str).parts)

    if depth == 4:
        # Synthesize a file!
        return ColumnFile(*names)
    if depth == 3:
        # It's a row!
        return RowDirectory(*names)
    if depth == 2:
        # Table directory
        return TableDirectory(*names)
    if depth == 1:
        # Root directory
        return RootDirectory()
    raise FuseOSError(ENOENT)
def copy_directory(self, host_path, guest_path):
    """ Copies a directory from host_path (pathlib.Path) to guest_path (pathlib.PurePath).

    This is natively supported since LXD 2.2 but we have to support 2.0+
    Refs: https://github.com/lxc/lxd/issues/2401

    Uses tar to pack/unpack the directory: the host directory is packed into
    a temporary archive, the archive is copied into the guest, unpacked in
    ``guest_path`` and finally removed from the guest.
    """
    guest_tar_path = self._guest_temporary_tar_path
    self.run(['mkdir', '-p', str(guest_path)])
    with tempfile.NamedTemporaryFile() as f:
        logger.debug("Creating tar file from {}".format(host_path))
        # The context manager closes the archive even if adding files fails.
        with tarfile.open(f.name, 'w') as tar:
            tar.add(str(host_path), arcname='.')
        self.copy_file(Path(f.name), PurePosixPath(guest_tar_path))
    self.run(['tar', '-xf', guest_tar_path, '-C', str(guest_path)])
    self.run(['rm', '-f', str(guest_tar_path)])

##################################
# PRIVATE METHODS AND PROPERTIES #
##################################
def test_can_run_puppet_manifest_mode(self, mock_run, mock_copy_dir):
    """Manifest mode copies the manifests directory and applies the manifest file."""
    class DummyGuest(Guest):
        name = 'dummy'

    host = Host()
    guest = DummyGuest(unittest.mock.Mock())
    mock_run.return_value = 0
    options = {
        'manifest_file': 'test_site.pp',
        'manifests_path': 'test_manifests',
    }
    provisioner = PuppetProvisioner('./', host, [guest], options)
    provisioner.provision()

    guest_manifests = PurePosixPath(provisioner._guest_manifests_path)

    # The manifests directory is copied exactly once, host -> guest.
    assert mock_copy_dir.call_count == 1
    copy_args = mock_copy_dir.call_args_list[0][0]
    assert copy_args[0] == Path('test_manifests')
    assert copy_args[1] == guest_manifests

    # First the puppet lookup, then the actual `puppet apply`.
    assert mock_run.call_count == 2
    lookup_call, apply_call = mock_run.call_args_list
    assert lookup_call[0][0] == ['which', 'puppet']
    expected = 'puppet apply --detailed-exitcodes --manifestdir {} {}'.format(
        guest_manifests, guest_manifests / 'test_site.pp')
    assert apply_call[0][0] == ['sh', '-c', expected]
def test_can_set_binary_path(self, mock_run, mock_copy_dir):
    """A configured `binary_path` is used both for the check and for the apply."""
    class DummyGuest(Guest):
        name = 'dummy'

    host = Host()
    guest = DummyGuest(unittest.mock.Mock())
    mock_run.return_value = 0
    options = {
        'manifest_file': 'test_site.pp',
        'manifests_path': 'test_manifests',
        'binary_path': '/test/path',
    }
    provisioner = PuppetProvisioner('./', host, [guest], options)
    provisioner.provision()

    guest_manifests = PurePosixPath(provisioner._guest_manifests_path)

    assert mock_run.call_count == 2
    check_call, apply_call = mock_run.call_args_list
    assert check_call[0][0] == ['test', '-x', '/test/path/puppet']
    expected = '/test/path/puppet apply --detailed-exitcodes --manifestdir {} {}'.format(
        guest_manifests, guest_manifests / 'test_site.pp')
    assert apply_call[0][0] == ['sh', '-c', expected]
def test_can_set_facter(self, mock_run, mock_copy_dir):
    """Facter entries become sorted, shell-quoted `FACTER_*` assignments."""
    class DummyGuest(Guest):
        name = 'dummy'

    host = Host()
    guest = DummyGuest(unittest.mock.Mock())
    mock_run.return_value = 0
    options = {
        'manifest_file': 'site.pp',
        'manifests_path': 'test_manifests',
        'facter': {'foo': 'bah', 'bar': 'baz baz'},
    }
    provisioner = PuppetProvisioner('./', host, [guest], options)
    provisioner.provision()

    guest_manifests = PurePosixPath(provisioner._guest_manifests_path)

    assert mock_run.call_count == 2
    apply_call = mock_run.call_args_list[1]
    expected = (
        "FACTER_bar='baz baz' FACTER_foo=bah "
        "puppet apply --detailed-exitcodes --manifestdir {} {}".format(
            guest_manifests, guest_manifests / 'site.pp'))
    assert apply_call[0][0] == ['sh', '-c', expected]
def test_can_set_hiera_config_path(self, mock_run, mock_copy_dir, mock_copy_file):
    """The hiera config is copied to the guest and passed via `--hiera_config`."""
    class DummyGuest(Guest):
        name = 'dummy'

    host = Host()
    guest = DummyGuest(unittest.mock.Mock())
    mock_run.return_value = 0
    options = {
        'manifest_file': 'site.pp',
        'manifests_path': 'test_manifests',
        'hiera_config_path': 'hiera.yaml',
    }
    provisioner = PuppetProvisioner('./', host, [guest], options)
    provisioner.provision()

    guest_hiera = PurePosixPath(provisioner._guest_hiera_file)
    guest_manifests = PurePosixPath(provisioner._guest_manifests_path)

    # The hiera file is copied exactly once, host -> guest.
    assert mock_copy_file.call_count == 1
    copy_args = mock_copy_file.call_args_list[0][0]
    assert copy_args[0] == Path('hiera.yaml')
    assert copy_args[1] == guest_hiera

    assert mock_run.call_count == 2
    apply_call = mock_run.call_args_list[1]
    expected = (
        "puppet apply --hiera_config={} --detailed-exitcodes --manifestdir {} {}".format(
            guest_hiera, guest_manifests, guest_manifests / 'site.pp'))
    assert apply_call[0][0] == ['sh', '-c', expected]
def test_can_set_environment_variables(self, mock_run, mock_copy_dir):
    """Environment variables are prepended, sorted and shell-quoted."""
    class DummyGuest(Guest):
        name = 'dummy'

    host = Host()
    guest = DummyGuest(unittest.mock.Mock())
    mock_run.return_value = 0
    options = {
        'manifest_file': 'site.pp',
        'manifests_path': 'test_manifests',
        'environment_variables': {'FOO': 'bah', 'BAR': 'baz baz'},
    }
    provisioner = PuppetProvisioner('./', host, [guest], options)
    provisioner.provision()

    guest_manifests = PurePosixPath(provisioner._guest_manifests_path)

    assert mock_run.call_count == 2
    apply_call = mock_run.call_args_list[1]
    expected = (
        "BAR='baz baz' FOO=bah "
        "puppet apply --detailed-exitcodes --manifestdir {} {}".format(
            guest_manifests, guest_manifests / 'site.pp'))
    assert apply_call[0][0] == ['sh', '-c', expected]
def test_different_flavours_unordered(self):
    """Ordering comparisons between POSIX and Windows pure paths raise TypeError."""
    posix_path = pathlib.PurePosixPath('a')
    windows_path = pathlib.PureWindowsPath('a')
    comparisons = (
        lambda a, b: a < b,
        lambda a, b: a <= b,
        lambda a, b: a > b,
        lambda a, b: a >= b,
    )
    for compare in comparisons:
        with self.assertRaises(TypeError):
            compare(posix_path, windows_path)

#
# Tests for the concrete classes
#

# Make sure any symbolic links in the base test path are resolved
def getTextLinesForCommit( self, filepath, commit_id ):
    """Return the lines of ``filepath`` as stored in commit ``commit_id``.

    The text comes from ``self.cmdShow`` and is split on newlines; the
    empty element produced by a trailing newline is dropped.
    """
    assert isinstance( filepath, pathlib.Path ), 'expecting pathlib.Path got %r' % (filepath,)

    # git show wants a posix path, it does not work with '\' path separators
    posix_name = pathlib.PurePosixPath( filepath )
    lines = self.cmdShow( '%s:%s' % (commit_id, posix_name) ).split('\n')

    return lines[:-1] if lines[-1] == '' else lines
def getTextLinesForCommit( self, commit_id ):
    """Return the lines of this object's file as stored in commit ``commit_id``.

    The text is obtained from the project's ``cmdShow`` and split on
    newlines; the empty element produced by a trailing newline is dropped.
    """
    # The path is converted to POSIX form before being handed to cmdShow.
    posix_name = pathlib.PurePosixPath( self.__filepath )
    lines = self.__project.cmdShow( '%s:%s' % (commit_id, posix_name) ).split('\n')

    return lines[:-1] if lines[-1] == '' else lines
def __str__(self):
    """Return the path as a string with the ``//`` after ``http:``/``https:`` restored."""
    # PurePosixPath collapses the double slash that follows the scheme,
    # so it has to be put back for the result to be a usable URL.
    return re.sub('(https?:)/([^/])', r'\1//\2', super().__str__())
def _do_edit(self):
    """Apply the pending edits stored in ``self._edit`` and then discard them.

    Three kinds of edits are processed in order: ``add_courses``,
    ``add_unenrolled_courses`` and ``delete_files``.
    """
    strings = self.strings
    courses_dir = 'dir_root_courses'

    # add_courses
    for semester, sn in self._edit['add_courses']:
        semester_path = PurePosixPath('/', strings[courses_dir], semester)
        parent = self.open(semester_path, edit_check=False)
        course = WebCourseDirectory(self, parent, semester, sn)
        course.fetch()
        assert course.ready == True
        # Register the course under its name and link its serial number to it.
        parent.add(course.name, course)
        parent.add(sn, InternalLink(self, parent, course.name))

    # add_unenrolled_courses
    for semester, sn in self._edit['add_unenrolled_courses']:
        semester_path = PurePosixPath('/', strings[courses_dir], semester)
        parent = self.open(semester_path, edit_check=False)
        course = UnenrolledCourseDirectory(self, parent, sn)
        course.fetch()
        assert course.ready == True
        parent.add(course.name, course)
        parent.add(sn, InternalLink(self, parent, course.name))

    # delete_files
    for path in self._edit['delete_files']:
        target = self.open(path, edit_check=False, allow_students=False)
        if not target:
            # The path could not be opened here; hand the deletion to the
            # students directory to be carried out later.
            self.root.students.queue_deletion_request(path)
            continue
        if target is self.root:
            raise ValueError('?????????')
        target.parent.unlink(PurePosixPath(path).name)

    # NOTE(review): the original (non-English) comments at this point were
    # lost to an encoding error and could not be recovered.
    del self._edit
def add_student(self, account, sn=None, pwd=None):
    """Add a student directory for ``account``, or queue the request.

    While no ``_last_sn`` has been recorded, the request is only queued.
    Once it exists, queued additions and deletions are flushed first and
    the directory for ``account`` is added if not already present.

    :returns: when ``pwd`` is given, the relative POSIX path from ``pwd``
        to the student's directory; otherwise ``None``.
    """
    strings = self.vfs.strings

    if sn and (not hasattr(self, '_last_sn') or sn != self._last_sn) \
            and self._is_student_function_enabled(sn):
        self._last_sn = sn

    if not hasattr(self, '_last_sn'):
        # Nothing can be created yet: remember the request for later.
        if not hasattr(self, '_queued_addition_requests'):
            self._queued_addition_requests = OrderedDict()
        self._queued_addition_requests[account] = None
    else:
        if hasattr(self, '_queued_addition_requests'):
            pending_additions = self._queued_addition_requests.keys()
            del self._queued_addition_requests
            for request in pending_additions:
                self.add(request, StudentsStudentDirectory(
                    self.vfs, self, request))
        if hasattr(self, '_queued_deletion_requests'):
            pending_deletions = self._queued_deletion_requests.keys()
            del self._queued_deletion_requests
            for request in pending_deletions:
                node = self.vfs.open(request, edit_check=False)
                node.parent.unlink(PurePosixPath(request).name)
        if account not in (child[0] for child in self._children):
            self.add(account, StudentsStudentDirectory(
                self.vfs, self, account))

    if pwd:
        # Count how many levels ``pwd`` sits below the root.
        depth = 0
        while pwd is not self.vfs.root:
            pwd = pwd.parent
            depth += 1
        return PurePosixPath('../' * depth, strings['dir_root_students'],
                             account).as_posix()
def queue_deletion_request(self, path):
    """Delete ``path`` now if ``_last_sn`` is set, otherwise queue the deletion."""
    if hasattr(self, '_last_sn'):
        node = self.vfs.open(path, edit_check=False)
        node.parent.unlink(PurePosixPath(path).name)
        return

    # Not ready yet: keep the request, creating the queue on first use.
    if not hasattr(self, '_queued_deletion_requests'):
        self._queued_deletion_requests = OrderedDict()
    self._queued_deletion_requests[path] = None
def add_teacher(self, account, pwd=None):
    """Ensure a teacher directory exists for ``account``.

    :returns: when ``pwd`` is given, the relative POSIX path from ``pwd``
        to the teacher's directory; otherwise ``None``.
    """
    strings = self.vfs.strings

    already_present = account in (child[0] for child in self._children)
    if not already_present:
        self.add(account, TeachersTeacherDirectory(self.vfs, self, account))

    if pwd:
        # Count how many levels ``pwd`` sits below the root.
        depth = 0
        while pwd is not self.vfs.root:
            pwd = pwd.parent
            depth += 1
        relative = PurePosixPath('../' * depth, strings['dir_root_teachers'], account)
        return relative.as_posix()
def unprefix(suite, component):
    """
    If ``component`` begins with one or more path components (excluding the
    last) that also occur at the end of ``suite`` (excluding the first), remove
    the prefix from ``component`` and return the remainder.

    This function is used for removing prefixes that may occur in component
    names listed in suite Release files when the suite name contains multiple
    path components.

    An empty ``suite`` leaves ``component`` unchanged, and an empty
    ``component`` is returned as ``'.'`` (the string form of an empty path).

    >>> unprefix('stable', 'main')
    'main'
    >>> unprefix('stable/updates', 'updates/main')
    'main'
    >>> unprefix('stable/updates', 'main')
    'main'
    >>> unprefix('trusty', 'trusty/main')
    'trusty/main'
    >>> unprefix('foo', 'foo')
    'foo'
    >>> unprefix('foo', 'foo/bar')
    'foo/bar'
    >>> unprefix('foo/bar', 'bar')
    'bar'
    >>> unprefix('', 'main')
    'main'
    """
    suite = PurePosixPath(suite)
    component = PurePosixPath(component)
    # Must be initialised: the loop body never runs when either path is
    # empty, which previously left ``i`` unbound.
    i = 0
    for i in range(min(len(suite.parts), len(component.parts))):
        if suite.parts[-(i+1):] != component.parts[:i+1]:
            break
    return str(PurePosixPath(*component.parts[i:]))
def _fetch_sources(self):
    """Fetch the sources of every project in the profile into the source directory.

    HTTP/FTP sources are downloaded and extracted; Git sources are cloned
    and checked out. The source location of each fetched project is
    recorded in ``self._src_paths``.
    """
    runner = self._runner
    paths = self._paths

    runner.mkdir_p(paths.src)
    runner.cd(paths.src)

    for project in self._profile.projects.values():
        source = project.source
        source_type = type(source)

        if source_type is vlttng.profile.HttpFtpSource:
            # download (the archive is named after the last URL component)
            filename = PurePosixPath(source.url).name
            runner.wget(source.url, filename)

            # extract
            runner.mkdir_p(paths.project_src(project.name))
            runner.tar_x(filename, project.name)
        elif source_type is vlttng.profile.GitSource:
            # clone
            runner.git_clone(source.clone_url, project.name)

            # checkout, then go back to the common source directory
            runner.cd(paths.project_src(project.name))
            runner.git_checkout(source.checkout)
            runner.cd(paths.src)
        else:
            # Unknown source type: nothing fetched, nothing to record
            continue

        # keep where the source of this project is
        self._src_paths[project.name] = paths.project_src(project.name)
def getFileExt(f):
    """Return the final suffix of ``f`` (including the dot), or '' if it has none."""
    return pathlib.PurePosixPath(f).suffix
def relative_iterdir(self, relpath=''):
    """Yield the names of the entries directly inside ``relpath`` in the zip file.

    Each name is yielded once, in archive order. An empty ``relpath``
    lists the top level of the archive. Archive members that are not
    located under ``relpath`` are ignored; matching is done on whole path
    components, so ``sub`` does not match ``subway/file``.
    """
    relpath = self._as_zip_path(relpath)
    seen = set()
    with zipfile.ZipFile(str(self.path), mode='r') as zf:
        for name in zf.namelist():
            entry = pathlib.PurePosixPath(name)
            if relpath:
                try:
                    # Make the member name relative to the listed directory
                    entry = entry.relative_to(relpath)
                except ValueError:
                    continue
            parts = entry.parts
            # ``parts`` is empty for the directory entry of ``relpath`` itself
            if parts and parts[0] not in seen:
                seen.add(parts[0])
                yield parts[0]
def _as_zip_path(self, path): path = str(pathlib.PurePosixPath(path)) # zip files don't work well with '.' which is the identity of a Path # obj, so just convert to empty string which is basically the identity # of a zip's entry if path == '.': path = '' return path
def shell(self, username=None, cmd_args=None):
    """ Opens a new interactive shell in the container.

    :param username: user to open the shell as; falls back to the ``user``
        entry of the ``shell`` options, then to root.
    :param cmd_args: optional command (list of arguments) to run instead of
        an interactive session.
    """
    # We run this in case our lxdock.yml config was modified since our last `lxdock up`.
    self._setup_env()

    # For now, it's much easier to call `lxc`, but eventually, we might want to contribute
    # to pylxd so it supports `interactive = True` in `exec()`.
    shellcfg = self.options.get('shell', {})
    shelluser = username or shellcfg.get('user')
    if shelluser:
        # This part is the result of quite a bit of `su` args trial-and-error.
        shellhome = shellcfg.get('home') if not username else None
        # The command line goes through a shell, so user-provided values are quoted.
        homearg = '--env HOME={}'.format(shlex.quote(shellhome)) if shellhome else ''
        cmd = 'lxc exec {} {} -- su -m {}'.format(
            self.lxd_name, homearg, shlex.quote(shelluser))
    else:
        cmd = 'lxc exec {} -- su -m root'.format(self.lxd_name)

    if cmd_args:
        # Again, a bit of trial-and-error.
        # 1. Using `su -s SCRIPT_FILE` instead of `su -c COMMAND` because `su -c` cannot
        #    receive SIGINT (Ctrl-C).
        #    Ref: sethmiller.org/it/su-forking-and-the-incorrect-trapping-of-sigint-ctrl-c/
        #    See also: github.com/lxdock/lxdock/pull/67#issuecomment-299755944
        # 2. Applying shlex.quote to every argument to protect special characters.
        #    e.g.: lxdock shell container_name -c echo "he re\"s" '$PATH'
        self._guest.run(
            ['mkdir', '-p', str(PurePosixPath(self._guest_shell_script_file).parent)])
        # The script is assembled directly (no dedent of an already formatted
        # template) so an argument containing a newline cannot shift the shebang.
        script = '#!/bin/sh\n{}\n'.format(' '.join(map(shlex.quote, cmd_args)))
        self._container.files.put(self._guest_shell_script_file, script)
        self._guest.run(['chmod', 'a+rx', self._guest_shell_script_file])
        cmd += ' -s {}'.format(self._guest_shell_script_file)

    subprocess.call(cmd, shell=True)
def test_can_set_module_path(self, mock_run, mock_copy_dir):
    """The module directory is copied and passed to puppet via `--modulepath`."""
    class DummyGuest(Guest):
        name = 'dummy'

    host = Host()
    guest = DummyGuest(unittest.mock.Mock())
    mock_run.return_value = 0
    options = {
        'manifest_file': 'mani.pp',
        'manifests_path': 'test_manifests',
        'module_path': 'test-puppet-modules',
    }
    provisioner = PuppetProvisioner('./', host, [guest], options)
    provisioner.provision()

    guest_modules = PurePosixPath(provisioner._guest_module_path)
    guest_manifests = PurePosixPath(provisioner._guest_manifests_path)

    # Both the manifests and the module directory are copied; order is not fixed.
    assert mock_copy_dir.call_count == 2
    copied = {
        mock_copy_dir.call_args_list[0][0],
        mock_copy_dir.call_args_list[1][0]}
    assert (Path('test-puppet-modules'), guest_modules) in copied

    assert mock_run.call_count == 2
    apply_call = mock_run.call_args_list[1]
    expected = (
        "puppet apply --modulepath {}:{} --detailed-exitcodes --manifestdir {} {}".format(
            guest_modules,
            PurePosixPath(provisioner._guest_default_module_path),
            guest_manifests,
            guest_manifests / 'mani.pp'))
    assert apply_call[0][0] == ['sh', '-c', expected]
def test_can_copy_directory(self, mock_close, mock_add):
    """Copying a directory tars it, uploads the archive and unpacks it in the guest."""
    class DummyGuest(Guest):
        name = 'dummy'

    guest = DummyGuest(FakeContainer())
    guest.lxd_container.files.put.return_value = True

    with tempfile.TemporaryDirectory() as d:
        # Build a small tree: d1/f1, d1/d2/f2 and f3.
        for subdir in ('d1', 'd1/d2'):
            os.mkdir('{}/{}'.format(d, subdir))
        fixtures = (
            ('d1/f1', b'dummy f1'),
            ('d1/d2/f2', b'dummy f2'),
            ('f3', b'dummy f3'),
        )
        for relname, payload in fixtures:
            with open('{}/{}'.format(d, relname), 'wb') as fobj:
                fobj.write(payload)
        guest.copy_directory(pathlib.Path(d), pathlib.PurePosixPath('/a/b/c'))

    tar_path = guest._guest_temporary_tar_path

    # The host directory is added once, as the archive root.
    assert mock_add.call_count == 1
    assert mock_add.call_args[0][0] == str(pathlib.Path(d))
    assert mock_add.call_args[1]['arcname'] == '.'
    assert mock_close.call_count == 1

    execute = guest.lxd_container.execute
    assert execute.call_count == 4
    executed = [call[0] for call in execute.call_args_list]
    assert executed[0] == (['mkdir', '-p', '/a/b/c'], )
    assert executed[1] == ([
        'mkdir', '-p', str(pathlib.PurePosixPath(tar_path).parent)], )
    assert executed[2] == (['tar', '-xf', tar_path, '-C', '/a/b/c'], )
    assert executed[3] == (['rm', '-f', tar_path], )

    put = guest.lxd_container.files.put
    assert put.call_count == 1
    assert put.call_args[0][0] == tar_path
def test_concrete_class(self):
    """Instantiating ``self.cls`` yields the pure path class of the running OS."""
    instance = self.cls('a')
    if os.name == 'nt':
        expected_type = pathlib.PureWindowsPath
    else:
        expected_type = pathlib.PurePosixPath
    self.assertIs(type(instance), expected_type)
def test_different_flavours_unequal(self):
    """A POSIX pure path never compares equal to a Windows pure path."""
    posix_path = pathlib.PurePosixPath('a')
    windows_path = pathlib.PureWindowsPath('a')
    self.assertNotEqual(posix_path, windows_path)
def _split_file_list(text): lines = text.split("\r\n") for line in lines: groups = line.split(",") if len(groups) == 6: directory, filename, *remaining = groups remaining = map(int, remaining) size, attr_val, date_val, time_val = remaining timeinfo = _decode_time(date_val, time_val) attribute = _decode_attribute(attr_val) path = str(PurePosixPath(directory, filename)) yield FileInfo(directory, filename, path, size, attribute, timeinfo)
def _split_file_list_raw(text): lines = text.split("\r\n") for line in lines: groups = line.split(",") if len(groups) == 6: directory, filename, size, *_ = groups path = str(PurePosixPath(directory, filename)) yield RawFileInfo(directory, filename, path, int(size))
def _possible_names(cls, package: PackageInfo) -> List[str]: names = list() for url_str in package.get_urls(): url = urlparse(url_str) host = url.netloc if not host.endswith('github.com'): continue path = PurePosixPath(url.path) parts = path.parts if path.is_absolute(): parts = parts[1:] if len(parts) >= 2: # Get the first 2 path components without extensions # this should handle: # - owner/project # - owner/project.git # - owner/project/releases name = PurePosixPath(parts[0]) # strip off .git if the project name contains it # don't just strip off any ext because "." is valid name_project = parts[1] if name_project.endswith('.git'): name_project = name_project[:-len('.git')] name = name.joinpath(name_project) names.append(str(name)) return names
def __init__(self, name: str) -> None:
    """Set up the bucket's logger and its locations under the storage root."""
    super().__init__(name)

    self._log = logging.getLogger(f'{__name__}.LocalBucket')

    # For local storage, the name is actually a partial path, relative
    # to the local storage root. Buckets are grouped in a directory named
    # after the first two characters of their name.
    storage_dir = current_app.config['STORAGE_DIR']
    self.root = pathlib.Path(storage_dir)
    self.bucket_path = pathlib.PurePosixPath(name[:2], name)
    self.abspath = self.root / self.bucket_path
def location_from_path_for_server(self, path):
    """Construct a location from a simple string path.

    Path is just a reference into the bucket of the form:
    {bucket_name}/{object_path}

    An empty ``path`` falls back to ``self.bucket``. Leading slashes are
    ignored; the first component is the bucket and the rest the object path.
    """
    path = path or self.bucket
    parts = pathlib.PurePosixPath(path.lstrip("/")).parts
    bucket_name = parts[0]
    object_path = utils.join_path(*parts[1:])
    return self.service_account.create_oauth_location(
        bucket=bucket_name, path=object_path)
def name_to_asserted_group_path(name):
    """Convert ``name`` to a `PurePosixPath`, rejecting unsupported names.

    :raises NotImplementedError: if the path is absolute, or if it has no
        components and ``name`` is not ``"."``.
    """
    path = pathlib.PurePosixPath(name)

    if path.is_absolute():
        raise NotImplementedError(
            "Absolute paths are currently not supported and unlikely to be implemented."
        )

    is_empty = not path.parts
    if is_empty and str(name) != ".":
        raise NotImplementedError(
            "Getting an item on a group with path '" + name + "' " +
            "is not supported and unlikely to be implemented."
        )

    return path
def remove_root(name):
    """Return ``name`` as a `PurePosixPath` with any leading root removed."""
    path = pathlib.PurePosixPath(name)
    return path.relative_to(path.root) if path.is_absolute() else path
def test_raw_init(setup_teardown_folder):
    """A freshly constructed `Raw` exposes the values it was created with."""
    root_directory = setup_teardown_folder[2]
    no_parent = pathlib.PurePosixPath("")

    raw = Raw(root_directory, no_parent, "test_object", io_mode=None)

    assert raw.root_directory == root_directory
    assert raw.object_name == "test_object"
    assert raw.parent_path == no_parent
    assert raw.io_mode is None
    assert raw.relative_path == pathlib.PurePosixPath("test_object")
    assert raw.name == "/test_object"
def test_group_init(setup_teardown_folder):
    """A freshly constructed `Group` exposes the values it was created with."""
    root_directory = setup_teardown_folder[2]
    no_parent = pathlib.PurePosixPath("")

    group = Group(root_directory, no_parent, "test_object", io_mode=None)

    assert group.root_directory == root_directory
    assert group.object_name == "test_object"
    assert group.parent_path == no_parent
    assert group.io_mode is None
    assert group.relative_path == pathlib.PurePosixPath("test_object")
    assert group.name == "/test_object"

# New groups can be created via .create_group method
def provision_single(self, guest):
    """ Performs the provisioning operations using puppet.

    Checks that puppet is available in the guest, copies the configured
    manifests/module/environment directories and hiera file into it, then
    runs the puppet command.

    :raises ProvisionFailed: if the puppet executable cannot be found.
    """
    # Verify if `puppet` has been installed.
    binary_path = self.options.get('binary_path')
    if binary_path is None:
        retcode = guest.run(['which', 'puppet'])
        fail_msg = (
            "puppet was not automatically installed for this guest. "
            "Please specify the command to install it in LXDock file using "
            "a shell provisioner and try `lxdock provision` again. You may "
            "also specify `binary_path` that contains the puppet executable "
            "in LXDock file.")
    else:
        puppet_bin = str(PurePosixPath(binary_path) / 'puppet')
        retcode = guest.run(['test', '-x', puppet_bin])
        fail_msg = (
            "puppet executable is not found in the specified path {} in the "
            "guest container. ".format(binary_path)
        )
    if retcode != 0:
        raise ProvisionFailed(fail_msg)

    # Copy manifests, module and environment dirs (in that order), each
    # only when the corresponding option is set.
    directory_options = (
        ('manifests_path', '_guest_manifests_path'),
        ('module_path', '_guest_module_path'),
        ('environment_path', '_guest_environment_path'),
    )
    for option_name, guest_attr in directory_options:
        host_dir = self.options.get(option_name)
        if host_dir is not None:
            guest.copy_directory(Path(host_dir), PurePosixPath(getattr(self, guest_attr)))

    # Copy hiera file
    hiera_file = self.options.get('hiera_config_path')
    if hiera_file is not None:
        guest.copy_file(Path(hiera_file), PurePosixPath(self._guest_hiera_file))

    # Run puppet.
    command = self._build_puppet_command()

    if self.options.get('environment_path'):
        logger.info("Running Puppet with environment {}...".format(self.options['environment']))
    else:
        logger.info("Running Puppet with {}...".format(self.options['manifest_file']))

    guest.run(['sh', '-c', ' '.join(command)])

##################################
# PRIVATE METHODS AND PROPERTIES #
##################################
def _build_puppet_command(self): """ Refs: https://github.com/mitchellh/vagrant/blob/9c299a2a357fcf87f356bb9d56e18a037a53d138/ plugins/provisioners/puppet/provisioner/puppet.rb#L173 """ options = self.options.get('options', '') options = list(map(shlex.quote, shlex.split(options))) module_path = self.options.get('module_path') if module_path is not None: options += ['--modulepath', '{}:{}'.format(self._guest_module_path, self._guest_default_module_path)] hiera_path = self.options.get('hiera_config_path') if hiera_path is not None: options += ['--hiera_config={}'.format(self._guest_hiera_file)] # TODO: we are not detecting console color support now, but please contribute if you need! options += ['--detailed-exitcodes'] environment_path = self.options.get('environment_path') if environment_path is not None: options += ['--environmentpath', str(self._guest_environment_path), '--environment', self.options['environment']] else: options += ['--manifestdir', str(self._guest_manifests_path)] manifest_file = self.options.get('manifest_file') if manifest_file is not None: options += [str( PurePosixPath(self._guest_manifests_path) / manifest_file)] # Build up the custom facts if we have any facter = [] facter_dict = self.options.get('facter') if facter_dict is not None: for key, value in sorted(facter_dict.items()): facter.append("FACTER_{}={}".format(key, shlex.quote(value))) binary_path = self.options.get('binary_path') if binary_path is not None: puppet_bin = str(PurePosixPath(binary_path) / 'puppet') else: puppet_bin = 'puppet' # TODO: working_directory for hiera. Please contribute! env = [] env_variables = self.options.get('environment_variables') if env_variables is not None: for key, value in sorted(env_variables.items()): env.append("{}={}".format(key, shlex.quote(value))) command = env + facter + [puppet_bin, 'apply'] + options return command
def __getitem__(self, name):
    """Return the object stored under ``name`` in this group.

    A multi-component ``name`` is resolved one level at a time: the first
    component is looked up in this group and the remainder in the result.

    :raises KeyError: if ``name`` is not contained in this group.
    :raises IOError: if the directory for ``name`` is not a valid exdir object.
    :raises NotImplementedError: if the stored object type is neither a
        dataset nor a group (or if ``name`` is rejected by
        ``name_to_asserted_group_path``).
    """
    path = utils.path.name_to_asserted_group_path(name)
    if len(path.parts) > 1:
        top_directory = path.parts[0]
        sub_name = pathlib.PurePosixPath(*path.parts[1:])
        return self[top_directory][sub_name]

    if name not in self:
        raise KeyError("No such object: '" + str(name) + "'")

    directory = self.directory / path

    if exob.is_raw_object_directory(directory):
        # TODO create one function that handles all Raw creation
        return raw.Raw(
            root_directory=self.root_directory,
            parent_path=self.relative_path,
            object_name=name,
            io_mode=self.io_mode
            # TODO validate name?
            # TODO plugin manager?
        )

    if not exob.is_nonraw_object_directory(directory):
        # ``directory`` is a path object, so it must be converted before
        # concatenation; otherwise a TypeError would mask this error.
        raise IOError(
            "Directory '" + str(directory) +
            "' is not a valid exdir object."
        )

    meta_filename = directory / exob.META_FILENAME
    with meta_filename.open("r", encoding="utf-8") as meta_file:
        meta_data = yaml.safe_load(meta_file)

    object_type = meta_data[exob.EXDIR_METANAME][exob.TYPE_METANAME]
    if object_type == exob.DATASET_TYPENAME:
        return ds.Dataset(
            root_directory=self.root_directory,
            parent_path=self.relative_path,
            object_name=name,
            io_mode=self.io_mode,
            validate_name=self.validate_name,
            plugin_manager=self.plugin_manager
        )
    elif object_type == exob.GROUP_TYPENAME:
        return Group(
            root_directory=self.root_directory,
            parent_path=self.relative_path,
            object_name=name,
            io_mode=self.io_mode,
            validate_name=self.validate_name,
            plugin_manager=self.plugin_manager
        )
    else:
        print(
            "Object", name, "has data type",
            meta_data[exob.EXDIR_METANAME][exob.TYPE_METANAME]
        )
        raise NotImplementedError("Cannot open objects of this type")