Python builtins 模块,list() 实例源码

我们从Python开源项目中,提取了以下41个代码示例,用于说明如何使用builtins.list()

项目:NuGridPy    作者:NuGrid    | 项目源码 | 文件源码
def isoratio_init(self,isos):
        '''
        Return the ratio of the self.habu abundances of two isotopes.

        The isotopes are given either as four entries, e.g.
        ['Fe',56,'Fe',58], or (for compatibility) as two 'El-A'
        strings, e.g. ['Fe-56','Fe-58'].  The abundance of the first
        isotope is divided by the abundance of the second one.

        '''
        if len(isos) == 2:
            # expand the two-string form into the four-entry form
            first = isos[0].split('-')
            second = isos[1].split('-')
            isos = first + [second[0], second[1]]

        def habu_key(element, mass):
            # element name padded to 2 characters (lower case) followed
            # by the mass number right-aligned in 3 characters
            return element.ljust(2).lower() + str(int(mass)).rjust(3)

        numerator = self.habu[habu_key(isos[0], isos[1])]
        denominator = self.habu[habu_key(isos[2], isos[3])]
        return old_div(numerator, denominator)
项目:NuGridPy    作者:NuGrid    | 项目源码 | 文件源码
def iso_abundance(self,isos):
        '''
        Return the abundance of one isotope or of several isotopes.

        Parameters
        ----------
        isos : string, list or tuple
            A single isotope such as 'Si-28', or a sequence of
            isotopes such as ['Si-28','Si-29','Si-30'].

        Returns
        -------
        The self.habu entry of the isotope when a single string is
        given, otherwise a list with one entry per isotope, in input
        order.

        '''
        def lookup(isotope):
            # self.habu keys are the element name padded to 2 characters
            # (lower case) followed by the mass number right-aligned in 3
            parts = isotope.split('-')
            return self.habu[parts[0].ljust(2).lower() + str(int(parts[1])).rjust(3)]

        # isinstance (rather than an exact type comparison) also accepts
        # tuples and list subclasses
        if isinstance(isos, (list, tuple)):
            return [lookup(isotope) for isotope in isos]
        return lookup(isos)
项目:NuGridPy    作者:NuGrid    | 项目源码 | 文件源码
def make_list(default_symbol_list, len_list_to_print):
    '''
    Provide the list of symbols to use for the list of species/arrays
    to plot.  The symbols are reused cyclically when more entries are
    requested than symbols are available.

    Parameters
    ----------
    default_symbol_list : list
        Symbols that the user chose to use.
    len_list_to_print : integer
        Length of the list of species/arrays to print.

    Returns
    -------
    list
        len_list_to_print symbols taken cyclically from
        default_symbol_list.

    Raises
    ------
    IndexError
        If symbols are requested but default_symbol_list is empty.

    '''
    n_symbols = len(default_symbol_list)
    if len_list_to_print > 0 and n_symbols == 0:
        raise IndexError('default_symbol_list is empty')

    # the builtin modulo avoids depending on an external mod() helper
    return [default_symbol_list[i % n_symbols]
            for i in range(len_list_to_print)]
项目:fypp    作者:aradi    | 项目源码 | 文件源码
def _get_callable_argspec_py2(func):
    '''Return (argument names, defaults by name, vararg name) of func.

    Based on inspect.getargspec().  Raises FyppFatalError when func takes
    a variable length keyword argument or a nested (tuple) argument.
    '''
    spec = inspect.getargspec(func)
    if spec.keywords is not None:
        raise FyppFatalError(
            "variable length keyword argument '{0}' found".format(spec.keywords))

    names = spec.args
    # nested (tuple) arguments are reported as lists by getargspec()
    if any(isinstance(name, list) for name in names):
        raise FyppFatalError('tuple argument(s) found')

    defaults = {}
    default_values = spec.defaults
    if default_values is not None:
        # the default values belong to the trailing arguments
        offset = len(names) - len(default_values)
        for position, value in enumerate(default_values):
            defaults[names[offset + position]] = value
    return names, defaults, spec.varargs
项目:gfapy    作者:ggonnella    | 项目源码 | 文件源码
def _get_default_gfa_tag_datatype(obj):
    """Select the GFA tag datatype to use for a value when none is given

    Parameters:
      obj : an object of any Python class

    Returns:
      str : the identifier of a datatype (one of the keys of FIELD_MODULE).
        Objects providing a truthy _default_gfa_tag_datatype attribute
        decide themselves; lists containing only ints or only floats
        give "B"; otherwise the first matching class in
        gfapy.Field._default_tag_datatypes decides, with "J" as fallback
    """
    if getattr(obj, "_default_gfa_tag_datatype", None):
      return obj._default_gfa_tag_datatype()
    if isinstance(obj, list):
      only_ints = all(isinstance(item, builtins.int) for item in obj)
      if only_ints or all(isinstance(item, builtins.float) for item in obj):
        return "B"
    for python_class, datatype in gfapy.Field._default_tag_datatypes:
      if isinstance(obj, python_class):
        return datatype
    return "J"
项目:reflectrpc    作者:aheck    | 项目源码 | 文件源码
def build_rpc_call(self, method, *params):
        """
        Create the dictionary for a JSON-RPC request

        The current value of self.req_id is used as request ID and
        self.req_id is incremented afterwards.

        Args:
            method (str): Name of the RPC method
            params: Positional parameters for the RPC method (stored as
                    the tuple they were collected into)

        Returns:
            dict: Request dictionary with the keys 'id', 'method' and 'params'
        """
        call_id = self.req_id
        self.req_id = call_id + 1

        return {'id': call_id, 'method': method, 'params': params}
项目:reflectrpc    作者:aheck    | 项目源码 | 文件源码
def build_rpc_notify(self, method, *params):
        """
        Create the dictionary for a JSON-RPC notify request

        Unlike a regular call the request carries no ID ('id' is None).

        Args:
            method (str): Name of the notify method
            params: Positional parameters for the notify method (stored
                    as the tuple they were collected into)

        Returns:
            dict: Request dictionary with the keys 'id', 'method' and 'params'
        """
        return {'id': None, 'method': method, 'params': params}
