其他分享
首页 > 其他分享> > UDP协议测试

UDP协议测试

作者:互联网

UDP协议测试

简介

UDP(UserDatagramProtocol)是一个简单的面向消息的传输层协议,尽管UDP提供标头和有效负载的完整性验证(通过校验和),但它不保证向上层协议提供消息传递,并且UDP层在发送后不会保留UDP 消息的状态。因此,UDP有时被称为不可靠的数据报协议。如果需要传输可靠性,则必须在用户应用程序中实现。
在Python中一般用socket库来创建udp协议传输。
在部分测试中,我们可能需要模拟udp发送数据或者接收数据。

代码示例

服务端

import socket
from common.common import logger_print
from UDP_TEST.read_config import *
from UDP_TEST.read_byte import *
import struct


logger = logger_print(logfile)


def udp_server_recv(port, byte_length):
    """
    udp协议接收字节流
    :param port: 端口,int类型
    :param byte_length: 字节长度限制大小,int类型
    :return:
    """
    # 获得本地ip地址
    recv_ip = socket.gethostbyname(socket.gethostname())
    logger.info("接收的ip地址为:%s,端口为:%s" % (recv_ip, port))
    # 建立服务端的套接字对象,第一个参数是ipv4协议,第二个参数是UDP协议
    udp_server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    # 绑定ip和端口
    udp_server_socket.bind((recv_ip, port))
    i = 0
    while True:
        try:
            data, addr = udp_server_socket.recvfrom(byte_length)
            logger.info(addr)
            logger.info(data)
            i += 1
            logger.info(i)
            d = chr(data[1])
            try:
                if d == "j":
                    text = read_j(data)
                    logger.info(text)
                    udp_server_socket.sendall('收到'.encode())
                elif d == "m":
                    text = read_m(data)
                    logger.info(text)
                elif d == "s":
                    text = read_s(data)
                    logger.info(text)
                elif d == "n":
                    text = read_n(data)
                    logger.info(text)
                elif d == "p":
                    text = read_fly_num(data)
                    logger.info(text)
                else:
                    logger.warning("未知类型")
            except Exception as e:
                logger.error(e)
        except Exception as e:
            logger.error(e)

客户端

import socket
from common.common import logger_print
from UDP_TEST.read_config import *
from UDP_TEST.read_byte import *
import struct


def udp_server_send(byte_stream, recv_ip, port):
    """
    udp协议发送字节流
    :param byte_stream: 字节流
    :param recv_ip: 目标地址
    :param port: 目标端口,int型
    :return:
    """
    # 建立服务端的套接字对象,第一个参数是ipv4协议,第二个参数是UDP协议
    udp_server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    # 接收方地址
    recv_addr = (recv_ip, port)
    logger.info(recv_addr)
    # 发送消息
    udp_server_socket.sendto(byte_stream, recv_addr)
    # 接收消息
    udp_server_socket.recvfrom(1024)
    logger.info("发送消息成功,%s" % byte_stream)
    # 关闭套接字对象
    udp_server_socket.close()

标签:info,协议,UDP,socket,text,udp,测试,logger
来源: https://www.cnblogs.com/caodingzheng/p/16675966.html