Python models.User 模块,get() 实例源码

我们从Python开源项目中,提取了以下39个代码示例,用于说明如何使用models.User.get()

项目:ecommerce_api    作者:nbschool    | 项目源码 | 文件源码
def load_user_from_request(request):
    """
    Authenticate a user from the authorization data carried by a request.

    The ``username`` entry of the authorization data is matched against
    ``User.email`` and the ``password`` entry is checked with the user's
    ``verify_password`` method.

    Args:
        request (Request): The flask request object used in endpoint handlers

    Returns:
        models.User: The authenticated user, or None when the request has no
        authorization data, no user has that email, or the password is wrong
    """
    credentials = request.authorization
    if not credentials:
        return None

    try:
        candidate = User.get(User.email == credentials['username'])
    except User.DoesNotExist:
        return None

    password_ok = candidate.verify_password(credentials['password'])
    return candidate if password_ok else None
项目:ecommerce_api    作者:nbschool    | 项目源码 | 文件源码
def post(self):
    """
    Log a user in from a JSON body holding ``email`` and ``password``.

    Calls ``abort(client.BAD_REQUEST)`` when the body is not a JSON object,
    when either field is missing, or when no user has the given email, and
    ``abort(client.UNAUTHORIZED)`` when the password does not verify.
    On success the user is passed to ``login_user`` and an empty OK
    response is returned.

    NOTE(review): the code relies on ``abort`` not returning (presumably
    flask's ``abort``, which raises) — confirm.
    """
    request_data = request.get_json(force=True)

    # Valid JSON is not necessarily an object: `null`, a list or a bare
    # string would make the membership tests or the key lookups below raise
    # an unhandled TypeError instead of a clean BAD_REQUEST.
    if not isinstance(request_data, dict):
        abort(client.BAD_REQUEST)

    if 'email' not in request_data or 'password' not in request_data:
        abort(client.BAD_REQUEST)

    email = request_data['email']
    password = request_data['password']

    try:
        user = User.get(User.email == email)
    except User.DoesNotExist:
        abort(client.BAD_REQUEST)

    if not user.verify_password(password):
        abort(client.UNAUTHORIZED)

    login_user(user)
    return generate_response({}, client.OK)
项目:ecommerce_api    作者:nbschool    | 项目源码 | 文件源码
def delete(self, order_uuid):
    """ Delete a specific order.

    Returns ``(None, NOT_FOUND)`` when no order has the given uuid, an
    error message with ``UNAUTHORIZED`` when the current user neither owns
    the order nor is an admin, and ``(None, NO_CONTENT)`` after deletion.
    """
    try:
        order = Order.get(uuid=str(order_uuid))
    except Order.DoesNotExist:
        return None, NOT_FOUND

    # The authenticated user is compared with the order's owner so that
    # users cannot delete orders that belong to somebody else; users whose
    # `admin` flag is not False are exempt from the ownership check.
    requester = auth.current_user
    if requester != order.user and requester.admin is False:
        denied = {'message': "You can't delete another user's order"}
        return (denied, UNAUTHORIZED)

    order.delete_instance(recursive=True)
    return None, NO_CONTENT
项目:python-webapp-blog    作者:tzshlyt    | 项目源码 | 文件源码
def parse_signed_cookie(cookie_str):
    """
    Validate a signed cookie and return the user it identifies.

    The cookie must have the form ``<user id>-<expires>-<md5>`` where the
    digest is the MD5 of ``"<id>-<user password>-<expires>-<_COOKIE_KEY>"``.

    Args:
        cookie_str: raw cookie value.

    Returns:
        The matching user, or None when the cookie is malformed, expired,
        refers to an unknown user, or carries a wrong digest.
    """
    try:
        parts = cookie_str.split('-')
        if len(parts) != 3:
            return None
        # Named `user_id` rather than `id` so the builtin is not shadowed.
        user_id, expires, md5 = parts
        if int(expires) < time.time():
            return None
        user = User.get(user_id)
        if user is None:
            return None
        signed = '%s-%s-%s-%s' % (user_id, user.password, expires, _COOKIE_KEY)
        # hashlib only accepts bytes on Python 3; passing the str raised a
        # TypeError that the handler below swallowed, rejecting every cookie.
        if md5 != hashlib.md5(signed.encode('utf-8')).hexdigest():
            return None
        return user
    except Exception:
        # The cookie is untrusted input: any parsing/lookup failure means
        # "not logged in". Unlike a bare `except:`, this still lets
        # KeyboardInterrupt and SystemExit propagate.
        return None
项目:python-webapp-blog    作者:tzshlyt    | 项目源码 | 文件源码
def api_update_blog(blog_id):
    """
    Update the name, summary and content of an existing blog.

    Runs ``check_admin()`` first, then reads the three fields from the
    request input and strips them. Raises ``APIValueError`` for the first
    empty field (checked in the order name, summary, content) and
    ``APIResourceNotFoundError`` when ``Blog.get`` returns None.

    Returns:
        The updated blog object.
    """
    check_admin()
    form = ctx.request.input(name='', summary='', content='')
    # Every field is stripped up front, then validated in a fixed order.
    cleaned = (
        ('name', form.name.strip(), 'name cannot be empty.'),
        ('summary', form.summary.strip(), 'summary cannot be empty.'),
        ('content', form.content.strip(), 'content cannot be empty.'),
    )
    for field, value, message in cleaned:
        if not value:
            raise APIValueError(field, message)

    blog = Blog.get(blog_id)
    if blog is None:
        raise APIResourceNotFoundError('Blog')

    for field, value, _ in cleaned:
        setattr(blog, field, value)
    blog.update()
    return blog
