我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用django.core.files.base.ContentFile()。
def generate_robot_report(self):
    """Regenerate the Robot Framework log and report HTML next to the XML source.

    Does nothing when ``self.xml_source`` is None.  Otherwise the existing
    ``log_file`` / ``report_file`` are deleted, placeholder files are saved
    under the new names, and ``ResultWriter`` is asked to write the results.
    A failure reported by the writer (return value -1) is printed.
    """
    if self.xml_source is None:
        return

    # Imported here rather than at module level, as in the original code.
    from robot.reporting.resultwriter import ResultWriter

    xml_path = self.xml_source.file.name
    stem = os.path.splitext(xml_path)[0]
    log = stem + "_log.html"
    report = stem + "_report.html"

    if self.log_file is not None:
        self.log_file.delete()
    if self.report_file is not None:
        self.report_file.delete()

    # Placeholders are saved first; the writer is then pointed at the
    # log/report paths derived from the XML source path.
    self.log_file.save(os.path.basename(log), ContentFile('no content'))
    self.report_file.save(os.path.basename(report), ContentFile('no content'))

    writer = ResultWriter(xml_path)
    retval = writer.write_results(report=report, log=log, xunit=None)
    if retval == -1:
        # Call form prints the same text on Python 2 and is valid on Python 3.
        print("failed to regenerate files")
def test_read_and_save_cnt(self):
    # Saving must bump only save_cnt; reading must bump only read_cnt and
    # give back exactly the bytes that were saved.
    file_name = 'file.txt'
    payload = 'file content'.encode()

    with override_storage.locmem_stats_override_storage() as storage:
        self.assertEqual(storage.read_cnt, 0)
        self.assertEqual(storage.save_cnt, 0)

        model = SimpleModel()
        model.upload_file.save(file_name, ContentFile(payload))
        self.assertEqual(storage.save_cnt, 1)
        self.assertEqual(storage.read_cnt, 0)

        stored_bytes = model.upload_file.read()
        self.assertEqual(storage.read_cnt, 1)
        self.assertEqual(payload, stored_bytes)
def _copy_file(self, destination, overwrite=False):
    """
    Copies the file to a destination files and returns it.

    :param destination: name to save the copy under in the same storage.
    :param overwrite: not supported; passing a true value raises
        ``NotImplementedError``.
    :return: whatever ``storage.save`` returns for the copy.
    """
    if overwrite:
        # If the destination file already exists default storage backend
        # does not overwrite it but generates another filename.
        # TODO: Find a way to override this behavior.
        raise NotImplementedError

    storage = self.file.storages['public' if self.is_public else 'private']

    # This is needed because most of the remote File Storage backend do not
    # open the file.
    src_file = storage.open(self.file.name)
    try:
        src_file.open()
        return storage.save(destination, ContentFile(src_file.read()))
    finally:
        # Release the source handle even when reading or saving fails.
        src_file.close()
def download(self):
    """
    Download the LaTeX source of this paper and save to storage.

    It will save the model.
    """
    # Drop any previously stored file first so storage does not fill up with
    # duplicates. If the download then fails the file is gone, but it can
    # simply be downloaded again.
    if self.source_file.name:
        self.source_file.delete()

    response = requests.get(self.get_source_url())
    response.raise_for_status()

    headers = response.headers
    extension = guess_extension_from_headers(headers)
    if not extension:
        message = ("Could not determine file extension from "
                   "headers: Content-Type: {}; "
                   "Content-Encoding: {}")
        raise DownloadError(message.format(
            headers.get('content-type'),
            headers.get('content-encoding')))

    payload = ContentFile(response.content)
    self.source_file.save(self.arxiv_id + extension, payload)
def add_script_project(request):
    """Create a ScriptProject from the POST fields and store its uploaded files.

    The files from ``files[]`` are saved through ``default_storage`` under a
    directory named ``<id>_<canonized name>`` inside ``SCRIPT_MANAGER_DIR``.
    """
    post = request.POST
    name = post['name']
    project = ScriptProject(
        name=name,
        desc=post['description'],
        entrance_point=post['entrance'],
        arguments=post['arguments'],
    )
    project.save()

    dir_name = '%s_%s' % (str(project.id), canonize_project_name(name))
    project_path = os.path.join(SCRIPT_MANAGER_DIR, dir_name)
    if not os.path.exists(project_path):
        os.makedirs(project_path)

    for uploaded in request.FILES.getlist('files[]'):
        target = os.path.join(project_path, uploaded.name)
        default_storage.save(target, ContentFile(uploaded.read()))

    return HttpResponse()
