Python typing 模块,Any() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用typing.Any()

项目:pyppeteer    作者:miyakogi    | 项目源码 | 文件源码
def waitFor(self, selectorOrFunctionOrTimeout: Union[str, int, float],
                options: dict = None, **kwargs: Any) -> Awaitable:
        """Wait until `selectorOrFunctionOrTimeout`."""
        if options is None:
            options = dict()
        options.update(kwargs)
        if isinstance(selectorOrFunctionOrTimeout, (int, float)):
            fut: Awaitable[None] = asyncio.ensure_future(
                asyncio.sleep(selectorOrFunctionOrTimeout))
            return fut
        if not isinstance(selectorOrFunctionOrTimeout, str):
            fut = asyncio.get_event_loop().create_future()
            fut.set_exception(TypeError(
                'Unsupported target type: ' +
                str(type(selectorOrFunctionOrTimeout))
            ))
            return fut
        if ('=>' in selectorOrFunctionOrTimeout or
                selectorOrFunctionOrTimeout.strip().startswith('function')):
            return self.waitForFunction(selectorOrFunctionOrTimeout, options)
        return self.waitForSelector(selectorOrFunctionOrTimeout, options)
项目:python-driver    作者:bblfsh    | 项目源码 | 文件源码
def _send_receive(self, nummsgs: int, outformat: str='json',
                      dataupdate: Optional[Dict[AnyStr, Any]]=None,
                      restart_data: bool=True) -> List[Response]:
        if restart_data:
            self._restart_data(outformat)

        if dataupdate:
            self.data.update(dataupdate)

        self._add_to_buffer(nummsgs, outformat)
        self.sendbuffer.seek(0)

        processor, _ = get_processor_instance(
                outformat,
                custom_outbuffer=self.recvbuffer,
                custom_inbuffer=self.sendbuffer
        )
        processor.process_requests(self.sendbuffer)
        return self._loadResults(outformat)
项目:python-driver    作者:bblfsh    | 项目源码 | 文件源码
def get_processor_instance(format_: str, custom_inbuffer: InBuffer=None,
               custom_outbuffer: OutBuffer=None) -> Tuple[Any, Any]:
    """
    Get a processor instance. The class and buffers will be selected based on the
    python_driver.ProcessorConfigs dictionary. The input and output buffers can
    be overriden using the custom_inbuffer and custom_outbuffer parameters. This
    is mainly useful for unittesting.
    """
    conf = ProcessorConfigs.get(format_)
    if not conf:
        raise RequestInstantiationException('No RequestProcessor found for format %s' % format_)

    inbuffer  = custom_inbuffer if custom_inbuffer else conf['inbuffer']
    outbuffer = custom_outbuffer if custom_outbuffer else conf['outbuffer']
    instance  = conf['class'](outbuffer) # type: ignore

    return instance, inbuffer
项目:pyppeteer    作者:miyakogi    | 项目源码 | 文件源码
def start(self, options: dict = None, **kwargs: Any) -> None:
        """Start."""
        options = options or dict()
        options.update(kwargs)
        categoriesArray = [
            '-*', 'devtools.timeline', 'v8.execute',
            'disabled-by-default-devtools.timeline',
            'disabled-by-default-devtools.timeline.frame', 'toplevel',
            'blink.console', 'blink.user_timing', 'latencyInfo',
            'disabled-by-default-devtools.timeline.stack',
            'disabled-by-default-v8.cpu_profiler',
        ]

        if 'screenshots' in options:
            categoriesArray.append('disabled-by-default-devtools.screenshot')

        self._path = options.get('path', '')
        self._recording = True
        await self._client.send('Tracing.start', {
            'transferMode': 'ReturnAsStream',
            'categories': ','.join(categoriesArray),
        })
项目:pyppeteer    作者:miyakogi    | 项目源码 | 文件源码
def __init__(self, client: Session, ignoreHTTPSErrors: Any,
                 options: dict = None, **kwargs: Any) -> None:
        """Make new navigator watcher."""
        if options is None:
            options = {}
        options.update(kwargs)
        self._client = client
        self._ignoreHTTPSErrors = ignoreHTTPSErrors
        self._timeout = options.get('timeout', 3000)
        self._idleTime = options.get('networkIdleTimeout', 1000)
        self._idleTimer: Optional[Union[asyncio.Future, asyncio.Handle]] = None
        self._idleInflight = options.get('networkIdleInflight', 2)
        self._waitUntil = options.get('waitUntil', 'load')
        if self._waitUntil not in ('load', 'networkidle'):
            raise ValueError(
                f'Unknown value for options.waitUntil: {self._waitUntil}')
