Python typing 模块,Text() 实例源码


项目:filters    作者:eflglobal    | 项目源码 | 文件源码
def __init__(self, capture_exc_info=False):
        # type: (bool) -> None
        """
        Sets up an empty message store with no exceptions recorded.

        :param capture_exc_info:
            Whether to capture `sys.exc_info` when handling an
            exception.

            This is turned off by default to reduce memory usage, but
            it is useful in certain cases (e.g., if you want to send
            exceptions to a logger that expects exc_info).

            Regardless, you can still check ``self.has_exceptions`` to
            see if an exception occurred.
        """
        super(MemoryHandler, self).__init__()

        # Messages are kept in insertion order.
        self.messages           = OrderedDict() # type: Union[OrderedDict, Dict[Text, List[FilterMessage]]]
        self.has_exceptions     = False
        self.capture_exc_info   = capture_exc_info
        self.exc_info           = [] # type: List[Tuple[type, Exception, TracebackType]]
项目:filters    作者:eflglobal    | 项目源码 | 文件源码
def get_errors(self, with_context=False):
        # type: (bool) -> Dict[Text, List[Dict[Text, Text]]]
        """
        Returns a dict of error messages generated by the Filter, in a
        format suitable for inclusion in e.g., an API 400 response.

        :param with_context:
            Whether to include the context object in the result (for
            debugging purposes).

            Note: context is usually not safe to expose to end users!

        :return:
            A dict with the same keys as ``self.filter_messages``; each
            value is the list of ``as_dict(with_context)`` results for
            the messages stored under that key.
        """
        return {
            key: [m.as_dict(with_context) for m in messages]
                for key, messages in iteritems(self.filter_messages)
        }
项目:filters    作者:eflglobal    | 项目源码 | 文件源码
def is_filter_type(target):
    # type: (Any) -> Union[bool, Text]
    """
    Returns whether the specified object can be registered as a filter.

    :return:
        Returns ``True`` if the object is a filter.
        Otherwise, returns a string indicating why it is not valid.
    """
    # The checks are ordered: ``issubclass`` is only reached once the
    # target is known to be a class.
    if not is_class(target):
        return 'not a class'

    if not issubclass(target, BaseFilter):
        return 'does not extend BaseFilter'

    if is_abstract(target):
        return 'abstract class'

    return True
项目:filters    作者:eflglobal    | 项目源码 | 文件源码
def __init__(self, pattern, keys=None):
        # type: (Union[Text, regex._pattern_type, re._pattern_type], Optional[Sequence[Text]]) -> None
        """
        :param pattern:
            Regex used to split incoming string values.

            IMPORTANT:  If you specify your own compiled regex, be sure
            to add the ``UNICODE`` flag for Unicode support!

        :param keys:
            If set, the resulting list will be converted into an
            OrderedDict, using the specified keys.

            IMPORTANT:  If ``keys`` is set, the split value's length
            must be less than or equal to ``len(keys)``.
        """
        super(Split, self).__init__()

        # A pre-compiled pattern is used as-is; a string pattern is
        # compiled here with the UNICODE flag.
        self.regex = (
            pattern
                if isinstance(pattern, (regex._pattern_type, re._pattern_type))
                else regex.compile(pattern, regex.UNICODE)
        )

        self.keys = keys
项目:filters    作者:eflglobal    | 项目源码 | 文件源码
def __init__(self, encoding='utf-8', normalize=False):
        # type: (Text, bool) -> None
        """
        :param encoding:
            Used to decode non-unicode values.

        :param normalize:
            Whether to normalize the unicode value before converting
            back into bytes:

                - Convert to NFC form.
                - Remove non-printable characters.
                - Convert all line endings to unix-style ('\\n').

            Note that ``normalize`` is ``False`` by default for
            :py:class:`ByteString`, but ``True`` by default for
            :py:class:`Unicode`.
        """
        # Both settings are handled by the superclass initializer.
        super(ByteString, self).__init__(encoding, normalize)

    # noinspection SpellCheckingInspection
项目    作者:iotaledger    | 项目源码 | 文件源码
def add_route(self, command, adapter):
    # type: (Text, AdapterSpec) -> RoutingWrapper
    """
    Adds a route to the wrapper.

    :param command:
      The name of the command to route (e.g., "attachToTangle").

    :param adapter:
      The adapter object or URI to route requests to.

    :return:
      ``self``, so that calls can be chained.
    """
    if not isinstance(adapter, BaseAdapter):
      try:
        # Reuse the adapter previously resolved for this alias.
        adapter = self.adapter_aliases[adapter]
      except KeyError:
        # First time this alias is seen: resolve it and remember the
        # result under the original alias.
        self.adapter_aliases[adapter] = adapter = resolve_adapter(adapter)

    self.routes[command] = adapter

    return self