项目:riko    作者:nerevu    | 项目源码 | 文件源码
def assign(item, assignment, **kwargs):
    """Yield item merged with the assignment stored under kwargs['assign'].

    With a truthy kwargs['one'] only the next element of assignment is
    stored, otherwise all of its elements as a list.  With a truthy
    kwargs['dictize'] the merged result is wrapped in a DotDict.
    """
    key = kwargs.get('assign')
    if kwargs.get('one'):
        value = next(assignment)
    else:
        value = list(assignment)
    merged = merge([item, {key: value}])
    if kwargs.get('dictize'):
        yield DotDict(merged)
    else:
        yield merged
项目:NuGridPy    作者:NuGrid    | 项目源码 | 文件源码
def fit(self, x, y, dcoef='none'):
        '''
        Perform the least-squares fit.

        Parameters
        ----------
        x, y : list
            Matching data arrays that define a numerical function y(x),
            this is the data to be fitted.
        dcoef : list or string
            You can provide a different guess for the coefficients, or
            provide the string 'none' to use the initial guess
            (self.coef).  The default is 'none'.

        Returns
        -------
        ierr
            Values between 1 and 4 signal success.

        Notes
        -----
        The data are stored in self.x and self.y, and self.fcoef
        contains the fitted coefficients.

        '''
        self.x = x
        self.y = y

        # Compare by value, not identity: "is" against a string literal
        # depends on interning.  The isinstance guard keeps array-like
        # guesses from being compared element-wise with the string.
        if isinstance(dcoef, str) and dcoef == 'none':
            coef = self.coef
        else:
            coef = dcoef

        fcoef = optimize.leastsq(self.residual, coef, args=(y, self.func, x))
        self.fcoef = fcoef[0].tolist()
        return fcoef[1]
项目:NuGridPy    作者:NuGrid    | 项目源码 | 文件源码
def plot(self, ifig=1, data_label='data', fit_label='fit',
             data_shape='o', fit_shape='-'):
        '''
        Plot the data and the fitted function.

        Parameters
        ----------
        ifig : integer
            Figure window number.  The default is 1.
        data_label : string
            Legend for data.  The default is 'data'.
        fit_label : string
            Legend for fit.  If fit_label is 'fit', then self.__name__
            is used instead.  The default is 'fit'.
        data_shape : character
            Shape for data.  The default is 'o'.
        fit_shape : character
            Shape for fit.  The default is '-'.

        '''

        # lengths are compared by value; "is not" on integers only works
        # by accident for small numbers
        if len(self.coef) != len(self.fcoef):
            print("Warning: the fitted coefficient list is not same")
            print("         length as guessed list - still I will try ...")

        pl.figure(ifig)
        pl.plot(self.x,self.y,data_shape,label=data_label)
        # string comparison by value instead of identity
        if fit_label == 'fit':
            fit_label=self.__name__
        pl.plot(self.x,self.func(self.fcoef,self.x),fit_shape,label=fit_label)
        pl.legend()
项目:NuGridPy    作者:NuGrid    | 项目源码 | 文件源码
def is_stable(self,species):
        '''
        Check whether a species given like 'He-3' is listed as stable.

        The element name is looked up in self.stable_names and the mass
        number is searched for in the matching self.stable_el entry
        (all items after the first one).  Returns True if it is found,
        otherwise False.

        Notes
        -----
        this method is designed to work with an se instance from
        nugridse.py. In order to make it work with ppn.py some
        additional work is required.

        FH, April 20, 2013.

        '''
        element = species.split('-')[0]
        try:
            mass_number = int(species.split('-')[1])
        except ValueError:
            # The mass number carries extra letters, e.g. for an isomer.
            # Such species are assumed to be unstable.  This is not
            # correct, but nugridse.py does not identify species
            # properly by the three numbers A, Z and isomeric_state.
            mass_number = 999
        element_index = self.stable_names.index(element)
        stable_mass_numbers = self.stable_el[element_index][1:]
        if mass_number not in stable_mass_numbers:
            return False
        return True
项目:NuGridPy    作者:NuGrid    | 项目源码 | 文件源码
def strictly_monotonic(bb):
    '''
    Return the sorted, duplicate-free, non-negative entries of bb.

    bb is an index array which may have numerous double or triple
    occurrences of indices, such as for example the
    decay_index_pointer.  All entries < 0 are removed, then all
    duplicates, and the remaining indices are returned in ascending
    order.  bb itself is not modified.

    '''
    # np.unique sorts and removes duplicates in one step
    return np.unique(bb[bb >= 0])
项目:fypp    作者:aradi    | 项目源码 | 文件源码
def _func_getargvalues(*args, **kwargs):
        '''Return the positional arguments as a list and the keyword
        arguments as a dict.'''
        positional = [*args]
        return positional, kwargs
项目:fypp    作者:aradi    | 项目源码 | 文件源码
def _get_syspath_without_scriptdir():
        '''Return a copy of sys.path without the folder of the running script.

        The first entry is dropped only if it is the directory of
        sys.argv[0]; sys.path itself is left untouched.
        '''
        entries = list(sys.path)
        script_folder = os.path.abspath(os.path.dirname(sys.argv[0]))
        first_entry = os.path.abspath(entries[0])
        if first_entry == script_folder:
            entries = entries[1:]
        return entries
项目:fypp    作者:aradi    | 项目源码 | 文件源码
def __call__(self, line):
        '''Splits a line into folded components.

        The folder can be called directly to obtain the folded lines::

            linefolder = FortranLineFolder(maxlen=10)
            linefolder('  print *, "some Fortran line"')

        Lines are returned unchanged if the maximal length is negative
        or the line is not longer than it.

        Args:
            line (str): Line to fold.

        Returns:
            list of str: Components of folded line. They should be
                assembled via ``\\n.join()`` to obtain the string
                representation.
        '''
        limit = self._maxlen
        if limit < 0 or len(line) <= limit:
            return [line]
        prefix = self._prefix
        if self._inherit_indent:
            # continuation lines start with the indentation of the line
            leading_blanks = len(line) - len(line.lstrip())
            prefix = ' ' * leading_blanks + prefix
        return self._split_line(line, limit, prefix, self._suffix,
                                self._fold_position_finder)
项目:fypp    作者:aradi    | 项目源码 | 文件源码
def __call__(self, line):
        '''Leaves the line unfolded.

        Args:
            line (str): Line to return.

        Returns:
            list of str: List with the entire line as its only
                component. Components should be assembled via
                ``\\n.join()`` to obtain the string representation.
        '''
        components = [line]
        return components
