Python http.client 模块,HTTPConnection() 实例源码

我们从Python开源项目中,提取了以下46个代码示例,用于说明如何使用http.client.HTTPConnection()

项目:eddie    作者:greenkey    | 项目源码 | 文件源码
def stop(self):
        """Stop the webserver.

        Clears the running flag, sends ``GET /shutdown`` to the server's own
        ``host:port`` and then, for as long as the server thread is alive,
        sleeps half a second and closes the server and its listening socket.
        """
        self._http_on = False

        address = self._host + ":" + str(self._port)
        shutdown_conn = HTTPConnection(address)
        try:
            shutdown_conn.request("GET", "/shutdown")
        except ConnectionRefusedError:
            # A refused connection is tolerated; the shutdown still proceeds.
            pass
        shutdown_conn.close()

        while True:
            if not self._http_thread.is_alive():
                break
            sleep(0.5)
            self._httpd.server_close()
            self._httpd.socket.close()
项目:veneer-py    作者:flowmatters    | 项目源码 | 文件源码
def retrieve_json(self,url):
        '''
        Retrieve data from the Veneer service at the given url path.

        url: Path to required resource, relative to the root of the Veneer service.

        When ``self.protocol`` is 'file' the text is read from the local path
        ``self.prefix + url + self.data_ext``; otherwise it is fetched with an
        HTTP GET from ``self.host``/``self.port`` and decoded as UTF-8.

        Returns the parsed JSON document.
        '''
        if PRINT_URLS:
            print("*** %s ***" % (url))

        if self.protocol=='file':
            # Context manager so the file handle is released deterministically.
            with open(self.prefix+url+self.data_ext) as source:
                text = source.read()
        else:
            conn = hc.HTTPConnection(self.host,port=self.port)
            try:
                conn.request('GET',quote(url+self.data_ext))
                resp = conn.getresponse()
                text = resp.read().decode('utf-8')
            finally:
                # Release the socket even when the request or read fails.
                conn.close()

        text = self._replace_inf(text)
        # Parse once and reuse the result for both the debug print and return.
        data = json.loads(text)
        if PRINT_ALL:
            print(data)
            print("")
        return data
项目:tecken    作者:mozilla-services    | 项目源码 | 文件源码
def block_http(whitelist):
    """Monkey patch ``httplib.HTTPConnection`` so only whitelisted hosts connect.

    After the call, constructing a connection with a string host that is not
    in ``whitelist`` logs a warning and raises ``MockHttpCall(host)``; any
    other host is passed on to the original ``__init__`` (kept as
    ``HTTPConnection.old``).

    whitelist: container of host strings that may be connected to.
    """
    def whitelisted(self, host, *args, **kwargs):
        try:
            string_type = basestring
        except NameError:
            # python3
            string_type = str
        if isinstance(host, string_type) and host not in whitelist:
            logger.warning('Denied HTTP connection to: %s' % host)
            raise MockHttpCall(host)
        logger.debug('Allowed HTTP connection to: %s' % host)
        return self.old(host, *args, **kwargs)

    whitelisted.blockade = True

    # The marker is set on the replacement __init__ function, so it has to be
    # looked up there (not on the class). Only the first patch may save the
    # genuine __init__ as ``old``; saving an already patched __init__ would
    # make ``self.old`` call a wrapper that calls ``self.old`` again, forever.
    if not getattr(httplib.HTTPConnection.__init__, 'blockade', False):
        logger.debug('Monkey patching httplib')
        httplib.HTTPConnection.old = httplib.HTTPConnection.__init__
    # Always install the newest wrapper so the latest whitelist applies.
    httplib.HTTPConnection.__init__ = whitelisted
项目:devsecops-example-helloworld    作者:boozallen    | 项目源码 | 文件源码
def do_command(self, verb, args):
        """POST one command to the Selenium server and return its reply text.

        The verb is sent in the ``cmd`` field, every argument in a field named
        after its 1-based position, and the session id (when set) in
        ``sessionId``. The form-encoded body goes to
        ``/selenium-server/driver/``.

        Raises:
            Exception: with the reply text when it does not start with 'OK'.
        """
        conn = http_client.HTTPConnection(self.host, self.port, timeout=self.http_timeout)
        try:
            quote = urllib_parse.quote_plus
            body = 'cmd=' + quote(unicode(verb).encode('utf-8'))
            for position, argument in enumerate(args, 1):
                encoded = quote(unicode(argument).encode('utf-8'))
                body += '&' + unicode(position) + '=' + encoded
            if self.sessionId is not None:
                body += "&sessionId=" + unicode(self.sessionId)
            headers = {
                "Content-Type":
                "application/x-www-form-urlencoded; charset=utf-8"
            }
            conn.request("POST", "/selenium-server/driver/", body, headers)

            reply = unicode(conn.getresponse().read(), "UTF-8")
            if reply.startswith('OK'):
                return reply
            raise Exception(reply)
        finally:
            # The connection is closed on success and on failure alike.
            conn.close()
项目:whatsapp-rest-webservice    作者:svub    | 项目源码 | 文件源码
def sendRequest(host, port, path, headers, params, reqType="GET"):
        """Send an HTTP(S) request and return the response object.

        ``params`` is URL-encoded. For a GET request with non-empty params the
        encoded string is appended to ``path`` as a query string; in every
        case it is also passed as the request body. Port 443 selects an
        HTTPS connection, any other port plain HTTP.
        """
        params = urlencode(params)

        if reqType == "GET" and params:
            path = path + "?" + params

        # Log headers first, then params, skipping whichever is empty.
        for debug_item in (headers, params):
            if len(debug_item):
                logger.debug(debug_item)

        logger.debug("Opening connection to %s" % host)
        if port == 443:
            connection_class = httplib.HTTPSConnection
        else:
            connection_class = httplib.HTTPConnection
        conn = connection_class(host, port)

        logger.debug("Sending %s request to %s" % (reqType, path))
        conn.request(reqType, path, params, headers)

        return conn.getresponse()
