Python http.client 模块,HTTPSConnection() 实例源码

我们从Python开源项目中,提取了以下47个代码示例,用于说明如何使用http.client.HTTPSConnection()

项目:bitcoin-arbitrage    作者:ucfyao    | 项目源码 | 文件源码
def httpPost(url,resource,params):
    headers = {
        "Content-type" : "application/x-www-form-urlencoded",
    }
    try :
        conn = httplib.HTTPSConnection(url, timeout=10)
        temp_params = urllib.parse.urlencode(params)
        conn.request("POST", resource, temp_params, headers)
        response = conn.getresponse()
        data = response.read().decode('utf-8')
        params.clear()
        conn.close()
        return data
    except:
    # except Exception,e:  
        # print(Exception,":",e)
        traceback.print_exc()
        return False
项目:SmartSocks    作者:waylybaye    | 项目源码 | 文件源码
def validate_optional_args(args):
    """Check if an argument was provided that depends on a module that may
    not be part of the Python standard library.

    If such an argument is supplied, and the module does not exist, exit
    with an error stating which module is missing.
    """
    optional_args = {
        'json': ('json/simplejson python module', json),
        'secure': ('SSL support', HTTPSConnection),
    }

    for arg, info in optional_args.items():
        if getattr(args, arg, False) and info[1] is None:
            raise SystemExit('%s is not installed. --%s is '
                             'unavailable' % (info[0], arg))
项目:azure-search-ta    作者:yokawasa    | 项目源码 | 文件源码
def textanalyze(self,index_name, analyzer, text):
        # Create JSON string for request body
        reqobject={}
        reqobject['text'] = text
        reqobject['analyzer'] = analyzer
        io=StringIO()
        json.dump(reqobject, io)
        req_body = io.getvalue()
        # HTTP request to Azure search REST API
        conn = httplib.HTTPSConnection(self.__api_url)
        conn.request("POST",
                u"/indexes/{0}/analyze?api-version={1}".format(index_name, _AZURE_SEARCH_API_VERSION),
                req_body, self.headers)
        response = conn.getresponse()
        #print "status:", response.status, response.reason
        data = (response.read()).decode('utf-8')
        #print("data:{}".format(data))
        conn.close()
        return data
项目:whatsapp-rest-webservice    作者:svub    | 项目源码 | 文件源码
def sendRequest(host, port, path, headers, params, reqType="GET"):

        params = urlencode(params)


        path = path + "?"+ params if reqType == "GET" and params else path

        if len(headers):
            logger.debug(headers)
        if len(params):
            logger.debug(params)

        logger.debug("Opening connection to %s" % host);
        conn = httplib.HTTPSConnection(host ,port) if port == 443 else httplib.HTTPConnection(host ,port)

        logger.debug("Sending %s request to %s" % (reqType, path))
        conn.request(reqType, path, params, headers);

        response = conn.getresponse()
        return response
项目:imcsdk    作者:CiscoUcs    | 项目源码 | 文件源码
def connect(self):
        """Overrides HTTPSConnection.connect to specify TLS version"""
        # Standard implementation from HTTPSConnection, which is not
        # designed for extension, unfortunately
        if sys.version_info >= (2, 7):
            sock = socket.create_connection((self.host, self.port),
                                            self.timeout, self.source_address)
        elif sys.version_info >= (2, 6):
            sock = socket.create_connection((self.host, self.port),
                                            self.timeout)
        else:
            sock = socket.create_connection((self.host, self.port))

        if getattr(self, '_tunnel_host', None):
            self.sock = sock
            self._tunnel()

        # This is the only difference; default wrap_socket uses SSLv23
        self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file,
                                    ssl_version=ssl.PROTOCOL_TLSv1)
