Python http 模块,server() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用http.server()

项目:bob    作者:BobBuildTool    | 项目源码 | 文件源码
def testInvalidServer(self):
        """Test download on non-existent server"""

        spec = { 'url' : "https://127.1.2.3:7257" }
        archive = SimpleHttpArchive(spec)
        archive.wantDownload(True)
        archive.wantUpload(True)

        # Local
        archive.downloadPackage(b'\x00'*20, "unused", "unused", 0)
        archive.downloadPackage(b'\x00'*20, "unused", "unused", 1)
        self.assertEqual(archive.downloadLocalLiveBuildId(b'\x00'*20, 0), None)

        # Jenkins
        with TemporaryDirectory() as workspace:
            with open(os.path.join(workspace, "test.buildid"), "wb") as f:
                f.write(b'\x00'*20)
            script = archive.download(None, "test.buildid", "result.tgz")
            callJenkinsScript(script, workspace)
项目:bpy_lambda    作者:bcongdon    | 项目源码 | 文件源码
def __init__(self, job_id, job_info):
        super().__init__(job_info)
        self.id = job_id
        self.last_dispatched = time.time()
        self.start_time = time.time()
        self.finish_time = self.start_time
        # force one chunk for process jobs
        if self.type == netrender.model.JOB_PROCESS:
            self.chunks = 1

        # Force WAITING status on creation
        self.status = netrender.model.JOB_WAITING

        # special server properties
        self.last_update = 0
        self.save_path = ""
        self.files = [MRenderFile(rfile.filepath, rfile.index, rfile.start, rfile.end, rfile.signature) for rfile in job_info.files]
项目:bpy_lambda    作者:bcongdon    | 项目源码 | 文件源码
def do_HEAD(self):

        if self.path == "/status":
            job_id = self.headers.get('job-id', "")
            job_frame = int(self.headers.get('job-frame', -1))

            job = self.server.getJobID(job_id)
            if job:
                frame = job[job_frame]


                if frame:
                    self.send_head(http.client.OK)
                else:
                    # no such frame
                    self.send_head(http.client.NO_CONTENT)
            else:
                # no such job id
                self.send_head(http.client.NO_CONTENT)

    # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
    # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
    # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
    # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
    # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
项目:bpy_lambda    作者:bcongdon    | 项目源码 | 文件源码
def clientScan(report = None):
    try:
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        s.settimeout(30)

        s.bind(('', 8000))

        buf, address = s.recvfrom(64)

        address = address[0]
        port = int(str(buf, encoding='utf8'))

        reporting(report, "Master server found")

        return (address, port)
    except socket.timeout:
        reporting(report, "No master server on network", IOError)

        return ("", 8000) # return default values
项目:bpy_lambda    作者:bcongdon    | 项目源码 | 文件源码
def clientVerifyVersion(conn, timeout):
    with ConnectionContext(timeout):
        conn.request("GET", "/version")
    response = conn.getresponse()

    if response.status != http.client.OK:
        conn.close()
        return False

    server_version = response.read()

    if server_version != VERSION:
        print("Incorrect server version!")
        print("expected", str(VERSION, encoding='utf8'), "received", str(server_version, encoding='utf8'))
        return False

    return True
项目:vivisect-py3    作者:bat-serjo    | 项目源码 | 文件源码
def setup(self):
        # SSLify..
        if self.server.sslkey:
            import ssl
            sslca = self.server.sslca
            keyfile = self.server.sslkey
            certfile = self.server.sslcrt
            sslreq = ssl.CERT_NONE
            # If they specify a CA key, require valid client certs
            if sslca:
                sslreq=ssl.CERT_REQUIRED

            self.request = ssl.wrap_socket(self.request,
                                     keyfile=keyfile, certfile=certfile,
                                     ca_certs=sslca, cert_reqs=sslreq,
                                     server_side=True)
        http.server.BaseHTTPRequestHandler.setup(self)
项目:vivisect-py3    作者:bat-serjo    | 项目源码 | 文件源码
def handleLogin(self, objname, authinfo):
        if not self.server.getSharedObject(objname):
            raise Exception('Shared object does not exists!')

        if self.server.authmod:
            if not authinfo or not self.server.authmod.authCobraUser(authinfo):
                self.send_response(http.client.UNAUTHORIZED)
                self.end_headers()
            else:
                sesskey = base64.b64encode( os.urandom(32) )
                self.server.sessions[ sesskey ] = (authinfo, time.time())
                c = http.cookies.SimpleCookie()
                c['SessionId'] = sesskey
                # set morsel
                self.send_response(http.client.OK)
                self.send_header('Set-Cookie', list(c.values())[0].output(header=''))
                self.send_header("Content-type", "text/html")
                self.end_headers()
            return

        self.send_response(http.client.OK)
        self.end_headers()