项目:rasa_nlu    作者:RasaHQ    | 项目源码 | 文件源码
def normalise_response_json(self, data):
        # type: (Dict[Text, Any]) -> List[Dict[Text, Any]]
        """Transform data to format.

        Reads ``data["text"]``, ``data["intent"]`` (``name`` and
        ``confidence``) and ``data["entities"]``, and returns a
        one-element list holding the converted result.  Entities are keyed
        by their ``entity`` name, so a later entity with the same name
        replaces an earlier one.
        """

        entities = {}
        for entity in data["entities"]:
            entities[entity["entity"]] = {
                "confidence": None,
                "type": "value",
                "value": entity["value"],
                "start": entity["start"],
                "end": entity["end"]
            }

        return [
            {
                "_text": data["text"],
                "confidence": data["intent"]['confidence'],
                "intent": data["intent"]['name'],
                "entities": entities
            }
        ]
项目:rasa_nlu    作者:RasaHQ    | 项目源码 | 文件源码
def normalise_response_json(self, data):
        # type: (Dict[Text, Any]) -> Dict[Text, Any]
        """Transform data to format.

        Returns a dict with ``query``, ``topScoringIntent``, ``intents`` and
        ``entities`` keys.  The intent fields come from ``self._top_intent``
        and ``self._ranking``; ``entities`` is empty when ``data`` has no
        ``"entities"`` key.
        """

        top_intent = self._top_intent(data)
        ranking = self._ranking(data)
        return {
            "query": data["text"],
            "topScoringIntent": top_intent,
            "intents": ranking,
            "entities": [
                {
                    "entity": e["value"],
                    "type": e["entity"],
                    "startIndex": None,
                    "endIndex": None,
                    "score": None
                } for e in data["entities"]
                ] if "entities" in data else []
        }
项目:rasa_nlu    作者:RasaHQ    | 项目源码 | 文件源码
def normalise_request_json(self, data):
        # type: (Dict[Text, Any]) -> Dict[Text, Any]
        """Flatten request parameters into a plain dict.

        ``q``, ``project`` and ``model`` may each arrive either as a single
        value or as a list; for a list only the first element is used.

        :param data: request parameters; must contain ``"q"``.
        :return: dict with ``text``, ``project`` (``"default"`` when absent
            or empty), ``time`` (``None`` when absent) and, only when a
            model was given, ``model``.
        :raises KeyError: if ``"q"`` is missing from ``data``.
        """

        _data = {}
        _data["text"] = data["q"][0] if isinstance(data["q"], list) else data["q"]

        if not data.get("project"):
            _data["project"] = "default"
        elif isinstance(data["project"], list):
            _data["project"] = data["project"][0]
        else:
            # Without this branch the list case above would be overwritten
            # and a scalar project would never be stored.
            _data["project"] = data["project"]

        if data.get("model"):
            _data["model"] = data["model"][0] if isinstance(data["model"], list) else data["model"]

        _data['time'] = data["time"] if "time" in data else None
        return _data
项目:rasa_nlu    作者:RasaHQ    | 项目源码 | 文件源码
def guess_format(files):
    # type: (List[Text]) -> Text
    """Given a set of files, tries to guess which data format is used.

    Each file is read as UTF-8 (a leading BOM is tolerated) and parsed as
    JSON; the format is chosen from marker keys in the parsed data.  A file
    that is not valid JSON is treated as markdown when it contains
    ``"## intent:"``.  The first file that yields a match decides the
    result; if none does, ``UNK_FILE_FORMAT`` is returned.
    """
    import io  # local import keeps this block self-contained

    for filename in files:
        with io.open(filename, encoding="utf-8-sig") as f:
            # Pre-set so the ``except`` branch can inspect it even if
            # reading fails part-way.
            raw_data = ""
            try:
                raw_data = f.read()
                file_data = json.loads(raw_data)
                if "data" in file_data and isinstance(file_data.get("data"), list):
                    return WIT_FILE_FORMAT
                elif "luis_schema_version" in file_data:
                    return LUIS_FILE_FORMAT
                elif "supportedLanguages" in file_data:
                    return DIALOGFLOW_FILE_FORMAT
                elif "rasa_nlu_data" in file_data:
                    return RASA_FILE_FORMAT

            except ValueError:
                if "## intent:" in raw_data:
                    return MARKDOWN_FILE_FORMAT

    return UNK_FILE_FORMAT
项目:rasa_nlu    作者:RasaHQ    | 项目源码 | 文件源码
def get_component_class(component_name):
    # type: (Text) -> Optional[Type[Component]]
    """Resolve component name to a registered components class.

    Names found in ``registered_components`` are returned from there.  Any
    other name is treated as a module path and loaded through
    ``utils.class_from_module_path``.

    :raises Exception: if the name is not registered and loading it as a
        module path fails.
    """

    if component_name not in registered_components:
        try:
            return utils.class_from_module_path(component_name)
        except Exception:
            # NOTE(review): the message below contains an empty `` pair where
            # a file/module name seems to be missing - confirm the wording.
            raise Exception(
                    "Failed to find component class for '{}'. Unknown "
                    "component name. Check your configured pipeline and make "
                    "sure the mentioned component is not misspelled. If you "
                    "are creating your own component, make sure it is either "
                    "listed as part of the `component_classes` in "
                    "`` or is a proper name of a class "
                    "in a module.".format(component_name))
    return registered_components[component_name]
