我如何检测dynamodb2中的未写项目?
作者:互联网
我正在将代码从dynamodb迁移到dynamodb2.该代码执行批处理写入,而我遇到的主要问题之一是检测未处理的消息.我的代码不断从队列中提取消息,然后将它们批量插入DynamoDB表中.似乎有很多项目(约20%)从未进入表中,而我却没有收到任何错误消息.因此,我的问题是,如何在未插入项目的情况下如何进行捕获以及如何对其进行重新处理?这是我的dynamodb代码的一部分,它可以完成此操作:
def do_batch_write(items,conn,table,diagn):
batch_list = conn.new_batch_write_list()
batch_list.add_batch(table, puts=items)
iTry = 0
rems = []
while True:
iTry = iTry + 1
try:
response = conn.batch_write_item(batch_list)
except Exception, e:
tRetry = 5
log.error("Error while attempting batch_write_item, try %d, retrying after %d secs: %s" % (iTry, tRetry, str(e)))
time.sleep(tRetry)
continue
unprocessed = response.get('UnprocessedItems', None)
if not unprocessed:
if len(items) == 1 and diagn:
log.info("Trivial batch processed")
break
batch_list = conn.new_batch_write_list()
unprocessed_list = unprocessed[table.name]
items = []
for u in unprocessed_list:
item_attr = u['PutRequest']['Item']
item = table.new_item( attrs=item_attr)
items.append(item)
rems.append(len(items))
batch_list.add_batch(table, puts=items)
return iTry
这是我要修改的dynamodb2代码块,以处理未处理/废弃的项目.
with table.batch_write() as batch:
while True:
m = inq.read()
mStr = json.dumps(m)
pid = m['primaryId']
sid = m['secondaryId']
item_data = {"primaryId" : pid, "secondaryId"] : sid, "message"] : mStr}
batch.put_item(data=item_data)
我查看了this page,但那里没有任何帮助.您能帮我弄清楚如何修改吗?谢谢
更新:我仍然有缺少物品的问题.我将上述代码块修改如下:
i = 0
with table.batch_write() as batch:
while True:
m = inq.read()
i = i + 1
mStr = json.dumps(m)
pid = m['primaryId']
sid = m['secondaryId']
item_data = {"primaryId" : pid, "secondaryId"] : sid, "message"] : mStr}
batch.put_item(data=item_data)
if i == 25:
batch.resend_unprocessed()
i = 0
但是,在仔细记录所有传入数据后发现的结果(为了节省空间,上面的代码片段未包含日志打印语句),至少在一种情况下,我看到了以下内容:
> put_item将一组大约20个连续摄取的消息添加到批处理中
>调用resend_unprocessed()时,它将报告0个未处理的项目
>当我尝试从DDB表中检索20条消息中的任何一条时,都找不到它们
因此,当boto成功将项目写入表中时,我似乎并不真正信任它.看起来像个错误,还是dynamodb2的某种“功能”?
我之前忘记提及的一件事:我有几个相同的“工作程序”进程在同一AWS EC2实例上并行运行,这些流程从相同的输入队列读取并写入相同的Dynamo表.我创建了其中几个以跟上传入数据的数量.我给他们的印象是,他们不应该为进入桌子而战,即使他们之间存在某种冲突,也必须“在炸药的幕后”解决.即使这导致某些项目以某种方式被丢弃,也不应在resend_unprocessed()中报告已成功处理了所有项目.
解决方法:
看来这是可能的.
批量写入可能无法写入“所有”项目.在这种情况下,API成功,但是未写入的项目在响应中表示为“ UnprocessedItems”.您需要调查一下,然后重试这些项目.
发生这种情况的典型原因是表吞吐量超过了(也可能还有其他原因).
添加相关代码段(由于以下要点):
while True:
response = dynamodb_conn.batch_write_item(batch_list)
unprocessed = response.get('UnprocessedItems', None)
if not unprocessed:
break
batch_list = dynamodb_conn.new_batch_write_list()
unprocessed_list = unprocessed[table_name]
items = []
for u in unprocessed_list:
item_attr = u['PutRequest']['Item']
item = dynamodb_table.new_item(
attrs=item_attr
)
items.append(item)
batch_list.add_batch(dynamodb_table, puts=items)
这些额外的阅读材料将告诉您详细信息-最后一个也是python代码.
> BatchWriteItem – Amazon DynamoDB
> The correct way of using DynamoDB BatchWriteItem with boto
>为此的Python要点:https://gist.github.com/griggheo/2698152
标签:amazon-dynamodb,boto,batch-processing,python 来源: https://codeday.me/bug/20191122/2056406.html