我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用sanic.response.redirect()。
def index(request): form = FeedbackForm(request) if request.method == 'POST' and form.validate(): note = form.note.data msg = '{} - {}'.format(datetime.now(), note) session.setdefault('fb', []).append(msg) return response.redirect('/') # NOTE: trusting user input here, never do that in production feedback = ''.join('<p>{}</p>'.format(m) for m in session.get('fb', [])) # Ah, f string, so, python 3.6, what do you expect from someone brave # enough to use sanic... :) content = f""" <h1>Form with CSRF Validation</h1> <p>Try <a href="/fail">form</a> that fails CSRF validation</p> {feedback} <form action="" method="POST"> {'<br>'.join(form.csrf_token.errors)} {form.csrf_token} {'<br>'.join(form.note.errors)} <br> {form.note(size=40, placeholder="say something..")} {form.submit} </form> """ return response.html(content)
def fail(request): form = FeedbackForm(request) if request.method == 'POST' and form.validate(): note = form.note.data msg = '{} - {}'.format(datetime.now(), note) session.setdefault('fb', []).append(msg) return response.redirect('/fail') feedback = ''.join('<p>{}</p>'.format(m) for m in session.get('fb', [])) content = f""" <h1>Form which fails CSRF Validation</h1> <p>This is the same as this <a href="/">form</a> except that CSRF validation always fail because we did not render the hidden csrf token</p> {feedback} <form action="" method="POST"> {'<br>'.join(form.csrf_token.errors)} {'<br>'.join(form.note.errors)} <br> {form.note(size=40, placeholder="say something..")} {form.submit} </form> """ return response.html(content)
def login(request): message = '' if request.method == 'POST': username = request.form.get('username') password = request.form.get('password') # for demonstration purpose only, you should use more robust method if username == 'demo' and password == '1234': # use User proxy in sanic_auth, this should be some ORM model # object in production, the default implementation of # auth.login_user expects User.id and User.name available user = User(id=1, name=username) auth.login_user(request, user) return response.redirect('/') message = 'invalid username or password' return response.html(LOGIN_FORM.format(message))
def force_https(request): host = request.headers.get('Host', '') # get scheme, first by checking the x-forwarded-proto (from nginx/heroku etc) # then falling back on whether or not there is an sslcontext scheme = request.headers.get( 'x-forwarded-proto', "https" if request.transport.get_extra_info('sslcontext') else "http") if not host.startswith("localhost:") and scheme != "https": url = urlunparse(( "https", host, request.path, None, request.query_string, None)) return redirect(url)
def index(request): form = UploadForm(request) if form.validate_on_submit(): image = form.image.data # NOTE: trusting user submitted file names here, the name should be # sanitized in production. uploaded_file = Path(request.app.config.UPLOAD_DIR) / image.name uploaded_file.write_bytes(image.body) description = form.description.data or 'no description' session.setdefault('files', []).append((image.name, description)) return response.redirect('/') img = '<section><img src="/img/{}"><p>{}</p><hr></section>' images = ''.join(img.format(i, d) for i, d in session.get('files', [])) content = f""" <h1>Sanic-WTF file field validators example</h1> {images} <form action="" method="POST" enctype="multipart/form-data"> {'<br>'.join(form.csrf_token.errors)} {form.csrf_token} {'<br>'.join(form.image.errors)} {'<br>'.join(form.description.errors)} <br> {form.image.label} <br> {form.image} <br> {form.description.label} <br> {form.description(size=20, placeholder="description")} <br> {form.submit} </form> """ return response.html(content)
def logout(request): auth.logout_user(request) return response.redirect('/login')
def test_logout(app): auth = Auth(app) @app.post('/login') async def login(request): name = request.form.get('name') password = request.form.get('password') if name == 'demo' and password == '1234': auth.login_user(request, User(id=1, name=name)) return response.text('okay') return response.text('failed') @app.route('/logout') async def logout(request): auth.logout_user(request) return response.redirect('/user') @app.route('/user') async def user(request): user = auth.current_user(request) if user is not None: return response.text(user.name) return response.text('') payload = {'name': 'demo', 'password': '1234'} req, resp = app.test_client.post('/login', data=payload) assert resp.status == 200 and resp.text == 'okay' req, resp = app.test_client.get('/user') assert resp.status == 200 and resp.text == 'demo' req, reps = app.test_client.get('/logout') assert resp.status == 200 and resp.text == 'demo' req, resp = app.test_client.get('/user') assert resp.status == 200 and resp.text == '' req, reps = app.test_client.get('/logout') assert resp.status == 200 and resp.text == '' req, resp = app.test_client.get('/user') assert resp.status == 200 and resp.text == ''
def test_login_required(app): # the default is 'auth.login', change to 'login' to avoid using blueprint app.config.AUTH_LOGIN_ENDPOINT = 'login' auth = Auth(app) @app.post('/login') async def login(request): name = request.form.get('name') password = request.form.get('password') if name == 'demo' and password == '1234': auth.login_user(request, User(id=1, name=name)) return response.text('okay') return response.text('failed') @app.route('/logout') @auth.login_required async def logout(request): auth.logout_user(request) return response.redirect('/user') @app.route('/user') @auth.login_required(user_keyword='user') async def user(request, user): return response.text(user.name) payload = {'name': 'demo', 'password': '1234'} req, resp = app.test_client.get('/user', allow_redirects=False) assert resp.status == 302 assert resp.headers['Location'] == app.url_for('login') payload = {'name': 'demo', 'password': '1234'} req, resp = app.test_client.post('/login', data=payload) assert resp.status == 200 and resp.text == 'okay' req, resp = app.test_client.get('/user') assert resp.status == 200 and resp.text == 'demo'
def handle_no_auth(self, request): """Handle unauthorized user""" u = self.login_url or request.app.url_for(self.login_endpoint) return response.redirect(u)
def update_app_categories_handler(request, conf, current_user): toshi_id = request.form.get('toshi_id') categories = request.form.get('categories', "") if toshi_id is not None: tags = [s.strip() for s in categories.split(",")] async with conf.db.id.acquire() as con: categories = await con.fetch("SELECT * FROM categories WHERE tag = ANY($1)", tags) categories = [row['category_id'] for row in categories] async with con.transaction(): await con.execute("DELETE FROM app_categories WHERE toshi_id = $1", toshi_id) await con.executemany( "INSERT INTO app_categories VALUES ($1, $2) ON CONFLICT DO NOTHING", [(category_id, toshi_id) for category_id in categories]) if 'Referer' in request.headers: return redirect(request.headers['Referer']) return redirect("/{}/user/{}".format(conf.name, toshi_id)) return redirect("/{}/apps".format(conf.name))
def clientgetter(self, f): """Register a function as the client getter. The function accepts one parameter `client_key`, and it returns a client object with at least these information: - client_key: A random string - client_secret: A random string - redirect_uris: A list of redirect uris - default_realms: Default scopes of the client The client may contain more information, which is suggested: - default_redirect_uri: One of the redirect uris Implement the client getter:: @oauth.clientgetter def get_client(client_key): client = get_client_model(client_key) # Client is an object return client """ self._clientgetter = f return f
def confirm_authorization_request(self): """When consumer confirm the authrozation.""" server = self.server uri, http_method, body, headers = extract_params() try: realms, credentials = server.get_realms_and_credentials( uri, http_method=http_method, body=body, headers=headers ) ret = server.create_authorization_response( uri, http_method, body, headers, realms, credentials ) log.debug('Authorization successful.') return create_response(*ret) except errors.OAuth1Error as e: return redirect(e.in_uri(self.error_uri)) except errors.InvalidClientError as e: return redirect(e.in_uri(self.error_uri))
def new(request): if request.method == 'POST': name = request.form.get('name', '').strip().lower() age = request.form.get('age', '').strip() if name: is_uniq = await User.is_unique(doc=dict(name=name)) if is_uniq in (True, None): await User.insert_one(dict(name=name, age=int(age))) request['flash']('User was added successfully', 'success') return redirect(app.url_for('index')) else: request['flash']('This name was already taken', 'error') request['flash']('User name is required', 'error') return jinja.render('form.html', request, user={})
def chapter(request): """ ????????? : content_url ?????U??url????? : url ??????url : novels_name ???? :return: ??????? """ url = request.args.get('url', None) novels_name = request.args.get('novels_name', None) netloc = get_netloc(url) if netloc not in RULES.keys(): return redirect(url) if netloc in REPLACE_RULES.keys(): url = url.replace(REPLACE_RULES[netloc]['old'], REPLACE_RULES[netloc]['new']) content_url = RULES[netloc].content_url content = await cache_owllook_novels_chapter(url=url, netloc=netloc) if content: content = str(content).strip('[],, Jjs').replace(', ', '').replace('onerror', '').replace('js', '').replace( '????', '') return template( 'chapter.html', novels_name=novels_name, url=url, content_url=content_url, soup=content) else: return text('?????????????????????????????????{url}'.format(url=url))
def owllook_register(request): """ ???? :param request: :return: : -1 ?????????? : 0 ???????? : 1 ???? """ user = request['session'].get('user', None) if user: return redirect('/') else: ver_que_ans = ver_question() if ver_que_ans: request['session']['index'] = ver_que_ans return template( 'register.html', title='owllook - ?? - ????????', question=ver_que_ans[1] ) else: return redirect('/')
def noti_book(request): user = request['session'].get('user', None) if user: try: motor_db = motor_base.get_db() is_author = 0 data = await motor_db.user_message.find_one({'user': user}, {'author_latest': 1, '_id': 0}) if data: is_author = 1 author_list = data.get('author_latest', {}) return template('noti_book.html', title='???? - owllook'.format(user=user), is_login=1, is_author=is_author, author_list=author_list, user=user) else: return template('noti_book.html', title='???? - owllook'.format(user=user), is_login=1, is_author=is_author, user=user) except Exception as e: LOGGER.error(e) return redirect('/') else: return redirect('/')
def admin_setting(request): user = request['session'].get('user', None) if user: try: motor_db = motor_base.get_db() data = await motor_db.user.find_one({'user': user}) if data: return template('admin_setting.html', title='{user}??? - owllook'.format(user=user), is_login=1, user=user, register_time=data['register_time'], email=data.get('email', '???????')) else: return text('????') except Exception as e: LOGGER.error(e) return redirect('/') else: return redirect('/')
def login_required(f): @wraps(f) async def decorated_function(*args, **kwargs): for arg in args: if isinstance(arg, Request): uid = arg.get('session', {}).get('uid') if uid is None: return response.redirect(arg.app.url_for('auth.LoginView', next=arg.url)) try: user = await User.objects.get(User.id == uid) arg['uid'] = uid arg['user'] = user except User.DoesNotExist: return response.redirect(arg.app.url_for('auth.LoginView', next=arg.url)) break return await f(*args, **kwargs) return decorated_function
def admin_required(f): @wraps(f) async def decorated_function(*args, **kwargs): for arg in args: if isinstance(arg, Request): uid = arg.get('session', {}).get('uid') if uid is None: return response.redirect(arg.app.url_for('auth.LoginView', next=arg.url)) try: user = await User.objects.get(User.id == uid, User.is_admin == 1) arg['uid'] = uid arg['user'] = user except User.DoesNotExist: return response.redirect(arg.app.url_for('auth.LoginView', next=arg.url)) break return await f(*args, **kwargs) return decorated_function
def inc(request): request['session']['counter'] += 1 return response.redirect('/')
def dec(request): request['session']['counter'] -= 1 return response.redirect('/')
def requires_login(fn): async def check_login(request, *args, **kwargs): session_cookie = request.cookies.get('session') if session_cookie: async with app.pool.acquire() as con: admin = await con.fetchrow("SELECT admins.toshi_id FROM admins " "JOIN sessions ON admins.toshi_id = sessions.toshi_id " "WHERE sessions.session_id = $1", session_cookie) if admin: url = '{}/v1/user/{}'.format(ID_SERVICE_LOGIN_URL, admin['toshi_id']) resp = await app.http.get(url) if resp.status == 200: admin = await resp.json() if admin['custom']['avatar'].startswith('/'): admin['custom']['avatar'] = "{}{}".format(ID_SERVICE_LOGIN_URL, admin['custom']['avatar']) else: admin = None else: admin = None if not admin: return redirect("/login?redirect={}".format(request.path)) # keep the config object as the first argument if len(args) and isinstance(args[0], Config): args = (args[0], admin, *args[1:]) else: args = (admin, *args) rval = await fn(request, *args, **kwargs) return rval return check_login
def index(request, user): return redirect("/mainnet")
def post_logout(request): session_cookie = request.cookies.get('session') if session_cookie: async with app.pool.acquire() as con: await con.execute("DELETE FROM sessions " "WHERE sessions.session_id = $1", session_cookie) del request.cookies['session'] return redirect("/login")
def post_admin_add_remove(request, current_user, action): if 'toshi_id' in request.form: toshi_id = request.form.get('toshi_id') if not toshi_id: SanicException("Bad Arguments", status_code=400) elif 'username' in request.form: username = request.form.get('username') if not username: raise SanicException("Bad Arguments", status_code=400) if username[0] == '@': username = username[1:] if not username: raise SanicException("Bad Arguments", status_code=400) async with app.configs['mainnet'].db.id.acquire() as con: user = await con.fetchrow("SELECT * FROM users WHERE username = $1", username) if user is None and username.startswith("0x"): user = await con.fetchrow("SELECT * FROM users WHERE toshi_id = $1", username) if user is None: raise SanicException("User not found", status_code=400) toshi_id = user['toshi_id'] else: SanicException("Bad Arguments", status_code=400) if action == 'add': print('adding admin: {}'.format(toshi_id)) async with app.pool.acquire() as con: await con.execute("INSERT INTO admins VALUES ($1) ON CONFLICT DO NOTHING", toshi_id) elif action == 'remove': print('removing admin: {}'.format(toshi_id)) async with app.pool.acquire() as con: await con.execute("DELETE FROM admins WHERE toshi_id = $1", toshi_id) await con.execute("DELETE FROM sessions WHERE toshi_id = $1", toshi_id) else: raise SanicException("Not Found", status_code=404) if 'Referer' in request.headers: return redirect(request.headers['Referer']) return redirect("/config")
def delete_dapp(request, conf, current_user, dapp_id): async with conf.db.id.acquire() as con: await con.execute("DELETE FROM dapps WHERE dapp_id = $1", int(dapp_id)) if 'Referer' in request.headers: return redirect(request.headers['Referer']) return redirect("/{}/dapps".format(conf.name))
def feature_app_handler_post(request, conf, current_user): print(request.form) toshi_id = request.form.get('toshi_id') featured = request.form.get('featured', False) if toshi_id is not None: async with conf.db.id.acquire() as con: await con.execute("UPDATE users SET featured = $2 WHERE toshi_id = $1", toshi_id, True if featured else False) if 'Referer' in request.headers: return redirect(request.headers['Referer']) return redirect("/{}/user/{}".format(conf.name, toshi_id)) return redirect("/{}/apps".format(conf.name))
def blocked_app_handler_post(request, conf, current_user): toshi_id = request.form.get('toshi_id') blocked = request.form.get('blocked', False) if toshi_id is not None: async with conf.db.id.acquire() as con: async with con.transaction(): await con.execute("UPDATE users SET blocked = $2 WHERE toshi_id = $1", toshi_id, True if blocked else False) if 'Referer' in request.headers: return redirect(request.headers['Referer']) return redirect("/{}/user/{}".format(conf.name, toshi_id)) return redirect("/{}/apps".format(conf.name))
def login_redirect(request): """Convenience method to return a redirect with "next" populated. :param request: Sanic request. :return: Redirect response. """ return redirect('{0}?next={1}'.format(settings.AUTH_CONFIG['login_url'], request.path))
def error_uri(self): """The error page URI. When something turns error, it will redirect to this error page. You can configure the error page URI with Flask config:: OAUTH1_PROVIDER_ERROR_URI = '/error' You can also define the error page by a named endpoint:: OAUTH1_PROVIDER_ERROR_ENDPOINT = 'oauth.error' """ error_uri = self.app.config.get('OAUTH1_PROVIDER_ERROR_URI') if error_uri: return error_uri error_endpoint = self.app.config.get('OAUTH1_PROVIDER_ERROR_ENDPOINT') if error_endpoint: return url_for(error_endpoint) return '/oauth/errors'
def authorize_handler(self, f): """Authorization handler decorator. This decorator will sort the parameters and headers out, and pre validate everything:: @app.route('/oauth/authorize', methods=['GET', 'POST']) @oauth.authorize_handler def authorize(*args, **kwargs): if request.method == 'GET': # render a page for user to confirm the authorization return render_template('oauthorize.html') confirm = request.form.get('confirm', 'no') return confirm == 'yes' """ @wraps(f) def decorated(*args, **kwargs): if request.method == 'POST': if not f(*args, **kwargs): uri = add_params_to_uri( self.error_uri, [('error', 'denied')] ) return redirect(uri) return self.confirm_authorization_request() server = self.server uri, http_method, body, headers = extract_params() try: realms, credentials = server.get_realms_and_credentials( uri, http_method=http_method, body=body, headers=headers ) kwargs['realms'] = realms kwargs.update(credentials) return f(*args, **kwargs) except errors.OAuth1Error as e: return redirect(e.in_uri(self.error_uri)) except errors.InvalidClientError as e: return redirect(e.in_uri(self.error_uri)) return decorated
def get_redirect_uri(self, token, request): """Redirect uri for this request token.""" log.debug('Get redirect uri of %r', token) tok = request.request_token or self._grantgetter(token=token) return tok.redirect_uri
def edit(request, id): user = await User.find_one(id) if not user: request['flash']('User not found', 'error') return redirect(app.url_for('index')) if request.method == 'POST': name = request.form.get('name', '').strip().lower() age = request.form.get('age', '').strip() if name: doc = dict(name=name, age=int(age)) is_uniq = await User.is_unique(doc=doc, id=user.id) if is_uniq in (True, None): # remove non-changed items user.clean_for_dirty(doc) if doc: await User.update_one({'_id': user.id}, {'$set': doc}) request['flash']('User was updated successfully', 'success') return redirect(app.url_for('index')) else: request['flash']('This name was already taken', 'error') request['flash']('User name is required', 'error') return jinja.render('form.html', request, user=user)
def destroy(request, id): user = await User.find_one(id) if not user: request['flash']('User not found', 'error') return redirect(app.url_for('index')) await user.destroy() request['flash']('User was deleted successfully', 'success') return redirect(app.url_for('index'))
def similar_user(request): user = request['session'].get('user', None) if user: try: motor_db = motor_base.get_db() similar_info = await motor_db.user_recommend.find_one({'user': user}) if similar_info: similar_user = similar_info['similar_user'][:20] user_tag = similar_info['user_tag'] updated_at = similar_info['updated_at'] return template('similar_user.html', title='?' + user + '?????', is_login=1, is_similar=1, user=user, similar_user=similar_user, user_tag=user_tag, updated_at=updated_at) else: return template('similar_user.html', title='?' + user + '?????', is_login=1, is_similar=0, user=user) except Exception as e: LOGGER.error(e) return redirect('/') else: return redirect('/')
def book_list(request): user = request['session'].get('user', None) if user: try: return template('admin_book_list.html', title='{user}??? - owllook'.format(user=user), is_login=1, user=user) except Exception as e: LOGGER.error(e) return redirect('/') else: return redirect('/')
def add_session_to_request(request): # before each request initialize a session # using the client's request host = request.headers.get('host', None) user_agent = request.headers.get('user-agent', None) if user_agent: if CONFIG.VAL_HOST == 'true': if not host or host not in CONFIG.HOST: return redirect('http://www.owllook.net') if CONFIG.WEBSITE['IS_RUNNING']: await app.session_interface.open(request) else: return html("<h3>??????...</h3>") else: return html("<h3>??????...</h3>")
def handle_request(request): return response.redirect('/redirect')
def index(request): # generate a URL for the endpoint `post_handler` url = app.url_for('post_handler', post_id=5) # the URL is `/posts/5`, redirect to it return response.redirect(url)
def redirect_app(): app = Sanic('test_redirection') @app.route('/redirect_init') async def redirect_init(request): return redirect("/redirect_target") @app.route('/redirect_init_with_301') async def redirect_init_with_301(request): return redirect("/redirect_target", status=301) @app.route('/redirect_target') async def redirect_target(request): return text('OK') @app.route('/1') def handler(request): return redirect('/2') @app.route('/2') def handler(request): return redirect('/3') @app.route('/3') def handler(request): return text('OK') return app
def challenge(req, token): if await challenges.is_frozen(req.ip): logger.debug(f"Challenge is {token} but the account is frozen.") raise InvalidUsage("Account frozen") if not await challenges.solve(token): logger.debug(f"Challenge {token} is invalid.") unfreeze = await challenges.freeze(req.ip) raise InvalidUsage("Invalid token. Account frozen until " + unfreeze.isoformat(sep=" ", timespec="seconds")) await challenges.unfreeze(req.ip) logger.info(f"Challenge {token} validated") return redirect(req.app.url_for("index"))
def index(request): user = request['user'] if user: return response.redirect(app.url_for('user_panel.index')) else: return response.redirect(app.url_for('auth.LoginView'))
def logout(request): if 'uid' in request['session']: request['session'].pop('uid') return redirect(app.url_for('home.index'))
def index(request: Request): return redirect('/index.html') # ???? # @app.middleware('response') # async def access_control_all_origin(request: Request, response): # response.headers['Access-Control-Allow-Origin'] = '*'
def index(request: Request): return redirect('https://c-w.github.io/gutenberg-http/')
def migrate_users(request, current_user): from_env = request.form.get('from', None) to_env = request.form.get('to', None) toshi_ids = request.form.get('toshi_ids', None) apps_flag = request.form.get('apps', None) users_flag = request.form.get('users', None) limit = 1000 offset = 1000 * 1 if toshi_ids: toshi_ids = set(re.findall("0x[a-fA-f0-9]{40}", toshi_ids)) else: toshi_ids = set() print("MIGRATING USERS FROM '{}' TO '{}'".format(from_env, to_env)) async with app.configs[from_env].db.id.acquire() as con: if apps_flag == 'on': users = await con.fetch("SELECT * FROM users WHERE is_app = TRUE") print("APPS", len(users)) user_rows = list(users) else: user_rows = [] if users_flag == 'on': users = await con.fetch("SELECT * FROM users WHERE is_app = FALSE OFFSET $1 LIMIT $2", offset, limit) print("USERS", len(users)) user_rows.extend(list(users)) if len(toshi_ids) > 0: users = await con.fetch("SELECT * FROM users WHERE toshi_id = ANY($1)", toshi_ids) user_rows.extend(list(users)) for row in user_rows: toshi_ids.add(row['toshi_id']) avatar_rows = await con.fetch("SELECT * FROM avatars WHERE toshi_id = ANY($1)", toshi_ids) users = [] avatars = [] for row in user_rows: users.append((row['toshi_id'], row['payment_address'], row['created'], row['updated'], row['username'], row['name'], row['avatar'], row['about'], row['location'], row['is_public'], row['went_public'], row['is_app'], row['featured'])) for row in avatar_rows: avatars.append((row['toshi_id'], row['img'], row['hash'], row['format'], row['last_modified'])) print("INSERTING {} USERS".format(len(toshi_ids))) async with app.configs[to_env].db.id.acquire() as con: rval = await con.executemany( "INSERT INTO users (" "toshi_id, payment_address, created, updated, username, name, avatar, about, location, is_public, went_public, is_app, featured" ") VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) " "ON CONFLICT DO NOTHING", users) print("MIGRATED USERS: {} (of {})".format(rval, len(toshi_ids))) rval = await con.executemany( "INSERT INTO avatars (" "toshi_id, img, hash, format, last_modified" ") VALUES ($1, $2, $3, $4, $5) " "ON CONFLICT DO NOTHING", avatars) print("MIGRATED AVATARS: {} (of {})".format(rval, len(avatars))) return redirect(request.headers['Referer'] or "/config")
def qidian(request): user = request['session'].get('user', None) novels_type = request.args.get('type', '????').strip() first_type_title = "????" first_type = [ '??', '??', '??', '??', '??', '??', '??', '??', '??', '??', '??', '??', '???', ] if novels_type in first_type: novels_head = [novels_type] elif novels_type == first_type_title: novels_head = ['#'] else: return redirect('qidian') search_ranking = await cache_others_search_ranking(spider='qidian', novel_type=novels_type) title = "owllook - ??????" if user: return template('index.html', title=title, is_login=1, is_qidian=1, user=user, search_ranking=search_ranking, first_type=first_type, first_type_title=first_type_title, novels_head=novels_head) else: return template('index.html', title=title, is_login=0, is_qidian=1, search_ranking=search_ranking, first_type=first_type, first_type_title=first_type_title, novels_head=novels_head)
def books(request): user = request['session'].get('user', None) if user: try: motor_db = motor_base.get_db() data = await motor_db.user_message.find_one({'user': user}) if data: books_url = data.get('books_url', None) if books_url: result = [] for i in books_url: item_result = {} book_url = i.get('book_url', None) last_read_url = i.get("last_read_url", "") book_query = parse_qs(urlparse(book_url).query) last_read_chapter_name = parse_qs( last_read_url).get('name', ['??'])[0] item_result['novels_name'] = book_query.get('novels_name', '')[0] if book_query.get( 'novels_name', '') else '' item_result['book_url'] = book_url latest_data = await motor_db.latest_chapter.find_one({'owllook_chapter_url': book_url}) if latest_data: item_result['latest_chapter_name'] = latest_data['data']['latest_chapter_name'] item_result['owllook_content_url'] = latest_data['data']['owllook_content_url'] else: get_latest_data = await get_the_latest_chapter(book_url) or {} item_result['latest_chapter_name'] = get_latest_data.get( 'latest_chapter_name', '????????') item_result['owllook_content_url'] = get_latest_data.get( 'owllook_content_url', '') item_result['add_time'] = i.get('add_time', '') item_result["last_read_url"] = last_read_url if last_read_url else book_url item_result["last_read_chapter_name"] = last_read_chapter_name result.append(item_result) return template('admin_books.html', title='{user}??? - owllook'.format(user=user), is_login=1, user=user, is_bookmark=1, result=result[::-1]) return template('admin_books.html', title='{user}??? - owllook'.format(user=user), is_login=1, user=user, is_bookmark=0) except Exception as e: LOGGER.error(e) return redirect('/') else: return redirect('/')
def search_user(request): user = request['session'].get('user', None) name = request.args.get('ss', None) if user and name: try: motor_db = motor_base.get_db() data = await motor_db.user_message.find_one({'user': name}) books_url = data.get('books_url', None) if data else None if books_url: result = [] for i in books_url: item_result = {} book_url = i.get('book_url', None) last_read_url = i.get("last_read_url", "") book_query = parse_qs(urlparse(book_url).query) last_read_chapter_name = parse_qs(last_read_url).get('name', ['??'])[0] item_result['novels_name'] = book_query.get('novels_name', '')[0] if book_query.get( 'novels_name', '') else '' item_result['book_url'] = book_url latest_data = await motor_db.latest_chapter.find_one({'owllook_chapter_url': book_url}) if latest_data: item_result['latest_chapter_name'] = latest_data['data']['latest_chapter_name'] item_result['owllook_content_url'] = latest_data['data']['owllook_content_url'] else: get_latest_data = await get_the_latest_chapter(book_url) or {} item_result['latest_chapter_name'] = get_latest_data.get('latest_chapter_name', '????????') item_result['owllook_content_url'] = get_latest_data.get('owllook_content_url', '') item_result['add_time'] = i.get('add_time', '') item_result["last_read_url"] = last_read_url if last_read_url else book_url item_result["last_read_chapter_name"] = last_read_chapter_name result.append(item_result) return template('search_user.html', title='{name}??? - owllook'.format(name=name), is_login=1, user=user, username=name, is_bookmark=1, result=result[::-1]) else: return template('search_user.html', title='{name}??? - owllook'.format(name=name), is_login=1, user=user, is_bookmark=0) except Exception as e: LOGGER.error(e) return redirect('/') else: return redirect('/')