Python inspect 模块,stack() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用inspect.stack()

项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def __ImportFrom(module_xname, xname):
    """
    Import a specified name from a python module into the global namespace. This can be accomplished
    programatically and inside a method or class.

    @param module_xname : name of the module to load x.y.z
    @param xname :  symbol to import from loaded module
    @return method/class : imported item from loaded module
    """
    try:
        module = __import__(module_xname,globals(),locals(), [xname])
    except ImportError:
        return None

    try:
      module = getattr(module,xname)
      if xname:
        for x in range( len(inspect.stack()) ):
            inspect.currentframe(x).f_globals[xname]=module
    except AttributeError as e:
        module=None

    return module
项目:pythonUnifiAPI    作者:delian    | 项目源码 | 文件源码
def stat_daily_site(self, start=None, end=None):
        """
        Extracting statistics per site from start to end (unixtimestamps)
        :param start:
        :param end:
        :return:
        """
        unixend = int(time.time()) if end is None else end
        unixstart = (unixend - 52 * 7 * 24 * 3600) if start is None else start
        content = self.sitecmdjson('/stat/report/daily.site', {
            'attrs': ['bytes', 'wan-tx_bytes', 'wan-rx_bytes', 'wlan_bytes', 'num_sta', 'lan-num_sta', 'wlan-num_sta',
                      'time'],
            'start': unixstart,
            'end': unixend
        })
        return self.response(content, inspect.stack()[0].function, 'Daily Statistics for site')
项目:pythonUnifiAPI    作者:delian    | 项目源码 | 文件源码
def stat_hourly_ap(self, start=None, end=None):
        """
        Extracting statistics per site from start to end (unixtimestamps)
        :param start:
        :param end:
        :return:
        """
        unixend = int(time.time()) if end is None else end
        unixstart = (unixend - 7 * 24 * 3600) if start is None else start

        content = self.sitecmdjson('/stat/report/hourly.ap', {
            'attrs': ['bytes', 'num_sta', 'time'],
            'start': unixstart,
            'end': unixend
        })
        return self.response(content, inspect.stack()[0].function, 'Hourly Statistics for AP')
项目:pythonUnifiAPI    作者:delian    | 项目源码 | 文件源码
def set_ap_radiosettings(self, ap_id, radio='ng', channel=1, ht='20', tx_power_mode=0, tx_power=0):
        """
        Set AP settings
        :param ap_id:
        :param radio:
        :param channel:
        :param ht:
        :param tx_power_mode:
        :param tx_power:
        :return:
        """
        content = self.sitecmdjson('/upd/device/' + urllib.parse.quote(ap_id), {
            'radio': radio,
            'channel': channel,
            'ht': ht,
            'tx_power_mode': tx_power_mode,
            'tx_power': tx_power
        })
        return self.response(content, inspect.stack()[0].function, 'AP Radio Settings')
项目:pythonUnifiAPI    作者:delian    | 项目源码 | 文件源码
def set_ap_network(self, ap_id, type="dhcp", ip="192.168.1.6", netmask="255.255.255.0", gateway="192.168.1.1", dns1="8.8.8.8", dns2="8.8.4.4"):
        """
        Configure network
        :param ap_id:
        :param type:
        :param ip:
        :param netmask:
        :param gateway:
        :param dns1:
        :param dns2:
        :return:
        """
        content = self.sitecmdjson('/rest/device/' + str(ap_id), {
            "config_network": [
                {
                    "type": type,
                    "ip": ip,
                    "netmask": netmask,
                    "gateway": gateway,
                    "dns1": dns1,
                    "dns2": dns2
                }
            ]
        }, method='PUT')
        return self.response(content, inspect.stack()[0].function, 'AP Network Config')
项目:pythonUnifiAPI    作者:delian    | 项目源码 | 文件源码
def add_hotspot2(self, name, network_access_internet=True, network_type=2, venue_group=2, venue_type=0):
        """
        Add HotSpot 2.0, simple settings
        :param name:
        :param network_access_internet:
        :param network_type:
        :param venue_group:
        :param venue_type:
        :return:
        """
        content = self.sitecmdjson('/rest/hotspot2conf', {
            "name": name,
            "network_access_internet": network_access_internet,
            "network_type": network_type,
            "venue_group": venue_group,
            "venue_type": venue_type
        })
        return self.response(content, inspect.stack()[0].function, 'Add Hotspot 2.0')
