Python unicodecsv module: DictWriter() example source code

The following 20 code examples, extracted from open-source Python projects, illustrate how to use unicodecsv.DictWriter().

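Before the project examples, here is a minimal, self-contained sketch of typical usage (Python 2; the file name and field names are hypothetical, chosen only for illustration). unicodecsv.DictWriter takes a file object opened in binary mode plus a list of field names, and encodes unicode values for you (UTF-8 by default):

import unicodecsv

rows = [
    {u'name': u'Müller', u'city': u'Köln'},
    {u'name': u'Ōta', u'city': u'Tōkyō'},
]

# Open in binary mode; unicodecsv handles the encoding itself.
with open('people.csv', 'wb') as f:
    writer = unicodecsv.DictWriter(f, fieldnames=[u'name', u'city'],
                                    encoding='utf-8')
    writer.writeheader()       # write the field names as the first row
    writer.writerows(rows)     # each dict becomes one encoded CSV row
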
Project: correctiv-nursinghomes    Author: correctiv
def export_supervision_authorities(self, *args, **options):
        writer = unicodecsv.DictWriter(open(options['filename'], 'w'), (
            'name', 'email', 'address', 'contact', 'jurisdiction__slug', 'other_names', 'description', 'tags', 'parent__name', 'classification', 'url', 'website_dump', 'request_note'
        ))
        writer.writeheader()
        for authority in SupervisionAuthority.objects.all():
            slug = slugify(authority.name)
            authority.fds_url = 'https://fragdenstaat.de/behoerde/%s/' % slug
            authority.save()
            writer.writerow({
                'name': authority.name,
                'email': authority.email,
                'address': authority.address,
                'contact': authority.contact,
                'jurisdiction__slug': slugify(authority.state.name),
                'classification': 'Heimaufsicht'
            })
Project: db-api-to-gtfs    Author: patrickbr
def write_routes(self):
        """Write routes to file"""
        with open(os.path.join(self.out_dir, 'routes.txt'), 'wb') as fhandle:
            route_fieldnames = [
                'route_id', 'route_short_name', 'route_long_name', 'route_type', 'agency_id']
            route_writer = csv.DictWriter(fhandle, delimiter=',',
                                          quotechar='"', fieldnames=route_fieldnames)
            route_writer.writeheader()

            for rid, route in enumerate(self.routes):
                route_writer.writerow({
                    'route_id': rid,
                    'route_short_name': route['route_short_name'],
                    'route_long_name': route['route_long_name'],
                    'route_type': route['route_type'],
                    'agency_id': route['agency_id']
                })
Project: db-api-to-gtfs    Author: patrickbr
def write_stops(self):
        """Write stops to file"""
        with open(os.path.join(self.out_dir, 'stops.txt'), 'wb') as fhandle:
            route_fieldnames = ['stop_id', 'stop_name', 'stop_lat', 'stop_lon']
            route_writer = csv.DictWriter(fhandle, delimiter=',',
                                          quotechar='"', fieldnames=route_fieldnames)
            route_writer.writeheader()

            for stop in self.stops.itervalues():
                if not stop['has_trip']:
                    continue

                route_writer.writerow({
                    'stop_id': stop['stop_id'],
                    'stop_name': stop['stop_name'],
                    'stop_lat': stop['stop_lat'],
                    'stop_lon': stop['stop_lon']
                })
Project: db-api-to-gtfs    Author: patrickbr
def write_agencies(self):
        """Write agencies to file"""
        with open(os.path.join(self.out_dir, 'agency.txt'), 'wb') as fhandle:
            agency_fieldnames = [
                'agency_id', 'agency_name', 'agency_url', 'agency_timezone', 'agency_lang']
            agency_writer = csv.DictWriter(fhandle, delimiter=',',
                                           quotechar='"', fieldnames=agency_fieldnames)
            agency_writer.writeheader()

            for agency in self.agencies:
                agency_writer.writerow({
                    'agency_id': agency,
                    'agency_name': agency,
                    'agency_url': 'http://www.bahn.de',
                    'agency_timezone': 'Europe/Berlin',
                    'agency_lang': 'de'
                })
Project: stormtrooper    Author: CompileInc
def export(self, file_handle=None):
        '''
        exports the task questions and answers as a CSV
        '''
        try:
            if not file_handle:
                file_handle = StringIO.StringIO()
            data = self.task.answers
            # http://stackoverflow.com/a/11399424
            # getting the union of all keys in all of the answer rows
            headers = list(set().union(*(i.keys() for i in data)))
            writer = csv.DictWriter(file_handle, fieldnames=headers)
            writer.writeheader()
            for row in data:
                writer.writerow(row)
            export_file = ContentFile(file_handle.getvalue())
            export_filename = "ST_TASK_{task_id}_EXPORT_{date}.csv".format(task_id=self.task.id,
                                                                           date=str(datetime.date.today()))
            self.export_file.save(name=export_filename, content=export_file, save=False)
            self.status = self.SUCCESS
        except Exception as e:
            LOG.exception(e)
            self.status = self.FAILURE
        self.save()
Project: odoo-report    Author: vertelab
def create(self, cr, uid, ids, data, context=None):

        temp = tempfile.NamedTemporaryFile(mode='w+t',suffix='.csv')
        outfile = tempfile.NamedTemporaryFile(mode='w+b',suffix='.pdf')
        glabels = tempfile.NamedTemporaryFile(mode='w+t',suffix='.glabels')
        glabels.write(base64.b64decode(data.get('template')) if data.get('template') else None or self.template)
        glabels.seek(0)

        pool = registry(cr.dbname)
        labelwriter = None
        for p in pool.get(self.model).read(cr,uid,ids):
            if not labelwriter:
                labelwriter = csv.DictWriter(temp,p.keys())
                labelwriter.writeheader()
            for c in range(self.count):
                labelwriter.writerow({k:isinstance(v, (str, unicode)) and v.encode('utf8') or str(v) for k,v in p.items()})
        temp.seek(0)
        res = os.system("glabels-3-batch -o %s -l -C -i %s %s" % (outfile.name,temp.name,glabels.name))

        outfile.seek(0)
        pdf = outfile.read()
        outfile.close()
        temp.close()
        glabels.close()
        return (pdf,'pdf')
Project: odoo-report    Author: vertelab
def create(self, cr, uid, ids, data, context=None):

        temp = tempfile.NamedTemporaryFile(mode='w+t',suffix='.csv')
        outfile = tempfile.NamedTemporaryFile(mode='w+b',suffix='.pdf')
        glabels = tempfile.NamedTemporaryFile(mode='w+t',suffix='.glabels')
        glabels.write(base64.b64decode(data.get('template')) if data.get('template') else None or self.template)
        glabels.seek(0)

        pool = registry(cr.dbname)
        labelwriter = csv.DictWriter(temp,[h[self.col_name] for h in pool.get(self.model).read(cr,uid,pool.get(self.model).search(cr,uid,[]),[self.col_name])])
        labelwriter.writeheader()
        for c in range(self.count):
            #~ labelwriter.writerow({p[self.col_name]:isinstance(p[self.col_value], (str, unicode)) and p[self.col_value].encode('utf8') or p[self.col_value] or '' for p in pool.get(self.model).read(cr,uid,pool.get(self.model).search(cr,uid,[]),[self.col_name,self.col_value])])})
            labelwriter.writerow({p[self.col_name]: str(p[self.col_value]) if not str(p[self.col_value]) == '0.0' else '' for p in pool.get(self.model).read(cr,uid,pool.get(self.model).search(cr,uid,[]),[self.col_name,self.col_value], context=context)})
        temp.seek(0)
        res = os.system("glabels-3-batch -o %s -l -C -i %s %s" % (outfile.name,temp.name,glabels.name))
        outfile.seek(0)
        pdf = outfile.read()
        outfile.close()
        temp.close()
        glabels.close()
        return (pdf,'pdf')
Project: db-api-to-gtfs    Author: patrickbr
def write_trips(self):
        """Write trips to file"""
        with open(os.path.join(self.out_dir, 'trips.txt'), 'wb') as fhandle, open(os.path.join(self.out_dir, 'stop_times.txt'), 'wb') as sfhandle:
            trip_fieldnames = [
                'route_id', 'service_id', 'trip_id', 'trip_headsign']
            trip_writer = csv.DictWriter(fhandle, delimiter=',',
                                         quotechar='"', fieldnames=trip_fieldnames)
            trip_writer.writeheader()

            stoptimes_fieldnames = [
                'trip_id', 'arrival_time', 'departure_time', 'stop_id', 'stop_sequence']
            stoptimes_writer = csv.DictWriter(sfhandle, delimiter=',',
                                              quotechar='"', fieldnames=stoptimes_fieldnames)
            stoptimes_writer.writeheader()

            for tid, trip in enumerate(self.trips):
                trip_writer.writerow({
                    'route_id': trip['route_id'],
                    'service_id': trip['service_id'],
                    'trip_id': tid,
                    'trip_headsign': trip['headsign']
                })

                for stoptime in trip['stoptimes']:
                    stoptimes_writer.writerow({
                        'trip_id': tid,
                        'arrival_time': stoptime['arrival_time'] + ':00',
                        'departure_time': stoptime['departure_time'] + ':00',
                        'stop_id': stoptime['stop_id'],
                        'stop_sequence': stoptime['stop_sequence']
                    })
Project: db-api-to-gtfs    Author: patrickbr
def write_calendar_dates(self):
        """Write calendar dates to file"""
        with open(os.path.join(self.out_dir, 'calendar_dates.txt'), 'wb') as fhandle:
            calendar_fieldnames = ['service_id', 'date', 'exception_type']
            calendar_writer = csv.DictWriter(fhandle, delimiter=',',
                                             quotechar='"', fieldnames=calendar_fieldnames)
            calendar_writer.writeheader()

            for sid, cdate in enumerate(self.calendar_dates):
                for date in cdate:
                    calendar_writer.writerow({
                        'service_id': sid,
                        'date': date.strftime('%Y%m%d'),
                        'exception_type': 1
                    })
Project: OpenRAFAM    Author: jazzido
def toCSV(self, out=sys.stdout):
        "Saves dataset rows to a CSV file"
        fieldnames = self.rows.keys()

        writer = unicodecsv.DictWriter(out, fieldnames)
        writer.writeheader()
        for r in self.rows:
            writer.writerow(dict(r))
Project: roulier    Author: akretion
def generate_deposit_slip(self, rows):
        output = BytesIO()

        # the order is important
        headers = rows[0].keys()

        # the order is fixed by headers
        writer = csv.DictWriter(output, headers, encoding='utf-8')
        writer.writeheader()
        writer.writerows(rows)
        return output
Project: troveharvester    Author: wragge
def process_results(self, results):
        '''
        Processes a page full of results.
        Saves pdf for each result.
        '''
        try:
            articles = results[0]['records']['article']
            with open(self.csv_file, 'ab') as csv_file:
                writer = csv.DictWriter(csv_file, FIELDS, encoding='utf-8')
                if self.harvested == 0:
                    writer.writeheader()
                for article in articles:
                    article_id = article['id']
                    row = self.prepare_row(article)
                    writer.writerow(row)
                    if self.pdf:
                        pdf_url = self.get_pdf_url(article_id)
                        if pdf_url:
                            pdf_filename = self.make_filename(article)
                            pdf_file = os.path.join(self.data_dir, 'pdf', '{}.pdf'.format(pdf_filename))
                            urlretrieve(pdf_url, pdf_file)
                    if self.text:
                        text = article.get('articleText')
                        if text:
                            text_filename = self.make_filename(article)
                            text = re.sub('<[^<]+?>', '', text)
                            text = re.sub("\s\s+", " ", text)
                            text_file = os.path.join(self.data_dir, 'text', '{}.txt'.format(text_filename))
                            with open(text_file, 'wb') as text_output:
                                text_output.write(text.encode('utf-8'))
            time.sleep(0.5)
            self.harvested += self.get_highest_n(results)
            print('Harvested: {}'.format(self.harvested))
        except KeyError:
            pass
Project: guiabolso2csv    Author: hugombarreto
def csv_transactions(self, year, month, file_name):
        transactions = self.transactions(year, month)

        if len(transactions) == 0:
            warnings.warn('No transactions for the period ({}-{})'.format(
                year, month))
            return

        with open(file_name, 'wb') as f:
            csv_writer = csv.DictWriter(f, fieldnames=self.fieldnames,
                                        encoding='utf-8-sig')  # add BOM to csv
            csv_writer.writeheader()
            csv_writer.writerows(transactions)
Project: forseti-security    Author: GoogleCloudPlatform
def write_csv(resource_name, data, write_header=False):
    """Start the csv writing flow.

    Args:
        resource_name (str): The resource name.
        data (iterable): An iterable of data to be written to csv.
        write_header (bool): If True, write the header in the csv file.

    Yields:
       object: The CSV temporary file pointer.

    Raises:
        CSVFileError: If there was an error writing the CSV file.
    """
    csv_file = tempfile.NamedTemporaryFile(delete=False)
    try:
        writer = csv.DictWriter(csv_file, doublequote=False, escapechar='\\',
                                quoting=csv.QUOTE_NONE,
                                fieldnames=CSV_FIELDNAME_MAP[resource_name])
        if write_header:
            writer.writeheader()

        for i in data:
            writer.writerow(i)

        # This must be closed before returned for loading.
        csv_file.close()
        yield csv_file

        # Remove the csv file after loading.
        os.remove(csv_file.name)
    except (IOError, OSError, csv.Error) as e:
        raise CSVFileError(resource_name, e)
Project: recon-ng    Author: captainhooligan
def csvify(rows):
    '''Expects a list of dictionaries and returns a CSV response.'''
    if not rows:
        csv_str = ''
    else:
        s = BytesIO()
        keys = rows[0].keys()
        dw = csv.DictWriter(s, keys)
        dw.writeheader()
        dw.writerows([dict(r) for r in rows])
        csv_str = s.getvalue()
    return Response(csv_str, mimetype='text/csv')
Project: IPP    Author: nicoechaniz
def _agregado_por_comercio(request, anio, mes, quincena, region_id, funcion, prefijo):
    if not (hasattr(request.user, "perfil") and \
            request.user.perfil.autorizacion >= PERMISO_COORD_ZONAL):
        messages.error(request, 'Permisos insuficientes.')
        return render(request, 'relevamiento/mensaje.html')

    lecturas = _lecturas_del_periodo(anio, mes, quincena, region_id, funcion)

    # Pivot the results
    lecturas_por_comercio = OrderedDict()
    encabezado = ["Producto"]
    for lectura in lecturas:
        if lectura['comercio'] not in encabezado:
            encabezado.append(lectura['comercio'])
        lecturas_por_comercio.setdefault(lectura['producto'], {}).update(
            {lectura['comercio']: lectura['valor']})

    nombre_archivo = [prefijo, anio, mes.zfill(2), quincena.zfill(2)]
    if region_id:
        region = Region.objects.get(pk=region_id)
        nombre_archivo.append("_%s" % slugify(region.nombre))
    nombre_archivo.append(".csv")
    nombre_archivo = "".join(nombre_archivo)

    response = HttpResponse(content_type='text/csv')
    response['Content-Disposition'] = 'attachment; filename=%s;' % nombre_archivo
    response['Cache-Control'] = 'no-cache'

    writer = csv.DictWriter(response, fieldnames=encabezado)
    writer.writeheader()
    for producto, lecturas_dict in list(lecturas_por_comercio.items()):
        row = {"Producto": producto}
        row.update(lecturas_dict)
        writer.writerow(row)

    return response
Project: panphon    Author: dmort27
def write_ipa_all(ipa_bases, ipa_all, all_segments, sort_order):
    with open(ipa_bases, 'rb') as f:
        reader = csv.reader(f, encoding='utf-8')
        fieldnames = next(reader)
    with open(ipa_all, 'wb') as f:
        writer = csv.DictWriter(f, encoding='utf-8', fieldnames=fieldnames)
        writer.writerow({k: k for k in fieldnames})
        all_segments_list = sort_all_segments(sort_order, all_segments)
        for segment in all_segments_list:
            fields = copy.copy(segment.features)
            fields['ipa'] = segment.form
            writer.writerow(fields)
Project: pyAssetContext    Author: ShadeySecurity
def write_csv(self, filename='output.csv', make_strings=False):
        """Write the processed rows to the given filename
        """
        if (len(self.rows) <= 0):
            raise AttributeError('No rows were loaded')
        if make_strings:
            out = self.make_strings()
        else:
            out = self.rows
        with open(filename, 'wb+') as f:
            writer = csv.DictWriter(f, self.key_map.keys())
            writer.writeheader()
            writer.writerows(out)
Project: twitterApiForHumans    Author: vaulstein
def write_csv(self, filename='output.csv', make_strings=False):
        """Write the processed rows to the given filename
        """
        if (len(self.rows) <= 0):
            raise AttributeError('No rows were loaded')
        if make_strings:
            out = self.make_strings()
        else:
            out = self.rows
        with open(filename, 'wb+') as f:
            writer = csv.DictWriter(f, self.key_map.keys())
            writer.writeheader()
            writer.writerows(out)
Project: rest_api    Author: opentargets
def toCSV(self, delimiter = '\t'):
        NOT_ALLOWED_FIELDS = ['evidence.evidence_chain', 'search_metadata', 'search_metadata.sort']
        output = BytesIO()
        if not self.data:
            self.flatten(self.toDict())  # populate data if empty
        if self.data and isinstance(self.data[0], dict):
            key_set = set()
            flattened_data = []
            for row in self.data:
                flat = self.flatten(row,
                                    simplify=self.params.datastructure == SourceDataStructureOptions.SIMPLE)
                for field in NOT_ALLOWED_FIELDS:
                    flat.pop(field, None)
                flattened_data.append(flat)
                key_set.update(flat.keys())
            ordered_keys=self.params.fields or sorted(list(key_set))
            ordered_keys = map(unicode,ordered_keys)

            writer = csv.DictWriter(output,
                                    ordered_keys,
                                    restval='',
                                    delimiter=delimiter,
                                    quotechar='"',
                                    quoting=csv.QUOTE_MINIMAL,
                                    doublequote=False,
                                    escapechar='\\',
                                    # extrasaction='ignore',
                                    )
            writer.writeheader()
            for row in flattened_data:
                writer.writerow(row)

        if self.data and isinstance(self.data[0], list):
            writer = csv.writer(output,
                                delimiter=delimiter,
                                quotechar='"',
                                quoting=csv.QUOTE_MINIMAL,
                                doublequote=False,
                                escapechar='\\',
                                # extrasaction = 'ignore',
                                )
            for row in self.data:
                writer.writerow(row)
        return output.getvalue()