Python typing 模块,Optional 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用typing.Optional(以下标形式 Optional[X] 使用,而不是调用)

项目:python-driver    作者:bblfsh    | 项目源码 | 文件源码
def _send_receive(self, nummsgs: int, outformat: str='json',
                      dataupdate: Optional[Dict[AnyStr, Any]]=None,
                      restart_data: bool=True) -> List[Response]:
        """Run ``nummsgs`` messages through a processor and load the results.

        :param nummsgs: count handed to ``self._add_to_buffer``.
        :param outformat: format name forwarded to every helper call.
        :param dataupdate: optional mapping merged into ``self.data`` before
            the send buffer is filled; skipped when None or empty.
        :param restart_data: when true, ``self._restart_data(outformat)``
            runs first.
        :return: whatever ``self._loadResults(outformat)`` returns.
        """
        if restart_data:
            self._restart_data(outformat)
        if dataupdate:
            self.data.update(dataupdate)

        # Fill the send buffer, then rewind it so it is read from the start.
        self._add_to_buffer(nummsgs, outformat)
        self.sendbuffer.seek(0)

        instance = get_processor_instance(
            outformat,
            custom_outbuffer=self.recvbuffer,
            custom_inbuffer=self.sendbuffer,
        )
        processor, _unused = instance
        processor.process_requests(self.sendbuffer)
        return self._loadResults(outformat)
项目:qqmbr    作者:ischurov    | 项目源码 | 文件源码
def format(self, content: Optional[QqTag],
               blanks_to_pars=True,
               keep_end_pars=True) -> str:
        """
        Render every child of ``content`` and concatenate the pieces.

        String children are HTML-escaped and, when ``blanks_to_pars`` is
        true, additionally passed through ``self.blanks_to_pars``; any other
        child is rendered by ``self.handle``.

        :param content: could be QqTag or any iterable of QqTags; None
            yields an empty string
        :param blanks_to_pars: use blanks_to_pars (True or False)
        :param keep_end_pars: keep end paragraphs
        :return: str: text of tag
        """
        if content is None:
            return ""

        def render(child):
            if not isinstance(child, str):
                return self.handle(child)
            if not blanks_to_pars:
                return html_escape(child)
            # NOTE(review): keep_end_pars goes to html_escape as its second
            # argument, not to self.blanks_to_pars -- confirm this placement
            # is intended.
            escaped = html_escape(child, keep_end_pars)
            return self.blanks_to_pars(escaped)

        pieces = [render(child) for child in content]
        return "".join(pieces)
项目:qqmbr    作者:ischurov    | 项目源码 | 文件源码
def get_counter_for_tag(self, tag: QqTag) -> Optional[Counter]:
        """Find the counter registered for ``tag`` in ``self.counters``.

        The lookup starts with the tag's own name.  Whenever the entry found
        is a dict, the search continues inside that dict using the name of
        the tag's parent.

        :param tag: tag to look up
        :return: the Counter found, or None when a visited tag has a
            'nonumber' child marker (``tag.exists('nonumber')``), when no
            entry exists, or when the entry is neither a Counter nor a dict
        """
        scope = self.counters
        node = tag
        while True:
            if node.exists('nonumber'):
                return None
            entry = scope.get(node.name)
            if isinstance(entry, Counter):
                return entry
            if not isinstance(entry, dict):
                # Missing entry (None) or an unexpected value: no counter.
                return None
            # Nested table: descend and switch to the parent tag's name.
            scope = entry
            node = node.parent
项目:pyppeteer    作者:miyakogi    | 项目源码 | 文件源码
def __init__(self, client: Session, mouse: Mouse, touchscreen: Touchscreen
                 ) -> None:
        """Make new frame manager.

        Stores the collaborators, starts with no known frames and no main
        frame, and subscribes to the frame / execution-context events of
        ``client``.
        """
        super().__init__()
        self._client = client
        self._mouse = mouse
        self._touchscreen = touchscreen
        self._frames: Dict[str, Frame] = dict()
        self._mainFrame: Optional[Frame] = None

        def on_attached(event):
            return self._onFrameAttached(
                event.get('frameId', ''), event.get('parentFrameId', ''))

        def on_navigated(event):
            return self._onFrameNavigated(event.get('frame'))

        def on_detached(event):
            return self._onFrameDetached(event.get('frameId'))

        def on_context_created(event):
            return self._onExecutionContextCreated(event.get('context'))

        # Registration order matches the order the events are listed here.
        subscriptions = (
            ('Page.frameAttached', on_attached),
            ('Page.frameNavigated', on_navigated),
            ('Page.frameDetached', on_detached),
            ('Runtime.executionContextCreated', on_context_created),
        )
        for event_name, callback in subscriptions:
            client.on(event_name, callback)
项目:pyppeteer    作者:miyakogi    | 项目源码 | 文件源码
def __init__(self, client: Session, ignoreHTTPSErrors: Any,
                 options: dict = None, **kwargs: Any) -> None:
        """Make new navigator watcher.

        :param client: session object stored on the instance.
        :param ignoreHTTPSErrors: flag stored on the instance unchanged.
        :param options: optional settings dict; keys read are ``timeout``
            (default 3000), ``networkIdleTimeout`` (default 1000),
            ``networkIdleInflight`` (default 2) and ``waitUntil``
            (default ``'load'``).  The dict itself is not modified.
        :param kwargs: extra settings; they override entries of ``options``.
        :raises ValueError: if ``waitUntil`` is neither ``'load'`` nor
            ``'networkidle'``.
        """
        # Merge into a fresh dict so the caller's ``options`` is never mutated.
        merged = dict(options) if options is not None else {}
        merged.update(kwargs)
        self._client = client
        self._ignoreHTTPSErrors = ignoreHTTPSErrors
        self._timeout = merged.get('timeout', 3000)
        self._idleTime = merged.get('networkIdleTimeout', 1000)
        self._idleTimer: Optional[Union[asyncio.Future, asyncio.Handle]] = None
        self._idleInflight = merged.get('networkIdleInflight', 2)
        self._waitUntil = merged.get('waitUntil', 'load')
        if self._waitUntil not in ('load', 'networkidle'):
            raise ValueError(
                f'Unknown value for options.waitUntil: {self._waitUntil}')
