Python importlib 模块,util() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用importlib.util()

项目:ptm    作者:GrivIN    | 项目源码 | 文件源码
def get_factory_from_template(maintype):
    path = os.path.join(BASE_DIR, 'templates', maintype, FACTORY_FILENAME)
    if (python_version_gte(3, 5)):
        # Python 3.5 code in this block
        import importlib.util
        spec = importlib.util.spec_from_file_location(
            "{}.factory".format(maintype), path)
        foo = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(foo)
        return foo
    elif (python_version_gte(3, 0)):
        from importlib.machinery import SourceFileLoader
        foo = SourceFileLoader(
            "{}.factory".format(maintype), path).load_module()
        return foo
    else:
        # Python 2 code in this block
        import imp
        foo = imp.load_source("{}.factory".format(maintype), path)
        return foo
项目:skybeard-2    作者:LanceMaverick    | 项目源码 | 文件源码
def load_stache(stache_name, possible_dirs):
    for dir_ in possible_dirs:
        path = Path(dir_).resolve()
        python_path = path.parent
        with PythonPathContext(str(python_path)):
            stache_path = find_last_child(path) / stache_name
            stache_module = "{}.{}".format(str(find_last_child(path)), stache_name)
            module_spec = importlib.util.spec_from_file_location(
                    stache_module,
                    "{}.py".format(stache_path))

            if module_spec:
                foo = importlib.util.module_from_spec(module_spec)
                module_spec.loader.exec_module(foo)

                return

    raise Exception("No stache with name {} found".format(stache_name))
项目:bge-logic-nodes-add-on    作者:thepgi    | 项目源码 | 文件源码
def load_user_logic(module_name):
    full_path = bge.logic.expandPath("//bgelogic/cells/{}.py".format(module_name))
    loaded_value = _loaded_userlogic_files.get(full_path)
    if loaded_value: return loaded_value
    import sys
    python_version = sys.version_info
    major = python_version[0]
    minor = python_version[1]
    if (major < 3) or (major == 3 and minor < 3):
        import imp
        loaded_value = imp.load_source(module_name, full_path)
    elif (major == 3) and (minor < 5):
        from importlib.machinery import SourceFileLoader
        loaded_value = SourceFileLoader(module_name, full_path).load_module()
    else:
        import importlib.util
        spec = importlib.util.spec_from_file_location(module_name, full_path)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        loaded_value = module
    _loaded_userlogic_files[module_name] = loaded_value
    return loaded_value
项目:bge-logic-nodes-add-on    作者:thepgi    | 项目源码 | 文件源码
def _abs_import(module_name, full_path):
    import sys
    python_version = sys.version_info
    major = python_version[0]
    minor = python_version[1]
    if (major < 3) or (major == 3 and minor < 3):
        import imp
        return imp.load_source(module_name, full_path)
    elif (major == 3) and (minor < 5):
        from importlib.machinery import SourceFileLoader
        return SourceFileLoader(module_name, full_path).load_module()
    else:
        import importlib.util
        spec = importlib.util.spec_from_file_location(module_name, full_path)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        return module
    pass
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def forget(modname):
    """'Forget' a module was ever imported.

    This removes the module from sys.modules and deletes any PEP 3147 or
    legacy .pyc and .pyo files.
    """
    unload(modname)
    for dirname in sys.path:
        source = os.path.join(dirname, modname + '.py')
        # It doesn't matter if they exist or not, unlink all possible
        # combinations of PEP 3147 and legacy pyc and pyo files.
        unlink(source + 'c')
        unlink(source + 'o')
        unlink(importlib.util.cache_from_source(source, debug_override=True))
        unlink(importlib.util.cache_from_source(source, debug_override=False))

