Python bottle 模块,HTTPError() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用bottle.HTTPError()

项目:download-manager    作者:thispc    | 项目源码 | 文件源码
def login_required(perm=None):
    def _dec(func):
        def _view(*args, **kwargs):
            s = request.environ.get('beaker.session')
            if s.get("name", None) and s.get("authenticated", False):
                if perm:
                    perms = parse_permissions(s)
                    if perm not in perms or not perms[perm]:
                        if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
                            return HTTPError(403, "Forbidden")
                        else:
                            return redirect("/nopermission")

                return func(*args, **kwargs)
            else:
                if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
                    return HTTPError(403, "Forbidden")
                else:
                    return redirect("/login")

        return _view

    return _dec
项目:icfpc2016-judge    作者:icfpc2016    | 项目源码 | 文件源码
def enforce_api_rate_limit(action, limit_in_window):
    """Enforces API rate limit.

    Args:
        action: Action name.
        limit_in_window: Maximum number of requests of this action in a window.

    Raises:
        bottle.HTTPError: If the rate limit is exceeded.
    """
    if (FLAGS.enable_load_test_hacks and
            bottle.request.headers.get('X-Load-Test', '') == 'yes'):
        return
    if get_current_user()['organizer']:
        return
    username = get_current_username()
    if (model.record_last_api_access_time(username) <
            FLAGS.api_rate_limit_request_interval):
        bottle.abort(429, 'Rate limit exceeded (per-second limit).')
    count = model.increment_api_rate_limit_counter(username, action)
    if count > limit_in_window:
        bottle.abort(429, 'Rate limit exceeded (per-hour limit).')


# http://flask.pocoo.org/snippets/44/
项目:download-manager    作者:thispc    | 项目源码 | 文件源码
def addcrypted():

    package = request.forms.get('referer', 'ClickAndLoad Package')
    dlc = request.forms['crypted'].replace(" ", "+")

    dlc_path = join(DL_ROOT, package.replace("/", "").replace("\\", "").replace(":", "") + ".dlc")
    dlc_file = open(dlc_path, "wb")
    dlc_file.write(dlc)
    dlc_file.close()

    try:
        PYLOAD.addPackage(package, [dlc_path], 0)
    except:
        return HTTPError()
    else:
        return "success\r\n"
项目:download-manager    作者:thispc    | 项目源码 | 文件源码
def links():
    try:
        links = [toDict(x) for x in PYLOAD.statusDownloads()]
        ids = []
        for link in links:
            ids.append(link['fid'])

            if link['status'] == 12:
                link['info'] = "%s @ %s/s" % (link['format_eta'], formatSize(link['speed']))
            elif link['status'] == 5:
                link['percent'] = 0
                link['size'] = 0
                link['bleft'] = 0
                link['info'] = _("waiting %s") % link['format_wait']
            else:
                link['info'] = ""

        data = {'links': links, 'ids': ids}
        return data
    except Exception, e:
        print_exc()
        return HTTPError()
项目:bottlecap    作者:foxx    | 项目源码 | 文件源码
def test_dispatch_error(self, app, view_render_errors):
        """Dispatch raises an HTTPError"""

        @app.routecv
        class ExampleView(ExampleErrorView):
            render_errors = view_render_errors
            renderer_classes = [JSONRenderer]

        resp = app.webtest.get('/error',
            expect_errors=True)

        if view_render_errors:
            assert resp.status == '418 Teapot'
            assert resp.body == b'[1, 2, 3]'
            assert resp.headers['Content-Type'] == 'application/json'
        else:
            assert resp.status == '418 Teapot'
            assert b'DOCTYPE HTML PUBLIC' in resp.body
            assert resp.headers['Content-Type'] == 'text/html; charset=UTF-8'
项目:CAPE    作者:ctxis    | 项目源码 | 文件源码
def tasks_view(task_id):
    response = {}

    task = db.view_task(task_id, details=True)
    if task:
        entry = task.to_dict()
        entry["guest"] = {}
        if task.guest:
            entry["guest"] = task.guest.to_dict()

        entry["errors"] = []
        for error in task.errors:
            entry["errors"].append(error.message)

        entry["sample"] = {}
        if task.sample_id:
            sample = db.view_sample(task.sample_id)
            entry["sample"] = sample.to_dict()

        response["task"] = entry
    else:
        return HTTPError(404, "Task not found")

    return jsonize(response)
