Python dbm 模块,open() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用dbm.open()

项目:bob    作者:BobBuildTool    | 项目源码 | 文件源码
def __save(self):
        """Write the workspace state to disk, or just mark it dirty.

        While the asynchronous counter is non-zero nothing is written; the
        state is only flagged as dirty. Otherwise the state is pickled into
        "<path>.new", flushed and fsync'ed, and then moved over the real
        state file with os.replace().

        Raises:
            ParseError: if writing or replacing the file fails with OSError.
        """
        if self.__asynchronous != 0:
            # Saving is deferred: remember that there is unsaved state.
            self.__dirty = True
            return

        snapshot = {
            "version" : _BobState.CUR_VERSION,
            "byNameDirs" : self.__byNameDirs,
            "results" : self.__results,
            "inputs" : self.__inputs,
            "jenkins" : self.__jenkins,
            "dirStates" : self.__dirStates,
            "buildState" : self.__buildState,
        }
        scratchPath = self.__path + ".new"
        try:
            with open(scratchPath, "wb") as out:
                pickle.dump(snapshot, out)
                # Push the data to disk before the rename below.
                out.flush()
                os.fsync(out.fileno())
            os.replace(scratchPath, self.__path)
        except OSError as err:
            raise ParseError("Error saving workspace state: " + str(err))
        self.__dirty = False
项目:rci    作者:seecloud    | 项目源码 | 文件源码
def __init__(self, root, **config):
        """Set up the service from *root* and its keyword configuration.

        Reads "dbm_path", "name", "access" and "logs_path" from *config*
        (plus the optional "http_path", default "monitor"), creates the
        dbm and log directories if needed, and opens the "tokens.db" and
        "sessions.db" databases with flag "cs".
        """
        self.root = root
        self.config = config

        dbm_dir = self.config["dbm_path"]
        os.makedirs(dbm_dir, exist_ok=True)

        self.http_path = config.get("http_path", "monitor")
        oauth_secrets = root.config.secrets[self.config["name"]]
        self.oauth = github.OAuth(**oauth_secrets)
        for attr in ("tokens", "sessions"):
            setattr(self, attr, dbm.open(os.path.join(dbm_dir, attr + ".db"), "cs"))
        self.acl = config["access"]
        self.logs_path = config["logs_path"]
        os.makedirs(self.logs_path, exist_ok=True)

        # Bookkeeping containers, all empty at start-up.
        self.connections = []
        self._ws_job_listeners = collections.defaultdict(list)
        self._jobws_cb_map = collections.defaultdict(list)
        self._ws_user_map = {}
        self._user_events = collections.defaultdict(list)
项目:RPKI-toolkit    作者:pavel-odintsov    | 项目源码 | 文件源码
def dumpasn1(self):
    """
    Pretty print an ASN.1 DER object using cryptlib dumpasn1 tool.
    Use a temporary file rather than popen4() because dumpasn1 uses
    seek() when decoding ASN.1 content nested in OCTET STRING values.

    Returns the output lines of dumpasn1 that start with a space,
    joined by newlines.  If anything derived from Exception goes
    wrong, a "[Could not run dumpasn1: ...]" string is returned
    instead.
    """

    ret = None
    fn = "dumpasn1.%d.tmp" % os.getpid()
    try:
      # The context manager closes the file even if get_DER() or
      # write() fails.
      with open(fn, "wb") as f:
        f.write(self.get_DER())
      p = subprocess.Popen(("dumpasn1", "-a", fn), stdout = subprocess.PIPE, stderr = subprocess.STDOUT)
      ret = "\n".join(x for x in p.communicate()[0].splitlines() if x.startswith(" "))
    except Exception as e:
      ret = "[Could not run dumpasn1: %s]" % e
    finally:
      # The file may never have been created (e.g. open() failed); do
      # not let the cleanup replace the diagnostic with an OSError.
      try:
        os.unlink(fn)
      except OSError:
        pass
    return ret
项目:RPKI-toolkit    作者:pavel-odintsov    | 项目源码 | 文件源码
def dumpasn1(self):
    """
    Pretty print an ASN.1 DER object using cryptlib dumpasn1 tool.
    Use a temporary file rather than popen4() because dumpasn1 uses
    seek() when decoding ASN.1 content nested in OCTET STRING values.

    Returns the output lines of dumpasn1 that start with a space,
    joined by newlines.  If anything derived from Exception goes
    wrong, a "[Could not run dumpasn1: ...]" string is returned
    instead.
    """

    ret = None
    fn = "dumpasn1.%d.tmp" % os.getpid()
    try:
      # The context manager closes the file even if get_DER() or
      # write() fails.
      with open(fn, "wb") as f:
        f.write(self.get_DER())
      p = subprocess.Popen(("dumpasn1", "-a", fn), stdout = subprocess.PIPE, stderr = subprocess.STDOUT)
      ret = "\n".join(x for x in p.communicate()[0].splitlines() if x.startswith(" "))
    except Exception as e:
      ret = "[Could not run dumpasn1: %s]" % e
    finally:
      # The file may never have been created (e.g. open() failed); do
      # not let the cleanup replace the diagnostic with an OSError.
      try:
        os.unlink(fn)
      except OSError:
        pass
    return ret