# Check whether a gui is actually available
项目:lazyasd    作者:xonsh    | 项目源码 | 文件源码
def run(self):
        # wait for other modules to stop being imported
        # We assume that module loading is finished when sys.modules doesn't
        # get longer in 5 consecutive 1ms waiting steps
        counter = 0
        last = -1
        while counter < 5:
            new = len(sys.modules)
            if new == last:
                counter += 1
            else:
                last = new
                counter = 0
            time.sleep(0.001)
        # now import module properly
        modname = importlib.util.resolve_name(self.name, self.package)
        if isinstance(sys.modules[modname], BackgroundModuleProxy):
            del sys.modules[modname]
        mod = importlib.import_module(self.name, package=self.package)
        for targname, varname in self.replacements.items():
            if targname in sys.modules:
                targmod = sys.modules[targname]
                setattr(targmod, varname, mod)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def find_loader(fullname):
    """Find a PEP 302 "loader" object for fullname

    This is a backwards compatibility wrapper around
    importlib.util.find_spec that converts most failures to ImportError
    and only returns the loader rather than the full spec
    """
    if fullname.startswith('.'):
        msg = "Relative module name {!r} not supported".format(fullname)
        raise ImportError(msg)
    try:
        spec = importlib.util.find_spec(fullname)
    except (ImportError, AttributeError, TypeError, ValueError) as ex:
        # This hack fixes an impedance mismatch between pkgutil and
        # importlib, where the latter raises other errors for cases where
        # pkgutil previously raised ImportError
        msg = "Error while finding loader for {!r} ({}: {})"
        raise ImportError(msg.format(fullname, type(ex), ex)) from ex
    return spec.loader if spec is not None else None
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_timestamp_overflow(self):
        # A modification timestamp larger than 2**32 should not be a problem
        # when importing a module (issue #11235).
        sys.path.insert(0, os.curdir)
        try:
            source = TESTFN + ".py"
            compiled = importlib.util.cache_from_source(source)
            with open(source, 'w') as f:
                pass
            try:
                os.utime(source, (2 ** 33 - 5, 2 ** 33 - 5))
            except OverflowError:
                self.skipTest("cannot set modification time to large integer")
            except OSError as e:
                if e.errno != getattr(errno, 'EOVERFLOW', None):
                    raise
                self.skipTest("cannot set modification time to large integer ({})".format(e))
            __import__(TESTFN)
            # The pyc file was created.
            os.stat(compiled)
        finally:
            del sys.path[0]
            remove_files(TESTFN)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def forget(modname):
    """'Forget' a module was ever imported.

    This removes the module from sys.modules and deletes any PEP 3147 or
    legacy .pyc and .pyo files.
    """
    unload(modname)
    for dirname in sys.path:
        source = os.path.join(dirname, modname + '.py')
        # It doesn't matter if they exist or not, unlink all possible
        # combinations of PEP 3147 and legacy pyc and pyo files.
        unlink(source + 'c')
        unlink(source + 'o')
        unlink(importlib.util.cache_from_source(source, debug_override=True))
        unlink(importlib.util.cache_from_source(source, debug_override=False))

# Check whether a gui is actually available
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_run_name(self):
        depth = 1
        run_name = "And now for something completely different"
        pkg_dir, mod_fname, mod_name, mod_spec = (
               self._make_pkg(example_source, depth))
        forget(mod_name)
        expected_ns = example_namespace.copy()
        expected_ns.update({
            "__name__": run_name,
            "__file__": mod_fname,
            "__cached__": importlib.util.cache_from_source(mod_fname),
            "__package__": mod_name.rpartition(".")[0],
            "__spec__": mod_spec,
        })
        def create_ns(init_globals):
            return run_module(mod_name, init_globals, run_name)
        try:
            self.check_code_execution(create_ns, expected_ns)
        finally:
            self._del_pkg(pkg_dir, depth, mod_name)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def _check_path_limitations(self, module_name):
        # base directory
        source_path_len = len(self.here)
        # a path separator + `longname` (twice)
        source_path_len += 2 * (len(self.longname) + 1)
        # a path separator + `module_name` + ".py"
        source_path_len += len(module_name) + 1 + len(".py")
        cached_path_len = (source_path_len +
            len(importlib.util.cache_from_source("x.py")) - len("x.py"))
        if os.name == 'nt' and cached_path_len >= 258:
            # Under Windows, the max path len is 260 including C's terminating
            # NUL character.
            # (see http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247%28v=vs.85%29.aspx#maxpath)
            self.skipTest("test paths too long (%d characters) for Windows' 260 character limit"
                          % cached_path_len)
        elif os.name == 'nt' and verbose:
            print("cached_path_len =", cached_path_len)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def find_loader(fullname):
    """Find a PEP 302 "loader" object for fullname

    This is a backwards compatibility wrapper around
    importlib.util.find_spec that converts most failures to ImportError
    and only returns the loader rather than the full spec
    """
    if fullname.startswith('.'):
        msg = "Relative module name {!r} not supported".format(fullname)
        raise ImportError(msg)
    try:
        spec = importlib.util.find_spec(fullname)
    except (ImportError, AttributeError, TypeError, ValueError) as ex:
        # This hack fixes an impedance mismatch between pkgutil and
        # importlib, where the latter raises other errors for cases where
        # pkgutil previously raised ImportError
        msg = "Error while finding loader for {!r} ({}: {})"
        raise ImportError(msg.format(fullname, type(ex), ex)) from ex
    return spec.loader if spec is not None else None
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_timestamp_overflow(self):
        # A modification timestamp larger than 2**32 should not be a problem
        # when importing a module (issue #11235).
        sys.path.insert(0, os.curdir)
        try:
            source = TESTFN + ".py"
            compiled = importlib.util.cache_from_source(source)
            with open(source, 'w') as f:
                pass
            try:
                os.utime(source, (2 ** 33 - 5, 2 ** 33 - 5))
            except OverflowError:
                self.skipTest("cannot set modification time to large integer")
            except OSError as e:
                if e.errno != getattr(errno, 'EOVERFLOW', None):
                    raise
                self.skipTest("cannot set modification time to large integer ({})".format(e))
            __import__(TESTFN)
            # The pyc file was created.
            os.stat(compiled)
        finally:
            del sys.path[0]
            remove_files(TESTFN)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def forget(modname):
    """'Forget' a module was ever imported.

    This removes the module from sys.modules and deletes any PEP 3147 or
    legacy .pyc and .pyo files.
    """
    unload(modname)
    for dirname in sys.path:
        source = os.path.join(dirname, modname + '.py')
        # It doesn't matter if they exist or not, unlink all possible
        # combinations of PEP 3147 and legacy pyc and pyo files.
        unlink(source + 'c')
        unlink(source + 'o')
        unlink(importlib.util.cache_from_source(source, debug_override=True))
        unlink(importlib.util.cache_from_source(source, debug_override=False))

