Python unicodecsv 模块,DictReader() 实例源码

我们从Python开源项目中,提取了以下33个代码示例,用于说明如何使用unicodecsv.DictReader()。

项目:doorman    作者:mwielgoszewski    | 项目源码 | 文件源码
def test_node_csv_download(self, node, testapp):
        """The manage.nodes_csv view exports enrolled nodes as a CSV attachment."""
        import unicodecsv as csv

        # Give the node known values so every exported column can be checked.
        node.enrolled_on = dt.datetime.utcnow()
        node.last_checkin = dt.datetime.utcnow()
        node.last_ip = '1.1.1.1'
        node.node_info = {'hardware_vendor': "Honest Achmed's Computer Supply"}
        node.save()

        resp = testapp.get(url_for('manage.nodes_csv'))

        assert resp.headers['Content-Type'] == 'text/csv; charset=utf-8'
        assert resp.headers['Content-Disposition'] == 'attachment; filename=nodes.csv'

        first_row = next(csv.DictReader(io.BytesIO(resp.body)))

        # Expected CSV cell values, keyed by exported header name.
        expected = [
            ('Display Name', node.display_name),
            ('Host Identifier', node.host_identifier),
            ('Enrolled On', str(node.enrolled_on)),
            ('Last Check-In', str(node.last_checkin)),
            ('Last Ip Address', node.last_ip),
            ('Is Active', 'True'),
            ('Make', node.node_info['hardware_vendor']),
        ]
        for column, value in expected:
            assert first_row[column] == value
项目:confrontiv    作者:correctiv    | 项目源码 | 文件源码
def upload_recipients(self, request):
    """Admin action: bulk-create Recipients from an uploaded CSV file.

    Each CSV row maps to Recipient fields.  An optional ``group`` column
    attaches the new recipient to an existing RecipientGroup by slug; a
    missing ``slug`` column is derived from the row's ``name``.
    """
    # Only POSTs from users with change permission are accepted.
    if request.method != 'POST':
        raise PermissionDenied
    if not self.has_change_permission(request):
        raise PermissionDenied

    for row in unicodecsv.DictReader(request.FILES['file']):
        group_slug = row.pop('group', None)
        if 'slug' not in row:
            # Derive a slug from the name when none was supplied.
            row['slug'] = slugify(row['name'])
        recipient = Recipient.objects.create(**row)
        if group_slug is not None:
            recipient.groups.add(RecipientGroup.objects.get(slug=group_slug))

    return redirect('admin:confrontiv_recipient_changelist')
项目:adversarial-squad    作者:robinjia    | 项目源码 | 文件源码
def dump_grammar():
  """Print every grammar-check HIT row from the MTurk results CSV.

  Reads OPTS.filename, optionally filtering to a single worker
  (OPTS.worker) and skipping rejected assignments.  Python 2 only
  (print statements).
  """
  with open(OPTS.filename) as csvfile:
    reader = csv.DictReader(csvfile)
    for row in reader:
      # Optional filter to one Turker's assignments.
      if OPTS.worker and row['WorkerId'] != OPTS.worker: continue
      if row['AssignmentStatus'] == 'Rejected': continue
      print 'HIT %s' % row['HITId']
      print 'WorkerId: %s' % row['WorkerId']
      print 'Time: %s s' % row['WorkTimeInSeconds']
      # Tab-separated parallel lists: one entry per question in the HIT.
      input_qids = row['Input.qids'].split('\t')
      input_sents = row['Input.sents'].split('\t')
      ans_is_good = row['Answer.is-good'].split('\t')
      ans_responses = row['Answer.responses'].split('\t')
      for qid, s, is_good, response in zip(input_qids, input_sents, 
                                           ans_is_good, ans_responses):
        print ('  Example %s' % qid)
        print ('    Sentence: %s' % s).encode('utf-8')
        print ('    Is good?  %s' % is_good)
        print ('    Response: %s' % colored(response, 'cyan')).encode('utf-8')
