Python http 模块,client() 实例源码

我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用http.client()

项目:httplib2    作者:httplib2    | 项目源码 | 文件源码
def __init__(self, host, port=None, key_file=None, cert_file=None,
                 timeout=None, proxy_info=None,
                 ca_certs=None, disable_ssl_certificate_validation=False):
        # TODO: implement proxy_info
        self.proxy_info = proxy_info
        context = None
        if ca_certs is None:
            ca_certs = CA_CERTS
        if (cert_file or ca_certs):
            if not hasattr(ssl, 'SSLContext'):
                raise CertificateValidationUnsupportedInPython31()
            context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
            if disable_ssl_certificate_validation:
                context.verify_mode = ssl.CERT_NONE
            else:
                context.verify_mode = ssl.CERT_REQUIRED
            if cert_file:
                context.load_cert_chain(cert_file, key_file)
            if ca_certs:
                context.load_verify_locations(ca_certs)
        http.client.HTTPSConnection.__init__(
                self, host, port=port, key_file=key_file,
                cert_file=cert_file, timeout=timeout, context=context,
                check_hostname=disable_ssl_certificate_validation ^ True)
项目:httplib2    作者:httplib2    | 项目源码 | 文件源码
def __init__(self, info):
        # info is either an email.message or
        # an httplib.HTTPResponse object.
        if isinstance(info, http.client.HTTPResponse):
            for key, value in info.getheaders():
                key = key.lower()
                prev = self.get(key)
                if prev is not None:
                    value = ', '.join((prev, value))
                self[key] = value
            self.status = info.status
            self['status'] = str(self.status)
            self.reason = info.reason
            self.version = info.version
        elif isinstance(info, email.message.Message):
            for key, value in list(info.items()):
                self[key.lower()] = value
            self.status = int(self['status'])
        else:
            for key, value in info.items():
                self[key.lower()] = value
            self.status = int(self.get('status', self.status))
项目:isilon_sdk_7_2_python    作者:Isilon    | 项目源码 | 文件源码
def debug(self, value):
        """
        Sets the debug status.

        :param value: The debug status, True or False.
        :type: bool
        """
        self.__debug = value
        if self.__debug:
            # if debug status is True, turn on debug logging
            for _, logger in iteritems(self.logger):
                logger.setLevel(logging.DEBUG)
            # turn on httplib debug
            httplib.HTTPConnection.debuglevel = 1
        else:
            # if debug status is False, turn off debug logging,
            # setting log level to default `logging.WARNING`
            for _, logger in iteritems(self.logger):
                logger.setLevel(logging.WARNING)
            # turn off httplib debug
            httplib.HTTPConnection.debuglevel = 0
项目:bpy_lambda    作者:bcongdon    | 项目源码 | 文件源码
def execute(self, context):
        scene = context.scene
        netsettings = scene.network_render

        try:
            conn = clientConnection(netsettings, report = self.report)

            if conn:
                # Sending file
                client.sendJobBaking(conn, scene)
                conn.close()
                self.report({'INFO'}, "Job sent to master")
        except Exception as err:
            self.report({'ERROR'}, str(err))

        return {'FINISHED'}
项目:bpy_lambda    作者:bcongdon    | 项目源码 | 文件源码
def execute(self, context):
        scene = context.scene
        netsettings = scene.network_render

        try:
            conn = clientConnection(netsettings, report = self.report)

            if conn:
                # Sending file
                scene.network_render.job_id = client.sendJob(conn, scene, True)
                conn.close()
                self.report({'INFO'}, "Job sent to master")
        except Exception as err:
            self.report({'ERROR'}, str(err))


        return {'FINISHED'}
项目:bpy_lambda    作者:bcongdon    | 项目源码 | 文件源码
def execute(self, context):
        scene = context.scene
        netsettings = scene.network_render

        try:
            conn = clientConnection(netsettings, report = self.report)

            if conn:
                # Sending file
                scene.network_render.job_id = client.sendJob(conn, scene, False)
                conn.close()
                self.report({'INFO'}, "Job sent to master")
        except Exception as err:
            self.report({'ERROR'}, str(err))


        return {'FINISHED'}
