Python bottle.response 模块,status() 实例源码

我们从Python开源项目中,提取了以下45个代码示例,用于说明如何使用bottle.response.status()

项目:sysdweb    作者:ogarcia    | 项目源码 | 文件源码
def get_main():
    services = []
    for service in config.sections():
        service_status = get_service_action(service, 'status')
        if service_status['status'] == 'not-found':
            cls = 'active'
        elif service_status['status'] == 'inactive' or service_status['status'] == 'failed':
            cls = 'danger'
        elif service_status['status'] == 'active':
            cls = 'success'
        else:
            cls = 'warning'
        disabled_start = True if cls == 'active' or cls == 'success' else False
        disabled_stop = True if cls == 'active' or cls == 'danger' else False
        disabled_restart = True if cls == 'active' or cls == 'danger' else False
        services.append({'class': cls,
            'disabled_start': disabled_start,
            'disabled_stop': disabled_stop,
            'disabled_restart': disabled_restart,
            'title': config.get(service, 'title'),
            'service': service})
    return template('index', hostname=gethostname(), services=services)
项目:base1k    作者:xiaq    | 项目源码 | 文件源码
def http(host, port):
    from bottle import route, response, run

    @route('/')
    def root():
        return http_root_str

    @route('/<s:path>')
    def do_conversion(s):
        try:
            i = parse_input(s)
        except ValueError as e:
            response.status = 400
            return str(e)
        return convert(i)

    run(host=host, port=port)
项目:python-server-typescript-client    作者:marianc    | 项目源码 | 文件源码
def dtos_get_films_with_actors():
    try:
        queryObject = QueryObject(
            filter = "ReleaseYear='{0}'".format(request.query['releaseYear']),
            expand = ['FilmActors.Actor', 'FilmCategories']
        )
        resultSerialData = dataService.dataViewDto.getItems("Film", queryObject)

        return json.dumps(resultSerialData, cls=CustomEncoder, indent=2)

    except dalUtils.StatusCodeError as err:
        response.status = err.value
    except:
        abort(400, 'Bad Request')


# POST: POST: api/datasource/crud/operations/dtos/TestAction
# with: Content-Type: application/json and body - {"param1":1}
项目:python-server-typescript-client    作者:marianc    | 项目源码 | 文件源码
def post_batch_entityset(entitySetName):
    try:
        result = dataProviderDto.apiProvider.handleInsertEntityBatch(entitySetName, request.json, dataService)
        response.content_type = "application/json; charset=utf-8"
        return json.dumps(result, cls=CustomEncoder)
    except dalUtils.StatusCodeError as err:
        response.status = err.value
    except:
        abort(400, 'Bad Request')


## DELETE: api/datasource/crud/batch/:entitySetName
#@delete('/api/datasource/crud/batch/<entitySetName>')
#def delete_batch_entityset(entitySetName):
#    try:
#        result = dataProviderDto.apiProvider.handleDeleteEntityBatch(entitySetName, request.json, dataService)
#        response.content_type = "application/json; charset=utf-8"
#        return json.dumps(result, cls=CustomEncoder)
#    except dalUtils.StatusCodeError as err:
#        response.status = err.value
#    except:
#        abort(400, 'Bad Request')


# DELETE: api/datasource/crud/batch/:entitySetName?keys=key1:1,2,3,4;key2:4,5,6,7
项目:python-server-typescript-client    作者:marianc    | 项目源码 | 文件源码
def entities_get_films_with_actors():
    try:
        queryObject = QueryObject(
            filter = "ReleaseYear='{0}'".format(request.query['releaseYear']),
            expand = ['FilmActors.Actor', 'FilmCategories']
        )
        resultSerialData = dataService.from_.remote.dtoView.Films.getItems(queryObject)

        return json.dumps(resultSerialData, cls=CustomEncoder, indent=2)

    except dalUtils.StatusCodeError as err:
        response.status = err.value
    except:
        abort(400, 'Bad Request')


