我们从Python开源项目中,提取了以下20个代码示例,用于说明如何使用unicodecsv.DictWriter()。
def export_supervision_authorities(self, *args, **options): writer = unicodecsv.DictWriter(open(options['filename'], 'w'), ( 'name', 'email', 'address', 'contact', 'jurisdiction__slug', 'other_names', 'description', 'tags', 'parent__name', 'classification', 'url', 'website_dump', 'request_note' )) writer.writeheader() for authority in SupervisionAuthority.objects.all(): slug = slugify(authority.name) authority.fds_url = 'https://fragdenstaat.de/behoerde/%s/' % slug authority.save() writer.writerow({ 'name': authority.name, 'email': authority.email, 'address': authority.address, 'contact': authority.contact, 'jurisdiction__slug': slugify(authority.state.name), 'classification': 'Heimaufsicht' })
def write_routes(self): """Write routes to file""" with open(os.path.join(self.out_dir, 'routes.txt'), 'wb') as fhandle: route_fieldnames = [ 'route_id', 'route_short_name', 'route_long_name', 'route_type', 'agency_id'] route_writer = csv.DictWriter(fhandle, delimiter=',', quotechar='"', fieldnames=route_fieldnames) route_writer.writeheader() for rid, route in enumerate(self.routes): route_writer.writerow({ 'route_id': rid, 'route_short_name': route['route_short_name'], 'route_long_name': route['route_long_name'], 'route_type': route['route_type'], 'agency_id': route['agency_id'] })
def write_stops(self): """Write stops to file""" with open(os.path.join(self.out_dir, 'stops.txt'), 'wb') as fhandle: route_fieldnames = ['stop_id', 'stop_name', 'stop_lat', 'stop_lon'] route_writer = csv.DictWriter(fhandle, delimiter=',', quotechar='"', fieldnames=route_fieldnames) route_writer.writeheader() for stop in self.stops.itervalues(): if not stop['has_trip']: continue route_writer.writerow({ 'stop_id': stop['stop_id'], 'stop_name': stop['stop_name'], 'stop_lat': stop['stop_lat'], 'stop_lon': stop['stop_lon'] })
def write_agencies(self): """Write agencies to file""" with open(os.path.join(self.out_dir, 'agency.txt'), 'wb') as fhandle: agency_fieldnames = [ 'agency_id', 'agency_name', 'agency_url', 'agency_timezone', 'agency_lang'] agency_writer = csv.DictWriter(fhandle, delimiter=',', quotechar='"', fieldnames=agency_fieldnames) agency_writer.writeheader() for agency in self.agencies: agency_writer.writerow({ 'agency_id': agency, 'agency_name': agency, 'agency_url': 'http://www.bahn.de', 'agency_timezone': 'Europe/Berlin', 'agency_lang': 'de' })
def export(self, file_handle=None): ''' exports the task questions and answers as a CSV ''' try: if not file_handle: file_handle = StringIO.StringIO() data = self.task.answers # http://stackoverflow.com/a/11399424 # getting the union of all keys in all of the answer rows headers = list(set().union(*(i.keys() for i in data))) writer = csv.DictWriter(file_handle, fieldnames=headers) writer.writeheader() for row in data: writer.writerow(row) export_file = ContentFile(file_handle.getvalue()) export_filename = "ST_TASK_{task_id}_EXPORT_{date}.csv".format(task_id=self.task.id, date=str(datetime.date.today())) self.export_file.save(name=export_filename, content=export_file, save=False) self.status = self.SUCCESS except Exception as e: LOG.exception(e) self.status = self.FAILURE self.save()
def create(self, cr, uid, ids, data, context=None): temp = tempfile.NamedTemporaryFile(mode='w+t',suffix='.csv') outfile = tempfile.NamedTemporaryFile(mode='w+b',suffix='.pdf') glabels = tempfile.NamedTemporaryFile(mode='w+t',suffix='.glabels') glabels.write(base64.b64decode(data.get('template')) if data.get('template') else None or self.template) glabels.seek(0) pool = registry(cr.dbname) labelwriter = None for p in pool.get(self.model).read(cr,uid,ids): if not labelwriter: labelwriter = csv.DictWriter(temp,p.keys()) labelwriter.writeheader() for c in range(self.count): labelwriter.writerow({k:isinstance(v, (str, unicode)) and v.encode('utf8') or str(v) for k,v in p.items()}) temp.seek(0) res = os.system("glabels-3-batch -o %s -l -C -i %s %s" % (outfile.name,temp.name,glabels.name)) outfile.seek(0) pdf = outfile.read() outfile.close() temp.close() glabels.close() return (pdf,'pdf')
def create(self, cr, uid, ids, data, context=None): temp = tempfile.NamedTemporaryFile(mode='w+t',suffix='.csv') outfile = tempfile.NamedTemporaryFile(mode='w+b',suffix='.pdf') glabels = tempfile.NamedTemporaryFile(mode='w+t',suffix='.glabels') glabels.write(base64.b64decode(data.get('template')) if data.get('template') else None or self.template) glabels.seek(0) pool = registry(cr.dbname) labelwriter = csv.DictWriter(temp,[h[self.col_name] for h in pool.get(self.model).read(cr,uid,pool.get(self.model).search(cr,uid,[]),[self.col_name])]) labelwriter.writeheader() for c in range(self.count): #~ labelwriter.writerow({p[self.col_name]:isinstance(p[self.col_value], (str, unicode)) and p[self.col_value].encode('utf8') or p[self.col_value] or '' for p in pool.get(self.model).read(cr,uid,pool.get(self.model).search(cr,uid,[]),[self.col_name,self.col_value])])}) labelwriter.writerow({p[self.col_name]: str(p[self.col_value]) if not str(p[self.col_value]) == '0.0' else '' for p in pool.get(self.model).read(cr,uid,pool.get(self.model).search(cr,uid,[]),[self.col_name,self.col_value], context=context)}) temp.seek(0) res = os.system("glabels-3-batch -o %s -l -C -i %s %s" % (outfile.name,temp.name,glabels.name)) outfile.seek(0) pdf = outfile.read() outfile.close() temp.close() glabels.close() return (pdf,'pdf')
def write_trips(self): """Write trips to file""" with open(os.path.join(self.out_dir, 'trips.txt'), 'wb') as fhandle, open(os.path.join(self.out_dir, 'stop_times.txt'), 'wb') as sfhandle: trip_fieldnames = [ 'route_id', 'service_id', 'trip_id', 'trip_headsign'] trip_writer = csv.DictWriter(fhandle, delimiter=',', quotechar='"', fieldnames=trip_fieldnames) trip_writer.writeheader() stoptimes_fieldnames = [ 'trip_id', 'arrival_time', 'departure_time', 'stop_id', 'stop_sequence'] stoptimes_writer = csv.DictWriter(sfhandle, delimiter=',', quotechar='"', fieldnames=stoptimes_fieldnames) stoptimes_writer.writeheader() for tid, trip in enumerate(self.trips): trip_writer.writerow({ 'route_id': trip['route_id'], 'service_id': trip['service_id'], 'trip_id': tid, 'trip_headsign': trip['headsign'] }) for stoptime in trip['stoptimes']: stoptimes_writer.writerow({ 'trip_id': tid, 'arrival_time': stoptime['arrival_time'] + ':00', 'departure_time': stoptime['departure_time'] + ':00', 'stop_id': stoptime['stop_id'], 'stop_sequence': stoptime['stop_sequence'] })
def write_calendar_dates(self): """Write calendar dates to file""" with open(os.path.join(self.out_dir, 'calendar_dates.txt'), 'wb') as fhandle: calendar_fieldnames = ['service_id', 'date', 'exception_type'] calendar_writer = csv.DictWriter(fhandle, delimiter=',', quotechar='"', fieldnames=calendar_fieldnames) calendar_writer.writeheader() for sid, cdate in enumerate(self.calendar_dates): for date in cdate: calendar_writer.writerow({ 'service_id': sid, 'date': date.strftime('%Y%m%d'), 'exception_type': 1 })
def toCSV(self, out=sys.stdout): "Saves dataset rows to a CSV file" fieldnames = self.rows.keys() writer = unicodecsv.DictWriter(out, fieldnames) writer.writeheader() for r in self.rows: writer.writerow(dict(r))
def generate_deposit_slip(self, rows): output = BytesIO() # l'ordre est important headers = rows[0].keys() # l'ordre est fixé par headers writer = csv.DictWriter(output, headers, encoding='utf-8') writer.writeheader() writer.writerows(rows) return output
def process_results(self, results): ''' Processes a page full of results. Saves pdf for each result. ''' try: articles = results[0]['records']['article'] with open(self.csv_file, 'ab') as csv_file: writer = csv.DictWriter(csv_file, FIELDS, encoding='utf-8') if self.harvested == 0: writer.writeheader() for article in articles: article_id = article['id'] row = self.prepare_row(article) writer.writerow(row) if self.pdf: pdf_url = self.get_pdf_url(article_id) if pdf_url: pdf_filename = self.make_filename(article) pdf_file = os.path.join(self.data_dir, 'pdf', '{}.pdf'.format(pdf_filename)) urlretrieve(pdf_url, pdf_file) if self.text: text = article.get('articleText') if text: text_filename = self.make_filename(article) text = re.sub('<[^<]+?>', '', text) text = re.sub("\s\s+", " ", text) text_file = os.path.join(self.data_dir, 'text', '{}.txt'.format(text_filename)) with open(text_file, 'wb') as text_output: text_output.write(text.encode('utf-8')) time.sleep(0.5) self.harvested += self.get_highest_n(results) print('Harvested: {}'.format(self.harvested)) except KeyError: pass
def csv_transactions(self, year, month, file_name): transactions = self.transactions(year, month) if len(transactions) == 0: warnings.warn('No transactions for the period ({}-{})'.format( year, month)) return with open(file_name, 'wb') as f: csv_writer = csv.DictWriter(f, fieldnames=self.fieldnames, encoding='utf-8-sig') # add BOM to csv csv_writer.writeheader() csv_writer.writerows(transactions)
def write_csv(resource_name, data, write_header=False): """Start the csv writing flow. Args: resource_name (str): The resource name. data (iterable): An iterable of data to be written to csv. write_header (bool): If True, write the header in the csv file. Yields: object: The CSV temporary file pointer. Raises: CSVFileError: If there was an error writing the CSV file. """ csv_file = tempfile.NamedTemporaryFile(delete=False) try: writer = csv.DictWriter(csv_file, doublequote=False, escapechar='\\', quoting=csv.QUOTE_NONE, fieldnames=CSV_FIELDNAME_MAP[resource_name]) if write_header: writer.writeheader() for i in data: writer.writerow(i) # This must be closed before returned for loading. csv_file.close() yield csv_file # Remove the csv file after loading. os.remove(csv_file.name) except (IOError, OSError, csv.Error) as e: raise CSVFileError(resource_name, e)
def csvify(rows): '''Expects a list of dictionaries and returns a CSV response.''' if not rows: csv_str = '' else: s = BytesIO() keys = rows[0].keys() dw = csv.DictWriter(s, keys) dw.writeheader() dw.writerows([dict(r) for r in rows]) csv_str = s.getvalue() return Response(csv_str, mimetype='text/csv')
def _agregado_por_comercio(request, anio, mes, quincena, region_id, funcion, prefijo): if not (hasattr(request.user, "perfil") and \ request.user.perfil.autorizacion >= PERMISO_COORD_ZONAL): messages.error(request, 'Permisos insuficientes.') return render(request, 'relevamiento/mensaje.html') lecturas = _lecturas_del_periodo(anio, mes, quincena, region_id, funcion) # Pivot de los resultados lecturas_por_comercio = OrderedDict() encabezado = ["Producto"] for lectura in lecturas: if lectura['comercio'] not in encabezado: encabezado.append(lectura['comercio']) lecturas_por_comercio.setdefault(lectura['producto'], {}).update( {lectura['comercio']: lectura['valor']}) nombre_archivo = [prefijo, anio, mes.zfill(2), quincena.zfill(2)] if region_id: region = Region.objects.get(pk=region_id) nombre_archivo.append("_%s" % slugify(region.nombre)) nombre_archivo.append(".csv") nombre_archivo = "".join(nombre_archivo) response = HttpResponse(content_type='text/csv') response['Content-Disposition'] = 'attachment; filename=%s;' % nombre_archivo response['Cache-Control'] = 'no-cache' writer = csv.DictWriter(response, fieldnames=encabezado) writer.writeheader() for producto, lecturas_dict in list(lecturas_por_comercio.items()): row = {"Producto": producto} row.update(lecturas_dict) writer.writerow(row) return response
def write_ipa_all(ipa_bases, ipa_all, all_segments, sort_order): with open(ipa_bases, 'rb') as f: reader = csv.reader(f, encoding='utf-8') fieldnames = next(reader) with open(ipa_all, 'wb') as f: writer = csv.DictWriter(f, encoding='utf-8', fieldnames=fieldnames) writer.writerow({k: k for k in fieldnames}) all_segments_list = sort_all_segments(sort_order, all_segments) for segment in all_segments_list: fields = copy.copy(segment.features) fields['ipa'] = segment.form writer.writerow(fields)
def write_csv(self, filename='output.csv', make_strings=False): """Write the processed rows to the given filename """ if (len(self.rows) <= 0): raise AttributeError('No rows were loaded') if make_strings: out = self.make_strings() else: out = self.rows with open(filename, 'wb+') as f: writer = csv.DictWriter(f, self.key_map.keys()) writer.writeheader() writer.writerows(out)
def toCSV(self, delimiter = '\t'): NOT_ALLOWED_FIELDS = ['evidence.evidence_chain', 'search_metadata', 'search_metadata.sort'] output = BytesIO() if not self.data: self.flatten(self.toDict()) # populate data if empty if self.data and isinstance(self.data[0], dict): key_set = set() flattened_data = [] for row in self.data: flat = self.flatten(row, simplify=self.params.datastructure == SourceDataStructureOptions.SIMPLE) for field in NOT_ALLOWED_FIELDS: flat.pop(field, None) flattened_data.append(flat) key_set.update(flat.keys()) ordered_keys=self.params.fields or sorted(list(key_set)) ordered_keys = map(unicode,ordered_keys) writer = csv.DictWriter(output, ordered_keys, restval='', delimiter=delimiter, quotechar='"', quoting=csv.QUOTE_MINIMAL, doublequote=False, escapechar='\\', # extrasaction='ignore', ) writer.writeheader() for row in flattened_data: writer.writerow(row) if self.data and isinstance(self.data[0], list): writer = csv.writer(output, delimiter=delimiter, quotechar='"', quoting=csv.QUOTE_MINIMAL, doublequote=False, escapechar='\\', # extrasaction = 'ignore', ) for row in self.data: writer.writerow(row) return output.getvalue()