Python attr 模块,s() 实例源码


项目:djangocms-link-manager    作者:divio    | 项目源码 | 文件源码
def validate_mailto(self, parts, verify_exists=False):
        Validates a mailto URL, by using Django's EmailValidator.
        `verify_exists` does nothing at this time.

        :param parts:
        :param verify_exists:
        validator = EmailValidator()
        except ValidationError:
            return False
            return True
项目:environ_config    作者:hynek    | 项目源码 | 文件源码
def to_config(config_cls, environ=os.environ):
    """
    Load the configuration described by *config_cls* from *environ*.

    :param config_cls: class whose ``_prefix`` attribute supplies the
        application prefix; a falsy prefix means "no prefix".
    :param environ: mapping the values are looked up in.
    :returns: the result of ``_to_config`` for *config_cls*.
    :raises MissingEnvValueError: if a variable is missing and its default
        is the ``RAISE`` sentinel.
    """
    if config_cls._prefix:
        app_prefix = (config_cls._prefix,)
    else:
        app_prefix = ()

    def default_get(environ, metadata, prefix, name):
        ce = metadata[CNF_KEY]
        # NOTE(review): the ``ce.name`` tokens were missing from the garbled
        # source and are reconstructed here -- confirm against upstream.
        if ce.name is not None:
            var = ce.name
        else:
            # PREFIX_NESTED_NAME, upper-cased.
            var = ("_".join(app_prefix + prefix + (name,))).upper()

        log.debug("looking for env var '%s'.", var)
        val = environ.get(var, ce.default)
        if val is RAISE:
            raise MissingEnvValueError(var)
        return val

    return _to_config(config_cls, default_get, environ, ())
项目:environ_config    作者:hynek    | 项目源码 | 文件源码
def _get(self, environ, metadata, prefix, name):
    """
    Look up one secret in *environ* and wrap it in ``_SecretStr``.

    The variable name is built from ``self.vault_prefix`` (called with
    *environ* first if it is callable), *prefix* and *name*.

    :raises MissingSecretError: if the variable is missing and its default
        is the ``RAISE`` sentinel.
    """
    ce = metadata[CNF_KEY]

    # NOTE(review): ``ce.name`` and the trailing ``.upper()`` were lost in
    # the garbled source and are reconstructed -- confirm against upstream.
    if ce.name is not None:
        var = ce.name
    else:
        if callable(self.vault_prefix):
            vp = self.vault_prefix(environ)
        else:
            vp = self.vault_prefix
        var = "_".join(
            ((vp,) + prefix + (name,))
        ).upper()

    log.debug("looking for env var '%s'.", var)
    val = environ.get(var, ce.default)
    if val is RAISE:
        raise MissingSecretError(var)
    return _SecretStr(val)
项目:aws-encryption-sdk-cli    作者:awslabs    | 项目源码 | 文件源码
def _ensure_dir_exists(filename):
    # type: (str) -> None
    """Creates a directory tree if it does not already exist.

    :param str filename: Full path to file in destination directory
    """
    dest_final_dir = filename.rsplit(os.sep, 1)[0]
    if dest_final_dir == filename:
        # File is in current directory: there is no parent path to create.
        _LOGGER.debug('Target dir is current dir')
        return
    try:
        os.makedirs(dest_final_dir)
    except _file_exists_error():
        # os.makedirs(... exist_ok=True) does not work in 2.7
        pass
    else:
        # NOTE(review): the log call name was stripped from the source;
        # ``_LOGGER.info`` is reconstructed -- confirm the level.
        _LOGGER.info('Created directory: %s', dest_final_dir)
项目:aws-encryption-sdk-cli    作者:awslabs    | 项目源码 | 文件源码
def output_filename(source_filename, destination_dir, mode, suffix):
    # type: (str, str, str, str) -> str
    """Duplicates the source filename in the destination directory, adding or stripping
    a suffix as needed.

    :param str source_filename: Full file path to source file
    :param str destination_dir: Full file path to destination directory
    :param str mode: Operating mode (encrypt/decrypt)
    :param str suffix: Suffix to append to output filename; ``None`` selects
        the default for *mode* from ``OUTPUT_SUFFIX``
    :returns: Full file path of new destination file in destination directory
    :rtype: str
    """
    if suffix is None:
        suffix = OUTPUT_SUFFIX[mode]
    else:
        # Only a caller-supplied suffix is "custom".
        _LOGGER.debug('Using custom suffix "%s" to create output file', suffix)
    filename = source_filename.rsplit(os.sep, 1)[-1]
    _LOGGER.debug('Duplicating filename %s into %s', filename, destination_dir)
    return os.path.join(destination_dir, filename) + suffix