项目:adversarial-squad    作者:robinjia    | 项目源码 | 文件源码
def dump_verify():
  """Print every verification HIT row from the MTurk results CSV.

  Reads OPTS.filename, optionally filtering to a single worker
  (OPTS.worker) and skipping rejected assignments.  Sentences and their
  judgments are '|'-delimited inside each tab-separated answer field.
  Python 2 only (print statements).
  """
  with open(OPTS.filename) as csvfile:
    reader = csv.DictReader(csvfile)
    for row in reader:
      if OPTS.worker and row['WorkerId'] != OPTS.worker: continue
      if row['AssignmentStatus'] == 'Rejected': continue
      print 'HIT %s' % row['HITId']
      print 'WorkerId: %s' % row['WorkerId']
      print 'Time: %s s' % row['WorkTimeInSeconds']
      # Tab-separated parallel lists: one entry per question in the HIT.
      qids = row['Input.qids'].split('\t')
      questions = row['Input.questions'].split('\t')
      sents = row['Answer.sents'].split('\t')
      responses = row['Answer.responses'].split('\t')
      for qid, q, s_str, response_str in zip(
          qids, questions, sents, responses):
        print ('  Example %s' % qid)
        print ('  Question %s' % q)
        # Each question carries several '|'-separated candidate sentences.
        s_list = s_str.split('|')
        a_list = response_str.split('|')
        for s, a in zip(s_list, a_list):
          print ('    Sentence: %s' % sent_format(s)).encode('utf-8')
          print ('    Is good?  %s' % colored(a, 'cyan'))
项目:adversarial-squad    作者:robinjia    | 项目源码 | 文件源码
def pred_human_eval():
  """Build one prediction per question id from MTurk answers; print as JSON.

  With OPTS.ensemble, takes the first answer that occurs more than once
  (a crude plurality vote); otherwise — or when no answer repeats — a
  uniformly random answer is chosen.  Python 2 only (print statement).
  """
  all_preds = collections.defaultdict(list)
  with open(OPTS.filename) as csvfile:
    reader = csv.DictReader(csvfile)
    for row in reader:
      all_preds[row['Input.qid']].append(row['Answer.response'])
  preds = {}
  for qid in all_preds:
    if OPTS.ensemble:
      for a in all_preds[qid]:
        count = sum(1 for pred in all_preds[qid] if a == pred)
        if count > 1:
          preds[qid] = a
          break
      else:
        # for-else: no answer repeated, fall back to a random pick.
        preds[qid] = random.sample(all_preds[qid], 1)[0]
    else:
      preds[qid] = random.sample(all_preds[qid], 1)[0]
  print json.dumps(preds)
项目:fingerprints    作者:alephdata    | 项目源码 | 文件源码
def fetch():
    """Download the company-types CSV and write fingerprints/data/types.yml.

    Builds a Name -> Abbreviation mapping, skipping rows missing either
    value and warning about conflicting duplicates (last one read wins).
    Python 2 only (urllib.urlopen, print statement).
    """
    out_path = os.path.dirname(__file__)
    out_path = os.path.join(out_path, 'fingerprints', 'data', 'types.yml')
    fh = urllib.urlopen(CSV_URL)
    types = {}
    for row in unicodecsv.DictReader(fh):
        name = stringify(row.get('Name'))
        abbr = stringify(row.get('Abbreviation'))
        if name is None or abbr is None:
            continue
        # Warn when one name maps to two different abbreviations.
        if name in types and types[name] != abbr:
            print name, types[name], abbr
        types[name] = abbr
        # print abbr, name

    with open(out_path, 'w') as fh:
        yaml.safe_dump({'types': types}, fh,
                       indent=2,
                       allow_unicode=True,
                       canonical=False,
                       default_flow_style=False)
项目:edx-enterprise    作者:edx    | 项目源码 | 文件源码
def parse_csv(file_stream, expected_columns=None):
    """
    Parse csv file and return a stream of dictionaries representing each row.

    First line of CSV file must contain column headers.

    Arguments:
         file_stream: input file
         expected_columns (set[unicode]): columns that are expected to be present

    Yields:
        dict: CSV line parsed into a dictionary.

    Raises:
        ValidationError: when any expected column is missing from the header.
    """
    reader = unicodecsv.DictReader(file_stream, encoding="utf-8")

    # ``fieldnames`` is None for an empty stream; treat that as "no columns"
    # rather than raising TypeError on set(None).
    actual_columns = set(reader.fieldnames or [])
    if expected_columns and set(expected_columns) - actual_columns:
        raise ValidationError(ValidationMessages.MISSING_EXPECTED_COLUMNS.format(
            expected_columns=", ".join(expected_columns),
            actual_columns=", ".join(reader.fieldnames or [])
        ))

    # "yield from reader" would be nicer, but we're on python2.7 yet.
    for row in reader:
        yield row
