Python codecs 模块,decode() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用codecs.decode()

项目:whatstyle    作者:mikr    | 项目源码 | 文件源码
def translate_non_sgml_chars(data, enc='utf-8'):
    # type: (bytes, str) -> bytes
    """Rewrite control characters in encoded text and return it re-encoded.

    ``data`` is decoded with ``enc`` (undecodable input is replaced), every
    character in the ranges U+0000-0008, U+000B, U+000C, U+000E-001F and
    U+007F-009F is substituted, and the result is encoded back with ``enc``.
    Code points 127-159 are reinterpreted as windows-1252 bytes; everything
    else, and any byte windows-1252 cannot decode, becomes U+FFFD.
    """

    def replace_non_sgml(m):
        # type: (Match) -> str
        codepoint = ord(m.group(0))
        # Only the 127..159 range has a windows-1252 reading worth trying.
        if codepoint < 127 or codepoint > 159:
            # Unicode Character 'REPLACEMENT CHARACTER'
            return u'\ufffd'
        try:
            return int2byte(codepoint).decode('windows-1252')
        except UnicodeDecodeError:
            # Unicode Character 'REPLACEMENT CHARACTER'
            return u'\ufffd'

    pattern = unistr(r'[\x00-\x08\x0b\x0c\x0e-\x1f\x7f-\x9f]')
    cleaned = re.sub(pattern, replace_non_sgml, data.decode(enc, 'replace'))
    return cleaned.encode(enc, 'replace')
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def __call__(self, topic=None):
        """Write help for ``topic`` to the console stream as HTML.

        With no topic, only ``repr(self)`` is written inside a
        ``<span class=help>`` element.  Otherwise ``pydoc.help`` is run, the
        value handed back by ``sys.stdout.reset()`` is split with
        ``_paragraph_re`` and rendered through ``HELP_HTML``: the first
        chunk becomes the title, the remaining chunks the text.
        """
        # NOTE(review): relies on sys.stdout being an object that provides
        # _write() and reset() -- confirm against the enclosing module.
        if topic is None:
            sys.stdout._write('<span class=help>%s</span>' % repr(self))
            return
        import pydoc
        pydoc.help(topic)
        captured = sys.stdout.reset()
        if isinstance(captured, bytes):
            # undecodable bytes are dropped instead of failing the help call
            captured = captured.decode('utf-8', 'ignore')
        chunks = _paragraph_re.split(captured)
        if len(chunks) <= 1:  # pragma: no cover
            heading, body = 'Help', chunks[0]
        else:
            heading, body = chunks[0], '\n\n'.join(chunks[1:])
        sys.stdout._write(HELP_HTML % {'title': heading, 'text': body})
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def __init__(self, stream_factory=None, charset='utf-8', errors='replace',
                 max_form_memory_size=None, cls=None, buffer_size=64 * 1024):
        """Store the parser settings on the instance.

        :param stream_factory: stored as ``self.stream_factory``;
                               ``default_stream_factory`` is used when ``None``.
        :param charset: stored as ``self.charset``.
        :param errors: stored as ``self.errors``.
        :param max_form_memory_size: stored as ``self.max_form_memory_size``.
        :param cls: stored as ``self.cls``; ``MultiDict`` is used when ``None``.
        :param buffer_size: stored as ``self.buffer_size``; must be a multiple
                            of 4 and at least 1024.
        :raises AssertionError: if ``buffer_size`` violates either constraint.
        """
        # Resolve the fallback *before* storing it.  Storing the raw argument
        # first left ``self.stream_factory`` as ``None`` whenever the caller
        # relied on the default.
        if stream_factory is None:
            stream_factory = default_stream_factory
        self.stream_factory = stream_factory
        self.charset = charset
        self.errors = errors
        self.max_form_memory_size = max_form_memory_size
        if cls is None:
            cls = MultiDict
        self.cls = cls

        # make sure the buffer size is divisible by four so that we can base64
        # decode chunk by chunk
        assert buffer_size % 4 == 0, 'buffer size has to be divisible by 4'
        # also the buffer size has to be at least 1024 bytes long or long headers
        # will freak out the system
        assert buffer_size >= 1024, 'buffer size has to be at least 1KB'

        self.buffer_size = buffer_size
项目:swjtu-pyscraper    作者:Desgard    | 项目源码 | 文件源码
def __call__(self, topic=None):
        """Write help for ``topic`` to the console stream as HTML.

        With no topic, only ``repr(self)`` is written inside a
        ``<span class=help>`` element.  Otherwise ``pydoc.help`` is run, the
        value handed back by ``sys.stdout.reset()`` is split with
        ``_paragraph_re`` and rendered through ``HELP_HTML``: the first
        chunk becomes the title, the remaining chunks the text.
        """
        # NOTE(review): relies on sys.stdout being an object that provides
        # _write() and reset() -- confirm against the enclosing module.
        if topic is None:
            sys.stdout._write('<span class=help>%s</span>' % repr(self))
            return
        import pydoc
        pydoc.help(topic)
        captured = sys.stdout.reset()
        if isinstance(captured, bytes):
            # undecodable bytes are dropped instead of failing the help call
            captured = captured.decode('utf-8', 'ignore')
        chunks = _paragraph_re.split(captured)
        if len(chunks) <= 1:  # pragma: no cover
            heading, body = 'Help', chunks[0]
        else:
            heading, body = chunks[0], '\n\n'.join(chunks[1:])
        sys.stdout._write(HELP_HTML % {'title': heading, 'text': body})
项目:swjtu-pyscraper    作者:Desgard    | 项目源码 | 文件源码
def __init__(self, stream_factory=None, charset='utf-8', errors='replace',
                 max_form_memory_size=None, cls=None, buffer_size=64 * 1024):
        """Store the parser settings on the instance.

        :param stream_factory: stored as ``self.stream_factory``;
                               ``default_stream_factory`` is used when ``None``.
        :param charset: stored as ``self.charset``.
        :param errors: stored as ``self.errors``.
        :param max_form_memory_size: stored as ``self.max_form_memory_size``.
        :param cls: stored as ``self.cls``; ``MultiDict`` is used when ``None``.
        :param buffer_size: stored as ``self.buffer_size``; must be a multiple
                            of 4 and at least 1024.
        :raises AssertionError: if ``buffer_size`` violates either constraint.
        """
        # Resolve the fallback *before* storing it.  Storing the raw argument
        # first left ``self.stream_factory`` as ``None`` whenever the caller
        # relied on the default.
        if stream_factory is None:
            stream_factory = default_stream_factory
        self.stream_factory = stream_factory
        self.charset = charset
        self.errors = errors
        self.max_form_memory_size = max_form_memory_size
        if cls is None:
            cls = MultiDict
        self.cls = cls

        # make sure the buffer size is divisible by four so that we can base64
        # decode chunk by chunk
        assert buffer_size % 4 == 0, 'buffer size has to be divisible by 4'
        # also the buffer size has to be at least 1024 bytes long or long headers
        # will freak out the system
        assert buffer_size >= 1024, 'buffer size has to be at least 1KB'

        self.buffer_size = buffer_size
