Python pkgutil 模块,get_data() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用pkgutil.get_data()

项目:rci    作者:seecloud    | 项目源码 | 文件源码
async def http_handler(self, request):
    """Dispatch an HTTP request according to the suffix of its path.

    Declared ``async`` because the body awaits the websocket and OAuth2
    handlers; ``await`` inside a plain ``def`` is a syntax error.

    :param request: incoming request; ``path``, ``method`` and ``cookies``
        are read from it.
    :return: a ``web`` response object, or whatever the delegated
        websocket / OAuth2 handler returns.
    """
    path = request.path
    if path.endswith("api.sock"):
        return await self.ws_handler(request)
    if path.endswith("/monitor/"):
        data = pkgutil.get_data("rci.services.monitor",
                                "monitor.html").decode("utf8")
        return web.Response(text=data, content_type="text/html")
    if path.endswith("/login/github"):
        # Only POST starts the OAuth flow; other methods fall through.
        if request.method == "POST":
            url = self.oauth.generate_request_url(("read:org", ))
            return web.HTTPFound(url)
    if path.endswith("/oauth2/github"):
        return (await self._oauth2_handler(request))
    if path.endswith("logout"):
        if request.method == "POST":
            sid = request.cookies.get(self.config["cookie_name"])
            # A missing or already-expired session must not turn a logout
            # into an unhandled KeyError.
            try:
                del self.sessions[sid]
            except KeyError:
                pass
            return web.HTTPFound("/monitor/")
    return web.HTTPNotFound()
项目:containerregistry    作者:google    | 项目源码 | 文件源码
def _monkey_patch_httplib2(extract_dir):
  """Make httplib2's CA bundle available as a real file.

  When ``httplib2.CA_CERTS`` does not point at an existing file, the
  bundled ``cacerts.txt`` resource is written into *extract_dir* and
  ``httplib2.CA_CERTS`` is redirected to that copy. This avoids errors like:
     ssl.SSLError: [Errno 185090050] _ssl.c:344: error:0B084002:x509 ...

  Args:
    extract_dir: the directory into which the certificate file is written.
  """
  if not os.path.isfile(httplib2.CA_CERTS):
    # The configured bundle is not a plain file (e.g. running from a PAR),
    # so materialise it on disk.
    bundle = pkgutil.get_data('httplib2', 'cacerts.txt')
    target = os.path.join(extract_dir, 'cacerts.txt')
    with open(target, 'wb') as out:
      out.write(bundle)
    httplib2.CA_CERTS = target
项目:pando-core    作者:DLR-RY    | 项目源码 | 文件源码
def get_filename(package, resource):
    """Return the filesystem path that pkgutil.get_data() would read.

    Uses ``importlib`` instead of the deprecated ``pkgutil.get_loader`` /
    ``loader.load_module`` pair.

    :param package: dotted name of the package owning the resource.
    :param resource: '/'-separated path relative to the package directory.
    :return: normalised path string, or None when the package cannot be
        found, its loader has no ``get_data``, or it has no ``__file__``.
    """
    import importlib
    import importlib.util

    mod = sys.modules.get(package)
    if mod is not None:
        loader = getattr(mod, '__loader__', None)
    else:
        spec = importlib.util.find_spec(package)
        loader = spec.loader if spec is not None else None
    if loader is None or not hasattr(loader, 'get_data'):
        return None

    if mod is None:
        mod = importlib.import_module(package)
    # Namespace packages expose __file__ = None; there is no single
    # directory to anchor the resource to.
    if getattr(mod, '__file__', None) is None:
        return None

    # Turn the '/'-separated resource into an os.path style file name that
    # starts with the directory of the package's __file__.
    parts = resource.split('/')
    parts.insert(0, os.path.dirname(mod.__file__))
    return os.path.normpath(os.path.join(*parts))
项目:gnuplot_kernel    作者:has2k1    | 项目源码 | 文件源码
def install_kernel_resources(destination,
                             resource='gnuplot_kernel',
                             files=None):
    """
    Copy the resource files to the kernelspec folder.

    :param destination: directory the files are written into.
    :param resource: package that holds the ``images/`` resources.
    :param files: file names to copy; defaults to the two logo images.

    Copying is best-effort: a failure for one file is reported on stderr
    and the remaining files are still attempted.
    """
    if files is None:
        files = ['logo-64x64.png', 'logo-32x32.png']

    for filename in files:
        try:
            # pkgutil resource names always use '/' as the separator, so
            # os.path.join must not be used to build them.
            data = pkgutil.get_data(resource, 'images/' + filename)
            with open(os.path.join(destination, filename), 'wb') as fp:
                fp.write(data)
        except Exception as e:
            # Terminate the message so consecutive errors do not run together.
            sys.stderr.write(str(e) + '\n')
