Python builtins 模块,dict() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用builtins.dict()

项目:dcinside    作者:carcdrcons    | 项目源码 | 文件源码
def main(post, name, password, text):
    """Submit a comment on ``post`` and return the response of that request.

    After the comment itself is posted, two follow-up requests are sent to
    the comment-count and comment-view endpoints; their responses are
    discarded.
    """
    comment_payload = _make_comment_data(post, name, password, text)
    SESSION.headers['Referer'] = _get_comment_referrer_url(post)
    SESSION.headers['X-Requested-With'] = 'XMLHttpRequest'

    # 1) the comment itself
    submit_response = SESSION.post(url=URL.COMMENT_SUBMIT,
                                   data=comment_payload)

    # 2) comment count
    post_ref = dict(ci_t=post['ci_t'],
                    id=post['gall_name'],
                    no=post['post_num'])
    SESSION.post(url=URL.COMMENT_COUNT, data=post_ref)

    # 3) comment view: same payload plus the page number
    post_ref['comment_page'] = 1
    SESSION.post(url=URL.COMMENT_VIEW, data=post_ref)

    return submit_response
项目:fypp    作者:aradi    | 项目源码 | 文件源码
def openscope(self, customlocals=None):
        '''Pushes the current scope and activates a new (embedded) one.

        Args:
            customlocals (dict): When given, a copy of this dictionary
                becomes the locals of the new scope. Otherwise the new scope
                starts with a copy of the embedding scope's locals (or with
                no locals at all if the embedding scope has none).
        '''
        # Remember the state of the embedding scope.
        self._locals_stack.append(self._locals)
        self._globalrefs_stack.append(self._globalrefs)

        # Explicit locals win over the inherited ones.
        if customlocals is not None:
            inherited = customlocals
        else:
            inherited = self._locals
        self._locals = {} if inherited is None else inherited.copy()

        self._globalrefs = set()
        # Effective scope: globals overlaid with the new locals.
        self._scope = self._globals.copy()
        self._scope.update(self._locals)
项目:qtpandas    作者:draperjames    | 项目源码 | 文件源码
def update_file(self, filepath, df, notes=None):
        """
        Registers a new DataFrame with the DataFrameModel stored under filepath.

        :param filepath (str)
            Key of the DataFrameModel to be updated.
        :param df (pandas.DataFrame)
            The new DataFrame handed to the model (without copying).
        :param notes (str, default None)
            Optional notes; when truthy, a dict with the current timestamp
            and the notes is appended to the updates kept for filepath.

        """
        assert isinstance(df, pd.DataFrame), "Cannot update file with type '{}'".format(type(df))

        model = self._models[filepath]
        model.setDataFrame(df, copyDataFrame=False)

        if notes:
            stamp = pd.Timestamp(datetime.datetime.now())
            self._updates[filepath].append(dict(date=stamp, notes=notes))

        # The path is recorded whether or not notes were supplied.
        self._paths_updated.append(filepath)
项目:reflectrpc    作者:aheck    | 项目源码 | 文件源码
def to_dict(self):
        """
        Convert the named hash to a dictionary

        Returns:
            dict: Dictionary with the keys 'name', 'type', 'description'
                and 'fields' taken from this object
        """
        return {
            'name': self.name,
            'type': self.typ,
            'description': self.description,
            'fields': self.fields,
        }
项目:reflectrpc    作者:aheck    | 项目源码 | 文件源码
def set_description(self, name, description, version, custom_fields = None):
        """
        Store the self-description of this RPC service

        Args:
            name (str): Name of the service
            description (str): Description of the service
            version (str): Version of the service
            custom_fields (dict): A dict with user-defined fields; an empty
                dict is stored when omitted
        """
        service_info = self.description
        service_info['name'] = name
        service_info['description'] = description
        service_info['version'] = version
        service_info['custom_fields'] = (
            {} if custom_fields is None else custom_fields)
项目:reflectrpc    作者:aheck    | 项目源码 | 文件源码
def handle_error(self, e, reply):
        """
        Turn a reply dict into an error reply for an exception raised by an RPC function

        Args:
            e (Exception): The exception caught when executing an RPC function
            reply (dict): The reply that was about to be sent to the client

        Returns:
            dict: The same reply dict, with 'error' filled in and 'result' set to None
        """
        # Re-raise so the except clauses can dispatch on the exception type
        # and traceback.print_exc() has an active exception to print.
        try:
            raise e
        except JsonRpcError as rpc_error:
            error_dict = rpc_error.to_dict()
        except Exception:
            # Anything else is reported to the client as a generic internal error.
            traceback.print_exc()
            error_dict = JsonRpcInternalError("Internal error").to_dict()

        reply['error'] = error_dict
        reply['result'] = None
        return reply