项目:lightning-coindesk    作者:lightninglabs    | 项目源码 | 文件源码
def check_payment(self):
        """
        Check whether the Lightning payment for this invoice has been received.

        The invoice is looked up by payment hash through a gRPC stub connected
        to ``settings.LND_RPCHOST``.  When the response reports it as settled,
        ``self.status`` is set to ``'complete'`` and the object is saved.

        Returns ``True`` when the invoice is settled, ``False`` otherwise --
        including when ``self.status`` is ``'pending_invoice'``, in which case
        no lookup is made at all.
        """
        if self.status == 'pending_invoice':
            return False

        channel = grpc.insecure_channel(settings.LND_RPCHOST)
        stub = lnrpc.LightningStub(channel)

        # ``self.r_hash`` holds the payment hash as base64 text.  The decoded
        # bytes are passed through untouched: wrapping them in ``str()`` is a
        # no-op on Python 2 but produces the literal "b'...'" repr on
        # Python 3, which is not the hash.
        r_hash_base64 = self.r_hash.encode('utf-8')
        r_hash_bytes = codecs.decode(r_hash_base64, 'base64')
        invoice_resp = stub.LookupInvoice(ln.PaymentHash(r_hash=r_hash_bytes))

        if not invoice_resp.settled:
            # Payment not received
            return False

        # Payment complete
        self.status = 'complete'
        self.save()
        return True
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def __call__(self, topic=None):
        """Write help for ``topic`` to the console stream as HTML.

        With no topic, only ``repr(self)`` is written inside a
        ``<span class=help>`` element.  Otherwise ``pydoc.help`` is run, the
        value handed back by ``sys.stdout.reset()`` is split with
        ``_paragraph_re`` and rendered through ``HELP_HTML``: the first
        chunk becomes the title, the remaining chunks the text.
        """
        # NOTE(review): relies on sys.stdout being an object that provides
        # _write() and reset() -- confirm against the enclosing module.
        if topic is None:
            sys.stdout._write('<span class=help>%s</span>' % repr(self))
            return
        import pydoc
        pydoc.help(topic)
        captured = sys.stdout.reset()
        if isinstance(captured, bytes):
            # undecodable bytes are dropped instead of failing the help call
            captured = captured.decode('utf-8', 'ignore')
        chunks = _paragraph_re.split(captured)
        if len(chunks) <= 1:  # pragma: no cover
            heading, body = 'Help', chunks[0]
        else:
            heading, body = chunks[0], '\n\n'.join(chunks[1:])
        sys.stdout._write(HELP_HTML % {'title': heading, 'text': body})
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def __init__(self, stream_factory=None, charset='utf-8', errors='replace',
                 max_form_memory_size=None, cls=None, buffer_size=64 * 1024):
        """Store the parser settings on the instance.

        :param stream_factory: stored as ``self.stream_factory``;
                               ``default_stream_factory`` is used when ``None``.
        :param charset: stored as ``self.charset``.
        :param errors: stored as ``self.errors``.
        :param max_form_memory_size: stored as ``self.max_form_memory_size``.
        :param cls: stored as ``self.cls``; ``MultiDict`` is used when ``None``.
        :param buffer_size: stored as ``self.buffer_size``; must be a multiple
                            of 4 and at least 1024.
        :raises AssertionError: if ``buffer_size`` violates either constraint.
        """
        # Resolve the fallback *before* storing it.  Storing the raw argument
        # first left ``self.stream_factory`` as ``None`` whenever the caller
        # relied on the default.
        if stream_factory is None:
            stream_factory = default_stream_factory
        self.stream_factory = stream_factory
        self.charset = charset
        self.errors = errors
        self.max_form_memory_size = max_form_memory_size
        if cls is None:
            cls = MultiDict
        self.cls = cls

        # make sure the buffer size is divisible by four so that we can base64
        # decode chunk by chunk
        assert buffer_size % 4 == 0, 'buffer size has to be divisible by 4'
        # also the buffer size has to be at least 1024 bytes long or long headers
        # will freak out the system
        assert buffer_size >= 1024, 'buffer size has to be at least 1KB'

        self.buffer_size = buffer_size
项目:Sci-Finder    作者:snverse    | 项目源码 | 文件源码
def __call__(self, topic=None):
        """Write help for ``topic`` to the console stream as HTML.

        With no topic, only ``repr(self)`` is written inside a
        ``<span class=help>`` element.  Otherwise ``pydoc.help`` is run, the
        value handed back by ``sys.stdout.reset()`` is split with
        ``_paragraph_re`` and rendered through ``HELP_HTML``: the first
        chunk becomes the title, the remaining chunks the text.
        """
        # NOTE(review): relies on sys.stdout being an object that provides
        # _write() and reset() -- confirm against the enclosing module.
        if topic is None:
            sys.stdout._write('<span class=help>%s</span>' % repr(self))
            return
        import pydoc
        pydoc.help(topic)
        captured = sys.stdout.reset()
        if isinstance(captured, bytes):
            # undecodable bytes are dropped instead of failing the help call
            captured = captured.decode('utf-8', 'ignore')
        chunks = _paragraph_re.split(captured)
        if len(chunks) <= 1:  # pragma: no cover
            heading, body = 'Help', chunks[0]
        else:
            heading, body = chunks[0], '\n\n'.join(chunks[1:])
        sys.stdout._write(HELP_HTML % {'title': heading, 'text': body})
项目:Sci-Finder    作者:snverse    | 项目源码 | 文件源码
def __init__(self, stream_factory=None, charset='utf-8', errors='replace',
                 max_form_memory_size=None, cls=None, buffer_size=64 * 1024):
        """Store the parser settings on the instance.

        ``stream_factory`` falls back to ``default_stream_factory`` and
        ``cls`` to ``MultiDict`` when left as ``None``.  ``buffer_size`` must
        be a multiple of 4 and at least 1024, otherwise ``AssertionError`` is
        raised.
        """
        self.charset = charset
        self.errors = errors
        self.max_form_memory_size = max_form_memory_size

        if stream_factory is None:
            stream_factory = default_stream_factory
        self.stream_factory = stream_factory

        if cls is None:
            cls = MultiDict
        self.cls = cls

        # a multiple of four lets base64 data be decoded chunk by chunk
        assert buffer_size % 4 == 0, 'buffer size has to be divisible by 4'
        # anything shorter than 1024 bytes cannot cope with long headers
        assert buffer_size >= 1024, 'buffer size has to be at least 1KB'

        self.buffer_size = buffer_size
项目:Sci-Finder    作者:snverse    | 项目源码 | 文件源码
def __call__(self, topic=None):
        """Write help for ``topic`` to the console stream as HTML.

        With no topic, only ``repr(self)`` is written inside a
        ``<span class=help>`` element.  Otherwise ``pydoc.help`` is run, the
        value handed back by ``sys.stdout.reset()`` is split with
        ``_paragraph_re`` and rendered through ``HELP_HTML``: the first
        chunk becomes the title, the remaining chunks the text.
        """
        # NOTE(review): relies on sys.stdout being an object that provides
        # _write() and reset() -- confirm against the enclosing module.
        if topic is None:
            sys.stdout._write('<span class=help>%s</span>' % repr(self))
            return
        import pydoc
        pydoc.help(topic)
        captured = sys.stdout.reset()
        if isinstance(captured, bytes):
            # undecodable bytes are dropped instead of failing the help call
            captured = captured.decode('utf-8', 'ignore')
        chunks = _paragraph_re.split(captured)
        if len(chunks) <= 1:  # pragma: no cover
            heading, body = 'Help', chunks[0]
        else:
            heading, body = chunks[0], '\n\n'.join(chunks[1:])
        sys.stdout._write(HELP_HTML % {'title': heading, 'text': body})
项目:Qyoutube-dl    作者:lzambella    | 项目源码 | 文件源码
def _real_extract(self, url):
        """Build the info dict for the video page at ``url``.

        Downloads the page, reports a login requirement when the
        ``login-required-screen`` marker is present, reads title, description,
        thumbnail and duration from the page metadata, and derives the m3u8
        URL from the page's ``apu`` value.

        Returns a dict with the keys ``id``, ``title``, ``formats``,
        ``thumbnail``, ``description`` and ``duration``.
        """
        video_id = self._match_id(url)
        webpage = self._download_webpage(url, video_id)

        login_marker = re.search(
            r"<(?:DIV|div) class='login-required-screen'>", webpage)
        if login_marker:
            self.raise_login_required('This video requires login')

        title = self._og_search_title(webpage)
        description = self._og_search_description(webpage)
        thumbnail = self._og_search_thumbnail(webpage)
        raw_duration = self._html_search_meta(
            'video:duration', webpage, 'duration')
        duration = int_or_none(raw_duration)

        # The ``apu`` value is the playlist URL, ROT13-encoded and reversed.
        apu = self._search_regex(r"apu='([^']+)'", webpage, 'apu')
        rot13_decoded = codecs.decode(apu, 'rot_13')
        m3u8_url = rot13_decoded[::-1]
        formats = self._extract_m3u8_formats(m3u8_url, video_id, ext='mp4')

        return {
            'id': video_id,
            'title': title,
            'formats': formats,
            'thumbnail': thumbnail,
            'description': description,
            'duration': duration,
        }