项目:pdfminify    作者:johndoe31415    | 项目源码 | 文件源码
def _generate_form(self):
    """Merge the seal and signature-form templates into the PDF.

    The signature form's bounding box is set from
    ``self._get_signature_bbox()`` and the ``${WIDTH}``, ``${HEIGHT}`` and
    ``${TEXT}`` placeholders in its stream are substituted.

    :return: the xref of the merged signature form object.
    """
    font_xref = self._get_font_reference()

    seal_bytes = pkgutil.get_data("llpdf.resources", "seal.pdft")
    seal_template = PDFTemplate(seal_bytes)
    seal_xref = seal_template.merge_into_pdf(self._pdf)["SealObject"]

    sign_bytes = pkgutil.get_data("llpdf.resources", "sign_form.pdft")
    sign_template = PDFTemplate(sign_bytes)
    sign_template["FontXRef"] = font_xref
    sign_template["SealFormXRef"] = seal_xref
    signform_xref = sign_template.merge_into_pdf(self._pdf)["SignFormObject"]

    signform = self._pdf.lookup(signform_xref)
    signform.content[PDFName("/BBox")] = self._get_signature_bbox()
    stream_data = signform.stream.decode()

    # Only the box dimensions are needed for the placeholders below.
    (_posx, _posy, width, height) = self._get_signature_bbox()
    substitutions = {
        "WIDTH":        b"%.0f" % (width - 1),
        "HEIGHT":       b"%.0f" % (height - 1),
        "TEXT":         self._get_signing_text(),
    }
    for varname in substitutions:
        placeholder = ("${" + varname + "}").encode("ascii")
        stream_data = stream_data.replace(placeholder, substitutions[varname])

    encoded = EncodedObject.create(stream_data, compress = True)
    signform.set_stream(encoded)
    return signform_xref
项目:buildhub    作者:mozilla-services    | 项目源码 | 文件源码
async def initialize_kinto(loop, kinto_client, bucket, collection):
    """
    Initialize the remote server with the initialization.yml file.

    Declared ``async`` because it awaits ``initialize_server``; ``await``
    inside a plain ``def`` is a syntax error.

    :param loop: event loop handed to the async client wrapper.
    :param kinto_client: client wrapped by ``AsyncKintoClient``.
    :param bucket: bucket id that must be present in the manifest.
    :param collection: collection id that must be listed under the bucket.
    :raises ValueError: if the bucket or collection is not in the manifest.
    """
    # Leverage kinto-wizard async client.
    thread_pool = ThreadPoolExecutor()
    async_client = AsyncKintoClient(kinto_client, loop, thread_pool)

    initialization_manifest = pkgutil.get_data('buildhub', 'initialization.yml')
    config = yaml.safe_load(initialization_manifest)

    # Check that we push the records at the right place.
    if bucket not in config:
        raise ValueError(f"Bucket '{bucket}' not specified in `initialization.yml`.")
    if collection not in config[bucket]['collections']:
        raise ValueError(f"Collection '{collection}' not specified in `initialization.yml`.")

    await initialize_server(async_client,
                            config,
                            bucket=bucket,
                            collection=collection,
                            force=False)
项目:trivup    作者:edenhill    | 项目源码 | 文件源码
def create_file_from_template (self, relpath, unique=False, template_name=None, append_data=None, subst=True, pathtype='temp'):
    """ Create file from app template using app's conf dict.

        The template is looked up as the package resource
        ``apps/<ClassName>/<template_name>.template`` inside ``trivup``;
        *template_name* defaults to the basename of *relpath*.

        If subst=False no template operations will be performed and the file is copied verbatim.
        If *append_data* is given it is appended after a newline.

        Returns whatever ``self.create_file`` returns. """
    tname = template_name or os.path.basename(relpath)

    # pkgutil resource names always use '/' as the separator, independent
    # of the platform, so the name is not built with os.path.join.
    tpath = '/'.join(('apps', self.__class__.__name__, tname + '.template'))
    filedata = pkgutil.get_data('trivup', tpath)
    if filedata is None:
        raise FileNotFoundError('Class %s resource %s not found' %
                                ('trivup', tpath))

    rendered = filedata.decode('ascii')
    if subst:
        rendered = Template(rendered).substitute(self.conf)
    if append_data is not None:
        rendered += '\n' + append_data
    return self.create_file(relpath, unique, data=rendered, pathtype=pathtype)
项目:python-smeftrunner    作者:DsixTools    | 项目源码 | 文件源码
def test_load(self):
    """Load the bundled LHA test files and check dict/LHA round trips."""
    def read_text(resource):
        # All fixtures are package data of 'smeftrunner', decoded as UTF-8.
        return pkgutil.get_data('smeftrunner', resource).decode('utf-8')

    sm = read_text('tests/data/SMInput-CPV.dat')
    wc = read_text('tests/data/WCsInput-CPV-SMEFT.dat')
    wcout = read_text('tests/data/Output_SMEFTrunner.dat')

    # The two input files only need to parse without raising.
    io.sm_lha2dict(pylha.load(sm))
    io.wc_lha2dict(pylha.load(wc))

    CSM = io.sm_lha2dict(pylha.load(wcout))
    C = io.wc_lha2dict(pylha.load(wcout))

    # Wilson coefficients survive a dict -> LHA -> dict round trip.
    C2 = io.wc_lha2dict(io.wc_dict2lha(C))
    for key in C:
        npt.assert_array_equal(C[key], C2[key])

    smeft = SMEFT()
    smeft.load_initial((wcout,))
    for key in C:
        message = "Failed for {}".format(key)
        npt.assert_array_equal(definitions.symmetrize(C)[key], smeft.C_in[key], err_msg=message)
    for key in CSM:
        message = "Failed for {}".format(key)
        npt.assert_array_equal(definitions.symmetrize(CSM)[key], smeft.C_in[key], err_msg=message)

    # SM parameters survive the same round trip.
    CSM2 = io.sm_lha2dict(io.sm_dict2lha(CSM))
    for key in CSM:
        message = "Failed for {}".format(key)
        npt.assert_array_equal(CSM[key], CSM2[key], err_msg=message)