项目:CAPE    作者:ctxis    | 项目源码 | 文件源码
def task_screenshots(task=0, screenshot=None):
    folder_path = os.path.join(CUCKOO_ROOT, "storage", "analyses", str(task), "shots")

    if os.path.exists(folder_path):
        if screenshot:
            screenshot_name = "{0}.jpg".format(screenshot)
            screenshot_path = os.path.join(folder_path, screenshot_name)
            if os.path.exists(screenshot_path):
                # TODO: Add content disposition.
                response.content_type = "image/jpeg"
                return open(screenshot_path, "rb").read()
            else:
                return HTTPError(404, screenshot_path)
        else:
            zip_data = StringIO()
            with ZipFile(zip_data, "w", ZIP_STORED) as zip_file:
                for shot_name in os.listdir(folder_path):
                    zip_file.write(os.path.join(folder_path, shot_name), shot_name)

            # TODO: Add content disposition.
            response.content_type = "application/zip"
            return zip_data.getvalue()
    else:
        return HTTPError(404, folder_path)
项目:CAPE    作者:ctxis    | 项目源码 | 文件源码
def get_files(task_id):
    if not task_id.isdigit():
        return HTTPError(code=404, output="The specified ID is invalid")

    files_path = os.path.join(CUCKOO_ROOT, "storage", "analyses", task_id, "files")
    zip_file = os.path.join(CUCKOO_ROOT, "storage", "analyses", task_id, "files.zip")

    with zipfile.ZipFile(zip_file, "w", compression=zipfile.ZIP_DEFLATED) as archive:
        root_len = len(os.path.abspath(files_path))
        for root, dirs, files in os.walk(files_path):
            archive_root = os.path.abspath(root)[root_len:]
            for f in files:
                fullpath = os.path.join(root, f)
                archive_name = os.path.join(archive_root, f)
                archive.write(fullpath, archive_name, zipfile.ZIP_DEFLATED)

    if not os.path.exists(files_path):
        return HTTPError(code=404, output="Files not found")

    response.content_type = "application/zip"
    response.set_header("Content-Disposition", "attachment; filename=cuckoo_task_%s(not_encrypted).zip" % (task_id))
    return open(zip_file, "rb").read()
项目:bottle-py    作者:today-open    | 项目源码 | 文件源码
def audio_random(db):
    audio = random.choice(list(db.query(Page).filter_by(type=2).limit(10)))
    if audio:
        return json.dumps({
            'id': audio.id,
            'title': audio.title,
            'cover': audio.cover,
            'para': audio.para,
            'type': audio.type,
            'tag': audio.tag,
            'author': {
                'id': audio.author.id,
                'name': audio.author.name
            },
            'time': audio.time
        })
    return HTTPError(404, 'id not found.')
项目:bottle-py    作者:today-open    | 项目源码 | 文件源码
def native_latest_news(db):
    all = db.query(Page).filter_by(type=10).order_by(desc(Page.id)).limit(10)
    if all:
        return json.dumps([{
            'id': page.id,
            'key': page.id,
            'title': page.title,
            'cover': page.cover,
            'type': page.type,
            'tag': page.tag,
            'author': {
                'id': page.author.id,
                'name': page.author.name
            },
            'time': page.time
        } for page in all])
    return HTTPError(404, 'id not found.')
项目:bottle-py    作者:today-open    | 项目源码 | 文件源码
def latest_news(db):
    all = db.query(Page).filter_by(type=0).order_by(desc(Page.id)).limit(10)
    if all:
        return json.dumps([{
            'id': page.id,
            'key': page.id,
            'title': page.title,
            'cover': page.cover,
            'type': page.type,
            'tag': page.tag,
            'author': {
                'id': page.author.id,
                'name': page.author.name
            },
            'time': page.time
        } for page in all])
    return HTTPError(404, 'id not found.')
