Python csv 模块,QUOTE_NONE 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用csv.QUOTE_NONE

项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda    作者:SignalMedia    | 项目源码 | 文件源码
def test_dialect(self):
        data = """\
label1,label2,label3
index1,"a,c,e
index2,b,d,f
"""

        dia = csv.excel()
        dia.quoting = csv.QUOTE_NONE
        df = self.read_csv(StringIO(data), dialect=dia)

        data = '''\
label1,label2,label3
index1,a,c,e
index2,b,d,f
'''
        exp = self.read_csv(StringIO(data))
        exp.replace('a', '"a', inplace=True)
        tm.assert_frame_equal(df, exp)
项目:cellranger    作者:10XGenomics    | 项目源码 | 文件源码
def build_gtf(self):
        """Copy the input GTF to ``self.out_gtf_fn``, leaving out filtered records.

        Comment rows are always copied.  A data row is dropped when any of its
        properties is a key of ``self.attributes`` whose value is not among
        the values listed for that key.
        """
        print("Writing new genes GTF file (may take 10 minutes for a 1GB input GTF file)...")
        with open(self.out_gtf_fn, 'wb') as out_handle:
            gtf_out = csv.writer(out_handle, delimiter='\t', quoting=csv.QUOTE_NONE, quotechar='')
            for row, is_comment, properties in self.gtf_reader_iter(self.in_gtf_fn):
                if not is_comment:
                    rejected = any(
                        key in self.attributes and value not in self.attributes[key]
                        for key, value in properties.iteritems()
                    )
                    if rejected:
                        continue
                gtf_out.writerow(row)

        print("...done\n")
项目:mendelmd    作者:raonyguimaraes    | 项目源码 | 文件源码
def export_to_csv(request, variants):
    """Return ``variants`` as a CSV or tab-separated download.

    The format is selected by the ``export`` GET parameter: ``csv`` yields a
    comma-separated ``export.csv`` attachment, ``txt`` a tab-separated
    ``export.txt`` attachment written without quoting.

    Returns the populated ``HttpResponse``, or ``None`` when no export was
    requested or the requested format is not one of the two above.
    """
    export = request.GET.get('export', '')
    if export == 'csv':
        response = HttpResponse(content_type='text/csv')
        response['Content-Disposition'] = 'attachment; filename=export.csv'
        writer = csv.writer(response)
    elif export == 'txt':
        response = HttpResponse(content_type='text/plain')
        response['Content-Disposition'] = 'attachment; filename=export.txt'
        writer = csv.writer(response, delimiter='\t', quoting=csv.QUOTE_NONE)
    else:
        # Any other value used to reach writer.writerow() with ``writer``
        # unbound and crash; handle it like "no export requested".
        return None

    # NOTE(review): the header names 63 columns while each data row below
    # supplies only the first 33 of them -- confirm whether that is intended.
    writer.writerow([
        'Individual', 'Index', 'Pos_index', 'Chr', 'Pos', 'Variant_id',
        'Ref', 'Alt', 'Qual', 'Filter', 'Info', 'Format', 'Genotype_col',
        'Genotype', 'Read_depth', 'Gene', 'Mutation_type', 'Vartype',
        'Genomes1k_maf', 'Dbsnp_maf', 'Esp_maf', 'Dbsnp_build', 'Sift',
        'Sift_pred', 'Polyphen2', 'Polyphen2_pred', 'Condel', 'Condel_pred',
        'DANN', 'CADD', 'Is_at_omim', 'Is_at_hgmd', 'Hgmd_entries', 'Effect',
        'Impact', 'Func_class', 'Codon_change', 'Aa_change', 'Aa_len',
        'Gene_name', 'Biotype', 'Gene_coding', 'Transcript_id', 'Exon_rank',
        'Genotype_number', 'Allele', 'Gene', 'Feature', 'Feature_type',
        'Consequence', 'Cdna_position', 'Cds_position', 'Protein_position',
        'Amino_acids', 'Codons', 'Existing_variation', 'Distance', 'Strand',
        'Symbol', 'Symbol_source', 'Sift', 'Polyphen', 'Condel',
    ])
    for variant in variants:
        writer.writerow([
            variant.individual, variant.index, variant.pos_index,
            variant.chr, variant.pos, variant.variant_id, variant.ref,
            variant.alt, variant.qual, variant.filter,
            # NOTE(review): pickle.loads is only safe while variant.info can
            # never hold attacker-controlled bytes.
            pickle.loads(variant.info),
            variant.format, variant.genotype_col, variant.genotype,
            variant.read_depth, variant.gene, variant.mutation_type,
            variant.vartype, variant.genomes1k_maf, variant.dbsnp_maf,
            variant.esp_maf, variant.dbsnp_build, variant.sift,
            variant.sift_pred, variant.polyphen2, variant.polyphen2_pred,
            variant.condel, variant.condel_pred, variant.dann, variant.cadd,
            variant.is_at_omim, variant.is_at_hgmd, variant.hgmd_entries,
        ])
    return response
项目:vw_glove    作者:KristianHolsheimer    | 项目源码 | 文件源码
def load(cls, filepath_or_buffer):
        """
        Build an instance from word embeddings stored as space-separated text.

        Params
        ------
        filepath_or_buffer : str or file-like
            Any object that ``pandas.read_csv()`` can read from.  The data
            has no header row; the first field of each line becomes the
            index and the remaining fields the vector columns.

        Returns
        -------
        An instance of ``cls`` whose ``word_vectors_`` attribute holds the
        loaded table.

        """
        table = pd.read_csv(
            filepath_or_buffer,
            header=None,
            index_col=0,
            sep=' ',
            quoting=QUOTE_NONE,
        )
        n_dimensions = table.shape[1]
        table.index.name = 'token'
        # Number the vector columns 0..n-1.
        table.columns = np.arange(n_dimensions)

        instance = cls(n_dimensions=n_dimensions)
        instance.word_vectors_ = table
        return instance
项目:python_clang_parser    作者:mathben    | 项目源码 | 文件源码
def _create_csv(self, filename, data):
        csv_id = 0

        if not self._parser.quiet:
            print("Creating csv on file '%s'" % filename)

        with open(filename, self._option_open_file) as csv_file:
            result = csv.writer(csv_file, delimiter=';', quotechar="", quoting=csv.QUOTE_NONE)
            result.writerow(self._header)

            for clang_obj in data:
                if clang_obj.kind in [clang.cindex.CursorKind.CLASS_DECL, clang.cindex.CursorKind.CLASS_TEMPLATE]:
                    # class section
                    for clang_obj_child in clang_obj.methods:
                        self.add_line(result, clang_obj, clang_obj_child, csv_id)
                        csv_id += 1
                else:
                    # method of function section
                    self.add_line(result, None, clang_obj, csv_id)
                    csv_id += 1

            if not self._parser.quiet:
                print("%s row into %s" % (csv_id, filename))