项目:Intranet-Penetration    作者:yuxiaokui    | 项目源码 | 文件源码
def __getattr__(self, name):
            # Anything that can't be found on this instance is presumably a
            # property of underlying connection object.
            # We need to be a little bit careful here. There are a few methods
            # that can act on a HTTPSConnection before it actually connects to
            # the remote server. We don't want to change the semantics of the,
            # HTTPSConnection so we need to spot these and queue them up. When
            # we actually create the backing Connection, we'll apply them
            # immediately. These methods can't throw exceptions, so we should
            # be fine.
            delay_methods = ["set_tunnel", "set_debuglevel"]

            if self._conn is None and name in delay_methods:
                # Return a little closure that saves off the method call to
                # apply later.
                def capture(obj, *args, **kwargs):
                    self._call_queue.append((name, args, kwargs))
                return capture
            elif self._conn is None:
                # We're being told to do something! We can now connect to the
                # remote server and build the connection object.
                self._delayed_connect()

            # Call through to the underlying object.
            return getattr(self._conn, name)
项目:MKFQ    作者:maojingios    | 项目源码 | 文件源码
def __getattr__(self, name):
            # Anything that can't be found on this instance is presumably a
            # property of underlying connection object.
            # We need to be a little bit careful here. There are a few methods
            # that can act on a HTTPSConnection before it actually connects to
            # the remote server. We don't want to change the semantics of the,
            # HTTPSConnection so we need to spot these and queue them up. When
            # we actually create the backing Connection, we'll apply them
            # immediately. These methods can't throw exceptions, so we should
            # be fine.
            delay_methods = ["set_tunnel", "set_debuglevel"]

            if self._conn is None and name in delay_methods:
                # Return a little closure that saves off the method call to
                # apply later.
                def capture(obj, *args, **kwargs):
                    self._call_queue.append((name, args, kwargs))
                return capture
            elif self._conn is None:
                # We're being told to do something! We can now connect to the
                # remote server and build the connection object.
                self._delayed_connect()

            # Call through to the underlying object.
            return getattr(self._conn, name)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_local_bad_hostname(self):
        # The (valid) cert doesn't validate the HTTP hostname
        import ssl
        from test.ssl_servers import make_https_server
        server = make_https_server(self, CERT_fakehostname)
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        context.verify_mode = ssl.CERT_REQUIRED
        context.load_verify_locations(CERT_fakehostname)
        h = client.HTTPSConnection('localhost', server.port, context=context)
        with self.assertRaises(ssl.CertificateError):
            h.request('GET', '/')
        # Same with explicit check_hostname=True
        h = client.HTTPSConnection('localhost', server.port, context=context,
                                   check_hostname=True)
        with self.assertRaises(ssl.CertificateError):
            h.request('GET', '/')
        # With check_hostname=False, the mismatching is ignored
        h = client.HTTPSConnection('localhost', server.port, context=context,
                                   check_hostname=False)
        h.request('GET', '/nonexistent')
        resp = h.getresponse()
        self.assertEqual(resp.status, 404)
项目:crop    作者:jbrower95    | 项目源码 | 文件源码
def checkUpdate():
        try:
            conn = httplib.HTTPSConnection("raw.githubusercontent.com", 443)
            conn.request("GET", "/JonathanSalwan/ROPgadget/master/ropgadget/version.py")
        except:
            print("Can't connect to raw.githubusercontent.com")
            return
        d = conn.getresponse().read()
        majorVersion = re.search("MAJOR_VERSION.+=.+(?P<value>[\d])", d).group("value")
        minorVersion = re.search("MINOR_VERSION.+=.+(?P<value>[\d])", d).group("value")
        webVersion = int("%s%s" %(majorVersion, minorVersion))
        curVersion = int("%s%s" %(MAJOR_VERSION, MINOR_VERSION))
        if webVersion > curVersion:
            print("The version %s.%s is available. Currently, you use the version %d.%d." %(majorVersion, minorVersion, MAJOR_VERSION, MINOR_VERSION))
        else:
            print("Your version is up-to-date.")
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_local_bad_hostname(self):
        # The (valid) cert doesn't validate the HTTP hostname
        import ssl
        server = self.make_server(CERT_fakehostname)
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        context.verify_mode = ssl.CERT_REQUIRED
        context.load_verify_locations(CERT_fakehostname)
        h = client.HTTPSConnection('localhost', server.port, context=context)
        with self.assertRaises(ssl.CertificateError):
            h.request('GET', '/')
        # Same with explicit check_hostname=True
        h = client.HTTPSConnection('localhost', server.port, context=context,
                                   check_hostname=True)
        with self.assertRaises(ssl.CertificateError):
            h.request('GET', '/')
        # With check_hostname=False, the mismatching is ignored
        h = client.HTTPSConnection('localhost', server.port, context=context,
                                   check_hostname=False)
        h.request('GET', '/nonexistent')
        resp = h.getresponse()
        self.assertEqual(resp.status, 404)
        del server