项目:bottle-py    作者:today-open    | 项目源码 | 文件源码
def anchor_news(anchor, db):
    all = db.query(Page).filter_by(type=0).order_by(desc(Page.id)).filter(Page.id < anchor).limit(10)
    if all:
        return json.dumps([{
            'id': page.id,
            'key': page.id,
            'title': page.title,
            'cover': page.cover,
            'type': page.type,
            'tag': page.tag,
            'author': {
                'id': page.author.id,
                'name': page.author.name
            },
            'time': page.time
        } for page in all])
    return HTTPError(404, 'id not found.')
项目:icfpc2016-judge    作者:icfpc2016    | 项目源码 | 文件源码
def ensure_admin():
    """Ensures the client is an admin.

    Raises:
        HTTPError: If the client is not authenticated as an admin.
    """
    if not is_admin():
        response = bottle.HTTPError(401, 'Unauthorized')
        response.headers['WWW-Authenticate'] =  'Basic realm="admin only" domain="/"'
        raise response
项目:icfpc2016-judge    作者:icfpc2016    | 项目源码 | 文件源码
def json_api_handler(handler):
    """Bottle handler decorator for JSON REST API handlers.

    Args:
        handler: A Bottle handler function.

    Returns:
        A wrapped Bottle handler function.
    """
    @functools.wraps(handler)
    def wrapped_handler(*args, **kwargs):
        try:
            handler_result = handler(*args, **kwargs)
        except bottle.HTTPResponse as handler_result:
            pass
        except Exception:
            logging.exception('Uncaught exception')
            handler_result = bottle.HTTPError(500, 'Internal Server Error')
        if isinstance(handler_result, bottle.HTTPResponse):
            # For now, we do not support raising successful HTTPResponse.
            assert handler_result.status_code // 100 != 2
            # Forcibly convert HTTPError to HTTPResponse to avoid formatting.
            response = handler_result.copy(cls=bottle.HTTPResponse)
            response_data = {'ok': False, 'error': handler_result.body}
        else:
            assert isinstance(handler_result, dict)
            response = bottle.response.copy(cls=bottle.HTTPResponse)
            response_data = handler_result
            response_data['ok'] = True
        response.body = ujson.dumps(response_data, double_precision=6)
        response.content_type = 'application/json'
        return response
    return wrapped_handler
项目:canister    作者:dagnelies    | 项目源码 | 文件源码
def secret():
    if not session.user:
        err = bottle.HTTPError(401, "Login required")
        err.add_header('WWW-Authenticate', 'Basic realm="%s"' % 'private')
        return err

    return 'Welcome %s! <a href="/">Go back</a> (note that there is no reliable way to logout using basic auth)' % session.user
项目:canister    作者:dagnelies    | 项目源码 | 文件源码
def secret():
    if not session.user:
        err = bottle.HTTPError(401, "Login required")
        err.add_header('WWW-Authenticate', 'Basic realm="%s"' % 'private')
        return err

    return 'You are authenticated as ' + str(session.user)
项目:canister    作者:dagnelies    | 项目源码 | 文件源码
def index():
    if can.session.user:
        return "Hi " + str(can.session.user) + "!";
    else:
        err = bottle.HTTPError(401, "Login required")
        err.add_header('WWW-Authenticate', 'Basic realm="%s"' % 'private')
        return err
项目:experiment-manager    作者:softfire-eu    | 项目源码 | 文件源码
def get_certificate():
    username = post_get('username')
    if not username:
        raise bottle.HTTPError(500, "Username missing")
    password = post_get('password', default=None)
    days = int(post_get('days', default=None))
    cert_gen = CertificateGenerator()
    cert_gen.generate(password, username, days)
    openvpn_config = cert_gen.get_openvpn_config()
    headers = {
        'Content-Type': 'text/plain;charset=UTF-8',
        'Content-Disposition': 'attachment; filename="softfire-vpn_%s.ovpn"' % username,
        "Content-Length": len(openvpn_config)
    }
    return bottle.HTTPResponse(openvpn_config, 200, **headers)