项目:pytest-nodev    作者:nodev-io    | 项目源码 | 文件源码
def pytest_addoption(parser):
    """Register the pytest-nodev command line options in the 'nodev' group."""
    group = parser.getgroup('nodev')
    group.addoption(
        '--candidates-from-stdlib', action='store_true',
        help="Collects candidates from the Python standard library.")
    group.addoption(
        '--candidates-from-all', action='store_true',
        help="Collects candidates from the Python standard library and all installed packages. "
             "Disabled by default, see the docs.")
    group.addoption(
        '--candidates-from-specs', default=[], nargs='+',
        help="Collects candidates from installed packages. Space separated list of `pip` specs.")
    group.addoption(
        '--candidates-from-modules', default=[], nargs='+',
        help="Collects candidates from installed modules. Space separated list of module names.")
    group.addoption(
        '--candidates-includes', nargs='+',
        help="Space separated list of regexs matching full object names to include, "
             "defaults to include all objects collected via `--candidates-from-*`.")
    group.addoption(
        '--candidates-excludes', default=[], nargs='+',
        help="Space separated list of regexs matching full object names to exclude.")
    group.addoption(
        '--candidates-predicate', default='builtins:callable',
        help="Full name of the predicate passed to `inspect.getmembers`, "
             "defaults to `builtins.callable`.")
    group.addoption('--candidates-fail', action='store_true', help="Show candidates failures.")
项目:pytest-nodev    作者:nodev-io    | 项目源码 | 文件源码
def pytest_pycollect_makeitem(collector, name, obj):
    """Generate test items for objects carrying a `candidate` marker with arguments.

    The collected function is a wrapper that monkeypatches the name given
    as first marker argument with the candidate object and then calls the
    original object.  Returns None for objects without such a marker.
    """
    marker = getattr(obj, 'candidate', None)
    if not marker or not getattr(marker, 'args', []):
        return None
    target_name = marker.args[0]

    def wrapper(candidate, monkeypatch, *args, **kwargs):
        if '.' not in target_name:
            # plain name: patch it in the module obj belongs to
            monkeypatch.setattr(inspect.getmodule(obj), target_name, candidate, raising=False)
        else:
            # dotted name: let monkeypatch resolve the import path
            monkeypatch.setattr(target_name, candidate, raising=False)
        return obj(*args, **kwargs)

    # the wrapper shares the attribute dict (and so the markers) of obj
    wrapper.__dict__ = obj.__dict__
    return list(collector._genfunctions(name, wrapper))
项目:pytest-nodev    作者:nodev-io    | 项目源码 | 文件源码
def collect_stdlib_distributions():
    """Yield a conventional spec and the names of all top_level standard library modules.

    A single pair is yielded: the spec 'Python==X.Y.Z' of the running
    interpreter and the list of modules found in the standard library
    directory plus the built-in module names.
    """
    # Function-scope import of the top-level stdlib module. The
    # get_python_lib() helper used before is only provided by
    # distutils.sysconfig, and distutils is gone since Python 3.12.
    import sysconfig as stdlib_sysconfig

    distribution_spec = 'Python==%d.%d.%d' % sys.version_info[:3]
    stdlib_path = stdlib_sysconfig.get_path('stdlib')
    distribution_top_level = [name for _, name, _ in pkgutil.iter_modules(path=[stdlib_path])]
    distribution_top_level += list(sys.builtin_module_names)
    yield distribution_spec, distribution_top_level
项目:pytest-nodev    作者:nodev-io    | 项目源码 | 文件源码
def test_collect_stdlib_distributions():
    """The stdlib collector yields one distribution with more than 10 module names."""
    collected = list(collect.collect_stdlib_distributions())
    assert len(collected) == 1
    _spec, top_level_names = collected[0]
    assert len(top_level_names) > 10
项目:pytest-nodev    作者:nodev-io    | 项目源码 | 文件源码
def test_collect_distributions():
    """A known spec yields one distribution with one module, an unknown spec yields nothing."""
    found = list(collect.collect_distributions(['pytest-nodev']))
    assert len(found) == 1
    _spec, top_level_names = found[0]
    assert len(top_level_names) == 1
    missing = list(collect.collect_distributions(['non_existent_distribution']))
    assert len(missing) == 0
项目:pytest-nodev    作者:nodev-io    | 项目源码 | 文件源码
def test_import_distributions():
    """Importable modules are reported by name, non-existent modules are left out."""
    importable = [('pytest-nodev', ['pytest_nodev'])]
    imported_names = list(collect.import_distributions(importable))
    assert imported_names == ['pytest_nodev']

    broken = [('pytest-nodev', ['non_existent_module'])]
    imported_names = list(collect.import_distributions(broken))
    assert imported_names == []
项目:pytest-nodev    作者:nodev-io    | 项目源码 | 文件源码
def test_generate_module_objects():
    """The objects generated for the collect module include generate_module_objects itself."""
    wanted = ('generate_module_objects', collect.generate_module_objects)
    generated = list(collect.generate_module_objects(collect))
    assert wanted in generated
项目:pytest-nodev    作者:nodev-io    | 项目源码 | 文件源码
def test_generate_objects_from_modules():
    """Exactly one object is generated for the single include pattern with 're' blacklisted."""
    import re
    include_patterns = ['pytest_nodev.collection:generate_objects_from_modules']
    modules = {'pytest_nodev.collection': collect, 're': re}
    generated = collect.generate_objects_from_modules(
        modules, include_patterns, module_blacklist_pattern='re')
    matched = list(generated)
    assert len(matched) == 1
项目:reflectrpc    作者:aheck    | 项目源码 | 文件源码
def rpc_call(self, method, *params):
        """
        Execute a RPC function on the server and return its result

        The request is built with build_rpc_call(), sent as JSON via
        rpc_call_raw() and the JSON reply is decoded.

        Args:
            method (str): The name of the RPC method to call on the server
            params: The positional parameters to pass to the RPC method

        Returns:
            JSON type: The 'result' value of the server reply

        Raises:
            RpcError: If the reply contains a non-empty 'error' entry
        """
        request = self.build_rpc_call(method, *params)
        raw_reply = self.rpc_call_raw(json.dumps(request))
        reply = json.loads(raw_reply)

        error = reply['error'] if 'error' in reply else None
        if error:
            raise RpcError(error)

        return reply['result']
