Python collections.OrderedDict 类,__init__() 方法实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用collections.OrderedDict.__init__()

项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_init(self):
        """Exercise the argument forms accepted by ``OrderedDict.__init__``."""
        expected = [('a', 1), ('b', 2), ('c', 3), ('d', 4), ('e', 5)]

        # A second positional argument is one too many.
        with self.assertRaises(TypeError):
            OrderedDict([('a', 1), ('b', 2)], None)

        # Plain dict and keyword input: only the sorted content is compared.
        from_dict = OrderedDict(dict(expected))
        self.assertEqual(sorted(from_dict.items()), expected)
        from_kwds = OrderedDict(**dict(expected))
        self.assertEqual(sorted(from_kwds.items()), expected)

        # A sequence of pairs keeps its order.
        self.assertEqual(list(OrderedDict(expected).items()), expected)

        # Pairs combined with keywords: 'c' is overwritten, 'e' is appended.
        mixed = OrderedDict([('a', 1), ('b', 2), ('c', 9), ('d', 4)], c=3, e=5)
        self.assertEqual(list(mixed.items()), expected)

        # 'self' and 'other' must be usable as ordinary keyword keys.
        for keyword in ('self', 'other'):
            built = OrderedDict(**{keyword: 42})
            self.assertEqual(list(built.items()), [(keyword, 42)])

        # Invalid positional input is rejected with TypeError.
        with self.assertRaises(TypeError):
            OrderedDict(42)
        with self.assertRaises(TypeError):
            OrderedDict((), ())
        with self.assertRaises(TypeError):
            OrderedDict.__init__()

        # Calling __init__ again merges into the existing contents
        # instead of clearing them.
        od = OrderedDict([('a', 1), ('b', 2), ('c', 3), ('d', 44), ('e', 55)])
        od.__init__([('e', 5), ('f', 6)], g=7, d=4)
        merged = [('a', 1), ('b', 2), ('c', 3), ('d', 4),
                  ('e', 5), ('f', 6), ('g', 7)]
        self.assertEqual(list(od.items()), merged)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_init(self):
        """Exercise the argument forms accepted by ``OrderedDict.__init__``."""
        expected = [('a', 1), ('b', 2), ('c', 3), ('d', 4), ('e', 5)]

        # A second positional argument is one too many.
        with self.assertRaises(TypeError):
            OrderedDict([('a', 1), ('b', 2)], None)

        # Plain dict and keyword input: only the sorted content is compared.
        from_dict = OrderedDict(dict(expected))
        self.assertEqual(sorted(from_dict.items()), expected)
        from_kwds = OrderedDict(**dict(expected))
        self.assertEqual(sorted(from_kwds.items()), expected)

        # A sequence of pairs keeps its order.
        self.assertEqual(list(OrderedDict(expected).items()), expected)

        # Pairs combined with keywords: 'c' is overwritten, 'e' is appended.
        mixed = OrderedDict([('a', 1), ('b', 2), ('c', 9), ('d', 4)], c=3, e=5)
        self.assertEqual(list(mixed.items()), expected)

        # 'self' and 'other' must be usable as ordinary keyword keys.
        for keyword in ('self', 'other'):
            built = OrderedDict(**{keyword: 42})
            self.assertEqual(list(built.items()), [(keyword, 42)])

        # Invalid positional input is rejected with TypeError.
        with self.assertRaises(TypeError):
            OrderedDict(42)
        with self.assertRaises(TypeError):
            OrderedDict((), ())
        with self.assertRaises(TypeError):
            OrderedDict.__init__()

        # Calling __init__ again merges into the existing contents
        # instead of clearing them.
        od = OrderedDict([('a', 1), ('b', 2), ('c', 3), ('d', 44), ('e', 55)])
        od.__init__([('e', 5), ('f', 6)], g=7, d=4)
        merged = [('a', 1), ('b', 2), ('c', 3), ('d', 4),
                  ('e', 5), ('f', 6), ('g', 7)]
        self.assertEqual(list(od.items()), merged)