项目:rheostatic    作者:waylan    | 项目源码 | 文件源码
def assertResponse(self, app, method, url, status=None, headers=None, content=None):
        """Request ``url`` from ``app`` through a WSGI intercept and check the reply.

        app: WSGI application registered for ``localhost:80``.
        method, url: HTTP method and path of the request.
        status: expected status code; not checked when None.
        headers: mapping of header name to expected value; not checked when
            None or empty.
        content: expected raw response body; not checked when None.
        """
        host, port = 'localhost', 80
        http_client_intercept.install()
        add_wsgi_intercept(host, port, app)
        client = http_lib.HTTPConnection(host, port)
        try:
            client.request(method, url)
            response = client.getresponse()

            if status is not None:
                self.assertEqual(response.status, status)

            for k, v in (headers or {}).items():
                self.assertEqual(response.getheader(k), v)

            if content is not None:
                self.assertEqual(response.read(), content)
        finally:
            # Tear down even when an assertion fails, so the intercept does
            # not stay installed for whatever runs next.
            client.close()
            remove_wsgi_intercept(host, port)
            http_client_intercept.uninstall()
项目:flasky    作者:RoseOu    | 项目源码 | 文件源码
def do_command(self, verb, args):
        """POST one command to the Selenium server and return its reply text.

        The verb is sent in the ``cmd`` field, every argument in a field named
        after its 1-based position, and the session id (when set) in
        ``sessionId``. The form-encoded body goes to
        ``/selenium-server/driver/``.

        Raises:
            Exception: with the reply text when it does not start with 'OK'.
        """
        conn = http_client.HTTPConnection(self.host, self.port, timeout=self.http_timeout)
        try:
            quote = urllib_parse.quote_plus
            body = 'cmd=' + quote(unicode(verb).encode('utf-8'))
            for position, argument in enumerate(args, 1):
                encoded = quote(unicode(argument).encode('utf-8'))
                body += '&' + unicode(position) + '=' + encoded
            if self.sessionId is not None:
                body += "&sessionId=" + unicode(self.sessionId)
            headers = {
                "Content-Type":
                "application/x-www-form-urlencoded; charset=utf-8"
            }
            conn.request("POST", "/selenium-server/driver/", body, headers)

            reply = unicode(conn.getresponse().read(), "UTF-8")
            if reply.startswith('OK'):
                return reply
            raise Exception(reply)
        finally:
            # The connection is closed on success and on failure alike.
            conn.close()
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_ipv6host_header(self):
        """The default Host header keeps the [] around an IPv6 literal."""
        # Each case pairs the address given to HTTPConnection with the exact
        # bytes the request is expected to start with (with and without port).
        cases = [
            ('[2001::]:81',
             b'GET /foo HTTP/1.1\r\nHost: [2001::]:81\r\n'
             b'Accept-Encoding: identity\r\n\r\n'),
            ('[2001:102A::]',
             b'GET /foo HTTP/1.1\r\nHost: [2001:102A::]\r\n'
             b'Accept-Encoding: identity\r\n\r\n'),
        ]
        for address, expected in cases:
            conn = client.HTTPConnection(address)
            sock = FakeSocket('')
            conn.sock = sock
            conn.request('GET', '/foo')
            self.assertTrue(sock.data.startswith(expected))
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_epipe(self):
        """request() raises socket.error, yet the 401 reply can still be read."""
        fake_sock = EPipeSocket(
            "HTTP/1.0 401 Authorization Required\r\n"
            "Content-type: text/html\r\n"
            "WWW-Authenticate: Basic realm=\"example\"\r\n",
            b"Content-Length")
        conn = client.HTTPConnection("example.com")
        conn.sock = fake_sock
        with self.assertRaises(socket.error):
            conn.request("PUT", "/url", "body")
        # The response fed to the fake socket must remain retrievable.
        resp = conn.getresponse()
        self.assertEqual(401, resp.status)
        self.assertEqual("Basic realm=\"example\"",
                         resp.getheader("www-authenticate"))

    # Test lines overflowing the max line size (_MAXLINE in http.client)
项目:packaging    作者:blockstack    | 项目源码 | 文件源码
def test_userpass_inurl_w_spaces(self):
        """Credentials with spaces in the URL still yield a Basic auth header."""
        self.fakehttp(b"HTTP/1.0 200 OK\r\n\r\nHello!")
        try:
            credentials = "a b:c d"
            url = "http://{0}@python.org/".format(credentials)
            fake_connection_class = http_client.HTTPConnection
            encoded = b64encode(credentials.encode("ASCII")).decode("ASCII")
            expected_header = "Authorization: Basic %s\r\n" % encoded
            response = urlopen(url)
            # The authorization header must be in place
            sent = fake_connection_class.buf.decode("UTF-8")
            self.assertIn(expected_header, sent)
            self.assertEqual(response.readline(), b"Hello!")
            self.assertEqual(response.readline(), b"")
            # the spaces are quoted in URL so no match
            self.assertNotEqual(response.geturl(), url)
            self.assertEqual(response.getcode(), 200)
        finally:
            self.unfakehttp()