项目:reflectrpc    作者:aheck    | 项目源码 | 文件源码
def rpc_notify(self, method, *params):
        """
        Call a RPC function on the server but tell it to send no response

        The request is built with build_rpc_notify(), so it carries no
        request ID and does not consume one of the client's IDs.

        Args:
            method (str): The name of the RPC method to call on the server
            params: The positional parameters to pass to the RPC method
        """

        json_data = json.dumps(self.build_rpc_notify(method, *params))
        self.rpc_call_raw(json_data, True)
项目:reflectrpc    作者:aheck    | 项目源码 | 文件源码
def __init__(self):
        """
        Set up an empty service description

        Initializes the (empty) function and custom type registries, the
        service description, the built-in '__describe_*' functions, and
        the tables translating between JSON and Python type names.
        """
        # registered functions and custom types, as list and by name
        self.functions = []
        self.functions_dict = {}
        self.custom_types = []
        self.custom_types_dict = {}

        self.description = dict([
            ('name', ''),
            ('description', ''),
            ('version', ''),
            ('custom_fields', {}),
        ])

        # introspection functions that are always available
        self.builtins = {}
        for builtin_name, handler in (
                ('__describe_service', self.describe_service),
                ('__describe_functions', self.describe_functions),
                ('__describe_custom_types', self.describe_custom_types)):
            self.builtins[builtin_name] = handler

        self.named_hash_validation = True

        # JSON type name -> Python type name
        self.json2py = {
            'bool': 'bool',
            'int': 'int',
            'float': 'float',
            'string': 'str',
            'array': 'list',
            'hash': 'dict',
            'base64': 'str',
        }

        # Python type name -> JSON type name
        self.py2json = {
            'bool': 'bool',
            'int': 'int',
            'float': 'float',
            'str': 'string',
            'list': 'array',
            'dict': 'hash',
        }
项目:reflectrpc    作者:aheck    | 项目源码 | 文件源码
def describe_functions(self):
        """
        Describe the functions exposed by this RPC service

        Returns:
            list: The to_dict() description of every entry of
                  self.functions, in registration order
        """
        descriptions = []
        for registered in self.functions:
            descriptions.append(registered.to_dict())
        return descriptions
项目:reflectrpc    作者:aheck    | 项目源码 | 文件源码
def describe_custom_types(self):
        """
        Describe the custom types exposed by this RPC service

        Returns:
            list: The to_dict() description of every entry of
                  self.custom_types, in registration order
        """
        descriptions = []
        for registered in self.custom_types:
            descriptions.append(registered.to_dict())
        return descriptions
项目:reflectrpc    作者:aheck    | 项目源码 | 文件源码
def check_request_types(self, func, params):
        """
        Validate the parameters of a JSON-RPC call against the declaration

        Every passed value is checked with check_param_type() against the
        parameter declared at the same position.

        Args:
            func (RpcFunction): Description of the called function
            params (list): Parameters passed in the request

        Raises:
            JsonRpcParamError: If the number of parameters does not match
            JsonRpcTypeError: If an enum or named hash parameter is invalid
            JsonRpcParamTypeError: If a primitive parameter type is invalid
        """
        if len(params) != len(func.params):
            raise JsonRpcParamError(func.name, len(func.params), len(params))

        for position, declared in enumerate(func.params):
            value = params[position]
            try:
                self.check_param_type(func, declared, value, '')
            except InvalidEnumValueError as e:
                message = "%s: '%s' is not a valid value for parameter '%s' of enum type '%s'"
                raise JsonRpcTypeError(message % (func.name, e.value, e.name, e.expected_type))
            except InvalidEnumTypeError as e:
                message = "%s: Enum parameter '%s' requires a value of type 'int' or 'string' but type was '%s'"
                raise JsonRpcTypeError(message % (func.name, e.name, e.real_type))
            except InvalidNamedHashError as e:
                message = "%s: Named hash parameter '%s' of type '%s' requires a hash value but got '%s'"
                raise JsonRpcTypeError(message % (func.name, e.name, e.expected_type, e.real_type))
            except InvalidPrimitiveTypeError as e:
                raise JsonRpcParamTypeError(func.name, e.name, e.expected_type, e.real_type)
项目:NuGridPy    作者:NuGrid    | 项目源码 | 文件源码
def __init__(self, func='linear', coef=(1, 1)):
        '''
        Set up the fit function, the residual and the initial guess.

        Parameters
        ----------
        func : string or function, optional
            If func is a string, then it must be one of the two strings
            'linear' or 'powerlaw'. If func is not a string it must be
            a custom function.  The default is "linear".
        coef : list, optional
            A guess for the list of coefficients.  For 'powerlaw' coef
            must have three entries.  For 'linear' coef must have two
            entries.  If you provide your own function, then provide as
            many coef entries as your function needs.  The default is
            (1, 1).

        Notes
        -----
        Sets self.func, self.residual, self.coef and self.__name__
        (the name of the fit function).

        '''
        # func and len(coef) are compared by value (==, !=): identity
        # tests against literals only succeed by accident of interning.
        if func == 'linear':
            print("Information: 'linear' fit needs coef list with 2 entries")
            print(" -> will use default: coef = "+str(coef))
            if len(coef) != 2:
                print("Warning: you want a linear fit but you have not")
                print("         provided a guess for coef with the")
                print("         right length (2).")
                print(" -> I will continue and assume coef=(1,1)")
                coef = (1,1)
            def ff(coef,x):
                return coef[0]*x + coef[1]
            self.__name__ = func
        elif func == 'powerlaw':
            print("Information: 'powerlaw' fit needs coef list with 3 entries")
            print(" -> will use default: coef = "+str(coef))
            if len(coef) != 3:
                print("Warning: you want a power law fit but you have")
                print("         not provided a guess for coef with the")
                print("         right length (3).")
                print(" -> I will continue and assume coef=(1,1,1)")
                coef = (1,1,1)
            def ff(coef,x):
                return coef[0]*x**coef[1] + coef[2]
            self.__name__ = func
        else:
            print("Information: You provide a fit function yourself. I trust")
            print("             you have provided a matching guess for the ")
            print("             coefficient list!")
            ff = func
            self.__name__ = func.__name__

        # The coefficients are determined by minimizing a residual
        # function:
        # func(params) = ydata - f(xdata, params)
        def fres(coef,y,ff,x):
            return y-ff(coef,x)

        self.residual = fres
        self.coef     = coef
        self.func     = ff
