python - Reading and writing serialized protobuf in Beam
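I thought it would be easy to write a PCollection of serialized protobuf messages to a text file and read it back. After several attempts, though, I have not managed to do it. Any comments would be appreciated.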
// definition of proto.
syntax = "proto3";
package test;
message PhoneNumber {
  string number = 1;
  string country = 2;
}
The Python code below implements a simple Beam pipeline that converts lines of text into serialized protobuf messages and writes them out.
# Test python code
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import phone_pb2

class ToProtoFn(beam.DoFn):
    def process(self, element):
        phone = phone_pb2.PhoneNumber()
        phone.number, phone.country = element.strip().split(',')
        yield phone.SerializeToString()

with beam.Pipeline(options=PipelineOptions()) as p:
    lines = (p
        | beam.Create(["123-456-789,us", "345-567-789,ca"])
        | beam.ParDo(ToProtoFn())
        | beam.io.WriteToText('/Users/greeness/data/phone-pb'))
The pipeline runs successfully and produces a file with the following content:
$ cat ~/data/phone-pb-00000-of-00001
123-456-789us
345-567-789ca
Then I wrote another pipeline that reads the serialized protobuf back and parses it with a ParDo.
class ToCsvFn(beam.DoFn):
    def process(self, element):
        phone = phone_pb2.PhoneNumber()
        phone.ParseFromString(element)
        yield ",".join([phone.number, phone.country])

with beam.Pipeline(options=PipelineOptions()) as p:
    lines = (p
        | beam.io.ReadFromText('/Users/greeness/data/phone*')
        | beam.ParDo(ToCsvFn())
        | beam.io.WriteToText('/Users/greeness/data/phone-csv'))
Running it fails with this error message:
  File "/Library/Python/2.7/site-packages/apache_beam/runners/common.py", line 458, in process_outputs
    for result in results:
  File "phone_example.py", line 37, in process
    phone.ParseFromString(element)
  File "/Library/Python/2.7/site-packages/google/protobuf/message.py", line 185, in ParseFromString
    self.MergeFromString(serialized)
  File "/Library/Python/2.7/site-packages/google/protobuf/internal/python_message.py", line 1069, in MergeFromString
    raise message_mod.DecodeError('Truncated message.')
DecodeError: Truncated message. [while running 'ParDo(ToCsvFn)']
So it looks like the serialized protobuf string cannot be parsed. Am I missing something? Thanks for your help!
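A likely explanation for the DecodeError can be sketched in plain Python 3, without Beam or phone_pb2. In the protobuf wire format, the tag byte of a length-delimited field with number 1 is (1 << 3) | 2 = 0x0A, which is the newline character. WriteToText appends a newline after each element and ReadFromText splits its input on newlines, so every PhoneNumber record would be cut at its very first byte. The helpers `encode_string_field` and `encode_phone` below are hypothetical hand-written stand-ins for `SerializeToString()`, and the split only simulates how a line-oriented reader sees the file; it is not Beam's actual implementation.

```python
def encode_string_field(field_number, value):
    """Hand-encode one protobuf string field (hypothetical helper).

    Only handles tags and lengths that fit in a single varint byte,
    which is enough for the PhoneNumber example.
    """
    data = value.encode("utf-8")
    tag = (field_number << 3) | 2  # wire type 2 = length-delimited
    assert tag < 128 and len(data) < 128
    return bytes([tag, len(data)]) + data


def encode_phone(number, country):
    """Stand-in for PhoneNumber(number=..., country=...).SerializeToString()."""
    return encode_string_field(1, number) + encode_string_field(2, country)


records = [
    encode_phone("123-456-789", "us"),
    encode_phone("345-567-789", "ca"),
]

# Simulate a text sink that appends a newline after every element.
file_bytes = b"".join(record + b"\n" for record in records)

# Simulate a text source that splits the file on newlines.
lines = file_bytes.split(b"\n")

print(repr(records[0]))  # the first byte is the newline character
print(lines)             # no element equals an original record
```

Under these assumptions, none of the pieces handed to `ParseFromString` is a complete message, which is consistent with the error above. A record format that stores the length of each record explicitly, rather than relying on a delimiter byte, does not have this problem.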
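Solution:
I found a temporary workaround by using the implementation in tfrecordio.py.
The code below works, but I still welcome any comments that address the problem above.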
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import phone_pb2

def WriteTextToTFRecord():
    class ToProtoFn(beam.DoFn):
        def process(self, element):
            phone = phone_pb2.PhoneNumber()
            phone.number, phone.country = element.strip().split(',')
            # Yield the message object itself; ProtoCoder serializes it on write.
            yield phone

    with beam.Pipeline(options=PipelineOptions()) as p:
        lines = p | beam.Create(["123-456-789,us", "345-567-789,ca"])
        processed = (
            lines
            | beam.ParDo(ToProtoFn())
            | beam.io.WriteToTFRecord('/Users/greeness/data/phone-pb',
                coder=beam.coders.ProtoCoder(phone_pb2.PhoneNumber)))

def ReadTFRecordAndSaveAsCSV():
    class ToCsvFn(beam.DoFn):
        def process(self, element):
            # element is already a parsed PhoneNumber, decoded by ProtoCoder.
            yield ','.join([element.number, element.country])

    with beam.Pipeline(options=PipelineOptions()) as p:
        lines = (p
            | beam.io.ReadFromTFRecord('/Users/greeness/data/phone-pb-*',
                coder=beam.coders.ProtoCoder(phone_pb2.PhoneNumber))
            | beam.ParDo(ToCsvFn())
            | beam.io.WriteToText('/Users/greeness/data/phone-csv'))

if __name__ == '__main__':
    WriteTextToTFRecord()
    ReadTFRecordAndSaveAsCSV()
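If the output has to remain a plain text file, one possible alternative (not part of the workaround above) is to base64-encode each serialized message before writing it, so that no record can contain a newline byte. The sketch below uses only the Python 3 standard library. The names `to_text_line` and `from_text_line` are hypothetical, and the `serialized` literal is a hand-written stand-in for what `SerializeToString()` would be expected to return for the first example record.

```python
import base64


def to_text_line(serialized):
    """Encode serialized protobuf bytes as a newline-free ASCII string."""
    return base64.b64encode(serialized).decode("ascii")


def from_text_line(line):
    """Recover the original serialized bytes from one line of text."""
    return base64.b64decode(line)


# Stand-in for PhoneNumber(number="123-456-789", country="us").SerializeToString()
serialized = b"\n\x0b123-456-789\x12\x02us"

line = to_text_line(serialized)
restored = from_text_line(line)

print(line)                    # safe to store one record per line
print(restored == serialized)  # the round trip preserves the bytes
```

In the text-based pipelines from the question, this would mean yielding `to_text_line(phone.SerializeToString())` in `ToProtoFn` and calling `phone.ParseFromString(from_text_line(element))` in `ToCsvFn`. The cost is a file roughly one third larger than the raw bytes, which is the usual trade-off for base64.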
Tags: protocol-buffers, apache-beam, apache-beam-io, python Source: https://codeday.me/bug/20191025/1928482.html