# Check whether a gui is actually available
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_run_name(self):
        depth = 1
        run_name = "And now for something completely different"
        pkg_dir, mod_fname, mod_name, mod_spec = (
               self._make_pkg(example_source, depth))
        forget(mod_name)
        expected_ns = example_namespace.copy()
        expected_ns.update({
            "__name__": run_name,
            "__file__": mod_fname,
            "__cached__": importlib.util.cache_from_source(mod_fname),
            "__package__": mod_name.rpartition(".")[0],
            "__spec__": mod_spec,
        })
        def create_ns(init_globals):
            return run_module(mod_name, init_globals, run_name)
        try:
            self.check_code_execution(create_ns, expected_ns)
        finally:
            self._del_pkg(pkg_dir, depth, mod_name)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def _check_path_limitations(self, module_name):
        # base directory
        source_path_len = len(self.here)
        # a path separator + `longname` (twice)
        source_path_len += 2 * (len(self.longname) + 1)
        # a path separator + `module_name` + ".py"
        source_path_len += len(module_name) + 1 + len(".py")
        cached_path_len = (source_path_len +
            len(importlib.util.cache_from_source("x.py")) - len("x.py"))
        if os.name == 'nt' and cached_path_len >= 258:
            # Under Windows, the max path len is 260 including C's terminating
            # NUL character.
            # (see http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247%28v=vs.85%29.aspx#maxpath)
            self.skipTest("test paths too long (%d characters) for Windows' 260 character limit"
                          % cached_path_len)
        elif os.name == 'nt' and verbose:
            print("cached_path_len =", cached_path_len)
项目:restfulpy    作者:Carrene    | 项目源码 | 文件源码
def import_python_module_by_filename(name, module_filename):
    """
    Import's a file as a python module, with specified name.

    Don't ask about the `name` argument, it's required.

    :param name: The name of the module to override upon imported filename.
    :param module_filename: The filename to import as a python module.
    :return: The newly imported python module.
    """

    sys.path.append(abspath(dirname(module_filename)))
    spec = importlib.util.spec_from_file_location(
        name,
        location=module_filename)
    imported_module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(imported_module)
    return imported_module
项目:protofuzz    作者:trailofbits    | 项目源码 | 文件源码
def _load_module(path):
    'Helper to load a Python file at path and return as a module'

    module_name = os.path.splitext(os.path.basename(path))[0]

    module = None
    if sys.version_info.minor < 5:
        loader = importlib.machinery.SourceFileLoader(module_name, path)
        module = loader.load_module()
    else:
        spec = importlib.util.spec_from_file_location(module_name, path)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)

    return module
项目:Sverchok    作者:Sverchok    | 项目源码 | 文件源码
def find_spec(self, fullname, path, target=None):
        if fullname.startswith("svrx.nodes.script."):
            name = fullname.split(".")[-1]
            text_name = text_remap(name, reverse=True)
            if text_name in bpy.data.texts:
                return importlib.util.spec_from_loader(fullname, SvRxLoader(text_name))
            else:
                print("couldn't find file")

        elif fullname == "svrx.nodes.script":
            # load Module, right now uses real but empty module, will perhaps change
            pass
        return None
