Python enum 模块,unique() 实例源码

我们从Python开源项目中,提取了以下15个代码示例,用于说明如何使用enum.unique()

项目:django-celery-growthmonitor    作者:mbourqui    | 项目源码 | 文件源码
def job_root(instance, filename=''):
    """
    Build the path of `filename` placed directly in the root folder of a job.

    Parameters
    ----------
    instance : AJob
        The job model instance whose root folder is used
    filename : str
        Original filename (defaults to the empty string)

    Returns
    -------
    str
        `filename` joined onto the root folder of `instance`
    """
    job_folder = root_job(instance)
    return os.path.join(job_folder, filename)
项目:django-celery-growthmonitor    作者:mbourqui    | 项目源码 | 文件源码
def job_data(instance, filename=''):
    """
    Build the path of `filename` inside the data subfolder of a job.

    Parameters
    ----------
    instance : AJob or ADataFile
        The model instance associated; for an ADataFile the root folder of
        its `job` attribute is used
    filename : str
        Original filename (defaults to the empty string)

    Returns
    -------
    str
        Root folder of the job joined with either the path returned by
        `_user_path(instance.upload_to_data, filename)` or, when that is
        falsy, `data/<filename>`
    """
    if isinstance(instance, ADataFile):
        job_folder = root_job(instance.job)
    else:
        job_folder = root_job(instance)

    subpath = _user_path(instance.upload_to_data, filename)
    if not subpath:
        # No user-defined location: fall back to the default subfolder.
        subpath = os.path.join('data', filename)

    return os.path.join(job_folder, subpath)
项目:django-celery-growthmonitor    作者:mbourqui    | 项目源码 | 文件源码
def job_results(instance, filename=''):
    """
    Build the path of `filename` inside the results subfolder of a job.

    Parameters
    ----------
    instance : AJob
        The job model instance associated
    filename : str
        Original filename (defaults to the empty string)

    Returns
    -------
    str
        Root folder of the job joined with either the path returned by
        `_user_path(instance.upload_to_results, filename)` or, when that is
        falsy, `results/<filename>`
    """
    subpath = _user_path(instance.upload_to_results, filename)
    if not subpath:
        # No user-defined location: fall back to the default subfolder.
        subpath = os.path.join('results', filename)

    job_folder = root_job(instance)
    return os.path.join(job_folder, subpath)
项目:ceph-lcm    作者:Mirantis    | 项目源码 | 文件源码
def ensure_index(cls):
        """Request the indexes of this model on its collection.

        Three ``create_index`` calls are issued on ``cls.collection()``:

        * ``index_execution_id`` -- unique, ascending on ``execution_id``
          and ``task_type``;
        * ``index_time`` -- ascending on the five ``time.*`` fields;
        * ``index_task_ttl`` -- on ``TTL_FIELDNAME`` with
          ``expireAfterSeconds=0``.
        """
        target = cls.collection()

        execution_keys = [
            ("execution_id", generic.SORT_ASC),
            ("task_type", generic.SORT_ASC)
        ]
        target.create_index(
            execution_keys, name="index_execution_id", unique=True)

        time_keys = [
            ("time.created", generic.SORT_ASC),
            ("time.started", generic.SORT_ASC),
            ("time.completed", generic.SORT_ASC),
            ("time.cancelled", generic.SORT_ASC),
            ("time.failed", generic.SORT_ASC)
        ]
        target.create_index(time_keys, name="index_time")

        target.create_index(
            TTL_FIELDNAME, expireAfterSeconds=0, name="index_task_ttl")
项目:git_nautilus_icons    作者:chrisjbillington    | 项目源码 | 文件源码
def get_statuses_by_dir(path, file_statuses):
    """Group file statuses by the directory directly below `path` they are in.

    Only names starting with ``path + '/'`` are considered. Each such name is
    cut at the first ``'/'`` after that prefix, and its status is added to the
    set stored under the resulting directory path, so every directory keeps
    only its unique statuses.

    Names with no further ``'/'`` after the prefix (entries located directly
    in `path`) are not under any subdirectory and are skipped.

    Parameters
    ----------
    path : str
        Directory whose immediate subdirectories are the grouping keys.
    file_statuses : mapping
        Maps a '/'-separated name to its status (statuses must be hashable).

    Returns
    -------
    defaultdict(set)
        Maps ``path/<subdirectory>`` to the set of statuses found below it.
    """
    statuses_by_dir = defaultdict(set)
    prefix = path + '/'
    len_prefix = len(prefix)
    for name, status in file_statuses.items():
        if not name.startswith(prefix):
            continue
        sep_index = name.find('/', len_prefix)
        if sep_index == -1:
            # str.find() signals "no separator" with -1; slicing with it
            # would drop the last character of the name and could alias a
            # real subdirectory (e.g. 'repo/srcx' -> 'repo/src').
            continue
        statuses_by_dir[name[:sep_index]].add(status)
    return statuses_by_dir


