Python pkg_resources 模块,resource_filename() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用pkg_resources.resource_filename()

项目:gimel    作者:Alephbet    | 项目源码 | 文件源码
def prepare_zip():
    """Build (or refresh) gimel.zip with the config, package modules,
    and vendored dependencies."""
    from pkg_resources import resource_filename as resource
    from config import config
    from json import dumps
    logger.info('creating/updating gimel.zip')
    with ZipFile('gimel.zip', 'w', ZIP_DEFLATED) as zipf:
        # Store the runtime config as config.json with rw-rw-r-- perms.
        config_entry = ZipInfo('config.json')
        config_entry.external_attr = 0o664 << 16
        zipf.writestr(config_entry, dumps(config))
        # Bundle the package's own modules at the archive root.
        for module in ('config.py', 'gimel.py', 'logger.py'):
            zipf.write(resource('gimel', module), module)
        # Recursively add vendored dependencies, keeping archive paths
        # relative to the package root.
        package_root = resource('gimel', '')
        for root, dirs, files in os.walk(resource('gimel', 'vendor')):
            for name in files:
                absolute = os.path.join(root, name)
                zipf.write(absolute, os.path.relpath(absolute, package_root))
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def test_pydist():
    """Make sure pydist.json exists and validates against our schema."""
    # XXX this test may need manual cleanup of older wheels

    import jsonschema

    def open_json(filename):
        # Use a context manager so the handle is closed deterministically
        # (the bare open(...).read() form leaked it until GC).
        with open(filename, 'rb') as json_file:
            return json.loads(json_file.read().decode('utf-8'))

    pymeta_schema = open_json(resource_filename('wheel.test',
                                                'pydist-schema.json'))
    valid = 0
    for dist in ("simple.dist", "complex-dist"):
        basedir = pkg_resources.resource_filename('wheel.test', dist)
        for (dirname, subdirs, filenames) in os.walk(basedir):
            for filename in filenames:
                if filename.endswith('.whl'):
                    # Wheels are zip archives; metadata.json lives inside
                    # each one's .dist-info directory.
                    whl = ZipFile(os.path.join(dirname, filename))
                    for entry in whl.infolist():
                        if entry.filename.endswith('/metadata.json'):
                            pymeta = json.loads(whl.read(entry).decode('utf-8'))
                            jsonschema.validate(pymeta, pymeta_schema)
                            valid += 1
    assert valid > 0, "No metadata.json found"
项目:spyking-circus    作者:spyking-circus    | 项目源码 | 文件源码
def create_params_file(self, fname):
        """Offer (via a Qt dialog) to create parameter file ``fname``.

        If the user confirms, copy either their own template
        (~/spyking-circus/config.params) or the packaged default
        (circus/config.params) to ``fname`` and refresh GUI state.
        """
        msg = QMessageBox()
        msg.setIcon(QMessageBox.Question)
        msg.setText("Parameter file %r not found, do you want SpyKING CIRCUS to "
                    "create it for you?" % fname)
        msg.setWindowTitle("Generate parameter file?")
        msg.setInformativeText("This will create a parameter file from a "
                               "template file and open it in your system's "
                               "standard text editor. Fill properly before "
                               "launching the code. See the documentation "
                               "for details")
        msg.setStandardButtons(QMessageBox.Yes | QMessageBox.No)
        answer = msg.exec_()
        if answer == QMessageBox.Yes:
            user_path = os.path.join(os.path.expanduser('~'), 'spyking-circus')
            # BUG FIX: the original concatenated user_path + 'config.params'
            # without a path separator ('~/spyking-circusconfig.params'),
            # so the user's own template was never found.
            user_config = os.path.join(user_path, 'config.params')
            if os.path.exists(user_config):
                config_file = os.path.abspath(user_config)
            else:
                config_file = os.path.abspath(
                    pkg_resources.resource_filename('circus', 'config.params'))
            shutil.copyfile(config_file, fname)
            self.params = fname
            self.last_log_file = fname.replace('.params', '.log')
            self.update_params()
项目:spyking-circus    作者:spyking-circus    | 项目源码 | 文件源码
def init_gui_layout(self):
        """Load the merging GUI layout (qt_GUI/qt_merge.ui) and cache
        references to its embedded widgets.

        Only rank 0 builds and shows the UI (comm presumably is an MPI
        communicator — TODO confirm); other ranks set ``self.ui`` to
        None so they can skip GUI work.
        """
        gui_fname = pkg_resources.resource_filename('circus',
                                                    os.path.join('qt_GUI',
                                                                 'qt_merge.ui'))
        if comm.rank == 0:
            self.ui = uic.loadUi(gui_fname, self)
            # print dir(self.ui)
            # Keep direct handles to the matplotlib axes embedded in the
            # .ui widgets, plus the initial sorting-combo selection.
            self.score_ax1 = self.ui.score_1.axes
            self.score_ax2 = self.ui.score_2.axes
            self.score_ax3 = self.ui.score_3.axes
            self.waveforms_ax  = self.ui.waveforms.axes
            self.detail_ax     = self.ui.detail.axes
            self.data_ax       = self.ui.data_overview.axes
            self.current_order = self.ui.cmb_sorting.currentIndex()
            # Toolbar is created only to enable pan mode; it is never shown.
            self.mpl_toolbar = NavigationToolbar(self.ui.waveforms, None)
            self.mpl_toolbar.pan()
            self.ui.show()
        else:
            self.ui = None
项目:spyking-circus    作者:spyking-circus    | 项目源码 | 文件源码
def init_gui_layout(self):
        """Load the preview GUI layout (qt_GUI/qt_preview.ui), wire the
        time-navigation buttons, and show the window."""
        gui_fname = pkg_resources.resource_filename('circus',
                                                    os.path.join('qt_GUI',
                                                                 'qt_preview.ui'))
        self.ui = uic.loadUi(gui_fname, self)
        # Matplotlib axes embedded in the .ui widgets.
        self.electrode_ax = self.ui.electrodes.axes
        self.data_x = self.ui.raw_data.axes
        if self.show_fit:
            # Stepping through time is only meaningful when showing fits.
            self.ui.btn_next.clicked.connect(self.increase_time)
            self.ui.btn_prev.clicked.connect(self.decrease_time)
        else:
            self.ui.btn_next.setVisible(False)
            self.ui.btn_prev.setVisible(False)
        # Toolbar will not be displayed
        self.mpl_toolbar = NavigationToolbar(self.ui.raw_data, None)
        self.mpl_toolbar.pan()
        self.ui.show()