项目:coquery    作者:gkunter    | 项目源码 | 文件源码
def has_module(name):
    """
    Check if the Python module 'name' is available.

    Parameters
    ----------
    name : str
        The name of the Python module, as used in an import instruction.

    This function uses ideas from this Stack Overflow question:
    http://stackoverflow.com/questions/14050281/

    Returns
    -------
    b : bool
        True if the module exists, or False otherwise.
    """

    if sys.version_info > (3, 3):
        import importlib.util
        return importlib.util.find_spec(name) is not None
    elif sys.version_info > (2, 7, 99):
        import importlib
        return importlib.find_loader(name) is not None
    else:
        import pkgutil
        return pkgutil.find_loader(name) is not None
项目:flows    作者:mastro35    | 项目源码 | 文件源码
def load_module(cls, module_name, module_filename):
        try:
            spec = importlib.util.spec_from_file_location(module_name, module_filename)
            foo = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(foo)
        except Exception as ex:
            Global.LOGGER.warn(f"{ex}")
            Global.LOGGER.warn(f"an error occured while importing {module_name}, so the module will be skipped.")
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def make_legacy_pyc(source):
    """Move a PEP 3147 pyc/pyo file to its legacy pyc/pyo location.

    The choice of .pyc or .pyo extension is done based on the __debug__ flag
    value.

    :param source: The file system path to the source file.  The source file
        does not need to exist, however the PEP 3147 pyc file must exist.
    :return: The file system path to the legacy pyc file.
    """
    pyc_file = importlib.util.cache_from_source(source)
    up_one = os.path.dirname(os.path.abspath(source))
    legacy_pyc = os.path.join(up_one, source + ('c' if __debug__ else 'o'))
    os.rename(pyc_file, legacy_pyc)
    return legacy_pyc
项目:lazyasd    作者:xonsh    | 项目源码 | 文件源码
def load_module_in_background(name, package=None, debug='DEBUG', env=None,
                              replacements=None):
    """Entry point for loading modules in background thread.

    Parameters
    ----------
    name : str
        Module name to load in background thread.
    package : str or None, optional
        Package name, has the same meaning as in importlib.import_module().
    debug : str, optional
        Debugging symbol name to look up in the environment.
    env : Mapping or None, optional
        Environment this will default to __xonsh_env__, if available, and
        os.environ otherwise.
    replacements : Mapping or None, optional
        Dictionary mapping fully qualified module names (eg foo.bar.baz) that
        import the lazily loaded moudle, with the variable name in that
        module. For example, suppose that foo.bar imports module a as b,
        this dict is then {'foo.bar': 'b'}.

    Returns
    -------
    module : ModuleType
        This is either the original module that is found in sys.modules or
        a proxy module that will block until delay attribute access until the
        module is fully loaded.
    """
    modname = importlib.util.resolve_name(name, package)
    if modname in sys.modules:
        return sys.modules[modname]
    if env is None:
        env = getattr(builtins, '__xonsh_env__', os.environ)
    if env.get(debug, None):
        mod = importlib.import_module(name, package=package)
        return mod
    proxy = sys.modules[modname] = BackgroundModuleProxy(modname)
    BackgroundModuleLoader(name, package, replacements or {})
    return proxy