项目:NuGridPy    作者:NuGrid    | 项目源码 | 文件源码
def _process_abundance_vector(self, a, z, isomers, yps):
        '''
        This private method takes as input one vector definition and
        processes it, including sorting by charge number and
        mass number.

        Entries with z == 0 and isomers == 1 are skipped.  Entries
        with isomers == 1 and z != 0 are collected as isotopes and
        sorted with self.compar and then (stable sort, so taking
        precedence) with self.comparator.  Entries with isomers != 1
        are collected as isomers; an abundance of 0 is replaced by
        1e-99 for them.

        Returns
        -------
        a_iso_to_plot, z_iso_to_plot, abunds, isotope_to_plot,
        el_iso_to_plot : lists
            Mass numbers, charge numbers, abundances, 'El-A' names and
            element names of the sorted isotopes.
        isom : list
            [name, abundance] pairs of the isomers, named
            'El-A-<isomers-1>'.

        '''
        # standard library adapter from cmp-style functions to sort keys
        from functools import cmp_to_key

        tmp=[]
        isom=[]
        for i in range(len(a)):
            if z[i]!=0 and isomers[i]==1: # neither 'NEUT' nor an isomer
                name = self.stable_names[int(z[i])]+'-'+str(int(a[i]))
                tmp.append([name,yps[i],z[i],a[i]])
            elif isomers[i]!=1: # it is an isomer
                name = self.stable_names[int(z[i])]+'-'+str(int(a[i]))+'-'+str(int(isomers[i]-1))
                isom.append([name, 1e-99 if yps[i]==0 else yps[i]])
        tmp.sort(key = cmp_to_key(self.compar))
        tmp.sort(key = cmp_to_key(self.comparator))

        isotope_to_plot = [entry[0] for entry in tmp]
        abunds = [entry[1] for entry in tmp]
        z_iso_to_plot = [int(entry[2]) for entry in tmp]
        a_iso_to_plot = [int(entry[3]) for entry in tmp]
        el_iso_to_plot = [self.stable_names[int(entry[2])] for entry in tmp]

        return a_iso_to_plot,z_iso_to_plot,abunds,isotope_to_plot,el_iso_to_plot,isom
项目:NuGridPy    作者:NuGrid    | 项目源码 | 文件源码
def trajectory_SgConst(Sg=0.1, delta_logt_dex=-0.01):
    '''
    Setup trajectories for constant radiation entropy.

    S_gamma/R where the radiation constant R = N_A*k
    (Dave Arnett, Supernova book, p. 212)
    This relates rho and T but the time scale for this
    is independent.

    The trajectory is plotted in figures 3, 5 and 6 and written to
    the file 'trajectory.input'.

    Parameters
    ----------
    Sg : float
        S_gamma/R, values between 0.1 and 10. reflect conditions in
        massive stars.  The default is 0.1.
    delta_logt_dex : float
        Sets interval between time steps in dex of logtimerev.  The
        default is -0.01.

    '''

    # reverse logarithmic time
    logtimerev=np.arange(5.,-6.,delta_logt_dex)
    logrho=np.linspace(0,8.5,len(logtimerev))
    logT = (old_div(1.,3.))*(logrho + 21.9161 + np.log10(Sg))

    #rho_6=10**logrho/(0.1213*1.e6)
    #T9=rho_6**(1./3.)
    #logT_T3=np.log10(T9*1.e9)

    # The labels are raw strings: backslashes of the TeX commands are
    # not valid Python escape sequences.
    pl.close(3);pl.figure(3);pl.plot(logrho,logT,label=r'$S/\mathrm{N_Ak}='+str(Sg)+'$')
    pl.legend(loc=2);pl.xlabel(r'$\log \rho$'); pl.ylabel(r'$\log T$')
    pl.close(5);pl.figure(5);pl.plot(logtimerev, logrho)
    pl.xlabel(r'$\log (t_\mathrm{final}-t)$'); pl.ylabel(r'$\log \rho$')
    pl.xlim(8,-6)

    pl.close(6);pl.figure(6);pl.plot(logtimerev)
    pl.ylabel(r'$\log (t_\mathrm{final}-t)$'); pl.xlabel('cycle')

    # [t] logtimerev yrs
    # [rho] cgs
    # [T]   K

    T9=old_div(10**logT,1.e9)
    data=[logtimerev,T9,logrho]
    att.writeTraj(filename='trajectory.input', data=data, ageunit=2, tunit=1, rhounit=1, idNum=1)

    # data: A list of 1D data vectors with time, T and rho
    # ageunit: If 1 ageunit = SEC, If 0 ageunit = YRS. If 2 agunit = logtimerev in yrs. Default is 0
    # logtimerev is log of time until end
项目:pytest-nodev    作者:nodev-io    | 项目源码 | 文件源码
def make_candidate_index(config):
    """Return the candidate index of config, building and caching it on first use.

    The index is stored as config._candidate_index: the sorted items of
    the collected object index transposed into two sequences, or
    [(), ()] when nothing was collected.

    Raises ValueError if --candidates-from-all is used while the
    environment variable PYTEST_NODEV_MODE is not 'FEARLESS'.
    """
    from_all = config.getoption('candidates_from_all')
    if from_all and os.environ.get('PYTEST_NODEV_MODE') != 'FEARLESS':
        raise ValueError("Use of --candidates-from-all may be very dangerous, see the docs.")

    if hasattr(config, '_candidate_index'):
        return config._candidate_index

    # take over collect logging
    collect_logger = collect.logger
    collect_logger.propagate = False
    collect_logger.setLevel(logging.DEBUG)  # FIXME: loglevel should be configurable
    collect_logger.addHandler(utils.EmitHandler(config._warn))

    # delegate interrupting hanging tests to pytest-timeout
    os.environ.setdefault('PYTEST_TIMEOUT', '1')

    # build the object index
    distributions = collections.OrderedDict()
    if config.getoption('candidates_from_stdlib') or from_all:
        distributions.update(collect.collect_stdlib_distributions())
    if from_all:
        distributions.update(collect.collect_installed_distributions())
    specs = config.getoption('candidates_from_specs')
    distributions.update(collect.collect_distributions(specs))
    extra_modules = config.getoption('candidates_from_modules')
    if extra_modules:
        distributions['unknown distribution'] = extra_modules

    top_level_modules = collect.import_distributions(distributions.items())

    includes = config.getoption('candidates_includes')
    if not includes:
        if from_all:
            includes = ['.']
        else:
            includes = sorted(top_level_modules)
    excludes = config.getoption('candidates_excludes')
    predicate = config.getoption('candidates_predicate')

    # NOTE: 'copy' is needed here because indexing may unexpectedly trigger a module load
    modules = sys.modules.copy()
    generated = collect.generate_objects_from_modules(modules, includes, excludes, predicate)
    object_index = dict(generated)

    # store index
    transposed = list(zip(*sorted(object_index.items())))
    config._candidate_index = transposed or [(), ()]
    return config._candidate_index