项目:hgvs    作者:biocommons    | 项目源码 | 文件源码
def _read_input(self, in_file):
        """Dummy file of inputs

        :param in_file: path to input file of 2 cols (tab-delim); accession_number, sequence
        :type string
        :return dictionary of accession_number to sequence tags
        """
        result = {}
        with open(in_file, 'r') as f:
            reader = csv.DictReader(f, delimiter=str('\t'))
            for row in reader:
                result[row['accession']] = {
                    'transcript_sequence': row['transcript_sequence'],
                    'cds_start_i': int(row['cds_start_i']),
                    'cds_end_i': int(row['cds_end_i'])
                }

        return result
项目:stormtrooper    作者:CompileInc    | 项目源码 | 文件源码
def process(self, activate=False):
    """Create one Question per row of the task's uploaded CSV (idempotent).

    :param activate: when True, also mark the task active before saving
    """
    # Already imported? Nothing to do.
    if self.is_questions_created:
        return
    for row in csv.DictReader(self.csv.file):
        # The raw row dict itself is stored as the question payload.
        Question.objects.create(task=self, question=row)
    self.is_questions_created = True
    if activate:
        self.is_active = True
    self.save()
项目:data-science-bowl-2017    作者:tondonia    | 项目源码 | 文件源码
def read_test_labels(filename):
    """Read the ``id`` column of a CSV file into a list.

    :param filename: path to a UTF-8 CSV file with an ``id`` header column
    :return: list of id values, in file order
    """
    labels = []
    # ``with`` guarantees the handle is closed; the original leaked it.
    with open(filename) as csv_file:
        for row in unicodecsv.DictReader(csv_file, encoding='utf-8'):
            labels.append(row['id'])
    return labels
项目:data-science-bowl-2017    作者:tondonia    | 项目源码 | 文件源码
def read_depths(filename):
    """Read a CSV mapping each ``key`` to its integer ``depth``.

    :param filename: path to a UTF-8 CSV file with ``key`` and ``depth`` columns
    :return: dict of key -> int depth
    """
    depths = {}
    # ``with`` guarantees the handle is closed; the original leaked it.
    with open(filename) as csv_file:
        for row in unicodecsv.DictReader(csv_file, encoding='utf-8'):
            depths[row['key']] = int(row['depth'])
    return depths
项目:data-science-bowl-2017    作者:tondonia    | 项目源码 | 文件源码
def read_labels(filename):
    """Read a CSV mapping each ``id`` to its integer ``cancer`` label.

    :param filename: path to a UTF-8 CSV file with ``id`` and ``cancer`` columns
    :return: dict of id -> int label
    """
    labels = {}
    # ``with`` guarantees the handle is closed; the original leaked it.
    with open(filename) as csv_file:
        for row in unicodecsv.DictReader(csv_file, encoding='utf-8'):
            labels[row['id']] = int(row['cancer'])
    return labels
项目:data-science-bowl-2017    作者:tondonia    | 项目源码 | 文件源码
def read_test_labels(filename):
    """Read the ``id`` column of a CSV file into a list.

    :param filename: path to a UTF-8 CSV file with an ``id`` header column
    :return: list of id values, in file order
    """
    labels = []
    # ``with`` guarantees the handle is closed; the original leaked it.
    with open(filename) as csv_file:
        for row in unicodecsv.DictReader(csv_file, encoding='utf-8'):
            labels.append(row['id'])
    return labels
项目:data-science-bowl-2017    作者:tondonia    | 项目源码 | 文件源码
def read_depths(filename):
    """Read a CSV mapping each ``key`` to its integer ``depth``.

    :param filename: path to a UTF-8 CSV file with ``key`` and ``depth`` columns
    :return: dict of key -> int depth
    """
    depths = {}
    # ``with`` guarantees the handle is closed; the original leaked it.
    with open(filename) as csv_file:
        for row in unicodecsv.DictReader(csv_file, encoding='utf-8'):
            depths[row['key']] = int(row['depth'])
    return depths
