Python encodings 模块,idna() 实例源码

我们从Python开源项目中,提取了以下37个代码示例,用于说明如何使用encodings.idna()

项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_nameprep(self):
        from encodings.idna import nameprep
        for pos, (orig, prepped) in enumerate(nameprep_tests):
            if orig is None:
                # Skipped
                continue
            # The Unicode strings are given in UTF-8
            orig = str(orig, "utf-8", "surrogatepass")
            if prepped is None:
                # Input contains prohibited characters
                self.assertRaises(UnicodeError, nameprep, orig)
            else:
                prepped = str(prepped, "utf-8", "surrogatepass")
                try:
                    self.assertEqual(nameprep(orig), prepped)
                except Exception as e:
                    raise support.TestFailed("Test 3.%d: %s" % (pos+1, str(e)))
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_nameprep(self):
        from encodings.idna import nameprep
        for pos, (orig, prepped) in enumerate(nameprep_tests):
            if orig is None:
                # Skipped
                continue
            # The Unicode strings are given in UTF-8
            orig = str(orig, "utf-8", "surrogatepass")
            if prepped is None:
                # Input contains prohibited characters
                self.assertRaises(UnicodeError, nameprep, orig)
            else:
                prepped = str(prepped, "utf-8", "surrogatepass")
                try:
                    self.assertEqual(nameprep(orig), prepped)
                except Exception as e:
                    raise support.TestFailed("Test 3.%d: %s" % (pos+1, str(e)))
项目:touch-pay-client    作者:HackPucBemobi    | 项目源码 | 文件源码
def __call__(self, value):
        if not(isinstance(value, (basestring, unicodeT))) or not value or '@' not in value:
            return (value, translate(self.error_message))

        body, domain = value.rsplit('@', 1)

        try:
            match_body = self.body_regex.match(body)
            match_domain = self.domain_regex.match(domain)

            if not match_domain:
                # check for Internationalized Domain Names
                # see https://docs.python.org/2/library/codecs.html#module-encodings.idna
                domain_encoded = to_unicode(domain).encode('idna').decode('ascii')
                match_domain = self.domain_regex.match(domain_encoded)

            match = (match_body is not None) and (match_domain is not None)
        except (TypeError, UnicodeError):
            # Value may not be a string where we can look for matches.
            # Example: we're calling ANY_OF formatter and IS_EMAIL is asked to validate a date.
            match = None
        if match:
            if (not self.banned or not self.banned.match(domain)) \
                    and (not self.forced or self.forced.match(domain)):
                return (value, None)
        return (value, translate(self.error_message))
项目:Intranet-Penetration    作者:yuxiaokui    | 项目源码 | 文件源码
def _write_SOCKS5_address(self, addr, file):
        """
        Return the host and port packed for the SOCKS5 protocol,
        and the resolved address as a tuple object.
        """
        host, port = addr
        proxy_type, _, _, rdns, username, password = self.proxy

        if ":" in host:
            addr_bytes = socket.inet_pton(socket.AF_INET6, host)
            file.write(b"\x04" + addr_bytes)
        elif check_ip_valid(host):
            addr_bytes = socket.inet_pton(socket.AF_INET, host)
            file.write(b"\x01" + addr_bytes)
        else:
            if rdns:
                # Resolve remotely
                host_bytes = host.encode('idna')
                file.write(b"\x03" + chr(len(host_bytes)).encode() + host_bytes)
            else:
                # Resolve locally
                addr_bytes = socket.inet_aton(socket.gethostbyname(host))
                file.write(b"\x01" + addr_bytes)
                host = socket.inet_ntoa(addr_bytes)

        file.write(struct.pack(">H", port))
        return host, port
项目:MKFQ    作者:maojingios    | 项目源码 | 文件源码
def _write_SOCKS5_address(self, addr, file):
        """
        Return the host and port packed for the SOCKS5 protocol,
        and the resolved address as a tuple object.
        """
        host, port = addr
        proxy_type, _, _, rdns, username, password = self.proxy

        if ":" in host:
            addr_bytes = socket.inet_pton(socket.AF_INET6, host)
            file.write(b"\x04" + addr_bytes)
        elif check_ip_valid(host):
            addr_bytes = socket.inet_pton(socket.AF_INET, host)
            file.write(b"\x01" + addr_bytes)
        else:
            if rdns:
                # Resolve remotely
                host_bytes = host.encode('idna')
                file.write(b"\x03" + chr(len(host_bytes)).encode() + host_bytes)
            else:
                # Resolve locally
                addr_bytes = socket.inet_aton(socket.gethostbyname(host))
                file.write(b"\x01" + addr_bytes)
                host = socket.inet_ntoa(addr_bytes)

        file.write(struct.pack(">H", port))
        return host, port