# POST: api/datasource/crud/operations/entities/TestAction
# with: Content-Type: application/json and body - {"param1":1}
项目:certproxy    作者:geneanet    | 项目源码 | 文件源码
def _assert_admin(self, cert):
        if cert is not None:
            if isinstance(cert, bytes):
                cert = load_certificate(cert_bytes=cert)

            if isinstance(cert, x509.Certificate):
                host = cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value
            else:
                raise TypeError('cert must be a raw certificate in PEM or DER format or an x509.Certificate instance.')

        else:
            logger.warning('Admin command received by unauthentified host.')
            raise HTTPResponse(
                status=401,
                body={'message': 'Authentication required'}
            )

        if host not in self.admin_hosts:
            logger.warning('Host %s unauthorized for admin commands.', host)
            raise HTTPResponse(
                status=403,
                body={'message': 'Host {} unauthorized for admin commands'.format(host)}
            )
项目:schul_cloud_resources_server_tests    作者:schul-cloud    | 项目源码 | 文件源码
def _error(error, code):
    """Return an error as json"""
    _error = {
        "status": str(code),
        "title": errors[code],
        "detail": error.body
    }
    traceback.print_exception(type(error), error, error.traceback)
    response.headers["Content-Type"] = "application/vnd.api+json"
    return response_object(errors=[_error])
项目:codex-backend    作者:codexgigassys    | 项目源码 | 文件源码
def get_metadata():
    if request.query.file_hash == '':
        response.status = 400
        return jsonize({'message': 'file_hash parameter is missing'})
    file_hash = clean_hash(request.query.file_hash)
    if not valid_hash(file_hash):
        response.status = 400
        return jsonize({'message': 'Invalid hash format (use MD5, SHA1 or SHA2)'})
    file_hash = get_file_id(file_hash)
    if file_hash is None:
        response.status = 404
        return jsonize({'message': 'Metadata not found in the database'})

    mdc = MetaController()
    res = mdc.read(file_hash)
    if res is None:
        log_event("metadata", file_hash)
    return dumps(change_date_to_str(res))
项目:codex-backend    作者:codexgigassys    | 项目源码 | 文件源码
def last_uploaded():
    number = request.query.get("n")
    if number is None:
        response.status = 400
        return jsonize({"error": 1, "error_message": "Parameter n is missing"})
    if number.isdigit() is False:
        response.status = 400
        return jsonize({"error": 2, "error_message": "Parameter n must be a number"})
    if int(number) == 0:
        return jsonize({"error": 3, "error_message": "Parameter n must be greater than zero."})

    pc = PackageController()
    lasts = pc.last_updated(int(number))
    for i in range(0, len(lasts)):  # Convert datetime objects
        lasts[i] = change_date_to_str(lasts[i])
    return jsonize(lasts)
项目:cbas    作者:ImmobilienScout24    | 项目源码 | 文件源码
def auth_server():
    username = request.forms.get('username')
    if username == 'user_ok':
        return {'access_token': 'my-nifty-access-token'}
    elif username == 'auth_fail':
        response.status = 400
        return {'error': 'errored with HTTP 400 on request'}
    elif username == 'create_fail':
        return {'access_token': 'the-token-with-which-create-will-fail'}
    elif username == 'delete_fail':
        return {'access_token': 'the-token-with-which-delete-will-fail'}
    elif username == 'empty_page':
        return {'access_token': 'the-token-which-causes-an-empty-page'}
    else:
        return {}
项目:cbas    作者:ImmobilienScout24    | 项目源码 | 文件源码
def create():
    auth_token = request.headers.get('Authorization').split()[1]
    if auth_token == 'my-nifty-access-token':
        return {'response': 'Successful creation of user.'}
    elif auth_token == 'the-token-with-which-create-will-fail':
        response.status = 403
        return {'error': 'Permission denied'}
    elif auth_token == 'the-token-which-causes-an-empty-page':
        response.status = 403
        return "empty"
项目:cbas    作者:ImmobilienScout24    | 项目源码 | 文件源码
def delete():
    auth_token = request.headers.get('Authorization').split()[1]
    if auth_token == 'my-nifty-access-token':
        return {'response': 'Successful deletion of user.'}
    elif auth_token == 'the-token-with-which-delete-will-fail':
        response.status = 403
        return {'error': 'Permission denied'}
