Python bottle.response 模块,set_header() 实例源码

我们从Python开源项目中,提取了以下21个代码示例,用于说明如何使用bottle.response.set_header()

项目:ATLeS    作者:liffiton    | 项目源码 | 文件源码
def download():
    trackrels = request.query.tracks.split('|')

    # write the archive into a temporary in-memory file-like object
    temp = tempfile.SpooledTemporaryFile()
    with zipfile.ZipFile(temp, 'w', zipfile.ZIP_DEFLATED) as archive:
        for trackrel in trackrels:
            base_wildcard = trackrel.replace("-track.csv", "*")
            paths = config.TRACKDIR.glob(base_wildcard)
            for path in paths:
                archive.write(str(path),
                              str(path.relative_to(config.TRACKDIR))
                              )

    temp.seek(0)

    # force a download; give it a filename and mime type
    response.set_header('Content-Disposition', 'attachment; filename="data.zip"')
    response.set_header('Content-Type', 'application/zip')

    # relying on garbage collector to delete tempfile object
    # (and hence the file itself) when done sending
    return temp
项目:CAPE    作者:ctxis    | 项目源码 | 文件源码
def get_files(task_id):
    if not task_id.isdigit():
        return HTTPError(code=404, output="The specified ID is invalid")

    files_path = os.path.join(CUCKOO_ROOT, "storage", "analyses", task_id, "files")
    zip_file = os.path.join(CUCKOO_ROOT, "storage", "analyses", task_id, "files.zip")

    with zipfile.ZipFile(zip_file, "w", compression=zipfile.ZIP_DEFLATED) as archive:
        root_len = len(os.path.abspath(files_path))
        for root, dirs, files in os.walk(files_path):
            archive_root = os.path.abspath(root)[root_len:]
            for f in files:
                fullpath = os.path.join(root, f)
                archive_name = os.path.join(archive_root, f)
                archive.write(fullpath, archive_name, zipfile.ZIP_DEFLATED)

    if not os.path.exists(files_path):
        return HTTPError(code=404, output="Files not found")

    response.content_type = "application/zip"
    response.set_header("Content-Disposition", "attachment; filename=cuckoo_task_%s(not_encrypted).zip" % (task_id))
    return open(zip_file, "rb").read()
项目:BREIZHCTF2K16    作者:Ganapati    | 项目源码 | 文件源码
def page(data=None):
    IV = "".join(random.choice(string.ascii_uppercase + string.digits) for _ in range(8))
    json_data = json.dumps({'username': 'Guest', 'flag': FLAG})
    data = cipher(json_data, KEY, IV)
    if request.get_cookie("secret_data"):
        secret_data = request.get_cookie("secret_data")
        try:
            try:
                if "libwww-perl" in request.headers.get('User-Agent'): # Anti Padbuster simple
                    response.set_header('Set-Cookie', 'secret_data=%s' % data)
                    return "Attack detected."
                plain = uncipher(secret_data, KEY)
                data = json.loads(plain)
                print data
                return "Hello %s." % data['username']
            except PaddingError:
                response.set_header('Set-Cookie', 'secret_data=%s' % data)
                return "Padding error."
        except:
            response.set_header('Set-Cookie', 'secret_data=%s' % data)
            return "Secret value error."
    else:
        response.set_header('Set-Cookie', 'secret_data=%s' % data)
        return '<a href="/">Enter website</a>'
项目:dac    作者:jlonij    | 项目源码 | 文件源码
def info():
    '''
    Return service information.
    '''
    try:
        test_report_stamp = time.ctime(os.path.getctime(DOCTEST_OUTPUT))
    except:
        test_report_stamp = ''

    try:
        with open('/tmp/dac_test_results') as fh:
            test_report = fh.read()
    except:
        test_report = ''

    resp = {'sums': mk_md5sums(),
            'test_errors': test_errors(),
            'test_report': test_report,
            'test_report_stamp': test_report_stamp,
            'stats': stats}

    response.set_header('Content-Type', 'application/json')
    return resp
项目:modernpython    作者:rhettinger    | 项目源码 | 文件源码
def fetch_static(filename):
    response.set_header('Cache-Control', 'max-age=600')
    return static_file(filename, root='static')
项目:first_timer_scraper    作者:niccokunzmann    | 项目源码 | 文件源码
def badge(number):
    response.set_header('Cache-Control', 'public, max-age={}'.format(SECONDS_TO_RESCRAPE))
    response.set_header('Access-Control-Allow-Origin','*')
    response.content_type = "image/svg+xml;charset=utf-8"
    if number < 10:
        return template("First Timers-1-blue.svg", number=number)
    if number < 100:
        return template("First Timers-10-blue.svg", number=number)
    return template("First Timers-100-blue.svg", number=number)