项目:pythonUnifiAPI    作者:delian    | 项目源码 | 文件源码
def set_hotspot2(self, hs_id, name=None, network_access_internet=None, network_type=None, venue_group=None, venue_type=None):
        """
        Modify Hotspot 2.0
        :param hs_id:
        :param name:
        :param network_access_internet:
        :param network_type:
        :param venue_group:
        :param venue_type:
        :return:
        """
        content = self.sitecmdjson('/rest/hotspot2conf/'+str(hs_id), {
            "_id": hs_id,
            "name": name,
            "network_access_internet": network_access_internet,
            "network_type": network_type,
            "venue_group": venue_group,
            "venue_type": venue_type
        }, method="PUT")
        return self.response(content, inspect.stack()[0].function, 'Modify Hotspot 2.0 sites')
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def log_assert(bool_, message="", logger=None, logger_name="", verbose=False):
    """Use this as a replacement for assert if you want the failing of the
    assert statement to be logged."""
    if logger is None:
        logger = logging.getLogger(logger_name)
    try:
        assert bool_, message
    except AssertionError:
        # construct an exception message from the code of the calling frame
        last_stackframe = inspect.stack()[-2]
        source_file, line_no, func = last_stackframe[1:4]
        source = "Traceback (most recent call last):\n" + \
            '  File "%s", line %s, in %s\n    ' % (source_file, line_no, func)
        if verbose:
            # include more lines than that where the statement was made
            source_code = open(source_file).readlines()
            source += "".join(source_code[line_no - 3:line_no + 1])
        else:
            source += last_stackframe[-2][0].strip()
        logger.debug("%s\n%s" % (message, source))
        raise AssertionError("%s\n%s" % (message, source))
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def psome(obj, count):
    """
    The obj will printed only a number of count times.
    """
    global d
    key = inspect.stack()[1][1]+':'+str(inspect.stack()[1][2])
    d[key] = d.get(key,count) -1

    if d[key]>=0:
        print(obj)







# From test.py
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def parseFile( self, file_or_filename, parseAll=False ):
        """Execute the parse expression on the given file or filename.
           If a filename is specified (instead of a file object),
           the entire file is opened, read, and closed before parsing.
        """
        try:
            file_contents = file_or_filename.read()
        except AttributeError:
            f = open(file_or_filename, "r")
            file_contents = f.read()
            f.close()
        try:
            return self.parseString(file_contents, parseAll)
        except ParseBaseException as exc:
            if ParserElement.verbose_stacktrace:
                raise
            else:
                # catch and re-raise exception from here, clears out pyparsing internal stack trace
                raise exc
项目:pykit    作者:baishancloud    | 项目源码 | 文件源码
def filter(self, record):

        # skip this function
        stack = inspect.stack()[1:]

        for i, (frame, path, ln, func, line, xx) in enumerate(stack):

            if (frame.f_globals.get('__name__') == 'pykit.ututil'
                    and func == 'dd'):

                # this frame is dd(), find the caller
                _, path, ln, func, line, xx = stack[i + 1]

                record._fn = os.path.basename(path)
                record._ln = ln
                record._func = func
                return True

        record._fn = record.filename
        record._ln = record.lineno
        record._func = record.funcName
        return True
项目:pykit    作者:baishancloud    | 项目源码 | 文件源码
def _find_frame_by_self(clz):
    """
    Find the first frame on stack in which there is local variable 'self' of
    type clz.
    """

    frame = inspect.currentframe()

    while frame:
        self = frame.f_locals.get('self')
        if isinstance(self, clz):
            return frame

        frame = frame.f_back

    return None
项目:Tinychat-Bot--Discontinued    作者:Tinychat    | 项目源码 | 文件源码
def patch(self, orig, replace):
        if not hasattr(self, '__patches'):
            self.__patches = []
            self.addCleanup(unpatch, self)

        f = inspect.stack()[1][0]

        parts = orig.split('.')

        v = f.f_globals.copy()
        v.update(f.f_locals)

        orig = v[parts[0]]

        for part in parts[1:-1]:
            orig = getattr(orig, part)

        to_replace = getattr(orig, parts[-1])

        self.__patches.append((orig, parts[-1], to_replace))

        setattr(orig, parts[-1], replace)
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def parseFile( self, file_or_filename, parseAll=False ):
        """Execute the parse expression on the given file or filename.
           If a filename is specified (instead of a file object),
           the entire file is opened, read, and closed before parsing.
        """
        try:
            file_contents = file_or_filename.read()
        except AttributeError:
            f = open(file_or_filename, "r")
            file_contents = f.read()
            f.close()
        try:
            return self.parseString(file_contents, parseAll)
        except ParseBaseException as exc:
            if ParserElement.verbose_stacktrace:
                raise
            else:
                # catch and re-raise exception from here, clears out pyparsing internal stack trace
                raise exc
项目:llk    作者:Tycx2ry    | 项目源码 | 文件源码
def debug(self, message):
        if not self.opts['_debug']:
            return
        frm = inspect.stack()[1]
        mod = inspect.getmodule(frm[0])

        if mod is None:
            modName = "Unknown"
        else:
            modName = mod.__name__

        if self.dbh is None:
            print '[' + modName + '] ' + message
        else:
            self._dblog("DEBUG", message, modName)
        return