项目:ti-data-samples    作者:bom4v    | 项目源码 | 文件源码
def init_csv_file (csv_file_param):
    """
    Write the CSV header row to the output designated by csv_file_param

    csv_file_param is either a file path (str), which is opened for writing
    and closed again once the header is written, or an object with a
    ``write`` method (normally stdout), which is used as is and left open.
    Any other value raises IOError.

    The column names are taken from the module-level ``fieldnames``.
    """
    if isinstance (csv_file_param, str):
        # The parameter is a file-path
        csv_file = open (csv_file_param, 'w', newline = '')
        owns_file = True

    elif hasattr (csv_file_param, 'write'):
        # The parameter is already a file (normally, stdout)
        csv_file = csv_file_param
        owns_file = False

    else:
        # Unknown
        raise IOError ('[Error] Output file parameter "' + str(csv_file_param) + '" unkown')

    try:
        # Write the header
        fileWriter = csv.DictWriter (csv_file, delimiter='^',
                                     fieldnames = fieldnames,
                                     dialect = 'unix', quoting = csv.QUOTE_NONE)
        fileWriter.writeheader()
    finally:
        # Only close what was opened here; a caller-supplied stream stays open.
        if owns_file:
            csv_file.close()
项目:SNOMEDToOWL    作者:hsolbrig    | 项目源码 | 文件源码
def walk(self, filtr: Callable[[str], bool], processor: Callable[[Dict], bool]) -> None:
        """
        Visit every file below the input directory and copy the accepted rows of the selected files
        :param filtr: predicate on the bare file name; only matching files are read
        :param processor: called with each parsed row; the row is written out when it returns true
        """
        for filedir, _, filenames in os.walk(self._indir):
            for filename in filenames:
                if not filtr(filename):
                    continue
                source_path = os.path.join(filedir, filename)
                print("Processing %s" % source_path)
                with open(source_path) as source:
                    reader = csv.DictReader(source, delimiter="\t", quoting=csv.QUOTE_NONE)
                    with self._create_writer(filedir, filename, reader) as writer:
                        # filter() is lazy, so rows are still tested and written one at a time.
                        for accepted in filter(processor, reader):
                            writer.writerow(accepted)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_write_arg_valid(self):
        # PyPy gets a TypeError instead of a csv.Error for "not a sequence"
        self.assertRaises((csv.Error, TypeError), self._write_test, None, '')
        self._write_test((), '')
        self._write_test([None], '""')
        self.assertRaises(csv.Error, self._write_test,
                          [None], None, quoting = csv.QUOTE_NONE)
        # Check that exceptions are passed up the chain
        class BadList:
            def __len__(self):
                return 10;
            def __getitem__(self, i):
                if i > 2:
                    raise IOError
        self.assertRaises(IOError, self._write_test, BadList(), '')
        class BadItem:
            def __str__(self):
                raise IOError
        self.assertRaises(IOError, self._write_test, [BadItem()], '')
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_write_escape(self):
        self._write_test(['a',1,'p,q'], 'a,1,"p,q"',
                         escapechar='\\')
        self.assertRaises(csv.Error,
                          self._write_test,
                          ['a',1,'p,"q"'], 'a,1,"p,\\"q\\""',
                          escapechar=None, doublequote=False)
        self._write_test(['a',1,'p,"q"'], 'a,1,"p,\\"q\\""',
                         escapechar='\\', doublequote = False)
        self._write_test(['"'], '""""',
                         escapechar='\\', quoting = csv.QUOTE_MINIMAL)
        self._write_test(['"'], '\\"',
                         escapechar='\\', quoting = csv.QUOTE_MINIMAL,
                         doublequote = False)
        self._write_test(['"'], '\\"',
                         escapechar='\\', quoting = csv.QUOTE_NONE)
        self._write_test(['a',1,'p,q'], 'a,1,p\\,q',
                         escapechar='\\', quoting = csv.QUOTE_NONE)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_quoting(self):
        class mydialect(csv.Dialect):
            delimiter = ";"
            escapechar = '\\'
            doublequote = False
            skipinitialspace = True
            lineterminator = '\r\n'
            quoting = csv.QUOTE_NONE
        d = mydialect()

        mydialect.quoting = None
        self.assertRaises(csv.Error, mydialect)

        mydialect.doublequote = True
        mydialect.quoting = csv.QUOTE_ALL
        mydialect.quotechar = '"'
        d = mydialect()

        mydialect.quotechar = "''"
        self.assertRaises(csv.Error, mydialect)

        mydialect.quotechar = 4
        self.assertRaises(csv.Error, mydialect)
项目:landscape-client    作者:CanonicalLtd    | 项目源码 | 文件源码
def get_group_data(self):
        """
        Parse group(5) formatted files and return tuples of group data in the
        form (groupname, group password, group id and a list of member
        usernames).

        Entries whose name starts with "+" or "-" (NIS markers) are skipped.
        Lines that cannot be converted are logged as warnings and skipped.
        """
        group_data = []
        # The context manager closes the file even when parsing raises.
        with open(self._group_file, "r") as group_file:
            reader = csv.DictReader(group_file, fieldnames=self.group_fields,
                                    delimiter=":", quoting=csv.QUOTE_NONE)
            for current_line, row in enumerate(reader, 1):
                # Skip if we find the NIS marker
                if row["name"].startswith(("+", "-")):
                    continue
                try:
                    group_data.append((row["name"], row["passwd"],
                                       int(row["gid"]),
                                       row["members"].split(",")))
                except (AttributeError, TypeError, ValueError):
                    # Missing fields are None: int(None) raises TypeError and
                    # None.split() AttributeError; a non-numeric gid raises
                    # ValueError.
                    # logging.warning replaces the deprecated logging.warn,
                    # which newer Python versions no longer provide.
                    logging.warning("group file %s is incorrectly formatted: "
                                    "line %d." % (self._group_file,
                                                  current_line))
        return group_data
项目:nist_mni_pipelines    作者:vfonov    | 项目源码 | 文件源码
def generate_nonlinear_model_csv(input_csv, model=None, mask=None, work_prefix=None, options={},skip=0,stop_early=100000):
    """Run ``generate_nonlinear_average`` on the datasets listed in a CSV file.

    Each row of ``input_csv`` gives a scan in its first column and the
    matching mask in its second.  When ``model`` is given, it and ``mask``
    form the initial model dataset.  ``work_prefix`` is created if it does
    not exist yet.  ``options``, ``skip`` and ``stop_early`` are forwarded
    untouched, and the result of the averaging call is returned.
    """
    with open(input_csv, 'r') as listing:
        samples = [
            MriDataset(scan=fields[0], mask=fields[1])
            for fields in csv.reader(listing, delimiter=',', quoting=csv.QUOTE_NONE)
        ]

    initial_model = MriDataset(scan=model, mask=mask) if model is not None else None

    needs_workdir = work_prefix is not None and not os.path.exists(work_prefix)
    if needs_workdir:
        os.makedirs(work_prefix)

    return generate_nonlinear_average(samples, initial_model,
                                      prefix=work_prefix, options=options,
                                      skip=skip, stop_early=stop_early)