项目:bpy_lambda    作者:bcongdon    | 项目源码 | 文件源码
def clientVerifyVersion(conn, timeout):
    with ConnectionContext(timeout):
        conn.request("GET", "/version")
    response = conn.getresponse()

    if response.status != http.client.OK:
        conn.close()
        return False

    server_version = response.read()

    if server_version != VERSION:
        print("Incorrect server version!")
        print("expected", str(VERSION, encoding='utf8'), "received", str(server_version, encoding='utf8'))
        return False

    return True
项目:connect-python-sdk    作者:square    | 项目源码 | 文件源码
def debug(self, value):
        """
        Sets the debug status.

        :param value: The debug status, True or False.
        :type: bool
        """
        self.__debug = value
        if self.__debug:
            # if debug status is True, turn on debug logging
            for _, logger in iteritems(self.logger):
                logger.setLevel(logging.DEBUG)
            # turn on httplib debug
            httplib.HTTPConnection.debuglevel = 1
        else:
            # if debug status is False, turn off debug logging,
            # setting log level to default `logging.WARNING`
            for _, logger in iteritems(self.logger):
                logger.setLevel(logging.WARNING)
            # turn off httplib debug
            httplib.HTTPConnection.debuglevel = 0
项目:automation    作者:kubic-project    | 项目源码 | 文件源码
def _api_get(self, path):
        ctx = http.client.ssl._create_stdlib_context()
        conn = http.client.HTTPSConnection(self._api, timeout=20, context=ctx)
        log.info("calling {}/<**token**>{}".format(self._api, path))
        tpath = '/' + self._api_token + path
        # Do not leak out token in production
        # log.info("calling {}{}".format(self._api, tpath))
        try:
            conn.request('GET', tpath)
            response = conn.getresponse().read().decode('utf-8')
        except socket.timeout:
            log.info("socket timeout")
            raise APIError('testbed daemon socket timeout')
        j = self._parse(response)
        if j["status"] != "ok":
            raise APIError(response)
        return j
项目:automation    作者:kubic-project    | 项目源码 | 文件源码
def fetch_and_mangle_worker_autoyast(admin_host_ipaddr):
    """Fetch autoyast file and upload it to
    /srv/www/htdocs/autoyast/caasp/worker_mangled.xml
    """
    assert admin_host_ipaddr
    log.info("Fetching autoyast file from {}".format(admin_host_ipaddr))

    ctx = http.client.ssl._create_stdlib_context()
    conn = http.client.HTTPSConnection(admin_host_ipaddr, timeout=20, context=ctx)
    conn.request('GET', AUTOYAST_URL_PATH)
    ay = conn.getresponse().read().decode('utf-8')

    assert '<pattern>SUSE-CaaSP-Stack</pattern>' in ay, ay
    xml = ay.replace('</storage>\n', '</storage>\n' + AUTOYAST_SIG_CHUNK)

    # Inject SSH key into authorized keys to allow SSHing to workers
    xml = xml.replace(
        '    </chroot-scripts>\n',
        AUTOYAST_AUTHORIZED_KEYS_CHUNK + '    </chroot-scripts>\n'
    )
    tsclient.upload_worker_mangled_xml(xml)
项目:automation    作者:kubic-project    | 项目源码 | 文件源码
def install_prometheus_certs(kubeconfig):
    log.info("Fetching API client keys for Prometheus")
    y = yaml.load(kubeconfig)
    k = base64.b64decode(y['users'][0]['user']['client-key-data'])
    with open('client_key', 'w') as f:
        f.write(k.decode())
    subprocess.check_call(["/usr/bin/sudo", "scp",  "client_key",
        PROMETHEUS_SSH + ":/srv/prometheus/prometheus/kube_api_client_key"])

    c = base64.b64decode(y['users'][0]['user']['client-certificate-data'])
    with open('client_cert', 'w') as f:
        f.write(c.decode())
    subprocess.check_call(["/usr/bin/sudo", "scp", "client_cert",
        PROMETHEUS_SSH + ":/srv/prometheus/prometheus/kube_api_client_cert"])
    log.info("Reloading Prometheus")
    subprocess.check_call(["/usr/bin/sudo", "ssh", PROMETHEUS_SSH, "/usr/bin/systemctl", "reload", "prometheus.service"])