项目:RPKI-toolkit    作者:pavel-odintsov    | 项目源码 | 文件源码
def dumpasn1(self):
    """
    Pretty print an ASN.1 DER object using cryptlib dumpasn1 tool.
    Use a temporary file rather than popen4() because dumpasn1 uses
    seek() when decoding ASN.1 content nested in OCTET STRING values.

    Returns the output lines of dumpasn1 that start with a space,
    joined by newlines.  If anything derived from Exception goes
    wrong, a "[Could not run dumpasn1: ...]" string is returned
    instead.
    """

    ret = None
    fn = "dumpasn1.%d.tmp" % os.getpid()
    try:
      # The context manager closes the file even if get_DER() or
      # write() fails.
      with open(fn, "wb") as f:
        f.write(self.get_DER())
      p = subprocess.Popen(("dumpasn1", "-a", fn), stdout = subprocess.PIPE, stderr = subprocess.STDOUT)
      ret = "\n".join(x for x in p.communicate()[0].splitlines() if x.startswith(" "))
    except Exception as e:
      ret = "[Could not run dumpasn1: %s]" % e
    finally:
      # The file may never have been created (e.g. open() failed); do
      # not let the cleanup replace the diagnostic with an OSError.
      try:
        os.unlink(fn)
      except OSError:
        pass
    return ret
项目:RPKI-toolkit    作者:pavel-odintsov    | 项目源码 | 文件源码
def dumpasn1(self):
    """
    Pretty print an ASN.1 DER object using cryptlib dumpasn1 tool.
    Use a temporary file rather than popen4() because dumpasn1 uses
    seek() when decoding ASN.1 content nested in OCTET STRING values.

    Returns the output lines of dumpasn1 that start with a space,
    joined by newlines.  If anything derived from Exception goes
    wrong, a "[Could not run dumpasn1: ...]" string is returned
    instead.
    """

    ret = None
    fn = "dumpasn1.%d.tmp" % os.getpid()
    try:
      # The context manager closes the file even if get_DER() or
      # write() fails.
      with open(fn, "wb") as f:
        f.write(self.get_DER())
      p = subprocess.Popen(("dumpasn1", "-a", fn), stdout = subprocess.PIPE, stderr = subprocess.STDOUT)
      ret = "\n".join(x for x in p.communicate()[0].splitlines() if x.startswith(" "))
    except Exception as e:
      ret = "[Could not run dumpasn1: %s]" % e
    finally:
      # The file may never have been created (e.g. open() failed); do
      # not let the cleanup replace the diagnostic with an OSError.
      try:
        os.unlink(fn)
      except OSError:
        pass
    return ret
项目:RPKI-toolkit    作者:pavel-odintsov    | 项目源码 | 文件源码
def dumpasn1(self):
    """
    Pretty print an ASN.1 DER object using cryptlib dumpasn1 tool.
    Use a temporary file rather than popen4() because dumpasn1 uses
    seek() when decoding ASN.1 content nested in OCTET STRING values.

    Returns the output lines of dumpasn1 that start with a space,
    joined by newlines.  If anything derived from Exception goes
    wrong, a "[Could not run dumpasn1: ...]" string is returned
    instead.
    """

    ret = None
    fn = "dumpasn1.%d.tmp" % os.getpid()
    try:
      # The context manager closes the file even if get_DER() or
      # write() fails.
      with open(fn, "wb") as f:
        f.write(self.get_DER())
      p = subprocess.Popen(("dumpasn1", "-a", fn), stdout = subprocess.PIPE, stderr = subprocess.STDOUT)
      ret = "\n".join(x for x in p.communicate()[0].splitlines() if x.startswith(" "))
    except Exception as e:
      ret = "[Could not run dumpasn1: %s]" % e
    finally:
      # The file may never have been created (e.g. open() failed); do
      # not let the cleanup replace the diagnostic with an OSError.
      try:
        os.unlink(fn)
      except OSError:
        pass
    return ret
项目:RPKI-toolkit    作者:pavel-odintsov    | 项目源码 | 文件源码
def dumpasn1(self):
    """
    Pretty print an ASN.1 DER object using cryptlib dumpasn1 tool.
    Use a temporary file rather than popen4() because dumpasn1 uses
    seek() when decoding ASN.1 content nested in OCTET STRING values.

    Returns the output lines of dumpasn1 that start with a space,
    joined by newlines.  If anything derived from Exception goes
    wrong, a "[Could not run dumpasn1: ...]" string is returned
    instead.
    """

    ret = None
    fn = "dumpasn1.%d.tmp" % os.getpid()
    try:
      # The context manager closes the file even if get_DER() or
      # write() fails.
      with open(fn, "wb") as f:
        f.write(self.get_DER())
      p = subprocess.Popen(("dumpasn1", "-a", fn), stdout = subprocess.PIPE, stderr = subprocess.STDOUT)
      ret = "\n".join(x for x in p.communicate()[0].splitlines() if x.startswith(" "))
    except Exception as e:
      ret = "[Could not run dumpasn1: %s]" % e
    finally:
      # The file may never have been created (e.g. open() failed); do
      # not let the cleanup replace the diagnostic with an OSError.
      try:
        os.unlink(fn)
      except OSError:
        pass
    return ret