项目:xxNet    作者:drzorm    | 项目源码 | 文件源码
def _write_SOCKS5_address(self, addr, file):
        """
        Return the host and port packed for the SOCKS5 protocol,
        and the resolved address as a tuple object.
        """
        host, port = addr
        proxy_type, _, _, rdns, username, password = self.proxy

        if ":" in host:
            addr_bytes = socket.inet_pton(socket.AF_INET6, host)
            file.write(b"\x04" + addr_bytes)
        elif check_ip_valid(host):
            addr_bytes = socket.inet_pton(socket.AF_INET, host)
            file.write(b"\x01" + addr_bytes)
        else:
            if rdns:
                # Resolve remotely
                host_bytes = host.encode('idna')
                file.write(b"\x03" + chr(len(host_bytes)).encode() + host_bytes)
            else:
                # Resolve locally
                addr_bytes = socket.inet_aton(socket.gethostbyname(host))
                file.write(b"\x01" + addr_bytes)
                host = socket.inet_ntoa(addr_bytes)

        file.write(struct.pack(">H", port))
        return host, port
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_builtin_decode(self):
        self.assertEqual(str(b"python.org", "idna"), "python.org")
        self.assertEqual(str(b"python.org.", "idna"), "python.org.")
        self.assertEqual(str(b"xn--pythn-mua.org", "idna"), "pyth\xf6n.org")
        self.assertEqual(str(b"xn--pythn-mua.org.", "idna"), "pyth\xf6n.org.")
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_builtin_encode(self):
        self.assertEqual("python.org".encode("idna"), b"python.org")
        self.assertEqual("python.org.".encode("idna"), b"python.org.")
        self.assertEqual("pyth\xf6n.org".encode("idna"), b"xn--pythn-mua.org")
        self.assertEqual("pyth\xf6n.org.".encode("idna"), b"xn--pythn-mua.org.")
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_stream(self):
        r = codecs.getreader("idna")(io.BytesIO(b"abc"))
        r.read(3)
        self.assertEqual(r.read(), "")
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_incremental_decode(self):
        self.assertEqual(
            "".join(codecs.iterdecode((bytes([c]) for c in b"python.org"), "idna")),
            "python.org"
        )
        self.assertEqual(
            "".join(codecs.iterdecode((bytes([c]) for c in b"python.org."), "idna")),
            "python.org."
        )
        self.assertEqual(
            "".join(codecs.iterdecode((bytes([c]) for c in b"xn--pythn-mua.org."), "idna")),
            "pyth\xf6n.org."
        )
        self.assertEqual(
            "".join(codecs.iterdecode((bytes([c]) for c in b"xn--pythn-mua.org."), "idna")),
            "pyth\xf6n.org."
        )

        decoder = codecs.getincrementaldecoder("idna")()
        self.assertEqual(decoder.decode(b"xn--xam", ), "")
        self.assertEqual(decoder.decode(b"ple-9ta.o", ), "\xe4xample.")
        self.assertEqual(decoder.decode(b"rg"), "")
        self.assertEqual(decoder.decode(b"", True), "org")

        decoder.reset()
        self.assertEqual(decoder.decode(b"xn--xam", ), "")
        self.assertEqual(decoder.decode(b"ple-9ta.o", ), "\xe4xample.")
        self.assertEqual(decoder.decode(b"rg."), "org.")
        self.assertEqual(decoder.decode(b"", True), "")
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_errors(self):
        """Only supports "strict" error handler"""
        "python.org".encode("idna", "strict")
        b"python.org".decode("idna", "strict")
        for errors in ("ignore", "replace", "backslashreplace",
                "surrogateescape"):
            self.assertRaises(Exception, "python.org".encode, "idna", errors)
            self.assertRaises(Exception,
                b"python.org".decode, "idna", errors)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_basics_capi(self):
        from _testcapi import codec_incrementalencoder, codec_incrementaldecoder
        s = "abc123"  # all codecs should be able to encode these
        for encoding in all_unicode_encodings:
            if encoding not in broken_unicode_with_stateful:
                # check incremental decoder/encoder (fetched via the C API)
                try:
                    cencoder = codec_incrementalencoder(encoding)
                except LookupError:  # no IncrementalEncoder
                    pass
                else:
                    # check C API
                    encodedresult = b""
                    for c in s:
                        encodedresult += cencoder.encode(c)
                    encodedresult += cencoder.encode("", True)
                    cdecoder = codec_incrementaldecoder(encoding)
                    decodedresult = ""
                    for c in encodedresult:
                        decodedresult += cdecoder.decode(bytes([c]))
                    decodedresult += cdecoder.decode(b"", True)
                    self.assertEqual(decodedresult, s,
                                     "encoding=%r" % encoding)

                if encoding not in ("idna", "mbcs"):
                    # check incremental decoder/encoder with errors argument
                    try:
                        cencoder = codec_incrementalencoder(encoding, "ignore")
                    except LookupError:  # no IncrementalEncoder
                        pass
                    else:
                        encodedresult = b"".join(cencoder.encode(c) for c in s)
                        cdecoder = codec_incrementaldecoder(encoding, "ignore")
                        decodedresult = "".join(cdecoder.decode(bytes([c]))
                                                for c in encodedresult)
                        self.assertEqual(decodedresult, s,
                                         "encoding=%r" % encoding)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_seek(self):
        # all codecs should be able to encode these
        s = "%s\n%s\n" % (100*"abc123", 100*"def456")
        for encoding in all_unicode_encodings:
            if encoding == "idna": # FIXME: See SF bug #1163178
                continue
            if encoding in broken_unicode_with_stateful:
                continue
            reader = codecs.getreader(encoding)(io.BytesIO(s.encode(encoding)))
            for t in range(5):
                # Test that calling seek resets the internal codec state and buffers
                reader.seek(0, 0)
                data = reader.read()
                self.assertEqual(s, data)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_bad_decode_args(self):
        for encoding in all_unicode_encodings:
            decoder = codecs.getdecoder(encoding)
            self.assertRaises(TypeError, decoder)
            if encoding not in ("idna", "punycode"):
                self.assertRaises(TypeError, decoder, 42)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_builtin_decode(self):
        self.assertEqual(str(b"python.org", "idna"), "python.org")
        self.assertEqual(str(b"python.org.", "idna"), "python.org.")
        self.assertEqual(str(b"xn--pythn-mua.org", "idna"), "pyth\xf6n.org")
        self.assertEqual(str(b"xn--pythn-mua.org.", "idna"), "pyth\xf6n.org.")
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_builtin_encode(self):
        self.assertEqual("python.org".encode("idna"), b"python.org")
        self.assertEqual("python.org.".encode("idna"), b"python.org.")
        self.assertEqual("pyth\xf6n.org".encode("idna"), b"xn--pythn-mua.org")
        self.assertEqual("pyth\xf6n.org.".encode("idna"), b"xn--pythn-mua.org.")
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_stream(self):
        r = codecs.getreader("idna")(io.BytesIO(b"abc"))
        r.read(3)
        self.assertEqual(r.read(), "")
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_incremental_decode(self):
        self.assertEqual(
            "".join(codecs.iterdecode((bytes([c]) for c in b"python.org"), "idna")),
            "python.org"
        )
        self.assertEqual(
            "".join(codecs.iterdecode((bytes([c]) for c in b"python.org."), "idna")),
            "python.org."
        )
        self.assertEqual(
            "".join(codecs.iterdecode((bytes([c]) for c in b"xn--pythn-mua.org."), "idna")),
            "pyth\xf6n.org."
        )
        self.assertEqual(
            "".join(codecs.iterdecode((bytes([c]) for c in b"xn--pythn-mua.org."), "idna")),
            "pyth\xf6n.org."
        )

        decoder = codecs.getincrementaldecoder("idna")()
        self.assertEqual(decoder.decode(b"xn--xam", ), "")
        self.assertEqual(decoder.decode(b"ple-9ta.o", ), "\xe4xample.")
        self.assertEqual(decoder.decode(b"rg"), "")
        self.assertEqual(decoder.decode(b"", True), "org")

        decoder.reset()
        self.assertEqual(decoder.decode(b"xn--xam", ), "")
        self.assertEqual(decoder.decode(b"ple-9ta.o", ), "\xe4xample.")
        self.assertEqual(decoder.decode(b"rg."), "org.")
        self.assertEqual(decoder.decode(b"", True), "")
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_basics_capi(self):
        from _testcapi import codec_incrementalencoder, codec_incrementaldecoder
        s = "abc123"  # all codecs should be able to encode these
        for encoding in all_unicode_encodings:
            if encoding not in broken_incremental_coders:
                # check incremental decoder/encoder (fetched via the C API)
                try:
                    cencoder = codec_incrementalencoder(encoding)
                except LookupError:  # no IncrementalEncoder
                    pass
                else:
                    # check C API
                    encodedresult = b""
                    for c in s:
                        encodedresult += cencoder.encode(c)
                    encodedresult += cencoder.encode("", True)
                    cdecoder = codec_incrementaldecoder(encoding)
                    decodedresult = ""
                    for c in encodedresult:
                        decodedresult += cdecoder.decode(bytes([c]))
                    decodedresult += cdecoder.decode(b"", True)
                    self.assertEqual(decodedresult, s,
                                     "encoding=%r" % encoding)

                if encoding not in ("idna", "mbcs"):
                    # check incremental decoder/encoder with errors argument
                    try:
                        cencoder = codec_incrementalencoder(encoding, "ignore")
                    except LookupError:  # no IncrementalEncoder
                        pass
                    else:
                        encodedresult = b"".join(cencoder.encode(c) for c in s)
                        cdecoder = codec_incrementaldecoder(encoding, "ignore")
                        decodedresult = "".join(cdecoder.decode(bytes([c]))
                                                for c in encodedresult)
                        self.assertEqual(decodedresult, s,
                                         "encoding=%r" % encoding)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_seek(self):
        # all codecs should be able to encode these
        s = "%s\n%s\n" % (100*"abc123", 100*"def456")
        for encoding in all_unicode_encodings:
            if encoding == "idna": # FIXME: See SF bug #1163178
                continue
            if encoding in broken_unicode_with_streams:
                continue
            reader = codecs.getreader(encoding)(io.BytesIO(s.encode(encoding)))
            for t in range(5):
                # Test that calling seek resets the internal codec state and buffers
                reader.seek(0, 0)
                data = reader.read()
                self.assertEqual(s, data)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_bad_decode_args(self):
        for encoding in all_unicode_encodings:
            decoder = codecs.getdecoder(encoding)
            self.assertRaises(TypeError, decoder)
            if encoding not in ("idna", "punycode"):
                self.assertRaises(TypeError, decoder, 42)