项目:xxNet    作者:drzorm    | 项目源码 | 文件源码
def __getattr__(self, name):
            # Anything that can't be found on this instance is presumably a
            # property of underlying connection object.
            # We need to be a little bit careful here. There are a few methods
            # that can act on a HTTPSConnection before it actually connects to
            # the remote server. We don't want to change the semantics of the,
            # HTTPSConnection so we need to spot these and queue them up. When
            # we actually create the backing Connection, we'll apply them
            # immediately. These methods can't throw exceptions, so we should
            # be fine.
            delay_methods = ["set_tunnel", "set_debuglevel"]

            if self._conn is None and name in delay_methods:
                # Return a little closure that saves off the method call to
                # apply later.
                def capture(obj, *args, **kwargs):
                    self._call_queue.append((name, args, kwargs))
                return capture
            elif self._conn is None:
                # We're being told to do something! We can now connect to the
                # remote server and build the connection object.
                self._delayed_connect()

            # Call through to the underlying object.
            return getattr(self._conn, name)
项目:crypto-arbitrager    作者:artooze    | 项目源码 | 文件源码
def httpPost(url,resource,params):
    headers = {
        "Content-type" : "application/x-www-form-urlencoded",
    }
    try :
        conn = httplib.HTTPSConnection(url, timeout=10)
        temp_params = urllib.parse.urlencode(params)
        conn.request("POST", resource, temp_params, headers)
        response = conn.getresponse()
        data = response.read().decode('utf-8')
        params.clear()
        conn.close()
        return data
    except:
    # except Exception,e:  
        # print(Exception,":",e)
        traceback.print_exc()
        return False
项目:ucscsdk    作者:CiscoUcs    | 项目源码 | 文件源码
def connect(self):
        """Overrides HTTPSConnection.connect to specify TLS version"""
        # Standard implementation from HTTPSConnection, which is not
        # designed for extension, unfortunately
        if sys.version_info >= (2, 7):
            sock = socket.create_connection((self.host, self.port),
                                            self.timeout, self.source_address)
        elif sys.version_info >= (2, 6):
            sock = socket.create_connection((self.host, self.port),
                                            self.timeout)
        else:
            sock = socket.create_connection((self.host, self.port))

        if getattr(self, '_tunnel_host', None):
            self.sock = sock
            self._tunnel()

        # This is the only difference; default wrap_socket uses SSLv23
        self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file,
                                    ssl_version=ssl.PROTOCOL_TLSv1)
项目:sample-code-cmx    作者:CiscoDevNet    | 项目源码 | 文件源码
def get_json_response(server, api, username, password):
        """
        Returns the response from the URL specified
        """
        try:
            # lib opener
            response = {}
            context = ssl._create_unverified_context()
            conn = HTTPSConnection(server, context=context)
            auth = str.encode("%s:%s" % (username, password))
            user_and_pass = b64encode(auth).decode("ascii")
            headers = {'Authorization': 'Basic %s' % user_and_pass, "Accept": 'application/json'}
            conn.request('GET', api, headers=headers)
            res = conn.getresponse()
            bit_data = res.read()
            string_data = bit_data.decode(encoding='UTF-8')
            response['data'] = string_data
            response['status'] = 200
        except:
            print("--Unexpected error:", sys.exc_info()[1])
            response['data'] = sys.exc_info()[1]
            response['status'] = 400

        return response
