Python sqlite3 模块,version 属性实例源码

我们从Python开源项目中,提取了以下11个代码示例,用于说明如何使用sqlite3.version(注意:它是模块属性,而不是可调用的函数)。

项目:zeronet-debian    作者:bashrc    | 项目源码 | 文件源码
def connect(self):
        """Open the SQLite connection for this database and apply speed PRAGMAs.

        Registers this object in ``opened_dbs`` if it is not there yet,
        creates ``self.db_dir`` when the directory is missing, opens
        ``self.db_path`` with ``sqlite3.Row`` as row factory and
        ``isolation_level = None``, stores a cursor in ``self.cur`` and
        finally issues the journal/synchronous PRAGMAs (plus the foreign-key
        PRAGMA when ``self.foreign_keys`` is set).
        """
        if self not in opened_dbs:
            opened_dbs.append(self)

        self.log.debug("Connecting to %s (sqlite version: %s)..." % (self.db_path, sqlite3.version))

        directory_missing = not os.path.isdir(self.db_dir)
        if directory_missing:  # Directory does not exist yet
            os.makedirs(self.db_dir)
            self.log.debug("Created Db path: %s" % self.db_dir)

        database_missing = not os.path.isfile(self.db_path)
        if database_missing:
            self.log.debug("Db file not exist yet: %s" % self.db_path)

        self.conn = connection = sqlite3.connect(self.db_path)
        connection.row_factory = sqlite3.Row
        connection.isolation_level = None
        self.cur = self.getCursor()

        # We need more speed than security.
        # NOTE(review): journal_mode is set to WAL and then immediately to
        # MEMORY, so the second statement overrides the first - confirm that
        # the WAL statement is intentional.
        speed_pragmas = (
            "PRAGMA journal_mode = WAL",
            "PRAGMA journal_mode = MEMORY",
            "PRAGMA synchronous = OFF",
        )
        for pragma in speed_pragmas:
            self.cur.execute(pragma)

        if self.foreign_keys:
            self.execute("PRAGMA foreign_keys = ON")


    # Execute query using dbcursor
项目:zeronet-debian    作者:bashrc    | 项目源码 | 文件源码
def getTableVersion(self, table_name):
        """Return the stored schema version of ``table_name``.

        While ``self.db_keyvalues`` is empty, the internal key/value rows
        (``json_id = 0``) are read from the ``keyvalue`` table and cached in
        ``self.db_keyvalues``; later calls are answered from that cache.

        Returns:
            The value stored under ``table.<table_name>.version``, ``0`` when
            no such key is cached, or ``False`` when the query raises
            ``sqlite3.OperationalError`` (e.g. the table does not exist yet).
        """
        if not self.db_keyvalues:  # Cache is empty: load the db keyvalues
            try:
                # json_id = 0 marks the internal keyvalues
                res = self.cur.execute("SELECT * FROM keyvalue WHERE json_id=0")
            except sqlite3.OperationalError as err:  # Table does not exist
                self.log.debug("Query error: %s" % err)
                return False

            for row in res:
                self.db_keyvalues[row["key"]] = row["value"]

        return self.db_keyvalues.get("table.%s.version" % table_name, 0)

    # Check Db tables
    # Return: <list> Changed table names
项目:zeronet-debian    作者:bashrc    | 项目源码 | 文件源码
def __init__(self, schema, db_path):
        """Store the schema and database location and reset all runtime state.

        Args:
            schema: Mapping describing the database. Its ``"db_name"`` entry
                names the logger; a missing ``"version"`` entry is filled in
                with ``1`` on the passed-in mapping itself.
            db_path: Path of the SQLite database file. ``db_dir`` is derived
                from it as ``os.path.dirname(db_path) + "/"``.
        """
        # Location of the database file and of its directory
        directory = os.path.dirname(db_path)
        self.db_path = db_path
        self.db_dir = directory + "/"

        # Schema, with the version defaulting to 1
        self.schema = schema
        schema_version = schema.get("version", 1)
        schema["version"] = schema_version

        # No connection or cursor until one is opened
        self.conn = None
        self.cur = None

        logger_name = "Db:%s" % schema["db_name"]
        self.log = logging.getLogger(logger_name)

        # Flags, statistics and caches start out empty/disabled
        self.table_names = None
        self.collect_stats = False
        self.foreign_keys = False
        self.query_stats = {}
        self.db_keyvalues = {}
        self.last_query_time = time.time()