项目:experiment-manager    作者:softfire-eu    | 项目源码 | 文件源码
def server_static(filename):
    """ route to the css and static files"""
    if ".." in filename:
        return HTTPError(status=403)
    return bottle.static_file(filename, root='%s/static' % get_config('api', 'view-path', '/etc/softfire/views'))


#########
# Utils #
#########
项目:download-manager    作者:thispc    | 项目源码 | 文件源码
def call_api(func, args=""):
    response.headers.replace("Content-type", "application/json")
    response.headers.append("Cache-Control", "no-cache, must-revalidate")

    s = request.environ.get('beaker.session')
    if 'session' in request.POST:
        s = s.get_by_id(request.POST['session'])

    if not s or not s.get("authenticated", False):
        return HTTPError(403, json.dumps("Forbidden"))

    if not PYLOAD.isAuthorized(func, {"role": s["role"], "permission": s["perms"]}):
        return HTTPError(401, json.dumps("Unauthorized"))

    args = args.split("/")[1:]
    kwargs = {}

    for x, y in chain(request.GET.iteritems(), request.POST.iteritems()):
        if x == "session": continue
        kwargs[x] = unquote(y)

    try:
        return callApi(func, *args, **kwargs)
    except Exception, e:
        print_exc()
        return HTTPError(500, json.dumps({"error": e.message, "traceback": format_exc()}))
项目:download-manager    作者:thispc    | 项目源码 | 文件源码
def callApi(func, *args, **kwargs):
    if not hasattr(PYLOAD.EXTERNAL, func) or func.startswith("_"):
        print "Invalid API call", func
        return HTTPError(404, json.dumps("Not Found"))

    result = getattr(PYLOAD, func)(*[literal_eval(x) for x in args],
                                   **dict([(x, literal_eval(y)) for x, y in kwargs.iteritems()]))

    # null is invalid json  response
    if result is None: result = True

    return json.dumps(result, cls=TBaseEncoder)


#post -> username, password
项目:download-manager    作者:thispc    | 项目源码 | 文件源码
def local_check(function):
    def _view(*args, **kwargs):
        if request.environ.get('REMOTE_ADDR', "0") in ('127.0.0.1', 'localhost') \
        or request.environ.get('HTTP_HOST','0') == '127.0.0.1:9666':
            return function(*args, **kwargs)
        else:
            return HTTPError(403, "Forbidden")

    return _view
项目:download-manager    作者:thispc    | 项目源码 | 文件源码
def flashgot():
    if request.environ['HTTP_REFERER'] != "http://localhost:9666/flashgot" and request.environ['HTTP_REFERER'] != "http://127.0.0.1:9666/flashgot":
        return HTTPError()

    autostart = int(request.forms.get('autostart', 0))
    package = request.forms.get('package', None)
    urls = filter(lambda x: x != "", request.forms['urls'].split("\n"))
    folder = request.forms.get('dir', None)

    if package:
        PYLOAD.addPackage(package, urls, autostart)
    else:
        PYLOAD.generateAndAddPackages(urls, autostart)

    return ""
项目:download-manager    作者:thispc    | 项目源码 | 文件源码
def packages():
    print "/json/packages"
    try:
        data = PYLOAD.getQueue()

        for package in data:
            package['links'] = []
            for file in PYLOAD.get_package_files(package['id']):
                package['links'].append(PYLOAD.get_file_info(file))

        return data

    except:
        return HTTPError()
项目:download-manager    作者:thispc    | 项目源码 | 文件源码
def package(id):
    try:
        data = toDict(PYLOAD.getPackageData(id))
        data["links"] = [toDict(x) for x in data["links"]]

        for pyfile in data["links"]:
            if pyfile["status"] == 0:
                pyfile["icon"] = "status_finished.png"
            elif pyfile["status"] in (2, 3):
                pyfile["icon"] = "status_queue.png"
            elif pyfile["status"] in (9, 1):
                pyfile["icon"] = "status_offline.png"
            elif pyfile["status"] == 5:
                pyfile["icon"] = "status_waiting.png"
            elif pyfile["status"] == 8:
                pyfile["icon"] = "status_failed.png"
            elif pyfile["status"] == 4:
                pyfile["icon"] = "arrow_right.png"
            elif pyfile["status"] in (11, 13):
                pyfile["icon"] = "status_proc.png"
            else:
                pyfile["icon"] = "status_downloading.png"

        tmp = data["links"]
        tmp.sort(key=get_sort_key)
        data["links"] = tmp
        return data

    except:
        print_exc()
        return HTTPError()