项目:pyppeteer    作者:miyakogi    | 项目源码 | 文件源码
def waitForNavigation(self, options: dict = None, **kwargs: Any
                                ) -> Optional[Response]:
        """Wait navigation completes."""
        if options is None:
            options = dict()
        options.update(kwargs)
        watcher = NavigatorWatcher(self._client, self._ignoreHTTPSErrors,
                                   options)
        responses: Dict[str, Response] = dict()
        listener = helper.addEventListener(
            self._networkManager,
            NetworkManager.Events.Response,
            lambda response: responses.__setitem__(response.url, response)
        )
        await watcher.waitForNavigation()
        helper.removeEventListeners([listener])
        return responses.get(self.url)
项目:pyppeteer    作者:miyakogi    | 项目源码 | 文件源码
def screenshot(self, options: dict = None, **kwargs: Any) -> bytes:
        """Take screen shot."""
        options = options or dict()
        options.update(kwargs)
        screenshotType = None
        if 'path' in options:
            mimeType, _ = mimetypes.guess_type(options['path'])
            if mimeType == 'image/png':
                screenshotType = 'png'
            elif mimeType == 'image/jpeg':
                screenshotType = 'jpeg'
            else:
                raise PageError('Unsupported screenshot '
                                f'mime type: {mimeType}')
        if 'type' in options:
            screenshotType = options['type']
        if not screenshotType:
            screenshotType = 'png'
        return await self._screenshotTask(screenshotType, options)
项目:pyppeteer    作者:miyakogi    | 项目源码 | 文件源码
def __init__(self, options: Dict[str, Any] = None, **kwargs: Any) -> None:
        """Make new launcher."""
        self.options = options or dict()
        self.options.update(kwargs)
        self.chrome_args = DEFAULT_ARGS
        self._tmp_user_data_dir: Optional[str] = None
        self._parse_args()
        if 'headless' not in self.options or self.options.get('headless'):
            self.chrome_args = self.chrome_args + [
                '--headless',
                '--disable-gpu',
                '--hide-scrollbars',
                '--mute-audio',
            ]
        if 'executablePath' in self.options:
            self.exec = self.options['executablePath']
        else:
            if not check_chromium():
                download_chromium()
            self.exec = str(chromium_excutable())
        self.cmd = [self.exec] + self.chrome_args
项目:pyppeteer    作者:miyakogi    | 项目源码 | 文件源码
def move(self, x: int, y: int, options: dict = None, **kwargs: Any
                   ) -> None:
        """Move cursor."""
        options = options or dict()
        options.update(kwargs)
        fromX = self._x
        fromY = self._y
        self._x = x
        self._y = y
        steps = options.get('steps', 1)
        for i in range(1, steps + 1):
            x = round(fromX + (self._x - fromX) * (i / steps))
            y = round(fromY + (self._y - fromY) * (i / steps))
            await self._client.send('Input.dispatchMouseEvent', {
                'type': 'mouseMoved',
                'button': self._button,
                'x': x,
                'y': y,
                'modifiers': self._keyboard._modifiers,
            })
项目:whatstyle    作者:mikr    | 项目源码 | 文件源码
def contains_all_options(optiongroup, parentstyle, matchvalues=False):
    # type: (Dict[Any, Any], Style, bool) -> bool
    """Returns true if all options in optiongroup are present in parentstyle.
    If matchvalues is True, the values in optiongroup must also match
    those in the parentstyle.
    """
    for optionname, value in optiongroup.items():
        if optionname not in parentstyle:
            return False
        if isinstance(value, dict):
            parent_suboptions = parentstyle[optionname]
            if not contains_all_options(value, parent_suboptions, matchvalues=matchvalues):
                return False
        elif matchvalues:
            pvalue = parentstyle.get(optionname)
            if type(pvalue) != type(value):
                return False
            if pvalue != value:
                return False
    return True