项目:memote    作者:opencobra    | 项目源码 | 文件源码
def test_metabolite_id_namespace_consistency(read_only_model, store):
    """Expect metabolite IDs to be from the same namespace.

    Stores the integer value of every entry of the namespace overview in
    ``store['met_ids_in_each_namespace']``, warns when the namespace with the
    largest sum is not ``bigg.metabolite``, and asserts that this largest sum
    equals the number of metabolites in the model.
    """
    overview = annotation.generate_component_id_namespace_overview(
        read_only_model, "metabolites")
    # ``iteritems`` was deprecated in pandas 1.5 and removed in 2.0;
    # ``items`` is the equivalent that works across versions.
    store['met_ids_in_each_namespace'] = dict(
        (key, int(val)) for key, val in overview.items())
    distribution = overview.sum()
    cols = list(distribution.index)
    largest = distribution[cols].idxmax()
    if largest != 'bigg.metabolite':
        warn(
            'memote currently only supports syntax checks for BiGG identifiers.'
            ' Please consider mapping your IDs from {} to BiGG'
            ''.format(largest)
        )
    assert distribution[largest] == len(read_only_model.metabolites), \
        "Metabolite IDs that don't belong to the largest fraction: {}"\
        "".format(", ".join(overview.loc[~overview[largest], largest].index))
项目:memote    作者:opencobra    | 项目源码 | 文件源码
def test_reaction_id_namespace_consistency(read_only_model, store):
    """Expect reaction IDs to be from the same namespace.

    Stores the integer value of every entry of the namespace overview in
    ``store['rxn_ids_in_each_namespace']``, warns when the namespace with the
    largest sum is not ``bigg.reaction``, and asserts that this largest sum
    equals the number of reactions in the model.
    """
    overview = annotation.generate_component_id_namespace_overview(
        read_only_model, "reactions")
    # ``iteritems`` was deprecated in pandas 1.5 and removed in 2.0;
    # ``items`` is the equivalent that works across versions.
    store['rxn_ids_in_each_namespace'] = dict(
        (key, int(val)) for key, val in overview.items())
    distribution = overview.sum()
    cols = list(distribution.index)
    largest = distribution[cols].idxmax()
    if largest != 'bigg.reaction':
        warn(
            'memote currently only supports syntax checks for BiGG identifiers.'
            ' Please consider mapping your IDs from {} to BiGG'
            ''.format(largest)
        )
    # The overview was built for reactions, so the count must be compared
    # with the number of reactions (it was compared with the metabolites).
    assert distribution[largest] == len(read_only_model.reactions), \
        "Reaction IDs that don't belong to the largest fraction: {}" \
        "".format(", ".join(overview.loc[~overview[largest], largest].index))
项目:memote    作者:opencobra    | 项目源码 | 文件源码
def _collect_consistency_plots(self):
        """Build one chart per consistency column and return them keyed by column name."""
        frame = self.bag.get_consistency_dataframe()
        # (column, chart factory, title) in the order the charts are created.
        chart_specs = [
            ("is_consistent", plt.boolean_chart,
             "Is Stoichiometrically Consistent"),
            ("unconserved_metabolites", plt.scatter_line_chart,
             "Number of Unconserved Metabolites"),
            ("magic_atp_production", plt.boolean_chart,
             "Has Magic ATP Production"),
            ("imbalanced_reactions", plt.scatter_line_chart,
             "Number of Imbalanced Reactions"),
            ("blocked_reactions", plt.scatter_line_chart,
             "Number of Blocked Reactions"),
            # TODO: Fix looped in tests.
            # ("looped_reactions", plt.scatter_line_chart,
            #  "Number of Looped Reactions"),
        ]
        plots = dict()
        for column, make_chart, title in chart_specs:
            plots[column] = make_chart(frame[[self.index, column]], title)
        return plots
项目:memote    作者:opencobra    | 项目源码 | 文件源码
def _collect_biomass_plots(self):
        """Build one chart per biomass column and return them keyed by column name."""
        frame = self.bag.get_biomass_dataframe()
        factor = "biomass_ids"
        # Shared tail of the two LaTeX titles.
        flux_unit = r"[ \text{mmol} \text{g}_{\text{DW}}^{-1}  \text{h}^{-1} ] $"
        # (column, chart factory, extra columns, title) in creation order.
        chart_specs = [
            # components sum
            ("biomass_sum", plt.scatter_line_chart, [factor],
             r"$ \text{Sum of Biomass Components }" + flux_unit),
            ("biomass_default_flux", plt.scatter_line_chart, [factor],
             r"$ \text{Biomass Flux }" + flux_unit),
            ("num_default_blocked_precursors", plt.scatter_line_chart, [factor],
             "Number of Blocked Biomass Precursors in Default Medium"),
            ("num_open_blocked_precursors", plt.scatter_line_chart, [factor],
             "Number of Blocked Biomass Precursors in Complete Medium"),
            ("gam_in_biomass", plt.boolean_chart, [],
             "Biomass Contains Growth-associated Maintenance"),
        ]
        plots = dict()
        for column, make_chart, extra, title in chart_specs:
            plots[column] = make_chart(
                frame[[self.index, column] + extra], title)
        return plots
