Python logging 模块,makeLogRecord() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用logging.makeLogRecord()

项目:tk-photoshopcc    作者:shotgunsoftware    | 项目源码 | 文件源码
def _handle_logging(self, level, message):
        """
        Handles an RPC logging request.

        :param str level: One of "debug", "info", "warning", or "error".
        :param str message: The log message.
        """

        # manually create a record to log to the standard file handler.
        # we format it to match the regular logs, but tack on the '.js' to
        # indicate that it came from javascript.
        record = logging.makeLogRecord({
            "levelname": level.upper(),
            "name": "%s.js" % (self.logger.name,),
            "msg": message,
        })

        # forward this message to the base file handler so that it is logged
        # appropriately.
        if sgtk.LogManager().base_file_handler:
            sgtk.LogManager().base_file_handler.handle(record)
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def handle(self):
        '''
        Handle multiple requests - each expected to be a 4-byte length,
        followed by the LogRecord in pickle format.
        '''
        while 1:
            chunk = self.connection.recv(4)
            if len(chunk) < 4:
                break
            slen = struct.unpack('>L', chunk)[0]
            chunk = self.connection.recv(slen)
            while len(chunk) < slen:
                chunk = chunk + self.connection.recv(slen - len(chunk))
            obj = self.unPickle(chunk)
            record = logging.makeLogRecord(obj)
            self.handleLogRecord(record)
项目:ldap2pg    作者:dalibo    | 项目源码 | 文件源码
def test_multiline_formatter():
    import logging
    from ldap2pg.config import MultilineFormatter

    formatter = MultilineFormatter("prefix: %(message)s")

    base_record = dict(
        name='pouet', level=logging.DEBUG, fn="(unknown file)", lno=0, args=(),
        exc_info=None,
    )
    record = logging.makeLogRecord(dict(base_record, msg="single line"))
    payload = formatter.format(record)
    assert "prefix: single line" == payload

    record = logging.makeLogRecord(dict(base_record, msg="Uno\nDos\nTres"))

    payload = formatter.format(record)
    wanted = """\
    prefix: Uno
    prefix: Dos
    prefix: Tres\
    """.replace('    ', '')

    assert wanted == payload
项目:storperf    作者:opnfv    | 项目源码 | 文件源码
def read_logs(self):
        try:
            while True:
                datagram = self.socket.recv(8192)
                chunk = datagram[0:4]
                struct.unpack(">L", chunk)[0]
                chunk = datagram[4:]
                obj = cPickle.loads(chunk)
                record = logging.makeLogRecord(obj)
                if (record.levelno >= self.level):
                    logger = logging.getLogger(record.name)
                    logger.handle(record)

        except Exception as e:
            print "ERROR: " + str(e)
        finally:
            self.socket.close()
项目:Safe-RL-Benchmark    作者:befelix    | 项目源码 | 文件源码
def test_logger_format(self):
        """Test: CONFIG: logger format."""
        config = SRBConfig(logger)

        config.logger_add_stream_handler()
        config.logger_add_file_handler('logs.log')

        fmt = '%(name)s - %(levelname)s - %(message)s'
        formatter = logging.Formatter(fmt)

        config.logger_format = fmt

        tst_record = {
            'name': 'test_logger',
            'level': logging.DEBUG,
            'pathname': os.path.realpath(__file__),
            'lineno': 42,
            'msg': 'test_msg',
            'args': None,
            'exc_info': None,
            'func': 'test_logger_format'
        }
        rec = logging.makeLogRecord(tst_record)
        self.assertEqual(formatter.format(rec),
                         config.logger_stream_handler.format(rec))
