Python pathlib 模块,Path() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用pathlib.Path()

项目:pygrunt    作者:elementbound    | 项目源码 | 文件源码
def find_lib_dir(filename=None):
        """Locate the directory that contains a library file.

        Args:
            filename: Name of the library file to look for. When ``None``,
                the name is taken from ``Cython.python_library_file()``.

        Returns:
            The first candidate directory (as ``str``) that contains
            ``filename``, or ``None`` when no candidate contains it.
        """
        import sys

        if filename is None:
            # The looked-up name has to be kept: discarding it left
            # ``filename`` as None and made ``Path(path, None)`` fail below.
            filename = Cython.python_library_file()

        candidates = [Path(sys.exec_prefix, 'libs/'), Path('/lib'), Path('/usr/lib')]

        for path in candidates:
            if Path(path, filename).exists():
                return str(path)

        return None

# TODO: Cython project
# TODO: Embed support for Cython project
# TODO: Somehow package a whole set of modules with a runner inside?
项目:fxnn    作者:khaotik    | 项目源码 | 文件源码
def download(self, local_dir_=None, url_=None):
        '''
        Fetch the four gzipped idx files (train/t10k x images/labels).

        Args:
            local_dir_: where to save downloaded file, if None, use self.DEFAULT_DIR
            url_: where to download dataset, if None, use default 'http://yann.lecun.com/exdb/mnist/'
        '''
        # TODO check whether file exists
        base_url = 'http://yann.lecun.com/exdb/mnist/' if url_ is None else url_
        target_dir = self.DEFAULT_DIR if local_dir_ is None else Path(local_dir_)
        target_dir.mkdir(parents=True, exist_ok=True)

        name_template = '%(subset)s-%(type_s)s-idx%(ndim)s-ubyte.gz'
        kinds = zip(('images', 'labels'), (3, 1))
        for subset, (type_s, ndim) in product(('train', 't10k'), kinds):
            fields = {'subset': subset, 'type_s': type_s, 'ndim': ndim}
            filename = name_template % fields
            urllib.request.urlretrieve(base_url + filename, str(target_dir / filename))
项目:plugs-mail    作者:solocompt    | 项目源码 | 文件源码
def get_template_language(self, file_):
        """
        Extract the language code from a template file name.

        The code is the text after the last underscore of the file
        stem and must be exactly two characters long, see
        https://en.wikipedia.org/wiki/List_of_ISO_639-1_codes
        valid examples:

        account_created_pt.html
        payment_created_en.txt

        Returns the code in lower case and raises ``Exception`` when
        the suffix is not two characters long.
        """
        suffix = Path(file_).stem.rsplit('_', 1)[-1]
        if len(suffix) == 2:
            return suffix.lower()
        # TODO naive and temp implementation
        # check if the two chars correspond to one of the
        # available languages
        raise Exception('Template file `%s` must end in ISO_639-1 language code.' % file_)
项目:python-libjuju    作者:juju    | 项目源码 | 文件源码
async def add_local_charm_dir(self, charm_dir, series):
        """Upload a local charm to the model.

        This will automatically generate an archive from
        the charm dir.

        The function awaits the executor call, so it has to be a
        coroutine function.

        :param charm_dir: Path to the charm directory
        :param series: Charm series
        :return: the value returned by ``self.add_local_charm``

        """
        # Creating the temporary file inside the ``with`` guarantees it is
        # closed (and removed) even when building the archive fails.
        with tempfile.NamedTemporaryFile() as fh:
            CharmArchiveGenerator(charm_dir).make_archive(fh.name)
            func = partial(
                self.add_local_charm, fh, series, os.stat(fh.name).st_size)
            charm_url = await self._connector.loop.run_in_executor(None, func)

        log.debug('Uploaded local charm: %s -> %s', charm_dir, charm_url)
        return charm_url
项目:python-libjuju    作者:juju    | 项目源码 | 文件源码
def sync_tools(
            self, all_=False, destination=None, dry_run=False, public=False,
            source=None, stream=None, version=None):
        """Copy Juju tools into this model (placeholder, not implemented).

        :param bool all_: Copy all versions, not just the latest
        :param str destination: Path to local destination directory
        :param bool dry_run: Don't do the actual copy
        :param bool public: Tools are for a public cloud, so generate mirrors
            information
        :param str source: Path to local source directory
        :param str stream: Simplestreams stream for which to sync metadata
        :param str version: Copy a specific major.minor version
        :raises NotImplementedError: always

        """
        raise NotImplementedError
项目:kubernaut    作者:datawire    | 项目源码 | 文件源码
def build_package(builder_image, package_type, version, out_dir, dependencies):
    """
    Build a deb or RPM package using a fpm-within-docker Docker image.

    The container gets ``THIS_DIRECTORY`` mounted at ``/build-inside``, its
    parent at ``/source`` and ``out_dir`` at ``/out``, then runs
    ``/build-inside/build-package.sh`` with the dependencies as arguments.

    :param builder_image str: Docker image to run.
    :param package_type str: "rpm" or "deb".
    :param version str: The package version.
    :param out_dir Path: Directory where package will be output.
    :param dependencies list: package names the resulting package should depend
        on.
    """
    environment = [
        "-e", "PACKAGE_VERSION=" + version,
        "-e", "PACKAGE_TYPE=" + package_type,
    ]
    volumes = [
        "-v", "{}:/build-inside:rw".format(THIS_DIRECTORY),
        "-v", "{}:/source:rw".format(THIS_DIRECTORY.parent),
        "-v", str(out_dir) + ":/out",
    ]
    command = (
        ["docker", "run", "--rm"]
        + environment
        + volumes
        + ["-w", "/build-inside", builder_image,
           "/build-inside/build-package.sh"]
        + list(dependencies)
    )
    run(command, check=True)
项目:pbtk    作者:marin-m    | 项目源码 | 文件源码
def prompt_extractor(self, item):
        """Collect inputs for the extractor behind ``item`` and start a Worker.

        Depending on the extractor's ``pick_url`` flag the user is asked
        either for a URL or for one or more files. Nothing is started when
        ``assert_installed`` returns a falsy value or no input was given.
        """
        extractor = extractors[item.data(Qt.UserRole)]
        inputs = []
        if not assert_installed(self.view, **extractor.get('depends', {})):
            return

        if extractor.get('pick_url', False):
            text, good = QInputDialog.getText(self.view, ' ', 'Input an URL:')
            if text:
                parsed = urlparse(text)
                inputs.append((parsed.geturl(), parsed.netloc))
        else:
            files, mime = QFileDialog.getOpenFileNames()
            inputs.extend((chosen, Path(chosen).stem) for chosen in files)

        if not inputs:
            return

        # Range 0..0 is passed to the progress dialog while the worker runs.
        wait = QProgressDialog('Extracting .proto structures...', None, 0, 0)
        wait.setWindowTitle(' ')
        self.set_view(wait)

        self.worker = Worker(inputs, extractor)
        self.worker.progress.connect(self.extraction_progress)
        self.worker.finished.connect(self.extraction_done)
        self.worker.start()
