Python app.db 模块,session() 实例源码

我们从Python开源项目中,提取了以下28个代码示例,用于说明如何使用app.db.session()

项目:mybookshelf2    作者:izderadicka    | 项目源码 | 文件源码
def create_user(user, email, password=None, role='user'):
    """Create an active user holding one role and commit it.

    When no password is supplied it is obtained from ``read_pwd()``. The
    field values are validated with ``schema.UserSchema`` (ignoring
    ``version_id``) before the ``model.User`` row is built.

    Raises:
        InvalidCommand: if schema validation reports any errors.
    """
    password = password or read_pwd()

    fields = {
        'user_name': user,
        'email': email,
        'password': hash_pwd(password),
        'active': True,
    }
    validator = schema.UserSchema(exclude=('version_id',))
    problems = validator.validate(fields, db.session)
    if problems:
        raise InvalidCommand('Validation Error: %s' % problems)

    # .one() raises unless exactly one role carries the requested name.
    role_obj = model.Role.query.filter_by(name=role).one()
    new_user = model.User(**fields)
    new_user.roles.append(role_obj)
    db.session.add(new_user)  # @UndefinedVariable
    db.session.commit()  # @UndefinedVariable
项目:mybookshelf2    作者:izderadicka    | 项目源码 | 文件源码
def cleanup(uploads_older_then=24, conversions_older_then=24 * 7, purge_empty_ebooks_dirs=False):
    """Delete stale uploads and conversions, then prune empty directories.

    Args:
        uploads_older_then: age threshold in hours for ``model.Upload`` rows.
        conversions_older_then: age threshold in hours for conversion
            batches and conversions.
        purge_empty_ebooks_dirs: when true, also prune empty directories
            under ``settings.BOOKS_BASE_DIR``.
    """
    # Phase 1: uploads created before the cut-off.
    upload_cutoff = datetime.now() - timedelta(hours=float(uploads_older_then))
    for stale_upload in model.Upload.query.filter(model.Upload.created < upload_cutoff):
        logic.delete_upload(stale_upload)
    db.session.commit()
    purge_empty_dirs(settings.UPLOAD_DIR, delete_root=False)

    # Phase 2: conversion batches first, then individual conversions.
    conversion_cutoff = datetime.now() - timedelta(hours=float(conversions_older_then))
    for stale_batch in model.ConversionBatch.query.filter(
            model.ConversionBatch.created < conversion_cutoff):
        logic.delete_conversion_batch(stale_batch)
    for stale_conversion in model.Conversion.query.filter(
            model.Conversion.created < conversion_cutoff):
        logic.delete_conversion(stale_conversion)
    db.session.commit()
    purge_empty_dirs(settings.BOOKS_CONVERTED_DIR, delete_root=False)

    if not purge_empty_ebooks_dirs:
        return
    purge_empty_dirs(settings.BOOKS_BASE_DIR, delete_root=False)
项目:mincloud    作者:number13dev    | 项目源码 | 文件源码
def api_delete():
    """Delete a stored file, its public-key row and its on-disk folder.

    Reads ``uniqueid`` from the query string. Only an admin or the file's
    uploader may delete it.

    Returns:
        A JSON response carrying one of the ``responds`` messages:
        ``FILE_DELETED`` on success, ``FAILED_AUTHORIZATION`` when the
        current user may not delete the file, ``SOME_ERROR`` for an unknown
        id or any failure while deleting.
    """
    uniqueid = request.args.get("uniqueid")
    db_file = File.query.filter_by(unique_id=uniqueid).first()

    # .first() yields None for an unknown id; without this guard the
    # permission check below would raise AttributeError.
    if db_file is None:
        return jsonify(response=responds['SOME_ERROR'])

    if not (g.user.admin or g.user.id == db_file.uploader_id):
        return jsonify(response=responds['FAILED_AUTHORIZATION'])

    upload_folder = myapp.config['UPLOAD_FOLDER']
    try:
        folder = os.path.join(upload_folder, db_file.unique_id)
        cmpl_path = os.path.join(folder, db_file.name)

        pubkey = db_file.publickey
        # A file may have no public key attached; only delete one that exists.
        if pubkey is not None:
            db.session.delete(pubkey)
        db.session.delete(db_file)
        db.session.commit()

        os.remove(cmpl_path)
        os.rmdir(folder)
        return jsonify(response=responds['FILE_DELETED'])
    except Exception as e:
        # Discard any half-applied changes so the session stays usable.
        db.session.rollback()
        print(e)
        return jsonify(response=responds['SOME_ERROR'])
