Python contextlib 模块,suppress() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用contextlib.suppress()

项目:django-notifs    作者:danidee10    | 项目源码 | 文件源码
def create_notification(**kwargs):
    """Signal receiver: build a notification and hand it to the delivery channels.

    ``kwargs`` are the values delivered with the signal. ``signal`` and
    ``sender`` must be present (``KeyError`` otherwise); they and the
    optional ``silent`` flag are stripped before the remaining values are
    used as ``Notification`` fields.
    """
    # Work on a copy: the adapters below still receive the untouched kwargs.
    params = dict(kwargs)
    for signal_arg in ('signal', 'sender'):
        del params[signal_arg]

    silent = False
    with suppress(KeyError):
        silent = params.pop('silent')

    # A silent notification is only instantiated, it is not created through
    # the manager.
    if silent:
        notification = Notification(**params)
    else:
        notification = Notification.objects.create(**params)

    # Fan out to every custom adapter listed in the settings.
    adapter_paths = getattr(settings, 'NOTIFICATION_ADAPTERS', [])
    for adapter_path in adapter_paths:
        import_attr(adapter_path)(**kwargs).notify()

    if getattr(settings, 'NOTIFICATIONS_WEBSOCKET', False):
        send_to_queue(notification)
项目:quokka_ng    作者:rochacbruno    | 项目源码 | 文件源码
def value_set(self, colname, key, filter=None,
                  sort=True, flat=False, **kwargs):
        """Collect the values stored under ``key`` in a collection.

        Documents are fetched with ``find`` (using ``filter`` when given,
        plus any extra ``kwargs``); ``None`` values are skipped. With
        ``flat=True`` the collected values are chained into one flat list.
        Duplicates are removed when the values are hashable; otherwise the
        list is left as it is. The result is sorted only when ``sort`` is
        ``True``.
        """
        collection = self.get_collection(colname)
        if filter is None:
            cursor = collection.find(**kwargs)
        else:
            cursor = collection.find(filter, **kwargs)

        values = []
        for item in cursor:
            found = item.get(key)
            if found is not None:
                values.append(found)

        if flat is True:
            values = list(itertools.chain.from_iterable(values))

        # Unhashable values cannot go through a set; keep them untouched.
        with suppress(TypeError):
            values = list(set(values))

        if sort is True:
            return sorted(values)
        return values
项目:srctools    作者:TeamSpen210    | 项目源码 | 文件源码
def __delitem__(self, key):
        """Delete a keyvalue, matching ``key`` case-insensitively.

        When the key is ``targetname`` or ``classname`` the entity is also
        moved to the ``None`` bucket of the corresponding map index.
        """
        folded = key.casefold()

        if folded in ('targetname', 'classname'):
            if folded == 'targetname':
                index = self.map.by_target
            else:
                index = self.map.by_class
            # The entity may not be registered under its current value.
            with suppress(KeyError):
                index[self.keys.get(folded, None)].remove(self)
            index[None].add(self)

        # Remove the first stored key that matches when case is ignored.
        stored = next((k for k in self.keys if k.casefold() == folded), None)
        if stored is not None:
            del self.keys[stored]
项目:aio-pika    作者:mosquito    | 项目源码 | 文件源码
def future_with_timeout(loop, timeout, future=None):
    """Return a future that fails with ``TimeoutError`` after ``timeout`` seconds.

    ``future`` is used when given, otherwise a new one is created on the
    loop. A falsy ``timeout`` leaves the future without any deadline. The
    timer is cancelled as soon as the future completes.
    """
    loop = loop or asyncio.get_event_loop()
    f = future or create_future(loop=loop)

    if not timeout:
        return f

    def expire():
        # Only fail the future if nobody resolved it in the meantime.
        if not f.done():
            f.set_exception(TimeoutError)

    timer = loop.call_later(timeout, expire)

    def cancel_timer(*_):
        with suppress(Exception):
            timer.cancel()

    f.add_done_callback(cancel_timer)
    return f
项目:pretalx    作者:pretalx    | 项目源码 | 文件源码
def unfreeze(self, user=None):
        """Replace the event's WIP schedule with a copy based on this version.

        Raises ``Exception`` when this schedule has no version. Returns a
        ``(self, new_wip_schedule)`` tuple. ``user`` is not used here.
        """
        from pretalx.schedule.models import TalkSlot
        if not self.version:
            raise Exception('Cannot unfreeze schedule version: not released yet.')

        # Talks that exist only in the current WIP schedule (#72) are carried
        # over together with this version's own talks.
        own_submission_ids = self.talks.all().values_list('submission_id', flat=True)
        wip_only = self.event.wip_schedule.talks.exclude(submission_id__in=own_submission_ids)
        talks = wip_only.union(self.talks.all())

        wip_schedule = Schedule.objects.create(event=self.event)
        TalkSlot.objects.bulk_create(
            [talk.copy_to_schedule(wip_schedule, save=False) for talk in talks]
        )

        self.event.wip_schedule.talks.all().delete()
        self.event.wip_schedule.delete()

        # Drop the ``wip_schedule`` attribute on the event if one is set.
        with suppress(AttributeError):
            del wip_schedule.event.wip_schedule

        return self, wip_schedule