项目:data-science-bowl-2017    作者:tondonia    | 项目源码 | 文件源码
def read_test_labels(filename):
    """Read the ``id`` column of a CSV file into a list.

    :param filename: path to a UTF-8 CSV file with an ``id`` header column
    :return: list of id values, in file order
    """
    labels = []
    # ``with`` guarantees the handle is closed; the original leaked it.
    with open(filename) as csv_file:
        for row in unicodecsv.DictReader(csv_file, encoding='utf-8'):
            labels.append(row['id'])
    return labels
项目:data-science-bowl-2017    作者:tondonia    | 项目源码 | 文件源码
def read_depths(filename):
    """Read a CSV mapping each ``key`` to its integer ``depth``.

    :param filename: path to a UTF-8 CSV file with ``key`` and ``depth`` columns
    :return: dict of key -> int depth
    """
    depths = {}
    # ``with`` guarantees the handle is closed; the original leaked it.
    with open(filename) as csv_file:
        for row in unicodecsv.DictReader(csv_file, encoding='utf-8'):
            depths[row['key']] = int(row['depth'])
    return depths
项目:data-science-bowl-2017    作者:tondonia    | 项目源码 | 文件源码
def read_labels(filename):
    """Read a CSV mapping each ``id`` to its integer ``cancer`` label.

    :param filename: path to a UTF-8 CSV file with ``id`` and ``cancer`` columns
    :return: dict of id -> int label
    """
    labels = {}
    # ``with`` guarantees the handle is closed; the original leaked it.
    with open(filename) as csv_file:
        for row in unicodecsv.DictReader(csv_file, encoding='utf-8'):
            labels[row['id']] = int(row['cancer'])
    return labels
项目:data-science-bowl-2017    作者:tondonia    | 项目源码 | 文件源码
def read_test_labels(filename):
    """Read the ``id`` column of a CSV file into a list.

    :param filename: path to a UTF-8 CSV file with an ``id`` header column
    :return: list of id values, in file order
    """
    labels = []
    # ``with`` guarantees the handle is closed; the original leaked it.
    with open(filename) as csv_file:
        for row in unicodecsv.DictReader(csv_file, encoding='utf-8'):
            labels.append(row['id'])
    return labels
项目:data-science-bowl-2017    作者:tondonia    | 项目源码 | 文件源码
def read_depths(filename):
    """Read a CSV mapping each ``key`` to its integer ``depth``.

    :param filename: path to a UTF-8 CSV file with ``key`` and ``depth`` columns
    :return: dict of key -> int depth
    """
    depths = {}
    # ``with`` guarantees the handle is closed; the original leaked it.
    with open(filename) as csv_file:
        for row in unicodecsv.DictReader(csv_file, encoding='utf-8'):
            depths[row['key']] = int(row['depth'])
    return depths
项目:confrontiv    作者:correctiv    | 项目源码 | 文件源码
def make_inquiry_requests_from_file(inquiry, file):
    """Create one InquiryRequest per CSV row for the given inquiry.

    Each row needs a ``recipient`` slug (the recipient must belong to
    the inquiry's group) and a JSON-encoded ``data`` column.

    Raises ValueError on unknown recipients or group mismatches.
    """
    for lineno, row in enumerate(unicodecsv.DictReader(file), 1):
        try:
            recipient = Recipient.objects.get(slug=row['recipient'])
        except Recipient.DoesNotExist:
            raise ValueError('Recipient on line %s not found' % lineno)
        # Guard against recipients outside the inquiry's group.
        if not recipient.groups.filter(id=inquiry.group_id).exists():
            raise ValueError('Recipient %s not in inquiry group' % recipient)
        payload = json.loads(row['data'])
        InquiryRequest.objects.create_from_inquiry(inquiry, recipient, payload)