项目:learn-flask    作者:ProfNandaa    | 项目源码 | 文件源码
def login():
    """
    Render the login form, or validate the credentials of a POSTed form.

    On POST the ``username`` and ``password`` form fields are passed to
    ``User.validate``, whose result is indexed as ``(success, user)``.
    A successful login stores the username and id in ``session['user']``
    and redirects to the submitted ``next`` target, or to the ``home``
    endpoint when no target was given. In every other case the login
    template is rendered.
    """
    next_ = request.values.get('next')
    if request.method == 'POST':
        form = {
            'username': request.form['username'],
            'password': request.form['password'],
        }
        user = User.validate(**form)
        if user[0]:
            # Record the login for every successful validation. The session
            # used to be populated only when a `next` target was supplied,
            # so a plain login redirected home without logging the user in.
            session['user'] = {
                'username': user[1].username,
                'id': user[1].id,
            }
            # A missing `next` field is treated like an empty one instead of
            # raising on the form lookup.
            # NOTE(review): `next` is client-supplied and redirected to
            # as-is (open redirect) — consider restricting it to local URLs.
            target = request.form.get('next', '')
            if target != '':
                return redirect(target)
            return redirect(url_for('home'))
    return render_template('login.html', next=next_)
项目:dark-chess    作者:AHAPX    | 项目源码 | 文件源码
def test_move(self):
    """Adding moves updates the game state, the side to move and the timing."""
    # a freshly created game: white to move, no moves stored, not ended
    created = Game.create(white='123', black='456', state='Ke1,ke8')
    creation_gap = (created.date_created - created.date_state).total_seconds()
    self.assertLess(abs(creation_gap), 1)
    self.assertEqual(created.next_color, WHITE)
    self.assertEqual(Game.select().count(), 1)
    self.assertEqual(Move.select().count(), 0)
    self.assertFalse(created.ended)

    # let one second pass so the first move has a measurable duration
    time.sleep(1)
    created.add_move('K', 'e1-e2', 'Ke2,ke8')
    self.assertEqual(Move.select().count(), 1)

    # read the game back and check what the move changed
    reloaded = Game.get(pk=created.pk)
    self.assertEqual(reloaded.next_color, BLACK)
    self.assertEqual(reloaded.state, 'Ke2,ke8')
    elapsed = (reloaded.date_state - reloaded.date_created).total_seconds()
    self.assertGreater(elapsed, 1)
    self.assertAlmostEqual(reloaded.moves.get().time_move, 1, places=1)
    self.assertFalse(reloaded.ended)

    # a move flagged with True as last argument ends the game
    reloaded.add_move('k', 'e8-e7', 'Ke2,ke7', True)
    self.assertTrue(reloaded.ended)
    self.assertEqual(reloaded.winner, BLACK)
项目:dark-chess    作者:AHAPX    | 项目源码 | 文件源码
def test_game_over_1(self):
    """game_over() with default arguments ends the game and the result survives a reload."""
    # a new game has no end information
    created = Game.create(white='123', black='456', state='Ke1,ke8')
    self.assertFalse(created.ended)
    for unset in (created.end_reason, created.date_end):
        self.assertIsNone(unset)

    # end the game with only the reason given
    created.game_over(END_CHECKMATE)
    self.assertTrue(created.ended)
    self.assertEqual(created.end_reason, 1)
    self.assertIsNotNone(created.date_end)
    self.assertIsNone(created.winner)

    # the same end information is read back from the database
    reloaded = Game.get(pk=created.pk)
    self.assertTrue(reloaded.ended)
    self.assertEqual(reloaded.end_reason, 1)
    self.assertIsNotNone(reloaded.date_end)
项目:dark-chess    作者:AHAPX    | 项目源码 | 文件源码
def test_game_over_2(self):
    """game_over() called with False as third argument changes the instance but not the stored row."""
    # a new game has no end information
    created = Game.create(white='123', black='456', state='Ke1,ke8')
    self.assertFalse(created.ended)
    for unset in (created.end_reason, created.date_end):
        self.assertIsNone(unset)

    # end the game in memory only (third argument False: "without saving")
    ended_at = datetime(2015, 1, 27, 12, 0, 0)
    created.game_over(END_CHECKMATE, ended_at, False, WHITE)
    self.assertTrue(created.ended)
    self.assertEqual(created.end_reason, 1)
    self.assertEqual(created.date_end, ended_at)
    self.assertEqual(created.winner, WHITE)

    # the reloaded row is still an unfinished game
    reloaded = Game.get(pk=created.pk)
    self.assertFalse(reloaded.ended)
    for unset in (reloaded.end_reason, reloaded.date_end, reloaded.winner):
        self.assertIsNone(unset)
