我们从 Python 开源项目中提取了以下 19 个代码示例，用于说明如何使用 django.utils.text.force_text()（该函数实际定义在 django.utils.encoding 中，django.utils.text 只是导入了它）。
def created_by(self):
    """
    Gets the user that created this object

    A truthy ``self._created_by`` is resolved directly; otherwise the user id
    stored on the earliest changeset is used.

    :returns: the user that created this object, or ``None`` when there is
        no changeset, the changeset has no user id, or the lookup raises
        ``ObjectDoesNotExist``
    :rtype: django.contrib.auth.models.User
    """
    # check if there is a prefetched created_by and use it
    if self._created_by:
        return get_user_from_cache_or_db(self._created_by)

    # get the earliest changeset
    earliest_changeset = self._get_earliest_changeset()
    if not earliest_changeset:
        return None

    try:
        if earliest_changeset.user_id:
            return get_user_from_cache_or_db(earliest_changeset.user_id)
    except ObjectDoesNotExist:
        # The message names the *earliest* change set, which is the one this
        # method inspects. The mapping is handed to the logger so that the
        # string is only interpolated when DEBUG logging is enabled.
        logger.debug(
            u"No user for the earliest change set of '%(model)s' with pk '%(pk)s'.",
            {
                'model': force_text(earliest_changeset.object_type),
                'pk': force_text(earliest_changeset.object_uuid),
            },
        )
    return None
def last_modified_by(self):
    """
    Gets the user that last modified the object

    The user id is read from the latest changeset of this object.

    :returns: the user that last modified this object, or ``None`` when
        there is no changeset, the changeset has no user id, or the lookup
        raises ``ObjectDoesNotExist``
    :rtype: django.contrib.auth.models.User
    """
    latest_changeset = self._get_latest_changeset()
    if not latest_changeset:
        return None

    try:
        if latest_changeset.user_id:
            return get_user_from_cache_or_db(latest_changeset.user_id)
    except ObjectDoesNotExist:
        # Hand the mapping to the logger instead of pre-formatting, so the
        # message is only interpolated when DEBUG logging is enabled.
        logger.debug(
            u"No user for the latest change set of '%(model)s' with pk '%(pk)s'.",
            {
                'model': force_text(latest_changeset.object_type),
                'pk': force_text(latest_changeset.object_uuid),
            },
        )
    return None
def quote_value(self, value):
    """Render ``value`` as a literal suitable for embedding in a SQL statement.

    Dates, times and datetimes are wrapped in single quotes using their
    ``str()`` form. Strings are single-quoted with embedded single quotes
    doubled. Buffer types are emitted as a quoted hex string. Booleans
    become ``"1"`` or ``"0"``. Anything else falls back to ``str(value)``.
    """
    temporal_types = (datetime.date, datetime.time, datetime.datetime)
    if isinstance(value, temporal_types):
        return "'%s'" % value

    if isinstance(value, six.string_types):
        # Escape by doubling every single quote inside the text.
        escaped = six.text_type(value).replace("\'", "\'\'")
        return "'%s'" % escaped

    if isinstance(value, six.buffer_types):
        hex_digits = force_text(binascii.hexlify(value))
        return "'%s'" % hex_digits

    if isinstance(value, bool):
        if value:
            return "1"
        return "0"

    return str(value)