项目:gwyfile    作者:tuxu    | 项目源码 | 文件源码
def __init__(self, data,
                 xreal=1.0, yreal=1.0, xoff=0, yoff=0,
                 si_unit_xy=None, si_unit_z=None,
                 typecodes=None):
        """Set up the data field from an OrderedDict or a 2-D numpy array.

        When ``data`` is an ``OrderedDict`` its entries are merged into this
        object through ``update`` and the size, offset and unit arguments
        are ignored.  Otherwise ``data`` must be a two-dimensional
        ``np.ndarray``; it is stored together with those arguments.
        """
        super(GwyDataField, self).__init__('GwyDataField', typecodes=typecodes)
        if not isinstance(data, OrderedDict):
            assert isinstance(data, np.ndarray) and len(data.shape) == 2
            # Assign one attribute per statement, in the original order.
            self.xreal = xreal
            self.yreal = yreal
            self.xoff = xoff
            self.yoff = yoff
            self.si_unit_xy = si_unit_xy
            self.si_unit_z = si_unit_z
            self.data = data
        else:
            self.update(data)
        # Type codes registered for the geometry fields.
        field_codes = {
            'xres': 'i', 'yres': 'i',
            'xreal': 'd', 'yreal': 'd',
            'xoff': 'd', 'yoff': 'd',
        }
        self.typecodes.update(field_codes)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_init(self):
        """Exercise the argument forms accepted by ``OrderedDict.__init__``."""
        expected = [('a', 1), ('b', 2), ('c', 3), ('d', 4), ('e', 5)]

        # A second positional argument is one too many.
        with self.assertRaises(TypeError):
            OrderedDict([('a', 1), ('b', 2)], None)

        # Plain dict and keyword input: only the sorted content is compared.
        from_dict = OrderedDict(dict(expected))
        self.assertEqual(sorted(from_dict.items()), expected)
        from_kwds = OrderedDict(**dict(expected))
        self.assertEqual(sorted(from_kwds.items()), expected)

        # A sequence of pairs keeps its order.
        self.assertEqual(list(OrderedDict(expected).items()), expected)

        # Pairs combined with keywords: 'c' is overwritten, 'e' is appended.
        mixed = OrderedDict([('a', 1), ('b', 2), ('c', 9), ('d', 4)], c=3, e=5)
        self.assertEqual(list(mixed.items()), expected)

        # 'self' and 'other' must be usable as ordinary keyword keys.
        for keyword in ('self', 'other'):
            built = OrderedDict(**{keyword: 42})
            self.assertEqual(list(built.items()), [(keyword, 42)])

        # Invalid positional input is rejected with TypeError.
        with self.assertRaises(TypeError):
            OrderedDict(42)
        with self.assertRaises(TypeError):
            OrderedDict((), ())
        with self.assertRaises(TypeError):
            OrderedDict.__init__()

        # Calling __init__ again merges into the existing contents
        # instead of clearing them.
        od = OrderedDict([('a', 1), ('b', 2), ('c', 3), ('d', 44), ('e', 55)])
        od.__init__([('e', 5), ('f', 6)], g=7, d=4)
        merged = [('a', 1), ('b', 2), ('c', 3), ('d', 4),
                  ('e', 5), ('f', 6), ('g', 7)]
        self.assertEqual(list(od.items()), merged)