项目:dark-chess    作者:AHAPX    | 项目源码 | 文件源码
def accept(game_id):
    """
    Accept a waiting game pool as the second player and start the game.

    Sends the error 'Game not found' when no pool has the given pk,
    'Wrong format' when the lookup fails for any other reason, and
    'You cannot start game with yourself' when the pool's first user is the
    requesting user. Otherwise, inside ``config.DB.atomic()``, the pool is
    marked as started, a game is created and the waiting cache entry of the
    first player is deleted.

    Returns:
        The ``send_data`` response carrying the second player's token under
        'game', merged with the game info requested for ``consts.BLACK``.
    """
    try:
        pool = GamePool.get(GamePool.pk == game_id)
    except GamePool.DoesNotExist:
        return send_error('Game not found')
    except Exception:
        return send_error('Wrong format')

    creator = pool.user1
    if creator and creator == request.user:
        return send_error('You cannot start game with yourself')

    with config.DB.atomic():
        # register the accepting side on the pool
        pool.player2 = generate_token(True)
        pool.user2 = request.user
        pool.is_started = True
        pool.save()

        game = Game.new_game(
            pool.player1,
            pool.player2,
            pool.type_game,
            pool.time_limit,
            white_user=pool.user1,
            black_user=pool.user2,
        )
        delete_cache('wait_{}'.format(pool.player1))

        result = {'game': pool.player2}
        result.update(game.get_info(consts.BLACK))
    return send_data(result)
项目:dark-chess    作者:AHAPX    | 项目源码 | 文件源码
def invited(token):
    """
    Start a game from an invitation token.

    The cache entry ``invite_<token>`` is unpacked into the inviting
    player's token, the game type and the game limit. The inviting user is
    looked up through the ``user_<enemy token>`` cache entry; the requesting
    user plays black.

    Returns:
        The ``send_error`` response 'game not found' when the invitation
        cannot be read from the cache, otherwise the ``send_data`` response
        with the new player token under 'game' merged with the game info
        requested for ``consts.BLACK``.
    """
    try:
        enemy_token, game_type, game_limit = get_cache('invite_{}'.format(token))
    except Exception:
        # A missing or malformed cache entry fails to unpack. Unlike the
        # previous bare `except:`, KeyboardInterrupt and SystemExit are no
        # longer swallowed here.
        return send_error('game not found')
    enemy_user = None
    user_id = get_cache('user_{}'.format(enemy_token))
    if user_id:
        try:
            enemy_user = User.get(pk=user_id)
        except User.DoesNotExist:
            # TODO: if user not found game will be created with None as white player
            pass
    user_token = generate_token(True)
    game = Game.new_game(
        enemy_token, user_token, game_type, game_limit,
        white_user=enemy_user, black_user=request.user
    )
    delete_cache('wait_{}'.format(enemy_token))
    result = {'game': user_token}
    result.update(game.get_info(consts.BLACK))
    return send_data(result)
项目:dark-chess    作者:AHAPX    | 项目源码 | 文件源码
def get(self):
    """
    List up to ten waiting game pools, newest first.

    Only pools that are neither started nor lost and that have a first
    player are considered; pools created by the requesting user are skipped.

    Returns:
        dict: ``{'games': [...]}`` where each entry holds the pool id,
        creation date (ISO format), creator username (or None), game type
        name and time limit.
    """
    open_pools = GamePool.select().where(
        # `== False` is intentional: peewee builds the SQL from the operator.
        GamePool.is_started == False,  # noqa: E712
        GamePool.is_lost == False,  # noqa: E712
        # `GamePool.player1 is not None` is a Python identity test on the
        # field object and is always True, so it filtered nothing;
        # is_null(False) produces the intended `IS NOT NULL` condition.
        GamePool.player1.is_null(False),
    ).order_by(GamePool.date_created.desc())

    result = []
    for pool in open_pools:
        if pool.user1 and pool.user1 == request.user:
            continue
        result.append({
            'id': pool.pk,
            'date_created': pool.date_created.isoformat(),
            'user': pool.user1.username if pool.user1 else None,
            'type': consts.TYPES[pool.type_game]['name'],
            'limit': pool.time_limit,
        })
        # stop once ten games have been collected
        if len(result) >= 10:
            break
    return {'games': result}
项目:dark-chess    作者:AHAPX    | 项目源码 | 文件源码
def post(self, game_id):
    """
    Accept a waiting game pool as the second player and start the game.

    Raises ``errors.APINotFound('game')`` when no pool has the given pk and
    ``errors.APIException`` when the lookup fails for another reason or
    when the pool's first user is the requesting user.

    Returns:
        dict: the second player's token under 'game', merged with the game
        info requested for ``consts.BLACK``.
    """
    try:
        pool = GamePool.get(GamePool.pk == game_id)
    except GamePool.DoesNotExist:
        raise errors.APINotFound('game')
    except Exception:
        raise errors.APIException('wrong format')

    creator = pool.user1
    if creator and creator == request.user:
        raise errors.APIException('you cannot start game with yourself')

    # register the accepting side on the pool
    pool.player2 = generate_token(True)
    pool.user2 = request.user
    pool.is_started = True
    pool.save()

    game = Game.new_game(
        pool.player1,
        pool.player2,
        pool.type_game,
        pool.time_limit,
        white_user=pool.user1,
        black_user=pool.user2,
    )
    delete_cache('wait_{}'.format(pool.player1))

    result = {'game': pool.player2}
    result.update(game.get_info(consts.BLACK))
    return result