def save(self, force_insert=False, force_update=False, using=None, update_fields=None):
    """Save the record and, once, import its items from the attached CSV file.

    After the normal model save, the attachment is parsed only when one is
    present and no ``InitItem`` rows reference this record yet. The first
    CSV row is treated as a header and skipped. Every other row must have
    nine columns: material code, name, description, unit code, unit name,
    warehouse code, warehouse name, price and count.

    Measures, warehouses and materials that do not exist yet (looked up by
    ``code``) are created; text columns are decoded from GBK. All rows are
    imported inside a single transaction.

    :raises ValueError: if a data row does not have exactly nine columns
    """
    super(InitialInventory, self).save(force_insert, force_update, using, update_fields)

    # Import at most once: nothing to do without an attachment or when items
    # were already created for this record.
    if not self.attach or InitItem.objects.filter(master=self).exists():
        return

    path = os.path.join(settings.MEDIA_ROOT, self.attach.name)
    # The context manager guarantees the file handle is released even when a
    # row fails to import.
    with open(path, 'r') as csv_file:
        reader = csv.reader(csv_file)
        with transaction.atomic():
            next(reader, None)  # skip the header row
            for (code, name, description, unit_code, unit_name,
                 warehouse_code, warehouse_name, price, cnt) in reader:
                # Only a missing row triggers creation; other errors (e.g.
                # database failures) propagate and roll the import back.
                try:
                    measure = Measure.objects.get(code=unit_code)
                except Measure.DoesNotExist:
                    measure = Measure.objects.create(
                        code=unit_code,
                        name=force_text(unit_name.decode('gbk')),
                    )

                try:
                    house = Warehouse.objects.get(code=warehouse_code)
                except Warehouse.DoesNotExist:
                    house = Warehouse.objects.create(
                        code=warehouse_code,
                        name=force_text(warehouse_name.decode('gbk')),
                    )

                try:
                    material = Material.objects.get(code=code)
                except Material.DoesNotExist:
                    # Newly created materials take the imported price as
                    # their stock price.
                    material = Material(
                        code=code,
                        name=force_text(name.decode('gbk')),
                        spec=force_text(description.decode('gbk')),
                        warehouse=house,
                    )
                    material.stock_price = price
                    material.save()

                InitItem.objects.create(
                    master=self,
                    material=material,
                    price=price,
                    cnt=cnt,
                    warehouse=house,
                    measure=measure,
                    source=self.code,
                )
def save(self, force_insert=False, force_update=False, using=None, update_fields=None):
    """Save the work order, seed its extra values and import its item sheet.

    After the normal model save:

    * If a service is set and the order has no extra values yet, one
      ``WOExtraValue`` is created for every extra parameter of the service.
    * If a detail attachment exists and the order has no ``WOItem`` rows
      yet, the first sheet of the workbook is imported in one transaction.

    Sheet layout as read by this method: rows 0-2 are header rows. If the
    second cell of row 0 starts with ``'0'`` the import is skipped
    entirely. Data rows use column 0 = material code, 1 = material name,
    2 = spec, 4 = measure code, 5 = measure name, 6 = amount.
    """
    super(WorkOrder, self).save(force_insert, force_update, using, update_fields)

    if self.service:
        material = self.service
        if self.woextravalue_set.count() < 1 and material.extraparam_set and material.extraparam_set.count() > 0:
            for param in material.extraparam_set.all():
                extra_param = WOExtraValue.objects.create(workorder=self, param_name=param)
                self.woextravalue_set.add(extra_param)

    # Import at most once: nothing to do without an attachment or when items
    # were already created for this order.
    if not self.detail or WOItem.objects.filter(workorder=self).exists():
        return

    path = os.path.join(settings.MEDIA_ROOT, self.detail.name)
    workbook = xlrd.open_workbook(path)
    sheet = workbook.sheet_by_index(0)

    with transaction.atomic():
        for row_index in range(sheet.nrows):
            row = sheet.row_values(row_index)
            if row_index == 0:
                doc_type = row[1]
                if doc_type.startswith('0'):
                    break
                continue
            if row_index < 3:
                continue

            # Only a missing row triggers creation; other errors (e.g.
            # database failures) propagate and roll the import back.
            try:
                measure = Measure.objects.get(code=row[4])
            except Measure.DoesNotExist:
                measure = Measure.objects.create(code=row[4], name=force_text(row[5]))

            try:
                material = Material.objects.get(code=row[0])
            except Material.DoesNotExist:
                material = Material(code=row[0], name=force_text(row[1]), spec=force_text(row[2]))
                material.save()

            WOItem.objects.create(workorder=self, material=material, measure=measure, amount=row[6])
def list_to_csv_response(data, title='report', header=None, widths=None):
    """
    Turn a 2D list into a CSV ``HttpResponse`` for download.

    ``header``, when truthy, is written as the first row, followed by every
    row of ``data``. Each cell is converted with ``force_text`` and encoded
    using the response's charset. ``title`` and ``widths`` are accepted but
    not used by this function.
    """
    response = HttpResponse(content_type="text/csv; charset=UTF-8")
    writer = csv.writer(response)

    leading_rows = [header] if header else []
    for row in chain(leading_rows, data):
        encoded_cells = []
        for cell in row:
            encoded_cells.append(force_text(cell).encode(response.charset))
        writer.writerow(encoded_cells)

    return response