项目:pyppeteer    作者:miyakogi    | 项目源码 | 文件源码
async def waitForNavigation(self, options: dict = None, **kwargs: Any
                                ) -> Optional[Response]:
        """Wait navigation completes.

        Declared ``async`` because the body awaits the watcher; a plain
        ``def`` containing ``await`` does not compile.

        :param options: optional settings dict forwarded to
            ``NavigatorWatcher``; it is copied, not modified.
        :param kwargs: extra settings that override entries of ``options``.
        :return: the response recorded for ``self.url`` while waiting, or
            None when no response with that url was seen.
        """
        # Work on a copy so the caller's dict is left untouched.
        merged = dict(options) if options is not None else dict()
        merged.update(kwargs)
        watcher = NavigatorWatcher(self._client, self._ignoreHTTPSErrors,
                                   merged)
        responses: Dict[str, Response] = dict()
        listener = helper.addEventListener(
            self._networkManager,
            NetworkManager.Events.Response,
            lambda response: responses.__setitem__(response.url, response)
        )
        try:
            await watcher.waitForNavigation()
        finally:
            # Always detach the listener, even when the wait fails.
            helper.removeEventListeners([listener])
        return responses.get(self.url)
项目:whatstyle    作者:mikr    | 项目源码 | 文件源码
def __init__(self, exe, cache=None):
        # type: (str, Optional[Cache]) -> None
        """Set up the formatter state for the executable ``exe``.

        A non-absolute ``exe`` is first resolved with ``which``; the
        resolved value is used for both ``self.exe`` and the version lookup.
        """
        resolved = exe if os.path.isabs(exe) else which(exe)  # type: ignore
        self.exe = unifilename(resolved)
        self.cache = cache
        self._styledefinition = styledef_make()
        self.allow_encoding_change = False
        self.languages = []  # type: List[str]

        self.initial_style = style_make()
        # These are deleted after one call to minimize_errors
        self.globaltempfiles = set()  # type: Set[str]
        # These are deleted after each round of attempts
        self.tempfiles = set()  # type: Set[str]
        self.keeptempfiles = False
        self.version_string = formatter_version(resolved)
项目:whatstyle    作者:mikr    | 项目源码 | 文件源码
def cmdargs_for_style(self, formatstyle, filename=None):
        # type: (Style, Optional[str]) -> List[str]
        """Write the style to a config file and return the arguments selecting it.

        The config file lives in a directory under the system temp directory
        whose name contains the SHA of the serialized style.  Directory and
        file are registered through ``self.add_tempfile`` when this call
        creates them.  ``filename`` is not used here.

        :return: ``['--config-path', <path of the config file>]``
        """
        assert isinstance(formatstyle, Style)
        configdata = bytestr(self.styletext(formatstyle))
        digest = shahex(configdata)
        tmpdir = tempfile.gettempdir()
        cfg = os.path.join(
            tmpdir, 'whatstyle_rustfmt_%s/%s' % (digest, self.configfilename))
        try:
            cfgdir = os.path.dirname(cfg)
            os.makedirs(cfgdir)
            self.add_tempfile(cfgdir)
        except OSError as exc:
            # An already existing directory is fine; anything else is not.
            if exc.errno != errno.EEXIST:
                raise
        if not self.tempfile_exists(cfg):
            writebinary(cfg, configdata)
            self.add_tempfile(cfg)
        return ['--config-path', cfg]
项目:whatstyle    作者:mikr    | 项目源码 | 文件源码
def __init__(self, database, tabledesc=None, timestamp=False):
        # type: (str, Optional[Dict[str, str]], bool) -> None
        """Prepare the SQL statements of the store; no connection is opened.

        :param database: stored as ``self.database``.
        :param tabledesc: mapping used to fill the ``{...}`` fields of the
            SQL templates; ``self.prefixdesc()`` is used when None.
        :param timestamp: when true, ``self._sqtimestamp`` is inserted into
            the create statement, otherwise an empty string.
        """
        self.database = database
        desc = self.prefixdesc() if tabledesc is None else tabledesc
        self.tabledesc = desc

        # The create template has a %s slot for the optional timestamp SQL.
        if timestamp:
            create_template = self._sqcreate % self._sqtimestamp
        else:
            create_template = self._sqcreate % ''
        self.kv_create = create_template.format(**desc)
        self.kv_get = self._sqget.format(**desc)
        self.kv_mget = self._sqmget.format(**desc)
        self.kv_put = self._sqput.format(**desc)
        self.kv_delete = self._sqdelete.format(**desc)

        self._connection = None  # type: Optional[sqlite3.Connection]
        self.sqlite_limit_variable_number = 999
        self.support_mget = True
项目:whatstyle    作者:mikr    | 项目源码 | 文件源码
def mget(self, keys):
        # type: (List[str]) -> List[Optional[bytes]]
        """Return the stored value for every key, None where a key is absent.

        While ``self.support_mget`` is set, the keys are fetched in groups
        of at most ``self.sqlite_limit_variable_number`` using the
        multi-get statement.  If sqlite raises ``OperationalError`` the
        multi-get path is switched off for good and the keys are fetched
        one at a time instead.
        """
        if self.support_mget:
            found = []
            try:
                with self.conn as conn:
                    for chunk in grouper(self.sqlite_limit_variable_number, keys):
                        params = list(chunk)
                        placeholders = ','.join('?' for _ in params)
                        found.extend(
                            conn.execute(self.kv_mget % placeholders, params))
                lookup = dict(found)  # type: Dict[str, bytes]
                return [lookup.get(key) for key in keys]
            except sqlite3.OperationalError:
                self.support_mget = False
        return [self.__get(key) for key in keys]