项目:mltshp    作者:MLTSHP    | 项目源码 | 文件源码
def test_updating_account_profile_settings(self):
    """
    Posting to the profile settings page updates full name, about and website.
    """
    updated_profile = (
        ('full_name', 'Some User'),
        ('about', 'About text.'),
        ('website', 'https://mltshp.com/'),
    )
    # all three fields start out empty
    for field, _ in updated_profile:
        self.assertEqual('', getattr(self.user, field))

    self.post_url('/account/settings/profile', arguments=dict(updated_profile))

    # the stored user carries the submitted values
    user_reloaded = User.get('id = %s', self.user.id)
    for field, expected in updated_profile:
        self.assertEqual(expected, getattr(user_reloaded, field))
项目:mltshp    作者:MLTSHP    | 项目源码 | 文件源码
def test_updating_user_profile_photo(self):
    """
    Post the variables that nginx passes in via the upload module and
    check that the user ends up with a profile image.
    """
    avatar = os.path.abspath("static/images/test-avatar.png")
    upload_fields = {
        'photo_path': avatar,
        'photo_content_type': "image/png",
        'photo_name': os.path.basename(avatar),
        'photo_size': os.path.getsize(avatar),
    }
    self.post_url('/account/settings/profile', arguments=upload_fields)

    user_reloaded = User.get('id = %s', self.user.id)
    self.assertTrue(user_reloaded.profile_image)
    self.assertIn('/account', user_reloaded.profile_image_url())
项目:mltshp    作者:MLTSHP    | 项目源码 | 文件源码
def test_pagination_returns_correct_counts(self):
    """
    Create 111 shared files for the admin user and check that pages 1 to 12
    of their user page all respond with HTTP 200.
    """
    user = User.get('name="admin"')
    user_shake = user.shake()
    source_file = Sourcefile(width=10, height=10, file_key='mumbles', thumb_key='bumbles')
    source_file.save()

    for _ in range(111):
        shared = Sharedfile(
            source_id=source_file.id,
            user_id=user.id,
            name="shgaredfile.png",
            title='shared file',
            share_key='asdf',
            content_type='image/png',
            deleted=0,
        )
        shared.save()
        shared.add_to_shake(user_shake)

    for page in range(1, 13):
        response = self.fetch('/user/admin/%s' % page)
        self.assertEqual(response.code, 200)
项目:mltshp    作者:MLTSHP    | 项目源码 | 文件源码
def test_reset_password_finish(self):
    """
    Posting matching passwords to the reset-password URL clears the reset
    token and stores the new password digest.
    """
    self.user.create_reset_password_token()
    reset_url = self.get_url("/account/reset-password/%s" % (self.user.reset_password_token))

    # request the reset page first, then submit the new password
    self.http_client.fetch(reset_url, self.stop)
    self.wait()

    new_password = "qwertyqwerty"
    request = HTTPRequest(
        url=reset_url,
        method='POST',
        headers={"Cookie": "_xsrf=%s" % (self.xsrf)},
        body="password=%s&password_again=%s&_xsrf=%s" % (new_password, new_password, self.xsrf),
    )
    self.http_client.fetch(request, self.stop)
    self.wait()

    # NOTE(review): the user is reloaded by the literal id 1 — presumably
    # the id of self.user; confirm against the fixture.
    self.user = User.get("id=%s", 1)
    self.assertEqual(self.user.reset_password_token, "")
    self.assertEqual(self.user.hashed_password, User.generate_password_digest(new_password))
项目:mltshp    作者:MLTSHP    | 项目源码 | 文件源码
def test_reset_password_throws_error_if_passwords_dont_match(self):
    """
    Posting two different passwords shows an error and leaves both the
    password digest and the reset token unchanged.
    """
    self.user.create_reset_password_token()
    reset_token = self.user.reset_password_token

    mismatched_form = "password=%s&password_again=%s&_xsrf=%s" % ("qwertyqwerty", "poiupoiu", self.xsrf)
    request = HTTPRequest(
        url=self.get_url("/account/reset-password/%s" % (reset_token)),
        method='POST',
        headers={"Cookie": "_xsrf=%s" % (self.xsrf)},
        body=mismatched_form,
    )
    self.http_client.fetch(request, self.stop)
    response = self.wait()
    self.assertIn("Those passwords didn't match, or are invalid. Please try again.", response.body)

    # NOTE(review): reloaded by the literal id 1 and compared with the
    # digest of 'asdfasdf' — presumably the fixture user's id and password.
    self.user = User.get("id = %s", 1)
    self.assertEqual(self.user.hashed_password, User.generate_password_digest('asdfasdf'))
    self.assertEqual(reset_token, self.user.reset_password_token)