项目:zeronet-debian    作者:bashrc    | 项目源码 | 文件源码
def getCursor(self):
        """Return a new ``DbCursor`` for this database.

        Calls ``self.connect()`` first when ``self.conn`` is falsy (no
        connection has been opened yet).
        """
        already_connected = bool(self.conn)
        if not already_connected:
            self.connect()
        cursor = DbCursor(self.conn, self)
        return cursor

    # Get the table version
    # Return: Table version or None if not exist
项目:gprime    作者:GenealogyCollective    | 项目源码 | 文件源码
def get_summary(cls):
        """
        Return a dictionary of information about this database backend.

        The keys are human-readable labels; the values describe the DB-API
        level, the backend class name and the ``sqlite3`` module in use.
        """
        # sqlite3.version is deprecated since Python 3.12 and removed in
        # 3.14, so read it defensively instead of raising AttributeError.
        python_module_version = getattr(sqlite3, "version", "unknown")
        summary = {
            "DB-API version": "2.0",
            "Database SQL type": cls.__name__,
            "Database SQL module": "sqlite3",
            "Database SQL Python module version": python_module_version,
            "Database SQL module version": sqlite3.sqlite_version,
            "Database SQL module location": sqlite3.__file__,
        }
        return summary
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_main():
    """Run every sqlite test suite through ``test.support.run_unittest``.

    In verbose mode the sqlite3 module version and the SQLite library
    version are printed first.
    """
    if test.support.verbose:
        versions = "{!r}, sqlite_version {!r}".format(
            sqlite3.version, sqlite3.sqlite_version)
        print("test_sqlite: testing with version", versions)

    suite_modules = (dbapi, types, userfunctions, factory,
                     transactions, hooks, regression, dump)
    suites = [module.suite() for module in suite_modules]
    test.support.run_unittest(*suites)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def load_tests(*args):
    """Build one ``unittest.TestSuite`` out of all sqlite test modules.

    The positional arguments are accepted and ignored. In verbose mode the
    sqlite3 module version and the SQLite library version are printed
    before the suite is assembled.
    """
    if test.support.verbose:
        versions = "{!r}, sqlite_version {!r}".format(
            sqlite3.version, sqlite3.sqlite_version)
        print("test_sqlite: testing with version", versions)

    suite_modules = (dbapi, types, userfunctions, factory,
                     transactions, hooks, regression, dump)
    return unittest.TestSuite([module.suite() for module in suite_modules])
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def load_tests(*args):
    """Collect the suites of all sqlite test modules into a single suite.

    ``args`` is unused. When ``test.support.verbose`` is set, the sqlite3
    module version and the SQLite library version are reported first.
    """
    if test.support.verbose:
        version_text = "{!r}, sqlite_version {!r}".format(
            sqlite3.version, sqlite3.sqlite_version)
        print("test_sqlite: testing with version", version_text)

    collected = []
    for module in (dbapi, types, userfunctions, factory,
                   transactions, hooks, regression, dump):
        collected.append(module.suite())
    return unittest.TestSuite(collected)