项目:adversarial-squad    作者:robinjia    | 项目源码 | 文件源码
def read_sentences():
  """Map each question id to its list of candidate sentences.

  Reads the MTurk batch CSV (OPTS.batch_file).  When the Turker marked
  the original sentence as good ('yes'), the original sentence is kept
  instead of the edited response; duplicates per qid are dropped.
  """
  id_to_sents = collections.defaultdict(list)
  with open(OPTS.batch_file) as f:
    for row in csv.DictReader(f):
      # Tab-separated parallel lists, one entry per question in the HIT.
      qids = row['Input.qids'].split('\t')
      sents = row['Input.sents'].split('\t')
      good_flags = row['Answer.is-good'].split('\t')
      responses = row['Answer.responses'].split('\t')
      for qid, sent, good, resp in zip(qids, sents, good_flags, responses):
        candidate = sent if good == 'yes' else resp
        if candidate not in id_to_sents[qid]:
          id_to_sents[qid].append(candidate)
  return id_to_sents
项目:adversarial-squad    作者:robinjia    | 项目源码 | 文件源码
def stats_grammar():
  """Aggregate and print per-worker stats for the grammar-check MTurk task.

  Reads OPTS.filename, skips rejected assignments, then prints each
  worker's median/mean HIT time, question count, and fraction judged
  'yes', sorted ascending by that fraction.  Python 2 only (print
  statements, integer division).
  """
  # Read data
  worker_to_is_good = collections.defaultdict(list)
  worker_to_times = collections.defaultdict(list)
  with open(OPTS.filename) as csvfile:
    reader = csv.DictReader(csvfile)
    for row in reader:
      if row['AssignmentStatus'] == 'Rejected': continue
      worker_id = row['WorkerId']
      # Tab-separated per-question yes/no judgments for this HIT.
      ans_is_good = row['Answer.is-good'].split('\t')
      time = float(row['WorkTimeInSeconds'])
      worker_to_is_good[worker_id].extend(ans_is_good)
      worker_to_times[worker_id].append(time)

  # Aggregate by worker
  print '%d total workers' % len(worker_to_times)
  worker_stats = {}
  for worker_id in worker_to_times:
    times = sorted(worker_to_times[worker_id])
    # Py2 integer division: picks the upper-middle element as the median.
    t_median = times[len(times)/2]
    t_mean = sum(times) / float(len(times))
    is_good_list = worker_to_is_good[worker_id]
    num_qs = len(is_good_list)
    frac_good = sum(1.0 for x in is_good_list if x == 'yes') / num_qs
    worker_stats[worker_id] = (t_median, t_mean, num_qs, frac_good)

  # Print
  # Ascending by frac_good so likely spammers appear first.
  sorted_ids = sorted(list(worker_stats), key=lambda x: worker_stats[x][3])
  for worker_id in sorted_ids:
    t_median, t_mean, num_qs, frac_good = worker_stats[worker_id]
    print 'Worker %s: t_median %.1f, t_mean %.1f, %d questions, %.1f%% good' % (
        worker_id, t_median, t_mean, num_qs, 100.0 * frac_good)
项目:adversarial-squad    作者:robinjia    | 项目源码 | 文件源码
def stats_verify():
  """Aggregate and print per-worker stats for the verification MTurk task.

  Like stats_grammar, but judgments are nested: each tab-separated
  answer field holds '|'-separated per-sentence responses.  Python 2
  only (print statements, integer division).
  """
  # Read data
  worker_to_is_good = collections.defaultdict(list)
  worker_to_times = collections.defaultdict(list)
  with open(OPTS.filename) as csvfile:
    reader = csv.DictReader(csvfile)
    for row in reader:
      if row['AssignmentStatus'] == 'Rejected': continue
      worker_id = row['WorkerId']
      # Flatten tab-separated fields of '|'-separated judgments.
      ans_is_good = [x for s in row['Answer.responses'].split('\t')
                     for x in s.split('|')]
      time = float(row['WorkTimeInSeconds'])
      worker_to_is_good[worker_id].extend(ans_is_good)
      worker_to_times[worker_id].append(time)

  # Aggregate by worker
  print '%d total workers' % len(worker_to_times)
  worker_stats = {}
  for worker_id in worker_to_times:
    times = sorted(worker_to_times[worker_id])
    # Py2 integer division: picks the upper-middle element as the median.
    t_median = times[len(times)/2]
    t_mean = sum(times) / float(len(times))
    is_good_list = worker_to_is_good[worker_id]
    num_qs = len(is_good_list)
    frac_good = sum(1.0 for x in is_good_list if x == 'yes') / num_qs
    worker_stats[worker_id] = (t_median, t_mean, num_qs, frac_good)

  # Print
  # Ascending by frac_good so likely spammers appear first.
  sorted_ids = sorted(list(worker_stats), key=lambda x: worker_stats[x][3])
  for worker_id in sorted_ids:
    t_median, t_mean, num_qs, frac_good = worker_stats[worker_id]
    print 'Worker %s: t_median %.1f, t_mean %.1f, %d questions, %.1f%% good' % (
        worker_id, t_median, t_mean, num_qs, 100.0 * frac_good)