项目:Fetch    作者:bourdakos1    | 项目源码 | 文件源码
def __init__(self, host, port=None, key_file=None, cert_file=None,
                 timeout=None, proxy_info=None,
                 ca_certs=None, disable_ssl_certificate_validation=False):
        self.proxy_info = proxy_info
        context = None
        if ca_certs is None:
            ca_certs = CA_CERTS
        if (cert_file or ca_certs) and not disable_ssl_certificate_validation:
            if not hasattr(ssl, 'SSLContext'):
                raise CertificateValidationUnsupportedInPython31()
            context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
            context.verify_mode = ssl.CERT_REQUIRED
            if cert_file:
                context.load_cert_chain(cert_file, key_file)
            if ca_certs:
                context.load_verify_locations(ca_certs)
        http.client.HTTPSConnection.__init__(
                self, host, port=port, key_file=key_file,
                cert_file=cert_file, timeout=timeout, context=context,
                check_hostname=True)
项目:Fetch    作者:bourdakos1    | 项目源码 | 文件源码
def __init__(self, info):
        # info is either an email.message or
        # an httplib.HTTPResponse object.
        if isinstance(info, http.client.HTTPResponse):
            for key, value in info.getheaders():
                key = key.lower()
                prev = self.get(key)
                if prev is not None:
                    value = ', '.join((prev, value))
                self[key] = value
            self.status = info.status
            self['status'] = str(self.status)
            self.reason = info.reason
            self.version = info.version
        elif isinstance(info, email.message.Message):
            for key, value in list(info.items()):
                self[key.lower()] = value
            self.status = int(self['status'])
        else:
            for key, value in info.items():
                self[key.lower()] = value
            self.status = int(self.get('status', self.status))
项目:jupyterhub-kubernetes_spawner    作者:danielfrg    | 项目源码 | 文件源码
def debug(self, value):
        """
        Sets the debug status.

        :param value: The debug status, True or False.
        :type: bool
        """
        self.__debug = value
        if self.__debug:
            # if debug status is True, turn on debug logging
            for _, logger in iteritems(self.logger):
                logger.setLevel(logging.DEBUG)
            # turn on httplib debug
            httplib.HTTPConnection.debuglevel = 1
        else:
            # if debug status is False, turn off debug logging,
            # setting log level to default `logging.WARNING`
            for _, logger in iteritems(self.logger):
                logger.setLevel(logging.WARNING)
            # turn off httplib debug
            httplib.HTTPConnection.debuglevel = 0
项目:graylog.py    作者:yumimobi    | 项目源码 | 文件源码
def debug(self, value):
        """
        Sets the debug status.

        :param value: The debug status, True or False.
        :type: bool
        """
        self.__debug = value
        if self.__debug:
            # if debug status is True, turn on debug logging
            for _, logger in iteritems(self.logger):
                logger.setLevel(logging.DEBUG)
            # turn on httplib debug
            httplib.HTTPConnection.debuglevel = 1
        else:
            # if debug status is False, turn off debug logging,
            # setting log level to default `logging.WARNING`
            for _, logger in iteritems(self.logger):
                logger.setLevel(logging.WARNING)
            # turn off httplib debug
            httplib.HTTPConnection.debuglevel = 0
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def handle_expect_100(self):
        """Decide what to do with an "Expect: 100-continue" header.

        If the client is expecting a 100 Continue response, we must
        respond with either a 100 Continue or a final response before
        waiting for the request body. The default is to always respond
        with a 100 Continue. You can behave differently (for example,
        reject unauthorized requests) by overriding this method.

        This method should either return True (possibly after sending
        a 100 Continue response) or send an error response and return
        False.

        """
        self.send_response_only(100)
        return True
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def log_message(self, format, *args):
        """Log an arbitrary message.

        This is used by all other logging functions.  Override
        it if you have specific logging wishes.

        The first argument, FORMAT, is a format string for the
        message to be logged.  If the format string contains
        any % escapes requiring parameters, they should be
        specified as subsequent arguments (it's just like
        printf!).

        The client host and current date/time are prefixed to
        every message.

        """

        sys.stderr.write("%s - - [%s] %s\n" %
                         (self.address_string(),
                          self.log_date_time_string(),
                          format%args))