def test_assertion_consumer_service(self):
    """Exercise the ``saml2_acs`` view with a new user and an existing user.

    The first response is for an unknown user and carries a RelayState: a
    user must be created and the redirect must go to that RelayState. The
    second response is for a pre-created user with an empty RelayState: the
    redirect must fall back to ``settings.LOGIN_REDIRECT_URL`` and the
    session must belong to that user.
    """
    def post_to_acs(saml_xml, relay_state):
        # Submit an encoded SAML response to the assertion consumer service.
        return self.client.post(reverse('saml2_acs'), {
            'SAMLResponse': self.b64_for_post(saml_xml),
            'RelayState': relay_state,
        })

    # Get initial number of users
    users_before = User.objects.count()
    settings.SAML_CONFIG = conf.create_conf(
        sp_host='sp.example.com',
        idp_hosts=['idp.example.com'],
        metadata_file='remote_metadata_one_idp.xml',
    )
    self.init_cookies()

    # session_id should start with a letter since it is a NCName
    first_session = "a0123456789abcdef0123456789abcdef"
    first_relay_state = '/another-view/'
    self.add_outstanding_query(first_session, first_relay_state)

    # this will create a user
    first_response = post_to_acs(auth_response(first_session, 'student'), first_relay_state)
    self.assertEqual(first_response.status_code, 302)
    first_target = urlparse(first_response['Location'])
    self.assertEqual(first_target.path, first_relay_state)
    self.assertEqual(User.objects.count(), users_before + 1)

    logged_in_id = self.client.session[SESSION_KEY]
    logged_in_user = User.objects.get(id=logged_in_id)
    self.assertEqual(logged_in_user.username, 'student')

    # let's create another user and log in with that one
    teacher = User.objects.create(username='teacher', password='not-used')
    second_session = "a1111111111111111111111111111111"
    empty_relay_state = ''  # bad, let's see if we can deal with this
    teacher_saml = auth_response(second_session, 'teacher')
    self.add_outstanding_query(second_session, '/')

    second_response = post_to_acs(teacher_saml, empty_relay_state)
    self.assertEqual(second_response.status_code, 302)
    second_target = urlparse(second_response['Location'])
    # as the RelayState is empty we have redirect to LOGIN_REDIRECT_URL
    self.assertEqual(second_target.path, settings.LOGIN_REDIRECT_URL)
    self.assertEqual(force_text(teacher.id), self.client.session[SESSION_KEY])
def save(self, force_insert=False, force_update=False, using=None, update_fields=None):
    """Save the offer sheet, apply its discount and import its item sheet.

    After the normal model save:

    * If ``discount_amount`` is positive, the discount price of every offer
      item of this sheet is recalculated with one SQL update.
    * If an attachment exists and the sheet has no ``OfferItem`` rows yet,
      the first sheet of the workbook is imported in one transaction and
      the sheet's ``amount`` column is set to the sum of
      sale price x quantity over the imported rows.

    Sheet layout as read by this method: rows 0-2 are header rows. If the
    second cell of row 0 starts with ``'0'`` no rows are imported. Data
    rows use column 0 = material code, 1 = material name, 2 = spec,
    3 = brand, 4 = measure code, 5 = measure name, 6 = sale price,
    7 = cost / purchase price, 8 = quantity.
    """
    import decimal

    super(OfferSheet, self).save(force_insert, force_update, using, update_fields)

    if self.discount_amount > 0:
        sql = 'UPDATE sale_offeritem a SET a.discount_price = a.sale_price - ' \
              '((SELECT discount_amount/amount FROM sale_offersheet WHERE id = %s) * (a.sale_price*a.cnt)/a.cnt) WHERE a.master_id = %s'
        generic.update(sql, [self.id, self.id])

    # Import at most once: nothing to do without an attachment or when items
    # were already created for this sheet.
    if not self.attach or OfferItem.objects.filter(master=self).exists():
        return

    path = os.path.join(settings.MEDIA_ROOT, self.attach.name)
    workbook = xlrd.open_workbook(path)
    sheet = workbook.sheet_by_index(0)

    with transaction.atomic():
        total_amount = decimal.Decimal(0)
        for row_index in range(sheet.nrows):
            row = sheet.row_values(row_index)
            if row_index == 0:
                doc_type = row[1]
                if doc_type.startswith('0'):
                    break
                continue
            if row_index < 3:
                continue

            # Only a missing row triggers creation; other errors (e.g.
            # database failures) propagate and roll the import back.
            try:
                measure = Measure.objects.get(code=row[4])
            except Measure.DoesNotExist:
                measure = Measure.objects.create(code=row[4], name=force_text(row[5]))

            try:
                material = Material.objects.get(code=row[0])
            except Material.DoesNotExist:
                # Newly created materials take their prices from the sheet.
                material = Material(code=row[0], name=force_text(row[1]), spec=force_text(row[2]))
                material.sale_price = row[6]
                material.purchase_price = row[7]
                material.save()

            OfferItem.objects.create(
                master=self,
                material=material,
                measure=measure,
                cnt=row[8],
                brand=force_text(row[3]),
                cost_price=row[7],
                sale_price=row[6],
            )
            # NOTE(review): if the cells arrive as floats, Decimal(float)
            # keeps the binary rounding error - confirm the cell types.
            total_amount += decimal.Decimal(row[6]) * decimal.Decimal(row[8])

        # Persist the computed total (zero when no rows were imported).
        sql = 'update sale_offersheet set amount = %s where id=%s'
        generic.update(sql, [total_amount, self.id])