项目:eClaire    作者:kogan    | 项目源码 | 文件源码
def generate_pdf(card):
    """
    Make a PDF from a card

    :param card: dict from fetcher.py
    :return: Binary PDF buffer
    """
    import tempfile
    from eclaire.base import SPECIAL_LABELS

    # 62x140mm landscape label, small margins, no automatic page breaks.
    pdf = FPDF('L', 'mm', (62, 140))
    pdf.set_margins(2.8, 2.8, 2.8)
    pdf.set_auto_page_break(False, margin=0)

    pdf.add_page()

    font = pkg_resources.resource_filename('eclaire', 'font/Clairifont.ttf')
    pdf.add_font('Clairifont', fname=font, uni=True)
    pdf.set_font('Clairifont', size=48)

    pdf.multi_cell(0, 18, txt=card.name.upper(), align='L')

    qrcode = generate_qr_code(card.url)
    # BUG FIX: mktemp() is deprecated and racy (another process can claim
    # the name before we use it); mkstemp() creates the file atomically.
    qr_fd, qrcode_file = tempfile.mkstemp(suffix='.png', prefix='trello_qr_')
    os.close(qr_fd)
    try:
        qrcode.save(qrcode_file)
        pdf.image(qrcode_file, 118, 35, 20, 20)
    finally:
        # Remove the temp image even if saving/rendering fails.
        os.unlink(qrcode_file)

    # May we never speak of this again.
    pdf.set_fill_color(255, 255, 255)
    pdf.rect(0, 55, 140, 20, 'F')

    pdf.set_font('Clairifont', '', 16)
    pdf.set_y(-4)
    labels = ', '.join([label.name for label in card.labels
                        if label.name not in SPECIAL_LABELS])
    pdf.multi_cell(0, 0, labels, 0, 'R')

    return pdf.output(dest='S')
项目:pynufft    作者:jyhmiinlin    | 项目源码 | 文件源码
def test_installation():
    '''
    Test the installation
    '''
    import pkg_resources
    import os.path
    PYNUFFT_PATH = pkg_resources.resource_filename('pynufft', './')
    DATA_PATH = pkg_resources.resource_filename('pynufft', 'src/data/')

    # (label, path) pairs: report whether each installed file is present.
    checks = (
        ('Does pynufft.py exist? ', PYNUFFT_PATH + 'pynufft.py'),
        ('Does om1D.npz exist?', DATA_PATH + 'om1D.npz'),
        ('Does om2D.npz exist?', DATA_PATH + 'om2D.npz'),
        ('Does om3D.npz exist?', DATA_PATH + 'om3D.npz'),
        ('Does phantom_3D_128_128_128.npz exist?',
         DATA_PATH + 'phantom_3D_128_128_128.npz'),
        ('Does phantom_256_256.npz exist?',
         DATA_PATH + 'phantom_256_256.npz'),
        ('Does example_1D.py exist?', PYNUFFT_PATH + './tests/example_1D.py'),
        ('Does example_2D.py exist?', PYNUFFT_PATH + './tests/example_2D.py'),
    )
    for label, path in checks:
        print(label, os.path.isfile(path))

    # Probe optional GPU backends; stop at the first one that errors out.
    for pkgname in ('reikna', 'pyopencl', 'pycuda'):
        if test_pkg(pkgname) == 1:
            break
项目:pynufft    作者:jyhmiinlin    | 项目源码 | 文件源码
def test_installation():
    '''
    Test the installation
    '''
    import pkg_resources
    PYNUFFT_PATH = pkg_resources.resource_filename('pynufft', './')
    DATA_PATH = pkg_resources.resource_filename('pynufft', 'src/data/')
    import os.path


    print('Does pynufft.py exist? ',os.path.isfile(PYNUFFT_PATH+'pynufft.py'))
    print('Does om1D.npz exist?',os.path.isfile(DATA_PATH+'om1D.npz'))
    print('Does om2D.npz exist?',os.path.isfile(DATA_PATH+'om2D.npz'))
    print('Does om3D.npz exist?',os.path.isfile(DATA_PATH+'om3D.npz'))
    print('Does phantom_3D_128_128_128.npz exist?', os.path.isfile(DATA_PATH+'phantom_3D_128_128_128.npz'))
    print('Does phantom_256_256.npz exist?', os.path.isfile(DATA_PATH+'phantom_256_256.npz'))
    print('Does 1D_example.py exist?', os.path.isfile(PYNUFFT_PATH+'example/1D_example.py'))
    # BUG FIX: this line previously re-checked 1D_example.py (copy-paste
    # error), so a missing 2D_example.py was never detected.
    print('Does 2D_example.py exist?', os.path.isfile(PYNUFFT_PATH+'example/2D_example.py'))
项目:functest    作者:opnfv    | 项目源码 | 文件源码
def _transfer_ping_script(self, ssh):
        """
        Transfer vping script to VM.

        Uses SCP to copy the ping script via the SSH client
        :param ssh: the SSH client
        :return: True on success, False if the SCP transfer failed
        """
        self.logger.info("Trying to transfer ping.sh")
        scp = SCPClient(ssh.get_transport())
        ping_script = pkg_resources.resource_filename(
            'functest.opnfv_tests.openstack.vping', 'ping.sh')
        try:
            scp.put(ping_script, "~/")
        except Exception:  # pylint: disable=broad-except
            self.logger.error("Cannot SCP the file '%s'", ping_script)
            return False

        # Make the uploaded script executable on the VM.
        cmd = 'chmod 755 ~/ping.sh'
        # pylint: disable=unused-variable
        (stdin, stdout, stderr) = ssh.exec_command(cmd)
        # BUG FIX: `print line` is Python 2-only syntax (a SyntaxError on
        # Python 3); the call form behaves identically on both.
        for line in stdout.readlines():
            print(line)

        return True