项目:sslxray    作者:portcullislabs    | 项目源码 | 文件源码
def keys(self):
        """Return a list of usernames in the database.

        Entries whose key starts with "--Reserved--" are left out.

        @rtype: list
        @return: The usernames in the database.
        @raise AssertionError: If the database has not been opened.
        """
        # Identity test: "== None" would dispatch to whatever __eq__ the
        # database object defines instead of simply checking for None.
        if self.db is None:
            raise AssertionError("DB not open")

        # Only the read of the key list happens under the lock.
        self.lock.acquire()
        try:
            usernames = self.db.keys()
        finally:
            self.lock.release()
        return [u for u in usernames if not u.startswith("--Reserved--")]
项目:stestr    作者:mtreinish    | 项目源码 | 文件源码
def find_test_run_time_diff(test_id, run_time):
    """Return how far *run_time* deviates from the recorded average, in percent.

    The average is looked up under ``str(test_id)`` in the dbm file
    ``.testrepository/times.dbm`` below the current working directory.

    :param test_id: identifier of the test; converted with ``str()``
    :param run_time: run time as a string, optionally ending in ``s``
    :return: ``(run_time - average) / average * 100`` as a float, or
        ``False`` when the database is missing, cannot be opened, or holds
        no positive average for the test
    """
    times_db_path = os.path.join(os.getcwd(), '.testrepository', 'times.dbm')
    if not os.path.isfile(times_db_path):
        return False
    try:
        test_times = dbm.open(times_db_path)
    except Exception:
        return False

    try:
        try:
            avg_runtime = float(test_times.get(str(test_id), False))
        except Exception:
            # Fall back to plain indexing if the .get() based lookup failed.
            try:
                avg_runtime = float(test_times[str(test_id)])
            except Exception:
                avg_runtime = False
    finally:
        # The handle was previously left open on every path.
        test_times.close()

    if avg_runtime and avg_runtime > 0:
        run_time = float(run_time.rstrip('s'))
        return ((run_time - avg_runtime) / avg_runtime) * 100
    return False
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def open(file, flag='r', mode=0666):
    """Open or create database at path given by *file*.

    Optional argument *flag* can be 'r' (default) for read-only access, 'w'
    for read-write access of an existing database, 'c' for read-write access
    to a new or existing database, and 'n' for read-write access to a new
    database.

    Note: 'r' and 'w' fail if the database doesn't exist; 'c' creates it
    only if it doesn't exist; and 'n' always creates a new database.
    """

    # guess the type of an existing database
    from whichdb import whichdb
    result=whichdb(file)
    if result is None:
        # db doesn't exist
        if 'c' in flag or 'n' in flag:
            # file doesn't exist and the new
            # flag was used so use default type
            mod = _defaultmod
        else:
            raise error, "need 'c' or 'n' flag to open new db"
    elif result == "":
        # db type cannot be determined
        raise error, "db type could not be determined"
    else:
        mod = __import__(result)
    return mod.open(file, flag, mode)
项目:bob    作者:BobBuildTool    | 项目源码 | 文件源码
def __getBIdCache(self):
        """Return the build-id cache, opening ".bob-buildids.dbm" on first use."""
        cache = self.__buildIdCache
        if cache is None:
            # Opened with 'c' and kept on the instance for later calls.
            cache = self.__buildIdCache = dbm.open(".bob-buildids.dbm", 'c')
        return cache
项目:bob    作者:BobBuildTool    | 项目源码 | 文件源码
def load(cls, cacheName, cacheKey):
        """Open the persisted cache *cacheName* if it matches *cacheKey*.

        Returns ``cls(db, root)`` built from the opened database and its
        b'root' entry when the stored b'vsn' entry equals *cacheKey*.
        Returns None when the key differs (the database is closed again)
        or when opening/reading raises OSError or dbm.error.
        """
        try:
            handle = dbm.open(cacheName, "r")
            if cacheKey == handle.get(b'vsn'):
                # The handle is passed on to the new object and stays open.
                return cls(handle, handle[b'root'])
            handle.close()
        except OSError:
            pass
        except dbm.error:
            pass
        return None
项目:bob    作者:BobBuildTool    | 项目源码 | 文件源码
def create(cls, cacheName, cacheKey, root):
        """Build a fresh cache database and return an object reading from it.

        A new database is created at *cacheName* (flag 'n'), filled via
        __convertPackageToGraph with *root*, and tagged with *cacheKey*
        under b'vsn'. It is then closed and reopened read-only for the
        returned ``cls`` instance.

        Raises:
            BobError: if any database operation raises dbm.error.
        """
        try:
            writer = dbm.open(cacheName, 'n')
            try:
                rootKey = PkgGraphNode.__convertPackageToGraph(writer, root)
                writer[b'root'] = rootKey
                writer[b'vsn'] = cacheKey
            finally:
                # Always release the writable handle before reopening.
                writer.close()
            reader = dbm.open(cacheName, 'r')
            return cls(reader, rootKey)
        except dbm.error as err:
            raise BobError("Cannot save internal state: " + str(err))