项目:miros    作者:aleph2c    | 项目源码 | 文件源码
def goto(self, rTarget, **kwargs):

    if callable(rTarget) is False:
      rTarget = self.registry[rTarget]

    if( "payload" in kwargs ):
      self.tEvt['payload'] = kwargs['payload']

    """Transition to a new state (Cached Version)"""
    caller = inspect.stack()[1][3]  # hsm could be calling its super states
                                    # we aren't tracking this in this, so we can
                                    # just reference the call stack to find who
                                    # discovered the event
    #print "\nCurrent state: ", self.rCurr; print "Source state: ", self.rSource
    self.__spy__(caller, self.tEvt['event'], self.tEvt['payload'])

    # this is how you see the target
    if self.rCurr['toLcaCache'].get(self.tEvt['event'], None) == None:
      self.rCurr['toLcaCache'][self.tEvt['event']] = \
          self.to_lca(self.hsm_dict[rTarget])
    # self.pp.pprint(self.rCurr); print
    self.exit(self.rCurr['toLcaCache'][self.tEvt['event']])
    self.rNext = self.hsm_dict[rTarget]
项目:argparseinator    作者:ellethee    | 项目源码 | 文件源码
def check_class():
    """
    Return the class name for the current frame.
    If the result is ** None ** means that the call is made from a module.
    """
    # get frames
    frames = inspect.stack()
    cls = None
    # should be the third frame
    # 0: this function
    # 1: function/decorator
    # 2: class that contains the function
    if len(frames) > 2:
        frame = frames[2][0]
        if '__module__' in frame.f_code.co_names:
            cls = SillyClass(**frame.f_locals)
            cls.__cls_name__ = frame.f_code.co_name
    return cls
项目:mongodb    作者:autopilotpattern    | 项目源码 | 文件源码
def debug(fn):
    """
    Function/method decorator to trace calls via debug logging.
    Is a pass-thru if we're not at LOG_LEVEL=DEBUG. Normally this
    would have a lot of perf impact but this application doesn't
    have significant throughput.
    """
    @wraps(fn)
    def wrapper(*args, **kwargs):
        try:
            # because we have concurrent processes running we want
            # to tag each stack with an identifier for that process
            arg = "[{}]".format(sys.argv[1])
        except IndexError:
            arg = "[pre_start]"
        name = '{}{}{}'.format(arg, (len(inspect.stack()) * " "), fn.__name__)
        log.debug('%s' % name)
        out = apply(fn, args, kwargs)
        log.debug('%s: %s', name, out)
        return out
    return wrapper
项目:HTTPWookiee    作者:regilero    | 项目源码 | 文件源码
def test_5000_preflight_default_vhost_absolute(self):
        """Check that the default vhost works and can be identified

        """
        self.real_test = "{0}_{1}".format(inspect.stack()[0][3],
                                          self.tested_char_name)
        self.setGravity(self.GRAVITY_MINOR)
        if Register.hasFlag('abs_default_vhost', default=False):
            self.skipTest("Preflight test already done.")
        self.req.set_method('GET')
        target_host = self.config.get('SERVER_HOST')
        self.req.host = self.config.get('SERVER_HOST')
        self.req.location = 'http://{0}{1}'.format(
            target_host, self.req.location)
        self._end_almost_regular_query(regular_expected=True)
        Register.flag('abs_default_vhost', self.status == self.STATUS_ACCEPTED)
        self.assertIn(self.status,
                      [self.STATUS_ACCEPTED],
                      'Bad response status {0}'.format(self.status))
项目:HTTPWookiee    作者:regilero    | 项目源码 | 文件源码
def test_5002_preflight_non_default_vhost(self):
        """Check that the non default vhost works and can be identified

        """
        # FIXME: on preflight: detect bad proxy configuration with
        # empty echo query. Means the 2nd vhost proxy is not working
        # TODO: do this test also on first proxy and test_regular
        self.real_test = "{0}_{1}".format(inspect.stack()[0][3],
                                          self.tested_char_name)
        if not self.config.getboolean('MULTIPLE_HOSTS_TESTS'):
            self.skipTest("Tests with multiple virtualhosts are not activated"
                          " in configuration.")
        if Register.hasFlag('non_default_vhost', default=False):
            self.skipTest("Preflight test already done.")
        self.setGravity(self.GRAVITY_MINOR)
        self.req.set_method('GET')
        self.req.host = self.config.get('SERVER_NON_DEFAULT_HOST')
        self.req.location = self.get_non_default_location(
            with_prefix=self.use_backend_location)
        self._end_almost_regular_query(regular_expected=True)
        Register.flag('non_default_vhost',
                      self.status == self.STATUS_ACCEPTED)
        self.assertIn(self.status,
                      [self.STATUS_ACCEPTED],
                      'Bad response status {0}'.format(self.status))