项目:whatstyle    作者:mikr    | 项目源码 | 文件源码
def mget(self, keys):
        # type: (List[str]) -> List[Optional[bytes]]
        """Resolve ``keys`` in two steps and return the values in key order.

        The parent store maps each key to a content key; content keys that
        exist are then looked up in ``self.kvstore``.  Positions whose
        content key is missing yield None.
        """
        if not keys:
            return []
        contentkeys = super(DedupKeyValueStore, self).mget(keys)
        hits = []
        misses = []  # type: List[Tuple[int, Optional[bytes]]]
        for position, contentkey in enumerate(contentkeys):
            if contentkey is not None:
                hits.append((position, unistr(binary_type(contentkey))))
            else:
                misses.append((position, None))
        if not hits:
            return [None] * len(misses)
        positions, stored_keys = zip(*hits)
        stored_values = self.kvstore.mget(stored_keys)
        # Positions are unique, so sorting the pairs restores the key order.
        merged = misses + list(zip(positions, stored_values))
        merged.sort()
        return [value for _, value in merged]
项目:whatstyle    作者:mikr    | 项目源码 | 文件源码
def mixtohash(self,
                  args=(),      # type: Sequence[AnyStr]
                  exe=None,     # type: Optional[str]
                  depfiles=(),  # type: Sequence[str]
                  hashobj=None  # type: Optional[Any]
                  ):
        # type: (...) -> Any
        """Feed dependency files, arguments and the executable into a hash object.

        For every dependency file its name and its file SHA are added, for
        every argument its name form; each entry is followed by a NUL byte.
        When ``exe`` is given, ``self.digest_for_exe(exe)`` is added last.

        :param hashobj: hash object to update; a new ``HASHFUNC()`` is
            created when None.
        :return: the updated hash object.
        """
        digest = HASHFUNC() if hashobj is None else hashobj
        separator = b'\x00'
        for depfile in depfiles:
            digest.update(sysfilename(depfile))
            digest.update(filesha(depfile))
            digest.update(separator)
        for argument in args:
            digest.update(sysfilename(argument))
            digest.update(separator)
        if exe is not None:
            digest.update(self.digest_for_exe(exe))
        return digest
项目:whatstyle    作者:mikr    | 项目源码 | 文件源码
def apply(func,         # type: Callable[..., bytes]
              args=(),      # type: Sequence[AnyStr]
              exe=None,     # type: Optional[str]
              depfiles=(),  # type: Sequence[str]
              cache=None    # type: Optional[Cache]
              ):
        """Return func(*args), consulting ``cache`` first when one is given.

        The result of func(*args) must be bytes and must not be None, since
        None is the cache-miss indicator.  On a miss the function is
        evaluated and its result stored in the cache under the hex digest
        produced by ``cache.mixtohash(args, exe=exe, depfiles=depfiles)``.
        """
        if cache is None:
            return func(*args)
        key = cache.mixtohash(args, exe=exe, depfiles=depfiles).hexdigest()
        value = cache.get(key)
        if value is None:
            value = func(*args)
            if key is not None:
                cache.set(key, value)
        return value
项目:whatstyle    作者:mikr    | 项目源码 | 文件源码
def unified_diff(filename, content2=None):
    # type: (str, Optional[bytes]) -> Tuple[int, Iterable[str]]
    """Diff the contents of filename against content2 or the standard input.

    When content2 is None the second side is read from the standard input
    and the diff is also written out, which is how the command line uses it:
        echo 123 > d.txt ; echo 456 | ./whatstyle.py --stdindiff d.txt
    We get this result:
    ---
    +++
    @@ -1 +1 @@
    -123
    +456

    Returns the exit code and the diff from compute_unified_diff.
    """
    from_stdin = content2 is None
    if from_stdin:
        # Read binary input stream
        econtent2 = bytestr(rawstream(sys.stdin).read())
    else:
        econtent2 = content2
    exit_code, diff = compute_unified_diff(filename, econtent2, lineterm='')
    if from_stdin:
        # NOTE(review): if diff is a one-shot iterator, joining it here
        # leaves the returned value exhausted -- confirm.
        write('\n'.join(diff))
    return exit_code, diff
项目:Hanabi-AI    作者:MeGotsThis    | 项目源码 | 文件源码
def __init__(self,
                 username: str,
                 password: str,
                 botModule: str,
                 botconfig: Mapping,
                 numPlayers: int,
                 variant: Variant,
                 spectators: bool,
                 gameName: str,
                 *args,
                 **kwargs) -> None:
        """Store login, bot and table settings for one game session.

        Extra positional and keyword arguments go to the parent initializer.
        The bot class is the ``Bot`` attribute of the module
        ``<botModule>.bot``, imported here.
        """
        super().__init__(*args, **kwargs)

        # Credentials.
        self.username: str = username
        self.password: str = password

        # Bot implementation and its configuration.
        botPackage = importlib.import_module(botModule + '.bot')
        self.botCls: Type[Bot] = botPackage.Bot  # type: ignore
        self.botconfig: Mapping = botconfig

        # Table settings.
        self.numPlayers: int = numPlayers
        self.variant: Variant = variant
        self.spectators: bool = spectators
        self.gameName: str = gameName

        # Connection and game state; ``conn`` is only declared, not set.
        self.conn: socketIO_client.SocketIO
        self.tablePlayers: List[str] = []
        self.readyToStart: bool = False
        self.game: Optional[Game] = None