项目:harbour-sailfinder    作者:DylanVanAssche    | 项目源码 | 文件源码
def __call__(self, topic=None):
        """Write help for ``topic`` to the console stream as HTML.

        With no topic, only ``repr(self)`` is written inside a
        ``<span class=help>`` element.  Otherwise ``pydoc.help`` is run, the
        value handed back by ``sys.stdout.reset()`` is split with
        ``_paragraph_re`` and rendered through ``HELP_HTML``: the first
        chunk becomes the title, the remaining chunks the text.
        """
        # NOTE(review): relies on sys.stdout being an object that provides
        # _write() and reset() -- confirm against the enclosing module.
        if topic is None:
            sys.stdout._write('<span class=help>%s</span>' % repr(self))
            return
        import pydoc
        pydoc.help(topic)
        captured = sys.stdout.reset()
        if isinstance(captured, bytes):
            # undecodable bytes are dropped instead of failing the help call
            captured = captured.decode('utf-8', 'ignore')
        chunks = _paragraph_re.split(captured)
        if len(chunks) <= 1:  # pragma: no cover
            heading, body = 'Help', chunks[0]
        else:
            heading, body = chunks[0], '\n\n'.join(chunks[1:])
        sys.stdout._write(HELP_HTML % {'title': heading, 'text': body})
项目:harbour-sailfinder    作者:DylanVanAssche    | 项目源码 | 文件源码
def __init__(self, stream_factory=None, charset='utf-8', errors='replace',
                 max_form_memory_size=None, cls=None, buffer_size=64 * 1024):
        """Store the parser settings on the instance.

        :param stream_factory: stored as ``self.stream_factory``;
                               ``default_stream_factory`` is used when ``None``.
        :param charset: stored as ``self.charset``.
        :param errors: stored as ``self.errors``.
        :param max_form_memory_size: stored as ``self.max_form_memory_size``.
        :param cls: stored as ``self.cls``; ``MultiDict`` is used when ``None``.
        :param buffer_size: stored as ``self.buffer_size``; must be a multiple
                            of 4 and at least 1024.
        :raises AssertionError: if ``buffer_size`` violates either constraint.
        """
        # Resolve the fallback *before* storing it.  Storing the raw argument
        # first left ``self.stream_factory`` as ``None`` whenever the caller
        # relied on the default.
        if stream_factory is None:
            stream_factory = default_stream_factory
        self.stream_factory = stream_factory
        self.charset = charset
        self.errors = errors
        self.max_form_memory_size = max_form_memory_size
        if cls is None:
            cls = MultiDict
        self.cls = cls

        # make sure the buffer size is divisible by four so that we can base64
        # decode chunk by chunk
        assert buffer_size % 4 == 0, 'buffer size has to be divisible by 4'
        # also the buffer size has to be at least 1024 bytes long or long headers
        # will freak out the system
        assert buffer_size >= 1024, 'buffer size has to be at least 1KB'

        self.buffer_size = buffer_size
项目:harbour-sailfinder    作者:DylanVanAssche    | 项目源码 | 文件源码
def __call__(self, topic=None):
        """Write help for ``topic`` to the console stream as HTML.

        With no topic, only ``repr(self)`` is written inside a
        ``<span class=help>`` element.  Otherwise ``pydoc.help`` is run, the
        value handed back by ``sys.stdout.reset()`` is split with
        ``_paragraph_re`` and rendered through ``HELP_HTML``: the first
        chunk becomes the title, the remaining chunks the text.
        """
        # NOTE(review): relies on sys.stdout being an object that provides
        # _write() and reset() -- confirm against the enclosing module.
        if topic is None:
            sys.stdout._write('<span class=help>%s</span>' % repr(self))
            return
        import pydoc
        pydoc.help(topic)
        captured = sys.stdout.reset()
        if isinstance(captured, bytes):
            # undecodable bytes are dropped instead of failing the help call
            captured = captured.decode('utf-8', 'ignore')
        chunks = _paragraph_re.split(captured)
        if len(chunks) <= 1:  # pragma: no cover
            heading, body = 'Help', chunks[0]
        else:
            heading, body = chunks[0], '\n\n'.join(chunks[1:])
        sys.stdout._write(HELP_HTML % {'title': heading, 'text': body})
项目:Texty    作者:sarthfrey    | 项目源码 | 文件源码
def __call__(self, topic=None):
        """Write help for ``topic`` to the console stream as HTML.

        With no topic, only ``repr(self)`` is written inside a
        ``<span class=help>`` element.  Otherwise ``pydoc.help`` is run, the
        value handed back by ``sys.stdout.reset()`` is split with
        ``_paragraph_re`` and rendered through ``HELP_HTML``: the first
        chunk becomes the title, the remaining chunks the text.
        """
        # NOTE(review): relies on sys.stdout being an object that provides
        # _write() and reset() -- confirm against the enclosing module.
        if topic is None:
            sys.stdout._write('<span class=help>%s</span>' % repr(self))
            return
        import pydoc
        pydoc.help(topic)
        captured = sys.stdout.reset()
        if isinstance(captured, bytes):
            # undecodable bytes are dropped instead of failing the help call
            captured = captured.decode('utf-8', 'ignore')
        chunks = _paragraph_re.split(captured)
        if len(chunks) <= 1:  # pragma: no cover
            heading, body = 'Help', chunks[0]
        else:
            heading, body = chunks[0], '\n\n'.join(chunks[1:])
        sys.stdout._write(HELP_HTML % {'title': heading, 'text': body})
项目:Texty    作者:sarthfrey    | 项目源码 | 文件源码
def __init__(self, stream_factory=None, charset='utf-8', errors='replace',
                 max_form_memory_size=None, cls=None, buffer_size=64 * 1024):
        """Store the parser settings on the instance.

        :param stream_factory: stored as ``self.stream_factory``;
                               ``default_stream_factory`` is used when ``None``.
        :param charset: stored as ``self.charset``.
        :param errors: stored as ``self.errors``.
        :param max_form_memory_size: stored as ``self.max_form_memory_size``.
        :param cls: stored as ``self.cls``; ``MultiDict`` is used when ``None``.
        :param buffer_size: stored as ``self.buffer_size``; must be a multiple
                            of 4 and at least 1024.
        :raises AssertionError: if ``buffer_size`` violates either constraint.
        """
        # Resolve the fallback *before* storing it.  Storing the raw argument
        # first left ``self.stream_factory`` as ``None`` whenever the caller
        # relied on the default.
        if stream_factory is None:
            stream_factory = default_stream_factory
        self.stream_factory = stream_factory
        self.charset = charset
        self.errors = errors
        self.max_form_memory_size = max_form_memory_size
        if cls is None:
            cls = MultiDict
        self.cls = cls

        # make sure the buffer size is divisible by four so that we can base64
        # decode chunk by chunk
        assert buffer_size % 4 == 0, 'buffer size has to be divisible by 4'
        # also the buffer size has to be at least 1024 bytes long or long headers
        # will freak out the system
        assert buffer_size >= 1024, 'buffer size has to be at least 1KB'

        self.buffer_size = buffer_size
项目:arithmancer    作者:google    | 项目源码 | 文件源码
def __call__(self, topic=None):
        """Write help for ``topic`` to the console stream as HTML.

        With no topic, only ``repr(self)`` is written inside a
        ``<span class=help>`` element.  Otherwise ``pydoc.help`` is run, the
        value handed back by ``sys.stdout.reset()`` is split with
        ``_paragraph_re`` and rendered through ``HELP_HTML``: the first
        chunk becomes the title, the remaining chunks the text.
        """
        # NOTE(review): relies on sys.stdout being an object that provides
        # _write() and reset() -- confirm against the enclosing module.
        if topic is None:
            sys.stdout._write('<span class=help>%s</span>' % repr(self))
            return
        import pydoc
        pydoc.help(topic)
        captured = sys.stdout.reset()
        if isinstance(captured, bytes):
            # undecodable bytes are dropped instead of failing the help call
            captured = captured.decode('utf-8', 'ignore')
        chunks = _paragraph_re.split(captured)
        if len(chunks) <= 1:  # pragma: no cover
            heading, body = 'Help', chunks[0]
        else:
            heading, body = chunks[0], '\n\n'.join(chunks[1:])
        sys.stdout._write(HELP_HTML % {'title': heading, 'text': body})