项目:pygresql    作者:Cito    | 项目源码 | 文件源码
def __init__(self, db):
        """Initialize type cache for connection.

        Keeps a weak proxy to ``db``, creates the ``Typecasts`` helper and
        selects the ``pg_type`` lookup query matching the server version.
        """
        super(DbTypes, self).__init__()
        self._db = weakref.proxy(db)
        self._regtypes = False
        casts = Typecasts()
        self._typecasts = casts
        casts.get_attnames = self.get_attnames
        casts.connection = self._db
        legacy_server = db.server_version < 80400
        if legacy_server:
            # older remote databases (not officially supported)
            query = (
                "SELECT oid, typname, typname::text::regtype,"
                " typtype, null as typcategory, typdelim, typrelid"
                " FROM pg_type WHERE oid=%s::regtype")
        else:
            query = (
                "SELECT oid, typname, typname::regtype,"
                " typtype, typcategory, typdelim, typrelid"
                " FROM pg_type WHERE oid=%s::regtype")
        self._query_pg_type = query
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def __init__(self, data=None):
        """Fill the dict from ``data`` and build ``self.reverse`` (value -> key).

        ``data`` defaults to an empty mapping when omitted.
        """
        source = {} if data is None else data
        # Inverse lookup table built from the same mapping.
        self.reverse = {source[key]: key for key in source}
        dict.__init__(self, source)
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def __init__(self, data=None):
        """Store the inverse of ``data``: each ``data[k]`` becomes a key mapping to ``k``.

        ``data`` defaults to an empty mapping when omitted.
        """
        dict.__init__(self)
        source = {} if data is None else data
        for original_key in source:
            # Item assignment is used so any __setitem__ override still runs.
            self[source[original_key]] = original_key
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def __init__(self, *args, **kwargs):
        """Create the re-entrant lock, fill the dict, and wrap plain-dict values.

        Values whose type is exactly ``dict`` are replaced by
        ``ThreadsafeDict`` instances; all other values are left untouched.
        """
        self.mutex = threading.RLock()
        dict.__init__(self, *args, **kwargs)
        for key in self:
            if type(self[key]) is not dict:
                continue
            self[key] = ThreadsafeDict(self[key])
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def __init__(self, *args, **kwargs):
        """Create the re-entrant lock, fill the list, and wrap every element.

        Each element is replaced in place by ``mkThreadsafe(element)``.
        """
        self.mutex = threading.RLock()
        list.__init__(self, *args, **kwargs)
        # Iterating a list yields its elements, not positions, so the
        # elements must not be used as indices; enumerate supplies the index.
        for index, item in enumerate(self):
            self[index] = mkThreadsafe(item)
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def __init__(self, lock):
        """Keep a reference to ``lock`` and acquire it immediately."""
        self.lock = lock
        self.lock.acquire()
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def __init__(self, data):
        """Store ``data`` on the instance as ``_data_``."""
        self._data_ = data

    ## List of methods to directly wrap from _data_
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def __init__(self, data):
        """Store ``data`` on the instance as ``_data_``."""
        self._data_ = data
        #self.__mro__ = (ProtectedList, object)

    ## List of methods to directly wrap from _data_
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def __init__(self, data):
        """Store ``data`` on the instance as ``_data_``."""
        self._data_ = data

    ## List of methods to directly wrap from _data_
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def __init__(self, data=None):
        """Fill the dict from ``data`` and build ``self.reverse`` (value -> key).

        ``data`` defaults to an empty mapping when omitted.
        """
        source = {} if data is None else data
        # Inverse lookup table built from the same mapping.
        self.reverse = {source[key]: key for key in source}
        dict.__init__(self, source)
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def __init__(self, data=None):
        """Store the inverse of ``data``: each ``data[k]`` becomes a key mapping to ``k``.

        ``data`` defaults to an empty mapping when omitted.
        """
        dict.__init__(self)
        source = {} if data is None else data
        for original_key in source:
            # Item assignment is used so any __setitem__ override still runs.
            self[source[original_key]] = original_key
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def __init__(self, *args, **kwargs):
        """Create the re-entrant lock, fill the list, and wrap every element.

        Each element is replaced in place by ``mkThreadsafe(element)``.
        """
        self.mutex = threading.RLock()
        list.__init__(self, *args, **kwargs)
        # Iterating a list yields its elements, not positions, so the
        # elements must not be used as indices; enumerate supplies the index.
        for index, item in enumerate(self):
            self[index] = mkThreadsafe(item)
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def __init__(self, lock):
        """Keep a reference to ``lock`` and acquire it immediately."""
        self.lock = lock
        self.lock.acquire()
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def __init__(self, *args):
        """Initialise the mapping, optionally copying one ``dict`` argument.

        With no argument the mapping is left as initialised.  With exactly
        one ``dict`` argument its items are copied through item assignment.
        Any other argument combination raises ``Exception``.
        """
        OrderedDict.__init__(self, {}) ## requirement for the empty {} here seems to be a python bug?
        # Lower-cased key -> key as stored, built from the current keys.
        self.keyMap = OrderedDict((k.lower(), k) for k in OrderedDict.keys(self))
        if not args:
            return
        if len(args) != 1 or not isinstance(args[0], dict):
            raise Exception("CaselessDict may only be instantiated with a single dict.")
        source = args[0]
        for key in source:
            self[key] = source[key]

    #def keys(self):
        #return self.keyMap.values()
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def __init__(self, data):
        """Store ``data`` on the instance as ``_data_``."""
        self._data_ = data

    ## List of methods to directly wrap from _data_
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def __init__(self, data):
        """Store ``data`` on the instance as ``_data_``."""
        self._data_ = data
        #self.__mro__ = (ProtectedList, object)

    ## List of methods to directly wrap from _data_