项目:python-miio    作者:rytilahti    | 项目源码 | 文件源码
def read_tokens(self, db) -> Iterator[DeviceConfig]:
    """Read device information out from a given database file.

    The database flavour is detected from the tables present:
    ``devicerecord`` is read via ``read_android`` and ``ZDEVICE`` via
    ``read_apple``. Anything else is logged as an error and yields nothing.

    :param str db: Database file"""
    self.db = db
    # NOTE(review): the logger call was fused into the previous line in the
    # source; ``_LOGGER.info`` is reconstructed -- confirm the level.
    _LOGGER.info("Reading database from %s", db)
    self.conn = sqlite3.connect(db)

    self.conn.row_factory = sqlite3.Row
    with self.conn:
        is_android = self.conn.execute(
            "SELECT name FROM sqlite_master WHERE type='table' AND name='devicerecord';").fetchone() is not None
        is_apple = self.conn.execute(
            "SELECT name FROM sqlite_master WHERE type='table' AND name='ZDEVICE'").fetchone() is not None
        if is_android:
            yield from self.read_android()
        elif is_apple:
            yield from self.read_apple()
        else:
            _LOGGER.error("Error, unknown database type!")
项目:txmix    作者:applied-mixnetworks    | 项目源码 | 文件源码
def do_start(self):
    """
    make this transport begin listening on the specified interface and UDP port
    interface must be an IP address

    Listens on either a TCP interface/port or a unix socket (the latter when
    ``self.onion_unix_socket`` is non-empty), then registers an ephemeral
    hidden service pointing at that endpoint. This is a generator that yields
    the pending operations to its driver.
    """
    # save a TorConfig so we can later use it to send messages
    self.torconfig = txtorcon.TorConfig(control=self.tor.protocol)
    yield self.torconfig.post_bootstrap

    use_tcp = len(self.onion_unix_socket) == 0
    if use_tcp:
        local_socket_endpoint_desc = "tcp:interface=%s:%s" % (self.onion_tcp_interface_ip, self.onion_tcp_port)
    else:
        local_socket_endpoint_desc = "unix:%s" % self.onion_unix_socket
    onion_service_endpoint = endpoints.serverFromString(self.reactor, local_socket_endpoint_desc)
    datagram_proxy_factory = OnionDatagramProxyFactory(received_handler=lambda x: self.datagram_received(x))
    yield onion_service_endpoint.listen(datagram_proxy_factory)

    # Exactly one hidden-service mapping, matching the endpoint chosen above.
    if use_tcp:
        hs_strings = ["%s %s:%s" % (self.onion_port, self.onion_tcp_interface_ip, self.onion_tcp_port)]
    else:
        hs_strings = ["%s unix:%s" % (self.onion_port, self.onion_unix_socket)]
    hs = txtorcon.torconfig.EphemeralHiddenService(hs_strings, key_blob_or_type=self.onion_key)
    yield hs.add_to_tor(self.tor.protocol)