项目:bob    作者:BobBuildTool    | 项目源码 | 文件源码
def __enter__(self):
        """Open the ".bob-adb" database (flag 'c') and return self.

        Raises:
            BobError: if dbm.open() fails with dbm.error.
        """
        try:
            handle = dbm.open(".bob-adb", 'c')
        except dbm.error as err:
            raise BobError("Cannot open cache: " + str(err))
        self.__db = handle
        return self
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def open(file, flag='r', mode=0666):
    """Open or create database at path given by *file*.

    Optional argument *flag* can be 'r' (default) for read-only access, 'w'
    for read-write access of an existing database, 'c' for read-write access
    to a new or existing database, and 'n' for read-write access to a new
    database.

    Note: 'r' and 'w' fail if the database doesn't exist; 'c' creates it
    only if it doesn't exist; and 'n' always creates a new database.
    """

    # guess the type of an existing database
    from whichdb import whichdb
    result=whichdb(file)
    if result is None:
        # db doesn't exist
        if 'c' in flag or 'n' in flag:
            # file doesn't exist and the new
            # flag was used so use default type
            mod = _defaultmod
        else:
            raise error, "need 'c' or 'n' flag to open new db"
    elif result == "":
        # db type cannot be determined
        raise error, "db type could not be determined"
    else:
        mod = __import__(result)
    return mod.open(file, flag, mode)
项目:rci    作者:seecloud    | 项目源码 | 文件源码
def __init__(self, service_name, storage_name, **kwargs):
        """Open "<path>/<service_name>/<storage_name>.db" as a dbm database.

        *path* is taken from ``kwargs["path"]``; the service directory is
        created when missing and the database is opened with flag "cs".
        """
        service_dir = os.path.join(kwargs["path"], service_name)
        os.makedirs(service_dir, exist_ok=True)
        db_file = os.path.join(service_dir, storage_name + ".db")
        self.db = dbm.open(db_file, "cs")
项目:rci    作者:seecloud    | 项目源码 | 文件源码
def __init__(self, root, **kwargs):
        """Set up the service from *root* and its keyword configuration.

        Reads "name" and "data-path" from *kwargs* (plus the optional
        "http_path", default "github", and "cookie_name", default "ghs"),
        creates the data directory if needed and opens the "users.db",
        "orgs.db" and "sessions.db" databases with flag "cs".
        """
        self.root = root
        self.cfg = kwargs
        self.http_path = kwargs.get("http_path", "github")
        oauth_secrets = root.config.secrets[self.cfg["name"]]
        self.oauth = github.OAuth(**oauth_secrets)

        data_dir = kwargs["data-path"]
        os.makedirs(data_dir, exist_ok=True)
        for attr in ("users", "orgs"):
            setattr(self, attr, dbm.open(os.path.join(data_dir, attr + ".db"), "cs"))
        sessions_db = dbm.open(os.path.join(data_dir, "sessions.db"), "cs")
        cookie_name = kwargs.get("cookie_name", "ghs")
        self.ss = SessionStore(sessions_db, cookie_name)
项目:rci    作者:seecloud    | 项目源码 | 文件源码
def cb_job_started(self, job):
        """Create the log directory for *job* and capture its console output.

        The directory is "<logs_path>/<event id>/<job name>"; a
        "console.txt" file is opened there in binary write mode and a
        callback writing its second argument to that file is added to
        ``job.console_callbacks``. The file is not closed here.
        """
        job_dir = os.path.join(self.config["logs_path"], job.event.id,
                               job.config["name"])
        os.makedirs(job_dir)
        outfile = open(os.path.join(job_dir, "console.txt"), "wb")

        def write_chunk(s, d):
            # Only the second argument is written; the first is ignored.
            return outfile.write(d)

        job.console_callbacks.add(write_chunk)
项目:shelfdb    作者:nitipit    | 项目源码 | 文件源码
def setUp(self):
        """Open the test database and check that it is a shelfdb DB object."""
        opened = shelfdb.open('test_data/db')
        self.db = opened
        self.assertIsInstance(opened, shelfdb.shelf.DB)
项目:shelfdb    作者:nitipit    | 项目源码 | 文件源码
def test_shelf_open_close(self):
        """Check a shelf while open and its underlying dict after close."""
        self.db.shelf('user')
        # If opening the shelf's file directly fails with the gdbm error,
        # its errno must be 11.
        try:
            dbm.open('test_data/db/user')
        except dbm.gnu.error as err:
            self.assertEqual(err.errno, 11)
        self.db.close()
        underlying = self.db._shelf['user']._shelf.dict
        self.assertIsInstance(underlying, shelve._ClosedDict)