项目:ssbio    作者:SBRG    | 项目源码 | 文件源码
def __init__(self):
        """Record today's date plus two formatted versions of it.

        ``short_date`` uses ``%y%m%d`` and ``long_date`` uses ``%Y-%m-%d``.
        """
        current_day = datetime.date.today()
        self.today = current_day
        self.short_date = current_day.strftime('%y%m%d')
        self.long_date = current_day.strftime('%Y-%m-%d')
项目:ssbio    作者:SBRG    | 项目源码 | 文件源码
def __init__(self, default_factory=None, *a, **kw):
        """Initialise the ordered dict and remember ``default_factory``.

        Remaining positional and keyword arguments go to
        ``OrderedDict.__init__``.  Raises ``TypeError`` when
        ``default_factory`` is neither ``None`` nor callable.
        """
        factory_ok = (default_factory is None or
                      isinstance(default_factory, Callable))
        if not factory_ok:
            raise TypeError('first argument must be callable')
        OrderedDict.__init__(self, *a, **kw)
        self.default_factory = default_factory
项目:docker-db    作者:EXASOL    | 项目源码 | 文件源码
def __init__(self, msg):
        """Store ``msg`` in ``self.msg`` with the ``"ERROR::EXAConf: "`` prefix."""
        self.msg = "ERROR::EXAConf: " + msg
项目:docker-db    作者:EXASOL    | 项目源码 | 文件源码
def __init__(self, *args, **kw):
        """Forward all arguments to ``odict.__init__`` and set the enabled flag to True."""
        odict.__init__(self, *args, **kw)
        # NOTE(review): a double-underscore attribute is name-mangled when this
        # method sits inside a class body - confirm against the enclosing class.
        self.__enabled = True
项目:mopidy-tidal    作者:mones88    | 项目源码 | 文件源码
def __init__(self, max_size=1024):
        """Create an empty ordered mapping and record ``max_size``.

        Raises ``ValueError`` when ``max_size`` is zero or negative.
        """
        if max_size <= 0:
            raise ValueError('Invalid size')
        OrderedDict.__init__(self)
        self._max_size = max_size
        # presumably enforces the size bound; _check_limit is defined elsewhere
        self._check_limit()
项目:mopidy-tidal    作者:mones88    | 项目源码 | 文件源码
def __init__(self, func):
        """Run the ``SearchCache`` parent initialiser and keep ``func`` as ``_func``."""
        super(SearchCache, self).__init__()
        self._func = func
项目:mopidy-tidal    作者:mones88    | 项目源码 | 文件源码
def __init__(self, **kwargs):
        """Build the key from the ``query`` and ``exact`` keyword arguments.

        ``query`` is passed through ``self.fix_query`` and its items are
        stored as a sorted tuple; ``exact`` is stored unchanged.  Raises
        ``KeyError`` when either keyword is missing.
        """
        fixed_query = self.fix_query(kwargs["query"])
        # items() is available on both Python 2 and 3 mappings, whereas
        # iteritems() exists only on Python 2; sorted() consumes either.
        self._query = tuple(sorted(fixed_query.items()))
        self._exact = kwargs["exact"]
        self._hash = None
项目:Sublime-uroboroSQL-formatter    作者:future-architect    | 项目源码 | 文件源码
def __init__(self, maxsize=100):
            """Create an empty ``OrderedDict`` and record ``maxsize`` as ``_maxsize``."""
            OrderedDict.__init__(self)

            self._maxsize = maxsize
项目:Sublime-uroboroSQL-formatter    作者:future-architect    | 项目源码 | 文件源码
def __init__(self, maxsize=100):
            """Create an empty ``dict`` and record ``maxsize`` as ``_maxsize``."""
            dict.__init__(self)

            self._maxsize = maxsize
项目:slack-sql    作者:wang502    | 项目源码 | 文件源码
def __init__(self, *args, **kw):
            """Build a read-only dict from at most one iterable of pairs.

            Keyword arguments, more than one positional argument, or a
            ``dict`` argument raise ``TypeError``.  The key order is kept in
            ``_keys`` and the mutating methods are replaced on the instance
            by ``_read_only_error``.
            """
            if kw or len(args) > 1:
                raise TypeError
            pairs = args[0] if args else []
            if isinstance(pairs, dict):
                raise TypeError
            pairs = list(pairs)
            self._keys = [pair[0] for pair in pairs]
            dict.__init__(self, pairs)
            self._read_only = True
            blocked = self._read_only_error
            # Shadow every mutator, in the same order as before.
            for method_name in ('clear', 'update',
                                'pop', 'setdefault', 'popitem'):
                setattr(self, method_name, blocked)