项目:onering    作者:panyam    | 项目源码 | 文件源码
def get_source(self, environment, template_name):
    """Locate a template and return ``(source, path, uptodate)``.

    Relative names are searched in ``self.template_dirs``, first verbatim
    and then with ``self.template_extension`` appended. If no directory
    has the file, the parent loader is consulted when there is one;
    otherwise the template is read from the ``onering`` package data.

    :param environment: passed through to the parent loader.
    :param template_name: absolute path or name relative to the search dirs.
    :return: tuple of the decoded source, the resolved path and a callable
        that reports whether the file is unchanged since it was read.
    """
    final_path = template_name
    if not template_name.startswith("/"):
        for tdir in self.template_dirs:
            full_path = os.path.join(tdir, template_name)
            if os.path.isfile(full_path):
                final_path = full_path
                break
            full_path = os.path.join(tdir, template_name + self.template_extension)
            if os.path.isfile(full_path):
                final_path = full_path
                break
        else:
            # See if parent can return it
            if self.parent_loader:
                return self.parent_loader.get_source(environment, template_name)
            source = pkgutil.get_data("onering", "data/templates/" + template_name).decode('utf-8')
            return source, final_path, lambda: True

    # open() in binary mode works on both Python 2 and 3 (the `file`
    # builtin only exists on Python 2).
    with open(final_path, 'rb') as f:
        source = f.read().decode('utf-8')
    # Remember the modification time at load so the callback has something
    # to compare against; previously `mtime` was never defined.
    mtime = os.path.getmtime(final_path)

    def uptodate():
        try:
            return mtime == os.path.getmtime(final_path)
        except OSError:
            # File vanished: it is certainly not up to date.
            return False

    return source, final_path, uptodate
项目:onering    作者:panyam    | 项目源码 | 文件源码
def get_source(self, environment, template_name):
    """Resolve *template_name* and return ``(source, path, uptodate)``.

    Lookup order for a relative name: each entry of ``self.template_dirs``
    (exact name, then name plus ``self.template_extension``), then the
    parent loader if set, then the ``onering`` package data.

    :param environment: forwarded to the parent loader.
    :param template_name: absolute path or name relative to the search dirs.
    :return: decoded source text, resolved path, and a zero-argument
        callable telling whether the file is unchanged since loading.
    """
    final_path = template_name
    if not template_name.startswith("/"):
        for tdir in self.template_dirs:
            candidates = (
                os.path.join(tdir, template_name),
                os.path.join(tdir, template_name + self.template_extension),
            )
            match = next((c for c in candidates if os.path.isfile(c)), None)
            if match is not None:
                final_path = match
                break
        else:
            # See if parent can return it
            if self.parent_loader:
                return self.parent_loader.get_source(environment, template_name)
            source = pkgutil.get_data("onering", "data/templates/" + template_name).decode('utf-8')
            return source, final_path, lambda: True

    # The Python 2 only `file` builtin is replaced by a binary open(),
    # which behaves the same on Python 2 and 3.
    with open(final_path, 'rb') as f:
        source = f.read().decode('utf-8')
    # Capture the load-time mtime; the callback compared against an
    # undefined name before.
    mtime = os.path.getmtime(final_path)

    def uptodate():
        try:
            return mtime == os.path.getmtime(final_path)
        except OSError:
            # A file that can no longer be stat'ed is stale.
            return False

    return source, final_path, uptodate
项目:httpolice    作者:vfaronov    | 项目源码 | 文件源码
def __init__(self, cls, name):
    """Load ``<name>.csv`` from this module's package data and index it.

    Every CSV row is stored under ``cls(row['key'])`` and additionally
    indexed by the name ``self.name_from_raw`` derives for it; duplicate
    keys or names trip an assertion.
    """
    self.cls = cls
    self.name = name
    self.filename = '%s.csv' % self.name
    self.keys_by_name = {}
    self.raw_by_key = {}
    self.processed_by_key = {}

    csv_text = pkgutil.get_data(__name__, self.filename).decode('ascii')
    reader = csv.DictReader(io.StringIO(csv_text, newline=u''),
                            lineterminator=u'\n')
    self.fieldnames = reader.fieldnames
    for row in reader:
        key = self.cls(row['key'])
        assert key not in self.raw_by_key
        self.raw_by_key[key] = row
        entry_name = self.name_from_raw(key, row)
        assert entry_name not in self.keys_by_name
        self.keys_by_name[entry_name] = key

    self.accessor = KnowledgeAccessor(self)