项目:handcart    作者:hatnote    | 项目源码 | 文件源码
def get_csv(csv_file):
    """Read a CSV file into a list of row dicts.

    :param csv_file: path to the CSV file to read
    :return: list of dicts, one per data row
    """
    # Bug fix: the original ignored its ``csv_file`` argument and read an
    # undefined module global ``example_file``; it also leaked the handle.
    with open(csv_file, 'rb') as fh:
        return list(unicodecsv.DictReader(fh))
项目:froide-campaign    作者:okfde    | 项目源码 | 文件源码
def handle(self, *args, **options):
    """Management-command entry point: import campaign data from a CSV.

    ``options['filename']`` names the CSV file handed to CSVImporter.
    """
    translation.activate(settings.LANGUAGE_CODE)

    filename = options['filename']

    # ``with`` closes the CSV file; the original leaked the handle.
    with open(filename) as csv_file:
        reader = unicodecsv.DictReader(csv_file)
        importer = CSVImporter()
        importer.run(reader)
项目:froide-campaign    作者:okfde    | 项目源码 | 文件源码
def upload_information_objects(self, request):
    """Admin action: import information objects from an uploaded CSV."""
    # Only POSTs from users with change permission are accepted.
    if request.method != 'POST':
        raise PermissionDenied
    if not self.has_change_permission(request):
        raise PermissionDenied
    rows = unicodecsv.DictReader(request.FILES['file'])
    CSVImporter().run(rows)
    return redirect('admin:froide_campaign_informationobject_changelist')
项目:panphon    作者:dmort27    | 项目源码 | 文件源码
def read_ipa_bases(ipa_bases):
    """Read segment definitions from the ipa_bases CSV.

    :param ipa_bases: path to a CSV whose ``ipa`` column holds the segment
        form and whose remaining columns are feature values
    :return: list of Segment objects
    """
    segments = []
    with open(ipa_bases, 'rb') as f:
        # 'utf-8' fixes the original's 'utf=8' typo, which only worked by
        # accident of Python's codec-name normalization.
        dictreader = csv.DictReader(f, encoding='utf-8')
        for record in dictreader:
            form = record['ipa']
            # Every non-ipa column is a feature of the segment.
            features = {k: v for k, v in record.items() if k != 'ipa'}
            segments.append(Segment(form, features))
    return segments
项目:hgvs    作者:biocommons    | 项目源码 | 文件源码
def gcp_file_reader(fn):
    """Yield rows of a tab-delimited file as dicts, skipping comment rows.

    :param fn: path to a tab-separated file whose header has an ``id`` column
    :yields: one dict per non-comment data row
    """
    # ``with`` closes the handle when the generator finishes or is
    # collected; the original leaked the open file.
    with open(fn, "r") as f:
        rdr = csv.DictReader(f, delimiter=str("\t"))
        for rec in rdr:
            # Rows whose id starts with '#' are comments in the data files.
            if rec["id"].startswith("#"):
                continue
            yield rec
