Python csv 模块,reader() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用csv.reader()

项目:pubchem-ranker    作者:jacobwindsor    | 项目源码 | 文件源码
def harvest(self, limit=None, offset=None):
        """

        Harvest the data from the file
        :param offset: Integer offset for the row - starts from 0
        :param limit: Interger limit of the rows to iterate over - starts from 0
        :return: list of tuples containing CAS number and IUPAC name
        """
        response = []
        for i, row in enumerate(list(self.reader)[offset:]):
            if limit:
                if i == limit:
                    break

            cas = row[0].split(' ', 1)[0]
            cut_start_iupac = str(row[0].split('(', 1)[1])
            iupac = cut_start_iupac.rsplit(')', 1)[0]

            response.append({
                "CAS": cas,
                "IUPAC": iupac
            })

        return response
项目:AFSCbot    作者:HadManySons    | 项目源码 | 文件源码
def add_afsc(dict, fname):
    """
    Add AFSCs from given filename into the dictionary.

    :param dict: empty dictionary to populate, keyed by base AFSC
    :param fname: CSV file using '#' as delimiter, resolved under CSV_FOLDER
    """
    with open(CSV_FOLDER + fname, newline='') as f:
        for row in csv.reader(f, delimiter='#'):
            base_afsc, job_title = row[0], row[1]
            # Each entry starts with empty shreds and link; filled in later.
            dict[base_afsc] = {
                "base_afsc": base_afsc,
                "job_title": job_title,
                "shreds": {},
                "link": "",
            }
项目:AFSCbot    作者:HadManySons    | 项目源码 | 文件源码
def add_shreds(afsc_dict, fname):
    """
    Add shreds from given filename into the dictionary.

    :param afsc_dict: either enlisted_dict or officer_dict
    :param fname: CSV file read with ',' as delimiter
        NOTE(review): the original docstring claimed '#' as delimiter, but the
        code passes delimiter=',' — confirm which is intended (add_afsc uses '#').
    """
    with open(CSV_FOLDER + fname, newline='') as f:
        reader = csv.reader(f, delimiter=',')
        for row in reader:
            # Expected columns: base AFSC, shred character, shred title.
            base_afsc = row[0]
            shred_char = row[1]
            shred_title = row[2]
            # if shred AFSC not in base afsc, skip it
            try:
                afsc_dict[base_afsc]["shreds"][shred_char] = shred_title
            except KeyError:
                pass
项目:safetyculture-sdk-python    作者:SafetyCulture    | 项目源码 | 文件源码
def process_desired_state(server_state, desired_state):
    """
    Processes the user provided input file and determines any actions that need
    to happen because the server state is different than the desired state.

    Populates the module-level `actions` dict as a side effect.

    :param server_state: Mapping of all users (by email) to their server data,
        including 'groups' and 'user_id'
    :param desired_state: Path to the user-provided CSV
        (columns: email, lastname, firstname, groups)
    :return: None
    """
    with open(desired_state) as csvDataFile:
        csvReader = csv.reader(csvDataFile)
        next(csvReader, None)  # skip the header row
        for email, lastname, firstname, groups in csvReader:
            group_list = []
            if groups != "":
                # Groups are comma-separated; strip stray spaces around names.
                group_list = [group.strip(' ') for group in groups.split(",")]
            if email not in server_state:
                # Unknown user: schedule creation with the requested groups.
                actions[email] = {'action': 'add', 'groups': group_list, 'user_id':'', 'user_data': {'firstname': firstname, 'lastname': lastname, 'email': email, 'reset_password_required': True} }
            else:
                # Existing user: schedule adding only the missing groups.
                # NOTE(review): 'groups' list appears to interleave names and
                # ids; [0::2] presumably selects the names — confirm upstream.
                group_names_server = server_state[email]['groups'][0::2]
                group_names_desired = [group.strip(' ') for group in groups.split(',')]
                group_diff = [i for i in group_names_desired if i not in group_names_server]
                if group_diff != [] and group_diff != ['']:
                    actions[email] = {'action': 'add to group', 'groups': group_diff, 'user_id': server_state[email]['user_id'] }