def save(self, force_insert=False, force_update=False, using=None, update_fields=None):
    """Save the purchase order, apply its discount and import its item sheet.

    After the normal model save:

    * If ``discount_amount`` is positive, the discount price of every item
      of this order is recalculated with one SQL update.
    * If an attachment exists and the order has no ``POItem`` rows yet, the
      first sheet of the workbook is imported in one transaction and the
      order's ``amount`` column is set to the sum of the imported line
      amounts (price x quantity).

    Sheet layout as read by this method: rows 0-2 are header rows. If the
    second cell of row 0 starts with ``'0'`` no rows are imported. Data
    rows use column 0 = material code, 1 = material name, 2 = spec,
    4 = measure code, 5 = measure name, 6 = price, 7 = quantity.
    """
    super(PurchaseOrder, self).save(force_insert, force_update, using, update_fields)

    if self.discount_amount > 0:
        sql = 'UPDATE purchase_poitem a SET a.discount_price = ' \
              'a.price-((SELECT discount_amount/amount FROM purchase_purchaseorder WHERE id=%s)*a.amount/a.cnt) WHERE a.po_id = %s'
        generic.update(sql, [self.id, self.id])

    # Import at most once: nothing to do without an attachment or when items
    # were already created for this order.
    if not self.attach or POItem.objects.filter(po=self).exists():
        return

    path = os.path.join(settings.MEDIA_ROOT, self.attach.name)
    workbook = xlrd.open_workbook(path)
    sheet = workbook.sheet_by_index(0)

    with transaction.atomic():
        total_amount = decimal.Decimal(0)
        for row_index in range(sheet.nrows):
            row = sheet.row_values(row_index)
            if row_index == 0:
                doc_type = row[1]
                if doc_type.startswith('0'):
                    break
                continue
            if row_index < 3:
                continue

            # Only a missing row triggers creation; other errors (e.g.
            # database failures) propagate and roll the import back.
            try:
                measure = Measure.objects.get(code=row[4])
            except Measure.DoesNotExist:
                measure = Measure.objects.create(code=row[4], name=force_text(row[5]))

            try:
                material = Material.objects.get(code=row[0])
            except Material.DoesNotExist:
                # Newly created materials take their purchase price from
                # the sheet.
                material = Material(code=row[0], name=force_text(row[1]), spec=force_text(row[2]))
                material.purchase_price = row[6]
                material.save()

            # NOTE(review): if the cells arrive as floats, Decimal(float)
            # keeps the binary rounding error - confirm the cell types.
            amount = decimal.Decimal(row[6]) * decimal.Decimal(row[7])
            POItem.objects.create(
                po=self,
                material=material,
                measure=measure,
                cnt=row[7],
                price=row[6],
                amount=amount,
            )
            total_amount += amount

        # Persist the computed total (zero when no rows were imported).
        sql = 'update purchase_purchaseorder set amount = %s where id=%s'
        generic.update(sql, [total_amount, self.id])