项目:functest    作者:opnfv    | 项目源码 | 文件源码
def __init__(self):
        """Initialize helper object.

        Resolves packaged refstack resources (tempest conf and defcore
        test list) and builds an argparse parser defaulting to them.
        """
        # Base directory of the installed functest tests.
        self.functest_test = pkg_resources.resource_filename(
            'functest', 'opnfv_tests')
        self.conf_path = pkg_resources.resource_filename(
            'functest',
            'opnfv_tests/openstack/refstack_client/refstack_tempest.conf')
        self.defcore_list = pkg_resources.resource_filename(
            'functest', 'opnfv_tests/openstack/refstack_client/defcore.txt')
        # NOTE(review): resource_filename returns absolute paths, and
        # os.path.join with an absolute second argument discards the
        # first — so confpath/defcorelist simply equal conf_path/
        # defcore_list. Presumably harmless; confirm before simplifying.
        self.confpath = os.path.join(self.functest_test,
                                     self.conf_path)
        self.defcorelist = os.path.join(self.functest_test,
                                        self.defcore_list)
        self.parser = argparse.ArgumentParser()
        self.parser.add_argument(
            '-c', '--config',
            help='the file path of refstack_tempest.conf',
            default=self.confpath)
        self.parser.add_argument(
            '-t', '--testlist',
            help='Specify the file path or URL of a test list text file. '
                 'This test list will contain specific test cases that '
                 'should be tested.',
            default=self.defcorelist)
项目:bob.bio.base    作者:bioidiap    | 项目源码 | 文件源码
def test_resources():
  """Check the three supported ways of loading an algorithm resource."""
  # loading by registered resource name
  algorithm = bob.bio.base.load_resource("pca", "algorithm")
  assert isinstance (algorithm, bob.bio.base.algorithm.PCA)

  # loading by the path of a configuration file
  config_path = pkg_resources.resource_filename("bob.bio.base.config.algorithm", "pca.py")
  algorithm = bob.bio.base.load_resource(config_path, "algorithm")
  assert isinstance (algorithm, bob.bio.base.algorithm.PCA)

  # loading by an instantiation string, evaluated with extra imports
  algorithm = bob.bio.base.load_resource("bob.bio.base.algorithm.PCA(10, distance_function=scipy.spatial.distance.euclidean)", "algorithm", imports=['bob.bio.base', 'scipy.spatial'])
  assert isinstance (algorithm, bob.bio.base.algorithm.PCA)

  # the list of extensions must include this package
  extensions = bob.bio.base.extensions()
  assert isinstance(extensions, list)
  assert 'bob.bio.base' in extensions
项目:swjtu-pyscraper    作者:Desgard    | 项目源码 | 文件源码
def test_pydist():
    """Make sure pydist.json exists and validates against our schema."""
    # XXX this test may need manual cleanup of older wheels

    import jsonschema

    def open_json(path):
        with open(path, 'rb') as fh:
            return json.loads(fh.read().decode('utf-8'))

    schema = open_json(resource_filename('wheel.test', 'pydist-schema.json'))
    checked = 0
    for dist in ("simple.dist", "complex-dist"):
        root = pkg_resources.resource_filename('wheel.test', dist)
        for dirname, _, filenames in os.walk(root):
            wheels = [name for name in filenames if name.endswith('.whl')]
            for wheel_name in wheels:
                # Each wheel is a zip archive holding a metadata.json
                # inside its .dist-info directory.
                archive = ZipFile(os.path.join(dirname, wheel_name))
                entries = [e for e in archive.infolist()
                           if e.filename.endswith('/metadata.json')]
                for entry in entries:
                    metadata = json.loads(archive.read(entry).decode('utf-8'))
                    jsonschema.validate(metadata, schema)
                    checked += 1
    assert checked > 0, "No metadata.json found"
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_pydist():
    """Make sure pydist.json exists and validates against our schema."""
    # XXX this test may need manual cleanup of older wheels

    import jsonschema

    def open_json(filename):
        # Use a context manager so the handle is closed deterministically
        # (the bare open(...).read() form leaked it until GC).
        with open(filename, 'rb') as json_file:
            return json.loads(json_file.read().decode('utf-8'))

    pymeta_schema = open_json(resource_filename('wheel.test',
                                                'pydist-schema.json'))
    valid = 0
    for dist in ("simple.dist", "complex-dist"):
        basedir = pkg_resources.resource_filename('wheel.test', dist)
        for (dirname, subdirs, filenames) in os.walk(basedir):
            for filename in filenames:
                if filename.endswith('.whl'):
                    # Wheels are zip archives; metadata.json lives inside
                    # each one's .dist-info directory.
                    whl = ZipFile(os.path.join(dirname, filename))
                    for entry in whl.infolist():
                        if entry.filename.endswith('/metadata.json'):
                            pymeta = json.loads(whl.read(entry).decode('utf-8'))
                            jsonschema.validate(pymeta, pymeta_schema)
                            valid += 1
    assert valid > 0, "No metadata.json found"
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_pydist():
    """Make sure pydist.json exists and validates against our schema."""
    # XXX this test may need manual cleanup of older wheels

    import jsonschema

    def open_json(filename):
        # Use a context manager so the handle is closed deterministically
        # (the bare open(...).read() form leaked it until GC).
        with open(filename, 'rb') as json_file:
            return json.loads(json_file.read().decode('utf-8'))

    pymeta_schema = open_json(resource_filename('wheel.test',
                                                'pydist-schema.json'))
    valid = 0
    for dist in ("simple.dist", "complex-dist"):
        basedir = pkg_resources.resource_filename('wheel.test', dist)
        for (dirname, subdirs, filenames) in os.walk(basedir):
            for filename in filenames:
                if filename.endswith('.whl'):
                    # Wheels are zip archives; metadata.json lives inside
                    # each one's .dist-info directory.
                    whl = ZipFile(os.path.join(dirname, filename))
                    for entry in whl.infolist():
                        if entry.filename.endswith('/metadata.json'):
                            pymeta = json.loads(whl.read(entry).decode('utf-8'))
                            jsonschema.validate(pymeta, pymeta_schema)
                            valid += 1
    assert valid > 0, "No metadata.json found"
