Python collections module: namedtuple() example source code

We extracted the following 49 code examples from open source Python projects to illustrate how to use collections.namedtuple().

Project: python-    Author: secondtonone1    | Project source | File source
def connection_from_pool_key(self, pool_key):
        """
        Get a :class:`ConnectionPool` based on the provided pool key.

        ``pool_key`` should be a namedtuple that only contains immutable
        objects. At a minimum it must have the ``scheme``, ``host``, and
        ``port`` fields.
        """
        with self.pools.lock:
            # If the scheme, host, or port doesn't match existing open
            # connections, open a new ConnectionPool.
            pool = self.pools.get(pool_key)
            if pool:
                return pool

            # Make a fresh ConnectionPool of the desired type
            pool = self._new_pool(pool_key.scheme, pool_key.host, pool_key.port)
            self.pools[pool_key] = pool

        return pool
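
For illustration, a pool key satisfying this contract could be built as below. This is a minimal sketch; the real urllib3 pool key likely carries more fields, so only scheme, host and port are taken from the docstring and the rest is assumption.

from collections import namedtuple

# Hypothetical pool key: a namedtuple of immutable values carrying at
# least the scheme, host and port fields that connection_from_pool_key()
# reads via attribute access.
PoolKey = namedtuple('PoolKey', ['scheme', 'host', 'port'])

key = PoolKey(scheme='https', host='example.com', port=443)
# namedtuples are hashable, so the key can index the self.pools mapping.
print(key.scheme, key.host, key.port)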
Project: nidaqmx-python    Author: ni    | Project source | File source
def driver_version(self):
        """
        collections.namedtuple: Indicates the major, minor and update
            portions of the installed version of NI-DAQmx.

            - major_version (int): Indicates the major portion of the
              installed version of NI-DAQmx, such as 7 for version 7.0.
            - minor_version (int): Indicates the minor portion of the
              installed version of NI-DAQmx, such as 0 for version 7.0.
            - update_version (int): Indicates the update portion of the
              installed version of NI-DAQmx, such as 1 for version 9.0.1.
        """
        DriverVersion = collections.namedtuple(
            'DriverVersion', ['major_version', 'minor_version',
                              'update_version'])

        return DriverVersion(self._major_version, self._minor_version,
                             self._update_version)
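
Because the return value is a plain namedtuple, callers can read fields by name or unpack positionally; a small usage sketch with made-up version numbers:

import collections

DriverVersion = collections.namedtuple(
    'DriverVersion', ['major_version', 'minor_version', 'update_version'])

version = DriverVersion(9, 0, 1)
print(version.major_version)            # 9
major, minor, update = version          # ordinary tuple unpacking also works
print(version._asdict())                # mapping view of the three fields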
Project: quizbot-2017    Author: pycontw    | Project source | File source
def generate_leaders(*, leader_factory=None):
    """Creates a generator for the leaderboard.

    Generates a 4-tuple containing the ranking, score, user object, and
    registration information of the leader. The result is a namedtuple.
    """
    if leader_factory is None:
        leader_factory = Leader
    cursor = db.get_cursor()
    cursor.execute("""
        SELECT "serial", "score"
        FROM "user"
        ORDER BY "score" DESC
    """)

    for ranking, (serial, score) in enumerate(cursor, 1):
        user = User(serial=serial)
        registration = registrations.get_registration(serial=serial)
        yield leader_factory(
            ranking=ranking, score=score,
            user=user, registration=registration,
        )
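
The default Leader factory itself is not shown in this snippet; based on the docstring it is presumably a four-field namedtuple along these lines (an assumption, not the project's actual definition):

from collections import namedtuple

# Hypothetical default factory matching the keyword arguments passed by
# generate_leaders(): ranking, score, user object and registration info.
Leader = namedtuple('Leader', ['ranking', 'score', 'user', 'registration'])

leader = Leader(ranking=1, score=42, user='user-object', registration=None)
print(leader.ranking, leader.score)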
Project: my-first-blog    Author: AnkurBegining    | Project source | File source
def connection_from_pool_key(self, pool_key, request_context=None):
        """
        Get a :class:`ConnectionPool` based on the provided pool key.

        ``pool_key`` should be a namedtuple that only contains immutable
        objects. At a minimum it must have the ``scheme``, ``host``, and
        ``port`` fields.
        """
        with self.pools.lock:
            # If the scheme, host, or port doesn't match existing open
            # connections, open a new ConnectionPool.
            pool = self.pools.get(pool_key)
            if pool:
                return pool

            # Make a fresh ConnectionPool of the desired type
            scheme = request_context['scheme']
            host = request_context['host']
            port = request_context['port']
            pool = self._new_pool(scheme, host, port, request_context=request_context)
            self.pools[pool_key] = pool

        return pool
Project: my-first-blog    Author: AnkurBegining    | Project source | File source
def connection_from_pool_key(self, pool_key):
        """
        Get a :class:`ConnectionPool` based on the provided pool key.

        ``pool_key`` should be a namedtuple that only contains immutable
        objects. At a minimum it must have the ``scheme``, ``host``, and
        ``port`` fields.
        """
        with self.pools.lock:
            # If the scheme, host, or port doesn't match existing open
            # connections, open a new ConnectionPool.
            pool = self.pools.get(pool_key)
            if pool:
                return pool

            # Make a fresh ConnectionPool of the desired type
            pool = self._new_pool(pool_key.scheme, pool_key.host, pool_key.port)
            self.pools[pool_key] = pool

        return pool
Project: charm-swift-proxy    Author: openstack    | Project source | File source
def iter_units_for_relation_name(relation_name):
    """Iterate through all units in a relation

    Generator that iterates through all the units in a relation and yields
    a named tuple with rid and unit field names.

    Usage:
    data = [(u.rid, u.unit)
            for u in iter_units_for_relation_name(relation_name)]

    :param relation_name: string relation name
    :yield: Named Tuple with rid and unit field names
    """
    RelatedUnit = namedtuple('RelatedUnit', 'rid, unit')
    for rid in relation_ids(relation_name):
        for unit in related_units(rid):
            yield RelatedUnit(rid, unit)
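
Each yielded item supports attribute access, as the Usage note shows; a tiny standalone sketch with made-up relation values:

from collections import namedtuple

RelatedUnit = namedtuple('RelatedUnit', 'rid, unit')

u = RelatedUnit('amqp:1', 'rabbitmq-server/0')   # illustrative values
print(u.rid)    # 'amqp:1'
print(u.unit)   # 'rabbitmq-server/0'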
Project: charm-swift-proxy    Author: openstack    | Project source | File source
def iter_units_for_relation_name(relation_name):
    """Iterate through all units in a relation

    Generator that iterates through all the units in a relation and yields
    a named tuple with rid and unit field names.

    Usage:
    data = [(u.rid, u.unit)
            for u in iter_units_for_relation_name(relation_name)]

    :param relation_name: string relation name
    :yield: Named Tuple with rid and unit field names
    """
    RelatedUnit = namedtuple('RelatedUnit', 'rid, unit')
    for rid in relation_ids(relation_name):
        for unit in related_units(rid):
            yield RelatedUnit(rid, unit)
Project: zipline-chinese    Author: zhanghan1990    | Project source | File source
def parse_argspec(callable_):
        """
        Takes a callable and returns a tuple with the list of Argument objects,
        the name of *args, and the name of **kwargs.
        If *args or **kwargs is not present, it will be None.
        This returns a namedtuple called Argspec that has three fields named:
        args, starargs, and kwargs.
        """
        args, varargs, keywords, defaults = inspect.getargspec(callable_)
        defaults = list(defaults or [])

        if getattr(callable_, '__self__', None) is not None:
            # This is a bound method, drop the self param.
            args = args[1:]

        first_default = len(args) - len(defaults)
        return Argspec(
            [Argument(arg, Argument.no_default
                      if n < first_default else defaults[n - first_default])
             for n, arg in enumerate(args)],
            varargs,
            keywords,
        )
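
Argspec and Argument are defined elsewhere in zipline; the sketch below gives definitions consistent with this docstring (the Argument layout is an assumption):

from collections import namedtuple

# Per the docstring: three fields named args, starargs and kwargs.
Argspec = namedtuple('Argspec', ['args', 'starargs', 'kwargs'])

# Hypothetical Argument pairing a parameter name with its default value
# (or a sentinel such as Argument.no_default when no default exists).
Argument = namedtuple('Argument', ['name', 'default'])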
Project: zipline-chinese    Author: zhanghan1990    | Project source | File source
def test_checks_should_trigger(self):
        class CountingRule(Always):
            count = 0

            def should_trigger(self, dt, env):
                CountingRule.count += 1
                return True

        for r in [CountingRule] * 5:
            self.em.add_event(
                Event(r(), lambda context, data: None)
            )

        mock_algo_class = namedtuple('FakeAlgo', ['trading_environment'])
        mock_algo = mock_algo_class(trading_environment="fake_env")
        self.em.handle_data(mock_algo, None, datetime.datetime.now())

        self.assertEqual(CountingRule.count, 5)
Project: okta-awscli    Author: jmhale    | Project source | File source
def choose_aws_role(assertion):
        """ Choose AWS role from SAML assertion """
        aws_attribute_role = 'https://aws.amazon.com/SAML/Attributes/Role'
        attribute_value_urn = '{urn:oasis:names:tc:SAML:2.0:assertion}AttributeValue'
        roles = []
        role_tuple = namedtuple("RoleTuple", ["principal_arn", "role_arn"])
        root = ET.fromstring(base64.b64decode(assertion))
        for saml2attribute in root.iter('{urn:oasis:names:tc:SAML:2.0:assertion}Attribute'):
            if saml2attribute.get('Name') == aws_attribute_role:
                for saml2attributevalue in saml2attribute.iter(attribute_value_urn):
                    roles.append(role_tuple(*saml2attributevalue.text.split(',')))

        for index, role in enumerate(roles):
            role_name = role.role_arn.split('/')[1]
            print("%d: %s" % (index+1, role_name))
        role_choice = int(input('Please select the AWS role: ')) - 1
        return roles[role_choice]
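
Each SAML attribute value is a comma-separated "principal_arn,role_arn" pair, so the star-unpacking above fills the RoleTuple fields in order; a minimal sketch with a fabricated ARN pair:

from collections import namedtuple

RoleTuple = namedtuple("RoleTuple", ["principal_arn", "role_arn"])

# Fabricated attribute value, purely for illustration.
value = ("arn:aws:iam::123456789012:saml-provider/Okta,"
         "arn:aws:iam::123456789012:role/Admin")
role = RoleTuple(*value.split(','))
print(role.role_arn.split('/')[1])   # 'Admin'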
Project: cellranger    Author: 10XGenomics    | Project source | File source
def create_legacy_kmeans_nodes(f, new_group_name, legacy_group_name, namedtuple, clustering_key):
    """ Soft-link a legacy-structured (CR 1.2) kmeans subgroup (dest) to a new-style (CR 1.3) subgroup (src).
        The old-style was a group called 'kmeans' with subgroups named _K.
        The new-style is a group called 'clustering' with subgroups named kmeans_K_clusters, etc. """
    group = f.create_group(f.root, legacy_group_name)

    cluster_type, cluster_param = parse_clustering_key(clustering_key)
    if cluster_type != CLUSTER_TYPE_KMEANS:
        return

    legacy_key = format_legacy_clustering_key(cluster_type, cluster_param)
    subgroup = f.create_group(group, legacy_key)
    for field in namedtuple._fields:
        target = '/%s/_%s/%s' % (new_group_name, clustering_key, field)

        if f.__contains__(target):
            # NOTE: coerce `target` to 'str' here because pytables chokes on unicode `target`
            f.create_soft_link(subgroup, field, target=str(target))
        else:
            sys.stderr.write('Skipped soft-link of legacy dataset to %s; node doesn\'t exist\n' % target)
Project: data_pipeline    Author: Yelp    | Project source | File source
def refresh_manager(self, fake_cluster, fake_database):
        refresh_manager = FullRefreshManager()
        refresh_manager.options = namedtuple(
            'Options',
            ['cluster', 'database', 'config_path', 'dry_run', 'verbose',
             'per_source_throughput_cap', 'total_throughput_cap']
        )(
            fake_cluster,
            fake_database,
            self.config_path,
            True,
            0,
            DEFAULT_CAP,
            1000
        )
        refresh_manager._init_global_state()
        return refresh_manager
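
Defining the namedtuple and instantiating it in one expression, as above, gives the test a throwaway options object with attribute access; a stripped-down illustration of the same pattern (field names here are made up):

from collections import namedtuple

# Anonymous one-off options object: define the type and call it in one go.
options = namedtuple('Options', ['cluster', 'database', 'dry_run'])(
    'test-cluster', 'test-db', True)

print(options.cluster, options.dry_run)   # test-cluster True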
Project: benchmarks    Author: tensorflow    | Project source | File source
def make_params(**kwargs):
  """Create a Params tuple for BenchmarkCNN from kwargs.

  Default values are filled in from _DEFAULT_PARAMS.

  Args:
    **kwargs: kwarg values will override the default values.
  Returns:
    Params namedtuple for constructing BenchmarkCNN.
  """
  # Create a (name: default_value) map from PARAMS.
  default_kwargs = {
      name: _DEFAULT_PARAMS[name].default_value
      for name in _DEFAULT_PARAMS
  }
  return Params(**default_kwargs)._replace(**kwargs)
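
The _replace() call is the key namedtuple feature here: it returns a new tuple with only the named fields overridden. A reduced sketch of the same pattern (Params and its fields are illustrative, not the benchmark's real ones):

from collections import namedtuple

Params = namedtuple('Params', ['batch_size', 'num_gpus', 'model'])
_defaults = {'batch_size': 32, 'num_gpus': 1, 'model': 'resnet50'}

def make_params(**kwargs):
    # Fill every field with its default, then override the given kwargs.
    return Params(**_defaults)._replace(**kwargs)

print(make_params(num_gpus=4))   # Params(batch_size=32, num_gpus=4, model='resnet50')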
Project: googletranslate.popclipext    Author: wizyoung    | Project source | File source
def connection_from_pool_key(self, pool_key, request_context=None):
        """
        Get a :class:`ConnectionPool` based on the provided pool key.

        ``pool_key`` should be a namedtuple that only contains immutable
        objects. At a minimum it must have the ``scheme``, ``host``, and
        ``port`` fields.
        """
        with self.pools.lock:
            # If the scheme, host, or port doesn't match existing open
            # connections, open a new ConnectionPool.
            pool = self.pools.get(pool_key)
            if pool:
                return pool

            # Make a fresh ConnectionPool of the desired type
            scheme = request_context['scheme']
            host = request_context['host']
            port = request_context['port']
            pool = self._new_pool(scheme, host, port, request_context=request_context)
            self.pools[pool_key] = pool

        return pool
Project: imagepaste    Author: robinchenyu    | Project source | File source
def lookup(tag):
    """
    :param tag: Integer tag number
    :returns: TagInfo namedtuple, from the TAGS_V2 info if possible,
        otherwise just populating the value and name from TAGS.
        If the tag is not recognized, "unknown" is returned for the name.

    """

    return TAGS_V2.get(tag, TagInfo(tag, TAGS.get(tag, 'unknown')))


##
# Map tag numbers to tag info.
#
#  id: (Name, Type, Length, enum_values)
#
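
TagInfo comes from Pillow's TiffTags module; for lookup() it only needs to behave like a namedtuple whose first two fields are the tag value and a human-readable name. A simplified stand-in (not Pillow's full definition, which carries extra type/length metadata):

from collections import namedtuple

TagInfo = namedtuple('TagInfo', ['value', 'name'])

TAGS = {256: 'ImageWidth'}   # illustrative TAGS/TAGS_V2 stand-ins
TAGS_V2 = {}

print(TAGS_V2.get(256, TagInfo(256, TAGS.get(256, 'unknown'))).name)   # ImageWidth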
Project: Jumper-Cogs    Author: Redjumpman    | Project source | File source
def collect_moves(self, reader, name):
        Moves = namedtuple('Moves', ['pokemon', 'gen', 'color', 'moves', 'versions'])
        if name.split('-')[-1].isdigit():
            for row in reader:
                if name == row[0]:
                    pokemon = name.split('-')[0].title()
                    generation, color = switcher[row[1]], int(ast.literal_eval(row[2]))
                    moves, versions = ast.literal_eval(row[3]), ast.literal_eval(row[4])
                    return Moves(pokemon, generation, color, moves, versions)
        else:
            for row in reader:
                if name in row[0]:
                    pokemon = name.title()
                    generation, color = switcher[row[1]], int(ast.literal_eval(row[2]))
                    moves, versions = ast.literal_eval(row[3]), ast.literal_eval(row[4])
                    return Moves(pokemon, generation, color, moves, versions)
Project: deb-python-cassandra-driver    Author: openstack    | Project source | File source
def test_non_frozen_udts(self):
        """
        Test to ensure that non frozen udt's work with C* >3.6.

        @since 3.7.0
        @jira_ticket PYTHON-498
        @expected_result Non frozen UDT's are supported

        @test_category data_types, udt
        """
        self.session.execute("USE {0}".format(self.keyspace_name))
        self.session.execute("CREATE TYPE user (state text, has_corn boolean)")
        self.session.execute("CREATE TABLE {0} (a int PRIMARY KEY, b user)".format(self.function_table_name))
        User = namedtuple('user', ('state', 'has_corn'))
        self.cluster.register_user_type(self.keyspace_name, "user", User)
        self.session.execute("INSERT INTO {0} (a, b) VALUES (%s, %s)".format(self.function_table_name), (0, User("Nebraska", True)))
        self.session.execute("UPDATE {0} SET b.has_corn = False where a = 0".format(self.function_table_name))
        result = self.session.execute("SELECT * FROM {0}".format(self.function_table_name))
        self.assertFalse(result[0].b.has_corn)
        table_sql = self.cluster.metadata.keyspaces[self.keyspace_name].tables[self.function_table_name].as_cql_query()
        self.assertNotIn("<frozen>", table_sql)
Project: deb-python-cassandra-driver    Author: openstack    | Project source | File source
def test_raise_error_on_nonexisting_udts(self):
        """
        Test for ensuring that an error is raised for operating on a nonexisting udt or an invalid keyspace
        """

        c = Cluster(protocol_version=PROTOCOL_VERSION)
        s = c.connect(self.keyspace_name, wait_for_all_pools=True)
        User = namedtuple('user', ('age', 'name'))

        with self.assertRaises(UserTypeDoesNotExist):
            c.register_user_type("some_bad_keyspace", "user", User)

        with self.assertRaises(UserTypeDoesNotExist):
            c.register_user_type("system", "user", User)

        with self.assertRaises(InvalidRequest):
            s.execute("CREATE TABLE mytable (a int PRIMARY KEY, b frozen<user>)")

        c.shutdown()
Project: Projects    Author: it2school    | Project source | File source
def connection_from_pool_key(self, pool_key, request_context=None):
        """
        Get a :class:`ConnectionPool` based on the provided pool key.

        ``pool_key`` should be a namedtuple that only contains immutable
        objects. At a minimum it must have the ``scheme``, ``host``, and
        ``port`` fields.
        """
        with self.pools.lock:
            # If the scheme, host, or port doesn't match existing open
            # connections, open a new ConnectionPool.
            pool = self.pools.get(pool_key)
            if pool:
                return pool

            # Make a fresh ConnectionPool of the desired type
            scheme = request_context['scheme']
            host = request_context['host']
            port = request_context['port']
            pool = self._new_pool(scheme, host, port, request_context=request_context)
            self.pools[pool_key] = pool

        return pool
Project: Projects    Author: it2school    | Project source | File source
def lookup(tag):
    """
    :param tag: Integer tag number
    :returns: TagInfo namedtuple, from the TAGS_V2 info if possible,
        otherwise just populating the value and name from TAGS.
        If the tag is not recognized, "unknown" is returned for the name.

    """

    return TAGS_V2.get(tag, TagInfo(tag, TAGS.get(tag, 'unknown')))


##
# Map tag numbers to tag info.
#
#  id: (Name, Type, Length, enum_values)
#
# The length here differs from the length in the tiff spec.  For
# numbers, the tiff spec is for the number of fields returned. We
# agree here.  For string-like types, the tiff spec uses the length of
# field in bytes.  In Pillow, we are using the number of expected
# fields, in general 1 for string-like types.
Project: fluxpart    Author: usda-ars-ussl    | Project source | File source
def stats2(sarray, names=None):
    """Calculate means and (co)variances for structured array data."""

    if names is None:
        names = sarray.dtype.names
    nvar = len(names)
    data = tuple(sarray[name] for name in names)
    cov = np.cov(data)
    nondiag_cov = list(cov[i, j] for i, j in permutations(range(nvar), 2))

    names_ave = list('ave_' + name for name in names)
    names_var = list('var_' + name for name in names)
    names_cov = list(
        'cov_' + n1 + "_" + n2 for n1, n2 in permutations(names, 2))

    out = dict(zip(names_ave, np.mean(data, axis=1)))
    out.update(zip(names_var, cov.diagonal()))
    out.update(zip(names_cov, nondiag_cov))

    NamedStats = namedtuple('Stats2', names_ave + names_var + names_cov)
    return NamedStats(**out)
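
Because the field names are built from the variable names, the returned namedtuple exposes one attribute per statistic; a hypothetical usage sketch against the function above, with a tiny two-variable structured array:

import numpy as np

# Assumes stats2() above is in scope together with its imports
# (numpy, itertools.permutations, collections.namedtuple).
sarray = np.array([(1.0, 2.0), (2.0, 4.0), (3.0, 6.5)],
                  dtype=[('u', 'f8'), ('w', 'f8')])

stats = stats2(sarray)
print(stats.ave_u, stats.var_u, stats.cov_u_w)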
Project: pip-update-requirements    Author: alanhamlett    | Project source | File source
def connection_from_pool_key(self, pool_key, request_context=None):
        """
        Get a :class:`ConnectionPool` based on the provided pool key.

        ``pool_key`` should be a namedtuple that only contains immutable
        objects. At a minimum it must have the ``scheme``, ``host``, and
        ``port`` fields.
        """
        with self.pools.lock:
            # If the scheme, host, or port doesn't match existing open
            # connections, open a new ConnectionPool.
            pool = self.pools.get(pool_key)
            if pool:
                return pool

            # Make a fresh ConnectionPool of the desired type
            scheme = request_context['scheme']
            host = request_context['host']
            port = request_context['port']
            pool = self._new_pool(scheme, host, port, request_context=request_context)
            self.pools[pool_key] = pool

        return pool
Project: Formulate    Author: BebeSparkelSparkel    | Project source | File source
def abstract_brackets(formula, variables_re=''):
  lwt = split_formula(formula)
  new_variables = {}
  while lwt:
    substitute = no_re_matches(combine_re_expressions(
        itertools.chain((variables_re,), new_variables.keys())
    ))
    formula = lwt['leading'] + substitute + lwt['trailing']
    new_variables[substitute] = lwt['within']
    lwt = split_formula(formula)
  if formula in new_variables.keys():
    # in case of extraneous brackets
    return abstract_brackets(new_variables[formula], variables_re)
  # return [formula, new_variables]
  return namedtuple('abstract_brackets', ('formula', 'new_variables'))(formula, new_variables)


# splits formula into two parts (leading and trailing) at the position of the lowest-priority operator (from settings.order_of_operations)
# returns None if there are no operators in formula
Project: charm-heat    Author: openstack    | Project source | File source
def iter_units_for_relation_name(relation_name):
    """Iterate through all units in a relation

    Generator that iterates through all the units in a relation and yields
    a named tuple with rid and unit field names.

    Usage:
    data = [(u.rid, u.unit)
            for u in iter_units_for_relation_name(relation_name)]

    :param relation_name: string relation name
    :yield: Named Tuple with rid and unit field names
    """
    RelatedUnit = namedtuple('RelatedUnit', 'rid, unit')
    for rid in relation_ids(relation_name):
        for unit in related_units(rid):
            yield RelatedUnit(rid, unit)
Project: charm-heat    Author: openstack    | Project source | File source
def iter_units_for_relation_name(relation_name):
    """Iterate through all units in a relation

    Generator that iterates through all the units in a relation and yields
    a named tuple with rid and unit field names.

    Usage:
    data = [(u.rid, u.unit)
            for u in iter_units_for_relation_name(relation_name)]

    :param relation_name: string relation name
    :yield: Named Tuple with rid and unit field names
    """
    RelatedUnit = namedtuple('RelatedUnit', 'rid, unit')
    for rid in relation_ids(relation_name):
        for unit in related_units(rid):
            yield RelatedUnit(rid, unit)
Project: charm-keystone    Author: openstack    | Project source | File source
def iter_units_for_relation_name(relation_name):
    """Iterate through all units in a relation

    Generator that iterates through all the units in a relation and yields
    a named tuple with rid and unit field names.

    Usage:
    data = [(u.rid, u.unit)
            for u in iter_units_for_relation_name(relation_name)]

    :param relation_name: string relation name
    :yield: Named Tuple with rid and unit field names
    """
    RelatedUnit = namedtuple('RelatedUnit', 'rid, unit')
    for rid in relation_ids(relation_name):
        for unit in related_units(rid):
            yield RelatedUnit(rid, unit)
Project: charm-keystone    Author: openstack    | Project source | File source
def iter_units_for_relation_name(relation_name):
    """Iterate through all units in a relation

    Generator that iterates through all the units in a relation and yields
    a named tuple with rid and unit field names.

    Usage:
    data = [(u.rid, u.unit)
            for u in iter_units_for_relation_name(relation_name)]

    :param relation_name: string relation name
    :yield: Named Tuple with rid and unit field names
    """
    RelatedUnit = namedtuple('RelatedUnit', 'rid, unit')
    for rid in relation_ids(relation_name):
        for unit in related_units(rid):
            yield RelatedUnit(rid, unit)
Project: charm-keystone    Author: openstack    | Project source | File source
def iter_units_for_relation_name(relation_name):
    """Iterate through all units in a relation

    Generator that iterates through all the units in a relation and yields
    a named tuple with rid and unit field names.

    Usage:
    data = [(u.rid, u.unit)
            for u in iter_units_for_relation_name(relation_name)]

    :param relation_name: string relation name
    :yield: Named Tuple with rid and unit field names
    """
    RelatedUnit = namedtuple('RelatedUnit', 'rid, unit')
    for rid in relation_ids(relation_name):
        for unit in related_units(rid):
            yield RelatedUnit(rid, unit)
Project: charm-keystone    Author: openstack    | Project source | File source
def iter_units_for_relation_name(relation_name):
    """Iterate through all units in a relation

    Generator that iterates through all the units in a relation and yields
    a named tuple with rid and unit field names.

    Usage:
    data = [(u.rid, u.unit)
            for u in iter_units_for_relation_name(relation_name)]

    :param relation_name: string relation name
    :yield: Named Tuple with rid and unit field names
    """
    RelatedUnit = namedtuple('RelatedUnit', 'rid, unit')
    for rid in relation_ids(relation_name):
        for unit in related_units(rid):
            yield RelatedUnit(rid, unit)
Project: charm-keystone    Author: openstack    | Project source | File source
def iter_units_for_relation_name(relation_name):
    """Iterate through all units in a relation

    Generator that iterates through all the units in a relation and yields
    a named tuple with rid and unit field names.

    Usage:
    data = [(u.rid, u.unit)
            for u in iter_units_for_relation_name(relation_name)]

    :param relation_name: string relation name
    :yield: Named Tuple with rid and unit field names
    """
    RelatedUnit = namedtuple('RelatedUnit', 'rid, unit')
    for rid in relation_ids(relation_name):
        for unit in related_units(rid):
            yield RelatedUnit(rid, unit)
Project: hmv-s16    Author: cmuphyscomp    | Project source | File source
def _read_data(self, stream, verbose = False):
        """Process frame data rows from the CSV stream."""

        # Note that the frame_num indices do not necessarily start from zero,
        # but the setter functions assume that the array indices do.  This
        # implementation just ignores the original frame numbers, the frames are
        # renumbered from zero.
        for row_num, row in enumerate(stream):
            frame_num = int(row[0])
            frame_t   = float(row[1])
            values    = row[2:]

            # if verbose: print "Processing row_num %d, frame_num %d, time %f." % (row_num, frame_num, frame_t)

            # add the new frame time to each object storing a trajectory
            for body in self.rigid_bodies.values():
                body._add_frame(frame_t)

            # process the columns of interest
            for mapping in self._column_map:
                # each mapping is a namedtuple with a setter method, column index, and axis name
                mapping.setter( row_num, mapping.axis, values[mapping.column] )

    # ================================================================
Project: notex    Author: adiultra    | Project source | File source
def paragraph(self):
        """Return the index within self.text of the current paragraph and of
        the current line and current character (number of characters since the
        start of the paragraph) within the paragraph

        Returns: namedtuple (para_index, line_index, char_index)

        """
        idx_para = idx_buffer = idx_line = idx_char = 0
        done = False
        for para in self.text:
            for idx_line, line in enumerate(para):
                if idx_buffer == self.buffer_idx_y:
                    done = True
                    break
                idx_buffer += 1
            if done is True:
                break
            idx_para += 1
        idx_char = sum(map(len, self.text[idx_para][:idx_line])) + \
            self.buffer_idx_x
        p = namedtuple("para", ['para_index', 'line_index', 'char_index'])
        return p(idx_para, idx_line, idx_char)
Project: pytablereader    Author: thombashi    | Project source | File source
def test_exception_invalid_csv(self):
        table_text = """nan = float("nan")
inf = float("inf")
TEST_TABLE_NAME = "test_table"
TEST_DB_NAME = "test_db"
NOT_EXIT_FILE_PATH = "/not/existing/file/__path__"

NamedTuple = namedtuple("NamedTuple", "attr_a attr_b")
NamedTupleEx = namedtuple("NamedTupleEx", "attr_a attr_b attr_c")
"""
        loader = ptr.CsvTableTextLoader(table_text)
        loader.table_name = "dummy"

        with pytest.raises(ptr.InvalidDataError):
            for _tabletuple in loader.load():
                pass
Project: charm-nova-cloud-controller    Author: openstack    | Project source | File source
def iter_units_for_relation_name(relation_name):
    """Iterate through all units in a relation

    Generator that iterates through all the units in a relation and yields
    a named tuple with rid and unit field names.

    Usage:
    data = [(u.rid, u.unit)
            for u in iter_units_for_relation_name(relation_name)]

    :param relation_name: string relation name
    :yield: Named Tuple with rid and unit field names
    """
    RelatedUnit = namedtuple('RelatedUnit', 'rid, unit')
    for rid in relation_ids(relation_name):
        for unit in related_units(rid):
            yield RelatedUnit(rid, unit)
Project: noc-orchestrator    Author: DirceuSilvaLabs    | Project source | File source
def parse_request_start_line(line):
    """Returns a (method, path, version) tuple for an HTTP 1.x request line.

    The response is a `collections.namedtuple`.

    >>> parse_request_start_line("GET /foo HTTP/1.1")
    RequestStartLine(method='GET', path='/foo', version='HTTP/1.1')
    """
    try:
        method, path, version = line.split(" ")
    except ValueError:
        raise HTTPInputError("Malformed HTTP request line")
    if not re.match(r"^HTTP/1\.[0-9]$", version):
        raise HTTPInputError(
            "Malformed HTTP version in HTTP Request-Line: %r" % version)
    return RequestStartLine(method, path, version)
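
RequestStartLine (and the ResponseStartLine used by the companion parser below) are module-level namedtuples in tornado.httputil; the doctest output above pins down their field names and order:

import collections

RequestStartLine = collections.namedtuple(
    'RequestStartLine', ['method', 'path', 'version'])
ResponseStartLine = collections.namedtuple(
    'ResponseStartLine', ['version', 'code', 'reason'])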
Project: noc-orchestrator    Author: DirceuSilvaLabs    | Project source | File source
def parse_response_start_line(line):
    """Returns a (version, code, reason) tuple for an HTTP 1.x response line.

    The response is a `collections.namedtuple`.

    >>> parse_response_start_line("HTTP/1.1 200 OK")
    ResponseStartLine(version='HTTP/1.1', code=200, reason='OK')
    """
    line = native_str(line)
    match = re.match("(HTTP/1.[0-9]) ([0-9]+) ([^\r]*)", line)
    if not match:
        raise HTTPInputError("Error parsing response start line")
    return ResponseStartLine(match.group(1), int(match.group(2)),
                             match.group(3))

# _parseparam and _parse_header are copied and modified from python2.7's cgi.py
# The original 2.7 version of this code did not correctly support some
# combinations of semicolons and double quotes.
# It has also been modified to support valueless parameters as seen in
# websocket extension negotiations.
Project: noc-orchestrator    Author: DirceuSilvaLabs    | Project source | File source
def parse_request_start_line(line):
    """Returns a (method, path, version) tuple for an HTTP 1.x request line.

    The response is a `collections.namedtuple`.

    >>> parse_request_start_line("GET /foo HTTP/1.1")
    RequestStartLine(method='GET', path='/foo', version='HTTP/1.1')
    """
    try:
        method, path, version = line.split(" ")
    except ValueError:
        raise HTTPInputError("Malformed HTTP request line")
    if not re.match(r"^HTTP/1\.[0-9]$", version):
        raise HTTPInputError(
            "Malformed HTTP version in HTTP Request-Line: %r" % version)
    return RequestStartLine(method, path, version)
Project: noc-orchestrator    Author: DirceuSilvaLabs    | Project source | File source
def parse_response_start_line(line):
    """Returns a (version, code, reason) tuple for an HTTP 1.x response line.

    The response is a `collections.namedtuple`.

    >>> parse_response_start_line("HTTP/1.1 200 OK")
    ResponseStartLine(version='HTTP/1.1', code=200, reason='OK')
    """
    line = native_str(line)
    match = re.match("(HTTP/1.[0-9]) ([0-9]+) ([^\r]*)", line)
    if not match:
        raise HTTPInputError("Error parsing response start line")
    return ResponseStartLine(match.group(1), int(match.group(2)),
                             match.group(3))

# _parseparam and _parse_header are copied and modified from python2.7's cgi.py
# The original 2.7 version of this code did not correctly support some
# combinations of semicolons and double quotes.
# It has also been modified to support valueless parameters as seen in
# websocket extension negotiations.
Project: noc-orchestrator    Author: DirceuSilvaLabs    | Project source | File source
def parse_response_start_line(line):
    """Returns a (version, code, reason) tuple for an HTTP 1.x response line.

    The response is a `collections.namedtuple`.

    >>> parse_response_start_line("HTTP/1.1 200 OK")
    ResponseStartLine(version='HTTP/1.1', code=200, reason='OK')
    """
    line = native_str(line)
    match = re.match("(HTTP/1.[0-9]) ([0-9]+) ([^\r]*)", line)
    if not match:
        raise HTTPInputError("Error parsing response start line")
    return ResponseStartLine(match.group(1), int(match.group(2)),
                             match.group(3))

# _parseparam and _parse_header are copied and modified from python2.7's cgi.py
# The original 2.7 version of this code did not correctly support some
# combinations of semicolons and double quotes.
# It has also been modified to support valueless parameters as seen in
# websocket extension negotiations.
Project: sauna    Author: NicolasLM    | Project source | File source
def test_get_network_data(self, time_mock, sleep_mock):
        time_mock.side_effect = [1, 2]

        Counter = namedtuple('Counter',
                             ['bytes_sent', 'bytes_recv', 'packets_sent',
                              'packets_recv'])

        first_counter = Counter(bytes_sent=54000, bytes_recv=12000,
                                packets_sent=50, packets_recv=100)
        second_counter = Counter(bytes_sent=108000, bytes_recv=36000,
                                 packets_sent=75, packets_recv=150)

        m = mock.Mock()
        m.side_effect = [
            {'eth0': first_counter}, {'eth0': second_counter}
        ]

        self.network.psutil.net_io_counters = m
        kb_ul, kb_dl, p_ul, p_dl = self.network.get_network_data(
            interface='eth0', delay=1)
        self.assertEqual(kb_ul, 54000)
        self.assertEqual(kb_dl, 24000)
        self.assertEqual(p_ul, 25)
        self.assertEqual(p_dl, 50)
Project: dsb3    Author: EliasVansteenkiste    | Project source | File source
def build_model():
    l_in = nn.layers.InputLayer((None, n_candidates_per_patient,) + p_transform['patch_size'])
    l_in_rshp = nn.layers.ReshapeLayer(l_in, (-1, 1,) + p_transform['patch_size'])
    l_target = nn.layers.InputLayer((None,))

    l = load_pretrained_model(l_in_rshp)

    #ins = penultimate_layer.output_shape[1]
    # l = conv3d(penultimate_layer, ins, filter_size=3, stride=2)
    # #l = feat_red(l)
    #
    #
    # l = nn.layers.DropoutLayer(l)
    # #
    # l = nn.layers.DenseLayer(l, num_units=256, W=nn.init.Orthogonal(),
    #                          nonlinearity=nn.nonlinearities.rectify)

    #l = nn.layers.DropoutLayer(l)

    l = nn.layers.ReshapeLayer(l, (-1, n_candidates_per_patient, 1))

    l_out = nn_lung.LogMeanExp(l,r=16, axis=(1, 2), name='LME')

    return namedtuple('Model', ['l_in', 'l_out', 'l_target'])(l_in, l_out, l_target)
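
The model here (and in the variants that follow) is returned as an ad-hoc 'Model' namedtuple, so callers can refer to model.l_in, model.l_out and model.l_target by name; a tiny illustration of that return pattern with placeholder values instead of Lasagne layers:

from collections import namedtuple

model = namedtuple('Model', ['l_in', 'l_out', 'l_target'])(
    'input-layer', 'output-layer', 'target-layer')
print(model.l_in, model.l_out, model.l_target)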
Project: dsb3    Author: EliasVansteenkiste    | Project source | File source
def build_model():
    l_in = nn.layers.InputLayer((None, n_candidates_per_patient, ) + p_transform['patch_size'])
    l_in_rshp = nn.layers.ReshapeLayer(l_in, (-1, 1,) + p_transform['patch_size'])
    l_target = nn.layers.InputLayer((batch_size,))

    penultimate_layer = load_pretrained_model(l_in_rshp)

    l = dense(penultimate_layer, 128, name='dense_final')

    l = nn.layers.DenseLayer(l, num_units=1, W=nn.init.Orthogonal(),
                             nonlinearity=nn.nonlinearities.sigmoid, name='dense_p_benign')

    l = nn.layers.ReshapeLayer(l, (-1, n_candidates_per_patient, 1), name='reshape2patients')

    l_out = nn_lung.LogMeanExp(l, r=8, axis=(1, 2), name='LME')


    return namedtuple('Model', ['l_in', 'l_out', 'l_target'])(l_in, l_out, l_target)
Project: dsb3    Author: EliasVansteenkiste    | Project source | File source
def build_model():
    l_in = nn.layers.InputLayer((None, n_candidates_per_patient, 1,) + p_transform['patch_size'])
    l_in_rshp = nn.layers.ReshapeLayer(l_in, (-1, 1,) + p_transform['patch_size'])
    l_target = nn.layers.InputLayer((batch_size,))

    penultimate_layer = load_pretrained_model(l_in_rshp)

    l = drop(penultimate_layer, name='drop_final2')

    l = dense(l, 256, name='dense_final1')

    l = drop(l, name='drop_final2')

    l = dense(l, 256, name='dense_final2')

    l = nn.layers.DenseLayer(l, num_units=1, W=nn.init.Orthogonal(),
                             nonlinearity=None, name='dense_p_benign')

    l = nn.layers.ReshapeLayer(l, (-1, n_candidates_per_patient, 1), name='reshape2patients')

    l_out = nn_lung.AggAllBenignExp(l, name='aggregate_all_nodules_benign')

    return namedtuple('Model', ['l_in', 'l_out', 'l_target'])(l_in, l_out, l_target)
Project: dsb3    Author: EliasVansteenkiste    | Project source | File source
def build_model():
    l_in = nn.layers.InputLayer((None, n_candidates_per_patient, 1,) + p_transform['patch_size'])
    l_in_rshp = nn.layers.ReshapeLayer(l_in, (-1, 1,) + p_transform['patch_size'])
    l_target = nn.layers.InputLayer((batch_size,))

    penultimate_layer = load_pretrained_model(l_in_rshp)

    l = drop(penultimate_layer, name='drop_final')

    l = dense(l, 128, name='dense_final')

    l = nn.layers.DenseLayer(l, num_units=1, W=nn.init.Orthogonal(),
                             nonlinearity=None, name='dense_p_benign')

    l = nn.layers.ReshapeLayer(l, (-1, n_candidates_per_patient, 1), name='reshape2patients')

    l_out = nn_lung.AggAllBenignExp(l, name='aggregate_all_nodules_benign')

    return namedtuple('Model', ['l_in', 'l_out', 'l_target'])(l_in, l_out, l_target)
Project: dsb3    Author: EliasVansteenkiste    | Project source | File source
def build_model():
    l_in = nn.layers.InputLayer((None, n_candidates_per_patient,) + p_transform['patch_size'])
    l_in_rshp = nn.layers.ReshapeLayer(l_in, (-1, 1,) + p_transform['patch_size'])
    l_target = nn.layers.InputLayer((batch_size,))

    l = load_pretrained_model(l_in_rshp)

    #ins = penultimate_layer.output_shape[1]
    # l = conv3d(penultimate_layer, ins, filter_size=3, stride=2)
    # #l = feat_red(l)
    #
    #
    # l = nn.layers.DropoutLayer(l)
    # #
    # l = nn.layers.DenseLayer(l, num_units=256, W=nn.init.Orthogonal(),
    #                          nonlinearity=nn.nonlinearities.rectify)

    #l = nn.layers.DropoutLayer(l)

    l = nn.layers.ReshapeLayer(l, (-1, n_candidates_per_patient, 1))

    l_out = nn_lung.LogMeanExp(l,r=16, axis=(1, 2), name='LME')

    return namedtuple('Model', ['l_in', 'l_out', 'l_target'])(l_in, l_out, l_target)
Project: dsb3    Author: EliasVansteenkiste    | Project source | File source
def build_model():
    l_in = nn.layers.InputLayer((None, n_candidates_per_patient, ) + p_transform['patch_size'])
    l_in_rshp = nn.layers.ReshapeLayer(l_in, (-1, 1,) + p_transform['patch_size'])
    l_target = nn.layers.InputLayer((batch_size,))

    penultimate_layer = load_pretrained_model(l_in_rshp)

    l = dense(penultimate_layer, 128, name='dense_final')

    l = nn.layers.DenseLayer(l, num_units=1, W=nn.init.Orthogonal(),
                             nonlinearity=nn.nonlinearities.sigmoid, name='dense_p_benign')

    l = nn.layers.ReshapeLayer(l, (-1, n_candidates_per_patient, 1), name='reshape2patients')

    l_out = nn_lung.LogMeanExp(l, r=8, axis=(1, 2), name='LME')


    return namedtuple('Model', ['l_in', 'l_out', 'l_target'])(l_in, l_out, l_target)
Project: dsb3    Author: EliasVansteenkiste    | Project source | File source
def build_model():
    l_in = nn.layers.InputLayer((None, n_candidates_per_patient, ) + p_transform['patch_size'])
    l_in_rshp = nn.layers.ReshapeLayer(l_in, (-1, 1,) + p_transform['patch_size'])
    l_target = nn.layers.InputLayer((batch_size,))

    penultimate_layer = load_pretrained_model(l_in_rshp)

    l = dense(penultimate_layer, 128, name='dense_final')

    l = nn.layers.DenseLayer(l, num_units=1, W=nn.init.Orthogonal(),
                             nonlinearity=nn.nonlinearities.sigmoid, name='dense_p_benign')

    l = nn.layers.ReshapeLayer(l, (-1, n_candidates_per_patient, 1), name='reshape2patients')

    l_out = nn_lung.LogMeanExp(l, r=8, axis=(1, 2), name='LME')


    return namedtuple('Model', ['l_in', 'l_out', 'l_target'])(l_in, l_out, l_target)
Project: dsb3    Author: EliasVansteenkiste    | Project source | File source
def build_model():
    l_in = nn.layers.InputLayer((None, n_candidates_per_patient, ) + p_transform['patch_size'])
    l_in_rshp = nn.layers.ReshapeLayer(l_in, (-1, 1,) + p_transform['patch_size'])
    l_target = nn.layers.InputLayer((batch_size,))

    penultimate_layer = load_pretrained_model(l_in_rshp)

    l = nn.layers.DenseLayer(penultimate_layer, num_units=1, W=nn.init.Orthogonal(),
                             nonlinearity=nn.nonlinearities.sigmoid, name='dense_p_benign')

    l = nn.layers.ReshapeLayer(l, (-1, n_candidates_per_patient, 1), name='reshape2patients')

    l_out = nn_lung.LogMeanExp(l, r=8, axis=(1, 2), name='LME')


    return namedtuple('Model', ['l_in', 'l_out', 'l_target'])(l_in, l_out, l_target)
Project: dsb3    Author: EliasVansteenkiste    | Project source | File source
def build_model():
    l_in = nn.layers.InputLayer((None, n_candidates_per_patient, ) + p_transform['patch_size'])
    l_in_rshp = nn.layers.ReshapeLayer(l_in, (-1, 1,) + p_transform['patch_size'])
    l_target = nn.layers.InputLayer((batch_size,))

    penultimate_layer = load_pretrained_model(l_in_rshp)

    l = nn.layers.DenseLayer(penultimate_layer, num_units=1, W=nn.init.Orthogonal(),
                             nonlinearity=nn.nonlinearities.sigmoid, name='dense_p_benign')

    l = nn.layers.ReshapeLayer(l, (-1, n_candidates_per_patient, 1), name='reshape2patients')

    l_out = nn_lung.LogMeanExp(l, r=8, axis=(1, 2), name='LME')


    return namedtuple('Model', ['l_in', 'l_out', 'l_target'])(l_in, l_out, l_target)