Python pymongo 模块,version_tuple 实例源码

我们从Python开源项目中,提取了以下25个代码示例,用于说明如何使用 pymongo.version_tuple(注意:它是一个表示版本号的元组属性,而不是可调用的函数)。

项目:covar_me_app    作者:CovarMe    | 项目源码 | 文件源码
def test_uri_prioritised_over_host_and_port(self):
        """MONGO_URI must win over MONGO_HOST, MONGO_PORT and MONGO_DBNAME."""
        cfg = self.app.config
        cfg['MONGO_URI'] = 'mongodb://localhost:27017/database_name'
        cfg['MONGO_HOST'] = 'some_other_host'
        cfg['MONGO_PORT'] = 27018
        cfg['MONGO_DBNAME'] = 'not_the_correct_db_name'

        # A URI without username/password still connects, but a warning
        # asks for auth to be supplied; silence it for this test.
        with warnings.catch_warnings():
            warnings.simplefilter('ignore')
            mongo = flask.ext.pymongo.PyMongo(self.app)

        if pymongo.version_tuple[0] <= 2:
            # pymongo 2.x: host and port are separate client attributes.
            assert mongo.cx.host == 'localhost'
            assert mongo.cx.port == 27017
        else:
            # Brief pause before reading the address -- presumably to let
            # the client finish connecting (TODO confirm).
            time.sleep(0.2)
            assert ('localhost', 27017) == mongo.cx.address
        assert mongo.db.name == 'database_name'
项目:covar_me_app    作者:CovarMe    | 项目源码 | 文件源码
def test_create_with_document_class(self):
        """ Check that documents read back use the configured document class.

        A second connection under the CUSTOM prefix is created instead of
        reusing self.mongo, because the config must be changed first and a
        duplicate config_prefix would raise. It targets the same DBNAME so
        that tearDown still deletes the database when the test fails.

        """
        cfg = self.app.config
        # Reuse the standard DBNAME so tearDown drops this database as well.
        cfg['CUSTOM_DBNAME'] = cfg['MONGO_DBNAME']
        cfg['CUSTOM_DOCUMENT_CLASS'] = CustomDict
        # A fresh PyMongo under the CUSTOM prefix picks up the updated config.
        mongo = flask.ext.pymongo.PyMongo(self.app, 'CUSTOM')
        assert mongo.db.things.find_one() is None
        # Store one document, then read it back to inspect its type.
        if pymongo.version_tuple[0] <= 2:
            mongo.db.things.insert({'_id': 'thing', 'val': 'foo'}, w=1)
        else:
            # Write Concern already defaults to w=1 in pymongo > 3.0
            mongo.db.things.insert_one({'_id': 'thing', 'val': 'foo'})
        fetched = mongo.db.things.find_one()
        assert type(fetched) == CustomDict
项目:flask-zhenai-mongo-echarts    作者:Fretice    | 项目源码 | 文件源码
def test_uri_prioritised_over_host_and_port(self):
        """MONGO_URI must win over MONGO_HOST, MONGO_PORT and MONGO_DBNAME."""
        cfg = self.app.config
        cfg['MONGO_URI'] = 'mongodb://localhost:27017/database_name'
        cfg['MONGO_HOST'] = 'some_other_host'
        cfg['MONGO_PORT'] = 27018
        cfg['MONGO_DBNAME'] = 'not_the_correct_db_name'

        # A URI without username/password still connects, but a warning
        # asks for auth to be supplied; silence it for this test.
        with warnings.catch_warnings():
            warnings.simplefilter('ignore')
            mongo = flask.ext.pymongo.PyMongo(self.app)

        if pymongo.version_tuple[0] <= 2:
            # pymongo 2.x: host and port are separate client attributes.
            assert mongo.cx.host == 'localhost'
            assert mongo.cx.port == 27017
        else:
            # Brief pause before reading the address -- presumably to let
            # the client finish connecting (TODO confirm).
            time.sleep(0.2)
            assert ('localhost', 27017) == mongo.cx.address
        assert mongo.db.name == 'database_name'