项目:pretalx    作者:pretalx    | 项目源码 | 文件源码
def sort_queryset(self, qs):
        """Order ``qs`` by the ``sort`` GET parameter, if it names a sortable field.

        A leading ``-`` requests descending order. ``CharField`` columns are
        sorted case-insensitively; everything else uses plain ``order_by``.
        The queryset is returned unchanged when no usable sort key is given.
        """
        sort_key = self.request.GET.get('sort')
        if not sort_key:
            return qs

        descending = sort_key.startswith('-')
        field_name = sort_key[1:] if descending else sort_key
        if field_name not in self.sortable_fields:
            return qs

        text_field = False
        with suppress(FieldDoesNotExist):
            text_field = isinstance(qs.model._meta.get_field(field_name), CharField)

        if not text_field:
            return qs.order_by(sort_key)

        # TODO: this only sorts direct lookups case insensitively
        # A sorting field like 'speaker__name' will not be found
        ordering = '-key' if descending else 'key'
        return qs.annotate(key=Lower(field_name)).order_by(ordering)
项目:pretalx    作者:pretalx    | 项目源码 | 文件源码
def _select_locale(self, request):
        """Activate the language and time zone for ``request``.

        The language comes from the user, the cookie or the browser, in that
        order, falling back to the event's locale when an event is attached.
        The time zone comes from the authenticated user, else the event, else
        the settings; an unknown time zone name is ignored.
        """
        event = getattr(request, 'event', None)
        supported = event.locales if event else settings.LANGUAGES

        language = (
            self._language_from_user(request, supported)
            or self._language_from_cookie(request, supported)
            or self._language_from_browser(request, supported)
        )
        if event:
            language = language or event.locale

        translation.activate(language)
        request.LANGUAGE_CODE = translation.get_language()

        with suppress(pytz.UnknownTimeZoneError):
            if request.user.is_authenticated:
                tzname = request.user.timezone
            elif event:
                tzname = event.timezone
            else:
                tzname = settings.TIME_ZONE
            timezone.activate(pytz.timezone(tzname))
            request.timezone = tzname
项目:taf    作者:taf3    | 项目源码 | 文件源码
def parse_json(self):
        """Build the execution object for the first known command in the config JSON.

        Raises ``InvokeError`` when the data is not valid JSON or when none
        of the known command types is present.
        """
        raw = self._get_config_data()

        try:
            json_data = json.loads(raw.value)  # pylint: disable=no-member
        except ValueError:
            raise InvokeError('Failed to parse JSON data')

        # Checked in this order; the first command type found wins.
        execution_classes = (
            ('ab', ApacheBenchExecution),
            ('nginx', NginxExecution),
            ('netperf', NetperfExecution),
            ('iperf', IperfExecution),
        )

        for command_type, exec_class in execution_classes:
            # NOTE: a KeyError raised inside the constructor is swallowed too.
            with suppress(KeyError):
                return exec_class(json_data[command_type], self)

        raise InvokeError('No command found to parse')
项目:taf    作者:taf3    | 项目源码 | 文件源码
def pytest_report_teststatus(self, report):
        """Override the status triple reported for the ``call`` phase.

        Presumably registered as a pytest hook wrapper (the decorator is not
        visible here): the value received from ``yield`` gets its ``result``
        replaced. For passed tests the message is ``report.retval`` when the
        report carries one.
        """
        outcome = yield
        if report.when != 'call':
            return

        if report.passed:
            result = self.RESULT_PASSED
        elif report.skipped:
            result = self.RESULT_SKIPPED
        else:  # report.failed
            result = self.RESULT_FAILED

        status, letter, msg = self.get_result(result)

        if report.passed:
            # Show the test's return value instead of 'PASSED' when available.
            with suppress(AttributeError):
                msg = report.retval

        outcome.result = status, letter, msg
项目:chronophore    作者:mesbahamin    | 项目源码 | 文件源码
def _show_feedback_label(self, message, seconds=None):
        """Show ``message`` in the feedback label and clear it after a delay.

        ``seconds`` defaults to ``CONFIG['MESSAGE_DURATION']``. The clearing
        is scheduled with ``after()`` rather than a thread, since threads can
        cause problems in Tk.
        """
        duration = CONFIG['MESSAGE_DURATION'] if seconds is None else seconds

        # Cancel a pending clear so rapid input does not cause flicker or
        # inconsistent timing. There is no pending callback on first use.
        with contextlib.suppress(AttributeError):
            self.root.after_cancel(self.clear_feedback)

        logger.debug('Label feedback: "{}"'.format(message))
        self.feedback.set(message)

        def clear():
            return self.feedback.set("")

        self.clear_feedback = self.root.after(1000 * duration, clear)
项目:goldmine    作者:Armored-Dragon    | 项目源码 | 文件源码
async def on_guild_join(self, guild):
        """Send the bot introduction message when invited.

        The default channel is tried first. If posting there is forbidden,
        every channel of the guild is tried in turn until one accepts the
        message; a warning is logged when none does. Selfbots stay silent.
        """
        self.logger.info('New guild: ' + guild.name)
        if self.selfbot:
            return
        try:
            await self.send_message(guild.default_channel, join_msg)
        except discord.Forbidden:
            # Iterating the channels directly avoids indexing past the end of
            # the list when every channel (or an empty list) fails.
            for channel in list(guild.channels):
                with suppress(discord.Forbidden, discord.HTTPException):
                    await self.send_message(channel, join_msg)
                    return
            self.logger.warning('Couldn\'t announce join to guild ' + guild.name)