项目:whatstyle    作者:mikr    | 项目源码 | 文件源码
def compute_unified_diff(filename, content2, **kwargs):
    # type: (str, bytes, **Any) -> Tuple[int, Iterable[str]]
    diff = ()  # type: Iterable[str]
    exit_code = ERROR
    kw = kwargs.copy()
    if 'n' not in kwargs:
        # zero context lines
        kw['n'] = 0
    try:
        content1 = get_cached_file(filename)
        if PY3:
            c1 = unistr(content1)
            c2 = unistr(content2)
        else:
            c1 = content1
            c2 = content2
        diff = difflib.unified_diff(c1.splitlines(True), c2.splitlines(True), **kw)
        exit_code = OK
    finally:
        return exit_code, diff

# ---------------------------------------------------------------------
# Spare the user from specifying a formatter by finding a suitable one.
项目:pip-update-requirements    作者:alanhamlett    | 项目源码 | 文件源码
def set_value(self, key, value):
        # type: (str, Any) -> None
        """Modify a value in the configuration.
        """
        self._ensure_have_load_only()

        fname, parser = self._get_parser_to_modify()

        if parser is not None:
            section, name = _disassemble_key(key)

            # Modify the parser and the configuration
            if not parser.has_section(section):
                parser.add_section(section)
            parser.set(section, name, value)

        self._config[self.load_only][key] = value
        self._mark_as_modified(fname, parser)
项目:Hanabi-AI    作者:MeGotsThis    | 项目源码 | 文件源码
def received(self, type: str, resp: Dict[str, Any]) -> None:
        if type == 'message':
            self.message_received(resp['text'])
        elif type == 'init':
            raise Exception()
        elif type == 'advanced':
            # Fast UI Animations
            pass
        elif type == 'connected':
            # Players connected to the game
            pass
        elif type == 'notify':
            self.handle_notify(resp)
        elif type == 'action':
            self.decide_action(resp['can_clue'], resp['can_discard'])
        else:
            print(type, resp)
            raise Exception()
项目:ConfigSpace    作者:automl    | 项目源码 | 文件源码
def __getitem__(self, item: str) -> Any:
        if self._query_values or item in self._values:
            return self._values.get(item)

        hyperparameter = self.configuration_space._hyperparameters[item]
        item_idx = self.configuration_space._hyperparameter_idx[item]

        if not np.isfinite(self._vector[item_idx]):
            raise KeyError()

        value = hyperparameter._transform(self._vector[item_idx])
        # Truncate the representation of the float to be of constant
        # length for a python version
        if isinstance(hyperparameter, FloatHyperparameter):
            value = float(repr(value))
        # TODO make everything faster, then it'll be possible to init all values
        # at the same time and use an OrderedDict instead of only a dict here to
        # support iterating that dict in the same order as the actual order of
        # hyperparameters
        self._values[item] = value
        return self._values[item]
项目:speechless    作者:JuliusKunze    | 项目源码 | 文件源码
def randomly_grouped_by(key_from_example: Callable[[LabeledExample], Any], training_share: float = .9) -> Callable[
        [List[LabeledExample]], Tuple[List[LabeledExample], List[LabeledExample]]]:
        def split(examples: List[LabeledExample]) -> Tuple[List[LabeledExample], List[LabeledExample]]:
            examples_by_directory = group(examples, key=key_from_example)
            directories = examples_by_directory.keys()

            # split must be the same every time:
            random.seed(42)
            keys = set(random.sample(directories, int(training_share * len(directories))))

            training_examples = [example for example in examples if key_from_example(example) in keys]
            test_examples = [example for example in examples if key_from_example(example) not in keys]

            return training_examples, test_examples

        return split
项目:Lyra    作者:caterinaurban    | 项目源码 | 文件源码
def __init__(self, variables: List[VariableIdentifier], lattices: Dict[Type, Type[Lattice]],
                 arguments: Dict[Type, Dict[str, Any]] = defaultdict(lambda: dict())):
        """Create a mapping Var -> L from each variable in Var to the corresponding element in L.

        :param variables: list of program variables
        :param lattices: dictionary from variable types to the corresponding lattice types
        :param arguments: dictionary from variable types to arguments of the corresponding lattices
        """
        super().__init__()
        self._variables = variables
        self._lattices = lattices
        self._arguments = arguments
        try:
            self._store = {v: lattices[type(v.typ)](**arguments[type(v.typ)]) for v in variables}
        except KeyError as key:
            error = f"Missing lattice for variable type {repr(key.args[0])}!"
            raise ValueError(error)