项目:hgvs    作者:biocommons    | 项目源码 | 文件源码
def test_parser_test_completeness(self):
        """ensure that all rules in grammar have tests"""

        # Raw string fixes the invalid "\w" escape (DeprecationWarning on
        # modern Python).
        grammar_rule_re = re.compile(r"^(\w+)")
        grammar_fn = pkg_resources.resource_filename(__name__, "../hgvs/_data/hgvs.pymeta")
        with open(grammar_fn, "r") as f:
            # The first identifier on a line is the rule it defines.
            grammar_rules = set(r.group(1) for r in filter(None, map(grammar_rule_re.match, f)))

        with open(self._test_fn, "r") as f:
            reader = csv.DictReader(f, delimiter=str("\t"))
            test_rules = set(row["Func"] for row in reader)

        untested_rules = grammar_rules - test_rules

        self.assertTrue(len(untested_rules) == 0, "untested rules: {}".format(untested_rules))
项目:hgvs    作者:biocommons    | 项目源码 | 文件源码
def test_parser_grammar(self):
        """Drive the HGVS grammar over every row of the tab-delimited test table.

        Each row names a grammar rule (Func), '|'-separated inputs, and
        expected outputs; a row fails when a valid input raises or
        mismatches, or when an invalid input parses successfully.
        """
        with open(self._test_fn, "r") as f:
            reader = csv.DictReader(f, delimiter=str("\t"))

            fail_cases = []

            for row in reader:
                # Rows whose Func starts with '#' are comments.
                if row["Func"].startswith("#"):
                    continue

                # setup input
                inputs = self._split_inputs(row["Test"], row["InType"])
                # When no expected output is given, input must round-trip.
                expected_results = self._split_inputs(row["Expected"], row["InType"]) if row["Expected"] else inputs
                expected_map = dict(zip(inputs, expected_results))
                # step through each item and check
                is_valid = True if row["Valid"].lower() == "true" else False

                for key in expected_map:
                    # Normalize py2/py3 unicode reprs for comparison.
                    expected_result = six.text_type(expected_map[key]).replace("u'", "'")
                    function_to_test = getattr(self.p._grammar(key), row["Func"])
                    row_str = u"{}\t{}\t{}\t{}\t{}".format(row["Func"], key, row["Valid"], "one", expected_result)
                    try:
                        actual_result = six.text_type(function_to_test()).replace("u'", "'")
                        # An invalid input parsing, or a mismatch, is a failure.
                        if not is_valid or (expected_result != actual_result):
                            print("expected: {} actual:{}".format(expected_result, actual_result))
                            fail_cases.append(row_str)
                    except Exception as e:
                        # Exceptions are only acceptable for invalid inputs.
                        if is_valid:
                            print("expected: {} Exception: {}".format(expected_result, e))
                            fail_cases.append(row_str)

        # everything should have passed - report whatever failed
        self.assertTrue(len(fail_cases) == 0, pprint.pprint(fail_cases))
项目:hgvs    作者:biocommons    | 项目源码 | 文件源码
def gxp_file_reader(fn):
    """Yield rows of a tab-delimited file as dicts, skipping comment rows.

    :param fn: path to a tab-separated file whose header has an ``id`` column
    :yields: one dict per non-comment data row
    """
    # ``with`` closes the handle when the generator finishes or is
    # collected; the original leaked the open file.
    with open(fn, "r") as f:
        rdr = csv.DictReader(f, delimiter=str("\t"))
        for rec in rdr:
            # Rows whose id starts with '#' are comments in the data files.
            if rec["id"].startswith("#"):
                continue
            yield rec