项目:download-manager    作者:thispc    | 项目源码 | 文件源码
def package_order(ids):
    try:
        pid, pos = ids.split("|")
        PYLOAD.orderPackage(int(pid), int(pos))
        return {"response": "success"}
    except:
        return HTTPError()
项目:download-manager    作者:thispc    | 项目源码 | 文件源码
def abort_link(id):
    try:
        PYLOAD.stopDownloads([id])
        return {"response": "success"}
    except:
        return HTTPError()
项目:download-manager    作者:thispc    | 项目源码 | 文件源码
def move_package(dest, id):
    try:
        PYLOAD.movePackage(dest, id)
        return {"response": "success"}
    except:
        return HTTPError()
项目:download-manager    作者:thispc    | 项目源码 | 文件源码
def edit_package():
    try:
        id = int(request.forms.get("pack_id"))
        data = {"name": request.forms.get("pack_name").decode("utf8", "ignore"),
                "folder": request.forms.get("pack_folder").decode("utf8", "ignore"),
                 "password": request.forms.get("pack_pws").decode("utf8", "ignore")}

        PYLOAD.setPackageData(id, data)
        return {"response": "success"}

    except:
        return HTTPError()
项目:download-manager    作者:thispc    | 项目源码 | 文件源码
def change_password():

    user = request.POST["user_login"]
    oldpw = request.POST["login_current_password"]
    newpw = request.POST["login_new_password"]

    if not PYLOAD.changePassword(user, oldpw, newpw):
        print "Wrong password"
        return HTTPError()
项目:download-manager    作者:thispc    | 项目源码 | 文件源码
def js_dynamic(path):
    response.headers['Expires'] = time.strftime("%a, %d %b %Y %H:%M:%S GMT",
                                                time.gmtime(time.time() + 60 * 60 * 24 * 2))
    response.headers['Cache-control'] = "public"
    response.headers['Content-Type'] = "text/javascript; charset=UTF-8"

    try:
        # static files are not rendered
        if "static" not in path and "mootools" not in path:
            t = env.get_template("js/%s" % path)
            return t.render()
        else:
            return static_file(path, root=join(PROJECT_DIR, "media", "js"))
    except:
        return HTTPError(404, "Not Found")
项目:Pardus-Bulut    作者:ferhatacikalin    | 项目源码 | 文件源码
def test_chunked_not_terminated(self):
        self._test_chunked('1\r\nx\r\n', HTTPError)
项目:Pardus-Bulut    作者:ferhatacikalin    | 项目源码 | 文件源码
def test_chunked_wrong_size(self):
        self._test_chunked('2\r\nx\r\n', HTTPError)
项目:Pardus-Bulut    作者:ferhatacikalin    | 项目源码 | 文件源码
def test_chunked_illegal_size(self):
        self._test_chunked('x\r\nx\r\n', HTTPError)
项目:Pardus-Bulut    作者:ferhatacikalin    | 项目源码 | 文件源码
def test_chunked_not_chunked_at_all(self):
        self._test_chunked('abcdef', HTTPError)
项目:Pardus-Bulut    作者:ferhatacikalin    | 项目源码 | 文件源码
def test_json_tobig(self):
        """ Environ: Request.json property with huge body. """
        test = dict(a=5, tobig='x' * bottle.BaseRequest.MEMFILE_MAX)
        e = {'CONTENT_TYPE': 'application/json'}
        wsgiref.util.setup_testing_defaults(e)
        e['wsgi.input'].write(tob(json_dumps(test)))
        e['wsgi.input'].seek(0)
        e['CONTENT_LENGTH'] = str(len(json_dumps(test)))
        self.assertRaises(HTTPError, lambda: BaseRequest(e).json)
