我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用google.appengine.ext.ndb.Key()。
def init_zoneinfo(): """ Add each zone info to the datastore. This will overwrite existing zones. This must be called before the AppengineTimezoneLoader will work. """ import os, logging from zipfile import ZipFile zoneobjs = [] zoneinfo_path = os.path.abspath(os.path.join(os.path.dirname(__file__), 'zoneinfo.zip')) with ZipFile(zoneinfo_path) as zf: for zfi in zf.filelist: key = ndb.Key('Zoneinfo', zfi.filename, namespace=NDB_NAMESPACE) zobj = Zoneinfo(key=key, data=zf.read(zfi)) zoneobjs.append(zobj) logging.info("Adding %d timezones to the pytz-appengine database" % len(zoneobjs) ) ndb.put_multi(zoneobjs)
def open_resource(name): """Load the object from the datastore""" import logging from cStringIO import StringIO try: data = ndb.Key('Zoneinfo', name, namespace=NDB_NAMESPACE).get().data except AttributeError: # Missing zone info; test for GMT - which would be there if the # Zoneinfo has been initialized. if ndb.Key('Zoneinfo', 'GMT', namespace=NDB_NAMESPACE).get(): # the user asked for a zone that doesn't seem to exist. logging.exception("Requested zone '%s' is not in the database." % name) raise # we need to initialize the database init_zoneinfo() return open_resource(name) return StringIO(data)
def encode_ndb_key(key, encoder=None): """ When encountering an L{ndb.Key} instance, find the entity in the datastore and encode that. """ klass = ndb.Model._kind_map.get(key.kind()) referenced_object = _get_by_class_key( encoder, klass, key, ) if not referenced_object: encoder.writeElement(None) return encoder.writeObject(referenced_object)
def test_key_no_entity(self): """ Attempting to encode an `ndb.Key` that does not have a matching entity in the datastore must be `None`. """ key = ndb.Key('SimpleEntity', 'bar') self.assertIsNone(key.get()) self.assertEncodes(key, ( b'\x05', ), encoding=pyamf.AMF0) self.assertEncodes(key, ( b'\x01' ), encoding=pyamf.AMF3)
def test_key_with_entity(self): """ Attempting to encode an `ndb.Key` that DOES have a matching entity in the datastore must encode that entity. """ key = ndb.Key('SimpleEntity', 'bar') entity = models.SimpleEntity(key=key) entity.put() self.assertEncodes(key, ( b'\x03', ( b'\x00\x04_key\x02\x002agx0ZXN0YmVkLXRlc3RyFQsSDFNpbXBsZUVudGl' b'0eSIDYmFyDA' ), '\x00\x00\t' ), encoding=pyamf.AMF0) self.assertEncodes(key, ( '\n\x0b\x01', ( '\t_key\x06eagx0ZXN0YmVkLXRlc3RyFQsSDFNpbXBsZUVudGl0eSIDYmFyDA' ), '\x01' ), encoding=pyamf.AMF3)
def GetUserByAuth(): """Returns current users profile.""" user_key = users.get_current_user().user_id() user_key = ndb.Key('Profile', user_key) profile = user_key.get() for ledger in profile.user_ledger: try: price = GetPriceByPredictionId(ledger.prediction_id) ledger.value = math.fabs((price * ledger.contract_one) - ( price * ledger.contract_two)) ledger.prediction_statement = ndb.Key( urlsafe=ledger.prediction_id).get().statement except: ledger.value = 404 ledger.prediction_statement = 'ERROR' return render_template('profile.html', profile=profile)
def SellStake(): user_id = users.get_current_user().user_id() user_key = ndb.Key('Profile', user_id) current_user = user_key.get() prediction_key = ndb.Key(urlsafe=request.form['prediction_id']) prediction = prediction_key.get() portfolio = GetUserPortfolioByAuth(request.form['prediction_id']) for ledger in portfolio: if ledger.contract_one > 0: contract = 'CONTRACT_ONE' quantity = ledger.contract_one else: contract = 'CONTRACT_TWO' quantity = ledger.contract_two trade = Trade( prediction_id=prediction_key, user_id=user_key, direction='SELL', contract=contract, quantity=float(quantity)) err = CreateTradeAction(prediction, current_user, trade) if err != 'error': flash('You sold your stake!') return redirect('/users/me')
def _SetAncestor(self, value): """Setter to be used for public ancestor property on query info. Args: value: A potential value for an ancestor. Raises: AttributeError: if query on the object is already final. AttributeError: if the ancestor has already been set. TypeError: if the value to be set is not an instance of ndb.Key. """ if self._query_final is not None: raise AttributeError('Can\'t set ancestor. Query info is final.') if self._ancestor is not None: raise AttributeError('Ancestor can\'t be set twice.') if not isinstance(value, ndb.Key): raise TypeError('Ancestor must be an instance of ndb.Key.') self._ancestor = value
def from_filter_data(cls, filter_data): url_string = filter_data.get(UNIQUE_ID) if url_string: entity_key = ndb.Key(urlsafe=url_string) if entity_key: filter_data.pop(UNIQUE_ID) entity = entity_key.get() for field_name, value in filter_data.iteritems(): if getattr(entity, field_name) != value: return None return entity else: return None else: entity_query = cls.query() for field_name, value in filter_data.iteritems(): value_property = _verify_property(cls, field_name) entity_query = entity_query.filter(value_property == value) return entity_query.fetch()
def from_json(self, request_data): """ Update entity with new data from json. check for data to transform, if needed, perform operations and assign values to respective properties in entity. :param request_data: :return: """ for property, value in request_data.iteritems(): prop_type = self._properties.get(property) if prop_type: prop_value = value if isinstance(prop_type, (ndb.DateProperty, ndb.DateTimeProperty, ndb.TimeProperty)): prop_value = utils.date_from_str(prop_type, prop_value) elif isinstance(prop_type, ndb.KeyProperty): prop_value = ndb.Key(urlsafe=prop_value) setattr(self, property, prop_value)
def transform_response(self, transform_fields=None): """ Select ndb.Key property types for their respective data response. :param transform_fields: optional list or tuple which is used to specify returned properties for a ndb.Key property. :return: """ data = self._to_dict() for property_name, value in data.iteritems(): if isinstance(value, ndb.Key): property_value = value.get() if property_value: property_value = property_value.to_json() else: property_value = self.to_json_data(value) data[property_name] = property_value data['id'] = self.key.urlsafe() return data
def preferences_api_post(subscription): data = request.json user = get_user() if not user: return '400' subscription_key = ndb.Key(urlsafe=subscription) form_selection = {} for key, value in data.items(): form_selection[ndb.Key(urlsafe=key)] = value removed = remove_preferences(user, form_selection, subscription_key) logging.info('Removed') logging.info(removed) added = add_preferences(user, form_selection, subscription_key) logging.info('Added') logging.info(added) return 'OK'
def create_delete_meeting_request(): user = get_user() data = request.json meeting_spec_key = data['meeting_spec_key'] meeting_request_key = data['meeting_request_key'] if meeting_request_key == '': meeting_spec = ndb.Key(urlsafe=meeting_spec_key) if not meeting_spec: return 400 meeting_request = query_meeting_request(meeting_spec, user) if not meeting_request: meeting_request = MeetingRequest(meeting_spec=meeting_spec, user=user.key) meeting_request.put() return json.dumps({'key': meeting_request.key.urlsafe()}) else: meeting_request = ndb.Key(urlsafe=meeting_request_key) meeting_request.delete() return json.dumps({'key': ''})
def filter_subscriptions_by_user_data(subscriptions, user): approved_subscriptions = [] for subscription in subscriptions: subscription_rules = ndb.Key(urlsafe=subscription['id']).get().user_rules if subscription.get('rule_logic') == 'any': assert subscription_rules, 'You created logic for rules but don\'t have any rules!' approved = apply_rules(user, subscription, subscription_rules, any) elif subscription.get('rule_logic') == 'all': assert subscription_rules, 'You created logic for rules but don\'t have any rules!' approved = apply_rules(user, subscription, subscription_rules, all) else: approved = subscription if approved is not None: approved_subscriptions.append(approved) return approved_subscriptions
def init_zoneinfo(): """ Add each zone info to the datastore. This will overwrite existing zones. This must be called before the AppengineTimezoneLoader will work. """ import os import logging from zipfile import ZipFile zoneobjs = [] zoneinfo_path = os.path.abspath( os.path.join(os.path.dirname(__file__), 'zoneinfo.zip')) with ZipFile(zoneinfo_path) as zf: for zfi in zf.filelist: key = ndb.Key('Zoneinfo', zfi.filename, namespace=NDB_NAMESPACE) zobj = Zoneinfo(key=key, data=zf.read(zfi)) zoneobjs.append(zobj) logging.info( "Adding %d timezones to the pytz-appengine database" % len(zoneobjs)) ndb.put_multi(zoneobjs)
def open_resource(name): """Load the object from the datastore""" import logging from cStringIO import StringIO try: data = ndb.Key('Zoneinfo', name, namespace=NDB_NAMESPACE).get().data except AttributeError: # Missing zone info; test for GMT # which would be there if the Zoneinfo has been initialized. if ndb.Key('Zoneinfo', 'GMT', namespace=NDB_NAMESPACE).get(): # the user asked for a zone that doesn't seem to exist. logging.exception( "Requested zone '%s' is not in the database." % name) raise # we need to initialize the database init_zoneinfo() return open_resource(name) return StringIO(data)
def PutChangeLog(self, changed_by): ''' Create a change log for the current state of the hand. Uses current timestamp in seconds as key. Args: changed_by: Integer. Pair number for the user requesting the change. ''' change_dict = { "calls" : self.calls_dict(), "notes" : self.notes, "ns_score" : self.get_ns_score(), "ew_score" : self.get_ew_score(), } epoch = datetime.datetime.utcfromtimestamp(0) nowtime = datetime.datetime.now() change_log = ChangeLog(changed_by=changed_by, change=json.dumps(change_dict)) change_log.key = ndb.Key("ChangeLog", str((nowtime - epoch).total_seconds()), parent=self.key) change_log.put()
def get(self): self.response.out.write('<html><body>') guestbook_name = self.request.get('guestbook_name') ancestor_key = ndb.Key("Book", guestbook_name or "*notitle*") greetings = Greeting.query_book(ancestor_key).fetch(20) for greeting in greetings: self.response.out.write('<blockquote>%s</blockquote>' % cgi.escape(greeting.content)) # [END query] self.response.out.write(""" <form action="/sign?%s" method="post"> <div><textarea name="content" rows="3" cols="60"></textarea></div> <div><input type="submit" value="Sign Guestbook"></div> </form> <hr> <form>Guestbook name: <input value="%s" name="guestbook_name"> <input type="submit" value="switch"></form> </body> </html>""" % (urllib.urlencode({'guestbook_name': guestbook_name}), cgi.escape(guestbook_name))) # [START submit]
def test_main(app): # Add a greeting to find guestbook.Greeting( content='Hello world', parent=ndb.Key('Book', 'brane3')).put() # Add a greeting to not find. guestbook.Greeting( content='Flat sheet', parent=ndb.Key('Book', 'brane2')).put() response = app.get('/?guestbook_name=brane3') assert response.status_int == 200 assert 'Hello world' in response.body assert 'Flat sheet' not in response.body
def add_note(): page_name = flask.request.args.get('page_name', 'default') note_title = flask.request.form['note_title'] note_text = flask.request.form['note_text'] parent = parent_key(page_name) choice = random.randint(0, 1) if choice == 0: # Use transactional function # [START calling] note_key = ndb.Key(Note, note_title, parent=parent) note = Note(key=note_key, content=note_text) # [END calling] if pick_random_insert(note_key, note) is False: return ('Already there<br><a href="%s">Return</a>' % flask.url_for('main_page', page_name=page_name)) return flask.redirect(flask.url_for('main_page', page_name=page_name)) elif choice == 1: # Use get_or_insert, which is transactional note = Note.get_or_insert(note_title, parent=parent, content=note_text) if note.content != note_text: return ('Already there<br><a href="%s">Return</a>' % flask.url_for('main_page', page_name=page_name)) return flask.redirect(flask.url_for('main_page', page_name=page_name))
def test_relationship(testbed): # Creates 1 contact and 2 companies addressbook_key = ndb.Key('AddressBook', 'tmatsuo') mary = models.Contact(parent=addressbook_key, name='Mary') mary.put() google = models.Company(name='Google') google.put() candit = models.Company(name='Candit') candit.put() # first google hires Mary models.ContactCompany(parent=addressbook_key, contact=mary.key, company=google.key, title='engineer').put() # then another company named 'candit' hires Mary too models.ContactCompany(parent=addressbook_key, contact=mary.key, company=candit.key, title='president').put() # get the list of companies that Mary belongs to assert len(mary.companies) == 2
def testEventuallyConsistentGlobalQueryResult(self): class TestModel(ndb.Model): pass user_key = ndb.Key('User', 'ryan') # Put two entities ndb.put_multi([ TestModel(parent=user_key), TestModel(parent=user_key) ]) # Global query doesn't see the data. self.assertEqual(0, TestModel.query().count(3)) # Ancestor query does see the data. self.assertEqual(2, TestModel.query(ancestor=user_key).count(3)) # [END HRD_example_1] # [START HRD_example_2]
def post(self): accountname = self.session.get('account') if self.validate_user(accountname, SADMIN): suser = User.query(User.username == accountname).fetch() if len(suser) !=0: suser = suser[0] else: suser = None user_key = self.request.get('user_key') if user_key != '': fname = self.request.get('firstName') lname = self.request.get('lastName') pword = self.request.get('passWord') user = ndb.Key(urlsafe=user_key).get() user.firstname = fname user.lastname = lname user.password = pword user.put() self.success_redirect('DSUBMIT_SUCCESS', '/registrationhomepage') else: self.error_redirect('INVALID_LOGIN_STATE', '/logout')
def get_people_rankings_for_city_names_dev_remote(geoname_ids, attendees_only): rankings = [] client = datastore.Client('dancedeets-hrd') for city_name in geoname_ids: q = client.query(kind='PRCityCategory') q.add_filter('city', '=', city_name) if attendees_only: q.add_filter('person_type', '=', 'ATTENDEE') for result in q.fetch(100): ranking = PRCityCategory() ranking.key = ndb.Key('PRCityCategory', result.key.name) ranking.person_type = result['person_type'] ranking.geoname_id = result['geoname_id'] ranking.category = result['category'] ranking.top_people_json = json.loads(result.get('top_people_json', '[]')) rankings.append(ranking) return rankings
def get_multiple_tokens(token_count): good_users = users.User.query(users.User.key >= ndb.Key(users.User, '701004'), users.User.expired_oauth_token == False).fetch(token_count) # noqa guaranteed_users = [x for x in good_users if x.fb_uid == '701004'] if guaranteed_users: guaranteed_user_token = guaranteed_users[0].fb_access_token else: guaranteed_user_token = None tokens = [x.fb_access_token for x in good_users] debug_token_infos = fb_api.lookup_debug_tokens(tokens) some_future_time = time.time() + 7 * (24 * 60 * 60) # Ensure our tokens are really still valid, and still valid a few days from now, for long-running mapreduces good_tokens = [] for token, info in zip(tokens, debug_token_infos): if info['empty']: logging.error('Trying to lookup invalid access token: %s', token) continue if (info['token']['data']['is_valid'] and (info['token']['data']['expires_at'] == 0 or # infinite token info['token']['data']['expires_at'] > some_future_time)): good_tokens.append(token) # Only trim out the guaranteed-token if we have some to spare if len(good_tokens) > 1: good_tokens = [x for x in good_tokens if x != guaranteed_user_token] return good_tokens
def _delete_entity(self): """Delete entity from datastore. Attempts to delete using the key_name stored on the object, whether or not the given key is in the datastore. """ if self._is_ndb(): ndb.Key(self._model, self._key_name).delete() else: entity_key = db.Key.from_path(self._model.kind(), self._key_name) db.delete(entity_key)
def get_model(self): if not isinstance(self.json, dict): raise HTTPSuccess() key = self.json.get('key', None) if not key: logging.warn('send handler called without key') raise HTTPSuccess() key = ndb.Key(urlsafe=key) job = key.get() if job is None: logging.warn('job {} not found'.format(key)) raise HTTPSuccess() return job
def get_key(cls, user_id): """ Get Key by given user id """ return ndb.Key(Account, user_id)
def decode_key(self, key): return ndb.Key(urlsafe=key)
def decode_key_property(obj, prop, value): if not value: return None return ndb.Key(urlsafe=value)
def test_but_missing(self): """ .model is set to a key but the entity does not exist - pyamf must handle the encode and decode cases. """ pyamf.register_class(models.Pet, 'pet') pyamf.register_class(models.SuperModel, 'supermodel') pet_key = ndb.Key('Pet', 'foobar') model_key = ndb.Key('SuperModel', 'barfoo') self.assertIsNone(model_key.get()) pet = models.Pet(key=pet_key, model=model_key) pet.put() bytes = ( b'\n\x0b\x07pet', ( b'\t_key\x06Uagx0ZXN0YmVkLXRlc3RyDwsSA1BldCIGZm9vYmFyDA', b'\x0bmodel\x06gagx0ZXN0YmVkLXRlc3RyFgsSClN1cGVyTW9kZWwiBmJhcm' b'Zvbww', b'\tname\x01', ), b'\x01' ) self.assertEncodes(pet, bytes) self.assertDecodes(bytes, pet)
def test_person(self): pyamf.register_class(models.Person, 'Person') person_key = ndb.Key('Contact', 'p') person = models.Person( key=person_key, first_name='Foo', last_name='Bar', ) bytes = ( b'\n\x0b\rPerson', ( b'\t_key\x06Qagx0ZXN0YmVkLXRlc3RyDgsSB0NvbnRhY3QiAXAM', b'\x19phone_number\x01', b'\x0faddress\x01', b'\x15first_name\x06\x07Foo', b'\x13last_name\x06\x07Bar', ), b'\x01' ) self.assertEncodes(person, bytes) def check_person(ret): self.assertIsInstance(ret, models.Person) self.assertEqual(ret, person) self.assertDecodes(bytes, check_person)
def test_company(self): pyamf.register_class(models.Company, 'Company') company_key = ndb.Key('Contact', 'c') company = models.Company( key=company_key, name='Acme Ltd', ) bytes = ( b'\n\x0b\x0fCompany', ( b'\t_key\x06Qagx0ZXN0YmVkLXRlc3RyDgsSB0NvbnRhY3QiAWMM', b'\x19phone_number\x01', b'\x0faddress\x01', b'\tname\x06\x11Acme Ltd', ), b'\x01' ) self.assertEncodes(company, bytes) def check_company(ret): self.assertIsInstance(ret, models.Company) self.assertEqual(ret, company) self.assertDecodes(bytes, check_company)
def test_structured(self): pyamf.register_class(models.SPContact, 'SPContact') pyamf.register_class(models.SPAddress, 'SPAddress') guido_key = ndb.Key('SPContact', 'guido') guido = models.SPContact( key=guido_key, name='Guido', addresses=[ models.SPAddress( type='home', city='Amsterdam' ), models.SPAddress( type='work', street='Spear St', city='SF' ) ] ) guido.put() bytes = ( b'\n\x0b\x13SPContact\t_key\x06aagx0ZXN0YmVkLXRlc3RyFAsSCVNQQ29udG' b'FjdCIFZ3VpZG8M\x13addresses\t\x05\x01\n\x0b\x13SPAddress\x02\x01' b'\tcity\x06\x13Amsterdam\rstreet\x01\ttype\x06\thome\x01\n\x05' b'\x02\x01\n\x06\x05SF\x0e\x06\x11Spear St\x10\x06\twork\x01\tname' b'\x06\x0bGuido\x01' ) self.assertEncodes(guido, bytes) def check_guido(ret): self.assertIsInstance(ret, models.SPContact) self.assertEqual(ret.key, guido_key) self.assertEqual(ret, guido) self.assertEqual(guido.addresses, ret.addresses) self.assertDecodes(bytes, check_guido)
def test_localstructured(self): pyamf.register_class(models.LSPContact, 'LSPContact') pyamf.register_class(models.SPAddress, 'SPAddress') guido_key = ndb.Key('LSPContact', 'guido') guido = models.LSPContact( key=guido_key, name='Guido', addresses=[ models.SPAddress( type='home', city='Amsterdam' ), models.SPAddress( type='work', street='Spear St', city='SF' ) ] ) guido.put() bytes = ( b'\n\x0b\x15LSPContact\t_key\x06eagx0ZXN0YmVkLXRlc3RyFQsSCkxTUENv' b'bnRhY3QiBWd1aWRvDA\x13addresses\t\x05\x01\n\x0b\x13SPAddress\tc' b'ity\x06\x13Amsterdam\x02\x01\rstreet\x01\ttype\x06\thome\x01\n' b'\x05\n\x06\x05SF\x02\x01\x0e\x06\x11Spear St\x10\x06\twork\x01\t' b'name\x06\x0bGuido\x01' ) self.assertEncodes(guido, bytes) def check_guido(ret): self.assertIsInstance(ret, models.LSPContact) self.assertEqual(ret.key, guido_key) self.assertEqual(ret, guido) self.assertEqual(guido.addresses, ret.addresses) self.assertDecodes(bytes, check_guido)
def __init__(self, *args, **kwargs): self.love = ndb.Key(Love, kwargs.get('love_id')).get()
def _post_delete_hook(cls, key, future): """ Deletes an EmailReport from our search index when it is deleted from the datastore. Args: cls - The model class key - The instance ndb.Key future - The async future operation """ future.check_success() search_provider.delete(str(key.id()))
def get_by_id(identifier): key = ndb.Key(PrivateNote, identifier) return key.get()
def GetPriceByPredictionId(prediction_id): """Returns the current market price of contract one for a prediction_id.""" prediction_key = ndb.Key(urlsafe=prediction_id) pred = prediction_key.get() price = math.pow(math.e, (pred.contract_one / pred.liquidity)) / ( math.pow(math.e, (pred.contract_two / pred.liquidity)) + math.pow( math.e, (pred.contract_two / pred.liquidity))) return float(price)
def GetPrices(prediction_id): prices = Price.query(Price.prediction_id == ndb.Key(urlsafe=prediction_id)).order(-Price.date).fetch(30) print(str(prices)) prices = [{'price': p.value, 'date': {'year': p.date.year, 'month': p.date.month, 'day': p.date.day}} for p in prices] return prices
def GetPredictionById(prediction_id): """Returns a prediction by prediction_id.""" try: prediction_key = ndb.Key(urlsafe=prediction_id) prediction = prediction_key.get() portfolio = GetUserPortfolioByAuth(prediction_id) except: return render_template('404.html') return render_template( 'prediction.html', prediction=prediction, price=GetPriceByPredictionId(prediction_id), pricelist=GetPrices(prediction_id), prediction_id=prediction_id, portfolio=portfolio)
def GetUserBalanceByAuth(): """Returns current users balance.""" user_key = ndb.Key('Profile', users.get_current_user().user_id()) profile = user_key.get() return str(profile.balance) # TODO(goldhaber): change to GetUserPortfolioByAuth By Prediction ID
def CreateTrade(): """Creates a trade for the user.""" user_id = users.get_current_user().user_id() user_key = ndb.Key('Profile', user_id) current_user = user_key.get() prediction_key = ndb.Key(urlsafe=request.form['prediction_id']) prediction = prediction_key.get() if request.form['is_likelihood'] == 'true': user_id = users.get_current_user().user_id() user_key = ndb.Key('Profile', user_id) current_user = user_key.get() trade = calculate_trade_from_likelihood( float(request.form['likelihood']), prediction, current_user) print trade else: trade = Trade( prediction_id=prediction_key, user_id=user_key, direction=request.form['direction'], contract=request.form['contract'], quantity=float(request.form['quantity'])) err = CreateTradeAction(prediction, current_user, trade) #TODO replace with error if err != 'error': flash('You successfully predicted!') return redirect('/predictions/' + trade.prediction_id.urlsafe())
def GetTradesForPredictionId(prediction_id): user = users.get_current_user() trades = Trade.query(ndb.AND(Trade.prediction_id == ndb.Key(urlsafe=prediction_id), Trade.user_id == ndb.Key('Profile', user.user_id()))).fetch() return str(trades)
def inject_balance(): user = users.get_current_user() if not user: return dict(balance=0) user_key = ndb.Key('Profile', user.user_id()) profile = user_key.get() return dict(balance=profile.balance)
def GetEntityViaMemcache(entity_key): """Get entity from memcache if available, from datastore if not.""" entity = memcache.get(entity_key) if entity is not None: return entity key = ndb.Key(urlsafe=entity_key) entity = key.get() if entity is not None: memcache.set(entity_key, entity) return entity
def GetPriceByPredictionId(prediction_id): """Get current price by Prediction Id""" prediction_key = ndb.Key(urlsafe=prediction_id) pred = prediction_key.get() price = math.pow(math.e, (pred.contract_one / pred.liquidity)) / ( math.pow(math.e, (pred.contract_two / pred.liquidity)) + math.pow( math.e, (pred.contract_two / pred.liquidity))) return float(price)