项目:aioweb    作者:kreopt    | 项目源码 | 文件源码
def execute(argv, argv0, engine):
    import lib, importlib, re
    import code
    os.environ.setdefault("AIOWEB_SETTINGS_MODULE", "settings")
    from aioweb import settings
    sys.path.append(settings.BASE_DIR)
    # Initialize Orator ORM

    parser = OptionParser()
    parser.add_option("--no-input", help="ask confirmation before collecting", action="store_true", dest="silent")
    (options, args) = parser.parse_args()
    if options.silent:
        agree = 'y'
    else:
        agree = input("Do you want to overwrite your assets dir? [y/n] ")
    if agree.lower() == 'y':
        DEST_DIR = os.path.join(settings.BASE_DIR, 'public')
        if os.path.exists(DEST_DIR):
            shutil.rmtree(DEST_DIR)
        else:
            os.makedirs(DEST_DIR)

        path = os.path.join(settings.BASE_DIR, 'app', 'assets')
        if os.path.exists(path):
            recursive_overwrite(path, DEST_DIR)
        for appName in settings.APPS:
            try:
                path = os.path.join(os.path.dirname(importlib.util.find_spec(appName).origin), 'assets')
                if os.path.exists(path):
                    print("collecting %s" % path)
                    recursive_overwrite(path, DEST_DIR)
            except ImportError as exc:
                # traceback.print_exc()
                print("no such module: %s" % appName)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def _get_spec(finder, name):
    """Return the finder-specific module spec."""
    # Works with legacy finders.
    try:
        find_spec = finder.find_spec
    except AttributeError:
        loader = finder.find_module(name)
        if loader is None:
            return None
        return importlib.util.spec_from_loader(name, loader)
    else:
        return find_spec(name)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def read_code(stream):
    # This helper is needed in order for the PEP 302 emulation to
    # correctly handle compiled files
    import marshal

    magic = stream.read(4)
    if magic != importlib.util.MAGIC_NUMBER:
        return None

    stream.read(8) # Skip timestamp and size
    return marshal.load(stream)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def get_data(package, resource):
    """Get a resource from a package.

    This is a wrapper round the PEP 302 loader get_data API. The package
    argument should be the name of a package, in standard module format
    (foo.bar). The resource argument should be in the form of a relative
    filename, using '/' as the path separator. The parent directory name '..'
    is not allowed, and nor is a rooted name (starting with a '/').

    The function returns a binary string, which is the contents of the
    specified resource.

    For packages located in the filesystem, which have already been imported,
    this is the rough equivalent of

        d = os.path.dirname(sys.modules[package].__file__)
        data = open(os.path.join(d, resource), 'rb').read()

    If the package cannot be located or loaded, or it uses a PEP 302 loader
    which does not support get_data(), then None is returned.
    """

    spec = importlib.util.find_spec(package)
    if spec is None:
        return None
    loader = spec.loader
    if loader is None or not hasattr(loader, 'get_data'):
        return None
    # XXX needs test
    mod = (sys.modules.get(package) or
           importlib._bootstrap._SpecMethods(spec).load())
    if mod is None or not hasattr(mod, '__file__'):
        return None

    # Modify the resource name to be compatible with the loader.get_data
    # signature - an os.path format "filename" starting with the dirname of
    # the package's __file__
    parts = resource.split('/')
    parts.insert(0, os.path.dirname(mod.__file__))
    resource_name = os.path.join(*parts)
    return loader.get_data(resource_name)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_cached_mode_issue_2051(self):
        # permissions of .pyc should match those of .py, regardless of mask
        mode = 0o600
        with temp_umask(0o022), _ready_to_import() as (name, path):
            cached_path = importlib.util.cache_from_source(path)
            os.chmod(path, mode)
            __import__(name)
            if not os.path.exists(cached_path):
                self.fail("__import__ did not result in creation of "
                          "either a .pyc or .pyo file")
            stat_info = os.stat(cached_path)

        self.assertEqual(oct(stat.S_IMODE(stat_info.st_mode)), oct(mode))
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_cached_readonly(self):
        mode = 0o400
        with temp_umask(0o022), _ready_to_import() as (name, path):
            cached_path = importlib.util.cache_from_source(path)
            os.chmod(path, mode)
            __import__(name)
            if not os.path.exists(cached_path):
                self.fail("__import__ did not result in creation of "
                          "either a .pyc or .pyo file")
            stat_info = os.stat(cached_path)

        expected = mode | 0o200 # Account for fix for issue #6074
        self.assertEqual(oct(stat.S_IMODE(stat_info.st_mode)), oct(expected))
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_pyc_always_writable(self):
        # Initially read-only .pyc files on Windows used to cause problems
        # with later updates, see issue #6074 for details
        with _ready_to_import() as (name, path):
            # Write a Python file, make it read-only and import it
            with open(path, 'w') as f:
                f.write("x = 'original'\n")
            # Tweak the mtime of the source to ensure pyc gets updated later
            s = os.stat(path)
            os.utime(path, (s.st_atime, s.st_mtime-100000000))
            os.chmod(path, 0o400)
            m = __import__(name)
            self.assertEqual(m.x, 'original')
            # Change the file and then reimport it
            os.chmod(path, 0o600)
            with open(path, 'w') as f:
                f.write("x = 'rewritten'\n")
            unload(name)
            importlib.invalidate_caches()
            m = __import__(name)
            self.assertEqual(m.x, 'rewritten')
            # Now delete the source file and check the pyc was rewritten
            unlink(path)
            unload(name)
            importlib.invalidate_caches()
            if __debug__:
                bytecode_only = path + "c"
            else:
                bytecode_only = path + "o"
            os.rename(importlib.util.cache_from_source(path), bytecode_only)
            m = __import__(name)
            self.assertEqual(m.x, 'rewritten')
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_missing_source(self):
        # With PEP 3147 cache layout, removing the source but leaving the pyc
        # file does not satisfy the import.
        __import__(TESTFN)
        pyc_file = importlib.util.cache_from_source(self.source)
        self.assertTrue(os.path.exists(pyc_file))
        os.remove(self.source)
        forget(TESTFN)
        importlib.invalidate_caches()
        self.assertRaises(ImportError, __import__, TESTFN)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test___cached__(self):
        # Modules now also have an __cached__ that points to the pyc file.
        m = __import__(TESTFN)
        pyc_file = importlib.util.cache_from_source(TESTFN + '.py')
        self.assertEqual(m.__cached__, os.path.join(os.curdir, pyc_file))
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_package___cached___from_pyc(self):
        # Like test___cached__ but ensuring __cached__ when imported from a
        # PEP 3147 pyc file.
        def cleanup():
            rmtree('pep3147')
            unload('pep3147.foo')
            unload('pep3147')
        os.mkdir('pep3147')
        self.addCleanup(cleanup)
        # Touch the __init__.py
        with open(os.path.join('pep3147', '__init__.py'), 'w'):
            pass
        with open(os.path.join('pep3147', 'foo.py'), 'w'):
            pass
        importlib.invalidate_caches()
        m = __import__('pep3147.foo')
        unload('pep3147.foo')
        unload('pep3147')
        importlib.invalidate_caches()
        m = __import__('pep3147.foo')
        init_pyc = importlib.util.cache_from_source(
            os.path.join('pep3147', '__init__.py'))
        self.assertEqual(m.__cached__, os.path.join(os.curdir, init_pyc))
        foo_pyc = importlib.util.cache_from_source(os.path.join('pep3147', 'foo.py'))
        self.assertEqual(sys.modules['pep3147.foo'].__cached__,
                         os.path.join(os.curdir, foo_pyc))
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def make_legacy_pyc(source):
    """Move a PEP 3147 pyc/pyo file to its legacy pyc/pyo location.

    The choice of .pyc or .pyo extension is done based on the __debug__ flag
    value.

    :param source: The file system path to the source file.  The source file
        does not need to exist, however the PEP 3147 pyc file must exist.
    :return: The file system path to the legacy pyc file.
    """
    pyc_file = importlib.util.cache_from_source(source)
    up_one = os.path.dirname(os.path.abspath(source))
    legacy_pyc = os.path.join(up_one, source + ('c' if __debug__ else 'o'))
    os.rename(pyc_file, legacy_pyc)
    return legacy_pyc
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def _make_pkg(self, source, depth, mod_base="runpy_test",
                     *, namespace=False, parent_namespaces=False):
        # Enforce a couple of internal sanity checks on test cases
        if (namespace or parent_namespaces) and not depth:
            raise RuntimeError("Can't mark top level module as a "
                               "namespace package")
        pkg_name = "__runpy_pkg__"
        test_fname = mod_base+os.extsep+"py"
        pkg_dir = sub_dir = os.path.realpath(tempfile.mkdtemp())
        if verbose > 1: print("  Package tree in:", sub_dir)
        sys.path.insert(0, pkg_dir)
        if verbose > 1: print("  Updated sys.path:", sys.path[0])
        if depth:
            namespace_flags = [parent_namespaces] * depth
            namespace_flags[-1] = namespace
            for namespace_flag in namespace_flags:
                sub_dir = os.path.join(sub_dir, pkg_name)
                pkg_fname = self._add_pkg_dir(sub_dir, namespace_flag)
                if verbose > 1: print("  Next level in:", sub_dir)
                if verbose > 1: print("  Created:", pkg_fname)
        mod_fname = os.path.join(sub_dir, test_fname)
        mod_file = open(mod_fname, "w")
        mod_file.write(source)
        mod_file.close()
        if verbose > 1: print("  Created:", mod_fname)
        mod_name = (pkg_name+".")*depth + mod_base
        mod_spec = importlib.util.spec_from_file_location(mod_name,
                                                          mod_fname)
        return pkg_dir, mod_fname, mod_name, mod_spec
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def _fix_ns_for_legacy_pyc(self, ns, alter_sys):
        char_to_add = "c" if __debug__ else "o"
        ns["__file__"] += char_to_add
        ns["__cached__"] = ns["__file__"]
        spec = ns["__spec__"]
        new_spec = importlib.util.spec_from_file_location(spec.name,
                                                          ns["__file__"])
        ns["__spec__"] = new_spec
        if alter_sys:
            ns["run_argv0"] += char_to_add