项目:HTTPWookiee    作者:regilero    | 项目源码 | 文件源码
def test_0005_bad_pipeline(self):
        "Chain 3 queries, second is bad, should stop the response stream."
        self.real_test = "{0}".format(inspect.stack()[0][3])
        self._prepare_pipe_test()
        self.req2.set_location(Tools.NULL,
                               random=True)
        self.req3 = Request(id(self))
        self.req3.set_location(self.config.get('SERVER_DEFAULT_LOCATION'),
                               random=True)
        with Client() as csock:
            csock.send(self.req1)
            csock.send(self.req2)
            csock.send(self.req3)
            responses = csock.read_all()
            outmsg(str(responses))

        self.analysis(responses,
                      expected_number=2,
                      regular_expected=False)

        self.assertTrue((responses.count <= 2))
        if (responses.count > 1):
            self.assertIn(self.status,
                          [self.STATUS_REJECTED, self.STATUS_ERR400],
                          'Bad response status {0}'.format(self.status))
项目:HTTPWookiee    作者:regilero    | 项目源码 | 文件源码
def test_2000_preflight_regular_chunked_get(self):
        """Let's start by a regular GET with chunked body

        """
        self.real_test = "{0}".format(inspect.stack()[0][3])
        self.setGravity(self.GRAVITY_MINOR)
        self.req.set_method('GET')
        self.req.add_header('Transfer-Encoding', 'chunked')
        self.req.add_header('Content-Type',
                            'application/x-www-form-urlencoded')
        self.req.add_chunk('Hello')
        self.req.add_chunk('World')
        self._end_almost_regular_query()
        Register.flag('get_chunk_{0}'.format(self.reverse_proxy_mode),
                      (self.status == self.STATUS_ACCEPTED))
        self.assertIn(self.status,
                      [self.STATUS_ACCEPTED, self.STATUS_ERR405],
                      'Bad response status {0}'.format(self.status))
项目:HTTPWookiee    作者:regilero    | 项目源码 | 文件源码
def test_2001_preflight_regular_chunked_post(self):
        """If get is not good, try POST for chunked queries.

        """
        self.real_test = "{0}".format(inspect.stack()[0][3])
        self.setGravity(self.GRAVITY_MINOR)

        self.req.set_method('POST')
        self.req.add_header('Transfer-Encoding', 'chunked')
        self.req.add_header('Content-Type',
                            'application/x-www-form-urlencoded')
        self.req.add_chunk('Hello')
        self.req.add_chunk('World')
        self._end_almost_regular_query()
        Register.flag('post_chunk_{0}'.format(self.reverse_proxy_mode),
                      (self.status == self.STATUS_ACCEPTED))
        self.assertIn(self.status,
                      [self.STATUS_ACCEPTED, self.STATUS_ERR405],
                      'Bad response status {0}'.format(self.status))
项目:HTTPWookiee    作者:regilero    | 项目源码 | 文件源码
def test_2020_bad_chunked_transfer_encoding(self):
        """chunk not used as last marker in Transfer-Encoding header.

        """
        self.real_test = "{0}".format(inspect.stack()[0][3])
        self.setGravity(self.GRAVITY_MINOR)

        method = self._get_valid_chunk_method()
        self.req.method = method
        # chunk MUST be the last marker
        self.req.add_header('Transfer-Encoding', 'chunked, zorg')
        self.req.add_header('Content-Type',
                            'application/x-www-form-urlencoded')
        self.req.add_chunk('Hello')
        self.req.add_chunk('World')
        self._end_expected_error()
项目:HTTPWookiee    作者:regilero    | 项目源码 | 文件源码
def test_3013_line_prefix(self):
        "Some characters before the query..."
        self.real_test = "{0}_{1}".format(
            inspect.stack()[0][3],
            Tools.show_chars(self.separator))
        self.setGravity(BaseTest.GRAVITY_MINOR)
        self.req.set_first_line_prefix(self.separator)

        # for RP mode:
        self.transmission_map = {
            '{0}GET'.format(
                self.separator): self.STATUS_TRANSMITTED_EXACT,
            ' GET': self.STATUS_TRANSMITTED_CRAP,
        }

        self._add_default_status_map(
            valid=self.valid_prefix,
            always_allow_rejected=self.can_be_rejected
        )

        # local changes
        self.status_map[self.STATUS_TRANSMITTED_EXACT] = self.GRAVITY_MINOR
        self.status_map[self.STATUS_TRANSMITTED] = self.GRAVITY_MINOR

        self._end_1st_line_query()