项目:env-diff    作者:jacebrowning    | 项目源码 | 文件源码
def from_code(cls, *lines, index=0):
        """Build a variable from the source line at ``index``, if it names one.

        Returns ``None`` when neither pattern matches the line. When the line
        ends with ``{`` the next two lines (as far as they exist) are appended
        to the stored context.
        """
        line = lines[index].strip()

        match = cls.RE_ENV_SET.match(line)
        if not match:
            match = cls.RE_QUOTED_CAPITALS.search(line)
        if not match:
            log.debug("Skipped line %s without variable: %r", index + 1, line)
            return None

        name = match.group('name')
        context = line
        # Running off the end of ``lines`` keeps whatever was appended so far.
        with suppress(IndexError):
            if context.endswith('{'):
                for offset in (1, 2):
                    context += lines[index + offset].strip()

        variable = cls(name, context=context)
        log.info("Loaded variable: %s", variable)

        return variable
项目:Chiaki-Nanami    作者:Ikusaba-san    | 项目源码 | 文件源码
async def run(self):
        """Run the game loop next to the game screen's interaction and return its result.

        Declared ``async`` because the body awaits; a plain ``def`` containing
        ``await`` does not compile.
        """
        self._interaction = asyncio.ensure_future(self._game_screen.interact(timeout=None, delete_after=False))
        self._runner = asyncio.ensure_future(self.run_loop())
        try:
            return await self._runner
        finally:
            # For some reason having all these games hanging around causes lag.
            # Until I properly make a delete_after on the paginator I'll have to
            # settle with this hack.
            async def task():
                await asyncio.sleep(30)
                # Ignore HTTP failures while deleting the message.
                with contextlib.suppress(discord.HTTPException):
                    await self._game_screen._message.delete()

            self.ctx.bot.loop.create_task(task())
            self._interaction.cancel()
项目:Chiaki-Nanami    作者:Ikusaba-san    | 项目源码 | 文件源码
async def _inrole(ctx, *roles, members, final='and'):
        """Page through the names of ``members`` under a title listing ``roles``.

        The embed colour is the per-channel average of the roles' colours.
        Declared ``async`` because the body awaits the paginator; a plain
        ``def`` containing ``await`` does not compile.
        """
        joined_roles = human_join(map(str, roles), final=final)
        truncated_title = truncate(f'Members in {pluralize(role=len(roles))} {joined_roles}', 256, '...')

        total_color = map(sum, zip(*(role.colour.to_rgb() for role in roles)))
        average_color = discord.Colour.from_rgb(*map(round, (c / len(roles) for c in total_color)))

        if members:
            entries = sorted(map(str, members))
            # Make the author's name bold (assuming they have that role).
            # We have to do it after the list was built, otherwise the author's name
            # would be at the top.
            with contextlib.suppress(ValueError):
                index = entries.index(str(ctx.author))
                entries[index] = f'**{entries[index]}**'
        else:
            entries = ('There are no members :(', )

        pages = ListPaginator(ctx, entries, colour=average_color, title=truncated_title)
        await pages.interact()
项目:Chiaki-Nanami    作者:Ikusaba-san    | 项目源码 | 文件源码
async def _get_afk_embed(self, member):
        """Build the AFK embed for ``member``, or return ``None`` without an AFK message.

        The embed's timestamp is the last entry of the member's message
        queue when that queue is not empty. Declared ``async`` because the
        body awaits ``user_color``; a plain ``def`` containing ``await`` does
        not compile.
        """
        message = self.afks.get(member.id)
        if message is None:
            return None

        avatar = member.avatar_url
        colour = await user_color(member)
        title = f"{member.display_name} is AFK"

        embed = (discord.Embed(description=message, colour=colour)
                .set_author(name=title, icon_url=avatar)
                .set_footer(text=f"ID: {member.id}")
                )

        # An empty queue leaves the embed without a timestamp.
        with contextlib.suppress(IndexError):
            embed.timestamp = self.user_message_queues[member.id][-1]
        return embed
项目:ddmbot    作者:Budovi    | 项目源码 | 文件源码
async def on_voice_state_update(self, before, after):
        """Track users joining or leaving the bot's voice channel.

        When the update concerns the bot itself and it is no longer in its
        voice channel, the voice connection is re-established. Declared
        ``async`` because the body awaits; a plain ``def`` containing
        ``await`` does not compile.
        """
        if before == self._client.user:
            if after.voice.voice_channel != self._voice_channel:
                log.warning('Client was disconnected from the voice channel')
                await self.connect_voice()
            return
        # joining
        if before.voice.voice_channel != self._voice_channel and after.voice.voice_channel == self._voice_channel:
            # Ignored users are neither welcomed nor added as listeners.
            with suppress(database.bot.IgnoredUserError):
                if await self._database.interaction_check(int(after.id)):
                    await self._send_welcome_message(after)
                await self._users.add_listener(int(after.id), direct=False)

        # leaving
        elif before.voice.voice_channel == self._voice_channel and after.voice.voice_channel != self._voice_channel:
            try:
                await self._users.remove_listener(int(after.id), direct=False)
            except ValueError:
                log.warning('Tried to remove {0} (ID: {0.id}) from listeners but the user was not listed'.format(after))
项目:labrat    作者:sdhutchins    | 项目源码 | 文件源码
def backup(self):
        """Backup files from source directory to destination.

        A log file named after the current time is opened first. A failing
        copy is logged with the actual error and does not raise; the success
        message is written only when the copy completed.
        """

        logzero.logfile("directory_backup_%s.log" %
                        str(d.now().strftime(self._f2)))
        sep = 50*'-'
        log.info("#%s" % sep)
        log.info("The script name is %s" % os.path.basename(__file__))
        log.info("The date and time is currently %s" %
                 str(d.now().strftime(self._f1)))

        # TODO Add SameFileError from shutil
        # TODO Add threading
        try:
            copytree(self.sourcedir, self.backupdir)
        except OSError as err:
            # Report the real failure instead of logging the OSError class
            # unconditionally.
            log.error(err)
        else:
            log.info('%s has been backed up.' % self.sourcedir)