项目:Causality    作者:vcla    | 项目源码 | 文件源码
def extractRows(fileName):
    """Read results/cvpr_db_results/<fileName>.csv and return the header plus
    the 'causalgrammar', 'origdata' and 'random' rows.

    Columns 1..-3 are kept; numeric cells become ints and blanks become -100.
    """
    path = 'results/cvpr_db_results/' + fileName + '.csv'
    with open(path, 'r') as csvfile:
        for row in csv.reader(csvfile):
            if row[0] == "name":
                nameRow = row[1:-2]
            else:
                retval = [int(x) if x != "" else -100 for x in row[1:-2]]
            if row[0] == "causalgrammar":
                causalRow = retval
            elif row[0] == "origdata":
                origRow = retval
            elif row[0] == "random":
                randomRow = retval
        return {"nameRow": nameRow, "causalRow": causalRow, "origRow": origRow, "randomRow": randomRow}
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def _get_external_data(url):
    """Fetch JSON metadata from *url*; return an empty dict on any failure."""
    data = {}
    try:
        # urlopen might fail if it runs into redirections,
        # because of Python issue #13696. Fixed in locators
        # using a custom redirect handler.
        response = urlopen(url)
        content_type = response.info().get('Content-Type')
        if content_type.startswith('application/json'):
            # Stream-decode the UTF-8 body straight into the JSON parser.
            data = json.load(codecs.getreader('utf-8')(response))
        else:
            logger.debug('Unexpected response for JSON request: %s', content_type)
    except Exception as e:
        logger.exception('Failed to get external data for %s: %s', url, e)
    return data
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def reader(self, stream, context):
        """
        Read lines from a subprocess' output stream and either pass to a progress
        callable (if specified) or write progress information to sys.stderr.
        """
        progress = self.progress
        verbose = self.verbose
        while True:
            s = stream.readline()
            if not s:
                break
            if progress is not None:
                progress(s, context)
            else:
                if not verbose:
                    sys.stderr.write('.')
                else:
                    sys.stderr.write(s.decode('utf-8'))
                sys.stderr.flush()
        stream.close()
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def _get_external_data(url):
    """Fetch JSON metadata from *url*; return an empty dict on any failure."""
    data = {}
    try:
        # urlopen might fail if it runs into redirections,
        # because of Python issue #13696. Fixed in locators
        # using a custom redirect handler.
        response = urlopen(url)
        content_type = response.info().get('Content-Type')
        if content_type.startswith('application/json'):
            # Stream-decode the UTF-8 body straight into the JSON parser.
            data = json.load(codecs.getreader('utf-8')(response))
        else:
            logger.debug('Unexpected response for JSON request: %s', content_type)
    except Exception as e:
        logger.exception('Failed to get external data for %s: %s', url, e)
    return data
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def reader(self, stream, context):
        """
        Read lines from a subprocess' output stream and either pass to a progress
        callable (if specified) or write progress information to sys.stderr.
        """
        progress = self.progress
        verbose = self.verbose
        while True:
            s = stream.readline()
            if not s:
                break
            if progress is not None:
                progress(s, context)
            else:
                if not verbose:
                    sys.stderr.write('.')
                else:
                    sys.stderr.write(s.decode('utf-8'))
                sys.stderr.flush()
        stream.close()
项目:MKLMM    作者:omerwe    | 项目源码 | 文件源码
def loadData(bfile, extractSim, phenoFile, missingPhenotype='-9', loadSNPs=False, standardize=True):
    """
    Load PLINK Bed genotype data, optionally restricted to the SNPs listed in
    extractSim, with optional phenotype loading and standardization.

    :param bfile: PLINK bfile prefix passed to Bed
    :param extractSim: path to a CSV whose first column lists SNP IDs to keep (or None)
    :param phenoFile: phenotype file path passed to loadPheno (or None)
    :param missingPhenotype: missing-phenotype marker forwarded to loadPheno
    :param loadSNPs: if True, read genotypes into memory
    :param standardize: if True (and loadSNPs), standardize the genotypes
    :return: (bed, phe) tuple; phe is None when phenoFile is None
    """
    bed = Bed(bfile)

    if extractSim is not None:
        # 'with' guarantees the file is closed even if parsing raises
        # (the original leaked the handle on error).
        with open(extractSim) as f:
            extractSnpsSet = set(l[0] for l in csv.reader(f))
        keepSnpsInds = [i for i in xrange(bed.sid.shape[0]) if bed.sid[i] in extractSnpsSet]
        bed = bed[:, keepSnpsInds]

    phe = None
    if phenoFile is not None:
        bed, phe = loadPheno(bed, phenoFile, missingPhenotype)

    if loadSNPs:
        bed = bed.read()
        if standardize:
            bed = bed.standardize()

    return bed, phe
项目:GeoInfoMiner    作者:jingge326    | 项目源码 | 文件源码
def prepare_data(imagery_path, train_file_path, split_points):
    """
    Read the imagery and the training-sample CSV, and split the samples into
    features and labels.

    :param imagery_path: path of the tiff image, passed to read_image
    :param train_file_path: path of the training-sample CSV (utf-8)
    :param split_points: (start, end) column indices; columns [start:end] are
        features and column `end` is the label
    :return: (x_s, y_s, image_tuple)
    """
    # Read tiff image
    image_tuple = read_image(imagery_path)

    # Read samples; 'with' closes the file (the original left the handle open).
    original_data_list = []
    with open(train_file_path, encoding='utf-8') as train_file:
        for row in csv.reader(train_file):
            original_data_list.append(row)
    original_data_array = np.array(original_data_list)

    # Split training data into variables and labels
    x_s = original_data_array[:, split_points[0]:split_points[1]]
    y_s = original_data_array[:, split_points[1]]

    return x_s, y_s, image_tuple