项目:bcloud    作者:wangYanJava    | 项目源码 | 文件源码
def urloption(url, headers={}, retries=RETRIES):
    '''??OPTION ??'''
    headers_merged = default_headers.copy()
    for key in headers.keys():
        headers_merged[key] = headers[key]
    schema = urllib.parse.urlparse(url)
    for i in range(retries):
        try:
            conn = http.client.HTTPConnection(schema.netloc)
            conn.request('OPTIONS', url, headers=headers_merged)
            resp = conn.getresponse()
            return resp
        except OSError:
            logger.error(traceback.format_exc())
            #return None
        except:
            logger.error(traceback.format_exc())
            #return None
    return None
项目:bcloud    作者:wangYanJava    | 项目源码 | 文件源码
def urlopen_without_redirect(url, headers={}, data=None, retries=RETRIES):
    '''????URL, ?????Response??. ??????.

    ??????????URL???(Error 301/302)????, ?????URL??
    ???????, ??Header????????.
    '''
    headers_merged = default_headers.copy()
    for key in headers.keys():
        headers_merged[key] = headers[key]

    parse_result = urllib.parse.urlparse(url)
    for i in range(retries):
        try:
            conn = http.client.HTTPConnection(parse_result.netloc)
            if data:
                conn.request('POST', url, body=data, headers=headers_merged)
            else:
                conn.request('GET', url, body=data, headers=headers_merged)
            return conn.getresponse()
        except OSError:
            logger.error(traceback.format_exc())
        except:
            logger.error(traceback.format_exc())
            #return None
    return None
项目:babeltrace-zipkin    作者:vears91    | 项目源码 | 文件源码
def create_time_annotation(self, service_name, ipv4, port, timestamp, value):
        """
        Creates an annotation thrift object

        :param service_name: name that identifies the service of the endpoint
        :param ipv4: ipv4 address of the endpoint
        :param port: port of the service
        :param timestamp: timestamp of when the annotation occured in microseconds
        :param value: Zipkin value for the annotation, like CS (client start), SS (server send) or SR (server receive)

        :returns: zipkin annotation object
        """   
        return ({
            "endpoint": 
                {
                    "serviceName": service_name,
                    "ipv4": ipv4,
                    "port": port
                },
            "timestamp": timestamp,
            "value": value
            })
项目:aliyun-sms    作者:wilslee    | 项目源码 | 文件源码
def _make_http_response(self, endpoint, request):
        content = request.get_content()
        method = request.get_method()
        header = request.get_signed_header(self.get_region_id(), self.get_access_key(),
                                               self.get_access_secret())
        if self.get_user_agent() is not None:
            header['User-Agent'] = self.get_user_agent()
            header['x-sdk-client'] = 'python/2.0.0'
        if header is None:
            header = {}

        protocol = request.get_protocol_type()
        url = request.get_url(self.get_region_id(), self.get_access_key(), self.get_access_secret())
        response = HttpResponse(endpoint, url, method, header, protocol, content,
                                self._port)
        return response
项目:aliyun-sms    作者:wilslee    | 项目源码 | 文件源码
def do_action_with_exception(self, acs_request):

        # set server response format as json, because thie function will
        # parse the response so which format doesn't matter
        acs_request.set_accept_format('json')

        status, headers, body = self._implementation_of_do_action(acs_request)

        request_id = None
        ret = body

        try:
            body_obj = json.loads(body.decode('utf-8'))
            request_id = body_obj.get('RequestId')
            ret = body_obj
        except ValueError:
            # in case the response body is not a json string, return the raw data instead
            pass

        if status != http.client.OK:
            server_error_code, server_error_message = self._parse_error_info_from_response_body(body)
            raise ServerException(server_error_code, server_error_message, http_status=status, request_id=request_id)

        return body
项目:vivisect-py3    作者:bat-serjo    | 项目源码 | 文件源码
def setup(self):
        # SSLify..
        if self.server.sslkey:
            import ssl
            sslca = self.server.sslca
            keyfile = self.server.sslkey
            certfile = self.server.sslcrt
            sslreq = ssl.CERT_NONE
            # If they specify a CA key, require valid client certs
            if sslca:
                sslreq=ssl.CERT_REQUIRED

            self.request = ssl.wrap_socket(self.request,
                                     keyfile=keyfile, certfile=certfile,
                                     ca_certs=sslca, cert_reqs=sslreq,
                                     server_side=True)
        http.server.BaseHTTPRequestHandler.setup(self)