项目:mltshp    作者:MLTSHP    | 项目源码 | 文件源码
def test_clears_all_saves(self):
    """Clearing 'save' notifications reports zero new saves and marks the receiver's notifications deleted."""
    self.upload_file(self.test_file1_path, self.test_file1_sha1, self.test_file1_content_type, 1, self.receiver_sid, self.xsrf)

    sharedfile = Sharedfile.get('id=1')
    # two save notifications from the sender
    for _ in range(2):
        Notification.new_save(self.sender, sharedfile)

    request = HTTPRequest(
        self.get_url('/account/clear-notification'),
        'POST',
        {'Cookie': '_xsrf=%s;sid=%s' % (self.xsrf, self.receiver_sid)},
        '_xsrf=%s&type=save' % (self.xsrf),
    )
    self.http_client.fetch(request, self.stop)
    response = self.wait()

    payload = json_decode(response.body)
    self.assertEqual(payload['response'], "0 new saves")
    for notification in Notification.where('receiver_id=%s' % (self.receiver.id)):
        self.assertTrue(notification.deleted)
项目:mltshp    作者:MLTSHP    | 项目源码 | 文件源码
def test_clears_all_favorites(self):
    """Clearing 'favorite' notifications reports zero new likes and marks the receiver's notifications deleted."""
    self.upload_file(self.test_file1_path, self.test_file1_sha1, self.test_file1_content_type, 1, self.receiver_sid, self.xsrf)

    sharedfile = Sharedfile.get('id=1')
    # two favorite notifications from the sender
    for _ in range(2):
        Notification.new_favorite(self.sender, sharedfile)

    request = HTTPRequest(
        self.get_url('/account/clear-notification'),
        'POST',
        {'Cookie': '_xsrf=%s;sid=%s' % (self.xsrf, self.receiver_sid)},
        '_xsrf=%s&type=favorite' % (self.xsrf),
    )
    self.http_client.fetch(request, self.stop)
    response = self.wait()

    payload = json_decode(response.body)
    self.assertEqual(payload['response'], "0 new likes")
    for notification in Notification.where('receiver_id=%s' % (self.receiver.id)):
        self.assertTrue(notification.deleted)
项目:mltshp    作者:MLTSHP    | 项目源码 | 文件源码
def post(self):
    """
    Save a video, given by its URL, to one of the current user's shakes.

    Without a ``url`` argument the save-video form is rendered again.
    Otherwise the URL is converted with ``Sourcefile.make_oembed_url``; a
    falsy result renders the error page. The destination is the shake
    named by the ``shake_id`` argument, or the current user's shake of type
    'user' when no id is given, and the URL is then handed to
    ``handle_oembed_url``.
    """
    url = self.get_argument('url', None)
    if not url:
        # Stop after rendering the form: without this return the handler
        # fell through and called .strip() on the missing url.
        return self.render("tools/save-video.html", url=url, title=None, description=None)

    url = Sourcefile.make_oembed_url(url.strip())
    if not url:
        return self.render("tools/save-video-error.html", message="We could not load the embed code. The video server may be down. Please contact support.")

    current_user = self.get_current_user_object()
    shake_id = self.get_argument('shake_id', None)
    if not shake_id:
        self.destination_shake = Shake.get('user_id=%s and type=%s', current_user.id, 'user')
    else:
        self.destination_shake = Shake.get('id=%s', shake_id)
        if not self.destination_shake:
            return self.render("tools/save-video-error.html", message="We couldn't save the video to specified shake. Please contact support.")
        if not self.destination_shake.can_update(current_user.id):
            return self.render("tools/save-video-error.html", message="We couldn't save the video to specified shake. Please contact support.")
        # NOTE(review): the email confirmation check only runs when an
        # explicit shake_id is given — confirm that posting to the user's
        # own shake is meant to skip it.
        if current_user.email_confirmed != 1:
            return self.render("tools/save-video-error.html", message="You must confirm your email address before you can post.")
    self.handle_oembed_url(url)
项目:mltshp    作者:MLTSHP    | 项目源码 | 文件源码
def get(self, share_key):
    """
    Write a JSON-style dict with the rendered quick-comments HTML of a shared file.

    Raises a 404 ``tornado.web.HTTPError`` when no shared file matches
    ``share_key``. The response holds 'result', the comment 'count' and
    the rendered 'html'.
    """
    sharedfile = Sharedfile.get_by_share_key(share_key)
    if not sharedfile:
        raise tornado.web.HTTPError(404)

    # any truthy "expanded" argument becomes True; a falsy one is kept as is
    requested = self.get_argument("expanded", False)
    expanded = True if requested else requested

    # Prevent IE from caching AJAX requests
    no_cache_headers = (
        ("Cache-Control", "no-store, no-cache, must-revalidate"),
        ("Pragma", "no-cache"),
        ("Expires", 0),
    )
    for header, value in no_cache_headers:
        self.set_header(header, value)

    user = self.get_current_user_object()
    can_comment = user and user.email_confirmed == 1 and not options.readonly

    comments = sharedfile.comments()
    html_response = self.render_string(
        "image/quick-comments.html",
        sharedfile=sharedfile,
        comments=comments,
        current_user=user,
        can_comment=can_comment,
        expanded=expanded,
    )
    return self.write({'result' : 'ok', 'count' : len(comments), 'html' : html_response })