项目:pyrcrack    作者:XayOn    | 项目源码 | 文件源码
def __init__(self, attack=False, interface=False, stdout=None, stderr=None, **kwargs):
        """Configure the command for one attack mode.

        Falsy ``stdout``/``stderr`` are replaced with ``DEVNULL``. Raises
        ``WrongArgument`` when ``attack`` is not in ``_allowed_attacks``. The
        attack is also passed to the parent initialiser as a ``True`` flag.
        """
        self.stdout = stdout if stdout else DEVNULL
        self.stderr = stderr if stderr else DEVNULL
        self.interface = interface

        if attack not in self._allowed_attacks:
            raise WrongArgument
        self.attack = attack

        # Attack-specific arguments live in an optional
        # ``_allowed_arguments_<attack>`` attribute.
        extra = tuple()
        with suppress(AttributeError):
            extra = getattr(self, "_allowed_arguments_{}".format(attack))
        # NOTE(review): ``+=`` and ``append`` mutate the list in place; if
        # ``_allowed_arguments`` is a class attribute it grows with every
        # instance -- confirm this is intended.
        if extra:
            self._allowed_arguments += extra
        self._allowed_arguments.append((attack, False))
        kwargs[attack] = True
        # NOTE(review): ``super(self.__class__, self)`` recurses forever once
        # this class is subclassed.
        super(self.__class__, self).__init__(**kwargs)
项目:flask-openapi    作者:remcohaszing    | 项目源码 | 文件源码
def _process_rule(self, rule):
        """Build the path description dict for the view behind ``rule``.

        A body parameter is added when a schema can be extracted. The
        ``responses`` and ``tags`` entries are copied from the view function
        when it defines them; ``description`` and ``deprecated`` go through
        ``add_optional``.
        """
        view_func = self.app.view_functions[rule.endpoint]
        path = {}

        schema = self._extract_schema(view_func)
        if schema:
            body_parameter = {
                'in': 'body',
                'name': 'payload',
                'schema': schema
            }
            path['parameters'] = [body_parameter]

        with suppress(AttributeError):
            path['responses'] = view_func.responses

        description = self._extract_description(view_func)
        add_optional(path, 'description', description)
        deprecated = getattr(view_func, 'deprecated', None)
        add_optional(path, 'deprecated', deprecated)

        with suppress(AttributeError):
            path['tags'] = sorted(view_func.tags)
        return path
项目:gateway    作者:wasp    | 项目源码 | 文件源码
def serve_many(workers=1):
    """Run ``serve`` in up to ``workers`` daemon processes until interrupted.

    The worker count is capped at the CPU count. SIGINT or SIGTERM sets an
    event that ends the wait; all workers are then terminated and joined.
    """
    # thank you sanic
    workers = min(workers, multiprocessing.cpu_count())
    stop_requested = multiprocessing.Event()
    for signum in (SIGINT, SIGTERM):
        signal(signum, lambda *_: stop_requested.set())

    kwargs = dict(reuse_port=True)
    processes = []
    for _ in range(workers):
        # noinspection PyArgumentList
        worker = multiprocessing.Process(target=serve, kwargs=kwargs,
                                         daemon=True)
        worker.start()
        print('Started subprocess:', worker.name, worker.pid)
        processes.append(worker)

    # Poll until a signal arrives; any error just ends the wait.
    with contextlib.suppress(Exception):
        while not stop_requested.is_set():
            time.sleep(0.5)

    for worker in processes:
        worker.terminate()
    for worker in processes:
        worker.join()
项目:falsy    作者:pingf    | 项目源码 | 文件源码
def run(coro, loop=None):
    """Run ``coro`` to completion next to a ``curl_loop()`` task and return its result.

    A new uvloop event loop is created when ``loop`` is not given. The
    helper task is cancelled and awaited once ``coro`` finishes or fails.
    """
    async def main_task():
        pycurl_task = aio.ensure_future(curl_loop())
        try:
            result = await coro
        finally:
            pycurl_task.cancel()
            # Wait for the cancellation to be processed.
            with suppress(aio.CancelledError):
                await pycurl_task
        return result, pycurl_task

    if loop is None:
        loop = uvloop.new_event_loop()
    aio.set_event_loop(loop)
    loop.set_exception_handler(exception_handler)
    result, _ = loop.run_until_complete(main_task())
    return result
项目:pyha    作者:gasparka    | 项目源码 | 文件源码
def redbaron_pyfor_to_vhdl(red_node):
    """Rewrite every ``for`` node found in ``red_node`` to iterate over ``_i_``.

    Loops whose target holds a call preceded by ``range`` are left alone.
    In the others, uses of the iterator are replaced by ``<target>[_i_]``
    and the iterator itself becomes ``_i_``. ``red_node`` is modified in
    place and returned.
    """
    def modify_for(for_node):
        # Leave loops that already go through range(); any lookup failure
        # here means there is no such call.
        with suppress(Exception):
            if for_node.target('call')[0].previous.value == 'range':
                return for_node

        frange = for_node.target
        ite = for_node.iterator

        uses = for_node(ite.__class__.__name__, value=ite.value)
        uses.map(lambda x: x.replace(f'{frange}[_i_]'))

        for_node.iterator = '_i_'
        return for_node

    for for_node in red_node.find_all('for'):
        modify_for(for_node)

    return red_node