项目:memote    作者:opencobra    | 项目源码 | 文件源码
def build_index(self):
        """Build a data index from timestamps and commit hashes.

        The resulting frame is indexed by the full commit hash, carries a
        ``commit_hash`` column with the shortest common prefix length (at
        least 5 characters) that keeps all hashes distinct, is sorted by
        timestamp, and is stored in ``self._index``.
        """
        LOGGER.debug("Building index...")
        template = pd.DataFrame({
            "timestamp": pd.Series(dtype="datetime64[ns]"),
            "commit_hash": pd.Series(dtype="str")
        })
        df = self._bag.pluck("meta", dict()).to_dataframe(template).compute()
        df.set_index(
            "commit_hash", drop=True, inplace=True, verify_integrity=True)

        # Grow the prefix until every abbreviated hash is unique.
        width = 5
        while True:
            short = df.index.str[:width]
            if len(short.unique()) >= len(df):
                break
            width += 1

        df["commit_hash"] = short.copy()
        df.sort_values("timestamp", inplace=True, kind="mergesort")
        self._index = df
        LOGGER.debug("%s", str(df))
项目:myhdlpeek    作者:xesscorp    | 项目源码 | 文件源码
def clear(cls):
        '''Reset the class-level Peeker registry to a new, empty dict.'''
        cls._peekers = dict()
项目:skidl    作者:xesscorp    | 项目源码 | 文件源码
def _find_num_copies(**attribs):
    """
    Return the number of copies to make from the length of attribute values.

    Keyword Args:
        attribs: Dict of Keyword/Value pairs for setting object attributes.
            If the value is a scalar, then the number of copies is one.
            If the value is a list/tuple, the number of copies is the
            length of the list/tuple.

    Returns:
        The length of the longest value in the dict of attributes.

    Raises:
        Exception if there are two or more list/tuple values with different
        lengths that are greater than 1. (All attribute values must be scalars
        or lists/tuples of the same length.)
    """
    num_copies = set()
    for k, v in attribs.items():
        if isinstance(v, (list, tuple)):
            num_copies.add(len(v))
        else:
            num_copies.add(1)

    num_copies = list(num_copies)
    if len(num_copies) > 2:
        logger.error("Mismatched lengths of attributes: {}!".format(
            num_copies))
        raise Exception
    elif len(num_copies) > 1 and min(num_copies) > 1:
        logger.error("Mismatched lengths of attributes: {}!".format(
            num_copies))
        raise Exception

    try:
        return max(num_copies)
    except ValueError:
        return 0  # If the list if empty.

##############################################################################
项目:skidl    作者:xesscorp    | 项目源码 | 文件源码
def _load_sch_lib_skidl(self, filename=None, lib_search_paths=None):
        """
        Load the parts from a SKiDL schematic library file.

        The file is executed as Python code; the first SchLib instance found
        among the names it defines replaces the attributes of this object.

        Args:
            filename: The name of the SKiDL schematic library file.
            lib_search_paths: Forwarded to _find_and_open_file to locate
                the file.

        Raises:
            Exception: If executing the file fails or it defines no SchLib
                object.
        """

        f = _find_and_open_file(filename, lib_search_paths, lib_suffixes[SKIDL], allow_failure=True)
        if not f:
            logger.warning('Unable to open SKiDL Schematic Library File {}.\n'.format(filename))
            return
        try:
            # The SKiDL library is stored as a Python module that's executed to
            # recreate the library object.
            # NOTE(security): exec() runs arbitrary code from the library
            # file, so only trusted library files should be loaded.
            lib_namespace = {}  # Receives the names defined by the library.
            exec(f.read(), lib_namespace)

            # Now look through the namespace to find the library object.
            for val in lib_namespace.values():
                if isinstance(val, SchLib):
                    # Overwrite self with the new library.
                    self.__dict__.update(val.__dict__)
                    return

            # Oops! No library object. Something went wrong.
            raise Exception('No SchLib object found in {}'.format(filename))

        except Exception as e:
            logger.error('Problem with {}'.format(f))
            logger.error(e)
            # Keep the cause in the message instead of raising a bare Exception.
            raise Exception('Problem with {}: {}'.format(filename, e))
        finally:
            # Release the handle on every path; assumes the object returned
            # by _find_and_open_file is file-like and has close().
            f.close()