项目:HTTPWookiee    作者:regilero    | 项目源码 | 文件源码
def test_3014_line_suffix(self):
        "Let's add some garbage after the protocol."
        self.real_test = "{0}_{1}".format(
            inspect.stack()[0][3],
            Tools.show_chars(self.separator))
        self.setGravity(BaseTest.GRAVITY_WARNING)
        self.req.set_first_line_suffix(self.separator)

        # for RP mode:
        self.transmission_map = {
            ' HTTP/1.0{0}\r\n'.format(
                self.separator): self.STATUS_TRANSMITTED_EXACT,
            ' HTTP/1.1{0}\r\n'.format(
                self.separator): self.STATUS_TRANSMITTED_EXACT,
            ' HTTP/1.1 \r\n': self.STATUS_TRANSMITTED_CRAP,
        }

        self._add_default_status_map(
            valid=False)
        self._end_1st_line_query(http09_allowed=False)
项目:HTTPWookiee    作者:regilero    | 项目源码 | 文件源码
def test_3016_line_suffix_with_char_H(self):
        "Nginx, for example, likes the H character"
        self.real_test = "{0}_{1}".format(
            inspect.stack()[0][3],
            Tools.show_chars(self.separator))
        self.setGravity(BaseTest.GRAVITY_WARNING)
        self.req.set_first_line_suffix(self.separator + u'H')

        # for RP mode:
        self.transmission_map = {
            ' HTTP/1.0{0}H\r\n'.format(
                self.separator): self.STATUS_TRANSMITTED_EXACT,
            ' HTTP/1.1{0}H\r\n'.format(
                self.separator): self.STATUS_TRANSMITTED_EXACT,
            'HTTP/1.1{0}H H'.format(
                self.separator): self.STATUS_TRANSMITTED_CRAP,
            'HTTP/1.1 H\r\n': self.STATUS_TRANSMITTED_CRAP,
        }

        self._add_default_status_map(
            valid=False)
        self._end_1st_line_query(http09_allowed=False)
项目:HTTPWookiee    作者:regilero    | 项目源码 | 文件源码
def test_3017_line_suffix_with_double_HTTP11(self):
        "Ending first line with two times the protocol"
        self.real_test = "{0}_{1}".format(
            inspect.stack()[0][3],
            Tools.show_chars(self.separator))
        self.req.set_first_line_suffix(self.separator + u'HTTP/1.1')

        # for RP mode:
        self.transmission_map = {
            'HTTP/1.1{0}HTTP/1.1\r\n'.format(
                self.separator): self.STATUS_TRANSMITTED_EXACT,
            'HTTP/1.0{0}HTTP/1.0\r\n'.format(
                self.separator): self.STATUS_TRANSMITTED_CRAP,
            'HTTP/1.0{0}HTTP/1.1\r\n'.format(
                self.separator): self.STATUS_TRANSMITTED_CRAP,
            'HTTP/1.1{0}HTTP/1.0\r\n'.format(
                self.separator): self.STATUS_TRANSMITTED_CRAP,
        }

        self._add_default_status_map(
            valid=False)
        self._end_1st_line_query(http09_allowed=False)
项目:HTTPWookiee    作者:regilero    | 项目源码 | 文件源码
def test_3020_multiple_cr_line_prefix(self):
        # TODO: this is maybe allowed, but we do not have any LF here...
        self.separator = u'\r\r\r\r\r'
        self.real_test = "{0}_{1}".format(
            inspect.stack()[0][3],
            Tools.show_chars(self.separator))
        self.setGravity(BaseTest.GRAVITY_MINOR)
        self.req.set_first_line_prefix(self.separator)
        # for RP mode:
        self.transmission_map = {
            '{0}GET'.format(
                self.separator): self.STATUS_TRANSMITTED_EXACT,
            ' GET': self.STATUS_TRANSMITTED_CRAP,
        }
        self._add_default_status_map(
            valid=False,
            always_allow_rejected=True)
        self._end_1st_line_query(http09_allowed=False)
项目:HTTPWookiee    作者:regilero    | 项目源码 | 文件源码
def test_3021_crlf_line_prefix(self):
        self.separator = u'\r\n'
        self.real_test = "{0}_{1}".format(
            inspect.stack()[0][3],
            Tools.show_chars(self.separator))
        self.setGravity(BaseTest.GRAVITY_MINOR)
        self.req.set_first_line_prefix(self.separator)
        # for RP mode:
        self.transmission_map = {
            '{0}GET'.format(
                self.separator): self.STATUS_TRANSMITTED_EXACT,
            ' GET': self.STATUS_TRANSMITTED_CRAP,
        }
        self._add_default_status_map(
            valid=True,
            always_allow_rejected=False)  # this is allowed in RFC
        self._end_1st_line_query(http09_allowed=False)