项目:arithmancer    作者:google    | 项目源码 | 文件源码
def __init__(self, stream_factory=None, charset='utf-8', errors='replace',
                 max_form_memory_size=None, cls=None, buffer_size=64 * 1024):
        """Store the parser settings on the instance.

        :param stream_factory: stored as ``self.stream_factory``;
                               ``default_stream_factory`` is used when ``None``.
        :param charset: stored as ``self.charset``.
        :param errors: stored as ``self.errors``.
        :param max_form_memory_size: stored as ``self.max_form_memory_size``.
        :param cls: stored as ``self.cls``; ``MultiDict`` is used when ``None``.
        :param buffer_size: stored as ``self.buffer_size``; must be a multiple
                            of 4 and at least 1024.
        :raises AssertionError: if ``buffer_size`` violates either constraint.
        """
        # Resolve the fallback *before* storing it.  Storing the raw argument
        # first left ``self.stream_factory`` as ``None`` whenever the caller
        # relied on the default.
        if stream_factory is None:
            stream_factory = default_stream_factory
        self.stream_factory = stream_factory
        self.charset = charset
        self.errors = errors
        self.max_form_memory_size = max_form_memory_size
        if cls is None:
            cls = MultiDict
        self.cls = cls

        # make sure the buffer size is divisible by four so that we can base64
        # decode chunk by chunk
        assert buffer_size % 4 == 0, 'buffer size has to be divisible by 4'
        # also the buffer size has to be at least 1024 bytes long or long headers
        # will freak out the system
        assert buffer_size >= 1024, 'buffer size has to be at least 1KB'

        self.buffer_size = buffer_size
项目:tesismometro    作者:joapaspe    | 项目源码 | 文件源码
def __call__(self, topic=None):
        """Write help for ``topic`` to the console stream as HTML.

        With no topic, only ``repr(self)`` is written inside a
        ``<span class=help>`` element.  Otherwise ``pydoc.help`` is run, the
        value handed back by ``sys.stdout.reset()`` is split with
        ``_paragraph_re`` and rendered through ``HELP_HTML``: the first
        chunk becomes the title, the remaining chunks the text.
        """
        # NOTE(review): relies on sys.stdout being an object that provides
        # _write() and reset() -- confirm against the enclosing module.
        if topic is None:
            sys.stdout._write('<span class=help>%s</span>' % repr(self))
            return
        import pydoc
        pydoc.help(topic)
        captured = sys.stdout.reset()
        if isinstance(captured, bytes):
            # undecodable bytes are dropped instead of failing the help call
            captured = captured.decode('utf-8', 'ignore')
        chunks = _paragraph_re.split(captured)
        if len(chunks) <= 1:  # pragma: no cover
            heading, body = 'Help', chunks[0]
        else:
            heading, body = chunks[0], '\n\n'.join(chunks[1:])
        sys.stdout._write(HELP_HTML % {'title': heading, 'text': body})
项目:tesismometro    作者:joapaspe    | 项目源码 | 文件源码
def __init__(self, stream_factory=None, charset='utf-8', errors='replace',
                 max_form_memory_size=None, cls=None, buffer_size=64 * 1024):
        """Store the parser settings on the instance.

        :param stream_factory: stored as ``self.stream_factory``;
                               ``default_stream_factory`` is used when ``None``.
        :param charset: stored as ``self.charset``.
        :param errors: stored as ``self.errors``.
        :param max_form_memory_size: stored as ``self.max_form_memory_size``.
        :param cls: stored as ``self.cls``; ``MultiDict`` is used when ``None``.
        :param buffer_size: stored as ``self.buffer_size``; must be a multiple
                            of 4 and at least 1024.
        :raises AssertionError: if ``buffer_size`` violates either constraint.
        """
        # Resolve the fallback *before* storing it.  Storing the raw argument
        # first left ``self.stream_factory`` as ``None`` whenever the caller
        # relied on the default.
        if stream_factory is None:
            stream_factory = default_stream_factory
        self.stream_factory = stream_factory
        self.charset = charset
        self.errors = errors
        self.max_form_memory_size = max_form_memory_size
        if cls is None:
            cls = MultiDict
        self.cls = cls

        # make sure the buffer size is divisible by four so that we can base64
        # decode chunk by chunk
        assert buffer_size % 4 == 0, 'buffer size has to be divisible by 4'
        # also the buffer size has to be at least 1024 bytes long or long headers
        # will freak out the system
        assert buffer_size >= 1024, 'buffer size has to be at least 1KB'

        self.buffer_size = buffer_size
项目:aws-cfn-plex    作者:lordmuffin    | 项目源码 | 文件源码
def seal_aes_ctr_legacy(key_service, secret, digest_method=DEFAULT_DIGEST):
    """
    Encrypt `secret` with key material obtained from `key_service`.

    The companion method `open_aes_ctr_legacy` reverses the operation.

    Returns a dict with the base64 text of the encoded key (`key`), the
    base64 text of the ciphertext (`contents`), the hex-encoded HMAC
    (`hmac`) and the digest method that was used (`digest`).
    """
    # Request 64 bytes of key material: one half is for data encryption,
    # the other half for the HMAC.
    plaintext_key, wrapped_key = key_service.generate_key_data(64)
    ciphertext, mac = _seal_aes_ctr(
        secret, plaintext_key, LEGACY_NONCE, digest_method,
    )
    key_text = b64encode(wrapped_key).decode('utf-8')
    contents_text = b64encode(ciphertext).decode('utf-8')
    mac_hex = codecs.encode(mac, "hex_codec")
    return {
        'key': key_text,
        'contents': contents_text,
        'hmac': mac_hex,
        'digest': digest_method,
    }
项目:mdx-server    作者:ninja33    | 项目源码 | 文件源码
def _split_key_block(self, key_block):
        """Split a raw key block into ``(key_id, key_text)`` pairs.

        Each entry consists of a number of ``self._number_width`` bytes,
        unpacked with ``self._number_format``, followed by the key text in
        ``self._encoding`` terminated by a NUL (two NUL bytes for UTF-16).

        :param key_block: the bytes of one key block.
        :returns: list of ``(key_id, key_text)`` tuples, where ``key_text`` is
                  the stripped key re-encoded as UTF-8.

        An entry whose terminator is missing is read up to the end of the
        block.  Previously that case either raised ``UnboundLocalError`` (first
        entry) or reused the previous entry's end offset and never terminated.
        """
        # The terminator depends only on the encoding, so choose it once
        # instead of on every iteration.
        # key text ends with '\x00'
        if self._encoding == 'UTF-16':
            delimiter = b'\x00\x00'
            width = 2
        else:
            delimiter = b'\x00'
            width = 1

        block_size = len(key_block)
        key_list = []
        key_start_index = 0
        while key_start_index < block_size:
            text_start = key_start_index + self._number_width
            # the corresponding record's offset in record block
            key_id = unpack(self._number_format, key_block[key_start_index:text_start])[0]

            # Default to the end of the block so an unterminated final entry
            # still advances the outer loop.
            key_end_index = block_size
            i = text_start
            while i < block_size:
                if key_block[i:i + width] == delimiter:
                    key_end_index = i
                    break
                i += width

            key_text = key_block[text_start:key_end_index]\
                .decode(self._encoding, errors='ignore').encode('utf-8').strip()
            key_list.append((key_id, key_text))
            key_start_index = key_end_index + width
        return key_list