项目:shelfdb    作者:nitipit    | 项目源码 | 文件源码
def setUp(self):
        """Open the test database and insert three users into the 'user' shelf.

        Each inserted user dict gets its returned id stored under '_id',
        and that id is checked to parse as a version-1 UUID.
        """
        self.db = shelfdb.open('test_data/db')
        self.user_list = [{'name': month} for month in ('Jan', 'Feb', 'Mar')]
        for user in self.user_list:
            new_id = self.db.shelf('user').insert(user)
            user['_id'] = new_id
            # Check if _id is a valid uuid1
            parsed = uuid.UUID(new_id, version=1)
            self.assertEqual(new_id.replace('-', ''), parsed.hex)
项目:Url    作者:beiruan    | 项目源码 | 文件源码
def OnSeveResult(self,event):
        """Write the two rule texts to 'catalog.txt' and 'extract.txt'.

        The text from ``self.lf`` goes to 'catalog.txt' and the text from
        ``self.rt`` to 'extract.txt', both in the current directory and
        both truncated first. *event* is not used.
        """
        # "with" closes both files even if GetText() or a write raises;
        # they were previously leaked on any error.
        with open('catalog.txt', 'w+') as catalog, open('extract.txt', 'w+') as extract:
            leftRule = self.lf.GetText()
            rightRule = self.rt.GetText()
            catalog.writelines(leftRule)
            extract.writelines(rightRule)
项目:SmallReptileTraining    作者:yanbober    | 项目源码 | 文件源码
def save(self, data):
        """Overwrite 'NormalFilePersistence.txt' with the text *data*."""
        with open('NormalFilePersistence.txt', 'w') as handle:
            handle.write(data)
项目:SmallReptileTraining    作者:yanbober    | 项目源码 | 文件源码
def load(self):
        """Return the full text content of 'NormalFilePersistence.txt'."""
        with open('NormalFilePersistence.txt', 'r') as handle:
            content = handle.read()
        return content
项目:SmallReptileTraining    作者:yanbober    | 项目源码 | 文件源码
def save(self, key, value):
        """Store ``str(value)`` under *key* in the 'DBMPersistence' database.

        The database is opened with flag 'c' and closed again before
        returning, also when the store fails.
        """
        # Open outside the try block: if dbm.open() itself fails there is
        # nothing to close, and a "finally" touching the unbound name would
        # hide the real error behind an UnboundLocalError.
        dbm_file = dbm.open('DBMPersistence', 'c')
        try:
            dbm_file[key] = str(value)
        finally:
            dbm_file.close()
项目:SmallReptileTraining    作者:yanbober    | 项目源码 | 文件源码
def load(self, key):
        """Return the value stored under *key* in 'DBMPersistence', or None.

        The database is opened read-only and closed again before returning.
        Errors from dbm.open() (for example a missing database) propagate
        to the caller unchanged.
        """
        # Open outside the try block: if dbm.open() itself fails there is
        # nothing to close, and a "finally" touching the unbound name would
        # hide the real error behind an UnboundLocalError.
        dbm_file = dbm.open('DBMPersistence', 'r')
        try:
            if key in dbm_file:
                return dbm_file[key]
            return None
        finally:
            dbm_file.close()
项目:SmallReptileTraining    作者:yanbober    | 项目源码 | 文件源码
def save(self, obj):
        """Pickle *obj* into the file 'PicklePersistence', replacing its content."""
        with open('PicklePersistence', 'wb') as sink:
            pickle.dump(obj, sink)
项目:SmallReptileTraining    作者:yanbober    | 项目源码 | 文件源码
def save(self, key, obj):
        """Store *obj* under *key* in the 'ShelvePersistence' shelf.

        The shelf is closed again before returning, also when the store
        fails.
        """
        # Open outside the try block: if shelve.open() itself fails there
        # is nothing to close, and a "finally" touching the unbound name
        # would hide the real error behind an UnboundLocalError.
        shelve_file = shelve.open('ShelvePersistence')
        try:
            shelve_file[key] = obj
        finally:
            shelve_file.close()
项目:SmallReptileTraining    作者:yanbober    | 项目源码 | 文件源码
def load(self, key):
        """Return the object stored under *key* in 'ShelvePersistence', or None.

        The shelf is closed again before returning. Errors from
        shelve.open() propagate to the caller unchanged.
        """
        # Open outside the try block: if shelve.open() itself fails there
        # is nothing to close, and a "finally" touching the unbound name
        # would hide the real error behind an UnboundLocalError.
        shelve_file = shelve.open('ShelvePersistence')
        try:
            if key in shelve_file:
                return shelve_file[key]
            return None
        finally:
            shelve_file.close()
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def __init__(self, filename, flag='c', protocol=None, writeback=False):
        """Open *filename* through dbm.open() and initialise the Shelf on it.

        *flag* goes to dbm.open(); *protocol* and *writeback* go to
        Shelf.__init__().
        """
        import dbm
        database = dbm.open(filename, flag)
        Shelf.__init__(self, database, protocol, writeback)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def open(filename, flag='c', protocol=None, writeback=False):
    """Return a persistent dictionary stored under *filename*.

    *filename* is the base name of the underlying database; the
    database library may append an extension and may create more than
    one file.  *flag* is interpreted exactly like the flag argument of
    dbm.open().  *protocol* selects the pickle protocol version
    (0, 1, or 2).

    The module's __doc__ string gives an overview of the interface.
    """

    shelf = DbfilenameShelf(filename, flag, protocol, writeback)
    return shelf