项目:ShuoshuoMonitor    作者:aploium    | 项目源码 | 文件源码
def do_command(self, verb, args):
        """POST one command to the Selenium server and return its reply text.

        The verb is sent in the ``cmd`` field, every argument in a field named
        after its 1-based position, and the session id (when set) in
        ``sessionId``. The form-encoded body goes to
        ``/selenium-server/driver/``.

        Raises:
            Exception: with the reply text when it does not start with 'OK'.
        """
        conn = http_client.HTTPConnection(self.host, self.port, timeout=self.http_timeout)
        try:
            quote = urllib_parse.quote_plus
            body = 'cmd=' + quote(unicode(verb).encode('utf-8'))
            for position, argument in enumerate(args, 1):
                encoded = quote(unicode(argument).encode('utf-8'))
                body += '&' + unicode(position) + '=' + encoded
            if self.sessionId is not None:
                body += "&sessionId=" + unicode(self.sessionId)
            headers = {
                "Content-Type":
                "application/x-www-form-urlencoded; charset=utf-8"
            }
            conn.request("POST", "/selenium-server/driver/", body, headers)

            reply = unicode(conn.getresponse().read(), "UTF-8")
            if reply.startswith('OK'):
                return reply
            raise Exception(reply)
        finally:
            # The connection is closed on success and on failure alike.
            conn.close()
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_ipv6host_header(self):
        """The default Host header keeps the [] around an IPv6 literal."""
        # Each case pairs the address given to HTTPConnection with the exact
        # bytes the request is expected to start with (with and without port).
        cases = [
            ('[2001::]:81',
             b'GET /foo HTTP/1.1\r\nHost: [2001::]:81\r\n'
             b'Accept-Encoding: identity\r\n\r\n'),
            ('[2001:102A::]',
             b'GET /foo HTTP/1.1\r\nHost: [2001:102A::]\r\n'
             b'Accept-Encoding: identity\r\n\r\n'),
        ]
        for address, expected in cases:
            conn = client.HTTPConnection(address)
            sock = FakeSocket('')
            conn.sock = sock
            conn.request('GET', '/foo')
            self.assertTrue(sock.data.startswith(expected))
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_send_updating_file(self):
        """Only the chunk read before the first None result reaches the socket."""
        def chunks():
            yield 'data'
            yield None
            yield 'data_two'

        class UpdatingFile():
            # File-like stand-in whose read() hands out the chunks in turn.
            mode = 'r'
            source = chunks()
            def read(self, blocksize=-1):
                return next(self.source)

        conn = client.HTTPConnection('example.com')
        sock = FakeSocket("")
        conn.sock = sock
        conn.send(UpdatingFile())
        self.assertEqual(sock.data, b'data')
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_epipe(self):
        """request() raises socket.error, yet the 401 reply can still be read."""
        fake_sock = EPipeSocket(
            "HTTP/1.0 401 Authorization Required\r\n"
            "Content-type: text/html\r\n"
            "WWW-Authenticate: Basic realm=\"example\"\r\n",
            b"Content-Length")
        conn = client.HTTPConnection("example.com")
        conn.sock = fake_sock
        with self.assertRaises(socket.error):
            conn.request("PUT", "/url", "body")
        # The response fed to the fake socket must remain retrievable.
        resp = conn.getresponse()
        self.assertEqual(401, resp.status)
        self.assertEqual("Basic realm=\"example\"",
                         resp.getheader("www-authenticate"))

    # Test lines overflowing the max line size (_MAXLINE in http.client)
项目:amazon_order_history_scraper    作者:drewctate    | 项目源码 | 文件源码
def do_command(self, verb, args):
        """POST one command to the Selenium server and return its reply text.

        The verb is sent in the ``cmd`` field, every argument in a field named
        after its 1-based position, and the session id (when set) in
        ``sessionId``. The form-encoded body goes to
        ``/selenium-server/driver/``.

        Raises:
            Exception: with the reply text when it does not start with 'OK'.
        """
        conn = http_client.HTTPConnection(self.host, self.port, timeout=self.http_timeout)
        try:
            quote = urllib_parse.quote_plus
            body = 'cmd=' + quote(unicode(verb).encode('utf-8'))
            for position, argument in enumerate(args, 1):
                encoded = quote(unicode(argument).encode('utf-8'))
                body += '&' + unicode(position) + '=' + encoded
            if self.sessionId is not None:
                body += "&sessionId=" + unicode(self.sessionId)
            headers = {
                "Content-Type":
                "application/x-www-form-urlencoded; charset=utf-8"
            }
            conn.request("POST", "/selenium-server/driver/", body, headers)

            reply = unicode(conn.getresponse().read(), "UTF-8")
            if reply.startswith('OK'):
                return reply
            raise Exception(reply)
        finally:
            # The connection is closed on success and on failure alike.
            conn.close()
项目:telemetry-analysis-service    作者:mozilla    | 项目源码 | 文件源码
def block_http(whitelist):
    """Monkey patch ``httplib.HTTPConnection`` so only whitelisted hosts connect.

    After the call, constructing a connection with a string host that is not
    in ``whitelist`` logs a warning and raises ``MockHttpCall(host)``; any
    other host is passed on to the original ``__init__`` (kept as
    ``HTTPConnection.old``).

    whitelist: container of host strings that may be connected to.
    """
    def whitelisted(self, host, *args, **kwargs):
        try:
            string_type = basestring
        except NameError:
            # python3
            string_type = str
        if isinstance(host, string_type) and host not in whitelist:
            logger.warning('Denied HTTP connection to: %s' % host)
            raise MockHttpCall(host)
        logger.debug('Allowed HTTP connection to: %s' % host)
        return self.old(host, *args, **kwargs)

    whitelisted.blockade = True

    # The marker is set on the replacement __init__ function, so it has to be
    # looked up there (not on the class). Only the first patch may save the
    # genuine __init__ as ``old``; saving an already patched __init__ would
    # make ``self.old`` call a wrapper that calls ``self.old`` again, forever.
    if not getattr(httplib.HTTPConnection.__init__, 'blockade', False):
        logger.debug('Monkey patching httplib')
        httplib.HTTPConnection.old = httplib.HTTPConnection.__init__
    # Always install the newest wrapper so the latest whitelist applies.
    httplib.HTTPConnection.__init__ = whitelisted