项目:RPoint    作者:george17-meet    | 项目源码 | 文件源码
def __call__(self, topic=None):
        """Write help for ``topic`` to the console stream as HTML.

        With no topic, only ``repr(self)`` is written inside a
        ``<span class=help>`` element.  Otherwise ``pydoc.help`` is run, the
        value handed back by ``sys.stdout.reset()`` is split with
        ``_paragraph_re`` and rendered through ``HELP_HTML``: the first
        chunk becomes the title, the remaining chunks the text.
        """
        # NOTE(review): relies on sys.stdout being an object that provides
        # _write() and reset() -- confirm against the enclosing module.
        if topic is None:
            sys.stdout._write('<span class=help>%s</span>' % repr(self))
            return
        import pydoc
        pydoc.help(topic)
        captured = sys.stdout.reset()
        if isinstance(captured, bytes):
            # undecodable bytes are dropped instead of failing the help call
            captured = captured.decode('utf-8', 'ignore')
        chunks = _paragraph_re.split(captured)
        if len(chunks) <= 1:  # pragma: no cover
            heading, body = 'Help', chunks[0]
        else:
            heading, body = chunks[0], '\n\n'.join(chunks[1:])
        sys.stdout._write(HELP_HTML % {'title': heading, 'text': body})
项目:RPoint    作者:george17-meet    | 项目源码 | 文件源码
def __init__(self, stream_factory=None, charset='utf-8', errors='replace',
                 max_form_memory_size=None, cls=None, buffer_size=64 * 1024):
        """Store the parser settings on the instance.

        ``stream_factory`` falls back to ``default_stream_factory`` and
        ``cls`` to ``MultiDict`` when left as ``None``.  ``buffer_size`` must
        be a multiple of 4 and at least 1024, otherwise ``AssertionError`` is
        raised.
        """
        self.charset = charset
        self.errors = errors
        self.max_form_memory_size = max_form_memory_size

        if stream_factory is None:
            stream_factory = default_stream_factory
        self.stream_factory = stream_factory

        if cls is None:
            cls = MultiDict
        self.cls = cls

        # a multiple of four lets base64 data be decoded chunk by chunk
        assert buffer_size % 4 == 0, 'buffer size has to be divisible by 4'
        # anything shorter than 1024 bytes cannot cope with long headers
        assert buffer_size >= 1024, 'buffer size has to be at least 1KB'

        self.buffer_size = buffer_size
项目:isni-reconcile    作者:cmh2166    | 项目源码 | 文件源码
def __call__(self, topic=None):
        """Write help for ``topic`` to the console stream as HTML.

        With no topic, only ``repr(self)`` is written inside a
        ``<span class=help>`` element.  Otherwise ``pydoc.help`` is run, the
        value handed back by ``sys.stdout.reset()`` is split with
        ``_paragraph_re`` and rendered through ``HELP_HTML``: the first
        chunk becomes the title, the remaining chunks the text.
        """
        # NOTE(review): relies on sys.stdout being an object that provides
        # _write() and reset() -- confirm against the enclosing module.
        if topic is None:
            sys.stdout._write('<span class=help>%s</span>' % repr(self))
            return
        import pydoc
        pydoc.help(topic)
        captured = sys.stdout.reset()
        if isinstance(captured, bytes):
            # undecodable bytes are dropped instead of failing the help call
            captured = captured.decode('utf-8', 'ignore')
        chunks = _paragraph_re.split(captured)
        if len(chunks) <= 1:  # pragma: no cover
            heading, body = 'Help', chunks[0]
        else:
            heading, body = chunks[0], '\n\n'.join(chunks[1:])
        sys.stdout._write(HELP_HTML % {'title': heading, 'text': body})
项目:isni-reconcile    作者:cmh2166    | 项目源码 | 文件源码
def __init__(self, stream_factory=None, charset='utf-8', errors='replace',
                 max_form_memory_size=None, cls=None, buffer_size=64 * 1024):
        """Store the parser settings on the instance.

        :param stream_factory: stored as ``self.stream_factory``;
                               ``default_stream_factory`` is used when ``None``.
        :param charset: stored as ``self.charset``.
        :param errors: stored as ``self.errors``.
        :param max_form_memory_size: stored as ``self.max_form_memory_size``.
        :param cls: stored as ``self.cls``; ``MultiDict`` is used when ``None``.
        :param buffer_size: stored as ``self.buffer_size``; must be a multiple
                            of 4 and at least 1024.
        :raises AssertionError: if ``buffer_size`` violates either constraint.
        """
        # Resolve the fallback *before* storing it.  Storing the raw argument
        # first left ``self.stream_factory`` as ``None`` whenever the caller
        # relied on the default.
        if stream_factory is None:
            stream_factory = default_stream_factory
        self.stream_factory = stream_factory
        self.charset = charset
        self.errors = errors
        self.max_form_memory_size = max_form_memory_size
        if cls is None:
            cls = MultiDict
        self.cls = cls

        # make sure the buffer size is divisible by four so that we can base64
        # decode chunk by chunk
        assert buffer_size % 4 == 0, 'buffer size has to be divisible by 4'
        # also the buffer size has to be at least 1024 bytes long or long headers
        # will freak out the system
        assert buffer_size >= 1024, 'buffer size has to be at least 1KB'

        self.buffer_size = buffer_size
项目:flasky    作者:RoseOu    | 项目源码 | 文件源码
def __call__(self, topic=None):
        """Write help for ``topic`` to the console stream as HTML.

        With no topic, only ``repr(self)`` is written inside a
        ``<span class=help>`` element.  Otherwise ``pydoc.help`` is run, the
        value handed back by ``sys.stdout.reset()`` is split with
        ``_paragraph_re`` and rendered through ``HELP_HTML``: the first
        chunk becomes the title, the remaining chunks the text.
        """
        # NOTE(review): relies on sys.stdout being an object that provides
        # _write() and reset() -- confirm against the enclosing module.
        if topic is None:
            sys.stdout._write('<span class=help>%s</span>' % repr(self))
            return
        import pydoc
        pydoc.help(topic)
        captured = sys.stdout.reset()
        if isinstance(captured, bytes):
            # undecodable bytes are dropped instead of failing the help call
            captured = captured.decode('utf-8', 'ignore')
        chunks = _paragraph_re.split(captured)
        if len(chunks) <= 1:  # pragma: no cover
            heading, body = 'Help', chunks[0]
        else:
            heading, body = chunks[0], '\n\n'.join(chunks[1:])
        sys.stdout._write(HELP_HTML % {'title': heading, 'text': body})
项目:flasky    作者:RoseOu    | 项目源码 | 文件源码
def __init__(self, stream_factory=None, charset='utf-8', errors='replace',
                 max_form_memory_size=None, cls=None, buffer_size=64 * 1024):
        """Store the parser settings on the instance.

        :param stream_factory: stored as ``self.stream_factory``;
                               ``default_stream_factory`` is used when ``None``.
        :param charset: stored as ``self.charset``.
        :param errors: stored as ``self.errors``.
        :param max_form_memory_size: stored as ``self.max_form_memory_size``.
        :param cls: stored as ``self.cls``; ``MultiDict`` is used when ``None``.
        :param buffer_size: stored as ``self.buffer_size``; must be a multiple
                            of 4 and at least 1024.
        :raises AssertionError: if ``buffer_size`` violates either constraint.
        """
        # Resolve the fallback *before* storing it.  Storing the raw argument
        # first left ``self.stream_factory`` as ``None`` whenever the caller
        # relied on the default.
        if stream_factory is None:
            stream_factory = default_stream_factory
        self.stream_factory = stream_factory
        self.charset = charset
        self.errors = errors
        self.max_form_memory_size = max_form_memory_size
        if cls is None:
            cls = MultiDict
        self.cls = cls

        # make sure the buffer size is divisible by four so that we can base64
        # decode chunk by chunk
        assert buffer_size % 4 == 0, 'buffer size has to be divisible by 4'
        # also the buffer size has to be at least 1024 bytes long or long headers
        # will freak out the system
        assert buffer_size >= 1024, 'buffer size has to be at least 1KB'

        self.buffer_size = buffer_size