项目:httpolice    作者:vfaronov    | 项目源码 | 文件源码
def _load_notices():
    """Parse ``notices.xml`` with element classes mapped per tag name.

    :return: tuple ``(notices, parser)`` where *notices* maps each
        ``Notice`` element's ``id`` to the element.
    """
    lookup = lxml.etree.ElementNamespaceClassLookup()
    parser = lxml.etree.XMLParser()
    parser.set_element_class_lookup(lookup)
    ns = lookup.get_namespace(None)

    # Tags named after a severity become Notice elements.
    for severity in Severity:
        ns[severity.name] = Notice
    fixed_classes = (
        ('title', Title),
        ('explain', Paragraph),
        ('exception', ExceptionDetails),
        ('var', Var),
        ('ref', Ref),
        ('cite', Cite),
        ('rfc', CiteRFC),
    )
    for tag, element_class in fixed_classes:
        ns[tag] = element_class
    for tag in known_map:
        ns[tag] = Known

    xml_bytes = pkgutil.get_data('httpolice', 'notices.xml')
    root = lxml.etree.fromstring(xml_bytes, parser)
    notices = {}
    for elem in root:
        if not isinstance(elem, Notice):
            continue
        assert elem.id not in notices
        notices[elem.id] = elem
    return notices, parser
项目:oh-my-ladder    作者:JamesPan    | 项目源码 | 文件源码
def reduce_domains(domains):
    """Collapse host names to their registrable root domain.

    For example 'www.google.com' becomes 'google.com'. A domain whose last
    label is not a known TLD is dropped. A domain made up entirely of TLD
    suffixes is kept as is.

    :param domains: iterable of domain name strings.
    :return: set of reduced domain names.
    """
    # get_data() returns bytes; decode so the entries compare equal to the
    # text domain labels on Python 3 as well.
    tld_content = pkgutil.get_data('gfwlist2pac', 'resources/tld.txt')
    tlds = set(tld_content.decode('utf-8').splitlines(False))

    new_domains = set()
    for domain in domains:
        domain_parts = domain.split('.')
        if domain_parts[-1] not in tlds:
            # last label is not a valid tld
            continue
        # Grow the suffix one label at a time and stop at the first suffix
        # that is no longer a TLD.
        root_domain = None
        for start in range(len(domain_parts) - 1, -1, -1):
            root_domain = '.'.join(domain_parts[start:])
            if root_domain not in tlds:
                break
        new_domains.add(root_domain)
    return new_domains
项目:oh-my-ladder    作者:JamesPan    | 项目源码 | 文件源码
def generate_pac_precise(rules, proxy):
    """Render the PAC script from the bundled ``abp.js`` template.

    Empty rules, comment lines (starting with '!') and section headers
    (starting with '[') are dropped. ``__PROXY__`` and ``__RULES__`` in
    the template are replaced by the JSON encoding of *proxy* and of the
    remaining rules.

    :param rules: iterable of rule strings.
    :param proxy: proxy specification; it is stringified before encoding.
    :return: the rendered PAC file content as text.
    """
    # Materialise a list: json.dumps cannot serialise a lazy filter object.
    kept_rules = [rule for rule in rules
                  if rule and not rule.startswith(('!', '['))]

    # render the pac file; get_data() returns bytes, so decode before doing
    # text replacement.
    proxy_content = pkgutil.get_data('gfwlist2pac',
                                     'resources/abp.js').decode('utf-8')
    proxy_content = proxy_content.replace('__PROXY__', json.dumps(str(proxy)))
    proxy_content = proxy_content.replace('__RULES__',
                                          json.dumps(kept_rules, indent=2))
    return proxy_content
项目:pymlask    作者:ikegami-yukino    | 项目源码 | 文件源码
def _read_emodic(self):
    """ Load emotion dictionaries

    Fills ``self.emodic`` with two sub-dictionaries, ``'emotem'`` and
    ``'emotion'``, each mapping a class name to the list of lines read
    from the matching ``<class>_uncoded.txt`` resource of ``mlask``.
    """
    # Dictionaries of syntactical indicator of emotiveness
    emotemy = ('interjections', 'exclamation', 'vulgar', 'endearments', 'emotikony', 'gitaigo')
    # Dictionaries of emotion
    emotions = ('aware', 'haji', 'ikari', 'iya', 'kowa', 'odoroki', 'suki', 'takaburi', 'yasu', 'yorokobi')

    self.emodic = {'emotem': {}, 'emotion': {}}

    # (key in self.emodic, resource folder, class names)
    sources = (
        ('emotem', 'emotemes', emotemy),
        ('emotion', 'emotions', emotions),
    )
    for kind, folder, class_names in sources:
        for class_name in class_names:
            # pkgutil resource names use '/' on every platform, so the
            # name is not built with os.path.join.
            resource = '%s/%s_uncoded.txt' % (folder, class_name)
            data = pkgutil.get_data('mlask', resource)
            self.emodic[kind][class_name] = data.decode('utf8').splitlines()
项目:numerai    作者:gansanay    | 项目源码 | 文件源码
def has_sorted_training_set(self):
    """Tell whether the sorted training CSV for this round can be read.

    :return: True if the resource loads from ``numerai.data``, False if
        loading it raises IOError.
    """
    resource = 'r' + str(self.round_number) + '_numerai_sorted_training_data.csv'
    try:
        pkgutil.get_data('numerai.data', resource)
    except IOError:
        return False
    else:
        return True