项目:PlasoScaffolder    作者:ClaudiaSaxer    | 项目源码 | 文件源码
def CreateFile(
      self, directory_path: str, file_name: str, filename_suffix: str):
    """Creates an empty file, creating its directory first when missing.

    Args:
      directory_path (str): The path to the directory the file should be
          created.
      file_name (str): the name of the new file.
      filename_suffix (str): the suffix of the new file.

    Returns:
      str: the path of the created file, as built by CreateFilePath
    """
    new_file = self.CreateFilePath(
        directory_path, file_name, filename_suffix)

    directory_missing = not os.path.exists(directory_path)
    if directory_missing:
      self._CreateFolder(directory_path)

    pathlib.Path(new_file).touch()
    return new_file
项目:PlasoScaffolder    作者:ClaudiaSaxer    | 项目源码 | 文件源码
def testPluginNameIfExisting(self):
    """Checks PluginName when the fake helper reports an existing plugin.

    The prompt file must contain the 'Plugin exists' message and the call
    must return 'the_plugin'."""
    with tempfile.TemporaryDirectory() as tmpdir:
      prompt_file = os.path.join(tmpdir, 'testfile')
      pathlib.Path(prompt_file).touch()

      handler = output_handler_file.OutputHandlerFile(
          prompt_file, file_handler.FileHandler(), prompt_info='the_plugin',
          prompt_error='the_plugin')
      helper = fake_sqlite_plugin_helper.FakeSQLitePluginHelper(
          plugin_exists=True, change_bool_after_every_call_plugin_exists=True,
          valid_name=True)
      controller = sqlite_controller.SQLiteController(handler, helper)
      controller._path = 'somepath'

      plugin_name = 'the_plugin'
      returned_name = controller.PluginName(None, None, plugin_name)
      written_prompt = self._ReadFromFile(prompt_file)

      self.assertEqual('Plugin exists. Choose new Name', written_prompt)
      self.assertEqual(plugin_name, returned_name)
项目:PlasoScaffolder    作者:ClaudiaSaxer    | 项目源码 | 文件源码
def testCreateSQLQueryModelWithUserInputWithError(self):
    """Checks _CreateSQLQueryModelWithUserInput returns None on a query error.

    The fake execution is set up with has_error=True."""
    failing_data = sql_query_data.SQLQueryData(
        has_error=True, error_message="Some Error...")
    fake_execution = fake_sqlite_query_execution.SQLQueryExecution(
        failing_data)
    query_name = 'Contact'
    query = 'SELECT createdDate FROM Users ORDER BY createdDate'

    with tempfile.TemporaryDirectory() as tmpdir:
      prompt_file = os.path.join(tmpdir, 'testfile')
      pathlib.Path(prompt_file).touch()

      handler = output_handler_file.OutputHandlerFile(
          prompt_file, file_handler.FileHandler(), prompt_info=query_name)
      helper = fake_sqlite_plugin_helper.FakeSQLitePluginHelper(
          folder_exists=True)
      controller = sqlite_controller.SQLiteController(handler, helper)

      model = controller._CreateSQLQueryModelWithUserInput(
          query, False, fake_execution)
      self.assertIsNone(model)
项目:PlasoScaffolder    作者:ClaudiaSaxer    | 项目源码 | 文件源码
def testSourcePathIfNotExisting(self):
    """Checks SourcePath when the fake helper reports a missing folder.

    The prompt file must contain the 'Folder does not exists' message and
    the call must return the handler's prompt_error value."""
    with tempfile.TemporaryDirectory() as tmpdir:
      prompt_file = os.path.join(tmpdir, 'testfile')
      pathlib.Path(prompt_file).touch()

      handler = output_handler_file.OutputHandlerFile(
          prompt_file, file_handler.FileHandler(),
          prompt_error='the source path')
      helper = fake_sqlite_plugin_helper.FakeSQLitePluginHelper(
          folder_exists=False, change_bool_after_every_call_folder_exists=True)
      controller = sqlite_controller.SQLiteController(handler, helper)

      returned_path = controller.SourcePath(None, None, 'testpath')
      written_prompt = self._ReadFromFile(prompt_file)

      self.assertEqual(
          'Folder does not exists. Enter correct one', written_prompt)
      self.assertEqual(returned_path, 'the source path')
项目:PlasoScaffolder    作者:ClaudiaSaxer    | 项目源码 | 文件源码
def testTestPathIfExisting(self):
    """Checks TestPath when the fake helper reports the file as existing.

    Nothing may be written to the prompt file, and both the return value
    and controller._testfile must equal the given database path."""
    with tempfile.TemporaryDirectory() as tmpdir:
      prompt_file = os.path.join(tmpdir, 'testfile')
      pathlib.Path(prompt_file).touch()

      handler = output_handler_file.OutputHandlerFile(
          prompt_file, file_handler.FileHandler())
      helper = fake_sqlite_plugin_helper.FakeSQLitePluginHelper(
          file_exists=True)
      controller = sqlite_controller.SQLiteController(handler, helper)

      database_path = os.path.join(
          path_helper.TestDatabasePath(), 'twitter_ios.db')
      returned_path = controller.TestPath(None, None, database_path)
      written_prompt = self._ReadFromFile(prompt_file)

      self.assertEqual(database_path, controller._testfile)
      self.assertEqual('', written_prompt)
      self.assertEqual(returned_path, database_path)
项目:PlasoScaffolder    作者:ClaudiaSaxer    | 项目源码 | 文件源码
def testTestPathIfNotExisting(self):
    """Checks TestPath when the fake helper reports a missing file.

    The prompt file must contain the 'File does not exists' message and
    the call must return the handler's prompt_error path."""
    with tempfile.TemporaryDirectory() as tmpdir:
      prompt_file = os.path.join(tmpdir, 'testfile')
      pathlib.Path(prompt_file).touch()
      missing_path = os.path.join(tmpdir, 'testpath')
      replacement_path = os.path.join(tmpdir, 'testpathvalid')

      handler = output_handler_file.OutputHandlerFile(
          prompt_file, file_handler.FileHandler(),
          prompt_error=replacement_path)
      helper = fake_sqlite_plugin_helper.FakeSQLitePluginHelper(
          file_exists=False, change_bool_after_every_call_file_exists=True)
      controller = sqlite_controller.SQLiteController(handler, helper)

      returned_path = controller.TestPath(None, None, missing_path)
      written_prompt = self._ReadFromFile(prompt_file)

      self.assertEqual('File does not exists. Choose another.', written_prompt)
      self.assertEqual(replacement_path, returned_path)
      # close connection so the temp file can be deleted before the program
      #   circle is finished
      controller._query_execution._connection.close()
项目:PlasoScaffolder    作者:ClaudiaSaxer    | 项目源码 | 文件源码
def testValidateRowNameIfNotOk(self):
    """Checks _ValidateRowName when the fake helper rejects the row name.

    The prompt file must contain the 'not in a valid format' message and
    the call must return the handler's prompt_error value."""
    with tempfile.TemporaryDirectory() as tmpdir:
      prompt_file = os.path.join(tmpdir, 'testfile')
      pathlib.Path(prompt_file).touch()

      handler = output_handler_file.OutputHandlerFile(
          prompt_file, file_handler.FileHandler(),
          prompt_error='TheValidRowName')
      helper = fake_sqlite_plugin_helper.FakeSQLitePluginHelper(
          valid_row_name=False,
          change_bool_after_every_call_valid_row_name=True)
      controller = sqlite_controller.SQLiteController(handler, helper)

      returned_name = controller._ValidateRowName("theWrongName")
      written_prompt = self._ReadFromFile(prompt_file)

      expected_prompt = ('Row name is not in a valid format. Choose new Name ['
                         'RowName...]')
      self.assertEqual(expected_prompt, written_prompt)
      self.assertEqual(returned_name, 'TheValidRowName')