项目:L.E.S.M.A    作者:NatanaelAntonioli    | 项目源码 | 文件源码
def validate_optional_args(args):
    """Check if an argument was provided that depends on a module that may
    not be part of the Python standard library.

    If such an argument is supplied, and the module does not exist, exit
    with an error stating which module is missing.
    """
    optional_args = {
        'json': ('json/simplejson python module', json),
        'secure': ('SSL support', HTTPSConnection),
    }

    for arg, info in optional_args.items():
        if getattr(args, arg, False) and info[1] is None:
            raise SystemExit('%s is not installed. --%s is '
                             'unavailable' % (info[0], arg))
项目:yowsup    作者:colonyhq    | 项目源码 | 文件源码
def sendRequest(host, port, path, headers, params, reqType="GET"):

        params = urlencode(params)


        path = path + "?"+ params if reqType == "GET" and params else path

        if len(headers):
            logger.debug(headers)
        if len(params):
            logger.debug(params)

        logger.debug("Opening connection to %s" % host);
        conn = httplib.HTTPSConnection(host ,port) if port == 443 else httplib.HTTPConnection(host ,port)

        logger.debug("Sending %s request to %s" % (reqType, path))
        conn.request(reqType, path, params, headers);

        response = conn.getresponse()
        return response
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_local_bad_hostname(self):
        # The (valid) cert doesn't validate the HTTP hostname
        import ssl
        server = self.make_server(CERT_fakehostname)
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        context.verify_mode = ssl.CERT_REQUIRED
        context.load_verify_locations(CERT_fakehostname)
        h = client.HTTPSConnection('localhost', server.port, context=context)
        with self.assertRaises(ssl.CertificateError):
            h.request('GET', '/')
        # Same with explicit check_hostname=True
        h = client.HTTPSConnection('localhost', server.port, context=context,
                                   check_hostname=True)
        with self.assertRaises(ssl.CertificateError):
            h.request('GET', '/')
        # With check_hostname=False, the mismatching is ignored
        h = client.HTTPSConnection('localhost', server.port, context=context,
                                   check_hostname=False)
        h.request('GET', '/nonexistent')
        resp = h.getresponse()
        self.assertEqual(resp.status, 404)
        del server
项目:irwin    作者:clarkerubber    | 项目源码 | 文件源码
def http(method, url, body=None, headers=None):
    url_info = urlparse.urlparse(url)
    if url_info.scheme == "https":
        con = httplib.HTTPSConnection(url_info.hostname, url_info.port or 443)
    else:
        con = httplib.HTTPConnection(url_info.hostname, url_info.port or 80)

    con.request(method, url_info.path, body, headers)
    response = con.getresponse()

    try:
        if 400 <= response.status < 500:
            raise HttpClientError(response.status, response.reason,
                                  response.read())
        elif 500 <= response.status < 600:
            raise HttpServerError(response.status, response.reason,
                                  response.read())
        else:
            yield response
    finally:
        con.close()
项目:rune    作者:hoonkim    | 项目源码 | 文件源码
def wisp_callback(url):
    """
    This returns current server's time of given url.

    :param url: url. (e.g. www.example.com, http://www.example.com, https://www.example.com:8080/test)
    :return: Date and time or the server.
    """
    parsed_url = urlparse(url)

    protocol = parsed_url.scheme or "http"
    port = parsed_url.port or (80 if protocol == "http" else 443)
    path = parsed_url.path or "/"
    if parsed_url.hostname:
        url = parsed_url.hostname
    else:
        path = "/"
        url = parsed_url.path
    request = "GET"

    conn = (HTTPConnection if protocol == "http" else HTTPSConnection)(url, port)
    conn.request(request, path)
    response = conn.getresponse()
    time = response.getheader("Date")

    return time