项目:tomodachi    作者:kalaspuff    | 项目源码 | 文件源码
def import_service_file(cls, file_name: str) -> ModuleType:
        cwd = os.getcwd()
        file_path = '{}/{}.py'.format(os.path.realpath(cwd), file_name)
        if file_path.endswith('.py.py'):
            file_path = file_path[:-3]
        try:
            spec = importlib.util.spec_from_file_location(file_name, file_path)  # type: Any
            service_import = importlib.util.module_from_spec(spec)
            try:
                importlib.reload(service_import)
                service_import = importlib.util.module_from_spec(spec)
            except ImportError:
                pass
            sys.path.insert(0, cwd)
            sys.path.insert(0, os.path.dirname(file_path))
            spec.loader.exec_module(service_import)
        except ImportError as e:
            if file_name.endswith('.py.py'):
                return cls.import_service_file(file_name[:-3])
            logging.getLogger('import').warning('Invalid service, unable to load service file "{}"'.format(file_name))
            raise e
        except OSError:
            if file_name.endswith('.py'):
                return cls.import_service_file(file_name[:-3])
            logging.getLogger('import').warning('Invalid service, no such service file "{}"'.format(file_name))
            sys.exit(2)
        except Exception as e:
            logging.getLogger('import').warning('Unable to load service file "{}"'.format(file_name))
            logging.getLogger('import').warning('Error: {}'.format(e))
            raise e
        return service_import