项目:PlasoScaffolder    作者:ClaudiaSaxer    | 项目源码 | 文件源码
def testValidateTimestampStringIfNotOk(self):
    """Checks _ValidateTimestampString when the fake helper rejects the input.

    The prompt file must contain the 'Timestamps are not in valid format'
    message and the call must return the handler's prompt_error value."""
    with tempfile.TemporaryDirectory() as tmpdir:
      prompt_file = os.path.join(tmpdir, 'testfile')
      pathlib.Path(prompt_file).touch()

      handler = output_handler_file.OutputHandlerFile(
          prompt_file, file_handler.FileHandler(),
          prompt_error='this,that,bla')
      helper = fake_sqlite_plugin_helper.FakeSQLitePluginHelper(
          valid_comma_separated_string=False,
          change_bool_after_every_call_valid_comma_separated_string=True)
      controller = sqlite_controller.SQLiteController(handler, helper)

      returned_value = controller._ValidateTimestampString("this, that,bla")
      written_prompt = self._ReadFromFile(prompt_file)

      expected_prompt = (
          'Timestamps are not in valid format. Reenter them correctly [name,'
          'name...]')
      self.assertEqual(expected_prompt, written_prompt)
      self.assertEqual(returned_value, 'this,that,bla')
项目:PlasoScaffolder    作者:ClaudiaSaxer    | 项目源码 | 文件源码
def testValidateColumnStringIfNotOk(self):
    """Checks _ValidateColumnString when the fake helper rejects the input.

    The prompt file must contain the 'Column names are not in valid format'
    message and the call must return the handler's prompt_error value."""
    with tempfile.TemporaryDirectory() as tmpdir:
      prompt_file = os.path.join(tmpdir, 'testfile')
      pathlib.Path(prompt_file).touch()

      handler = output_handler_file.OutputHandlerFile(
          prompt_file, file_handler.FileHandler(),
          prompt_error='this,that,bla')
      helper = fake_sqlite_plugin_helper.FakeSQLitePluginHelper(
          valid_comma_separated_string=False,
          change_bool_after_every_call_valid_comma_separated_string=True)
      controller = sqlite_controller.SQLiteController(handler, helper)

      returned_value = controller._ValidateColumnString("this, that,bla")
      written_prompt = self._ReadFromFile(prompt_file)

      expected_prompt = (
          'Column names are not in valid format. Reenter them correctly [name,'
          'name...]')
      self.assertEqual(expected_prompt, written_prompt)
      self.assertEqual(returned_value, 'this,that,bla')
项目:PlasoScaffolder    作者:ClaudiaSaxer    | 项目源码 | 文件源码
def testGenerateIfNotConfirmed(self):
    """Checks Generate raises SystemExit when the handler has confirm=False."""
    template_path = path_helper.TemplatePath()

    with self.assertRaises(SystemExit), \
        tempfile.TemporaryDirectory() as tmpdir:
      prompt_file = os.path.join(tmpdir, 'testfile')
      pathlib.Path(prompt_file).touch()

      handler = output_handler_file.OutputHandlerFile(
          prompt_file, file_handler.FileHandler(), confirm=False)
      helper = fake_sqlite_plugin_helper.FakeSQLitePluginHelper(
          valid_name=False,
          change_bool_after_every_call_valid_name=True)
      controller = sqlite_controller.SQLiteController(handler, helper)
      controller.Generate('not used', 'not used')

      # Only reached when Generate returns instead of raising; in that case
      # assertRaises fails the test anyway.
      self.assertFalse(template_path)
项目:rpi-can-logger    作者:JonnoFTW    | 项目源码 | 文件源码
def writerow(self, row):
        """
        Write one row to the current CSV file and rotate once it is too big.

        The row is passed to ``self._out_writer``, and the text collected in
        ``self._buffer`` is copied to ``self._out_csv``, which is then flushed.
        When more than ``self.max_bytes`` have been counted, the file is
        closed, a new writer is made and a ``7z`` process is started.

        :param row: the row handed to ``self._out_writer.writerow``
        :return: the text that was written for this row
        """
        self._bytes_written += self._out_writer.writerow(row)
        row_txt = self._buffer.getvalue()
        self._out_csv.write(row_txt)
        self._reset_buffer()
        self._out_csv.flush()

        if self._bytes_written <= self.max_bytes:
            return row_txt

        self._out_csv.close()
        self._make_csv_writer()
        # NOTE(review): the name is read after _make_csv_writer() ran; if that
        # call replaces self._out_csv, the new file is archived instead of the
        # one just closed - confirm against _make_csv_writer.
        archive_source = str(Path(self._out_csv.name).absolute())
        compress_cmd = ['7z', 'a', '-t7z', '-m0=lzma', '-mx=9', '-mfb=64', '-md=16m',
                        archive_source + '.7z', archive_source]
        subprocess.Popen(compress_cmd)
        return row_txt
项目:rpi-can-logger    作者:JonnoFTW    | 项目源码 | 文件源码
def _make_writer(self):
        """
        Open a fresh, timestamped JSON log file and compress leftover ones.

        Resets the buffer and the byte counter, opens the new file in
        ``self.log_folder`` for writing and calls ``self._compress`` on every
        other ``*.json`` file found in that folder.

        :return: None
        """
        self._buffer = StringIO()
        self._bytes_written = 0

        now = datetime.now()
        name_pattern = '%Y%m%d_%H%M%S_{}.json'.format(self.make_random(6))
        raw_name = self.log_folder + '/' + now.strftime(name_pattern)
        # Round-trip through pathlib before the name is compared with the
        # glob results below.
        self.fname = str(pathlib.Path(raw_name))
        self._out_fh = open(self.fname, 'w')
        self.write_pid()
        logging.warning("Writing to  {} ({} bytes)".format(self._out_fh.name, self.max_bytes))

        # compress any old files still lying around
        for leftover in glob(self.log_folder + "/*.json"):
            if leftover == self.fname:
                continue
            self._compress(leftover)