项目:flask-zhenai-mongo-echarts    作者:Fretice    | 项目源码 | 文件源码
def test_create_with_document_class(self):
        """ Check that documents read back use the configured document class.

        A second connection under the CUSTOM prefix is created instead of
        reusing self.mongo, because the config must be changed first and a
        duplicate config_prefix would raise. It targets the same DBNAME so
        that tearDown still deletes the database when the test fails.

        """
        cfg = self.app.config
        # Reuse the standard DBNAME so tearDown drops this database as well.
        cfg['CUSTOM_DBNAME'] = cfg['MONGO_DBNAME']
        cfg['CUSTOM_DOCUMENT_CLASS'] = CustomDict
        # A fresh PyMongo under the CUSTOM prefix picks up the updated config.
        mongo = flask.ext.pymongo.PyMongo(self.app, 'CUSTOM')
        assert mongo.db.things.find_one() is None
        # Store one document, then read it back to inspect its type.
        if pymongo.version_tuple[0] <= 2:
            mongo.db.things.insert({'_id': 'thing', 'val': 'foo'}, w=1)
        else:
            # Write Concern already defaults to w=1 in pymongo > 3.0
            mongo.db.things.insert_one({'_id': 'thing', 'val': 'foo'})
        fetched = mongo.db.things.find_one()
        assert type(fetched) == CustomDict
项目:webtzite    作者:materialsproject    | 项目源码 | 文件源码
def _get_connection(self, host, port):
        ckey = "{}:{}".format(host, port)
        conn = self._conns.get(ckey, None)
        if conn is None:
            mps = ('max_pool_size' if pymongo.version_tuple[0] == 2
                   else 'maxPoolSize')
            conn = pymongo.MongoClient(host, port, **{mps: self.MAX_POOL})
            self._conns[ckey] = conn
        return conn
项目:apm-agent-python    作者:elastic    | 项目源码 | 文件源码
def mongo_database():
    """Yield the ``elasticapm_test`` database, then drop it and disconnect.

    Host and port are read from the MONGODB_HOST / MONGODB_PORT environment
    variables and default to localhost and 27017.
    """
    host = os.environ.get('MONGODB_HOST', 'localhost')
    port = int(os.environ.get('MONGODB_PORT', 27017))
    client_kwargs = {'host': host, 'port': port}
    if pymongo.version_tuple < (3, 0):
        # Only clients older than 3.0 are given the ``safe`` flag.
        client_kwargs['safe'] = True
    client = pymongo.MongoClient(**client_kwargs)
    yield client.elasticapm_test
    # Runs once the consumer resumes the generator: clean up the test data.
    client.drop_database('elasticapm_test')
    client.close()
项目:OmMongo    作者:bapakode    | 项目源码 | 文件源码
def unique(self, drop_dups=False):
        ''' Mark this index as unique, optionally dropping duplicate entries.

            :param drop_dups: When true, request that duplicate objects be
                dropped while the unique index is created.  Defaults to
                ``False``.
            :raises BadIndexException: if ``drop_dups`` is requested on
                pymongo 2.7.5 or newer.
            :returns: this object, so that calls can be chained.
        '''
        # The unique flag is recorded before the drop_dups check, so it stays
        # set even when the check below raises.
        self.__unique = True
        if drop_dups:
            too_new = pymongo.version_tuple >= (2, 7, 5)
            if too_new: # pragma: nocover
                raise BadIndexException("drop_dups is not supported on pymongo >= 2.7.5")
        self.__drop_dups = drop_dups
        return self