def export(self, file_handle=None):
    """Export the task questions and answers as a CSV.

    The CSV is written to ``file_handle`` (a fresh ``StringIO`` when none is
    given) and attached to ``self.export_file``.  ``self.status`` is set to
    SUCCESS or FAILURE and the model is saved in either case.
    """
    try:
        if not file_handle:
            file_handle = StringIO.StringIO()

        rows = self.task.answers
        # http://stackoverflow.com/a/11399424
        # the header is the union of all keys in all of the answer rows
        fieldnames = list(set().union(*(row.keys() for row in rows)))

        writer = csv.DictWriter(file_handle, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(rows)

        csv_content = ContentFile(file_handle.getvalue())
        export_filename = "ST_TASK_{task_id}_EXPORT_{date}.csv".format(
            task_id=self.task.id,
            date=str(datetime.date.today()),
        )
        self.export_file.save(name=export_filename, content=csv_content, save=False)
        self.status = self.SUCCESS
    except Exception as e:
        LOG.exception(e)
        self.status = self.FAILURE
    self.save()
def perform_image_crop(image_obj, crop_rect=None):
    """Return ``image_obj`` cropped to ``crop_rect`` as an in-memory upload.

    :param image_obj: uploaded image; its ``name``, ``content_type`` and
        ``read()`` are used.
    :param crop_rect: box passed straight to PIL's ``Image.crop``.  When it
        is None, ``image_obj`` is returned untouched.
    :return: ``image_obj`` itself, or a new ``InMemoryUploadedFile`` holding
        the cropped image re-encoded in the format implied by the file
        extension.
    """
    if crop_rect is None:
        return image_obj

    # The output format is taken from the file extension; PIL calls JPG "JPEG".
    img_ext = os.path.splitext(image_obj.name)[1][1:].upper()
    if img_ext == 'JPG':
        img_ext = 'JPEG'

    base_image = Image.open(BytesIO(image_obj.read()))
    buffer = BytesIO()
    base_image.crop(crop_rect).save(buffer, format=img_ext)
    cropped_bytes = buffer.getvalue()

    # The size argument must be a byte count; the original passed the bound
    # method ``tmp_file.tell`` here instead of a number.
    return uploadedfile.InMemoryUploadedFile(
        ContentFile(cropped_bytes),
        None,
        image_obj.name,
        image_obj.content_type,
        len(cropped_bytes),
        None,
    )
def save(self, *args, **kwargs):
    """Re-encode ``thumbnail_image`` as a 250x150 cover-resized PNG, then save."""
    source = Image.open(self.thumbnail_image)
    resized = resizeimage.resize_cover(source, [250, 150], validate=False)

    buffer = BytesIO()
    resized.save(buffer, format='PNG')

    # Keep the name before deleting; it is reused for the re-encoded file.
    original_name = self.thumbnail_image.name
    self.thumbnail_image.delete(save=False)
    self.thumbnail_image.save(
        original_name,
        content=ContentFile(buffer.getvalue()),
        save=False,
    )

    super(ThumbnailImage, self).save(*args, **kwargs)
def convert_image(image, width, height):
    """
    Resize the image to the size needed by the template.

    With ``width`` set to None only the height is fixed; otherwise the image
    is cover-resized to ``[width, height]``.  The file is replaced in place
    (same name, same PIL format) without saving the owning model.
    """
    original_name = image.name
    source = Image.open(image)

    if width is not None:
        resized = resizeimage.resize_cover(source, [width, height], validate=False)
    else:
        resized = resizeimage.resize_height(source, height, validate=False)

    buffer = BytesIO()
    resized.save(buffer, format=source.format)

    image.delete(save=False)
    image.save(original_name, content=ContentFile(buffer.getvalue()), save=False)
def put(self, request, *args, **kwargs):
    '''
    If there is a thumbnail, and it was sent as part of an application/json
    payload, unpack the thumbnail object payload and convert it to a Python
    ContentFile payload instead.

    The thumbnail is optional, so the conversion is best-effort: a missing
    key, a thumbnail that is not a name/base64 mapping, undecodable base64,
    or request data that cannot be assigned to all leave the request as it
    was. Only those expected failures are suppressed; anything else
    propagates.
    '''
    try:
        thumbnail = request.data['thumbnail']
        # do we actually need to repack as ContentFile?
        if thumbnail['name'] and thumbnail['base64']:
            request.data['thumbnail'] = ContentFile(
                base64.b64decode(thumbnail['base64']),
                name=thumbnail['name'],
            )
    except (LookupError, TypeError, ValueError, AttributeError):
        # Narrowed from a bare ``except:`` so SystemExit/KeyboardInterrupt
        # and unrelated errors are no longer swallowed.
        pass
    return super(UserProfileAPIView, self).put(request, *args, **kwargs)
def test_publish_xml_xlsform_download(self):
    # Publishing from an XML file creates the form, but there is no XLS
    # file to download for it afterwards.
    count = XForm.objects.count()
    path = os.path.join(
        self.this_directory, '..', '..', 'api', 'tests', 'fixtures',
        'forms', 'contributions', 'contributions.xml')
    # Context manager closes the fixture even if reading fails.
    with open(path) as f:
        xml_file = ContentFile(f.read())
    xml_file.name = 'contributions.xml'

    self.xform = publish_xml_form(xml_file, self.user)
    self.assertTrue(XForm.objects.count() > count)

    response = self.client.get(reverse(download_xlsform, kwargs={
        'username': self.user.username,
        'id_string': 'contributions'
    }), follow=True)
    self.assertContains(response, 'No XLS file for your form ')
def _contributions_form_submissions(self):
    """Publish the 'contributions' fixture form and submit every fixture instance.

    Asserts that a form was created and that six instances end up attached
    to it.
    """
    count = XForm.objects.count()
    path = os.path.join(os.path.dirname(__file__), '..', 'fixtures', 'forms', 'contributions')
    form_path = os.path.join(path, 'contributions.xml')
    with open(form_path) as f:
        xml_file = ContentFile(f.read())
    xml_file.name = 'contributions.xml'

    self.xform = publish_xml_form(xml_file, self.user)
    self.assertTrue(XForm.objects.count() > count)

    instances_path = os.path.join(path, 'instances')
    for uuid in os.listdir(instances_path):
        s_path = os.path.join(instances_path, uuid, 'submission.xml')
        # Each submission handle is closed once it has been submitted; the
        # original left one open file per instance.
        with open(s_path) as submission:
            create_instance(self.user.username, submission, [])
    self.assertEqual(self.xform.instances.count(), 6)
def download_media_files(self, xml_doc, media_path):
    """Fetch each ``mediaFile`` listed in ``xml_doc`` into ``media_path``.

    Entries lacking a filename or download URL, and files that already exist
    in ``default_storage``, are skipped.  Each fetch is logged as a success
    (debug) or a failure (error).
    """
    for media_node in xml_doc.getElementsByTagName('mediaFile'):
        filename_nodes = media_node.getElementsByTagName('filename')
        url_nodes = media_node.getElementsByTagName('downloadUrl')
        if not (filename_nodes and url_nodes):
            continue

        filename = filename_nodes[0].childNodes[0].nodeValue
        path = os.path.join(media_path, filename)
        if default_storage.exists(path):
            continue

        download_url = url_nodes[0].childNodes[0].nodeValue
        if not self._get_media_response(download_url):
            self.logger.error("Failed to fetch %s." % filename)
            continue

        # _get_media_response left the response on self._current_response.
        media_content = ContentFile(self._current_response.content)
        default_storage.save(path, media_content)
        self.logger.debug("Fetched %s." % filename)
def _upload_instance(self, xml_file, instance_dir_path, files):
    """Re-submit one stored instance together with its media attachments.

    The submission XML is rebuilt from the children of the document
    element's first child.  Every ``mediaFile`` whose filename is in
    ``files`` is opened from ``instance_dir_path`` and attached.
    """
    xml_doc = clean_and_parse_xml(xml_file.read())
    de_node = xml_doc.documentElement

    buffer = StringIO()
    for child in de_node.firstChild.childNodes:
        buffer.write(child.toxml())
    new_xml_file = ContentFile(buffer.getvalue())
    new_xml_file.content_type = 'text/xml'
    buffer.close()

    attachments = []
    for media_node in de_node.getElementsByTagName('mediaFile'):
        filename = media_node.getElementsByTagName('filename')[0].childNodes[0].nodeValue
        if filename not in files:
            continue
        stored = default_storage.open(os.path.join(instance_dir_path, filename))
        mimetype, _encoding = mimetypes.guess_type(stored.name)
        attachments.append(django_file(stored, 'media_files[]', mimetype))

    create_instance(self.user.username, new_xml_file, attachments)
def _save_thumbnails(image, path, size, suffix):
    """Shrink ``image`` in place and store the result via the storage backend.

    :param image: PIL image; it is modified by ``thumbnail()``.
    :param path: original path, combined with ``suffix`` through ``get_path``.
    :param size: target size, converted to float before computing dimensions.
    :param suffix: suffix handed to ``get_path`` for the thumbnail name.
    """
    storage = get_storage_class()()

    # The context manager closes the temporary file even when saving the
    # image or writing to storage raises.
    with NamedTemporaryFile(suffix='.%s' % settings.IMG_FILE_TYPE) as nm:
        try:
            # Ensure conversion to float in operations
            image.thumbnail(
                get_dimensions(image.size, float(size)), Image.ANTIALIAS)
        except ZeroDivisionError:
            pass

        try:
            image.save(nm.name)
        except IOError:
            # e.g. `IOError: cannot write mode P as JPEG`, which gets raised when
            # someone uploads an image in an indexed-color format like GIF
            image.convert('RGB').save(nm.name)

        storage.save(get_path(path, suffix), ContentFile(nm.read()))
def test_fire_message_with_attachments(self):
    # A stored file attached to a message must appear as an attachment on
    # the outgoing mail once the message is fired.
    MailFactory(message=self.message)
    default_storage.save('foo.txt', ContentFile('testfoo'))
    try:
        ma = MessageAttachment(message=self.message, file='foo.txt')
        self.message.author.organization.settings.notify_message_status = False
        ma.save()

        # Fires the message
        self.message.status = 'sending'
        self.message.save()

        first_mail = self.message.mails.all().first()
        message = first_mail.message.to_mail(first_mail)
        self.assertEqual(len(message.attachments), 1)

        filename, content, mimetype = message.attachments[0]
        self.assertEqual(filename, 'foo.txt')
        self.assertEqual(content, 'testfoo')
        self.assertEqual(mimetype, 'text/plain')
    finally:
        # Remove the stored file even when an assertion above fails, so a
        # failed run does not leave 'foo.txt' behind in storage.
        default_storage.delete('foo.txt')
def test_image_resizing(self):
    # Without an explicit width the stored image keeps its 100x100 size.
    untouched = Image(organization=self.organization)
    content = ContentFile(self.create_random_image(100, 100).read())
    untouched.file.save('random_image.png', content, save=False)
    untouched.save()
    stored = PIL.Image.open(untouched.file.file)
    self.assertEqual((100, 100), stored.size)

    # With width set to 50 the stored image comes out as 50x50.
    shrunk = Image(organization=self.organization)
    content = ContentFile(self.create_random_image(100, 100).read())
    shrunk.file.save('random_image.png', content, save=False)
    shrunk.width = 50
    shrunk.save()
    stored = PIL.Image.open(shrunk.file.file)
    self.assertEqual((50, 50), stored.size)
def store(self):
    """Validate ``self.data`` and store it as an Image named after its SHA-1.

    Raises ``TooBigMedia`` when the data is at least ``MAX_SIZE`` long and
    ``InvalidMimeType`` when the sniffed MIME type is not allowed.  Returns
    the absolute URL of the stored image.
    """
    data = self.data
    if len(data) >= self.MAX_SIZE:
        raise TooBigMedia(self.identifying_name, self.MAX_SIZE)

    mime = magic.from_buffer(data, mime=True)
    if mime not in self.allowed_mimetypes:
        raise InvalidMimeType(mime)

    guessed = mimetypes.guess_extension(mime)
    # weirdness from mimetypes
    self.extension = '.jpeg' if guessed == '.jpe' else guessed

    filename = '{}{}'.format(hashlib.sha1(data).hexdigest(), self.extension)
    img = Image(organization=self.organization)
    img.file.save(filename, ContentFile(data))
    return img.get_absolute_url()
def avatar_upload(request):
    """Store a base64-encoded avatar from POST field ``file`` for the session user.

    Non-POST requests raise ``Http404``.  A POST without a ``file`` payload
    gets an empty 400 response instead of crashing in ``b64decode``.  On
    success the user's avatar URL is updated, the token is processed and the
    user's JSON is returned.
    """
    if request.method != "POST":
        raise Http404

    image_stream = request.POST.get('file')
    if not image_stream:
        # Nothing to decode: previously this reached b64decode(None).
        return HttpResponse(status=400)

    image = ContentFile(b64decode(image_stream))
    avatar = UserAvatar()
    avatar.user_avatar.save(produce_image_name() + '.jpg', image)

    current_user = User.objects(id=request.session['currentUser']['_id']).get()
    image_url = prefURL['ImageURL'] + str(avatar.user_avatar)
    current_user.update(avatar=image_url)
    current_user.reload()

    kw = {
        '_id': str(current_user.id),
        'nickname': current_user.nickname,
        'avatar': current_user.avatar,
        'token': current_user.token
    }
    process_token(**kw)
    return HttpResponse(current_user.to_json())
def image_test(request):
    """Debug view: echo on GET, store a base64 image from POST field ``file``.

    Both branches answer with ``{"result": 555}``.  Other HTTP methods fall
    through and return None, as before.
    """
    if request.method == "GET":
        # print() call form: same output on Python 2, valid on Python 3.
        print(request.method)
        return JsonResponse({"result": 555})

    if request.method == "POST":
        print(123)
        xxs = request.POST.get('file', None)
        image_data = ContentFile(b64decode(xxs))
        x = ImageTest()
        x.image_avatar.save("test.jpg", image_data)
        return JsonResponse({"result": 555})
def test_post_audio(self):
    # Fake upload, named at construction time
    upload = ContentFile(b("A boring example song"), name='song.mp3')

    # Submit the add-audio form
    add_url = reverse('wagtailmedia:add', args=('audio', ))
    response = self.client.post(add_url, {
        'title': "Test media",
        'file': upload,
        'duration': 100,
    })

    # The user ends up back on the index
    self.assertRedirects(response, reverse('wagtailmedia:index'))

    # The media exists and sits in the root collection
    self.assertTrue(models.Media.objects.filter(title="Test media").exists())
    root_collection = Collection.get_first_root_node()

    media = models.Media.objects.get(title="Test media")
    self.assertEqual(media.collection, root_collection)
    self.assertEqual(media.type, 'audio')
def test_post_video(self):
    # Fake upload, named at construction time
    upload = ContentFile(b("A boring example movie"), name='movie.mp4')

    # Submit the add-video form, including dimensions
    add_url = reverse('wagtailmedia:add', args=('video', ))
    response = self.client.post(add_url, {
        'title': "Test media",
        'file': upload,
        'duration': 100,
        'width': 720,
        'height': 480,
    })

    # The user ends up back on the index
    self.assertRedirects(response, reverse('wagtailmedia:index'))

    # The media exists and sits in the root collection
    self.assertTrue(models.Media.objects.filter(title="Test media").exists())
    root_collection = Collection.get_first_root_node()

    media = models.Media.objects.get(title="Test media")
    self.assertEqual(media.collection, root_collection)
    self.assertEqual(media.type, 'video')
def test_post_audio(self):
    # Fake upload, named at construction time
    upload = ContentFile(b("A boring example song"), name='song.mp3')

    # Submit the add-audio form
    add_url = reverse('wagtailmedia:add', args=('audio', ))
    response = self.client.post(add_url, {
        'title': "Test media",
        'file': upload,
        'duration': 100,
    })

    # The user ends up back on the index
    self.assertRedirects(response, reverse('wagtailmedia:index'))

    # The form has no collection field, yet the media must land in the
    # 'evil plans' collection with type 'audio', because that is the only
    # collection this user has access to.
    self.assertTrue(models.Media.objects.filter(title="Test media").exists())

    media = models.Media.objects.get(title="Test media")
    self.assertEqual(media.collection, self.evil_plans_collection)
    self.assertEqual(media.type, 'audio')
def test_post_video(self):
    # Fake upload, named at construction time
    upload = ContentFile(b("A boring example movie"), name='movie.mp4')

    # Submit the add-video form
    add_url = reverse('wagtailmedia:add', args=('video', ))
    response = self.client.post(add_url, {
        'title': "Test media",
        'file': upload,
        'duration': 100,
    })

    # The user ends up back on the index
    self.assertRedirects(response, reverse('wagtailmedia:index'))

    # The form has no collection field, yet the media must land in the
    # 'evil plans' collection with type 'video', because that is the only
    # collection this user has access to.
    self.assertTrue(models.Media.objects.filter(title="Test media").exists())

    media = models.Media.objects.get(title="Test media")
    self.assertEqual(media.collection, self.evil_plans_collection)
    self.assertEqual(media.type, 'video')
def test_post(self):
    # Fake upload, named at construction time
    upload = ContentFile(b("A boring example song"), name='song.mp3')

    # Post a title change to the edit view
    edit_url = reverse('wagtailmedia:edit', args=(self.media.id,))
    response = self.client.post(edit_url, {
        'title': "Test media changed!",
        'file': upload,
        'duration': 100,
    })

    # The user ends up back on the index
    self.assertRedirects(response, reverse('wagtailmedia:index'))

    # The stored title reflects the change
    refreshed = models.Media.objects.get(id=self.media.id)
    self.assertEqual(refreshed.title, "Test media changed!")
def imageLoad(request):
    """Store a base64-encoded icon from POST field ``file`` for the session user.

    Non-POST requests raise ``Http404``.  On success the user's icon URL is
    updated, the token is processed, and the user's JSON is returned with an
    added ``is_success`` flag set to 1.
    """
    # Function-scope import: only this block needs it.
    import json

    if request.method != 'POST':
        raise Http404

    image_stream = request.POST.get('file', None)
    image = ContentFile(b64decode(image_stream))

    icon = UserIcon()
    # Was ``icon.user_iocn`` -- a typo for the ``user_icon`` field read below.
    icon.user_icon.save(produceImageName() + '.jpg', image)

    current_user = User.objects(id=request.session['currentUser']['_id']).get()
    image_url = preUrl['ImageUrl'] + str(icon.user_icon)
    current_user.update(icon=image_url)
    current_user.reload()

    kw = {
        'id': str(current_user.id),
        'nickname': current_user.nickname,
        'icon': current_user.icon,
        'token': current_user.token
    }
    processToken(**kw)

    # NOTE(review): to_json() presumably returns a JSON string (as MongoEngine
    # documents do), which cannot take item assignment -- confirm. A dict is
    # accepted too, in case User overrides it.
    results = current_user.to_json()
    if not isinstance(results, dict):
        results = json.loads(results)
    results['is_success'] = 1
    return HttpResponse(json.dumps(results))
def test_update_asset_parent(self):
    """ Test update Asset parent only """
    logging.info('Test update asset file...')

    old_parent = Folder(name='f')
    old_parent.save()
    new_parent = Folder(name='f2')
    new_parent.save()

    asset = Asset(name='a', parent=old_parent)
    asset.file.save('file.txt', ContentFile('Content'.encode('utf-8')))

    # Re-parent and save: the bucket must then hold the file only under the
    # new parent's id.
    asset.parent = new_parent
    asset.save()

    expected_key = 'media/' + str(asset.parent.id) + '/file.txt'
    self.assertEqual(self.get_bucket_contents(), [expected_key])
def test_update_asset_file_and_parent(self):
    """ Test update Asset file and parent simultaneously """
    logging.info('Test update asset file and parent simultaneously...')

    old_parent = Folder(name='f')
    old_parent.save()
    new_parent = Folder(name='f2')
    new_parent.save()

    asset = Asset(name='a', parent=old_parent)
    asset.file.save('file.txt', ContentFile('Content'.encode('utf-8')))

    # Change the parent and replace the file in one go: only the new file
    # under the new parent's id may remain in the bucket.
    asset.parent = new_parent
    asset.file.save('file2.txt', ContentFile('Content2'.encode('utf-8')))

    expected_key = 'media/' + str(asset.parent.id) + '/file2.txt'
    self.assertEqual(self.get_bucket_contents(), [expected_key])
def new_test_image():
    """
    Creates an automatically generated test image.

    In your testing `tearDown` method make sure to delete the test image
    with the helper function `delete_test_image`.

    The recommended way of using this helper function is as follows:
        object_1.image_property.save(*new_test_image())

    :return: Image name and image content file.
    """
    deprecation = DeprecationWarning(
        "new_test_image() is deprecated in favour of the get_sample_image() "
        "context manager.")
    warnings.warn(deprecation, stacklevel=2)

    image_name = 'test-{}.png'.format(uuid.uuid4())
    image = Image.new('RGBA', size=(50, 50), color=(256, 0, 0))
    ImageDraw.Draw(image)

    buffer = BytesIO()
    image.save(buffer, 'png')
    # getvalue() yields the same bytes as seeking to 0 and reading everything.
    return image_name, ContentFile(buffer.getvalue(), image_name)
def test_race_condition_handling(self):
    # Keep a reference to the real os.remove so it can be restored.
    real_remove = os.remove

    def race_remove(path):
        "Patched os.remove to raise ENOENT (No such file or directory)"
        real_remove(path)
        raise OSError(errno.ENOENT, 'Fake ENOENT')

    os.remove = race_remove
    try:
        storage = self.default_storage
        storage.save('race.file', ContentFile('Fake ENOENT'))
        storage.delete('race.file')
        self.assertFalse(storage.exists('race.file'))
    finally:
        # Put the real os.remove back whatever happened above.
        os.remove = real_remove
def saveImage(image):
    """Store an uploaded image, run object detection on it, return the result's file name.

    :param image: uploaded file; ``content_type`` supplies the extension.
    :return: last path component of whatever ``detectObject`` returns.
    """
    pwd = os.getcwd()
    filetype = image.content_type.split('/')[1]
    filePath = os.path.abspath(os.path.dirname(pwd)) + '/Django/image/' + str(time.time()) + '.' + filetype

    if isinstance(image, TemporaryUploadedFile):
        # Read the raw bytes from the temp file. The original rebound
        # ``image`` to a PIL Image here and then called ``image.read()`` on
        # it, which PIL images do not provide.
        with open(image.temporary_file_path(), 'rb') as temp_file:
            data = temp_file.read()
        # Kept from the original: opening with PIL raises if the bytes are
        # not an image it can identify.
        # NOTE(review): confirm this validation is actually wanted.
        Image.open(cStringIO.StringIO(data))
    else:
        data = image.read()

    default_storage.save(filePath, ContentFile(data))
    path = detectObject(filePath, filetype)
    return path.split('/')[-1]
def load_mugshots(data_dir='/root/mugshots'):
    """Attach each photo in ``data_dir`` to the DepartmentUser named like the file.

    The file name without extension is matched case-insensitively against
    ``username``.  Progress and a final valid/total count are printed.
    """
    from .models import DepartmentUser

    files = [x for x in os.listdir(data_dir) if os.path.isfile(os.path.join(data_dir, x))]
    valid = 0
    for f in files:
        name = os.path.splitext(f)[0]
        # first() fetches the match in one query instead of evaluating the
        # queryset for truthiness and then indexing it.
        user = DepartmentUser.objects.filter(username__iexact=name).first()
        if user is None:
            print('ERROR: Username {} not found'.format(name))
            continue
        # Photos are binary: text mode can corrupt the bytes or fail to decode.
        with open(os.path.join(data_dir, f), 'rb') as fp:
            user.photo.save(f, ContentFile(fp.read()))
        print('Updated photo for {}'.format(name))
        valid += 1
    print('{}/{} photos valid'.format(valid, len(files)))
def test_post(self):
    # Fake upload, named at construction time
    upload = ContentFile(b("A boring example document"), name='test.txt')

    # Submit the add-document form
    response = self.client.post(reverse('wagtaildocs:add'), {
        'title': "Test document",
        'file': upload,
    })

    # The user ends up back on the index
    self.assertRedirects(response, reverse('wagtaildocs:index'))

    # The document exists and sits in the root collection
    self.assertTrue(models.Document.objects.filter(title="Test document").exists())
    root_collection = Collection.get_first_root_node()
    document = models.Document.objects.get(title="Test document")
    self.assertEqual(document.collection, root_collection)
def test_post(self):
    # Fake upload, named at construction time
    upload = ContentFile(b("A boring example document"), name='test.txt')

    # Submit the add-document form
    response = self.client.post(reverse('wagtaildocs:add'), {
        'title': "Test document",
        'file': upload,
    })

    # The user ends up back on the index
    self.assertRedirects(response, reverse('wagtaildocs:index'))

    # The form has no collection field, yet the document must land in the
    # 'evil plans' collection, because that is the only collection this
    # user has access to.
    self.assertTrue(models.Document.objects.filter(title="Test document").exists())
    document = models.Document.objects.get(title="Test document")
    self.assertEqual(document.collection, self.evil_plans_collection)
def test_post(self):
    # Fake upload, named at construction time
    upload = ContentFile(b("A boring example document"), name='test.txt')

    # Post a title change to the edit view
    edit_url = reverse('wagtaildocs:edit', args=(self.document.id,))
    response = self.client.post(edit_url, {
        'title': "Test document changed!",
        'file': upload,
    })

    # The user ends up back on the index
    self.assertRedirects(response, reverse('wagtaildocs:index'))

    # The stored title reflects the change
    refreshed = models.Document.objects.get(id=self.document.id)
    self.assertEqual(refreshed.title, "Test document changed!")
def test_post(self):
    # Fake upload, named at construction time
    upload = ContentFile(b("A boring example document"), name='test.txt')

    # Submit through the chooser upload view
    response = self.client.post(reverse('wagtaildocs:chooser_upload'), {
        'title': "Test document",
        'file': upload,
    })

    # The response is the javascript saying the document was chosen
    self.assertTemplateUsed(response, 'wagtaildocs/chooser/document_chosen.js')
    self.assertContains(response, "modal.respond('documentChosen'")

    # The document now exists
    created = models.Document.objects.filter(title="Test document")
    self.assertTrue(created.exists())
def test_post(self):
    # Fake upload, named at construction time
    upload = ContentFile(b("A boring example document"), name='test.txt')

    # Submit through the chooser upload view
    response = self.client.post(reverse('wagtaildocs:chooser_upload'), {
        'title': "Test document",
        'file': upload,
    })

    # The response is the javascript saying the document was chosen
    self.assertTemplateUsed(response, 'wagtaildocs/chooser/document_chosen.js')
    self.assertContains(response, "modal.respond('documentChosen'")

    # The document now exists ...
    matches = models.Document.objects.filter(title="Test document")
    self.assertTrue(matches.exists())

    # ... and sits in the 'evil plans' collection
    self.assertEqual(matches.get().collection, self.evil_plans_collection)
def add_document(self, **params):
    """Post a fake document to the add view and return the created Document.

    ``params`` override or extend the default POST fields (``title`` and
    ``file``).  Asserts the redirect to the index and that a document with
    the posted title exists.
    """
    upload = ContentFile(b("A boring example document"), name='test.txt')

    post_data = dict({'title': "Test document", 'file': upload}, **params)
    response = self.client.post(reverse('wagtaildocs:add'), post_data)

    # The user ends up back on the index
    self.assertRedirects(response, reverse('wagtaildocs:index'))

    # The document now exists
    matches = models.Document.objects.filter(title=post_data['title'])
    self.assertTrue(matches.exists())
    return matches.first()
def thumbnail_from_big_image(big_image, size=None):
    """Build a PNG thumbnail of ``big_image`` and return it as a ContentFile.

    The image is first shrunk to fit 512x512.  If either side is still
    larger than ``size`` (default ``settings.DEFAULT_MAX_THUMB_SIZE``), a
    ``size`` x ``size`` box is cropped from the middle.  Returns None when
    an IOError occurs.
    """
    try:
        # Dimensions are read exactly as before; the values are not used.
        _src_w, _src_h = get_image_dimensions(big_image)

        img = Image.open(big_image)
        img.thumbnail((512, 512))
        th_w, th_h = img.size

        if size is None:
            size = settings.DEFAULT_MAX_THUMB_SIZE

        if th_w > size or th_h > size:
            # Centre the crop box; offsets are clamped so they never go negative.
            left = max((th_w - size) / 2.0, 0)
            top = max((th_h - size) / 2.0, 0)
            img = img.crop((left, top, size + left, size + top))

        output = StringIO()
        img.save(output, format="PNG")
        contents = output.getvalue()
        output.close()
        return ContentFile(contents)
    except IOError:
        return None
def test_file_handling(self):
    # A 7-byte payload stored in chunks of 3 gives three blobs at offsets
    # 0, 3 and 6, and reads back correctly from any seek position.
    payload = ContentFile("foo bar")
    stored = File.objects.create(
        name='baz.js',
        type='default',
        size=7,
    )

    chunks = stored.putfile(payload, 3)
    assert len(chunks) == 3
    assert results_offsets(chunks) == [0, 3, 6] if False else True
    assert chunks[0].offset == 0
    assert chunks[1].offset == 3
    assert chunks[2].offset == 6

    with stored.getfile() as fp:
        assert fp.read() == 'foo bar'
        fp.seek(2)
        assert fp.read() == 'o bar'
        fp.seek(0)
        assert fp.read() == 'foo bar'
        fp.seek(4)
        assert fp.read() == 'bar'
def test_publish_xml_xlsform_download(self):
    # Publishing from an XML file into the user's default project creates
    # the form, but there is no XLS file to download for it afterwards.
    count = XForm.objects.count()
    path = os.path.join(
        self.this_directory, '..', '..', 'api', 'tests', 'fixtures',
        'forms', 'contributions', 'contributions.xml')
    # Context manager closes the fixture even if reading fails.
    with open(path) as f:
        xml_file = ContentFile(f.read())
    xml_file.name = 'contributions.xml'

    project = get_user_default_project(self.user)
    self.xform = publish_xml_form(xml_file, self.user, project)
    self.assertTrue(XForm.objects.count() > count)

    response = self.client.get(reverse(download_xlsform, kwargs={
        'username': self.user.username,
        'id_string': 'contributions'
    }), follow=True)
    self.assertContains(response, 'No XLS file for your form ')
def _contributions_form_submissions(self):
    """Publish the 'contributions' fixture form and submit every fixture instance.

    The form is published into the user's default project.  Asserts that a
    form was created and that six instances end up attached to it.
    """
    count = XForm.objects.count()
    path = os.path.join(os.path.dirname(__file__), '..', 'fixtures', 'forms', 'contributions')
    form_path = os.path.join(path, 'contributions.xml')
    with open(form_path) as f:
        xml_file = ContentFile(f.read())
    xml_file.name = 'contributions.xml'

    project = get_user_default_project(self.user)
    self.xform = publish_xml_form(xml_file, self.user, project)
    self.assertTrue(XForm.objects.count() > count)

    instances_path = os.path.join(path, 'instances')
    for uuid in os.listdir(instances_path):
        s_path = os.path.join(instances_path, uuid, 'submission.xml')
        # Each submission handle is closed once it has been submitted; the
        # original left one open file per instance.
        with open(s_path) as submission:
            create_instance(self.user.username, submission, [])
    self.assertEqual(self.xform.instances.count(), 6)
def __init__(self, file_contents=None, content_type=None, file_name=None, *args, **kwargs):
    """Build an attachment response around ``file_contents``.

    Remaining positional and keyword arguments go to the parent
    initializer.  The content is wrapped in a ContentFile and the
    Content-Disposition / Content-Type headers are set from ``file_name``
    and ``content_type``.
    """
    super(WsFileResponse, self).__init__(*args, **kwargs)
    self.content = ContentFile(file_contents)

    headers = (
        ("Content-Disposition", "attachment; filename=%s" % (file_name,)),
        ("Content-Type", content_type),
    )
    for header, value in headers:
        self[header] = value

# Static Methods
# Class Methods
# Public Methods
# Protected Methods
# Private Methods
# Properties
# Representation and Comparison
def value_from_datadict(self, data, files, name):
    """Decode the base64 file posted under ``"id_" + name`` into a ContentFile.

    Expected value format:
        <file_name>:::data:<file content type>;base64,<base64 encoded file data>
    Example:
        PG Deletion.txt:::data:text/plain;base64,UEcgRGVsZXRpb2tpIjsKCg==

    Returns None when the key is missing or empty, or when the value does
    not follow the format above (no ``:::`` separator, no ``data:`` prefix,
    or no ``;base64,`` marker).  ``files`` is not used.
    """
    file_data = data.get("id_" + name)
    if not file_data:
        return None

    parts = file_data.split(":::")
    if len(parts) == 1:
        return None
    file_name = parts[0]

    # Split "data:<content type>" from the encoded payload.
    header, marker, encoded = parts[1].partition(";base64,")
    if not marker or "data:" not in header:
        # Malformed data URI: treat as "no file" rather than raising IndexError.
        return None

    return ContentFile(base64.b64decode(encoded), name=file_name)
def test_assign(cls):
    """Tests whether the :see:LocalizedFileValueDescriptor works properly"""
    temp_file = tempfile.NamedTemporaryFile(dir=MEDIA_ROOT)

    # Assigning a plain file name per language wraps it in a LocalizedFieldFile.
    by_name = cls.FileFieldModel()
    by_name.file = {'en': temp_file.name}
    assert isinstance(by_name.file.en, LocalizedFieldFile)
    assert by_name.file.en.name == temp_file.name

    # A pickled value assigned to a fresh instance is re-bound to that
    # instance and its field.
    field_dump = pickle.dumps(by_name.file)
    restored = cls.FileFieldModel()
    restored.file = pickle.loads(field_dump)
    assert restored.file.en.field == restored._meta.get_field('file')
    assert restored.file.en.instance == restored
    assert isinstance(restored.file.en, LocalizedFieldFile)

    # Assigning a ContentFile keeps its name.
    by_content = cls.FileFieldModel()
    by_content.file = {'en': ContentFile("test", "testfilename")}
    assert isinstance(by_content.file.en, LocalizedFieldFile)
    assert by_content.file.en.name == "testfilename"

    # Assigning another instance's field file re-binds it to the new
    # instance and the new language.
    another_instance = cls.FileFieldModel()
    another_instance.file = {'ro': by_content.file.en}
    assert another_instance == another_instance.file.ro.instance
    assert another_instance.file.ro.lang == 'ro'
def duplicate(self, settings):
    """Save and return a copy of this design attached to ``settings``.

    The copy gets ``pk = None`` before saving.  Each background that is set
    (``bg_front`` / ``bg_back``) is read and given to the copy as a new
    ContentFile named after the original file's base name.
    """
    new_design = deepcopy(self)
    new_design.pk = None
    new_design.badge_settings = settings

    for field_name in ('bg_front', 'bg_back'):
        background = getattr(self, field_name)
        if not background:
            continue
        copied = ContentFile(background.read())
        copied.name = os.path.basename(background.name)
        setattr(new_design, field_name, copied)

    new_design.save()
    return new_design
def save_file(self, name, content):
    """Save ``content`` as ``name`` on a new SimpleModel upload field.

    Returns the path ``original_storage`` reports for ``name``, looked up
    before the save takes place.
    """
    expected_path = original_storage.path(name)
    SimpleModel().upload_file.save(name, ContentFile(content))
    return expected_path