项目:cmake.nvim    作者:phillipbonhomme    | 项目源码 | 文件源码
def test_RTagsDaemonStartClean(self):
        """Start the rtags daemon from the 'clean' directory and check its status.

        The build directory must not exist before the daemon is set up. The
        status output has to be at least as long as the reference status text
        below.
        """
        try:
            os.chdir("clean")
        except OSError:
            # The message used to name the 'dirty' directory by mistake.
            print("Test Error: Couldn't cd into 'clean' test directory.")
            raise
        self.assertFalse(self.cmake_build_info["build_dir"].is_dir())
        self.plugin.setup_rtags_daemon()
        try:
            rtags_daemon_status = subprocess.check_output(
                self.cmake_cmd_info["rtags_status"])
        except subprocess.CalledProcessError as e:
            print(e.output)
            # Without this the assertion below died with a NameError on the
            # unbound status variable instead of reporting the real failure.
            self.fail("rtags status command exited with code {}".format(
                e.returncode))
        # Only the length of the reference output is compared.
        reference_status = "*********************************\nfileids\n*********************************\n*********************************\nheadererrors\n*********************************\n*********************************\ninfo\n*********************************\nRunning a release build\nsocketFile: /Users/phillipbonhomme/.rdm\ndataDir: /Users/phillipbonhomme/.cache/rtags/\noptions: 0x14jobCount: 4\nrpVisitFileTimeout: 60000\nrpIndexDataMessageTimeout: 60000\nrpConnectTimeout: 0\nrpConnectTimeout: 0\ndefaultArguments: List<String>(-ferror-limit=50, -Wall, -fspell-checking, -Wno-unknown-warning-option\")\nincludePaths: List<Source::Include>(\")\ndefines: List<Source::Define>(-DRTAGS=\")\nignoredCompilers: Set<Path>(\")\n*********************************\njobs\n*********************************\n"
        self.assertTrue(
            len(reference_status) <= len(str(rtags_daemon_status)))
项目:cmake.nvim    作者:phillipbonhomme    | 项目源码 | 文件源码
def test_RTagsDaemonStartDirty(self):
        """Start the rtags daemon from the 'dirty' directory and check its status.

        The build directory must already exist before the daemon is set up.
        The status output has to be at least as long as the reference status
        text below.
        """
        try:
            os.chdir("dirty")
        except OSError:
            print("Test Error: Couldn't cd into 'dirty' test directory.")
            raise
        self.assertTrue(self.cmake_build_info["build_dir"].is_dir())
        self.plugin.setup_rtags_daemon()
        try:
            rtags_daemon_status = subprocess.check_output(
                self.cmake_cmd_info["rtags_status"])
        except subprocess.CalledProcessError as e:
            print(e.output)
            # Without this the assertion below died with a NameError on the
            # unbound status variable instead of reporting the real failure.
            self.fail("rtags status command exited with code {}".format(
                e.returncode))
        # Only the length of the reference output is compared.
        reference_status = "*********************************\nfileids\n*********************************\n*********************************\nheadererrors\n*********************************\n*********************************\ninfo\n*********************************\nRunning a release build\nsocketFile: /Users/phillipbonhomme/.rdm\ndataDir: /Users/phillipbonhomme/.cache/rtags/\noptions: 0x14jobCount: 4\nrpVisitFileTimeout: 60000\nrpIndexDataMessageTimeout: 60000\nrpConnectTimeout: 0\nrpConnectTimeout: 0\ndefaultArguments: List<String>(-ferror-limit=50, -Wall, -fspell-checking, -Wno-unknown-warning-option\")\nincludePaths: List<Source::Include>(\")\ndefines: List<Source::Define>(-DRTAGS=\")\nignoredCompilers: Set<Path>(\")\n*********************************\njobs\n*********************************\n"
        self.assertTrue(
            len(reference_status) <= len(str(rtags_daemon_status)))
项目:saapy    作者:ashapochka    | 项目源码 | 文件源码
def __init__(self, cfg_yaml=None, secret_cfg_yaml=None,
                 create_vcs_client=True,
                 load_cfg=True, load_secret_cfg=True,
                 default_data_directory=None,
                 create_default_data_directory=True):
        """Set up the data store, configuration and optional helpers.

        A configuration file is only loaded when its flag is set, a path is
        given and that path exists; otherwise the matching attribute is an
        empty dict.

        :param cfg_yaml: path of the configuration YAML file
        :param secret_cfg_yaml: path of the secret configuration YAML file
        :param create_vcs_client: call ``_create_vcs_client`` when truthy
        :param load_cfg: allow loading ``cfg_yaml``
        :param load_secret_cfg: allow loading ``secret_cfg_yaml``
        :param default_data_directory: passed to
            ``set_default_data_directory`` when truthy
        :param create_default_data_directory: forwarded as ``create``
        """
        self.data = OrderedDict()
        self.cfg_yaml = cfg_yaml
        self.secret_cfg_yaml = secret_cfg_yaml

        cfg_usable = load_cfg and cfg_yaml and Path(cfg_yaml).exists()
        self.cfg = self.load_cfg(cfg_yaml) if cfg_usable else {}

        secret_usable = (load_secret_cfg and secret_cfg_yaml
                         and Path(secret_cfg_yaml).exists())
        self.secret_cfg = (
            self.load_cfg(secret_cfg_yaml) if secret_usable else {})

        self._ensure_cfg_structure()
        if create_vcs_client:
            self._create_vcs_client()
        if default_data_directory:
            self.set_default_data_directory(
                default_data_directory, create=create_default_data_directory)
项目:saapy    作者:ashapochka    | 项目源码 | 文件源码
def get_node_id(self, ent=None, ent_attrs: dict=None):
        """Derive a node id from an entity object or from its attribute dict.

        ``ent`` takes precedence over ``ent_attrs``. Entities of kind 'file'
        are identified by their long name relative to ``self.root_path``;
        all others by their unique name.

        :param ent: object offering ``kindname()``, ``longname()`` and
            ``uniquename()``
        :param ent_attrs: dict with 'kindname' plus 'longname' or
            'uniquename'
        :return: the node id, or None when neither argument is truthy
        """
        if ent:
            if ent.kindname() == 'file':
                return str(Path(ent.longname()).relative_to(self.root_path))
            return ent.uniquename()

        if ent_attrs:
            if ent_attrs['kindname'] != 'file':
                return ent_attrs['uniquename']
            try:
                return str(Path(ent_attrs['longname']).relative_to(
                    self.root_path))
            except ValueError:
                # Path is not below root_path: keep the long name unchanged.
                return ent_attrs['longname']

        return None
项目:findcve    作者:garethr    | 项目源码 | 文件源码
def load_vulnerability_database():
    """Return the Debian security tracker data as parsed JSON.

    The data is read from ``debian.json`` in the current directory. When that
    file does not exist it is downloaded first, with a progress bar.

    Returns:
        The object produced by ``json.load`` on the cached file.
    """
    # TODO: redownload if the cached file is found but out of date
    url = "https://security-tracker.debian.org/tracker/data/json"
    db = Path('debian.json')
    if not db.exists():
        # Only contact the server when there is no cached copy; the request
        # used to be sent (and its connection left open) on every call.
        r = requests.get(url, stream=True)
        try:
            with open(db.name, 'wb') as data_file:
                # Expected size used only to scale the progress bar.
                total_length = 1024*20722
                for chunk in progress.bar(r.iter_content(chunk_size=1024), label="Downloading Debian data", expected_size=(total_length/1024) + 1):
                    if chunk:
                        data_file.write(chunk)
                        data_file.flush()
        finally:
            # Streamed responses hold their connection until closed.
            r.close()
    with open(db.name, 'r') as data_file:
        return json.load(data_file)