项目:Docker-XX-Net    作者:kuanghy    | 项目源码 | 文件源码
def _write_SOCKS5_address(self, addr, file):
        """
        Return the host and port packed for the SOCKS5 protocol,
        and the resolved address as a tuple object.
        """
        host, port = addr
        proxy_type, _, _, rdns, username, password = self.proxy

        if ":" in host:
            addr_bytes = socket.inet_pton(socket.AF_INET6, host)
            file.write(b"\x04" + addr_bytes)
        elif check_ip_valid(host):
            addr_bytes = socket.inet_pton(socket.AF_INET, host)
            file.write(b"\x01" + addr_bytes)
        else:
            if rdns:
                # Resolve remotely
                host_bytes = host.encode('idna')
                file.write(b"\x03" + chr(len(host_bytes)).encode() + host_bytes)
            else:
                # Resolve locally
                addr_bytes = socket.inet_aton(socket.gethostbyname(host))
                file.write(b"\x01" + addr_bytes)
                host = socket.inet_ntoa(addr_bytes)

        file.write(struct.pack(">H", port))
        return host, port
项目:XX-Net-Mini    作者:GFWParty    | 项目源码 | 文件源码
def _write_SOCKS5_address(self, addr, file):
        """
        Return the host and port packed for the SOCKS5 protocol,
        and the resolved address as a tuple object.
        """
        host, port = addr
        proxy_type, _, _, rdns, username, password = self.proxy

        if ":" in host:
            addr_bytes = socket.inet_pton(socket.AF_INET6, host)
            file.write(b"\x04" + addr_bytes)
        elif check_ip_valid(host):
            addr_bytes = socket.inet_pton(socket.AF_INET, host)
            file.write(b"\x01" + addr_bytes)
        else:
            if rdns:
                # Resolve remotely
                host_bytes = host.encode('idna')
                file.write(b"\x03" + chr(len(host_bytes)).encode() + host_bytes)
            else:
                # Resolve locally
                addr_bytes = socket.inet_aton(socket.gethostbyname(host))
                file.write(b"\x01" + addr_bytes)
                host = socket.inet_ntoa(addr_bytes)

        file.write(struct.pack(">H", port))
        return host, port