项目:Hanabi-AI    作者:MeGotsThis    | 项目源码 | 文件源码
def isNowPlayable(self,
                      color: Optional[Color],
                      value: Optional[Value]) -> bool:
        '''Returns True if the color and/or value is playable.

        With both given, the answer comes from self.isPlayable.  With only a
        color, the next value of that color must be at most 5 and have a
        zero located count.  With only a value, some color must have that
        value as its next one with a zero located count.
        '''
        assert color is not None or value is not None
        haveColor = color is not None
        haveValue = value is not None
        if haveColor and haveValue:
            return self.isPlayable(color, value)
        if haveColor:
            nextValue = len(self.game.playedCards[color]) + 1
            return nextValue <= 5 and not self.locatedCount[color][nextValue]
        if haveValue:
            return any(
                len(self.game.playedCards[c]) + 1 == value
                and not self.locatedCount[c][value]
                for c in self.colors)
        assert False
项目:Hanabi-AI    作者:MeGotsThis    | 项目源码 | 文件源码
def maybeGiveEarlyGameHint(self, highValue: bool) -> bool:
        """Give the fittest early-game hint among the other players, if any.

        Players are visited in seat order after this bot.  A candidate with
        non-negative fitness gets a bonus of twice (numPlayers - offset)
        when the player's hand state contains no Playable entry.

        :return: True when a hint was given, False when the best fitness
            found is not positive.
        """
        numPlayers: int = self.game.numPlayers
        bestHint: Hint = Hint()
        offset: int
        for offset in range(1, numPlayers):
            player: int = (self.position + offset) % numPlayers
            candidate: Optional[Hint]
            candidate = self.bestEarlyHintForPlayer(player, highValue)
            if candidate is None:
                continue
            if (candidate.fitness >= 0
                    and HandState.Playable not in self.handState(player)):
                candidate.fitness += (numPlayers - offset) * 2
            if candidate.fitness > bestHint.fitness:
                bestHint = candidate

        if bestHint.fitness > 0:
            bestHint.give(self)
            return True
        return False
项目:Hanabi-AI    作者:MeGotsThis    | 项目源码 | 文件源码
def deck_draw(self,
                  player: int,
                  deckidx: int,
                  color: Optional[Color],
                  value: Optional[Value]) -> None:
        """Record that ``player`` drew the card at deck index ``deckidx``.

        For the bot's own draw, color and value are ignored and an own-card
        is created.  For any other player they are converted through
        ``Suit``/``Rank`` first, and both that player and the bot are told
        about the draw.
        """
        if player == self.botPosition:
            self.deck[deckidx] = self.bot.create_own_card(deckidx)
            self.bot.drew_card(deckidx)
            return

        cardColor = Suit(color).color(self.variant)
        cardValue = Rank(value).value_()
        self.deck[deckidx] = self.bot.create_player_card(
            player, deckidx, cardColor, cardValue)
        self.players[player].drew_card(deckidx)
        self.bot.someone_drew(player, deckidx)
项目:backend.ai-client-py    作者:lablup    | 项目源码 | 文件源码
def register_command(handler: Callable[[argparse.Namespace], None],
                     main_parser: Optional[ArgParserType]=None,
                    ) -> Callable[[argparse.Namespace], None]:
    """Register ``handler`` as a sub-command and return its wrapper.

    The sub-command is named after the handler, with underscores replaced
    by dashes.  The handler's docstring becomes the description, and its
    first paragraph the help text; a handler without a docstring gets an
    empty help text instead of raising.

    :param handler: function called with the parsed arguments.
    :param main_parser: parser that receives the sub-command; the global
        parser is used when None.
    :return: the wrapper, which also carries ``register_command`` (bound
        to the new sub-parser, for nested commands) and ``add_argument``
        (of the new sub-parser).
    """
    if main_parser is None:
        main_parser = global_argparser
    # One sub-parsers action per parser, remembered by the parser's id.
    if id(main_parser) not in _subparsers:
        subparsers = main_parser.add_subparsers(title='commands',
                                                dest='command')
        _subparsers[id(main_parser)] = subparsers
    else:
        subparsers = _subparsers[id(main_parser)]

    @functools.wraps(handler)
    def wrapped(args):
        handler(args)

    # __doc__ is None for handlers without a docstring.
    doc_summary = (handler.__doc__ or '').split('\n\n')[0]
    inner_parser = subparsers.add_parser(handler.__name__.replace('_', '-'),
                                         description=handler.__doc__,
                                         help=doc_summary)
    inner_parser.set_defaults(function=wrapped)
    wrapped.register_command = functools.partial(register_command,
                                                 main_parser=inner_parser)
    wrapped.add_argument = inner_parser.add_argument
    return wrapped
项目:backend.ai-client-py    作者:lablup    | 项目源码 | 文件源码
def _list(cls, user_id: int,
              is_active: Optional[bool]=None,
              fields: Optional[Iterable[str]]=None):
        """Generator that yields one GraphQL keypair query and returns its result.

        The single ``yield`` hands out the request; the value sent back in
        must offer ``json()``.

        :param user_id: value of the ``user_id`` query variable.
        :param is_active: value of the ``is_active`` query variable.
        :param fields: field names to select; a default set of four is used
            when None.
        :return: the ``'keypairs'`` entry of the decoded response.
        """
        if fields is None:
            fields = ('access_key', 'secret_key', 'is_active', 'is_admin')
        template = (
            'query($user_id: Int!, $is_active: Boolean) {'
            '  keypairs(user_id: $user_id, is_active: $is_active) {'
            '    $fields'
            '  }'
            '}'
        )
        query = template.replace('$fields', ' '.join(fields))
        variables = {'user_id': user_id, 'is_active': is_active}
        payload = {'query': query, 'variables': variables}
        resp = yield Request('POST', '/admin/graphql', payload)
        return resp.json()['keypairs']