项目:covar_me_app    作者:CovarMe    | 项目源码 | 文件源码
def test_find_one_or_404(self):
        """find_one_or_404 raises a 404 when the document is missing, else returns it.

        The same sequence is run against a plain collection and against a
        dotted-name sub-collection: empty it, expect a 404, insert one
        document, expect that document back.
        """
        modern = pymongo.version_tuple[0] > 2

        for collection in (self.mongo.db.things,
                           self.mongo.db.things.morethings):
            # Start from an empty collection, using the call that matches the
            # installed pymongo (same version split as the inserts below).
            if modern:
                collection.delete_many({})
            else:
                collection.remove()

            try:
                collection.find_one_or_404({'_id': 'thing'})
            except HTTPException as notfound:
                assert notfound.code == 404, "raised wrong exception"
            else:
                # Without this branch the test would pass even if no
                # exception was raised at all.
                raise AssertionError("find_one_or_404 did not raise for a missing document")

            if modern:
                # Write Concern is set to w=1 by default in pymongo > 3.0
                collection.insert_one({'_id': 'thing', 'val': 'foo'})
            else:
                collection.insert({'_id': 'thing', 'val': 'foo'}, w=1)

            # now it should not raise
            thing = collection.find_one_or_404({'_id': 'thing'})
            assert thing['val'] == 'foo', 'got wrong thing'
项目:covar_me_app    作者:CovarMe    | 项目源码 | 文件源码
def test_default_config_prefix(self):
        """MONGO_* settings select the database name and server address."""
        cfg = self.app.config
        cfg['MONGO_DBNAME'] = 'flask_pymongo_test_db'
        cfg['MONGO_HOST'] = 'localhost'
        cfg['MONGO_PORT'] = 27017

        mongo = flask.ext.pymongo.PyMongo(self.app)
        assert mongo.db.name == 'flask_pymongo_test_db', 'wrong dbname: %s' % mongo.db.name
        if pymongo.version_tuple[0] <= 2:
            # pymongo 2.x: host and port are separate client attributes.
            assert mongo.cx.host == 'localhost'
            assert mongo.cx.port == 27017
        else:
            # Brief pause before reading the address -- presumably to let
            # the client finish connecting (TODO confirm).
            time.sleep(0.2)
            assert ('localhost', 27017) == mongo.cx.address
项目:covar_me_app    作者:CovarMe    | 项目源码 | 文件源码
def test_custom_config_prefix(self):
        """CUSTOM_* settings are honoured when PyMongo is given the CUSTOM prefix."""
        cfg = self.app.config
        cfg['CUSTOM_DBNAME'] = 'flask_pymongo_test_db'
        cfg['CUSTOM_HOST'] = 'localhost'
        cfg['CUSTOM_PORT'] = 27017

        mongo = flask.ext.pymongo.PyMongo(self.app, 'CUSTOM')
        assert mongo.db.name == 'flask_pymongo_test_db', 'wrong dbname: %s' % mongo.db.name
        if pymongo.version_tuple[0] <= 2:
            # pymongo 2.x: host and port are separate client attributes.
            assert mongo.cx.host == 'localhost'
            assert mongo.cx.port == 27017
        else:
            # Brief pause before reading the address -- presumably to let
            # the client finish connecting (TODO confirm).
            time.sleep(0.2)
            assert ('localhost', 27017) == mongo.cx.address
项目:covar_me_app    作者:CovarMe    | 项目源码 | 文件源码
def test_converts_str_to_int(self):
        """A MONGO_PORT given as a string ends up as an integer port."""
        cfg = self.app.config
        cfg['MONGO_DBNAME'] = 'flask_pymongo_test_db'
        cfg['MONGO_HOST'] = 'localhost'
        # Deliberately a string: the assertions below expect the int 27017.
        cfg['MONGO_PORT'] = '27017'

        mongo = flask.ext.pymongo.PyMongo(self.app)
        assert mongo.db.name == 'flask_pymongo_test_db', 'wrong dbname: %s' % mongo.db.name
        if pymongo.version_tuple[0] <= 2:
            # pymongo 2.x: host and port are separate client attributes.
            assert mongo.cx.host == 'localhost'
            assert mongo.cx.port == 27017
        else:
            # Brief pause before reading the address -- presumably to let
            # the client finish connecting (TODO confirm).
            time.sleep(0.2)
            assert ('localhost', 27017) == mongo.cx.address