项目:webapp    作者:superchilli    | 项目源码 | 文件源码
def do_command(self, verb, args):
        """POST one command to the Selenium server and return its reply text.

        The verb is sent in the ``cmd`` field, every argument in a field named
        after its 1-based position, and the session id (when set) in
        ``sessionId``. The form-encoded body goes to
        ``/selenium-server/driver/``.

        Raises:
            Exception: with the reply text when it does not start with 'OK'.
        """
        conn = http_client.HTTPConnection(self.host, self.port, timeout=self.http_timeout)
        try:
            quote = urllib_parse.quote_plus
            body = 'cmd=' + quote(unicode(verb).encode('utf-8'))
            for position, argument in enumerate(args, 1):
                encoded = quote(unicode(argument).encode('utf-8'))
                body += '&' + unicode(position) + '=' + encoded
            if self.sessionId is not None:
                body += "&sessionId=" + unicode(self.sessionId)
            headers = {
                "Content-Type":
                "application/x-www-form-urlencoded; charset=utf-8"
            }
            conn.request("POST", "/selenium-server/driver/", body, headers)

            reply = unicode(conn.getresponse().read(), "UTF-8")
            if reply.startswith('OK'):
                return reply
            raise Exception(reply)
        finally:
            # The connection is closed on success and on failure alike.
            conn.close()
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_ipv6host_header(self):
        """The default Host header keeps the [] around an IPv6 literal."""
        # Each case pairs the address given to HTTPConnection with the exact
        # bytes the request is expected to start with (with and without port).
        cases = [
            ('[2001::]:81',
             b'GET /foo HTTP/1.1\r\nHost: [2001::]:81\r\n'
             b'Accept-Encoding: identity\r\n\r\n'),
            ('[2001:102A::]',
             b'GET /foo HTTP/1.1\r\nHost: [2001:102A::]\r\n'
             b'Accept-Encoding: identity\r\n\r\n'),
        ]
        for address, expected in cases:
            conn = client.HTTPConnection(address)
            sock = FakeSocket('')
            conn.sock = sock
            conn.request('GET', '/foo')
            self.assertTrue(sock.data.startswith(expected))
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_send_updating_file(self):
        """Only the chunk read before the first None result reaches the socket."""
        def chunks():
            yield 'data'
            yield None
            yield 'data_two'

        class UpdatingFile():
            # File-like stand-in whose read() hands out the chunks in turn.
            mode = 'r'
            source = chunks()
            def read(self, blocksize=-1):
                return next(self.source)

        conn = client.HTTPConnection('example.com')
        sock = FakeSocket("")
        conn.sock = sock
        conn.send(UpdatingFile())
        self.assertEqual(sock.data, b'data')
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_epipe(self):
        """request() raises OSError, yet the 401 reply can still be read."""
        fake_sock = EPipeSocket(
            "HTTP/1.0 401 Authorization Required\r\n"
            "Content-type: text/html\r\n"
            "WWW-Authenticate: Basic realm=\"example\"\r\n",
            b"Content-Length")
        conn = client.HTTPConnection("example.com")
        conn.sock = fake_sock
        with self.assertRaises(OSError):
            conn.request("PUT", "/url", "body")
        # The response fed to the fake socket must remain retrievable.
        resp = conn.getresponse()
        self.assertEqual(401, resp.status)
        self.assertEqual("Basic realm=\"example\"",
                         resp.getheader("www-authenticate"))

    # Test lines overflowing the max line size (_MAXLINE in http.client)
项目:tap    作者:mfouesneau    | 项目源码 | 文件源码
def status(self):
        """ Check job status on the server

        The job location is first fetched through ``self.session``; if that
        fails for any ordinary error, the status document is requested
        directly over a plain HTTP connection instead. Either way the
        response is stored on ``self.response``.

        Returns the XML text of the first child of the first ``uws:phase``
        element in the reply.
        """
        headers = {}
        # add authentication and other cookies to the header
        try:
            location = self.location.split('://')[-1]
            self.response = self.session.get(self.protocol + '://' + location)
            data = self.response.text
        except Exception:
            # Deliberately not a bare ``except``: KeyboardInterrupt and
            # SystemExit must propagate instead of triggering a second request.
            connection = HTTPConnection(self.host, self.port)
            connection.request("GET", self.path + "/" + self.jobid, headers)
            self.response = connection.getresponse()
            data = self.response.read()
        #XML response: parse it to obtain the current status
        dom = parseString(data)
        phase_element = dom.getElementsByTagName('uws:phase')[0]
        return phase_element.firstChild.toxml()