项目:dcinside    作者:carcdrcons    | 项目源码 | 文件源码
def main(post, gall_name_en, comment_num, password):
    """Send a comment-delete request for comment ``comment_num`` of ``post``.

    Returns the response of the POST to ``URL.COMMENT_DELETE_SUBMIT``.
    """
    payload = dict(ci_t=post['ci_t'],
                   no=post['post_num'],
                   id=gall_name_en)
    payload.update(p_no=post['post_num'],
                   re_no=comment_num,
                   orgin_no=0,
                   password=password,
                   best_orgin='',
                   check_7=post['check_7'])

    SESSION.headers['X-Requested-With'] = 'XMLHttpRequest'
    response = SESSION.post(url=URL.COMMENT_DELETE_SUBMIT, data=payload)
    return response
项目:dcinside    作者:carcdrcons    | 项目源码 | 文件源码
def _yield_comment_html(post, cmt_all=True):
    """Yield the comment-view response text for each comment page of ``post``.

    With ``cmt_all`` set, pages 1 through ``post['total_pages']`` are
    requested; otherwise only page 1.
    """
    last_page = post['total_pages'] if cmt_all else 1

    for page_no in range(1, last_page + 1):
        form = dict(ci_t=post['ci_t'],
                    id=post['gall_name'],
                    no=post['post_num'],
                    comment_page=page_no)
        SESSION.headers['X-Requested-With'] = 'XMLHttpRequest'
        response = SESSION.post(url=URL.COMMENT_VIEW, data=form)
        yield response.text
项目:dcinside    作者:carcdrcons    | 项目源码 | 文件源码
def upvote(gall_name_en, post_num, ci_t):
    """POST a vote for the given post to ``URL.POST_VOTE_UP`` and return the response."""
    SESSION.headers['X-Requested-With'] = 'XMLHttpRequest'
    SESSION.headers['Referer'] = URL.POST_VOTE_UP

    # Cookie named "<gallery><post number>_Firstcheck" is set to 'Y'.
    cookie_name = gall_name_en + str(post_num) + '_Firstcheck'
    SESSION.cookies[cookie_name] = 'Y'

    form = dict(ci_t=ci_t, id=gall_name_en, no=post_num,
                recommend=0, vote='vote', user_id='')
    response = SESSION.post(url=URL.POST_VOTE_UP, data=form)
    return response
项目:dcinside    作者:carcdrcons    | 项目源码 | 文件源码
def downvote(gall_name_en, post_num, ci_t):
    """POST a vote for the given post to ``URL.POST_VOTE_DOWN`` and return the response."""
    SESSION.headers['X-Requested-With'] = 'XMLHttpRequest'
    SESSION.headers['Referer'] = URL.POST_VOTE_DOWN

    # Both "<gallery><post number>_Firstcheck" cookies are set to 'Y'.
    cookie_prefix = gall_name_en + str(post_num)
    SESSION.cookies[cookie_prefix + '_Firstcheck'] = 'Y'
    SESSION.cookies[cookie_prefix + '_Firstcheck_down'] = 'Y'

    form = dict(ci_t=ci_t, id=gall_name_en, no=post_num,
                recommend=0, vote='vote', user_id='')
    response = SESSION.post(url=URL.POST_VOTE_DOWN, data=form)
    return response
项目:dcinside    作者:carcdrcons    | 项目源码 | 文件源码
def _get_key(ci_t, gall_name_en, password, post_num):
    """Request a key from ``URL.DELETE_POST2`` and return it.

    The key is the second '||'-separated field of the response text.
    """
    SESSION.headers['X-Requested-With'] = 'XMLHttpRequest'
    form = dict(ci_t=ci_t, password=password, id=gall_name_en, no=post_num)
    response = SESSION.post(url=URL.DELETE_POST2, data=form)
    fields = response.text.split('||')
    return fields[1]
项目:dcinside    作者:carcdrcons    | 项目源码 | 文件源码
def _get_block_key(block_key, ci_t, r_key, gall_name_en):
    """Poll the block endpoint until it answers with non-empty text and return that text.

    ``r_key`` is accepted but not used by this function.
    """
    endpoint = 'http://gall.dcinside.com/block/block/'
    while True:
        SESSION.headers['X-Requested-With'] = 'XMLHttpRequest'
        SESSION.cookies['dcgame_top'] = 'Y'
        form = dict(ci_t=ci_t, id=gall_name_en, block_key=block_key)
        response = SESSION.post(endpoint, data=form)
        if response.text != '':
            return response.text

        # Empty answer: wait half a second before asking again.
        time.sleep(0.5)