项目:nist_mni_pipelines    作者:vfonov    | 项目源码 | 文件源码
def generate_linear_model_csv(input_csv,model=None,mask=None,work_prefix=None,options={}):
    """Run ``generate_linear_average`` on the datasets listed in a CSV file.

    Each row of ``input_csv`` gives a scan in its first column and the
    matching mask in its second.  When ``model`` is given, it and ``mask``
    form the initial model dataset.  ``work_prefix`` is created if it does
    not exist yet.  ``options`` is forwarded untouched, and the result of
    the averaging call is returned.
    """
    with open(input_csv, 'r') as listing:
        samples = [
            MriDataset(scan=fields[0], mask=fields[1])
            for fields in csv.reader(listing, delimiter=',', quoting=csv.QUOTE_NONE)
        ]

    initial_model = MriDataset(scan=model, mask=mask) if model is not None else None

    needs_workdir = work_prefix is not None and not os.path.exists(work_prefix)
    if needs_workdir:
        os.makedirs(work_prefix)

    return generate_linear_average(samples, initial_model,
                                   prefix=work_prefix, options=options)

# kate: space-indent on; indent-width 4; indent-mode python;replace-tabs on;word-wrap-column 80;show-tabs on
项目:nist_mni_pipelines    作者:vfonov    | 项目源码 | 文件源码
def generate_ldd_model_csv(input_csv,model=None,mask=None,work_prefix=None,options={}):
    """Run ``generate_ldd_average`` on the datasets listed in a CSV file.

    Each row of ``input_csv`` gives a scan in its first column and the
    matching mask in its second.  When ``model`` is given, it and ``mask``
    form the initial model dataset.  ``work_prefix`` is created if it does
    not exist yet.  ``options`` is forwarded untouched, and the result of
    the averaging call is returned.
    """
    with open(input_csv, 'r') as listing:
        samples = [
            MriDataset(scan=fields[0], mask=fields[1])
            for fields in csv.reader(listing, delimiter=',', quoting=csv.QUOTE_NONE)
        ]

    initial_model = MriDataset(scan=model, mask=mask) if model is not None else None

    needs_workdir = work_prefix is not None and not os.path.exists(work_prefix)
    if needs_workdir:
        os.makedirs(work_prefix)

    return generate_ldd_average(samples, initial_model,
                                prefix=work_prefix, options=options)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_write_arg_valid(self):
        self._write_error_test(csv.Error, None)
        self._write_test((), '')
        self._write_test([None], '""')
        self._write_error_test(csv.Error, [None], quoting = csv.QUOTE_NONE)
        # Check that exceptions are passed up the chain
        class BadList:
            def __len__(self):
                return 10;
            def __getitem__(self, i):
                if i > 2:
                    raise IOError
        self._write_error_test(IOError, BadList())
        class BadItem:
            def __str__(self):
                raise IOError
        self._write_error_test(IOError, [BadItem()])
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_write_escape(self):
        self._write_test(['a',1,'p,q'], 'a,1,"p,q"',
                         escapechar='\\')
        self._write_error_test(csv.Error, ['a',1,'p,"q"'],
                               escapechar=None, doublequote=False)
        self._write_test(['a',1,'p,"q"'], 'a,1,"p,\\"q\\""',
                         escapechar='\\', doublequote = False)
        self._write_test(['"'], '""""',
                         escapechar='\\', quoting = csv.QUOTE_MINIMAL)
        self._write_test(['"'], '\\"',
                         escapechar='\\', quoting = csv.QUOTE_MINIMAL,
                         doublequote = False)
        self._write_test(['"'], '\\"',
                         escapechar='\\', quoting = csv.QUOTE_NONE)
        self._write_test(['a',1,'p,q'], 'a,1,p\\,q',
                         escapechar='\\', quoting = csv.QUOTE_NONE)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_write_arg_valid(self):
        self._write_error_test(csv.Error, None)
        self._write_test((), '')
        self._write_test([None], '""')
        self._write_error_test(csv.Error, [None], quoting = csv.QUOTE_NONE)
        # Check that exceptions are passed up the chain
        class BadList:
            def __len__(self):
                return 10;
            def __getitem__(self, i):
                if i > 2:
                    raise IOError
        self._write_error_test(IOError, BadList())
        class BadItem:
            def __str__(self):
                raise IOError
        self._write_error_test(IOError, [BadItem()])
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_write_escape(self):
        self._write_test(['a',1,'p,q'], 'a,1,"p,q"',
                         escapechar='\\')
        self._write_error_test(csv.Error, ['a',1,'p,"q"'],
                               escapechar=None, doublequote=False)
        self._write_test(['a',1,'p,"q"'], 'a,1,"p,\\"q\\""',
                         escapechar='\\', doublequote = False)
        self._write_test(['"'], '""""',
                         escapechar='\\', quoting = csv.QUOTE_MINIMAL)
        self._write_test(['"'], '\\"',
                         escapechar='\\', quoting = csv.QUOTE_MINIMAL,
                         doublequote = False)
        self._write_test(['"'], '\\"',
                         escapechar='\\', quoting = csv.QUOTE_NONE)
        self._write_test(['a',1,'p,q'], 'a,1,p\\,q',
                         escapechar='\\', quoting = csv.QUOTE_NONE)
项目:autoinjection    作者:ChengWiLL    | 项目源码 | 文件源码
def parse(self):
        """Parse the CSV profile in ``self.stream`` and return ``self.profile``.

        The first row is handed to ``parse_header`` and every following row
        to ``parse_row``; afterwards the derived figures are computed on
        ``self.profile``.

        Raises StopIteration when the stream contains no rows at all.
        """
        import csv
        reader = csv.reader(
            self.stream,
            delimiter = ',',
            quotechar = None,
            escapechar = None,
            doublequote = False,
            skipinitialspace = True,
            lineterminator = '\r\n',
            quoting = csv.QUOTE_NONE)
        # Built-in next() works on Python 2.6+ and on Python 3, where the
        # reader object no longer has a .next() method.
        header = next(reader)
        self.parse_header(header)
        for row in reader:
            self.parse_row(row)

        # compute derived data
        self.profile.validate()
        self.profile.find_cycles()
        self.profile.ratio(TIME_RATIO, SAMPLES)
        self.profile.call_ratios(SAMPLES2)
        self.profile.integrate(TOTAL_TIME_RATIO, TIME_RATIO)

        return self.profile
项目:semspaces    作者:pmandera    | 项目源码 | 文件源码
def read_vectors(fin, dtype='float64', delim=' '):
        """Parse delimited embedding text into a list of (word, word_vector) tuples.

        If the first row has exactly two fields it is treated as a header
        whose second field is the vector length and is not stored.  Otherwise
        the vector length is the first row's field count minus one.  Fields
        beyond that length are ignored on every row.
        """
        pairs = []
        width = None

        for fields in csv.reader(fin, delimiter=delim, quoting=csv.QUOTE_NONE):
            if width is None:
                if len(fields) == 2:
                    width = int(fields[1])
                    continue
                width = len(fields) - 1

            # Undecodable bytes in the word are replaced rather than raising.
            token = unicode(fields[0], 'utf-8', errors='replace')
            components = [float(text) for text in fields[1: width + 1]]
            pairs.append((token, np.fromiter(components, dtype=dtype, count=width)))

        return pairs