项目:jira_worklog_scanner    作者:pgarneau    | 项目源码 | 文件源码
def test_pydist():
    """Make sure pydist.json exists and validates against our schema."""
    # XXX this test may need manual cleanup of older wheels

    import jsonschema

    def open_json(filename):
        # Use a context manager so the handle is closed deterministically
        # (the bare open(...).read() form leaked it until GC).
        with open(filename, 'rb') as json_file:
            return json.loads(json_file.read().decode('utf-8'))

    pymeta_schema = open_json(resource_filename('wheel.test',
                                                'pydist-schema.json'))
    valid = 0
    for dist in ("simple.dist", "complex-dist"):
        basedir = pkg_resources.resource_filename('wheel.test', dist)
        for (dirname, subdirs, filenames) in os.walk(basedir):
            for filename in filenames:
                if filename.endswith('.whl'):
                    # Wheels are zip archives; metadata.json lives inside
                    # each one's .dist-info directory.
                    whl = ZipFile(os.path.join(dirname, filename))
                    for entry in whl.infolist():
                        if entry.filename.endswith('/metadata.json'):
                            pymeta = json.loads(whl.read(entry).decode('utf-8'))
                            jsonschema.validate(pymeta, pymeta_schema)
                            valid += 1
    assert valid > 0, "No metadata.json found"
项目:search_google    作者:rrwen    | 项目源码 | 文件源码
def setUp(self):
    """Build a customsearch client from the packaged config.json, run a
    single one-result image query, and prepare scratch output paths."""
    file_path = resource_filename(Requirement.parse('search_google'), 'search_google/config.json')
    with open(file_path, 'r') as in_file:
      defaults = json.load(in_file)
    # API-client construction args; cseargs request one PNG result.
    buildargs = {
      'serviceName': 'customsearch',
      'version': 'v1',
      'developerKey': defaults['build_developerKey']
    }
    cseargs = {
      'q': 'google',
      'num': 1,
      'fileType': 'png',
      'cx': defaults['cx']
    }
    self.results = search_google.api.results(buildargs, cseargs)
    # NOTE(review): taking .name from a closed TemporaryFile (and from a
    # TemporaryDirectory that is immediately garbage-collected) yields
    # paths that no longer exist on disk — presumably intentional, to get
    # fresh unused paths for the tests; confirm. Fragile on Windows.
    tempfile = TemporaryFile()
    self.tempfile = str(tempfile.name)
    tempfile.close()
    self.tempdir = str(TemporaryDirectory().name)
项目:Tinychat-Bot--Discontinued    作者:Tinychat    | 项目源码 | 文件源码
def register_adapters():
    """Discover and register all adapter modules shipped in pyamf/adapters.

    Adapter modules are named with a leading underscore (e.g.
    ``_django_db_models_base.py``); the name maps back to the dotted
    import path it adapts (``django.db.models.base``).
    """
    global adapters_registered

    # Idempotent: only scan the filesystem once per process.
    if adapters_registered is True:
        return

    try:
        import pkg_resources
        packageDir = pkg_resources.resource_filename('pyamf', 'adapters')
    except Exception:
        # BUG FIX: was a bare `except:`, which also swallowed SystemExit
        # and KeyboardInterrupt. Fall back to this package's directory.
        packageDir = os.path.dirname(__file__)

    for f in glob.glob(os.path.join(packageDir, '*.py')):
        mod = os.path.basename(f).split(os.path.extsep, 1)[0]

        # Skip __init__ and anything without the '_' adapter prefix.
        if mod == '__init__' or not mod.startswith('_'):
            continue

        try:
            register_adapter(mod[1:].replace('_', '.'), PackageImporter(mod))
        except ImportError:
            # Adapter target isn't installed; best-effort, skip it.
            pass

    adapters_registered = True
项目:crema    作者:bmcfee    | 项目源码 | 文件源码
def _instantiate(self, rsc):

        def rsc_path(name):
            # Resolve a file shipped inside this package's `rsc` folder.
            return resource_filename(__name__, os.path.join(rsc, name))

        # Load the feature-extraction pump
        with open(rsc_path('pump.pkl'), 'rb') as fd:
            self.pump = pickle.load(fd)

        # Rebuild the model from its pickled config spec
        with open(rsc_path('model_spec.pkl'), 'rb') as fd:
            spec = pickle.load(fd)
            self.model = keras.models.model_from_config(spec)

        # Restore the trained weights
        self.model.load_weights(rsc_path('model.h5'))

        # Record the resource version string
        with open(rsc_path('version.txt'), 'r') as fd:
            self.version = fd.read().strip()
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def test_pydist():
    """Make sure pydist.json exists and validates against our schema."""
    # XXX this test may need manual cleanup of older wheels

    import jsonschema

    def open_json(filename):
        # Use a context manager so the handle is closed deterministically
        # (the bare open(...).read() form leaked it until GC).
        with open(filename, 'rb') as json_file:
            return json.loads(json_file.read().decode('utf-8'))

    pymeta_schema = open_json(resource_filename('wheel.test',
                                                'pydist-schema.json'))
    valid = 0
    for dist in ("simple.dist", "complex-dist"):
        basedir = pkg_resources.resource_filename('wheel.test', dist)
        for (dirname, subdirs, filenames) in os.walk(basedir):
            for filename in filenames:
                if filename.endswith('.whl'):
                    # Wheels are zip archives; metadata.json lives inside
                    # each one's .dist-info directory.
                    whl = ZipFile(os.path.join(dirname, filename))
                    for entry in whl.infolist():
                        if entry.filename.endswith('/metadata.json'):
                            pymeta = json.loads(whl.read(entry).decode('utf-8'))
                            jsonschema.validate(pymeta, pymeta_schema)
                            valid += 1
    assert valid > 0, "No metadata.json found"
项目:donatemates    作者:donatemates    | 项目源码 | 文件源码
def populate_table(table, data_file, print_msg):
    """Method to populate a table with records stored in a dict loaded from a json file

    Args:
        table(api.aws.DynamoTable): the table to write to
        data_file(str): name of the JSON file, located in this package's
            "manage/data" resource directory. Its contents should be an
            array in an object named 'data'
        print_msg(str): message printed once for every record written

    Returns:
        None
    """
    with open(os.path.join(resource_filename("manage", "data"), data_file), 'rt') as df:
        data = json.load(df)
        if len(data["data"]) == 1:
            # Assume this is a campaign file for now.
            print(" - Example campaign loaded: https://<your_stack>/campaign.html?id={}".format(data["data"][0]["campaign_id"]))
        for item in data["data"]:
            table.put_item(item)
            print(print_msg)