项目:reflectrpc    作者:aheck    | 项目源码 | 文件源码
def do_help(self, line):
        """
        Print the command overview or the detailed help of one command

        Args:
            line (str): Name of the command to describe; if empty the
                        overview of all commands is printed
        """
        if not line:
            overview = (
                "list   - List all RPC functions advertised by the server",
                "doc    - Show the documentation of a RPC function",
                "type   - Show the documentation of a custom RPC type",
                "types  - List all custom RPC types advertised by the server",
                "exec   - Execute an RPC call",
                "notify - Execute an RPC call but tell the server to send no response",
                "raw    - Directly send a raw JSON-RPC message to the server",
                "quit   - Quit this program",
                "help   - Print this message. 'help [command]' prints a",
                "         detailed help message for a command",
            )
            for text in overview:
                print(text)
            return

        # detailed help per command, one tuple entry per printed line
        details = {
            'list': ("List all RPC functions advertised by the server",),
            'doc': ("Show the documentation of an RPC function",
                    "Example:",
                    "    doc echo"),
            'type': ("Shows the documentation of a custom RPC type",
                     "Example:",
                     "    type PhoneType"),
            'types': ("List all custom RPC types advertised by the server",),
            'exec': ("Execute an RPC call",
                     "Examples:",
                     "    exec echo \"Hello RPC server\"",
                     "    exec add 4 8"),
            'notify': ("Execute an RPC call but tell the server to send no response",
                       "Example:",
                       "    notify rpc_function"),
            'raw': ("Directly send a raw JSON-RPC message to the server",
                    "Example:",
                    '    raw {"method": "echo", "params": ["Hello Server"], "id": 1}'),
            'quit': ("Quit this program",),
            # 'help help' deliberately prints nothing
            'help': (),
        }

        if line in details:
            for text in details[line]:
                print(text)
        else:
            print("No help available for unknown command:", line)
项目:reflectrpc    作者:aheck    | 项目源码 | 文件源码
def check_param_type(self, func, param, value, path):
        """
        Check the type of a single parameter

        Custom types (declared type starting with an upper case letter)
        are validated as enum or named hash, typed arrays ('array<...>')
        are checked element by element, and all other declared types are
        compared with the Python type of the value via self.json2py.

        Args:
            func (RpcFunction): Description of the called function
            param (dict): Declaration of the parameter with the keys
                          'name' and 'type'
            value (any): Actual value that was passed by the caller
            path (str): Path to the variable in nested structures; the
                        parameter name is used if it is empty

        Raises:
            InvalidEnumValueError: If the value is not valid for the enum
            InvalidEnumTypeError: If validating an enum value raised ValueError
            InvalidNamedHashError: If a named hash did not get a hash value
            InvalidPrimitiveTypeError: If a primitive or array type does not match
        """
        real_type = type(value).__name__
        name = param['name']
        declared_type = param['type']

        if not path:
            path = name

        # workaround for Python 2.7
        if real_type == 'unicode':
            real_type = 'str'

        # JSON name of the actual type, used in error reports. Python
        # types without a JSON counterpart (e.g. NoneType for null) keep
        # their Python name, so that they are reported as a type error
        # instead of failing with a KeyError.
        real_json_type = self.py2json.get(real_type, real_type)

        # custom type?
        if declared_type[0].isupper():
            typeobj = self.custom_types_dict[declared_type]
            custom_kind = type(typeobj).__name__

            if custom_kind == 'JsonEnumType':
                try:
                    if not typeobj.validate(value):
                        raise InvalidEnumValueError(path, declared_type, str(value))
                except ValueError:
                    raise InvalidEnumTypeError(path, real_json_type)
            elif custom_kind == 'JsonHashType':
                if real_json_type != 'hash':
                    raise InvalidNamedHashError(path, declared_type, real_json_type)

                # validate named hashes if named hash validation is enabled
                if self.named_hash_validation and self.__is_named_hash_type(declared_type):
                    self.check_named_hash(func, param, value, path)
        # typed array?
        elif declared_type.startswith('array<'):
            if real_type != 'list':
                raise InvalidPrimitiveTypeError(path, declared_type, real_json_type)

            # element type between 'array<' and the closing '>'
            array_type = declared_type[len('array<'):-1]
            for i, v in enumerate(value):
                p = {'name': '[%d]' % (i), 'type': array_type}
                self.check_param_type(func, p, v, path + p['name'])
        # primitive type?
        elif real_type != self.json2py[declared_type]:
            raise InvalidPrimitiveTypeError(path, declared_type, real_json_type)