项目:backend.ai-client-py    作者:lablup    | 项目源码 | 文件源码
def _get_or_create(cls, lang: str,
                       client_token: Optional[str]=None,
                       mounts: Optional[Iterable[str]]=None,
                       envs: Optional[Mapping[str, str]]=None,
                       max_mem: int=0, exec_timeout: int=0) -> str:
        """Generator that yields a kernel-create request and returns the new object.

        :param lang: value sent as ``'lang'``.
        :param client_token: session token; must be longer than 8 characters
            (checked with ``assert``).  A random hex token is generated when
            it is missing or empty.
        :param mounts: sent as ``config.mounts``.
        :param envs: sent as ``config.envs``.
        :param max_mem: currently not sent (the limits section is disabled).
        :param exec_timeout: currently not sent (the limits section is disabled).
        :return: ``cls(kernelId)`` with its ``created`` attribute set from
            the response.  NOTE(review): the ``-> str`` annotation does not
            match this -- confirm.
        """
        if not client_token:
            client_token = uuid.uuid4().hex
        else:
            assert len(client_token) > 8
        payload = {
            'lang': lang,
            'clientSessionToken': client_token,
            'config': {
                'mounts': mounts,
                'envs': envs,
            },
            # 'limits': {
            #     'maxMem': max_mem,
            #     'execTimeout': exec_timeout,
            # },
        }
        resp = yield Request('POST', '/kernel/create', payload)
        data = resp.json()
        kernel = cls(data['kernelId'])  # type: ignore
        kernel.created = data.get('created', True)  # True is for legacy
        return kernel
项目:backend.ai-client-py    作者:lablup    | 项目源码 | 文件源码
def __init__(self, endpoint: Optional[str]=None,
                 version: str='v2.20170315',
                 user_agent: Optional[str]=None,
                 access_key: Optional[str]=None,
                 secret_key: Optional[str]=None,
                 hash_type: Optional[str]=None) -> None:
        """Store the API settings, filling in every value that is empty.

        Endpoint, access key and secret key fall back to ``get_env``;
        version and user agent fall back to ``self.DEFAULTS``.  The hash
        type is lower-cased and defaults to ``'sha256'``.
        """
        self._endpoint = endpoint or get_env('ENDPOINT', self.DEFAULTS['endpoint'])
        self._version = version or self.DEFAULTS['version']
        self._user_agent = user_agent or self.DEFAULTS['user_agent']
        self._access_key = access_key or get_env('ACCESS_KEY')
        self._secret_key = secret_key or get_env('SECRET_KEY')
        if hash_type:
            self._hash_type = hash_type.lower()
        else:
            self._hash_type = 'sha256'
项目:speechless    作者:JuliusKunze    | 项目源码 | 文件源码
def indices_to_load_by_target_index(allowed_characters_for_loaded_model: List[str],
                                        allowed_characters: List[str]) -> List[Optional[int]]:
        """Map every target character to its index in the loaded model.

        Characters are annotated as ``str``; ``chr`` is a builtin function,
        not a type.

        Characters that exist only in the loaded model, and characters that
        exist only in the target list, are each reported through ``log``.

        :param allowed_characters_for_loaded_model: characters of the model
            being loaded, in model order.
        :param allowed_characters: characters of the target model, in
            target order.
        :return: one entry per target character: the result of
            ``single_or_none`` over the indices at which that character
            occurs in the loaded model.
        """
        load_character_set = set(allowed_characters_for_loaded_model)
        target_character_set = set(allowed_characters)

        ignored = load_character_set - target_character_set
        if ignored:
            log("Ignoring characters {} from loaded model.".format(sorted(ignored)))

        extra = target_character_set - load_character_set
        if extra:
            log("Initializing extra characters {} not found in model.".format(sorted(extra)))

        def character_index_to_load(target_character: str) -> Optional[int]:
            return single_or_none([index for index, character in enumerate(allowed_characters_for_loaded_model) if
                                   character == target_character])

        character_mapping = [character_index_to_load(character) for character in allowed_characters]

        log("Character mapping: {}".format(character_mapping))

        return character_mapping
项目:speechless    作者:JuliusKunze    | 项目源码 | 文件源码
def __init__(self,
                 get_raw_audio: Callable[[], ndarray],
                 sample_rate: int = 16000,
                 id: Optional[str] = None,
                 label: Optional[str] = "nolabel",
                 fourier_window_length: int = 512,
                 hop_length: int = 128,
                 mel_frequency_count: int = 128,
                 label_with_tags: Optional[str] = None,
                 positional_label: Optional[PositionalLabel] = None):
        """Store the audio source and the spectrogram settings of one example.

        ``id`` and ``label`` are passed to the parent initializer; every
        other argument is kept as an attribute of the same name.
        ``label_with_tags`` defaults to None and is therefore annotated as
        Optional.
        """
        super().__init__(id=id, label=label)

        # The default values for hop_length and fourier_window_length are powers of 2 near the values specified in the wave2letter paper.
        self.get_raw_audio = get_raw_audio
        self.sample_rate = sample_rate
        self.fourier_window_length = fourier_window_length
        self.hop_length = hop_length
        self.mel_frequency_count = mel_frequency_count
        self.label_with_tags = label_with_tags
        self.positional_label = positional_label