项目:vivisect-py3    作者:bat-serjo    | 项目源码 | 文件源码
def shareObject(self, obj, name=None, doref=False):
        """
        Share an object in this cobra server.  By specifying
        doref=True you will let CobraProxy objects decide that
        the object is done and should be un-shared.  Also, if
        name == None a random name is chosen.

        Returns: name (or the newly generated random one)
        """
        refcnt = None
        if doref:
            refcnt = 0
        if name == None:
            raise Exception("You must specify an object name to share!")

        self.shared[name] = obj
        if doref:
            raise Exception("CobraHttp doesnt do refcnt")
        return name
项目:python3-linkedin    作者:DEKHTIARJonathan    | 项目源码 | 文件源码
def quick_api(api_key, secret_key, port=8000):
    """
    This method helps you get access to linkedin api quickly when using it
    from the interpreter.
    Notice that this method creates http server and wait for a request, so it
    shouldn't be used in real production code - it's just an helper for debugging

    The usage is basically:
    api = quick_api(KEY, SECRET)
    After you do that, it will print a URL to the screen which you must go in
    and allow the access, after you do that, the method will return with the api
    object.
    """
    auth = LinkedInAuthentication(api_key, secret_key, 'http://localhost:8000/',
                                  list(PERMISSIONS.enums.values()))
    app = LinkedInApplication(authentication=auth)
    print(auth.authorization_url)
    _wait_for_user_to_enter_browser(app, port)
    return app
项目:chromecast-player    作者:wa4557    | 项目源码 | 文件源码
def local_thumb(self, bitstream, mimetype):
        """ upload thumbnail to simple http server"""

        webserver_ip =[(s.connect(('8.8.8.8', 53)), s.getsockname()[0], s.close()) for s in [socket.socket(socket.AF_INET, socket.SOCK_DGRAM)]][0][1]

        req_handler = local_server.ImageRequestHandler

        # create a webserver to handle a single request on a free port or a specific port if passed in the parameter   
        port = 0    
        req_handler.content_type = mimetype
        req_handler.content = bitstream

        self.imageserver = http.server.HTTPServer((webserver_ip, port), req_handler)
        self.imagethread = threading.Thread(target=self.imageserver.handle_request)
        self.imagethread.start()

        url = "http://%s:%s" % (webserver_ip, str(self.imageserver.server_port))

        return url
项目:chromecast-player    作者:wa4557    | 项目源码 | 文件源码
def local_sub(self, filename, mimetype):
        """serve a local subtitle file"""

        if os.path.isfile(filename):
            filename = os.path.abspath(filename)
        else:
            return None

        webserver_ip =[(s.connect(('8.8.8.8', 53)), s.getsockname()[0], s.close()) for s in [socket.socket(socket.AF_INET, socket.SOCK_DGRAM)]][0][1]

        req_handler = local_server.SubtitleRequestHandler

        # create a webserver to handle a single request on a free port or a specific port if passed in the parameter   
        port = 0    

        self.subtitleserver = http.server.HTTPServer((webserver_ip, port), req_handler)
        self.subtitlethread = threading.Thread(target=self.subtitleserver.handle_request)
        self.subtitlethread.start()    

        url = "http://%s:%s%s" % (webserver_ip, str(self.subtitleserver.server_port), quote_plus(filename, "/"))
        return url
项目:dcos    作者:dcos    | 项目源码 | 文件源码
def __init__(self, handler_class, port, ip='', keyfile=None, certfile=None):
        """Initialize new TcpIpHttpEndpoint object

        Args:
            handler_class (obj): a request handler class that will be handling
                requests received by internal httpd server
            port (int): tcp port that httpd server will listen on
            ip (str): ip address that httpd server will listen on, by default
                listen on all addresses
        """
        if certfile is not None and keyfile is not None:
            endpoint_id = "https://{}:{}".format(ip, port)
        else:
            endpoint_id = "http://{}:{}".format(ip, port)
        super().__init__(endpoint_id)

        self._context.data['listen_ip'] = ip
        self._context.data['listen_port'] = port
        self._context.data['certfile'] = certfile
        self._context.data['keyfile'] = keyfile

        self._handler_class = handler_class

        self.__setup_httpd_thread(ip, port)