项目:tomodachi    作者:kalaspuff    | 项目源码 | 文件源码
def import_module(cls, file_name: str) -> ModuleType:
        cwd = os.getcwd()
        file_path = '{}/{}'.format(os.path.realpath(cwd), file_name)

        spec = importlib.util.spec_from_file_location(file_name, file_path)  # type: Any
        module_import = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module_import)

        return module_import
项目:USTC-Software-2017    作者:igemsoftware2017    | 项目源码 | 文件源码
def module_from_file_location(filename):
    """
    Reference: https://stackoverflow.com/questions/67631/
    Load and return a module object with the given file name.
    """
    spec = importlib.util.spec_from_file_location('__tmp__', filename)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)

    return module
项目:lbry-android    作者:lbryio    | 项目源码 | 文件源码
def import_recipe(module, filename):
            spec = importlib.util.spec_from_file_location(module, filename)
            mod = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(mod)
            return mod
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def _get_spec(finder, name):
    """Return the finder-specific module spec."""
    # Works with legacy finders.
    try:
        find_spec = finder.find_spec
    except AttributeError:
        loader = finder.find_module(name)
        if loader is None:
            return None
        return importlib.util.spec_from_loader(name, loader)
    else:
        return find_spec(name)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def read_code(stream):
    # This helper is needed in order for the PEP 302 emulation to
    # correctly handle compiled files
    import marshal

    magic = stream.read(4)
    if magic != importlib.util.MAGIC_NUMBER:
        return None

    stream.read(8) # Skip timestamp and size
    return marshal.load(stream)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def get_data(package, resource):
    """Get a resource from a package.

    This is a wrapper round the PEP 302 loader get_data API. The package
    argument should be the name of a package, in standard module format
    (foo.bar). The resource argument should be in the form of a relative
    filename, using '/' as the path separator. The parent directory name '..'
    is not allowed, and nor is a rooted name (starting with a '/').

    The function returns a binary string, which is the contents of the
    specified resource.

    For packages located in the filesystem, which have already been imported,
    this is the rough equivalent of

        d = os.path.dirname(sys.modules[package].__file__)
        data = open(os.path.join(d, resource), 'rb').read()

    If the package cannot be located or loaded, or it uses a PEP 302 loader
    which does not support get_data(), then None is returned.
    """

    spec = importlib.util.find_spec(package)
    if spec is None:
        return None
    loader = spec.loader
    if loader is None or not hasattr(loader, 'get_data'):
        return None
    # XXX needs test
    mod = (sys.modules.get(package) or
           importlib._bootstrap._SpecMethods(spec).load())
    if mod is None or not hasattr(mod, '__file__'):
        return None

    # Modify the resource name to be compatible with the loader.get_data
    # signature - an os.path format "filename" starting with the dirname of
    # the package's __file__
    parts = resource.split('/')
    parts.insert(0, os.path.dirname(mod.__file__))
    resource_name = os.path.join(*parts)
    return loader.get_data(resource_name)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_cached_mode_issue_2051(self):
        # permissions of .pyc should match those of .py, regardless of mask
        mode = 0o600
        with temp_umask(0o022), _ready_to_import() as (name, path):
            cached_path = importlib.util.cache_from_source(path)
            os.chmod(path, mode)
            __import__(name)
            if not os.path.exists(cached_path):
                self.fail("__import__ did not result in creation of "
                          "either a .pyc or .pyo file")
            stat_info = os.stat(cached_path)

        self.assertEqual(oct(stat.S_IMODE(stat_info.st_mode)), oct(mode))
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_cached_readonly(self):
        mode = 0o400
        with temp_umask(0o022), _ready_to_import() as (name, path):
            cached_path = importlib.util.cache_from_source(path)
            os.chmod(path, mode)
            __import__(name)
            if not os.path.exists(cached_path):
                self.fail("__import__ did not result in creation of "
                          "either a .pyc or .pyo file")
            stat_info = os.stat(cached_path)

        expected = mode | 0o200 # Account for fix for issue #6074
        self.assertEqual(oct(stat.S_IMODE(stat_info.st_mode)), oct(expected))
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_pyc_always_writable(self):
        # Initially read-only .pyc files on Windows used to cause problems
        # with later updates, see issue #6074 for details
        with _ready_to_import() as (name, path):
            # Write a Python file, make it read-only and import it
            with open(path, 'w') as f:
                f.write("x = 'original'\n")
            # Tweak the mtime of the source to ensure pyc gets updated later
            s = os.stat(path)
            os.utime(path, (s.st_atime, s.st_mtime-100000000))
            os.chmod(path, 0o400)
            m = __import__(name)
            self.assertEqual(m.x, 'original')
            # Change the file and then reimport it
            os.chmod(path, 0o600)
            with open(path, 'w') as f:
                f.write("x = 'rewritten'\n")
            unload(name)
            importlib.invalidate_caches()
            m = __import__(name)
            self.assertEqual(m.x, 'rewritten')
            # Now delete the source file and check the pyc was rewritten
            unlink(path)
            unload(name)
            importlib.invalidate_caches()
            if __debug__:
                bytecode_only = path + "c"
            else:
                bytecode_only = path + "o"
            os.rename(importlib.util.cache_from_source(path), bytecode_only)
            m = __import__(name)
            self.assertEqual(m.x, 'rewritten')
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_missing_source(self):
        # With PEP 3147 cache layout, removing the source but leaving the pyc
        # file does not satisfy the import.
        __import__(TESTFN)
        pyc_file = importlib.util.cache_from_source(self.source)
        self.assertTrue(os.path.exists(pyc_file))
        os.remove(self.source)
        forget(TESTFN)
        importlib.invalidate_caches()
        self.assertRaises(ImportError, __import__, TESTFN)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test___cached__(self):
        # Modules now also have an __cached__ that points to the pyc file.
        m = __import__(TESTFN)
        pyc_file = importlib.util.cache_from_source(TESTFN + '.py')
        self.assertEqual(m.__cached__, os.path.join(os.curdir, pyc_file))
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_package___cached___from_pyc(self):
        # Like test___cached__ but ensuring __cached__ when imported from a
        # PEP 3147 pyc file.
        def cleanup():
            rmtree('pep3147')
            unload('pep3147.foo')
            unload('pep3147')
        os.mkdir('pep3147')
        self.addCleanup(cleanup)
        # Touch the __init__.py
        with open(os.path.join('pep3147', '__init__.py'), 'w'):
            pass
        with open(os.path.join('pep3147', 'foo.py'), 'w'):
            pass
        importlib.invalidate_caches()
        m = __import__('pep3147.foo')
        unload('pep3147.foo')
        unload('pep3147')
        importlib.invalidate_caches()
        m = __import__('pep3147.foo')
        init_pyc = importlib.util.cache_from_source(
            os.path.join('pep3147', '__init__.py'))
        self.assertEqual(m.__cached__, os.path.join(os.curdir, init_pyc))
        foo_pyc = importlib.util.cache_from_source(os.path.join('pep3147', 'foo.py'))
        self.assertEqual(sys.modules['pep3147.foo'].__cached__,
                         os.path.join(os.curdir, foo_pyc))
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def make_legacy_pyc(source):
    """Move a PEP 3147 pyc/pyo file to its legacy pyc/pyo location.

    The choice of .pyc or .pyo extension is done based on the __debug__ flag
    value.

    :param source: The file system path to the source file.  The source file
        does not need to exist, however the PEP 3147 pyc file must exist.
    :return: The file system path to the legacy pyc file.
    """
    pyc_file = importlib.util.cache_from_source(source)
    up_one = os.path.dirname(os.path.abspath(source))
    legacy_pyc = os.path.join(up_one, source + ('c' if __debug__ else 'o'))
    os.rename(pyc_file, legacy_pyc)
    return legacy_pyc