项目:donatemates    作者:donatemates    | 项目源码 | 文件源码
def get_code(self):
        """Zip up the code and return bytes

        Returns:
            bytes: contents of a zip archive containing url_rewrite.js
        """
        # BUG FIX: the old NamedTemporaryFile().name trick was racy (the
        # object is deleted, then the path is reopened by name) and broken
        # on Windows; mkstemp creates the file atomically.
        fd, zip_path = tempfile.mkstemp(suffix='.zip')
        os.close(fd)
        old_path = os.getcwd()
        try:
            zfh = zipfile.ZipFile(zip_path, mode='w')
            # chdir into the configs resource dir so the archive entry
            # gets a relative name.
            os.chdir(os.path.join(resource_filename("manage", "configs")))
            zfh.write("url_rewrite.js")
            zfh.close()
        finally:
            # Always restore the working directory, even on failure.
            os.chdir(old_path)
        try:
            with open(zip_path, "rb") as zfr:
                return zfr.read()
        finally:
            # Clean up the temp archive (previously leaked).
            os.unlink(zip_path)
项目:retrieve-and-rank-tuning    作者:rchaks    | 项目源码 | 文件源码
def main():
    """Upload the InsuranceLibV2 corpus into a Watson Discovery
    collection, printing collection stats before and after."""
    insurance_lib_data_dir = resource_filename('resources', 'insurance_lib_v2')
    print('Using data from {}'.format(insurance_lib_data_dir))

    # Either re-use an existing collection id by over riding the below, or leave as is to create one
    collection_id = "TestCollection-InsLibV2"

    discovery = DiscoveryProxy()

    # setup_collection returns the (possibly newly created) collection id.
    collection_id = discovery.setup_collection(collection_id=collection_id,
                                               config_id="889a08c9-cad9-4287-a87d-2f0380363bff")
    discovery.print_collection_stats(collection_id)

    # This thing seems to misbehave when run from python notebooks due to its use of multiprocessing, so just
    # running in a script
    discovery.upload_documents(collection_id=collection_id,
                               corpus=document_corpus_as_iterable(
                                   path.join(insurance_lib_data_dir, 'document_corpus.solr.xml')))

    discovery.print_collection_stats(collection_id)
项目:Sci-Finder    作者:snverse    | 项目源码 | 文件源码
def test_pydist():
    """Make sure pydist.json exists and validates against our schema."""
    # XXX this test may need manual cleanup of older wheels

    import jsonschema

    def open_json(filename):
        # Use a context manager so the handle is closed deterministically
        # (the bare open(...).read() form leaked it until GC).
        with open(filename, 'rb') as json_file:
            return json.loads(json_file.read().decode('utf-8'))

    pymeta_schema = open_json(resource_filename('wheel.test',
                                                'pydist-schema.json'))
    valid = 0
    for dist in ("simple.dist", "complex-dist"):
        basedir = pkg_resources.resource_filename('wheel.test', dist)
        for (dirname, subdirs, filenames) in os.walk(basedir):
            for filename in filenames:
                if filename.endswith('.whl'):
                    # Wheels are zip archives; metadata.json lives inside
                    # each one's .dist-info directory.
                    whl = ZipFile(os.path.join(dirname, filename))
                    for entry in whl.infolist():
                        if entry.filename.endswith('/metadata.json'):
                            pymeta = json.loads(whl.read(entry).decode('utf-8'))
                            jsonschema.validate(pymeta, pymeta_schema)
                            valid += 1
    assert valid > 0, "No metadata.json found"
项目:Sci-Finder    作者:snverse    | 项目源码 | 文件源码
def test_pydist():
    """Make sure pydist.json exists and validates against our schema."""
    # XXX this test may need manual cleanup of older wheels

    import jsonschema

    def open_json(filename):
        # Use a context manager so the handle is closed deterministically
        # (the bare open(...).read() form leaked it until GC).
        with open(filename, 'rb') as json_file:
            return json.loads(json_file.read().decode('utf-8'))

    pymeta_schema = open_json(resource_filename('wheel.test',
                                                'pydist-schema.json'))
    valid = 0
    for dist in ("simple.dist", "complex-dist"):
        basedir = pkg_resources.resource_filename('wheel.test', dist)
        for (dirname, subdirs, filenames) in os.walk(basedir):
            for filename in filenames:
                if filename.endswith('.whl'):
                    # Wheels are zip archives; metadata.json lives inside
                    # each one's .dist-info directory.
                    whl = ZipFile(os.path.join(dirname, filename))
                    for entry in whl.infolist():
                        if entry.filename.endswith('/metadata.json'):
                            pymeta = json.loads(whl.read(entry).decode('utf-8'))
                            jsonschema.validate(pymeta, pymeta_schema)
                            valid += 1
    assert valid > 0, "No metadata.json found"
项目:eea.corpus    作者:eea    | 项目源码 | 文件源码
def test_cached_phrases_no_files(self,
                                     corpus_base_path,
                                     doc_content_stream):
        """With no cached phrase files for this id, the stream is empty."""
        from eea.corpus.processing.phrases.process import cached_phrases
        from pkg_resources import resource_filename

        # Point the (mocked) corpus storage at the test fixtures dir.
        base_path = resource_filename('eea.corpus', 'tests/fixtures/')
        corpus_base_path.return_value = base_path

        # 'X' deliberately does NOT match the cached B.phras.* fixtures.
        env = {'phash_id': 'X', 'file_name': 'ignore'}
        settings = {}

        # No cache hit -> the generator yields nothing at all.
        stream = cached_phrases(doc_content_stream, env, settings)
        with pytest.raises(StopIteration):
            next(stream)
项目:eea.corpus    作者:eea    | 项目源码 | 文件源码
def test_cached_phrases_cached_files(self,
                                         corpus_base_path,
                                         doc_content_stream):
        """Cached B.phras.* fixture files get applied to the doc stream."""

        # TODO: this test should be improved. Text quality should be tested
        from eea.corpus.processing.phrases.process import cached_phrases
        from pkg_resources import resource_filename

        # Point the (mocked) corpus storage at the test fixtures dir.
        base_path = resource_filename('eea.corpus', 'tests/fixtures/')
        corpus_base_path.return_value = base_path

        # we want the B.phras.* files in fixtures
        env = {'phash_id': 'B', 'file_name': 'ignore'}
        settings = {}

        stream = cached_phrases(doc_content_stream, env, settings)
        doc = next(stream)
        # Phrase models join detected collocations with underscores.
        assert 'water_stress_conditions' in doc.text
        assert 'positive_development' in doc.text