项目:dcinside    作者:carcdrcons    | 项目源码 | 文件源码
def _submit(gall_name_en, ci_t, r_key, block_key,
            subject, password, name, memo):
    """POST the article form to the article-submit endpoint and return the response."""

    endpoint = 'http://gall.dcinside.com/forms/article_submit'

    # Caller-supplied fields first, fixed form values afterwards.
    form = dict(upload_status='N', id=gall_name_en, ci_t=ci_t,
                subject=subject, password=password, r_key=r_key,
                name=name, memo=memo, block_key=block_key)
    form.update(vid='', isMovie='', campus=0, ipt_movieCompType='',
                wiki_tag='', sijosae='tlwhtorororRl')

    referer = URL.POST_CREATE.format(gall_name_en=gall_name_en)
    SESSION.headers['Referer'] = referer
    return SESSION.post(endpoint, data=form)
项目:dcinside    作者:carcdrcons    | 项目源码 | 文件源码
def post_parser(html, gall_en_name, post_num):
    """Parse the HTML of a post page into a dict of post fields.

    ``gall_en_name`` and ``post_num`` are copied into the result unchanged;
    every other field comes from a ``_parse_*`` helper applied to the parsed
    document. ``total_pages`` is derived from the comment count.
    """
    doc = pq(html)
    n_comments = _parse_comments_count(doc)

    return dict(
        gall_name=gall_en_name,
        post_num=post_num,
        imgs1=_parse_imgs1(doc),
        imgs2=_parse_imgs2(doc),
        title=_parse_title(doc),
        author=_parse_author(doc),
        views=_parse_views_count(doc),
        comments_count=n_comments,
        content=_parse_text_content(doc),
        html_content=_parse_html_content(doc),
        datetime=_parse_datetime(doc),
        IP=_parse_ip(doc),
        user_id=_parse_author_id(doc),
        ci_t=_parse_ci_t(doc),
        total_pages=_parse_total_pages(n_comments),
        check_6=_parse_check_6(doc),
        check_7=_parse_check_7(doc),
        check_8=_parse_check_8(doc),
        delete_values=_parse_delete_values(doc),
        upvote=_parse_upvote(doc),
        downvote=_parse_downvote(doc),
        mandu=_parse_mandu(doc),
    )
项目:dcinside    作者:carcdrcons    | 项目源码 | 文件源码
def parse_td(td_list):
    """Turn the cells of one listing row into a dict describing the post.

    Cell 0 holds the post number, cell 1 the subject link and reply count,
    cell 2 the author, cell 3 the date ('%Y.%m.%d'), cell 4 the view count
    and cell 5 the recommendation count.
    """
    number_cell, subject_cell, author_cell = td_list[0], td_list[1], td_list[2]

    post_number = get_number_from_lxml_elem(number_cell)
    # The first text fragment of the author cell is the author name.
    author = list(author_cell.itertext())[0]

    return dict(
        post_num=post_number,
        subject=pq(subject_cell)('a')[0].text_content(),
        reply_num=parse_int(pq(subject_cell)('em').text()),
        post_type=pq(subject_cell)('a')[0].get('class'),
        author=author,
        user_id=pq(author_cell)[0].get('user_id'),
        date=datetime.strptime(td_list[3].text_content(), '%Y.%m.%d'),
        views=get_number_from_lxml_elem(td_list[4]),
        recommended_num=get_number_from_lxml_elem(td_list[5])
    )
项目:statik    作者:thanethomson    | 项目源码 | 文件源码
def __init__(self):
        self.filters = dict()
        self.tags = dict()
        self.takes_context = dict()
项目:gobble    作者:openspending    | 项目源码 | 文件源码
def _request_authentication(self):
        """Ask Open-Spending if the token is valid.

        Returns the handled authentication response when it is flagged as
        authenticated; otherwise logs an error and raises InvalidToken.
        """
        response = authenticate_user(params=dict(jwt=self.token))
        authentication = handle(response)

        if authentication['authenticated']:
            name = authentication['profile']['name']
            log.info('Hello %s! You are logged into Open-Spending', name)
            return authentication

        message = 'Token has expired: request a new one'
        log.error(message)
        raise InvalidToken(message)