项目:brother_ql_web    作者:pklaus    | 项目源码 | 文件源码
def get_preview_image():
    context = get_label_context(request)
    im = create_label_im(**context)
    return_format = request.query.get('return_format', 'png')
    if return_format == 'base64':
        import base64
        response.set_header('Content-type', 'text/plain')
        return base64.b64encode(image_to_png_bytes(im))
    else:
        response.set_header('Content-type', 'image/png')
        return image_to_png_bytes(im)
项目:dac-training    作者:jlonij    | 项目源码 | 文件源码
def predict():
    '''
    Get the current DAC prediction.
    '''
    linker = dac.EntityLinker(debug=True, candidates=True)
    result = linker.link(request.query.url, request.query.ne.encode('utf-8'))
    result = result['linkedNEs'][0]
    response.set_header('Content-Type', 'application/json')
    return result
项目:ATLeS    作者:liffiton    | 项目源码 | 文件源码
def get_image(tgtbox, boxes_rpc, width=648):
    box = _get_box_rpc(tgtbox, boxes_rpc)

    if box.lock_exists():
        # must yield then return in a generator function
        yield template('error', errormsg="It looks like an experiment is currently running on this box.  Image capture only works while the box is idle.")
        return

    try:
        try:
            box.start_img_stream(width)
        except OSError as e:
            # must yield then return in a generator function
            yield template('error', errormsg="Failed to start image stream capture.", exception=e)
            return

        # all set; carry on
        response.set_header('Content-type', 'multipart/x-mixed-replace; boundary=atlesframe')
        while True:
            if box.lock_exists():
                return
            time.sleep(0.5)
            imgdata = box.get_image()
            if imgdata is None:
                return
            if len(imgdata) < 16:
                continue  # ignore dud frames
            yield b"\n".join([b"--atlesframe", b"Content-Type: image/jpeg", b"", imgdata, b""])
    finally:
        box.stop_img_stream()
项目:CAPE    作者:ctxis    | 项目源码 | 文件源码
def download_report(task_id):
    if not task_id.isdigit():
        return HTTPError(code=404, output="The specified ID is invalid")

    report_path = os.path.join(CUCKOO_ROOT, "storage", "analyses", task_id, "reports", "report.html")

    if not os.path.exists(report_path):
        return HTTPError(code=404, output="Report not found")

    response.content_type = "text/html"
    response.set_header("Content-Disposition", "attachment; filename=cuckoo_task_{0}.html".format(task_id))

    return open(report_path, "rb").read()
项目:CAPE    作者:ctxis    | 项目源码 | 文件源码
def get_pcap(task_id):
    if not task_id.isdigit():
        return HTTPError(code=404, output="The specified ID is invalid")

    pcap_path = os.path.join(CUCKOO_ROOT, "storage", "analyses", task_id, "dump.pcap")

    if not os.path.exists(pcap_path):
        return HTTPError(code=404, output="PCAP not found")

    response.content_type = "application/vnd.tcpdump.pcap"
    response.set_header("Content-Disposition", "attachment; filename=cuckoo_task_{0}.pcap".format(task_id))

    return open(pcap_path, "rb").read()
项目:auto-tagging-image-search    作者:OsamaJBR    | 项目源码 | 文件源码
def get_images():
    image_path = request.query.get('path',None)
    if os.path.exists(image_path):
        data = open(image_path,'rb').read()
        response.set_header('Content-type', 'image/jpeg')
        return data
    else:
        HTTPResponse(status=404,body=json.dumps({'error' : 'image not found'}))

#------------------
# MAIN
#------------------
项目:validator    作者:spacedirectory    | 项目源码 | 文件源码
def error400(error):
    _add_cors_headers(response)
    response.set_header('Content-Type', 'application/json')
    return json.dumps({'detail': error.body})
项目:validator    作者:spacedirectory    | 项目源码 | 文件源码
def error404(error):
    _add_cors_headers(response)
    response.set_header('Content-Type', 'application/json')
    return json.dumps({'detail': 'Page not found'})
项目:rill    作者:PermaData    | 项目源码 | 文件源码
def create_routes(host, port):
    """
    Define registry routes
    """
    @route("/runtimes/", method=['OPTIONS', 'GET'])
    def get_registry():
        """
        Get data about rill runtime
        """
        from rill.runtime import Runtime

        response.set_header('Access-Control-Allow-Origin', '*')
        response.set_header('Access-Control-Allow-Methods', 'GET, OPTIONS')
        response.set_header('Allow', 'GET, OPTIONS')
        response.set_header(
            'Access-Control-Allow-Headers',
            'Content-Type, Authorization'
        )

        if request.method == 'OPTIONS':
            return 'GET,OPTIONS'

        response.content_type = 'application/json'

        runtime = Runtime()

        runtime_meta = runtime.get_runtime_meta()
        runtime_meta['address'] = address = 'ws://{}:{}'.format(host, port)
        runtime_meta['protocol'] = 'websocket'
        runtime_meta['id'] = 'rill_' + urlparse(address).netloc
        runtime_meta['seen'] = str(datetime.now())
        return json.dumps([runtime_meta])