项目:Red_Star    作者:medeor413    | 项目源码 | 文件源码
def __init__(self, filename, flag='c', protocol=None, keyencoding='utf-8'):
        """Load every entry of the dbm file *filename* into an in-memory dict.

        The database is opened with *flag*, each value is unpickled into
        ``self.dict`` under its database key, and the database is closed
        again. The shelf is then initialised on that dict with writeback
        disabled.
        """
        self.db = filename
        self.flag = flag
        self.dict = entries = {}
        with dbm.open(filename, flag) as backing:
            for key in backing.keys():
                # NOTE(review): unpickling is only safe for trusted files.
                entries[key] = Unpickler(BytesIO(backing[key])).load()
        shelve.Shelf.__init__(self, entries, protocol, False, keyencoding)
项目:Red_Star    作者:medeor413    | 项目源码 | 文件源码
def sync(self):
        """Pickle every entry of ``self.dict`` into the dbm file and sync it.

        The database named by ``self.db`` is opened with ``self.flag``,
        each value is pickled with ``self._protocol`` and stored under its
        key, and the database's own sync() is called before it is closed.
        """
        with dbm.open(self.db, self.flag) as backing:
            for key, value in self.dict.items():
                buf = BytesIO()
                Pickler(buf, protocol=self._protocol).dump(value)
                backing[key] = buf.getvalue()
            backing.sync()
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def open(file, flag='r', mode=0666):
    """Open or create database at path given by *file*.

    Optional argument *flag* can be 'r' (default) for read-only access, 'w'
    for read-write access of an existing database, 'c' for read-write access
    to a new or existing database, and 'n' for read-write access to a new
    database.

    Note: 'r' and 'w' fail if the database doesn't exist; 'c' creates it
    only if it doesn't exist; and 'n' always creates a new database.
    """

    # guess the type of an existing database
    from whichdb import whichdb
    result=whichdb(file)
    if result is None:
        # db doesn't exist
        if 'c' in flag or 'n' in flag:
            # file doesn't exist and the new
            # flag was used so use default type
            mod = _defaultmod
        else:
            raise error, "need 'c' or 'n' flag to open new db"
    elif result == "":
        # db type cannot be determined
        raise error, "db type could not be determined"
    else:
        mod = __import__(result)
    return mod.open(file, flag, mode)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def __init__(self, filename, mode, perm):
        """Open *filename* with dbm.open(filename, mode, perm) and keep it as self.db."""
        import dbm
        handle = dbm.open(filename, mode, perm)
        self.db = handle
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def open(file, flag='r', mode=0666):
    """Open or create database at path given by *file*.

    Optional argument *flag* can be 'r' (default) for read-only access, 'w'
    for read-write access of an existing database, 'c' for read-write access
    to a new or existing database, and 'n' for read-write access to a new
    database.

    Note: 'r' and 'w' fail if the database doesn't exist; 'c' creates it
    only if it doesn't exist; and 'n' always creates a new database.
    """

    # guess the type of an existing database
    from whichdb import whichdb
    result=whichdb(file)
    if result is None:
        # db doesn't exist
        if 'c' in flag or 'n' in flag:
            # file doesn't exist and the new
            # flag was used so use default type
            mod = _defaultmod
        else:
            raise error, "need 'c' or 'n' flag to open new db"
    elif result == "":
        # db type cannot be determined
        raise error, "db type could not be determined"
    else:
        mod = __import__(result)
    return mod.open(file, flag, mode)
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def open(file, flag='r', mode=0666):
    """Open or create database at path given by *file*.

    Optional argument *flag* can be 'r' (default) for read-only access, 'w'
    for read-write access of an existing database, 'c' for read-write access
    to a new or existing database, and 'n' for read-write access to a new
    database.

    Note: 'r' and 'w' fail if the database doesn't exist; 'c' creates it
    only if it doesn't exist; and 'n' always creates a new database.
    """

    # guess the type of an existing database
    from whichdb import whichdb
    result=whichdb(file)
    if result is None:
        # db doesn't exist
        if 'c' in flag or 'n' in flag:
            # file doesn't exist and the new
            # flag was used so use default type
            mod = _defaultmod
        else:
            raise error, "need 'c' or 'n' flag to open new db"
    elif result == "":
        # db type cannot be determined
        raise error, "db type could not be determined"
    else:
        mod = __import__(result)
    return mod.open(file, flag, mode)