项目:gobble    作者:openspending    | 项目源码 | 文件源码
def _request_permissions(self):
        """Request permissions for Open-Spending services.

        Returns:
            dict: Maps every service in OS_SERVICES to the handled response
                of its authorization request (empty if there are no services).
        """
        permissions = {}

        for service in OS_SERVICES:
            query = dict(jwt=self.token, service=service)
            response = authorize_user(params=query)
            permissions[service] = handle(response)

        # Return only after ALL services were queried; the return used to
        # sit inside the loop and ended it after the first service.
        return permissions
项目:fypp    作者:aradi    | 项目源码 | 文件源码
def _get_restricted_builtins(cls):
        bidict = dict(cls._RESTRICTED_BUILTINS)
        major = sys.version_info[0]
        if major == 2:
            bidict['True'] = True
            bidict['False'] = False
        return bidict
项目:fypp    作者:aradi    | 项目源码 | 文件源码
def process_file(self, infile, outfile=None):
        '''Processes input file and writes result to output file.

        Args:
            infile (str): Name of the file to read and process. If its value is
                '-', input is read from stdin.
            outfile (str, optional): Name of the file to write the result to.
                If its value is '-', result is written to stdout. If not
                present, result will be returned as string.

        Returns:
            str: Result of processed input, if no outfile was specified
                (None otherwise).
        '''
        infile = STDIN if infile == '-' else infile
        output = self._preprocessor.process_file(infile)
        if outfile is None:
            return output

        if outfile == '-':
            # stdout is not ours to close.
            sys.stdout.write(output)
            return None

        outfp = _open_output_file(outfile, self._create_parent_folder)
        try:
            outfp.write(output)
        finally:
            # Close the file even when writing fails.
            outfp.close()
        return None
项目:fypp    作者:aradi    | 项目源码 | 文件源码
def process_text(self, txt):
        '''Runs the preprocessor on a string.

        Args:
            txt (str): String to process.

        Returns:
            str: Processed content.
        '''
        processed = self._preprocessor.process_text(txt)
        return processed
项目:logger    作者:oval-group    | 项目源码 | 文件源码
def __init__(self, name, log_git_hash=True,
                 use_visdom=False, visdom_opts=None,
                 time_indexing=True, xlabel=None):
        """ Create an experiment with the following parameters:
        - name (str): experiment name; the part after the last '/' becomes
          ``self.name``, the full value is kept in ``self.name_and_dir``
        - log_git_hash (bool): retrieve current commit hash to log code status
        - use_visdom (bool): monitor metrics logged on visdom
        - visdom_opts (dict): options for visdom (passed to the Plotter)
        - time_indexing (bool): use time to index values (otherwise counter)
        - xlabel: passed to the Plotter when use_visdom is set
        """

        super(Experiment, self).__init__()

        # Identification.
        self.name = name.split('/')[-1]
        self.name_and_dir = name
        self.date_and_time = time.strftime('%d-%m-%Y--%H-%M-%S')

        # Containers for logged values, metrics and configuration.
        self.logged = defaultdict(OrderedDict)
        self.metrics = defaultdict(dict)
        self.registered = []
        self.config = dict()

        # Behaviour switches.
        self.use_visdom = use_visdom
        self.time_indexing = time_indexing

        if self.use_visdom:
            self.plotter = Plotter(self, visdom_opts, xlabel)

        if log_git_hash:
            self.log_git_hash()
项目:logger    作者:oval-group    | 项目源码 | 文件源码
def log_git_hash(self):
        """Log the current git commit hash and diff to the config.

        Looks for a repository in the current and parent directories and
        passes ``git_hash`` and ``git_diff`` to ``self.log_config`` (not sent
        to visdom). This is best-effort: on failure a message is printed and
        nothing is logged.
        """
        try:
            repo = git.Repo(search_parent_directories=True)
            git_hash = repo.head.object.hexsha
            head = repo.head.commit.tree
            git_diff = repo.git.diff(head)
            self.log_config(dict(git_hash=git_hash,
                                 git_diff=git_diff),
                            to_visdom=False)
        except Exception:
            # A bare ``except:`` would also swallow KeyboardInterrupt and
            # SystemExit; ordinary errors remain non-fatal here.
            print("I tried to find a git repository in current "
                  "and parent directories but did not find any.")
项目:logger    作者:oval-group    | 项目源码 | 文件源码
def _dict_process(my_dict):
    logged = defaultdict(OrderedDict)
    for key in my_dict['logged'].keys():
        splitted = key.split('_')
        if not len(splitted):
            splitted.append("default")
        name, tag = '_'.join(splitted[:-1]), splitted[-1]
        values = my_dict['logged'].pop(key)
        # sort values based on x-value
        values = sorted(values.items(), key=lambda x: float(x[0]))
        logged[tag][name] = OrderedDict(values)
    my_dict['logged'] = logged
    # no need for an ordered dictionary for config
    my_dict['config'] = dict(my_dict['config'])
    return my_dict