项目:ISR    作者:bioliyezhang    | 项目源码 | 文件源码
def reader_gen(self,FILEPATH=os.getcwd()):
            """Return a csv reader over the data file with its header rows already consumed.

            ``self.filename`` is used as is when it contains a '/', otherwise
            it is looked up below FILEPATH.  When ``self.AUTOSKIP_HEADER`` is
            True, the leading rows whose first field starts with "@" are
            counted and the count replaces ``self.SKIP_HEADER``.  A fresh
            reader is then advanced by ``self.SKIP_HEADER`` rows and returned.
            """
            # NOTE(review): the FILEPATH default is evaluated once, when the
            # function is defined, not on every call.
            if '/' in self.filename:
                complete_path = self.filename
            else:
                complete_path = FILEPATH + '/' + self.filename

            def open_reader():
                # NOTE(review): the file object is never closed explicitly.
                return csv.reader(open(complete_path,'rU'),delimiter=self.SEP_CHAR,quoting=csv.QUOTE_NONE)

            probe = open_reader()
            if self.AUTOSKIP_HEADER==True:
                ## this will over-write provided default
                header_rows = 0
                while probe.next()[0][0]=="@":
                    header_rows += 1
                self.SKIP_HEADER = header_rows

            reader = open_reader()
            for _ in range(self.SKIP_HEADER):
                reader.next()
            return reader
项目:ISR    作者:bioliyezhang    | 项目源码 | 文件源码
def ref_dict_gen(self,FILEPATH=os.getcwd()):
            """Collect the "@SQ" header rows of the file into a dict.

            Rows are read from the top for as long as their first field starts
            with "@".  For each row whose first field continues with "SQ", the
            second field minus its first three characters becomes a key and
            the third field minus its first three characters, converted to
            int, its value.
            """
            # NOTE(review): the FILEPATH default is evaluated once, when the
            # function is defined, and the opened file is never closed.
            if '/' in self.filename:
                complete_path = self.filename
            else:
                complete_path = FILEPATH + '/' + self.filename

            reader = csv.reader(open(complete_path,'rU'),delimiter=self.SEP_CHAR,quoting=csv.QUOTE_NONE)
            ref_dict = {}
            row = reader.next()
            while row[0][0]=="@":
                if row[0][1:3]=="SQ":
                    ref_name = row[1][3:]
                    ref_dict[ref_name] = int(row[2][3:])
                row = reader.next()
            return ref_dict
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_write_arg_valid(self):
        self.assertRaises(csv.Error, self._write_test, None, '')
        self._write_test((), '')
        self._write_test([None], '""')
        self.assertRaises(csv.Error, self._write_test,
                          [None], None, quoting = csv.QUOTE_NONE)
        # Check that exceptions are passed up the chain
        class BadList:
            def __len__(self):
                return 10;
            def __getitem__(self, i):
                if i > 2:
                    raise IOError
        self.assertRaises(IOError, self._write_test, BadList(), '')
        class BadItem:
            def __str__(self):
                raise IOError
        self.assertRaises(IOError, self._write_test, [BadItem()], '')
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_write_escape(self):
        self._write_test(['a',1,'p,q'], 'a,1,"p,q"',
                         escapechar='\\')
        self.assertRaises(csv.Error,
                          self._write_test,
                          ['a',1,'p,"q"'], 'a,1,"p,\\"q\\""',
                          escapechar=None, doublequote=False)
        self._write_test(['a',1,'p,"q"'], 'a,1,"p,\\"q\\""',
                         escapechar='\\', doublequote = False)
        self._write_test(['"'], '""""',
                         escapechar='\\', quoting = csv.QUOTE_MINIMAL)
        self._write_test(['"'], '\\"',
                         escapechar='\\', quoting = csv.QUOTE_MINIMAL,
                         doublequote = False)
        self._write_test(['"'], '\\"',
                         escapechar='\\', quoting = csv.QUOTE_NONE)
        self._write_test(['a',1,'p,q'], 'a,1,p\\,q',
                         escapechar='\\', quoting = csv.QUOTE_NONE)
项目:leaf-classification    作者:MWransky    | 项目源码 | 文件源码
def write_results_to_file(species, ids, probs):
    """Write the per-sample results to ``results/results.csv``.

    The first line is ``id`` followed by the comma-joined ``species`` names.
    Each following line is the integer id of one sample followed by the text
    that ``convert_list_of_ints_to_string`` produces for its row of ``probs``.
    The ``results`` directory is created when it does not exist.
    """
    print(probs.shape)
    # Make a path for our results to be saved to
    out_dir = 'results'
    if not os.path.exists(out_dir):
        os.makedirs(out_dir)
    print('Writing results to file')
    with open('results/results.csv', 'w') as handle:
        # Every line is handed to the writer as a single, pre-joined field.
        out = csv.writer(handle, quoting=csv.QUOTE_NONE, delimiter=' ', escapechar=' ')
        out.writerow(['id,' + ','.join(species)])
        for index in range(ids.shape[0]):
            tail = convert_list_of_ints_to_string(probs[index])
            out.writerow(['{}'.format(str(int(ids[index]))) + tail])
    print('Successfully wrote results to file')
项目:recon-ng    作者:Hehe-Zhc    | 项目源码 | 文件源码
def __parse_file(self):
        filename = self.options['filename']
        if not filename:
            raise IOError
        sep = self.options['column_separator']
        quote = self.options['quote_character']
        values = []

        with open(filename, 'rU') as infile:
            # if sep is not a one character string, csv.reader will raise a TypeError
            if not quote:
                csvreader = csv.reader(infile, delimiter=str(sep), quoting=csv.QUOTE_NONE)
            else:
                csvreader = csv.reader(infile, delimiter=str(sep), quotechar=str(quote))

            # get each line from the file and separate it into columns based on sep
            for row in csvreader:
                # append all lines as-is case-wise
                # unicode(str, errors='ignore') causes all invalid characters to be stripped out
                values.append([unicode(value.strip(), errors='ignore') for value in row])
                # ensure the number of columns in each row is the same as the previous row
                if len(values) > 1:
                    assert len(values[-1]) == len(values[-2])

        return values