项目:Docker-XX-Net    作者:kuanghy    | 项目源码 | 文件源码
def __getattr__(self, name):
            # Anything that can't be found on this instance is presumably a
            # property of underlying connection object.
            # We need to be a little bit careful here. There are a few methods
            # that can act on a HTTPSConnection before it actually connects to
            # the remote server. We don't want to change the semantics of the,
            # HTTPSConnection so we need to spot these and queue them up. When
            # we actually create the backing Connection, we'll apply them
            # immediately. These methods can't throw exceptions, so we should
            # be fine.
            delay_methods = ["set_tunnel", "set_debuglevel"]

            if self._conn is None and name in delay_methods:
                # Return a little closure that saves off the method call to
                # apply later.
                def capture(obj, *args, **kwargs):
                    self._call_queue.append((name, args, kwargs))
                return capture
            elif self._conn is None:
                # We're being told to do something! We can now connect to the
                # remote server and build the connection object.
                self._delayed_connect()

            # Call through to the underlying object.
            return getattr(self._conn, name)
项目:PyQYT    作者:collinsctk    | 项目源码 | 文件源码
def ise_m_activecount(ip,username,password,port=443):

    #This sets up the https connection
    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)#ssl???????
    context.verify_mode = ssl.CERT_NONE#CERT_NONE, CERT_OPTIONAL or CERT_REQUIRED???????????
    context.load_verify_locations('/usr/share/kde4/apps/kssl/ca-bundle.crt')#?????
    c = HTTPSConnection(ip, port=port, context=context)
    user_pass_str = username + ':' + password
    user_pass_str_encode = user_pass_str.encode()
    userAndPass = b64encode(user_pass_str_encode).decode("ascii")
    headers = { 'Authorization' : 'Basic %s' %  userAndPass }
    c.request('GET', '/admin/API/mnt/Session/ActiveCount', headers=headers)
    res = c.getresponse()
    data = res.read()
    root = XML(data.decode())
    activecount = root.find('count').text
    return activecount
项目:Speedtest.bundle    作者:Twoure    | 项目源码 | 文件源码
def validate_optional_args(args):
    """Check if an argument was provided that depends on a module that may
    not be part of the Python standard library.

    If such an argument is supplied, and the module does not exist, exit
    with an error stating which module is missing.
    """
    optional_args = {
        'json': ('json/simplejson python module', json),
        'secure': ('SSL support', HTTPSConnection),
    }

    for arg, info in optional_args.items():
        if getattr(args, arg, False) and info[1] is None:
            raise SystemExit('%s is not installed. --%s is '
                             'unavailable' % (info[0], arg))
项目:python-libjuju    作者:juju    | 项目源码 | 文件源码
def https_connection(self):
        """Return an https connection to this Connection's endpoint.

        Returns a 3-tuple containing::

            1. The :class:`HTTPSConnection` instance
            2. Dictionary of auth headers to be used with the connection
            3. The root url path (str) to be used for requests.

        """
        endpoint = self.endpoint
        host, remainder = endpoint.split(':', 1)
        port = remainder
        if '/' in remainder:
            port, _ = remainder.split('/', 1)

        conn = HTTPSConnection(
            host, int(port),
            context=self._get_ssl(self.cacert),
        )

        path = (
            "/model/{}".format(self.uuid)
            if self.uuid else ""
        )
        return conn, self._http_headers(), path
项目:bitcoin-arbitrage    作者:ucfyao    | 项目源码 | 文件源码
def httpGet(url, resource, params=''):
    try :
        conn = httplib.HTTPSConnection(url, timeout=10)
        conn.request("GET",resource + '?' + params)
        response = conn.getresponse()
        data = response.read().decode('utf-8')
        return json.loads(data)
    except:
        return False