项目:pyiem    作者:rheineke    | 项目源码 | 文件源码
def __repr__(self):
        fmt = 'Single {contract}: {side} {qty}@{pricetimelimit} ({cp})'
        kwargs = dict(contract=self.contract,
                      side=self.side,
                      qty=self.quantity,
                      pricetimelimit=self.price_time_limit,
                      cp=self.counterparty)
        return fmt.format(**kwargs)


# TODO: Can combine if contract can hold same info as contract_bundle
项目:pyiem    作者:rheineke    | 项目源码 | 文件源码
def __repr__(self):
        fmt = 'Bundle {contract}: {side} {qty} ({cp})'
        kwargs = dict(contract=self.contract_bundle, side=self.side,
                      qty=self.quantity, cp=self.counterparty)
        return fmt.format(**kwargs)
项目:pyload-requests    作者:pyload    | 项目源码 | 文件源码
def safeurlencode(data):
    """URL-encode ``data`` after passing every key and value through ``convert.to_bytes``.

    ``data`` may be anything ``dict()`` accepts. Each key and value is given
    to ``convert.to_bytes`` with itself as the second argument.
    """
    pairs = dict(data).items()
    converted = dict(
        (convert.to_bytes(key, key), convert.to_bytes(value, value))
        for key, value in pairs)
    return urlencode(converted)
项目:reflectrpc    作者:aheck    | 项目源码 | 文件源码
def json2py(json_type):
    """Return the Python type name for a JSON type name, or None if it is unknown."""
    python_names = {
        'bool': 'bool',
        'int': 'int',
        'float': 'float',
        'string': 'str',
        'array': 'list',
        'hash': 'dict',
        'base64': 'str',
    }
    return python_names.get(json_type)
项目:reflectrpc    作者:aheck    | 项目源码 | 文件源码
def to_dict(self):
        """
        Convert the error to a dictionary

        Returns:
            dict: Dictionary with the keys 'name' and 'message'
        """
        return {
            'name': self.name,
            'message': self.msg,
        }
项目:reflectrpc    作者:aheck    | 项目源码 | 文件源码
def to_dict(self):
        """
        Convert the enum to a dictionary

        Returns:
            dict: Dictionary with the keys 'name', 'type', 'description'
                and 'values' taken from this enum type
        """
        return {
            'name': self.name,
            'type': self.typ,
            'description': self.description,
            'values': self.values,
        }
项目:reflectrpc    作者:aheck    | 项目源码 | 文件源码
def to_dict(self):
        """
        Convert the function description to a dictionary

        Returns:
            dict: Dictionary with the keys 'name', 'description',
                'result_type', 'result_desc' and 'params'; every parameter
                is reduced to its 'name', 'description' and 'type'
        """
        # Copy only the three documented keys of each parameter.
        params = [
            {
                'name': p['name'],
                'description': p['description'],
                'type': p['type'],
            }
            for p in self.params
        ]

        return {
            'name': self.name,
            'description': self.description,
            'result_type': self.result_type,
            'result_desc': self.result_desc,
            'params': params,
        }
项目:reflectrpc    作者:aheck    | 项目源码 | 文件源码
def describe_service(self):
        """
        Return the stored self-description of this RPC service

        Returns:
            dict: The description object held by this service (not a copy)
        """
        service_description = self.description
        return service_description
项目:reflectrpc    作者:aheck    | 项目源码 | 文件源码
def call_function(self, rpcfunction, rpcinfo, *params):
        """
        Execute the actual function

        Params:
            rpcfunction (RpcFunction): RPC function object representing a function
            rpcinfo (dict): Additional information; passed as first argument
                only if the function requires it
            *params: Positional arguments forwarded to the function

        Returns:
            Whatever the wrapped function returns
        """
        if rpcfunction.requires_rpcinfo:
            call_args = (rpcinfo,) + params
        else:
            call_args = params
        return rpcfunction.func(*call_args)
项目:stream2segment    作者:rizac    | 项目源码 | 文件源码
def __init__(self, segid, functions, other_components_id_list=None):
        """Builds a new SegmentPlotList
        :param segid: stored as `self.segment_id`
        :param functions: functions which must return a `Plot` (or an Plot-convertible object).
        **the first item MUST be `_getme` by default**
        :param other_components_id_list: stored as `self.oc_segment_ids`;
        defaults to a new empty list
        """
        # One (initially empty) slot per function.
        super(SegmentPlotList, self).__init__([None] * len(functions))
        self.functions = functions
        # use dict instead of {} to make it py2 compatible (see future imports at module's top)
        self.data = dict(stream=None, plot_title_prefix='',
                         **({k: None for k in self._data_to_invalidate}))
        self.segment_id = segid
        # A `[]` default would be created once and shared by every instance
        # built without this argument; create a fresh list per instance.
        if other_components_id_list is None:
            other_components_id_list = []
        self.oc_segment_ids = other_components_id_list