项目:touch-pay-client    作者:HackPucBemobi    | 项目源码 | 文件源码
def unicode_to_ascii_authority(authority):
    """
    Follows the steps in RFC 3490, Section 4 to convert a unicode authority
    string into its ASCII equivalent.
    For example, u'www.Alliancefran\xe7aise.nu' will be converted into
    'www.xn--alliancefranaise-npb.nu'

    Args:
        authority: unicode string, the URL authority component to convert,
            e.g. u'www.Alliancefran\xe7aise.nu'

    Returns:
        string: the US-ASCII character equivalent to the inputed authority,
             e.g. 'www.xn--alliancefranaise-npb.nu'

    Raises:
        Exception: if the function is not able to convert the inputed
            authority

    @author: Jonathan Benn
    """
    # RFC 3490, Section 4, Step 1
    # The encodings.idna Python module assumes that AllowUnassigned == True

    # RFC 3490, Section 4, Step 2
    labels = label_split_regex.split(authority)

    # RFC 3490, Section 4, Step 3
    # The encodings.idna Python module assumes that UseSTD3ASCIIRules == False

    # RFC 3490, Section 4, Step 4
    # We use the ToASCII operation because we are about to put the authority
    # into an IDN-unaware slot
    asciiLabels = []
    import encodings.idna
    for label in labels:
        if label:
            asciiLabels.append(to_native(encodings.idna.ToASCII(label)))
        else:
            # encodings.idna.ToASCII does not accept an empty string, but
            # it is necessary for us to allow for empty labels so that we
            # don't modify the URL
            asciiLabels.append('')
    # RFC 3490, Section 4, Step 5
    return str(reduce(lambda x, y: x + unichr(0x002E) + y, asciiLabels))
项目:Intranet-Penetration    作者:yuxiaokui    | 项目源码 | 文件源码
def _negotiate_SOCKS4(self, dest_addr, dest_port):
        """
        Negotiates a connection through a SOCKS4 server.
        """
        proxy_type, addr, port, rdns, username, password = self.proxy

        writer = self.makefile("wb")
        reader = self.makefile("rb", 0)  # buffering=0 renamed in Python 3
        try:
            # Check if the destination address provided is an IP address
            remote_resolve = False
            try:
                addr_bytes = socket.inet_aton(dest_addr)
            except socket.error:
                # It's a DNS name. Check where it should be resolved.
                if rdns:
                    addr_bytes = b"\x00\x00\x00\x01"
                    remote_resolve = True
                else:
                    addr_bytes = socket.inet_aton(socket.gethostbyname(dest_addr))

            # Construct the request packet
            writer.write(struct.pack(">BBH", 0x04, 0x01, dest_port))
            writer.write(addr_bytes)

            # The username parameter is considered userid for SOCKS4
            if username:
                writer.write(username)
            writer.write(b"\x00")

            # DNS name if remote resolving is required
            # NOTE: This is actually an extension to the SOCKS4 protocol
            # called SOCKS4A and may not be supported in all cases.
            if remote_resolve:
                writer.write(dest_addr.encode('idna') + b"\x00")
            writer.flush()

            # Get the response from the server
            resp = self._readall(reader, 8)
            if resp[0:1] != b"\x00":
                # Bad data
                raise GeneralProxyError("SOCKS4 proxy server sent invalid data")

            status = ord(resp[1:2])
            if status != 0x5A:
                # Connection failed: server returned an error
                error = SOCKS4_ERRORS.get(status, "Unknown error")
                raise SOCKS4Error("{0:#04x}: {1}".format(status, error))

            # Get the bound address/port
            self.proxy_sockname = (socket.inet_ntoa(resp[4:]), struct.unpack(">H", resp[2:4])[0])
            if remote_resolve:
                self.proxy_peername = socket.inet_ntoa(addr_bytes), dest_port
            else:
                self.proxy_peername = dest_addr, dest_port
        finally:
            reader.close()
            writer.close()
