Python importlib 模块,util() 实例源码


项目:ptm    作者:GrivIN    | 项目源码 | 文件源码
def get_factory_from_template(maintype):
    """Load and return the factory module of the template *maintype*.

    The module source is ``BASE_DIR/templates/<maintype>/FACTORY_FILENAME``
    and it is imported under the name ``"<maintype>.factory"``.  The loading
    mechanism is chosen by interpreter version: ``importlib.util`` on
    Python 3.5+, ``SourceFileLoader`` on earlier Python 3, ``imp`` on
    Python 2.

    :param maintype: Name of the template directory holding the factory.
    :return: The executed factory module.
    :raises ImportError: If no import spec can be built for the file
        (Python 3.5+ branch).
    """
    path = os.path.join(BASE_DIR, 'templates', maintype, FACTORY_FILENAME)
    module_name = "{}.factory".format(maintype)
    if python_version_gte(3, 5):
        # Python 3.5 code in this block
        import importlib.util
        spec = importlib.util.spec_from_file_location(module_name, path)
        if spec is None or spec.loader is None:
            raise ImportError(
                "cannot load factory module from {!r}".format(path))
        foo = importlib.util.module_from_spec(spec)
        # module_from_spec() only creates an empty module object; the loader
        # must execute the source before the module has any contents.
        spec.loader.exec_module(foo)
        return foo
    elif python_version_gte(3, 0):
        from importlib.machinery import SourceFileLoader
        return SourceFileLoader(module_name, path).load_module()
    else:
        # Python 2 code in this block
        import imp
        return imp.load_source(module_name, path)