项目:eea.corpus    作者:eea    | 项目源码 | 文件源码
def test_preview_phrases_with_cache_files(self, corpus_base_path):
        from eea.corpus.processing.phrases.process import preview_phrases
        from pkg_resources import resource_filename

        # Point the (mocked) corpus storage at the fixtures dir, where
        # cached B.phras.* files already exist.
        corpus_base_path.return_value = resource_filename(
            'eea.corpus', 'tests/fixtures/')

        env = {
            'phash_id': 'B',
            'file_name': 'x.csv',
            'text_column': 'text',
        }

        # With a cache hit, preview yields nothing.
        assert list(preview_phrases(['hello', 'world'], env, {})) == []
项目:eea.corpus    作者:eea    | 项目源码 | 文件源码
def test_preview_phrases_nocache_files_with_job(self,
                                                    corpus_base_path,
                                                    get_assigned_job):
        from eea.corpus.processing.phrases.process import preview_phrases
        from pkg_resources import resource_filename

        # A preview job is already assigned for this phash_id.
        get_assigned_job.return_value = Mock(id='job1')
        # Fixtures dir holds no X.phras.* cache files.
        corpus_base_path.return_value = resource_filename(
            'eea.corpus', 'tests/fixtures/')

        env = {
            'phash_id': 'X',
            'file_name': 'x.csv',
            'text_column': 'text',
        }

        # Without cache files, preview passes content through untouched.
        result = list(preview_phrases(['hello', 'world'], env, {}))
        assert result == ['hello', 'world']
项目:eea.corpus    作者:eea    | 项目源码 | 文件源码
def test_produce_phrases_with_no_job(self,
                                         cached_phrases,
                                         corpus_base_path,
                                         get_pipeline_for_component,
                                         build_phrases
                                         ):
        """Without a finished phrase job, produce_phrases builds the
        phrase model and then streams the cached results."""
        from eea.corpus.processing.phrases.process import produce_phrases
        from pkg_resources import resource_filename

        content = ['hello', 'world']
        env = {'phash_id': 'X', 'file_name': 'x.csv', 'text_column': 'text'}
        base_path = resource_filename('eea.corpus', 'tests/fixtures/')

        corpus_base_path.return_value = base_path
        cached_phrases.return_value = ['something', 'else']

        stream = produce_phrases(content, env, {})

        # Output comes from the (mocked) phrase cache...
        assert list(stream) == ['something', 'else']
        assert corpus_base_path.call_count == 1
        # ...and the build pipeline ran exactly once.
        assert get_pipeline_for_component.call_count == 1
        assert build_phrases.call_count == 1
        assert cached_phrases.call_count == 1
项目:eea.corpus    作者:eea    | 项目源码 | 文件源码
def test_produce_phrases_with_ok_job(self,
                                         cached_phrases,
                                         corpus_base_path,
                                         get_pipeline_for_component,
                                         build_phrases,
                                         get_job_finish_status
                                         ):
        """With an already-finished phrase job, produce_phrases skips the
        build entirely and streams straight from the cache."""
        from eea.corpus.processing.phrases.process import produce_phrases
        from pkg_resources import resource_filename

        content = ['hello', 'world']
        env = {'phash_id': 'X', 'file_name': 'x.csv', 'text_column': 'text'}
        base_path = resource_filename('eea.corpus', 'tests/fixtures/')

        corpus_base_path.return_value = base_path
        cached_phrases.return_value = ['something', 'else']
        # Report the phrase-building job as already finished.
        get_job_finish_status.return_value = True

        stream = produce_phrases(content, env, {})

        # Output comes from the (mocked) phrase cache...
        assert list(stream) == ['something', 'else']
        assert corpus_base_path.call_count == 1
        # ...and no build was triggered.
        assert get_pipeline_for_component.call_count == 0
        assert build_phrases.call_count == 0
        assert cached_phrases.call_count == 1
项目:eea.corpus    作者:eea    | 项目源码 | 文件源码
def test_phrase_model_status(self, get_assigned_job):
        """Status endpoint: 'OK' when cached files exist, 'unavailable'
        when neither files nor a job exist, 'preview_<status>' when a job
        is assigned."""
        from eea.corpus.processing.phrases.views import phrase_model_status
        from pkg_resources import resource_filename

        base_path = resource_filename('eea.corpus', 'tests/fixtures/')

        # Swap the module-global storage path for the fixtures dir; the
        # original is restored at the end of the test.
        o_st = phrase_model_status.__globals__['CORPUS_STORAGE']
        phrase_model_status.__globals__['CORPUS_STORAGE'] = base_path

        # 'A' has phrase files on disk -> OK
        req = Mock(matchdict={'phash_id': 'A'})
        assert phrase_model_status(req) == {'status': 'OK'}

        # 'X' has no files and no assigned job -> unavailable
        get_assigned_job.return_value = None
        req = Mock(matchdict={'phash_id': 'X'})
        assert phrase_model_status(req) == {'status': 'unavailable'}

        # An assigned job reports its own status, prefixed with 'preview_'
        job = Mock()
        get_assigned_job.return_value = job
        job.get_status.return_value = '_job_status_here_'
        assert phrase_model_status(req) == {'status':
                                            'preview__job_status_here_'}

        phrase_model_status.__globals__['CORPUS_STORAGE'] = o_st
项目:eea.corpus    作者:eea    | 项目源码 | 文件源码
def test_build_pipeline_for_preview(self, upload_location):
        from eea.corpus.processing import build_pipeline
        from pkg_resources import resource_filename

        # Serve the fixture CSV wherever the pipeline asks for the upload.
        upload_location.return_value = resource_filename(
            'eea.corpus', 'tests/fixtures/test.csv')

        file_name = 'test.csv'
        text_column = 'text'
        # One limit-processing step capped at two documents.
        steps = [
            ('eea_corpus_processing_limit_process', 'ABC', {'max_count': 2})
        ]

        docs = list(build_pipeline(file_name, text_column, steps,
                                   preview_mode=True))
        assert len(docs) == 2