项目:bigchaindb    作者:bigchaindb    | 项目源码 | 文件源码
def handle(self):
        """Handle multiple requests - each expected to be a 4-byte length,
        followed by the LogRecord in pickle format. Logs the record
        according to whatever policy is configured locally.
        """
        while True:
            chunk = self.connection.recv(4)
            if len(chunk) < 4:
                break
            slen = struct.unpack('>L', chunk)[0]
            chunk = self.connection.recv(slen)
            while len(chunk) < slen:
                chunk = chunk + self.connection.recv(slen - len(chunk))
            obj = self.unpickle(chunk)
            record = logging.makeLogRecord(obj)
            self.handle_log_record(record)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_error_handling(self):
        h = TestStreamHandler(BadStream())
        r = logging.makeLogRecord({})
        old_raise = logging.raiseExceptions
        old_stderr = sys.stderr
        try:
            h.handle(r)
            self.assertIs(h.error_record, r)
            h = logging.StreamHandler(BadStream())
            sys.stderr = sio = io.StringIO()
            h.handle(r)
            self.assertIn('\nRuntimeError: deliberate mistake\n',
                          sio.getvalue())
            logging.raiseExceptions = False
            sys.stderr = sio = io.StringIO()
            h.handle(r)
            self.assertEqual('', sio.getvalue())
        finally:
            logging.raiseExceptions = old_raise
            sys.stderr = old_stderr

# -- The following section could be moved into a server_helper.py module
# -- if it proves to be of wider utility than just test_logging
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_basic(self):
        sockmap = {}
        server = TestSMTPServer(('localhost', 0), self.process_message, 0.001,
                                sockmap)
        server.start()
        addr = ('localhost', server.port)
        h = logging.handlers.SMTPHandler(addr, 'me', 'you', 'Log', timeout=5.0)
        self.assertEqual(h.toaddrs, ['you'])
        self.messages = []
        r = logging.makeLogRecord({'msg': 'Hello'})
        self.handled = threading.Event()
        h.handle(r)
        self.handled.wait(5.0)  # 14314: don't wait forever
        server.stop()
        self.assertTrue(self.handled.is_set())
        self.assertEqual(len(self.messages), 1)
        peer, mailfrom, rcpttos, data = self.messages[0]
        self.assertEqual(mailfrom, 'me')
        self.assertEqual(rcpttos, ['you'])
        self.assertIn('\nSubject: Log\n', data)
        self.assertTrue(data.endswith('\n\nHello'))
        h.close()
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_error_handling(self):
        h = TestStreamHandler(BadStream())
        r = logging.makeLogRecord({})
        old_raise = logging.raiseExceptions
        old_stderr = sys.stderr
        try:
            h.handle(r)
            self.assertIs(h.error_record, r)
            h = logging.StreamHandler(BadStream())
            sys.stderr = sio = io.StringIO()
            h.handle(r)
            self.assertIn('\nRuntimeError: deliberate mistake\n',
                          sio.getvalue())
            logging.raiseExceptions = False
            sys.stderr = sio = io.StringIO()
            h.handle(r)
            self.assertEqual('', sio.getvalue())
        finally:
            logging.raiseExceptions = old_raise
            sys.stderr = old_stderr

# -- The following section could be moved into a server_helper.py module
# -- if it proves to be of wider utility than just test_logging
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_basic(self):
        sockmap = {}
        server = TestSMTPServer((HOST, 0), self.process_message, 0.001,
                                sockmap)
        server.start()
        addr = (HOST, server.port)
        h = logging.handlers.SMTPHandler(addr, 'me', 'you', 'Log',
                                         timeout=self.TIMEOUT)
        self.assertEqual(h.toaddrs, ['you'])
        self.messages = []
        r = logging.makeLogRecord({'msg': 'Hello \u2713'})
        self.handled = threading.Event()
        h.handle(r)
        self.handled.wait(self.TIMEOUT)  # 14314: don't wait forever
        server.stop()
        self.assertTrue(self.handled.is_set())
        self.assertEqual(len(self.messages), 1)
        peer, mailfrom, rcpttos, data = self.messages[0]
        self.assertEqual(mailfrom, 'me')
        self.assertEqual(rcpttos, ['you'])
        self.assertIn('\nSubject: Log\n', data)
        self.assertTrue(data.endswith('\n\nHello \u2713'))
        h.close()