项目:python-translate    作者:caspartse    | 项目源码 | 文件源码
def query(self, word):
        """Look up *word* on open.iciba.com and return its translation text.

        The response body is searched for the "icIBahyI-group_pos" block;
        the text of every <p> element inside it is whitespace-normalised
        and the non-empty results are joined with newlines.

        Returns:
            The joined translation text, or None when the request or the
            search fails, the status code is not 200, or the matched text
            is empty.
        """
        import requests
        from bs4 import BeautifulSoup
        headers = {
            'Host': 'open.iciba.com',
            'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:50.0) Gecko/20100101 Firefox/50.0',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
            'Accept-Encoding': 'gzip, deflate'
        }
        url = 'http://open.iciba.com/huaci_new/dict.php?word=%s' % (word)
        pattern = r'(<div class=\\\"icIBahyI-group_pos\\\">[\s\S]+?</div>)'
        sess = requests.Session()
        try:
            sess.headers.update(headers)
            try:
                resp = sess.get(url, timeout=100)
                # A failed search yields None, so .group() raises and the
                # lookup is reported as "no result" below.
                text = re.search(pattern, resp.text).group(1)
            except Exception:
                # Was a bare "except:", which also swallowed
                # KeyboardInterrupt and SystemExit.
                return None
        finally:
            # The session was previously never closed.
            sess.close()
        if resp.status_code != 200 or not text:
            return None
        soup = BeautifulSoup(text, 'lxml')
        trans = []
        for item in soup.find_all('p'):
            transText = re.sub(
                r'\s+', ' ', item.get_text().replace('\t', '')).strip()
            if transText:
                trans.append(transText)
        return '\n'.join(trans)
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def open(file, flag='r', mode=0666):
    """Open or create database at path given by *file*.

    Optional argument *flag* can be 'r' (default) for read-only access, 'w'
    for read-write access of an existing database, 'c' for read-write access
    to a new or existing database, and 'n' for read-write access to a new
    database.

    Note: 'r' and 'w' fail if the database doesn't exist; 'c' creates it
    only if it doesn't exist; and 'n' always creates a new database.
    """

    # guess the type of an existing database
    from whichdb import whichdb
    result=whichdb(file)
    if result is None:
        # db doesn't exist
        if 'c' in flag or 'n' in flag:
            # file doesn't exist and the new
            # flag was used so use default type
            mod = _defaultmod
        else:
            raise error, "need 'c' or 'n' flag to open new db"
    elif result == "":
        # db type cannot be determined
        raise error, "db type could not be determined"
    else:
        mod = __import__(result)
    return mod.open(file, flag, mode)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def __init__(self, filename, flag='c', protocol=None, writeback=False):
        """Open *filename* through dbm.open() and initialise the Shelf on it.

        *flag* goes to dbm.open(); *protocol* and *writeback* go to
        Shelf.__init__().
        """
        import dbm
        database = dbm.open(filename, flag)
        Shelf.__init__(self, database, protocol, writeback)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def open(filename, flag='c', protocol=None, writeback=False):
    """Return a persistent dictionary stored under *filename*.

    *filename* is the base name of the underlying database; the
    database library may append an extension and may create more than
    one file.  *flag* is interpreted exactly like the flag argument of
    dbm.open().  *protocol* selects the pickle protocol version
    (0, 1, or 2).

    The module's __doc__ string gives an overview of the interface.
    """

    shelf = DbfilenameShelf(filename, flag, protocol, writeback)
    return shelf
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def open(file, flag='r', mode=0666):
    """Open or create database at path given by *file*.

    Optional argument *flag* can be 'r' (default) for read-only access, 'w'
    for read-write access of an existing database, 'c' for read-write access
    to a new or existing database, and 'n' for read-write access to a new
    database.

    Note: 'r' and 'w' fail if the database doesn't exist; 'c' creates it
    only if it doesn't exist; and 'n' always creates a new database.
    """

    # guess the type of an existing database
    from whichdb import whichdb
    result=whichdb(file)
    if result is None:
        # db doesn't exist
        if 'c' in flag or 'n' in flag:
            # file doesn't exist and the new
            # flag was used so use default type
            mod = _defaultmod
        else:
            raise error, "need 'c' or 'n' flag to open new db"
    elif result == "":
        # db type cannot be determined
        raise error, "db type could not be determined"
    else:
        mod = __import__(result)
    return mod.open(file, flag, mode)
项目:wafer    作者:hariedo    | 项目源码 | 文件源码
def load(self, id):
        """Return the bytes stored for *id*, or None if the file cannot be opened.

        The file location comes from ``self.path(id)``. An IOError raised
        while resolving the path or opening the file yields None; errors
        while reading propagate.
        """
        # overridable callback for external storage, do not call directly
        try:
            handle = open(self.path(id), 'rb')
        except IOError:
            return None
        # "with" closes the file even when read() raises.
        with handle:
            return handle.read()
项目:wafer    作者:hariedo    | 项目源码 | 文件源码
def save(self, id, blob):
        """Write *blob* to the file for *id* and return *id*.

        The file location comes from ``self.path(id)``; the file is
        opened in binary write mode, replacing any previous content.
        """
        # overridable callback for external storage, do not call directly
        # "with" closes the file even when write() raises.
        with open(self.path(id), 'wb') as handle:
            handle.write(blob)
        return id