项目:vivisect-py3    作者:bat-serjo    | 项目源码 | 文件源码
def handleLogin(self, objname, authinfo):
        if not self.server.getSharedObject(objname):
            raise Exception('Shared object does not exists!')

        if self.server.authmod:
            if not authinfo or not self.server.authmod.authCobraUser(authinfo):
                self.send_response(http.client.UNAUTHORIZED)
                self.end_headers()
            else:
                sesskey = base64.b64encode( os.urandom(32) )
                self.server.sessions[ sesskey ] = (authinfo, time.time())
                c = http.cookies.SimpleCookie()
                c['SessionId'] = sesskey
                # set morsel
                self.send_response(http.client.OK)
                self.send_header('Set-Cookie', list(c.values())[0].output(header=''))
                self.send_header("Content-type", "text/html")
                self.end_headers()
            return

        self.send_response(http.client.OK)
        self.end_headers()
项目:vivisect-py3    作者:bat-serjo    | 项目源码 | 文件源码
def handleGetAttr(self, objname, attr): 
        if not self.server.attr:
            self.send_response(http.client.FORBIDDEN)
            self.end_headers()
            excinfo = "__getattr__ disabled"
            self.wfile.write(json.dumps(excinfo))
            return
        if verbose: print("GETTING ATTRIBUTE:", attr) 
        obj = self.server.getSharedObject(objname)
        try:
            val = getattr(obj, attr)
            self.send_response(http.client.OK)
            self.end_headers()
            self.wfile.write(json.dumps(val)) 
        except Exception as e:
            self.send_response(http.client.NOT_FOUND)
            self.end_headers()
            excinfo = "%s" % traceback.format_exc()
            self.wfile.write(json.dumps(excinfo))
项目:zabbix-youtrack-action    作者:devopshq    | 项目源码 | 文件源码
def __init__(self, host, port=None, key_file=None, cert_file=None,
                 timeout=None, proxy_info=None,
                 ca_certs=None, disable_ssl_certificate_validation=False):
        self.proxy_info = proxy_info
        context = None
        if ca_certs is None:
            ca_certs = CA_CERTS
        if (cert_file or ca_certs) and not disable_ssl_certificate_validation:
            if not hasattr(ssl, 'SSLContext'):
                raise CertificateValidationUnsupportedInPython31()
            context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
            context.verify_mode = ssl.CERT_REQUIRED
            if cert_file:
                context.load_cert_chain(cert_file, key_file)
            if ca_certs:
                context.load_verify_locations(ca_certs)
        http.client.HTTPSConnection.__init__(
                self, host, port=port, key_file=key_file,
                cert_file=cert_file, timeout=timeout, context=context,
                check_hostname=False)
项目:zabbix-youtrack-action    作者:devopshq    | 项目源码 | 文件源码
def __init__(self, info):
        # info is either an email.message or
        # an httplib.HTTPResponse object.
        if isinstance(info, http.client.HTTPResponse):
            for key, value in info.getheaders():
                key = key.lower()
                prev = self.get(key)
                if prev is not None:
                    value = ', '.join((prev, value))
                self[key] = value
            self.status = info.status
            self['status'] = str(self.status)
            self.reason = info.reason
            self.version = info.version
        elif isinstance(info, email.message.Message):
            for key, value in list(info.items()):
                self[key.lower()] = value
            self.status = int(self['status'])
        else:
            for key, value in info.items():
                self[key.lower()] = value
            self.status = int(self.get('status', self.status))
项目:StockRecommendSystem    作者:doncat99    | 项目源码 | 文件源码
def SetXTwitterHeaders(self, client, url, version):
        """Set the X-Twitter HTTP headers that will be sent to the server.

        Args:
          client:
             The client name as a string.  Will be sent to the server as
             the 'X-Twitter-Client' header.
          url:
             The URL of the meta.xml as a string.  Will be sent to the server
             as the 'X-Twitter-Client-URL' header.
          version:
             The client version as a string.  Will be sent to the server
             as the 'X-Twitter-Client-Version' header.
        """
        self._request_headers['X-Twitter-Client'] = client
        self._request_headers['X-Twitter-Client-URL'] = url
        self._request_headers['X-Twitter-Client-Version'] = version