项目:speechless    作者:JuliusKunze    | 项目源码 | 文件源码
def load(corpus_csv_file: Path,
             sampled_training_example_count: Optional[int] = None) -> 'Corpus':
        """Build a Corpus from a CSV file of labeled audio examples.

        Every row holds id, audio file path, label, phase name and
        serialized positional label, in that order.  Relative audio paths
        are resolved against the directory of the CSV file; an empty
        positional label becomes None.  Rows are split into training and
        test examples by their phase.
        """
        import csv

        def to_absolute(audio_file_path: Path) -> Path:
            if audio_file_path.is_absolute():
                return audio_file_path
            return Path(corpus_csv_file.parent) / audio_file_path

        with corpus_csv_file.open(encoding='utf8') as opened_csv:
            rows = csv.reader(opened_csv, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)

            examples = []
            for example_id, audio_file_path, label, phase, positional_label in rows:
                example = LabeledExampleFromFile(
                    audio_file=to_absolute(Path(audio_file_path)),
                    id=example_id,
                    label=label,
                    positional_label=(
                        None if positional_label == ""
                        else PositionalLabel.deserialize(positional_label)))
                examples.append((example, Phase[phase]))

            training = [e for e, phase in examples if phase == Phase.training]
            test = [e for e, phase in examples if phase == Phase.test]
            return Corpus(training_examples=training,
                          test_examples=test,
                          sampled_training_example_count=sampled_training_example_count)
项目:BlueWhale    作者:caffe2    | 项目源码 | 文件源码
def get_max_q_values(
        self,
        next_states: np.ndarray,
        possible_next_actions: Optional[np.ndarray] = None,
        use_target_network: Optional[bool] = True
    ) -> np.ndarray:
        """Return the row-wise maximum Q value for ``next_states``.

        :param next_states: states handed to ``get_q_values_all_actions``.
        :param possible_next_actions: optional mask; wherever an entry is
            falsy, ``self.ACTION_NOT_POSSIBLE_VAL`` is added to the Q value
            before the maximum is taken.
        :param use_target_network: forwarded to ``get_q_values_all_actions``.
        :return: maximum over axis 1, with that axis kept (one column).
        """
        q_values = self.get_q_values_all_actions(
            next_states, use_target_network
        )
        if possible_next_actions is None:
            return np.max(q_values, axis=1, keepdims=True)

        impossible = np.logical_not(possible_next_actions)
        # In-place add, exactly as before: q_values itself is modified.
        q_values += np.multiply(impossible, self.ACTION_NOT_POSSIBLE_VAL)
        return np.max(q_values, axis=1, keepdims=True)
项目:BlueWhale    作者:caffe2    | 项目源码 | 文件源码
def get_q_values_all_actions(
        self, states: np.ndarray, use_target_network: Optional[bool] = True
    ) -> np.ndarray:
        """
        Takes in a set of states and computes Q values for all actions.

        Creates Q(states, actions), a blob with shape (batch_size, action_dim).
        Q(states, actions)[i][j] is an approximation of Q*(states[i], action_j).
        Note that action_j takes on every possible action (of which there are
        self.action_dim_).

        :param states: Numpy array with shape (batch_size, state_dim). Each row
            contains a representation of a state.
        :param use_target_network: Boolean that indicates whether or not to use
            this trainer's TargetNetwork to compute Q values; when falsy,
            self.score is used instead.
        """
        if not use_target_network:
            return self.score(states)
        return self.target_network.target_values(states)
项目:BlueWhale    作者:caffe2    | 项目源码 | 文件源码
def preprocess_samples(
        self,
        states: List[Dict[str, float]],
        actions: List[str],
        rewards: List[float],
        next_states: List[Dict[str, float]],
        next_actions: List[str],
        is_terminals: List[bool],
        possible_next_actions: List[List[str]],
        reward_timelines: Optional[List[Dict[int, float]]],
    ) -> TrainingDataPage:
        """Preprocess the samples, then replace state rows by column indices.

        All arguments go unchanged to ``preprocess_samples_discrete``.  In
        the page it returns, ``states`` and ``next_states`` are replaced by
        a single float32 column holding the column index of every entry
        equal to 1.0.
        """

        def hot_columns(matrix):
            # Column index of each 1.0 entry, as a float32 column vector.
            columns = np.where(matrix == 1.0)[1]
            return columns.reshape(-1, 1).astype(np.float32)

        tdp = self.preprocess_samples_discrete(
            states, actions, rewards, next_states, next_actions, is_terminals,
            possible_next_actions, reward_timelines
        )
        tdp.states = hot_columns(tdp.states)
        tdp.next_states = hot_columns(tdp.next_states)
        return tdp
项目:python-netsgiro    作者:otovo    | 项目源码 | 文件源码
def add_assignment(
            self, *,
            service_code: netsgiro.ServiceCode,
            assignment_type: netsgiro.AssignmentType,
            agreement_id: Optional[str]=None,
            number: str,
            account: str,
            date: Optional[datetime.date]=None
            ) -> 'Assignment':
        """Add an assignment to the transmission.

        Builds an ``Assignment`` from the keyword arguments
        (``assignment_type`` is passed on as ``type``), appends it to
        ``self.assignments`` and returns it.
        """
        fields = dict(
            service_code=service_code,
            type=assignment_type,
            agreement_id=agreement_id,
            number=number,
            account=account,
            date=date,
        )
        created = Assignment(**fields)
        self.assignments.append(created)
        return created
项目:mugen    作者:scherroman    | 项目源码 | 文件源码
def create(cls, texts: List[Any], name: str, *, locations: Opt[List[float]] = None,
               durations: Opt[List[float]] = None, default: bool = False) -> 'SubtitleTrack':
        """Build a subtitle track from texts plus either locations or durations.

        ``locations`` wins when both are given and non-empty.  Each text is
        converted with ``str`` and paired with one start/end time.

        :raises ex.ParameterError: when neither locations nor durations is
            provided (or both are empty).
        """
        if locations:
            spans = loc_util.start_end_locations_from_locations(locations)
        elif durations:
            spans = loc_util.start_end_locations_from_intervals(durations)
        else:
            raise ex.ParameterError(f"Must provide either locations or durations for the subtitle track {name}")
        start_times, end_times = spans

        subtitles = [Subtitle(str(text), start_time, end_time)
                     for text, start_time, end_time in zip(texts, start_times, end_times)]
        return cls(subtitles, name, default=default)