项目:pyha    作者:gasparka    | 项目源码 | 文件源码
def flush_pipeline(func):
    """Decorator compensating for the pipeline delay of ``self.model``.

    Inputs: the first argument is appended ``delay`` more times as dummy
    samples, to flush the pipeline values out.
    Outputs: the first ``delay`` samples are dropped, as these are initial
    pipeline values. Without a ``_delay`` attribute (or with a delay of 0)
    the call is passed through untouched.
    """

    @wraps(func)
    def flush_pipeline_wrap(self, *args, **kwargs):
        delay = 0
        with suppress(AttributeError):  # model has no delay
            delay = self.model._delay
        if delay == 0:
            return func(self, *args, **kwargs)

        padded = list(args)
        padded.extend(padded[0] for _ in range(delay))

        return func(self, *padded, **kwargs)[delay:]

    return flush_pipeline_wrap
项目:django-rest-witchcraft    作者:shosca    | 项目源码 | 文件源码
def field_kwargs(self):
        """Derive serializer field keyword arguments from the mapped column.

        Always sets ``label``, ``help_text``, ``required`` and ``allow_null``.
        The enum, precision, scale and length options are added only when the
        column type exposes the corresponding attribute.
        """
        label = capfirst(self.property.key.replace('_', ' '))
        kwargs = {'label': label, 'help_text': self.property.doc}

        with suppress(AttributeError):
            enum_class = self.column.type.enum_class
            if enum_class is None:
                kwargs['choices'] = self.column.type.enums
            else:
                kwargs['enum_class'] = self.column.type.enum_class

        # (field option, column type attribute) pairs, each one optional.
        optional_attrs = (
            ('max_digits', 'precision'),
            ('decimal_places', 'scale'),
            ('max_length', 'length'),
        )
        for option, type_attr in optional_attrs:
            with suppress(AttributeError):
                kwargs[option] = getattr(self.column.type, type_attr)

        nullable = self.column.nullable
        kwargs['required'] = not nullable
        kwargs['allow_null'] = self.column.nullable

        return kwargs
项目:mccurse    作者:khardix    | 项目源码 | 文件源码
def __exit__(self: 'FileChange', *exc) -> None:
        """Clean up after change -- both success and failure.

        On failure the new file is removed and the old file and its store
        entry are restored. On success the new file is recorded in the
        destination and the temporary file is deleted.
        """

        if any(exc):  # Failure -- rollback
            if self.__valid_destination and self.__file_change:
                # Ignore missing file on deletion
                with suppress(FileNotFoundError):
                    self.new_path.unlink()

            if self.__valid_source:
                if self.__file_change:
                    self.tmp_path.rename(self.old_path)
                if self.__store_change:
                    self.source[self.old_file.mod.id] = self.old_file
            return

        # Success -- change metadata
        if self.__valid_destination:
            self.destination[self.new_file.mod.id] = self.new_file

        # Clean temporary file
        if self.__valid_source and self.__file_change:
            self.tmp_path.unlink()
项目:c3nav    作者:c3nav    | 项目源码 | 文件源码
def save(self, **kwargs):
        """Save the update and write the changed geometries next to it.

        Raises ``TypeError`` when an existing update is saved while it was
        already processed or is not being marked as processed. For a new
        update, cache refresh and (with celery) processing are scheduled for
        after the transaction commits.
        """
        new = self.pk is None
        if not new and (self.was_processed or not self.processed):
            raise TypeError

        super().save(**kwargs)

        # The target directory may already exist.
        with suppress(FileExistsError):
            os.mkdir(os.path.dirname(self._changed_geometries_filename()))

        from c3nav.mapdata.utils.cache.changes import changed_geometries
        # Close the file deterministically instead of leaking the handle.
        with open(self._changed_geometries_filename(), 'wb') as f:
            pickle.dump(changed_geometries, f)

        if new:
            transaction.on_commit(
                lambda: cache.set('mapdata:last_update', self.to_tuple, 300)
            )
            if settings.HAS_CELERY:
                transaction.on_commit(
                    lambda: process_map_updates.delay()
                )
项目:c3nav    作者:c3nav    | 项目源码 | 文件源码
def clean_cut_polygon(polygon: Polygon) -> Polygon:
    """Rebuild ``polygon`` from the rings produced by ``cut_ring``.

    Exactly one ring cut from the exterior must have ``is_ccw`` set; it
    becomes the new exterior. All other rings, plus those cut from the
    original interiors, become interiors. Raises ``ValueError`` otherwise.
    Any ``c3nav_cache`` attribute on ``polygon`` is removed.
    """
    rings = list(cut_ring(polygon.exterior))
    ccw_positions = [pos for pos, ring in enumerate(rings) if ring.is_ccw]

    with suppress(AttributeError):
        del polygon.c3nav_cache

    if len(ccw_positions) != 1:
        raise ValueError('Invalid cut polygon!')
    exterior = rings.pop(ccw_positions[0])

    for ring in polygon.interiors:
        rings.extend(cut_ring(ring))

    return Polygon(exterior, rings)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def shutdown(self, timeout=15.0):
        """Stop accepting requests because the worker process is about to exit.

        Nothing happens without a request handler. An idle keep-alive
        connection, or a falsy ``timeout``, cancels the handler right away.
        Otherwise the handler gets ``timeout`` seconds to finish before it is
        cancelled.
        """
        handler = self._request_handler
        if handler is None:
            return
        self._closing = True

        idle_keep_alive = self._request_count > 1 and not self._reading_request
        if idle_keep_alive or not timeout:
            handler.cancel()
            return

        # Let the running request finish, but not for longer than ``timeout``.
        canceller = self._loop.call_later(timeout, handler.cancel)
        with suppress(asyncio.CancelledError):
            yield from handler
        canceller.cancel()