项目:ananke    作者:beiko-lab    | 项目源码 | 文件源码
def timeseriesdata_constructor_new_file(temp_dir):
    """Checks the TimeSeriesData constructor for a path that does not exist.

    Verifies that the file is created, that the expected groups and data
    sets are present, and that ``filled_data`` equals False.
    """
    h5_path = temp_dir + "/new_ananke.h5"
    tsd = TimeSeriesData(h5_path)

    #Check that the file was really created
    assert Path(h5_path).is_file()

    #Check that the data sets have been created
    expected_members = (
        ("timeseries", {"data", "indices", "indptr"}),
        ("genes", {"sequences", "sequenceids", "clusters", "taxonomy",
                   "sequenceclusters"}),
        ("samples", {"names", "time", "metadata", "mask"}),
    )
    assert set(tsd.h5_table.keys()) == {"genes", "timeseries", "samples"}
    for group_name, member_names in expected_members:
        assert set(tsd.h5_table[group_name].keys()) == member_names

    #Check that the empty file flag is set
    assert tsd.filled_data == False
项目:doctr    作者:drdoctr    | 项目源码 | 文件源码
def get_config():
    """
    Load the ``doctr`` section of ``.travis.yml`` in the current directory.

    Returns an empty dict when the file is missing, when it is empty, or
    when it has no ``doctr`` key.

    Raises ``ValueError`` when the ``doctr`` value is not a dict.
    """
    p = Path('.travis.yml')
    if not p.exists():
        return {}
    with p.open() as f:
        # An empty (or comment-only) YAML document parses to None, which has
        # no ``get``; treat it like a file without a ``doctr`` key.
        travis_config = yaml.safe_load(f.read()) or {}

    config = travis_config.get('doctr', {})

    if not isinstance(config, dict):
        raise ValueError('config is not a dict: {}'.format(config))
    return config
项目:speechless    作者:JuliusKunze    | 项目源码 | 文件源码
def clarin_corpora_sorted_by_size(base_directory: Path) -> List[GermanClarinCorpus]:
    """Build the German CLARIN corpora, all rooted at ``base_directory``.

    The corpora are returned in the fixed order in which they are created
    below.
    NOTE(review): the name says this order is by size; nothing here checks it.
    """
    def plain(corpus_name):
        # Corpus with default settings.
        return GermanClarinCorpus(corpus_name, base_directory)

    corpora = [factory(base_directory) for factory in (sc1, pd2, ziptel, sc10)]
    corpora.append(plain("all.HEMPEL.4.cmdi.11610.1490680796"))
    corpora.append(plain("all.PD1.3.cmdi.16312.1490681066"))
    corpora.append(GermanClarinCorpus(
        "all.VM1.3.cmdi.1508.1490625070", base_directory,
        id_filter_regex=vm1_id_german_filter_regex,
        training_test_split=TrainingTestSplit.training_only))
    corpora.append(plain("all.RVG-J.1.cmdi.18181.1490681704"))
    corpora.append(GermanClarinCorpus(
        "all.ALC.4.cmdi.16602.1490632862", base_directory,
        training_test_split=TrainingTestSplit.randomly_grouped_by(lambda e: e.id[:3])))
    corpora.append(GermanClarinCorpus(
        "all.VM2.3.cmdi.4260.1490625316", base_directory,
        id_filter_regex=vm2_id_german_filter_regex,
        training_test_split=TrainingTestSplit.training_only))
    return corpora
项目:speechless    作者:JuliusKunze    | 项目源码 | 文件源码
def __init__(self, base_directory: Path):
        """Configure the "german-speechdata-package-v2" corpus.

        :param base_directory: forwarded unchanged to the parent constructor
        """
        # exclude those 7 audio files because the first 2 are corrupt, the last 5 are empty:
        # each negative lookahead rejects ids starting with one recording name,
        # the final group accepts everything else.
        usable_id_pattern = re.compile("(?!^2014-03-24-13-39-24_Kinect-RAW)"
                                       "(?!^2014-03-27-11-50-33_Kinect-RAW)"
                                       "(?!^2014-03-18-15-34-19_Realtek)"
                                       "(?!^2014-06-17-13-46-27_Kinect-RAW)"
                                       "(?!^2014-06-17-13-46-27_Realtek)"
                                       "(?!^2014-06-17-13-46-27_Samson)"
                                       "(?!^2014-06-17-13-46-27_Yamaha)"
                                       "(^.*$)")
        settings = dict(
            corpus_name="german-speechdata-package-v2",
            base_directory=base_directory,
            base_source_url_or_directory="http://www.repository.voxforge1.org/downloads/de/",
            tar_gz_extension=".tar.gz",
            subdirectory_depth=1,
            umlaut_decoder=UmlautDecoder.none,
            training_test_split=TrainingTestSplit.by_directory(),
            tags_to_ignore=[],
            id_filter_regex=usable_id_pattern)
        super().__init__(**settings)
项目:speechless    作者:JuliusKunze    | 项目源码 | 文件源码
def _extract_positional_label_by_id(self, files: Iterable[Path]) -> Dict[str, Union[PositionalLabel, str]]:
        """Map every recording id to the label extracted from its XML file.

        Only ``.xml`` files whose extension-less name matches
        ``self.id_filter_regex`` are used. Such a file yields one entry per
        microphone ending for which a ``.wav`` file with the matching name
        exists next to it.
        """
        microphone_endings = (
            "_Yamaha",
            "_Kinect-Beam",
            "_Kinect-RAW",
            "_Realtek",
            "_Samson",
            "_Microsoft-Kinect-Raw",
        )

        label_by_id = OrderedDict()
        for file in files:
            if not file.name.endswith(".xml"):
                continue
            if not self.id_filter_regex.match(name_without_extension(file)):
                continue

            for microphone_ending in microphone_endings:
                recording_id = name_without_extension(file) + microphone_ending
                wav_file = Path(file.parent) / (recording_id + ".wav")
                if wav_file.exists():
                    label_by_id[recording_id] = self._extract_label_from_xml(file)

        return label_by_id
项目:speechless    作者:JuliusKunze    | 项目源码 | 文件源码
def train(self,
              labeled_spectrogram_batches: Iterable[List[LabeledSpectrogram]],
              preview_labeled_spectrogram_batch: List[LabeledSpectrogram],
              tensor_board_log_directory: Path,
              net_directory: Path,
              batches_per_epoch: int):
        """Fit ``self.loss_net`` on the batches, logging a preview prediction.

        The preview batch is predicted and logged once before fitting, and
        the same function is handed to ``create_callbacks``. Fitting starts
        at ``self.load_epoch`` when it is set, otherwise at epoch 0.
        """
        def print_preview_batch():
            return log(self.test_and_predict_batch(preview_labeled_spectrogram_batch))

        print_preview_batch()

        loss_inputs = self._loss_inputs_generator(labeled_spectrogram_batches)
        callbacks = self.create_callbacks(
            callback=print_preview_batch,
            tensor_board_log_directory=tensor_board_log_directory,
            net_directory=net_directory)
        first_epoch = 0 if self.load_epoch is None else self.load_epoch

        self.loss_net.fit_generator(loss_inputs, epochs=100000000,
                                    steps_per_epoch=batches_per_epoch,
                                    callbacks=callbacks,
                                    initial_epoch=first_epoch)