项目:Intranet-Penetration    作者:yuxiaokui    | 项目源码 | 文件源码
def _negotiate_HTTP(self, dest_addr, dest_port):
        """
        Negotiates a connection through an HTTP server.
        NOTE: This currently only supports HTTP CONNECT-style proxies.
        """
        proxy_type, addr, port, rdns, username, password = self.proxy

        # If we need to resolve locally, we do this now
        addr = dest_addr if rdns else socket.gethostbyname(dest_addr)

        self.sendall(b"CONNECT " + addr.encode('idna') + b":" + str(dest_port).encode() +
                     b" HTTP/1.1\r\n" + b"Host: " + dest_addr.encode('idna') + b"\r\n\r\n")

        # We just need the first line to check if the connection was successful
        fobj = self.makefile()
        status_line = fobj.readline()
        fobj.close()

        if not status_line:
            raise GeneralProxyError("Connection closed unexpectedly")

        try:
            proto, status_code, status_msg = status_line.split(" ", 2)
        except ValueError:
            raise GeneralProxyError("HTTP proxy server sent invalid response")

        if not proto.startswith("HTTP/"):
            raise GeneralProxyError("Proxy server does not appear to be an HTTP proxy")

        try:
            status_code = int(status_code)
        except ValueError:
            raise HTTPError("HTTP proxy server did not return a valid HTTP status")

        if status_code != 200:
            error = "{0}: {1}".format(status_code, status_msg)
            if status_code in (400, 403, 405):
                # It's likely that the HTTP proxy server does not support the CONNECT tunneling method
                error += ("\n[*] Note: The HTTP proxy server may not be supported by PySocks"
                          " (must be a CONNECT tunnel proxy)")
            raise HTTPError(error)

        self.proxy_sockname = (b"0.0.0.0", 0)
        self.proxy_peername = addr, dest_port
项目:MKFQ    作者:maojingios    | 项目源码 | 文件源码
def _negotiate_SOCKS4(self, dest_addr, dest_port):
        """
        Negotiates a connection through a SOCKS4 server.
        """
        proxy_type, addr, port, rdns, username, password = self.proxy

        writer = self.makefile("wb")
        reader = self.makefile("rb", 0)  # buffering=0 renamed in Python 3
        try:
            # Check if the destination address provided is an IP address
            remote_resolve = False
            try:
                addr_bytes = socket.inet_aton(dest_addr)
            except socket.error:
                # It's a DNS name. Check where it should be resolved.
                if rdns:
                    addr_bytes = b"\x00\x00\x00\x01"
                    remote_resolve = True
                else:
                    addr_bytes = socket.inet_aton(socket.gethostbyname(dest_addr))

            # Construct the request packet
            writer.write(struct.pack(">BBH", 0x04, 0x01, dest_port))
            writer.write(addr_bytes)

            # The username parameter is considered userid for SOCKS4
            if username:
                writer.write(username)
            writer.write(b"\x00")

            # DNS name if remote resolving is required
            # NOTE: This is actually an extension to the SOCKS4 protocol
            # called SOCKS4A and may not be supported in all cases.
            if remote_resolve:
                writer.write(dest_addr.encode('idna') + b"\x00")
            writer.flush()

            # Get the response from the server
            resp = self._readall(reader, 8)
            if resp[0:1] != b"\x00":
                # Bad data
                raise GeneralProxyError("SOCKS4 proxy server sent invalid data")

            status = ord(resp[1:2])
            if status != 0x5A:
                # Connection failed: server returned an error
                error = SOCKS4_ERRORS.get(status, "Unknown error")
                raise SOCKS4Error("{0:#04x}: {1}".format(status, error))

            # Get the bound address/port
            self.proxy_sockname = (socket.inet_ntoa(resp[4:]), struct.unpack(">H", resp[2:4])[0])
            if remote_resolve:
                self.proxy_peername = socket.inet_ntoa(addr_bytes), dest_port
            else:
                self.proxy_peername = dest_addr, dest_port
        finally:
            reader.close()
            writer.close()
项目:MKFQ    作者:maojingios    | 项目源码 | 文件源码
def _negotiate_HTTP(self, dest_addr, dest_port):
        """
        Negotiates a connection through an HTTP server.
        NOTE: This currently only supports HTTP CONNECT-style proxies.
        """
        proxy_type, addr, port, rdns, username, password = self.proxy

        # If we need to resolve locally, we do this now
        addr = dest_addr if rdns else socket.gethostbyname(dest_addr)

        self.sendall(b"CONNECT " + addr.encode('idna') + b":" + str(dest_port).encode() +
                     b" HTTP/1.1\r\n" + b"Host: " + dest_addr.encode('idna') + b"\r\n\r\n")

        # We just need the first line to check if the connection was successful
        fobj = self.makefile()
        status_line = fobj.readline()
        fobj.close()

        if not status_line:
            raise GeneralProxyError("Connection closed unexpectedly")

        try:
            proto, status_code, status_msg = status_line.split(" ", 2)
        except ValueError:
            raise GeneralProxyError("HTTP proxy server sent invalid response")

        if not proto.startswith("HTTP/"):
            raise GeneralProxyError("Proxy server does not appear to be an HTTP proxy")

        try:
            status_code = int(status_code)
        except ValueError:
            raise HTTPError("HTTP proxy server did not return a valid HTTP status")

        if status_code != 200:
            error = "{0}: {1}".format(status_code, status_msg)
            if status_code in (400, 403, 405):
                # It's likely that the HTTP proxy server does not support the CONNECT tunneling method
                error += ("\n[*] Note: The HTTP proxy server may not be supported by PySocks"
                          " (must be a CONNECT tunnel proxy)")
            raise HTTPError(error)

        self.proxy_sockname = (b"0.0.0.0", 0)
        self.proxy_peername = addr, dest_port