项目:dcos    作者:dcos    | 项目源码 | 文件源码
def __init__(self, handler_class, path, keyfile=None, certfile=None):
        """Initialize new UnixSocketHTTPEndpoint object

        Args:
            handler_class (obj): a request handler class that will be handling
                requests received by internal httpd server
            path (str): Unix socket path, that internal httpd server will listen
                on
        """
        if certfile is not None and keyfile is not None:
            endpoint_id = "https://{}".format(path)
        else:
            endpoint_id = "http://{}".format(path)
        super().__init__(endpoint_id)

        self._context.data['socket_path'] = path
        self._context.data['certfile'] = certfile
        self._context.data['keyfile'] = keyfile

        self._handler_class = handler_class

        self.__cleanup_stale_socket(path)
        self.__setup_httpd_thread(path)
项目:dcos    作者:dcos    | 项目源码 | 文件源码
def __setup_httpd_thread(self, socket_path):
        """Setup internal HTTPd server that this endpoints relies on to serve
           requests.

        Args:
            path (str): Unix socket path, that internal httpd server will listen
                on
        """
        self._httpd = UnixSocketStatefulHTTPServer(self._context,
                                                   socket_path,
                                                   self._handler_class)

        httpd_thread_name = "UnixSocketHttpdThread-{}".format(self.id)
        self._httpd_thread = threading.Thread(target=self._httpd.serve_forever,
                                              name=httpd_thread_name)

        # nginx spawns worker processes as 'nobody/nogroup', so we need to
        # make the socket available to it.
        os.chmod(socket_path, 0o777)
项目:blender-addons    作者:scorpion81    | 项目源码 | 文件源码
def __init__(self, job_id, job_info):
        super().__init__(job_info)
        self.id = job_id
        self.last_dispatched = time.time()
        self.start_time = time.time()
        self.finish_time = self.start_time
        # force one chunk for process jobs
        if self.type == netrender.model.JOB_PROCESS:
            self.chunks = 1

        # Force WAITING status on creation
        self.status = netrender.model.JOB_WAITING

        # special server properties
        self.last_update = 0
        self.save_path = ""
        self.files = [MRenderFile(rfile.filepath, rfile.index, rfile.start, rfile.end, rfile.signature) for rfile in job_info.files]
项目:blender-addons    作者:scorpion81    | 项目源码 | 文件源码
def do_HEAD(self):

        if self.path == "/status":
            job_id = self.headers.get('job-id', "")
            job_frame = int(self.headers.get('job-frame', -1))

            job = self.server.getJobID(job_id)
            if job:
                frame = job[job_frame]


                if frame:
                    self.send_head(http.client.OK)
                else:
                    # no such frame
                    self.send_head(http.client.NO_CONTENT)
            else:
                # no such job id
                self.send_head(http.client.NO_CONTENT)

    # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
    # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
    # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
    # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
    # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
项目:blender-addons    作者:scorpion81    | 项目源码 | 文件源码
def clientScan(report = None):
    try:
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        s.settimeout(30)

        s.bind(('', 8000))

        buf, address = s.recvfrom(64)

        address = address[0]
        port = int(str(buf, encoding='utf8'))

        reporting(report, "Master server found")

        return (address, port)
    except socket.timeout:
        reporting(report, "No master server on network", IOError)

        return ("", 8000) # return default values
项目:blender-addons    作者:scorpion81    | 项目源码 | 文件源码
def clientVerifyVersion(conn, timeout):
    with ConnectionContext(timeout):
        conn.request("GET", "/version")
    response = conn.getresponse()

    if response.status != http.client.OK:
        conn.close()
        return False

    server_version = response.read()

    if server_version != VERSION:
        print("Incorrect server version!")
        print("expected", str(VERSION, encoding='utf8'), "received", str(server_version, encoding='utf8'))
        return False

    return True