项目:griffith    作者:Strit    | 项目源码 | 文件源码
def copytree(src, dst, symlinks=False):
    """Recursively copy a directory tree using copy2().

    This is shutil's copytree modified version: an already existing
    destination directory is accepted instead of being an error.

    Args:
        src: Directory to copy from.
        dst: Directory to copy into; created with ``os.mkdir`` when missing.
        symlinks: When true, symbolic links in ``src`` are recreated as
            links in ``dst`` instead of copying what they point to.

    Raises:
        EnvironmentError: After all entries were attempted, if any of them
            failed. Its single argument is a list of
            ``(srcname, dstname, exception)`` tuples.
    """
    from shutil import copy2
    names = os.listdir(src)
    if not os.path.isdir(dst):
        os.mkdir(dst)
    errors = []
    for name in names:
        srcname = os.path.join(src, name)
        dstname = os.path.join(dst, name)
        try:
            if symlinks and os.path.islink(srcname):
                linkto = os.readlink(srcname)
                os.symlink(linkto, dstname)
            elif os.path.isdir(srcname):
                copytree(srcname, dstname, symlinks)
            else:
                copy2(srcname, dstname)
        except EnvironmentError as err:
            # On Python 3 IOError, os.error and EnvironmentError are all
            # OSError, so the two cases are told apart by their payload: a
            # recursive copytree() raises with a single list argument, which
            # is merged so that we can continue with other files.
            if len(err.args) == 1 and isinstance(err.args[0], list):
                errors.extend(err.args[0])
            else:
                errors.append((srcname, dstname, err))
    if errors:
        raise EnvironmentError(errors)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def load_tests(*args):
    """Return a ``unittest.TestSuite`` combining every sqlite test module.

    Any arguments passed in are ignored. With ``test.support.verbose`` set,
    the sqlite3 module version and the SQLite library version are printed
    before the suites are gathered.
    """
    if test.support.verbose:
        version_info = "{!r}, sqlite_version {!r}".format(
            sqlite3.version, sqlite3.sqlite_version)
        print("test_sqlite: testing with version", version_info)

    test_modules = (dbapi, types, userfunctions, factory,
                    transactions, hooks, regression, dump)
    all_suites = [test_module.suite() for test_module in test_modules]
    return unittest.TestSuite(all_suites)
项目:zeronet-debian    作者:bashrc    | 项目源码 | 文件源码
def checkTables(self):
        """Check the internal and schema-defined tables via ``needTable``.

        All checks are issued between a ``BEGIN`` and a ``COMMIT`` on a
        cursor from ``self.getCursor()``. The ``keyvalue`` and ``json``
        tables are checked first, then every table listed in
        ``self.schema["tables"]``. When anything changed, the cached
        ``self.db_keyvalues`` is reset.

        Returns:
            List of the table names for which ``needTable`` reported a change.
        """
        started = time.time()
        changed_tables = []
        cur = self.getCursor()

        cur.execute("BEGIN")

        # Internal table: keyvalue
        keyvalue_changed = cur.needTable("keyvalue", [
            ["keyvalue_id", "INTEGER PRIMARY KEY AUTOINCREMENT"],
            ["key", "TEXT"],
            ["value", "INTEGER"],
            ["json_id", "INTEGER REFERENCES json (json_id)"],
        ], [
            "CREATE UNIQUE INDEX key_id ON keyvalue(json_id, key)"
        ], version=self.schema["version"])
        if keyvalue_changed:
            changed_tables.append("keyvalue")

        # Internal table: json - its layout depends on the schema version
        if self.schema["version"] == 1:
            json_cols = [
                ["json_id", "INTEGER PRIMARY KEY AUTOINCREMENT"],
                ["path", "VARCHAR(255)"]
            ]
            json_indexes = [
                "CREATE UNIQUE INDEX path ON json(path)"
            ]
        else:
            json_cols = [
                ["json_id", "INTEGER PRIMARY KEY AUTOINCREMENT"],
                ["directory", "VARCHAR(255)"],
                ["file_name", "VARCHAR(255)"]
            ]
            json_indexes = [
                "CREATE UNIQUE INDEX path ON json(directory, file_name)"
            ]
        if cur.needTable("json", json_cols, json_indexes, version=self.schema["version"]):
            changed_tables.append("json")

        # Tables declared by the schema
        for table_name, table_settings in self.schema["tables"].items():
            table_changed = cur.needTable(
                table_name, table_settings["cols"],
                table_settings["indexes"], version=table_settings["schema_changed"]
            )
            if table_changed:
                changed_tables.append(table_name)

        cur.execute("COMMIT")
        elapsed = time.time() - started
        self.log.debug("Db check done in %.3fs, changed tables: %s" % (elapsed, changed_tables))
        if changed_tables:
            self.db_keyvalues = {}  # Refresh table version cache

        return changed_tables

    # Load json file to db
    # Return: True if matched