项目:mltshp    作者:MLTSHP    | 项目源码 | 文件源码
def post(self, share_key, comment_id):
    """
    Mark the current user's like on a comment as deleted.

    Raises a 404 ``tornado.web.HTTPError`` when the shared file or the
    comment cannot be found. With a truthy ``json`` argument the handler
    writes the JSON-encoded like count of the comment; otherwise it
    redirects to the shared file's page.
    """
    shared_file = models.Sharedfile.get_by_share_key(share_key)
    user = self.get_current_user_object()
    comment = Comment.get("id=%s", comment_id)

    if not (shared_file and comment):
        raise tornado.web.HTTPError(404)

    existing_like = models.CommentLike.get(
        "comment_id = %s and user_id = %s", comment.id, user.id)
    if existing_like:
        existing_like.deleted = 1
        existing_like.save()

    wants_json = self.get_argument("json", False)
    if not wants_json:
        return self.redirect("/p/%s?salty" % (share_key,))

    # keep the JSON answer out of any cache
    no_cache_headers = (
        ("Cache-Control", "no-store, no-cache, must-revalidate"),
        ("Pragma", "no-cache"),
        ("Expires", 0),
    )
    for header, value in no_cache_headers:
        self.set_header(header, value)
    count = models.CommentLike.where_count("comment_id = %s", comment.id)
    return self.write(json_encode({'response':'ok', 'count': count, 'like' : True }))
项目:mltshp    作者:MLTSHP    | 项目源码 | 文件源码
def get(self, shake_name):
    """
    Write the title and description of a non-deleted shake as JSON.

    Both values are sent XHTML-escaped and raw; a falsy value is sent as an
    empty string. Raises a 404 ``tornado.web.HTTPError`` when the shake is
    not found.
    """
    shake = Shake.get("name=%s and deleted=0", shake_name)
    if not shake:
        raise tornado.web.HTTPError(404)

    title = shake.title
    description = shake.description
    value = {
        'title' : escape.xhtml_escape(title) if title else '',
        'title_raw' : title if title else '',
        'description' : escape.xhtml_escape(description) if description else '',
        'description_raw' : description if description else '',
    }

    # prevents IE from caching ajax requests.
    no_cache_headers = (
        ("Cache-Control", "no-store, no-cache, must-revalidate"),
        ("Pragma", "no-cache"),
        ("Expires", 0),
    )
    for header, header_value in no_cache_headers:
        self.set_header(header, header_value)
    return self.write(escape.json_encode(value))
项目:mltshp    作者:MLTSHP    | 项目源码 | 文件源码
def post(self, shake_name):
    """
    Update the title and/or description of a group shake owned by the current user.

    Writes an error dict when no non-deleted group shake with that name
    belongs to the current user. Only truthy ``title`` / ``description``
    arguments are applied; afterwards the handler redirects to the shake's
    quick-details URL.
    """
    current_user = self.get_current_user_object()
    shake_to_update = Shake.get(
        'name=%s and user_id=%s and type=%s and deleted=0',
        shake_name, current_user.id, 'group')
    submitted = (
        ('title', self.get_argument('title', None)),
        ('description', self.get_argument('description', None)),
    )

    if not shake_to_update:
        return self.write({'error':'No permission to update shake.'})

    for attribute, new_value in submitted:
        if new_value:
            setattr(shake_to_update, attribute, new_value)
    shake_to_update.save()

    return self.redirect('/shake/' + shake_to_update.name + '/quick-details')
项目:mltshp    作者:MLTSHP    | 项目源码 | 文件源码
def post(self, shake_id):
    """
    Subscribe the current user to a shake.

    With a truthy ``json`` argument the outcome is written as a JSON string
    (an 'error' entry, or ``subscription_status`` True); otherwise the
    handler redirects to the shake's path, or to '/' when the shake does
    not exist.
    """
    is_json = self.get_argument('json', None)
    user = self.get_current_user_object()

    shake = Shake.get('id=%s and deleted=0', shake_id)
    if not shake:
        if is_json:
            return self.write(json_encode({'error':'Shake not found.'}))
        # There is no shake whose path could be used: the previous
        # `shake.path()` call on the missing shake raised here.
        return self.redirect('/')

    subscribed = user.subscribe(shake)
    if not is_json:
        # success and failure both lead back to the shake page
        return self.redirect(shake.path())
    if not subscribed:
        return self.write(json_encode({'error':'error'}))
    return self.write(json_encode({'subscription_status': True}))
项目:mltshp    作者:MLTSHP    | 项目源码 | 文件源码
def post(self, shake_id):
    """
    Unsubscribe the current user from a shake.

    With a truthy ``json`` argument the outcome is written (an 'error'
    dict, or a JSON string with ``subscription_status`` False); otherwise
    the handler redirects to the shake's path, or to '/' when the shake
    does not exist.
    """
    is_json = self.get_argument('json', None)
    user = self.get_current_user_object()

    shake = Shake.get('id=%s and deleted=0', shake_id)
    if not shake:
        if is_json:
            return self.write({'error':'Shake not found.'})
        # There is no shake whose path could be used: the previous
        # `shake.path()` call on the missing shake raised here.
        return self.redirect('/')

    unsubscribed = user.unsubscribe(shake)
    if not is_json:
        # success and failure both lead back to the shake page
        return self.redirect(shake.path())
    if not unsubscribed:
        return self.write({'error':'error'})
    return self.write(json_encode({'subscription_status': False}))