项目:arthur-redshift-etl    作者:harrystech    | 项目源码 | 文件源码
def start_server(self):
        """
        Start background daemon to serve our events.
        """
        handler_class = self.create_handler()

        class BackgroundServer(threading.Thread):
            def run(self):
                logger.info("Starting background server for monitor on port %d", MemoryStorage.SERVER_PORT)
                try:
                    httpd = _ThreadingSimpleServer((MemoryStorage.SERVER_HOST, MemoryStorage.SERVER_PORT),
                                                   handler_class)
                    httpd.serve_forever()
                except Exception as exc:
                    logger.info("Background server stopped: %s", str(exc))

        try:
            thread = BackgroundServer(daemon=True)
            thread.start()
        except RuntimeError:
            logger.warning("Failed to start monitor server:", exc_info=True)
项目:arthur-redshift-etl    作者:harrystech    | 项目源码 | 文件源码
def test_run():
    Monitor.environment = "test"  # type: ignore
    memory = MemoryStorage()
    MonitorPayload.dispatchers.append(memory)

    schema_names = ["auburn", "burgundy", "cardinal", "flame", "fuchsia"]
    table_names = ["apple", "banana", "cantaloupe", "durian", "fig"]
    index = {"current": 0, "final": len(schema_names) * len(table_names)}

    host = MemoryStorage.SERVER_HOST if MemoryStorage.SERVER_HOST else "localhost"
    print("Creating events ... follow along at http://{0}:{1}/".format(host, MemoryStorage.SERVER_PORT))

    with Monitor("color.fruit", "test", index=dict(current=1, final=1, name="outer")):
        for i, names in enumerate(itertools.product(schema_names, table_names)):
            try:
                with Monitor('.'.join(names), "test", index=dict(index, current=i + 1)):
                    time.sleep(random.uniform(0.5, 2.0))
                    # Create an error on one "table" so that highlighting of errors can be tested:
                    if i == 9:
                        raise RuntimeError("An error occurred!")
            except RuntimeError:
                pass

    input("Press return (or Ctrl-c) to stop server\n")
项目:docklet    作者:unias    | 项目源码 | 文件源码
def internal_server_error(error):
    logger.debug("An internel server error occured")
    logger.error(traceback.format_exc())
    return json.dumps({'success':'false', 'message':'500 Internal Server Error', 'Unauthorized': 'True'})
项目:bob    作者:BobBuildTool    | 项目源码 | 文件源码
def setUp(self):
        super().setUp()
        self.httpd = socketserver.ThreadingTCPServer(("localhost", 0), createHttpHandler(self.repo.name))
        self.ip, self.port = self.httpd.server_address
        self.server = threading.Thread(target=self.httpd.serve_forever)
        self.server.daemon = True
        self.server.start()
项目:pyseeder    作者:PurpleI2P    | 项目源码 | 文件源码
def run_server(host, port, priv_key, cert, i2pseeds_file):
    """Start HTTPS server"""
    Handler = ReseedHandler
    Handler.i2pseeds_file = i2pseeds_file 

    httpd = http.server.HTTPServer((host, int(port)), Handler)
    httpd.socket = ssl.wrap_socket(httpd.socket, server_side=True, 
            keyfile=priv_key, certfile=cert, ssl_version=ssl.PROTOCOL_TLSv1)

    try:
        httpd.serve_forever()
    except KeyboardInterrupt:
        exit()
项目:pymotw3    作者:reingart    | 项目源码 | 文件源码
def rsyncwebsite(options):
    # Copy to the server
    os.environ['RSYNC_RSH'] = '/usr/bin/ssh'
    src_path = path(options.sphinx.builddir) / 'html'
    sh('(cd %s; rsync --archive --delete --verbose . %s:%s)' %
        (src_path, options.website.server,
         options.website.server_path))
    return
项目:pymotw3    作者:reingart    | 项目源码 | 文件源码
def test(options):
    # Copy to the local site
    src_path = path(options.sphinx.builddir) / 'html'
    os.chdir(src_path)
    server_address = ('', 8080)
    httpd = http.server.HTTPServer(server_address,
                                   http.server.SimpleHTTPRequestHandler)
    httpd.serve_forever()
    return
项目:pymotw3    作者:reingart    | 项目源码 | 文件源码
def deploy(options):
    """Rebuild and copy website files to the remote server.
    """
    # Rebuild
    html_clean(options)
    # Copy the sdist into the html output directory.
    sdist(options)
    # Rebuild the site-map
    buildsitemap(options)
    # Install
    rsyncwebsite(options)
    # Update Google
    notify_google(options)
    return