项目:HTTPWookiee    作者:regilero    | 项目源码 | 文件源码
def test_3023_crcrcrlf_line_prefix(self):
        self.separator = u'\r\r\r\r\r\n'
        self.real_test = "{0}_{1}".format(
            inspect.stack()[0][3],
            Tools.show_chars(self.separator))
        self.setGravity(BaseTest.GRAVITY_MINOR)
        self.req.set_first_line_prefix(self.separator)
        # for RP mode:
        self.transmission_map = {
            '{0}GET'.format(
                self.separator): self.STATUS_TRANSMITTED_EXACT,
            ' GET': self.STATUS_TRANSMITTED_CRAP,
        }
        self._add_default_status_map(
            valid=True,
            always_allow_rejected=True)
        self._end_1st_line_query(http09_allowed=False)
项目:HTTPWookiee    作者:regilero    | 项目源码 | 文件源码
def test_3040_http_65536_9(self):
        "Integer overflow attempt on protocol version."
        self.real_test = "{0}".format(inspect.stack()[0][3])
        self.req.set_major_version(65536)
        self.req.set_minor_version(9)
        self.req.add_argument('last', 'marker')
        # for RP mode:
        self.transmission_map = {
            ' HTTP/65536.9\r\n': self.STATUS_TRANSMITTED_EXACT,
            'HTTP/65536.9 H': self.STATUS_TRANSMITTED_CRAP,
            'last=marker\r\n': self.STATUS_TRANSMITTED_CRAP,
        }
        self._add_default_status_map(valid=False)
        # local changes
        self.status_map[self.STATUS_TRANSMITTED] = self.GRAVITY_CRITICAL
        self.status_map[self.STATUS_09OK] = self.GRAVITY_CRITICAL
        self.status_map[self.STATUS_09DOWNGRADE] = self.GRAVITY_CRITICAL
        self._end_1st_line_query(http09_allowed=False)
项目:HTTPWookiee    作者:regilero    | 项目源码 | 文件源码
def test_3041_http_0_9_explicit(self):
        "HTTP/0.9 does not exists. Should be an empty protocol."
        self.real_test = "{0}".format(inspect.stack()[0][3])
        self.req.set_major_version(0)
        self.req.set_minor_version(9)
        self.req.add_argument('last', 'marker')
        # for RP mode:
        self.transmission_map = {
            ' HTTP/0.9\r\n': self.STATUS_TRANSMITTED_EXACT,
            'HTTP/0.9 H': self.STATUS_TRANSMITTED_CRAP,
            'last=marker\r\n': self.STATUS_TRANSMITTED_CRAP,
        }
        self._add_default_status_map(valid=False)
        # local changes
        self.status_map[self.STATUS_TRANSMITTED] = self.GRAVITY_CRITICAL
        self.status_map[self.STATUS_09OK] = self.GRAVITY_CRITICAL
        self.status_map[self.STATUS_09DOWNGRADE] = self.GRAVITY_CRITICAL
        self._end_1st_line_query(http09_allowed=False)
项目:HTTPWookiee    作者:regilero    | 项目源码 | 文件源码
def test_3043_http_minus_1_0(self):
        "HTTP/-1.0 does not exists."
        self.real_test = "{0}".format(inspect.stack()[0][3])
        self.req.set_major_version(-1)
        self.req.set_minor_version(0)
        self.req.add_argument('last', 'marker')
        # for RP mode:
        self.transmission_map = {
            ' HTTP/-1.0\r\n': self.STATUS_TRANSMITTED_EXACT,
            'HTTP/-1.0 H': self.STATUS_TRANSMITTED_CRAP,
            'HTTP/-1.1': self.STATUS_TRANSMITTED_CRAP,
            'last=marker\r\n': self.STATUS_TRANSMITTED_CRAP,
        }
        self._add_default_status_map(valid=False)
        # local changes
        self.status_map[self.STATUS_TRANSMITTED] = self.GRAVITY_CRITICAL
        self.status_map[self.STATUS_09OK] = self.GRAVITY_CRITICAL
        self.status_map[self.STATUS_09DOWNGRADE] = self.GRAVITY_CRITICAL
        self._end_1st_line_query(http09_allowed=False)