项目:steemdata-mongo    作者:SteemData    | 项目源码 | 文件源码
def upsert_comment_chain(mongo, identifier, recursive=False):
    """ Upsert given comments and its parent(s).

    Args:
        mongo: mongodb instance
        identifier: Post identifier
        recursive: (Defaults to False). If True, recursively update all parent comments, incl. root post.

    Returns:
        The result of the ``Posts`` update when ``identifier`` is a root
        post, otherwise ``None`` (also when the post does not exist).
    """
    with suppress(PostDoesNotExist):
        post = Post(identifier)
        if not post.is_comment():
            return mongo.Posts.update({'identifier': post.identifier}, post.export(), upsert=True)

        mongo.Comments.update({'identifier': post.identifier}, post.export(), upsert=True)
        parent_identifier = '@%s/%s' % (post.parent_author, post.parent_permlink)
        if recursive:
            upsert_comment_chain(mongo, parent_identifier, recursive)
        else:
            upsert_comment(mongo, parent_identifier)
项目:retr    作者:aikipooh    | 项目源码 | 文件源码
def load(self, arg):
        """Always load all proxies, we'll be able to prune and rearrange later.

        ``arg`` is either the name of a tab-separated file with
        ``(proxy, status)`` rows or an iterable of proxies, which are all
        marked as good. A missing file leaves the list empty.
        """
        self.master_plist = OrderedDict()

        if not isinstance(arg, str):
            # Iterable of proxies
            for address in arg:
                entry = proxy(address, 'G')  # Mark as good
                self.master_plist[entry.p] = entry
            lg.info('Loaded {} proxies from iterable'.format(len(self.master_plist)))
            return

        # File name
        lg.info('Loading {}'.format(arg))
        statuses = Counter()
        with suppress(FileNotFoundError), open(arg) as f:
            for row in reader(f, delimiter='\t'):  # (proxy, status)
                status = row[1] if len(row) > 1 else ''
                statuses[status] += 1
                entry = proxy(row[0], status)
                self.master_plist[entry.p] = entry
        lg.info('Loaded {} {}'.format(arg, statuses))
项目:raptiformica    作者:vdloo    | 项目源码 | 文件源码
def ensure_latest_consul_release(host=None, port=22):
    """
    Make sure the latest consul release is downloaded on the remote machine
    :param str host: hostname or ip of the remote machine, or None for the local machine
    :param int port: port to use to connect to the remote machine over ssh
    :return None:
    """
    archive_name = CONSUL_RELEASE.split('/')[-1]
    log.info("Ensuring consul release {} is on disk "
             "on the remote machine".format(archive_name))
    # Remove any previously existing zip in case an earlier attempt left a
    # corrupted one behind.
    with suppress(FileNotFoundError):
        remove(archive_name)
    wget(
        CONSUL_RELEASE, host=host, port=port,
        failure_message="Failed to retrieve {}".format(archive_name)
    )
项目:aiohttp-devtools    作者:aio-libs    | 项目源码 | 文件源码
def run_app(app, port, loop):
    """Serve ``app`` on ``HOST:port`` until interrupted, then shut everything down.

    The shutdown sequence always runs; timeouts and further interrupts
    during the later shutdown steps are ignored.
    """
    handler = app.make_handler(access_log=None, loop=loop)

    loop.run_until_complete(app.startup())
    server = loop.run_until_complete(loop.create_server(handler, HOST, port))

    try:
        with contextlib.suppress(KeyboardInterrupt):
            loop.run_forever()
    finally:
        logger.info('shutting down server...')
        server.close()
        loop.run_until_complete(server.wait_closed())
        loop.run_until_complete(app.shutdown())

        ignorable = (asyncio.TimeoutError, KeyboardInterrupt)
        with contextlib.suppress(*ignorable):
            loop.run_until_complete(handler.shutdown(0.1))
        with contextlib.suppress(*ignorable):
            loop.run_until_complete(app.cleanup())
        with contextlib.suppress(KeyboardInterrupt):
            loop.close()
项目:asynccmd    作者:valentinmk    | 项目源码 | 文件源码
def loop():
    """Yield a fresh event loop and tear it down afterwards.

    Generator (presumably used as a test fixture; the decorator is not
    visible here). The loop is not installed as the current event loop. On
    teardown, tasks still pending on a loop that is still open are cancelled
    and run to completion, then the executor and the loop are shut down. A
    loop the consumer already closed is left alone.
    """
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(None)
    executor = concurrent.futures.ThreadPoolExecutor()
    loop.set_default_executor(executor)
    yield loop

    # ``is_closed`` is a method: it has to be called, and cleanup is only
    # possible while the loop is still open.
    if not loop.is_closed():
        # ``asyncio.Task.all_tasks`` was removed in Python 3.9; prefer the
        # module-level function when it exists.
        all_tasks = getattr(asyncio, 'all_tasks', None) or asyncio.Task.all_tasks
        for task in all_tasks(loop=loop):
            task.cancel()
            # The task has to run once more to process its cancellation,
            # which surfaces as CancelledError.
            with suppress(asyncio.CancelledError):
                loop.run_until_complete(task)
    executor.shutdown()
    # close() is idempotent, so this is safe for an already-closed loop.
    loop.close()

    gc.collect()
    asyncio.set_event_loop(None)