项目:bpy_lambda    作者:bcongdon    | 项目源码 | 文件源码
def execute(self, context):
        netsettings = context.scene.network_render


        # open connection to make sure server exists
        conn = clientConnection(netsettings, report = self.report)

        if conn:
            conn.close()
            if netsettings.use_ssl:
               webbrowser.open("https://%s:%i" % (netsettings.server_address, netsettings.server_port))
            else:
               webbrowser.open("http://%s:%i" % (netsettings.server_address, netsettings.server_port))
        return {'FINISHED'}
项目:Python-Network-Programming-Cookbook-Second-Edition    作者:PacktPublishing    | 项目源码 | 文件源码
def web_server(port):

    server = http.server.HTTPServer
    # Comment out the above line and uncomment the below for Python 2.7.x.
    #server = BaseHTTPServer.HTTPServer

    handler = http.server.CGIHTTPRequestHandler #RequestsHandler
    # Comment out the above line and uncomment the below for Python 2.7.x.
    #handler = CGIHTTPServer.CGIHTTPRequestHandler #RequestsHandler

    server_address = ("", port)
    handler.cgi_directories = ["/cgi-bin", ]
    httpd = server(server_address, handler)
    print ("Starting web server with CGI support on port: %s ..." %port)
    httpd.serve_forever()
项目:hakkuframework    作者:4shadoww    | 项目源码 | 文件源码
def from_import(module_name, *symbol_names, **kwargs):
    """
    Example use:
        >>> HTTPConnection = from_import('http.client', 'HTTPConnection')
        >>> HTTPServer = from_import('http.server', 'HTTPServer')
        >>> urlopen, urlparse = from_import('urllib.request', 'urlopen', 'urlparse')

    Equivalent to this on Py3:

        >>> from module_name import symbol_names[0], symbol_names[1], ...

    and this on Py2:

        >>> from future.moves.module_name import symbol_names[0], ...

    or:

        >>> from future.backports.module_name import symbol_names[0], ...

    except that it also handles dotted module names such as ``http.client``.
    """

    if PY3:
        return __import__(module_name)
    else:
        if 'backport' in kwargs and bool(kwargs['backport']):
            prefix = 'future.backports'
        else:
            prefix = 'future.moves'
        parts = prefix.split('.') + module_name.split('.')
        module = importlib.import_module(prefix + '.' + module_name)
        output = [getattr(module, name) for name in symbol_names]
        if len(output) == 1:
            return output[0]
        else:
            return output
项目:tinygen    作者:beardog108    | 项目源码 | 文件源码
def webServer(config):
    os.chdir('generated/')
    Handler = http.server.SimpleHTTPRequestHandler
    try:
        PORT = int(config['ETC']['server-port'])
    except:
        print('Port not specified in config, defaulting to 8080')
        PORT = 8080
    try:
        ADDR = config['ETC']['server-ip']
    except:
        print('Address not specified in config, defaulting to 127.0.0.1')
        ADDR = '127.0.0.1'

    Handler = http.server.SimpleHTTPRequestHandler
    try:
        with socketserver.TCPServer((ADDR, PORT), Handler) as httpd:
            def closeServer():
                os.chdir('../')
                print('Closed server')
                httpd.server_close()
                sys.exit(0)
            def server_signal_handler(signal, frame):
                closeServer()
            signal.signal(signal.SIGINT, server_signal_handler)
            print('''

NOTICE: This server is meant primarily for site testing purposes.
For increased security, speed, and for advanced features such as https, consider using a full web server like Caddy, Nginx, or Apache

''')
            print('Serving at port', PORT, 'on', ADDR)
            atexit.register(closeServer)
            httpd.serve_forever()
    except OSError:
        return 'port already taken'
项目:paraproxio    作者:intagger    | 项目源码 | 文件源码
def tcp_keepalive(server, transport):
        sock = transport.get_extra_info('socket')
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
项目:paraproxio    作者:intagger    | 项目源码 | 文件源码
def tcp_keepalive(server, transport):  # pragma: no cover
        pass
项目:httpimport    作者:operatorequals    | 项目源码 | 文件源码
def test_simple_HTTP(self) :
        # ============== Setting up an HTTP server at 'http://localhost:8001/' in current directory
        try :
            PORT = int(sys.argv[1])
        except :
            PORT = randint(1025, 65535)

        try :
            Handler = SimpleHTTPServer.SimpleHTTPRequestHandler
            httpd = SocketServer.TCPServer(("", PORT), Handler)
        except :
            Handler = http.server.SimpleHTTPRequestHandler
            httpd = socketserver.TCPServer(("", PORT), Handler)

        print ("Serving at port %d" % PORT)

        http_thread = Thread( target = httpd.serve_forever, )
        http_thread.daemon = True

        # ============== Starting the HTTP server
        http_thread.start()

        # ============== Wait until HTTP server is ready
        sleep(1)

        with remote_repo(['test_package'], base_url = 'http://localhost:%d/' % PORT) :
            from test_package import module1

        self.assertTrue(module1.dummy_str)  # If this point is reached then the module1 is imported succesfully!