项目:mincloud    作者:number13dev    | 项目源码 | 文件源码
def _adduser():
    """Create a user from the submitted ``AddUser`` form (admins only).

    Returns:
        A JSON response: a confirmation naming the new user, one of the
        ``responds`` messages (``SOME_ERROR`` for non-POST requests,
        ``FAILED_AUTHORIZATION`` for non-admins, ``FAILED_VALIDATION`` for
        an invalid form), or the result of ``ret_dbfail_response`` when the
        insert fails.
    """
    # Every path must produce a response: previously non-POST requests and
    # non-admin users fell through and the view returned None.
    if request.method != 'POST':
        return jsonify(response=responds['SOME_ERROR'])
    if not g.user.admin:
        return jsonify(response=responds['FAILED_AUTHORIZATION'])

    form = AddUser()
    if not form.validate():
        return jsonify(response=responds["FAILED_VALIDATION"])

    newuser = User(form.username.data, form.password.data, form.email.data)
    # Treat a missing adminuser value as "not an admin".
    if form.adminuser.data is None:
        newuser.admin = False
    else:
        newuser.admin = form.adminuser.data

    session = db.session()
    try:
        session.add(newuser)
        session.commit()
    except Exception as e:
        session.rollback()
        return ret_dbfail_response(e)

    return jsonify(response=('New User ' + newuser.username + ' added.'))
项目:ELO-Darts    作者:pwgraham91    | 项目源码 | 文件源码
def add_slack_game_post():
    """Record a game reported from Slack as ``<winner email> beat <loser email>``.

    The posted form must carry ``token``, ``text`` and ``user_name``. The
    token is compared against ``slack_token`` before anything else happens.

    Returns:
        A plain-text message: either the reason the request was rejected or
        a summary of both players' updated scores.
    """
    session = db.session

    data = flask.request.form

    token = data['token']
    if token != slack_token:
        return "You're not who you say you are. Wrong token {}".format(token)

    # The text must contain exactly one ' beat ' separator; anything else
    # used to raise ValueError on unpacking and surface as a server error.
    try:
        winner_email, loser_email = data['text'].split(' beat ')
    except ValueError:
        return "Expected text of the form '<winner email> beat <loser email>'"
    # Stray whitespace around the addresses would otherwise make the
    # exact-match lookups below fail.
    winner_email = winner_email.strip()
    loser_email = loser_email.strip()

    winner = session.query(User).filter(
        User.email == winner_email,
    ).first()
    loser = session.query(User).filter(
        User.email == loser_email
    ).first()
    if not winner:
        return 'no player with email {}'.format(winner_email)
    if not loser:
        return 'no player with email {}'.format(loser_email)

    add_game(session, winner, loser, slack_user_submitted_by=data['user_name'])

    session.commit()
    return "{} beat {}! {}'s score is now {} and {}'s score is now {}".format(winner.name, loser.name, winner.name,
                                                                              round(winner.elo, 3), loser.name,
                                                                              round(loser.elo, 3))
项目:ELO-Darts    作者:pwgraham91    | 项目源码 | 文件源码
def start_game_post():
    """Create and commit a ``Game`` for two hard-coded in-progress players."""
    # todo make this read from json
    player_ids = {
        'in_progress_player_1_id': 1,
        'in_progress_player_2_id': 2,
    }
    new_game = Game(**player_ids)

    db_session = db.session
    db_session.add(new_game)
    db_session.commit()
    return 'game id and maybe redirect to play game'