# Read image
项目:cellranger    作者:10XGenomics    | 项目源码 | 文件源码
def read_csv_rows(path):
    """
    Extract the rows from the CSV at the specified path.
    Will throw an error if the file doesn't exist.

    :type path: string
    :rtype: list[list[string]]
    """
    # newline='' is the csv-module-recommended way to open CSV files; the old
    # 'rU' mode was deprecated and removed in Python 3.11.
    with open(path, 'r', newline='') as infile:
        reader = csv.reader(infile, delimiter=',')
        rows = [row for row in reader]
        # eliminate trailing cols that have no entries (CSI-215)
        for idx, row in enumerate(rows):
            clip = len(row)
            while clip > 0 and not row[clip - 1]:
                clip -= 1
            if clip < len(row):
                rows[idx] = row[:clip]
    return rows
项目:Renewables_Scenario_Gen_GAN    作者:chennnnnyize    | 项目源码 | 文件源码
def load_solar_data():
    """Load solar labels and measurements from CSV and shape them for training.

    Returns (trX, trY): trX is the transposed measurement matrix reshaped to
    576-wide rows and scaled to [0, 1]; trY is the label array tiled 32 times.
    """
    with open('solar label.csv', 'r') as csvfile:
        labels = np.array(list(csv.reader(csvfile)), dtype=int)
    print(shape(labels))

    with open('solar.csv', 'r') as csvfile:
        data = np.array(list(csv.reader(csvfile)), dtype=float)
    # Keep exactly 104832 rows (the expected number of time steps).
    data = data[:104832, :]
    print(shape(data))
    trX = np.reshape(data.T, (-1, 576))
    print(shape(trX))
    peak = np.ndarray.max(data)
    print("maximum value of solar power", peak)
    trY = np.tile(labels, (32, 1))
    trX = trX / peak
    return trX, trY
项目:chash    作者:luhsra    | 项目源码 | 文件源码
def read_from_csv(filename, column_names):
    """Read a CSV with a header row into per-column lists of floats.

    ``column_names`` is accepted for interface compatibility but unused;
    the number of columns is taken from the header row.
    """
    data = []
    with open(filename) as csv_file:
        rows = list(csv.reader(csv_file))
    if rows:
        # One output list per header column, filled from the data rows.
        data = [[] for _ in rows[0]]
        for row in rows[1:]:
            for colnum, cell in enumerate(row):
                data[colnum].append(float(cell))
    return data
项目:chash    作者:luhsra    | 项目源码 | 文件源码
def read_from_csv(filename, column_names):
    """Read a CSV with a header row into per-column lists of floats.

    ``column_names`` is accepted for interface compatibility but unused;
    the number of columns is taken from the header row.
    """
    data = []
    with open(filename) as csv_file:
        rows = list(csv.reader(csv_file))
    if rows:
        # One output list per header column, filled from the data rows.
        data = [[] for _ in rows[0]]
        for row in rows[1:]:
            for colnum, cell in enumerate(row):
                data[colnum].append(float(cell))
    return data
项目:Jumper-Cogs    作者:Redjumpman    | 项目源码 | 文件源码
def collect_moves(self, reader, name):
        Moves = namedtuple('Moves', ['pokemon', 'gen', 'color', 'moves', 'versions'])
        if name.split('-')[-1].isdigit():
            for row in reader:
                if name == row[0]:
                    pokemon = name.split('-')[0].title()
                    generation, color = switcher[row[1]], int(ast.literal_eval(row[2]))
                    moves, versions = ast.literal_eval(row[3]), ast.literal_eval(row[4])
                    return Moves(pokemon, generation, color, moves, versions)
        else:
            for row in reader:
                if name in row[0]:
                    pokemon = name.title()
                    generation, color = switcher[row[1]], int(ast.literal_eval(row[2]))
                    moves, versions = ast.literal_eval(row[3]), ast.literal_eval(row[4])
                    return Moves(pokemon, generation, color, moves, versions)
项目:mongodb-monitoring    作者:jruaux    | 项目源码 | 文件源码
def _records_protocol_v1(self, ifile):

        reader = csv.reader(ifile, dialect=CsvDialect)

        try:
            fieldnames = reader.next()
        except StopIteration:
            return

        mv_fieldnames = {name: name[len('__mv_'):] for name in fieldnames if name.startswith('__mv_')}

        if len(mv_fieldnames) == 0:
            for values in reader:
                yield OrderedDict(izip(fieldnames, values))
            return

        for values in reader:
            record = OrderedDict()
            for fieldname, value in izip(fieldnames, values):
                if fieldname.startswith('__mv_'):
                    if len(value) > 0:
                        record[mv_fieldnames[fieldname]] = self._decode_list(value)
                elif fieldname not in record:
                    record[fieldname] = value
            yield record