项目:proxenos    作者:darvid    | 项目源码 | 文件源码
def hashex(method,    # type: HashMethod
           key,       # type: KeyType
           **options  # type: typing.Any
           ):
    # type: (...) -> int
    if isinstance(key, six.text_type):
        key = key.encode('utf-8')
    if method.name.lower() in hashlib.algorithms_guaranteed:
        return int(hashlib.new(method.name.lower(), key).hexdigest(), 16)
    elif method == HashMethod.MMH3_32:
        return hash_murmur3(key, size=32, **options)
    elif method == HashMethod.MMH3_64:
        return hash_murmur3(key, size=64, **options)
    elif method == HashMethod.MMH3_128:
        return hash_murmur3(key, size=128, **options)
    elif method == HashMethod.SIPHASH:
        return hash_siphash(key, **options)
项目:datapipelines-python    作者:meraki-analytics    | 项目源码 | 文件源码
def dispatch(method: Callable[[Any, Type[T], Mapping[str, Any], PipelineContext], Any]) -> Callable[[Any, Type[T], Mapping[str, Any], PipelineContext], Any]:
        dispatcher = singledispatch(method)
        provides = set()

        def wrapper(self: Any, type: Type[T], query: Mapping[str, Any], context: PipelineContext = None) -> Any:
            call = dispatcher.dispatch(type)
            try:
                return call(self, query, context=context)
            except TypeError:
                raise DataSource.unsupported(type)

        def register(type: Type[T]) -> Callable[[Any, Type[T], Mapping[str, Any], PipelineContext], Any]:
            provides.add(type)
            return dispatcher.register(type)

        wrapper.register = register
        wrapper._provides = provides
        update_wrapper(wrapper, method)
        return wrapper
项目:datapipelines-python    作者:meraki-analytics    | 项目源码 | 文件源码
def dispatch(method: Callable[[Any, Type[T], Any, PipelineContext], None]) -> Callable[[Any, Type[T], Any, PipelineContext], None]:
        dispatcher = singledispatch(method)
        accepts = set()

        def wrapper(self: Any, type: Type[T], items: Any, context: PipelineContext = None) -> None:
            call = dispatcher.dispatch(type)
            try:
                return call(self, items, context=context)
            except TypeError:
                raise DataSink.unsupported(type)

        def register(type: Type[T]) -> Callable[[Any, Type[T], Any, PipelineContext], None]:
            accepts.add(type)
            return dispatcher.register(type)

        wrapper.register = register
        wrapper._accepts = accepts
        update_wrapper(wrapper, method)
        return wrapper
项目:datapipelines-python    作者:meraki-analytics    | 项目源码 | 文件源码
def _best_transform_from(self, source_type: Type[S], target_types: Iterable[Type]) -> Tuple[Callable[[S], Any], Type, int]:
        best = None
        best_cost = _MAX_TRANSFORM_COST
        to_type = None
        for target_type in target_types:
            try:
                transform, cost = self._transform(source_type, target_type)
                if cost < best_cost:
                    best = transform
                    best_cost = cost
                    to_type = target_type
            except NoConversionError:
                pass
        if best is None:
            raise NoConversionError("Pipeline can't convert \"{source_type}\" to any of \"{target_types}\"".format(source_type=source_type, target_types=target_types))
        return best, to_type, best_cost
项目:datapipelines-python    作者:meraki-analytics    | 项目源码 | 文件源码
def evaluate(self, query: MutableMapping[str, Any], context: PipelineContext = None) -> True:
        # This is a little weird. We have to handle the case of can_have("x").and_("y") here.
        # Since the only type of Node that can return False rather than just raising a
        # QueryValidationError is a KeyNode that isn't required, or an OrNode whose children
        # are all KeyNodes that aren't required, we can't short circuit on a False value. If
        # we have can_have("x").and_("y"), and only some are True, we want raise a
        # QueryValidationError. If all are True or all are False, everything's fine.
        all_true = True
        all_possible_false = True
        for child in self.children:
            is_true = child.evaluate(query, context)
            all_true = all_true and is_true
            if child.falsifiable:
                all_possible_false = all_possible_false and not is_true

        if all_possible_false or all_true:
            return True
        raise BoundKeyExistenceError("Query must have all or none of the elements joined with \"and\" in a \"can_have\" statement!")