项目:CodeReader    作者:jasonrbr    | 项目源码 | 文件源码
def verify_request(self, sock, ca):
        # Do the sspi auth dance
        self.sa.reset()
        while 1:
            data = _get_msg(sock)
            if data is None:
                return False
            try:
                err, sec_buffer = self.sa.authorize(data)
            except sspi.error as details:
                print("FAILED to authorize client:", details)
                return False

            if err==0:
                break
            _send_msg(sock, sec_buffer[0].Buffer)
        return True
项目:CodeReader    作者:jasonrbr    | 项目源码 | 文件源码
def process_request(self, request, client_address):
        # An example using the connection once it is established.
        print("The server is running as user", GetUserName())
        self.sa.ctxt.ImpersonateSecurityContext()
        try:
            print("Having conversation with client as user", GetUserName())
            while 1:
                # we need to grab 2 bits of data - the encrypted data, and the
                # 'key'
                data = _get_msg(request)
                key = _get_msg(request)
                if data is None or key is None:
                    break
                data = self.sa.decrypt(data, key)
                print("Client sent:", repr(data))
        finally:
            self.sa.ctxt.RevertSecurityContext()
        self.close_request(request)
        print("The server is back to user", GetUserName())
项目:CodeReader    作者:jasonrbr    | 项目源码 | 文件源码
def sspi_client():
    c = http.client.HTTPConnection("localhost", options.port)
    c.connect()
    # Do the auth dance.
    ca = sspi.ClientAuth(options.package, targetspn=options.target_spn)
    data = None
    while 1:
        err, out_buf = ca.authorize(data)
        _send_msg(c.sock, out_buf[0].Buffer)
        if err==0:
            break
        data = _get_msg(c.sock)
    print("Auth dance complete - sending a few encryted messages")
    # Assume out data is sensitive - encrypt the message.
    for data in "Hello from the client".split():
        blob, key = ca.encrypt(data)
        _send_msg(c.sock, blob)
        _send_msg(c.sock, key)
    c.sock.close()
    print("Client completed.")
项目:blender-addons    作者:scorpion81    | 项目源码 | 文件源码
def execute(self, context):
        scene = context.scene
        netsettings = scene.network_render

        try:
            conn = clientConnection(netsettings, report = self.report)

            if conn:
                # Sending file
                client.sendJobBaking(conn, scene)
                conn.close()
                self.report({'INFO'}, "Job sent to master")
        except Exception as err:
            self.report({'ERROR'}, str(err))

        return {'FINISHED'}
项目:blender-addons    作者:scorpion81    | 项目源码 | 文件源码
def execute(self, context):
        scene = context.scene
        netsettings = scene.network_render

        try:
            conn = clientConnection(netsettings, report = self.report)

            if conn:
                # Sending file
                scene.network_render.job_id = client.sendJob(conn, scene, True)
                conn.close()
                self.report({'INFO'}, "Job sent to master")
        except Exception as err:
            self.report({'ERROR'}, str(err))


        return {'FINISHED'}
项目:blender-addons    作者:scorpion81    | 项目源码 | 文件源码
def execute(self, context):
        scene = context.scene
        netsettings = scene.network_render

        try:
            conn = clientConnection(netsettings, report = self.report)

            if conn:
                # Sending file
                scene.network_render.job_id = client.sendJob(conn, scene, False)
                conn.close()
                self.report({'INFO'}, "Job sent to master")
        except Exception as err:
            self.report({'ERROR'}, str(err))


        return {'FINISHED'}