项目:bay    作者:eventbrite    | 项目源码 | 文件源码
def validate(self):
        Cross-checks the settings we have against the options the Container has
        # Verify all link targets are possible
        for alias, target in list(self.links.items()):
            if isinstance(target, str):
                raise ValueError("Link target {} is still a string!".format(target))
            if target.container not in self.container.graph.dependencies(self.container):
                warnings.warn("It is not possible to link %s to %s as %s" % (target, self.container, alias))
                del self.links[alias]
        # Verify devmodes exist
        for devmode in list(self.devmodes):
            if devmode not in self.container.devmodes:
                warnings.warn("Invalid devmode %s on container %s" % (devmode,
项目:bay    作者:eventbrite    | 项目源码 | 文件源码
def resolve_links(self):
        Resolves any links that are still names to instances from the formation
        for alias, target in list(self.links.items()):
            # If it's a string, it's come from an introspection process where we couldn't
            # resolve into an instance at the time (as not all of them were around)
            if isinstance(target, str):
                    target = self.formation[target]
                except KeyError:
                    # We don't error here as that would prevent you stopping orphaned containers;
                    # instead, we delete the link and warn the user. The deleted link means `up` will recreate it
                    # if it's orphaned.
                    del self.links[alias]
                    warnings.warn("Could not resolve link {} to an instance for {}".format(target,
                    self.links[alias] = target
            elif isinstance(target, ContainerInstance):
                raise ValueError("Invalid link value {}".format(repr(target)))
项目:bay    作者:eventbrite    | 项目源码 | 文件源码
def convert(self, value, param, ctx):
    """
    Return the choice matching *value*, or fail with a helpful message.

    An exact match wins. Otherwise, when *ctx* provides a
    ``token_normalize_func``, both *value* and each choice are normalized
    before comparison and the un-normalized choice is returned.
    """
    self.context = ctx

    # Exact match
    if value in self.choices:
        return value

    # Match through normalization
    if ctx is not None and ctx.token_normalize_func is not None:
        value = ctx.token_normalize_func(value)
        for choice in self.choices:
            if ctx.token_normalize_func(choice) == value:
                return choice

    # NOTE(review): the call target was stripped from the source;
    # ``self.fail`` is reconstructed from the argument list -- confirm.
    self.fail('invalid choice: %s. %s' % (PURPLE(value), self.get_missing_message(param, value)), param, ctx)
项目:bay    作者:eventbrite    | 项目源码 | 文件源码
def image_version(self, image_name, image_tag):
        Returns the Docker image hash of the requested image and tag, or
        raises ImageNotFoundException if it's not available on the host.
        if image_tag == "local":
            image_tag = "latest"
            docker_info ="{}:{}".format(image_name, image_tag))
            return docker_info['Id']
        except NotFound:
            # TODO: Maybe auto-build if we can?
            raise ImageNotFoundException(
                "Cannot find image {}:{}".format(image_name, image_tag),
项目:bay    作者:eventbrite    | 项目源码 | 文件源码
def attach(app, container, host, command):
    Attaches to a container
    if command:
        shell = ['/bin/bash', '-lc', ' '.join(command)]
        shell = ['/bin/bash']

    # See if the container is running
    formation = FormationIntrospector(host, app.containers).introspect()
    for instance in formation:
        if instance.container == container:
            # Work out anything to put before the shell (e.g. ENV)
            pre_args = []
            if os.environ.get("TERM", None):
                pre_args = ["env", "TERM=%s" % os.environ['TERM']]
            # Launch into an attached shell
            status_code =["docker", "exec", "-it",] + pre_args + shell)
    # It's not running ;(
    click.echo(RED("Container {name} is not running. It must be started to attach - try `bay run {name}`.".format(,
项目:bay    作者:eventbrite    | 项目源码 | 文件源码
def destroy(app, host, name):
    Destroys a single volume
    task = Task("Destroying volume {}".format(name))
    # Run GC first to clean up stopped containers
    from .gc import GarbageCollector
    # Remove the volume
    formation = FormationIntrospector(host, app.containers).introspect()
    instance_conflicts = [ for instance in formation.get_instances_using_volume(name)]
    if instance_conflicts:
        task.finish(status="Volume {} is in use by container(s): {}".format(
            name, ",".join(instance_conflicts)), status_flavor=Task.FLAVOR_BAD)
        except NotFound:
            task.add_extra_info("There is no volume called {}".format(name))
            task.finish(status="Not found", status_flavor=Task.FLAVOR_BAD)
            task.finish(status="Done", status_flavor=Task.FLAVOR_GOOD)
项目:seashore    作者:shopkick    | 项目源码 | 文件源码
def setenv(self, key, val):
    """
    Set internal environment variable.

    Changes internal environment in which subprocesses will be run.
    Does not change the process's own environment.

    :param key: name of variable
    :param val: value of variable; ``None`` removes the variable instead
    """
    if val is None:
        # Removal is a no-op when the key is absent.
        self._env.pop(key, None)
        return
    # Keys and values must be strings.
    self._env[str(key)] = str(val)
项目:aptrepo    作者:jwodder    | 项目源码 | 文件源码
def fetch_suite(self, suite):
        flat = suite.endswith('/')
        if flat:
            baseurl = joinurl(self.uri, suite)
            baseurl = joinurl(self.uri, 'dists', suite)'Fetching InRelease file from %s', baseurl)
        r = self.session.get(joinurl(baseurl, 'InRelease'))
        if not (400 <= r.status_code < 500):
            release = ReleaseFile.parse_signed(r.content)
  'Server returned %d; fetching Release file instead',
            r = self.session.get(joinurl(baseurl, 'Release'))
            release = ReleaseFile.parse(r.content)
        ### TODO: Handle/fetch/verify PGP stuff
        if flat:
            return FlatRepository(self, suite, release)
            return Suite(self, suite, release)
项目:Codado    作者:corydodt    | 项目源码 | 文件源码
def _iterClass(self, cls, prefix=''):
        Descend a Klein()'s url_map, and generate ConvertedRule() for each one
        iterableRules = [(prefix, cls,]
        for prefix, currentClass, i in iter(iterableRules):
            for rule in i:
                converted = dumpRule(currentClass, rule, prefix)
                if converted.branch:

                if converted.subKlein:
                    clsDown = namedAny(converted.subKlein)
                    iterableRules.append((converted.rulePath, clsDown,

                yield converted
项目:trio    作者:python-trio    | 项目源码 | 文件源码
def statistics(self):
        """Return an object containing debugging information.

        Currently the following fields are defined:

        * ``borrowed_tokens``: The number of tokens currently borrowed from
          the sack.
        * ``total_tokens``: The total number of tokens in the sack. Usually
          this will be larger than ``borrowed_tokens``, but it's possibly for
          it to be smaller if :attr:`total_tokens` was recently decreased.
        * ``borrowers``: A list of all tasks or other entities that currently
          hold a token.
        * ``tasks_waiting``: The number of tasks blocked on this
          :class:`CapacityLimiter`\'s :meth:`acquire` or
          :meth:`acquire_on_behalf_of` methods.

        return _CapacityLimiterStatistics(
            # Use a list instead of a frozenset just in case we start to allow
            # one borrower to hold multiple tokens in the future
项目:trio    作者:python-trio    | 项目源码 | 文件源码
def statistics(self):
        """Return an object containing debugging information.

        Currently the following fields are defined:

        * ``locked``: boolean indicating whether the lock is held.
        * ``owner``: the :class:`trio.hazmat.Task` currently holding the lock,
          or None if the lock is not held.
        * ``tasks_waiting``: The number of tasks blocked on this lock's
          :meth:`acquire` method.

        return _LockStatistics(
项目:trio    作者:python-trio    | 项目源码 | 文件源码
def statistics(self):
        """Returns an object containing debugging information.

        Currently the following fields are defined:

        * ``qsize``: The number of items currently in the queue.
        * ``capacity``: The maximum number of items the queue can hold.
        * ``tasks_waiting_put``: The number of tasks blocked on this queue's
          :meth:`put` method.
        * ``tasks_waiting_get``: The number of tasks blocked on this queue's
          :meth:`get` method.

        return _QueueStats(
项目:trio    作者:python-trio    | 项目源码 | 文件源码
def current_default_worker_thread_limiter():
    """Get the default :class:`CapacityLimiter` stored on ``_limiter_local``.

    The limiter is created with ``DEFAULT_LIMIT`` on first access and the
    same object is returned afterwards.

    The most common reason to call this would be if you want to modify its
    :attr:`~CapacityLimiter.total_tokens` attribute.

    """
    try:
        limiter = _limiter_local.limiter
    except AttributeError:
        # First access: create the limiter and remember it.
        limiter = _limiter_local.limiter = CapacityLimiter(DEFAULT_LIMIT)
    return limiter

# Eventually we might build this into a full-fledged deadlock-detection
# system.
# But for now we just need an object to stand in for the thread, so we can
# keep track of who's holding the CapacityLimiter's token.
项目:trio    作者:python-trio    | 项目源码 | 文件源码
def test_main_and_task_both_crash(recwarn):
    # If main crashes and there's also a task crash, then we get both in a
    # MultiError
    async def crasher():
        raise ValueError

    async def main(wait):
        async with _core.open_nursery() as nursery:
            crasher_task = nursery.spawn(crasher)
            if wait:
                await crasher_task.wait()
            raise KeyError

    for wait in [True, False]:
        with pytest.raises(_core.MultiError) as excinfo:
  , wait)
        assert set(type(exc) for exc in excinfo.value.exceptions) == {
            ValueError, KeyError
项目:trio    作者:python-trio    | 项目源码 | 文件源码
def test_broken_abort():
    async def main():
        # These yields are here to work around an annoying warning -- we're
        # going to crash the main loop, and if we (by chance) do this before
        # the run_sync_soon task runs for the first time, then Python gives us
        # a spurious warning about it not being awaited. (I mean, the warning
        # is correct, but here we're testing our ability to deliver a
        # semi-meaningful error after things have gone totally pear-shaped, so
        # it's not relevant.) By letting the run_sync_soon_task run first, we
        # avoid the warning.
        await _core.checkpoint()
        await _core.checkpoint()
        with _core.open_cancel_scope() as scope:
            # None is not a legal return value here
            await _core.wait_task_rescheduled(lambda _: None)

    with pytest.raises(_core.TrioInternalError):

    # Because this crashes, various __del__ methods print complaints on
    # stderr. Make sure that they get run now, so the output is attached to
    # this test.
项目:trio    作者:python-trio    | 项目源码 | 文件源码
def test_system_task_crash_KeyboardInterrupt():
    async def ki():
        raise KeyboardInterrupt

    async def main():
        await sleep_forever()

    # KI doesn't get wrapped with TrioInternalError
    with pytest.raises(KeyboardInterrupt):

# This used to fail because checkpoint was a yield followed by an immediate
# reschedule. So we had:
# 1) this task yields
# 2) this task is rescheduled
# ...
# 3) next iteration of event loop starts, runs timeouts
# 4) this task has timed out
# 5) ...but it's on the run queue, so the timeout is queued to be delivered
#    the next time that it's blocked.
项目:trio    作者:python-trio    | 项目源码 | 文件源码
def test_exc_info_after_yield_error():
    child_task = None

    async def child():
        nonlocal child_task
        child_task = _core.current_task()

            raise KeyError
        except Exception:
                await sleep_forever()
            except Exception:

    with pytest.raises(KeyError):
        async with _core.open_nursery() as nursery:
            await wait_all_tasks_blocked()
            _core.reschedule(child_task, _core.Error(ValueError()))

# Similar to previous test -- if the ValueError() gets sent in via 'throw',
# then Python's normal implicit chaining stuff is broken.
项目:trio    作者:python-trio    | 项目源码 | 文件源码
async def test_TrioToken_run_sync_soon_massive_queue():
    """Queue many ``run_sync_soon`` callbacks and check they all run, in order."""
    # There are edge cases in the wakeup fd code when the wakeup fd overflows,
    # so let's try to make that happen. This is also just a good stress test
    # in general. (With the current-as-of-2017-02-14 code using a socketpair
    # with minimal buffer, Linux takes 6 wakeups to fill the buffer and MacOS
    # takes 1 wakeup. So 1000 is overkill if anything. Windows OTOH takes
    # ~600,000 wakeups, but has the same code paths...)
    COUNT = 1000
    token = _core.current_trio_token()
    # One-element list so the callback can mutate it without ``nonlocal``.
    counter = [0]

    def cb(i):
        # This also tests FIFO ordering of callbacks
        assert counter[0] == i
        counter[0] += 1

    for i in range(COUNT):
        token.run_sync_soon(cb, i)
    await wait_all_tasks_blocked()
    assert counter[0] == COUNT
项目:trio    作者:python-trio    | 项目源码 | 文件源码
def __exit__(self, etype, exc, tb):
    """
    Filter the in-flight exception through ``self._handler``.

    :returns: ``False`` to let the interpreter re-raise an unchanged
        exception, ``True`` to swallow it when the filter returns ``None``.
        When no exception is in flight the method falls through and returns
        ``None``.
    :raises: the filtered exception, when the filter produced a different one.
    """
    if exc is not None:
        filtered_exc = MultiError.filter(self._handler, exc)
        if filtered_exc is exc:
            # Let the interpreter re-raise it
            return False
        if filtered_exc is None:
            # Swallow the exception
            return True
        # When we raise filtered_exc, Python will unconditionally blow
        # away its __context__ attribute and replace it with the original
        # exc we caught. So after we raise it, we have to pause it while
        # it's in flight to put the correct __context__ back.
        old_context = filtered_exc.__context__
        try:
            raise filtered_exc
        finally:
            _, value, _ = sys.exc_info()
            assert value is filtered_exc
            value.__context__ = old_context
项目:trio    作者:python-trio    | 项目源码 | 文件源码
def flags(self):
    """Return the epoll event mask for the waiting tasks, or None if there are none.

    ``EPOLLIN`` is set when ``read_task`` is present and ``EPOLLOUT`` when
    ``write_task`` is present.
    """
    mask = select.EPOLLIN if self.read_task is not None else 0
    if self.write_task is not None:
        mask |= select.EPOLLOUT
    # EPOLLEXCLUSIVE is deliberately left out: it is unclear whether it is
    # actually safe to use here. ONESHOT was used in the past but dropped:
    # it cannot be combined with EPOLLEXCLUSIVE, and it only "disables" the
    # registration rather than deleting it, so re-arming would need CTL
    # instead of ADD.
    return mask or None
项目:parsec-cloud    作者:Scille    | 项目源码 | 文件源码
def test_do():
    class EDo:
        arg = attr.ib()

    def do_func(a, b):
        done_a = yield Effect(EDo(a))
        done_b = yield Effect(EDo(b))
        return [done_a, done_b]

    effect = do_func(1, 2)
    assert isinstance(effect, Effect)
    assert isinstance(effect.intent, ChainedIntent)
    dispatcher = TypeDispatcher({
        EDo: lambda intent: 'done: %s' % intent.arg
    ret = sync_perform(dispatcher, effect)
    assert ret == ['done: 1', 'done: 2']
项目:parsec-cloud    作者:Scille    | 项目源码 | 文件源码
def test_chained_intent(self):
        class ENumToString:
            num = attr.ib()

        def collect_intent_results():
            intent_results = []
            for i in range(5):
                res = yield Effect(ENumToString(i))
            return ''.join(intent_results)

        effect = Effect(ChainedIntent(collect_intent_results()))
        dispatcher = TypeDispatcher({
            ENumToString: lambda intent: str(intent.num)
        ret = await asyncio_perform(dispatcher, effect)
        assert ret == '01234'
项目:parsec-cloud    作者:Scille    | 项目源码 | 文件源码
def block_connection_factory(url):
    """
    Build the block-store connection matching *url*.

    ``s3:<region>:<bucket>:<id>:<secret>`` gives an ``S3BlockConnection``
    (imported lazily) and ``http://...`` gives a ``RESTBlockConnection``.

    :raises SystemExit: if the S3 backend cannot be imported, the S3 url does
        not have exactly five ``:``-separated fields, or the scheme is unknown.
    """
    if url.startswith('s3:'):
        try:
            from parsec.core.block_s3 import S3BlockConnection
            _, region, bucket, key_id, key_secret = url.split(':')
        except ImportError as exc:
            raise SystemExit('Parsec needs boto3 to support S3 block storage (error: %s).' %
                             exc)
        except ValueError:
            raise SystemExit('Invalid s3 block store '
                             ' (should be `s3:<region>:<bucket>:<id>:<secret>`.')
        return S3BlockConnection(region, bucket, key_id, key_secret)
    elif url.startswith('http://'):
        return RESTBlockConnection(url)
    else:
        raise SystemExit('Unknown block store `%s`.' % url)
项目:parsec-cloud    作者:Scille    | 项目源码 | 文件源码
async def open_connection(self, identity):
    """
    Connect to the backend websocket and answer its handshake challenge.

    The challenge received from the server is signed with
    ``identity.private_key`` and sent back. On an ``ok`` status the receive
    handler task is started, plus the watchdog task when
    ``self.watchdog_time`` is set.

    :raises BackendConnectionError: if the connection is refused or closed.
    :raises: the exception mapped from the response status when the
        handshake is rejected (the connection is closed first).
    """
    # NOTE(review): this is logged before the connection is attempted.
    logger.debug('Connection to backend opened')
    assert not self._websocket, "Connection to backend already opened"
    try:
        self._websocket = await websockets.connect(self.url)
        # Handle handshake
        raw = await self._websocket.recv()
        challenge = ejson_loads(raw)
        answer = identity.private_key.sign(challenge['challenge'].encode())
        await self._websocket.send(ejson_dumps({
            'handshake': 'answer',
            'answer': to_jsonb64(answer)
        }))
        resp = ejson_loads(await self._websocket.recv())
        if resp['status'] != 'ok':
            await self.close_connection()
            raise exception_from_status(resp['status'])(resp['label'])
        self._ws_recv_handler_task = asyncio.ensure_future(
            self._ws_recv_handler(), loop=self.loop)
        if self.watchdog_time:
            self._watchdog_task = asyncio.ensure_future(self._watchdog(), loop=self.loop)
    except (ConnectionRefusedError, websockets.exceptions.ConnectionClosed) as exc:
        raise BackendConnectionError('Cannot connect to backend (%s)' % exc)
项目:parsec-cloud    作者:Scille    | 项目源码 | 文件源码
def _ws_recv_handler(self):
        # Given command responses and notifications are all send through the
        # same websocket, separate them here, passing command response thanks
        # to a Queue.
        while True:
            raw = await self._websocket.recv()
                if isinstance(raw, bytes):
                    raw = raw.decode()
                recv = ejson_loads(raw)
                if 'status' in recv:
                    # Message response
                    # Event
            except (KeyError, TypeError, json.JSONDecodeError):
                # Dummy ???
                logger.warning('Backend server sent invalid message: %s' % raw)
项目:parsec-cloud    作者:Scille    | 项目源码 | 文件源码
def perform_vlob_read(self, intent):
        async with self.connection.acquire() as conn:
            async with conn.cursor() as cur:
                if intent.version:
                    await cur.execute("SELECT * FROM vlobs WHERE "
                                      "id=%s AND version=%s;", (, intent.version))
                    await cur.execute("SELECT * FROM vlobs WHERE "
                                      "id=%s ORDER BY version DESC;", (, ))
                ret = await cur.fetchone()
        if not ret:
            raise VlobNotFound('Vlob not found.')
        _, version, rts, wts, blob = ret
        if rts != intent.trust_seed:
            raise TrustSeedError('Invalid read trust seed.')
        return VlobAtom(, version=version, read_trust_seed=rts,
                        write_trust_seed=wts, blob=bytes(blob))
项目:parsec-cloud    作者:Scille    | 项目源码 | 文件源码
def _perform_vlob_update(self, intent):
        async with self.connection.acquire() as conn:
            async with conn.cursor() as cur:
                await cur.execute("SELECT version, read_trust_seed, write_trust_seed FROM "
                                  "vlobs WHERE id=%s ORDER BY version DESC;", (, ))
                ret = await cur.fetchone()
                if ret is None:
                    raise VlobNotFound('Vlob not found.')
                last_version, rts, wts = ret
                if wts != intent.trust_seed:
                    raise TrustSeedError('Invalid write trust seed.')
                if intent.version != last_version + 1:
                    raise VlobNotFound('Wrong blob version.')
                # TODO: insertion doesn't do atomic check of version
                await cur.execute("INSERT INTO vlobs VALUES (%s, %s, %s, %s, %s);",
                    (, intent.version, rts, wts, intent.blob))
                await cur.execute("NOTIFY vlob_updated, %s", (, ))
项目:pysoa    作者:eventbrite    | 项目源码 | 文件源码
def backend_layer(self):
    """
    Return the Redis client for this transport, creating it on first use.

    Clients are shared through ``self._backend_layer_cache``, keyed by
    backend type and keyword arguments.
    """
    if self._backend_layer is None:
        cache_key = (self.backend_type, dict_to_hashable(self.backend_layer_kwargs))
        if cache_key not in self._backend_layer_cache:
            with self._get_timer('backend.initialize'):
                # Copy so the client cannot mutate our stored kwargs.
                backend_layer_kwargs = deepcopy(self.backend_layer_kwargs)
                if self.backend_type == REDIS_BACKEND_TYPE_SENTINEL:
                    self._backend_layer_cache[cache_key] = SentinelRedisClient(**backend_layer_kwargs)
                else:
                    self._backend_layer_cache[cache_key] = StandardRedisClient(**backend_layer_kwargs)

        self._backend_layer = self._backend_layer_cache[cache_key]

    # Each time the backend layer is accessed, use _this_ transport's metrics recorder for the backend layer
    self._backend_layer.metrics_counter_getter = lambda name: self._get_counter(name)
    return self._backend_layer

    # noinspection PyAttributeOutsideInit
项目:cattrs    作者:Tinche    | 项目源码 | 文件源码
def test_fallback(cl_and_vals):
    """The fallback case works."""
    cl, vals = cl_and_vals

    assume(attr.fields(cl))  # At least one field.

    # NOTE(review): the decorator, the class body and ``a.name`` below were
    # stripped from the source and are reconstructed -- confirm upstream.
    @attr.s
    class A(object):
        pass

    fn = create_uniq_field_dis_func(A, cl)

    assert fn({}) is A
    assert fn(attr.asdict(cl(*vals))) is cl

    attr_names = {a.name for a in attr.fields(cl)}

    if 'xyz' not in attr_names:
        # The bare comparison here previously checked nothing.
        assert fn({'xyz': 1}) is A  # Uses the fallback.
项目:cattrs    作者:Tinche    | 项目源码 | 文件源码
def test_optional_field_roundtrip(converter, cl_and_vals):
    """
    Classes with optional fields can be unstructured and structured.
    """
    cl, vals = cl_and_vals

    # NOTE(review): ``@attr.s`` was stripped from the source and is
    # reconstructed here; ``attr.ib`` fields need it -- confirm upstream.
    @attr.s
    class C(object):
        a = attr.ib(type=Optional[cl])

    # Field populated.
    inst = C(a=cl(*vals))
    assert inst == converter.structure(converter.unstructure(inst), C)

    # Field left as None.
    inst = C(a=None)
    unstructured = converter.unstructure(inst)

    assert inst == converter.structure(unstructured, C)
项目:tap-facebook    作者:singer-io    | 项目源码 | 文件源码
def retry_pattern(backoff_type, exception, **wait_gen_kwargs):
    def log_retry_attempt(details):
        _, exception, _ = sys.exc_info()'Caught retryable error after %s tries. Waiting %s more seconds then retrying...',

    def should_retry_api_error(exception):
        if isinstance(exception, FacebookRequestError):
            return exception.api_transient_error()
        elif isinstance(exception, InsightsJobTimeout):
            return True
        return False

    return backoff.on_exception(
        giveup=lambda exc: not should_retry_api_error(exc),
项目:tap-facebook    作者:singer-io    | 项目源码 | 文件源码
def advance_bookmark(stream, bookmark_key, date):
    tap_stream_id =
    state = stream.state or {}'advance(%s, %s)', tap_stream_id, date)
    date = pendulum.parse(date) if date else None
    current_bookmark = get_start(stream, bookmark_key)

    if date is None:'Did not get a date for stream %s '+
                    ' not advancing bookmark',
    elif not current_bookmark or date > current_bookmark:'Bookmark for stream %s is currently %s, ' +
                    'advancing to %s',
                    tap_stream_id, current_bookmark, date)
        state = singer.write_bookmark(state, tap_stream_id, bookmark_key, str(date))
    else:'Bookmark for stream %s is currently %s ' +
                    'not changing to to %s',
                    tap_stream_id, current_bookmark, date)
    return state
项目:myriagon    作者:hawkowl    | 项目源码 | 文件源码
def get_time_for_session(task, time):

    cd =

    if task.cutoff == "week":

        cutoff_time = datetime.datetime(cd.year, cd.month,
        cutoff_delta = datetime.timedelta(

        cutoff_time = (cutoff_time - cutoff_delta).timestamp()

    elif task.cutoff == "month":
        cutoff_time = datetime.datetime(cd.year, cd.month, 1).timestamp()

    qualifiers = filter(lambda t: t.started > cutoff_time, time)
    time_spent_this_per = sum(map(
        lambda s: s.finished - s.started, qualifiers))

    return time_spent_this_per
项目:heidelberg_subtyping    作者:peterk87    | 项目源码 | 文件源码
def _reads_exist(self, attribute, value):
        if isinstance(value, str):
            if not os.path.exists(value):
                raise FileNotFoundError('Reads file {} does not exist!'.format(value))
            if not os.path.isfile(value):
                raise OSError('{} is not a valid reads file'.format(value))
        elif isinstance(value, list):
            for x in value:
                if not isinstance(x, str):
                    raise Exception(
                        'Reads file not specified as string or list of string: type={} "{}"'.format(type(x), x))
                if not os.path.exists(x):
                    raise FileNotFoundError('Reads file {} does not exist!'.format(x))
                if not os.path.isfile(x):
                    raise OSError('{} is not a valid reads file'.format(x))
            raise Exception(
                'Reads file(s) not specified as string or list of string: type={} "{}"'.format(type(value), value))
项目:heidelberg_subtyping    作者:peterk87    | 项目源码 | 文件源码
def _create_tmp_folder(self):
        count = 1
        tmp_dir = self.tmp_dir
        while True:
      'Trying to create analysis directory at: %s', tmp_dir)
            except OSError as e:
                logging.warning('Error on creation of tmp analysis directory "{}"! {}'.format(
                tmp_dir = '{}_{}'.format(self.tmp_dir, count)
                count += 1
        self.tmp_dir = tmp_dir
        return self.tmp_dir
项目:heidelberg_subtyping    作者:peterk87    | 项目源码 | 文件源码
def _create_tmp_folder(self):
        count = 1
        tmp_dir = self.tmp_work_dir
        while True:
      'Trying to create analysis directory at: %s', tmp_dir)
            except OSError as e:
                logging.warning('Error on creation of tmp analysis directory "{}"! {}'.format(
                tmp_dir = '{}_{}'.format(self.tmp_work_dir, count)
                count += 1
        self.tmp_work_dir = tmp_dir
        return self.tmp_work_dir
项目:tomato-libnotify    作者:Perlence    | 项目源码 | 文件源码
def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('-s', '--silent', action='store_true',
                        help="don't print time logs")
    args = parser.parse_args()

    loop = asyncio.get_event_loop()

    tomato = Tomato(silent=args.silent)
    except KeyboardInterrupt:
项目:environ_config    作者:hynek    | 项目源码 | 文件源码
def test_secret_str_no_repr(self):
    """
    Outside of reprs, _SecretStr behaves normally.
    """
    s = _SecretStr("abc")

    assert "'abc'" == repr(s)
项目:environ_config    作者:hynek    | 项目源码 | 文件源码
def test_secret_str_censors(self):
    """
    _SecretStr censors its __repr__ if it's called from another __repr__.
    """
    s = _SecretStr("abc")

    # NOTE(review): ``@attr.s`` was stripped from the source and is
    # reconstructed; the expected repr below needs it -- confirm upstream.
    @attr.s
    class Cfg(object):
        s = attr.ib()

    assert "Cfg(s=<SECRET>)" == repr(Cfg(s))
项目:environ_config    作者:hynek    | 项目源码 | 文件源码
def config(maybe_cls=None, prefix="APP"):
    """
    Class decorator that records *prefix* on the class and applies ``attr.s``.

    Usable both bare (``@config``) and with arguments
    (``@config(prefix="X")``).

    :param maybe_cls: the class when used bare, ``None`` when called with
        arguments.
    :param prefix: value stored on the class as ``_prefix``.
    :returns: the decorated class, or the decorator itself when *maybe_cls*
        is ``None``.
    """
    def wrap(cls):
        cls._prefix = prefix
        return attr.s(cls, slots=True)

    if maybe_cls is None:
        return wrap
    else:
        return wrap(maybe_cls)
项目:environ_config    作者:hynek    | 项目源码 | 文件源码
def _env_to_bool(val):
    Convert *val* to a bool if it's not a bool in the first place.
    if isinstance(val, bool):
        return val
    val = val.strip().lower()
    if val in ("1", "true", "yes"):
        return True

    return False
项目:speccer    作者:bensimner    | 项目源码 | 文件源码
def _print_prop_summary(prop, outcome, outfile=sys.stdout):
    name =
    failed_impl = prop.failed_implications
    depth = outcome.state['depth']
    n = outcome.state['calls']
    if failed_impl:
        outfile.write('After {} call(s) ({} did not meet implication)\n'.format(n, failed_impl))
        outfile.write('After {} call(s)\n'.format(n))
    outfile.write('To depth {}\n'.format(depth))
    outfile.write('In property `{}`\n'.format(name))
项目:gocd-dashboard    作者:datasift    | 项目源码 | 文件源码
def passed(self):
    """Return True only when every pipeline passed (True when there are none)."""
    for pipeline in self.pipelines:
        if not pipeline.passed:
            return False
    return True