项目:wafer    作者:hariedo    | 项目源码 | 文件源码
def startup(self, *args, **kwargs):
        '''Open the dbm database for this object and return an empty list.

        The keyword argument path= names the directory to use; without
        it the current directory is assumed. The database path is
        "<path>/<self.name>" and it is opened with flag 'c'.
        '''
        base_dir = kwargs.get('path', '.')
        db_path = '%s/%s' % (base_dir, self.name)
        self._path = db_path
        self._db = dbm.open(self._path, 'c')
        return []
项目:empyrion-python-api    作者:huhlig    | 项目源码 | 文件源码
def open(file, flag='r', mode=0666):
    """Open or create database at path given by *file*.

    Optional argument *flag* can be 'r' (default) for read-only access, 'w'
    for read-write access of an existing database, 'c' for read-write access
    to a new or existing database, and 'n' for read-write access to a new
    database.

    Note: 'r' and 'w' fail if the database doesn't exist; 'c' creates it
    only if it doesn't exist; and 'n' always creates a new database.
    """

    # guess the type of an existing database
    from whichdb import whichdb
    result=whichdb(file)
    if result is None:
        # db doesn't exist
        if 'c' in flag or 'n' in flag:
            # file doesn't exist and the new
            # flag was used so use default type
            mod = _defaultmod
        else:
            raise error, "need 'c' or 'n' flag to open new db"
    elif result == "":
        # db type cannot be determined
        raise error, "db type could not be determined"
    else:
        mod = __import__(result)
    return mod.open(file, flag, mode)
项目:RPKI-toolkit    作者:pavel-odintsov    | 项目源码 | 文件源码
def set(self, **kw):
    """
    Set this object by setting one of its known formats.

    This method only allows one to set one format at a time.
    Subsequent calls will clear the object first.  The point of all
    this is to let the object's internal converters handle mustering
    the object into whatever format you need at the moment.

    Accepted keywords are any name listed in self.formats, plus PEM,
    Base64, Auto_update, PEM_file, DER_file and Auto_file.  Anything
    else, or more than one keyword, raises
    rpki.exceptions.DERObjectConversionError.
    """

    if len(kw) == 1:
      # next(iter(...)) works whether keys() returns a list or a view.
      name = next(iter(kw))
      if name in self.formats:
        self.clear()
        setattr(self, name, kw[name])
        return
      if name == "PEM":
        self.clear()
        self._set_PEM(kw[name])
        return
      if name == "Base64":
        self.clear()
        self.DER = base64.b64decode(kw[name])
        return
      if name == "Auto_update":
        self.filename = kw[name]
        self.check_auto_update()
        return
      if name in ("PEM_file", "DER_file", "Auto_file"):
        # Context manager closes the file even if read() raises.
        with open(kw[name], "rb") as f:
          value = f.read()
        self.clear()
        # Auto_file: treat as PEM only if the content looks like PEM.
        if name == "PEM_file" or (name == "Auto_file" and looks_like_PEM(value)):
          self._set_PEM(value)
        else:
          self.DER = value
        return
    raise rpki.exceptions.DERObjectConversionError("Can't honor conversion request %r" % (kw,))
项目:RPKI-toolkit    作者:pavel-odintsov    | 项目源码 | 文件源码
def check_auto_update(self):
    """
    Check for updates to a DER object that auto-updates from a file.

    Does nothing when self.filename is None.  Otherwise the file is
    re-read when self.timestamp is None or older than the file's
    modification time.  IOError/OSError is not raised to the caller:
    it is recorded in self.lastfail and logged as a warning unless
    the previous failure is more recent than self.failure_threshold.
    """
    if self.filename is None:
      return
    try:
      filename = self.filename
      timestamp = os.stat(self.filename).st_mtime
      if self.timestamp is None or self.timestamp < timestamp:
        logger.debug("Updating %s, timestamp %s",
                     filename, rpki.sundial.datetime.fromtimestamp(timestamp))
        # Context manager closes the file even if read() raises.
        with open(filename, "rb") as f:
          value = f.read()
        self.clear()
        if looks_like_PEM(value):
          self._set_PEM(value)
        else:
          self.DER = value
        # Set again after clear() -- presumably clear() resets these
        # two attributes; verify against clear().
        self.filename = filename
        self.timestamp = timestamp
    except (IOError, OSError) as e:
      now = rpki.sundial.now()
      if self.lastfail is None or now > self.lastfail + self.failure_threshold:
        logger.warning("Could not auto_update %r (last failure %s): %s", self, self.lastfail, e)
      self.lastfail = now
    else:
      # No error this time: forget any earlier failure.
      self.lastfail = None
项目:RPKI-toolkit    作者:pavel-odintsov    | 项目源码 | 文件源码
def __init__(self, filename, keyno = 0):
    """
    Open the database *filename* (flag "c") using gdbm when it can be
    imported and dbm otherwise, and remember *filename* and *keyno*.

    Any failure is logged as a warning and then re-raised.
    """
    try:
      try:
        import gdbm as backend
      except ImportError:
        import dbm as backend
      self.keyno = long(keyno)
      self.filename = filename
      handle = backend.open(filename, "c")
      self.db = handle
    except:
      # Log, then let the original exception continue unchanged.
      logger.warning("insecure_debug_only_rsa_key_generator initialization FAILED, hack inoperative")
      raise