项目:splunk_ta_ps4_f1_2016    作者:jonathanvarley    | 项目源码 | 文件源码
def valid_url(self, value):
        match = self.URL_REGEX.match(value)
        if not match:
            return False
        url = match.groupdict()

        if url['scheme'].lower() not in self.schemes:
            return False
        if url['host6']:
            if IPv6Type.valid_ip(url['host6']):
                return url
            else:
                return False
        if url['host4']:
            return url

        try:
            hostname = url['hostn'].encode('ascii').decode('ascii')
        except UnicodeError:
            try:
                hostname = url['hostn'].encode('idna').decode('ascii')
            except UnicodeError:
                return False

        if hostname[-1] == '.':
            hostname = hostname[:-1]
        if len(hostname) > 253:
            return False

        labels = hostname.split('.')
        for label in labels:
            if not 0 < len(label) < 64:
                return False
            if '-' in (label[0], label[-1]):
                return False
        if self.fqdn:
            if len(labels) == 1 \
              or not self.TLD_REGEX.match(labels[-1]):
                return False

        url['hostn_enc'] = hostname

        return url
项目:TA-SyncKVStore    作者:georgestarcher    | 项目源码 | 文件源码
def valid_url(self, value):
        match = self.URL_REGEX.match(value)
        if not match:
            return False
        url = match.groupdict()

        if url['scheme'].lower() not in self.schemes:
            return False
        if url['host6']:
            if IPv6Type.valid_ip(url['host6']):
                return url
            else:
                return False
        if url['host4']:
            return url

        try:
            hostname = url['hostn'].encode('ascii').decode('ascii')
        except UnicodeError:
            try:
                hostname = url['hostn'].encode('idna').decode('ascii')
            except UnicodeError:
                return False

        if hostname[-1] == '.':
            hostname = hostname[:-1]
        if len(hostname) > 253:
            return False

        labels = hostname.split('.')
        for label in labels:
            if not 0 < len(label) < 64:
                return False
            if '-' in (label[0], label[-1]):
                return False
        if self.fqdn:
            if len(labels) == 1 \
              or not self.TLD_REGEX.match(labels[-1]):
                return False

        url['hostn_enc'] = hostname

        return url
项目:cb-defense-splunk-app    作者:carbonblack    | 项目源码 | 文件源码
def valid_url(self, value):
        match = self.URL_REGEX.match(value)
        if not match:
            return False
        url = match.groupdict()

        if url['scheme'].lower() not in self.schemes:
            return False
        if url['host6']:
            if IPv6Type.valid_ip(url['host6']):
                return url
            else:
                return False
        if url['host4']:
            return url

        try:
            hostname = url['hostn'].encode('ascii').decode('ascii')
        except UnicodeError:
            try:
                hostname = url['hostn'].encode('idna').decode('ascii')
            except UnicodeError:
                return False

        if hostname[-1] == '.':
            hostname = hostname[:-1]
        if len(hostname) > 253:
            return False

        labels = hostname.split('.')
        for label in labels:
            if not 0 < len(label) < 64:
                return False
            if '-' in (label[0], label[-1]):
                return False
        if self.fqdn:
            if len(labels) == 1 \
              or not self.TLD_REGEX.match(labels[-1]):
                return False

        url['hostn_enc'] = hostname

        return url
项目:xxNet    作者:drzorm    | 项目源码 | 文件源码
def _negotiate_SOCKS4(self, dest_addr, dest_port):
        """
        Negotiates a connection through a SOCKS4 server.
        """
        proxy_type, addr, port, rdns, username, password = self.proxy

        writer = self.makefile("wb")
        reader = self.makefile("rb", 0)  # buffering=0 renamed in Python 3
        try:
            # Check if the destination address provided is an IP address
            remote_resolve = False
            try:
                addr_bytes = socket.inet_aton(dest_addr)
            except socket.error:
                # It's a DNS name. Check where it should be resolved.
                if rdns:
                    addr_bytes = b"\x00\x00\x00\x01"
                    remote_resolve = True
                else:
                    addr_bytes = socket.inet_aton(socket.gethostbyname(dest_addr))

            # Construct the request packet
            writer.write(struct.pack(">BBH", 0x04, 0x01, dest_port))
            writer.write(addr_bytes)

            # The username parameter is considered userid for SOCKS4
            if username:
                writer.write(username)
            writer.write(b"\x00")

            # DNS name if remote resolving is required
            # NOTE: This is actually an extension to the SOCKS4 protocol
            # called SOCKS4A and may not be supported in all cases.
            if remote_resolve:
                writer.write(dest_addr.encode('idna') + b"\x00")
            writer.flush()

            # Get the response from the server
            resp = self._readall(reader, 8)
            if resp[0:1] != b"\x00":
                # Bad data
                raise GeneralProxyError("SOCKS4 proxy server sent invalid data")

            status = ord(resp[1:2])
            if status != 0x5A:
                # Connection failed: server returned an error
                error = SOCKS4_ERRORS.get(status, "Unknown error")
                raise SOCKS4Error("{0:#04x}: {1}".format(status, error))

            # Get the bound address/port
            self.proxy_sockname = (socket.inet_ntoa(resp[4:]), struct.unpack(">H", resp[2:4])[0])
            if remote_resolve:
                self.proxy_peername = socket.inet_ntoa(addr_bytes), dest_port
            else:
                self.proxy_peername = dest_addr, dest_port
        finally:
            reader.close()
            writer.close()