项目:mgtools    作者:miyagaw61    | 项目源码 | 文件源码
def pattern_env(self, *arg):
        """
        Set environment variable with a cyclic pattern
        Set "pattern" option for basic/extended pattern type
        Usage:
            MYNAME ENVNAME size[,offset]
        """
        # NOTE(review): the docstring above is kept verbatim -- the MYNAME
        # placeholder suggests it doubles as the command's help text; confirm.

        (env, size) = normalize_argv(arg, 2)
        if size is None:
            self._missing_argument()

        # The second argument is "size" or "size,offset"; pad with an empty
        # field so a missing offset unpacks as ''.
        fields = arg[1].split(",") + [""]
        size, offset = fields[:2]
        size = to_int(size)
        offset = to_int(offset) if offset else 0
        if size is None or offset is None:
            self._missing_argument()

        pattern_text = cyclic_pattern(size, offset).decode('utf-8')
        peda.execute("set env %s %s" % (env, pattern_text))
        msg("Set environment %s = cyclic_pattern(%d, %d)" % (env, size, offset))
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def __call__(self, topic=None):
        """Write help for ``topic`` to the console stream as HTML.

        With no topic, only ``repr(self)`` is written inside a
        ``<span class=help>`` element.  Otherwise ``pydoc.help`` is run, the
        value handed back by ``sys.stdout.reset()`` is split with
        ``_paragraph_re`` and rendered through ``HELP_HTML``: the first
        chunk becomes the title, the remaining chunks the text.
        """
        # NOTE(review): relies on sys.stdout being an object that provides
        # _write() and reset() -- confirm against the enclosing module.
        if topic is None:
            sys.stdout._write('<span class=help>%s</span>' % repr(self))
            return
        import pydoc
        pydoc.help(topic)
        captured = sys.stdout.reset()
        if isinstance(captured, bytes):
            # undecodable bytes are dropped instead of failing the help call
            captured = captured.decode('utf-8', 'ignore')
        chunks = _paragraph_re.split(captured)
        if len(chunks) <= 1:  # pragma: no cover
            heading, body = 'Help', chunks[0]
        else:
            heading, body = chunks[0], '\n\n'.join(chunks[1:])
        sys.stdout._write(HELP_HTML % {'title': heading, 'text': body})
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def __init__(self, stream_factory=None, charset='utf-8', errors='replace',
                 max_form_memory_size=None, cls=None, buffer_size=64 * 1024):
        """Store the parser settings on the instance.

        :param stream_factory: stored as ``self.stream_factory``;
                               ``default_stream_factory`` is used when ``None``.
        :param charset: stored as ``self.charset``.
        :param errors: stored as ``self.errors``.
        :param max_form_memory_size: stored as ``self.max_form_memory_size``.
        :param cls: stored as ``self.cls``; ``MultiDict`` is used when ``None``.
        :param buffer_size: stored as ``self.buffer_size``; must be a multiple
                            of 4 and at least 1024.
        :raises AssertionError: if ``buffer_size`` violates either constraint.
        """
        # Resolve the fallback *before* storing it.  Storing the raw argument
        # first left ``self.stream_factory`` as ``None`` whenever the caller
        # relied on the default.
        if stream_factory is None:
            stream_factory = default_stream_factory
        self.stream_factory = stream_factory
        self.charset = charset
        self.errors = errors
        self.max_form_memory_size = max_form_memory_size
        if cls is None:
            cls = MultiDict
        self.cls = cls

        # make sure the buffer size is divisible by four so that we can base64
        # decode chunk by chunk
        assert buffer_size % 4 == 0, 'buffer size has to be divisible by 4'
        # also the buffer size has to be at least 1024 bytes long or long headers
        # will freak out the system
        assert buffer_size >= 1024, 'buffer size has to be at least 1KB'

        self.buffer_size = buffer_size
项目:django-sysinfo    作者:saxix    | 项目源码 | 文件源码
def http_basic_auth(func):
    """Decorate a view so HTTP Basic credentials log the user in first.

    When the request carries an ``Authorization`` header using the ``Basic``
    scheme, the credentials are decoded and passed to Django's
    ``authenticate``.  An authenticated user accepted by ``is_authorized`` is
    logged in; anything else raises ``PermissionDenied``.  Requests without
    the header, or using another scheme, reach the view unchanged.

    :param func: the view callable to wrap.
    :returns: the wrapped view.
    :raises PermissionDenied: on malformed or rejected Basic credentials.
    """
    @wraps(func)
    def _decorator(request, *args, **kwargs):
        import binascii

        from django.contrib.auth import authenticate, login

        if "HTTP_AUTHORIZATION" in request.META:
            header = request.META["HTTP_AUTHORIZATION"]
            # The header is text on Python 3 and bytes on Python 2; parse it
            # as bytes on both so the base64 handling below is uniform.
            if not isinstance(header, bytes):
                header = header.encode("latin-1", "replace")
            # partition() tolerates a header without a space, where
            # unpacking the result of split() raised ValueError.
            authmeth, _, auth = header.partition(b" ")
            if authmeth.lower() == b"basic":
                try:
                    auth = codecs.decode(auth.strip(), "base64")
                    username, password = auth.split(b":", 1)
                    # authenticate() is given text rather than raw bytes
                    username = username.decode("utf-8")
                    password = password.decode("utf-8")
                except (ValueError, binascii.Error):
                    # Bad base64, missing ':' or undecodable credentials are
                    # refused like any failed login instead of surfacing as
                    # a server error.  binascii.Error is listed explicitly
                    # because it is not a ValueError on Python 2.
                    raise PermissionDenied()
                user = authenticate(username=username, password=password)
                if user and is_authorized(user):
                    login(request, user)
                else:
                    raise PermissionDenied()
        return func(request, *args, **kwargs)

    return _decorator
项目:RealtimePythonChat    作者:quangtqag    | 项目源码 | 文件源码
def __call__(self, topic=None):
        """Write help for ``topic`` to the console stream as HTML.

        With no topic, only ``repr(self)`` is written inside a
        ``<span class=help>`` element.  Otherwise ``pydoc.help`` is run, the
        value handed back by ``sys.stdout.reset()`` is split with
        ``_paragraph_re`` and rendered through ``HELP_HTML``: the first
        chunk becomes the title, the remaining chunks the text.
        """
        # NOTE(review): relies on sys.stdout being an object that provides
        # _write() and reset() -- confirm against the enclosing module.
        if topic is None:
            sys.stdout._write('<span class=help>%s</span>' % repr(self))
            return
        import pydoc
        pydoc.help(topic)
        captured = sys.stdout.reset()
        if isinstance(captured, bytes):
            # undecodable bytes are dropped instead of failing the help call
            captured = captured.decode('utf-8', 'ignore')
        chunks = _paragraph_re.split(captured)
        if len(chunks) <= 1:  # pragma: no cover
            heading, body = 'Help', chunks[0]
        else:
            heading, body = chunks[0], '\n\n'.join(chunks[1:])
        sys.stdout._write(HELP_HTML % {'title': heading, 'text': body})
项目:RealtimePythonChat    作者:quangtqag    | 项目源码 | 文件源码
def __init__(self, stream_factory=None, charset='utf-8', errors='replace',
                 max_form_memory_size=None, cls=None, buffer_size=64 * 1024):
        """Store the parser settings on the instance.

        :param stream_factory: stored as ``self.stream_factory``;
                               ``default_stream_factory`` is used when ``None``.
        :param charset: stored as ``self.charset``.
        :param errors: stored as ``self.errors``.
        :param max_form_memory_size: stored as ``self.max_form_memory_size``.
        :param cls: stored as ``self.cls``; ``MultiDict`` is used when ``None``.
        :param buffer_size: stored as ``self.buffer_size``; must be a multiple
                            of 4 and at least 1024.
        :raises AssertionError: if ``buffer_size`` violates either constraint.
        """
        # Resolve the fallback *before* storing it.  Storing the raw argument
        # first left ``self.stream_factory`` as ``None`` whenever the caller
        # relied on the default.
        if stream_factory is None:
            stream_factory = default_stream_factory
        self.stream_factory = stream_factory
        self.charset = charset
        self.errors = errors
        self.max_form_memory_size = max_form_memory_size
        if cls is None:
            cls = MultiDict
        self.cls = cls

        # make sure the buffer size is divisible by four so that we can base64
        # decode chunk by chunk
        assert buffer_size % 4 == 0, 'buffer size has to be divisible by 4'
        # also the buffer size has to be at least 1024 bytes long or long headers
        # will freak out the system
        assert buffer_size >= 1024, 'buffer size has to be at least 1KB'

        self.buffer_size = buffer_size