项目:blender-addons    作者:scorpion81    | 项目源码 | 文件源码
def clientVerifyVersion(conn, timeout):
    with ConnectionContext(timeout):
        conn.request("GET", "/version")
    response = conn.getresponse()

    if response.status != http.client.OK:
        conn.close()
        return False

    server_version = response.read()

    if server_version != VERSION:
        print("Incorrect server version!")
        print("expected", str(VERSION, encoding='utf8'), "received", str(server_version, encoding='utf8'))
        return False

    return True
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def handle_expect_100(self):
        """Decide what to do with an "Expect: 100-continue" header.

        If the client is expecting a 100 Continue response, we must
        respond with either a 100 Continue or a final response before
        waiting for the request body. The default is to always respond
        with a 100 Continue. You can behave differently (for example,
        reject unauthorized requests) by overriding this method.

        This method should either return True (possibly after sending
        a 100 Continue response) or send an error response and return
        False.

        """
        self.send_response_only(100)
        self.end_headers()
        return True
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def log_message(self, format, *args):
        """Log an arbitrary message.

        This is used by all other logging functions.  Override
        it if you have specific logging wishes.

        The first argument, FORMAT, is a format string for the
        message to be logged.  If the format string contains
        any % escapes requiring parameters, they should be
        specified as subsequent arguments (it's just like
        printf!).

        The client ip and current date/time are prefixed to
        every message.

        """

        sys.stderr.write("%s - - [%s] %s\n" %
                         (self.address_string(),
                          self.log_date_time_string(),
                          format%args))
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def getproxies_environment():
    """Return a dictionary of scheme -> proxy server URL mappings.

    Scan the environment for variables named <scheme>_proxy;
    this seems to be the standard convention.  If you need a
    different way, you can pass a proxies dictionary to the
    [Fancy]URLopener constructor.

    """
    proxies = {}
    for name, value in os.environ.items():
        name = name.lower()
        if value and name[-6:] == '_proxy':
            proxies[name[:-6]] = value

    # CVE-2016-1000110 - If we are running as CGI script, forget HTTP_PROXY
    # (non-all-lowercase) as it may be set from the web server by a "Proxy:"
    # header from the client
    if 'REQUEST_METHOD' in os.environ:
        proxies.pop('http', None)

    return proxies
项目:python-k8sclient    作者:mward29    | 项目源码 | 文件源码
def debug(self, value):
        """
        Sets the debug status.

        :param value: The debug status, True or False.
        :type: bool
        """
        self.__debug = value
        if self.__debug:
            # if debug status is True, turn on debug logging
            for _, logger in iteritems(self.logger):
                logger.setLevel(logging.DEBUG)
            # turn on httplib debug
            httplib.HTTPConnection.debuglevel = 1
        else:
            # if debug status is False, turn off debug logging,
            # setting log level to default `logging.WARNING`
            for _, logger in iteritems(self.logger):
                logger.setLevel(logging.WARNING)
            # turn off httplib debug
            httplib.HTTPConnection.debuglevel = 0
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def handle_expect_100(self):
        """Decide what to do with an "Expect: 100-continue" header.

        If the client is expecting a 100 Continue response, we must
        respond with either a 100 Continue or a final response before
        waiting for the request body. The default is to always respond
        with a 100 Continue. You can behave differently (for example,
        reject unauthorized requests) by overriding this method.

        This method should either return True (possibly after sending
        a 100 Continue response) or send an error response and return
        False.

        """
        self.send_response_only(100)
        self.end_headers()
        return True
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def log_message(self, format, *args):
        """Log an arbitrary message.

        This is used by all other logging functions.  Override
        it if you have specific logging wishes.

        The first argument, FORMAT, is a format string for the
        message to be logged.  If the format string contains
        any % escapes requiring parameters, they should be
        specified as subsequent arguments (it's just like
        printf!).

        The client ip and current date/time are prefixed to
        every message.

        """

        sys.stderr.write("%s - - [%s] %s\n" %
                         (self.address_string(),
                          self.log_date_time_string(),
                          format%args))
项目:isilon_sdk_8_0_python    作者:Isilon    | 项目源码 | 文件源码
def debug(self, value):
        """
        Sets the debug status.

        :param value: The debug status, True or False.
        :type: bool
        """
        self.__debug = value
        if self.__debug:
            # if debug status is True, turn on debug logging
            for _, logger in iteritems(self.logger):
                logger.setLevel(logging.DEBUG)
            # turn on httplib debug
            httplib.HTTPConnection.debuglevel = 1
        else:
            # if debug status is False, turn off debug logging,
            # setting log level to default `logging.WARNING`
            for _, logger in iteritems(self.logger):
                logger.setLevel(logging.WARNING)
            # turn off httplib debug
            httplib.HTTPConnection.debuglevel = 0
项目:clickhouse-cli    作者:hatarist    | 项目源码 | 文件源码
def __init__(self, host, port, user, password, database,
                 settings, format, format_stdin, multiline, stacktrace):
        self.config = None

        self.host = host
        self.port = port
        self.user = user
        self.password = password
        self.database = database
        self.settings = {k: v[0] for k, v in parse_qs(settings).items()}
        self.format = format
        self.format_stdin = format_stdin
        self.multiline = multiline
        self.stacktrace = stacktrace
        self.server_version = None

        self.query_ids = []
        self.client = None
        self.echo = Echo(verbose=True, colors=True)
        self.progress = None

        self.metadata = {}