项目:transmission-remote-gnome    作者:TingPing    | 项目源码 | 文件源码
def apply(self):
        """Creates or deletes the autostart file based upon current state"""
        if not self.props.sensitive:
            # We never got the current state
            return

        active = self.props.active
        if active and not self._was_enabled:
            logging.info('Creating autostart file')
            source = Gio.File.new_for_uri('resource:///se/tingping/Trg/se.tingping.Trg.service.desktop')
            if not hasattr(source, 'copy_async'):
                # Synchronous fallback; a failed copy is ignored.
                with suppress(GLib.Error):
                    source.copy(self.autostart_file, Gio.FileCopyFlags.NONE)
            else:
                # TODO: Fix upstream in GLib
                source.copy_async(self.autostart_file, Gio.FileCopyFlags.NONE, GLib.PRIORITY_DEFAULT)
        elif self._was_enabled and not active:
            logging.info('Deleting autostart file')
            self.autostart_file.delete_async(GLib.PRIORITY_DEFAULT)
项目:difftrack    作者:qntln    | 项目源码 | 文件源码
def data_mapper(mapper: Callable[[types.DataType], types.DataType]) -> Callable[[types.ListenerType], types.ListenerType]:
    '''
    Build a decorator that runs ``mapper`` over a listener's `data` argument.

    The decorated listener receives ``mapper(data)`` in place of ``data``.
    A ``finalize_batch`` attribute on the listener is carried over.
    '''

    def decorator(listener: types.ListenerType) -> types.ListenerType:

        @functools.wraps(listener)
        def mapped_listener(dtype: types.DiffType, index: int, data: types.DataType) -> None:
            mapped = mapper(data)
            listener(dtype, index, mapped)

        # Not every listener takes part in batching.
        with contextlib.suppress(AttributeError):
            mapped_listener.finalize_batch = listener.finalize_batch

        return mapped_listener
    return decorator
项目:AirbnbReviewAnalyzer    作者:mrsata    | 项目源码 | 文件源码
def insert_reviews(neighborhood, db):
    """Copy the reviews of a neighborhood's reviewed listings into their own collection.

    Listings come from ``listings_<neighborhood>`` (those with a positive
    ``reviews_count``); their reviews are read from ``reviews_en`` and
    inserted into ``reviews_<neighborhood>``. Insert errors are ignored.
    """
    reviewed = {"reviews_count": {"$gt": 0}}
    listings = list(db['listings_' + neighborhood].find(reviewed))
    collection = db['reviews_' + neighborhood]

    for listing in listings:
        reviews = list(db.reviews_en.find({'listing_id':listing['_id']}))
        if not reviews:
            continue
        # Best effort: a failing batch must not stop the remaining listings.
        with suppress(Exception):
            collection.insert_many(reviews, ordered=True)
项目:speccer    作者:bensimner    | 项目源码 | 文件源码
def remove(self, node):
        """Remove ``node`` from the graph along with every edge pointing at it.

        Propagates whatever ``self._nodes.remove`` raises when ``node`` is
        not part of the graph.
        """
        self._nodes.remove(node)

        for remaining in self._nodes:
            # Most nodes have no edge to the removed one.
            with contextlib.suppress(KeyError):
                remaining.edges.remove(node)
项目:speccer    作者:bensimner    | 项目源码 | 文件源码
def _get_name_from_func(func, other):
    """Return a display name for ``func``, or ``other`` when none applies.

    ``__qualname__`` is preferred over ``__name__``.
    NOTE(review): ``types.LambdaType`` is the same type as
    ``types.FunctionType``, so every plain Python function (not only
    lambdas) gets ``other`` -- confirm that is intended.
    """
    if isinstance(func, types.LambdaType):
        return other

    for attr in ('__qualname__', '__name__'):
        with contextlib.suppress(AttributeError):
            return getattr(func, attr)

    return other
项目:foremast    作者:gogoair    | 项目源码 | 文件源码
def resolve_self_references(self, rules):
        """Resolves `$self` references to actual application name in security group rules.

        ``rules`` is modified in place and returned; it is left untouched
        when it has no ``$self`` entry.
        """
        with suppress(KeyError):
            rules[self.app_name] = rules.pop('$self')
        return rules
项目:xarray-simlab    作者:benbovy    | 项目源码 | 文件源码
def __getattr__(self, name):
        """Expose the entries of ``self._mapping`` as attributes.

        Raises ``AttributeError`` for names that are not in the mapping.
        """
        # ``__setstate__`` is never looked up in the mapping: pickle asks for
        # it before the object is initialized, which would otherwise loop
        # forever.
        lookup_allowed = name != '__setstate__'
        if lookup_allowed:
            with suppress(KeyError):
                return self._mapping[name]
        message = "%r object has no attribute %r" % (type(self).__name__, name)
        raise AttributeError(message)
项目:saapy    作者:ashapochka    | 项目源码 | 文件源码
def remove_project(self):
        """Close the project and delete its file, if the file exists."""
        self.close_project()
        project_file = self.project_path
        with contextlib.suppress(FileNotFoundError):
            project_file.unlink()
项目:saapy    作者:ashapochka    | 项目源码 | 文件源码
def cleanup(ctx):
    """Delete the shelve DB and the scitools UDB file of the Povray project.

    Each file is removed independently, so a missing first file no longer
    prevents the second one from being deleted.
    """
    pv = Povray(ctx.config.povray.parent_dir)
    for stale_path in (pv.shelve_db_path, pv.scitools_udb_path):
        with contextlib.suppress(FileNotFoundError):
            stale_path.unlink()