项目:sysdweb    作者:ogarcia    | 项目源码 | 文件源码
def get_service_action(service, action):
    if service in config.sections():
        sdbus = systemdBus(True) if config.get('DEFAULT', 'scope', fallback='system') == 'user' else systemdBus()
        unit = config.get(service, 'unit')
        if action == 'start':
            return {action: 'OK'} if sdbus.start_unit(unit) else {action: 'Fail'}
        elif action == 'stop':
            return {action: 'OK'} if sdbus.stop_unit(unit) else {action: 'Fail'}
        elif action == 'restart':
            return {action: 'OK'} if sdbus.restart_unit(unit) else {action: 'Fail'}
        elif action == 'reload':
            return {action: 'OK'} if sdbus.reload_unit(unit) else {action: 'Fail'}
        elif action == 'reloadorrestart':
            return {action: 'OK'} if sdbus.reload_or_restart_unit(unit) else {action: 'Fail'}
        elif action == 'status':
            if sdbus.get_unit_load_state(unit) != 'not-found':
                return {action: str(sdbus.get_unit_active_state(unit))}
            else:
                return {action: 'not-found'}
        elif action == 'journal':
            return get_service_journal(service, 100)
        else:
            response.status = 400
            return {'msg': 'Sorry, but cannot perform \'{}\' action.'.format(action)}
    else:
        response.status = 400
        return {'msg': 'Sorry, but \'{}\' is not defined in config.'.format(service)}
项目:sysdweb    作者:ogarcia    | 项目源码 | 文件源码
def get_service_journal(service, lines):
    if service in config.sections():
        if get_service_action(service, 'status')['status'] == 'not-found':
            return {'journal': 'not-found'}
        try:
            lines = int(lines)
        except Exception as e:
            response.status = 500
            return {'msg': '{}'.format(e)}
        unit = config.get(service, 'unit')
        journal = Journal(unit)
        return {'journal': journal.get_tail(lines)}
    else:
        response.status = 400
        return {'msg': 'Sorry, but \'{}\' is not defined in config.'.format(service)}
项目:sysdweb    作者:ogarcia    | 项目源码 | 文件源码
def get_service_journal_page(service):
    if service in config.sections():
        if get_service_action(service, 'status')['status'] == 'not-found':
            abort(400,'Sorry, but service \'{}\' unit not found in system.'.format(config.get(service, 'title')))
        journal_lines = get_service_journal(service, 100)
        return template('journal', hostname=gethostname(), service=config.get(service, 'title'), journal=journal_lines['journal'])
    else:
        abort(400, 'Sorry, but \'{}\' is not defined in config.'.format(service))

# Serve static content
项目:python-server-typescript-client    作者:marianc    | 项目源码 | 文件源码
def dtos_test_action():
    try:
        param1 = request.json['param1']
        # TODO: Add some actions in here

    except dalUtils.StatusCodeError as err:
        response.status = err.value
    except:
        abort(400, 'Bad Request')
项目:python-server-typescript-client    作者:marianc    | 项目源码 | 文件源码
def put_entityset(entitySetName):
    try:
        result = dataProviderDto.apiProvider.handleUpdateEntity(entitySetName, request.query, request.json, dataService)
        response.content_type = "application/json; charset=utf-8"
        return json.dumps(result, cls=CustomEncoder)
    except dalUtils.StatusCodeError as err:
        response.status = err.value
    except:
        abort(400, 'Bad Request')


# PATCH: api/datasource/crud/:entitySetName?keys=key1:{key1}
#@patch('/api/datasource/crud/<entitySetName>')
项目:python-server-typescript-client    作者:marianc    | 项目源码 | 文件源码
def patch_entityset(entitySetName):
    try:
        result = dataProviderDto.apiProvider.handleUpdateEntity(entitySetName, request.query, request.json, dataService)
        response.content_type = "application/json; charset=utf-8"
        return json.dumps(result, cls=CustomEncoder)
    except dalUtils.StatusCodeError as err:
        response.status = err.value
    except:
        abort(400, 'Bad Request')