项目:Splunk_CBER_App    作者:MHaggis    | 项目源码 | 文件源码
def getzoneinfofile_stream():
    """Return the bundled zoneinfo file as an in-memory byte stream.

    :return: a ``BytesIO`` over the resource, or None (after emitting a
        warning) when reading it raises IOError.
    """
    try:
        raw = get_data(__name__, _ZONEFILENAME)
        stream = BytesIO(raw)
    except IOError as err:  # TODO  switch to FileNotFoundError?
        warnings.warn("I/O error({0}): {1}".format(err.errno, err.strerror))
        return None
    return stream
项目:rci    作者:seecloud    | 项目源码 | 文件源码
async def _http_settings(self, request):
    """Render the GitHub settings page for the authenticated client.

    Declared ``async`` because the organisation list is awaited; ``await``
    inside a plain ``def`` is a syntax error.

    :param request: incoming request used to look up the client.
    :return: an HTML response with the rendered template, or an
        unauthorized response when no client is found for the request.
    """
    import jinja2
    template = jinja2.Template(
        pkgutil.get_data("rci.services.github", "github_settings.html").decode("utf8"))
    client = self._get_client(request)
    if client is None:
        return web.HTTPUnauthorized(text="fail")
    orgs = list(await client.get("user/orgs"))
    return web.Response(text=template.render(orgs=orgs), content_type="text/html")
项目:llk    作者:Tycx2ry    | 项目源码 | 文件源码
def pkgdata(name):
    """Read resource *name* from the ``pythonwhois`` package.

    :return: the raw data on Python 2, the UTF-8 decoded text on Python 3.
    """
    raw = pkgutil.get_data("pythonwhois", name)
    running_py2 = sys.version_info < (3, 0)
    return raw if running_py2 else raw.decode("utf-8")
项目:onedrive-e    作者:tobecontinued    | 项目源码 | 文件源码
def get_content(file_name, pkg_name='onedrivee', is_text=True):
    """
    Load ``store/<file_name>`` from the package data of *pkg_name*.
    :param str file_name: Name of the file below ``store/``.
    :param str pkg_name: Package that owns the resource.
    :param True | False is_text: True to decode the data as UTF-8 text.
    :return str | bytes: Decoded text when is_text is true, raw data otherwise.
    """
    raw = pkgutil.get_data(pkg_name, 'store/' + file_name)
    return raw.decode('utf-8') if is_text else raw
项目:spiderfoot    作者:wi-fi-analyzer    | 项目源码 | 文件源码
def pkgdata(name):
    """Return the ``pythonwhois`` package resource called *name*.

    On Python 3 the data is decoded from UTF-8; on Python 2 it is returned
    exactly as read.
    """
    payload = pkgutil.get_data("pythonwhois", name)
    if not sys.version_info < (3, 0):
        payload = payload.decode("utf-8")
    return payload
项目:Dshield    作者:ywjt    | 项目源码 | 文件源码
def getzoneinfofile_stream():
    """Return the bundled zoneinfo file as an in-memory byte stream.

    :return: a ``BytesIO`` over the resource, or None (after emitting a
        warning) when reading it raises IOError.
    """
    try:
        raw = get_data(__name__, ZONEFILENAME)
        stream = BytesIO(raw)
    except IOError as err:  # TODO  switch to FileNotFoundError?
        warnings.warn("I/O error({0}): {1}".format(err.errno, err.strerror))
        return None
    return stream
项目:aws-cfn-plex    作者:lordmuffin    | 项目源码 | 文件源码
def getzoneinfofile_stream():
    """Open the packaged zoneinfo resource as a ``BytesIO``.

    An IOError while reading is turned into a warning and None is returned.
    """
    stream = None
    try:
        stream = BytesIO(get_data(__name__, ZONEFILENAME))
    except IOError as err:  # TODO  switch to FileNotFoundError?
        message = "I/O error({0}): {1}".format(err.errno, err.strerror)
        warnings.warn(message)
    return stream
项目:flake8-html    作者:lordmauve    | 项目源码 | 文件源码
def write_images(self):
    """Copy the bundled SVG images into ``self.outdir``."""
    for image_name in ('file.svg', 'back.svg'):
        payload = pkgutil.get_data('flake8_html', 'images/' + image_name)
        target = os.path.join(self.outdir, image_name)
        with open(target, 'wb') as out:
            out.write(payload)
项目:AshsSDK    作者:thehappydinoa    | 项目源码 | 文件源码
def getzoneinfofile_stream():
    """Return the bundled zoneinfo file as an in-memory byte stream.

    :return: a ``BytesIO`` over the resource, or None (after emitting a
        warning) when reading it raises IOError.
    """
    try:
        raw = get_data(__name__, ZONEFILENAME)
        stream = BytesIO(raw)
    except IOError as err:  # TODO  switch to FileNotFoundError?
        warnings.warn("I/O error({0}): {1}".format(err.errno, err.strerror))
        return None
    return stream
项目:subpar    作者:google    | 项目源码 | 文件源码
def lib():
    """Print a marker and check that this library's data file is readable."""
    print('In dir_shadowing_lib.py lib()')
    # Test resource extraction
    expected = b'Dummy data file for dir_shadowing_lib.py\n'
    lib_dat = pkgutil.get_data('test_dir_shadowing',
                               'dir_shadowing_lib_dat.txt')
    assert lib_dat == expected, lib_dat