项目:bayestsa    作者:thalesians    | 项目源码 | 文件源码
def main():
    """Load a WinBUGS CODA index and chain file, checking that chain sample
    indices are consecutive, and collect the sampled values."""
    indexfilepath = r'C:\Users\Paul Bilokon\Documents\dev\alexandria\bilokon-msc\dissertation\code\winbugs\svl2\dataset-1\coda-index.txt'
    chainfilepath = r'C:\Users\Paul Bilokon\Documents\dev\alexandria\bilokon-msc\dissertation\code\winbugs\svl2\dataset-1\coda-for-chain-1.txt'

    index = readindexfile(indexfilepath)
    print(index)

    data = []
    indexoffset = None
    with open(chainfilepath, 'rt') as chainfile:
        for row in csv.reader(chainfile, delimiter='\t'):
            sample_index, value = int(row[0]), float(row[1])
            # The first row fixes the offset; later rows must be consecutive.
            if not data:
                indexoffset = sample_index
            print(sample_index, indexoffset)
            assert sample_index == len(data) + indexoffset
            data.append(value)
    print(data)
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def _get_external_data(url):
    """Fetch JSON metadata from *url*; return an empty dict on any failure."""
    data = {}
    try:
        # urlopen might fail if it runs into redirections,
        # because of Python issue #13696. Fixed in locators
        # using a custom redirect handler.
        response = urlopen(url)
        if response.info().get('Content-Type') == 'application/json':
            # Stream-decode the UTF-8 body straight into the JSON parser.
            data = json.load(codecs.getreader('utf-8')(response))
        else:
            logger.debug('Unexpected response for JSON request')
    except Exception as e:
        logger.exception('Failed to get external data for %s: %s', url, e)
    return data
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def reader(self, stream, context):
        """
        Read lines from a subprocess' output stream and either pass to a progress
        callable (if specified) or write progress information to sys.stderr.
        """
        progress = self.progress
        verbose = self.verbose
        while True:
            s = stream.readline()
            if not s:
                break
            if progress is not None:
                progress(s, context)
            else:
                if not verbose:
                    sys.stderr.write('.')
                else:
                    sys.stderr.write(s.decode('utf-8'))
                sys.stderr.flush()
        stream.close()
项目:pip-update-requirements    作者:alanhamlett    | 项目源码 | 文件源码
def _get_external_data(url):
    """Fetch JSON metadata from *url*; return an empty dict on any failure."""
    data = {}
    try:
        # urlopen might fail if it runs into redirections,
        # because of Python issue #13696. Fixed in locators
        # using a custom redirect handler.
        response = urlopen(url)
        content_type = response.info().get('Content-Type')
        if content_type.startswith('application/json'):
            # Stream-decode the UTF-8 body straight into the JSON parser.
            data = json.load(codecs.getreader('utf-8')(response))
        else:
            logger.debug('Unexpected response for JSON request: %s', content_type)
    except Exception as e:
        logger.exception('Failed to get external data for %s: %s', url, e)
    return data
项目:pip-update-requirements    作者:alanhamlett    | 项目源码 | 文件源码
def reader(self, stream, context):
        """
        Read lines from a subprocess' output stream and either pass to a progress
        callable (if specified) or write progress information to sys.stderr.
        """
        progress = self.progress
        verbose = self.verbose
        while True:
            s = stream.readline()
            if not s:
                break
            if progress is not None:
                progress(s, context)
            else:
                if not verbose:
                    sys.stderr.write('.')
                else:
                    sys.stderr.write(s.decode('utf-8'))
                sys.stderr.flush()
        stream.close()
项目:Splunk_CBER_App    作者:MHaggis    | 项目源码 | 文件源码
def _records_protocol_v1(self, ifile):

        reader = csv.reader(ifile, dialect=CsvDialect)

        try:
            fieldnames = reader.next()
        except StopIteration:
            return

        mv_fieldnames = {name: name[len('__mv_'):] for name in fieldnames if name.startswith('__mv_')}

        if len(mv_fieldnames) == 0:
            for values in reader:
                yield OrderedDict(izip(fieldnames, values))
            return

        for values in reader:
            record = OrderedDict()
            for fieldname, value in izip(fieldnames, values):
                if fieldname.startswith('__mv_'):
                    if len(value) > 0:
                        record[mv_fieldnames[fieldname]] = self._decode_list(value)
                elif fieldname not in record:
                    record[fieldname] = value
            yield record
项目:kaggle-spark-ml    作者:imgoodman    | 项目源码 | 文件源码
def loadRecord(line):
    """Parse a single CSV-formatted line into a list of field strings.

    Python 2 code: uses the StringIO module and reader.next().
    """
    input_line = StringIO.StringIO(line)
    parsed = csv.reader(input_line)
    return parsed.next()

#raw_data=sc.textFile(train_file_path).map(loadRecord)
#print raw_data.take(10)
项目:Harmonbot    作者:Harmon758    | 项目源码 | 文件源码
def __init__(self, bot):
        self.bot = bot
        # Add commands as random subcommands
        for name, command in inspect.getmembers(self):
            if isinstance(command, commands.Command) and command.parent is None and name != "random":
                self.bot.add_command(command)
                self.random.add_command(command)
        # Add fact subcommands as subcommands of corresponding commands
        for command, parent in ((self.fact_cat, self.cat), (self.fact_date, self.date), (self.fact_number, self.number)):
            utilities.add_as_subcommand(self, command, parent, "fact")
        # Add random subcommands as subcommands of corresponding commands
        self.random_subcommands = ((self.color, "Resources.color"), (self.giphy, "Resources.giphy"), (self.map, "Resources.map"), (self.streetview, "Resources.streetview"), (self.uesp, "Search.uesp"), (self.wikipedia, "Search.wikipedia"), (self.xkcd, "Resources.xkcd"))
        for command, parent_name in self.random_subcommands:
            utilities.add_as_subcommand(self, command, parent_name, "random")
        # Import jokes
        self.jokes = []
        try:
            with open("data/jokes.csv", newline = "") as jokes_file:
                jokes_reader = csv.reader(jokes_file)
                for row in jokes_reader:
                    self.jokes.append(row[0])
        except FileNotFoundError:
            pass
