数据库
首页 > 数据库> > mysql – 用sqlalchemy检查行和返回id的存在是否缓慢

mysql – 用sqlalchemy检查行和返回id的存在是否缓慢

作者:互联网

所有,

我正在读取一个csv文件并使用sqlalchemy将数据添加到MySQL数据库.其中一个表是地址表,它只能保存唯一的地址.这些地址与另一个具有地址id的外键字段的“语句”表之间存在关系.

因此,对于我的数据文件中的每一行,我创建一个新的语句obj,然后获取相关地址的id.如果地址已存在,则返回该id.否则,我创建一个新的地址obj并返回该id.这是使用下面的代码完成的,改编自this SO question.

def get_or_create(self, model, rec):
    instance = self.session.query(model).filter_by(**dict(filter(lambda (x,y): x in model.__dict__.keys(), rec.iteritems()))).first()
    if instance:
        return instance
    else:
        instance = model(rec)
        return instance

我正在使用GUID作为我的id字段,它是地址表的主键的一部分:

class address(Base):
    __tablename__ = 'address'
    id = id_column()
    name               = Column(String(75), primary_key=True)
    Address_Line_One   = Column(String(50), primary_key=True)
    Address_Line_Two   = Column(String(50), primary_key=True)
    Address_Line_Three = Column(String(50), primary_key=True)
    Address_Line_Four  = Column(String(50), primary_key=True)

id_column()来自here,但由于其他地方的限制,它已被转换为CHAR(32).最后,这里有一个片段:

currStatement   = statements(rec, id=currGUID)
currStatement.address = self.get_or_create(address, rec)

这一切都很好,除非它很慢.对于在一个事务中插入的~65,000个语句,我看到干净测试DB上的1.5小时插入时间.实时观察插入显示它快速达到~10,000行,然后插入速度开始下降.

我该怎么做才能加快插入时间?

编辑:

经过进一步测试,我发现插入时间慢是因为每个对象都是单独插入的.所以,我有~65,000行,每行都有几个sqlalchemy对象,单独插入.使用sqlalchemy 0.7,我如何批量插入对象?

解决方法:

好的!

所以答案是我单独插入每一行,并对每个地址检查进行数据包舍入.地址检查是最糟糕的部分,因为它的速度成倍增长.我计算插入原始数据(1.5小时),然后再插入相同的数据,需要~9小时!

因此,这个答案将重复我转换为批量插入语句所做的工作,以及一些需要注意的事项.

> sqlalchemy中的ORM将“帮助”

ORM很棒,但意识到它与批量插入不完全吻合.批量插入需要在会话中使用较低级别的执行语句.它们不会将ORM对象作为输入,而是使用字典列表和插入对象.因此,如果将完整行的csv文件转换为ORM对象,则不需要将它们添加到当前会话中,而是将它们转换为字典以供日后使用.

def asdict(obj):
    return dict((col.name, getattr(obj, col.name))
         for col in class_mapper(obj.__class__).mapped_table.c)

currGUID = uuid.uuid4()
currPrintOrMail = printOrMail(rec, id=currGUID)
currStatement   = statements(rec, id=currGUID)
currAddress = self.get_or_create(address, rec)
currStatement.address = currAddress

self.currPrintOrMail_bulk.append(asdict(currPrintOrMail))
self.currStatement_bulk.append(asdict(currStatement))

asdict方法来自here.它会为您创建创建的ORM对象中的列的字典.它们永远不会被添加到会话中,并且此后不久就会丢失内存.

>关系会咬你

如果您已设置ORM关系:

class statements(Base):
    __tablename__ = 'statements'
    id = id_column()
    county   = Column(String(50),default='',nullable=False)

    address_id = Column(CHAR(36), ForeignKey('address.id'))
    address = relationship("address", backref=backref("statements", cascade=""))

    printOrMail_id = Column(CHAR(36), ForeignKey('printOrMail.id'))
    pom = relationship("printOrMail", backref=backref("statements", cascade=""))

    property_id = Column(CHAR(36), ForeignKey('property.id'))
    prop = relationship("property", backref=backref("statements", cascade=""))

确保背板中的级联是空白的!否则,将关系中的对象插入到会话中将通过其余对象插入cascade.当您尝试稍后批量插入您的值时,它们将作为重复项被拒绝…如果您很幸运.

这很重要,因为部分要求是获取有效地址的address_id(如果存在),并添加地址(如果不存在).由于查询循环跳闸太慢,我将get_or_create更改为:

def get_or_create(self, model, rec):
    """Check if current session has address. If not, query DB for it. If no one has the address, create and flush a new one to the session."""
    instance = self.session.query(model).get((rec['Name'], rec['Address_Line_One'], rec['Address_Line_Two'], rec['Address_Line_Three'], rec['Address_Line_Four']))
    if instance:
        return instance
    else:
        instance = model(rec)
        self.session.add(instance)
        self.session.flush()
        return instance

使用get会导致sqlalchemy首先检查会话,防止通过网络跳闸.但是,只有在会话中添加了新地址时才有效!还记得关系吗?这是级联插入语句的插入.此外,如果您没有flush()或autoflush = True,则get无法看到新添加的对象.

>创建会话时,请保留对象!

self.session = sessionmaker(autoflush = False,expire_on_commit = False)

如果您不包含expire_on_commit = False,那么您将丢失地址,并再次开始往返.

> ORM对象确实有插入

现在我们有一个要插入的ORM对象的字典列表.但我们还需要一个插入对象.

self.session.execute(printOrMail.__table__.insert(), self.currPrintOrMail_bulk)
self.session.execute(statements.__table__.insert(), self.currStatement_bulk)

Buried in the docs,似乎可以使用classname .__ table__作为insert所需的必要表对象.因此,在会话中,使用ORM类获取表以获取插入对象,运行带有字典列表的execute.事后不要忘了提交!

>不要耗尽内存

这将允许您成功地将批量插入和ORM与关系混合,并查询sqlalchemy中的唯一条目.请注意内存不足.我不得不一次批量插入~30,000条记录,否则py2.7(32位)会在2G左右使用时崩溃.

标签:mysql,sqlalchemy,bulkinsert,insert-update
来源: https://codeday.me/bug/20190826/1731050.html