项目:datapipelines-python    作者:meraki-analytics    | 项目源码 | 文件源码
def dispatch(method: Callable[[Any, Type[T], F, PipelineContext], T]) -> Callable[[Any, Type[T], F, PipelineContext], T]:
        dispatcher = singledispatch(method)
        transforms = {}

        def wrapper(self: Any, target_type: Type[T], value: F, context: PipelineContext = None) -> T:
            call = dispatcher.dispatch(TypePair[value.__class__, target_type])
            try:
                return call(self, value, context=context)
            except TypeError:
                raise DataTransformer.unsupported(target_type, value)

        def register(from_type: Type[F], to_type: Type[T]) -> Callable[[Any, Type[T], F, PipelineContext], T]:
            try:
                target_types = transforms[from_type]
            except KeyError:
                target_types = set()
                transforms[from_type] = target_types
            target_types.add(to_type)

            return dispatcher.register(TypePair[from_type, to_type])

        wrapper.register = register
        wrapper._transforms = transforms
        update_wrapper(wrapper, method)
        return wrapper
项目:grakn-python    作者:graknlabs    | 项目源码 | 文件源码
def execute(self, query: str, *, infer: bool = True, multi: bool = False) -> Any:
        """Execute a Graql query against the knowledge base

        :param query: the Graql query string to execute against the knowledge base
        :param infer: enable inference
        :param multi: treat this request as a list of queries
        :return: a list of query results

        :raises: GraknError, requests.exceptions.ConnectionError
        """
        response = self._post(query, infer=infer, multi=multi)

        if response.ok:
            return response.json()
        else:
            raise GraknError(response.json()['exception'])
项目:botodesu    作者:futursolo    | 项目源码 | 文件源码
def _generate_form_data(**kwargs: Union[Any, BotoFairu]) -> aiohttp.FormData:
    form_fata = aiohttp.FormData()

    for name, value in kwargs.items():
        if isinstance(value, BotoFairu):
            form_fata.add_field(
                name, value._content,
                content_type=value._content_type,
                filename=value._filename,
                content_transfer_encoding=value._content_transfer_encoding)

        elif isinstance(value, (list, dict)):
            form_fata.add_field(name, json.dumps(value))

        elif isinstance(value, (int, float, str, bool)):
            form_fata.add_field(name, str(value))

        else:
            raise TypeError("Unknown Type: {}".format(type(value)))

    return form_fata
项目:mugen    作者:scherroman    | 项目源码 | 文件源码
def largest_dimensions_for_aspect_ratio(dimensions_list: List[Dimensions], desired_aspect_ratio: float,
                                        default: Any = _sentinel) -> Union[Dimensions, Any]:
    """
    Returns
    -------
    The largest dimensions after cropping each dimensions to reach desired aspect ratio
    """
    if not dimensions_list:
        if default is not _sentinel:
            return default
        raise ValueError(f"{dimensions_list} must not be empty.")

    largest_dimensions = crop_dimensions_to_aspect_ratio(dimensions_list[0], desired_aspect_ratio)
    for dimensions in dimensions_list[1:]:
        nearest_dimensions_to_aspect_ratio = crop_dimensions_to_aspect_ratio(dimensions, desired_aspect_ratio)

        if nearest_dimensions_to_aspect_ratio.resolution > largest_dimensions.resolution:
            largest_dimensions = nearest_dimensions_to_aspect_ratio

    return largest_dimensions
项目:mugen    作者:scherroman    | 项目源码 | 文件源码
def create(cls, texts: List[Any], name: str, *, locations: Opt[List[float]] = None,
               durations: Opt[List[float]] = None, default: bool = False) -> 'SubtitleTrack':
        subtitles = []
        if locations:
            start_times, end_times = loc_util.start_end_locations_from_locations(locations)
        elif durations:
            start_times, end_times = loc_util.start_end_locations_from_intervals(durations)
        else:
            raise ex.ParameterError(f"Must provide either locations or durations for the subtitle track {name}")

        for index, (text, start_time, end_time) in enumerate(zip(texts, start_times, end_times)):
            subtitle = Subtitle(str(text), start_time, end_time)
            subtitles.append(subtitle)

        subtitle_track = cls(subtitles, name, default=default)

        return subtitle_track