项目:xxNet    作者:drzorm    | 项目源码 | 文件源码
def _negotiate_HTTP(self, dest_addr, dest_port):
        """
        Negotiates a connection through an HTTP server.
        NOTE: This currently only supports HTTP CONNECT-style proxies.
        """
        proxy_type, addr, port, rdns, username, password = self.proxy

        # If we need to resolve locally, we do this now
        addr = dest_addr if rdns else socket.gethostbyname(dest_addr)

        self.sendall(b"CONNECT " + addr.encode('idna') + b":" + str(dest_port).encode() +
                     b" HTTP/1.1\r\n" + b"Host: " + dest_addr.encode('idna') + b"\r\n\r\n")

        # We just need the first line to check if the connection was successful
        fobj = self.makefile()
        status_line = fobj.readline()
        fobj.close()

        if not status_line:
            raise GeneralProxyError("Connection closed unexpectedly")

        try:
            proto, status_code, status_msg = status_line.split(" ", 2)
        except ValueError:
            raise GeneralProxyError("HTTP proxy server sent invalid response")

        if not proto.startswith("HTTP/"):
            raise GeneralProxyError("Proxy server does not appear to be an HTTP proxy")

        try:
            status_code = int(status_code)
        except ValueError:
            raise HTTPError("HTTP proxy server did not return a valid HTTP status")

        if status_code != 200:
            error = "{0}: {1}".format(status_code, status_msg)
            if status_code in (400, 403, 405):
                # It's likely that the HTTP proxy server does not support the CONNECT tunneling method
                error += ("\n[*] Note: The HTTP proxy server may not be supported by PySocks"
                          " (must be a CONNECT tunnel proxy)")
            raise HTTPError(error)

        self.proxy_sockname = (b"0.0.0.0", 0)
        self.proxy_peername = addr, dest_port
项目:Docker-XX-Net    作者:kuanghy    | 项目源码 | 文件源码
def _negotiate_SOCKS4(self, dest_addr, dest_port):
        """
        Negotiates a connection through a SOCKS4 server.
        """
        proxy_type, addr, port, rdns, username, password = self.proxy

        writer = self.makefile("wb")
        reader = self.makefile("rb", 0)  # buffering=0 renamed in Python 3
        try:
            # Check if the destination address provided is an IP address
            remote_resolve = False
            try:
                addr_bytes = socket.inet_aton(dest_addr)
            except socket.error:
                # It's a DNS name. Check where it should be resolved.
                if rdns:
                    addr_bytes = b"\x00\x00\x00\x01"
                    remote_resolve = True
                else:
                    addr_bytes = socket.inet_aton(socket.gethostbyname(dest_addr))

            # Construct the request packet
            writer.write(struct.pack(">BBH", 0x04, 0x01, dest_port))
            writer.write(addr_bytes)

            # The username parameter is considered userid for SOCKS4
            if username:
                writer.write(username)
            writer.write(b"\x00")

            # DNS name if remote resolving is required
            # NOTE: This is actually an extension to the SOCKS4 protocol
            # called SOCKS4A and may not be supported in all cases.
            if remote_resolve:
                writer.write(dest_addr.encode('idna') + b"\x00")
            writer.flush()

            # Get the response from the server
            resp = self._readall(reader, 8)
            if resp[0:1] != b"\x00":
                # Bad data
                raise GeneralProxyError("SOCKS4 proxy server sent invalid data")

            status = ord(resp[1:2])
            if status != 0x5A:
                # Connection failed: server returned an error
                error = SOCKS4_ERRORS.get(status, "Unknown error")
                raise SOCKS4Error("{0:#04x}: {1}".format(status, error))

            # Get the bound address/port
            self.proxy_sockname = (socket.inet_ntoa(resp[4:]), struct.unpack(">H", resp[2:4])[0])
            if remote_resolve:
                self.proxy_peername = socket.inet_ntoa(addr_bytes), dest_port
            else:
                self.proxy_peername = dest_addr, dest_port
        finally:
            reader.close()
            writer.close()