项目:onedrived-dev    作者:xybu    | 项目源码 | 文件源码
def __init__(self, config, handler_class):
        super().__init__(name='Webhook', daemon=False)
        self.config = config
        if self.config.use_https:
            self.server = WebhookHTTPServer(('', self.config.port), handler_class)
            self.server.socket = ssl.wrap_socket(self.server.socket.socket,
                                                 keyfile=config.https_keyfile,
                                                 certfile=config.https_certfile,
                                                 server_side=True)
        else:
            self.server = WebhookHTTPServer(('', self.config.port), handler_class)
        self.server.init_props()
项目:onedrived-dev    作者:xybu    | 项目源码 | 文件源码
def webhook_url(self):
        if not hasattr(self, '_webhook_url'):
            self.hostname = self.config.host if self.config.host != '' else resolve_public_ip()
            self._webhook_url = '%s://%s:%d/%s' % (
                'https' if self.config.use_https else 'http',
                self.hostname, self.server.server_port, self.server.session_token)
        return self._webhook_url
项目:onedrived-dev    作者:xybu    | 项目源码 | 文件源码
def set_worker(self, worker):
        self.server.worker_thread = worker
项目:onedrived-dev    作者:xybu    | 项目源码 | 文件源码
def stop(self):
        self.server.shutdown()
项目:onedrived-dev    作者:xybu    | 项目源码 | 文件源码
def run(self):
        logging.info('Webhook server listening on %s.', self.webhook_url)
        self.server.serve_forever()
        logging.info('Webhook server stopped.')
项目:Chaos    作者:Chaosthebot    | 项目源码 | 文件源码
def get_voters(amount: hug.types.number = 20):
    try:
        voters = sorted(json.loads(
            linecache.getline("/root/workspace/Chaos/server/voters.json", 1)).items(),
            key=lambda x: x[1], reverse=True)
        if amount > 0:
            voters = voters[:amount]
        return {x[0]: x[1] for x in voters}
    except:
        __log.exception("Failed to read voters!")
项目:Chaos    作者:Chaosthebot    | 项目源码 | 文件源码
def get_meritocracy(amount: hug.types.number = 20):
    try:
        meritocracy = json.loads(
            linecache.getline("/root/workspace/Chaos/server/meritocracy.json", 1))
        if amount > 0:
            meritocracy = meritocracy[:amount]
        return meritocracy
    except:
        __log.exception("Failed to read meritocracy!")
项目:SnapFiles    作者:stirlingstout    | 项目源码 | 文件源码
def server(username, filename, data, parsedQuery): # Ignore filename
        if data:
            if data == "sfs_version":
                return Responder.writeResult(VERSION)
            elif data == "python_version":
                return Responder.writeResult(sys.version)
            else:
                return Responder.error("invalid server information request")
        else:
            return Responder.error("no server information requested")
项目:packaging    作者:blockstack    | 项目源码 | 文件源码
def test_can_import_several(self):
        """
        This test failed in v0.12-pre if e.g.
        future/standard_library/email/header.py contained:

            from future import standard_library
            standard_library.remove_hooks()
        """

        import future.moves.urllib.parse as urllib_parse
        import future.moves.urllib.request as urllib_request

        import http.server
        for m in [urllib_parse, urllib_request, http.server]:
            self.assertTrue(m is not None)
项目:packaging    作者:blockstack    | 项目源码 | 文件源码
def test_other_http_imports(self):
        import http
        import http.server
        import http.cookies
        import http.cookiejar
        self.assertTrue(True)