项目:antitools    作者:bufubaoni    | 项目源码 | 文件源码
def handle(self):
        """
        Handle multiple requests - each expected to be a 4-byte length,
        followed by the LogRecord in pickle format. Logs the record
        according to whatever policy is configured locally.
        """
        while True:
            chunk = self.connection.recv(4)
            if len(chunk) < 4:
                break
            slen = struct.unpack('>L', chunk)[0]
            chunk = self.connection.recv(slen)
            while len(chunk) < slen:
                chunk = chunk + self.connection.recv(slen - len(chunk))
            obj = self.unPickle(chunk)
            record = logging.makeLogRecord(obj)
            self.handleLogRecord(record)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_error_handling(self):
        h = TestStreamHandler(BadStream())
        r = logging.makeLogRecord({})
        old_raise = logging.raiseExceptions
        old_stderr = sys.stderr
        try:
            h.handle(r)
            self.assertIs(h.error_record, r)
            h = logging.StreamHandler(BadStream())
            sys.stderr = sio = io.StringIO()
            h.handle(r)
            self.assertIn('\nRuntimeError: deliberate mistake\n',
                          sio.getvalue())
            logging.raiseExceptions = False
            sys.stderr = sio = io.StringIO()
            h.handle(r)
            self.assertEqual('', sio.getvalue())
        finally:
            logging.raiseExceptions = old_raise
            sys.stderr = old_stderr

# -- The following section could be moved into a server_helper.py module
# -- if it proves to be of wider utility than just test_logging
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_basic(self):
        sockmap = {}
        server = TestSMTPServer(('localhost', 0), self.process_message, 0.001,
                                sockmap)
        server.start()
        addr = ('localhost', server.port)
        h = logging.handlers.SMTPHandler(addr, 'me', 'you', 'Log',
                                         timeout=self.TIMEOUT)
        self.assertEqual(h.toaddrs, ['you'])
        self.messages = []
        r = logging.makeLogRecord({'msg': 'Hello'})
        self.handled = threading.Event()
        h.handle(r)
        self.handled.wait(self.TIMEOUT)  # 14314: don't wait forever
        server.stop()
        self.assertTrue(self.handled.is_set())
        self.assertEqual(len(self.messages), 1)
        peer, mailfrom, rcpttos, data = self.messages[0]
        self.assertEqual(mailfrom, 'me')
        self.assertEqual(rcpttos, ['you'])
        self.assertIn('\nSubject: Log\n', data)
        self.assertTrue(data.endswith('\n\nHello'))
        h.close()
项目:ldap2pg    作者:dalibo    | 项目源码 | 文件源码
def test_color_handler():
    import logging
    from ldap2pg.config import ColoredStreamHandler

    handler = ColoredStreamHandler()
    record = logging.makeLogRecord(dict(
        name='pouet', level=logging.DEBUG, fn="(unknown file)", msg="Message",
        lno=0, args=(), exc_info=None,
    ))
    payload = handler.format(record)
    assert "\033[0" in payload
项目:esys-pbi    作者:fsxfreak    | 项目源码 | 文件源码
def update(self,frame,events):
        if self._socket:
            while self._socket.new_data:
                t,s = self._socket.recv()
                self.on_log(logging.makeLogRecord(s))
项目:esys-pbi    作者:fsxfreak    | 项目源码 | 文件源码
def recent_events(self,events):
        if self._socket and self._socket.new_data:
            t,s = self._socket.recv()
            self.on_log(logging.makeLogRecord(s))
        self.alpha -= min(.2,events['dt'])
项目:mdk    作者:datawire    | 项目源码 | 文件源码
def test_emitReturnsLoggedMessageId(self):
        """
        MDKHandler.emit returns the LoggedMessageId from the Session.
        """
        mdk, tracer = create_mdk_with_faketracer()
        session = mdk.session()
        handler = MDKHandler(mdk, lambda: session)
        record = logging.makeLogRecord({"name": "", "levelname": "INFO",
                                        "message": "hello"})
        mid = handler.emit(record)
        self.assertIsInstance(mid, LoggedMessageId)
        self.assertEqual(mid.traceId, session._context.traceId)