项目:subpar    作者:google    | 项目源码 | 文件源码
def main():
    """Run the library check, then verify this module's own data file."""
    print('In dir_shadowing_main.py main()')
    dir_shadowing_lib.lib()
    # Test resource extraction
    expected = b'Dummy data file for dir_shadowing_main.py\n'
    dat = pkgutil.get_data('test_dir_shadowing', 'dir_shadowing_main_dat.txt')
    assert dat == expected, dat
项目:subpar    作者:google    | 项目源码 | 文件源码
def main():
    """Run package_a's entry points, then verify package_b's data file."""
    a.main()
    a_lib.lib()
    print('In b.py main()')
    # Test resource extraction
    expected = b'Dummy data file for b.py\n'
    b_dat = pkgutil.get_data('subpar.tests.package_b', 'b_dat.txt')
    assert b_dat == expected, b_dat
项目:subpar    作者:google    | 项目源码 | 文件源码
def lib():
    """Print a marker and verify a_lib's data file is readable."""
    print('In a_lib.py lib()')
    # Test resource extraction
    expected = b'Dummy data file for a_lib.py\n'
    a_lib_dat = pkgutil.get_data('subpar.tests.package_a', 'a_lib_dat.txt')
    assert a_lib_dat == expected, a_lib_dat
项目:subpar    作者:google    | 项目源码 | 文件源码
def main():
    """Print a marker and verify a.py's data file is readable."""
    print('In a.py main()')
    # Test resource extraction
    expected = b'Dummy data file for a.py\n'
    a_dat = pkgutil.get_data('subpar.tests.package_a', 'a_dat.txt')
    assert a_dat == expected, a_dat
项目:Orator-Google-App-Engine    作者:MakarenaLabs    | 项目源码 | 文件源码
def getzoneinfofile_stream():
    """Open the packaged zoneinfo resource as a ``BytesIO``.

    An IOError while reading is turned into a warning and None is returned.
    """
    stream = None
    try:
        stream = BytesIO(get_data(__name__, _ZONEFILENAME))
    except IOError as err:  # TODO  switch to FileNotFoundError?
        message = "I/O error({0}): {1}".format(err.errno, err.strerror)
        warnings.warn(message)
    return stream
项目:logrotated    作者:nir0s    | 项目源码 | 文件源码
def _generate_from_template(destination, **params):
    """Render the bundled logrotate template and write it to *destination*.

    :param destination: path of the file to write.
    :param params: values made available to the template when rendering.
    """
    # pkgutil resource names use '/' on every platform (not os.path.join),
    # and get_data() returns bytes, which must be decoded before jinja2
    # can treat the content as template source.
    template = pkgutil.get_data(__name__, 'resources/logrotate').decode('utf-8')

    pretty_params = json.dumps(params, indent=4, sort_keys=True)
    lgr.debug('Rendering logrotate with params: {0}...'.format(pretty_params))
    generated = jinja2.Environment().from_string(template).render(**params)
    lgr.debug('Writing generated file to {0}...'.format(destination))
    with open(destination, 'w') as f:
        f.write(generated)
项目:flowder    作者:amir-khakshour    | 项目源码 | 文件源码
def get_default_config(self):
    """Return the packaged ``default_<SECTION>.conf`` data.

    :return: the resource content, or None when reading it raises IOError.
    """
    resource = 'default_{section}.conf'.format(section=self.SECTION)
    try:
        return get_data(__package__, resource)
    except IOError:
        # No default shipped for this section.
        return None