# import bprofile
# @bprofile.BProfile('test.png')
项目:Python_Study    作者:thsheep    | 项目源码 | 文件源码
def func_11():
    """Print each member of a ``Months`` enum built with the functional API.

    One line is printed per member, in definition order, in the form
    ``<name> => <member> , <value>``.
    """
    # The type name passed to Enum() matches the variable it is bound to,
    # so members are displayed as ``Months.Jan`` and so on.
    Months = Enum('Months', ('Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul',
                             'Aug', 'Sep', 'Oct', 'Nov', 'Dec')
                  )
    for name, member in Months.__members__.items():
        print(name, '=>', member, ',', member.value)
    # The value attribute is an int constant assigned to each member
    # automatically; counting starts at 1 by default.

# For finer control over an enumeration type, derive a custom class from Enum.
# The unique decorator helps check that there are no duplicate values.
项目:CodeGra.de    作者:CodeGra-de    | 项目源码 | 文件源码
def create_linter(
        cls: t.Type['AssignmentLinter'],
        assignment_id: int,
        name: str,
        config: str,
    ) -> 'AssignmentLinter':
        """Create a new instance of this class for a given :class:`Assignment`
        with a given :py:class:`.linters.Linter`

        A :class:`LinterInstance` is appended to the ``tests`` of the new
        object for every item returned by ``get_all_latest_submissions()``
        of the assignment.

        :param assignment_id: The id of the assignment
        :param name: Name of the linter
        :param config: Stored as ``config`` on the created object
        :returns: The created AssignmentLinter
        """
        candidate_id = str(uuid.uuid4())

        # Draw a fresh UUID for as long as a row with this id already exists.
        while db.session.query(
            AssignmentLinter.query.filter(cls.id == candidate_id).exists()
        ).scalar():  # pragma: no cover
            candidate_id = str(uuid.uuid4())

        linter = cls(id=candidate_id, assignment_id=assignment_id, name=name)
        linter.config = config

        linter.tests = []
        assignment = Assignment.query.get(assignment_id)
        for submission in assignment.get_all_latest_submissions():
            linter.tests.append(LinterInstance(submission, linter))

        return linter
项目:CodeGra.de    作者:CodeGra-de    | 项目源码 | 文件源码
def __init__(self, work: Work, tester: AssignmentLinter) -> None:
        """Initialize the instance with a fresh id, a work and its tester.

        :param work: Stored as ``work`` on the instance
        :param tester: Stored as ``tester`` on the instance
        """
        def id_in_use(candidate: str) -> bool:
            # True when a LinterInstance row with this id already exists.
            return db.session.query(
                LinterInstance.query.filter(
                    LinterInstance.id == candidate
                ).exists()
            ).scalar()

        # Find a unique id
        new_id = str(uuid.uuid4())
        while id_in_use(new_id):  # pragma: no cover
            new_id = str(uuid.uuid4())

        self.id = new_id
        self.work = work
        self.tester = tester
项目:open-synthesis    作者:twschiller    | 项目源码 | 文件源码
def __init__(self, key, delta):
        """Create a digest frequency from its key and the time span it covers.

        :param key: unique identifier, used in database
        :param delta: timedelta covered by the frequency, or None (e.g., 7 days)
        """
        self.key, self.delta = key, delta
项目:libpurecoollink    作者:CharlesBlonde    | 项目源码 | 文件源码
def __init__(self, name, type_, class_):
        """Store the entry name, type and class.

        ``key`` is the lower-cased name. ``class_`` keeps only the bits
        selected by ``_CLASS_MASK``; ``unique`` is True when the
        ``_CLASS_UNIQUE`` bit is set in the class value passed in.
        """
        lowered = name.lower()
        self.key = lowered
        self.name = name
        self.type = type_
        masked_class = class_ & _CLASS_MASK
        self.class_ = masked_class
        unique_bit = class_ & _CLASS_UNIQUE
        self.unique = unique_bit != 0
