gRPC流式传输,以视频传输为例
作者:互联网
gRPC 的流式传输需要在 .proto 文件中使用关键字 stream;被 stream 修饰的请求或响应,在代码中以可迭代对象(iterable)的形式出现。下面以 client -> server 方向的视频流式传输(即客户端流式 RPC)为例。
video.proto
syntax = "proto3";

// Video upload service.
service New {
  // Client-streaming RPC: the client sends a stream of FrameRequest messages
  // and the server replies once with a single FrameResponse, so individual
  // frames are not acknowledged.
  rpc Video_transport(stream FrameRequest) returns (FrameResponse);
}

// One message of the upload stream: either a video frame or the end marker.
message FrameRequest {
  bytes f_data = 1;  // frame data
  int32 goon = 2;    // 0: end-of-stream marker; > 0: a frame that is not the final message
}

// Reply sent once for the whole stream.
message FrameResponse {
  string state = 1;
}
如 client 代码所示,transport_video 中使用了 yield,因此它是一个生成器:每读取一帧,就通过 yield 把该帧封装成 FrameRequest 交给 stub,再由 stub 发送给 server。
client.py
"""gRPC client: stream a video file to the server, one frame per message."""
from grpc_out.video_pb2 import *
from grpc_out.video_pb2_grpc import NewStub
from PIL import Image
import cv2
import grpc
import numpy as np

# Properties of the most recently streamed video, updated by transport_video().
# A receiver needs this information to rebuild the video from the raw bytes.
fheight, fwidth, fchannels, fps = 0, 0, 0, 0


def transport_video(video_path):
    """Yield the frames of a video file as ``FrameRequest`` messages.

    Each frame is converted from BGR to RGB, resized to 256x256 so that a
    single message does not get too big, and sent as raw bytes with
    ``goon=1``. After the last frame one extra message with ``goon=0`` and no
    frame data is yielded as the end-of-stream marker.

    If the video cannot be opened a message is printed and nothing is yielded
    (not even the end marker).

    Side effect: updates the module-level ``fheight``, ``fwidth``,
    ``fchannels`` and ``fps``.

    Args:
        video_path: Path of the video file to read.

    Yields:
        FrameRequest: one message per frame, then the end marker.
    """
    global fheight, fwidth, fchannels, fps

    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            print('the video does not exist.')
            return

        h, w = 256, 256
        fps = int(cap.get(cv2.CAP_PROP_FPS))

        ret, frame = cap.read()
        while ret:
            img = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            # cv2.resize takes the target size as (width, height).
            img = cv2.resize(img, (w, h))
            fheight, fwidth, fchannels = img.shape
            # Yielding makes this function a generator, which is the iterable
            # the stub consumes for a client-streaming call.
            yield FrameRequest(f_data=img.tobytes(), goon=1)
            ret, frame = cap.read()
    finally:
        # Runs on normal completion, on errors, and when the consumer drops
        # the generator early, so the capture handle is never leaked.
        cap.release()

    print('video read ended.')
    yield FrameRequest(goon=0)  # end marker: no frame data, goon == 0


def run():
    """Connect to the server, upload one video and print the server's reply."""
    # The 'with' block closes the channel on exit.
    # The address is a placeholder: replace it with the server's host and port.
    with grpc.insecure_channel('10.xx.xx.xx:50000') as channel:
        stub = NewStub(channel)
        video_path = './examples/C_30080000.mp4'
        response = stub.Video_transport(transport_video(video_path))
        print(response.state)


if __name__ == "__main__":
    run()
server.py
"""gRPC server: receive a client-streamed video and write it to disk."""
from concurrent import futures
from grpc_out.video_pb2 import *
from grpc_out.video_pb2_grpc import NewServicer, add_NewServicer_to_server
import os
import subprocess
import cv2
import grpc
import numpy as np
from PIL import Image

# FrameRequest carries only raw bytes, so the frame geometry must be agreed on
# out of band. 256x256 with 3 channels matches what client.py sends (it resizes
# every frame to 256x256 RGB).
DEFAULT_HEIGHT = 256
DEFAULT_WIDTH = 256
DEFAULT_CHANNELS = 3
# The frame rate and the output path are not transmitted either; these two
# values are placeholders -- set them to suit the source video / deployment.
DEFAULT_FPS = 25
DEFAULT_VIDEO_PATH = './received.mp4'