# POST: api/datasource/crud/:entitySetName
项目:python-server-typescript-client    作者:marianc    | 项目源码 | 文件源码
def post_entityset(entitySetName):
    # test1 = json.loads(request.body.read())
    try:
        result = dataProviderDto.apiProvider.handleInsertEntity(entitySetName, request.json, dataService)
        response.content_type = "application/json; charset=utf-8"
        return json.dumps(result, cls=CustomEncoder)
    except dalUtils.StatusCodeError as err:
        response.status = err.value
    except:
        abort(400, 'Bad Request')


# DELETE: api/datasource/crud/:entitySetName?keys=key1:{key1}
项目:python-server-typescript-client    作者:marianc    | 项目源码 | 文件源码
def put_batch_entityset(entitySetName):
    try:
        result = dataProviderDto.apiProvider.handleUpdateEntityBatch(entitySetName, request.json, dataService)
        response.content_type = "application/json; charset=utf-8"
        return json.dumps(result, cls=CustomEncoder)
    except dalUtils.StatusCodeError as err:
        response.status = err.value
    except:
        abort(400, 'Bad Request')

# PATCH: api/datasource/crud/batch/:entitySetName
#@patch('/api/datasource/crud/batch/<entitySetName>')
项目:python-server-typescript-client    作者:marianc    | 项目源码 | 文件源码
def patch_batch_entityset(entitySetName):
    try:
        result = dataProviderDto.apiProvider.handleUpdateEntityBatch(entitySetName, request.json, dataService)
        response.content_type = "application/json; charset=utf-8"
        return json.dumps(result, cls=CustomEncoder)
    except dalUtils.StatusCodeError as err:
        response.status = err.value
    except:
        abort(400, 'Bad Request')


# POST: api/datasource/crud/batch/:entitySetName
项目:python-server-typescript-client    作者:marianc    | 项目源码 | 文件源码
def delete_batch_entityset(entitySetName):
    try:
        result = dataProviderDto.apiProvider.handleDeleteEntityBatch(entitySetName, request.query, dataService)
        response.content_type = "application/json; charset=utf-8"
        return json.dumps(result, cls=CustomEncoder)
    except dalUtils.StatusCodeError as err:
        response.status = err.value
    except:
        abort(400, 'Bad Request')
项目:certproxy    作者:geneanet    | 项目源码 | 文件源码
def _get_cert_config_if_allowed(self, domain, cert):
        if cert is not None:
            if isinstance(cert, bytes):
                cert = load_certificate(cert_bytes=cert)

            if isinstance(cert, x509.Certificate):
                host = cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value
            else:
                raise TypeError('cert must be a raw certificate in PEM or DER format or an x509.Certificate instance.')

        else:
            logger.warning('Request received for domain %s by unauthentified host.', domain)
            raise HTTPResponse(
                status=401,
                body={'message': 'Authentication required'}
            )

        certconfig = self.certificates_config.match(domain)

        if certconfig:
            logger.debug('Domain %s matches pattern %s', domain, certconfig.pattern)
            if host in self.admin_hosts or host in certconfig.allowed_hosts:
                return certconfig
            else:
                logger.warning('Host %s unauthorized for domain %s.', host, domain)
                raise HTTPResponse(
                    status=403,
                    body={'message': 'Host {} unauthorized for domain {}'.format(host, domain)}
                )
        else:
            logger.warning('No config matching domain %s found.', domain)
            raise HTTPResponse(
                status=404,
                body={'message': 'No configuration found for domain {}'.format(domain)}
            )
项目:certproxy    作者:geneanet    | 项目源码 | 文件源码
def _handle_auth(self):
        request_data = request.json

        csr = x509.load_pem_x509_csr(data=request_data['csr'].encode(), backend=default_backend())  # pylint: disable=unsubscriptable-object

        if not csr.is_signature_valid:
            raise HTTPResponse(
                status=400,
                body={'message': 'The certificate signing request signature is invalid.'}
            )

        host = csr.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value
        csr_file = os.path.join(self.csr_path, "%s.csr" % (host))
        crt_file = os.path.join(self.crt_path, "%s.crt" % (host))

        if os.path.isfile(crt_file):
            crt = load_certificate(crt_file)

            if crt.public_key().public_numbers() == csr.public_key().public_numbers():
                return {
                    'status': 'authorized',
                    'crt': dump_pem(crt).decode()
                }
            else:
                raise HTTPResponse(
                    status=409,
                    body={'message': 'Mismatch between the certificate signing request and the certificate.'}
                )

        else:
            # Save CSR
            with open(csr_file, 'w') as f:
                f.write(csr.public_bytes(serialization.Encoding.PEM).decode())
            response.status = 202
            return {
                'status': 'pending'
            }