项目:k8s.io    作者:kubernetes    | 项目源码 | 文件源码
def do_get(url):
    parsed = urlparse.urlparse(url)
    path = parsed.path
    if parsed.query:
        path = '%s?%s' % (path, parsed.query)
    if parsed.scheme == 'http':
        conn = httplib.HTTPConnection(TARGET_IP)
    elif parsed.scheme == 'https':
        conn = httplib.HTTPSConnection(TARGET_IP)
    conn.request('GET', path, headers={'Host': parsed.netloc})
    resp = conn.getresponse()
    body = resp.read().decode('utf8')
    resp.close()
    conn.close()
    return resp, body
项目:admin-sites    作者:istio    | 项目源码 | 文件源码
def do_get(url):
    parsed = urlparse.urlparse(url)
    path = parsed.path
    if parsed.query:
        path = '%s?%s' % (path, parsed.query)
    if parsed.scheme == 'http':
        conn = httplib.HTTPConnection(TARGET_IP)
    elif parsed.scheme == 'https':
        conn = httplib.HTTPSConnection(TARGET_IP, timeout=8, context=ssl._create_unverified_context())
    conn.request('GET', path, headers={'Host': parsed.netloc})
    resp = conn.getresponse()
    body = resp.read().decode('utf8')
    resp.close()
    conn.close()
    return resp, body
项目:antenna    作者:mozilla-services    | 项目源码 | 文件源码
def http_post(posturl, headers, data):
    parsed = urllib.parse.urlparse(posturl)
    if ':' in parsed.netloc:
        host, port = parsed.netloc.split(':')
    else:
        host = parsed.netloc
        port = '443' if posturl.startswith('https') else '80'

    if posturl.startswith('https'):
        conn = HTTPSConnection(host, int(port))
    else:
        conn = HTTPConnection(host, int(port))
    conn.request('POST', parsed.path, headers=headers, body=data)
    return conn.getresponse()
项目:zeronet-debian    作者:bashrc    | 项目源码 | 文件源码
def __init__(self, proxytype, proxyaddr, proxyport=None, rdns=True, username=None, password=None, *args, **kwargs):
        self.proxyargs = (proxytype, proxyaddr, proxyport, rdns, username, password)
        httplib.HTTPSConnection.__init__(self, *args, **kwargs)
项目:supernovae    作者:astrocatalogs    | 项目源码 | 文件源码
def mastQuery(request):
    """Perform a MAST query.

    Parameters
    ----------
    request (dictionary): The Mashup request json object

    Returns head,content where head is the response HTTP headers, and content
    is the returned data.

    """
    server = 'mast.stsci.edu'

    # Grab Python Version
    version = ".".join(map(str, sys.version_info[:3]))

    # Create Http Header Variables
    headers = {"Content-type": "application/x-www-form-urlencoded",
               "Accept": "text/plain",
               "User-agent": "python-requests/" + version}

    # Encoding the request as a json string
    requestString = json.dumps(request)
    requestString = urlencode(requestString)

    # opening the https connection
    conn = httplib.HTTPSConnection(server)

    # Making the query
    conn.request("POST", "/api/v0/invoke", "request=" + requestString, headers)

    # Getting the response
    resp = conn.getresponse()
    head = resp.getheaders()
    content = resp.read().decode('utf-8')

    # Close the https connection
    conn.close()

    return head, content
项目:python-mysql-pool    作者:LuciferJack    | 项目源码 | 文件源码
def get_https_connection(self, host, timeout=300):
            """Returns a HTTPSConnection"""
            return HTTPSConnection(
                host,
                key_file=self._ssl_config['key'],
                cert_file=self._ssl_config['cert']
            )
项目:DualisWatcher    作者:LucaVazz    | 项目源码 | 文件源码
def __init__(self, token = ''):
        self.connection = HTTPSConnection('dualis.dhbw.de')
        self.token = token
        self.stdHeader = {
            'Cookie': 'cnsc=0',
            # The Dualis System assumes by the presence of this field that we are ready to handle
            #   and store cookies
            #  Yeah, right... We definitively do that...
            #  (No, we don't have to. Chrome is also sending cnsc=0 everytime and it works fine)
            'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36'
            # copied straight out of Chrome
        }