项目:dac    作者:jlonij    | 项目源码 | 文件源码
def index():
    '''
    Return the entity linker result.
    '''
    url = request.params.get('url')
    ne = request.params.get('ne')
    model = request.params.get('model')
    debug = request.params.get('debug')
    features = request.params.get('features')
    candidates = request.params.get('candidates')
    callback = request.params.get('callback')

    if not url:
        abort(400, 'No fitting argument ("url=...") given.')

    try:
        linker = dac.EntityLinker(model=model, debug=debug,
            features=features, candidates=candidates)
        result = linker.link(url, ne)
    except Exception as e:
        result = []
        result['status'] = 'error'
        result['message'] = str(e)
        result['hostname'] = hostname

    if result['status'] == 'ok':
        result['linkedNEs'] = array_to_utf(result['linkedNEs'])
        result['hostname'] = hostname
        result = json.dumps(result, sort_keys=True)

    if callback:
        result = unicode(callback) + u'(' + result + u');'

    response.set_header('Content-Type', 'application/json')
    return result
项目:chrome-crawler    作者:lqf96    | 项目源码 | 文件源码
def _recv_msg(self):
        """ Route for sending message to backend. """
        # Get message data
        msg_data = json.loads(unquote(b64decode(req.params.data)))
        # Trigger correspond event
        callbacks = self._callback.get(msg_data["event"])
        if callbacks:
            for callback in callbacks:
                callback(msg_data["data"])
        # Message received
        res.set_header(b"content-type", b"application/json")
        return json.dumps({"status": "success"})
    # Client side message subscription
    # (Used to push message to client)
项目:chrome-crawler    作者:lqf96    | 项目源码 | 文件源码
def _msg_polling(self):
        """ Front-end message polling route. """
        # Set content type
        res.set_header(b"content-type", b"text/event-stream")
        # Send response header
        yield "retry: 2000\n\n"
        # Polling loop
        while True:
            # Check message sending queue
            while self._msg_queue:
                current_msg = self._msg_queue.popleft()
                yield "data: "+b64encode(quote(json.dumps(current_msg)))+"\n\n"
            sleep(0)
    # Server side data
项目:chrome-crawler    作者:lqf96    | 项目源码 | 文件源码
def _bknd_data(self):
        """ Return backend data (including configuration). """
        res.set_header(b"content-type", b"application/json")
        return json.dumps(self.srv_data)
    # [ Static Routes ]
    # Index redirection
项目:chrome-crawler    作者:lqf96    | 项目源码 | 文件源码
def _crawler_static(path):
        """ Serve Chrome crawler static files. """
        # Fetch file content
        try:
            file_content = resource_string(_pkg_name, "chrome_crawler_frontend/"+path)
        except IOError:
            abort(404, "Not Found")
        # Guess MIME type from file name
        mime_type = guess_type(path)[0]
        if mime_type:
            res.set_header(b"content-type", mime_type)
        # Return file content
        return file_content
    # Custom static files
项目:xFlow    作者:dsouzajude    | 项目源码 | 文件源码
def create_app(engine):
    app = Bottle()

    @app.error()
    @app.error(404)
    def handle_error(error):
        if issubclass(type(error.exception), ApiException):
            response.status = error.exception.code
        else:
            response.status = error.status_code
        response.set_header('Content-type', 'application/json')
        resp = {
            'type': type(error.exception).__name__,
            'message': repr(error.exception) if error.exception else '',
            'traceback': error.traceback,
            'status_code': response.status
        }
        log.error('Exception, type=%s, message=%s, status_code=%s, traceback=%s'\
                    % (resp['type'], resp['message'], resp['status_code'], resp['traceback']))
        return '%s %s' % (resp['status_code'], resp['message'])

    @app.route('/ping', method=['GET'])
    def ping():
        return {'name': 'xFlow', 'version': '0.1' }

    @app.route('/publish', method=['POST'])
    def publish():
        data = json.loads(request.body.read())
        try:
            publish_schema.validate(data)
        except jsonschema.ValidationError as err:
            raise BadRequest(err)

        stream = data['stream']
        event = json.dumps(data['event'])
        try:
            engine.publish(stream, event)
        except core.KinesisStreamDoesNotExist as ex:
            raise NotFoundException(str(ex))
        return {}

    @app.route('/track/workflows/<workflow_id>/executions/<execution_id>', method=['GET'])
    def track(workflow_id, execution_id):
        try:
            tracking_info = engine.track(workflow_id, execution_id)
            return tracking_info
        except (core.CloudWatchStreamDoesNotExist,
                core.WorkflowDoesNotExist,
                core.CloudWatchLogDoesNotExist) as ex:
            raise NotFoundException(str(ex))
        raise Exception("Something went wrong!")

    return app