项目:djaio    作者:Sberned    | 项目源码 | 文件源码
async def first(self, query: str, values: Union[List, Dict],
                    db_name: str = 'default') -> Optional[DictRow]:
        """Await ``self._first`` with the same arguments and return its result.

        Declared ``async`` because the body awaits; a plain ``def``
        containing ``await`` does not compile.
        """
        return await self._first(query=query, values=values, db_name=db_name)
项目:modernpython    作者:rhettinger    | 项目源码 | 文件源码
def post_message(user: User, text: str, timestamp: Optional[Timestamp]=None) -> None:
    """Record a new post at the front of the global and the per-user queues.

    :param user: author; the name is interned before it is stored.
    :param text: body of the post.
    :param timestamp: time of the post; the current time is used only when
        this is None, so an explicit 0 is kept.
    """
    user = intern(user)
    if timestamp is None:
        timestamp = time()
    post = Post(timestamp, user, text)
    posts.appendleft(post)
    user_posts[user].appendleft(post)
项目:modernpython    作者:rhettinger    | 项目源码 | 文件源码
def posts_by_user(user: User, limit: Optional[int] = None) -> List[Post]:
    """Return the first ``limit`` entries of ``user_posts[user]`` (all when None)."""
    own_posts = user_posts[user]
    selected = islice(own_posts, limit)
    return list(selected)
项目:modernpython    作者:rhettinger    | 项目源码 | 文件源码
def posts_for_user(user: User, limit: Optional[int] = None) -> List[Post]:
    """Merge the posts of everyone ``user`` follows and return the first ``limit``.

    The per-user sequences are combined with a reverse-ordered merge; all
    merged posts are returned when ``limit`` is None.
    """
    feeds = [user_posts[followed] for followed in following[user]]
    combined = merge(*feeds, reverse=True)
    return list(islice(combined, limit))
项目:modernpython    作者:rhettinger    | 项目源码 | 文件源码
def search(phrase:str, limit: Optional[int] = None) -> List[Post]:
    """Return up to ``limit`` posts whose text contains ``phrase`` (all when None)."""
    # XXX this could benefit from caching and from preindexing
    matching = (post for post in posts if phrase in post.text)
    return list(islice(matching, limit))
项目:modernpython    作者:rhettinger    | 项目源码 | 文件源码
def hash_password(password: str, salt: Optional[bytes] = None) -> HashAndSalt:
    """Derive a PBKDF2-HMAC-SHA512 hash of ``password`` and return it with the salt.

    A random 16-byte salt is generated when ``salt`` is None or empty.  The
    salt is prepended to the UTF-8 encoded password, and the fixed pepper
    is what gets passed as PBKDF2's salt argument.

    :return: tuple of (derived hash, salt actually used).
    """
    pepper = b'alchemists discovered that gold came from earth air fire and water'
    if not salt:
        salt = secrets.token_bytes(16)
    derived = hashlib.pbkdf2_hmac(
        'sha512', salt + password.encode('utf-8'), pepper, 100000)
    return derived, salt
项目:pylongecity    作者:cyrbon    | 项目源码 | 文件源码
def write_and_open_in_browser(html: str, filepath: Optional[str]):
    ''' Writes html to filepath and opens it.

    When filepath is None, a named temporary file that is not deleted on
    close is used instead.  The file is closed even if writing fails, and
    it is closed before its name is handed to the web browser.
    '''
    if filepath is None:
        file = NamedTemporaryFile('w', encoding='utf-8', delete=False)
    else:
        file = open(filepath, 'w', encoding='utf-8')
    with file:
        file.write(html)
    webbrowser.open(file.name)
项目:pylongecity    作者:cyrbon    | 项目源码 | 文件源码
def render(posts: Iterable[Post], output_filepath: Optional[str] = None):
    ''' Renders posts to html, writes it to output_filepath and opens it.

    Prints 'Nothing found' and writes nothing when there are no posts.
    '''
    collected = list(posts)
    if not collected:
        print('Nothing found')
        return
    write_and_open_in_browser(render_to_html(collected), output_filepath)
项目:qqmbr    作者:ischurov    | 项目源码 | 文件源码
def split_by_predicate(seq: Sequence[T],
                       predicate,
                       zero_delim: Optional[T]=None) \
        -> Iterator[Sequence[T]]:
    """
    Splits a sequence by delimiters that satisfy predicate,
    keeping each delimiter as the first element of its group

    split_by_predicate([0, "One", 1, 2, 3,
                    "Two", 4, 5, 6, 7, "Three", "Four"],
                    predicate=lambda x: isinstance(x, str),
                    zero_delim="Nothing")

    [["Nothing", 0], ["One", 1, 2, 3], ["Two", 4, 5, 6, 7],
    ["Three"], ["Four"]]

    :param seq: sequence to process
    :param predicate: checks whether element is a delimiter
    :param zero_delim: pseudo-delimiter that opens the first group
    :return: generator of groups (lists)
    """
    group = [zero_delim]
    for item in seq:
        if predicate(item):
            # A delimiter closes the current group and opens the next one.
            yield group
            group = [item]
        else:
            group.append(item)
    yield group
# END BASED
项目:qqmbr    作者:ischurov    | 项目源码 | 文件源码
def spawn_or_create_counter(parent: Optional[Counter]):
    """Return a child spawned from ``parent``, or a new Counter when parent is None."""
    if parent is None:
        return Counter()
    return parent.spawn_child()