项目:byro    作者:byro    | 项目源码 | 文件源码
def byro_information(request):
    """Context processor adding the configuration and the current URL name.

    In DEBUG mode a development warning flag is added as well, plus the
    output of ``git describe --always`` when that command succeeds.
    """
    ctx = {'config': Configuration.get_solo()}

    try:
        url_name = resolve(request.path_info).url_name
    except Http404:
        url_name = ''
    ctx['url_name'] = url_name

    if not settings.DEBUG:
        return ctx

    ctx['development_warning'] = True
    # Best effort: without git or a checkout there is simply no version.
    # NOTE(review): check_output returns bytes, not str -- confirm the
    # template expects that.
    with suppress(Exception):
        import subprocess
        ctx['byro_version'] = subprocess.check_output(['git', 'describe', '--always'])

    return ctx
项目:caves    作者:mikaelho    | 项目源码 | 文件源码
def delete(self, sender):
    """Delete the current map file after confirmation, or quit the map.

    Without a file name the map is simply quit. Otherwise the user is
    asked first; interrupting the alert aborts. The map file and its
    ``.json`` companion (if present) are removed and the default map is set.
    """
    map_file = self.control.filename
    if map_file == '':
      self.control.quit_map(sender)
      return

    name = os.path.splitext(os.path.basename(map_file))[0]
    try:
      console.alert('Delete', name, button1='Ok')
    except KeyboardInterrupt:
      return
    os.remove(self.control.filename)
    with contextlib.suppress(FileNotFoundError):
      os.remove(self.control.filename[:-4]+'.json')
    self.control.set_default_map()
项目:plotnine    作者:has2k1    | 项目源码 | 文件源码
def match(v1, v2, nomatch=-1, incomparables=None, start=0):
    """
    Return a vector of the positions of (first)
    matches of its first argument in its second.

    Parameters
    ----------
    v1: array-like
        the values to be matched

    v2: array-like
        the values to be matched against

    nomatch: int
        the value to be returned in the case when
        no match is found.

    incomparables: array-like
        a list of values that cannot be matched.
        Any value in v1 matching a value in this list
        is assigned the nomatch value.
    start: int
        type of indexing to use. Most likely 0 or 1
    """
    # Position of the first occurrence of every value in v2.
    first_seen = {}
    for pos, value in enumerate(v2):
        first_seen.setdefault(value, pos)

    positions = [nomatch] * len(v1)
    excluded = set(incomparables) if incomparables else set()
    for i, value in enumerate(v1):
        if value not in excluded:
            # Values missing from v2 keep the nomatch placeholder.
            with suppress(KeyError):
                positions[i] = first_seen[value] + start

    return positions
项目:plotnine    作者:has2k1    | 项目源码 | 文件源码
def suppress(*exceptions):
        """
        Return a context manager that suppresses any of the
        specified exceptions if they occur in the body of a
        with statement and then resumes execution with the
        first statement following the end of the with statement.
        """
        # Imported locally so the helper does not depend on a module-level
        # import of contextlib.
        from contextlib import contextmanager

        # A bare generator has no __enter__/__exit__; wrapping it with
        # contextmanager is what makes ``with suppress(...)`` work.
        @contextmanager
        def _suppressing():
            try:
                yield
            except exceptions:
                pass

        return _suppressing()
项目:plotnine    作者:has2k1    | 项目源码 | 文件源码
def get_valid_kwargs(func, potential_kwargs):
    """
    Return the entries of potential_kwargs that func accepts as kwargs
    """
    accepted = {}
    for kwarg_name in get_kwarg_names(func):
        # Names the caller did not supply are simply left out.
        with suppress(KeyError):
            accepted[kwarg_name] = potential_kwargs[kwarg_name]
    return accepted
项目:saffron    作者:Lamden    | 项目源码 | 文件源码
def init_dbs(sqls):
    """Execute every statement in ``sqls``, ignoring ``sqlite3.OperationalError``.

    Graceful initialization: creating the tables doubles as the test for
    whether this is a new DB or not.
    """
    for statement in sqls:
        with suppress(sqlite3.OperationalError):
            cursor.execute(statement)
项目:srctools    作者:TeamSpen210    | 项目源码 | 文件源码
def __setitem__(self, key, val):
        """Allow using [] syntax to save a keyvalue.

        - It is case-insensitive, so it will overwrite a key which only
          differs by case.
        - Booleans are stored as '1'/'0'; every other value is stored as
          ``str(val)``.
        - Setting ``classname`` or ``targetname`` also moves the entity
          between the buckets of the map's ``by_class``/``by_target`` index.
        """
        if isinstance(val, bool):
            val = '1' if val else '0'
        # The same string is used for storage and for the indexes, so later
        # lookups by the stored value find the entity.
        str_val = str(val)
        key_fold = key.casefold()
        for k in self.keys:
            if k.casefold() == key_fold:
                # Check case-insensitively for this key first
                orig_val = self.keys.get(k)
                self.keys[k] = str_val
                break
        else:
            orig_val = self.keys.get(key)
            self.keys[key] = str_val

        # Update the by_class/target dicts with our new value
        if key_fold == 'classname':
            with suppress(KeyError):
                self.map.by_class[orig_val].remove(self)
            self.map.by_class[str_val].add(self)
        elif key_fold == 'targetname':
            with suppress(KeyError):
                self.map.by_target[orig_val].remove(self)
            self.map.by_target[str_val].add(self)
项目:Clean-code-in-Python    作者:rmariano    | 项目源码 | 文件源码
def ignore_exceptions_2():
    """Return the content of ``DATA_FILE``, or ``None`` if it is missing or unreadable.

    Only ``FileNotFoundError`` and ``PermissionError`` are ignored; any
    other error propagates.
    """
    import contextlib
    data = None
    ignorable = (FileNotFoundError, PermissionError)
    with contextlib.suppress(*ignorable), open(DATA_FILE) as f:
        data = f.read()
    return data