项目:speechless    作者:JuliusKunze    | 项目源码 | 文件源码
def __init__(self,
                 audio_file: Path,
                 id: Optional[str] = None,
                 sample_rate_to_convert_to: int = 16000,
                 label: Optional[str] = "nolabel",
                 fourier_window_length: int = 512,
                 hop_length: int = 128,
                 mel_frequency_count: int = 128,
                 label_with_tags: str = None,
                 positional_label: Optional[PositionalLabel] = None):
        """Create an example whose raw audio is loaded from ``audio_file``.

        When ``id`` is None it defaults to the file name without extension.
        The audio is only read when the ``get_raw_audio`` callable is
        invoked; it is loaded with ``librosa.load`` at ``self.sample_rate``.
        """
        # The default values for hop_length and fourier_window_length are powers of 2 near the values specified in the wave2letter paper.
        self.audio_file = audio_file

        def load_raw_audio():
            return librosa.load(str(self.audio_file), sr=self.sample_rate)[0]

        example_id = name_without_extension(audio_file) if id is None else id

        super().__init__(
            id=example_id,
            get_raw_audio=load_raw_audio,
            label=label,
            sample_rate=sample_rate_to_convert_to,
            fourier_window_length=fourier_window_length,
            hop_length=hop_length,
            mel_frequency_count=mel_frequency_count,
            label_with_tags=label_with_tags,
            positional_label=positional_label)
项目:speechless    作者:JuliusKunze    | 项目源码 | 文件源码
def load(corpus_csv_file: Path,
             sampled_training_example_count: Optional[int] = None) -> 'Corpus':
        """Rebuild a corpus from a CSV file.

        Every row holds id, audio file path, label, phase name and serialized
        positional label (empty string for none). Relative audio paths are
        resolved against the directory of the CSV file.
        """
        import csv

        def to_absolute(audio_file_path: Path) -> Path:
            if audio_file_path.is_absolute():
                return audio_file_path
            return Path(corpus_csv_file.parent) / audio_file_path

        training_examples = []
        test_examples = []
        with corpus_csv_file.open(encoding='utf8') as opened_csv:
            rows = csv.reader(opened_csv, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)

            for example_id, audio_file_path, label, phase_name, serialized_label in rows:
                if serialized_label == "":
                    positional_label = None
                else:
                    positional_label = PositionalLabel.deserialize(serialized_label)
                example = LabeledExampleFromFile(
                    audio_file=to_absolute(Path(audio_file_path)), id=example_id, label=label,
                    positional_label=positional_label)
                phase = Phase[phase_name]
                # Examples of any other phase are left out of both lists.
                if phase == Phase.training:
                    training_examples.append(example)
                if phase == Phase.test:
                    test_examples.append(example)

            return Corpus(training_examples=training_examples,
                          test_examples=test_examples,
                          sampled_training_example_count=sampled_training_example_count)
项目:susi_linux    作者:fossasia    | 项目源码 | 文件源码
def init_hotword_switch(self):
        """Initialise the Snowboy switch from the library check and config.

        When ``_snowboydetect.so`` is missing, the switch is made insensitive
        and the configured engine is set to 'PocketSphinx'. Any exception
        during the check is logged and also falls back to 'PocketSphinx'.
        Finally the switch is activated only for the 'Snowboy' engine.
        """
        try:
            library_path = os.path.join(
                os.path.dirname(TOP_DIR),
                "hotword_engine/snowboy/_snowboydetect.so")
            snowboyDetectFile = Path(library_path)
            print(snowboyDetectFile)
            library_missing = not snowboyDetectFile.exists()
            if library_missing:
                self.snowboy_switch.set_sensitive(False)
                config['hotword_engine'] = 'PocketSphinx'

        except Exception as e:
            logging.error(e)
            config['hotword_engine'] = 'PocketSphinx'

        self.snowboy_switch.set_active(config['hotword_engine'] == 'Snowboy')
项目:susi_linux    作者:fossasia    | 项目源码 | 文件源码
def request_hotword_choice():
    """Ask the user which hotword engine to use and store the answer in config.

    The question is only asked when the Snowboy native library exists at the
    relative path checked below; otherwise the configuration is left untouched.
    Answering 'y' selects Snowboy, any other answer selects PocketSphinx. If
    anything raises along the way, PocketSphinx is configured.
    """
    try:
        print("Checking for Snowboy Availability...")
        detector_library = Path("main/hotword_engine/snowboy/_snowboydetect.so")
        if not detector_library.exists():
            # Snowboy is not available: nothing to ask, nothing to change.
            return

        print("Snowboy is available on this platform")
        answer = input("Do you wish to use Snowboy as default Hotword Detection Engine (Recommended). (y/n) ")
        if answer != 'y':
            config['hotword_engine'] = 'PocketSphinx'
            print('\nPocketSphinx set as default Hotword Detection Engine\n')
        else:
            config['hotword_engine'] = 'Snowboy'
            print('\nSnowboy set as default Hotword Detection Engine\n')

    except Exception:
        print("Some Error Occurred. Using PocketSphinx as default engine for Hotword. Run this script again to change")
        config['hotword_engine'] = 'PocketSphinx'
项目:lopocs    作者:Oslandia    | 项目源码 | 文件源码
def create_potree_page(work_dir, server_url, tablename, column):
    '''Write a potree viewer demo page into work_dir.

    The potree bundle is downloaded and unzipped into work_dir only when
    work_dir/potree does not exist yet. The page is named after the last
    dot-separated component of tablename and is rendered from the
    potree_page template with the "tablename.column" resource and the
    server URL stripped of its 'http://' prefix.
    '''
    # get potree build
    potree_dir = os.path.join(work_dir, 'potree')
    archive_path = os.path.join(work_dir, 'potree.zip')
    potree_present = os.path.exists(potree_dir)
    if not potree_present:
        download('Getting potree code', 'http://3d.oslandia.com/potree.zip', archive_path)
        # unzipping content
        with ZipFile(archive_path) as archive:
            archive.extractall(path=work_dir)

    # Only the part after the last dot of the table name goes into the file name.
    table_part = tablename.split('.')[-1]
    page_path = os.path.join(work_dir, 'potree-{}.html'.format(table_part))
    absolute_page_path = str(Path(page_path).absolute())
    pending('Creating a potree demo page : file://{}'.format(absolute_page_path))

    resource = '{}.{}'.format(tablename, column)
    server_url = server_url.replace('http://', '')
    with io.open(page_path, 'wb') as page_file:
        rendered = potree_page.format(resource=resource, server_url=server_url)
        page_file.write(rendered.encode())
    ok()
项目:lopocs    作者:Oslandia    | 项目源码 | 文件源码
def create_cesium_page(work_dir, tablename, column):
    '''Write a cesium viewer demo page into work_dir.

    The cesium bundle is downloaded and unzipped into work_dir only when
    work_dir/cesium does not exist yet. The page is named after the last
    dot-separated component of tablename and is rendered from the
    cesium_page template with the "tablename.column" resource.
    '''
    cesium_dir = os.path.join(work_dir, 'cesium')
    archive_path = os.path.join(work_dir, 'cesium.zip')
    cesium_present = os.path.exists(cesium_dir)
    if not cesium_present:
        download('Getting cesium code', 'http://3d.oslandia.com/cesium.zip', archive_path)
        # unzipping content
        with ZipFile(archive_path) as archive:
            archive.extractall(path=work_dir)

    # Only the part after the last dot of the table name goes into the file name.
    table_part = tablename.split('.')[-1]
    page_path = os.path.join(work_dir, 'cesium-{}.html'.format(table_part))
    absolute_page_path = str(Path(page_path).absolute())
    pending('Creating a cesium demo page : file://{}'.format(absolute_page_path))

    resource = '{}.{}'.format(tablename, column)
    with io.open(page_path, 'wb') as page_file:
        rendered = cesium_page.format(resource=resource)
        page_file.write(rendered.encode())
    ok()