项目:qqmbr    作者:ischurov    | 项目源码 | 文件源码
def url_for_eq_snippet(self, label: str) -> Optional[str]:
        """
        Returns url for equation snippet by label.

        This version always raises; it should be implemented in a subclass.

        :param label: label of the equation whose snippet url is requested
        :return: url as str, or None (per the return annotation)
        :raises NotImplementedError: always, in this implementation
        """
        raise NotImplementedError
项目:python-devtools    作者:samuelcolvin    | 项目源码 | 文件源码
def __init__(self, *,
                 warnings: Optional[bool]=None,
                 highlight: Optional[bool]=None,
                 frame_context_length: int=50):
        """Store the display settings, resolving both flags via ``self._env_bool``.

        :param warnings: resolved together with the environment variable
            name 'PY_DEVTOOLS_WARNINGS' and the default True.
        :param highlight: resolved together with the environment variable
            name 'PY_DEVTOOLS_HIGHLIGHT' and the default None.
        :param frame_context_length: stored unchanged.
        """
        resolve = self._env_bool
        self._show_warnings = resolve(warnings, 'PY_DEVTOOLS_WARNINGS', True)
        self._highlight = resolve(highlight, 'PY_DEVTOOLS_HIGHLIGHT', None)
        # 50 lines should be enough to make sure we always get the entire function definition
        self._frame_context_length = frame_context_length
项目:pyppeteer    作者:miyakogi    | 项目源码 | 文件源码
def mainFrame(self) -> Optional['Frame']:
        """Return main frame (None while no main frame is set)."""
        frame = self._mainFrame
        return frame
项目:pyppeteer    作者:miyakogi    | 项目源码 | 文件源码
def __init__(self, client: Session, mouse: Mouse, touchscreen: Touchscreen,
                 parentFrame: Optional['Frame'], frameId: str) -> None:
        """Make new frame.

        Stores the collaborators and the frame id, starts with an empty
        url, not detached, and with empty wait-task and child-frame sets.
        A truthy ``parentFrame`` gets this frame added to its child frames.
        """
        # Collaborators.
        self._client = client
        self._mouse = mouse
        self._touchscreen = touchscreen
        self._parentFrame = parentFrame

        # Identity and state.
        self._id = frameId
        self._url = ''
        self._detached = False
        self._defaultContextId = '<not-initialized>'
        self._waitTasks: Set[WaitTask] = set()  # maybe list
        self._childFrames: Set[Frame] = set()  # maybe list

        parent = self._parentFrame
        if parent:
            parent._childFrames.add(self)
项目:pyppeteer    作者:miyakogi    | 项目源码 | 文件源码
async def querySelector(self, selector: str) -> Optional['ElementHandle']:
        """Get element which matches `selector` string.

        If `selector` matches multiple elements, return first-matched element.

        Declared ``async`` because the body awaits; a plain ``def``
        containing ``await`` does not compile.

        :return: an ElementHandle when the evaluated object has subtype
            'node'; otherwise the remote object is released and None is
            returned.
        """
        remoteObject = await self._rawEvaluate(
            'selector => document.querySelector(selector)', selector)
        if remoteObject.get('subtype') == 'node':
            return ElementHandle(self._client, remoteObject, self._mouse,
                                 self._touchscreen)
        await helper.releaseObject(self._client, remoteObject)
        return None
项目:pyppeteer    作者:miyakogi    | 项目源码 | 文件源码
async def querySelectorEval(self, selector: str, pageFunction: str,
                                *args: Any) -> Optional[Any]:
        """Execute function on element which matches selector.

        Declared ``async`` because the body awaits; a plain ``def``
        containing ``await`` does not compile.

        :param selector: selector handed to ``self.querySelector``.
        :param pageFunction: function evaluated on the matched element.
        :param args: extra arguments for the evaluation.
        :return: result of the evaluation.
        :raises PageError: when no element matches ``selector``.
        """
        elementHandle = await self.querySelector(selector)
        if elementHandle is None:
            raise PageError(
                f'Error: failed to find element matching selector "{selector}"'
            )
        try:
            result = await elementHandle.evaluate(pageFunction, *args)
        finally:
            # Release the handle even when the evaluation fails.
            await elementHandle.dispose()
        return result
项目:pyppeteer    作者:miyakogi    | 项目源码 | 文件源码
def clearTimeout(fut: Optional[Union[asyncio.Future, asyncio.Handle]]) -> None:
    """Cancel timer task; a None (or otherwise falsy) ``fut`` is ignored."""
    if not fut:
        return
    fut.cancel()
项目:pyppeteer    作者:miyakogi    | 项目源码 | 文件源码
def mainFrame(self) -> Optional['Frame']:
        """Get main frame, as currently held by the frame manager."""
        manager = self._frameManager
        return manager._mainFrame
项目:pyppeteer    作者:miyakogi    | 项目源码 | 文件源码
async def querySelector(self, selector: str) -> Optional['ElementHandle']:
        """Get Element which matches `selector`.

        Delegates to the main frame.  Declared ``async`` because the body
        awaits; a plain ``def`` containing ``await`` does not compile.

        :raises PageError: when there is no main frame.
        """
        frame = self.mainFrame
        if not frame:
            raise PageError('no main frame.')
        return await frame.querySelector(selector)
项目:pyppeteer    作者:miyakogi    | 项目源码 | 文件源码
async def querySelectorEval(self, selector: str, pageFunction: str,
                                *args: Any) -> Optional[Any]:
        """Execute function on element which matches selector.

        Delegates to the main frame.  Declared ``async`` because the body
        awaits; a plain ``def`` containing ``await`` does not compile.

        :raises PageError: when there is no main frame.
        """
        frame = self.mainFrame
        if not frame:
            raise PageError('no main frame.')
        return await frame.querySelectorEval(selector, pageFunction, *args)