项目:TuShare    作者:andyzsf    | 项目源码 | 文件源码
def __init__(self , token):
        self.token = token
        self.httpClient = HTTPSConnection(vs.HTTP_URL, vs.HTTP_PORT)
项目:Bethesda_SublimeFO4    作者:Scrivener07    | 项目源码 | 文件源码
def _new_conn(self):
        """
        Return a fresh :class:`httplib.HTTPSConnection`.
        """
        self.num_connections += 1
        log.info("Starting new HTTPS connection (%d): %s"
                 % (self.num_connections, self.host))

        if not ssl: # Platform-specific: Python compiled without +ssl
            if not HTTPSConnection or HTTPSConnection is object:
                raise SSLError("Can't connect to HTTPS URL because the SSL "
                               "module is not available.")

            return HTTPSConnection(host=self.host,
                                   port=self.port,
                                   strict=self.strict)

        connection = VerifiedHTTPSConnection(host=self.host,
                                             port=self.port,
                                             strict=self.strict)
        connection.set_cert(key_file=self.key_file, cert_file=self.cert_file,
                            cert_reqs=self.cert_reqs, ca_certs=self.ca_certs)

        if self.ssl_version is None:
            connection.ssl_version = ssl.PROTOCOL_SSLv23
        else:
            connection.ssl_version = self.ssl_version

        return connection
项目:MailFail    作者:m0rtem    | 项目源码 | 文件源码
def __init__(self, proxytype, proxyaddr, proxyport=None, rdns=True, username=None, password=None, *args, **kwargs):
        self.proxyargs = (proxytype, proxyaddr, proxyport, rdns, username, password)
        httplib.HTTPSConnection.__init__(self, *args, **kwargs)
项目:imcsdk    作者:CiscoUcs    | 项目源码 | 文件源码
def __init__(self, host, **kwargs):
        httplib.HTTPSConnection.__init__(self, host, **kwargs)
项目:imcsdk    作者:CiscoUcs    | 项目源码 | 文件源码
def __init__(self, host, **kwargs):
        httplib.HTTPSConnection.__init__(self, host, **kwargs)
项目:imcsdk    作者:CiscoUcs    | 项目源码 | 文件源码
def connect(self):
        """Overrides HTTPSConnection.connect to specify TLS version"""
        # Standard implementation from HTTPSConnection, which is not
        # designed for extension, unfortunately
        if sys.version_info >= (2, 7):
            sock = socket.create_connection((self.host, self.port),
                                            self.timeout, self.source_address)
        elif sys.version_info >= (2, 6):
            sock = socket.create_connection((self.host, self.port),
                                            self.timeout)
        else:
            sock = socket.create_connection((self.host, self.port))

        if getattr(self, '_tunnel_host', None):
            self.sock = sock
            self._tunnel()

        if hasattr(ssl, 'SSLContext'):
            # Since python 2.7.9, tls 1.1 and 1.2 are supported via
            # SSLContext
            ssl_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
            ssl_context.options |= ssl.OP_NO_SSLv2
            ssl_context.options |= ssl.OP_NO_SSLv3
            if self.key_file and self.cert_file:
                ssl_context.load_cert_chain(keyfile=self.key_file,
                                            certfile=self.cert_file)
            self.sock = ssl_context.wrap_socket(sock)
        else:
            # This is the only difference; default wrap_socket uses SSLv23
            self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file,
                                        ssl_version=ssl.PROTOCOL_TLSv1)
项目:importacsv    作者:rasertux    | 项目源码 | 文件源码
def get_https_connection(self, host, timeout=300):
            """Returns a HTTPSConnection"""
            return HTTPSConnection(
                host,
                key_file=self._ssl_config['key'],
                cert_file=self._ssl_config['cert']
            )