项目:mdk    作者:datawire    | 项目源码 | 文件源码
def test_emitReturnsLoggedMessageId(self):
        """
        MDKLoggingHandler.emit returns the LoggedMessageId.
        """
        mdk, tracer = create_mdk_with_faketracer()
        handler = MDKLoggingHandler(mdk)
        record = logging.makeLogRecord({"name": "", "levelname": "INFO",
                                        "message": "hello"})
        mid = handler.emit(record)
        self.assertIsInstance(mid, LoggedMessageId)
项目:multilog    作者:humangeo    | 项目源码 | 文件源码
def handle(self):
        """Deal with the incoming log data"""
        while True:
            chunk = self.connection.recv(4)
            if len(chunk) < 4:
                break
            struct_len = struct.unpack('>L', chunk)[0]
            chunk = self.connection.recv(struct_len)
            while len(chunk) < struct_len:
                chunk = chunk + self.connection.recv(struct_len - len(chunk))
            obj = self.unpickle(chunk)
            record = logging.makeLogRecord(obj)
            self.handle_log_record(record)
项目:integration-prototype    作者:SKA-ScienceDataProcessor    | 项目源码 | 文件源码
def run(self):
        """Run loop.

        Receives log messages from connected publishers and logs them via
        a python logging interface.
        """
        log = logging.getLogger('sip.logging_aggregator')
        fail_count = 0
        fail_count_limit = 100
        # Exponential relaxation of timeout in event loop.
        timeout = np.logspace(-6, -2, fail_count_limit)
        while not self._stop_requested.is_set():
            try:
                topic, values = self._subscriber.recv_multipart(zmq.NOBLOCK)
                str_values = values.decode('utf-8')
                try:
                    dict_values = json.loads(str_values)
                    record = logging.makeLogRecord(dict_values)
                    log.handle(record)
                    fail_count = 0
                except json.decoder.JSONDecodeError:
                    print('ERROR: Unable to convert JSON log record.')
                    raise
            except zmq.ZMQError as e:
                if e.errno == zmq.EAGAIN:
                    fail_count += 1
                else:
                    raise  # Re-raise the exception
            if fail_count < fail_count_limit:
                _timeout = timeout[fail_count]
            else:
                _timeout = timeout[-1]
            self._stop_requested.wait(_timeout)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def handle(self):
        """Handle multiple requests - each expected to be of 4-byte length,
        followed by the LogRecord in pickle format. Logs the record
        according to whatever policy is configured locally."""
        while True:
            chunk = self.connection.recv(4)
            if len(chunk) < 4:
                break
            slen = struct.unpack(">L", chunk)[0]
            chunk = self.connection.recv(slen)
            while len(chunk) < slen:
                chunk = chunk + self.connection.recv(slen - len(chunk))
            obj = self.unpickle(chunk)
            record = logging.makeLogRecord(obj)
            self.handle_log_record(record)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def get_record(self, name=None):
        result = dict(self.common)
        if name is not None:
            result.update(self.variants[name])
        return logging.makeLogRecord(result)
项目:python-telegram-handler    作者:sashgorokhov    | 项目源码 | 文件源码
def test_html_formatter_format():
    formatter = formatters.HtmlFormatter()

    logrecord = logging.makeLogRecord(dict(name='<foo>', func='<module>', msg='Whatsup?', funcName='test'))
    s = formatter.format(logrecord)

    assert '<foo>' not in s
    assert '<module>' not in s