项目:covar_me_app    作者:CovarMe    | 项目源码 | 文件源码
def test_config_with_uri(self):
        """A MONGO_URI alone determines the database name and server address."""
        self.app.config['MONGO_URI'] = 'mongodb://localhost:27017/flask_pymongo_test_db'

        # A URI without username/password still connects, but a warning
        # asks for auth to be supplied; silence it for this test.
        with warnings.catch_warnings():
            warnings.simplefilter('ignore')
            mongo = flask.ext.pymongo.PyMongo(self.app)

        assert mongo.db.name == 'flask_pymongo_test_db', 'wrong dbname: %s' % mongo.db.name
        if pymongo.version_tuple[0] <= 2:
            # pymongo 2.x: host and port are separate client attributes.
            assert mongo.cx.host == 'localhost'
            assert mongo.cx.port == 27017
        else:
            # Brief pause before reading the address -- presumably to let
            # the client finish connecting (TODO confirm).
            time.sleep(0.2)
            assert ('localhost', 27017) == mongo.cx.address
项目:covar_me_app    作者:CovarMe    | 项目源码 | 文件源码
def test_config_with_document_class(self):
        """The client reports the class set through MONGO_DOCUMENT_CLASS."""
        self.app.config['MONGO_DOCUMENT_CLASS'] = CustomDict
        mongo = flask.ext.pymongo.PyMongo(self.app)
        if pymongo.version_tuple[0] <= 2:
            configured = mongo.cx.document_class
        else:
            # pymongo 3+ exposes the document class on the codec options.
            configured = mongo.cx.codec_options.document_class
        assert configured == CustomDict
项目:covar_me_app    作者:CovarMe    | 项目源码 | 文件源码
def test_config_without_document_class(self):
        """With no document class configured, the client uses plain ``dict``."""
        mongo = flask.ext.pymongo.PyMongo(self.app)
        if pymongo.version_tuple[0] <= 2:
            configured = mongo.cx.document_class
        else:
            # pymongo 3+ exposes the document class on the codec options.
            configured = mongo.cx.codec_options.document_class
        assert configured == dict
项目:covar_me_app    作者:CovarMe    | 项目源码 | 文件源码
def test_host_with_port_does_not_get_overridden_by_separate_port_config_value(self):
        """A port embedded in MONGO_HOST takes precedence over MONGO_PORT."""
        cfg = self.app.config
        cfg['MONGO_HOST'] = 'localhost:27017'
        cfg['MONGO_PORT'] = 27018

        # Connecting without username/password works but may warn that auth
        # should be supplied; silence warnings for this test.
        with warnings.catch_warnings():
            warnings.simplefilter('ignore')
            mongo = flask.ext.pymongo.PyMongo(self.app)

        if pymongo.version_tuple[0] <= 2:
            # pymongo 2.x: host and port are separate client attributes.
            assert mongo.cx.host == 'localhost'
            assert mongo.cx.port == 27017
        else:
            # Brief pause before reading the address -- presumably to let
            # the client finish connecting (TODO confirm).
            time.sleep(0.2)
            assert ('localhost', 27017) == mongo.cx.address