项目:cuny-bdif    作者:aristotle-tek    | 项目源码 | 文件源码
def test_resume_archive_from_file(self, mock_resume_file_upload):
        """Resuming from a file forwards the listed parts' tree hashes.

        ``list_all_parts`` is stubbed to report two parts; the resume helper
        must then be called once with the part size and a mapping from part
        index to the hex-decoded ``SHA256TreeHash`` of that part.
        """
        part_size = 4
        tree_hashes = ('12', '34')
        listed_parts = [
            {'RangeInBytes': '0-3', 'SHA256TreeHash': tree_hashes[0]},
            {'RangeInBytes': '4-6', 'SHA256TreeHash': tree_hashes[1]},
        ]
        self.vault.list_all_parts = Mock(return_value={
            'PartSizeInBytes': part_size,
            'Parts': listed_parts,
        })

        self.vault.resume_archive_from_file(
            sentinel.upload_id, file_obj=sentinel.file_obj)

        expected_hashes = {
            0: codecs.decode(tree_hashes[0], 'hex_codec'),
            1: codecs.decode(tree_hashes[1], 'hex_codec'),
        }
        mock_resume_file_upload.assert_called_once_with(
            self.vault, sentinel.upload_id, part_size, sentinel.file_obj,
            expected_hashes)
项目:two1-python    作者:21dotco    | 项目源码 | 文件源码
def _load_channel(self, deposit_txid):
        """Load the stored channel for ``deposit_txid`` from disk.

        The channel is read as JSON from ``<deposit_txid>.server.tx`` and
        deserialized into a dict with the keys ``deposit_tx``,
        ``redeem_script`` and ``payment_tx`` (``None`` when the stored value
        is empty).

        Raises ``PaymentChannelNotFoundError`` when the file does not exist.
        """
        # Check if a payment has been made to this channel
        channel_path = deposit_txid + ".server.tx"
        if not os.path.exists(channel_path):
            raise PaymentChannelNotFoundError("Payment channel not found.")

        # Load JSON channel
        with open(channel_path) as f:
            channel_json = json.load(f)

        # Deserialize into objects
        deposit_tx = bitcoin.Transaction.from_hex(channel_json['deposit_tx'])
        script_bytes = codecs.decode(channel_json['redeem_script'], 'hex_codec')
        redeem_script = PaymentChannelRedeemScript.from_bytes(script_bytes)
        if channel_json['payment_tx']:
            payment_tx = bitcoin.Transaction.from_hex(channel_json['payment_tx'])
        else:
            payment_tx = None

        return {
            'deposit_tx': deposit_tx,
            'redeem_script': redeem_script,
            'payment_tx': payment_tx,
        }
项目:two1-python    作者:21dotco    | 项目源码 | 文件源码
def open(self, deposit_tx, redeem_script):
        """Validate a deposit and store a new channel for it.

        ``deposit_tx`` is a hex-serialized transaction and ``redeem_script``
        a hex-encoded redeem script.  The deposit must have more than one
        output, one of which is a P2SH output paying to the hash of the
        redeem script.  The channel is stored under the deposit's txid with
        no payment transaction yet.

        Raises ``AssertionError`` when the redeem script cannot be parsed or
        any validation step fails.
        """
        # Deserialize deposit tx and redeem script
        deposit_tx = bitcoin.Transaction.from_hex(deposit_tx)
        try:
            script_bytes = codecs.decode(redeem_script, 'hex_codec')
            redeem_script = PaymentChannelRedeemScript.from_bytes(script_bytes)
        except ValueError:
            raise AssertionError("Invalid payment channel redeem script.")

        # Validate deposit tx
        assert len(deposit_tx.outputs) > 1, "Invalid deposit tx outputs."
        output_index = deposit_tx.output_index_for_address(redeem_script.hash160())
        assert output_index is not None, "Missing deposit tx P2SH output."
        output_script = deposit_tx.outputs[output_index].script
        assert output_script.is_p2sh(), "Invalid deposit tx output P2SH script."
        assert output_script.get_hash160() == redeem_script.hash160(), "Invalid deposit tx output script P2SH address."  # nopep8

        # Store channel
        channel = {
            'deposit_tx': deposit_tx,
            'redeem_script': redeem_script,
            'payment_tx': None,
        }
        self._store_channel(str(deposit_tx.hash), channel)
项目:two1-python    作者:21dotco    | 项目源码 | 文件源码
def open(self, deposit_tx, redeem_script):
        """Validate a deposit and register a new channel in ``self.channels``.

        ``deposit_tx`` is a hex-serialized transaction and ``redeem_script``
        a hex-encoded redeem script.  The script's merchant public key must
        match ``self.PRIVATE_KEY``'s public key, and the deposit must have
        exactly one output: a P2SH output paying to the hash of the redeem
        script.  The channel is keyed by the deposit's txid.

        Raises ``AssertionError`` when any validation step fails.
        """
        # Deserialize deposit tx and redeem script
        deposit_tx = bitcoin.Transaction.from_hex(deposit_tx)
        deposit_txid = str(deposit_tx.hash)
        script_bytes = codecs.decode(redeem_script, 'hex_codec')
        redeem_script = statemachine.PaymentChannelRedeemScript.from_bytes(script_bytes)

        # Validate redeem_script
        assert redeem_script.merchant_public_key.compressed_bytes == self.PRIVATE_KEY.public_key.compressed_bytes

        # Validate deposit tx
        assert len(deposit_tx.outputs) == 1, "Invalid deposit tx outputs."
        output_index = deposit_tx.output_index_for_address(redeem_script.hash160())
        assert output_index is not None, "Missing deposit tx P2SH output."
        output_script = deposit_tx.outputs[output_index].script
        assert output_script.is_p2sh(), "Invalid deposit tx output P2SH script."
        assert output_script.get_hash160() == redeem_script.hash160(), "Invalid deposit tx output script P2SH address."  # nopep8

        channel = {
            'deposit_tx': deposit_tx,
            'redeem_script': redeem_script,
            'payment_tx': None,
        }
        self.channels[deposit_txid] = channel
项目:Indushell    作者:SecarmaLabs    | 项目源码 | 文件源码
def __call__(self, topic=None):
        """Write HTML help for ``topic`` through ``sys.stdout._write``.

        With no topic, a ``<span class=help>`` containing ``repr(self)`` is
        written and nothing else happens.  Otherwise ``pydoc.help(topic)`` is
        run, the value returned by ``sys.stdout.reset()`` is split with
        ``_paragraph_re`` and rendered through ``HELP_HTML``: the first piece
        becomes the title and the remaining pieces, joined by blank lines,
        become the text.

        NOTE(review): this relies on ``sys.stdout`` being an object that
        provides ``_write`` and ``reset`` -- presumably a capturing
        replacement stream installed elsewhere; confirm against the caller.
        """
        if topic is None:
            sys.stdout._write('<span class=help>%s</span>' % repr(self))
            return

        import pydoc
        pydoc.help(topic)

        captured = sys.stdout.reset()
        if isinstance(captured, bytes):
            # Undecodable bytes are dropped rather than raising.
            captured = captured.decode('utf-8', 'ignore')

        pieces = _paragraph_re.split(captured)
        if len(pieces) <= 1:  # pragma: no cover
            heading, body = 'Help', pieces[0]
        else:
            heading, body = pieces[0], '\n\n'.join(pieces[1:])
        sys.stdout._write(HELP_HTML % {'title': heading, 'text': body})