项目:rasa_nlu    作者:RasaHQ    | 项目源码 | 文件源码
def validate_requirements(component_names, dev_requirements_file="alt_requirements/requirements_dev.txt"):
    # type: (List[Text], Text) -> None
    """Ensures that all required python packages are installed to instantiate and used the passed components.

    :param component_names: names resolved through
        ``registry.get_component_class``.
    :param dev_requirements_file: requirements file used, when readable, to
        suggest an exact ``pip install`` command.
    :raises Exception: if any required import could not be found.
    """
    from rasa_nlu import registry

    # Validate that all required packages are installed
    failed_imports = set()
    for component_name in component_names:
        component_class = registry.get_component_class(component_name)
        # NOTE(review): nothing visible in this block adds to
        # ``failed_imports``; the step that records unavailable packages for
        # ``component_class`` appears to be missing - confirm.
    if failed_imports:  # pragma: no cover
        # if available, use the development file to figure out the correct version numbers for each requirement
        all_requirements = _read_dev_requirements(dev_requirements_file)
        if all_requirements:
            missing_requirements = [r for i in failed_imports for r in all_requirements[i]]
            raise Exception("Not all required packages are installed. " +
                            "Failed to find the following imports {}. ".format(", ".join(failed_imports)) +
                            "To use this pipeline, you need to install the missing dependencies, e.g. by running:\n\t" +
                            "> pip install {}".format(" ".join(missing_requirements)))
        else:
            # No requirements file to consult: report the bare import names.
            raise Exception("Not all required packages are installed. " +
                            "To use this pipeline, you need to install the missing dependencies. " +
                            "Please install {}".format(", ".join(failed_imports)))
项目:rasa_nlu    作者:RasaHQ    | 项目源码 | 文件源码
def validate_arguments(pipeline, context, allow_empty_pipeline=False):
    # type: (List[Component], Dict[Text, Any], bool) -> None
    """Validates a pipeline before it is run. Ensures, that all arguments are present to train the pipeline.

    :param pipeline: components to check; each must expose ``requires``.
    :param context: properties available before the pipeline runs.
    :param allow_empty_pipeline: skip the empty-pipeline check when true.
    :raises ValueError: if the pipeline is empty and that is not allowed.
    :raises Exception: if a component requires a property that is not a key
        of ``context``.
    """

    # Ensure the pipeline is not empty
    if not allow_empty_pipeline and len(pipeline) == 0:
        raise ValueError("Can not train an empty pipeline. " +
                         "Make sure to specify a proper pipeline in the configuration using the `pipeline` key." +
                         "The `backend` configuration key is NOT supported anymore.")

    provided_properties = set(context.keys())

    for component in pipeline:
        for r in component.requires:
            if r not in provided_properties:
                # NOTE(review): assumes components expose a ``name``
                # attribute for the message - confirm.
                raise Exception("Failed to validate at component '{}'. Missing property: '{}'".format(
                    component.name, r))
项目:rasa_nlu    作者:RasaHQ    | 项目源码 | 文件源码
def load_component(self, component_name, model_dir, model_metadata, **context):
        # type: (Text, Text, Metadata, **Any) -> Component
        """Tries to retrieve a component from the cache, calls `load` to create a new component.

        :param component_name: name passed to
            ``registry.load_component_by_name``.
        :param model_dir: directory passed through to the loader.
        :param model_metadata: metadata used for the cache lookup and passed
            through to the loader.
        :param context: extra keyword arguments forwarded to the loader.
        :return: the loaded component.
        :raises Exception: if loading fails with ``MissingArgumentError``.
        """
        from rasa_nlu import registry
        from rasa_nlu.model import Metadata

        try:
            cached_component, cache_key = self.__get_cached_component(component_name, model_metadata)
            component = registry.load_component_by_name(component_name, model_dir,
                                                        model_metadata, cached_component, **context)
            if not cached_component:
                # If the component wasn't in the cache, let us add it if possible
                self.__add_to_cache(component, cache_key)
            return component
        except MissingArgumentError as e:  # pragma: no cover
            raise Exception("Failed to load component '{}'. {}".format(component_name, e))