项目:irisett    作者:beebyte    | 项目源码 | 文件源码
def parse_settings(config: Any) -> Optional[Dict[str, Any]]:
    ret = {
        'webhook-url': config.get('slack-webhook-url'),
        'tmpl-msg': config.get('slack-tmpl-msg'),
        'tmpl-duration': config.get('slack-tmpl-duration', fallback=''),
        'tmpl-url': config.get('slack-tmpl-url', fallback='')
    }  # type: Any
    if not ret['webhook-url'] or not ret['tmpl-msg']:
        log.debug('Slack settings missing, no slack notifications will be sent', 'NOTIFICATIONS')
        ret = None
    else:
        log.debug('Valid slack notification settings found', 'NOTIFICATIONS')
        ret['tmpl-msg'] = jinja2.Template(ret['tmpl-msg'])
        if ret['tmpl-duration']:
            ret['tmpl-duration'] = jinja2.Template(ret['tmpl-duration'])
        if ret['tmpl-url']:
            ret['tmpl-url'] = jinja2.Template(ret['tmpl-url'])
    return ret
项目:irisett    作者:beebyte    | 项目源码 | 文件源码
def parse_settings(config: Any) -> Optional[Dict[str, Any]]:
    """Parse clicksend sms settings.

    Should only be called from sms.parse_settings.
    """
    ret = {
        'provider': 'clicksend',
        'username': config.get('sms-clicksend-username'),
        'api-key': config.get('sms-clicksend-api-key'),
        'sender': config.get('sms-clicksend-sender'),
        'tmpl': config.get('sms-tmpl'),
    }  # type: Any
    if not ret['username'] or not ret['api-key'] or not ['sender'] or not ['tmpl']:
        log.msg('SMS settings missing, no sms notifications will be sent', 'NOTIFICATIONS')
        ret = None
    else:
        log.debug('Valid SMS notification settings found', 'NOTIFICATIONS')
        ret['tmpl'] = jinja2.Template(ret['tmpl'])
    return ret
项目:python-driver    作者:bblfsh    | 项目源码 | 文件源码
def ctrlc_signal_handler(sgn: int, frame: Any) -> None:
    sys.exit(0)
项目:python-devtools    作者:samuelcolvin    | 项目源码 | 文件源码
def __call__(self, input: Any, *styles, reset: bool=True, apply: bool=True):
        """
        Styles text with ANSI styles and returns the new string.

        By default the styling is cleared at the end of the string, this can be prevented with``reset=False``.

        Examples::

            print(sformat('Hello World!', sformat.green))
            print(sformat('ATTENTION!', sformat.bg_magenta))
            print(sformat('Some things', sformat.reverse, sformat.bold))

        :param input: the object to style with ansi codes.
        :param *styles: zero or more styles to apply to the text, should be either style instances or strings
                        matching style names.
        :param reset: if False the ansi reset code is not appended to the end of the string
        :param: apply: if False no ansi codes are applied
        """
        text = str(input)
        if not apply:
            return text
        codes = []
        for s in styles:
            # raw ints are allowed
            if not isinstance(s, self.__class__) and not isinstance(s, int):
                try:
                    s = self.styles[s]
                except KeyError:
                    raise ValueError('invalid style "{}"'.format(s))
            codes.append(str(s.value))

        if codes:
            r = _ansi_template.format(';'.join(codes)) + text
        else:
            r = text

        if reset:
            r += _ansi_template.format(self.reset)
        return r
项目:python-devtools    作者:samuelcolvin    | 项目源码 | 文件源码
def __call__(self, value: Any, *, indent: int=0, indent_first: bool=False, highlight: bool=False):
        self._stream = io.StringIO()
        self._format(value, indent_current=indent, indent_first=indent_first)
        s = self._stream.getvalue()
        if highlight and pyg_lexer:
            # apparently highlight adds a trailing new line we don't want
            s = pygments.highlight(s, lexer=pyg_lexer, formatter=pyg_formatter).rstrip('\n')
        return s