项目:certproxy    作者:geneanet    | 项目源码 | 文件源码
def _handle_revoke_cert(self, domain):
        rawcert = request.environ['ssl_certificate']

        self._assert_admin(rawcert)

        self.acmeproxy.revoke_certificate(domain)

        return {
            'status': 'revoked'
        }
项目:certproxy    作者:geneanet    | 项目源码 | 文件源码
def _handle_delete_cert(self, domain):
        rawcert = request.environ['ssl_certificate']

        self._assert_admin(rawcert)

        self.acmeproxy.delete_certificate(domain)

        return {
            'status': 'deleted'
        }
项目:certproxy    作者:geneanet    | 项目源码 | 文件源码
def _handle_health_check(self):
        response.status = 200
        return {
            'status': 'alive'
        }
项目:schul_cloud_resources_server_tests    作者:schul-cloud    | 项目源码 | 文件源码
def delete_resources():
    """Delete all resources."""
    resources = get_resources()
    resources.clear()
    response.status = 204
项目:codex-backend    作者:codexgigassys    | 项目源码 | 文件源码
def jsonp(data, callback):
    reply = {"status": "OK", "data": data}
    return callback + "([" + jsonize(reply) + "]);"
项目:codex-backend    作者:codexgigassys    | 项目源码 | 文件源码
def api_process_file():
    file_hash = clean_hash(request.query.file_hash)
    if len(file_hash) != 40:
        response.status = 400
        return jsonize({'message': 'Invalid hash format (use sha1)'})

    res = process_file(file_hash, True)
    if res is None:
        response.status = 404
        return jsonize("File not found in the database")

    return jsonize("File processed")
项目:codex-backend    作者:codexgigassys    | 项目源码 | 文件源码
def get_file():
    tmp_folder = "/tmp/mass_download"
    subprocess.call(["mkdir", "-p", tmp_folder])
    file_hash = clean_hash(request.query.file_hash)
    key = ''
    if len(file_hash) == 40:
        key = 'sha1'
    else:
        response.status = 400
        return jsonize({'message': 'Invalid hash format (use sha1)'})

    pc = PackageController()
    res = pc.searchFile(file_hash)

    if res is None:
        response.status = 404
        return jsonize({'message': 'File not found in the database'})
    if res == 1:
        response.status = 400
        return jsonize({'message': 'File not available for downloading'})
    res = pc.getFile(file_hash)
    zip_name = os.path.join(tmp_folder, str(file_hash) + '.zip')
    file_name = os.path.join(tmp_folder, str(file_hash) + '.codex')
    fd = open(file_name, "wb")
    fd.write(res)
    fd.close()
    subprocess.call(["zip", "-ju", "-P", "codex", zip_name, file_name])
    return static_file(str(file_hash) + ".zip", root=tmp_folder, download=True)
项目:codex-backend    作者:codexgigassys    | 项目源码 | 文件源码
def add_file():
    # tags = request.forms.get('name')
    upload = request.files.get('file')
    form_date = request.forms.get('file_date')
    try:  # validate
        process_date(form_date)
    except ValueError:
        # response.status = 422 #status can't be added because angular will not
        # show the message.
        return jsonize({'message': 'Invalid date format'})
    logging.debug("add_file(). date=" + str(form_date))
    if form_date is None:
        form_date = datetime.datetime.now()
    name = upload.filename
    data_bin = upload.file.read()
    file_id = hashlib.sha1(data_bin).hexdigest()
    logging.debug("add_file(): file_id=" + str(file_id))
    status = upload_file(data_bin)
    process_file(file_id)  # ToDo: add a redis job
    update_date(file_id, form_date)
    if(status == "ok"):
        return jsonize({'message': 'Added with ' + str(file_id)})
    elif(status == "already exists"):
        return jsonize({'message': 'Already exists ' + str(file_id)})
    elif(status == "virustotal"):
        return jsonize({'message': 'Already exists ' + str(file_id)})
    else:
        return jsonize({'message': 'Error'})
