Python csv module: writer() code examples

The following 50 code examples, extracted from open-source Python projects, illustrate how to use csv.writer().
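
Before the project examples, a minimal, self-contained sketch of the basic pattern (Python 3; the file name and rows here are illustrative, not taken from any project below):

import csv

# newline='' lets the csv module control line endings itself
with open('example.csv', 'w', newline='') as f:
    writer = csv.writer(f)
    writer.writerow(['name', 'value'])        # one header row
    writer.writerows([['a', 1], ['b', 2]])    # many data rows at once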

Project: retrieve-and-rank-tuning    Author: rchaks    | Project Source | File Source
def setup_output_writers(parent_dir, fold_number):
    """
    Create an output directory for the fold under the provided parent dir
    :param str parent_dir: file path to the output dir
    :param int fold_number: fold number to use in directory name
    :return: writer for <outdir.name>/fold<fold_number>/train.csv and <outdir.name>/fold<fold_number>/validation.csv
    :rtype: tuple(csv.writer,csv.writer)
    """
    output_dir = path.join(parent_dir, "Fold%d" % fold_number)
    if not path.isdir(output_dir):
        LOGGER.debug("Creating output for fold %d at the location: %s" % (fold_number, output_dir))
        makedirs(output_dir)
    else:
        LOGGER.warn("Path <<%s>> already exists, files may be overwritten" % output_dir)

    train_writer = csv.writer(smart_file_open(path.join(output_dir, TRAIN_RELEVANCE_FILENAME), 'w'),
                              dialect=csv.excel, delimiter=',')
    validation_writer = csv.writer(smart_file_open(path.join(output_dir, VALIDATION_RELEVANCE_FILENAME), 'w'),
                                   dialect=csv.excel, delimiter=',')

    return train_writer, validation_writer
Project: database_assetstore    Author: OpenGeoscience    | Project Source | File Source
def csv_safe_unicode(row, ignoreErrors=True):
    """
    Given an array of values, make sure all strings are unicode in Python 3 and
    str in Python 2.  The csv.writer in Python 2 does not handle unicode
    strings and in Python 3 it does not handle byte strings.

    :param row: an array which could contain mixed types.
    :param ignoreErrors: if True, convert what is possible and ignore errors.
        If false, conversion errors will raise an exception.
    :returns: either the original array if safe, or an array with all byte
        strings converted to unicode in Python 3 or str in Python 2.
    """
    # This is unicode in Python 2 and bytes in Python 3
    wrong_type = six.text_type if str == six.binary_type else six.binary_type
    # If all of the data is not the wrong type, just return it as is.  This
    # allows non-string types, such as integers and floats.
    if not any(isinstance(value, wrong_type) for value in row):
        return row
    # Convert the wrong type of string to the type needed for this version of
    # Python.  For Python 2 unicode gets encoded to str (bytes).  For Python 3
    # bytes get decoded to str (unicode).
    func = 'encode' if str == six.binary_type else 'decode'
    row = [getattr(value, func)('utf8', 'ignore' if ignoreErrors else 'strict')
           if isinstance(value, wrong_type) else value for value in row]
    return row
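
A hypothetical usage sketch for the helper above (assumes the six package is available, as in the original project; the row values are illustrative):

import csv
import sys
import six

row = [b'byte-string', u'text', 42]      # mixed types, as the docstring describes
writer = csv.writer(sys.stdout)
writer.writerow(csv_safe_unicode(row))   # on Python 3, byte strings are decoded to str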
Project: cellranger    Author: 10XGenomics    | Project Source | File Source
def save_gem_class_csv(self, base_dir):
        csv_file_path = os.path.join(base_dir, 'gem_classification.csv')
        cr_utils.makedirs(os.path.dirname(csv_file_path), allow_existing=True)
        with open(csv_file_path, 'wb') as f:
            writer = csv.writer(f)
            writer.writerow(['barcode',
                             self.result['genome0'],
                             self.result['genome1'],
                             'call'
                         ])
            for i in xrange(len(self.result['barcode'])):
                call = self.result['call'][i]
                call = call.replace(cr_constants.GEM_CLASS_GENOME0, self.result['genome0'])
                call = call.replace(cr_constants.GEM_CLASS_GENOME1, self.result['genome1'])
                writer.writerow([
                    self.result['barcode'][i],
                    self.result['count0'][i],
                    self.result['count1'][i],
                    call,
                ])
Project: cellranger    Author: 10XGenomics    | Project Source | File Source
def build_gtf(self):
        print "Writing new genes GTF file (may take 10 minutes for a 1GB input GTF file)..."
        with open(self.out_gtf_fn, 'wb') as f:
            writer = csv.writer(f, delimiter='\t', quoting=csv.QUOTE_NONE, quotechar='')
            for row, is_comment, properties in self.gtf_reader_iter(self.in_gtf_fn):
                if is_comment:
                    writer.writerow(row)
                    continue

                remove = False
                for key, value in properties.iteritems():
                    if key in self.attributes and value not in self.attributes[key]:
                        remove = True

                if not remove:
                    writer.writerow(row)

        print "...done\n"
Project: cellranger    Author: 10XGenomics    | Project Source | File Source
def build_metrics_summary_csv(filename, sample_properties, sample_data, pipeline):
    metrics, alarms, charts, all_prefixes = get_constants_for_pipeline(pipeline)

    tables, _ = build_tables(sample_properties, metrics, alarms, sample_data, all_prefixes=all_prefixes)
    if not tables:
        sys.stderr.write("No metrics tables were generated, skipping CSV generation.\n")
        return

    csv_metrics = collections.OrderedDict()
    for table in tables:
        if not table:
            continue
        for metric, _, value in table['rows']:
            if type(metric) == dict:
                metric = metric['v']
            if type(value) == dict:
                value = value['v']
            if metric not in csv_metrics:
                csv_metrics[metric] = value

    with open(filename, 'wb') as f:
        writer = csv.writer(f)
        writer.writerow(csv_metrics.keys())
        writer.writerow(csv_metrics.values())
Project: smappdragon    Author: SMAPPNYU    | Project Source | File Source
def dump_to_csv(self, output_csv, input_fields, write_header=True, top_level=False, mode='a', encoding='utf-8', compression=None):
        if compression == 'bz2':
            mode = binary_mode(mode)
            filehandle = bz2.open(output_csv, mode)
        elif compression == 'gzip':
            mode = binary_mode(mode)
            filehandle = gzip.open(output_csv, mode)
        else:
            filehandle = open(output_csv, mode)

        writer = csv.writer(filehandle)
        if write_header:
            writer.writerow(input_fields)
        tweet_parser = TweetParser()

        for tweet in self.get_iterator():
            if top_level:
                ret = list(zip(input_fields, [tweet.get(field) for field in input_fields]))
            else:
                ret = tweet_parser.parse_columns_from_tweet(tweet,input_fields)
            ret_values = [col_val[1] for col_val in ret]
            writer.writerow(ret_values)
        filehandle.close()
Project: mongodb-monitoring    Author: jruaux    | Project Source | File Source
def __init__(self, ofile, maxresultrows=None):
        self._maxresultrows = 50000 if maxresultrows is None else maxresultrows

        self._ofile = ofile
        self._fieldnames = None
        self._buffer = StringIO()

        self._writer = csv.writer(self._buffer, dialect=CsvDialect)
        self._writerow = self._writer.writerow
        self._finished = False
        self._flushed = False

        self._inspector = OrderedDict()
        self._chunk_count = 0
        self._record_count = 0
        self._total_record_count = 0L
Project: QUANTAXIS    Author: yutiansut    | Project Source | File Source
def QA_SU_save_account_to_csv(message,path=os.getcwd()):

    __file_name_1 = '{}backtest-ca&history-{}.csv'.format(path,str(message['header']['cookie']))
    with open(__file_name_1, 'w', newline='') as C:
        csvwriter = csv.writer(C)
        csvwriter.writerow(['date', 'code', 'price', 'towards', 'amount',
                            'order_id', 'trade_id', 'commission_fee', 'cash', 'assets'])
        for i in range(0, max(len(message['body']['account']['cash']), len(message['body']['account']['assets']))):
            try:
                message['body']['account']['history'][i].append(
                    message['body']['account']['cash'][i])
                message['body']['account']['history'][i].append(
                    message['body']['account']['assets'][i])
                csvwriter.writerow(message['body']['account']['history'][i])
            except:
                pass
Project: QUANTAXIS    Author: yutiansut    | Project Source | File Source
def QA_SU_save_pnl_to_csv(detail, cookie):
    __file_name_2 = 'backtest-pnl--' + \
        str(cookie) + '.csv'
    with open(__file_name_2, 'w', newline='') as E:
        csvwriter_1 = csv.writer(E)
        csvwriter_1.writerow(detail.columns)
        for item in detail:
            csvwriter_1.writerow(item)

    """
            'cash': message['body']['account']['cash'],
            'hold': message['body']['account']['hold'],
            'history': message['body']['account']['history'],
            'assets': message['body']['account']['assets'],
            'detail': message['body']['account']['detail']
"""
Project: Splunk_CBER_App    Author: MHaggis    | Project Source | File Source
def __init__(self, ofile, maxresultrows=None):
        self._maxresultrows = 50000 if maxresultrows is None else maxresultrows

        self._ofile = ofile
        self._fieldnames = None
        self._buffer = StringIO()

        self._writer = csv.writer(self._buffer, dialect=CsvDialect)
        self._writerow = self._writer.writerow
        self._finished = False
        self._flushed = False

        self._inspector = OrderedDict()
        self._chunk_count = 0
        self._record_count = 0
        self._total_record_count = 0L
Project: AutoSleepScorerDev    Author: skjerns    | Project Source | File Source
def convert_hypnograms(datadir):
    """
    This function is quite a hack to read the edf hypnogram as a byte array. 
    I found no working reader for the hypnogram edfs.
    """
    print('Converting hypnograms')
    files = [x for x in os.listdir(datadir) if x.endswith('.hyp')]
    for file in files:
        file = os.path.join(datadir,file)
        hypnogram = []
        with open(file, mode='rb') as f: # b is important -> binary

            raw_hypno = [x for x in str(f.read()).split('Sleep_stage_')][1:]
            for h in raw_hypno:
                stage  = h[0]
                repeat = int(h.split('\\')[0][12:])//30 # no idea if this also works on linux
                hypnogram.extend(stage*repeat)            
        with open(file[:-4] + '.csv', "w") as f:
            writer = csv.writer(f, lineterminator='\r')
            writer.writerows(hypnogram)
Project: mendelmd    Author: raonyguimaraes    | Project Source | File Source
def export_to_csv(request, variants):
    #export to csv
    export = request.GET.get('export', '')
    if export != '':
        if export == 'csv':
            response = HttpResponse(content_type='text/csv')
            response['Content-Disposition'] = 'attachment; filename=export.csv'
            writer = csv.writer(response)

        elif export == 'txt':
            response = HttpResponse(content_type='text/plain')
            response['Content-Disposition'] = 'attachment; filename=export.txt'
            writer = csv.writer(response, delimiter='\t', quoting=csv.QUOTE_NONE)    
        writer.writerow(['Individual', 'Index', 'Pos_index', 'Chr', 'Pos', 'Variant_id', 'Ref', 'Alt', 'Qual', 'Filter', 'Info', 'Format', 'Genotype_col', 'Genotype', 'Read_depth', 'Gene', 'Mutation_type', 'Vartype', 'Genomes1k_maf', 'Dbsnp_maf', 'Esp_maf', 'Dbsnp_build', 'Sift', 'Sift_pred', 'Polyphen2', 'Polyphen2_pred', 'Condel', 'Condel_pred', 'DANN', 'CADD', 'Is_at_omim', 'Is_at_hgmd', 'Hgmd_entries', 'Effect', 'Impact', 'Func_class', 'Codon_change', 'Aa_change', 'Aa_len', 'Gene_name', 'Biotype', 'Gene_coding', 'Transcript_id', 'Exon_rank', 'Genotype_number', 'Allele', 'Gene', 'Feature', 'Feature_type', 'Consequence', 'Cdna_position', 'Cds_position', 'Protein_position', 'Amino_acids', 'Codons', 'Existing_variation', 'Distance', 'Strand', 'Symbol', 'Symbol_source', 'Sift', 'Polyphen', 'Condel']) 
        for variant in variants:
            # print 'variant', variant.index
            writer.writerow([variant.individual, variant.index, variant.pos_index, variant.chr, variant.pos, variant.variant_id, variant.ref, variant.alt, variant.qual, variant.filter, pickle.loads(variant.info), variant.format, variant.genotype_col, variant.genotype, variant.read_depth, variant.gene, variant.mutation_type, variant.vartype, variant.genomes1k_maf, variant.dbsnp_maf, variant.esp_maf, variant.dbsnp_build, variant.sift, variant.sift_pred, variant.polyphen2, variant.polyphen2_pred, variant.condel, variant.condel_pred, variant.dann, variant.cadd, variant.is_at_omim, variant.is_at_hgmd, variant.hgmd_entries])
        return response
Project: FRG-Crowdsourcing    Author: 97amarnathk    | Project Source | File Source
def writerow(self, row):
        """Write row."""
        line = []
        for s in row:
            if (type(s) == dict):
                line.append(json.dumps(s))
            else:
                line.append(unicode(s).encode("utf-8"))
        self.writer.writerow(line)
        # Fetch UTF-8 output from the queue ...
        data = self.queue.getvalue()
        data = data.decode("utf-8")
        # ... and reencode it into the target encoding
        data = self.encoder.encode(data)
        # write to the target stream
        self.stream.write(data)
        # empty queue
        self.queue.truncate(0)
Project: django-csv-export-view    Author: benkonrath    | Project Source | File Source
def get(self, request, *args, **kwargs):
        queryset = self.get_queryset()

        field_names = self.get_fields(queryset)

        response = HttpResponse(content_type='text/csv')

        filename = self.get_filename(queryset)
        response['Content-Disposition'] = 'attachment; filename="{}.csv"'.format(filename)

        writer = csv.writer(response, **self.get_csv_writer_fmtparams())

        if self.specify_separator:
            response.write('sep={}{}'.format(writer.dialect.delimiter, writer.dialect.lineterminator))

        if self.header:
            writer.writerow([self.get_header_name(queryset.model, field_name) for field_name in list(field_names)])

        for obj in queryset:
            writer.writerow([self.get_field_value(obj, field) for field in field_names])

        return response
Project: regex_extraction    Author: aforsyth    | Project Source | File Source
def _write_csv_output(note_phrase_matches, output_filename):
    """Write one CSV row for each phrase_match where the row contains all of
    the RPDR note keys along with the extracted numerical value at the end of
    the row."""
    rpdr_rows_with_regex_value = []
    for phrase_matches in note_phrase_matches:
        row = phrase_matches.rpdr_note.get_keys()
        if not phrase_matches.phrase_matches:
            extracted_value = None
        else:
            extracted_value = phrase_matches.phrase_matches[0].extracted_value
        row.append(extracted_value)
        rpdr_rows_with_regex_value.append(row)

    with open(output_filename, 'wb') as output_file:
        csv_writer = csv.writer(output_file)
        csv_writer.writerows(rpdr_rows_with_regex_value)
Project: eurovision-country-selector    Author: mikejarrett    | Project Source | File Source
def write_data_to_csv(csv_name, people, countries):
    """ Loop through the list of people and write them to a csv.

    Args:
        csv_name (str): Name of the file to write results to.
        people (list): List of instantiated ``Person`` objects.
        countries (list): List of strings that represent countries.
    """
    with open(csv_name, 'w') as outfile:
        writer = csv.writer(outfile)
        columns = ['name'] + countries
        writer.writerow(columns)
        for person in people:
            person_row = [person.name] + [
                getattr(person, country, 0) for country in countries
            ]
            writer.writerow(person_row)
Project: sds011    Author: luetzel    | Project Source | File Source
def sensor_live(self):
            x = []
            y1 = []
            y2 = []
            for i in range(0,330,30): # change time interval here, if required
                self.sensor_wake()
                time.sleep(10)
                pm = self.sensor_read()
                if pm is not None:
                    x.append(i)
                    y1.append(pm[0])
                    y2.append(pm[1])
                    with open('/home/pi/data.csv', 'ab') as csvfile:
                        file = csv.writer(csvfile, delimiter=';', quotechar='"', quoting=csv.QUOTE_MINIMAL)
                        file.writerow([datetime.datetime.now().replace(microsecond=0).isoformat().replace('T', ' '), pm[0], pm[1]])
                    line1, = self.ax.plot(x,y1,'r-x')
                    line2, = self.ax.plot(x,y2,'b-x')
                    self.canvas.draw()
                self.sensor_sleep()
                time.sleep(20)
Project: LegalNetworks    Author: brschneidE3    | Project Source | File Source
def list_to_csv(directory_and_filename, list):
    if directory_and_filename[-4:] == '.csv':
        directory_and_filename = directory_and_filename[:-4]
    with open(directory_and_filename + '.csv', 'wb') as csvfile:
        spamwriter = csv.writer(csvfile, quoting=csv.QUOTE_MINIMAL)
        for row in list:
            try:
                spamwriter.writerow(row)
            except UnicodeEncodeError:
                new_row = []
                for element in row:
                    if type(element) is unicode:
                        new_row.append(element.encode('utf-8'))
                    else:
                        new_row.append(element)
                spamwriter.writerow(new_row)  # write the re-encoded row
Project: trustymail    Author: dhs-ncats    | Project Source | File Source
def generate_csv(domains, file_name):
    output = open(file_name, 'w')
    writer = csv.writer(output)

    # First row should always be the headers
    writer.writerow(CSV_HEADERS)

    for domain in domains:
        row = []

        # Grab the dictionary for each row.
        # Keys for the dict are the column headers.
        results = domain.generate_results()

        for column in CSV_HEADERS:
            row.append(results[column])

        writer.writerow(row)

    output.close()
Project: uniq    Author: CiscoDevNet    | Project Source | File Source
def result_write_to_csv(devices_info_list):
    """ Write network devices information to csv format.

    Args:
        devices_info_list (list[NetworkDeviceDTO]): Lists of network device info instances.
    """

    f = csv.writer(open("network_devices.csv", "w+"))
    f.writerow(["Device Name", "IP Address", "MAC Address", "IOS/Firmware", "Platform",
                "Serial Number", "Devcie Role", "Device Family"])
    for device_info in devices_info_list:
        f.writerow([device_info.hostname,
                    device_info.managementIpAddress,
                    device_info.macAddress,
                    device_info.softwareVersion,
                    device_info.platformId,
                    device_info.serialNumber,
                    device_info.role,
                    device_info.family])
Project: dartqc    Author: esteinig    | Project Source | File Source
def write_snp_summary(self, file="snp_summary.csv", summary_parameters=None, sort=False):

        if summary_parameters is None:
            summary_parameters = ["maf", "hwe", "rep", "call_rate"]

        out_file = os.path.join(self.out_path, self.attributes["project"] + "_" + file)

        out_data = [["id"] + summary_parameters]

        snps = [[snp] + [data[parameter] for parameter in summary_parameters] for snp, data in self.data.items()]

        if sort:
            snps = sorted(snps, key=operator.itemgetter(*[i for i in range(1, len(summary_parameters)+1)]),
                          reverse=True)

        out_data += snps

        with open(out_file, "w") as snp_summary:
            writer = csv.writer(snp_summary)
            writer.writerows(out_data)
Project: IM_Climate    Author: IMDProjects    | Project Source | File Source
def _writeToCSV(self):
        '''
        INFO
        ----
        Writes a 2-dimensional list to a CSV text file
        Comma-delimits values.  If there is no data, then there is no attempt to
        creat a file.

        RETURNS
        -------
        None

        '''
        if self._dataAsList:
            with open(self._filePathAndName,'w') as csvFile:
                writer = csv.writer(csvFile, lineterminator='\n', quoting=csv.QUOTE_NONNUMERIC )
                writer.writerows(self._dataAsList)
Project: esys-pbi    Author: fsxfreak    | Project Source | File Source
def export_data(self):
    self.running = False

    self.pupil_thread.join(5)
    self.lsl_thread.join(5)
    print('Joined threads, now outputting pupil data.')

    i = 0
    while os.path.exists("data/pupil/data-%s.csv" % i):
      i += 1

    # csv writer with stim_type, msg, and timestamp, then data
    with open('data/pupil/data-%s.csv' % i, 'w+') as f:
      writer = csv.writer(f)
      writer.writerow(('Signal Type', 'Msg', 'Time', 'Channel 1', 'Channel 2', 'Channel 3', 'Channel 4', 'Channel 5', 'Channel 6', 'Channel 7', 'Channel 8' ))
      for sample in self.samples:
        signal_type, timestamp, datas = sample
        out = (signal_type, 'msg', timestamp)
        for data in datas:
          out = out + (data,)
        writer.writerow(out)
Project: esys-pbi    Author: fsxfreak    | Project Source | File Source
def export_annotations(self,export_range,export_dir):

        if not self.annotations:
            logger.warning('No annotations in this recording nothing to export')
            return

        annotations_in_section = chain(*self.annotations_by_frame[export_range])
        annotations_in_section = list({a['index']: a for a in annotations_in_section}.values())  # remove duplicates
        annotations_in_section.sort(key=lambda a:a['index'])

        with open(os.path.join(export_dir,'annotations.csv'),'w',encoding='utf-8',newline='') as csvfile:
            csv_writer = csv.writer(csvfile)
            csv_writer.writerow(self.csv_representation_keys())
            for a in annotations_in_section:
                csv_writer.writerow(self.csv_representation_for_annotations(a))
            logger.info("Created 'annotations.csv' file.")
Project: esys-pbi    Author: fsxfreak    | Project Source | File Source
def write_key_value_file(csvfile,dictionary,append=False):
    """Writes a dictionary to a writable file in a CSV format

    Args:
        csvfile (FILE): Writable file
        dictionary (dict): Dictionary containing key-value pairs
        append (bool, optional): Writes `key,value` as fieldnames if False

    Returns:
        None: No return
    """
    writer = csv.writer(csvfile, delimiter=',')
    if not append:
        writer.writerow(['key','value'])
    for key,val in dictionary.items():
        writer.writerow([key,val])
Project: retrieve-and-rank-tuning    Author: rchaks    | Project Source | File Source
def _drop_answer_id_col_from_feature_file(self, train_file_location):
        file_without_aid = insert_modifier_in_filename(train_file_location, 'no_aid')
        if path.isfile(file_without_aid):
            self.logger.info('Found a previously generated version of the training file without answer id column, '
                             're-using it: %s' % file_without_aid)
        else:
            self.logger.info('Generating a version of the feature file without answer id (which is what ranker'
                             ' training expects)')
            temp_file = get_temp_file(file_without_aid)
            with smart_file_open(temp_file, 'w') as outfile:
                writer = csv.writer(outfile)
                with smart_file_open(train_file_location) as infile:
                    reader = csv.reader(infile)
                    for row in reader:
                        writer.writerow(row[:1] + row[2:])
            move(temp_file, file_without_aid)
            self.logger.info('Done generating file: %s' % file_without_aid)
        return file_without_aid
Project: retrieve-and-rank-tuning    Author: rchaks    | Project Source | File Source
def setup_train_and_test_writer(output_dir):
    """
    Create an output directory for the fold under the provided parent dir
    :param str output_dir: file path to the output dir
    :return: writer for <outdir.name>/train.csv and <outdir.name>/validation.csv
    :rtype: tuple(csv.writer,csv.writer)
    """
    if not path.isdir(output_dir):
        makedirs(output_dir)
    else:
        LOGGER.warn("Path <<%s>> already exists, files may be overwritten" % output_dir)

    train_writer = csv.writer(smart_file_open(path.join(output_dir, TRAIN_RELEVANCE_FILENAME), 'w'),
                              dialect=csv.excel, delimiter=',')
    validation_writer = csv.writer(smart_file_open(path.join(output_dir, VALIDATION_RELEVANCE_FILENAME), 'w'),
                                   dialect=csv.excel, delimiter=',')

    return train_writer, validation_writer
Project: retrieve-and-rank-tuning    Author: rchaks    | Project Source | File Source
def _print_feature_vectors_and_check_for_correct_answers(writer, rnr_search_results, qid, correct_ans_lookup):
    """
    write the search results to file as a feature vector with the qid and gt labels from the query.
    :param csv.writer writer:
    :param list(list(str)) rnr_search_results:
    :param str qid: the qid to print at the start of each feature vector
    :param dict(str,int) correct_ans_lookup: label lookup for correct answer ids
    :return: num_possible_correct, num_correct_answers_in_search_results
    :rtype: tuple(int,int)
    """
    num_possible_correct = len(correct_ans_lookup)
    num_correct_answers_in_search_results = 0

    for row in rnr_search_results:
        gt_label = 0
        doc_id = row[_ANS_ID_COL].strip()
        if doc_id in correct_ans_lookup:
            gt_label = correct_ans_lookup[doc_id]
            num_correct_answers_in_search_results += 1
        writer.writerow([qid] + row + [gt_label])

    return num_possible_correct, num_correct_answers_in_search_results
Project: llk    Author: Tycx2ry    | Project Source | File Source
def scaneventresultexport(self, id, type, dialect="excel"):
        dbh = SpiderFootDb(self.config)
        data = dbh.scanResultEvent(id, type)
        fileobj = StringIO()
        parser = csv.writer(fileobj, dialect=dialect)
        parser.writerow(["Updated", "Type", "Module", "Source", "F/P", "Data"])
        for row in data:
            if row[4] == "ROOT":
                continue
            lastseen = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(row[0]))
            datafield = str(row[1]).replace("<SFURL>", "").replace("</SFURL>", "")
            parser.writerow([lastseen, str(row[4]), str(row[3]), str(row[2]), row[13], datafield])
        cherrypy.response.headers['Content-Disposition'] = "attachment; filename=SpiderFoot.csv"
        cherrypy.response.headers['Content-Type'] = "application/csv"
        cherrypy.response.headers['Pragma'] = "no-cache"
        return fileobj.getvalue()
Project: llk    Author: Tycx2ry    | Project Source | File Source
def scaneventresultexportmulti(self, ids, dialect="excel"):
        dbh = SpiderFootDb(self.config)
        scaninfo = dict()
        data = list()
        for id in ids.split(','):
            scaninfo[id] = dbh.scanInstanceGet(id)
            data = data + dbh.scanResultEvent(id)

        fileobj = StringIO()
        parser = csv.writer(fileobj, dialect=dialect)
        parser.writerow(["Scan Name", "Updated", "Type", "Module", "Source", "F/P", "Data"])
        for row in data:
            if row[4] == "ROOT":
                continue
            lastseen = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(row[0]))
            datafield = str(row[1]).replace("<SFURL>", "").replace("</SFURL>", "")
            parser.writerow([scaninfo[row[12]][0], lastseen, str(row[4]), str(row[3]), 
                            str(row[2]), row[13], datafield])
        cherrypy.response.headers['Content-Disposition'] = "attachment; filename=SpiderFoot.csv"
        cherrypy.response.headers['Content-Type'] = "application/csv"
        cherrypy.response.headers['Pragma'] = "no-cache"
        return fileobj.getvalue()
Project: auto_ml    Author: doordash    | Project Source | File Source
def write_gs_param_results_to_file(trained_gs, most_recent_filename):

    timestamp_time = datetime.datetime.now()
    write_most_recent_gs_result_to_file(trained_gs, most_recent_filename, timestamp_time)

    grid_scores = trained_gs.grid_scores_
    scorer = trained_gs.scorer_
    best_score = trained_gs.best_score_

    file_name = 'pipeline_grid_search_results.csv'
    write_header = False
    if not os.path.isfile(file_name):
        write_header = True

    with open(file_name, 'a') as results_file:
        writer = csv.writer(results_file, dialect='excel')
        if write_header:
            writer.writerow(['timestamp', 'scorer', 'best_score', 'all_grid_scores'])
        writer.writerow([timestamp_time, scorer, best_score, grid_scores])
Project: buttervolume    Author: anybox    | Project Source | File Source
def schedule():
    """Schedule or unschedule a job
    TODO add a lock
    """
    name = jsonloads(request.body.read())['Name']
    timer = jsonloads(request.body.read())['Timer']
    action = jsonloads(request.body.read())['Action']
    schedule = []
    if timer:  # 0 means unschedule!
        schedule.append((name, action, timer))
    if os.path.exists(SCHEDULE):
        with open(SCHEDULE) as f:
            for n, a, t in csv.reader(f):
                # skip the line we want to write
                if n == name and a == action:
                    continue
                schedule.append((n, a, t))
    os.makedirs(dirname(SCHEDULE), exist_ok=True)
    with open(SCHEDULE, 'w') as f:
        for line in schedule:
            csv.writer(f).writerow(line)
    return json.dumps({'Err': ''})
Project: UberLens    Author: adamalawrence    | Project Source | File Source
def create_HexGrid_csv(self, hexgrid):
        with open('HexGrid-Manhattan.csv', 'w') as out:
            csv_out = csv.writer(out)
            csv_out.writerow(['latitude','longitude'])
            for row in hexgrid:
                csv_out.writerow(row)
Project: Causality    Author: vcla    | Project Source | File Source
def getExampleFromDB(exampleName, connType, conn=False):
    exampleNameForDB = exampleName.replace("_","")
    m = hashlib.md5(exampleNameForDB)
    tableName = TBLPFX + m.hexdigest()
    leaveconn = True
    if not conn:
        leaveconn = False
        conn = getDB(connType)
    allColumns = getColumns(conn, connType, tableName, exampleNameForDB)
    if not allColumns:
        return
    sqlStatement = "SELECT "
    for singleColumn in allColumns:
        if "act_made_call" not in singleColumn[0] and "act_unlock" not in singleColumn[0]:
            sqlStatement += singleColumn[0] + ", "
        else:
            pass #print singleColumn
    notNullColumn = allColumns[len(allColumns)-3] # the last data column (hopefully)
    #cursor.execute("SELECT data.* FROM {} data, cvpr2012complete tally WHERE data.name = tally.name AND data.stamp = tally.stamp and tally.hash = %s".format(tableName), m.hexdigest())
    #cursor.execute("SELECT * FROM {} WHERE {} IS NOT NULL".format(tableName, notNullColumn[0]))
    sqlStatement = sqlStatement[:-2]
    sqlStatement += " FROM " + tableName + " WHERE " + notNullColumn[0] + " IS NOT NULL"
    cursor = conn.cursor()
    cursor.execute(sqlStatement)
    if not globalDryRun:
        csv_filename = kResultStorageFolder + exampleName + ".csv"
        print(" as {}".format(csv_filename))
        csv_writer = csv.writer(open(csv_filename, "wt"))
        csv_writer.writerow([i[0] for i in cursor.description]) # write headers
        csv_writer.writerows(cursor)
        del csv_writer # this will close the CSV file
    cursor.close()
    if not leaveconn:
        conn.close()

#def getAllExamplesFromDB()
#   fileWithExampleNames = 'testingCutPoints.txt'
#   f = open(fileWithExampleNames, 'r')
#   for line in f:
#       line = line.split(
Project: python-    Author: secondtonone1    | Project Source | File Source
def __init__(self, fn, **kwargs):
        self.stream = _csv_open(fn, 'w')
        self.writer = csv.writer(self.stream, **self.defaults)
Project: python-    Author: secondtonone1    | Project Source | File Source
def writerow(self, row):
        if sys.version_info[0] < 3:
            r = []
            for item in row:
                if isinstance(item, text_type):
                    item = item.encode('utf-8')
                r.append(item)
            row = r
        self.writer.writerow(row)

#
#   Configurator functionality
#
Project: python-    Author: secondtonone1    | Project Source | File Source
def write_record(self, bdist_dir, distinfo_dir):
        from wheel.util import urlsafe_b64encode

        record_path = os.path.join(distinfo_dir, 'RECORD')
        record_relpath = os.path.relpath(record_path, bdist_dir)

        def walk():
            for dir, dirs, files in os.walk(bdist_dir):
                dirs.sort()
                for f in sorted(files):
                    yield os.path.join(dir, f)

        def skip(path):
            """Wheel hashes every possible file."""
            return (path == record_relpath)

        with open_for_csv(record_path, 'w+') as record_file:
            writer = csv.writer(record_file)
            for path in walk():
                relpath = os.path.relpath(path, bdist_dir)
                if skip(relpath):
                    hash = ''
                    size = ''
                else:
                    with open(path, 'rb') as f:
                        data = f.read()
                    digest = hashlib.sha256(data).digest()
                    hash = 'sha256=' + native(urlsafe_b64encode(digest))
                    size = len(data)
                record_path = os.path.relpath(
                    path, bdist_dir).replace(os.path.sep, '/')
                writer.writerow((record_path, hash, size))
Project: kiteHistory    Author: mr-karan    | Project Source | File Source
def write_to_csv(stock_data, name):
    """
    params:
        - stock_data(list) : list of dict objects containing stock data
        - name(str) : output file name specified by `-output` param.
    """
    with open(path.join(args.path, name + '.csv'), 'w') as the_file:
        fieldnames = ['date', 'open', 'high', 'low', 'close', 'volume']
        writer = csv.DictWriter(the_file, fieldnames=fieldnames)
        writer.writeheader()
        for line in stock_data:
            writer.writerow(line)
Project: scibot    Author: SciCrunch    | Project Source | File Source
def oldmain():
    output_rows, DATE = export_impl()
    with open('RRID-data-%s.csv' % DATE, 'wt') as f:
        writer = csv.writer(f, lineterminator='\n')
        writer.writerows(sorted(output_rows))

    import json
    output_json, DATE = export_json_impl()
    with open('RRID-data-%s.json' % DATE, 'wt') as f:
        json.dump(output_json, f, sort_keys=True, indent=4)
Project: scibot    Author: SciCrunch    | Project Source | File Source
def export(request):
    print('starting csv export')
    output_rows, DATE = export_impl()    
    data = StringIO()
    writer = csv.writer(data)
    writer.writerows(sorted(output_rows))

    r = Response(gzip.compress(data.getvalue().encode()))
    r.content_type = 'text/csv'
    r.headers.update({
        'Content-Disposition':'attachment;filename = RRID-data-%s.csv' % DATE,
        'Content-Encoding':'gzip'
        })

    return r
Project: treecat    Author: posterior    | Project Source | File Source
def csv_writer(filename):
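    # presumably decorated with @contextlib.contextmanager in the original
    # project, so the yielded csv.writer can be used in a with statement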
    with open(filename, 'w') as f:
        yield csv.writer(f)
Project: dsa-sign-in-sheets    Author: PhillyDSA    | Project Source | File Source
def event_to_csv(request, *args, **kwargs):
    event = Event.objects.get(pk=kwargs.get('pk', None))
    if event.event_admin.id != request.user.id:
        return HttpResponseForbidden()
    participants = EventParticipant.objects.all().filter(event=event)

    response = HttpResponse(content_type='text/csv')
    response['Content-Disposition'] = f'attachment; filename="{ event.id }.csv"'

    writer = csv.writer(response)
    writer.writerow([
            'first_name',
            'last_name',
            'email',
            'street_one',
            'street_two',
            'city',
            'state',
            'zip_code',
            'telephone_number',
        ])
    for part in participants:
        writer.writerow([
            part.first_name,
            part.last_name,
            part.email,
            part.street_one,
            part.street_two,
            part.city,
            part.state,
            part.zip_code,
            part.telephone_number,])
    return response
Project: AmericansStopSoros    Author: HelloKitty    | Project Source | File Source
def __init__(self, f, dialect=csv.excel, encoding="utf-8-sig", **kwds):
        self.queue = cStringIO.StringIO()
        self.writer = csv.writer(self.queue, dialect=dialect, **kwds)
        self.stream = f
        self.encoder = codecs.getincrementalencoder(encoding)()
Project: AmericansStopSoros    Author: HelloKitty    | Project Source | File Source
def writerow(self, row):
        '''writerow(unicode) -> None
        This function takes a Unicode string and encodes it to the output.
        '''
        self.writer.writerow([s.encode("utf-8") for s in row])
        data = self.queue.getvalue()
        data = data.decode("utf-8")
        data = self.encoder.encode(data)
        self.stream.write(data)
        self.queue.truncate(0)
Project: my-first-blog    Author: AnkurBegining    | Project Source | File Source
def __init__(self, fn, **kwargs):
        self.stream = _csv_open(fn, 'w')
        self.writer = csv.writer(self.stream, **self.defaults)
Project: my-first-blog    Author: AnkurBegining    | Project Source | File Source
def writerow(self, row):
        if sys.version_info[0] < 3:
            r = []
            for item in row:
                if isinstance(item, text_type):
                    item = item.encode('utf-8')
                r.append(item)
            row = r
        self.writer.writerow(row)

#
#   Configurator functionality
#
Project: my-first-blog    Author: AnkurBegining    | Project Source | File Source
def write_record(self, bdist_dir, distinfo_dir):
        from .util import urlsafe_b64encode

        record_path = os.path.join(distinfo_dir, 'RECORD')
        record_relpath = os.path.relpath(record_path, bdist_dir)

        def walk():
            for dir, dirs, files in os.walk(bdist_dir):
                dirs.sort()
                for f in sorted(files):
                    yield os.path.join(dir, f)

        def skip(path):
            """Wheel hashes every possible file."""
            return (path == record_relpath)

        with open_for_csv(record_path, 'w+') as record_file:
            writer = csv.writer(record_file)
            for path in walk():
                relpath = os.path.relpath(path, bdist_dir)
                if skip(relpath):
                    hash = ''
                    size = ''
                else:
                    with open(path, 'rb') as f:
                        data = f.read()
                    digest = hashlib.sha256(data).digest()
                    hash = 'sha256=' + native(urlsafe_b64encode(digest))
                    size = len(data)
                record_path = os.path.relpath(
                    path, bdist_dir).replace(os.path.sep, '/')
                writer.writerow((record_path, hash, size))
Project: PyTasks    Author: TheHirschfield    | Project Source | File Source
def exportNoteToFile():
  data = notes
  with open('saves/notes.csv', 'w') as f:
    writer = csv.writer(f)
    writer.writerows(data)
  return ""
Project: PyTasks    Author: TheHirschfield    | Project Source | File Source
def saveTasks():
    data = events
    with open('saves/task.csv', 'w', newline = '') as t:
        tWriter = csv.writer(t)
        tWriter.writerows(data)
Project: charm-swift-proxy    Author: openstack    | Project Source | File Source
def csv(self, output):
        """Output data as excel-compatible CSV"""
        import csv
        csvwriter = csv.writer(self.outfile)
        csvwriter.writerows(output)