项目:ELO-Darts    作者:pwgraham91    | 项目源码 | 文件源码
def load_user(user_id):
    """Return the ``User`` whose primary key is ``user_id``."""
    user_query = db.session.query(User)
    return user_query.get(user_id)
项目:ELO-Darts    作者:pwgraham91    | 项目源码 | 文件源码
def dev_login(user_id):
    """Log in as ``user_id`` when ``ENVIRONMENT`` is 'dev', then redirect to the index."""
    if ENVIRONMENT == 'dev':
        dev_user = db.session.query(User).get(user_id)
        login_user(dev_user)
    index_url = url_for('index')
    return redirect(index_url)
项目:ELO-Darts    作者:pwgraham91    | 项目源码 | 文件源码
def index():
    """Render ``index.html`` with the current user, the active users and the Google auth URL."""
    context = {
        'title': 'Cratejoy Darts',
        'current_user': flask.g.user,
        'active_users': get_active_users(db.session),
        'auth_url': get_google_authorization_url(),
    }
    return flask.render_template('index.html', **context)
项目:do-portal    作者:certeu    | 项目源码 | 文件源码
def db(request, app):
    """Recreate the test database tables and install the test client classes."""
    # Start from an empty schema built from the current models.
    _db.drop_all()
    _db.create_all()

    # The test client class carries the freshly created test user.
    TestClient.test_user = User.create_test_user()
    app.test_client_class = TestClient
    app.response_class = TestResponse

    _db.session.commit()
项目:do-portal    作者:certeu    | 项目源码 | 文件源码
def session(request, monkeypatch):
    """Prevent the session from closing while a test runs.

    The real ``_db.session.remove`` is registered as a finalizer first, then
    the ``remove`` attribute on ``_db.session`` is monkeypatched to a no-op.
    """
    # Roll back at the end of every test. The bound method is captured here,
    # before ``remove`` is replaced below, so the finalizer calls the real one.
    request.addfinalizer(_db.session.remove)

    # Prevent the session from closing (make it a no-op) and
    # committing (redirect to flush() instead)
    # https://alextechrants.blogspot.com/2014/01/unit-testing-sqlalchemy-apps-part-2.html
    # NOTE: the commit -> flush redirection on the next line is commented out,
    # so only ``remove`` is patched.
    # monkeypatch.setattr(_db.session, 'commit', _db.session.flush)
    monkeypatch.setattr(_db.session, 'remove', lambda: None)
项目:mybookshelf2    作者:izderadicka    | 项目源码 | 文件源码
def change_password(user, password=None):
    """Set a new password for the user matching ``user`` by user name or e-mail.

    When no password is supplied it is obtained from ``read_pwd()``.

    Raises:
        InvalidCommand: if no user matches.
    """
    name_or_email = or_(model.User.user_name == user, model.User.email == user)
    try:
        account = model.User.query.filter(name_or_email).one()  # @UndefinedVariable
    except NoResultFound:
        raise InvalidCommand('No such User')

    new_password = password or read_pwd()
    account.password = hash_pwd(new_password)
    db.session.commit()  # @UndefinedVariable
项目:mybookshelf2    作者:izderadicka    | 项目源码 | 文件源码
def migrate_tables():
    """Interactively upgrade the database with the versioned SQL scripts.

    Scripts named ``v<N>.sql`` in the ``migration`` directory under
    ``SQL_DIR`` are applied in ascending order for every ``N`` above the
    stored schema version and up to ``db_version``. After each script the
    stored version is updated and committed.
    """
    import psycopg2
    print('This will migrate database to latest schema, you are advised to backup database before running this command')
    if not prompt_bool('Do you want to continue?'):
        return

    mdir = os.path.join(SQL_DIR, 'migration')
    version_obj = model.Version.query.one_or_none()
    if not version_obj:
        # No version row yet: start from version 0.
        version_obj = model.Version(version=0, version_id=1)
        db.session.add(version_obj)
    old_version = version_obj.version
    if old_version == db_version:
        print('DB is at correct version %d'% old_version)
        # No script can satisfy old_version < N <= db_version, so there is
        # no need to open a database connection at all.
        return

    scripts = []
    for script in os.listdir(mdir):
        m = re.match(r'v(\d+)\.sql', script)
        if m:
            version = int(m.group(1))
            if old_version < version <= db_version:
                scripts.append((version, os.path.join(mdir, script)))
    scripts.sort()

    connection = psycopg2.connect(database=settings.DB_NAME,
                                  user=settings.DB_USER,
                                  password=settings.DB_PASSWORD,
                                  host=settings.DB_HOST,
                                  port=settings.DB_PORT)
    # Each script is executed with autocommit on this raw connection.
    connection.autocommit = True
    try:
        cursor = connection.cursor()
        for v, fname in scripts:
            # Close each script file promptly instead of leaking the handle.
            with open(fname, 'rt', encoding='utf-8-sig') as script_file:
                script = script_file.read()
            print('Upgrading database to version %d'% v)
            cursor.execute(script)
            # Record progress after every script.
            version_obj.version = v
            db.session.commit()
    finally:
        connection.close()