项目:yowsup    作者:colonyhq    | 项目源码 | 文件源码
def sendRequest(host, port, path, headers, params, reqType="GET"):
        """Send an HTTP(S) request and return the response object.

        ``params`` is URL-encoded. For a GET request with non-empty params the
        encoded string is appended to ``path`` as a query string; in every
        case it is also passed as the request body. Port 443 selects an
        HTTPS connection, any other port plain HTTP.
        """
        params = urlencode(params)

        if reqType == "GET" and params:
            path = path + "?" + params

        # Log headers first, then params, skipping whichever is empty.
        for debug_item in (headers, params):
            if len(debug_item):
                logger.debug(debug_item)

        logger.debug("Opening connection to %s" % host)
        if port == 443:
            connection_class = httplib.HTTPSConnection
        else:
            connection_class = httplib.HTTPConnection
        conn = connection_class(host, port)

        logger.debug("Sending %s request to %s" % (reqType, path))
        conn.request(reqType, path, params, headers)

        return conn.getresponse()
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_ipv6host_header(self):
        """The default Host header keeps the [] around an IPv6 literal."""
        # Each case pairs the address given to HTTPConnection with the exact
        # bytes the request is expected to start with (with and without port).
        cases = [
            ('[2001::]:81',
             b'GET /foo HTTP/1.1\r\nHost: [2001::]:81\r\n'
             b'Accept-Encoding: identity\r\n\r\n'),
            ('[2001:102A::]',
             b'GET /foo HTTP/1.1\r\nHost: [2001:102A::]\r\n'
             b'Accept-Encoding: identity\r\n\r\n'),
        ]
        for address, expected in cases:
            conn = client.HTTPConnection(address)
            sock = FakeSocket('')
            conn.sock = sock
            conn.request('GET', '/foo')
            self.assertTrue(sock.data.startswith(expected))
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_send_updating_file(self):
        """Only the chunk read before the first None result reaches the socket."""
        def chunks():
            yield 'data'
            yield None
            yield 'data_two'

        class UpdatingFile():
            # File-like stand-in whose read() hands out the chunks in turn.
            mode = 'r'
            source = chunks()
            def read(self, blocksize=-1):
                return next(self.source)

        conn = client.HTTPConnection('example.com')
        sock = FakeSocket("")
        conn.sock = sock
        conn.send(UpdatingFile())
        self.assertEqual(sock.data, b'data')
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_epipe(self):
        """request() raises OSError, yet the 401 reply can still be read."""
        fake_sock = EPipeSocket(
            "HTTP/1.0 401 Authorization Required\r\n"
            "Content-type: text/html\r\n"
            "WWW-Authenticate: Basic realm=\"example\"\r\n",
            b"Content-Length")
        conn = client.HTTPConnection("example.com")
        conn.sock = fake_sock
        with self.assertRaises(OSError):
            conn.request("PUT", "/url", "body")
        # The response fed to the fake socket must remain retrievable.
        resp = conn.getresponse()
        self.assertEqual(401, resp.status)
        self.assertEqual("Basic realm=\"example\"",
                         resp.getheader("www-authenticate"))

    # Test lines overflowing the max line size (_MAXLINE in http.client)
项目:irwin    作者:clarkerubber    | 项目源码 | 文件源码
def http(method, url, body=None, headers=None):
    """Issue one HTTP(S) request and yield the response.

    This is a generator: it yields the response object once when the status
    is outside 400-599, and closes the connection when the generator is
    resumed, closed, or an error is raised.

    method: HTTP method name.
    url: absolute URL; its scheme selects HTTPS or plain HTTP, and the port
        defaults to 443 or 80 respectively.
    body, headers: passed straight through to ``request``.

    Raises:
        HttpClientError: for a 4xx status (status, reason, body).
        HttpServerError: for a 5xx status (status, reason, body).
    """
    # NOTE(review): presumably decorated with contextlib.contextmanager where
    # it is defined -- the decorator is not visible here; confirm.
    url_info = urlparse.urlparse(url)
    if url_info.scheme == "https":
        con = httplib.HTTPSConnection(url_info.hostname, url_info.port or 443)
    else:
        con = httplib.HTTPConnection(url_info.hostname, url_info.port or 80)

    # Only the path component is sent; a query string in ``url`` is not.
    con.request(method, url_info.path, body, headers)
    response = con.getresponse()

    try:
        if 400 <= response.status < 500:
            raise HttpClientError(response.status, response.reason,
                                  response.read())
        elif 500 <= response.status < 600:
            raise HttpServerError(response.status, response.reason,
                                  response.read())
        else:
            yield response
    finally:
        # Runs on every exit path, including the two raises above.
        con.close()
项目:rune    作者:hoonkim    | 项目源码 | 文件源码
def wisp_callback(url):
    """
    This returns current server's time of given url.

    A URL without a scheme (e.g. ``www.example.com``) is treated as a bare
    host name requested over plain HTTP at path ``/``.

    :param url: url. (e.g. www.example.com, http://www.example.com, https://www.example.com:8080/test)
    :return: Value of the response's ``Date`` header (date and time of the
        server), or None when the header is missing.
    """
    parsed_url = urlparse(url)

    protocol = parsed_url.scheme or "http"
    port = parsed_url.port or (80 if protocol == "http" else 443)
    path = parsed_url.path or "/"
    if parsed_url.hostname:
        url = parsed_url.hostname
    else:
        # Without a scheme urlparse puts the host name into ``path``.
        path = "/"
        url = parsed_url.path
    request = "GET"

    conn = (HTTPConnection if protocol == "http" else HTTPSConnection)(url, port)
    try:
        conn.request(request, path)
        response = conn.getresponse()
        return response.getheader("Date")
    finally:
        # Only a header is needed, so the socket is released right away.
        conn.close()
项目:trough    作者:internetarchive    | 项目源码 | 文件源码
def _do_read(self, query, raw=False):
        """POST a read query to a healthy node of this segment.

        Looks up the services registered for ``self.database``, keeps the
        'trough-read' ones whose last heartbeat is younger than their ttl,
        and sends ``query`` to the URL of the first of them, optionally
        through the configured proxy. The decoded JSON reply is stored on
        ``self._last_results``.

        query: request body to send.
        raw: not used by the code shown here.

        Raises:
            Exception: when no healthy node is registered for the segment.
        """
        # send query to server, return JSON
        rethinker = doublethink.Rethinker(db="trough_configuration", servers=self.rethinkdb)
        healthy_databases = list(rethinker.table('services').get_all(self.database, index='segment').run())
        healthy_databases = [
            db for db in healthy_databases
            if db['role'] == 'trough-read'
            and (rethinker.now().run() - db['last_heartbeat']).seconds < db['ttl']
        ]
        # Explicit check instead of ``assert``: asserts are stripped under
        # ``python -O``, which would turn this into an IndexError below.
        if not healthy_databases:
            raise Exception('No healthy node found for segment %s' % self.database)
        url = urlparse(healthy_databases[0].get('url'))
        if self.proxy:
            conn = HTTPConnection(self.proxy, self.proxy_port)
            conn.set_tunnel(url.netloc, url.port)
            conn.sock = socks.socksocket()
            conn.sock.set_proxy(self.proxy_type, self.proxy, self.proxy_port)
            conn.sock.connect((url.netloc.split(":")[0], url.port))
        else:
            conn = HTTPConnection(url.netloc)
        request_path = "%s?%s" % (url.path, url.query)
        conn.request("POST", request_path, query)
        response = conn.getresponse()
        results = json.loads(response.read())
        self._last_results = results