项目:HTTPWookiee    作者:regilero    | 项目源码 | 文件源码
def test_3044_http_1_nodigit(self):
        "HTTP/1.x1 does not exists."
        self.real_test = "{0}".format(inspect.stack()[0][3])
        self.req.set_major_version(1)
        self.req.set_minor_version('x1')
        self.req.add_argument('last', 'marker')
        # for RP mode:
        self.transmission_map = {
            ' HTTP/1.x1\r\n': self.STATUS_TRANSMITTED_EXACT,
            'HTTP/1.0 H': self.STATUS_TRANSMITTED_CRAP,
            'HTTP/1.1 H': self.STATUS_TRANSMITTED_CRAP,
            'HTTP/1.x1 H': self.STATUS_TRANSMITTED_CRAP,
            'last=marker\r\n': self.STATUS_TRANSMITTED_CRAP,
        }
        self._add_default_status_map(valid=False)
        # local changes
        self.status_map[self.STATUS_TRANSMITTED] = self.GRAVITY_CRITICAL
        self.status_map[self.STATUS_09OK] = self.GRAVITY_CRITICAL
        self.status_map[self.STATUS_09DOWNGRADE] = self.GRAVITY_CRITICAL
        self._end_1st_line_query(http09_allowed=False)
项目:HTTPWookiee    作者:regilero    | 项目源码 | 文件源码
def test_3045_http_version_too_much_digits(self):
        "New RFC is quite strict, DIGIT.DIGIT only, in fact."
        self.real_test = "{0}".format(inspect.stack()[0][3])
        self.req.set_major_version(11)
        self.req.set_minor_version(11)
        self.req.add_argument('last', 'marker')
        # for RP mode:
        self.transmission_map = {
            ' HTTP/11.11\r\n': self.STATUS_TRANSMITTED_EXACT,
            'HTTP/11.0': self.STATUS_TRANSMITTED_CRAP,
            '.11 H': self.STATUS_TRANSMITTED_CRAP,
            'last=marker\r\n': self.STATUS_TRANSMITTED_CRAP,
        }
        self._add_default_status_map(valid=False)
        # local changes
        self.status_map[self.STATUS_TRANSMITTED] = self.GRAVITY_CRITICAL
        self.status_map[self.STATUS_09OK] = self.GRAVITY_CRITICAL
        self.status_map[self.STATUS_09DOWNGRADE] = self.GRAVITY_CRITICAL
        self._end_1st_line_query(http09_allowed=False)
项目:Cortex-Analyzers    作者:CERT-BDF    | 项目源码 | 文件源码
def _get(self, database_type, ip_address):
        if not database_type in self.metadata().database_type:
            caller = inspect.stack()[2][3]
            raise TypeError("The %s method cannot be used with the "
                            "%s database" %
                            (caller, self.metadata().database_type))
        record = self._db_reader.get(ip_address)
        if record is None:
            raise geoip2.errors.AddressNotFoundError(
                "The address %s is not in the database." % ip_address)
        return record
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def __ImportModule( module_name, asName=None ):
    """
    Import a python module as needed into the global namespace. This can be accomplished
    programatically and inside a method or class.

    @param module_name : name of the module to load x.y.z
    @return module : python module object that was loaded
    """
    module = __import__(module_name)
    for layer in module_name.split('.')[1:]:
        module = getattr(module,layer)
    if asName:
        for x in range( len(inspect.stack()) ):
            inspect.currentframe(x).f_globals[asName]=module
    return module
项目:python-libjuju    作者:juju    | 项目源码 | 文件源码
def __aenter__(self):
        model_nonce = uuid.uuid4().hex[-4:]
        frame = inspect.stack()[1]
        test_name = frame.function.replace('_', '-')
        jujudata = TestJujuData()
        self._controller = Controller(jujudata=jujudata)
        controller_name = jujudata.current_controller()
        user_name = jujudata.accounts()[controller_name]['user']
        await self._controller.connect(controller_name)

        model_name = 'test-{}-{}-{}'.format(
            test_run_nonce,
            test_name,
            model_nonce,
        )
        self._model = await self._controller.add_model(model_name)

        # Change the JujuData instance so that it will return the new
        # model as the current model name, so that we'll connect
        # to it by default.
        jujudata.set_model(
            controller_name,
            user_name + "/" + model_name,
            self._model.info.uuid,
        )

        # save the model UUID in case test closes model
        self._model_uuid = self._model.info.uuid

        return self._model
项目:apiTest    作者:wuranxu    | 项目源码 | 文件源码
def get_current_func(self):
        '''
        :return: ???????
        '''
        return inspect.stack()[1][3]
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def __repr__(self):
        if inspect.stack()[1][3] == '?':
            self()
            return ''
        return '<pydoc.Helper instance>'
项目:speccer    作者:bensimner    | 项目源码 | 文件源码
def get_stack_path(i=0, depth=None):
    '''Inspect Stack to get a relative name for this code point
    '''
    stk = inspect.stack()[(2 + i):depth]
    loc = []

    for sf in reversed(stk):
        _, fn = os.path.split(sf.filename)
        f = sf.function
        loc.append('[{}]{}'.format(fn, f))

    return ':'.join(loc)