项目:mincloud    作者:number13dev    | 项目源码 | 文件源码
def change_account(cuser, form):
    """Apply the e-mail, username and password from ``form`` to ``cuser``.

    Nothing is changed unless the form validates and ``form.oldpassword``
    matches the user's current password.

    Returns:
        A JSON ``INFO_CHANGED`` response on success, the result of
        ``ret_dbfail_response`` when the commit raises ``IntegrityError``,
        or ``None`` when the form is invalid or the old password is wrong.
    """
    if not (form.validate() and cuser.check_password(form.oldpassword.data)):
        # Unchanged contract: callers receive None when nothing was applied.
        return None

    session = db.session()
    try:
        # Compare by value: ``is not`` tested object identity, which is
        # true for practically any two separately created strings.
        if cuser.email != form.email.data:
            cuser.email = form.email.data
        if cuser.username != form.username.data:
            cuser.username = form.username.data

        # The password assignment used to sit inside the username branch,
        # which (because of the identity test) ran on effectively every
        # call; it is now applied explicitly on every successful change.
        # NOTE(review): confirm the form always supplies a password here.
        cuser.password = form.password.data
        session.commit()
        return jsonify(response=responds['INFO_CHANGED'])
    except IntegrityError as e:
        session.rollback()
        return ret_dbfail_response(e)
项目:mincloud    作者:number13dev    | 项目源码 | 文件源码
def deliver_file(file, request):
    """Count a download of ``file`` and send it from its upload directory.

    The file is sent as an attachment whenever the request has a referrer.
    """
    directory = os.path.join(myapp.config['UPLOAD_FOLDER'], file.unique_id)

    file.dl_count += 1
    db.session.commit()

    as_download = request.referrer is not None
    return send_from_directory(directory, file.name, as_attachment=as_download)
项目:mincloud    作者:number13dev    | 项目源码 | 文件源码
def api_unpublish():
    """Mark a file's public key as not public.

    Answers with ``PUBLIC_KEY_UNPUBLISH`` plus the download URL on success;
    every other case (non-GET request, unknown file, user who is neither
    admin nor uploader, file without a key) answers ``SOME_ERROR``.
    """
    def _error():
        return jsonify(response=responds['SOME_ERROR'])

    if request.method != 'GET':
        return _error()

    uniqueid = request.args['uniqueid']
    file = File.query.filter_by(unique_id=uniqueid).first()
    if file is None:
        return _error()

    may_modify = g.user.admin or (g.user.id == file.uploader_id)
    if not may_modify:
        return _error()

    key = file.publickey
    if key is None:
        return _error()

    file.publickey.public = False
    db.session.commit()
    url = request.host_url + "pub/dl/" + key.hash
    return jsonify(response=responds['PUBLIC_KEY_UNPUBLISH'], url=url)