项目:python-devtools    作者:samuelcolvin    | 项目源码 | 文件源码
def _format(self, value: Any, indent_current: int, indent_first: bool):
        if indent_first:
            self._stream.write(indent_current * self._c)

        value_repr = repr(value)
        if len(value_repr) <= self._simple_cutoff and not isinstance(value, collections.Generator):
            self._stream.write(value_repr)
        else:
            indent_new = indent_current + self._indent_step
            for t, func in self._type_lookup:
                if isinstance(value, t):
                    func(value, value_repr, indent_current, indent_new)
                    return
            self._format_raw(value, value_repr, indent_current, indent_new)
项目:python-devtools    作者:samuelcolvin    | 项目源码 | 文件源码
def _format_raw(self, value: Any, value_repr: str, indent_current: int, indent_new: int):
        lines = value_repr.splitlines(True)
        if len(lines) > 1 or (len(value_repr) + indent_current) >= self._width:
            self._stream.write('(\n')
            wrap_at = self._width - indent_new
            prefix = indent_new * self._c
            for line in lines:
                sub_lines = textwrap.wrap(line, wrap_at)
                for sline in sub_lines:
                    self._stream.write(prefix + sline + '\n')
            self._stream.write(indent_current * self._c + ')')
        else:
            self._stream.write(value_repr)
项目:pyppeteer    作者:miyakogi    | 项目源码 | 文件源码
def __init__(self) -> None:
        """Make new multimap."""
        # maybe defaultdict(set) is better
        self._map: OrderedDict[str, List[Any]] = OrderedDict()
项目:pyppeteer    作者:miyakogi    | 项目源码 | 文件源码
def set(self, key: str, value: Any) -> None:
        """Set value."""
        _set = self._map.get(key)
        if not _set:
            _set = list()
            self._map[key] = _set
        if value not in _set:
            _set.append(value)
项目:pyppeteer    作者:miyakogi    | 项目源码 | 文件源码
def get(self, key: str) -> List[Any]:
        """Get values."""
        return self._map.get(key, list())
项目:pyppeteer    作者:miyakogi    | 项目源码 | 文件源码
def hasValue(self, key: str, value: Any) -> bool:
        """Chekc value is in this map."""
        _set = self._map.get(key, list())
        return value in _set
项目:pyppeteer    作者:miyakogi    | 项目源码 | 文件源码
def firstValue(self, key: str) -> Any:
        """Get first value of the key."""
        _set = self._map.get(key)
        if not _set:
            return None
        return _set[0]
项目:pyppeteer    作者:miyakogi    | 项目源码 | 文件源码
def valuesArray(self) -> List[Any]:
        """Get all values as list."""
        result: List[Any] = list()
        for values in self._map.values():
            result.extend(values)
        return result
项目:pyppeteer    作者:miyakogi    | 项目源码 | 文件源码
def querySelectorEval(self, selector: str, pageFunction: str,
                                *args: Any) -> Optional[Any]:
        """Execute function on element which matches selector."""
        elementHandle = await self.querySelector(selector)
        if elementHandle is None:
            raise PageError(
                f'Error: failed to find element matching selector "{selector}"'
            )
        result = await elementHandle.evaluate(pageFunction, *args)
        await elementHandle.dispose()
        return result
项目:pyppeteer    作者:miyakogi    | 项目源码 | 文件源码
def waitForSelector(self, selector: str, options: dict = None,
                        **kwargs: Any) -> Awaitable:
        """Wait for selector matches element."""
        if options is None:
            options = dict()
        options.update(kwargs)
        timeout = options.get('timeout', 30_000)  # msec
        interval = options.get('interval', 0)  # msec
        return WaitTask(self, 'selector', selector, timeout, interval=interval)