项目:python-telegram-handler    作者:sashgorokhov    | 项目源码 | 文件源码
def test_html_formatter_emoji():
    formatter = formatters.HtmlFormatter(use_emoji=True)

    emoji_level_map = {
        formatters.EMOJI.WHITE_CIRCLE: [logging.DEBUG],
        formatters.EMOJI.BLUE_CIRCLE: [logging.INFO],
        formatters.EMOJI.RED_CIRCLE: [logging.WARNING, logging.ERROR]
    }

    for emoji, levels in emoji_level_map.items():
        for level in levels:
            logrecord = logging.makeLogRecord({'levelno': level, 'levelname': logging.getLevelName(level)})
            s = formatter.format(logrecord)
            assert s.find(emoji) > 0, 'Emoji not found in %s' % level
项目:python-telegram-handler    作者:sashgorokhov    | 项目源码 | 文件源码
def test_emit(handler):
    record = logging.makeLogRecord({'msg': 'hello'})

    with mock.patch('requests.post') as patch:
        handler.emit(record)

    assert patch.called
    assert patch.call_count == 1
    assert patch.call_args[1]['json']['chat_id'] == 'bar'
    assert 'hello' in patch.call_args[1]['json']['text']
    assert patch.call_args[1]['json']['parse_mode'] == 'HTML'
项目:python-telegram-handler    作者:sashgorokhov    | 项目源码 | 文件源码
def test_emit_http_exception(handler):
    record = logging.makeLogRecord({'msg': 'hello'})

    with mock.patch('requests.post') as patch:
        response = requests.Response()
        response.status_code = 500
        response._content = 'Server error'.encode()
        patch.return_value = response
        handler.emit(record)

    assert telegram_handler.handlers.logger.handlers[0].messages['error']
    assert telegram_handler.handlers.logger.handlers[0].messages['debug']
项目:python-telegram-handler    作者:sashgorokhov    | 项目源码 | 文件源码
def test_emit_telegram_error(handler):
    record = logging.makeLogRecord({'msg': 'hello'})

    with mock.patch('requests.post') as patch:
        response = requests.Response()
        response.status_code = 200
        response._content = json.dumps({'ok': False}).encode()
        patch.return_value = response
        handler.emit(record)

    assert telegram_handler.handlers.logger.handlers[0].messages['warning']
项目:certbot    作者:nikoloskii    | 项目源码 | 文件源码
def test_adds_padding(self):
        self.handler.emit(logging.makeLogRecord({}))
        self.d.infobox.assert_called_once_with(mock.ANY, 4, 10)
项目:certbot    作者:nikoloskii    | 项目源码 | 文件源码
def test_args_in_msg_get_replaced(self):
        assert len('123456') <= self.handler.width
        self.handler.emit(logging.makeLogRecord(
            {'msg': '123%s', 'args': (456,)}))
        self.d.infobox.assert_called_once_with('123456', mock.ANY, mock.ANY)
项目:certbot    作者:nikoloskii    | 项目源码 | 文件源码
def test_wraps_nospace_is_greedy(self):
        assert len('1234567') > self.handler.width
        self.handler.emit(logging.makeLogRecord({'msg': '1234567'}))
        self.d.infobox.assert_called_once_with('123456\n7', mock.ANY, mock.ANY)
项目:certbot    作者:nikoloskii    | 项目源码 | 文件源码
def test_wraps_at_whitespace(self):
        assert len('123 567') > self.handler.width
        self.handler.emit(logging.makeLogRecord({'msg': '123 567'}))
        self.d.infobox.assert_called_once_with('123\n567', mock.ANY, mock.ANY)