项目:deb-python-wsgi-intercept    作者:openstack    | 项目源码 | 文件源码
def test_httpclient_in_out():
    """The intercept answers inside the context manager and not outside it."""
    # A random UUID is used as host name; the second half of the test expects
    # resolving it to fail with socket.gaierror.
    hostname = str(uuid4())
    port = 9999
    url = 'http://%s:%s/' % (hostname, port)
    with HttpClientInterceptor(app=app, url=url):
        client = http_client.HTTPConnection(hostname, port)
        client.request('GET', '/')
        response = client.getresponse()
        content = response.read().decode('utf-8')
        assert response.status == 200
        assert 'WSGI intercept successful!' in content

    # outside the context manager the intercept does not work
    with py.test.raises(socket.gaierror):
        client = http_client.HTTPConnection(hostname, port)
        client.request('GET', '/')


# Httplib2
项目:server    作者:happypandax    | 项目源码 | 文件源码
def _send_soap_request(location, upnp_schema, control_path, soap_fn,
                       soap_message):
    """
    Send out SOAP request to UPnP device and return a response.

    location: parsed device URL; its ``hostname`` and ``port`` are used.
    upnp_schema, soap_fn: service schema and action name that make up the
        ``SOAPAction`` header.
    control_path: path the request is POSTed to.
    soap_message: request body.

    Returns whatever ``_parse_for_errors`` returns for the response.
    """
    headers = {
        'SOAPAction': (
            '"urn:schemas-upnp-org:service:{schema}:'
            '1#{fn_name}"'.format(schema=upnp_schema, fn_name=soap_fn)
        ),
        'Content-Type': 'text/xml'
    }
    logging.debug("Sending UPnP request to {0}:{1}...".format(
        location.hostname, location.port))
    conn = client.HTTPConnection(location.hostname, location.port)
    try:
        conn.request('POST', control_path, soap_message, headers)
        response = conn.getresponse()
        # Inspect the response while the connection is still open: closing
        # the connection also closes a response that is still pending on it,
        # after which its body can no longer be read.
        return _parse_for_errors(response)
    finally:
        # Release the socket on success and on any failure above.
        conn.close()
项目:fortiosclient    作者:jerryz1982    | 项目源码 | 文件源码
def _normalize_conn_params(self, conn_or_conn_params):
        """Return a ``(host, port, is_ssl)`` tuple with the port filled in.

        Args:
            conn_or_conn_params: either a HTTP(S)Connection object or the
                resolved conn_params tuple returned by self._conn_params().

        Returns: Normalized conn_param tuple. A missing port becomes 443 for
            SSL and 80 otherwise. A value that is neither a tuple nor a
            connection is logged and handed back unchanged.
        """
        is_connection = isinstance(conn_or_conn_params, httplib.HTTPConnection)
        if is_connection:
            conn_params = self._conn_params(conn_or_conn_params)
        elif isinstance(conn_or_conn_params, tuple):
            conn_params = conn_or_conn_params
        else:
            LOG.debug("Invalid conn_params value: '%s'",
                      str(conn_or_conn_params))
            return conn_or_conn_params

        host, port, is_ssl = conn_params
        if port is None:
            if is_ssl:
                port = 443
            else:
                port = 80
        return (host, port, is_ssl)
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def saveFailedTest(data, expect, filename):
    """Upload failed test images to web server to allow CI test debugging.

    The actual image, the expected image and their diff are laid out side by
    side in one RGBA picture, encoded as PNG and POSTed to ``/upload.py`` on
    the data host. The current git commit hash is inserted as a directory
    just before the last component of ``filename``.

    data: image array produced by the test.
    expect: reference image array.
    filename: '/'-separated name to store the upload under.
    """
    commit = runSubprocess(['git', 'rev-parse',  'HEAD'])
    name = filename.split('/')
    name.insert(-1, commit.strip())
    filename = '/'.join(name)
    host = 'data.pyqtgraph.org'

    # concatenate data, expect, and diff into a single image
    ds = data.shape
    es = expect.shape

    # Tall enough for the taller input plus a margin, wide enough for both
    # inputs, a third panel as wide as the wider one, and the gaps between.
    shape = (max(ds[0], es[0]) + 4, ds[1] + es[1] + 8 + max(ds[1], es[1]), 4)
    img = np.empty(shape, dtype=np.ubyte)
    # Background: colour channels 100, fourth channel 255.
    img[..., :3] = 100
    img[..., 3] = 255

    # Left panel: actual data; next panel: expected image.
    img[2:2+ds[0], 2:2+ds[1], :ds[2]] = data
    img[2:2+es[0], ds[1]+4:ds[1]+4+es[1], :es[2]] = expect

    # Right-aligned panel: the diff image.
    diff = makeDiffImage(data, expect)
    img[2:2+diff.shape[0], -diff.shape[1]-2:-2] = diff

    png = makePng(img)

    conn = httplib.HTTPConnection(host)
    req = urllib.urlencode({'name': filename,
                            'data': base64.b64encode(png)})
    conn.request('POST', '/upload.py', req)
    response = conn.getresponse().read()
    conn.close()
    print("\nImage comparison failed. Test result: %s %s   Expected result: "
          "%s %s" % (data.shape, data.dtype, expect.shape, expect.dtype))
    print("Uploaded to: \nhttp://%s/data/%s" % (host, filename))
    # Any reply that does not start with OK is reported as an upload error.
    if not response.startswith(b'OK'):
        print("WARNING: Error uploading data to %s" % host)
        print(response)
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def saveFailedTest(data, expect, filename):
    """Upload failed test images to web server to allow CI test debugging.

    The actual image, the expected image and their diff are laid out side by
    side in one RGBA picture, encoded as PNG and POSTed to ``/upload.py`` on
    the data host. The current git commit hash is inserted as a directory
    just before the last component of ``filename``.

    data: image array produced by the test.
    expect: reference image array.
    filename: '/'-separated name to store the upload under.
    """
    commit = runSubprocess(['git', 'rev-parse',  'HEAD'])
    name = filename.split('/')
    name.insert(-1, commit.strip())
    filename = '/'.join(name)
    host = 'data.pyqtgraph.org'

    # concatenate data, expect, and diff into a single image
    ds = data.shape
    es = expect.shape

    # Tall enough for the taller input plus a margin, wide enough for both
    # inputs, a third panel as wide as the wider one, and the gaps between.
    shape = (max(ds[0], es[0]) + 4, ds[1] + es[1] + 8 + max(ds[1], es[1]), 4)
    img = np.empty(shape, dtype=np.ubyte)
    # Background: colour channels 100, fourth channel 255.
    img[..., :3] = 100
    img[..., 3] = 255

    # Left panel: actual data; next panel: expected image.
    img[2:2+ds[0], 2:2+ds[1], :ds[2]] = data
    img[2:2+es[0], ds[1]+4:ds[1]+4+es[1], :es[2]] = expect

    # Right-aligned panel: the diff image.
    diff = makeDiffImage(data, expect)
    img[2:2+diff.shape[0], -diff.shape[1]-2:-2] = diff

    png = makePng(img)

    conn = httplib.HTTPConnection(host)
    req = urllib.urlencode({'name': filename,
                            'data': base64.b64encode(png)})
    conn.request('POST', '/upload.py', req)
    response = conn.getresponse().read()
    conn.close()
    print("\nImage comparison failed. Test result: %s %s   Expected result: "
          "%s %s" % (data.shape, data.dtype, expect.shape, expect.dtype))
    print("Uploaded to: \nhttp://%s/data/%s" % (host, filename))
    # Any reply that does not start with OK is reported as an upload error.
    if not response.startswith(b'OK'):
        print("WARNING: Error uploading data to %s" % host)
        print(response)