项目:mincloud    作者:number13dev    | 项目源码 | 文件源码
def api_publish():
    """Mark a file's public key as public.

    Answers with ``PUBLIC_KEY_PUBLISH`` plus the download URL on success;
    every other case (non-GET request, unknown file, user who is neither
    admin nor uploader, missing key, key whose ``public`` flag is not
    ``False``) answers ``SOME_ERROR``.
    """
    def _error():
        return jsonify(response=responds['SOME_ERROR'])

    if request.method != 'GET':
        return _error()

    uniqueid = request.args['uniqueid']
    file = File.query.filter_by(unique_id=uniqueid).first()
    if file is None:
        return _error()

    may_modify = g.user.admin or (g.user.id == file.uploader_id)
    if not may_modify:
        return _error()

    key = file.publickey
    if (key is None) or (key.public is not False):
        return _error()

    file.publickey.public = True
    db.session.commit()
    url = request.host_url + "pub/dl/" + key.hash
    return jsonify(response=responds['PUBLIC_KEY_PUBLISH'], url=url)
项目:mincloud    作者:number13dev    | 项目源码 | 文件源码
def createadminuser():
    """Create an admin ``User`` from the template placeholders and commit it."""
    credentials = {
        'email': '{{ADMIN_MAIL}}',
        'password': '{{ADMIN_PASSWORD}}',
        'username': '{{ADMIN_USER}}',
    }
    admin = User(**credentials)
    admin.admin = True

    # assumes db.session is a scoped session, so a single call yields the
    # same session the original obtained twice - TODO confirm
    session = db.session()
    session.add(admin)
    session.commit()
    print("Admin User created.")
项目:mincloud    作者:number13dev    | 项目源码 | 文件源码
def test_file_publichash(self):
    """A public key attached to a file is still attached after commit."""
    key = PublicKey()
    key.public = True

    file = File('testfile.txt', helpers.unique_id(), 1337, 'text/html')
    file.publickey = key

    # assumes db.session is a scoped session, so one call yields the same
    # session the original obtained twice - TODO confirm
    session = db.session()
    session.add(file)
    session.commit()

    self.assertEqual(key, file.publickey)
项目:flask-template    作者:pwgraham91    | 项目源码 | 文件源码
def load_user(user_id):
    """Look up the ``User`` with primary key ``user_id``."""
    return db.session.query(User).get(user_id)
项目:flask-template    作者:pwgraham91    | 项目源码 | 文件源码
def dev_login(user_id):
    """In the 'dev' environment log in as ``user_id``; always redirect to the index."""
    in_dev = ENVIRONMENT == 'dev'
    if in_dev:
        account = db.session.query(User).get(user_id)
        login_user(account)
    return redirect(url_for('index'))
项目:ELO-Darts    作者:pwgraham91    | 项目源码 | 文件源码
def export_users():
    """Return every user's ``dict`` attribute as a JSON array response."""
    all_users = db.session.query(User).all()
    payload = json.dumps([user.dict for user in all_users])
    return flask.Response(payload, mimetype=u'application/json')
项目:ELO-Darts    作者:pwgraham91    | 项目源码 | 文件源码
def export_games():
    """Return every game, converted with ``game_dict``, as a JSON array response."""
    db_session = db.session
    games = db_session.query(Game).all()
    payload = json.dumps([game_dict(db_session, game) for game in games])
    return flask.Response(payload, mimetype=u'application/json')
项目:ELO-Darts    作者:pwgraham91    | 项目源码 | 文件源码
def add_game_post():
    """Record a game between the players named in the JSON request body.

    The body must provide ``winner_id`` and ``loser_id``; the game is
    attributed to the current user.

    Raises:
        Exception: if either id does not match a ``User``.
    """
    db_session = db.session
    payload = flask.request.json

    # Resolve the winner completely before the loser id is even read.
    winner_id = payload['winner_id']
    winner = db_session.query(User).get(winner_id)
    if not winner:
        raise Exception('no player with id {}'.format(winner_id))

    loser_id = payload['loser_id']
    loser = db_session.query(User).get(loser_id)
    if not loser:
        raise Exception('no player with id {}'.format(loser_id))

    game = add_game(db_session, winner, loser, submitted_by_id=flask.g.user.id)
    db_session.commit()

    body = json.dumps({
        'id': game.id,
        'success': True
    })
    return flask.Response(body, mimetype=u'application/json')