项目:dnc-theano    作者:khaotik    | 项目源码 | 文件源码
def download(self, local_dir_=None, url_=None):
        '''
        Download the four gzip-compressed dataset files into a local directory.

        The files fetched are the image (idx3) and label (idx1) files of the
        'train' and 't10k' subsets; the target directory is created if needed.

        Args:
            local_dir_: directory to save the files in; self.DEFAULT_DIR when None
            url_: base URL to download from; 'http://yann.lecun.com/exdb/mnist/' when None
        '''
        # TODO check whether file exists
        base_url = 'http://yann.lecun.com/exdb/mnist/' if url_ is None else url_
        target_dir = self.DEFAULT_DIR if local_dir_ is None else Path(local_dir_)
        target_dir.mkdir(parents=True, exist_ok=True)

        name_template = '%(subset)s-%(type_s)s-idx%(ndim)s-ubyte.gz'
        # (kind, number of idx dimensions) pairs: images are idx3, labels are idx1.
        kinds = tuple(zip(('images', 'labels'), (3, 1)))
        for subset in ('train', 't10k'):
            for type_s, ndim in kinds:
                fields = {'subset': subset, 'type_s': type_s, 'ndim': ndim}
                filename = name_template % fields
                urllib.request.urlretrieve(base_url + filename, str(target_dir / filename))
项目:bittyband    作者:yam655    | 项目源码 | 文件源码
def __init__(self, config):
        """Store the configuration and reset all state to its empty defaults.

        Args:
            config: mapping-like configuration; config["instance"]["project_dir"]
                must be present and is stored as a Path in self.project_dir.
        """
        instance_settings = config["instance"] if False else None  # placeholder, see below

        self.config = config
        self.ui = None

        # Loaded rows and the order they are presented in.
        self.data = {}
        self.order = []

        self.player = None

        # Selection / playback range.
        self.start_segment = None
        self.start_line = 0
        self.end_segment = None

        self.song_data = {}
        self.last_note = None
        self.playing_row = None

        # Looked up at this point so earlier attributes exist even if it raises.
        instance_settings = config["instance"]
        self.project_dir = Path(instance_settings["project_dir"])

        self.changed = False
        self.last_bpm_tap = ""
        self.last_nudge = ""
项目:bittyband    作者:yam655    | 项目源码 | 文件源码
def scan(self):
        """Load the metadata and row data for the configured import file.

        The import file name comes from config["instance"]["import-file"]; its
        entry in self.import_lister supplies the metadata path. Row data lives
        next to the metadata file with a ".data" suffix and, when present, is
        read into self.data keyed by each row's float "location". When no rows
        were loaded, START and END marker rows are added instead.
        """
        instance_settings = self.config["instance"]
        self.import_file = instance_settings["import-file"]
        self.bits = self.import_lister.get(self.import_file)

        metadata_path = self.bits["metadata"]
        self.data_file = Path(metadata_path).with_suffix(".data")
        self.metadata = ConfigParser(inline_comment_prefixes=None)
        self.metadata.read(str(metadata_path))

        if self.data_file.exists():
            with self.data_file.open(newline="") as csvfile:
                for row in csv.DictReader(csvfile, dialect=ImportCsvDialect):
                    # Rows are keyed by, and carry, the location as a float.
                    position = float(row["location"])
                    row["location"] = position
                    self.data[position] = row

        if not self.data:
            # Nothing stored yet: bracket the recording with START/END markers.
            self.add_row(0.0, mark="START")
            self.add_row(self.bits["length_secs"], mark="END")

        self.update_order()
        self.clean()
项目:bittyband    作者:yam655    | 项目源码 | 文件源码
def backup(self):
        """Write timestamped copies of the current rows and metadata.

        The timestamp comes from the newer modification time of the data file
        and the metadata file, and is inserted between each file's stem and
        suffix. The copies are written from in-memory state (self.data in
        self.order, and self.metadata), not copied from disk.
        """
        data_mtime = self.data_file.stat().st_mtime
        meta_file = Path(self.bits["metadata"])
        meta_mtime = meta_file.stat().st_mtime
        suffix = filename_iso_time(max(data_mtime, meta_mtime))

        def stamped(path):
            # Same directory, "<stem>-<timestamp><suffix>".
            return path.with_name("{}-{}{}".format(path.stem, suffix, path.suffix))

        backup_data = stamped(self.data_file)
        backup_meta = stamped(meta_file)

        fieldnames = ['location', 'lyric', 'mark', 'track-change', "chord-change", "chord-selection", "note"]
        with backup_data.open("w", newline='') as csvfile:
            # extrasaction='ignore' drops row keys that are not listed columns.
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames, extrasaction='ignore', dialect=ImportCsvDialect)
            writer.writeheader()
            for location in self.order:
                writer.writerow(self.data[location])

        with backup_meta.open('w') as meta:
            self.metadata.write(meta)
项目:bittyband    作者:yam655    | 项目源码 | 文件源码
def load_home_config(args=None):
    """ Load the user-specific configuration.

        The settings are read on top of the application's internal defaults,
        and only on the first call (while `home_dir` is still unset). Any
        `project` and `instance` sections found are emptied, since the user
        configuration may not provide them. (Note: the result is modified
        with project specific settings before it is used.)

        The configuration directory is `args.user_config` when given,
        otherwise `~/.bittyband`; it is created if missing.
    """
    global config
    global home_dir

    if home_dir is not None:
        # Already initialised by an earlier call.
        return config

    requested_dir = None if args is None else args.user_config
    if requested_dir is None:
        home_dir = Path("~/.bittyband").expanduser()
    else:
        home_dir = Path(requested_dir)
    home_dir.mkdir(parents=True, exist_ok=True)

    settings_file = home_dir / "settings.conf"
    if settings_file.exists():
        config.read(str(settings_file))
        for reserved_section in ("project", "instance"):
            if reserved_section in config:
                config[reserved_section].clear()
    return config
项目:bittyband    作者:yam655    | 项目源码 | 文件源码
def get_project_root():
    """ Return the common base-directory for projects, or None.

        This allows you to specify a `--project` on the command-line
        and to reference a project directory off of this root without adding
        a mapping in the `projects` section of the user configuration file.

        The value comes from the `root` entry in the user configuration
        directory. When that entry is a regular file, its stripped text is
        taken as the directory name (with '~' expansion); otherwise the entry
        itself is used. The resolved directory is returned if it exists.

        The expectation is that this will allow a common `~/BittyProjects`
        or `~/Documents/BittyTunes` directory to root your projects.
    """
    global home_dir
    candidate = home_dir / "root"
    if candidate.is_file():
        named_directory = candidate.read_text().strip()
        candidate = Path(named_directory).expanduser()
    return candidate.resolve() if candidate.is_dir() else None