项目:piperine    作者:DNA-and-Natural-Algorithms-Group    | 项目源码 | 文件源码
def main():
    """Read score rows from the CSV files given on the command line and rank
    them with metarank.

    Header ('Index') rows are kept only once across all files; 'Winner' rows
    are always skipped.
    """
    print()

    csvnames = sys.argv[1:]

    columns_named = False
    scores = []
    # Iterate the list computed above (the original re-read sys.argv here).
    for csvname in csvnames:
        print("Reading scores from "+csvname+" .")

        with open(csvname, 'r') as csvfile:
            score_reader = csv.reader(csvfile)
            for row in score_reader:
                print(', '.join(row))
                if 'Winner' in row[0]:
                    continue
                if 'Index' in row[0] and columns_named:
                    # Duplicate header from a subsequent file: skip it.
                    continue
                else:
                    scores.append(row)
                    columns_named = True
            # The redundant csvfile.close() was removed: 'with' closes the file.

    winner = metarank(scores)
项目:code-uai16    作者:thanhan    | 项目源码 | 文件源码
def read_turk_dic_proton():
    """
    Build a dict of abstract id -> list of (Question3, Question4) answers,
    one tuple per worker, from data/proton-beam-RawTurkResults.csv.

    The original left the file handle open; 'with' now closes it.
    """
    turk_dic = {}
    with open("data/proton-beam-RawTurkResults.csv") as f:
        f.readline()  # skip the header line
        for row in csv.reader(f):
            (AssignmentId, WorkerId, HITId, AcceptTime, SubmitTime,
             ApprovalTime, TimeToComplete, AbstractId, Question1, Question2,
             Question3, Question4, Relevant) = tuple(row)
            AbstractId = int(AbstractId)
            turk_dic.setdefault(AbstractId, []).append((Question3, Question4))
    return turk_dic
项目:code-uai16    作者:thanhan    | 项目源码 | 文件源码
def get_pub_dic_csv(dataset):
    """
    Build a dict mapping abstract id -> concatenated title + abstract text,
    read from data/<dataset>-text.csv (header line skipped).

    The original left the file handle open; 'with' now closes it.
    """
    filename = "data/" + dataset + "-text.csv"
    pub_dic = {}
    with open(filename) as f:
        f.readline()  # skip the header line
        for row in csv.reader(f):
            # RCT datasets: (id, abstract, title); others: (id, title, publisher, abstract)
            if dataset.startswith("RCT"):
                (abstract_id, abstract, title) = tuple(row)[0:3]
            else:
                (abstract_id, title, publisher, abstract) = tuple(row)[0:4]

            pub_dic[int(abstract_id)] = title + abstract

    return pub_dic
项目:speechless    作者:JuliusKunze    | 项目源码 | 文件源码
def load(corpus_csv_file: Path,
             sampled_training_example_count: Optional[int] = None) -> 'Corpus':
        import csv
        with corpus_csv_file.open(encoding='utf8') as opened_csv:
            reader = csv.reader(opened_csv, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)

            def to_absolute(audio_file_path: Path) -> Path:
                return audio_file_path if audio_file_path.is_absolute() else Path(
                    corpus_csv_file.parent) / audio_file_path

            examples = [
                (
                    LabeledExampleFromFile(
                        audio_file=to_absolute(Path(audio_file_path)), id=id, label=label,
                        positional_label=None if positional_label == "" else PositionalLabel.deserialize(
                            positional_label)), Phase[phase])
                for id, audio_file_path, label, phase, positional_label in reader]

            return Corpus(training_examples=[e for e, phase in examples if phase == Phase.training],
                          test_examples=[e for e, phase in examples if phase == Phase.test],
                          sampled_training_example_count=sampled_training_example_count)
项目:FRG-Crowdsourcing    作者:97amarnathk    | 项目源码 | 文件源码
def test_UnicodeWriter(self):
        """Test UnicodeWriter class works."""
        tmp = tempfile.NamedTemporaryFile()
        uw = util.UnicodeWriter(tmp)
        fake_csv = ['one, two, three, {"i": 1}']
        for row in csv.reader(fake_csv):
            # change it for a dict
            row[3] = dict(i=1)
            uw.writerow(row)
        tmp.seek(0)
        err_msg = "It should be the same CSV content"
        with open(tmp.name, 'rb') as f:
            reader = csv.reader(f)
            for row in reader:
                for item in row:
                    assert item in fake_csv[0], err_msg