项目:mltshp    作者:MLTSHP    | 项目源码 | 文件源码
def post(self, shake_name):
    """
    Invite the user named by the ``name`` argument to a group shake.

    Raises a 404 ``tornado.web.HTTPError`` when no group shake has that
    name. When the invitee exists and is not deleted, an invitation
    notification is created. With a truthy ``json`` argument the outcome is
    written as a dict; otherwise the handler redirects to ``/<shake_name>``.
    """
    shake = Shake.get('name=%s and type=%s', shake_name, "group")
    if not shake:
        raise tornado.web.HTTPError(404)

    sender = self.get_current_user_object()
    invitee_name = self.get_argument('name', None)
    receiver = User.get('name=%s and deleted=0', invitee_name)
    is_json = self.get_argument('json', None)

    if receiver:
        Notification.new_invitation(sender, receiver, shake.id)
        json_reply = {'invitation_status':True}
    else:
        json_reply = {'error':'error'}

    if is_json:
        return self.write(json_reply)
    return self.redirect('/%s' % (shake_name))
项目:mltshp    作者:MLTSHP    | 项目源码 | 文件源码
def post(self, shake_name=None):
    """
    Request an invitation to a shake on behalf of the current user.

    Raises a 404 ``tornado.web.HTTPError`` when no non-deleted shake has
    the given name. When the user may not request an invitation, an error
    dict is written (``json`` argument truthy) or the handler redirects to
    '/'. Otherwise the request is recorded and the handler writes a status
    dict or redirects to ``/<shake name>``.
    """
    shake = Shake.get('name=%s and deleted=0', shake_name)
    current_user_object = self.get_current_user_object()

    if not shake:
        raise tornado.web.HTTPError(404)

    if not current_user_object.can_request_invitation_to_shake(shake.id):
        if self.get_argument('json', None):
            return self.write({'status':'error', 'message':'not allowed'})
        return self.redirect('/')

    current_user_object.request_invitation_to_shake(shake.id)
    if self.get_argument('json', None):
        return self.write({'status':'ok'})
    return self.redirect('/%s' % (shake.name))
项目:mltshp    作者:MLTSHP    | 项目源码 | 文件源码
def get(self, shake_name=None, page=None):
    """
    Render one page of a shake's followers.

    ``page`` defaults to 1 when falsy and is converted with ``int``.
    Raises a 404 ``tornado.web.HTTPError`` when the shake is not found.
    """
    shake_object = Shake.get("name=%s", shake_name)
    # the conversion happens before the not-found check, as before
    page = int(page or 1)

    if not shake_object:
        raise tornado.web.HTTPError(404)

    followers = shake_object.subscribers(page=page)
    follower_count = shake_object.subscriber_count()
    # the trailing '%d' is left in place as a placeholder for the template
    url_format = '/shake/%s/followers/' % shake_object.name + '%d'
    return self.render(
        "shakes/followers.html",
        followers=followers,
        shake_info=shake_object,
        url_format=url_format,
        follower_count=follower_count,
        page=page,
    )
项目:mltshp    作者:MLTSHP    | 项目源码 | 文件源码
def get(self, shake_name):
    """
    Render the members page of a non-deleted shake.

    Raises a 404 ``tornado.web.HTTPError`` when the shake is not found.
    The template receives the invitation data returned by
    ``_invitations``, the managers, the owner, the followers with their
    count, and whether the current user is one of the managers.
    """
    shake = Shake.get("name=%s and deleted=0", shake_name)
    if not shake:
        raise tornado.web.HTTPError(404)

    current_user = self.get_current_user_object()
    invitation, invitation_requests = _invitations(shake, current_user)

    # the current user is a shake manager when their id matches a manager's
    managers = shake.managers()
    is_shake_manager = bool(managers and current_user) and any(
        manager.id == current_user.id for manager in managers
    )

    followers = shake.subscribers()
    follower_count = shake.subscriber_count()

    return self.render(
        "shakes/members.html",
        shake=shake,
        invitation=invitation,
        managers=shake.managers(),
        current_user_obj=current_user,
        invitation_requests=invitation_requests,
        shake_editor=shake.owner(),
        is_shake_manager=is_shake_manager,
        followers=followers,
        follower_count=follower_count,
    )
项目:mltshp    作者:MLTSHP    | 项目源码 | 文件源码
def get(self):
    """
    Render the admin list of the newest non-deleted users, 20 per page.

    The optional ``before`` argument restricts the list to users with a
    smaller id. Each listed user gets a ``last_sharedfile`` attribute
    holding the single file returned by ``sharedfiles(per_page=1)``, or
    None.
    """
    where = "deleted=0 ORDER BY id DESC LIMIT 21"
    before_id = self.get_argument('before', None)
    if before_id is not None:
        # int() ensures only a number is interpolated into the SQL fragment
        where = "id < %d AND %s" % (int(before_id), where)
    users = User.where(where)

    # 21 rows are fetched but only 20 shown; the extra row just signals
    # that another page exists.
    visible = users[:20]
    next_link = None
    if len(users) == 21:
        # Continue after the last *displayed* user. Linking from the 21st
        # row (users[-1]) combined with `id <` dropped that row from every
        # following page.
        next_link = "?before=%d" % visible[-1].id

    for user in visible:
        files = user.sharedfiles(per_page=1)
        # conditional expression instead of the fragile `and ... or` idiom
        user.last_sharedfile = files[0] if len(files) == 1 else None

    return self.render("admin/new-users.html", users=visible,
        previous_link=None, next_link=next_link)
