python – BigQuery table truncation before streaming not working
Author: Internet
We use the BigQuery Python API to run some analyses. For that, we created the following adapter:
import uuid


def stream_data(self, table, data, schema, how=None):
    # List the tables in the dataset and check whether `table` is among them.
    # A dataset with no tables returns no 'tables' key, hence .get().
    r = self.connector.tables().list(projectId=self._project_id,
                                     datasetId='lbanor').execute()
    table_exists = [row['tableReference']['tableId'] for row in
                    r.get('tables', []) if
                    row['tableReference']['tableId'] == table]

    # Table resource used whenever the table has to be (re)created.
    table_body = {
        'tableReference': {
            'tableId': table,
            'projectId': self._project_id,
            'datasetId': 'lbanor'
        },
        'schema': schema
    }

    if table_exists:
        if how == 'WRITE_TRUNCATE':
            # Emulate truncation: drop the table, then create it again.
            self.connector.tables().delete(projectId=self._project_id,
                                           datasetId='lbanor',
                                           tableId=table).execute()
            self.connector.tables().insert(projectId=self._project_id,
                                           datasetId='lbanor',
                                           body=table_body).execute()
    else:
        # The table does not exist yet: create it.
        self.connector.tables().insert(projectId=self._project_id,
                                       datasetId='lbanor',
                                       body=table_body).execute()

    # Stream the row; insertId lets BigQuery de-duplicate retried inserts
    # on a best-effort basis.
    body = {
        'rows': [
            {
                'json': data,
                'insertId': str(uuid.uuid4())
            }
        ]
    }
    self.connector.tabledata().insertAll(projectId=self._project_id,
                                         datasetId='lbanor',
                                         tableId=table,
                                         body=body).execute(num_retries=5)
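Here connector is just the service object created with build().
Its main purpose is to stream data into a given table. If the table already exists and the "how" argument is passed as "WRITE_TRUNCATE", the table is first deleted and then created again.
After that, the data is streamed.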
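The adapter decides whether to delete or create the table from a single tables().list call, which returns only one page of results. A minimal sketch of a more defensive existence check, assuming the BigQuery v2 client built with googleapiclient: it follows nextPageToken across pages and treats a response without a 'tables' key as an empty dataset. The helper name table_exists is hypothetical, not part of any library.

```python
def table_exists(connector, project_id, dataset_id, table_id):
    """Return True if table_id appears in the dataset's table listing.

    Hypothetical helper: follows nextPageToken so tables beyond the first
    page are found, and treats a response without a 'tables' key as an
    empty dataset.
    """
    page_token = None
    while True:
        params = {'projectId': project_id, 'datasetId': dataset_id}
        if page_token:
            params['pageToken'] = page_token
        resp = connector.tables().list(**params).execute()
        for row in resp.get('tables', []):
            if row['tableReference']['tableId'] == table_id:
                return True
        page_token = resp.get('nextPageToken')
        if not page_token:
            return False
```

Inside the adapter this would read `if table_exists(self.connector, self._project_id, 'lbanor', table):`. Calling tables().get and treating an HTTP 404 as "missing" is another common way to do the same check with a single request.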
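Everything works fine as long as the table is not being deleted over and over again.
For example, this is the result of running the script without simulating the write-truncate option (a for loop keeps calling stream_data with how=None):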
[
{
"date": "2016-04-25",
"unix_date": "1461606664981207",
"init_cv_date": "2016-03-12",
"end_cv_date": "2016-03-25",
"days_trained": "56",
"days_validated": "14",
"navigated_score": "1",
"carted_score": "3",
"purchased_score": "10",
"description": "First trial of top seller alg. No filter nor any condition is applied. Skus not present in train count as rank=0.5",
"metric": "rank",
"result": "0.31729249914663893"
},
{
"date": "2016-04-25",
"unix_date": "1461606599745107",
"init_cv_date": "2016-03-06",
"end_cv_date": "2016-03-25",
"days_trained": "80",
"days_validated": "20",
"navigated_score": "1",
"carted_score": "3",
"purchased_score": "10",
"description": "First trial of top seller alg. No filter nor any condition is applied. Skus not present in train count as rank=0.5",
"metric": "rank",
"result": "0.32677143128667446"
},
{
"date": "2016-04-25",
"unix_date": "1461606688950415",
"init_cv_date": "2016-03-14",
"end_cv_date": "2016-03-25",
"days_trained": "48",
"days_validated": "12",
"navigated_score": "1",
"carted_score": "3",
"purchased_score": "10",
"description": "First trial of top seller alg. No filter nor any condition is applied. Skus not present in train count as rank=0.5",
"metric": "rank",
"result": "0.3129267723358932"
},
{
"date": "2016-04-25",
"unix_date": "1461606707195122",
"init_cv_date": "2016-03-16",
"end_cv_date": "2016-03-25",
"days_trained": "40",
"days_validated": "10",
"navigated_score": "1",
"carted_score": "3",
"purchased_score": "10",
"description": "First trial of top seller alg. No filter nor any condition is applied. Skus not present in train count as rank=0.5",
"metric": "rank",
"result": "0.310620987663015"
},
{
"date": "2016-04-25",
"unix_date": "1461606622432947",
"init_cv_date": "2016-03-08",
"end_cv_date": "2016-03-25",
"days_trained": "72",
"days_validated": "18",
"navigated_score": "1",
"carted_score": "3",
"purchased_score": "10",
"description": "First trial of top seller alg. No filter nor any condition is applied. Skus not present in train count as rank=0.5",
"metric": "rank",
"result": "0.32395802949369296"
}
]
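Because the loop calls stream_data once per result, every row costs one tables().list request plus one insertAll request. tabledata().insertAll accepts several rows in a single body, each with its own insertId. A minimal sketch of a hypothetical helper that builds such a body for a list of rows (the name build_insert_all_body is an assumption, not a library function):

```python
import uuid


def build_insert_all_body(rows):
    """Build a tabledata().insertAll body with one insertId per row.

    Hypothetical helper: each row gets a fresh UUID so BigQuery can
    de-duplicate retries on a best-effort basis.
    """
    return {'rows': [{'json': row, 'insertId': str(uuid.uuid4())}
                     for row in rows]}
```

The resulting body would be passed as body=... to a single insertAll call after the loop has collected all results.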
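But when we use the same adapter with how="WRITE_TRUNCATE", its behavior changes and becomes unpredictable.
Sometimes it works and the data is saved to the table. Other times, even though no error is raised, no data is saved to the table at all.
When we then query the table, nothing comes back; it just says "Query returned zero results".
So something goes wrong when we delete the table, create it again, and stream the data. Are we making a mistake somewhere?
If you need more information, please let me know. Thanks in advance!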
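Solution:
See Jordan Tigani's answer and Sean Chen's comment at https://stackoverflow.com/a/36417177/132438 (both are BigQuery engineers).
In summary:
> When you recreate or truncate a table, "you need to wait 2 minutes before streaming to avoid data being dropped."
That explains the non-deterministic behavior you are seeing.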
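A minimal sketch of applying that advice: record when the table was (re)created and sleep until roughly two minutes have passed before the first insertAll. The helper wait_before_streaming and the 120-second constant are assumptions taken from the quoted comment, not a documented guarantee; clock and sleep are injectable so the logic can be tested without actually waiting.

```python
import time

# Assumed delay, taken from the quoted advice (~2 minutes); not a guarantee.
STREAMING_DELAY_SECONDS = 120


def wait_before_streaming(recreated_at, delay=STREAMING_DELAY_SECONDS,
                          clock=time.monotonic, sleep=time.sleep):
    """Block until `delay` seconds have passed since `recreated_at`.

    Hypothetical helper. `recreated_at` must come from the same clock
    (e.g. time.monotonic()). Returns the number of seconds slept.
    """
    remaining = delay - (clock() - recreated_at)
    if remaining > 0:
        sleep(remaining)
        return remaining
    return 0.0
```

In stream_data, one would store `recreated_at = time.monotonic()` right after the tables().insert call in the WRITE_TRUNCATE branch and call `wait_before_streaming(recreated_at)` just before insertAll. Since the delay is empirical, a longer margin may be safer.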
Tags: python, google-bigquery. Source: https://codeday.me/bug/20190829/1758713.html