项目:Pardus-Bulut    作者:ferhatacikalin    | 项目源码 | 文件源码
def testBasic(self):
        self.assertMatches('/static', '/static')
        self.assertMatches('/\\:its/:#.+#/:test/:name#[a-z]+#/',
                           '/:its/a/cruel/world/',
                           test='cruel', name='world')
        self.assertMatches('/:test', '/test', test='test') # No tail
        self.assertMatches(':test/', 'test/', test='test') # No head
        self.assertMatches('/:test/', '/test/', test='test') # Middle
        self.assertMatches(':test', 'test', test='test') # Full wildcard
        self.assertMatches('/:#anon#/match', '/anon/match') # Anon wildcards
        self.assertRaises(bottle.HTTPError, self.match, '//no/m/at/ch/')
项目:Pardus-Bulut    作者:ferhatacikalin    | 项目源码 | 文件源码
def testNewSyntax(self):
        self.assertMatches('/static', '/static')
        self.assertMatches('/\\<its>/<:re:.+>/<test>/<name:re:[a-z]+>/',
                           '/<its>/a/cruel/world/',
                           test='cruel', name='world')
        self.assertMatches('/<test>', '/test', test='test') # No tail
        self.assertMatches('<test>/', 'test/', test='test') # No head
        self.assertMatches('/<test>/', '/test/', test='test') # Middle
        self.assertMatches('<test>', 'test', test='test') # Full wildcard
        self.assertMatches('/<:re:anon>/match', '/anon/match') # Anon wildcards
        self.assertRaises(bottle.HTTPError, self.match, '//no/m/at/ch/')
项目:Pardus-Bulut    作者:ferhatacikalin    | 项目源码 | 文件源码
def testValueErrorInFilter(self):
        self.r.add_filter('test', lambda x: ('.*', int, int))

        self.assertMatches('/int/<i:test>', '/int/5', i=5) # No tail
        self.assertRaises(bottle.HTTPError, self.match, '/int/noint')
项目:Pardus-Bulut    作者:ferhatacikalin    | 项目源码 | 文件源码
def testIntFilter(self):
        self.assertMatches('/object/<id:int>', '/object/567', id=567)
        self.assertRaises(bottle.HTTPError, self.match, '/object/abc')
项目:Pardus-Bulut    作者:ferhatacikalin    | 项目源码 | 文件源码
def testFloatFilter(self):
        self.assertMatches('/object/<id:float>', '/object/1', id=1)
        self.assertMatches('/object/<id:float>', '/object/1.1', id=1.1)
        self.assertMatches('/object/<id:float>', '/object/.1', id=0.1)
        self.assertMatches('/object/<id:float>', '/object/1.', id=1)
        self.assertRaises(bottle.HTTPError, self.match, '/object/abc')
        self.assertRaises(bottle.HTTPError, self.match, '/object/')
        self.assertRaises(bottle.HTTPError, self.match, '/object/.')
项目:Pardus-Bulut    作者:ferhatacikalin    | 项目源码 | 文件源码
def test_401(self):
        """ WSGI: abort(401, '') (HTTP 401) """
        @bottle.route('/')
        def test(): bottle.abort(401)
        self.assertStatus(401, '/')
        @bottle.error(401)
        def err(e):
            bottle.response.status = 200
            return str(type(e))
        self.assertStatus(200, '/')
        self.assertBody("<class 'bottle.HTTPError'>",'/')
项目:Pardus-Bulut    作者:ferhatacikalin    | 项目源码 | 文件源码
def test_view_error(self):
        """ WSGI: Test if view-decorator reacts on non-dict return values correctly."""
        @bottle.route('/tpl')
        @bottle.view('stpl_t2main')
        def test():
            return bottle.HTTPError(401, 'The cake is a lie!')
        self.assertInBody('The cake is a lie!', '/tpl')
        self.assertInBody('401 Unauthorized', '/tpl')
        self.assertStatus(401, '/tpl')
项目:bottlecap    作者:foxx    | 项目源码 | 文件源码
def dispatch(self):
        raise HTTPError('418 Teapot', [1,2,3])
