mysql – 用sqlalchemy检查行和返回id的存在是否缓慢
作者:互联网
所有,
我正在读取一个csv文件并使用sqlalchemy将数据添加到MySQL数据库.其中一个表是地址表,它只能保存唯一的地址.这些地址与另一个具有地址id的外键字段的“语句”表之间存在关系.
因此,对于我的数据文件中的每一行,我创建一个新的语句obj,然后获取相关地址的id.如果地址已存在,则返回该id.否则,我创建一个新的地址obj并返回该id.这是使用下面的代码完成的,改编自this SO question.
def get_or_create(self, model, rec):
instance = self.session.query(model).filter_by(**dict(filter(lambda (x,y): x in model.__dict__.keys(), rec.iteritems()))).first()
if instance:
return instance
else:
instance = model(rec)
return instance
我正在使用GUID作为我的id字段,它是地址表的主键的一部分:
class address(Base):
__tablename__ = 'address'
id = id_column()
name = Column(String(75), primary_key=True)
Address_Line_One = Column(String(50), primary_key=True)
Address_Line_Two = Column(String(50), primary_key=True)
Address_Line_Three = Column(String(50), primary_key=True)
Address_Line_Four = Column(String(50), primary_key=True)
id_column()来自here,但由于其他地方的限制,它已被转换为CHAR(32).最后,这里有一个片段:
currStatement = statements(rec, id=currGUID)
currStatement.address = self.get_or_create(address, rec)
这一切都很好,除非它很慢.对于在一个事务中插入的~65,000个语句,我看到干净测试DB上的1.5小时插入时间.实时观察插入显示它快速达到~10,000行,然后插入速度开始下降.
我该怎么做才能加快插入时间?
编辑:
经过进一步测试,我发现插入时间慢是因为每个对象都是单独插入的.所以,我有~65,000行,每行都有几个sqlalchemy对象,单独插入.使用sqlalchemy 0.7,我如何批量插入对象?
解决方法:
好的!
所以答案是我单独插入每一行,并对每个地址检查进行数据包舍入.地址检查是最糟糕的部分,因为它的速度成倍增长.我计算插入原始数据(1.5小时),然后再插入相同的数据,需要~9小时!
因此,这个答案将重复我转换为批量插入语句所做的工作,以及一些需要注意的事项.
> sqlalchemy中的ORM将“帮助”
ORM很棒,但意识到它与批量插入不完全吻合.批量插入需要在会话中使用较低级别的执行语句.它们不会将ORM对象作为输入,而是使用字典列表和插入对象.因此,如果将完整行的csv文件转换为ORM对象,则不需要将它们添加到当前会话中,而是将它们转换为字典以供日后使用.
def asdict(obj):
return dict((col.name, getattr(obj, col.name))
for col in class_mapper(obj.__class__).mapped_table.c)
currGUID = uuid.uuid4()
currPrintOrMail = printOrMail(rec, id=currGUID)
currStatement = statements(rec, id=currGUID)
currAddress = self.get_or_create(address, rec)
currStatement.address = currAddress
self.currPrintOrMail_bulk.append(asdict(currPrintOrMail))
self.currStatement_bulk.append(asdict(currStatement))
asdict方法来自here.它会为您创建创建的ORM对象中的列的字典.它们永远不会被添加到会话中,并且此后不久就会丢失内存.
>关系会咬你
如果您已设置ORM关系:
class statements(Base):
__tablename__ = 'statements'
id = id_column()
county = Column(String(50),default='',nullable=False)
address_id = Column(CHAR(36), ForeignKey('address.id'))
address = relationship("address", backref=backref("statements", cascade=""))
printOrMail_id = Column(CHAR(36), ForeignKey('printOrMail.id'))
pom = relationship("printOrMail", backref=backref("statements", cascade=""))
property_id = Column(CHAR(36), ForeignKey('property.id'))
prop = relationship("property", backref=backref("statements", cascade=""))
确保背板中的级联是空白的!否则,将关系中的对象插入到会话中将通过其余对象插入cascade.当您尝试稍后批量插入您的值时,它们将作为重复项被拒绝…如果您很幸运.
这很重要,因为部分要求是获取有效地址的address_id(如果存在),并添加地址(如果不存在).由于查询循环跳闸太慢,我将get_or_create更改为:
def get_or_create(self, model, rec):
"""Check if current session has address. If not, query DB for it. If no one has the address, create and flush a new one to the session."""
instance = self.session.query(model).get((rec['Name'], rec['Address_Line_One'], rec['Address_Line_Two'], rec['Address_Line_Three'], rec['Address_Line_Four']))
if instance:
return instance
else:
instance = model(rec)
self.session.add(instance)
self.session.flush()
return instance
使用get会导致sqlalchemy首先检查会话,防止通过网络跳闸.但是,只有在会话中添加了新地址时才有效!还记得关系吗?这是级联插入语句的插入.此外,如果您没有flush()或autoflush = True,则get无法看到新添加的对象.
>创建会话时,请保留对象!
self.session = sessionmaker(autoflush = False,expire_on_commit = False)
如果您不包含expire_on_commit = False,那么您将丢失地址,并再次开始往返.
> ORM对象确实有插入
现在我们有一个要插入的ORM对象的字典列表.但我们还需要一个插入对象.
self.session.execute(printOrMail.__table__.insert(), self.currPrintOrMail_bulk)
self.session.execute(statements.__table__.insert(), self.currStatement_bulk)
Buried in the docs,似乎可以使用classname .__ table__作为insert所需的必要表对象.因此,在会话中,使用ORM类获取表以获取插入对象,运行带有字典列表的execute.事后不要忘了提交!
>不要耗尽内存
这将允许您成功地将批量插入和ORM与关系混合,并查询sqlalchemy中的唯一条目.请注意内存不足.我不得不一次批量插入~30,000条记录,否则py2.7(32位)会在2G左右使用时崩溃.
标签:mysql,sqlalchemy,bulkinsert,insert-update 来源: https://codeday.me/bug/20190826/1731050.html