项目:reflectrpc    作者:aheck    | 项目源码 | 文件源码
def test_rpcsh_expect_unix_socket(self):
        """
        Run an interactive rpcsh session against the UNIX socket example server

        Starts '../examples/serverunixsocket.py' on '/tmp/reflectrpc.sock',
        spawns rpcsh through pexpect and checks, in order, the startup banner,
        the output of the 'list' command and the replies to several 'exec'
        calls. The spawned shell and the server are shut down afterwards.
        """
        # All patterns are regular expressions handed to pexpect. Raw strings
        # are used wherever parentheses are escaped so that '\(' is not an
        # invalid string escape; the regex escapes '\r\n' still match CR LF.
        banner = (
            'ReflectRPC Shell\r\n',
            '================\r\n\r\n',
            "Type 'help' for available commands\r\n\r\n",
            'RPC server: unix:///tmp/reflectrpc.sock\r\n\r\n',
            'Self-description of the Service:\r\n',
            '================================\r\n',
            r'Example RPC Service \(1.0\)\r\n',
            'This is an example service for ReflectRPC\r\n',
            r'\(rpc\) ',
        )

        # Function signatures printed by the 'list' command, in order
        listing = (
            r'echo\(message\)\r\n',
            r'add\(a, b\)\r\n',
            r'sub\(a, b\)\r\n',
            r'mul\(a, b\)\r\n',
            r'div\(a, b\)\r\n',
            r'enum_echo\(phone_type\)\r\n',
            r'hash_echo\(address\)\r\n',
            r'notify\(value\)\r\n',
            r'is_authenticated\(\)\r\n',
            r'get_username\(\)\r\n',
            r'echo_ints\(ints\)\r\n',
        )

        # (command sent to the shell, expected reply)
        calls = (
            ('exec echo "Hello Server"', 'Server replied: "Hello Server"\r\n'),
            ('exec add 5 6', 'Server replied: 11\r\n'),
            ('exec is_authenticated', 'Server replied: false\r\n'),
            ('exec get_username', 'Server replied: null\r\n'),
        )

        server = None
        child = None
        try:
            server = ServerRunner('../examples/serverunixsocket.py',
                    '/tmp/reflectrpc.sock')
            server.run()

            child = pexpect.spawn('%s ../rpcsh unix:///tmp/reflectrpc.sock' % sys.executable)

            for pattern in banner:
                child.expect(pattern)

            child.sendline('list')
            for pattern in listing:
                child.expect(pattern)

            for command, reply in calls:
                child.sendline(command)
                child.expect(reply)
        finally:
            # Clean up only what was actually created, so that a failure
            # during setup is not masked by a NameError raised here, and make
            # sure the server is stopped even if closing the shell fails.
            try:
                if child is not None:
                    child.close(True)
            finally:
                if server is not None:
                    server.stop()
项目:reflectrpc    作者:aheck    | 项目源码 | 文件源码
def test_rpcsh_expect_http(self):
        """
        Run an interactive rpcsh session against the HTTP example server

        Starts '../examples/serverhttp.py' on port 5500, spawns rpcsh with
        '--http' through pexpect and checks, in order, the startup banner,
        the output of the 'list' command and the replies to several 'exec'
        calls. The spawned shell and the server are shut down afterwards.
        """
        # All patterns are regular expressions handed to pexpect. Raw strings
        # are used wherever parentheses are escaped so that '\(' is not an
        # invalid string escape; the regex escapes '\r\n' still match CR LF.
        banner = (
            'ReflectRPC Shell\r\n',
            '================\r\n\r\n',
            "Type 'help' for available commands\r\n\r\n",
            'RPC server: localhost:5500\r\n\r\n',
            'Self-description of the Service:\r\n',
            '================================\r\n',
            r'Example RPC Service \(1.0\)\r\n',
            'This is an example service for ReflectRPC\r\n',
            r'\(rpc\) ',
        )

        # Function signatures printed by the 'list' command, in order
        listing = (
            r'echo\(message\)\r\n',
            r'add\(a, b\)\r\n',
            r'sub\(a, b\)\r\n',
            r'mul\(a, b\)\r\n',
            r'div\(a, b\)\r\n',
            r'enum_echo\(phone_type\)\r\n',
            r'hash_echo\(address\)\r\n',
            r'notify\(value\)\r\n',
            r'is_authenticated\(\)\r\n',
            r'get_username\(\)\r\n',
            r'echo_ints\(ints\)\r\n',
        )

        # (command sent to the shell, expected reply)
        calls = (
            ('exec echo "Hello Server"', 'Server replied: "Hello Server"\r\n'),
            ('exec add 5 6', 'Server replied: 11\r\n'),
            ('exec is_authenticated', 'Server replied: false\r\n'),
            ('exec get_username', 'Server replied: null\r\n'),
        )

        server = None
        child = None
        try:
            server = ServerRunner('../examples/serverhttp.py', 5500)
            server.run()

            child = pexpect.spawn('%s ../rpcsh localhost 5500 --http' % sys.executable)

            for pattern in banner:
                child.expect(pattern)

            child.sendline('list')
            for pattern in listing:
                child.expect(pattern)

            for command, reply in calls:
                child.sendline(command)
                child.expect(reply)
        finally:
            # Clean up only what was actually created, so that a failure
            # during setup is not masked by a NameError raised here, and make
            # sure the server is stopped even if closing the shell fails.
            try:
                if child is not None:
                    child.close(True)
            finally:
                if server is not None:
                    server.stop()
项目:reflectrpc    作者:aheck    | 项目源码 | 文件源码
def test_rpcsh_expect_http_basic_auth(self):
        """
        Run an interactive rpcsh session using HTTP basic authentication

        Starts '../examples/serverhttp_basic_auth.py' on port 5500, spawns
        rpcsh with '--http --http-basic-user testuser' through pexpect,
        answers the password prompt and then checks, in order, the startup
        banner, the output of the 'list' command and the replies to several
        'exec' calls (the session is expected to be authenticated as
        "testuser"). The spawned shell and the server are shut down afterwards.
        """
        # All patterns are regular expressions handed to pexpect. Raw strings
        # are used wherever parentheses are escaped so that '\(' is not an
        # invalid string escape; the regex escapes '\r\n' still match CR LF.
        banner = (
            'ReflectRPC Shell\r\n',
            '================\r\n\r\n',
            "Type 'help' for available commands\r\n\r\n",
            'RPC server: localhost:5500\r\n\r\n',
            'Self-description of the Service:\r\n',
            '================================\r\n',
            r'Example RPC Service \(1.0\)\r\n',
            'This is an example service for ReflectRPC\r\n',
            r'\(rpc\) ',
        )

        # Function signatures printed by the 'list' command, in order
        listing = (
            r'echo\(message\)\r\n',
            r'add\(a, b\)\r\n',
            r'sub\(a, b\)\r\n',
            r'mul\(a, b\)\r\n',
            r'div\(a, b\)\r\n',
            r'enum_echo\(phone_type\)\r\n',
            r'hash_echo\(address\)\r\n',
            r'notify\(value\)\r\n',
            r'is_authenticated\(\)\r\n',
            r'get_username\(\)\r\n',
            r'echo_ints\(ints\)\r\n',
        )

        # (command sent to the shell, expected reply)
        calls = (
            ('exec echo "Hello Server"', 'Server replied: "Hello Server"\r\n'),
            ('exec add 5 6', 'Server replied: 11\r\n'),
            ('exec is_authenticated', 'Server replied: true\r\n'),
            ('exec get_username', 'Server replied: "testuser"\r\n'),
        )

        server = None
        child = None
        try:
            server = ServerRunner('../examples/serverhttp_basic_auth.py', 5500)
            server.run()

            child = pexpect.spawn('%s ../rpcsh localhost 5500 --http --http-basic-user testuser' % sys.executable)

            # rpcsh asks for the password before printing its banner
            child.expect('Password: ')
            child.sendline('123456')

            for pattern in banner:
                child.expect(pattern)

            child.sendline('list')
            for pattern in listing:
                child.expect(pattern)

            for command, reply in calls:
                child.sendline(command)
                child.expect(reply)
        finally:
            # Clean up only what was actually created, so that a failure
            # during setup is not masked by a NameError raised here, and make
            # sure the server is stopped even if closing the shell fails.
            try:
                if child is not None:
                    child.close(True)
            finally:
                if server is not None:
                    server.stop()