项目:codex-backend    作者:codexgigassys    | 项目源码 | 文件源码
def get_result_from_av():
    hash_id = request.query.file_hash
    if len(hash_id) == 0:
        response.status = 400
        return jsonize({'error': 4, 'error_message': 'file_hash parameter is missing.'})
    hash_id = clean_hash(hash_id)
    if not valid_hash(hash_id):
        return jsonize({'error': 5, 'error_message': 'Invalid hash format.'})
    if(len(hash_id) != 40):
        data = "1=" + str(hash_id)
        res = SearchModule.search_by_id(data, 1, [], True)
        if(len(res) == 0):
            response.status = 400
            return jsonize({'error': 6, 'error_message': 'File not found'})
        else:
            sha1 = res[0]["sha1"]
    else:
        sha1 = hash_id
    key_manager = KeyManager()

    if(key_manager.check_keys_in_secrets()):
        av_result = get_av_result(sha1, 'high')
    else:
        return jsonize({'error': 7, "error_message": "Error: VirusTotal API key missing from secrets.py file"})
    if(av_result.get('status') == "added"):
        return jsonize({"message": "AV scans downloaded."})
    elif(av_result.get('status') == "already_had_it"):
        return jsonize({"message": "File already have AV scans."})
    elif(av_result.get('status') == "not_found"):
        return jsonize({"error": 10, "error_message": "Not found on VT."})
    elif(av_result.get('status') == "no_key_available"):
        return jsonize({"error": 11, "error_message": "No key available right now. Please try again later."})
    else:
        logging.error("av_result for hash=" + str(sha1))
        logging.error("av_result=" + str(av_result))
        return jsonize({"error": 9, "error_message": "Cannot get analysis."})
项目:green-button    作者:bernard357    | 项目源码 | 文件源码
def web_index(token=None):
    """
    provides an index of buttons

    This function is called from far far away, over the Internet
    """

    logging.info('Serving index page')

    try:
        if 'key' not in settings['server']:
            pass

        elif decode_token(settings, token) != 'index':
            raise ValueError('Invalid label in token')

    except Exception as feedback:
        if logging.getLogger().getEffectiveLevel() == logging.DEBUG:
            logging.error("Unable to serve the index page")
            raise
        else:
            logging.error(str(feedback))
            response.status = 400
            return 'Invalid request'

    items = []
    global buttons
    for button in buttons:
        items.append({
            'label': button,
            'delete-url': '/delete/'+settings['tokens'].get(button+'-delete'),
            'initialise-url': '/initialise/'+settings['tokens'].get(button+'-initialise'),
            'push-url': '/'+settings['tokens'].get(button),
            })
    logging.debug('Buttons: {}'.format(items))

    return template('views/list_items', prefix=settings['server']['url'], items=items)

#
# invoked from bt.tn
#
项目:green-button    作者:bernard357    | 项目源码 | 文件源码
def web_press(button=None):
    """
    Processes the press of a bt.tn device

    This function is called from far far away, over the Internet
    """

    if button is None:
        button = settings['server']['default']

    try:

        button = decode_token(settings, button)

        context = load_button(settings, button)

        return handle_button(context)

    except socket.error as feedback:
        if logging.getLogger().getEffectiveLevel() == logging.DEBUG:
            logging.error("Unable to push '{}'".format(button))
            raise
        else:
            logging.error(str(feedback))
            response.status = 500
            return 'Internal error'

    except Exception as feedback:
        if logging.getLogger().getEffectiveLevel() == logging.DEBUG:
            logging.error("Unable to push '{}'".format(button))
            raise
        else:
            logging.error(str(feedback))
            response.status = 400
            return 'Invalid request'