项目:PyJFuzz    作者:mseclab    | 项目源码 | 文件源码
def __init__(self, name, *values, **options):
        """Create a new rule definition. Simply instantiating a new rule definition
        will add it to the current ``GramFuzzer`` instance.

        :param str name: The name of the rule being defined
        :param list values: The list of values that define the value of the rule
            (will be concatenated when built)
        :param str cat: The category to create the rule in (default=``"default"``).
        :param bool no_prune: If this rule should not be pruned *EVEN IF* it is found to be
            unreachable (default=``False``)
        """
        self.name = name
        self.options = options
        self.values = list(values)

        self.sep = self.options.setdefault("sep", self.sep)
        self.cat = self.options.setdefault("cat", self.cat)
        self.no_prune = self.options.setdefault("no_prune", self.no_prune)

        self.fuzzer = GramFuzzer.instance()

        frame,mod_path,_,_,_,_ = inspect.stack()[1]
        module_name = os.path.basename(mod_path).replace(".pyc", "").replace(".py", "")
        if "TOP_CAT" in frame.f_locals:
            self.fuzzer.cat_group_defaults[module_name] = frame.f_locals["TOP_CAT"]
        self.fuzzer.add_definition(self.cat, self.name, self, no_prune=self.no_prune, gram_file=module_name)
项目:daiquiri    作者:jd    | 项目源码 | 文件源码
def get_program_name():
    return os.path.basename(inspect.stack()[-1][1])
项目:gransk    作者:pcbje    | 项目源码 | 文件源码
def get_test_path(filename):
  frame = inspect.stack()[1]
  module = inspect.getmodule(frame[0])
  path = os.path.realpath(module.__file__)
  root = os.path.abspath(os.path.join(path, os.pardir))
  return os.path.join(root, 'test_data', filename)
项目:kaira    作者:mulonemartin    | 项目源码 | 文件源码
def __init__(self,
                 name=None,
                 router=None,
                 load_env=True,
                 log_config=LOGGING):
        if log_config:
            logging.config.dictConfig(log_config)
        # Only set up a default log handler if the
        # end-user application didn't set anything up.
        if not (logging.root.handlers and
                log.level == logging.NOTSET and
                log_config):
            formatter = logging.Formatter(
                "%(asctime)s: %(levelname)s: %(message)s")
            handler = logging.StreamHandler()
            handler.setFormatter(formatter)
            log.addHandler(handler)
            log.setLevel(logging.INFO)

        # Get name from previous stack frame
        if name is None:
            frame_records = stack()[1]
            name = getmodulename(frame_records[1])

        self.name = name
        self.config = Config(load_env=load_env)
        self.router = router or PathRouter()
        self.debug = None
        self.static_handler = None
项目:pythonUnifiAPI    作者:delian    | 项目源码 | 文件源码
def authorize_guest(self, mac, minutes=60, up=None, down=None, mbytes=None, apmac=None):
        """
        Authorize one guest
        :param mac: the mac of the guest
        :param minutes: for how many minutes it is authorized
        :param up: upstream bandwidth in kbits
        :param down: downstream bandwidth in kbits
        :param mbytes: mbytes limit in kbits
        :param apmac: AP mac address (faster performance)
        :return:
        """
        data = {
            'mac': mac.lower(),
            'cmd': 'authorize-guest',
            'minutes': minutes
        }
        if up:
            data['up'] = up
        if down:
            data['down'] = down
        if mbytes:
            data['bytes'] = mbytes
        if apmac:
            data['ap_mac'] = apmac.lower()

        content = self.sitecmdjson('/cmd/stamgr', data)
        return self.response(content, inspect.stack()[0].function, 'Guest Authorization')
项目:pythonUnifiAPI    作者:delian    | 项目源码 | 文件源码
def unauthorize_guest(self, mac):
        """
        Unauthorize one guest
        :param mac:
        :return:
        """
        content = self.sitecmdjson('/cmd/stamgr', {
            'cmd': 'unauthorize-guest',
            'mac': mac.lower()
        })
        return self.response(content, inspect.stack()[0].function, 'Guest Unuthorization')
项目:pythonUnifiAPI    作者:delian    | 项目源码 | 文件源码
def kick_sta(self, mac):
        """
        Reconnect client device
        :param mac:
        :return:
        """
        content = self.sitecmdjson('/cmd/stamgr', {
            'cmd': 'kick-sta',
            'mac': mac.lower()
        })
        return self.response(content, inspect.stack()[0].function, 'Kicking Station')
项目:pythonUnifiAPI    作者:delian    | 项目源码 | 文件源码
def block_sta(self, mac):
        """
        Blocking client device
        :param mac:
        :return:
        """
        content = self.sitecmdjson('/cmd/stamgr', {
            'cmd': 'block-sta',
            'mac': mac.lower()
        })
        return self.response(content, inspect.stack()[0].function, 'Blocking Station')