项目:reflectrpc    作者:aheck    | 项目源码 | 文件源码
def test_rpcsh_expect_tls(self):
        """
        Run an interactive rpcsh session against the TLS example server

        Starts '../examples/servertls.py' on port 5500, spawns rpcsh with
        '--tls' through pexpect and checks, in order, the startup banner,
        the output of the 'list' command and the replies to several 'exec'
        calls. The spawned shell and the server are shut down afterwards.
        """
        # All patterns are regular expressions handed to pexpect. Raw strings
        # are used wherever parentheses are escaped so that '\(' is not an
        # invalid string escape; the regex escapes '\r\n' still match CR LF.
        banner = (
            'ReflectRPC Shell\r\n',
            '================\r\n\r\n',
            "Type 'help' for available commands\r\n\r\n",
            'RPC server: localhost:5500\r\n\r\n',
            'Self-description of the Service:\r\n',
            '================================\r\n',
            r'Example RPC Service \(1.0\)\r\n',
            'This is an example service for ReflectRPC\r\n',
            r'\(rpc\) ',
        )

        # Function signatures printed by the 'list' command, in order
        listing = (
            r'echo\(message\)\r\n',
            r'add\(a, b\)\r\n',
            r'sub\(a, b\)\r\n',
            r'mul\(a, b\)\r\n',
            r'div\(a, b\)\r\n',
            r'enum_echo\(phone_type\)\r\n',
            r'hash_echo\(address\)\r\n',
            r'notify\(value\)\r\n',
            r'is_authenticated\(\)\r\n',
            r'get_username\(\)\r\n',
            r'echo_ints\(ints\)\r\n',
        )

        # (command sent to the shell, expected reply)
        calls = (
            ('exec echo "Hello Server"', 'Server replied: "Hello Server"\r\n'),
            ('exec add 5 6', 'Server replied: 11\r\n'),
            ('exec is_authenticated', 'Server replied: false\r\n'),
            ('exec get_username', 'Server replied: null\r\n'),
        )

        server = None
        child = None
        try:
            server = ServerRunner('../examples/servertls.py', 5500)
            server.run()

            child = pexpect.spawn('%s ../rpcsh localhost 5500 --tls' % sys.executable)

            for pattern in banner:
                child.expect(pattern)

            child.sendline('list')
            for pattern in listing:
                child.expect(pattern)

            for command, reply in calls:
                child.sendline(command)
                child.expect(reply)
        finally:
            # Clean up only what was actually created, so that a failure
            # during setup is not masked by a NameError raised here, and make
            # sure the server is stopped even if closing the shell fails.
            try:
                if child is not None:
                    child.close(True)
            finally:
                if server is not None:
                    server.stop()
项目:reflectrpc    作者:aheck    | 项目源码 | 文件源码
def test_rpcsh_expect_tls_client_auth(self):
        """
        Run an interactive rpcsh session using TLS client authentication

        Starts '../examples/servertls_clientauth.py' on port 5500, spawns
        rpcsh with '--tls' plus the example CA, client key and client
        certificate through pexpect and checks, in order, the startup banner,
        the output of the 'list' command and the replies to several 'exec'
        calls (the session is expected to be authenticated as
        "example-username"). The spawned shell and the server are shut down
        afterwards.
        """
        # All patterns are regular expressions handed to pexpect. Raw strings
        # are used wherever parentheses are escaped so that '\(' is not an
        # invalid string escape; the regex escapes '\r\n' still match CR LF.
        banner = (
            'ReflectRPC Shell\r\n',
            '================\r\n\r\n',
            "Type 'help' for available commands\r\n\r\n",
            'RPC server: localhost:5500\r\n\r\n',
            'Self-description of the Service:\r\n',
            '================================\r\n',
            r'Example RPC Service \(1.0\)\r\n',
            'This is an example service for ReflectRPC\r\n',
            r'\(rpc\) ',
        )

        # Function signatures printed by the 'list' command, in order
        listing = (
            r'echo\(message\)\r\n',
            r'add\(a, b\)\r\n',
            r'sub\(a, b\)\r\n',
            r'mul\(a, b\)\r\n',
            r'div\(a, b\)\r\n',
            r'enum_echo\(phone_type\)\r\n',
            r'hash_echo\(address\)\r\n',
            r'notify\(value\)\r\n',
            r'is_authenticated\(\)\r\n',
            r'get_username\(\)\r\n',
            r'echo_ints\(ints\)\r\n',
        )

        # (command sent to the shell, expected reply)
        calls = (
            ('exec echo "Hello Server"', 'Server replied: "Hello Server"\r\n'),
            ('exec add 5 6', 'Server replied: 11\r\n'),
            ('exec is_authenticated', 'Server replied: true\r\n'),
            ('exec get_username', 'Server replied: "example-username"\r\n'),
        )

        server = None
        child = None
        try:
            server = ServerRunner('../examples/servertls_clientauth.py', 5500)
            server.run()

            child = pexpect.spawn('%s ../rpcsh localhost 5500 --tls --ca ../examples/certs/rootCA.crt --key ../examples/certs/client.key --cert ../examples/certs/client.crt' % sys.executable)

            for pattern in banner:
                child.expect(pattern)

            child.sendline('list')
            for pattern in listing:
                child.expect(pattern)

            for command, reply in calls:
                child.sendline(command)
                child.expect(reply)
        finally:
            # Clean up only what was actually created, so that a failure
            # during setup is not masked by a NameError raised here, and make
            # sure the server is stopped even if closing the shell fails.
            try:
                if child is not None:
                    child.close(True)
            finally:
                if server is not None:
                    server.stop()