项目:certbot    作者:nikoloskii    | 项目源码 | 文件源码
def test_only_last_lines_are_printed(self):
        assert len('a\nb\nc'.split()) > self.handler.height
        self.handler.emit(logging.makeLogRecord({'msg': 'a\n\nb\nc'}))
        self.d.infobox.assert_called_once_with('b\nc', mock.ANY, mock.ANY)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def handle(self):
        """Handle multiple requests - each expected to be of 4-byte length,
        followed by the LogRecord in pickle format. Logs the record
        according to whatever policy is configured locally."""
        while True:
            chunk = self.connection.recv(4)
            if len(chunk) < 4:
                break
            slen = struct.unpack(">L", chunk)[0]
            chunk = self.connection.recv(slen)
            while len(chunk) < slen:
                chunk = chunk + self.connection.recv(slen - len(chunk))
            obj = self.unpickle(chunk)
            record = logging.makeLogRecord(obj)
            self.handle_log_record(record)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_race(self):
        # Issue #14632 refers.
        def remove_loop(fname, tries):
            for _ in range(tries):
                try:
                    os.unlink(fname)
                except OSError:
                    pass
                time.sleep(0.004 * random.randint(0, 4))

        del_count = 500
        log_count = 500

        for delay in (False, True):
            fd, fn = tempfile.mkstemp('.log', 'test_logging-3-')
            os.close(fd)
            remover = threading.Thread(target=remove_loop, args=(fn, del_count))
            remover.daemon = True
            remover.start()
            h = logging.handlers.WatchedFileHandler(fn, delay=delay)
            f = logging.Formatter('%(asctime)s: %(levelname)s: %(message)s')
            h.setFormatter(f)
            try:
                for _ in range(log_count):
                    time.sleep(0.005)
                    r = logging.makeLogRecord({'msg': 'testing' })
                    h.handle(r)
            finally:
                remover.join()
                try:
                    h.close()
                except ValueError:
                    pass
                if os.path.exists(fn):
                    os.unlink(fn)


# Set the locale to the platform-dependent default.  I have no idea
# why the test does this, but in any case we save the current locale
# first and restore it at the end.
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def handle(self):
        """Handle multiple requests - each expected to be of 4-byte length,
        followed by the LogRecord in pickle format. Logs the record
        according to whatever policy is configured locally."""
        while True:
            chunk = self.connection.recv(4)
            if len(chunk) < 4:
                break
            slen = struct.unpack(">L", chunk)[0]
            chunk = self.connection.recv(slen)
            while len(chunk) < slen:
                chunk = chunk + self.connection.recv(slen - len(chunk))
            obj = self.unpickle(chunk)
            record = logging.makeLogRecord(obj)
            self.handle_log_record(record)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_race(self):
        # Issue #14632 refers.
        def remove_loop(fname, tries):
            for _ in range(tries):
                try:
                    os.unlink(fname)
                except OSError:
                    pass
                time.sleep(0.004 * random.randint(0, 4))

        del_count = 500
        log_count = 500

        for delay in (False, True):
            fd, fn = tempfile.mkstemp('.log', 'test_logging-3-')
            os.close(fd)
            remover = threading.Thread(target=remove_loop, args=(fn, del_count))
            remover.daemon = True
            remover.start()
            h = logging.handlers.WatchedFileHandler(fn, delay=delay)
            f = logging.Formatter('%(asctime)s: %(levelname)s: %(message)s')
            h.setFormatter(f)
            try:
                for _ in range(log_count):
                    time.sleep(0.005)
                    r = logging.makeLogRecord({'msg': 'testing' })
                    h.handle(r)
            finally:
                remover.join()
                try:
                    h.close()
                except ValueError:
                    pass
                if os.path.exists(fn):
                    os.unlink(fn)


# Set the locale to the platform-dependent default.  I have no idea
# why the test does this, but in any case we save the current locale
# first and restore it at the end.
项目:orizonhub    作者:gumblex    | 项目源码 | 文件源码
def log(self, msg: Message):
        d = msg._asdict()
        d['asctime'] = datetime.fromtimestamp(msg.time, self.tz).strftime('%Y-%m-%d %H:%M:%S')
        d['srcname'] = msg.src.alias
        d['srcid'] = msg.src.id
        self.loghandler.emit(logging.makeLogRecord({'msg': self.FORMAT % d}))