def action_import(self, request):
    """Import the attached spreadsheet and mark this record as processed.

    Nothing happens when there is no attachment. When ``self.handler`` is
    set, the matching handler from ``ExcelManager().handlers`` processes
    the attachment. Otherwise the first sheet of the workbook is imported
    generically: row 0 holds the model field names, row 1 is skipped, and
    every later row becomes one instance of the model referenced by
    ``self.content_type``. Columns with an empty header are ignored.

    Either path runs inside one transaction and finishes by updating
    ``self.status`` and saving this record.

    :param request: the current request; not used by this method
    """
    from django.db import transaction

    if not self.attach:
        return

    if self.handler:
        klass = ExcelManager().handlers.get(self.handler)
        with transaction.atomic():
            klass.handle(self, self.attach)
            self.status = 1
            self.save()
        return

    import xlrd
    import os
    from mis import settings

    path = os.path.join(settings.MEDIA_ROOT, self.attach.name)
    workbook = xlrd.open_workbook(path)
    sheet = workbook.sheet_by_index(0)

    cols = []
    with transaction.atomic():
        for row_index in range(sheet.nrows):
            line = sheet.row_values(row_index)
            if row_index == 0:
                # The first row names the model fields.
                cols = line
                continue
            if row_index == 1:
                continue

            klass = self.content_type.model_class()
            params = {}
            for name, v in zip(cols, line):
                if isinstance(v, str):
                    # Byte strings are decoded from GBK before storing.
                    v = force_text(v.decode('gbk'))
                # When a header name repeats, the first column wins.
                params.setdefault(name, v)
            # Drop values that sit under an empty header cell.
            params.pop('', None)
            klass.objects.create(**params)

        # NOTE(review): the handler branch stores the integer 1 while this
        # branch stores the string '1' - confirm which the field expects.
        self.status = '1'
        self.save()
def test_assertion_consumer_service(self):
    """Exercise the ``saml2_acs`` view with a new user and an existing user.

    The first response is for an unknown user and carries a RelayState: a
    user must be created and the redirect must go to that RelayState. The
    second response is for a pre-created user with an empty RelayState: the
    redirect must fall back to ``settings.LOGIN_REDIRECT_URL`` and the
    session must belong to that user.
    """
    # Get initial number of users
    initial_user_count = User.objects.count()
    settings.SAML_CONFIG = conf.create_conf(
        sp_host='sp.example.com',
        idp_hosts=['idp.example.com'],
        metadata_file='remote_metadata_one_idp.xml',
    )
    self.init_cookies()

    # session_id should start with a letter since it is a NCName
    session_id = "a0123456789abcdef0123456789abcdef"
    came_from = '/another-view/'
    self.add_outstanding_query(session_id, came_from)

    # this will create a user
    saml_response = auth_response(session_id, 'student')
    response = self.client.post(reverse('saml2_acs'), {
        'SAMLResponse': self.b64_for_post(saml_response),
        'RelayState': came_from,
    })
    # assertEqual is used throughout: assertEquals is a deprecated alias.
    self.assertEqual(response.status_code, 302)
    location = response['Location']
    url = urlparse(location)
    self.assertEqual(url.path, came_from)
    self.assertEqual(User.objects.count(), initial_user_count + 1)

    user_id = self.client.session[SESSION_KEY]
    user = User.objects.get(id=user_id)
    self.assertEqual(user.username, 'student')

    # let's create another user and log in with that one
    new_user = User.objects.create(username='teacher', password='not-used')
    session_id = "a1111111111111111111111111111111"
    came_from = ''  # bad, let's see if we can deal with this
    saml_response = auth_response(session_id, 'teacher')
    self.add_outstanding_query(session_id, '/')
    response = self.client.post(reverse('saml2_acs'), {
        'SAMLResponse': self.b64_for_post(saml_response),
        'RelayState': came_from,
    })
    self.assertEqual(response.status_code, 302)
    location = response['Location']
    url = urlparse(location)
    # as the RelayState is empty we have redirect to LOGIN_REDIRECT_URL;
    # compare against the setting so the test follows the configured value
    self.assertEqual(url.path, settings.LOGIN_REDIRECT_URL)
    self.assertEqual(force_text(new_user.id), self.client.session[SESSION_KEY])