项目:qmmap    作者:hiqlabs    | 项目源码 | 文件源码
def connectMongoEngine(pmcol, conn_uri=None):
    """Connect mongoengine to the database that holds a pymongo collection.

    :param pmcol: pymongo collection; its ``database`` supplies the database
        name and, when no URI is given, the host and port.
    :param conn_uri: optional connection URI (it may carry credentials). When
        given, it is passed as ``host`` and no host/port lookup is done.
    :returns: whatever ``meng.connect`` returns.
    """
    database = pmcol.database
    # Prefer the connection URI: it has the credentials, and returning here
    # skips the version-specific attribute lookups that would go unused.
    if conn_uri:
        return meng.connect(database.name, host=conn_uri)

    if pymongo.version_tuple[0] == 2:
        client = database.connection
        host, port = client.host, client.port
    else:
        # NOTE(review): HOST/PORT look like class-level defaults rather than
        # the address actually connected to -- confirm against pymongo docs.
        client = database.client
        host, port = client.HOST, client.PORT
    return meng.connect(database.name, host=host, port=port)
项目:flask-zhenai-mongo-echarts    作者:Fretice    | 项目源码 | 文件源码
def test_find_one_or_404(self):
        """find_one_or_404 raises a 404 when the document is missing, else returns it.

        The same sequence is run against a plain collection and against a
        dotted-name sub-collection: empty it, expect a 404, insert one
        document, expect that document back.
        """
        modern = pymongo.version_tuple[0] > 2

        for collection in (self.mongo.db.things,
                           self.mongo.db.things.morethings):
            # Start from an empty collection, using the call that matches the
            # installed pymongo (same version split as the inserts below).
            if modern:
                collection.delete_many({})
            else:
                collection.remove()

            try:
                collection.find_one_or_404({'_id': 'thing'})
            except HTTPException as notfound:
                assert notfound.code == 404, "raised wrong exception"
            else:
                # Without this branch the test would pass even if no
                # exception was raised at all.
                raise AssertionError("find_one_or_404 did not raise for a missing document")

            if modern:
                # Write Concern is set to w=1 by default in pymongo > 3.0
                collection.insert_one({'_id': 'thing', 'val': 'foo'})
            else:
                collection.insert({'_id': 'thing', 'val': 'foo'}, w=1)

            # now it should not raise
            thing = collection.find_one_or_404({'_id': 'thing'})
            assert thing['val'] == 'foo', 'got wrong thing'
项目:flask-zhenai-mongo-echarts    作者:Fretice    | 项目源码 | 文件源码
def test_default_config_prefix(self):
        """MONGO_* settings select the database name and server address."""
        cfg = self.app.config
        cfg['MONGO_DBNAME'] = 'flask_pymongo_test_db'
        cfg['MONGO_HOST'] = 'localhost'
        cfg['MONGO_PORT'] = 27017

        mongo = flask.ext.pymongo.PyMongo(self.app)
        assert mongo.db.name == 'flask_pymongo_test_db', 'wrong dbname: %s' % mongo.db.name
        if pymongo.version_tuple[0] <= 2:
            # pymongo 2.x: host and port are separate client attributes.
            assert mongo.cx.host == 'localhost'
            assert mongo.cx.port == 27017
        else:
            # Brief pause before reading the address -- presumably to let
            # the client finish connecting (TODO confirm).
            time.sleep(0.2)
            assert ('localhost', 27017) == mongo.cx.address
项目:flask-zhenai-mongo-echarts    作者:Fretice    | 项目源码 | 文件源码
def test_custom_config_prefix(self):
        """CUSTOM_* settings are honoured when PyMongo is given the CUSTOM prefix."""
        cfg = self.app.config
        cfg['CUSTOM_DBNAME'] = 'flask_pymongo_test_db'
        cfg['CUSTOM_HOST'] = 'localhost'
        cfg['CUSTOM_PORT'] = 27017

        mongo = flask.ext.pymongo.PyMongo(self.app, 'CUSTOM')
        assert mongo.db.name == 'flask_pymongo_test_db', 'wrong dbname: %s' % mongo.db.name
        if pymongo.version_tuple[0] <= 2:
            # pymongo 2.x: host and port are separate client attributes.
            assert mongo.cx.host == 'localhost'
            assert mongo.cx.port == 27017
        else:
            # Brief pause before reading the address -- presumably to let
            # the client finish connecting (TODO confirm).
            time.sleep(0.2)
            assert ('localhost', 27017) == mongo.cx.address