项目:green-button    作者:bernard357    | 项目源码 | 文件源码
def web_initialise(button=None):
    """
    Initialises a room

    This function is called from far far away, over the Internet
    """

    if button is None:
        button = settings['server']['default']

    logging.info("Initialising button '{}'".format(button))

    try:
        button = decode_token(settings, button, action='initialise')

        context = load_button(settings, button)
        delete_room(context)

        global buttons
        buttons.pop(button, None)

        context = load_button(settings, button)
        context['spark']['id'] = get_room(context)

        return 'OK'

    except Exception as feedback:
        if logging.getLogger().getEffectiveLevel() == logging.DEBUG:
            logging.error("Unable to initialise '{}'".format(button))
            raise
        else:
            logging.error(str(feedback))
            response.status = 400
            return 'Invalid request'
项目:green-button    作者:bernard357    | 项目源码 | 文件源码
def web_delete(button=None):
    """
    Deletes a room

    This function is called from far far away, over the Internet
    """

    if button is None:
        button = settings['server']['default']

    logging.info("Deleting button '{}'".format(button))

    try:
        button = decode_token(settings, button, action='delete')

        context = load_button(settings, button)
        delete_room(context)

        global buttons
        buttons.pop(button, None)

        return 'OK'

    except Exception as feedback:
        if logging.getLogger().getEffectiveLevel() == logging.DEBUG:
            logging.error("Unable to delete '{}'".format(button))
            raise
        else:
            logging.error(str(feedback))
            response.status = 400
            return 'Invalid request'
项目:im-a-teapot    作者:vpetersson    | 项目源码 | 文件源码
def raise_error(error_code, msg):
    response.status = error_code
    return msg
项目:XDocs    作者:gaojiuli    | 项目源码 | 文件源码
def destroy(resources, resource_id):
    """destroy the resource from app.data"""
    if resources not in app.resources or resource_id not in app.data[resources]:
        abort(404, "Not Found")
    del app.data[resources][resource_id]
    response.status = 204
    return ""
项目:ray    作者:felipevolpone    | 项目源码 | 文件源码
def dispatch(url):
    """
        This class is the beginning of all entrypoints in the Ray API. Here, each url
        will be redirect to the right handler
    """

    url = bottle_req.path
    log.info('request: %s', bottle_req.url)

    if url[-1] == '/':
        url = url[:-1]

    response_code = 200

    try:
        processed = process(url, bottle_req, bottle_resp)

        try:
            from_func, http_status = processed[0], processed[1]
            bottle_resp.status = http_status
            return from_func
        except:
            return processed

    except exceptions.RayException as e:
        log.exception('ray exception: ')
        response_code = e.http_code

    except:
        log.exception('exception:')
        raise

    bottle_resp.status = response_code
项目:c-bastion    作者:ImmobilienScout24    | 项目源码 | 文件源码
def status():
    return 'OK'
项目:c-bastion    作者:ImmobilienScout24    | 项目源码 | 文件源码
def create_user_with_key():
    """
    Create a user directory with a keyfile on the shared volume, data
    arriving in the payload of the request with a JSON payload.
    """
    username = username_from_request(request)
    if not username:
        response.status = 422
        return {'error': "Parameter 'username' not specified"}
    elif not username_valid(username):
        response.status = 400
        return {'error':
                "Invalid parameter 'username': '{0}' not allowed.".
                format(username)
                }

    pubkey = request.json.get('pubkey')
    if not pubkey:
        response.status = 422
        return {'error': "Parameter 'pubkey' not specified"}

    abs_home_path = normpath(os.path.join(HOME_PATH_PREFIX, username))

    username_was_added = check_and_add(username)

    # Do the actual creation
    store_pubkey(username, abs_home_path, pubkey)

    response.status = 201
    return {'response':
            'Successful creation of user {0} and/or upload of key.'
            .format(username)}
