Python typing module: Tuple() example source code

We extracted the following code examples from open-source Python projects to illustrate how to use typing.Tuple().
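
As a minimal, self-contained sketch of the basic pattern before the project excerpts (the `min_max` helper is hypothetical, for illustration only): `Tuple[X, Y]` annotates a fixed-length, heterogeneous tuple, most commonly a multi-value return.

from typing import Tuple

def min_max(values: list) -> Tuple[int, int]:
    # Return the smallest and largest value as a (min, max) pair.
    return min(values), max(values)

lo, hi = min_max([3, 1, 4, 1, 5])  # lo == 1, hi == 5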

Project: flashcard    Author: sotetsuk    | project source | file source
def get_diff_with_color(expected: str, ans: str) -> Tuple[str, str]:
    d = difflib.Differ()
    diff = d.compare(expected, ans)

    expected_with_mistake = ""
    ans_with_mistake = ""
    for e in diff:
        if e.startswith("+"):
            ans_with_mistake += colored(e[-1], "red")
        elif e.startswith("-"):
            expected_with_mistake += colored(e[-1], "green")
        else:
            expected_with_mistake += e[-1]
            ans_with_mistake += e[-1]

    return expected_with_mistake, ans_with_mistake
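
A hypothetical call of the snippet above, assuming `colored` is termcolor's `colored(text, color)`, which matches the signature used here:

from termcolor import colored  # assumed source of `colored`

expected_colored, ans_colored = get_diff_with_color("kitten", "sitting")
print(expected_colored)  # "kitten", with characters absent from `ans` in green
print(ans_colored)       # "sitting", with added/changed characters in red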
Project: python-driver    Author: bblfsh    | project source | file source
def get_processor_instance(format_: str, custom_inbuffer: InBuffer=None,
               custom_outbuffer: OutBuffer=None) -> Tuple[Any, Any]:
    """
    Get a processor instance. The class and buffers will be selected based on the
    python_driver.ProcessorConfigs dictionary. The input and output buffers can
    be overriden using the custom_inbuffer and custom_outbuffer parameters. This
    is mainly useful for unittesting.
    """
    conf = ProcessorConfigs.get(format_)
    if not conf:
        raise RequestInstantiationException('No RequestProcessor found for format %s' % format_)

    inbuffer  = custom_inbuffer if custom_inbuffer else conf['inbuffer']
    outbuffer = custom_outbuffer if custom_outbuffer else conf['outbuffer']
    instance  = conf['class'](outbuffer) # type: ignore

    return instance, inbuffer
Project: djaio    Author: Sberned    | project source | file source
def __init__(
            self,
            description: str = None,
            pre_hooks: Union[List, Tuple] = None,
            post_hooks: Union[List, Tuple] = None
    ):
        self.result = None
        self.total = None
        self.success = None
        self.errors = None
        self.params = None
        self.output = None
        self.pagination = None
        self.limit = None
        self.offset = None
        self.app = None
        self.settings = None
        self.description = description
        self.pre_hooks = pre_hooks
        self.post_hooks = post_hooks
        self.meta = {}
Project: trf    Author: aistairc    | project source | file source
def _load_word_freq(self, threshold: int) -> Tuple[Dict[str, int], int]:
        n_total_words = 0
        word_freq = {}
        with open(self.rnnlm_model_path, mode='r') as f:
            for line in f:

                n_total_words += 1

                word, freq = line.split(' ')
                freq = int(freq)
                if freq > threshold:
                    word_freq[word] = freq
                else:
                    word_freq['<unk/>'] = word_freq.get('<unk/>', 0) + 1

        return (word_freq, n_total_words)
Project: whatstyle    Author: mikr    | project source | file source
def extra_penalty(self, style, complexity):
        # type: (Style, int) -> Tuple[int, int]
        """Trying longer and longer column limits
        without getting better results should be penalized to speed
        up the search.
        """
        standards = {'ColumnLimit': 80,
                     'MaxEmptyLinesToKeep': 2, }
        penalty = 0
        for optionname, value in standards.items():
            fvalue = style.get(optionname, value)
            if fvalue is not None and fvalue > value:
                penalty += fvalue - value
        if style.get('BreakBeforeBraces') == 'Custom':
            # Rate a commonly known brace breaking style
            # better than an equally performing custom style.
            penalty += 1
            # We would prefer an equally performing style even if we had to
            # add another 12 options.
            complexity += 12
        return complexity, penalty
Project: whatstyle    Author: mikr    | project source | file source
def mget(self, keys):
        # type: (List[str]) -> List[Optional[bytes]]
        if not keys:
            return []
        cached = []
        uncached = []  # type: List[Tuple[int, Optional[bytes]]]
        contentkeys = super(DedupKeyValueStore, self).mget(keys)
        for idx, contentkey in enumerate(contentkeys):
            if contentkey is None:
                uncached.append((idx, None))
            else:
                sha = binary_type(contentkey)
                cached.append((idx, unistr(sha)))
        if not cached:
            return [None for _, contentkey in uncached]
        indices, existing_keys = zip(*cached)
        existing_values = self.kvstore.mget(existing_keys)
        idx_value_pairs = sorted(uncached + list(zip(indices, existing_values)))
        return list([value for _, value in idx_value_pairs])
Project: whatstyle    Author: mikr    | project source | file source
def split_reffiles(references, filenames):
    # type: (bool, List[str]) -> Tuple[List[str], List[str]]
    """Splits [file1, reffile1, file2, reffile2] into [file1, file2], [reffile1, reffile2]
    when references is True.
    When references is False returns the pair (filenames, filenames).
    """
    if not references:
        return filenames, filenames
    assert len(filenames) % 2 == 0
    files = []
    refs = []
    for filename, reffilename in grouper(2, filenames):
        files.append(filename)
        refs.append(reffilename)
    return files, refs
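
For illustration, a hedged usage sketch (assuming `grouper` pairs consecutive items, as in the classic itertools recipe):

files, refs = split_reffiles(True, ['a.c', 'a_ref.c', 'b.c', 'b_ref.c'])
# files == ['a.c', 'b.c'], refs == ['a_ref.c', 'b_ref.c']

files, refs = split_reffiles(False, ['a.c', 'b.c'])
# references is False, so both results are the same list: ['a.c', 'b.c']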

Project: whatstyle    Author: mikr    | project source | file source
def update_evaluations(formatter,  # type: CodeFormatter
                       evaluations,  # type: List[AttemptResult]
                       finished_styles,  # type: List[AttemptResult]
                       bestdist  # type: Sequence[int]
                       ):
    # type: (...) -> Tuple[bool, bool, Sequence[int]]
    attemptresult = heapq.heappop(evaluations)
    nested_round = False
    if bestdist is None or (distquality(attemptresult.distance) < distquality(bestdist)):
        bestdist = attemptresult.distance
        heapq.heappush(evaluations, attemptresult)
    else:
        # We found a style that could no longer be improved by adding a single option value.
        heapq.heappush(finished_styles, attemptresult)
        nested_styles = formatter.nested_derivations(attemptresult.formatstyle)
        if not nested_styles:
            # This formatstyle does not unlock more options.
            return True, nested_round, bestdist
        # Restart the optimization from scratch with the attemptresult augmented with
        # every nested option as seed styles.
        bestdist = None
        ndist = (HUGE_DISTANCE, HUGE_DISTANCE, HUGE_DISTANCE, HUGE_DISTANCE)
        evaluations[:] = [AttemptResult(ndist, s) for s in nested_styles]
        nested_round = True
    return False, nested_round, bestdist
Project: whatstyle    Author: mikr    | project source | file source
def avg_linelength_diffs(diffargs):
    # type: (List[Tuple[str, bytes]]) -> Iterable[int]
    """Returns the nudged absolute line length differences.
    """
    for filename1, content2 in diffargs:
        linelen1 = get_num_lines(filename1)
        filelen1 = len(get_cached_file(filename1))
        avg1 = 0.0
        if linelen1 > 0:
            avg1 = float(filelen1) / linelen1

        linelen2 = count_content_lines(content2)
        filelen2 = len(content2)
        avg2 = 0.0
        if linelen2 > 0:
            avg2 = float(filelen2) / linelen2

        yield int(abs(10000.0 * (avg1 - avg2)))
Project: whatstyle    Author: mikr    | project source | file source
def unified_diff(filename, content2=None):
    # type: (str, Optional[bytes]) -> Tuple[int, Iterable[str]]
    """This function prints a unified diff of the contents of
    filename and the standard input, when used from the command line
    as follows:
        echo 123 > d.txt ; echo 456 | ./whatstyle.py --stdindiff d.txt
    We get this result:
    ---
    +++
    @@ -1 +1 @@
    -123
    +456
    """
    use_stdin = content2 is None
    if content2 is None:
        # Read binary input stream
        stdin = rawstream(sys.stdin)
        econtent2 = bytestr(stdin.read())
    else:
        econtent2 = content2
    exit_code, diff = compute_unified_diff(filename, econtent2, lineterm='')
    if use_stdin:
        write('\n'.join(diff))
    return exit_code, diff
Project: whatstyle    Author: mikr    | project source | file source
def compute_unified_diff(filename, content2, **kwargs):
    # type: (str, bytes, **Any) -> Tuple[int, Iterable[str]]
    diff = ()  # type: Iterable[str]
    exit_code = ERROR
    kw = kwargs.copy()
    if 'n' not in kwargs:
        # zero context lines
        kw['n'] = 0
    try:
        content1 = get_cached_file(filename)
        if PY3:
            c1 = unistr(content1)
            c2 = unistr(content2)
        else:
            c1 = content1
            c2 = content2
        diff = difflib.unified_diff(c1.splitlines(True), c2.splitlines(True), **kw)
        exit_code = OK
    finally:
        return exit_code, diff

Project: cxflow-tensorflow    Author: Cognexa    | project source | file source
def bin_stats(predictions: tf.Tensor, labels: tf.Tensor) -> Tuple[tf.Tensor, tf.Tensor, tf.Tensor]:
    """
    Calculate f1, precision and recall from binary classification expected and predicted values.

    :param predictions: 2-d tensor (batch, predictions) of predicted 0/1 classes
    :param labels: 2-d tensor (batch, labels) of expected 0/1 classes
    :return: a tuple of batched (f1, precision and recall) values
    """
    predictions = tf.cast(predictions, tf.int32)
    labels = tf.cast(labels, tf.int32)

    true_positives = tf.reduce_sum((predictions * labels), axis=1)
    false_positives = tf.reduce_sum(tf.cast(tf.greater(predictions, labels), tf.int32), axis=1)
    false_negatives = tf.reduce_sum(tf.cast(tf.greater(labels, predictions), tf.int32), axis=1)

    recall = true_positives / (true_positives + false_negatives)
    precision = true_positives / (true_positives + false_positives)
    f1_score = 2 / (1 / precision + 1 / recall)

    return f1_score, precision, recall
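
A hedged usage sketch, assuming a TensorFlow 1.x session-based setup:

import tensorflow as tf

predictions = tf.constant([[1, 0, 1, 1]])
labels = tf.constant([[1, 1, 0, 1]])
f1, precision, recall = bin_stats(predictions, labels)

with tf.Session() as sess:
    print(sess.run([f1, precision, recall]))
    # TP=2, FP=1, FN=1 for this row -> precision = recall = f1 = 2/3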
Project: ConfigSpace    Author: automl    | project source | file source
def _check_edges(self, edges: List[Tuple[str, str]]) -> None:
        for parent_node, child_node in edges:
            # check if both nodes are already inserted into the graph
            if child_node not in self._hyperparameters:
                raise ValueError("Child hyperparameter '%s' not in configuration "
                                 "space." % child_node)
            if parent_node not in self._hyperparameters:
                raise ValueError("Parent hyperparameter '%s' not in configuration "
                                 "space." % parent_node)

        # TODO: recursively check everything which is inside the conditions,
        # this means we have to recursively traverse the condition

        tmp_dag = self._create_tmp_dag()
        for parent_node, child_node in edges:
            tmp_dag.add_edge(parent_node, child_node)

        if not ConfigSpace.nx.is_directed_acyclic_graph(tmp_dag):
            cycles = list(ConfigSpace.nx.simple_cycles(tmp_dag))  # type: List[List[str]]
            for cycle in cycles:
                cycle.sort()
            cycles.sort()
            raise ValueError("Hyperparameter configuration contains a "
                             "cycle %s" % str(cycles))
Project: speechless    Author: JuliusKunze    | project source | file source
def randomly_grouped_by(key_from_example: Callable[[LabeledExample], Any], training_share: float = .9) -> Callable[
        [List[LabeledExample]], Tuple[List[LabeledExample], List[LabeledExample]]]:
        def split(examples: List[LabeledExample]) -> Tuple[List[LabeledExample], List[LabeledExample]]:
            examples_by_directory = group(examples, key=key_from_example)
            directories = examples_by_directory.keys()

            # split must be the same every time:
            random.seed(42)
            keys = set(random.sample(directories, int(training_share * len(directories))))

            training_examples = [example for example in examples if key_from_example(example) in keys]
            test_examples = [example for example in examples if key_from_example(example) not in keys]

            return training_examples, test_examples

        return split
Project: datapipelines-python    Author: meraki-analytics    | project source | file source
def _transform(self, source_type: Type[S], target_type: Type[T]) -> Tuple[Callable[[S], T], int]:
        try:
            LOGGER.info("Searching type graph for shortest path from \"{source_type}\" to \"{target_type}\"".format(source_type=source_type.__name__, target_type=target_type.__name__))
            path = dijkstra_path(self._type_graph, source=source_type, target=target_type, weight="cost")
            LOGGER.info("Found a path from \"{source_type}\" to \"{target_type}\"".format(source_type=source_type.__name__, target_type=target_type.__name__))
        except (KeyError, NetworkXNoPath):
            raise NoConversionError("Pipeline can't convert \"{source_type}\" to \"{target_type}\"".format(source_type=source_type, target_type=target_type))

        LOGGER.info("Building transformer chain from \"{source_type}\" to \"{target_type}\"".format(source_type=source_type.__name__, target_type=target_type.__name__))
        chain = []
        cost = 0
        for source, target in _pairwise(path):
            transformer = self._type_graph.adj[source][target][_TRANSFORMER]
            chain.append((transformer, target))
            cost += transformer.cost
        LOGGER.info("Built transformer chain from \"{source_type}\" to \"{target_type}\"".format(source_type=source_type.__name__, target_type=target_type.__name__))

        if not chain:
            return _identity, 0

        return partial(_transform, transformer_chain=chain), cost
Project: datapipelines-python    Author: meraki-analytics    | project source | file source
def _best_transform_from(self, source_type: Type[S], target_types: Iterable[Type]) -> Tuple[Callable[[S], Any], Type, int]:
        best = None
        best_cost = _MAX_TRANSFORM_COST
        to_type = None
        for target_type in target_types:
            try:
                transform, cost = self._transform(source_type, target_type)
                if cost < best_cost:
                    best = transform
                    best_cost = cost
                    to_type = target_type
            except NoConversionError:
                pass
        if best is None:
            raise NoConversionError("Pipeline can't convert \"{source_type}\" to any of \"{target_types}\"".format(source_type=source_type, target_types=target_types))
        return best, to_type, best_cost
Project: python-netsgiro    Author: otovo    | project source | file source
def _split_text_to_lines_and_columns(
            cls, text) -> Iterable[Tuple[int, int, str]]:
        lines = text.splitlines()

        if len(lines) > cls._MAX_LINES:
            raise ValueError(
                'Max {} specification lines allowed, got {}'
                .format(cls._MAX_LINES, len(lines)))

        for line_number, line_text in enumerate(lines, 1):
            if len(line_text) > cls._MAX_LINE_LENGTH:
                raise ValueError(
                    'Specification lines must be max {} chars long, '
                    'got {}: {!r}'
                    .format(cls._MAX_LINE_LENGTH, len(line_text), line_text))

            yield (line_number, 1, '{:40}'.format(line_text[0:40]))
            yield (line_number, 2, '{:40}'.format(line_text[40:80]))
Project: nucypher-kms    Author: nucypher    | project source | file source
def _encrypt_key(
            self,
            key: bytes,
            pubkey: bytes = None
    ) -> Tuple[bytes, bytes]:
        """
        Encrypts the provided `key` for the given `pubkey` using the ECIES
        scheme. If no `pubkey` is provided, it uses `self.pub_key`.

        :param key: Key to encrypt
        :param pubkey: Public Key to encrypt the `key` for

        :return: (encrypted key, encapsulated ECIES key)
        """
        pubkey = pubkey or self.pub_key

        symm_key, enc_symm_key = API.ecies_encapsulate(pubkey)
        enc_key = API.symm_encrypt(symm_key, key)
        return (enc_key, enc_symm_key)
Project: nucypher-kms    Author: nucypher    | project source | file source
def gen_path_keys(
            self,
            path: bytes
    ) -> List[Tuple[bytes, bytes]]:
        """
        Generates path keys and returns path keys

        :param path: Path to derive key(s) from

        :return: List of path keys
        """
        subpaths = self._split_path(path)
        keys = []
        for subpath in subpaths:
            path_priv, path_pub = self._derive_path_key(subpath)
            keys.append((path_priv, path_pub))
        return keys
Project: nucypher-kms    Author: nucypher    | project source | file source
def encrypt(
            self,
            data: bytes,
            pubkey: bytes = None
    ) -> Tuple[bytes, bytes]:
        """
        Encrypts data with Public key encryption

        :param data: Data to encrypt
        :param pubkey: public key to encrypt for

        :return: (Encrypted Key, Encrypted data)
        """
        pubkey = pubkey or self.pub_key

        key, enc_key = API.ecies_encapsulate(pubkey)
        enc_data = API.symm_encrypt(key, data)

        return (enc_data, API.elliptic_curve.serialize(enc_key.ekey))
Project: nucypher-kms    Author: nucypher    | project source | file source
def decrypt(
            self,
            enc_data: Tuple[bytes, bytes],
            privkey: bytes = None
    ) -> bytes:
        """
        Decrypts data using ECIES PKE. If no `privkey` is provided, it uses
        `self.priv_key`.

        :param enc_data: Tuple: (encrypted data, ECIES encapsulated key)
        :param privkey: Private key to decapsulate with

        :return: Decrypted data
        """
        privkey = privkey or self.priv_key
        ciphertext, enc_key = enc_data

        enc_key = API.elliptic_curve.deserialize(API.PRE.ecgroup, enc_key)
        enc_key = API.umbral.EncryptedKey(ekey=enc_key, re_id=None)

        dec_key = API.ecies_decapsulate(privkey, enc_key)

        return API.symm_decrypt(dec_key, ciphertext)
Project: nucypher-kms    Author: nucypher    | project source | file source
def _ecies_gen_ephemeral_key(
        recp_pubkey: Union[bytes, elliptic_curve.ec_element]
) -> Tuple[bytes, Tuple[bytes, bytes]]:
    """
    Generates and encrypts an ephemeral key for the `recp_pubkey`.

    :param recp_pubkey: Recipient's pubkey

    :return: Tuple of the eph_privkey, and a tuple of the encrypted symmetric
             key, and encrypted ephemeral privkey
    """
    symm_key, enc_symm_key = API.ecies_encapsulate(recp_pubkey)
    eph_privkey = API.ecies_gen_priv()

    enc_eph_privkey = API.symm_encrypt(symm_key, eph_privkey)
    return (eph_privkey, (enc_symm_key, enc_eph_privkey))
Project: nucypher-kms    Author: nucypher    | project source | file source
def ecdsa_priv2pub(
        privkey: bytes,
        to_bytes: bool = True
) -> Union[bytes, Tuple[int, int]]:
    """
    Returns the public component of an ECDSA private key.

    :param privkey: Private key as an int or bytestring
    :param to_bytes: Serialize to bytes or not?

    :return: Byte encoded or Tuple[int, int] ECDSA pubkey
    """
    pubkey = privtopub(privkey)
    if to_bytes:
        return SIG_KEYPAIR_BYTE + PUB_KEY_BYTE + ecdsa_pub2bytes(pubkey)
    return pubkey
Project: nucypher-kms    Author: nucypher    | project source | file source
def ecdsa_verify(
        v: int,
        r: int,
        s: int,
        msghash: bytes,
        pubkey: Union[bytes, Tuple[int, int]]
) -> bool:
    """
    Takes a v, r, s, a pubkey, and a hash of a message to verify via ECDSA.

    :param v: V of sig
    :param r: R of sig
    :param s: S of sig
    :param bytes msghash: The hashed message to verify
    :param bytes pubkey: Pubkey to validate signature for

    :rtype: bool
    :return: Is the signature valid or not?
    """
    if bytes == type(pubkey):
        pubkey = ecdsa_bytes2pub(pubkey)

    verify_sig = ecdsa_raw_recover(msghash, (v, r, s))
    # TODO: Should this equality test be done better?
    return verify_sig == pubkey
Project: nucypher-kms    Author: nucypher    | project source | file source
def ecies_ephemeral_split_rekey(
        privkey_a: Union[bytes, elliptic_curve.ec_element],
        pubkey_b: Union[bytes, elliptic_curve.ec_element],
        min_shares: int,
        total_shares: int
) -> Tuple[List[umbral.RekeyFrag], Tuple[bytes, bytes]]:
    """
    Performs a split-key re-encryption key generation where a minimum
    number of shares `min_shares` are required to reproduce a rekey.
    Will split a rekey into `total_shares`.
    This also generates an ephemeral keypair for the recipient as `pubkey_b`.

    :param privkey_a: Privkey to re-encrypt from
    :param pubkey_b: Public key to re-encrypt for (w/ ephemeral key)
    :param min_shares: Minimum shares needed to reproduce a rekey
    :param total_shares: Total shares to generate from split-rekey gen

    :return: A tuple containing a list of rekey frags, and a tuple of the
             encrypted ephemeral key data (enc_symm_key, enc_eph_privkey)
    """
    eph_privkey, (encrypted_key, encrypted_message) = _internal._ecies_gen_ephemeral_key(pubkey_b)
    kfrags = ecies_split_rekey(privkey_a, eph_privkey, min_shares, total_shares)
    pfrag = PFrag(ephemeral_data_as_bytes=None, encrypted_key=encrypted_key, encrypted_message=encrypted_message)

    return (kfrags, pfrag)
Project: nucypher-kms    Author: nucypher    | project source | file source
def encrypt(self,
                data: bytes,
                pubkey: bytes = None) -> Tuple[bytes, bytes]:
        """
        :data:      The data to encrypt. If derived per-subpath, it's a
                    symmetric key to use for block ciphers.
        :pubkey:    Optional public key to encrypt for. If not given, encrypt
                    for ours

        :returns:   (ekey, edata) where ekey is needed for recipient to
                    reconstruct a DH secret, edata is data encrypted with this
                    DH secret. The output should be treated as a monolithic
                    ciphertext outside of this class
        """
        if pubkey is None:
            pubkey = self._pub_key
        else:
            pubkey = ec.deserialize(self.pre.ecgroup, pubkey)

        key, ekey = self.pre.encapsulate(pubkey)
        cipher = SecretBox(key)

        return ((ec.serialize(ekey.ekey), None),
                cipher.encrypt(data))
Project: mugen    Author: scherroman    | project source | file source
def crop_scale(self, dimensions: Tuple[int, int]) -> 'Segment':
        """
        Returns
        -------
        A new Segment, cropped and/or scaled as necessary to reach specified dimensions
        """
        segment = self.copy()
        dimensions = Dimensions(*dimensions)

        if segment.aspect_ratio != dimensions.aspect_ratio:
            # Crop segment to match aspect ratio
            segment = segment.crop_to_aspect_ratio(dimensions.aspect_ratio)

        if segment.dimensions != dimensions:
            # Resize segment to reach final dimensions
            segment = segment.resize(dimensions)

        return segment
Project: kudubot    Author: namboy94    | project source | file source
def store_message_in_file(self, message: Message) -> Tuple[str, str]:
        """
        Stores a message in a json file.
        The filename of the file will be the current time.
        Also generates a response file location in which the executable may
        write a response into

        :param message: The message to save
        :return: The location of the stored message json file,
                 the location of the response file
        """

        json_data = message.to_dict()

        while True:  # Make sure that file does not exist
            message_file = os.path.join(self.message_dir, str(time.time()))
            if not os.path.isfile(message_file + ".json"):
                with open(message_file + ".json", 'w') as json_file:
                    json.dump(json_data, json_file)
                return message_file + ".json", message_file + "-response.json"

Project: irisett    Author: beebyte    | project source | file source
async def run_plugin(executable: str, args: List[str], timeout: int) -> Tuple[str, List[str]]:
    run_args = [executable] + args
    try:
        proc = await asyncio.create_subprocess_exec(
            *run_args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
    except FileNotFoundError:
        raise NagiosError('executable not found')
    stdout_data, stderr_data = await proc.communicate()
    std_data = stdout_data + stderr_data
    await proc.wait()
    if proc.returncode not in [STATUS_OK, STATUS_WARNING, STATUS_CRITICAL]:
        raise MonitorFailedError(std_data)
    text, perf = parse_plugin_output(std_data)
    if proc.returncode not in [STATUS_OK, STATUS_WARNING]:
        raise MonitorFailedError(text)
    return text, perf
Project: AutoTriageBot    Author: salesforce    | project source | file source
def callAllMethods(obj: object) -> List[Tuple[str, Any]]:
    results = []  # type: List[Tuple[str, Any]]
    for method in dir(obj):
        if method == '__hash__':
            continue
        if callable(getattr(obj, method)):
            try:
                res = getattr(obj, method)()
                if isinstance(res, bool) or isinstance(res, int):
                    results.append((method, res))
                if isinstance(res, str):
                    # Ignore anything with 0x in it since memory addresses change
                    if '0x' not in res:
                        results.append((method, res))
            except Exception:
                if '0x' not in method:
                    results.append(('except', method))
    return results
Project: tf-crnn    Author: solivr    | project source | file source
def image_reading(path: str, resized_size: Tuple[int, int]=None, data_augmentation: bool=False,
                  padding: bool=False) -> Tuple[tf.Tensor, tf.Tensor]:
    # Read image
    image_content = tf.read_file(path, name='image_reader')
    image = tf.cond(tf.equal(tf.string_split([path], '.').values[1], tf.constant('jpg', dtype=tf.string)),
                    true_fn=lambda: tf.image.decode_jpeg(image_content, channels=1, try_recover_truncated=True), # TODO channels = 3 ?
                    false_fn=lambda: tf.image.decode_png(image_content, channels=1), name='image_decoding')

    # Data augmentation
    if data_augmentation:
        image = augment_data(image)

    # Padding
    if padding:
        with tf.name_scope('padding'):
            image, img_width = padding_inputs_width(image, resized_size, increment=CONST.DIMENSION_REDUCTION_W_POOLING)
    # Resize
    else:
        image = tf.image.resize_images(image, size=resized_size)
        img_width = tf.shape(image)[1]

    with tf.control_dependencies([tf.assert_equal(image.shape[:2], resized_size)]):
        return image, img_width
Project: Pyanimelist    Author: GetRektByMe    | project source | file source
def process_(child) -> Tuple[str, datetime]:
        name, text = child.name, child.get_text()
        try:
            # Try converting text to an integer
            text = int(text)
        # Ignore if we get a value we can't cast to int
        except ValueError:
            pass
        if name == "my_last_updated":
            text = datetime.fromtimestamp(float(text))
        if name in ('my_finish_date', "my_start_date", "series_end", "series_start"):
            try:
                text = datetime.strptime(text, "%Y-%m-%d")
            except ValueError:
                text = datetime.fromtimestamp(0)
        # Return name and text in tuple
        return name, text
Project: allennlp    Author: allenai    | project source | file source
def _sort_dataset_by_padding(dataset: Dataset,
                                 sorting_keys: List[Tuple[str, str]],  # pylint: disable=invalid-sequence-index
                                 padding_noise: float = 0.0) -> Dataset:
        """
        Sorts the ``Instances`` in this ``Dataset`` by their padding lengths, using the keys in
        ``sorting_keys`` (in the order in which they are provided).  ``sorting_keys`` is a list of
        ``(field_name, padding_key)`` tuples.
        """
        instances_with_lengths = []
        for instance in dataset.instances:
            padding_lengths = cast(Dict[str, Dict[str, float]], instance.get_padding_lengths())
            if padding_noise > 0.0:
                noisy_lengths = {}
                for field_name, field_lengths in padding_lengths.items():
                    noisy_lengths[field_name] = add_noise_to_dict_values(field_lengths, padding_noise)
                padding_lengths = noisy_lengths
            instance_with_lengths = ([padding_lengths[field_name][padding_key]
                                      for (field_name, padding_key) in sorting_keys],
                                     instance)
            instances_with_lengths.append(instance_with_lengths)
        instances_with_lengths.sort(key=lambda x: x[0])
        return Dataset([instance_with_lengths[-1] for instance_with_lengths in instances_with_lengths])
Project: allennlp    Author: allenai    | project source | file source
def text_to_instance(self,  # type: ignore
                         question_text: str,
                         passage_text: str,
                         token_spans: List[Tuple[int, int]] = None,
                         answer_texts: List[str] = None,
                         question_tokens: List[Token] = None,
                         passage_tokens: List[Token] = None) -> Instance:
        # pylint: disable=arguments-differ
        if not question_tokens:
            question_tokens = self._tokenizer.tokenize(question_text)
        if not passage_tokens:
            passage_tokens = self._tokenizer.tokenize(passage_text)
        return util.make_reading_comprehension_instance(question_tokens,
                                                        passage_tokens,
                                                        self._token_indexers,
                                                        passage_text,
                                                        token_spans,
                                                        answer_texts)
Project: PrivacyScore    Author: PrivacyScore    | project source | file source
def evaluate_result(result: dict, group_order: list) -> Tuple[dict, OrderedDict]:
    """
    Evaluate and describe a complete result dictionary.

    As a result, a dictionary of the groups is returned. Each group has another
    dictionary specifying the amount of good, the amount of bad and the amount
    of neutral results as well as the overall group rating and the ratio of
    good results.
    """
    if 'reachable' in result and not result['reachable']:
        return UnrateableSiteEvaluation(), {}
    evaluated_groups = {}
    described_groups = OrderedDict()
    for group in group_order:
        if group not in CHECKS:
            continue
        evaluated_groups[group], described_groups[group] = evaluate_group(
            group, result)
    return SiteEvaluation(evaluated_groups, group_order), described_groups
Project: PrivacyScore    Author: PrivacyScore    | project source | file source
def _parse_new_results(previous_results: List[Tuple[list, dict]]) -> tuple:
    """
    Parse previous results, split into raw data, results and errors and merge
    data from multiple test suites.
    """
    raw = []
    result = {}
    errors = []
    for e in previous_results:
        if isinstance(e, (list, tuple)):
            scan_host = e[0]
            test = e[1]
            if isinstance(e[2], dict):
                # add test specifier to each raw data element
                for identifier, raw_elem in e[2].items():
                    raw.append(dict(
                        identifier=identifier,
                        scan_host=scan_host,
                        test=test,
                        **raw_elem))
            if isinstance(e[3], dict):
                result.update(e[3])
        else:
            errors.append(e)
    return raw, result, errors
Project: postix    Author: c3cashdesk    | project source | file source
def get_form_and_formset(
    request: HttpRequest=None, extra: int=1, initial_form: SessionBaseForm=None,
    initial_formset=None
) -> Tuple[SessionBaseForm, Any]:
    ItemMovementFormSet = forms.formset_factory(ItemMovementForm, extra=extra)

    if request:
        form = SessionBaseForm(request.POST, prefix='session')
        formset = ItemMovementFormSet(request.POST, prefix='items')
    elif initial_form or initial_formset:
        form = SessionBaseForm(initial=initial_form, prefix='session')
        formset = ItemMovementFormSet(initial=initial_formset, prefix='items')
    else:
        form = SessionBaseForm(prefix='session')
        formset = ItemMovementFormSet(prefix='items')
    return form, formset
Project: extra-trees    Author: allrod5    | project source | file source
def _split_sample(
            split: Callable[[object], bool], X: np.ndarray, y: np.ndarray
    ) -> Tuple[Tuple[np.ndarray, np.ndarray], Tuple[np.ndarray, np.ndarray]]:
        """
        Split X, y sample set in two with a split function
        :return: ((X_left, y_left), (X_right, y_right))
        """
        if split.type == 'numerical':
            left_indexes = X[:, split.attribute] < split.criteria
            right_indexes = ~left_indexes
        else:
            Z = (
                pd.Index(pd.unique(split.criteria))
                .get_indexer(X[:, split.attribute]))
            left_indexes = np.where(Z >= 0)[0]
            right_indexes = np.where(Z < 0)[0]

        left = X[left_indexes], y[left_indexes]
        right = X[right_indexes], y[right_indexes]

        return left, right
Project: AbricotGame    Author: NB0174    | project source | file source
def linearize(path: List, obstacles: List[Tuple]) -> List:
    """
    Fills in the space between two non-consecutive cells
    :param path: -> list of path coordinates
    :param obstacles: -> list of obstacle coordinates
    :return: -> a linearized list
    """
    y_dir = 1 if path[0][1] < path[-1][1] else -1
    x_dir = 1 if path[0][0] < path[-1][0] else -1
    list2 = []
    for i in range(1, len(path) + 1):
        try:
            list2.append(path[i - 1])
            if path[i - 1][0] != path[i][0] and path[i - 1][1] != path[i][1]:
                if (path[i - 1][0], path[i - 1][1] + y_dir) not in obstacles:
                    list2.append((path[i - 1][0], path[i - 1][1] + y_dir))
                elif (path[i - 1][0] + x_dir, path[i - 1][1]) not in obstacles:
                    list2.append((path[i - 1][0] + x_dir, path[i - 1][1]))
        except IndexError:
            continue

    return list(remove_duplicates(list2))
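
A hedged walkthrough (assuming `remove_duplicates` drops repeats while preserving order, e.g. via `dict.fromkeys`):

path = [(0, 0), (1, 1), (2, 2)]
print(linearize(path, obstacles=[]))
# Each diagonal step gets an intermediate cell inserted:
# [(0, 0), (0, 1), (1, 1), (1, 2), (2, 2)]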
Project: AbricotGame    Author: NB0174    | project source | file source
def verif_conditions(self, entitee: Entitee, cible: Tuple[int, int]) -> bool:
        """Cette fonction détermine si le sort est valide"""
        if entitee.var_attributs.ap >= self.cost:
            if entitee.combat_coords[0] == cible[0]:
                if not (self.max_range >= abs(entitee.combat_coords[1] - cible[1]) >= self.min_range):
                    return False
            elif entitee.combat_coords[1] == cible[1]:
                if not (self.max_range >= abs(entitee.combat_coords[0] - cible[0]) >= self.min_range):
                    return False
            else:
                return False
            cases_traversee = bresenham(entitee.combat_coords, cible)
            for i in cases_traversee:
                if i in entitee.combat.map.fullobs:
                    return False
            entitee.var_attributs.ap -= self.cost
            return True
        return False
Project: scarlett_os    Author: bossjones    | project source | file source
def config_per_platform(config: ConfigType,
                        domain: str) -> Iterable[Tuple[Any, Any]]:
    """Generator to break a component config into different platforms.

    For example, will find 'switch', 'switch 2', 'switch 3', etc.
    """
    for config_key in extract_domain_configs(config, domain):
        platform_config = config[config_key]

        if not platform_config:
            continue
        elif not isinstance(platform_config, list):
            platform_config = [platform_config]

        for item in platform_config:
            try:
                platform = item.get(CONF_PLATFORM)
            except AttributeError:
                platform = None

            yield platform, item
Project: e2e-ie-release    Author: rasmusbergpalm    | project source | file source
def metrics_from_counts(counts: List[int]) -> Tuple[float, float, float, float]:
    """
    Computes classifier metrics given counts of correct, incorrect, missing and spurious

    :param counts: A (4,) vector of (correct, incorrect, missing, spurious)
    :return: acc, recall, precision and f1
    """

    eps = 1e-16
    correct, incorrect, missing, spurious = counts

    acc = correct / (correct + incorrect + missing + spurious + eps)
    recall = correct / (correct + incorrect + missing + eps)
    precision = correct / (correct + incorrect + spurious + eps)
    f1 = 2 * (precision * recall) / (recall + precision + eps)

    return acc, recall, precision, f1
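
For example, with 8 correct, 1 incorrect, 1 missing and 2 spurious predictions:

acc, recall, precision, f1 = metrics_from_counts([8, 1, 1, 2])
# acc ≈ 8/12 ≈ 0.667, recall ≈ 8/10 = 0.8,
# precision ≈ 8/11 ≈ 0.727, f1 ≈ 0.762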
Project: jack    Author: uclmr    | project source | file source
def _batch_questions(self, questions: List[Tuple[QASetting, List[Answer]]], batch_size, is_eval: bool):
        """Optionally shuffles and batches annotations.

        By default, all questions are shuffled (if self._shuffle(is_eval))
        and then batched. Override this method if you want to customize the
        batching, e.g., to do stratified sampling, sampling with replacement,
        etc.

        Args:
            - questions: List of questions (with answers) to shuffle & batch.
            - is_eval: Whether batches are generated for evaluation.

        Returns: Batch iterator
        """
        rng = _rng if self._shuffle(is_eval) else None
        return shuffle_and_batch(questions, batch_size, rng)
Project: jack    Author: uclmr    | project source | file source
def train(self, optimizer,
              training_set: Iterable[Tuple[QASetting, List[Answer]]],
              batch_size: int, max_epochs=10, hooks=tuple(),
              l2=0.0, clip=None, clip_op=tf.clip_by_value, summary_writer=None, **kwargs):
        """
        This method trains the reader (and changes its state).

        Args:
            optimizer: TF optimizer
            training_set: the training instances.
            batch_size: size of training batches
            max_epochs: maximum number of epochs
            hooks: TrainingHook implementations that are called after epochs and batches
            l2: strength of the l2 regularization (0.0 disables it)
            clip: value at which to clip gradients (None disables clipping)
            clip_op: operation to perform for clipping
        """
        batches, loss, min_op, summaries = self._setup_training(
            batch_size, clip, optimizer, training_set, summary_writer, l2, clip_op, **kwargs)

        self._train_loop(min_op, loss, batches, hooks, max_epochs, summaries, summary_writer, **kwargs)
Project: jack    Author: uclmr    | project source | file source
def __init__(self, text: str, span: Tuple[int, int] = None, doc_idx: int = 0, score: float = 1.0):
        """
        Create a new answer.
        Args:
            text: The text string of the answer.
            span: For extractive QA, a span in the support documents. The pair `(start, end)`
                represents a span in the support document with index `doc_idx` in the
                ordered sequence of support documents. The span starts at `start` and
                ends at `end` (exclusive).
            doc_idx: index of the document where the answer was found
            score: the score a model associates with this answer.
        """
        assert span is None or len(span) == 2, "span should be (char_start, char_end) tuple"

        self.score = score
        self.span = span
        self.doc_idx = doc_idx
        self.text = text
Project: modernpython    Author: rhettinger    | project source | file source
def quadratic(a: float, b: float, c: float) -> Tuple[complex, complex]:
    ''' Compute the roots of the quadratic equation:

            ax^2 + bx + c = 0

        Written in Python as:

            a*x**2 + b*x + c == 0.0

        For example:

            >>> x1, x2 = quadratic(a=8, b=22, c=15)
            >>> x1
            (-1.25+0j)
            >>> x2
            (-1.5+0j)
            >>> 8*x1**2 + 22*x1 + 15
            0j
            >>> 8*x2**2 + 22*x2 + 15
            0j

    '''
    discriminant = cmath.sqrt(b**2.0 - 4.0*a*c)
    x1 = (-b + discriminant) / (2.0 * a)
    x2 = (-b - discriminant) / (2.0 * a)
    return x1, x2
Project: trf    Author: aistairc    | project source | file source
def tokenize(sentences: List[str]) -> Tuple[List[int], List[List[str]]]:

    tokenizer = Tokenizer()
    lengths = []
    texts = []
    for s in sentences:
        result = tokenizer.tokenize(s)

        surfaces = [t.surface for t in result]
        lengths.append(len(surfaces))

        text = ' '.join(surfaces)
        texts.append(text)
    return lengths, texts
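
Assuming `Tokenizer` here is janome's `janome.tokenizer.Tokenizer` (whose tokens expose a `.surface` attribute, matching this code), usage would look like:

lengths, texts = tokenize(['今日はいい天気です。'])
# lengths -> e.g. [6]   (number of surface tokens per sentence)
# texts   -> e.g. ['今日 は いい 天気 です 。']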
Project: whatstyle    Author: mikr    | project source | file source
def option_make(optionname,      # type: AnyStr
                optiontype,      # type: AnyStr
                configs,         # type: Iterable[OptionValue]
                nestedopts=None  # type: Optional[StyleDef]
                ):
    # type: (...) -> Tuple[str, str, List[OptionValue], Optional[StyleDef]]
    configs = [typeconv(c) for c in configs]
    return unistr(optionname), unistr(optiontype), configs, nestedopts
Project: whatstyle    Author: mikr    | project source | file source
def style_make(options=None):
    # type: (Union[dict, List[Tuple[str, OptionValue]], None]) -> Style
    if options is None:
        return Style()
    if isinstance(options, dict):
        s = style_make()
        for k, v in sorted(options.items()):
            if isinstance(v, dict):
                v = style_make(v)
            set_option(s, k, v)
        return s
    raise TypeError('options must be a dict or None')