项目:flask-zhenai-mongo-echarts    作者:Fretice    | 项目源码 | 文件源码
def test_converts_str_to_int(self):
        """A MONGO_PORT given as a string ends up as an integer port."""
        cfg = self.app.config
        cfg['MONGO_DBNAME'] = 'flask_pymongo_test_db'
        cfg['MONGO_HOST'] = 'localhost'
        # Deliberately a string: the assertions below expect the int 27017.
        cfg['MONGO_PORT'] = '27017'

        mongo = flask.ext.pymongo.PyMongo(self.app)
        assert mongo.db.name == 'flask_pymongo_test_db', 'wrong dbname: %s' % mongo.db.name
        if pymongo.version_tuple[0] <= 2:
            # pymongo 2.x: host and port are separate client attributes.
            assert mongo.cx.host == 'localhost'
            assert mongo.cx.port == 27017
        else:
            # Brief pause before reading the address -- presumably to let
            # the client finish connecting (TODO confirm).
            time.sleep(0.2)
            assert ('localhost', 27017) == mongo.cx.address
项目:flask-zhenai-mongo-echarts    作者:Fretice    | 项目源码 | 文件源码
def test_config_with_uri(self):
        """A MONGO_URI alone determines the database name and server address."""
        self.app.config['MONGO_URI'] = 'mongodb://localhost:27017/flask_pymongo_test_db'

        # A URI without username/password still connects, but a warning
        # asks for auth to be supplied; silence it for this test.
        with warnings.catch_warnings():
            warnings.simplefilter('ignore')
            mongo = flask.ext.pymongo.PyMongo(self.app)

        assert mongo.db.name == 'flask_pymongo_test_db', 'wrong dbname: %s' % mongo.db.name
        if pymongo.version_tuple[0] <= 2:
            # pymongo 2.x: host and port are separate client attributes.
            assert mongo.cx.host == 'localhost'
            assert mongo.cx.port == 27017
        else:
            # Brief pause before reading the address -- presumably to let
            # the client finish connecting (TODO confirm).
            time.sleep(0.2)
            assert ('localhost', 27017) == mongo.cx.address
项目:flask-zhenai-mongo-echarts    作者:Fretice    | 项目源码 | 文件源码
def test_config_with_document_class(self):
        """The client reports the class set through MONGO_DOCUMENT_CLASS."""
        self.app.config['MONGO_DOCUMENT_CLASS'] = CustomDict
        mongo = flask.ext.pymongo.PyMongo(self.app)
        if pymongo.version_tuple[0] <= 2:
            configured = mongo.cx.document_class
        else:
            # pymongo 3+ exposes the document class on the codec options.
            configured = mongo.cx.codec_options.document_class
        assert configured == CustomDict
项目:flask-zhenai-mongo-echarts    作者:Fretice    | 项目源码 | 文件源码
def test_config_without_document_class(self):
        """With no document class configured, the client uses plain ``dict``."""
        mongo = flask.ext.pymongo.PyMongo(self.app)
        if pymongo.version_tuple[0] <= 2:
            configured = mongo.cx.document_class
        else:
            # pymongo 3+ exposes the document class on the codec options.
            configured = mongo.cx.codec_options.document_class
        assert configured == dict
项目:flask-zhenai-mongo-echarts    作者:Fretice    | 项目源码 | 文件源码
def test_host_with_port_does_not_get_overridden_by_separate_port_config_value(self):
        """A port embedded in MONGO_HOST takes precedence over MONGO_PORT."""
        cfg = self.app.config
        cfg['MONGO_HOST'] = 'localhost:27017'
        cfg['MONGO_PORT'] = 27018

        # Connecting without username/password works but may warn that auth
        # should be supplied; silence warnings for this test.
        with warnings.catch_warnings():
            warnings.simplefilter('ignore')
            mongo = flask.ext.pymongo.PyMongo(self.app)

        if pymongo.version_tuple[0] <= 2:
            # pymongo 2.x: host and port are separate client attributes.
            assert mongo.cx.host == 'localhost'
            assert mongo.cx.port == 27017
        else:
            # Brief pause before reading the address -- presumably to let
            # the client finish connecting (TODO confirm).
            time.sleep(0.2)
            assert ('localhost', 27017) == mongo.cx.address