项目:SimpleFF    作者:seanyeh    | 项目源码 | 文件源码
def _gen_ffbinary(self, ffname):
    """Extract resource *ffname* of package ``bin`` to an executable temp file.

    :return: the closed ``NamedTemporaryFile`` object; the file is not
        deleted on close, and its path is available as ``.name``.
    """
    payload = pkgutil.get_data("bin", ffname)

    handle = tempfile.NamedTemporaryFile(delete=False)
    handle.write(payload)
    handle.close()

    # chmod +x
    current_mode = os.stat(handle.name).st_mode
    os.chmod(handle.name, current_mode | stat.S_IEXEC)

    return handle
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def getzoneinfofile_stream():
    """Return the bundled zoneinfo file as an in-memory byte stream.

    :return: a ``BytesIO`` over the resource, or None (after emitting a
        warning) when reading it raises IOError.
    """
    try:
        raw = get_data(__name__, ZONEFILENAME)
        stream = BytesIO(raw)
    except IOError as err:  # TODO  switch to FileNotFoundError?
        warnings.warn("I/O error({0}): {1}".format(err.errno, err.strerror))
        return None
    return stream
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_getdata_filesys(self):
    """get_data() reads resources of a package stored on the filesystem."""
    pkg = 'test_getdata_filesys'

    # Include a LF and a CRLF, to test that binary data is read back
    RESOURCE_DATA = b'Hello, world!\nSecond line\r\nThird line'

    # Make a package with some resources
    package_dir = os.path.join(self.dirname, pkg)
    os.mkdir(package_dir)
    # Empty init.py
    with open(os.path.join(package_dir, '__init__.py'), "wb"):
        pass
    # Resource files, res.txt, sub/res.txt
    with open(os.path.join(package_dir, 'res.txt'), "wb") as top_level:
        top_level.write(RESOURCE_DATA)
    os.mkdir(os.path.join(package_dir, 'sub'))
    with open(os.path.join(package_dir, 'sub', 'res.txt'), "wb") as nested:
        nested.write(RESOURCE_DATA)

    # Check we can read the resources
    self.assertEqual(pkgutil.get_data(pkg, 'res.txt'), RESOURCE_DATA)
    self.assertEqual(pkgutil.get_data(pkg, 'sub/res.txt'), RESOURCE_DATA)

    del sys.modules[pkg]
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_getdata_zipfile(self):
    """get_data() reads resources of a package stored inside a zip file."""
    zip_name = 'test_getdata_zipfile.zip'
    pkg = 'test_getdata_zipfile'

    # Include a LF and a CRLF, to test that binary data is read back
    RESOURCE_DATA = b'Hello, world!\nSecond line\r\nThird line'

    # Make a package with some resources
    zip_file = os.path.join(self.dirname, zip_name)
    archive = zipfile.ZipFile(zip_file, 'w')

    # Empty init.py
    archive.writestr(pkg + '/__init__.py', "")
    # Resource files, res.txt, sub/res.txt
    archive.writestr(pkg + '/res.txt', RESOURCE_DATA)
    archive.writestr(pkg + '/sub/res.txt', RESOURCE_DATA)
    archive.close()

    # Check we can read the resources
    sys.path.insert(0, zip_file)
    self.assertEqual(pkgutil.get_data(pkg, 'res.txt'), RESOURCE_DATA)
    self.assertEqual(pkgutil.get_data(pkg, 'sub/res.txt'), RESOURCE_DATA)

    # The archive must expose exactly the one package.
    names = [name for _loader, name, _ispkg in pkgutil.iter_modules([zip_file])]
    self.assertEqual(names, ['test_getdata_zipfile'])

    del sys.path[0]

    del sys.modules[pkg]
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def get_data(self, path):
    """Return a fixed payload, ignoring *path*."""
    payload = "Hello, world!"
    return payload
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_getdata_pep302(self):
    """get_data() goes through a loader's own get_data() hook."""
    # Use a dummy importer/loader
    payload = pkgutil.get_data('foo', 'dummy')
    self.assertEqual(payload, "Hello, world!")
    del sys.modules['foo']
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_alreadyloaded(self):
    """get_data() must not reload a module that is already imported."""
    # The "loads" module variable in the example loader should count how
    # many times a (re)load occurs; it has to stay at 1 across get_data().
    import foo
    self.assertEqual(foo.loads, 1)
    payload = pkgutil.get_data('foo', 'dummy')
    self.assertEqual(payload, "Hello, world!")
    self.assertEqual(foo.loads, 1)
    del sys.modules['foo']
项目:kolibri    作者:learningequality    | 项目源码 | 文件源码
def get_version_file():
    """
    Read the VERSION file from the kolibri package data and return its
    contents decoded as UTF-8. No consistency check is performed.
    """
    raw_version = pkgutil.get_data('kolibri', 'VERSION')
    return raw_version.decode('utf-8')
项目:physt    作者:janpipek    | 项目源码 | 文件源码
def load_dataset(name):
    """Load an example dataset by name.

    The CSV files shipped with physt are tried first; if the name is not
    among them and seaborn is importable, seaborn's datasets are tried.

    Parameters
    ----------
    name : str

    Returns
    -------
    dataset : pandas.DataFrame

    Raises
    ------
    RuntimeError
        If pandas is missing or the dataset is found in neither source.
    """
    # Pandas is needed for our own CSV datasets.
    try:
        import pandas as pd
    except ImportError:
        raise RuntimeError("Pandas not installed.")

    # Our custom datasets:
    try:
        import pkgutil
        import io
        resource = 'examples/{0}.csv'.format(name)
        buffer = io.BytesIO(pkgutil.get_data('physt', resource))
        return pd.read_csv(buffer)
    except FileNotFoundError:
        pass

    # Seaborn datasets?
    try:
        import seaborn as sns
        import warnings
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            available = sns.get_dataset_names()
            if name in available:
                return sns.load_dataset(name)
    except ImportError:
        pass

    # Fall through
    raise RuntimeError("Dataset {0} not available.".format(name))
项目:aws-ec2rescue-linux    作者:awslabs    | 项目源码 | 文件源码
def getzoneinfofile_stream():
    """Open the packaged zoneinfo resource as a ``BytesIO``.

    An IOError while reading is turned into a warning and None is returned.
    """
    stream = None
    try:
        stream = BytesIO(get_data(__name__, ZONEFILENAME))
    except IOError as err:  # TODO  switch to FileNotFoundError?
        message = "I/O error({0}): {1}".format(err.errno, err.strerror)
        warnings.warn(message)
    return stream