项目:slack-sql    作者:wang502    | 项目源码 | 文件源码
def __init__(self, *args, **kw):
            """Fill the ordered dict, then switch the instance to read-only.

            ``_read_only`` is False while ``OrderedDict.__init__`` runs and
            True afterwards; the mutating methods are then replaced on the
            instance by ``_read_only_error``.
            """
            self._read_only = False
            OrderedDict.__init__(self, *args, **kw)
            self._read_only = True
            blocked = self._read_only_error
            # Shadow every mutator, in the same order as before.
            for method_name in ('clear', 'update',
                                'pop', 'setdefault', 'popitem'):
                setattr(self, method_name, blocked)
项目:slack-sql    作者:wang502    | 项目源码 | 文件源码
def __init__(self, offset, name=None):
            """Store the UTC offset and a display name for it.

            ``offset`` is read through its ``days`` and ``seconds``
            attributes.  When ``name`` is empty or omitted, a name of the
            form ``UTC+HH:MM`` / ``UTC-HH:MM`` is derived from the offset.
            """
            self.offset = offset
            if not name:
                total_minutes = (self.offset.days * 1440
                                 + self.offset.seconds // 60)
                # Keep the sign apart from the hour value: negating an hour
                # count of 0 loses it, so -30 minutes would show as +00:30.
                sign = '-' if total_minutes < 0 else '+'
                hours, minutes = divmod(abs(total_minutes), 60)
                name = 'UTC%s%02d:%02d' % (sign, hours, minutes)
            self.name = name
项目:slack-sql    作者:wang502    | 项目源码 | 文件源码
def __init__(self):
        """Populate the mapping from the ``_types`` table.

        Every whitespace-separated key in a ``_types`` value maps to its
        type name, and the key prefixed with ``_`` maps to the type name
        followed by ``[]``.
        """
        for type_name, key_list in self._types.items():
            array_type = '%s[]' % type_name
            for key in key_list.split():
                self[key] = type_name
                self['_%s' % key] = array_type

    # this could be a static method in Python > 2.6
项目:slack-sql    作者:wang502    | 项目源码 | 文件源码
def __init__(self, obj):
        """Store ``obj`` on the instance as ``obj``."""
        self.obj = obj
项目:slack-sql    作者:wang502    | 项目源码 | 文件源码
def __init__(self, db):
        """Initialize type cache for connection.

        Creates the ``Typecasts`` helper bound to ``db`` and copies the
        ``query`` and ``escape_string`` callables from ``db.db``.
        """
        super(DbTypes, self).__init__()
        self._regtypes = False
        self._get_attnames = db.get_attnames
        casts = Typecasts()
        self._typecasts = casts
        casts.get_attnames = self.get_attnames
        casts.connection = db
        # The remaining callables come from the object held in db.db.
        inner = db.db
        self.query = inner.query
        self.escape_string = inner.escape_string
项目:slack-sql    作者:wang502    | 项目源码 | 文件源码
def __init__(self, result, fields):
        """Create query from given result rows and field names.

        Both arguments are stored unchanged as ``self.result`` and
        ``self.fields``.
        """
        self.result = result
        self.fields = fields
项目:slack-sql    作者:wang502    | 项目源码 | 文件源码
def __init__(self, db, event, callback=None,
            arg_dict=None, timeout=None, stop_event=None):
        """Initialize the notification handler.

        Pass a PyGreSQL database connection, the name of the event
        (notification channel) to listen for, and a callback function.

        ``arg_dict`` is a dictionary that will be handed to the callback as
        its single argument; an empty dict is used when it is None.
        ``timeout`` is given in seconds (a float denotes fractions of
        seconds).  If it is absent or None, the callers will never time
        out.  When the timeout is reached the callback is called with None
        as its single argument.  A timeout of zero makes the handler poll
        notifications synchronously and return.

        ``stop_event`` names the event that signals the handler to stop
        listening.  By default it is the event name prefixed with 'stop_'.
        """
        self.db = db
        self.event = event
        self.stop_event = stop_event if stop_event else 'stop_%s' % event
        self.listening = False
        self.callback = callback
        self.arg_dict = {} if arg_dict is None else arg_dict
        self.timeout = timeout
项目:slack-sql    作者:wang502    | 项目源码 | 文件源码
def __init__(self, *args, **kw):
        """Create a new connection

        Accepts either connection parameters or an existing _pg or pgdb
        connection, so the methods of the classic pg interface can be used
        with a DB-API 2 pgdb connection.
        """
        # A lone positional argument or a lone keyword may carry an
        # existing connection object.
        db = None
        if len(args) == 1 and not kw:
            db = args[0]
        elif len(kw) == 1 and not args:
            db = kw.get('db')
        if db:
            if isinstance(db, DB):
                db = db.db
            else:
                # Unwrap the _cnx attribute when the object has one.
                db = getattr(db, '_cnx', db)
        reusable = db and hasattr(db, 'db') and hasattr(db, 'query')
        if reusable:
            self._closeable = False
        else:
            db = connect(*args, **kw)
            self._closeable = True
        self.db = db
        self.dbname = db.db
        self._regtypes = False
        self._attnames = {}
        self._pkeys = {}
        self._privileges = {}
        self._args = args, kw
        self.adapter = Adapter(self)
        self.dbtypes = DbTypes(self)
        db.set_cast_hook(self.dbtypes.typecast)
        self.debug = None  # For debugging scripts, this can be set
            # * to a string format specification (e.g. in CGI set to "%s<BR>"),
            # * to a file object to write debug statements or
            # * to a callable object which takes a string argument
            # * to any other true value to just print debug statements
项目:slack-sql    作者:wang502    | 项目源码 | 文件源码
def __init__(self, *args, **kw):
            """Build a read-only dict from at most one iterable of pairs.

            Keyword arguments, more than one positional argument, or a
            ``dict`` argument raise ``TypeError``.  The key order is kept in
            ``_keys`` and the mutating methods are replaced on the instance
            by ``_read_only_error``.
            """
            if kw or len(args) > 1:
                raise TypeError
            pairs = args[0] if args else []
            if isinstance(pairs, dict):
                raise TypeError
            pairs = list(pairs)
            self._keys = [pair[0] for pair in pairs]
            dict.__init__(self, pairs)
            self._read_only = True
            blocked = self._read_only_error
            # Shadow every mutator, in the same order as before.
            for method_name in ('clear', 'update',
                                'pop', 'setdefault', 'popitem'):
                setattr(self, method_name, blocked)
项目:slack-sql    作者:wang502    | 项目源码 | 文件源码
def __init__(self, offset, name=None):
            """Store the UTC offset and a display name for it.

            ``offset`` is read through its ``days`` and ``seconds``
            attributes.  When ``name`` is empty or omitted, a name of the
            form ``UTC+HH:MM`` / ``UTC-HH:MM`` is derived from the offset.
            """
            self.offset = offset
            if not name:
                total_minutes = (self.offset.days * 1440
                                 + self.offset.seconds // 60)
                # Keep the sign apart from the hour value: negating an hour
                # count of 0 loses it, so -30 minutes would show as +00:30.
                sign = '-' if total_minutes < 0 else '+'
                hours, minutes = divmod(abs(total_minutes), 60)
                name = 'UTC%s%02d:%02d' % (sign, hours, minutes)
            self.name = name
项目:slack-sql    作者:wang502    | 项目源码 | 文件源码
def __init__(self):
        """Populate the mapping from the ``_types`` table.

        Every whitespace-separated key in a ``_types`` value maps to its
        type name, and the key prefixed with ``_`` maps to the type name
        followed by ``[]``.
        """
        for type_name, key_list in self._types.items():
            array_type = '%s[]' % type_name
            for key in key_list.split():
                self[key] = type_name
                self['_%s' % key] = array_type

    # this could be a static method in Python > 2.6
项目:slack-sql    作者:wang502    | 项目源码 | 文件源码
def __init__(self, obj):
        """Store ``obj`` on the instance as ``obj``."""
        self.obj = obj
项目:slack-sql    作者:wang502    | 项目源码 | 文件源码
def __init__(self, db):
        """Keep ``db`` and copy the helper callables this object uses.

        ``encode_json`` is taken from ``db`` itself; ``escape_bytea`` and
        ``escape_string`` are taken from ``db.db``.
        """
        self.db = db
        self.encode_json = db.encode_json
        inner = db.db
        self.escape_bytea = inner.escape_bytea
        self.escape_string = inner.escape_string
项目:slack-sql    作者:wang502    | 项目源码 | 文件源码
def __init__(self, db):
        """Initialize type cache for connection.

        Creates the ``Typecasts`` helper bound to ``db`` and copies the
        ``query`` and ``escape_string`` callables from ``db.db``.
        """
        super(DbTypes, self).__init__()
        self._regtypes = False
        self._get_attnames = db.get_attnames
        casts = Typecasts()
        self._typecasts = casts
        casts.get_attnames = self.get_attnames
        casts.connection = db
        # The remaining callables come from the object held in db.db.
        inner = db.db
        self.query = inner.query
        self.escape_string = inner.escape_string
项目:slack-sql    作者:wang502    | 项目源码 | 文件源码
def __init__(self, db, event, callback=None,
            arg_dict=None, timeout=None, stop_event=None):
        """Initialize the notification handler.

        Pass a PyGreSQL database connection, the name of the event
        (notification channel) to listen for, and a callback function.

        ``arg_dict`` is a dictionary that will be handed to the callback as
        its single argument; an empty dict is used when it is None.
        ``timeout`` is given in seconds (a float denotes fractions of
        seconds).  If it is absent or None, the callers will never time
        out.  When the timeout is reached the callback is called with None
        as its single argument.  A timeout of zero makes the handler poll
        notifications synchronously and return.

        ``stop_event`` names the event that signals the handler to stop
        listening.  By default it is the event name prefixed with 'stop_'.
        """
        self.db = db
        self.event = event
        self.stop_event = stop_event if stop_event else 'stop_%s' % event
        self.listening = False
        self.callback = callback
        self.arg_dict = {} if arg_dict is None else arg_dict
        self.timeout = timeout
项目:slack-sql    作者:wang502    | 项目源码 | 文件源码
def __init__(self, *args, **kw):
        """Create a new connection

        Accepts either connection parameters or an existing _pg or pgdb
        connection, so the methods of the classic pg interface can be used
        with a DB-API 2 pgdb connection.
        """
        # A lone positional argument or a lone keyword may carry an
        # existing connection object.
        db = None
        if len(args) == 1 and not kw:
            db = args[0]
        elif len(kw) == 1 and not args:
            db = kw.get('db')
        if db:
            if isinstance(db, DB):
                db = db.db
            else:
                # Unwrap the _cnx attribute when the object has one.
                db = getattr(db, '_cnx', db)
        reusable = db and hasattr(db, 'db') and hasattr(db, 'query')
        if reusable:
            self._closeable = False
        else:
            db = connect(*args, **kw)
            self._closeable = True
        self.db = db
        self.dbname = db.db
        self._regtypes = False
        self._attnames = {}
        self._pkeys = {}
        self._privileges = {}
        self._args = args, kw
        self.adapter = Adapter(self)
        self.dbtypes = DbTypes(self)
        db.set_cast_hook(self.dbtypes.typecast)
        self.debug = None  # For debugging scripts, this can be set
            # * to a string format specification (e.g. in CGI set to "%s<BR>"),
            # * to a file object to write debug statements or
            # * to a callable object which takes a string argument
            # * to any other true value to just print debug statements
项目:uroboroSQL-formatter    作者:future-architect    | 项目源码 | 文件源码
def __init__(self, maxsize=100):
            """Create an empty ``OrderedDict`` and record ``maxsize`` as ``_maxsize``."""
            OrderedDict.__init__(self)

            self._maxsize = maxsize
项目:uroboroSQL-formatter    作者:future-architect    | 项目源码 | 文件源码
def __init__(self, maxsize=100):
            """Create an empty ``dict`` and record ``maxsize`` as ``_maxsize``."""
            dict.__init__(self)

            self._maxsize = maxsize
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def __init__(self, rc=None, data=None, is_warn=False):
        """Start with an empty ``rc_list``, optionally seeded with one entry.

        When ``rc`` is not None, a ``TupleRC(rc, data, is_warn)`` is
        appended to ``rc_list``.
        """
        self.rc_list = []

        # Identity test: comparing with None via != would dispatch to the
        # argument's own __ne__ instead of checking for the sentinel.
        if rc is not None:
            self.rc_list.append(TupleRC(rc, data, is_warn))
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def __init__(self, maxlen = 20, *args, **kwargs):
        """Initialise the ``OrderedDict`` with the remaining arguments and store ``maxlen``.

        Because ``maxlen`` is the first positional parameter, initial
        contents passed positionally must come after it.
        """
        OrderedDict.__init__(self, *args, **kwargs)
        self.maxlen = maxlen