项目:qmmap    作者:hiqlabs    | 项目源码 | 文件源码
def do_chunks(init, proc, src_col, dest_col, query, key, sort, verbose, sleep=60):
    """Claim and process housekeeping chunks until every chunk is 'done'.

    Each pass atomically flips one 'open' housekeep record to 'working',
    queries ``src_col`` for the documents in that chunk's key range, hands the
    cursor to ``_process`` and finally marks the chunk 'done' unless another
    process got there first.

    :param init: passed through to ``_process``.
    :param proc: passed through to ``_process``.
    :param src_col: pymongo collection the chunk's documents are read from.
    :param dest_col: passed through to ``_process``.
    :param query: base query, AND-ed with the chunk's ``key`` range.
    :param key: field name the chunk boundaries (start/end) apply to.
    :param sort: field to sort on; a leading "-" requests descending order.
    :param verbose: bit flags; when bit 2 is set the chunk size is printed.
    :param sleep: seconds to wait when no chunk is open but not all are done.
    :raises Exception: if the pymongo major version is neither 2 nor 3.
    """
    while housekeep.objects(state = 'done').count() < housekeep.objects.count():
        tnow = datetime.datetime.utcnow()
        # Claim one open chunk by switching its state to 'working'.
        raw = housekeep._collection.find_and_modify(
            {'state': 'open'},
            {
                '$set': {
                    'state': 'working',
                    'tstart': tnow,
                    'procname': procname(),
                }
            }
        )
        # if raw is None, someone scooped us
        if raw is None:
            # Not all done, but none were open for processing; thus, wait to
            # see if one re-opens
            print('Standing by for reopening of "working" job...')
            sys.stdout.flush()
            time.sleep(sleep)
            continue

        raw_id = raw['_id']
        # reload as mongoengine object -- _id is .start (because set as primary_key)
        hko = housekeep.objects(start = raw_id)[0]
        # get data pointed to by housekeep
        qq = {'$and': [query, {key: {'$gte': hko.start}}, {key: {'$lte': hko.end}}]}
        # Make cursor not timeout, using version-appropriate parameter
        if pymongo.version_tuple[0] == 2:
            cursor = src_col.find(qq, timeout=False)
        elif pymongo.version_tuple[0] == 3:
            cursor = src_col.find(qq, no_cursor_timeout=True)
        else:
            raise Exception("Unknown pymongo version")
        # Set the sort parameters on the cursor
        if sort[0] == "-":
            cursor = cursor.sort(sort[1:], pymongo.DESCENDING)
        else:
            cursor = cursor.sort(sort, pymongo.ASCENDING)
        if verbose & 2:
            # Single-argument print(...) gives the same output on Python 2 and 3.
            print("mongo_process: %d elements in chunk %s-%s" % (cursor.count(), hko.start, hko.end))
        sys.stdout.flush()
        # This is where processing happens
        hko.good = _process(init, proc, cursor, dest_col, verbose,
                            hkstart=raw_id)
        # Check if another job finished it while this one was plugging away
        hko_later = housekeep.objects(start = raw_id).only('state')[0]
        if hko.good == -1:  # Early exit signal
            print("Chunk at %s lost to another process; not updating" % raw_id)
            sys.stdout.flush()
        elif hko_later.state == 'done':
            print("Chunk at %s had already finished; not updating" % raw_id)
            sys.stdout.flush()
        else:
            hko.state = 'done'
            hko.procname = 'none'
            hko.time = datetime.datetime.utcnow()
            hko.save()