项目:mltshp    作者:MLTSHP    | 项目源码 | 文件源码
def get(self):
    """
    Render the admin list of the newest non-deleted NSFW users, 20 per page.

    The optional ``before`` argument restricts the list to users with a
    smaller id. Each listed user gets a ``last_sharedfile`` attribute
    holding the single file returned by ``sharedfiles(per_page=1)``, or
    None.
    """
    where = "nsfw=1 AND deleted=0 ORDER BY id DESC LIMIT 21"
    before_id = self.get_argument('before', None)
    if before_id is not None:
        # int() ensures only a number is interpolated into the SQL fragment
        where = "id < %d AND %s" % (int(before_id), where)
    users = User.where(where)

    # 21 rows are fetched but only 20 shown; the extra row just signals
    # that another page exists.
    visible = users[:20]
    next_link = None
    if len(users) == 21:
        # Continue after the last *displayed* user. Linking from the 21st
        # row (users[-1]) combined with `id <` dropped that row from every
        # following page.
        next_link = "?before=%d" % visible[-1].id

    for user in visible:
        files = user.sharedfiles(per_page=1)
        # conditional expression instead of the fragile `and ... or` idiom
        user.last_sharedfile = files[0] if len(files) == 1 else None

    return self.render("admin/nsfw-users.html", users=visible,
        previous_link=None, next_link=next_link)
项目:mltshp    作者:MLTSHP    | 项目源码 | 文件源码
def post(self):
    """Create a shake category from the ``name`` and ``short_name`` arguments, then redirect to the category list."""
    name = self.get_argument('name', '')
    short_name = self.get_argument('short_name', '')
    ShakeCategory(name=name, short_name=short_name).save()
    return self.redirect('/admin/shake-categories')


#class CreateShakeSharedFilesHandler(BaseHandler):
#    @tornado.web.authenticated
#    def get(self):
#        user = self.current_user
#        if user['name'] in ['admin',]:
#            shared_files = Sharedfile.all()
#            for sf in shared_files:
#                user = User.get('id = %s', sf.user_id)
#                user_shake = user.shake()
#                ssf = Shakesharedfile.get("shake_id = %s and sharedfile_id = %s", user_shake.id, sf.id)
#                if not ssf:
#                    ssf = Shakesharedfile(shake_id = user_shake.id, sharedfile_id = sf.id)
#                    ssf.save()
#                else:
#                    shared_file = Sharedfile.get('id=%s', ssf.sharedfile_id)
#                    ssf.created_at = shared_file.created_at
#                    ssf.save()
#        return self.redirect("/")
项目:aiopeewee    作者:kszucs    | 项目源码 | 文件源码
async def test_atomic_with_delete(flushdb):
    """
    Deleting rows inside ``db.atomic()`` blocks, single and nested, is
    visible to later selects.

    Declared ``async def``: the body uses ``await``, ``async with`` and
    async comprehensions, which are a SyntaxError inside a plain ``def``.
    ``flushdb`` is a fixture argument that the body does not use directly
    (presumably it provides a clean database — confirm in the fixtures).
    """
    for i in range(3):
        await User.create(username=f'u{i}')

    # delete inside a single atomic block
    async with db.atomic():
        user = await User.get(User.username == 'u1')
        await user.delete_instance()

    usernames = [u.username async for u in User.select()]
    assert sorted(usernames) == ['u0', 'u2']

    # delete inside nested atomic blocks
    async with db.atomic():
        async with db.atomic():
            user = await User.get(User.username == 'u2')
            await user.delete_instance()

    usernames = [u.username async for u in User.select()]
    assert usernames == ['u0']
项目:graphql-pynamodb    作者:yfilali    | 项目源码 | 文件源码
def identity(payload):
    """
    Load the user named by ``payload['identity']`` and store it on ``g.user``.

    Returns:
        The stored user, or None when ``User.get`` raises
        ``User.DoesNotExist``.
    """
    user_id = payload['identity']
    try:
        g.user = User.get(user_id, None)
    except User.DoesNotExist:
        return None
    return g.user
项目:ecommerce_api    作者:nbschool    | 项目源码 | 文件源码
def test_get_users_list_not_authenticated__unauthorized(self):
    """Requesting the users list without credentials is answered with UNAUTHORIZED."""
    add_user(None, TEST_USER_PSW)

    users_url = API_ENDPOINT.format('users/')
    response = self.app.get(users_url)
    assert response.status_code == UNAUTHORIZED
项目:ecommerce_api    作者:nbschool    | 项目源码 | 文件源码
def load_user(user_id):
    """
    Look up the user with the given id.

    Args:
        user_id (int): Peewee user id

    Returns:
        models.User: The requested user, or None when no user has that id
    """
    try:
        found = User.get(User.id == user_id)
    except User.DoesNotExist:
        found = None
    return found
项目:ecommerce_api    作者:nbschool    | 项目源码 | 文件源码
def get(self):
    """ Respond with the JSON list of every order and an OK status."""
    all_orders = Order.select()
    return generate_response(Order.json_list(all_orders), OK)