项目:pyppeteer    作者:miyakogi    | 项目源码 | 文件源码
def __init__(self, frame: Frame, _type: str, expr: str, timeout: float,
                 *args: Any, interval: float = 0) -> None:
        """Make new wait task.

        :arg float timeout: msec to wait for task [default 30_000 [msec]].
        :arg float interval: msec to poll for task [default timeout / 1000].
        """
        if _type not in ['function', 'selector']:
            raise ValueError('Unsupported type for WaitTask: ' + _type)
        super().__init__()
        self.__frame: Frame = frame
        self.__type = _type
        self.expr = expr
        self.__timeout = timeout / 1000  # sec
        self.__interval = interval / 1000 or self.__timeout / 100  # sec
        self.__runCount: int = 0
        self.__terminated = False
        self.__done = False
        frame._waitTasks.add(self)
        # Since page navigation requires us to re-install the pageScript,
        # we should track timeout on our end.
        self.__loop = asyncio.get_event_loop()
        self.__timeoutTimer = self.__loop.call_later(
            self.__timeout,
            lambda: self.terminate(
                BrowserError(f'waiting failed: timeout {timeout}ms exceeded')
            )
        )
        asyncio.ensure_future(self.rerun(True))
项目:pyppeteer    作者:miyakogi    | 项目源码 | 文件源码
def setUserAgent(self, userAgent: str) -> Any:
        """Set user agent."""
        return await self._client.send('Network.setUserAgentOverride',
                                       {'userAgent': userAgent})
项目:pyppeteer    作者:miyakogi    | 项目源码 | 文件源码
def response(self) -> Any:
        """Get response."""
        return self._response
项目:pyppeteer    作者:miyakogi    | 项目源码 | 文件源码
def _onTargetCrashed(self, *args: Any, **kwargs: Any) -> None:
        self.emit('error', PageError('Page crashed!'))
项目:pyppeteer    作者:miyakogi    | 项目源码 | 文件源码
def querySelectorEval(self, selector: str, pageFunction: str,
                                *args: Any) -> Optional[Any]:
        """Execute function on element which matches selector."""
        frame = self.mainFrame
        if not frame:
            raise PageError('no main frame.')
        return await frame.querySelectorEval(selector, pageFunction, *args)
项目:pyppeteer    作者:miyakogi    | 项目源码 | 文件源码
def _onDialog(self, event: Any) -> None:
        dialogType = ''
        _type = event.get('type')
        if _type == 'alert':
            dialogType = Dialog.Type.Alert
        elif (_type == 'confirm'):
            dialogType = Dialog.Type.Confirm
        elif (_type == 'prompt'):
            dialogType = Dialog.Type.Prompt
        elif (_type == 'beforeunload'):
            dialogType = Dialog.Type.BeforeUnload
        dialog = Dialog(self._client, dialogType, event.get('message'),
                        event.get('defaultPrompt'))
        self.emit(Page.Events.Dialog, dialog)
项目:pyppeteer    作者:miyakogi    | 项目源码 | 文件源码
def goto(self, url: str, options: dict = None, **kwargs: Any
                   ) -> Optional[Response]:
        """Got to url."""
        if options is None:
            options = dict()
        options.update(kwargs)
        watcher = NavigatorWatcher(self._client, self._ignoreHTTPSErrors,
                                   options)
        responses: Dict[str, Response] = dict()
        listener = helper.addEventListener(
            self._networkManager, NetworkManager.Events.Response,
            lambda response: responses.__setitem__(response.url, response)
        )
        result = asyncio.ensure_future(watcher.waitForNavigation())
        referrer = self._networkManager.extraHTTPHeaders().get('referer', '')

        try:
            await self._client.send('Page.navigate',
                                    dict(url=url, referrer=referrer))
        except Exception:
            watcher.cancel()
            raise
        await result
        helper.removeEventListeners([listener])

        if self._frameManager.isMainFrameLoadingFailed():
            raise PageError('Failed to navigate: ' + url)
        return responses.get(self.url)
项目:pyppeteer    作者:miyakogi    | 项目源码 | 文件源码
def reload(self, options: dict = None, **kwargs: Any
                     ) -> Optional[Response]:
        """Reload this page."""
        if options is None:
            options = dict()
        options.update(kwargs)
        await self._client.send('Page.reload')
        return await self.waitForNavigation(options)
项目:pyppeteer    作者:miyakogi    | 项目源码 | 文件源码
def goForward(self, options: dict = None, **kwargs: Any
                        ) -> Optional[Response]:
        """Go forward history."""
        if options is None:
            options = dict()
        options.update(kwargs)
        return await self._go(+1, options)