项目:packaging    作者:blockstack    | 项目源码 | 文件源码
def from_import(module_name, *symbol_names, **kwargs):
    """
    Example use:
        >>> HTTPConnection = from_import('http.client', 'HTTPConnection')
        >>> HTTPServer = from_import('http.server', 'HTTPServer')
        >>> urlopen, urlparse = from_import('urllib.request', 'urlopen', 'urlparse')

    Equivalent to this on Py3:

        >>> from module_name import symbol_names[0], symbol_names[1], ...

    and this on Py2:

        >>> from future.moves.module_name import symbol_names[0], ...

    or:

        >>> from future.backports.module_name import symbol_names[0], ...

    except that it also handles dotted module names such as ``http.client``.
    """

    if PY3:
        return __import__(module_name)
    else:
        if 'backport' in kwargs and bool(kwargs['backport']):
            prefix = 'future.backports'
        else:
            prefix = 'future.moves'
        parts = prefix.split('.') + module_name.split('.')
        module = importlib.import_module(prefix + '.' + module_name)
        output = [getattr(module, name) for name in symbol_names]
        if len(output) == 1:
            return output[0]
        else:
            return output
项目:vivisect-py3    作者:bat-serjo    | 项目源码 | 文件源码
def __init__(self, *args, **kwargs):
        self.handlers = {
                '__cobra_hello'   : self.handleHello,
                '__cobra_getattr' : self.handleGetAttr,
                '__cobra_setattr' : self.handleSetAttr,
                '__cobra_login'   : self.handleLogin
            }
        http.server.BaseHTTPRequestHandler.__init__(self, *args, **kwargs)
项目:vivisect-py3    作者:bat-serjo    | 项目源码 | 文件源码
def do_POST(self):
        # this is a json posted data list (args, kwargs)
        body =  json.loads(urllib.parse.unquote(self.rfile.read( int(self.headers['Content-Length'])) ))

        #self.handleClient(s.server, s, s.path, body=body)
        self.handleClient(body=body)
项目:vivisect-py3    作者:bat-serjo    | 项目源码 | 文件源码
def handleHello(self, objname): 
        '''
        Hello messages are used to get the initial cache of
        method names for the newly connected object.
        '''
        if verbose: print("GOT A HELLO")
        obj = self.server.getSharedObject(objname)
        ret = {}
        for name in dir(obj):
            if type(getattr(obj,name)) in (types.MethodType, types.BuiltinMethodType):
                ret[name] = True
        self.send_response(http.client.OK)
        self.end_headers()
        self.wfile.write(json.dumps(ret))
        return
项目:vivisect-py3    作者:bat-serjo    | 项目源码 | 文件源码
def handleSetAttr(self, objname, name, value): 
        if not self.server.attr:
            self.send_response(http.client.FORBIDDEN)
            self.end_headers()
            excinfo = "__setattr__ disabled"
            self.wfile.write(json.dumps(excinfo))
            return
        obj = self.server.getSharedObject(objname)
        if verbose: print("SETTING ATTRIBUTE:", urlparams['args'])
        setattr(obj, name, value) 
        self.send_response(http.client.OK)
        self.end_headers()
项目:vivisect-py3    作者:bat-serjo    | 项目源码 | 文件源码
def setAuthModule(self, authmod):
        '''
        Enable an authentication module for this server
        ( all connections *must* be authenticated through the authmod )

        NOTE: See cobra.auth.* for various auth module implementations

        Example:
            import cobra.auth.shadow as c_a_shadow
            authmod = c_a_shadow.ShadowFileAuth('passwdfile.txt')
            cdaemon = CobraDaemon()
            cdaemon.setAuthModule()
        '''
        self.authmod = authmod
项目:islam-buddy    作者:hamir    | 项目源码 | 文件源码
def from_import(module_name, *symbol_names, **kwargs):
    """
    Example use:
        >>> HTTPConnection = from_import('http.client', 'HTTPConnection')
        >>> HTTPServer = from_import('http.server', 'HTTPServer')
        >>> urlopen, urlparse = from_import('urllib.request', 'urlopen', 'urlparse')

    Equivalent to this on Py3:

        >>> from module_name import symbol_names[0], symbol_names[1], ...

    and this on Py2:

        >>> from future.moves.module_name import symbol_names[0], ...

    or:

        >>> from future.backports.module_name import symbol_names[0], ...

    except that it also handles dotted module names such as ``http.client``.
    """

    if PY3:
        return __import__(module_name)
    else:
        if 'backport' in kwargs and bool(kwargs['backport']):
            prefix = 'future.backports'
        else:
            prefix = 'future.moves'
        parts = prefix.split('.') + module_name.split('.')
        module = importlib.import_module(prefix + '.' + module_name)
        output = [getattr(module, name) for name in symbol_names]
        if len(output) == 1:
            return output[0]
        else:
            return output