项目:skybeard-2    作者:LanceMaverick    | 项目源码 | 文件源码
def load_stache(stache_name, possible_dirs):
    for dir_ in possible_dirs:
        path = Path(dir_).resolve()
        python_path = path.parent
        with PythonPathContext(str(python_path)):
            stache_path = find_last_child(path) / stache_name
            stache_module = "{}.{}".format(str(find_last_child(path)), stache_name)
            module_spec = importlib.util.spec_from_file_location(

            if module_spec:
                foo = importlib.util.module_from_spec(module_spec)


    raise Exception("No stache with name {} found".format(stache_name))
项目:bge-logic-nodes-add-on    作者:thepgi    | 项目源码 | 文件源码
def load_user_logic(module_name):
    """Load the user logic file for *module_name* and return it as a module.

    The file is ``//bgelogic/cells/<module_name>.py``, expanded through
    ``bge.logic.expandPath``.  Loaded modules are memoised in
    ``_loaded_userlogic_files`` keyed by the expanded path, so each file is
    executed at most once.

    :param module_name: Name of the cell module (file name without ``.py``).
    :return: The loaded module object.
    """
    full_path = bge.logic.expandPath("//bgelogic/cells/{}.py".format(module_name))
    loaded_value = _loaded_userlogic_files.get(full_path)
    if loaded_value:
        return loaded_value
    import sys
    version = sys.version_info[:2]
    if version < (3, 3):
        import imp
        loaded_value = imp.load_source(module_name, full_path)
    elif version < (3, 5):
        from importlib.machinery import SourceFileLoader
        loaded_value = SourceFileLoader(module_name, full_path).load_module()
    else:
        import importlib.util
        spec = importlib.util.spec_from_file_location(module_name, full_path)
        module = importlib.util.module_from_spec(spec)
        # module_from_spec() yields an empty module; run its source now.
        spec.loader.exec_module(module)
        loaded_value = module
    # Store under the same key the lookup above uses; storing under
    # module_name meant the cache could never be hit.
    _loaded_userlogic_files[full_path] = loaded_value
    return loaded_value
项目:bge-logic-nodes-add-on    作者:thepgi    | 项目源码 | 文件源码
def _abs_import(module_name, full_path):
    import sys
    python_version = sys.version_info
    major = python_version[0]
    minor = python_version[1]
    if (major < 3) or (major == 3 and minor < 3):
        import imp
        return imp.load_source(module_name, full_path)
    elif (major == 3) and (minor < 5):
        from importlib.machinery import SourceFileLoader
        return SourceFileLoader(module_name, full_path).load_module()
        import importlib.util
        spec = importlib.util.spec_from_file_location(module_name, full_path)
        module = importlib.util.module_from_spec(spec)
        return module
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def forget(modname):
    """'Forget' a module was ever imported.

    This removes the module from sys.modules and deletes any PEP 3147 or
    legacy .pyc and .pyo files.

    :param modname: Name of the module to forget; its source is looked for
        as ``<modname>.py`` in every ``sys.path`` entry.
    """
    # Drop the cached module object so the next import starts from scratch.
    sys.modules.pop(modname, None)
    for dirname in sys.path:
        source = os.path.join(dirname, modname + '.py')
        # It doesn't matter if they exist or not, unlink all possible
        # combinations of PEP 3147 and legacy pyc and pyo files.
        unlink(source + 'c')
        unlink(source + 'o')
        unlink(importlib.util.cache_from_source(source, debug_override=True))
        unlink(importlib.util.cache_from_source(source, debug_override=False))

# Check whether a gui is actually available
项目:lazyasd    作者:xonsh    | 项目源码 | 文件源码
def run(self):
        # wait for other modules to stop being imported
        # We assume that module loading is finished when sys.modules doesn't
        # get longer in 5 consecutive 1ms waiting steps
        counter = 0
        last = -1
        while counter < 5:
            new = len(sys.modules)
            if new == last:
                counter += 1
                last = new
                counter = 0
        # now import module properly
        modname = importlib.util.resolve_name(, self.package)
        if isinstance(sys.modules[modname], BackgroundModuleProxy):
            del sys.modules[modname]
        mod = importlib.import_module(, package=self.package)
        for targname, varname in self.replacements.items():
            if targname in sys.modules:
                targmod = sys.modules[targname]
                setattr(targmod, varname, mod)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def find_loader(fullname):
    """Find a PEP 302 "loader" object for fullname

    This is a backwards compatibility wrapper around
    importlib.util.find_spec that converts most failures to ImportError
    and only returns the loader rather than the full spec

    :param fullname: Absolute (non-relative) module name.
    :return: The loader from the module's spec, or None if no spec is found.
    :raises ImportError: If *fullname* is relative or the lookup fails.
    """
    if fullname.startswith('.'):
        msg = "Relative module name {!r} not supported".format(fullname)
        raise ImportError(msg)
    try:
        spec = importlib.util.find_spec(fullname)
    except (ImportError, AttributeError, TypeError, ValueError) as ex:
        # This hack fixes an impedance mismatch between pkgutil and
        # importlib, where the latter raises other errors for cases where
        # pkgutil previously raised ImportError
        msg = "Error while finding loader for {!r} ({}: {})"
        raise ImportError(msg.format(fullname, type(ex), ex)) from ex
    return spec.loader if spec is not None else None
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_timestamp_overflow(self):
        # A modification timestamp larger than 2**32 should not be a problem
        # when importing a module (issue #11235).
        sys.path.insert(0, os.curdir)
            source = TESTFN + ".py"
            compiled = importlib.util.cache_from_source(source)
            with open(source, 'w') as f:
                os.utime(source, (2 ** 33 - 5, 2 ** 33 - 5))
            except OverflowError:
                self.skipTest("cannot set modification time to large integer")
            except OSError as e:
                if e.errno != getattr(errno, 'EOVERFLOW', None):
                self.skipTest("cannot set modification time to large integer ({})".format(e))
            # The pyc file was created.
            del sys.path[0]
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def forget(modname):
    """'Forget' a module was ever imported.

    This removes the module from sys.modules and deletes any PEP 3147 or
    legacy .pyc and .pyo files.

    :param modname: Name of the module to forget; its source is looked for
        as ``<modname>.py`` in every ``sys.path`` entry.
    """
    # Drop the cached module object so the next import starts from scratch.
    sys.modules.pop(modname, None)
    for dirname in sys.path:
        source = os.path.join(dirname, modname + '.py')
        # It doesn't matter if they exist or not, unlink all possible
        # combinations of PEP 3147 and legacy pyc and pyo files.
        unlink(source + 'c')
        unlink(source + 'o')
        unlink(importlib.util.cache_from_source(source, debug_override=True))
        unlink(importlib.util.cache_from_source(source, debug_override=False))

# Check whether a gui is actually available
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_run_name(self):
        depth = 1
        run_name = "And now for something completely different"
        pkg_dir, mod_fname, mod_name, mod_spec = (
               self._make_pkg(example_source, depth))
        expected_ns = example_namespace.copy()
            "__name__": run_name,
            "__file__": mod_fname,
            "__cached__": importlib.util.cache_from_source(mod_fname),
            "__package__": mod_name.rpartition(".")[0],
            "__spec__": mod_spec,
        def create_ns(init_globals):
            return run_module(mod_name, init_globals, run_name)
            self.check_code_execution(create_ns, expected_ns)
            self._del_pkg(pkg_dir, depth, mod_name)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def _check_path_limitations(self, module_name):
        # base directory
        source_path_len = len(
        # a path separator + `longname` (twice)
        source_path_len += 2 * (len(self.longname) + 1)
        # a path separator + `module_name` + ".py"
        source_path_len += len(module_name) + 1 + len(".py")
        cached_path_len = (source_path_len +
            len(importlib.util.cache_from_source("")) - len(""))
        if == 'nt' and cached_path_len >= 258:
            # Under Windows, the max path len is 260 including C's terminating
            # NUL character.
            # (see
            self.skipTest("test paths too long (%d characters) for Windows' 260 character limit"
                          % cached_path_len)
        elif == 'nt' and verbose:
            print("cached_path_len =", cached_path_len)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def find_loader(fullname):
    """Find a PEP 302 "loader" object for fullname

    This is a backwards compatibility wrapper around
    importlib.util.find_spec that converts most failures to ImportError
    and only returns the loader rather than the full spec

    :param fullname: Absolute (non-relative) module name.
    :return: The loader from the module's spec, or None if no spec is found.
    :raises ImportError: If *fullname* is relative or the lookup fails.
    """
    if fullname.startswith('.'):
        msg = "Relative module name {!r} not supported".format(fullname)
        raise ImportError(msg)
    try:
        spec = importlib.util.find_spec(fullname)
    except (ImportError, AttributeError, TypeError, ValueError) as ex:
        # This hack fixes an impedance mismatch between pkgutil and
        # importlib, where the latter raises other errors for cases where
        # pkgutil previously raised ImportError
        msg = "Error while finding loader for {!r} ({}: {})"
        raise ImportError(msg.format(fullname, type(ex), ex)) from ex
    return spec.loader if spec is not None else None
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_timestamp_overflow(self):
        # A modification timestamp larger than 2**32 should not be a problem
        # when importing a module (issue #11235).
        sys.path.insert(0, os.curdir)
            source = TESTFN + ".py"
            compiled = importlib.util.cache_from_source(source)
            with open(source, 'w') as f:
                os.utime(source, (2 ** 33 - 5, 2 ** 33 - 5))
            except OverflowError:
                self.skipTest("cannot set modification time to large integer")
            except OSError as e:
                if e.errno != getattr(errno, 'EOVERFLOW', None):
                self.skipTest("cannot set modification time to large integer ({})".format(e))
            # The pyc file was created.
            del sys.path[0]
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def forget(modname):
    """'Forget' a module was ever imported.

    This removes the module from sys.modules and deletes any PEP 3147 or
    legacy .pyc and .pyo files.

    :param modname: Name of the module to forget; its source is looked for
        as ``<modname>.py`` in every ``sys.path`` entry.
    """
    # Drop the cached module object so the next import starts from scratch.
    sys.modules.pop(modname, None)
    for dirname in sys.path:
        source = os.path.join(dirname, modname + '.py')
        # It doesn't matter if they exist or not, unlink all possible
        # combinations of PEP 3147 and legacy pyc and pyo files.
        unlink(source + 'c')
        unlink(source + 'o')
        unlink(importlib.util.cache_from_source(source, debug_override=True))
        unlink(importlib.util.cache_from_source(source, debug_override=False))

# Check whether a gui is actually available
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_run_name(self):
        depth = 1
        run_name = "And now for something completely different"
        pkg_dir, mod_fname, mod_name, mod_spec = (
               self._make_pkg(example_source, depth))
        expected_ns = example_namespace.copy()
            "__name__": run_name,
            "__file__": mod_fname,
            "__cached__": importlib.util.cache_from_source(mod_fname),
            "__package__": mod_name.rpartition(".")[0],
            "__spec__": mod_spec,
        def create_ns(init_globals):
            return run_module(mod_name, init_globals, run_name)
            self.check_code_execution(create_ns, expected_ns)
            self._del_pkg(pkg_dir, depth, mod_name)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def _check_path_limitations(self, module_name):
        # base directory
        source_path_len = len(
        # a path separator + `longname` (twice)
        source_path_len += 2 * (len(self.longname) + 1)
        # a path separator + `module_name` + ".py"
        source_path_len += len(module_name) + 1 + len(".py")
        cached_path_len = (source_path_len +
            len(importlib.util.cache_from_source("")) - len(""))
        if == 'nt' and cached_path_len >= 258:
            # Under Windows, the max path len is 260 including C's terminating
            # NUL character.
            # (see
            self.skipTest("test paths too long (%d characters) for Windows' 260 character limit"
                          % cached_path_len)
        elif == 'nt' and verbose:
            print("cached_path_len =", cached_path_len)
项目:restfulpy    作者:Carrene    | 项目源码 | 文件源码
def import_python_module_by_filename(name, module_filename):
    """
    Import a file as a python module, with specified name.

    Don't ask about the `name` argument, it's required.

    :param name: The name of the module to override upon imported filename.
    :param module_filename: The filename to import as a python module.
    :return: The newly imported python module.

    """
    spec = importlib.util.spec_from_file_location(name, module_filename)
    imported_module = importlib.util.module_from_spec(spec)
    # module_from_spec() yields an empty module; run its source now.
    spec.loader.exec_module(imported_module)
    return imported_module
项目:protofuzz    作者:trailofbits    | 项目源码 | 文件源码
def _load_module(path):
    'Helper to load a Python file at path and return as a module'

    module_name = os.path.splitext(os.path.basename(path))[0]

    module = None
    if sys.version_info.minor < 5:
        loader = importlib.machinery.SourceFileLoader(module_name, path)
        module = loader.load_module()
        spec = importlib.util.spec_from_file_location(module_name, path)
        module = importlib.util.module_from_spec(spec)

    return module
项目:Sverchok    作者:Sverchok    | 项目源码 | 文件源码
def find_spec(self, fullname, path, target=None):
        if fullname.startswith("svrx.nodes.script."):
            name = fullname.split(".")[-1]
            text_name = text_remap(name, reverse=True)
            if text_name in
                return importlib.util.spec_from_loader(fullname, SvRxLoader(text_name))
                print("couldn't find file")

        elif fullname == "svrx.nodes.script":
            # load Module, right now uses real but empty module, will perhaps change
        return None
项目:coquery    作者:gkunter    | 项目源码 | 文件源码
def has_module(name):
    """
    Check if the Python module 'name' is available.

    Parameters
    ----------
    name : str
        The name of the Python module, as used in an import instruction.

    This function uses ideas from a Stack Overflow question (the link is
    not preserved in this copy of the source).

    Returns
    -------
    b : bool
        True if the module exists, or False otherwise.
    """
    if sys.version_info > (3, 3):
        import importlib.util
        try:
            return importlib.util.find_spec(name) is not None
        except ImportError:
            # For a dotted name find_spec() imports the parent package and
            # raises if that parent is missing; the module is then
            # unavailable as well.
            return False
    elif sys.version_info > (2, 7, 99):
        import importlib
        return importlib.find_loader(name) is not None
    else:
        import pkgutil
        return pkgutil.find_loader(name) is not None
项目:flows    作者:mastro35    | 项目源码 | 文件源码
def load_module(cls, module_name, module_filename):
            spec = importlib.util.spec_from_file_location(module_name, module_filename)
            foo = importlib.util.module_from_spec(spec)
        except Exception as ex:
            Global.LOGGER.warn(f"an error occured while importing {module_name}, so the module will be skipped.")
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def make_legacy_pyc(source):
    """Move a PEP 3147 pyc/pyo file to its legacy pyc/pyo location.

    The choice of .pyc or .pyo extension is done based on the __debug__ flag
    value.

    :param source: The file system path to the source file.  The source file
        does not need to exist, however the PEP 3147 pyc file must exist.
    :return: The file system path to the legacy pyc file.
    """
    pyc_file = importlib.util.cache_from_source(source)
    up_one = os.path.dirname(os.path.abspath(source))
    # up_one already contains the directory part of ``source``; joining it
    # with the full ``source`` again would duplicate directory components
    # for relative paths such as "pkg/mod.py", so only the file name is used.
    legacy_name = os.path.basename(source) + ('c' if __debug__ else 'o')
    legacy_pyc = os.path.join(up_one, legacy_name)
    os.rename(pyc_file, legacy_pyc)
    return legacy_pyc
项目:lazyasd    作者:xonsh    | 项目源码 | 文件源码
def load_module_in_background(name, package=None, debug='DEBUG', env=None,
                              replacements=None):
    """Entry point for loading modules in background thread.

    Parameters
    ----------
    name : str
        Module name to load in background thread.
    package : str or None, optional
        Package name, has the same meaning as in importlib.import_module().
    debug : str, optional
        Debugging symbol name to look up in the environment.
    env : Mapping or None, optional
        Environment this will default to __xonsh_env__, if available, and
        os.environ otherwise.
    replacements : Mapping or None, optional
        Dictionary mapping fully qualified module names (eg foo.bar) that
        import the lazily loaded module, with the variable name in that
        module. For example, suppose that foo.bar imports module a as b,
        this dict is then {'foo.bar': 'b'}.

    Returns
    -------
    module : ModuleType
        This is either the original module that is found in sys.modules or
        a proxy module that will block attribute access until the module is
        fully loaded.
    """
    modname = importlib.util.resolve_name(name, package)
    if modname in sys.modules:
        return sys.modules[modname]
    if env is None:
        env = getattr(builtins, '__xonsh_env__', os.environ)
    if env.get(debug, None):
        # With the debug symbol set the module is imported synchronously.
        mod = importlib.import_module(name, package=package)
        return mod
    # Register the proxy before creating the loader so that lookups of
    # modname in sys.modules find the placeholder in the meantime.
    proxy = sys.modules[modname] = BackgroundModuleProxy(modname)
    BackgroundModuleLoader(name, package, replacements or {})
    return proxy
项目:aioweb    作者:kreopt    | 项目源码 | 文件源码
def execute(argv, argv0, engine):
    import lib, importlib, re
    import code
    os.environ.setdefault("AIOWEB_SETTINGS_MODULE", "settings")
    from aioweb import settings
    # Initialize Orator ORM

    parser = OptionParser()
    parser.add_option("--no-input", help="ask confirmation before collecting", action="store_true", dest="silent")
    (options, args) = parser.parse_args()
    if options.silent:
        agree = 'y'
        agree = input("Do you want to overwrite your assets dir? [y/n] ")
    if agree.lower() == 'y':
        DEST_DIR = os.path.join(settings.BASE_DIR, 'public')
        if os.path.exists(DEST_DIR):

        path = os.path.join(settings.BASE_DIR, 'app', 'assets')
        if os.path.exists(path):
            recursive_overwrite(path, DEST_DIR)
        for appName in settings.APPS:
                path = os.path.join(os.path.dirname(importlib.util.find_spec(appName).origin), 'assets')
                if os.path.exists(path):
                    print("collecting %s" % path)
                    recursive_overwrite(path, DEST_DIR)
            except ImportError as exc:
                # traceback.print_exc()
                print("no such module: %s" % appName)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def _get_spec(finder, name):
    """Return the finder-specific module spec."""
    # Works with legacy finders.
        find_spec = finder.find_spec
    except AttributeError:
        loader = finder.find_module(name)
        if loader is None:
            return None
        return importlib.util.spec_from_loader(name, loader)
        return find_spec(name)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def read_code(stream):
    """Read a code object from a compiled-file stream.

    :param stream: Binary file-like object positioned at the start of a
        compiled (.pyc) file.
    :return: The unmarshalled code object, or None if the stream does not
        start with this interpreter's magic number.
    """
    # This helper is needed in order for the PEP 302 emulation to
    # correctly handle compiled files
    import marshal
    import sys

    magic = stream.read(4)
    if magic != importlib.util.MAGIC_NUMBER:
        return None

    # Skip the rest of the header: timestamp and size (8 bytes), preceded
    # by a 4-byte flags field since Python 3.7 (PEP 552).
    stream.read(12 if sys.version_info >= (3, 7) else 8)
    return marshal.load(stream)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def get_data(package, resource):
    """Get a resource from a package.

    This is a wrapper round the PEP 302 loader get_data API. The package
    argument should be the name of a package, in standard module format
    (foo.bar). The resource argument should be in the form of a relative
    filename, using '/' as the path separator. The parent directory name '..'
    is not allowed, and nor is a rooted name (starting with a '/').

    The function returns a binary string, which is the contents of the
    specified resource.

    For packages located in the filesystem, which have already been imported,
    this is the rough equivalent of

        d = os.path.dirname(sys.modules[package].__file__)
        data = open(os.path.join(d, resource), 'rb').read()

    If the package cannot be located or loaded, or it uses a PEP 302 loader
    which does not support get_data(), then None is returned.
    """

    spec = importlib.util.find_spec(package)
    if spec is None:
        return None
    loader = spec.loader
    if loader is None or not hasattr(loader, 'get_data'):
        return None
    # XXX needs test
    # Reuse the module if it is already imported, otherwise import it now so
    # that its __file__ is available.
    mod = (sys.modules.get(package) or
           importlib.import_module(package))
    if mod is None or not hasattr(mod, '__file__'):
        return None

    # Modify the resource name to be compatible with the loader.get_data
    # signature - an os.path format "filename" starting with the dirname of
    # the package's __file__
    parts = resource.split('/')
    parts.insert(0, os.path.dirname(mod.__file__))
    resource_name = os.path.join(*parts)
    return loader.get_data(resource_name)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_cached_mode_issue_2051(self):
        # permissions of .pyc should match those of .py, regardless of mask
        mode = 0o600
        with temp_umask(0o022), _ready_to_import() as (name, path):
            cached_path = importlib.util.cache_from_source(path)
            os.chmod(path, mode)
            if not os.path.exists(cached_path):
      "__import__ did not result in creation of "
                          "either a .pyc or .pyo file")
            stat_info = os.stat(cached_path)

        self.assertEqual(oct(stat.S_IMODE(stat_info.st_mode)), oct(mode))
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_cached_readonly(self):
        mode = 0o400
        with temp_umask(0o022), _ready_to_import() as (name, path):
            cached_path = importlib.util.cache_from_source(path)
            os.chmod(path, mode)
            if not os.path.exists(cached_path):
      "__import__ did not result in creation of "
                          "either a .pyc or .pyo file")
            stat_info = os.stat(cached_path)

        expected = mode | 0o200 # Account for fix for issue #6074
        self.assertEqual(oct(stat.S_IMODE(stat_info.st_mode)), oct(expected))
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_pyc_always_writable(self):
        # Initially read-only .pyc files on Windows used to cause problems
        # with later updates, see issue #6074 for details
        with _ready_to_import() as (name, path):
            # Write a Python file, make it read-only and import it
            with open(path, 'w') as f:
                f.write("x = 'original'\n")
            # Tweak the mtime of the source to ensure pyc gets updated later
            s = os.stat(path)
            os.utime(path, (s.st_atime, s.st_mtime-100000000))
            os.chmod(path, 0o400)
            m = __import__(name)
            self.assertEqual(m.x, 'original')
            # Change the file and then reimport it
            os.chmod(path, 0o600)
            with open(path, 'w') as f:
                f.write("x = 'rewritten'\n")
            m = __import__(name)
            self.assertEqual(m.x, 'rewritten')
            # Now delete the source file and check the pyc was rewritten
            if __debug__:
                bytecode_only = path + "c"
                bytecode_only = path + "o"
            os.rename(importlib.util.cache_from_source(path), bytecode_only)
            m = __import__(name)
            self.assertEqual(m.x, 'rewritten')
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_missing_source(self):
        # With PEP 3147 cache layout, removing the source but leaving the pyc
        # file does not satisfy the import.
        pyc_file = importlib.util.cache_from_source(self.source)
        self.assertRaises(ImportError, __import__, TESTFN)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test___cached__(self):
        # Modules now also have an __cached__ that points to the pyc file.
        m = __import__(TESTFN)
        pyc_file = importlib.util.cache_from_source(TESTFN + '.py')
        self.assertEqual(m.__cached__, os.path.join(os.curdir, pyc_file))
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_package___cached___from_pyc(self):
        # Like test___cached__ but ensuring __cached__ when imported from a
        # PEP 3147 pyc file.
        def cleanup():
        # Touch the
        with open(os.path.join('pep3147', ''), 'w'):
        with open(os.path.join('pep3147', ''), 'w'):
        m = __import__('')
        m = __import__('')
        init_pyc = importlib.util.cache_from_source(
            os.path.join('pep3147', ''))
        self.assertEqual(m.__cached__, os.path.join(os.curdir, init_pyc))
        foo_pyc = importlib.util.cache_from_source(os.path.join('pep3147', ''))
                         os.path.join(os.curdir, foo_pyc))
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def make_legacy_pyc(source):
    """Move a PEP 3147 pyc/pyo file to its legacy pyc/pyo location.

    The choice of .pyc or .pyo extension is done based on the __debug__ flag
    value.

    :param source: The file system path to the source file.  The source file
        does not need to exist, however the PEP 3147 pyc file must exist.
    :return: The file system path to the legacy pyc file.
    """
    pyc_file = importlib.util.cache_from_source(source)
    up_one = os.path.dirname(os.path.abspath(source))
    # up_one already contains the directory part of ``source``; joining it
    # with the full ``source`` again would duplicate directory components
    # for relative paths such as "pkg/mod.py", so only the file name is used.
    legacy_name = os.path.basename(source) + ('c' if __debug__ else 'o')
    legacy_pyc = os.path.join(up_one, legacy_name)
    os.rename(pyc_file, legacy_pyc)
    return legacy_pyc
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def _make_pkg(self, source, depth, mod_base="runpy_test",
                     *, namespace=False, parent_namespaces=False):
        # Enforce a couple of internal sanity checks on test cases
        if (namespace or parent_namespaces) and not depth:
            raise RuntimeError("Can't mark top level module as a "
                               "namespace package")
        pkg_name = "__runpy_pkg__"
        test_fname = mod_base+os.extsep+"py"
        pkg_dir = sub_dir = os.path.realpath(tempfile.mkdtemp())
        if verbose > 1: print("  Package tree in:", sub_dir)
        sys.path.insert(0, pkg_dir)
        if verbose > 1: print("  Updated sys.path:", sys.path[0])
        if depth:
            namespace_flags = [parent_namespaces] * depth
            namespace_flags[-1] = namespace
            for namespace_flag in namespace_flags:
                sub_dir = os.path.join(sub_dir, pkg_name)
                pkg_fname = self._add_pkg_dir(sub_dir, namespace_flag)
                if verbose > 1: print("  Next level in:", sub_dir)
                if verbose > 1: print("  Created:", pkg_fname)
        mod_fname = os.path.join(sub_dir, test_fname)
        mod_file = open(mod_fname, "w")
        if verbose > 1: print("  Created:", mod_fname)
        mod_name = (pkg_name+".")*depth + mod_base
        mod_spec = importlib.util.spec_from_file_location(mod_name,
        return pkg_dir, mod_fname, mod_name, mod_spec
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def _fix_ns_for_legacy_pyc(self, ns, alter_sys):
        char_to_add = "c" if __debug__ else "o"
        ns["__file__"] += char_to_add
        ns["__cached__"] = ns["__file__"]
        spec = ns["__spec__"]
        new_spec = importlib.util.spec_from_file_location(,
        ns["__spec__"] = new_spec
        if alter_sys:
            ns["run_argv0"] += char_to_add
项目:tomodachi    作者:kalaspuff    | 项目源码 | 文件源码
def import_service_file(cls, file_name: str) -> ModuleType:
        cwd = os.getcwd()
        file_path = '{}/{}.py'.format(os.path.realpath(cwd), file_name)
        if file_path.endswith(''):
            file_path = file_path[:-3]
            spec = importlib.util.spec_from_file_location(file_name, file_path)  # type: Any
            service_import = importlib.util.module_from_spec(spec)
                service_import = importlib.util.module_from_spec(spec)
            except ImportError:
            sys.path.insert(0, cwd)
            sys.path.insert(0, os.path.dirname(file_path))
        except ImportError as e:
            if file_name.endswith(''):
                return cls.import_service_file(file_name[:-3])
            logging.getLogger('import').warning('Invalid service, unable to load service file "{}"'.format(file_name))
            raise e
        except OSError:
            if file_name.endswith('.py'):
                return cls.import_service_file(file_name[:-3])
            logging.getLogger('import').warning('Invalid service, no such service file "{}"'.format(file_name))
        except Exception as e:
            logging.getLogger('import').warning('Unable to load service file "{}"'.format(file_name))
            logging.getLogger('import').warning('Error: {}'.format(e))
            raise e
        return service_import
项目:tomodachi    作者:kalaspuff    | 项目源码 | 文件源码
def import_module(cls, file_name: str) -> ModuleType:
        """Import *file_name*, resolved against the current directory.

        The path is built from the real path of the current working
        directory joined with *file_name*, and the module is registered
        under the name *file_name*.

        :param file_name: File name relative to the current working
            directory.
        :return: The executed module object.
        """
        cwd = os.getcwd()
        file_path = '{}/{}'.format(os.path.realpath(cwd), file_name)

        spec = importlib.util.spec_from_file_location(file_name, file_path)  # type: Any
        module_import = importlib.util.module_from_spec(spec)
        # module_from_spec() yields an empty module; run its source now.
        spec.loader.exec_module(module_import)

        return module_import
项目:USTC-Software-2017    作者:igemsoftware2017    | 项目源码 | 文件源码
def module_from_file_location(filename):
    """
    Load and return a module object with the given file name.

    The module is created under the temporary name ``'__tmp__'`` and its
    source is executed before it is returned.
    """
    spec = importlib.util.spec_from_file_location('__tmp__', filename)
    module = importlib.util.module_from_spec(spec)
    # module_from_spec() yields an empty module; run its source now.
    spec.loader.exec_module(module)

    return module
项目:lbry-android    作者:lbryio    | 项目源码 | 文件源码
def import_recipe(module, filename):
            """Import the source file *filename* as a module named *module*.

            :param module: Name to give the resulting module.
            :param filename: Path of the Python source file.
            :return: The executed module object.
            """
            spec = importlib.util.spec_from_file_location(module, filename)
            mod = importlib.util.module_from_spec(spec)
            # module_from_spec() yields an empty module; run its source now.
            spec.loader.exec_module(mod)
            return mod
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def _get_spec(finder, name):
    """Return the finder-specific module spec."""
    # Works with legacy finders.
        find_spec = finder.find_spec
    except AttributeError:
        loader = finder.find_module(name)
        if loader is None:
            return None
        return importlib.util.spec_from_loader(name, loader)
        return find_spec(name)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def read_code(stream):
    """Read a code object from a compiled-file stream.

    :param stream: Binary file-like object positioned at the start of a
        compiled (.pyc) file.
    :return: The unmarshalled code object, or None if the stream does not
        start with this interpreter's magic number.
    """
    # This helper is needed in order for the PEP 302 emulation to
    # correctly handle compiled files
    import marshal
    import sys

    magic = stream.read(4)
    if magic != importlib.util.MAGIC_NUMBER:
        return None

    # Skip the rest of the header: timestamp and size (8 bytes), preceded
    # by a 4-byte flags field since Python 3.7 (PEP 552).
    stream.read(12 if sys.version_info >= (3, 7) else 8)
    return marshal.load(stream)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def get_data(package, resource):
    """Get a resource from a package.

    This is a wrapper round the PEP 302 loader get_data API. The package
    argument should be the name of a package, in standard module format
    (foo.bar). The resource argument should be in the form of a relative
    filename, using '/' as the path separator. The parent directory name '..'
    is not allowed, and nor is a rooted name (starting with a '/').

    The function returns a binary string, which is the contents of the
    specified resource.

    For packages located in the filesystem, which have already been imported,
    this is the rough equivalent of

        d = os.path.dirname(sys.modules[package].__file__)
        data = open(os.path.join(d, resource), 'rb').read()

    If the package cannot be located or loaded, or it uses a PEP 302 loader
    which does not support get_data(), then None is returned.
    """

    spec = importlib.util.find_spec(package)
    if spec is None:
        return None
    loader = spec.loader
    if loader is None or not hasattr(loader, 'get_data'):
        return None
    # XXX needs test
    # Reuse the module if it is already imported, otherwise import it now so
    # that its __file__ is available.
    mod = (sys.modules.get(package) or
           importlib.import_module(package))
    if mod is None or not hasattr(mod, '__file__'):
        return None

    # Modify the resource name to be compatible with the loader.get_data
    # signature - an os.path format "filename" starting with the dirname of
    # the package's __file__
    parts = resource.split('/')
    parts.insert(0, os.path.dirname(mod.__file__))
    resource_name = os.path.join(*parts)
    return loader.get_data(resource_name)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_cached_mode_issue_2051(self):
        # permissions of .pyc should match those of .py, regardless of mask
        mode = 0o600
        with temp_umask(0o022), _ready_to_import() as (name, path):
            cached_path = importlib.util.cache_from_source(path)
            os.chmod(path, mode)
            # Importing the module is what makes the interpreter write the
            # cached bytecode file whose permissions are checked below.
            __import__(name)
            if not os.path.exists(cached_path):
                self.fail("__import__ did not result in creation of "
                          "either a .pyc or .pyo file")
            stat_info = os.stat(cached_path)

        # Compare only the permission bits, rendered in octal for readable
        # failure messages.
        self.assertEqual(oct(stat.S_IMODE(stat_info.st_mode)), oct(mode))
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_cached_readonly(self):
        # A read-only source file must still yield a cached bytecode file
        # that is writable by its owner.
        mode = 0o400
        with temp_umask(0o022), _ready_to_import() as (name, path):
            cached_path = importlib.util.cache_from_source(path)
            os.chmod(path, mode)
            # Importing the module is what makes the interpreter write the
            # cached bytecode file whose permissions are checked below.
            __import__(name)
            if not os.path.exists(cached_path):
                self.fail("__import__ did not result in creation of "
                          "either a .pyc or .pyo file")
            stat_info = os.stat(cached_path)

        expected = mode | 0o200 # Account for fix for issue #6074
        self.assertEqual(oct(stat.S_IMODE(stat_info.st_mode)), oct(expected))
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_pyc_always_writable(self):
        # Initially read-only .pyc files on Windows used to cause problems
        # with later updates, see issue #6074 for details
        import sys

        with _ready_to_import() as (name, path):
            # Write a Python file, make it read-only and import it
            with open(path, 'w') as f:
                f.write("x = 'original'\n")
            # Tweak the mtime of the source to ensure pyc gets updated later
            s = os.stat(path)
            os.utime(path, (s.st_atime, s.st_mtime-100000000))
            os.chmod(path, 0o400)
            m = __import__(name)
            self.assertEqual(m.x, 'original')
            # Change the file and then reimport it
            os.chmod(path, 0o600)
            with open(path, 'w') as f:
                f.write("x = 'rewritten'\n")
            # Drop the cached module and the finder caches; otherwise
            # __import__ would hand back the stale module object.
            sys.modules.pop(name, None)
            importlib.invalidate_caches()
            m = __import__(name)
            self.assertEqual(m.x, 'rewritten')
            # Now delete the source file and check the pyc was rewritten
            os.unlink(path)
            sys.modules.pop(name, None)
            importlib.invalidate_caches()
            if __debug__:
                bytecode_only = path + "c"
            else:
                bytecode_only = path + "o"
            # Move the cached file next to where the source used to be so
            # that it can be imported as a sourceless bytecode file.
            os.rename(importlib.util.cache_from_source(path), bytecode_only)
            m = __import__(name)
            self.assertEqual(m.x, 'rewritten')
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_missing_source(self):
        # With PEP 3147 cache layout, removing the source but leaving the pyc
        # file does not satisfy the import.
        import sys

        # Import once so that the cached bytecode file gets written.
        __import__(TESTFN)
        pyc_file = importlib.util.cache_from_source(self.source)
        self.assertTrue(os.path.exists(pyc_file))
        # Remove the source and forget the loaded module so that the next
        # import has to go back to the file system.
        os.remove(self.source)
        sys.modules.pop(TESTFN, None)
        importlib.invalidate_caches()
        self.assertRaises(ImportError, __import__, TESTFN)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test___cached__(self):
        # An imported module exposes __cached__, which must name the
        # bytecode file derived from its source file.
        module = __import__(TESTFN)
        source_name = TESTFN + '.py'
        expected_cached = os.path.join(
            os.curdir, importlib.util.cache_from_source(source_name))
        self.assertEqual(module.__cached__, expected_cached)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_package___cached___from_pyc(self):
        # Like test___cached__ but ensuring __cached__ when imported from a
        # PEP 3147 pyc file.
        import shutil
        import sys

        def cleanup():
            # Remove the scratch package and forget its modules so that
            # later tests start from a clean state.
            shutil.rmtree('pep3147')
            sys.modules.pop('pep3147.foo', None)
            sys.modules.pop('pep3147', None)
        os.mkdir('pep3147')
        self.addCleanup(cleanup)
        # Touch the __init__.py
        with open(os.path.join('pep3147', '__init__.py'), 'w'):
            pass
        with open(os.path.join('pep3147', 'foo.py'), 'w'):
            pass
        importlib.invalidate_caches()
        # The first import compiles the sources and writes the pyc files.
        m = __import__('pep3147.foo')
        # Forget the modules so that the second import is served from the
        # pyc files written above.
        sys.modules.pop('pep3147.foo', None)
        sys.modules.pop('pep3147', None)
        importlib.invalidate_caches()
        m = __import__('pep3147.foo')
        init_pyc = importlib.util.cache_from_source(
            os.path.join('pep3147', '__init__.py'))
        self.assertEqual(m.__cached__, os.path.join(os.curdir, init_pyc))
        foo_pyc = importlib.util.cache_from_source(os.path.join('pep3147', 'foo.py'))
        self.assertEqual(sys.modules['pep3147.foo'].__cached__,
                         os.path.join(os.curdir, foo_pyc))
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def make_legacy_pyc(source):
    """Move a PEP 3147 pyc/pyo file to its legacy pyc/pyo location.

    The choice of .pyc or .pyo extension is done based on the __debug__ flag
    value.

    :param source: The file system path to the source file.  The source file
        does not need to exist, however the PEP 3147 pyc file must exist.
    :return: The file system path to the legacy pyc file.
    """
    pyc_file = importlib.util.cache_from_source(source)
    # Split the absolute path so that only the file name is appended to the
    # directory; joining the directory with the full `source` would repeat
    # the directory part for relative paths such as 'pkg/mod.py'.
    source_dir, source_name = os.path.split(os.path.abspath(source))
    legacy_pyc = os.path.join(source_dir,
                              source_name + ('c' if __debug__ else 'o'))
    os.rename(pyc_file, legacy_pyc)
    return legacy_pyc