Python flask sqlalchemy的简单使用及常用操作

2022-08-17 14:04:33
目录
前言flask sqlalchemy的配置使用sqlalchemy的增删改查查询数据增加数据修改数据删除数据总结

前言

说到面向对象,大家都不陌生。关系型数据库也是后端日常用来存储数据的,但数据库是关系型的,因此,ORM通过对象模型和数据库的关系模型之间建立映射,我们就能像操作对象一样来操作数据库。>

flask>

在python中,常用的ORM工具就是sqlalchemy了。下面就以一个简单的flask例子来说明吧。

首先,写一个最简单的flask项目,代码如下:

from flask import Flask

app = Flask(__name__)
@app.route('/')
def orm_test():
    return "hello"

接下来我们导入ORM配置,添加如下代码:

from flask_sqlalchemy import SQLAlchemy

def orm_config():
    url = "mysql+mysqlconnector://{user}:{pwd}@{host}:{port}/{db_name}?charset=utf8"
    orm_conf = {
        'SQLALCHEMY_DATABASE_URI': url
    }
    return orm_conf

# ORM 设置
app.config.from_mapping(orm_config)
db = SQLAlchemy(app)

这样我们就将ORM配置OK了。

    然后我们新增一个表table1的model
    # model表名
    class Table1(db.Model):
        # 表名
        __tablename__ = "table1"
    
        id = db.Column(db.Integer, primary_key=True)
        col = db.Column(db.String(64), nullable=False, unique=True, comment='字段释义')

    以上配置这是在数据源只有一个库的时候,但很多时候我们还需要访问别的库,这时需要在ORM配置和model上做一些设置。

    ORM配置中需要用到SQLALCHEMY_BINDS来添加数据库, model中__bind_key__来指定数据库了。

    具体修改如下:

    修改ORM配置:

    def orm_config():
        url = "mysql+mysqlconnector://{user}:{pwd}@{host}:{port}/{db_name}?charset=utf8"
        # 指定的别库
        other_url = "mysql+mysqlconnector://{user1}:{pwd1}@{host1}:{port1}/{db_name1}?charset=utf8"
        orm_conf = {
            'SQLALCHEMY_DATABASE_URI': url,
            # 添加别库
            "SQLALCHEMY_BINDS":{
                    "other_db":other_url
                },
        }
        return orm_conf

    表model指定库:

    class Table2(db.Model):
        # 指定别库
        __bind_key__ = 'other_db'
        __tablename__ = "table2"
        
        id = db.Column(db.Integer, primary_key=True)
        col = db.Column(db.String(64), nullable=False, unique=True, comment='字段释义')

    最后,我们在接口中使用下ORM。

    @app.route('/')
    def orm_test():
        # 查询table1数据
        rows = Table1.query.filter(Table1.id<5)
        res = []
        for row in rows:
            dict = {
                "id": row.id,
                "col": row.col
            }
            res.append(dict)
        return "hhh"

    当我们遇到复杂操作,不知道ORM语法该怎么写时,还可以直接用原生sql + ORM session execute的方式执行,示例如下:

    sql = "select count(*) as cnt from table1 group by col"
    rows = db.session.execute(sql)

    以上例子我们是查询table1表的id<5的数据。

    完整代码如下:

    from flask import Flask
    from flask_sqlalchemy import SQLAlchemy
    app = Flask(__name__)
    def orm_config():
        url = "mysql+mysqlconnector://{user}:{pwd}@{host}:{port}/{db_name}?charset=utf8"
        other_url = "mysql+mysqlconnector://{user1}:{pwd1}@{host1}:{port1}/{db_name1}?charset=utf8"
        orm_conf = {
            'SQLALCHEMY_DATABASE_URI': url,
            "SQLALCHEMY_BINDS":{
                    "other_db":other_url
                },
        }
        return orm_conf
    
    # ORM 设置
    app.config.from_mapping(orm_config)
    db = SQLAlchemy(app)
    
    # model表名
    class Table1(db.Model):
        # 表名
        __tablename__ = "table1"
    
        id = db.Column(db.Integer, primary_key=True)
        col = db.Column(db.String(64), nullable=False, unique=True, comment='字段释义')
    
    class Table2(db.Model):
        # 指定库
        __bind_key__ = 'other_db'
        __tablename__ = "table2"
    
        id = db.Column(db.Integer, primary_key=True)
        col = db.Column(db.String(64), nullable=False, unique=True, comment='字段释义')
    
    @app.route('/')
    def orm_test():
        # 查询table1数据
        rows = Table1.query.filter(Table1.id<5)
        res = []
        for row in rows:
            dict = {
                "id": row.id,
                "col": row.col
            }
            res.append(dict)
        return "hhh"
    if __name__ =="__main__":
        app.run()

    sqlalchemy的增删改查

    刚开始接触sqlalchemy时,对于语法不熟悉,写代码也是比较痛苦的。这里总结下sqlalchemy常用的语法吧。

    查询数据

    # 查询id<5的数据
    q = Table1.query.filter(Table1.id<5)
    # 查询过滤用 and、or
    from sqlalchemy import and_, or_
    q = Table1.query.filter(and_(Table1.id<5, Table1.col=='掘金'))
    q = Table1.query.filter(or_(Table1.id<5, Table1.col=='掘金'))
    # 查询过滤用in(语法:model.{字段名}.in_({列表}))
    q = Table1.query.filter(Table1.id.in_([1,2,3]))
    # 连表查询
    q = Table1.query.join(Table2, Table2.id==Table1.id) \
                       .filter(Table1.id<5)
    
    # 解析数据
    res = {'data': [dict(i) for i in q]}
    # 查询数据count
    count = q.count()

    增加数据

    row = Table1(id=1, col='掘金')
    db.session.add(row)
    db.seesion.commit()

    修改数据

     row = Table1.query.filter(Table1.id<5)
     update_data = {"col": "掘金"}
     row.update(update_data)
     db.session.commit()

    删除数据

    row = Table1.query.filter(Table1.id<5)
    row.delete()
    db.session.commit()

    备注: 增删改都要commit()

    总结

    我们在工程代码中使用sqlalchemy时,在配置时记得根据实际情况添加相关配置参数,比如连接池的数量、自动回收连接的秒数等等。

    到此这篇关于Python>