项目:stream2segment    作者:rizac    | 项目源码 | 文件源码
def get_plots(self, session, plot_indices, inv_cache, config):
        '''
        Returns the list of `Plot`s stored at the given indices, computing and
        caching those which have not been calculated yet. The length of the
        returned list equals `len(plot_indices)`. The returned list is a new
        list, but its elements are the stored ones, so modifying an element
        affects the stored element.

        :param session: forwarded to `self.get_plot`
        :param plot_indices: iterable of indices into this list (and into
        `self.functions`)
        :param inv_cache: forwarded to `self.get_plot`
        :param config: (dict) the plot config; used to enter
        `enhancesegmentclass` and forwarded to `self.get_plot`
        '''
        with enhancesegmentclass(config):
            # index of the function returning the main (trace) plot
            main_plot_index = 0

            plots = []
            for idx in plot_indices:
                if self[idx] is None:
                    # Not calculated yet: compute and store. The main plot
                    # gets an empty function name, all others None.
                    label = '' if idx == main_plot_index else None
                    self[idx] = self.get_plot(self.functions[idx], session,
                                              inv_cache, config,
                                              func_name=label)
                plots.append(self[idx])
            return plots
项目:stream2segment    作者:rizac    | 项目源码 | 文件源码
def update(self, *args, **kwargs):  # python2 compatibility (python3 calls __setitem__)
        """Insert the given items via the parent's __setitem__, then enforce the size limit.

        Accepts at most one positional argument (anything `dict()` accepts)
        plus keyword items; raises TypeError for more positional arguments.
        """
        if len(args) > 1:
            raise TypeError("update expected at most 1 arguments, got %d" % len(args))

        store = super(LimitedSizeDict, self).__setitem__
        if args:
            for key, value in dict(args[0]).items():
                store(key, value)
        for key, value in kwargs.items():
            store(key, value)

        self._check_size_limit()
项目:stream2segment    作者:rizac    | 项目源码 | 文件源码
def __init__(self, size_limit=30):
        """Initialise the cache with `size_limit` and an empty `_segid2staid` mapping."""
        super(InventoryCache, self).__init__(size_limit=size_limit)
        # presumably maps segment ids to station ids (going by the name) - TODO confirm
        self._segid2staid = dict()
项目:pythainlp    作者:PyThaiNLP    | 项目源码 | 文件源码
def thai_num_to_num(text):
    """Look up ``text`` among the third entries of the rows of table ``p``
    (first row skipped) and return the second entry of the matching row.

    Raises KeyError if ``text`` is not found.
    """
    lookup = {row[2]: row[1] for row in p[1:]}
    return lookup[text]
#????????????????
项目:pythainlp    作者:PyThaiNLP    | 项目源码 | 文件源码
def thai_num_to_text(text):
    """Look up ``text`` among the third entries of the rows of table ``p``
    (first row skipped) and return the first entry of the matching row.

    Raises KeyError if ``text`` is not found.
    """
    lookup = {row[2]: row[0] for row in p[1:]}
    return lookup[text]
#????????????
项目:pythainlp    作者:PyThaiNLP    | 项目源码 | 文件源码
def num_to_thai_num(text):
    """Look up ``text`` among the second entries of the rows of table ``p``
    (first row skipped) and return the third entry of the matching row.

    Raises KeyError if ``text`` is not found.
    """
    lookup = {row[1]: row[2] for row in p[1:]}
    return lookup[text]
#?????????????
项目:pythainlp    作者:PyThaiNLP    | 项目源码 | 文件源码
def num_to_text(text):
    """Look up ``text`` among the second entries of the rows of table ``p``
    (first row skipped) and return the first entry of the matching row.

    Raises KeyError if ``text`` is not found.
    """
    lookup = {row[1]: row[0] for row in p[1:]}
    return lookup[text]
#?????????????
项目:pythainlp    作者:PyThaiNLP    | 项目源码 | 文件源码
def text_to_num(text):
    """Look up ``text`` among the first entries of the rows of table ``p``
    (first row skipped) and return the second entry of the matching row.

    Raises KeyError if ``text`` is not found.
    """
    lookup = {row[0]: row[1] for row in p[1:]}
    return lookup[text]
#????????????????