项目:veneer-py    作者:flowmatters    | 项目源码 | 文件源码
def drop_run(self,run='latest'):
        '''
        Ask Source to drop/delete one set of results from memory.

        run: Run number to delete. Default ='latest'. Valid values are 'latest' and integers from 1

        Returns the HTTP status code of the DELETE request.
        '''
        assert self.live_source
        connection = hc.HTTPConnection(self.host,port=self.port)
        target = '/runs/%s'%str(run)
        connection.request('DELETE',target)
        return connection.getresponse().getcode()
项目:k8s.io    作者:kubernetes    | 项目源码 | 文件源码
def do_get(url):
    """GET ``url`` from ``TARGET_IP`` and return ``(response, body)``.

    The connection always goes to ``TARGET_IP``; the host of ``url`` is only
    sent as the ``Host`` header, and its scheme selects HTTP or HTTPS.

    Returns:
        The (already closed) response object and its body decoded as UTF-8.

    Raises:
        ValueError: when the URL scheme is neither 'http' nor 'https'.
    """
    parsed = urlparse.urlparse(url)
    path = parsed.path
    if parsed.query:
        path = '%s?%s' % (path, parsed.query)
    if parsed.scheme == 'http':
        conn = httplib.HTTPConnection(TARGET_IP)
    elif parsed.scheme == 'https':
        conn = httplib.HTTPSConnection(TARGET_IP)
    else:
        # Fail with a clear message instead of an UnboundLocalError below.
        raise ValueError('unsupported URL scheme: %r' % (parsed.scheme,))
    try:
        conn.request('GET', path, headers={'Host': parsed.netloc})
        resp = conn.getresponse()
        body = resp.read().decode('utf8')
        resp.close()
    finally:
        # Release the socket even when the request or read fails.
        conn.close()
    return resp, body
项目:admin-sites    作者:istio    | 项目源码 | 文件源码
def do_get(url):
    """GET ``url`` from ``TARGET_IP`` and return ``(response, body)``.

    The connection always goes to ``TARGET_IP``; the host of ``url`` is only
    sent as the ``Host`` header, and its scheme selects HTTP or HTTPS.

    Returns:
        The (already closed) response object and its body decoded as UTF-8.

    Raises:
        ValueError: when the URL scheme is neither 'http' nor 'https'.
    """
    parsed = urlparse.urlparse(url)
    path = parsed.path
    if parsed.query:
        path = '%s?%s' % (path, parsed.query)
    if parsed.scheme == 'http':
        conn = httplib.HTTPConnection(TARGET_IP)
    elif parsed.scheme == 'https':
        # NOTE(review): certificate verification is switched off here,
        # presumably because the target is addressed by IP -- confirm this
        # is intended.
        conn = httplib.HTTPSConnection(TARGET_IP, timeout=8, context=ssl._create_unverified_context())
    else:
        # Fail with a clear message instead of an UnboundLocalError below.
        raise ValueError('unsupported URL scheme: %r' % (parsed.scheme,))
    try:
        conn.request('GET', path, headers={'Host': parsed.netloc})
        resp = conn.getresponse()
        body = resp.read().decode('utf8')
        resp.close()
    finally:
        # Release the socket even when the request or read fails.
        conn.close()
    return resp, body