项目:Indushell    作者:SecarmaLabs    | 项目源码 | 文件源码
def __init__(self, stream_factory=None, charset='utf-8', errors='replace',
                 max_form_memory_size=None, cls=None, buffer_size=64 * 1024):
        """Store the parser configuration on the instance.

        :param stream_factory: stored as ``self.stream_factory``; falls back
                               to ``default_stream_factory`` when ``None``.
        :param charset: stored as ``self.charset``.
        :param errors: stored as ``self.errors``.
        :param max_form_memory_size: stored as ``self.max_form_memory_size``.
        :param cls: stored as ``self.cls``; falls back to ``MultiDict`` when
                    ``None``.
        :param buffer_size: stored as ``self.buffer_size``; must be a
                            multiple of 4 and at least 1024.
        :raises AssertionError: if ``buffer_size`` violates either constraint.
        """
        if stream_factory is None:
            stream_factory = default_stream_factory
        if cls is None:
            cls = MultiDict

        self.charset = charset
        self.errors = errors
        self.max_form_memory_size = max_form_memory_size
        self.stream_factory = stream_factory
        self.cls = cls

        # base64 is decoded chunk by chunk, which only works when every
        # chunk length is a multiple of four
        assert buffer_size % 4 == 0, 'buffer size has to be divisible by 4'
        # anything shorter than 1024 bytes cannot cope with long headers
        assert buffer_size >= 1024, 'buffer size has to be at least 1KB'

        self.buffer_size = buffer_size
项目:Liljimbo-Chatbot    作者:chrisjim316    | 项目源码 | 文件源码
def __call__(self, topic=None):
        """Emit help for ``topic`` as HTML via ``sys.stdout._write``.

        When ``topic`` is ``None`` only a ``<span class=help>`` wrapping
        ``repr(self)`` is written.  For any other topic, ``pydoc.help`` is
        invoked, the result of ``sys.stdout.reset()`` is split on
        ``_paragraph_re``, and ``HELP_HTML`` is filled with the first
        paragraph as ``title`` and the rest (joined with blank lines) as
        ``text``.

        NOTE(review): ``sys.stdout`` must expose ``_write`` and ``reset``
        here -- presumably it has been swapped for a capturing stream;
        verify where that replacement is installed.
        """
        if topic is None:
            sys.stdout._write('<span class=help>%s</span>' % repr(self))
            return

        import pydoc
        pydoc.help(topic)

        output = sys.stdout.reset()
        if isinstance(output, bytes):
            # Bytes that are not valid UTF-8 are silently discarded.
            output = output.decode('utf-8', 'ignore')

        parts = _paragraph_re.split(output)
        if len(parts) <= 1:  # pragma: no cover
            title, text = 'Help', parts[0]
        else:
            title, text = parts[0], '\n\n'.join(parts[1:])
        sys.stdout._write(HELP_HTML % {'title': title, 'text': text})
项目:Liljimbo-Chatbot    作者:chrisjim316    | 项目源码 | 文件源码
def __init__(self, stream_factory=None, charset='utf-8', errors='replace',
                 max_form_memory_size=None, cls=None, buffer_size=64 * 1024):
        """Record the parser settings as instance attributes.

        :param stream_factory: kept as ``self.stream_factory``;
                               ``default_stream_factory`` is used when this
                               is ``None``.
        :param charset: kept as ``self.charset``.
        :param errors: kept as ``self.errors``.
        :param max_form_memory_size: kept as ``self.max_form_memory_size``.
        :param cls: kept as ``self.cls``; ``MultiDict`` is used when this is
                    ``None``.
        :param buffer_size: kept as ``self.buffer_size``; has to be divisible
                            by 4 and no smaller than 1024.
        :raises AssertionError: if ``buffer_size`` fails either check.
        """
        if stream_factory is None:
            stream_factory = default_stream_factory
        if cls is None:
            cls = MultiDict

        self.charset = charset
        self.errors = errors
        self.max_form_memory_size = max_form_memory_size
        self.stream_factory = stream_factory
        self.cls = cls

        # chunk-wise base64 decoding needs chunk sizes divisible by four
        assert buffer_size % 4 == 0, 'buffer size has to be divisible by 4'
        # long headers break when the buffer is smaller than 1024 bytes
        assert buffer_size >= 1024, 'buffer size has to be at least 1KB'

        self.buffer_size = buffer_size
项目:sstash    作者:realcr    | 项目源码 | 文件源码
def bytes_to_hex_str(arg_bytes):
    """Return the hexadecimal representation of ``arg_bytes`` as text.

    The bytes are hex-encoded with the ``hex`` codec and the resulting
    ASCII bytes are decoded into a string.
    """
    hex_encoded = codecs.encode(arg_bytes, 'hex')
    return hex_encoded.decode('ascii')
项目:sstash    作者:realcr    | 项目源码 | 文件源码
def hex_str_to_bytes(arg_str):
    """Return the bytes described by the hexadecimal string ``arg_str``.

    The string is first encoded to ASCII bytes (so non-ASCII input raises
    ``UnicodeEncodeError``) and then decoded with the ``hex`` codec.
    """
    ascii_hex = arg_str.encode('ascii')
    return codecs.decode(ascii_hex, 'hex')
项目:cbapi-python    作者:carbonblack    | 项目源码 | 文件源码
def parse_42_guid(guid):
    """Unpack the leading 128 bits of a dash-separated hex GUID.

    Dashes are removed, the first 32 hex digits are decoded to 16 bytes,
    and those bytes are unpacked big-endian as two unsigned 32-bit integers
    followed by one unsigned 64-bit integer.  Any hex digits beyond the
    first 32 are ignored.

    Returns a 3-tuple of ints.
    """
    hex_digits = guid.replace('-', '')[:32]
    packed = codecs.decode(hex_digits, "hex")
    return struct.unpack('>IIQ', packed)
项目:whatstyle    作者:mikr    | 项目源码 | 文件源码
def unistr(text, errors='strict'):
    # type: (Union[str, bytes], str) -> str
    """Return ``text`` as a text string.

    Values that already are ``text_type`` are returned unchanged.  Anything
    else is decoded as UTF-8 using the given ``errors`` handler; if that
    raises ``UnicodeDecodeError`` and ``HAS_FSCODEC`` is true, the result
    of ``os.fsdecode`` is returned instead, otherwise the error propagates.
    """
    if not isinstance(text, text_type):
        try:
            text = text.decode('utf-8', errors=errors)
        except UnicodeDecodeError:
            if not HAS_FSCODEC:
                raise
            text = os.fsdecode(text)
    return text
项目:whatstyle    作者:mikr    | 项目源码 | 文件源码
def unifilename(filename):
    # type: (Union[str, bytes]) -> str
    """Return ``filename`` as a text string.

    ``text_type`` values are returned unchanged.  Other values are decoded
    with the filesystem encoding first, then with UTF-8; if both raise
    ``UnicodeDecodeError``, ``os.fsdecode`` is used when ``HAS_FSCODEC`` is
    true, otherwise the UTF-8 decoding error propagates.
    """
    if not isinstance(filename, text_type):
        fs_encoding = sys.getfilesystemencoding()
        try:
            filename = filename.decode(fs_encoding)
        except UnicodeDecodeError:
            try:
                filename = filename.decode('utf-8')
            except UnicodeDecodeError:
                if not HAS_FSCODEC:
                    raise
                filename = os.fsdecode(filename)
    return filename
项目:whatstyle    作者:mikr    | 项目源码 | 文件源码
def use_startstyle(self, inlinestyletext):
        # type: (Optional[str]) -> None
        """Set ``self.initial_style`` from an inline, YAML-like style string.

        A falsy ``inlinestyletext`` leaves the instance untouched.  Otherwise
        every run consisting of a letter or underscore followed by one or
        more word characters is wrapped in double quotes, e.g.
        '{based_on_style: pep8, column_limit: 79}' becomes
        '{"based_on_style": "pep8", "column_limit": 79}'.  The result is
        parsed as JSON and passed to ``style_make``.
        """
        if not inlinestyletext:
            return
        # Quote bare identifiers so the text becomes parseable JSON.
        quoted_text = re.sub(r'([a-zA-Z_]\w+)', r'"\1"', inlinestyletext)
        decoder = json.JSONDecoder()
        style_dict = decoder.decode(quoted_text)  # type: ignore
        self.initial_style = style_make(style_dict)
项目:whatstyle    作者:mikr    | 项目源码 | 文件源码
def surrdecode(s, enc='utf-8'):
    # type: (bytes, str) -> str
    """Decode ``s`` using ``enc`` with the ``'replace'`` error handler.

    Bytes that cannot be decoded never raise; the ``'replace'`` handler
    substitutes U+FFFD for them.
    """
    decoded = s.decode(enc, 'replace')
    return decoded