首页 > 数据库> > sqlalchemy介绍和快速使用,创建操作数据表,scoped_session线程安全,基本增删查改,一对多,多对多,flask-sqlalchemy使用和flask-migrate使用
sqlalchemy介绍和快速使用,创建操作数据表,scoped_session线程安全,基本增删查改,一对多,多对多,flask-sqlalchemy使用和flask-migrate使用
作者:互联网
1 sqlalchemy介绍和快速使用
# django 的orm框架,对象关系映射,只能在djagno中用
# sqlalchemy:独立的orm框架,轻松的集成到任意项目中去,SQLAlchemy是一个基于Python实现的ORM框架。该框架建立在 DB API之上,使用关系对象映射进行数据库操作,简言之便是:将类和对象转换成SQL,然后使用数据API执行SQL并获取执行结果
# djagno 的orm,sqlalchemy,peewee。。。 目前异步的orm框架,没有特别好的
# 安装:pip3 install sqlalchemy
# 组成部分(了解)
Engine,框架的引擎
Connection Pooling ,数据库连接池
Dialect,选择连接数据库的DB API种类
Schema/Types,架构和类型
SQL Exprression Language,SQL表达式语言
# SQLAlchemy本身无法操作数据库,其必须以来pymsql等第三方插件
pymysql
mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]
cx_Oracle
oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]
更多:http://docs.sqlalchemy.org/en/latest/dialects/index.html
1.1 原生操作的快速使用
# 原生操作,写原生sql,用的比较少
import time
import threading
import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.engine.base import Engine
engine = create_engine(
"mysql+pymysql://root:123@127.0.0.1:3306/luffy_api",
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
def task(arg):
# 从连接池中拿一个链接
conn = engine.raw_connection()
cursor = conn.cursor()
cursor.execute(
"select * from luffy_banner"
)
result = cursor.fetchall()
print(result)
cursor.close()
conn.close()
for i in range(20):
t = threading.Thread(target=task, args=(i,))
t.start()
2 创建操作数据表
import datetime
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
Base = declarative_base()
class Users(Base):
id = Column(Integer, primary_key=True) # id 主键
name = Column(String(32), index=True, nullable=False) # name列,索引,不可为空
email = Column(String(32), unique=True)
#datetime.datetime.now不能加括号,加了括号,以后永远是当前时间
ctime = Column(DateTime, default=datetime.datetime.now)
extra = Column(Text, nullable=True)
__tablename__ = 'users' # 数据库表名称
__table_args__ = (
UniqueConstraint('id', 'name', name='uix_id_name'), #联合唯一
Index('ix_id_name', 'name', 'email'), #索引
)
# orm不能创建数据库,手动创建
#sqlalchemy只能创建表和删除表,不能新增,删除字段
# 创建表,同步到数据库
def init_db():
"""
根据类创建数据库表
"""
engine = create_engine(
"mysql+pymysql://root:123@127.0.0.1:3306/aaa",
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
# 创建被Base管理的所有表
Base.metadata.create_all(engine)
def drop_db():
"""
根据类删除数据库表
"""
engine = create_engine(
"mysql+pymysql://root:123@127.0.0.1:3306/aaa?charset=utf8",
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
Base.metadata.drop_all(engine)
if __name__ == '__main__':
# init_db()
drop_db()
# sqlalchemy 只能创建表,删除表,不能增加删除字段 (在flask中,可以使用第三方支持)
3 scoped_session线程安全
# 1 新增数据
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from models import Users
from sqlalchemy.orm import scoped_session
from threading import Thread
# 第一步:创建engine
engine = create_engine(
"mysql+pymysql://root:123@127.0.0.1:3306/aaa?charset=utf8",
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
# 第二步:生成session,每次执行数据库操作时,都需要创建一个Session
Session = sessionmaker(bind=engine)
# 原来的时候
# session = Session()
# 变成了它,它就是线程安全,session对象在flask框架中就可以是全局的,任意线程使用的是就是这个全局session,但是不会出现数据错乱问题
# 本质内部使用了local对象,保证了,每个线程实际上使用的是当前线程自己的session
session = scoped_session(Session)
# scoped_session类的对象,正常来讲是没有 add, close,commit...方法和属性的,但是实际上有,是通过create_proxy_methods装饰器,设置进去的(通过反射setattr写进去的)
def task(i):
# 第三步:插入数据
user = Users(name='lqz%s'%i, email='33%s@qq.com'%i, extra='lqz is handsome')
session.add(user)
# 第四步:提交
session.commit()
# 第五步:关闭链接
session.close()
for i in range(10):
t = Thread(target=task,args=[i,])
t.start()
4 基本增删查改
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from models import Users
from sqlalchemy.sql import text
engine = create_engine(
"mysql+pymysql://root:123@127.0.0.1:3306/aaa?charset=utf8",
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
Session = sessionmaker(bind=engine)
session = Session()
# 1 增加数据一条
# session.add(对象)
# 2 同时增加多条
# session.add_all([Users(name='lqz123', email='asdf@qq.com', extra='asfasdf'),Users(name='pyy', email='aseedf@qq.com', extra='asdfeedd')])
# 3 删除 ---》查出来删
# res = session.query(Users).filter(Users.id > 17).delete()
# res = session.query(Users).filter(Users.name == 'lqz').delete()
# print(res)
# 4 修改---->查出来改
# res=session.query(Users).filter(Users.id > 10).update({"name" : "lqz"})
# 类似于django的F查询
# session.query(Users).filter(Users.id > 0).update({Users.name: Users.name + "099"}, synchronize_session=False)
# session.query(Users).filter(Users.id > 0).update({"age": Users.age + 1}, synchronize_session="evaluate")
# 5 查询 ----》查询特别多
# 5.1 查所有
# res = session.query(Users).all()
# 5.2 指定查询的字段select name as xx,age from users;
# res = session.query(Users.name.label('xx'), Users.email)
# 5.3 通过filter 过滤
# filter写条件 > < ==
# res=session.query(Users).filter(Users.id > 0).all()
# res=session.query(Users).filter(Users.name == 'lqz2099').all()
# 5.4 filter_by 过滤,filter_by表达式
# res = session.query(Users).filter_by(name='lqz',email='332@qq.com').all()
# res = session.query(Users).filter_by(name='lqz').first()
# 5.5 自定制where部分查询sql
# res = session.query(Users).filter(text("id>:value or name=:name")).params(value=16, name='lqz099').all()
# res = session.query(Users).filter(Users.id<13 , Users.name=='lqz099').all()
# 5.6 纯自定义sql---》django中支持这个
# res = session.query(Users).from_statement(text("SELECT * FROM users where name=:name")).params(name='lqz').all()
# 5.7 filter和filter_by的其他使用
# 条件
# res = session.query(Users).filter_by(name='lqz').all()
# 表达式,and条件连接
# ret = session.query(Users).filter(Users.id > 1, Users.name == 'lqz').all()
# between
# res = session.query(Users).filter(Users.id.between(7, 9), Users.name == 'lqz').all()
# res = session.query(Users).filter(Users.id.between(7, 9)).all()
# res = session.query(Users).filter(Users.id.between(7, 9)) # 原生sql
# res = session.query(Users).filter(Users.id.between(7, 9), Users.name == 'lqz') # 原生sql
# in
# res = session.query(Users).filter(Users.id.in_([7,8,9])).all()
# ~非,除。。外
# res = session.query(Users).filter(~Users.id.in_([1, 3, 4])).all()
# 二次筛选
# res = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name='lqz099'))).all()
# 5.8 与 或 非
# or_包裹的都是or条件,and_包裹的都是and条件
from sqlalchemy import and_, or_
# res = session.query(Users).filter(and_(Users.id > 3, Users.name == 'lqz099')).all()
# res = session.query(Users).filter(or_(Users.id < 2, Users.name == 'lqz099')).all()
# res = session.query(Users).filter(
# or_(
# Users.id < 2,
# and_(Users.name == 'lqz099', Users.id > 3),
# Users.extra != ""
# ))
# 5.9 通配符,以e开头,不以e开头
# res = session.query(Users).filter(Users.name.like('lqz%')).all()
# res = session.query(Users).filter(~Users.name.like('%lqz%')).all()
# 5.10 限制,用于分页,区间
# res = session.query(Users)[1:2]
# 每页显示5条,第3页的数据
# res = session.query(Users)[3*5:3*5+5]
# 5.11 排序,根据name降序排列(从大到小)
# res = session.query(Users).order_by(Users.name.desc()).all()
# 第一个条件重复后,再按第二个条件升序排
# ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all()
# 5.12 分组
# 分组
from sqlalchemy.sql import func
# select * from user group by name;
# res = session.query(Users).group_by(Users.name).all()
# 分组之后取最大id,id之和,最小id
# select max(id),sum(id),min(id) from user group by name;
# res = session.query(
# func.max(Users.id),
# func.sum(Users.id),
# func.min(Users.id)).group_by(Users.name).all()
# haviing筛选
# select max(id),sum(id),min(id) from user where email like 33% group by name having min(id) > 2;
res = session.query(
func.max(Users.id),
func.sum(Users.id),
func.min(Users.id)).filter(Users.email.like('33%')).group_by(Users.name).having(func.min(Users.id) > 2).all()
### 5.13 链表操作
# select * from users,favor where user.id= favor.nid; # 先笛卡尔积再过滤
# ret = session.query(Users, Favor).filter(Users.id == Favor.nid).all()
#join表,默认是inner join
# select * from person inner join favor on person.favor_id=favor.id;
# ret = session.query(Person).join(Favor).all()
#isouter=True 外连,表示Person left join Favor,没有右连接,反过来即可
# select * from person left join favor on person.favor_id=favor.id;
# ret = session.query(Person).join(Favor, isouter=True).all()
# 右链接
# ret = session.query(Favor).join(Person, isouter=True).all()
# 自己指定on条件(连表条件),第二个参数,支持on多个条件,用and_,同上
# select * from person left join favor on person.id=favor.id;
# ret = session.query(Person).join(Favor,Person.id==Favor.id, isouter=True).all()
## 5.14 union和union all的区别?
# 组合(了解)UNION 操作符用于合并两个或多个 SELECT 语句的结果集
#union和union all的区别?
# q1 = session.query(Users.name).filter(Users.id > 2)
# q2 = session.query(Favor.caption).filter(Favor.nid < 2)
# ret = q1.union(q2).all()
#
# q1 = session.query(Users.name).filter(Users.id > 2)
# q2 = session.query(Favor.caption).filter(Favor.nid < 2)
# ret = q1.union_all(q2).all()
print(res)
# 第四步:提交
session.commit()
# 第五步:关闭链接
session.close()
5 一对多
###一对多关系
class Hobby(Base):
__tablename__ = 'hobby'
id = Column(Integer, primary_key=True)
caption = Column(String(50), default='篮球')
class Person(Base):
__tablename__ = 'person'
nid = Column(Integer, primary_key=True)
name = Column(String(32), index=True, nullable=True)
# hobby指的是tablename而不是类名,uselist=False
hobby_id = Column(Integer, ForeignKey("hobby.id"))
# 跟数据库无关,不会新增字段,只用于快速链表操作
# 基于对象的跨表查询:
# 类名,backref用于反向查询
hobby = relationship('Hobby', backref='pers')
def __repr__(self):
return self.name
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from models import Person, Hobby
from sqlalchemy.sql import text
engine = create_engine(
"mysql+pymysql://root:123@127.0.0.1:3306/aaa?charset=utf8",
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
Session = sessionmaker(bind=engine)
session = Session()
# 1 一对多增加
# 方式一:
# session.add(Hobby(caption='足球'))
# session.add(Person(name='lqz',hobby_id=1))
# session.add(Person(name='pyy',hobby_id=2)) # 有约束,增加失败
# 方式二:
# session.add(Person(name='刘亦菲', hobby=Hobby(caption='橄榄球')))
# 2 一对多查询
# 2.1 基于对象的跨表查询
### 正向查询
# res = session.query(Person).filter(Person.name == '刘亦菲').first()
# print(res)
# print(res.hobby.caption)
## 反向查询
# res=session.query(Hobby).filter_by(caption='足球').first()
# print(res)
# # 取出喜欢足球的所有人---》反向查
# print(res.pers)
# 2.2 基于连表的跨表查询
# select * from person,hobby where person.hobby_id=hobby.id and person.name=彭于晏;
# res=session.query(Person,Hobby).filter(Person.hobby_id==Hobby.id,Person.name=='彭于晏').all()
#select * from person inner join hobby on person.hobby_id=hobby.id where person.name=彭于晏;
res = session.query(Person).join(Hobby).filter(Person.name=='彭于晏').all()
print(res)
session.commit()
session.close()
6 多对多
#多对多
class Boy2Girl(Base):
__tablename__ = 'boy2girl'
id = Column(Integer, primary_key=True, autoincrement=True)
girl_id = Column(Integer, ForeignKey('girl.id'))
boy_id = Column(Integer, ForeignKey('boy.id'))
class Girl(Base):
__tablename__ = 'girl'
id = Column(Integer, primary_key=True)
name = Column(String(64), unique=True, nullable=False)
class Boy(Base):
__tablename__ = 'boy'
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(64), unique=True, nullable=False)
# 与生成表结构无关,仅用于查询方便,放在哪个单表中都可以
#方便快速查询,写了这个字段,相当于django 的manytomany,快速使用基于对象的跨表查询
girls = relationship('Girl', secondary='boy2girl', backref='boys')
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from models import Boy,Girl,Boy2Girl
from sqlalchemy.sql import text
engine = create_engine(
"mysql+pymysql://root:123@127.0.0.1:3306/aaa?charset=utf8",
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
Session = sessionmaker(bind=engine)
session = Session()
# 1 多对多增加
# 方式一:所有表都有,一个个增加
# session.add(Boy(name='dxl'))
# session.add(Girl(name='zqh'))
# session.add_all([Boy(name='dxl'),Girl(name='zqh')])
# session.add_all([Boy2Girl(boy_id=1,girl_id=1),Boy2Girl(boy_id=1,girl_id=2)])
# 方式二:
# session.add(Boy(name='张涛',girls=[Girl(name='芙蓉姐姐'),Girl(name='凤姐')]))
# 2 一对多查询
# 2.1 基于对象的跨表查询
### 正向查询
# res=session.query(Boy).filter_by(name='张涛').first()
# print(res.girls)
## 反向查询
# res=session.query(Girl).filter_by(name='凤姐').first()
# print(res.boys)
# 2.2 基于连表的跨表查询
# print(res)
session.commit()
session.close()
7 flask-sqlalchemy使用和flask-migrate使用
# flask-sqlalchemy:帮助我们快速把sqlalchemy集成到flask中
# 表字段增加删除不支持:flask-migrate,以后增加了字段,删除字段只需要两条迁移命令就完成了
### flask—sqlalchemy
# 第一步:导入SQLAlchemy,实例化得到对象db
# 第二步:注册
db.init_app(app)
# 第三步:创建表模型,继承db.Model
# 第四步:使用session
db.session 处理了线程安全
### 把表同步到数据库中---》flask-migrate
## 基于flask_script,定制几个命令 python manage.py db init/ migrate/upgrade
# 第一个命令:
python38 manage.py db init # 初始化,项目使用的时候,只敲一次,生成migrations文件夹
# 第二个命令:只要改了表结构,增加表,删除表,增加字段,删除字段
python38 manage.py db migrate # 不会在数据库中生成记录,只是记录变化等同于makemigrations
# 第三个命令
python38 manage.py db upgrade # 等同于migrate
8 请求上下文分析
def wsgi_app(self, environ, start_response):
# 读完了:ctx:是RequestContext的对象,内部有当次请求的request对象,session对象,app对象,flash对象
ctx = self.request_context(environ)
error = None
try:
try:
ctx.push()
response = self.full_dispatch_request()
except Exception as e:
error = e
response = self.handle_exception(e)
except: # noqa: B001
error = sys.exc_info()[1]
raise
return response(environ, start_response)
finally:
if self.should_ignore_error(error):
error = None
ctx.auto_pop(error)
请求上下文执行流程(ctx):
-0 flask项目一启动,有6个全局变量
-_request_ctx_stack:LocalStack对象
-_app_ctx_stack :LocalStack对象
-request : LocalProxy对象
-session : LocalProxy对象
-1 请求来了 app.__call__()---->内部执行:self.wsgi_app(environ, start_response)
-2 wsgi_app()
-2.1 执行:ctx = self.request_context(environ):返回一个RequestContext对象,并且封装了request(当次请求的request对象),session,flash,当前app对象
-2.2 执行: ctx.push():RequestContext对象的push方法
-2.2.1 push方法中中间位置有:_request_ctx_stack.push(self),self是ctx对象
-2.2.2 去_request_ctx_stack对象的类中找push方法(LocalStack中找push方法)
-2.2.3 push方法源码:
def push(self, obj):
#通过反射找self._local,在init实例化的时候生成的:self._local = Local()
#Local(),flask封装的支持线程和协程的local对象
# 一开始取不到stack,返回None
rv = getattr(self._local, "stack", None)
if rv is None:
#走到这,self._local.stack=[],rv=self._local.stack
self._local.stack = rv = []
# 把ctx放到了列表中
#self._local={'线程id1':{'stack':[ctx,]},'线程id2':{'stack':[ctx,]},'线程id3':{'stack':[ctx,]}}
rv.append(obj)
return rv
-3 如果在视图函数中使用request对象,比如:print(request)
-3.1 会调用request对象的__str__方法,request类是:LocalProxy
-3.2 LocalProxy中的__str__方法:lambda x: str(x._get_current_object())
-3.2.1 内部执行self._get_current_object()
-3.2.2 _get_current_object()方法的源码如下:
def _get_current_object(self):
if not hasattr(self.__local, "__release_local__"):
#self.__local() 在init的时候,实例化的,在init中:object.__setattr__(self, "_LocalProxy__local", local)
# 用了隐藏属性
#self.__local 实例化该类的时候传入的local(偏函数的内存地址:partial(_lookup_req_object, "request"))
#加括号返回,就会执行偏函数,也就是执行_lookup_req_object,不需要传参数了
#这个地方的返回值就是request对象(当此请求的request,没有乱)
return self.__local()
try:
return getattr(self.__local, self.__name__)
except AttributeError:
raise RuntimeError("no object bound to %s" % self.__name__)
-3.2.3 _lookup_req_object函数源码如下:
def _lookup_req_object(name):
#name是'request'字符串
#top方法是把第二步中放入的ctx取出来,因为都在一个线程内,当前取到的就是当次请求的ctx对象
top = _request_ctx_stack.top
if top is None:
raise RuntimeError(_request_ctx_err_msg)
#通过反射,去ctx中把request对象返回
return getattr(top, name)
-3.2.4 所以:print(request) 实质上是在打印当此请求的request对象的__str__
-4 如果在视图函数中使用request对象,比如:print(request.method):实质上是取到当次请求的reuquest对象的method属性
-5 最终,请求结束执行: ctx.auto_pop(error),把ctx移除掉
其他的东西:
-session:
-请求来了opensession
-ctx.push()---->也就是RequestContext类的push方法的最后的地方:
if self.session is None:
#self是ctx,ctx中有个app就是flask对象, self.app.session_interface也就是它:SecureCookieSessionInterface()
session_interface = self.app.session_interface
self.session = session_interface.open_session(self.app, self.request)
if self.session is None:
#经过上面还是None的话,生成了个空session
self.session = session_interface.make_null_session(self.app)
-请求走了savesession
-response = self.full_dispatch_request() 方法内部:执行了before_first_request,before_request,视图函数,after_request,savesession
-self.full_dispatch_request()---->执行:self.finalize_request(rv)-----》self.process_response(response)----》最后:self.session_interface.save_session(self, ctx.session, response)
-请求扩展相关
before_first_request,before_request,after_request依次执行
-flask有一个请求上下文,一个应用上下文
-ctx:
-是:RequestContext对象:封装了request和session
-调用了:_request_ctx_stack.push(self)就是把:ctx放到了那个位置
-app_ctx:
-是:AppContext(self) 对象:封装了当前的app和g
-调用 _app_ctx_stack.push(self) 就是把:app_ctx放到了那个位置
-g是个什么鬼?
专门用来存储用户信息的g对象,g的全称的为global
g对象在一次请求中的所有的代码的地方,都是可以使用的
-代理模式
-request和session就是代理对象,用的就是代理模式
标签:sqlalchemy,Users,flask,self,数据表,session,query,id,name 来源: https://www.cnblogs.com/chunyouqudongwuyuan/p/16580633.html