项目:geonotebook    作者:OpenGeoscience    | 项目源码 | 文件源码
def post(self):
        self.log.handle(logging.makeLogRecord(self.get_json_body()))
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_empty_filter(self):
        f = logging.Filter()
        r = logging.makeLogRecord({'name': 'spam.eggs'})
        self.assertTrue(f.filter(r))

#
#   First, we define our levels. There can be as many as you want - the only
#     limitations are that they should be integers, the lowest should be > 0 and
#   larger values mean less information being logged. If you need specific
#   level values which do not fit into these limitations, you can use a
#   mapping dictionary to convert between your application levels and the
#   logging system.
#
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_race(self):
        # Issue #14632 refers.
        def remove_loop(fname, tries):
            for _ in range(tries):
                try:
                    os.unlink(fname)
                except OSError:
                    pass
                time.sleep(0.004 * random.randint(0, 4))

        del_count = 500
        log_count = 500

        for delay in (False, True):
            fd, fn = tempfile.mkstemp('.log', 'test_logging-3-')
            os.close(fd)
            remover = threading.Thread(target=remove_loop, args=(fn, del_count))
            remover.daemon = True
            remover.start()
            h = logging.handlers.WatchedFileHandler(fn, delay=delay)
            f = logging.Formatter('%(asctime)s: %(levelname)s: %(message)s')
            h.setFormatter(f)
            try:
                for _ in range(log_count):
                    time.sleep(0.005)
                    r = logging.makeLogRecord({'msg': 'testing' })
                    h.handle(r)
            finally:
                remover.join()
                h.close()
                if os.path.exists(fn):
                    os.unlink(fn)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def handle_datagram(self, request):
        slen = struct.pack('>L', 0) # length of prefix
        packet = request.packet[len(slen):]
        obj = pickle.loads(packet)
        record = logging.makeLogRecord(obj)
        self.log_output += record.msg + '\n'
        self.handled.set()
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def get_record(self, name=None):
        result = dict(self.common)
        if name is not None:
            result.update(self.variants[name])
        return logging.makeLogRecord(result)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def setUp(self):
        self.records = [
            logging.makeLogRecord({'msg': 'one'}),
            logging.makeLogRecord({'msg': 'two'}),
        ]
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_str_rep(self):
        r = logging.makeLogRecord({})
        s = str(r)
        self.assertTrue(s.startswith('<LogRecord: '))
        self.assertTrue(s.endswith('>'))
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_multiprocessing(self):
        r = logging.makeLogRecord({})
        self.assertEqual(r.processName, 'MainProcess')
        try:
            import multiprocessing as mp
            r = logging.makeLogRecord({})
            self.assertEqual(r.processName, mp.current_process().name)
        except ImportError:
            pass
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_delay(self):
        os.unlink(self.fn)
        fh = logging.FileHandler(self.fn, delay=True)
        self.assertIsNone(fh.stream)
        self.assertFalse(os.path.exists(self.fn))
        fh.handle(logging.makeLogRecord({}))
        self.assertIsNotNone(fh.stream)
        self.assertTrue(os.path.exists(self.fn))
        fh.close()
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_rollover(self):
        fh = logging.handlers.TimedRotatingFileHandler(self.fn, 'S',
                                                       backupCount=1)
        fmt = logging.Formatter('%(asctime)s %(message)s')
        fh.setFormatter(fmt)
        r1 = logging.makeLogRecord({'msg': 'testing - initial'})
        fh.emit(r1)
        self.assertLogFile(self.fn)
        time.sleep(1.1)    # a little over a second ...
        r2 = logging.makeLogRecord({'msg': 'testing - after delay'})
        fh.emit(r2)
        fh.close()
        # At this point, we should have a recent rotated file which we
        # can test for the existence of. However, in practice, on some
        # machines which run really slowly, we don't know how far back
        # in time to go to look for the log file. So, we go back a fair
        # bit, and stop as soon as we see a rotated file. In theory this
        # could of course still fail, but the chances are lower.
        found = False
        now = datetime.datetime.now()
        GO_BACK = 5 * 60 # seconds
        for secs in range(GO_BACK):
            prev = now - datetime.timedelta(seconds=secs)
            fn = self.fn + prev.strftime(".%Y-%m-%d_%H-%M-%S")
            found = os.path.exists(fn)
            if found:
                self.rmfiles.append(fn)
                break
        msg = 'No rotated files found, went back %d seconds' % GO_BACK
        if not found:
            #print additional diagnostics
            dn, fn = os.path.split(self.fn)
            files = [f for f in os.listdir(dn) if f.startswith(fn)]
            print('Test time: %s' % now.strftime("%Y-%m-%d %H-%M-%S"), file=sys.stderr)
            print('The only matching files are: %s' % files, file=sys.stderr)
            for f in files:
                print('Contents of %s:' % f)
                path = os.path.join(dn, f)
                with open(path, 'r') as tf:
                    print(tf.read())
        self.assertTrue(found, msg=msg)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_basic(self):
        logtype = 'Application'
        elh = win32evtlog.OpenEventLog(None, logtype)
        num_recs = win32evtlog.GetNumberOfEventLogRecords(elh)
        h = logging.handlers.NTEventLogHandler('test_logging')
        r = logging.makeLogRecord({'msg': 'Test Log Message'})
        h.handle(r)
        h.close()
        # Now see if the event is recorded
        self.assertTrue(num_recs < win32evtlog.GetNumberOfEventLogRecords(elh))
        flags = win32evtlog.EVENTLOG_BACKWARDS_READ | \
                win32evtlog.EVENTLOG_SEQUENTIAL_READ
        found = False
        GO_BACK = 100
        events = win32evtlog.ReadEventLog(elh, flags, GO_BACK)
        for e in events:
            if e.SourceName != 'test_logging':
                continue
            msg = win32evtlogutil.SafeFormatMessage(e, logtype)
            if msg != 'Test Log Message\r\n':
                continue
            found = True
            break
        msg = 'Record not found in event log, went back %d records' % GO_BACK
        self.assertTrue(found, msg=msg)