项目:bittyband    作者:yam655    | 项目源码 | 文件源码
def find_project_dir(project):
    """Translate a project name or path into a project directory.

    * None or "." means the current directory.
    * A value without path separators is looked up in the `projects`
      section of the configuration (falling back to the value itself) and
      placed under the project root.
    * Anything else is treated as a path, with '~' expanded.

    A result that does not exist is retried under the project root, and an
    existing directory is returned resolved. The project root is the current
    directory when get_project_root() returns None.
    """
    name = "." if project is None else project

    root = get_project_root()
    if root is None:
        root = Path()

    if name == ".":
        project_dir = Path(".")
    elif "/" in name or "\\" in name:
        # Looks like a path rather than a bare project name.
        project_dir = Path(name).expanduser()
    else:
        mapped = Path(name)
        if "projects" in config and name in config["projects"]:
            mapped = Path(config["projects"].get(name))
        project_dir = root.joinpath(mapped.expanduser())

    if not project_dir.exists():
        project_dir = root.joinpath(project_dir)

    return project_dir.resolve() if project_dir.is_dir() else project_dir
项目:pygrunt    作者:elementbound    | 项目源码 | 文件源码
def __init__(self, cwd=os.curdir):
        """Start with no entries and remember cwd as the working directory.

        Args:
            cwd: working directory, stored as a Path; defaults to os.curdir.
        """
        self._data = list()
        working_directory = Path(cwd)
        self.working_directory = working_directory
项目:pygrunt    作者:elementbound    | 项目源码 | 文件源码
def _process_str_paths(self, paths):
        """Return a list with every entry of paths converted to a resolved Path."""
        # TODO: Dies if a file doesn't exist
        def as_resolved(entry):
            return Path(entry).resolve()

        return list(map(as_resolved, paths))
项目:pygrunt    作者:elementbound    | 项目源码 | 文件源码
def add(self, *args):
        """Add the files matched by each pattern and return how many were new.

        Each pattern is expanded with self._glob and converted with
        self._process_str_paths. When that raises NotImplementedError, the
        pattern is treated as a literal path and used only if it exists.
        Entries already present in self._data are not added again; a pattern
        that yields nothing is reported on stdout.
        """
        added = 0

        for pattern in args:
            try:
                matches = self._process_str_paths(self._glob(pattern))
            except NotImplementedError:
                # self._glob says "Non-relative patterns are unsupported"
                # Check if it exists and add it
                literal = Path(pattern)
                matches = [literal] if literal.exists() else None

            if not matches:
                # TODO: Proper handling of this?
                print('Include pattern did not match:', pattern)
                continue

            for match in matches:
                if match in self._data:
                    continue
                self._data.append(match)
                added += 1

        return added
项目:pygrunt    作者:elementbound    | 项目源码 | 文件源码
def _get_gitinfo():
    """Return (branch, commit) of the git checkout containing this file.

    Both values are read with `git rev-parse`, run in this file's directory.
    ('master', '') is returned when git is not found or either call fails.
    """
    import pygrunt.platform as platform
    import subprocess
    from pathlib import Path

    fallback = ('master', '')

    git = platform.current.find_executable('git')
    if git is None:
        # No git installed; assume we're on master
        return fallback

    repo_dir = str(Path(__file__).parent)

    # First query yields the branch name, second one the commit hash.
    queries = (['--abbrev-ref', 'HEAD'], ['HEAD'])
    answers = []
    for query in queries:
        outcome = subprocess.run([git, 'rev-parse'] + query, stdout=subprocess.PIPE,
                                 stderr=subprocess.DEVNULL, cwd=repo_dir, universal_newlines=True)
        if outcome.returncode != 0:
            # Quietly return defaults on fail
            return fallback
        answers.append(outcome.stdout.strip())

    branch, commit = answers
    return (branch, commit)
项目:pygrunt    作者:elementbound    | 项目源码 | 文件源码
def sanitize(self):
        """Fill in defaults and normalise the project's paths and output type.

        Missing working/output directories and executable name get defaults,
        an unknown output type is reported and replaced by 'executable', the
        executable name is converted by the platform helper matching the
        type, both directories are made real paths, and the hooks are
        initialised.
        """
        import os.path

        if not self.working_dir:
            self.working_dir = os.path.curdir

        if not self.output_dir:
            self.output_dir = os.path.join(self.working_dir, 'build', '')

        # Output type -> name of the platform helper that decorates the file name.
        naming_helpers = {
            'executable': 'as_executable',
            'library': 'as_static_library',
            'shared': 'as_shared_library',
        }
        allowed_types = list(naming_helpers)
        if self.type not in allowed_types:
            # TODO: exceptions?
            Style.error('Invalid output type:', self.type)
            Style.error('Allowed types:', allowed_types)
            self.type = allowed_types[0]
            Style.error('Reverting to', self.type)

        if not self.executable:
            self.executable = os.path.join(self.output_dir, self.name)

        decorate = getattr(platform.current, naming_helpers[self.type])
        self.executable = decorate(self.executable)

        self.working_dir = os.path.realpath(self.working_dir)
        self.output_dir = os.path.realpath(self.output_dir)

        self.sources.working_directory = Path(self.working_dir)

        self._init_hooks()

    # Stage hooks

    # Create an empty list for each stage
项目:pygrunt    作者:elementbound    | 项目源码 | 文件源码
def compile_object(self, in_file, out_file):
        """Compile in_file into out_file unless the object file is up to date.

        Compilation is skipped when out_file already exists and
        self.recompile.should_recompile(in_file) says no. Otherwise the
        directory of out_file is created, the compiler flags are appended to
        self._args and that command line is run; captured stdout and stderr
        are printed.

        Args:
            in_file: source file (str or Path).
            out_file: object file to produce (str or Path).

        Returns:
            True when compilation was skipped or the command exited with 0,
            False otherwise.
        """
        import subprocess

        in_file = Path(in_file)
        out_file = Path(out_file)

        # Skip compile if RecompileStrategy says so
        # Since preprocess_source ( possibly used by recompile ) also modifies self._args,
        # we gotta back it up. The backup must be a copy: keeping only a second
        # reference to the same list would not undo an in-place modification.
        # TODO: Maybe use something more elegant than self._args?
        args_list = self._args
        args_snapshot = list(args_list)
        up_to_date = out_file.is_file() and not self.recompile.should_recompile(str(in_file))
        # Restore both the contents and the identity of the original list.
        args_list[:] = args_snapshot
        self._args = args_list

        if up_to_date:
            # Style.info('Nothing to do with', in_file)
            return True

        out_file.parent.mkdir(parents=True, exist_ok=True)
        self._args.extend(self._build_compiler_flags())
        result = subprocess.run(self._args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)

        # TODO: do something useful with output
        if result.stdout:
            print(result.stdout)

        if result.stderr:
            print(result.stderr)

        return result.returncode == 0
项目:pygrunt    作者:elementbound    | 项目源码 | 文件源码
def link_executable(self, in_files, out_file):
        """Run the linker command line and report whether it succeeded.

        The directory of out_file is created first, then the linker flags are
        appended to self._args and that command line is executed. in_files is
        not used by this method itself.

        Returns:
            True when the command exited with status 0, False otherwise.
        """
        from subprocess import run

        output_dir = Path(out_file).parent
        output_dir.mkdir(parents=True, exist_ok=True)

        linker_flags = self._build_linker_flags()
        self._args.extend(linker_flags)

        exit_status = run(self._args).returncode
        return exit_status == 0