项目:BioQueue    作者:liyao001    | 项目源码 | 文件源码
def http_size(host_name, path):
    """Return the ``Content-Length`` reported for ``path`` on ``host_name``.

    A GET request is sent with a 3 second timeout. Any failure (bad host,
    connection error, missing or non-numeric header) is printed and
    reported as a size of 0.
    """
    try:
        from httplib import HTTPConnection
    except ImportError:
        from http.client import HTTPConnection
    conn = None
    try:
        conn = HTTPConnection(host_name, timeout=3)
        conn.request("GET", path)
        resp = conn.getresponse()
        return int(resp.getheader("content-length"))
    except Exception as e:
        print(e)
        return 0
    finally:
        # The body is never read, so close the socket instead of leaking it.
        if conn is not None:
            conn.close()
项目:antenna    作者:mozilla-services    | 项目源码 | 文件源码
def http_post(posturl, headers, data):
    """POST ``data`` to ``posturl`` and return the response object.

    Host and port come from the URL's network location; without an explicit
    port, 443 is used when the URL starts with 'https' and 80 otherwise.
    Only the path component of the URL is requested.
    """
    parsed = urllib.parse.urlparse(posturl)
    netloc = parsed.netloc
    if ':' in netloc:
        host, port = netloc.split(':')
    elif posturl.startswith('https'):
        host, port = netloc, '443'
    else:
        host, port = netloc, '80'

    connection_class = HTTPSConnection if posturl.startswith('https') else HTTPConnection
    conn = connection_class(host, int(port))
    conn.request('POST', parsed.path, headers=headers, body=data)
    return conn.getresponse()
项目:sphinxcontrib-confluencebuilder    作者:tonybaloney    | 项目源码 | 文件源码
def make_connection(self, host):
        """Return a connection to the proxy if one is set, else to ``host``.

        The requested host is remembered on ``self.realhost`` either way.
        """
        self.realhost = host
        target = self.proxy if self.proxy else self.realhost
        return httplib.HTTPConnection(target, timeout=self.timeout)
项目:zeronet-debian    作者:bashrc    | 项目源码 | 文件源码
def __init__(self, proxytype, proxyaddr, proxyport=None, rdns=True, username=None, password=None, *args, **kwargs):
        """Remember the proxy settings and initialise the HTTP connection.

        The six proxy parameters are stored, in order, as the tuple
        ``self.proxyargs``; all remaining positional and keyword arguments
        are forwarded to ``httplib.HTTPConnection.__init__``.
        """
        self.proxyargs = (proxytype, proxyaddr, proxyport, rdns, username, password)
        httplib.HTTPConnection.__init__(self, *args, **kwargs)
项目:corvus-web-public    作者:eleme    | 项目源码 | 文件源码
def init_app(self, app):
        """Read the huskar settings from ``app.config`` and create the client.

        Stores the token and team on the instance and opens nothing yet: the
        client is an ``HTTPConnection`` object for the configured host/port.

        Raises:
            HuskarException: when token, host, port or team is missing/empty.
        """
        config = app.config
        self.token = config.get('HUSKAR_TOKEN')
        self.team = config.get('HUSKAR_TEAM')
        host = config.get('HUSKAR_API_HOST')
        port = config.get('HUSKAR_API_PORT')

        required = (self.token, host, port, self.team)
        if any(not value for value in required):
            raise HuskarException('Missing huskar connection infomation')

        self.client = http.HTTPConnection(host, port, timeout=self.timeout)
项目:corvus-web-public    作者:eleme    | 项目源码 | 文件源码
def init_app(self, app):
        """Read the sash host/port from ``app.config`` and create the client.

        Raises:
            SashException: when the host or the port is missing/empty.
        """
        settings = app.config
        host, port = settings.get('SASH_HOST'), settings.get('SASH_PORT')
        if not (host and port):
            raise SashException('Missing sash connection infomation')
        self.client = http.HTTPConnection(host, port, timeout=self.timeout)
项目:DualisWatcher    作者:LucaVazz    | 项目源码 | 文件源码
def _fetch_state(self, uid) -> str:
        """Download the iCal document for ``uid`` and return it as text.

        Raises:
            RuntimeError: when the server answers with a status other than 200.
        """
        connection = HTTPConnection('vorlesungsplan.dhbw-mannheim.de')
        path = '/ical.php?uid=%s'%(uid)
        connection.request('GET', path)
        response = connection.getresponse()

        status_code = response.getcode()
        if status_code == 200:
            return response.read().decode('utf-8')

        raise RuntimeError('Server reported Status %s'%(status_code))
项目:Bethesda_SublimeFO4    作者:Scrivener07    | 项目源码 | 文件源码
def _new_conn(self):
        """
        Count, log and return a fresh :class:`httplib.HTTPConnection`.
        """
        self.num_connections += 1
        message = "Starting new HTTP connection (%d): %s" % (
            self.num_connections, self.host)
        log.info(message)
        connection_kwargs = dict(host=self.host,
                                 port=self.port,
                                 strict=self.strict)
        return HTTPConnection(**connection_kwargs)
项目:MailFail    作者:m0rtem    | 项目源码 | 文件源码
def __init__(self, proxytype, proxyaddr, proxyport=None, rdns=True, username=None, password=None, *args, **kwargs):
        """Remember the proxy settings and initialise the HTTP connection.

        The six proxy parameters are stored, in order, as the tuple
        ``self.proxyargs``; all remaining positional and keyword arguments
        are forwarded to ``httplib.HTTPConnection.__init__``.
        """
        self.proxyargs = (proxytype, proxyaddr, proxyport, rdns, username, password)
        httplib.HTTPConnection.__init__(self, *args, **kwargs)
项目:2017_script    作者:endl524    | 项目源码 | 文件源码
def connectOpenAPIServer(getType):
    """Point the module-level ``conn`` at the server matching ``getType``.

    A ``getType`` of 3 selects ``daumServer``; every other value selects
    ``server``.
    """
    global conn, server
    target = daumServer if getType == 3 else server
    conn = HTTPConnection(target)