项目:cop-matcher    作者:thechicagoreporter    | 项目源码 | 文件源码
def load_new_casecops(apps,schema_editor):
    """Data migration: load CaseCop rows from the 20160614 casecops.csv.

    Each CSV row must match exactly one existing Case by case_no;
    ambiguous matches drop into an interactive debugger for manual
    triage.  Python 2 only (print statement).
    """
    infile_path = BASE_DIR + '/data/20160614_migration/casecops.csv'
    infile      = open(infile_path)
    incsv       = csv.DictReader(infile)

    for row in incsv:
        case_lookup = list(Case.objects.filter(case_no=row['case_no']))
        if len(case_lookup) != 1:
            print 'ambiguous case:', row['case_no'], 'has len:', len(case_lookup)
            # HACK: interactive debugger deliberately left in so ambiguous
            # rows can be inspected while the migration runs.
            import ipdb; ipdb.set_trace()
        else:
            # can only create casecop if we verify there's 1 matching case in cases table
            case = case_lookup[0]
            cc = CaseCop.objects.create(
                                   id   = row['id'],
                                   case = case,
                                   case_no = row['case_no'],
                                   slug = '',

                                   # Empty cop_id means no matched officer.
                                   cop = Cop.objects.get(id=row['cop_id']) if row['cop_id'] else None,
                                   cop_first_name = row['cop_first_name'],
                                   cop_middle_initial = row['cop_middle_initial'],
                                   cop_last_name = row['cop_last_name'],
                                   badge_no = row['badge_no'],
                                   officer_atty = row['officer_atty'],
                                   officer_atty_firm = row['officer_atty_firm'],

                                   entered_by = row['entered_by'],
                                   entered_when = parse_str_date(row['entered_when']),
                                   fact_checked_by = row['fact_checked_by'],
                                   fact_checked_when = parse_str_date(row['fact_checked_when']),
                                   matched_by = row['matched_by'],
                                   matched_when = parse_str_date(row['matched_when']),
                                   note = row['note'],
                                   # CSV stores the flag as '1'/'0' strings.
                                   flag = row['flag'] == '1'
                                  )
            cc.save()
项目:cop-matcher    作者:thechicagoreporter    | 项目源码 | 文件源码
def load_new_cases(apps,schema_editor):
    """Data migration: load Case rows from the 20160620 cases.csv.

    Maps each CSV column (mixed CamelCase/snake_case headers) onto the
    Case model; dates go through parse_str_date and lat/lon become
    floats (None when blank).
    """
    in_file_path = BASE_DIR + '/data/20160620_migration/cases.csv'
    in_file      = open(in_file_path)
    in_csv       = csv.DictReader(in_file)

    for row in in_csv:
        c = Case.objects.create(
                                id                 = row['id'],
                                case_no            = row['CaseNumber'], 
                                date_filed         = parse_str_date(row['DateFiled']),
                                date_closed        = parse_str_date(row['DateClosed']),
                                judge              = row['Judge'],
                                plaintiff_atty     = row['PlaintiffsLeadAttorney'],
                                plaintiff_firm     = row['PlaintiffsAttorneyLawFirm'],
                                city_atty          = row['CitysLeadAttorney'],
                                city_firm          = row['CitysAttorneyLawFirm'],
                                magistrate         = row['MagistrateJudge'],
                                incident_date      = parse_str_date(row['DateofIncident']),             
                                location           = row['LocationListed'],
                                address            = row['StreetAddress'],
                                city               = row['City'],
                                state              = row['State'],
                                # Blank coordinates are stored as None.
                                lat                = float(row['Latitude']) if row['Latitude'] else None, 
                                lon                = float(row['Longitude']) if row['Longitude'] else None,
                                census_place       = row['CensusPlaceFips'],
                                census_msa         = row['CensusMsaFips'],
                                census_met_div     = row['CensusMetDivFips'],
                                census_mcd         = row['CensusMcdFips'],
                                census_micro       = row['CensusCbsaMicro'],
                                census_cbsa        = row['CensusCbsaFips'],
                                census_block       = row['CensusBlock'],
                                census_block_g     = row['CensusBlockGroup'],
                                census_tract       = row['CensusTract'],
                                census_county      = row['CensusCountyFips'],
                                census_state       = row['CensusStateFips'],
                                naaccr_cert        = row['naaccrCertCode'],
                                m_number           = row['MNumber'],
                                m_predirection     = row['MPreDirectional'],
                                m_name             = row['MName'],
                                m_suffix           = row['MSuffix'],
                                m_city             = row['MCity'],
                                m_state            = row['MState'],
                                narrative          = row['Narrative'],
                                primary_cause      = row['primary_cause'],
                                federal_causes     = row['federal_causes'],
                                state_causes       = row['state_causes'],
                                interaction_type   = row['interaction_type'],
                                officers           = row['officers'],
                                victims            = row['victims'],
                                misconduct_type    = row['misconduct_type'],
                                weapons_used       = row['weapons_used'],
                                outcome            = row['outcome'],
                                tags               = row['tags'],
                                reporter           = row['EnteredBy'],
                                fact_checker       = row['Fact-checkedby'],
                                differences        = row['Differences'],
                               )
        c.save()