项目:eea.corpus    作者:eea    | 项目源码 | 文件源码
def dashboard(global_config, **settings):
    """ WSGI entry point for the Flask app RQ Dashboard

    Builds a Flask application wrapping rq_dashboard, configured from the
    ``REDIS_URL`` environment variable (default: local redis, db 0).
    """

    redis_uri = os.environ.get('REDIS_URL', 'redis://localhost:6379/0')
    p = parse.urlparse(redis_uri)
    # Use ParseResult.hostname/.port rather than splitting netloc on ':'.
    # netloc may carry credentials ('redis://:password@host:port') or omit
    # the port entirely, either of which breaks a naive split.
    host = p.hostname or 'localhost'
    port = str(p.port) if p.port is not None else '6379'
    db = len(p.path) > 1 and p.path[1:] or '0'

    redis_settings = {
        'REDIS_URL': redis_uri,
        'REDIS_DB': db,
        'REDIS_HOST': host,
        'REDIS_PORT': port,
    }

    app = Flask(__name__,
                static_url_path="/static",
                static_folder=resource_filename("rq_dashboard", "static")
                )
    app.config.from_object(rq_dashboard.default_settings)
    app.config.update(redis_settings)
    app.register_blueprint(rq_dashboard.blueprint)
    return app.wsgi_app
项目:nnmnkwii    作者:r9y9    | 项目源码 | 文件源码
def example_audio_file():
    """Return the path of the bundled example audio file.

    Returns:
        str: Path of the example audio file.

    See also:
        :func:`example_label_file`

    Examples:
        >>> from nnmnkwii.util import example_audio_file
        >>> from scipy.io import wavfile
        >>> fs, x = wavfile.read(example_audio_file())
    """
    # The wav ships inside the package's _example_data directory.
    return pkg_resources.resource_filename(
        __name__, '_example_data/arctic_a0009.wav')
项目:nnmnkwii    作者:r9y9    | 项目源码 | 文件源码
def example_question_file():
    """Return the path of the bundled example question file.

    The question file was taken from Merlin_.

    .. _Merlin: https://github.com/CSTR-Edinburgh/merlin

    Returns:
        str: Path of the example audio file.

    Examples:
        >>> from nnmnkwii.util import example_question_file
        >>> from nnmnkwii.io import hts
        >>> binary_dict, continuous_dict = hts.load_question_set(example_question_file())
    """
    question_path = pkg_resources.resource_filename(
        __name__, '_example_data/questions-radio_dnn_416.hed')
    return question_path
项目:ascii-art-py    作者:blinglnav    | 项目源码 | 文件源码
def test_pydist():
    """Validate each built wheel's metadata.json against pydist-schema.json."""
    # XXX this test may need manual cleanup of older wheels

    import jsonschema

    def _load_json(path):
        # Read bytes and decode explicitly to stay encoding-safe on py2/3.
        with open(path, 'rb') as fh:
            return json.loads(fh.read().decode('utf-8'))

    schema = _load_json(resource_filename('wheel.test',
                                          'pydist-schema.json'))
    checked = 0
    for dist in ("simple.dist", "complex-dist"):
        dist_root = pkg_resources.resource_filename('wheel.test', dist)
        for (dirname, subdirs, filenames) in os.walk(dist_root):
            for fname in filenames:
                if not fname.endswith('.whl'):
                    continue
                whl = ZipFile(os.path.join(dirname, fname))
                for entry in whl.infolist():
                    if entry.filename.endswith('/metadata.json'):
                        metadata = json.loads(
                            whl.read(entry).decode('utf-8'))
                        jsonschema.validate(metadata, schema)
                        checked += 1
    assert checked > 0, "No metadata.json found"
项目:reactionrnn    作者:minimaxir    | 项目源码 | 文件源码
def __init__(self, weights_path=None,
                 vocab_path=None):
        """Build the reaction RNN and its encoder model.

        Falls back to the weights and vocabulary files shipped with the
        package when no explicit paths are given.
        """
        weights_path = (resource_filename(__name__,
                                          'reactionrnn_weights.hdf5')
                        if weights_path is None else weights_path)
        vocab_path = (resource_filename(__name__,
                                        'reactionrnn_vocab.json')
                      if vocab_path is None else vocab_path)

        with open(vocab_path, 'r') as fh:
            self.vocab = json.load(fh)

        # Character-level tokenizer backed by the stored vocabulary.
        self.tokenizer = Tokenizer(filters='', char_level=True)
        self.tokenizer.word_index = self.vocab
        self.num_classes = len(self.vocab) + 1

        self.model = reactionrnn_model(weights_path, self.num_classes)
        # Encoder shares the model's input but outputs the 'rnn' layer.
        self.model_enc = Model(inputs=self.model.input,
                               outputs=self.model.get_layer('rnn').output)
项目:hugo_jupyter    作者:knowsuchagency    | 项目源码 | 文件源码
def main(argv=None):
    """CLI entry point: optionally scaffold notebooks/ and fabfile.py,
    then print the success message and the available fab tasks."""
    args = docopt(__doc__, argv=argv, version='1.0.3')

    # Must be run from the project root (where config.toml lives).
    assert 'config.toml' in (p.name for p in Path().iterdir()), "config.toml not found in directory. Are you sure you're in the project's root?"

    if args['--init']:
        Path('./notebooks/').mkdir(exist_ok=True)

        # Copy the packaged fabfile template into the project.
        with open(resource_filename('hugo_jupyter', '__fabfile.py')) as fp:
            Path('fabfile.py').write_text(fp.read())

    # NOTE: printed and `fab -l` run regardless of --init, matching
    # the existing behavior.
    print(dedent("""
    Successfully initialized. From this directory, the following commands are available.
    Just remember to prepend them with `fab`
    """))

    run(('fab', '-l'))