项目:CAPE    作者:ctxis    | 项目源码 | 文件源码
def tasks_create_file():
    response = {}

    data = request.files.file
    package = request.forms.get("package", "")
    timeout = request.forms.get("timeout", "")
    priority = request.forms.get("priority", 1)
    options = request.forms.get("options", "")
    machine = request.forms.get("machine", "")
    platform = request.forms.get("platform", "")
    tags = request.forms.get("tags", None)
    custom = request.forms.get("custom", "")
    memory = request.forms.get("memory", 'False')
    clock = request.forms.get("clock", None)
    shrike_url = request.forms.get("shrike_url", None)
    shrike_msg = request.forms.get("shrike_msg", None)
    shrike_sid = request.forms.get("shrike_sid", None)
    shrike_refer = request.forms.get("shrike_refer", None)

    if memory.upper() == 'FALSE' or memory == '0':
        memory = False
    else:
        memory = True

    enforce_timeout = request.forms.get("enforce_timeout", 'False')
    if enforce_timeout.upper() == 'FALSE' or enforce_timeout == '0':
        enforce_timeout = False
    else:
        enforce_timeout = True

    temp_file_path = store_temp_file(data.file.read(), data.filename)
    try:
        task_ids = db.demux_sample_and_add_to_db(file_path=temp_file_path, package=package, timeout=timeout, options=options, priority=priority,
                machine=machine, platform=platform, custom=custom, memory=memory, enforce_timeout=enforce_timeout, tags=tags, clock=clock,
                shrike_url=shrike_url, shrike_msg=shrike_msg, shrike_sid=shrike_sid, shrike_refer=shrike_refer)
    except CuckooDemuxError as e:
        return HTTPError(500, e)

    response["task_ids"] = task_ids
    return jsonize(response)
项目:CAPE    作者:ctxis    | 项目源码 | 文件源码
def tasks_reschedule(task_id):
    response = {}

    if not db.view_task(task_id):
        return HTTPError(404, "There is no analysis with the specified ID")

    if db.reschedule(task_id):
        response["status"] = "OK"
    else:
        return HTTPError(500, "An error occurred while trying to "
                              "reschedule the task")

    return jsonize(response)
项目:CAPE    作者:ctxis    | 项目源码 | 文件源码
def tasks_delete(task_id):
    response = {}

    task = db.view_task(task_id)
    if task:
        if task.status == TASK_RUNNING:
            return HTTPError(500, "The task is currently being "
                                  "processed, cannot delete")

        if db.delete_task(task_id):
            delete_folder(os.path.join(CUCKOO_ROOT, "storage",
                                       "analyses", "%d" % task_id))
            if FULL_DB:
                task = results_db.analysis.find_one({"info.id": task_id})
                for processes in task.get("behavior", {}).get("processes", []):
                    [results_db.calls.remove(call) for call in processes.get("calls", [])]

                results_db.analysis.remove({"info.id": task_id})

            response["status"] = "OK"
        else:
            return HTTPError(500, "An error occurred while trying to "
                                  "delete the task")
    else:
        return HTTPError(404, "Task not found")

    return jsonize(response)
项目:CAPE    作者:ctxis    | 项目源码 | 文件源码
def files_get(sha256):
    file_path = os.path.join(CUCKOO_ROOT, "storage", "binaries", sha256)
    if os.path.exists(file_path):
        response.content_type = "application/octet-stream; charset=UTF-8"
        return open(file_path, "rb").read()
    else:
        return HTTPError(404, "File not found")
项目:CAPE    作者:ctxis    | 项目源码 | 文件源码
def pcap_get(task_id):
    file_path = os.path.join(CUCKOO_ROOT, "storage", "analyses",
                             "%d" % task_id, "dump.pcap")
    if os.path.exists(file_path):
        response.content_type = "application/octet-stream; charset=UTF-8"
        try:
            return open(file_path, "rb").read()
        except:
            return HTTPError(500, "An error occurred while reading PCAP")
    else:
        return HTTPError(404, "File not found")
项目:CAPE    作者:ctxis    | 项目源码 | 文件源码
def machines_view(name=None):
    response = {}

    machine = db.view_machine(name=name)
    if machine:
        response["machine"] = machine.to_dict()
    else:
        return HTTPError(404, "Machine not found")

    return jsonize(response)