项目:ELO-Darts    作者:pwgraham91    | 项目源码 | 文件源码
def remove_game_get(game_id):
    """Remove a game (admins only) and report which players and games changed.

    Responds with a JSON object. Refusals carry ``success: False``, a
    ``message`` and empty id lists; a successful removal carries the
    affected player ids and updated game ids.
    """
    def _json(payload):
        return flask.Response(json.dumps(payload), mimetype=u'application/json')

    def _refusal(message):
        return _json({
            'success': False,
            'message': message,
            'affected_player_ids': [],
            'updated_game_ids': []
        })

    db_session = db.session

    if not flask.g.user.admin:
        return _refusal('Access Denied')

    game = db_session.query(Game).get(game_id)
    if not game:
        return _refusal('No Such Game')

    # NOTE(review): no commit happens here; presumably remove_game commits
    # on its own - confirm.
    affected_player_ids, updated_game_ids = remove_game(db_session, game)

    return _json({
        'affected_player_ids': list(affected_player_ids),
        'updated_game_ids': list(updated_game_ids)
    })
项目:ELO-Darts    作者:pwgraham91    | 项目源码 | 文件源码
def callback():
    """Handle the Google OAuth2 redirect and log the user in.

    Already authenticated users are redirected to the index. Requests that
    carry an ``error`` parameter get a plain-text message. Requests that
    cannot complete the flow are redirected to the login page. Otherwise the
    token is fetched, the profile is read, and the matching ``User`` is
    created or updated, committed and logged in. Only accounts whose ``hd``
    field is ``cratejoy.com`` are accepted.

    Returns:
        A redirect response or a plain-text status message.
    """
    current_user = flask.g.user
    session = db.session

    # Redirect user to home page if already logged in.
    if current_user is not None and current_user.is_authenticated:
        return redirect(url_for('index'))
    if 'error' in request.args:
        if request.args.get('error') == 'access_denied':
            return 'You denied access.'
        return 'Error encountered.'
    # Both parameters are needed to finish the flow; the previous `and`
    # only redirected when BOTH were missing. The state stored at login must
    # also still be in the Flask session, otherwise the lookup below would
    # raise KeyError.
    if ('code' not in request.args or 'state' not in request.args
            or 'oauth_state' not in flask_session):
        return redirect(url_for('login'))

    # Execution reaches here when user has
    # successfully authenticated our app.
    google = get_google_auth(state=flask_session['oauth_state'])
    try:
        token = google.fetch_token(
            Auth.TOKEN_URI,
            client_secret=Auth.CLIENT_SECRET,
            # The authorization response is always passed on as https.
            authorization_response=request.url.replace('http://', 'https://'),
        )
    except HTTPError:
        return 'HTTPError occurred.'
    google = get_google_auth(token=token)
    # todo: cool stuff in here
    resp = google.get(Auth.USER_INFO)
    if resp.status_code != 200:
        return 'Could not fetch your information.'

    user_data = resp.json()
    if user_data.get('hd') != 'cratejoy.com':
        return 'Only cratejoy.com emails allowed'
    email = user_data['email']
    user = User.query.filter_by(email=email).first()
    if user is None:
        user = User(
            # Fall back to the mailbox name when the profile has no (or an
            # empty) name; .get() avoids a KeyError for a missing field.
            name=user_data.get('name') or email.split('@')[0].capitalize(),
            email=email,
            avatar=user_data['picture']
        )
    user.tokens = json.dumps(token)
    session.add(user)
    session.commit()
    login_user(user)
    return redirect(url_for('index'))