项目:libpurecoollink    作者:CharlesBlonde    | 项目源码 | 文件源码
def to_string(self, hdr, other):
        """Build ``hdr[type,class(-unique),name(,other)]`` for this entry.

        ``-unique`` is inserted after the class only when ``self.unique`` is
        true, and ``,other`` is appended only when `other` is not None.
        """
        pieces = ["%s[%s,%s" % (hdr, self.get_type(self.type),
                                self.get_class_(self.class_))]
        pieces.append("-unique," if self.unique else ",")
        pieces.append(self.name)
        pieces.append("]" if other is None else ",%s]" % other)
        return "".join(pieces)
项目:libpurecoollink    作者:CharlesBlonde    | 项目源码 | 文件源码
def write_record(self, record, now):
        """Write a record (answer, authoritative answer, additional) to
        the packet.

        Returns 1 when the packet is already in the ``finished`` state or
        when adding the record pushed ``self.size`` above
        ``_MAX_MSG_ABSOLUTE`` (the record is then rolled back and the packet
        is marked finished); returns 0 when the record was written.
        """
        if self.state == self.State.finished:
            return 1

        # Remember where we started so the record can be rolled back.
        data_length_before = len(self.data)
        size_before = self.size

        self.write_name(record.name)
        self.write_short(record.type)
        if record.unique and self.multicast:
            class_field = record.class_ | _CLASS_UNIQUE
        else:
            class_field = record.class_
        self.write_short(class_field)

        # now == 0 writes the record's own ttl instead of the remaining one.
        ttl = record.ttl if now == 0 else record.get_remaining_ttl(now)
        self.write_int(ttl)
        length_slot = len(self.data)

        # Account for the length short that is inserted before the record
        # data only after that data has been written.
        self.size += 2
        record.write(self)
        self.size -= 2

        payload_length = sum(map(len, self.data[length_slot:]))
        # Here is the short we adjusted for
        self.insert_short(length_slot, payload_length)

        if self.size <= _MAX_MSG_ABSOLUTE:
            return 0

        # We went over: drop everything this call appended and stop.
        del self.data[data_length_before:]
        self.size = size_before
        self.state = self.State.finished
        return 1
项目:libpurecoollink    作者:CharlesBlonde    | 项目源码 | 文件源码
def register_service(self, info, ttl=_DNS_TTL, allow_name_change=False):
        """Register service information and announce it three times.

        ``check_service`` is run first (it may rename the service when
        `allow_name_change` is true), then the service is stored under its
        lower-cased name and the counter of its type is incremented.
        Finally three response packets carrying the PTR, SRV and TXT
        records -- plus an A record when ``info.address`` is set -- are sent,
        with ``_REGISTER_TIME`` added to the scheduled send time after each.

        `ttl` is used for every announced record and defaults to
        ``_DNS_TTL``.
        """
        self.check_service(info, allow_name_change)
        self.services[info.name.lower()] = info
        if info.type not in self.servicetypes:
            self.servicetypes[info.type] = 1
        else:
            self.servicetypes[info.type] += 1

        now = current_time_millis()
        next_time = now
        for _ in range(3):
            # Block until the scheduled time of this announcement.
            while now < next_time:
                self.wait(next_time - now)
                now = current_time_millis()

            out = DNSOutgoing(_FLAGS_QR_RESPONSE | _FLAGS_AA)
            pointer = DNSPointer(
                info.type, _TYPE_PTR, _CLASS_IN, ttl, info.name)
            out.add_answer_at_time(pointer, 0)
            service = DNSService(
                info.name, _TYPE_SRV, _CLASS_IN, ttl,
                info.priority, info.weight, info.port, info.server)
            out.add_answer_at_time(service, 0)
            text = DNSText(info.name, _TYPE_TXT, _CLASS_IN, ttl, info.text)
            out.add_answer_at_time(text, 0)
            if info.address:
                address = DNSAddress(
                    info.server, _TYPE_A, _CLASS_IN, ttl, info.address)
                out.add_answer_at_time(address, 0)
            self.send(out)

            next_time += _REGISTER_TIME