项目:ShelbySearch    作者:Agentscreech    | 项目源码 | 文件源码
def getzoneinfofile_stream():
    """Return the bundled zoneinfo file as an in-memory byte stream.

    :return: a ``BytesIO`` over the resource, or None (after emitting a
        warning) when reading it raises IOError.
    """
    try:
        raw = get_data(__name__, ZONEFILENAME)
        stream = BytesIO(raw)
    except IOError as err:  # TODO  switch to FileNotFoundError?
        warnings.warn("I/O error({0}): {1}".format(err.errno, err.strerror))
        return None
    return stream
项目:onedrived-dev    作者:xybu    | 项目源码 | 文件源码
def get_resource(rel_path, pkg_name='onedrived', is_text=True):
    """
    Load the package resource at *rel_path* from *pkg_name*.
    :param str rel_path: Resource path relative to the package.
    :param str pkg_name: Package that owns the resource.
    :param True | False is_text: True to decode the data as UTF-8 text.
    :return str | bytes: Decoded text when is_text is true, raw data otherwise.
    """
    raw = pkgutil.get_data(pkg_name, rel_path)
    if not is_text:
        return raw
    return raw.decode('utf-8')
项目:osp-api    作者:opensyllabus    | 项目源码 | 文件源码
def load_json_mock(mock_name: str) -> object:
    """Load a JSON mock from package data.

    Arguments:
        mock_name: Base name of the file under ``mocks/`` in the
            ``osp_api`` package, without the ``.json`` extension.

    Returns:
        The object decoded from the UTF-8 JSON document.

    """

    resource = 'mocks/%s.json' % mock_name
    raw = pkgutil.get_data('osp_api', resource)
    return json.loads(raw.decode('utf-8'))
项目:keyview    作者:ALSchwalm    | 项目源码 | 文件源码
def try_load_pkg_data(name):
    """Return the bytes of resource *name*.

    The resource is read through pkgutil relative to this module. If
    pkgutil raises ValueError, *name* is opened as an ordinary file path
    instead.

    :param name: resource name / file path.
    :return: the file content as bytes.
    """
    try:
        return pkgutil.get_data(__name__, name)
    except ValueError:
        # Close the handle deterministically instead of leaking it until
        # garbage collection.
        with open(name, "rb") as handle:
            return handle.read()
项目:bridgy    作者:wagoodman    | 项目源码 | 文件源码
def config_template_contents(self):
    """Return the raw data of the sample config named by ``self.config_template_path``."""
    resource = 'config/samples/' + self.config_template_path
    return pkgutil.get_data('bridgy', resource)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_getdata_filesys(self):
    """get_data() returns the exact bytes of filesystem package resources."""
    pkg = 'test_getdata_filesys'

    # Include a LF and a CRLF, to test that binary data is read back.
    # A bytes literal is required: the files are opened in binary mode and
    # get_data() returns bytes (on Python 2 this is the same as a str).
    RESOURCE_DATA = b'Hello, world!\nSecond line\r\nThird line'

    # Make a package with some resources
    package_dir = os.path.join(self.dirname, pkg)
    os.mkdir(package_dir)
    # Empty init.py
    with open(os.path.join(package_dir, '__init__.py'), "wb"):
        pass
    # Resource files, res.txt, sub/res.txt
    with open(os.path.join(package_dir, 'res.txt'), "wb") as f:
        f.write(RESOURCE_DATA)
    os.mkdir(os.path.join(package_dir, 'sub'))
    with open(os.path.join(package_dir, 'sub', 'res.txt'), "wb") as f:
        f.write(RESOURCE_DATA)

    try:
        # Check we can read the resources
        res1 = pkgutil.get_data(pkg, 'res.txt')
        self.assertEqual(res1, RESOURCE_DATA)
        res2 = pkgutil.get_data(pkg, 'sub/res.txt')
        self.assertEqual(res2, RESOURCE_DATA)
    finally:
        # Always drop the temporary package so a failed assertion does not
        # leak it into later tests.
        sys.modules.pop(pkg, None)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_getdata_zipfile(self):
    """get_data() returns the exact bytes of resources inside a zip package."""
    zip_name = 'test_getdata_zipfile.zip'
    pkg = 'test_getdata_zipfile'

    # Include a LF and a CRLF, to test that binary data is read back.
    # A bytes literal is required because get_data() returns bytes (on
    # Python 2 this is the same as a str).
    RESOURCE_DATA = b'Hello, world!\nSecond line\r\nThird line'

    # Make a package with some resources
    zip_file = os.path.join(self.dirname, zip_name)
    z = zipfile.ZipFile(zip_file, 'w')
    try:
        # Empty init.py
        z.writestr(pkg + '/__init__.py', "")
        # Resource files, res.txt, sub/res.txt
        z.writestr(pkg + '/res.txt', RESOURCE_DATA)
        z.writestr(pkg + '/sub/res.txt', RESOURCE_DATA)
    finally:
        z.close()

    # Check we can read the resources
    sys.path.insert(0, zip_file)
    try:
        res1 = pkgutil.get_data(pkg, 'res.txt')
        self.assertEqual(res1, RESOURCE_DATA)
        res2 = pkgutil.get_data(pkg, 'sub/res.txt')
        self.assertEqual(res2, RESOURCE_DATA)
    finally:
        # Undo the sys.path / sys.modules changes even when an assertion
        # fails, so later tests are not affected.
        sys.path.remove(zip_file)
        sys.modules.pop(pkg, None)