项目:mincloud    作者:number13dev    | 项目源码 | 文件源码
def _upload():
    """Store an uploaded file on disk and register it for the current user.

    The file is read from the ``files[]`` form field and saved under a new
    unique directory inside ``UPLOAD_FOLDER``. A thumbnail is requested
    when the mimetype contains "image".

    Returns:
        A JSON response with a one-element ``files`` list describing the
        stored file; a redirect back to the request URL when no file was
        chosen; a ``files`` entry with ``FILETYPE_NOT_ALLOWED`` when the
        sanitized filename is empty; or ``SOME_ERROR`` for non-POST
        requests.
    """
    # Every path must return a response; non-POST requests used to fall
    # through and the view returned None.
    if request.method != 'POST':
        return jsonify(response=responds['SOME_ERROR'])

    file = request.files['files[]']
    if file.filename == '':
        return redirect(request.url)

    # get filename and folders
    file_name = secure_filename(file.filename)
    if not file_name:
        # Sanitizing can leave nothing usable; saving would then target the
        # directory path itself, so refuse before touching the disk.
        return jsonify(files=[{"name": file.filename, "error": responds['FILETYPE_NOT_ALLOWED']}])

    directory = str(unique_id())
    upload_folder = myapp.config['UPLOAD_FOLDER']
    save_dir = os.path.join(upload_folder, directory)

    if not os.path.exists(save_dir):
        os.makedirs(save_dir)

    cmpl_path = os.path.join(save_dir, file_name)
    file.save(cmpl_path)
    size = os.stat(cmpl_path).st_size

    # create our file from the model and add it to the database
    dbfile = File(file_name, directory, size, file.mimetype)

    g.user.uploads.append(dbfile)
    db.session().add(dbfile)
    db.session().commit()

    if "image" in dbfile.mimetype:
        get_thumbnail(cmpl_path, "100")
        thumbnail_url = request.host_url + 'thumbs/' + directory
    else:
        thumbnail_url = ""

    url = request.host_url + 'uploads/' + directory
    delete_url = url
    delete_type = "DELETE"

    # Separate name so the uploaded file object is not rebound to a dict.
    file_info = {"name": file_name, "url": url, "thumbnailUrl": thumbnail_url, "deleteUrl": delete_url,
                 "deleteType": delete_type, "uid": directory}
    return jsonify(files=[file_info])
项目:flask-template    作者:pwgraham91    | 项目源码 | 文件源码
def callback():
    """Handle the Google OAuth2 redirect and log the user in.

    Already authenticated users are redirected to the index. Requests that
    carry an ``error`` parameter get a plain-text message. Requests that
    cannot complete the flow are redirected to the login page. Otherwise the
    token is fetched, the profile is read, and the matching ``User`` is
    created or updated, committed and logged in.

    Returns:
        A redirect response or a plain-text status message.
    """
    current_user = flask.g.user
    session = db.session

    # Redirect user to home page if already logged in.
    if current_user is not None and current_user.is_authenticated:
        return redirect(url_for('index'))
    if 'error' in request.args:
        if request.args.get('error') == 'access_denied':
            return 'You are denied access.'
        return 'Error encountered.'
    # Both parameters are needed to finish the flow; the previous `and`
    # only redirected when BOTH were missing. The state stored at login must
    # also still be in the Flask session, otherwise the lookup below would
    # raise KeyError.
    if ('code' not in request.args or 'state' not in request.args
            or 'oauth_state' not in flask_session):
        return redirect(url_for('login'))

    # Execution reaches here when user has
    # successfully authenticated our app.
    google = get_google_auth(state=flask_session['oauth_state'])
    try:
        token = google.fetch_token(
            Auth.TOKEN_URI,
            client_secret=Auth.CLIENT_SECRET,
            # The authorization response is always passed on as https.
            authorization_response=request.url.replace('http://', 'https://'),
        )
    except HTTPError:
        return 'HTTPError occurred.'
    google = get_google_auth(token=token)
    # todo: cool stuff in here
    resp = google.get(Auth.USER_INFO)
    if resp.status_code != 200:
        return 'Could not fetch your information.'

    user_data = resp.json()
    email = user_data['email']
    user = User.query.filter_by(email=email).first()
    if user is None:
        user = User(
            # Fall back to the mailbox name when the profile has no (or an
            # empty) name; .get() avoids a KeyError for a missing field.
            name=user_data.get('name') or email.split('@')[0].capitalize(),
            email=email,
            avatar=user_data['picture']
        )
    user.tokens = json.dumps(token)
    session.add(user)
    session.commit()
    login_user(user)
    return redirect(url_for('index'))