项目:jerrybuild    作者:fboender    | 项目源码 | 文件源码
def normalize(request, jobdef):
    if request.headers['X-Github-Event'] == 'ping':
        response.status = 200
        response.body = "pong"
        return False

    env = {}
    raw_body = request.body.read()
    body = json.load(request.body)
    secret = bytes(jobdef.custom_params['secret'])

    if request.headers['X-Github-Event'] != 'push':
        raise NotImplementedError("Only push and ping events are currently supported")

    # Verify hash signature
    hashtype, signature = request.headers['X-Hub-Signature'].split('=', 1)
    if hashtype != 'sha1':
        raise ValueError("Invalid hashtype received")

    res = hmac.new(secret, msg=raw_body, digestmod=hashlib.sha1)
    # Python <2.7 doesn't have a secure compare for hmac, so we just compare
    # manually. This leaves this code vulnerable to timing attacks,
    # unfortunately.
    if str(res.hexdigest()) != str(signature):
        raise ValueError("Invalid secret")

    env['provider'] = 'github'
    env["event"] = request.headers['X-Github-Event']
    env['repo_type'] = 'git'
    env['repo_url'] = body['repository']['clone_url']
    env['repo_url_ssh'] = body['repository']['ssh_url']
    env['repo_url_http'] = body['repository']['clone_url']
    env['ref'] = body['ref']
    env['commit'] = body['after']

    mail_to = []
    mail_to.append(body['repository']['owner']['email'])
    mail_to.append(body['pusher']['email'])
    env['mail_to'] = ', '.join(mail_to)

    return env
项目:xFlow    作者:dsouzajude    | 项目源码 | 文件源码
def create_app(engine):
    app = Bottle()

    @app.error()
    @app.error(404)
    def handle_error(error):
        if issubclass(type(error.exception), ApiException):
            response.status = error.exception.code
        else:
            response.status = error.status_code
        response.set_header('Content-type', 'application/json')
        resp = {
            'type': type(error.exception).__name__,
            'message': repr(error.exception) if error.exception else '',
            'traceback': error.traceback,
            'status_code': response.status
        }
        log.error('Exception, type=%s, message=%s, status_code=%s, traceback=%s'\
                    % (resp['type'], resp['message'], resp['status_code'], resp['traceback']))
        return '%s %s' % (resp['status_code'], resp['message'])

    @app.route('/ping', method=['GET'])
    def ping():
        return {'name': 'xFlow', 'version': '0.1' }

    @app.route('/publish', method=['POST'])
    def publish():
        data = json.loads(request.body.read())
        try:
            publish_schema.validate(data)
        except jsonschema.ValidationError as err:
            raise BadRequest(err)

        stream = data['stream']
        event = json.dumps(data['event'])
        try:
            engine.publish(stream, event)
        except core.KinesisStreamDoesNotExist as ex:
            raise NotFoundException(str(ex))
        return {}

    @app.route('/track/workflows/<workflow_id>/executions/<execution_id>', method=['GET'])
    def track(workflow_id, execution_id):
        try:
            tracking_info = engine.track(workflow_id, execution_id)
            return tracking_info
        except (core.CloudWatchStreamDoesNotExist,
                core.WorkflowDoesNotExist,
                core.CloudWatchLogDoesNotExist) as ex:
            raise NotFoundException(str(ex))
        raise Exception("Something went wrong!")

    return app
项目:green-button    作者:bernard357    | 项目源码 | 文件源码
def web_inbound_call(button=None):
    """
    Handles an inbound phone call

    This function is called from twilio cloud back-end
    """

    if button is None:
        button = settings['server']['default']

    logging.info("Receiving inbound call for button '{}'".format(button))

    try:
        button = decode_token(settings, button, action='call')

        context = load_button(settings, button)
        update, twilio_action = get_push_details(context)

        response.content_type = 'text/xml'
        behaviour = twilio.twiml.Response()

        say = None
        if 'call' in twilio_action:
            for line in twilio_action['call']:
                if line.keys()[0] == 'say':
                    say = line['say']
                    break

        if say is None:
            say = "What's up Doc?"

        behaviour.say(say)
        return str(behaviour)

    except Exception as feedback:
        if logging.getLogger().getEffectiveLevel() == logging.DEBUG:
            logging.error("Unable to handle inbound call for '{}'".format(button))
            raise
        else:
            logging.error(str(feedback))
            response.status = 400
            return 'Invalid request'

#
# the collection of buttons that we manage
#