项目:Kiwi    作者:kiwitcms    | 项目源码 | 文件源码
def test_get_a_run(self):
        url = reverse('testruns-get', args=[self.test_run.pk])
        response = self.client.get(url)

        self.assertEqual(http.client.OK, response.status_code)

        for i, case_run in enumerate(
                (self.case_run_1, self.case_run_2, self.case_run_3), 1):
            self.assertContains(
                response,
                '<a href="#caserun_{0}">#{0}</a>'.format(case_run.pk),
                html=True)
            self.assertContains(
                response,
                '<a id="link_{0}" href="#caserun_{1}" title="Expand test case">'
                '{2}</a>'.format(i, case_run.pk, case_run.case.summary),
                html=True)
项目:Kiwi    作者:kiwitcms    | 项目源码 | 文件源码
def test_show_create_new_run_page(self):
        self.client.login(username=self.tester.username, password='password')

        response = self.client.post(self.url, {
            'from_plan': self.plan.pk,
            'case': [self.case_1.pk, self.case_2.pk, self.case_3.pk]
        })

        # Assert listed cases
        for i, case in enumerate((self.case_1, self.case_2, self.case_3), 1):
            self.assertContains(
                response,
                '<a href="/case/{0}/">{0}</a>'.format(case.pk),
                html=True)
            self.assertContains(
                response,
                '<a id="link_{0}" class="blind_title_link js-case-summary" '
                'data-param="{0}">{1}</a>'.format(i, case.summary),
                html=True)
项目:Kiwi    作者:kiwitcms    | 项目源码 | 文件源码
def test_add_env_value_to_runs(self):
        self.login_tester()

        self.client.get(self.env_value_url, {
            'a': 'add',
            'env_value_id': self.value_bsd.pk,
            'run_id': [self.test_run.pk, self.test_run_1.pk]
        })

        rel = TCMSEnvRunValueMap.objects.filter(run=self.test_run,
                                                value=self.value_bsd)
        self.assertTrue(rel.exists())

        rel = TCMSEnvRunValueMap.objects.filter(run=self.test_run_1,
                                                value=self.value_bsd)
        self.assertTrue(rel.exists())
项目:Kiwi    作者:kiwitcms    | 项目源码 | 文件源码
def test_delete_env_value_from_runs(self):
        self.login_tester()

        self.client.get(self.env_value_url, {
            'a': 'remove',
            'env_value_id': self.value_linux.pk,
            'run_id': [self.test_run.pk, self.test_run_1.pk],
        })

        rel = TCMSEnvRunValueMap.objects.filter(run=self.test_run,
                                                value=self.value_linux)
        self.assertFalse(rel.exists())

        rel = TCMSEnvRunValueMap.objects.filter(run=self.test_run_1,
                                                value=self.value_linux)
        self.assertFalse(rel.exists())
项目:Kiwi    作者:kiwitcms    | 项目源码 | 文件源码
def test_refuse_if_action_is_unknown(self):
        self.login_tester()

        post_data = {
            'a': 'unknown action',
            'case_run': self.case_run_1.pk,
            'case': self.case_run_1.case.pk,
            'bug_system_id': TestCaseBugSystem.objects.get(name='Bugzilla').pk,
            'bug_id': '123456',
        }

        response = self.client.get(self.case_run_bug_url, post_data)

        self.assertJsonResponse(
            response,
            {'rc': 1, 'response': 'Unrecognizable actions'})
项目:Kiwi    作者:kiwitcms    | 项目源码 | 文件源码
def test_remove_bug_from_case_run(self):
        self.login_tester()

        post_data = {
            'a': 'remove',
            'case_run': self.case_run_1.pk,
            'bug_id': self.bug_12345,
        }

        response = self.client.get(self.case_run_bug_url, post_data)

        bug_exists = TestCaseBug.objects.filter(
            bug_id=self.bug_12345,
            case=self.case_run_1.case,
            case_run=self.case_run_1).exists()
        self.assertFalse(bug_exists)

        self.assertJsonResponse(
            response,
            {'rc': 0, 'response': 'ok', 'run_bug_count': 1})