class New(NewServicer):
    """Implementation of the ``New`` service declared in video.proto.

    NOTE: received frames are buffered on the instance (``self.frames``), and a
    single instance is registered with the server, so two uploads running at
    the same time would mix their frames. Run one upload at a time.
    """

    def __init__(self, video_path=DEFAULT_VIDEO_PATH, fps=DEFAULT_FPS,
                 height=DEFAULT_HEIGHT, width=DEFAULT_WIDTH,
                 channels=DEFAULT_CHANNELS) -> None:
        """Store where and how received videos are written.

        Args:
            video_path: File the reconstructed video is written to.
            fps: Frame rate of the written video.
            height: Height in pixels of every received frame.
            width: Width in pixels of every received frame.
            channels: Number of colour channels of every received frame.
        """
        self.frames = []  # raw bytes of the frames received so far
        self.video_path = video_path
        self.fps = fps
        self.height = height
        self.width = width
        self.channels = channels

    def save_video(self, video_path):
        """Build a video from the buffered frames and write it to ``video_path``.

        Every buffered item is interpreted as ``height * width * channels``
        uint8 values in RGB order. The frame buffer is emptied afterwards.

        Args:
            video_path: Destination file.

        Raises:
            ValueError: if a frame's byte count does not match the configured
                geometry (raised by the reshape).
        """
        frame_shape = (self.height, self.width, self.channels)
        video = cv2.VideoWriter(
            video_path,
            cv2.VideoWriter_fourcc(*'mp4v'),
            self.fps,
            (self.width, self.height),  # OpenCV takes the size as (width, height)
        )
        try:
            for raw in self.frames:
                rgb = np.frombuffer(raw, dtype=np.uint8).reshape(frame_shape)  # bytes -> ndarray
                video.write(cv2.cvtColor(rgb, cv2.COLOR_RGB2BGR))
        finally:
            # Release the writer and drop the buffered frames even if a frame
            # turned out to be malformed.
            video.release()
            self.frames.clear()

    def Video_transport(self, request_iterator, context):
        """Handle one client-streamed upload.

        Buffers every frame until the end marker (``goon == 0``) arrives, then
        writes the video and answers ``state='ok'``. If the stream ends without
        an end marker, the buffered frames are discarded and
        ``state='incomplete'`` is returned.

        Args:
            request_iterator: Iterator over the incoming ``FrameRequest``s.
            context: gRPC call context (unused).

        Returns:
            FrameResponse: the single reply for the whole stream.
        """
        print(' *** Reciving video frames...')
        if self.frames:
            print('Clear frames cache...')
            self.frames.clear()

        for f_info in request_iterator:
            if not f_info.goon:  # end marker: carries no frame
                # Take the count first: save_video() empties the buffer.
                received = len(self.frames)
                self.save_video(self.video_path)
                print('---> ' + str(received) + ' frames (all) received.')
                return FrameResponse(state='ok')
            self.frames.append(f_info.f_data)

        # The client closed the stream without sending the end marker. A
        # response object must still be returned, so report the failure in it.
        self.frames.clear()
        return FrameResponse(state='incomplete')


def serve(address='[::]:50000'):
    """Start the gRPC server and block until it terminates.

    Args:
        address: Address to listen on. The default port is the one client.py
            connects to.
    """
    # The thread pool lets the server handle several requests concurrently.
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    add_NewServicer_to_server(New(), server)  # register the servicer
    server.add_insecure_port(address)
    server.start()
    server.wait_for_termination()  # blocks the calling thread until the server terminates


if __name__ == "__main__":
    serve()
先启动 server、再启动 client 之后,会产生一个 client -> server 的流,流中每条消息只传输一帧;只有当一个视频中的所有帧(以及最后的结束标记)都传输完毕后,Video_transport 这个 RPC 方法才会结束并给出 response。
标签:img,为例,gRPC,cv2,server,传输,video,frames,import 来源: https://www.cnblogs.com/grainrain/p/15968945.html