SqlAlchemy归纳总结

时间:2019-01-11
本文章向大家介绍SqlAlchemy归纳总结,主要包括SqlAlchemy归纳总结使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

使用sqlalchemy有一段时间了,基本操作都熟悉了,所以今天把关于Sqlalchemy的使用归纳总结一下。

创表

Sqlalchemy是操作数据库的库,所以首先要创建数据库表,在这里我使用的是sqlite3。
首先在你的配置文件里面配置数据库位置
config.py

import os
# 项目根目录
basedir = os.path.abspath(os.path.dirname(__file__))
# 配置数据库路径
os.environ['DATABASE_URL] = 'sqlite:///' + os.path.join(basedir, 'database', 'test.sqlite')

接下来就是在模型类中创建表了
models.py

import os
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine
from sqlalchemy import Column, Integer, String, schema, Boolean, Enum, DateTime, Text, ForeignKey, LargeBinary

engine = create_engine(os.environ['SQL_PATH'])

Base = declarative_base()

class User(Base):
    '''用户信息管理'''
    __tablename__ = 'user'

    Id = Column(Integer, primary_key=True, autoincrement='auto')
    username = Column(String(32), nullable=False, comment='用户名')
    password = Column(String(64), nullable=False, comment='密码')
    email = Column(String(100), default=None, comment='邮箱')
 
 # 创建表
Base.metadata.create_all(engine)

创建模型后,接下来再创建一个操作数据库的api类
databaseApi.py

from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session
from sqlalchemy.orm import sessionmaker

class API(object):
    """
    自定义orm框架的api接口
    """

    def __init__(self, engine):
        # 创建与数据库的会话session class ,这里返回给session的是个class,不是实例
        session_factory  = sessionmaker(bind=engine)
        Session = scoped_session(session_factory)
        # 生成session 实例
        self.session = Session()

    def addone(self, obj, data):
        '''添加单条数据'''
        # 实例化对象
        emp = obj(**data)
        try:
            # 添加数据
            self.session.add(emp)
            # 提交修改
            self.session.commit()
            # 返回结果
            self.session.close()
            return True, 'ok'
        except Exception as e:
            # 添加失败后回滚数据
            self.session.rollback()
            self.session.close()
            # 返回状态和错误信息
            return False, e
    def addall(self, data):
        '''添加多条数据, 参数data应该是一个对象列表'''

        try:
            self.session.add_all(**data)
            self.session.commit()
            self.session.close()
            return True, 'ok'
        except Exception as e:
            self.session.rollback()
            self.session.close()
            return False, e
    def delete(self, obj, kwargs):
        '''删除指定数据,kwargs是一个删除条件的字典'''
        emp = obj()
        # 判断传入的删除条件是否正确
        if kwargs is not None:
            # 循环字典的键,即是表的属性,判断是否存在表中
            for lib in kwargs.keys():
                # 如果emp中存在此属性就返回None,不存在就返回notexists
                resu = getattr(emp, lib, 'notexists')
                if resu == 'notexists':
                    self.session.close()
                    return False, '{0} is not defind'.format(lib)
        else:
            self.session.close()
            return False, '请指定删除条件, 如果要删除全部内容请使用deall()函数'
        # 查询判断要删除的值是否存在, 如果不存在会返回None
        res = self.session.query(obj).filter_by(**kwargs).all()
        if res:
            try:
                # 删除数据
                for re in res:
                    self.session.delete(re)
                    self.session.commit()
                self.session.close()
                return True, 'ok'
            except Exception as e:
                self.session.rollback()
                self.session.close()
                return False,e
        else:
            self.session.close()
            return False,'notexists'
    def deall(self, obj):
        '''删除表中所有数据的方法'''
        res = self.session.query(obj).all()
        if res:
            try:
                for re in res:
                    self.session.delete(re)
                    self.session.commit()
                self.session.close()
                return True, 'ok'
            except Exception as e:
                self.session.rollback()
                self.session.close()
                return False, e
        else:
            self.session.close()
            return False, 'table is not content'
    def change(self, obj, contents, kwargs):
        '''修改数据, contents是修改的列和修改后值的字典,kwargs是条件字典'''
        emp = obj()
        # 对传递进来的不定长参数进行判断
        if kwargs is not None:
            # 循环字典的键,即是表的属性,判断是否存在表中
            for lib in kwargs.keys():
                # 如果emp中存在此属性就返回None,不存在就返回notexists
                resu = getattr(emp, lib, 'notexists')
                if resu == 'notexists':
                    self.session.close()
                    return False, '{0} is not defind'.format(lib)
        else:
            self.session.close()
            return False, '请指定修改条件, 不允许一次修改全部内容'
        # 对更改数据的查询和更改进行异常判断
        try:
            res = self.session.query(obj).filter_by(**kwargs).all()
            for re in res:
                for content in contents:
                    # 更新对象属性内容
                    setattr(re, content, contents[content])
            self.session.commit()
            self.session.close()
            return True, 'ok'
        except Exception as e:
            self.session.rollback()
            self.session.close()
            return False, e


    def query(self, obj, kwargs=None, nuique=False):
        '''查询指定对象, 返回列表包含的对象字典,可能是单个或多个'''
        emp = obj()
        res = None
         # 对传递进来的不定长参数进行判断
        if kwargs is not None:
            # 循环字典的键,即是表的属性,判断是否存在表中
            for lib in kwargs.keys():
                # 如果emp中存在此属性就返回None,不存在就返回notexists
                resu = getattr(emp, lib, 'notexists')
                if resu == 'notexists':
                    return None # 查询失败就返回None
            # 查询指定内容
            if nuique: # 如果指定了唯一参数就只返回一条数据
                res = self.session.query(obj).filter_by(**kwargs).first()
            else:
                res = self.session.query(obj).filter_by(**kwargs).all()
        else:
            # 没有指定参数就默认查询所有内容
            res = self.session.query(obj).all()
        self.session.close()
        return res

    def fuzzy_query(self, obj, kwargs, nuique=False):
        '''模糊查询'''
        res = None
        if nuique:
            res = self.session.query(obj).filter(*kwargs).first()
        else:
            res = self.session.query(obj).filter(*kwargs).all()
        self.session.close()
        return res

定义好模块和接口类后,接下来就可以在操作数据库了
views.py

from models import engine, User
from databaseApi import API

# 初始化数据库操作引擎
db = API(engine)

# 添加数据
data = {
	'username' : 'zhangsan',
	'password' : '123456', 
	'email' : 'zhangsan@163.com'
}
res, e = db.addone(User, data)
# 如果添加成功res为True,e为'ok', 如果添加失败res为False,e 为报错信息下面的类同
# 完全匹配查询
res = db.query(User) # 不带任何参数查询表中的所有内容
res = db.query(User, {'name': 'zhangsan'}) # 指定姓名
res = db.query(User, {'name': 'zhangsan'}, True) # 指定返回单条数据
# 不完全匹配查询
res = db.fuzzy_query(User, {User.name.like('zhang%')}) # 查询所有以zhang开头的
res = db.fuzzy_query(User, {User.name.like('zhang%'), User.eamil != None}) # 添加条件,性zhang并且邮箱不为空的

# 删除数据
res, e = db.delete(User, {'name' : 'lisi'})
res, e = db.deall(User) # 这个方法删除表中的所有数据,慎用。

# 修改数据
# 第一个参数是修改的表,第二个参数是修改的字段和修改后的值,第三个参数是修改的条件
res, e = db.change(User, {'email': 'zhang_san@163.com'}, {'name' : 'zhangsan'})

上面的接口类中基本能满足大部分操作数据库的需要了,下面在附上搜集的一些高级查询的使用。
附记(下面是原文)
SQLAlchemy 几种查询方式总结

#带条件查询
kwargs = {User.Id > 10} # 可以将查询条件封装成一个字典
kwargs = {User.Id > 10, User.name='zhangsan'}  # 可以有多个条件
kwargs = [User.Id > 10, User.name='zhangsan'] # 可以将条件封装成一个列表
kwargs = {User.name.like('zhang%')} # 模糊匹配以'zhang'开头的
kwargs = {User.name != 'zhangsan'} # 取反
session.query(User).filter(*kwarg1).all() # 执行查询,返回结果

# sql过滤
session.query(User).filter("Id>id").params(id=10).all()
#关联查询 
session.query(User, Address).filter(User.Id == Address.user_id).all()
session.query(User).join(User.addresses).all()
session.query(User).outerjoin(User.addresses).all()
#聚合查询
session.query(User.name, func.count('*').label("user_count")).group_by(User.name).all()
session.query(User.name, func.sum(User.Id).label("user_id_sum")).group_by(User.name).all()
#子查询
stmt = session.query(Address.user_id, func.count('*').label("address_count")).group_by(Address.user_id).subquery()
session.query(User, stmt.c.address_count).outerjoin((stmt, User.Id == stmt.c.user_id)).order_by(User.Id).all()
#exists
session.query(User).filter(exists().where(Address.user_id == User.id))
session.query(User).filter(User.addresses.any())
# 限制返回字段
session.query(User.name, User.created_at,                     
             User.updated_at).filter_by(name="zhangsan").order_by(            
             User.created_at).first()
            
# 记录总数查询
from sqlalchemy import func

session.query(func.count(User.id))
session.query(func.count(User.id)).group_by(User.name)\

from sqlalchemy import distinct

session.query(func.count(distinct(User.name)))

对Sqlalchemy的学习就到这里啦,后续更加深入学了在更新。
@快乐是一切