# Set the locale to the platform-dependent default.  I have no idea
# why the test does this, but in any case we save the current locale
# first and restore it at the end.
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def handle(self):
        """Handle multiple requests - each expected to be of 4-byte length,
        followed by the LogRecord in pickle format. Logs the record
        according to whatever policy is configured locally."""
        while True:
            chunk = self.connection.recv(4)
            if len(chunk) < 4:
                break
            slen = struct.unpack(">L", chunk)[0]
            chunk = self.connection.recv(slen)
            while len(chunk) < slen:
                chunk = chunk + self.connection.recv(slen - len(chunk))
            obj = self.unpickle(chunk)
            record = logging.makeLogRecord(obj)
            self.handle_log_record(record)
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_race(self):
        # Issue #14632 refers.
        def remove_loop(fname, tries):
            for _ in range(tries):
                try:
                    os.unlink(fname)
                except OSError:
                    pass
                time.sleep(0.004 * random.randint(0, 4))

        del_count = 500
        log_count = 500

        for delay in (False, True):
            fd, fn = tempfile.mkstemp('.log', 'test_logging-3-')
            os.close(fd)
            remover = threading.Thread(target=remove_loop, args=(fn, del_count))
            remover.daemon = True
            remover.start()
            h = logging.handlers.WatchedFileHandler(fn, delay=delay)
            f = logging.Formatter('%(asctime)s: %(levelname)s: %(message)s')
            h.setFormatter(f)
            try:
                for _ in range(log_count):
                    time.sleep(0.005)
                    r = logging.makeLogRecord({'msg': 'testing' })
                    h.handle(r)
            finally:
                remover.join()
                try:
                    h.close()
                except ValueError:
                    pass
                if os.path.exists(fn):
                    os.unlink(fn)


# Set the locale to the platform-dependent default.  I have no idea
# why the test does this, but in any case we save the current locale
# first and restore it at the end.