项目:bytecode    作者:vstinner    | 项目源码 | 文件源码
def _is_negative_zero(value):
    """Return True when `value` equals zero and carries a negative sign."""
    return value == 0.0 and math.copysign(1.0, value) < 0


def const_key(obj):
    """Return a key under which equal-looking but distinct constants differ.

    Python implementation of the C function _PyCode_ConstantKey() of
    Python 3.6. The key always starts with the exact type of `obj`, so
    ``1``, ``1.0`` and ``True`` get different keys. Negative zeros (float
    and complex) get an extra tag, tuples and frozensets carry the keys of
    their items, and any other object is tagged with its ``id()`` so that
    it only matches itself.
    """
    obj_type = type(obj)
    # Exact type comparisons are used instead of isinstance() so that
    # instances of subtypes are not merged with their base type.

    if (obj is None
            or obj is Ellipsis
            or obj_type in {int, bool, bytes, str, types.CodeType}):
        return (obj_type, obj)

    if obj_type == float:
        # -0.0 compares equal to 0.0, so it needs a longer tuple to be
        # told apart from every other float.
        if _is_negative_zero(obj):
            return (obj_type, obj, None)
        return (obj_type, obj)

    if obj_type == complex:
        # The four complex zeros (and, generally, any negative-zero
        # component) must be distinguished. The third item is True when
        # both parts are negative zero, False for the imaginary part only,
        # None for the real part only, and absent otherwise.
        real_negzero = _is_negative_zero(obj.real)
        imag_negzero = _is_negative_zero(obj.imag)
        if imag_negzero:
            return (obj_type, obj, real_negzero)
        if real_negzero:
            return (obj_type, obj, None)
        return (obj_type, obj)

    if obj_type == tuple:
        return (obj_type, obj, tuple(map(const_key, obj)))

    if obj_type == frozenset:
        return (obj_type, obj, frozenset(map(const_key, obj)))

    # Any other type: the object identifier keeps distinct objects unequal.
    return (obj_type, obj, id(obj))
项目:libpurecoollink    作者:CharlesBlonde    | 项目源码 | 文件源码
def check_service(self, info, allow_name_change):
        """Checks the network for a unique service name, modifying the
        ServiceInfo passed in if it is not unique.

        Three probe packets are sent, ``_CHECK_TIME`` apart. Whenever the
        cache reports an entry conflicting with ``info.name``, the name is
        changed to ``<instance>-<n>.<type>`` (``n`` starting at 2) and the
        three probes start over.

        Raises BadTypeInNameException when ``info.type`` does not end with
        the service type name derived from ``info.name``, and
        NonUniqueNameException when a conflict is found while
        `allow_name_change` is false.
        """

        # This is kind of funky because of the subtype based tests
        # need to make subtypes a first class citizen
        service_name = service_type_name(info.name)
        if not info.type.endswith(service_name):
            raise BadTypeInNameException

        # Strip the service type name plus one more character from the end
        # (presumably the '.' separator -- TODO confirm).
        instance_name = info.name[:-len(service_name) - 1]
        next_instance_number = 2

        now = current_time_millis()
        next_time = now
        # i counts the probes sent for the current candidate name.
        i = 0
        while i < 3:
            # check for a name conflict
            while self.cache.current_entry_with_name_and_alias(
                    info.type, info.name):
                if not allow_name_change:
                    raise NonUniqueNameException

                # change the name and look for a conflict
                info.name = '%s-%s.%s' % (
                    instance_name, next_instance_number, info.type)
                next_instance_number += 1
                # Return value is discarded; presumably called so that an
                # invalid new name raises -- TODO confirm.
                service_type_name(info.name)
                # Restart probing immediately for the new name.
                next_time = now
                i = 0

            # Not yet time for the next probe: wait, refresh the clock and
            # re-run the conflict check before sending.
            if now < next_time:
                self.wait(next_time - now)
                now = current_time_millis()
                continue

            out = DNSOutgoing(_FLAGS_QR_QUERY | _FLAGS_AA)
            # Keep a reference to the last probe packet on the instance.
            self.debug = out
            out.add_question(DNSQuestion(info.type, _TYPE_PTR, _CLASS_IN))
            out.add_authorative_answer(DNSPointer(
                info.type, _TYPE_PTR, _CLASS_IN, _DNS_TTL, info.name))
            self.send(out)
            i += 1
            next_time += _CHECK_TIME