项目:Eagle    作者:magerx    | 项目源码 | 文件源码
def parse(self):
        """Parse the CSV profile in ``self.stream`` and return ``self.profile``.

        The first row is handed to ``parse_header`` and every following row
        to ``parse_row``; afterwards the derived figures are computed on
        ``self.profile``.

        Raises StopIteration when the stream contains no rows at all.
        """
        import csv
        reader = csv.reader(
            self.stream,
            delimiter = ',',
            quotechar = None,
            escapechar = None,
            doublequote = False,
            skipinitialspace = True,
            lineterminator = '\r\n',
            quoting = csv.QUOTE_NONE)
        # Built-in next() works on Python 2.6+ and on Python 3, where the
        # reader object no longer has a .next() method.
        header = next(reader)
        self.parse_header(header)
        for row in reader:
            self.parse_row(row)

        # compute derived data
        self.profile.validate()
        self.profile.find_cycles()
        self.profile.ratio(TIME_RATIO, SAMPLES)
        self.profile.call_ratios(SAMPLES2)
        self.profile.integrate(TOTAL_TIME_RATIO, TIME_RATIO)

        return self.profile
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_write_arg_valid(self):
        # PyPy gets a TypeError instead of a csv.Error for "not a sequence"
        self.assertRaises((csv.Error, TypeError), self._write_test, None, '')
        self._write_test((), '')
        self._write_test([None], '""')
        self.assertRaises(csv.Error, self._write_test,
                          [None], None, quoting = csv.QUOTE_NONE)
        # Check that exceptions are passed up the chain
        class BadList:
            def __len__(self):
                return 10;
            def __getitem__(self, i):
                if i > 2:
                    raise IOError
        self.assertRaises(IOError, self._write_test, BadList(), '')
        class BadItem:
            def __str__(self):
                raise IOError
        self.assertRaises(IOError, self._write_test, [BadItem()], '')
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_write_escape(self):
        self._write_test(['a',1,'p,q'], 'a,1,"p,q"',
                         escapechar='\\')
        self.assertRaises(csv.Error,
                          self._write_test,
                          ['a',1,'p,"q"'], 'a,1,"p,\\"q\\""',
                          escapechar=None, doublequote=False)
        self._write_test(['a',1,'p,"q"'], 'a,1,"p,\\"q\\""',
                         escapechar='\\', doublequote = False)
        self._write_test(['"'], '""""',
                         escapechar='\\', quoting = csv.QUOTE_MINIMAL)
        self._write_test(['"'], '\\"',
                         escapechar='\\', quoting = csv.QUOTE_MINIMAL,
                         doublequote = False)
        self._write_test(['"'], '\\"',
                         escapechar='\\', quoting = csv.QUOTE_NONE)
        self._write_test(['a',1,'p,q'], 'a,1,p\\,q',
                         escapechar='\\', quoting = csv.QUOTE_NONE)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_write_arg_valid(self):
        self._write_error_test(csv.Error, None)
        self._write_test((), '')
        self._write_test([None], '""')
        self._write_error_test(csv.Error, [None], quoting = csv.QUOTE_NONE)
        # Check that exceptions are passed up the chain
        class BadList:
            def __len__(self):
                return 10;
            def __getitem__(self, i):
                if i > 2:
                    raise OSError
        self._write_error_test(OSError, BadList())
        class BadItem:
            def __str__(self):
                raise OSError
        self._write_error_test(OSError, [BadItem()])
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_write_escape(self):
        self._write_test(['a',1,'p,q'], 'a,1,"p,q"',
                         escapechar='\\')
        self._write_error_test(csv.Error, ['a',1,'p,"q"'],
                               escapechar=None, doublequote=False)
        self._write_test(['a',1,'p,"q"'], 'a,1,"p,\\"q\\""',
                         escapechar='\\', doublequote = False)
        self._write_test(['"'], '""""',
                         escapechar='\\', quoting = csv.QUOTE_MINIMAL)
        self._write_test(['"'], '\\"',
                         escapechar='\\', quoting = csv.QUOTE_MINIMAL,
                         doublequote = False)
        self._write_test(['"'], '\\"',
                         escapechar='\\', quoting = csv.QUOTE_NONE)
        self._write_test(['a',1,'p,q'], 'a,1,p\\,q',
                         escapechar='\\', quoting = csv.QUOTE_NONE)
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def test_write_arg_valid(self):
        self.assertRaises(csv.Error, self._write_test, None, '')
        self._write_test((), '')
        self._write_test([None], '""')
        self.assertRaises(csv.Error, self._write_test,
                          [None], None, quoting = csv.QUOTE_NONE)
        # Check that exceptions are passed up the chain
        class BadList:
            def __len__(self):
                return 10;
            def __getitem__(self, i):
                if i > 2:
                    raise IOError
        self.assertRaises(IOError, self._write_test, BadList(), '')
        class BadItem:
            def __str__(self):
                raise IOError
        self.assertRaises(IOError, self._write_test, [BadItem()], '')
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def test_write_escape(self):
        self._write_test(['a',1,'p,q'], 'a,1,"p,q"',
                         escapechar='\\')
        self.assertRaises(csv.Error,
                          self._write_test,
                          ['a',1,'p,"q"'], 'a,1,"p,\\"q\\""',
                          escapechar=None, doublequote=False)
        self._write_test(['a',1,'p,"q"'], 'a,1,"p,\\"q\\""',
                         escapechar='\\', doublequote = False)
        self._write_test(['"'], '""""',
                         escapechar='\\', quoting = csv.QUOTE_MINIMAL)
        self._write_test(['"'], '\\"',
                         escapechar='\\', quoting = csv.QUOTE_MINIMAL,
                         doublequote = False)
        self._write_test(['"'], '\\"',
                         escapechar='\\', quoting = csv.QUOTE_NONE)
        self._write_test(['a',1,'p,q'], 'a,1,p\\,q',
                         escapechar='\\', quoting = csv.QUOTE_NONE)
项目:Helix    作者:3lackrush    | 项目源码 | 文件源码
def parse(self):
        """Parse the CSV profile in ``self.stream`` and return ``self.profile``.

        The first row is handed to ``parse_header`` and every following row
        to ``parse_row``; afterwards the derived figures are computed on
        ``self.profile``.

        Raises StopIteration when the stream contains no rows at all.
        """
        import csv
        reader = csv.reader(
            self.stream,
            delimiter = ',',
            quotechar = None,
            escapechar = None,
            doublequote = False,
            skipinitialspace = True,
            lineterminator = '\r\n',
            quoting = csv.QUOTE_NONE)
        # Built-in next() works on Python 2.6+ and on Python 3, where the
        # reader object no longer has a .next() method.
        header = next(reader)
        self.parse_header(header)
        for row in reader:
            self.parse_row(row)

        # compute derived data
        self.profile.validate()
        self.profile.find_cycles()
        self.profile.ratio(TIME_RATIO, SAMPLES)
        self.profile.call_ratios(SAMPLES2)
        self.profile.integrate(TOTAL_TIME_RATIO, TIME_RATIO)

        return self.profile
项目:rnn-sentiment-analysis    作者:kashizui    | 项目源码 | 文件源码
def glove2dict(src_filename):
    """GloVe Reader.

    Parameters
    ----------
    src_filename : str
        Full path to the GloVe file to be processed.  Each non-blank line
        holds a word followed by its space-separated vector components.

    Returns
    -------
    dict
        Mapping words to their GloVe vectors.

    """
    # The context manager closes the file as soon as the mapping is built.
    with open(src_filename) as src:
        reader = csv.reader(src, delimiter=' ', quoting=csv.QUOTE_NONE)
        # Blank lines come back as empty rows; skip them instead of failing
        # on line[0].
        return {line[0]: np.array(list(map(float, line[1:])))
                for line in reader if line}