项目:swjtu-pyscraper    作者:Desgard    | 项目源码 | 文件源码
def _get_external_data(url):
    """Fetch JSON metadata from *url*; return an empty dict on any failure."""
    data = {}
    try:
        # urlopen might fail if it runs into redirections,
        # because of Python issue #13696. Fixed in locators
        # using a custom redirect handler.
        response = urlopen(url)
        content_type = response.info().get('Content-Type')
        if content_type.startswith('application/json'):
            # Stream-decode the UTF-8 body straight into the JSON parser.
            data = json.load(codecs.getreader('utf-8')(response))
        else:
            logger.debug('Unexpected response for JSON request: %s', content_type)
    except Exception as e:
        logger.exception('Failed to get external data for %s: %s', url, e)
    return data
项目:swjtu-pyscraper    作者:Desgard    | 项目源码 | 文件源码
def reader(self, stream, context):
        """
        Read lines from a subprocess' output stream and either pass to a progress
        callable (if specified) or write progress information to sys.stderr.
        """
        progress = self.progress
        verbose = self.verbose
        while True:
            s = stream.readline()
            if not s:
                break
            if progress is not None:
                progress(s, context)
            else:
                if not verbose:
                    sys.stderr.write('.')
                else:
                    sys.stderr.write(s.decode('utf-8'))
                sys.stderr.flush()
        stream.close()
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def _get_external_data(url):
    """Fetch JSON metadata from *url*; return an empty dict on any failure."""
    data = {}
    try:
        # urlopen might fail if it runs into redirections,
        # because of Python issue #13696. Fixed in locators
        # using a custom redirect handler.
        response = urlopen(url)
        content_type = response.info().get('Content-Type')
        if content_type.startswith('application/json'):
            # Stream-decode the UTF-8 body straight into the JSON parser.
            data = json.load(codecs.getreader('utf-8')(response))
        else:
            logger.debug('Unexpected response for JSON request: %s', content_type)
    except Exception as e:
        logger.exception('Failed to get external data for %s: %s', url, e)
    return data
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def reader(self, stream, context):
        """
        Read lines from a subprocess' output stream and either pass to a progress
        callable (if specified) or write progress information to sys.stderr.
        """
        progress = self.progress
        verbose = self.verbose
        while True:
            s = stream.readline()
            if not s:
                break
            if progress is not None:
                progress(s, context)
            else:
                if not verbose:
                    sys.stderr.write('.')
                else:
                    sys.stderr.write(s.decode('utf-8'))
                sys.stderr.flush()
        stream.close()
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def _get_external_data(url):
    """Fetch JSON metadata from *url*; return an empty dict on any failure."""
    data = {}
    try:
        # urlopen might fail if it runs into redirections,
        # because of Python issue #13696. Fixed in locators
        # using a custom redirect handler.
        response = urlopen(url)
        content_type = response.info().get('Content-Type')
        if content_type.startswith('application/json'):
            # Stream-decode the UTF-8 body straight into the JSON parser.
            data = json.load(codecs.getreader('utf-8')(response))
        else:
            logger.debug('Unexpected response for JSON request: %s', content_type)
    except Exception as e:
        logger.exception('Failed to get external data for %s: %s', url, e)
    return data
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def reader(self, stream, context):
        """
        Read lines from a subprocess' output stream and either pass to a progress
        callable (if specified) or write progress information to sys.stderr.
        """
        progress = self.progress
        verbose = self.verbose
        while True:
            s = stream.readline()
            if not s:
                break
            if progress is not None:
                progress(s, context)
            else:
                if not verbose:
                    sys.stderr.write('.')
                else:
                    sys.stderr.write(s.decode('utf-8'))
                sys.stderr.flush()
        stream.close()
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def uninstallation_paths(dist):
    """
    Yield all the uninstallation paths for dist based on RECORD-without-.pyc

    Yield paths to all the files in RECORD. For each .py file in RECORD, add
    the .pyc in the same directory.

    UninstallPathSet.add() takes care of the __pycache__ .pyc.
    """
    from pip.utils import FakeFile  # circular import
    record_rows = csv.reader(FakeFile(dist.get_metadata_lines('RECORD')))
    for record_row in record_rows:
        full_path = os.path.join(dist.location, record_row[0])
        yield full_path
        if full_path.endswith('.py'):
            # A tracked .py implies a sibling legacy .pyc to remove as well.
            directory, filename = os.path.split(full_path)
            yield os.path.join(directory, filename[:-3] + '.pyc')
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def _get_external_data(url):
    """Fetch JSON metadata from *url*; return an empty dict on any failure."""
    data = {}
    try:
        # urlopen might fail if it runs into redirections,
        # because of Python issue #13696. Fixed in locators
        # using a custom redirect handler.
        response = urlopen(url)
        content_type = response.info().get('Content-Type')
        if content_type.startswith('application/json'):
            # Stream-decode the UTF-8 body straight into the JSON parser.
            data = json.load(codecs.getreader('utf-8')(response))
        else:
            logger.debug('Unexpected response for JSON request: %s', content_type)
    except Exception as e:
        logger.exception('Failed to get external data for %s: %s', url, e)
    return data
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def uninstallation_paths(dist):
    """
    Yield all the uninstallation paths for dist based on RECORD-without-.pyc

    Yield paths to all the files in RECORD. For each .py file in RECORD, add
    the .pyc in the same directory.

    UninstallPathSet.add() takes care of the __pycache__ .pyc.
    """
    from pip.utils import FakeFile  # circular import
    record_rows = csv.reader(FakeFile(dist.get_metadata_lines('RECORD')))
    for record_row in record_rows:
        full_path = os.path.join(dist.location, record_row[0])
        yield full_path
        if full_path.endswith('.py'):
            # A tracked .py implies a sibling legacy .pyc to remove as well.
            directory, filename = os.path.split(full_path)
            yield os.path.join(directory, filename[:-3] + '.pyc')
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def _get_external_data(url):
    """Fetch JSON metadata from *url*; return an empty dict on any failure."""
    data = {}
    try:
        # urlopen might fail if it runs into redirections,
        # because of Python issue #13696. Fixed in locators
        # using a custom redirect handler.
        response = urlopen(url)
        content_type = response.info().get('Content-Type')
        if content_type.startswith('application/json'):
            # Stream-decode the UTF-8 body straight into the JSON parser.
            data = json.load(codecs.getreader('utf-8')(response))
        else:
            logger.debug('Unexpected response for JSON request: %s', content_type)
    except Exception as e:
        logger.exception('Failed to get external data for %s: %s', url, e)
    return data
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def reader(self, stream, context):
        """Consume lines from *stream* (a subprocess pipe) until EOF.

        Each line is handed to ``self.progress`` when one was supplied;
        otherwise a dot (quiet mode) or the UTF-8 decoded line (verbose
        mode) is echoed to sys.stderr.  The stream is closed on exit.
        """
        callback = self.progress
        chatty = self.verbose
        while True:
            line = stream.readline()
            if not line:
                break
            if callback is not None:
                callback(line, context)
                continue
            sys.stderr.write(line.decode('utf-8') if chatty else '.')
            sys.stderr.flush()
        stream.close()
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def uninstallation_paths(dist):
    """Generate the file paths that removing *dist* should delete.

    Reads the distribution's RECORD metadata: every recorded path is
    yielded, and each ``.py`` entry also produces the matching ``.pyc``
    next to it (``UninstallPathSet.add()`` covers the ``__pycache__``
    form).
    """
    from pip.utils import FakeFile  # circular import
    for entry in csv.reader(FakeFile(dist.get_metadata_lines('RECORD'))):
        candidate = os.path.join(dist.location, entry[0])
        yield candidate
        if candidate.endswith('.py'):
            folder, name = os.path.split(candidate)
            yield os.path.join(folder, name[:-3] + '.pyc')