项目:TCP-IP    作者:JackZ0    | 项目源码 | 文件源码
def test_copy_config(self):
        """copy_config must omit secret files but keep configs and links."""
        tempdir = tempfile.mkdtemp()
        server_root = pkg_resources.resource_filename(__name__, "testdata")
        letshelp_le_apache.copy_config(server_root, tempdir)

        temp_testdata = os.path.join(tempdir, "testdata")

        # Sensitive files must not survive the copy.
        for secret in (_PASSWD_FILE, _KEY_FILE, _SECRET_FILE):
            self.assertFalse(os.path.exists(os.path.join(
                temp_testdata, os.path.basename(secret))))

        # Configuration files and symlinks must be preserved.
        for kept in (_PARTIAL_CONF_PATH, _PARTIAL_LINK_PATH):
            self.assertTrue(os.path.exists(os.path.join(
                temp_testdata, kept)))
项目:docker-utils    作者:a-ba    | 项目源码 | 文件源码
def pkg_file(path):
    """Resolve *path* inside the installed docker-utils-aba distribution."""
    requirement = pkg_resources.Requirement.parse("docker-utils-aba")
    resource = os.path.join("docker_utils_aba", path)
    return pkg_resources.resource_filename(requirement, resource)
项目:flora    作者:Lamden    | 项目源码 | 文件源码
def __init__(self, *args):
        """Connect to a Cassandra cluster and ensure the schema exists.

        Args:
            args: args[0] is the contact-point argument passed to ``Cluster``.
        """
        # NOTE(review): passing __file__ as the resource name is unusual —
        # the logger ends up named after a file path. Presumably intentional;
        # confirm before changing.
        self.log = logging.getLogger(resource_filename(__name__, __file__))

        self.cluster = Cluster(args[0])

        self.connection = self.cluster.connect()
        # Rows come back as plain tuples rather than named tuples.
        self.connection.row_factory = tuple_factory

        # Idempotent schema setup: keyspaces with single-node replication.
        self.connection.execute("CREATE KEYSPACE IF NOT EXISTS public \
            WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };")

        self.connection.execute("CREATE KEYSPACE IF NOT EXISTS internal \
            WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };")

        # User directory: RSA public-key parts (n, e) plus a secret.
        self.connection.execute("CREATE TABLE IF NOT EXISTS public.users ( \
            name text PRIMARY KEY, \
            n text, \
            e text, \
            secret text);")

        # Contract registry keyed by uuid; template/example stored as blobs.
        self.connection.execute("CREATE TABLE IF NOT EXISTS public.contracts ( \
            id uuid PRIMARY KEY, \
            owner text, \
            package text, \
            template blob, \
            example blob);")
项目:flora    作者:Lamden    | 项目源码 | 文件源码
def __init__(self, *args):
        """Open a SQLAlchemy engine and connection for the DSN in args[0]."""
        self.log = logging.getLogger(resource_filename(__name__, __file__))
        dsn = args[0]
        self.engine = create_engine(dsn)
        self.connection = self.engine.connect()
项目:numerai    作者:gansanay    | 项目源码 | 文件源码
def __init__(self, round_number):
        """Record the per-round file names and verify the data files exist.

        Raises:
            IOError: if the training or tournament CSV is missing from
                the numerai.data package.
        """
        self.round_number = round_number
        prefix = 'r' + str(round_number)
        self.train_file_name = prefix + '_numerai_training_data.csv'
        self.test_file_name = prefix + '_numerai_tournament_data.csv'
        self.sorted_file_name = prefix + '_numerai_sorted_training_data.csv'

        # Only the train/test files must pre-exist; the sorted file is derived.
        for required in (self.train_file_name, self.test_file_name):
            if not os.path.exists(resource_filename('numerai.data', required)):
                raise IOError('File {} not found.'.format(required))
项目:numerai    作者:gansanay    | 项目源码 | 文件源码
def training_set(self):
        """Load this round's training CSV as a DataFrame."""
        path = resource_filename('numerai.data', self.train_file_name)
        return pd.read_csv(path)
项目:numerai    作者:gansanay    | 项目源码 | 文件源码
def test_set(self):
        """Load this round's tournament CSV as a DataFrame."""
        path = resource_filename('numerai.data', self.test_file_name)
        return pd.read_csv(path)
项目:numerai    作者:gansanay    | 项目源码 | 文件源码
def sorted_training_set(self):
        """Load this round's sorted training CSV as a DataFrame."""
        path = resource_filename('numerai.data', self.sorted_file_name)
        return pd.read_csv(path)
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def teardown_module():
    """Delete eggs/wheels created by tests."""
    base = pkg_resources.resource_filename('wheel.test', '')
    for dist in test_distributions:
        dist_root = os.path.join(base, dist)
        for artefact in ('build', 'dist'):
            try:
                rmtree(os.path.join(dist_root, artefact))
            except OSError:
                # Best-effort cleanup: the directory may not exist.
                pass
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def build_wheel():
    """Build wheels from test distributions.

    Runs each distribution's setup.py with argv set to bdist_wheel,
    restoring the working directory afterwards.
    """
    for dist in test_distributions:
        pwd = os.path.abspath(os.curdir)
        distdir = pkg_resources.resource_filename('wheel.test', dist)
        os.chdir(distdir)
        try:
            sys.argv = ['', 'bdist_wheel']
            # Close the file promptly instead of leaking the handle
            # (open(...).read() relies on GC to close it).
            with open('setup.py') as setup_file:
                setup_code = setup_file.read()
            exec(compile(setup_code, 'setup.py', 'exec'))
        finally:
            os.chdir(pwd)
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def build_egg():
    """Build eggs from test distributions.

    Runs each distribution's setup.py with argv set to bdist_egg,
    restoring the working directory afterwards.
    """
    for dist in test_distributions:
        pwd = os.path.abspath(os.curdir)
        distdir = pkg_resources.resource_filename('wheel.test', dist)
        os.chdir(distdir)
        try:
            sys.argv = ['', 'bdist_egg']
            # Close the file promptly instead of leaking the handle
            # (open(...).read() relies on GC to close it).
            with open('setup.py') as setup_file:
                setup_code = setup_file.read()
            exec(compile(setup_code, 'setup.py', 'exec'))
        finally:
            os.chdir(pwd)
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def test_egg_re():
    """Make sure egg_info_re matches every non-blank name in eggnames.txt."""
    names_path = pkg_resources.resource_filename('wheel', 'eggnames.txt')
    # Use a context manager so the file handle is closed deterministically
    # (the original left it to GC).
    with open(names_path) as egg_names:
        for line in egg_names:
            line = line.strip()
            if not line:
                continue
            assert egg2wheel.egg_info_re.match(line), line