项目:lstm-crf-ner    作者:qfzxhy    | 项目源码 | 文件源码
def buildMap(train_path):
    """Build the word and label lookup tables from a tab-separated training file.

    The distinct non-null words and labels are numbered from 1.  Id 0 is
    "<PAD>" and the id after the last entry is "<NEW>" in every table.  The
    two id-to-text tables are passed to ``saveMap``.

    Returns ``(word2id, id2word, label2id, id2label)``.
    """
    df_train = pd.read_csv(train_path, delimiter='\t', quoting=csv.QUOTE_NONE,
                           skip_blank_lines=False, header=None,
                           names=['word', 'label'])

    def index_column(column):
        # Number the distinct non-null values, then add the two markers.
        tokens = list(set(column[column.notnull()]))
        token2id = {}
        id2token = {}
        for number, token in enumerate(tokens, 1):
            token2id[token] = number
            id2token[number] = token
        for marker, number in (("<PAD>", 0), ("<NEW>", len(tokens) + 1)):
            id2token[number] = marker
            token2id[marker] = number
        return token2id, id2token

    word2id, id2word = index_column(df_train['word'])
    label2id, id2label = index_column(df_train['label'])
    saveMap(id2word, id2label)
    return word2id, id2word, label2id, id2label
项目:lstm-crf-ner    作者:qfzxhy    | 项目源码 | 文件源码
def getTestData(test_path,seq_max_len,is_validation = True):
    """Read a tab-separated word/label file and convert it with ``prepare``.

    The lookup tables are loaded through ``loadMap`` from ``data/word2id``
    and ``data/label2id``. Cells whose string form equals ``str(np.nan)``
    are mapped to -1; words missing from ``word2id`` get the ``'<NEW>'`` id.

    Returns ``(X_test, y_test)`` when ``is_validation`` is true, otherwise
    ``(X_test, X_test_str, X_test_label_str)`` where the last two come from
    ``prepare`` called on the raw word/label columns with ``is_padding=False``.
    """
    word2id, id2word = loadMap('data/word2id')
    label2id, id2label = loadMap('data/label2id')
    frame = pd.read_csv(test_path, delimiter='\t', skip_blank_lines=False,
                        header=None, quoting=csv.QUOTE_NONE,
                        names=['word', 'label'])
    missing_repr = str(np.nan)

    def is_missing(value):
        # Missing cells are detected through their string form.
        return str(value) == missing_repr

    def word_to_id(token):
        if is_missing(token):
            return -1
        if token in word2id:
            return word2id[token]
        return word2id['<NEW>']

    def label_to_id(tag):
        return -1 if is_missing(tag) else label2id[tag]

    def missing_to_minus_one(value):
        return -1 if is_missing(value) else value

    frame['word_id'] = frame['word'].map(word_to_id)
    frame['label_id'] = frame['label'].map(label_to_id)

    if not is_validation:
        frame['word'] = frame['word'].map(missing_to_minus_one)
        frame['label'] = frame['label'].map(missing_to_minus_one)
        # The word ids are passed as both arguments here; the second result
        # of prepare is discarded.
        X_test, _ = prepare(frame['word_id'], frame['word_id'], seq_max_len)
        X_test_str, X_test_label_str = prepare(
            frame['word'], frame['label'], seq_max_len, is_padding=False)
        return X_test, X_test_str, X_test_label_str

    X_test, y_test = prepare(frame['word_id'], frame['label_id'], seq_max_len)
    return X_test, y_test
项目:Sentiment-Analysis-Twitter    作者:ayushoriginal    | 项目源码 | 文件源码
def newStats2CSV(files, out_file):
    """Collect the numbers found in several text files into one CSV table.

    For every input file, each whitespace-separated token that starts with
    an ASCII digit is converted with ``float`` and appended, in reading
    order, to that file's list. The output has one row per entry of the
    module-level ``stats_tiltes``: the title followed by the value at the
    same position from every input file.
    """
    digits = '0123456789'
    columns = []
    for path in files:
        numbers = []
        with open(path, 'r') as src:
            for line in src:
                numbers.extend(
                    float(token) for token in line.split()
                    if token[0] in digits)
        columns.append(numbers)

    with open(out_file, 'w') as dst:
        # Quoting is left at the csv default; only the quote character changes.
        writer = csv.writer(dst, delimiter=',', quotechar='\'')
        for position, title in enumerate(stats_tiltes):
            writer.writerow([title] + [column[position] for column in columns])
项目:autoscan    作者:b01u    | 项目源码 | 文件源码
def parse(self):
        """Parse ``self.stream`` as unquoted CSV and return ``self.profile``.

        The first row is handed to ``self.parse_header`` and every following
        row to ``self.parse_row``. Afterwards the derived data is computed on
        ``self.profile`` (validate, find_cycles, ratio, call_ratios,
        integrate).

        Raises ``StopIteration`` if the stream yields no rows at all.
        """
        import csv
        reader = csv.reader(
            self.stream,
            delimiter = ',',
            quotechar = None,
            escapechar = None,
            doublequote = False,
            skipinitialspace = True,
            lineterminator = '\r\n',
            quoting = csv.QUOTE_NONE)
        rows = iter(reader)
        # The builtin next() works on Python 2.6+ and Python 3; the
        # reader.next() method only exists on Python 2.
        header = next(rows)
        self.parse_header(header)
        for row in rows:
            self.parse_row(row)

        # compute derived data
        self.profile.validate()
        self.profile.find_cycles()
        self.profile.ratio(TIME_RATIO, SAMPLES)
        self.profile.call_ratios(SAMPLES2)
        self.profile.integrate(TOTAL_TIME_RATIO, TIME_RATIO)

        return self.profile
项目:recon-ng    作者:captainhooligan    | 项目源码 | 文件源码
def __parse_file(self):
        """Read the configured delimited file into a list of unicode rows.

        Uses ``self.options`` keys ``filename``, ``column_separator`` and
        ``quote_character``. Every cell is stripped and converted with
        ``unicode(..., errors='ignore')``.

        Raises ``IOError`` when no filename is set and ``AssertionError``
        when a row has a different number of columns than the row before it.
        """
        path = self.options['filename']
        if not path:
            raise IOError
        separator = self.options['column_separator']
        quote_char = self.options['quote_character']
        rows = []

        with open(path, 'rU') as source:
            # csv.reader raises a TypeError when the separator is not a
            # one-character string
            if quote_char:
                parsed = csv.reader(source, delimiter=str(separator), quotechar=str(quote_char))
            else:
                parsed = csv.reader(source, delimiter=str(separator), quoting=csv.QUOTE_NONE)

            for record in parsed:
                # keep the case as-is; errors='ignore' strips out characters
                # that cannot be decoded
                cleaned = [unicode(cell.strip(), errors='ignore') for cell in record]
                rows.append(cleaned)
                # every row must have as many columns as the one before it
                if len(rows) > 1:
                    assert len(rows[-1]) == len(rows[-2])

        return rows