项目:esmcheckds2    作者:andywalden    | 项目源码 | 文件源码
def _insert_zone_names(self):
        """
        Args:
            _zonetree (str): set in __init__

        Returns:
            List of dicts (str: str) devices by zone
        """
        self._zone_name = None
        self._zonetree_io = StringIO(self._zonetree)
        self._zonetree_csv = csv.reader(self._zonetree_io, delimiter=',')
        self._zonetree_lod = []

        for self._row in self._zonetree_csv:
            if self._row[0] == '1':
                self._zone_name = self._row[1]
                if self._zone_name == 'Undefined':
                    self._zone_name = ''
                continue
            for self._dev in self._devtree:
                if self._dev['ds_id'] == self._row[2]:
                    self._dev['zone_name'] = self._zone_name
        return self._devtree
项目:cisco-meraki-fw-ap-rules-api    作者:robertcsapo    | 项目源码 | 文件源码
def meraki_ap_fw_rules_import_csv(filename):
    """Read Meraki AP firewall rules from a CSV file.

    The first row is treated as a header and skipped.  Every data row
    must have at least 5 columns; otherwise the process exits with an
    error naming the offending file row (1-based, counting the header).

    :param filename: path to a comma-delimited CSV file
    :return: list of data rows, each a list of column strings
    """
    fw_ap_import_csv_list = []
    try:
        with open(filename) as csvfile:
            fw_ap_import_csv = csv.reader(csvfile, delimiter=',')
            # start=1 reproduces the original row numbering in the
            # error message (the header is file row 1).
            for key, item in enumerate(fw_ap_import_csv, start=1):
                if key == 1:
                    continue  # header row
                # Validate BEFORE collecting: the original appended the
                # row first and only then checked it, so an incomplete
                # row ended up in the result before the exit.
                if len(item) < 5:
                    print("")
                    sys.exit("- Error: Row %s isn't correct, exiting... -" % key)
                print(item)
                fw_ap_import_csv_list.append(item)
        return fw_ap_import_csv_list
    except FileNotFoundError:
        sys.exit("- Error: CSV file not found %s, exiting... -" % filename)