项目:DevOps    作者:YoLoveLife    | 项目源码 | 文件源码
def __init__(self, url, key_file=None, cert_file=None, debug=False):
        """LXD Client.

        :param url: The URL of the LXD server. (e.g. unix:/var/lib/lxd/unix.socket or https://127.0.0.1)
        :type url: ``str``
        :param key_file: The path of the client certificate key file.
        :type key_file: ``str``
        :param cert_file: The path of the client certificate file.
        :type cert_file: ``str``
        :param debug: The debug flag. The request and response are stored in logs when debug is true.
        :type debug: ``bool``
        """
        self.url = url
        self.debug = debug
        self.logs = []
        if url.startswith('https:'):
            self.cert_file = cert_file
            self.key_file = key_file
            parts = generic_urlparse(urlparse(self.url))
            ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
            ctx.load_cert_chain(cert_file, keyfile=key_file)
            self.connection = HTTPSConnection(parts.get('netloc'), context=ctx)
        elif url.startswith('unix:'):
            unix_socket_path = url[len('unix:'):]
            self.connection = UnixHTTPConnection(unix_socket_path)
        else:
            raise LXDClientException('URL scheme must be unix: or https:')
项目:DevOps    作者:YoLoveLife    | 项目源码 | 文件源码
def __init__(self, *args, **kwargs):
            httplib.HTTPSConnection.__init__(self, *args, **kwargs)
            if HAS_SSLCONTEXT:
                self.context = create_default_context()
                if self.cert_file:
                    self.context.load_cert_chain(self.cert_file, self.key_file)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def testHTTPSConnectionSourceAddress(self):
        self.conn = client.HTTPSConnection(HOST, self.port,
                source_address=('', self.source_port))
        # We don't test anything here other the constructor not barfing as
        # this code doesn't deal with setting up an active running SSL server
        # for an ssl_wrapped connect() to actually return from.
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def setUp(self):
        if not hasattr(client, 'HTTPSConnection'):
            self.skipTest('ssl support required')
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_attributes(self):
        # simple test to check it's storing the timeout
        h = client.HTTPSConnection(HOST, TimeoutTest.PORT, timeout=30)
        self.assertEqual(h.timeout, 30)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_networked(self):
        # Default settings: no cert verification is done
        support.requires('network')
        with support.transient_internet('svn.python.org'):
            h = client.HTTPSConnection('svn.python.org', 443)
            h.request('GET', '/')
            resp = h.getresponse()
            self._check_svn_python_org(resp)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_networked_good_cert(self):
        # We feed a CA cert that validates the server's cert
        import ssl
        support.requires('network')
        with support.transient_internet('svn.python.org'):
            context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
            context.verify_mode = ssl.CERT_REQUIRED
            context.load_verify_locations(CACERT_svn_python_org)
            h = client.HTTPSConnection('svn.python.org', 443, context=context)
            h.request('GET', '/')
            resp = h.getresponse()
            self._check_svn_python_org(resp)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_local_good_hostname(self):
        # The (valid) cert validates the HTTP hostname
        import ssl
        from test.ssl_servers import make_https_server
        server = make_https_server(self, CERT_localhost)
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        context.verify_mode = ssl.CERT_REQUIRED
        context.load_verify_locations(CERT_localhost)
        h = client.HTTPSConnection('localhost', server.port, context=context)
        h.request('GET', '/nonexistent')
        resp = h.getresponse()
        self.assertEqual(resp.status, 404)
项目:Polyglot    作者:UniversalDevicesInc    | 项目源码 | 文件源码
def get_ip_address(self, set_result=False):

        """ Get the bridge ip address from the meethue.com nupnp api """

        connection = httplib.HTTPSConnection('www.meethue.com')
        connection.request('GET', '/api/nupnp')

        logger.info('Connecting to meethue.com/api/nupnp')

        result = connection.getresponse()

        if PY3K:
            data = json.loads(str(result.read(), encoding='utf-8'))
        else:
            result_str = result.read()
            data = json.loads(result_str)

        """ close connection after read() is done, to prevent issues with read() """

        connection.close()

        ip = str(data[0]['internalipaddress'])

        if ip is not '':
            if set_result:
                self.ip = ip

            return ip
        else:
            return False