项目:sequence-labeling    作者:chilynn    | 项目源码 | 文件源码
def buildMap(train_path="train.in"):
    """Build char/label <-> id lookup tables from a tab-separated training file.

    The file is read as two columns (``char``, ``label``) with quoting
    disabled and blank lines kept. Distinct non-null values are numbered
    from 1 and id 0 is assigned to ``"<PAD>"`` in both vocabularies. Only
    the char vocabulary additionally gets a ``"<NEW>"`` entry, at the id
    after the last real char. The id->token tables are passed to ``saveMap``.

    Returns the tuple ``(char2id, id2char, label2id, id2label)``.
    """
    table = pd.read_csv(train_path, delimiter='\t', quoting=csv.QUOTE_NONE,
                        skip_blank_lines=False, header=None,
                        names=["char", "label"])

    def distinct(column):
        # Unique non-null values of one column, in set iteration order.
        series = table[column]
        return list(set(series[series.notnull()]))

    chars = distinct("char")
    labels = distinct("label")

    char2id = {char: idx for idx, char in enumerate(chars, 1)}
    label2id = {label: idx for idx, label in enumerate(labels, 1)}
    id2char = dict(enumerate(chars, 1))
    id2label = dict(enumerate(labels, 1))

    for forward, backward in ((char2id, id2char), (label2id, id2label)):
        backward[0] = "<PAD>"
        forward["<PAD>"] = 0

    unseen_id = len(chars) + 1
    id2char[unseen_id] = "<NEW>"
    char2id["<NEW>"] = unseen_id

    saveMap(id2char, id2label)

    return char2id, id2char, label2id, id2label
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_write_arg_valid(self):
        """Check which row arguments the writer accepts and which it rejects."""
        with self.assertRaises(csv.Error):
            self._write_test(None, '')
        self._write_test((), '')
        self._write_test([None], '""')
        with self.assertRaises(csv.Error):
            self._write_test([None], None, quoting=csv.QUOTE_NONE)

        # Exceptions raised while reading the row must reach the caller.
        class FailingSequence:
            def __len__(self):
                return 10

            def __getitem__(self, index):
                if index > 2:
                    raise OSError

        with self.assertRaises(OSError):
            self._write_test(FailingSequence(), '')

        # Same for exceptions raised while converting a field to text.
        class FailingStr:
            def __str__(self):
                raise OSError

        with self.assertRaises(OSError):
            self._write_test([FailingStr()], '')
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_write_escape(self):
        """Check writer output for combinations of escapechar, doublequote and quoting."""
        self._write_test(['a', 1, 'p,q'], 'a,1,"p,q"', escapechar='\\')

        # With neither an escape character nor quote doubling the embedded
        # quote cannot be written.
        with self.assertRaises(csv.Error):
            self._write_test(['a', 1, 'p,"q"'], 'a,1,"p,\\"q\\""',
                             escapechar=None, doublequote=False)

        # (fields, expected line, format parameters), run in this order.
        cases = (
            (['a', 1, 'p,"q"'], 'a,1,"p,\\"q\\""',
             dict(escapechar='\\', doublequote=False)),
            (['"'], '""""',
             dict(escapechar='\\', quoting=csv.QUOTE_MINIMAL)),
            (['"'], '\\"',
             dict(escapechar='\\', quoting=csv.QUOTE_MINIMAL,
                  doublequote=False)),
            (['"'], '\\"',
             dict(escapechar='\\', quoting=csv.QUOTE_NONE)),
            (['a', 1, 'p,q'], 'a,1,p\\,q',
             dict(escapechar='\\', quoting=csv.QUOTE_NONE)),
        )
        for fields, expected, fmtparams in cases:
            self._write_test(fields, expected, **fmtparams)
项目:BinaryNet.tf    作者:itayhubara    | 项目源码 | 文件源码
def __read_imagenet(path, shuffle=True, save_file = 'imagenet_files.csv'):
    """Build a TensorFlow input pipeline over the JPEG files below ``path``.

    On the first call the list of ``path/*/*.JPEG`` files and their class
    indexes (looked up in ``synset_map`` by the ``n<digits>`` id found in the
    file name) is cached in ``save_file``; later calls read the cache only.

    Parameters:
        path: directory that is globbed for ``*/*.JPEG``.
        shuffle: forwarded to ``tf.train.slice_input_producer``.
        save_file: CSV cache with one ``filename,index`` row per image.

    Returns:
        ``(images, labels)``: the decoded 3-channel image with
        ``(pixel - 127) / 128`` applied, and the label converted to int32.

    Raises:
        ValueError: if no image is found under ``path`` or the cache file
            lists no images.
    """
    if not os.path.exists(save_file):
        def class_index(fn):
            class_id = re.search(r'(n\d+)', fn).group(1)
            return synset_map[class_id]['index']

        file_list = glob.glob(path+'/*/*.JPEG')
        # Fail before creating the cache: an empty cache file would make
        # every later call fail as well.
        if not file_list:
            raise ValueError('no JPEG images found under %s' % path)
        with open(save_file, 'wb') as csv_file:
            wr = csv.writer(csv_file, quoting=csv.QUOTE_NONE)
            for f in file_list:
                wr.writerow([f, class_index(f)])

    with open(save_file, 'rb') as f:
        reader = csv.reader(f)
        file_list = list(reader)
    if not file_list:
        raise ValueError('%s does not list any images' % save_file)
    file_tuple, label_tuple = zip(*file_list)

    # Labels come back from the CSV as strings and are converted to int32
    # on the TensorFlow side.
    filename, labels = tf.train.slice_input_producer([list(file_tuple), list(label_tuple)], shuffle=shuffle)
    images = tf.image.decode_jpeg(tf.read_file(filename), channels=3)
    images = tf.div(tf.add(tf.to_float(images), -127), 128)
    return images, tf.string_to_number(labels, tf.int32)
项目:FSquaDRA2    作者:zyrikby    | 项目源码 | 文件源码
def run(self):
        """Write queued result rows to the output CSV until the exit event is set.

        The file at ``self._out_file_path`` gets a header built from
        ``self._fieldnames``. Items are then taken from ``self._queue`` with a
        5 second timeout; whenever the queue stays empty that long the exit
        event is checked again. Progress is printed every 10 written rows.
        """
        print("Starting thread to write results into the file %s..." % self._out_file_path)
        with open(self._out_file_path, 'wt', buffering=1) as out_handle:
            row_writer = csv.DictWriter(out_handle, self._fieldnames)
            row_writer.writeheader()
            while not self._exit.is_set():
                draining = True
                while draining:
                    try:
                        item = self._queue.get(True, 5)
                    except Queue.Empty:
                        # Nothing arrived within the timeout: go back and
                        # re-check the exit event.
                        draining = False
                    else:
                        row_writer.writerow(item)
                        self._counter += 1
                        self._queue.task_done()
                        if self._counter % 10 == 0:
                            print("Analyzed %d pairs..." % self._counter)

        print("Finishing file writer thread...")
项目:spectra-cluster-py    作者:spectra-cluster    | 项目源码 | 文件源码
def parse_msgfplus(filename, fdr):
    """
    Collect the PSMs of a MSGF+ text result file that pass the FDR cut-off.

    The file is read as tab-separated text with quoting disabled. A row is
    kept unless its ``PepQValue`` is greater than ``fdr``. The spectrum id is
    the integer found after the first 6 characters of ``SpecID``.

    :param filename: Filename of the MSGF+ search result file (only text file supported)
    :param fdr: Target FDR as fractional (ie. 0.01 for 1%)
    :return: A list of PSM objects
    """
    with open(filename, newline="") as handle:
        records = csv.DictReader(handle, delimiter="\t", quoting=csv.QUOTE_NONE)
        # "not ... > fdr" (rather than "<=") keeps the original comparison
        # semantics exactly.
        return [
            Psm(int(record["SpecID"][6:]), record["Peptide"])
            for record in records
            if not float(record['PepQValue']) > fdr
        ]