项目:jira_worklog_scanner    作者:pgarneau    | 项目源码 | 文件源码
def _get_external_data(url):
    """Download and parse a JSON resource.

    :param url: URL expected to serve ``application/json`` content.
    :return: parsed JSON object, or an empty dict if the request fails
             or the response is not JSON.  Failures are logged rather
             than propagated.
    """
    result = {}
    try:
        # urlopen might fail if it runs into redirections,
        # because of Python issue #13696. Fixed in locators
        # using a custom redirect handler.
        resp = urlopen(url)
        headers = resp.info()
        ct = headers.get('Content-Type')
        # A missing Content-Type header yields ct == None; check it
        # explicitly so we log the oddity instead of tripping an
        # AttributeError on startswith().
        if not ct or not ct.startswith('application/json'):
            logger.debug('Unexpected response for JSON request: %s', ct)
        else:
            # Incremental UTF-8 decoding lets json.load() read the
            # response stream directly.
            reader = codecs.getreader('utf-8')(resp)
            result = json.load(reader)
    except Exception as e:
        logger.exception('Failed to get external data for %s: %s', url, e)
    return result
项目:jira_worklog_scanner    作者:pgarneau    | 项目源码 | 文件源码
def reader(self, stream, context):
        """Drain *stream* line by line until EOF, reporting progress.

        Lines go to the ``self.progress`` callable when one is set;
        without one, '.' (quiet mode) or the UTF-8 decoded line
        (verbose mode) is written to sys.stderr.  Closes the stream
        when the subprocess output ends.
        """
        notify = self.progress
        show_all = self.verbose
        line = stream.readline()
        while line:
            if notify is None:
                output = line.decode('utf-8') if show_all else '.'
                sys.stderr.write(output)
                sys.stderr.flush()
            else:
                notify(line, context)
            line = stream.readline()
        stream.close()
项目:mlhi-ass2-anonymization    作者:cassinius    | 项目源码 | 文件源码
def readAdults(file_name):
    """Load the adults CSV into a dict keyed by integer record id.

    The first line is assumed to be a header and is skipped.  Each data
    row must contain, in order: id, age, workclass, native-country,
    sex, race, marital-status.

    :param file_name: path to the comma-delimited adults CSV file
    :return: dict mapping int id -> dict of attributes ('age' is int,
             the rest remain strings)
    """
    adults = {}
    # 'with' guarantees the file handle is closed even if a malformed
    # row raises mid-parse (the original open()/close() pair leaked
    # the handle on any exception).
    with open(file_name, 'r') as adult_file:
        adults_csv = csv.reader(adult_file, delimiter=',')

        # ignore the headers
        next(adults_csv, None)

        for row in adults_csv:
            adult = adults[int(row[0])] = {}
            adult['age'] = int(row[1])
            adult['workclass'] = row[2]
            adult['native-country'] = row[3]
            adult['sex'] = row[4]
            adult['race'] = row[5]
            adult['marital-status'] = row[6]

    return adults
项目:sqlakeyset    作者:djrobstep    | 项目源码 | 文件源码
def test_serial():
    """Check s.serialize_value output tokens for every supported type."""
    cases = [
        (None, 'x'),
        (True, 'true'),
        (False, 'false'),
        (5, 'i:5'),
        (5.0, 'f:5.0'),
        (decimal.Decimal('5.5'), 'n:5.5'),
        ('abc', 's:abc'),
        (b'abc', 'b:YWJj'),
        (b'abc', 'b:YWJj'),
        (datetime.date(2007, 12, 5), 'd:2007-12-05'),
        (datetime.datetime(2007, 12, 5, 12, 30, 30, tzinfo=utc),
         'dt:2007-12-05 12:30:30+00:00'),
        (datetime.time(12, 34, 56), 't:12:34:56'),
    ]
    for value, token in cases:
        assert s.serialize_value(value) == token
    # An arbitrary unsupported object must be rejected.
    with raises(NotImplementedError):
        s.serialize_value(csv.reader)
项目:eurovision-country-selector    作者:mikejarrett    | 项目源码 | 文件源码
def get_countries_from_csv(filepath):
    """ Read a csv of country data and return sanitized country names.

    Args:
        filepath (str): Location of the csv file to read.

    Returns:
        list: One sanitized country-name string per row.

    Raises:
        OSError: If no file exists at the filepath.
    """
    if not path.exists(filepath):
        raise OSError('Path to file: "{}" does not exist.'.format(filepath))

    with open(filepath, 'r') as csv_file:
        rows = csv.reader(csv_file)
        return [sanitize_string(''.join(fields)) for fields in rows]
项目:eurovision-country-selector    作者:mikejarrett    | 项目源码 | 文件源码
def build_list_of_people_from_csv(filename, countries):
    """Create a Person for every row of *filename*.

    Column 0 holds the person's name; optional column 1 holds a
    comma-separated list of countries that person should be excluded
    from, each sanitized before use.
    """
    with open(filename, 'r') as opened_file:
        people = []
        for row in csv.reader(opened_file):
            if len(row) > 1:
                excluded = [sanitize_string(part) for part in row[1].split(',')]
            else:
                excluded = []
            person = Person(
                name=row[0],
                countries=countries,
                excluded_countries=excluded,
            )
            people.append(person)

    return people