项目:rasa_nlu    作者:RasaHQ    | 项目源码 | 文件源码
def parse(self, text, time=None):
        # type: (Text, Any) -> Dict[Text, Any]
        """Parse the input text, classify it and return pipeline result.

        The pipeline result usually contains intent and entities.

        :param text: text to process; an empty value short-circuits and
            returns the default output with ``"text"`` set to ``""``.
        :param time: forwarded to ``Message`` as its ``time`` argument.
        :return: dict built from ``self.default_output_attributes()``.
        """

        if not text:
            # Not all components are able to handle empty strings. So we need
            # to prevent that... This default return will not contain all
            # output attributes of all components, but in the end, no one should
            # pass an empty string in the first place.
            output = self.default_output_attributes()
            output["text"] = ""
            return output

        message = Message(text, self.default_output_attributes(), time=time)

        for component in self.pipeline:
            component.process(message, **self.context)

        # NOTE(review): as written, nothing copies the processed ``message``
        # into ``output``, so only the defaults are returned - confirm
        # whether a merge step is missing here.
        output = self.default_output_attributes()
        return output
项目:rasa_nlu    作者:RasaHQ    | 项目源码 | 文件源码
def do_train(config,  # type: RasaNLUConfig
             component_builder=None  # type: Optional[ComponentBuilder]
    # type: (...) -> Tuple[Trainer, Interpreter, Text]
    """Loads the trainer and the data and runs the training of the model.

    Builds a ``Trainer`` from ``config`` and ``component_builder``, loads the
    data named by ``config['data']`` for ``config['language']``, trains on
    it, persists the result and returns
    ``(trainer, interpreter, persisted_path)``.
    """
    # NOTE(review): the signature's closing ``):`` is missing above and the
    # ``trainer.persist(...)`` call below is cut off mid-argument-list; both
    # must be restored from the upstream project before this can run.

    # Ensure we are training a model that we can save in the end
    # WARN: there is still a race condition if a model with the same name is
    # trained in another subprocess
    trainer = Trainer(config, component_builder)
    # The persistor is created from the config and handed to ``persist``.
    persistor = create_persistor(config)
    training_data = load_data(config['data'], config['language'])
    interpreter = trainer.train(training_data)
    persisted_path = trainer.persist(config['path'], persistor,
    return trainer, interpreter, persisted_path
项目:rasa_nlu    作者:RasaHQ    | 项目源码 | 文件源码
def load(cls,
             model_dir=None,  # type: Text
             model_metadata=None,  # type: Metadata
             cached_component=None,  # type:Optional[DucklingExtractor]
             **kwargs  # type: **Any
        # type: (...) -> DucklingExtractor
        # Builds a DucklingExtractor, restoring persisted ``dimensions`` when
        # the persisted file exists.
        # NOTE(review): the signature's closing ``):`` and the second
        # argument of ``os.path.join`` are missing from this copy - restore
        # from upstream.

        persisted = os.path.join(model_dir,
        if cached_component:
            # Reuse the wrapper held by the cached component.
            duckling = cached_component.duckling
            # NOTE(review): an ``else:`` appears to be missing before the next
            # two lines; as written they also run when a cached component is
            # given and replace the reused wrapper - confirm.
            language = model_metadata.get("language")
            duckling = cls.create_duckling_wrapper(language)

        if os.path.isfile(persisted):
            # NOTE(review): the ``with`` header and the ``json.loads``
            # argument are truncated; presumably the persisted file is
            # opened and read here - confirm.
            with, encoding='utf-8') as f:
                persisted_data = json.loads(
                return DucklingExtractor(duckling, persisted_data["dimensions"])
        # No persisted data: construct with the wrapper only.
        return DucklingExtractor(duckling)
项目:rasa_nlu    作者:RasaHQ    | 项目源码 | 文件源码
def load(cls,
             model_dir=None,  # type: Text
             model_metadata=None,  # type: Metadata
             cached_component=None,  # type: Optional[DucklingHTTPExtractor]
             **kwargs  # type: **Any
        # type: (...) -> DucklingHTTPExtractor
        # Builds a DucklingHTTPExtractor from the ``config`` keyword argument,
        # reading ``dimensions`` from the persisted file when it exists.
        # NOTE(review): the signature's closing ``):``, the arguments of
        # ``model_metadata.get(`` and the tail of the final constructor call
        # are missing from this copy - restore from upstream.

        persisted = os.path.join(model_dir, model_metadata.get(
        # ``config`` is optional; an empty dict makes the ``.get`` below
        # yield None.
        config = kwargs.get("config", {})
        dimensions = None

        if os.path.isfile(persisted):
            # NOTE(review): the ``with`` header and the ``simplejson.loads``
            # argument are truncated; presumably the persisted file is
            # opened and read here - confirm.
            with, encoding='utf-8') as f:
                persisted_data = simplejson.loads(
                dimensions = persisted_data["dimensions"]

        return DucklingHTTPExtractor(config.get("duckling_http_url"),
项目:rasa_nlu    作者:RasaHQ    | 项目源码 | 文件源码
def load(cls,
             model_dir,     # type: Text
             model_metadata,    # type: Metadata
             cached_component,  # type: Optional[CRFEntityExtractor]
             **kwargs   # type: **Any
        # type: (...) -> CRFEntityExtractor
        # Loads a persisted tagger with joblib when the metadata has an
        # ``entity_extractor_crf`` entry and a model directory is given.
        # NOTE(review): the signature's closing ``):`` and the remaining
        # arguments of the first ``CRFEntityExtractor(`` call are missing
        # from this copy - restore from upstream.
        from sklearn.externals import joblib

        if model_dir and model_metadata.get("entity_extractor_crf"):
            meta = model_metadata.get("entity_extractor_crf")
            # ``model_file`` is stored relative to ``model_dir``.
            ent_tagger = joblib.load(os.path.join(model_dir, meta["model_file"]))
            return CRFEntityExtractor(ent_tagger=ent_tagger,
            # NOTE(review): an ``else:`` appears to be missing before the
            # fallback below - confirm.
            return CRFEntityExtractor()
项目:rasa_nlu    作者:RasaHQ    | 项目源码 | 文件源码
def persist(self, model_dir):
        # type: (Text) -> Dict[Text, Any]
        """Persist this model into the passed directory.

        Returns the metadata necessary to load the model again.

        :param model_dir: directory that receives ``crf_model.pkl``.
        :return: ``{"entity_extractor_crf": {...}}`` describing the dumped
            model, or ``{"entity_extractor_crf": None}`` when there is no
            tagger to persist.
        """

        if self.ent_tagger:
            # Imported only when there is something to dump.
            from sklearn.externals import joblib

            model_file_name = os.path.join(model_dir, "crf_model.pkl")

            joblib.dump(self.ent_tagger, model_file_name)
            return {"entity_extractor_crf": {"model_file": "crf_model.pkl",
                                             "crf_features": self.crf_features,
                                             "BILOU_flag": self.BILOU_flag,
                                             "version": 1}}
        else:
            # Without this branch the method returned None for an untrained
            # extractor instead of the declared dict.
            return {"entity_extractor_crf": None}
项目:rasa_nlu    作者:RasaHQ    | 项目源码 | 文件源码
def _from_json_to_crf(self, message, entity_offsets):
        # type: (Message, List[Tuple[int, int, Text]]) -> List[Tuple[Text, Text, Text, Text]]
        """Takes the json examples and switches them to a format which crfsuite likes.

        :param message: message whose ``"spacy_doc"`` entry is annotated.
        :param entity_offsets: entity annotations passed to ``GoldParse`` as
            ``entities``.
        :return: the result of ``self._from_text_to_crf`` for the message and
            its per-token entity tags.
        """
        # NOTE(review): import path presumed to be ``spacy.gold`` - confirm
        # against the installed spaCy version.
        from spacy.gold import GoldParse

        doc = message.get("spacy_doc")
        gold = GoldParse(doc, entities=entity_offsets)
        ents = [l[5] for l in gold.orig_annot]
        if '-' in ents:
            logger.warning("Misaligned entity annotation in sentence '{}'. ".format(doc.text) +
                           "Make sure the start and end values of the annotated training " +
                           "examples end at token boundaries (e.g. don't include trailing whitespaces).")
        if not self.BILOU_flag:
            for i, entity in enumerate(ents):
                # One call covers every positional prefix of the scheme.
                if entity.startswith(('B-', 'I-', 'L-', 'U-')):
                    ents[i] = entity[2:]        # removes the BILOU tags

        return self._from_text_to_crf(message, ents)
项目:filters    作者:eflglobal    | 项目源码 | 文件源码
def __init__(self, name=None):
        # type: (Optional[Text]) -> None
        """
        :param name:
            Stored on the instance as ``self.name``.
        """
        super(TestFilterBravo, self).__init__()

        self.name = name
项目:filters    作者:eflglobal    | 项目源码 | 文件源码
def __init__(self, message, context, exc_info=None):
        # type: (Text, dict, Optional[Text]) -> None
        """
        :param message: Text of the message.

        :param context:
            Dict of extra details; its ``code`` entry, when set, becomes
            the message code.

        :param exc_info: Exception traceback (if applicable).
        """
        super(FilterMessage, self).__init__()

        self.message    = message
        self.context    = context
        # Fall back to the message text when the context has no code.
        self.code       = context.get('code') or message
        self.exc_info   = exc_info
项目:filters    作者:eflglobal    | 项目源码 | 文件源码
def errors(self):
        # type: () -> Dict[Text, List[Dict[Text, Text]]]
        """
        Returns a dict of error messages generated by the Filter, in a
        format suitable for inclusion in e.g., an API 400 response.

        E.g.::

            {
                'authToken': [
                    {
                        'code':     'not_found',
                        'message':
                            'No AuthToken found matching this value.',
                    },
                ],

                'data.foobar': [
                    {
                        'code':     'unexpected',
                        'message':  'Unexpected key "foobar".',
                    },
                ],

                # etc.
            }
        """
        # Same as ``get_errors`` with its default arguments.
        return self.get_errors()
项目:filters    作者:eflglobal    | 项目源码 | 文件源码
def filter_messages(self):
        # type: () -> Dict[Text, List[FilterMessage]]
        """
        Returns the raw FilterMessages that were generated by the
        Filter.

        The value is the handler's own ``messages`` object, not a copy.
        """
        return self._handler.messages
项目:filters    作者:eflglobal    | 项目源码 | 文件源码
def __getattr__(self, item):
        # type: (Text) -> Type[BaseFilter]
        """
        Makes ``obj.<item>`` an alias for ``obj[<item>]``.

        Any exception raised by the item lookup propagates unchanged.
        """
        looked_up = self[item]
        return looked_up
项目:filters    作者:eflglobal    | 项目源码 | 文件源码
def _get_cache(self):
        # type: () -> Dict[Text, Type[BaseFilter]]
        # Lazily builds ``self._cache`` from entry points on first use and
        # returns it.
        # NOTE(review): this copy is missing several tokens (the ``try:``
        # matching the ``except`` below, the arguments and ``):`` of
        # ``iter_entry_points(``, the calls that receive the two message
        # strings, the ``name`` value, the cache key and closing brackets).
        # Restore from the upstream project before relying on it.
        if self._cache is None:
            self._cache = {}

                for target in iter_entry_points( # type: EntryPoint
                    filter_ = target.load()

                    # ``True`` means usable; otherwise a string reason.
                    ift_result = is_filter_type(filter_)

                    if ift_result is True:
                            'Registering extension filter '
                            '{cls.__module__}.{cls.__name__} as {name}.'.format(
                                cls     = filter_,
                                name    =,

                        self._cache[] = filter_

                            'Using legacy extension loader for '
                            '{} ({reason}).'.format(
                                reason  = ift_result,
                                target  = target,

            except DeprecationWarning:
                # The user has ``simplefilter('error')`` set; reset the
                # cache so that the next time we try to load extension
                # filters, we don't miss anything.
                self._cache = None

        # noinspection PyTypeChecker
        return self._cache
项目:filters    作者:eflglobal    | 项目源码 | 文件源码
def __init__(self,
            to_nearest  = 1,
            rounding    = ROUND_HALF_UP,
            result_type = DecimalType,
    ):
        # type: (Union[int, Text, DecimalType], Text, type) -> None
        """
        :param to_nearest:
            The value that the filter should round to.

            E.g., ``Round(1)`` rounds to the nearest whole number.

            If you want to round to a float value, it is recommended
            that you provide it as a string or Decimal, to avoid
            floating point problems.

        :param rounding:
            Controls how to round values.

        :param result_type:
            The type of result to return.
        """
        super(Round, self).__init__()

        self.to_nearest = DecimalType(to_nearest)

        # Rounding to negative values isn't supported.
        # I'm not even sure if that concept is valid.
        # NOTE(review): no check enforcing this is visible in this block -
        # confirm whether a validation step is missing here.

        self.result_type    = result_type
        self.rounding       = rounding
项目:filters    作者:eflglobal    | 项目源码 | 文件源码
def ip_type(self):
        # type: () -> Text
        """
        Returns the IP address versions that this Filter accepts.

        The result is ``'IPv4'``, ``'IPv6'``, ``'IPv4/IPv6'`` or, when
        neither flag is set, an empty string.
        """
        return '/'.join(filter(None, [
            'IPv4' if self.ipv4 else None,
            'IPv6' if self.ipv6 else None,
        ]))
项目:filters    作者:eflglobal    | 项目源码 | 文件源码
def __init__(self, decoder=json.loads):
        # type: (Callable[..., Any]) -> None
        """
        :param decoder:
            Callable used to decode incoming values; defaults to
            :py:func:`json.loads`.
        """
        super(JsonDecode, self).__init__()

        self.decoder = decoder
项目:filters    作者:eflglobal    | 项目源码 | 文件源码
def _apply(self, value):
        """
        Decodes ``value`` with ``self.decoder``.

        Returns ``None`` if the type check on ``value`` recorded errors,
        and the result of ``self._invalid_value`` if decoding raises
        ``ValueError``.
        """
        value = self._filter(value, Type(text_type)) # type: Text

        if self._has_errors:
            return None

        try:
            # Key/value pairs are collected into an OrderedDict.
            return self.decoder(value, object_pairs_hook=OrderedDict)
        except ValueError:
            return self._invalid_value(value, self.CODE_INVALID, exc_info=True)
项目:filters    作者:eflglobal    | 项目源码 | 文件源码
def __init__(
            self,
            max_bytes,
            truncate    = True,
            prefix      = '',
            encoding    = 'utf-8',
    ):
        # type: (int, bool, Text, Text) -> None
        """
        :param max_bytes:
            Max number of bytes to allow.

        :param truncate:
            Whether to truncate values that are too long.

            Set this to ``False`` to save system resources when you
            know that you will reject values that are too long.

        :param prefix:
            Prefix to apply to truncated values.

            Ignored when ``truncate`` is ``False``.

        :param encoding:
            The character encoding to check against.

            Note: This filter is optimized for UTF-8.
        """
        super(MaxBytes, self).__init__()

        self.encoding   = encoding
        self.max_bytes  = max_bytes
        self.prefix     = prefix
        self.truncate   = truncate
项目:filters    作者:eflglobal    | 项目源码 | 文件源码
def __init__(self, leading=r'[\p{C}\s]+', trailing=r'[\p{C}\s]+'):
        # type: (Text, Text) -> None
        # NOTE(review): the docstring delimiters around the ``:param``
        # text, the arguments of both ``regex.compile(`` calls and the
        # ``else:`` lines before the ``None`` assignments are missing from
        # this copy - restore from the upstream project.
        :param leading:
            Regex to match at the start of the string.

        :param trailing:
            Regex to match at the end of the string.
        super(Strip, self).__init__()

        # A falsy ``leading`` is presumably meant to disable leading
        # stripping (``self.leading = None``) - confirm.
        if leading:
            self.leading = regex.compile(
            self.leading = None

        # Same handling for the trailing pattern.
        if trailing:
            self.trailing = regex.compile(
            self.trailing = None
项目:filters    作者:eflglobal    | 项目源码 | 文件源码
def __init__(self, encoding='utf-8', normalize=True):
        # type: (Text, bool) -> None
        """
        :param encoding:
            Used to decode non-unicode values.

        :param normalize:
            Whether to normalize the resulting value:
                - Convert to NFC form.
                - Remove non-printable characters.
                - Convert all line endings to unix-style ('\\n').
        """
        super(Unicode, self).__init__()

        self.encoding   = encoding
        self.normalize  = normalize

        # ``self.npr`` is only defined when normalization is enabled.
        if self.normalize:
            # Compile the regex that we will use to remove non-
            # printables from the resulting unicode.
            # Note: using a double negative so that we can exclude
            # newlines, which are technically considered control chars.
            self.npr = regex.compile(r'[^\P{C}\s]+', regex.UNICODE)
项目:filters    作者:eflglobal    | 项目源码 | 文件源码
def _apply(self, value):
        """
        Runs the superclass filter, then encodes its result as UTF-8
        bytes.

        If the superclass recorded errors, its return value is passed
        through untouched.
        """
        text = super(ByteString, self)._apply(value) # type: Text

        if self._has_errors:
            # Normally a filter returns ``None`` on error, but here the
            # superclass gets to decide what comes back.
            return text

        # UnicodeEncodeError is deliberately not caught: UTF-8 can encode
        # any valid unicode value.  A code point above U+10FFFF would
        # fail, but a unicode object holding one cannot normally be
        # created, so such a value would not get this far.  Some Python
        # versions could be tricked into building such objects (fixed in
        # 2.7.6), but that generally required codecs other than UTF-8.
        return text.encode('utf-8')
项目:py_custom_vision_client    作者:CatalystCode    | 项目源码 | 文件源码
def __init__(self, region: Text, key: Text):
        """Remember the region and key for later calls."""
        self._region, self._key = region, key
项目:py_custom_vision_client    作者:CatalystCode    | 项目源码 | 文件源码
def _format_api_base(self) -> Text:
        # Builds the base URL from a ``{region}`` placeholder.
        # NOTE(review): the URL literal looks truncated after ``{region}``
        # and the ``.format(`` arguments and closing parenthesis are missing
        # from this copy - restore from the upstream project.
        return 'https://{region}'.format(
项目:py_custom_vision_client    作者:CatalystCode    | 项目源码 | 文件源码
def _format_headers(self, kv: Iterable[Header]) -> Dict[Text, Text]:
        """Return the auth header merged with the given pairs.

        Pairs from ``kv`` are applied after the auth entry, so a pair with
        the same name replaces it.
        """
        merged = {self._auth_keyname: self._key}
        merged.update((name, content) for name, content in kv)
        return merged
项目:py_custom_vision_client    作者:CatalystCode    | 项目源码 | 文件源码
def _get_json(self, url: Text, **kwargs) -> Dict:
        """Delegate to ``_make_json_request`` with the ``'get'`` method."""
        send = self._make_json_request
        return send('get', url, **kwargs)
项目:py_custom_vision_client    作者:CatalystCode    | 项目源码 | 文件源码
def _post_json(self, url: Text, **kwargs) -> Dict:
        """Delegate to ``_make_json_request`` with the ``'post'`` method."""
        send = self._make_json_request
        return send('post', url, **kwargs)
项目:py_custom_vision_client    作者:CatalystCode    | 项目源码 | 文件源码
def _auth_keyname(self) -> Text:
        """Placeholder that always raises ``NotImplementedError``."""
        raise NotImplementedError()
项目:py_custom_vision_client    作者:CatalystCode    | 项目源码 | 文件源码
def _format_projects_endpoint(self) -> Text:
        """Return the URL of the projects collection.

        NOTE(review): ``base`` is presumed to come from
        ``_format_api_base`` - confirm against the upstream project.
        """
        return '{base}/customvision/v1.0/Training/projects'.format(
            base=self._format_api_base())
项目:py_custom_vision_client    作者:CatalystCode    | 项目源码 | 文件源码
def _format_new_project_endpoint(self, project_name: Text) -> Text:
        """Return the URL used to create a project named ``project_name``.

        The query string carries the name, an empty description, the
        ``MultiLabel`` classifier and ``useNegativeSet=true``.  Values are
        inserted as given, without URL-encoding.

        NOTE(review): ``base`` is presumed to be the projects endpoint -
        confirm against the upstream project.
        """
        query = (('name', project_name),
                 ('description', ''),
                 ('classifier', 'MultiLabel'),
                 ('useNegativeSet', 'true'))

        # The template names ``{base}``, so it must be supplied here or
        # ``str.format`` raises KeyError.
        return '{base}?{query}'.format(
            base=self._format_projects_endpoint(),
            query='&'.join('{}={}'.format(*kv) for kv in query))
项目:py_custom_vision_client    作者:CatalystCode    | 项目源码 | 文件源码
def _format_project_endpoint(self, project_id: Text) -> Text:
        """Return the URL of the project identified by ``project_id``.

        NOTE(review): ``base`` is presumed to be the projects endpoint -
        confirm against the upstream project.
        """
        return '{base}/{project_id}'.format(
            base=self._format_projects_endpoint(),
            project_id=project_id)
项目:py_custom_vision_client    作者:CatalystCode    | 项目源码 | 文件源码
def _format_tags_endpoint(self, project_id: Text) -> Text:
        """Return the URL of the tags collection of a project.

        NOTE(review): ``base`` is presumed to be the endpoint of the
        project itself - confirm against the upstream project.
        """
        return '{base}/tags'.format(
            base=self._format_project_endpoint(project_id))
项目:py_custom_vision_client    作者:CatalystCode    | 项目源码 | 文件源码
def _format_training_endpoint(self, project_id: Text) -> Text:
        """Return the URL that triggers training for a project.

        NOTE(review): ``base`` is presumed to be the endpoint of the
        project itself - confirm against the upstream project.
        """
        return '{base}/train'.format(
            base=self._format_project_endpoint(project_id))
项目:py_custom_vision_client    作者:CatalystCode    | 项目源码 | 文件源码
def _format_image_url(self, project_id: Text, tags: Iterable[Tag]) -> Text:
        """Return the image URL for a project, with one ``tagIds`` query
        parameter per tag (taken from each tag's ``Id``).

        NOTE(review): ``base`` is presumed to be the endpoint of the
        project itself - confirm against the upstream project.
        """
        return '{base}/images/image?tagIds={tagIds}'.format(
            base=self._format_project_endpoint(project_id),
            tagIds='&tagIds='.join(tag.Id for tag in tags))
项目:py_custom_vision_client    作者:CatalystCode    | 项目源码 | 文件源码
def _fetch_project_tags(self, project_id: Text) -> Iterable[Tag]:
        """Fetch the project's tags and convert each entry of the
        response's ``'Tags'`` list with ``create(Tag, ...)``.
        """
        endpoint = self._format_tags_endpoint(project_id)
        payload = self._get_json(endpoint)

        tags = []
        for raw_tag in payload['Tags']:
            tags.append(create(Tag, raw_tag))
        return tags
项目:py_custom_vision_client    作者:CatalystCode    | 项目源码 | 文件源码
def _fetch_tags_for_names(self, project_id: Text,
                              names: Iterable[Text]) -> Iterable[Tag]:
        """Return the project's tags matching ``names``, in the order of
        ``names``.

        Tags are indexed by their ``Name``; an unknown name raises
        ``KeyError``.
        """
        by_name = {}
        for tag in self._fetch_project_tags(project_id):
            by_name[tag.Name] = tag

        selected = []
        for name in names:
            selected.append(by_name[name])
        return selected
项目:py_custom_vision_client    作者:CatalystCode    | 项目源码 | 文件源码
def create_project(self, project_name: Text) -> Project:
        """Create a project named ``project_name`` and return it.

        The request is a POST carrying a ``Content-Length: 0`` header; the
        response is converted with ``create(Project, ...)``.
        """
        endpoint = self._format_new_project_endpoint(project_name)
        empty_body_headers = [('Content-Length', '0')]
        payload = self._post_json(endpoint, headers=empty_body_headers)
        return create(Project, payload)