项目:Docker-XX-Net    作者:kuanghy    | 项目源码 | 文件源码
def _negotiate_HTTP(self, dest_addr, dest_port):
        """
        Negotiates a connection through an HTTP server.
        NOTE: This currently only supports HTTP CONNECT-style proxies.
        """
        proxy_type, addr, port, rdns, username, password = self.proxy

        # If we need to resolve locally, we do this now
        addr = dest_addr if rdns else socket.gethostbyname(dest_addr)

        self.sendall(b"CONNECT " + addr.encode('idna') + b":" + str(dest_port).encode() +
                     b" HTTP/1.1\r\n" + b"Host: " + dest_addr.encode('idna') + b"\r\n\r\n")

        # We just need the first line to check if the connection was successful
        fobj = self.makefile()
        status_line = fobj.readline()
        fobj.close()

        if not status_line:
            raise GeneralProxyError("Connection closed unexpectedly")

        try:
            proto, status_code, status_msg = status_line.split(" ", 2)
        except ValueError:
            raise GeneralProxyError("HTTP proxy server sent invalid response")

        if not proto.startswith("HTTP/"):
            raise GeneralProxyError("Proxy server does not appear to be an HTTP proxy")

        try:
            status_code = int(status_code)
        except ValueError:
            raise HTTPError("HTTP proxy server did not return a valid HTTP status")

        if status_code != 200:
            error = "{0}: {1}".format(status_code, status_msg)
            if status_code in (400, 403, 405):
                # It's likely that the HTTP proxy server does not support the CONNECT tunneling method
                error += ("\n[*] Note: The HTTP proxy server may not be supported by PySocks"
                          " (must be a CONNECT tunnel proxy)")
            raise HTTPError(error)

        self.proxy_sockname = (b"0.0.0.0", 0)
        self.proxy_peername = addr, dest_port
项目:XX-Net-Mini    作者:GFWParty    | 项目源码 | 文件源码
def _negotiate_SOCKS4(self, dest_addr, dest_port):
        """
        Negotiates a connection through a SOCKS4 server.
        """
        proxy_type, addr, port, rdns, username, password = self.proxy

        writer = self.makefile("wb")
        reader = self.makefile("rb", 0)  # buffering=0 renamed in Python 3
        try:
            # Check if the destination address provided is an IP address
            remote_resolve = False
            try:
                addr_bytes = socket.inet_aton(dest_addr)
            except socket.error:
                # It's a DNS name. Check where it should be resolved.
                if rdns:
                    addr_bytes = b"\x00\x00\x00\x01"
                    remote_resolve = True
                else:
                    addr_bytes = socket.inet_aton(socket.gethostbyname(dest_addr))

            # Construct the request packet
            writer.write(struct.pack(">BBH", 0x04, 0x01, dest_port))
            writer.write(addr_bytes)

            # The username parameter is considered userid for SOCKS4
            if username:
                writer.write(username)
            writer.write(b"\x00")

            # DNS name if remote resolving is required
            # NOTE: This is actually an extension to the SOCKS4 protocol
            # called SOCKS4A and may not be supported in all cases.
            if remote_resolve:
                writer.write(dest_addr.encode('idna') + b"\x00")
            writer.flush()

            # Get the response from the server
            resp = self._readall(reader, 8)
            if resp[0:1] != b"\x00":
                # Bad data
                raise GeneralProxyError("SOCKS4 proxy server sent invalid data")

            status = ord(resp[1:2])
            if status != 0x5A:
                # Connection failed: server returned an error
                error = SOCKS4_ERRORS.get(status, "Unknown error")
                raise SOCKS4Error("{0:#04x}: {1}".format(status, error))

            # Get the bound address/port
            self.proxy_sockname = (socket.inet_ntoa(resp[4:]), struct.unpack(">H", resp[2:4])[0])
            if remote_resolve:
                self.proxy_peername = socket.inet_ntoa(addr_bytes), dest_port
            else:
                self.proxy_peername = dest_addr, dest_port
        finally:
            reader.close()
            writer.close()
项目:XX-Net-Mini    作者:GFWParty    | 项目源码 | 文件源码
def _negotiate_HTTP(self, dest_addr, dest_port):
        """
        Negotiates a connection through an HTTP server.
        NOTE: This currently only supports HTTP CONNECT-style proxies.
        """
        proxy_type, addr, port, rdns, username, password = self.proxy

        # If we need to resolve locally, we do this now
        addr = dest_addr if rdns else socket.gethostbyname(dest_addr)

        self.sendall(b"CONNECT " + addr.encode('idna') + b":" + str(dest_port).encode() +
                     b" HTTP/1.1\r\n" + b"Host: " + dest_addr.encode('idna') + b"\r\n\r\n")

        # We just need the first line to check if the connection was successful
        fobj = self.makefile()
        status_line = fobj.readline()
        fobj.close()

        if not status_line:
            raise GeneralProxyError("Connection closed unexpectedly")

        try:
            proto, status_code, status_msg = status_line.split(" ", 2)
        except ValueError:
            raise GeneralProxyError("HTTP proxy server sent invalid response")

        if not proto.startswith("HTTP/"):
            raise GeneralProxyError("Proxy server does not appear to be an HTTP proxy")

        try:
            status_code = int(status_code)
        except ValueError:
            raise HTTPError("HTTP proxy server did not return a valid HTTP status")

        if status_code != 200:
            error = "{0}: {1}".format(status_code, status_msg)
            if status_code in (400, 403, 405):
                # It's likely that the HTTP proxy server does not support the CONNECT tunneling method
                error += ("\n[*] Note: The HTTP proxy server may not be supported by PySocks"
                          " (must be a CONNECT tunnel proxy)")
            raise HTTPError(error)

        self.proxy_sockname = (b"0.0.0.0", 0)
        self.proxy_peername = addr, dest_port