项目:cellranger    作者:10XGenomics    | 项目源码 | 文件源码
def write_genome_gtf(self, out_gtf_fn):
        """Write the input GTFs of all genomes into one tab-separated GTF file.

        For every ``(genome_prefix, in_gtf_fn)`` pair the rows yielded by
        ``self.gtf_reader_iter`` are copied to ``out_gtf_fn``. Comment rows
        are written unchanged. When more than one genome is configured, the
        chromosome (column 0) and the ``transcript_id``, ``gene_id`` and
        ``gene_name`` properties are prefixed with ``<genome_prefix>_``.
        Column 8 is rebuilt with ``self.format_properties_dict``.

        A transcript that shows up again on a different chromosome than its
        first occurrence is dropped; such transcripts are reported in a
        warning after each genome.
        """
        with open(out_gtf_fn, 'wb') as f:
            writer = csv.writer(f, delimiter='\t', quoting=csv.QUOTE_NONE, quotechar='')
            for genome_prefix, in_gtf_fn in itertools.izip(self.genome_prefixes, self.in_gtf_fns):
                if len(self.genomes) > 1:
                    prefix_func = lambda s: '%s_%s' % (genome_prefix, s)
                else:
                    prefix_func = lambda s: s

                transcript_to_chrom = {}
                cross_chrom_transcripts = set()
                for row, is_comment, properties in self.gtf_reader_iter(in_gtf_fn):
                    if is_comment:
                        writer.writerow(row)
                        continue

                    chrom = prefix_func(row[0])
                    row[0] = chrom

                    if 'transcript_id' in properties:
                        properties['transcript_id'] = prefix_func(properties['transcript_id'])
                        curr_tx = properties['transcript_id']
                        if curr_tx in transcript_to_chrom and transcript_to_chrom[curr_tx] != chrom:
                            # ignore recurrences of a transcript on different chromosomes - it will break the STAR index
                            cross_chrom_transcripts.add(curr_tx)
                            continue
                        transcript_to_chrom[curr_tx] = chrom
                    if 'gene_id' in properties:
                        properties['gene_id'] = prefix_func(properties['gene_id'])
                    if 'gene_name' in properties:
                        properties['gene_name'] = prefix_func(properties['gene_name'])

                    row[8] = self.format_properties_dict(properties)

                    writer.writerow(row)

                # Only warn when something was actually dropped; the warning
                # used to be printed for every genome, even with an empty list.
                if cross_chrom_transcripts:
                    print("WARNING: The following transcripts appear on multiple chromosomes in the GTF:")
                    print('\n'.join(list(cross_chrom_transcripts)) + '\n')
                    print("This can indicate a problem with the reference or annotations. Only the first chromosome will be counted.")
项目:esys-pbi    作者:fsxfreak    | 项目源码 | 文件源码
def parse(self):
        """Parse ``self.stream`` as unquoted CSV and return ``self.profile``.

        The first row (if any) goes to ``self.parse_header``; every other row
        goes to ``self.parse_row``. The derived data is then computed on
        ``self.profile``.
        """
        import csv
        rows = iter(csv.reader(
            self.stream,
            delimiter = ',',
            quotechar = None,
            escapechar = None,
            doublequote = False,
            skipinitialspace = True,
            lineterminator = '\r\n',
            quoting = csv.QUOTE_NONE))

        # Rows are lists, so None can only mean "the stream had no rows".
        first_row = next(rows, None)
        if first_row is not None:
            self.parse_header(first_row)
        for data_row in rows:
            self.parse_row(data_row)

        # compute derived data
        profile = self.profile
        profile.validate()
        profile.find_cycles()
        profile.ratio(TIME_RATIO, SAMPLES)
        profile.call_ratios(SAMPLES2)
        profile.integrate(TOTAL_TIME_RATIO, TIME_RATIO)

        return self.profile
项目:geomdn    作者:afshinrahimi    | 项目源码 | 文件源码
def load_data(self):
        """Load the train/dev/test user tables from ``self.data_home``.

        Each gzip file is read as tab-separated text without quoting into the
        columns ``user``, ``lat``, ``lon`` and ``text``. Rows with missing
        values are dropped, user names are lower-cased, only the last row per
        user is kept, and the frame is indexed and sorted by user. The
        results are stored in ``self.df_train``, ``self.df_dev`` and
        ``self.df_test``.
        """
        logging.info('loading the dataset from %s' %self.data_home)
        split_files = (
            ('train', 'user_info.train.gz'),
            ('dev', 'user_info.dev.gz'),
            ('test', 'user_info.test.gz'),
        )

        # Read all three files first, then clean them up one by one.
        frames = {}
        for split, file_name in split_files:
            frames[split] = pd.read_csv(
                os.path.join(self.data_home, file_name),
                delimiter='\t', encoding=self.encoding,
                names=['user', 'lat', 'lon', 'text'],
                quoting=csv.QUOTE_NONE, error_bad_lines=False)

        for split, _ in split_files:
            frames[split].dropna(inplace=True)

        for split, _ in split_files:
            frame = frames[split]
            frame['user'] = frame['user'].apply(lambda x: str(x).lower())
            frame.drop_duplicates(['user'], inplace=True, keep='last')
            frame.set_index(['user'], drop=True, append=False, inplace=True)
            frame.sort_index(inplace=True)

        self.df_train = frames['train']
        self.df_dev = frames['dev']
        self.df_test = frames['test']
项目:office-interoperability-tools    作者:milossramek    | 项目源码 | 文件源码
def loadCSV(csvfile):
        """Load a comma-separated results table.

        Layout as read by this function:
          * line 1: the non-empty cells after the first one are the
            application names;
          * line 2: cells 1..5 are the labels of the 5 error measures;
          * every later line: cell 0 is the key of the row, followed by one
            group of 5 cells per application.

        Parameters:
            csvfile: path of the CSV file.

        Returns:
            ``(apps, labels, values)`` where ``apps`` and ``labels`` are lists
            of strings and ``values`` maps each row key to a dict
            ``{app: [5 cells as strings]}``. ``apps`` and ``labels`` are empty
            lists when the file does not contain the corresponding line.
        """
        import sys

        vcnt = 5  # we expect 5 error measures
        # Defaults so that an empty or one-line file returns empty results
        # instead of failing on unbound names.
        apps = []
        labels = []
        values = {}
        # The csv module wants a binary file on Python 2 and a text file
        # opened with newline='' on Python 3.
        if sys.version_info[0] < 3:
                stream = open(csvfile, 'rb')
        else:
                stream = open(csvfile, 'r', newline='')
        with stream:
                reader = csv.reader(stream, delimiter=',', quoting=csv.QUOTE_NONE)
                for row in reader:
                        if reader.line_num == 1:
                            # the first non-empty cell is not an application name
                            apps = [cell for cell in row if cell != ''][1:]
                        elif reader.line_num == 2:
                            labels = row[1:vcnt+1]
                        else:
                            values[row[0]] = dict(
                                (app, row[1+vcnt*i: 1+vcnt*(i+1)])
                                for i, app in enumerate(apps))
        return apps, labels, values