其他分享
首页 > 其他分享> > dpdk 收发包

dpdk 收发包

作者:互联网

dpdk框架中,调用 rte_eal_init对端口的初始化操作已经基本完成,后面则是根据用户的设置,配置端口的收发包队列以及最终start端口,开始收发包:

a、rte_eth_dev_configure()函数完成端口配置:队列数配置、RSS、offload等等设置;
b、rte_eth_rx_queue_setup()、rte_eth_tx_queue_setup()函数分别设置端口的每个收发队列:ring空间申请、初始化等;
c、rte_eth_dev_start()函数:发送队列初始化buf填充,端口使能(具体可以参考代码或网卡芯片手册,均是相关寄存器设置);

ret = rte_eth_dev_configure(portid, nb_rx_queue, (uint16_t)n_tx_queue,&local_port_conf);

ret = rte_eth_tx_queue_setup(portid, queueid, nb_txd,  socketid, txconf);    

ret = rte_eth_rx_queue_setup(portid, queueid, nb_rxd, socketid, &rxq_conf, pktmbuf_pool[socketid]);
        /* Start device */
ret = rte_eth_dev_start(portid);                

收发包过程大致可以分为2个部分

收发包的配置

收发包的配置最主要的工作就是配置网卡的收发队列,设置DMA拷贝数据包的地址等,配置好地址后,网卡收到数据包后会通过DMA控制器直接把数据包拷贝到指定的内存地址。我们使用数据包时,只要去对应队列取出指定地址的数据即可。

  收发包的配置是从rte_eth_dev_configure()开始的,这里根据参数会配置队列的个数,以及接口的配置信息,如队列的使用模式,多队列的方式等。

前面会先进行一些各项检查,如果设备已经启动,就得先停下来才能配置(这时应该叫再配置吧)。然后把传进去的配置参数拷贝到设备的数据区。

memcpy(&dev->data->dev_conf, dev_conf, sizeof(dev->data->dev_conf));

  之后获取设备的信息,主要也是为了后面的检查使用:

(*dev->dev_ops->dev_infos_get)(dev, &dev_info);

  这里的dev_infos_get是在驱动初始化过程中设备初始化时配置的(eth_ixgbe_dev_init())

eth_dev->dev_ops = &ixgbe_eth_dev_ops;

重要的信息检查过后,下面就是对发送和接收队列进行配置
先看接收队列的配置,接收队列是从rte_eth_dev_tx_queue_config()开始的
在接收配置中,考虑的是有两种情况,一种是第一次配置;另一种是重新配置。所以,代码中都做了区分。
(1)如果是第一次配置,那么就为每个队列分配一个指针。
(2)如果是重新配置,配置的queue数量不为0,那么就取消之前的配置,重新配置。
(3)如果是重新配置,但要求的queue为0,那么释放已有的配置。

发送的配置也是同样的,在rte_eth_dev_tx_queue_config()

当收发队列配置完成后,就调用设备的配置函数,进行最后的配置。(*dev->dev_ops->dev_configure)(dev),我们找到对应的配置函数,进入ixgbe_dev_configure()来分析其过程,其实这个函数并没有做太多的事。

在函数中,先调用了ixgbe_check_mq_mode()来检查队列的模式。然后设置允许接收批量和向量的模式

 

接下来就是收发队列的初始化,非常关键的一部分内容,这部分内容按照收发分别介绍:

接收队列的初始化

  接收队列的初始化是从rte_eth_rx_queue_setup()开始的,这里的参数需要指定要初始化的port_id,queue_id,以及描述符的个数,还可以指定接收的配置,如释放和回写的阈值等。

依然如其他函数的套路一样,先进行各种检查,如初始化的队列号是否合法有效,设备如果已经启动,就不能继续初始化了。检查函数指针是否有效等。检查mbuf的数据大小是否满足默认的设备信息里的配置。

rte_eth_dev_info_get(port_id, &dev_info);

  这里获取了设备的配置信息,如果调用初始化函数时没有指定rx_conf配置,就会设备配置信息里的默认值

还检查了要初始化的队列号对应的队列指针是否为空,如果不为空,则说明这个队列已经初始化过了,就释放这个队列。

最后,调用到队列的setup函数做最后的初始化。

ret = (*dev->dev_ops->rx_queue_setup)(dev, rx_queue_id,     nb_rx_desc,socket_id, rx_conf, mp);

  对于ixgbe设备,rx_queue_setup就是函数ixgbe_dev_rx_queue_setup(),这里就是队列最终的初始化咯

依然是先检查,检查描述符的数量最大不能大于IXGBE_MAX_RING_DESC个,最小不能小于IXGBE_MIN_RING_DESC个。

  接下来的都是重点咯:
<1>.分配队列结构体,并填充结构

rxq = rte_zmalloc_socket("ethdev RX queue", sizeof(struct ixgbe_rx_queue),
                 RTE_CACHE_LINE_SIZE, socket_id);

  填充结构体的所属内存池,描述符个数,队列号,队列所属接口号等成员。

<2>.分配描述符队列的空间,按照最大的描述符个数进行分配

rz = rte_eth_dma_zone_reserve(dev, "rx_ring", queue_idx,
                      RX_RING_SZ, IXGBE_ALIGN, socket_id);

  接着获取描述符队列的头和尾寄存器的地址,在收发包后,软件要对这个寄存器进行处理

rxq->rdt_reg_addr =
            IXGBE_PCI_REG_ADDR(hw, IXGBE_RDT(rxq->reg_idx));
        rxq->rdh_reg_addr =
            IXGBE_PCI_REG_ADDR(hw, IXGBE_RDH(rxq->reg_idx));

  设置队列的接收描述符ring的物理地址和虚拟地址。

rxq->rx_ring_phys_addr = rte_mem_phy2mch(rz->memseg_id, rz->phys_addr);
    rxq->rx_ring = (union ixgbe_adv_rx_desc *) rz->addr;

<3>分配sw_ring,这个ring中存储的对象是struct ixgbe_rx_entry,其实里面就是数据包mbuf的指针。

rxq->sw_ring = rte_zmalloc_socket("rxq->sw_ring",
                      sizeof(struct ixgbe_rx_entry) * len,
                      RTE_CACHE_LINE_SIZE, socket_id);

以上三步做完以后,新分配的队列结构体重要的部分就已经填充完了,下面需要重置一下其他成员

ixgbe_reset_rx_queue

先把分配的描述符队列清空,其实清空在分配的时候就已经做了,没必要重复做

for (i = 0; i < len; i++) {
        rxq->rx_ring[i] = zeroed_desc;
    }

然后初始化队列中一下其他成员

rxq->rx_nb_avail = 0;
rxq->rx_next_avail = 0;
rxq->rx_free_trigger = (uint16_t)(rxq->rx_free_thresh - 1);
rxq->rx_tail = 0;
rxq->nb_rx_hold = 0;
rxq->pkt_first_seg = NULL;
rxq->pkt_last_seg = NULL;

这样,接收队列就初始化完了。

发送队列的初始化

 

发送队列的初始化在前面的检查基本和接收队列一样,只有些许区别在于setup环节,我们就从这个函数说起:ixgbe_dev_tx_queue_setup()

在发送队列配置中,重点设置了tx_rs_threshtx_free_thresh的值。

然后分配了一个发送队列结构txq,之后分配发送队列ring的空间,并填充txq的结构体

txq->tx_ring_phys_addr = rte_mem_phy2mch(tz->memseg_id, tz->phys_addr);
    txq->tx_ring = (union ixgbe_adv_tx_desc *) tz->addr;

然后,分配队列的sw_ring,也挂载队列上。

重置发送队列ixgbe_reset_tx_queue()

和接收队列一样,也是要把队列ring(描述符ring)清空,设置发送队列sw_ring,设置其他参数,队尾位置设置为0

 
txq->tx_next_dd = (uint16_t)(txq->tx_rs_thresh - 1);
txq->tx_next_rs = (uint16_t)(txq->tx_rs_thresh - 1);
 
txq->tx_tail = 0;
txq->nb_tx_used = 0;
/*
 * Always allow 1 descriptor to be un-allocated to avoid
 * a H/W race condition
 */
txq->last_desc_cleaned = (uint16_t)(txq->nb_tx_desc - 1);
txq->nb_tx_free = (uint16_t)(txq->nb_tx_desc - 1);
txq->ctx_curr = 0;

发送队列的初始化就完成了。

设备的启动

  经过上面的队列初始化,队列的ring和sw_ring都分配了,但是发现木有,DMA仍然还不知道要把数据包拷贝到哪里,我们说过,DPDK是零拷贝的,那么我们分配的mempool中的对象怎么和队列以及驱动联系起来呢?接下来就是最精彩的时刻了----建立mempool、queue、DMA、ring之间的关系。话说,这个为什么不是在队列的初始化中就做呢?

设备的启动是从rte_eth_dev_start()中开始的diag = (*dev->dev_ops->dev_start)(dev);

  进而,找到设备启动的真正启动函数:ixgbe_dev_start()

先检查设备的链路设置,暂时不支持半双工和固定速率的模式。看来是暂时只有自适应模式咯。

  然后把中断禁掉,同时,停掉适配器ixgbe_stop_adapter(hw);

在其中,就是调用了ixgbe_stop_adapter_generic();,主要的工作就是停止发送和接收单元。这是直接写寄存器来完成的。

  然后重启硬件,ixgbe_pf_reset_hw()->ixgbe_reset_hw()->ixgbe_reset_hw_82599(),最终都是设置寄存器,这里就不细究了。之后,就启动了硬件。

再然后是初始化接收单元:ixgbe_dev_rx_init()

  在这个函数中,主要就是设置各类寄存器,比如配置CRC校验,如果支持巨帧,配置对应的寄存器。还有如果配置了loopback模式,也要配置寄存器。

接下来最重要的就是为每个队列设置DMA寄存器,标识每个队列的描述符ring的地址,长度,头,尾等。

bus_addr = rxq->rx_ring_phys_addr;
IXGBE_WRITE_REG(hw, IXGBE_RDBAL(rxq->reg_idx),
        (uint32_t)(bus_addr & 0x00000000ffffffffULL));
IXGBE_WRITE_REG(hw, IXGBE_RDBAH(rxq->reg_idx),
        (uint32_t)(bus_addr >> 32));
IXGBE_WRITE_REG(hw, IXGBE_RDLEN(rxq->reg_idx),
        rxq->nb_rx_desc * sizeof(union ixgbe_adv_rx_desc));
IXGBE_WRITE_REG(hw, IXGBE_RDH(rxq->reg_idx), 0);
IXGBE_WRITE_REG(hw, IXGBE_RDT(rxq->reg_idx), 0);

  这里可以看到把描述符ring的物理地址写入了寄存器,还写入了描述符ring的长度。

下面还计算了数据包数据的长度,写入到寄存器中.然后对于网卡的多队列设置,也进行了配置ixgbe_dev_mq_rx_configure()

  同时如果设置了接收校验和,还对校验和进行了寄存器设置。

最后,调用ixgbe_set_rx_function()对接收函数再进行设置,主要是针对支持LRO,vector,bulk等处理方法。

  这样,接收单元的初始化就完成了。

 

接下来再初始化发送单元:ixgbe_dev_tx_init()

发送单元的的初始化和接收单元的初始化基本操作是一样的,都是填充寄存器的值,重点是设置描述符队列的基地址和长度。

bus_addr = txq->tx_ring_phys_addr;
IXGBE_WRITE_REG(hw, IXGBE_TDBAL(txq->reg_idx),
        (uint32_t)(bus_addr & 0x00000000ffffffffULL));
IXGBE_WRITE_REG(hw, IXGBE_TDBAH(txq->reg_idx),
        (uint32_t)(bus_addr >> 32));
IXGBE_WRITE_REG(hw, IXGBE_TDLEN(txq->reg_idx),
        txq->nb_tx_desc * sizeof(union ixgbe_adv_tx_desc));
/* Setup the HW Tx Head and TX Tail descriptor pointers */
IXGBE_WRITE_REG(hw, IXGBE_TDH(txq->reg_idx), 0);
IXGBE_WRITE_REG(hw, IXGBE_TDT(txq->reg_idx), 0);

 

最后配置一下多队列使用相关的寄存器:ixgbe_dev_mq_tx_configure()

如此,发送单元的初始化就完成了。

  收发单元初始化完毕后,就可以启动设备的收发单元咯:ixgbe_dev_rxtx_start()
先对每个发送队列的threshold相关寄存器进行设置,这是发送时的阈值参数,这个东西在发送部分有说明。

然后就是依次启动每个接收队列啦! ixgbe_dev_rx_queue_start()

  先检查,如果要启动的队列是合法的,那么就为这个接收队列分配存放mbuf的实际空间,

if (ixgbe_alloc_rx_queue_mbufs(rxq) != 0) 
{
    PMD_INIT_LOG(ERR, "Could not alloc mbuf for queue:%d",
             rx_queue_id);
    return -1;
}

在这里,你将找到终极答案--mempool、ring、queue ring、queue sw_ring的关系!

static int __attribute__((cold))
ixgbe_alloc_rx_queue_mbufs(struct ixgbe_rx_queue *rxq)
{
    struct ixgbe_rx_entry *rxe = rxq->sw_ring;
    uint64_t dma_addr;
    unsigned int i;

    /* Initialize software ring entries 
    队列所属内存池的ring中循环取出了nb_rx_desc个mbuf指针,
    填充rxq->sw_ring。每个指针都指向内存池里的一个数据包空间
    然后就先填充了新分配的mbuf结构,最最重要的是填充计算了dma_addr
    初始化queue ring,即rxd的信息,标明了驱动把数据包放在dma_addr处。
    最后把分配的mbuf“放入”queue 的sw_ring中,
    这样,驱动收过来的包,就直接放在了sw_ring中。
    */
    for (i = 0; i < rxq->nb_rx_desc; i++) {
        volatile union ixgbe_adv_rx_desc *rxd;
        struct rte_mbuf *mbuf = rte_mbuf_raw_alloc(rxq->mb_pool);

        if (mbuf == NULL) {
            PMD_INIT_LOG(ERR, "RX mbuf alloc failed queue_id=%u",
                     (unsigned) rxq->queue_id);
            return -ENOMEM;
        }

        mbuf->data_off = RTE_PKTMBUF_HEADROOM;
        mbuf->port = rxq->port_id;

        dma_addr =
            rte_cpu_to_le_64(rte_mbuf_data_iova_default(mbuf));
        rxd = &rxq->rx_ring[i];
        rxd->read.hdr_addr = 0;
        rxd->read.pkt_addr = dma_addr;
        rxe[i].mbuf = mbuf;
    }

    return 0;
}

  从队列所属内存池的ring中循环取出了nb_rx_desc个mbuf指针,也就是为了填充rxq->sw_ring。每个指针都指向内存池里的一个数据包空间。

然后就先填充了新分配的mbuf结构,最最重要的是填充计算了dma_addr

dma_addr = rte_cpu_to_le_64(rte_mbuf_data_dma_addr_default(mbuf)); 

  然后初始化queue ring,即rxd的信息,标明了驱动把数据包放在dma_addr处。最后一句,把分配的mbuf“放入”queue 的sw_ring中,这样,驱动收过来的包,就直接放在了sw_ring中。

以上最重要的工作就完成了,下面就可以使能DMA引擎啦,准备收包。

hw->mac.ops.enable_rx_dma(hw, rxctrl);

 

   再设置一下队列ring的头尾寄存器的值,这也是很重要的一点!头设置为0,尾设置为描述符个数减1,就是描述符填满整个ring

IXGBE_WRITE_REG(hw, IXGBE_RDH(rxq->reg_idx), 0);
IXGBE_WRITE_REG(hw, IXGBE_RDT(rxq->reg_idx), rxq->nb_rx_desc - 1);

数据包的获取和发送以及处理流程

 业务层面获取数据包是从rte_eth_rx_burst()开始的

int16_t nb_rx = (*dev->rx_pkt_burst)(dev->data->rx_queues[queue_id],
            rx_pkts, nb_pkts);

 

 这里的dev->rx_pkt_burst在驱动初始化的时候已经注册过了,对于ixgbe设备,就是ixgbe_recv_pkts()函数

   先了解网卡的DD标志,这个标志标识着一个描述符是否可用的情况:网卡在使用这个描述符前,先检查DD位是否为0,如果为0,那么就可以使用描述符,把数据拷贝到描述符指定的地址,之后把DD标志位置为1,否则表示不能使用这个描述符。而对于驱动而言,恰恰相反,在读取数据包时,先检查DD位是否为1,如果为1,表示网卡已经把数据放到了内存中,可以读取,读取完后,再把DD位设置为0,否则,就表示没有数据包可读。

 

 

 

RX_FIFO: 数据接收缓冲区

TX_FIFO: 数据发送缓冲区

DMA Engine:Direct Memory Access,即直接寄存器访问,是一种告诉的数据传输方式,允许在外部设备和存储器之间直接读写数据,数据的读写不消耗CPU资源,DMA控制器通过一组描述符环形队列与CPU互相操作完成数据包的收发。CPU通过操作DMA寄存器来与DMA控制器进行部分通信与初始化配置,主要寄存器有Base、Size、Tail、Head,head寄存器用于DMA往rx_ring里插入时使用,tail是应用通过写寄存器通知给DMA控制器当前可用的最后一个描述符(head->next 为tail时表示当前rx_ring存满了,再来报文会被记录rx_missed_error)。

Rx_queue: 收包队列结构体:我们主要关注两个环形队列rx_ring、sw_ring

Rx_ring:一个地址连续环形队列,存储的是描述符,描述符中包含将来存放数据包的物理地址、DD标志(下面会介绍DD标志)等,上面图中只画了存放数据包的物理地址,物理地址供网卡DMA模块使用,也称为DMA地址(硬件使用物理地址,当网卡收到数据包后会通过DMA操作将数据包拷贝到物理地址,物理地址是通过虚拟地址转换得到的,下面分析源码时会介绍)

Sw_ring: 存储的是将来存放数据包的虚拟地址,虚拟地址供应用使用(软件应用使用虚拟地址,应用往虚拟地址读写数据包)

DD标志:用于标识一个描述符buf是否可用,无论网卡是工作在轮询方式下还是中断方式,判断数据包接收成功或者是否发送成功都需要检查描述符中的完成状态位(Description Done)DD,该状态位由DMA控制器在完成操作后进行回写

Mbuf:对应于Mbuf内存池中的元素,通过alloc或者free操作内存池获取或者释放mbuf对象,这里需要说的一点是mbuf池创建的时候是连续的,但是rx_ring和sw_ring里指向的数据地址不一定是连续的,下面分析收包流程时会介绍

PCIE总线:采用高速串行通信互联标准,自上而下分为事务传输层、数据链路层、物理层,网卡与CPU之间数据包的传输、CPU对网卡寄存器的MMIO操作都通过PCIE进行传输

DMA寄存器:CPU配置网卡的操作通过操作网卡的寄存器,寄存器主要

以下是代码分析展示:

 

/**
 * Structure associated with each RX queue.
 */
struct ixgbe_rx_queue {
    struct rte_mempool  *mb_pool; /**< mbuf pool to populate RX ring. */
    /*rx_ring主要存储报文数据的物理地址,物理地址供网卡DMA使用,
    也称为DMA地址(硬件使用物理地址,将报文copy到报文物理位置上)。*/
    volatile union ixgbe_adv_rx_desc *rx_ring; /**< RX ring virtual address. */
    uint64_t            rx_ring_phys_addr; /**< RX ring DMA address. */
    volatile uint32_t   *rdt_reg_addr; /**< RDT register address. */
    volatile uint32_t   *rdh_reg_addr; /**< RDH register address. */
    /*sw_ring主要存储报文数据的虚拟地址,虚拟地址供应用使用
    (软件使用虚拟地址,读取报文)报文数据的物理地址可以由报文数据的虚拟地址转化得到。*/
    struct ixgbe_rx_entry *sw_ring; /**< address of RX software ring. */
    struct ixgbe_scattered_rx_entry *sw_sc_ring; /**< address of scattered Rx software ring. */
    struct rte_mbuf *pkt_first_seg; /**< First segment of current packet. */
    struct rte_mbuf *pkt_last_seg; /**< Last segment of current packet. */
    uint64_t            mbuf_initializer; /**< value to init mbufs */
    uint16_t            nb_rx_desc; /**< number of RX descriptors. */
    uint16_t            rx_tail;  /**< current value of RDT register. */
    uint16_t            nb_rx_hold; /**< number of held free RX desc. */
    uint16_t rx_nb_avail; /**< nr of staged pkts ready to ret to app */
    uint16_t rx_next_avail; /**< idx of next staged pkt to ret to app */
    uint16_t rx_free_trigger; /**< triggers rx buffer allocation */
    uint8_t            rx_using_sse;
    /**< indicates that vector RX is in use */
#ifdef RTE_LIBRTE_SECURITY
    uint8_t            using_ipsec;
    /**< indicates that IPsec RX feature is in use */
#endif
#ifdef RTE_IXGBE_INC_VECTOR
    uint16_t            rxrearm_nb;     /**< number of remaining to be re-armed */
    uint16_t            rxrearm_start;  /**< the idx we start the re-arming from */
#endif
    uint16_t            rx_free_thresh; /**< max free RX desc to hold. */
    uint16_t            queue_id; /**< RX queue index. */
    uint16_t            reg_idx;  /**< RX queue register index. */
    uint16_t            pkt_type_mask;  /**< Packet type mask for different NICs. */
    uint16_t            port_id;  /**< Device port identifier. */
    uint8_t             crc_len;  /**< 0 if CRC stripped, 4 otherwise. */
    uint8_t             drop_en;  /**< If not 0, set SRRCTL.Drop_En. */
    uint8_t             rx_deferred_start; /**< not in global dev start. */
    /** flags to set in mbuf when a vlan is detected. */
    uint64_t            vlan_flags;
    uint64_t        offloads; /**< Rx offloads with DEV_RX_OFFLOAD_* */
    /** need to alloc dummy mbuf, for wraparound when scanning hw ring */
    struct rte_mbuf fake_mbuf;
    /** hold packets to return to application */
    struct rte_mbuf *rx_stage[RTE_PMD_IXGBE_RX_MAX_BURST*2];
};

 

 

/**
 * @internal
 * The data part, with no function pointers, associated with each ethernet device.
 *
 * This structure is safe to place in shared memory to be common among different
 * processes in a multi-process configuration.
 */
struct rte_eth_dev_data {
    char name[RTE_ETH_NAME_MAX_LEN]; /**< Unique identifier name */

    void **rx_queues; /**< Array of pointers to RX queues.  多队列  */
    void **tx_queues; /**< Array of pointers to TX queues. */
    uint16_t nb_rx_queues; /**< Number of RX queues. */
    uint16_t nb_tx_queues; /**< Number of TX queues. */

    struct rte_eth_dev_sriov sriov;    /**< SRIOV data */

    void *dev_private;              /**< PMD-specific private data */

    struct rte_eth_link dev_link;   /**< Link-level information & status. */
    struct rte_eth_conf dev_conf;   /**< Configuration applied to device. */
    uint16_t mtu;                   /**< Maximum Transmission Unit. */
    uint32_t min_rx_buf_size;
            /**< Common RX buffer size handled by all queues. */

    uint64_t rx_mbuf_alloc_failed; /**< RX ring mbuf allocation failures. */
    struct ether_addr *mac_addrs;  /**< Device Ethernet link address. */
    uint64_t mac_pool_sel[ETH_NUM_RECEIVE_MAC_ADDR];
            /**< Bitmap associating MAC addresses to pools. */
    struct ether_addr *hash_mac_addrs;
            /**< Device Ethernet MAC addresses of hash filtering. */
    uint16_t port_id;           /**< Device [external] port identifier. */

    __extension__
    uint8_t promiscuous   : 1, /**< RX promiscuous mode ON(1) / OFF(0). */
        scattered_rx : 1,  /**< RX of scattered packets is ON(1) / OFF(0) */
        all_multicast : 1, /**< RX all multicast mode ON(1) / OFF(0). */
        dev_started : 1,   /**< Device state: STARTED(1) / STOPPED(0). */
        lro         : 1;   /**< RX LRO is ON(1) / OFF(0) */
    uint8_t rx_queue_state[RTE_MAX_QUEUES_PER_PORT];
            /**< Queues state: STARTED(1) / STOPPED(0). */
    uint8_t tx_queue_state[RTE_MAX_QUEUES_PER_PORT];
            /**< Queues state: STARTED(1) / STOPPED(0). */
    uint32_t dev_flags;             /**< Capabilities. */
    enum rte_kernel_driver kdrv;    /**< Kernel driver passthrough. */
    int numa_node;                  /**< NUMA node connection. */
    struct rte_vlan_filter_conf vlan_filter_conf;
            /**< VLAN filter configuration. */
    struct rte_eth_dev_owner owner; /**< The port owner. */
} __rte_cache_aligned;

 

 

/*收发包的配置最主要的工作就是配置网卡的收发队列,设置DMA拷贝数据包的地址等,配置好地址后,
网卡收到数据包后会通过DMA控制器直接把数据包拷贝到指定的内存地址。我们使用数据包时,
只要去对应队列取出指定地址的数据即可。
收发包的配置是从rte_eth_dev_configure()开始的,这里根据参数会配置队列的个数,以及接口的配置信息,
如队列的使用模式,多队列的方式等。
前面会先进行一些各项检查,如果设备已经启动,就得先停下来才能配置(这时应该叫再配置吧)。
然后把传进去的配置参数拷贝到设备的数据区。
*/
int
rte_eth_dev_configure(uint16_t port_id, uint16_t nb_rx_q, uint16_t nb_tx_q,
              const struct rte_eth_conf *dev_conf)
{
    struct rte_eth_dev *dev;
    struct rte_eth_dev_info dev_info;
    struct rte_eth_conf orig_conf;
    int diag;
    int ret;

    RTE_ETH_VALID_PORTID_OR_ERR_RET(port_id, -EINVAL);

    dev = &rte_eth_devices[port_id];

    RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->dev_infos_get, -ENOTSUP);
    RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->dev_configure, -ENOTSUP);

    if (dev->data->dev_started) {
        RTE_ETHDEV_LOG(ERR,
            "Port %u must be stopped to allow configuration\n",
            port_id);
        return -EBUSY;
    }

     /* Store original config, as rollback required on failure */
    memcpy(&orig_conf, &dev->data->dev_conf, sizeof(dev->data->dev_conf));

    /*
     * Copy the dev_conf parameter into the dev structure.
     * rte_eth_dev_info_get() requires dev_conf, copy it before dev_info get
     */
    memcpy(&dev->data->dev_conf, dev_conf, sizeof(dev->data->dev_conf));

    rte_eth_dev_info_get(port_id, &dev_info);

    /* If number of queues specified by application for both Rx and Tx is
     * zero, use driver preferred values. This cannot be done individually
     * as it is valid for either Tx or Rx (but not both) to be zero.
     * If driver does not provide any preferred valued, fall back on
     * EAL defaults.
     */
    if (nb_rx_q == 0 && nb_tx_q == 0) {
        nb_rx_q = dev_info.default_rxportconf.nb_queues;
        if (nb_rx_q == 0)
            nb_rx_q = RTE_ETH_DEV_FALLBACK_RX_NBQUEUES;
        nb_tx_q = dev_info.default_txportconf.nb_queues;
        if (nb_tx_q == 0)
            nb_tx_q = RTE_ETH_DEV_FALLBACK_TX_NBQUEUES;
    }

    if (nb_rx_q > RTE_MAX_QUEUES_PER_PORT) {
        RTE_ETHDEV_LOG(ERR,
            "Number of RX queues requested (%u) is greater than max supported(%d)\n",
            nb_rx_q, RTE_MAX_QUEUES_PER_PORT);
        ret = -EINVAL;
        goto rollback;
    }

    if (nb_tx_q > RTE_MAX_QUEUES_PER_PORT) {
        RTE_ETHDEV_LOG(ERR,
            "Number of TX queues requested (%u) is greater than max supported(%d)\n",
            nb_tx_q, RTE_MAX_QUEUES_PER_PORT);
        ret = -EINVAL;
        goto rollback;
    }

    /*
     * Check that the numbers of RX and TX queues are not greater
     * than the maximum number of RX and TX queues supported by the
     * configured device.
     */
    if (nb_rx_q > dev_info.max_rx_queues) {
        RTE_ETHDEV_LOG(ERR, "Ethdev port_id=%u nb_rx_queues=%u > %u\n",
            port_id, nb_rx_q, dev_info.max_rx_queues);
        ret = -EINVAL;
        goto rollback;
    }

    if (nb_tx_q > dev_info.max_tx_queues) {
        RTE_ETHDEV_LOG(ERR, "Ethdev port_id=%u nb_tx_queues=%u > %u\n",
            port_id, nb_tx_q, dev_info.max_tx_queues);
        ret = -EINVAL;
        goto rollback;
    }

    /* Check that the device supports requested interrupts */
    if ((dev_conf->intr_conf.lsc == 1) &&
            (!(dev->data->dev_flags & RTE_ETH_DEV_INTR_LSC))) {
        RTE_ETHDEV_LOG(ERR, "Driver %s does not support lsc\n",
            dev->device->driver->name);
        ret = -EINVAL;
        goto rollback;
    }
    if ((dev_conf->intr_conf.rmv == 1) &&
            (!(dev->data->dev_flags & RTE_ETH_DEV_INTR_RMV))) {
        RTE_ETHDEV_LOG(ERR, "Driver %s does not support rmv\n",
            dev->device->driver->name);
        ret = -EINVAL;
        goto rollback;
    }

    /*
     * If jumbo frames are enabled, check that the maximum RX packet
     * length is supported by the configured device.
     */
    if (dev_conf->rxmode.offloads & DEV_RX_OFFLOAD_JUMBO_FRAME) {
        if (dev_conf->rxmode.max_rx_pkt_len > dev_info.max_rx_pktlen) {
            RTE_ETHDEV_LOG(ERR,
                "Ethdev port_id=%u max_rx_pkt_len %u > max valid value %u\n",
                port_id, dev_conf->rxmode.max_rx_pkt_len,
                dev_info.max_rx_pktlen);
            ret = -EINVAL;
            goto rollback;
        } else if (dev_conf->rxmode.max_rx_pkt_len < ETHER_MIN_LEN) {
            RTE_ETHDEV_LOG(ERR,
                "Ethdev port_id=%u max_rx_pkt_len %u < min valid value %u\n",
                port_id, dev_conf->rxmode.max_rx_pkt_len,
                (unsigned)ETHER_MIN_LEN);
            ret = -EINVAL;
            goto rollback;
        }
    } else {
        if (dev_conf->rxmode.max_rx_pkt_len < ETHER_MIN_LEN ||
            dev_conf->rxmode.max_rx_pkt_len > ETHER_MAX_LEN)
            /* Use default value */
            dev->data->dev_conf.rxmode.max_rx_pkt_len =
                            ETHER_MAX_LEN;
    }

    /* Any requested offloading must be within its device capabilities */
    if ((dev_conf->rxmode.offloads & dev_info.rx_offload_capa) !=
         dev_conf->rxmode.offloads) {
        RTE_ETHDEV_LOG(ERR,
            "Ethdev port_id=%u requested Rx offloads 0x%"PRIx64" doesn't match Rx offloads "
            "capabilities 0x%"PRIx64" in %s()\n",
            port_id, dev_conf->rxmode.offloads,
            dev_info.rx_offload_capa,
            __func__);
        ret = -EINVAL;
        goto rollback;
    }
    if ((dev_conf->txmode.offloads & dev_info.tx_offload_capa) !=
         dev_conf->txmode.offloads) {
        RTE_ETHDEV_LOG(ERR,
            "Ethdev port_id=%u requested Tx offloads 0x%"PRIx64" doesn't match Tx offloads "
            "capabilities 0x%"PRIx64" in %s()\n",
            port_id, dev_conf->txmode.offloads,
            dev_info.tx_offload_capa,
            __func__);
        ret = -EINVAL;
        goto rollback;
    }

    if ((dev_conf->rxmode.offloads & DEV_RX_OFFLOAD_CRC_STRIP) &&
            (dev_conf->rxmode.offloads & DEV_RX_OFFLOAD_KEEP_CRC)) {
        RTE_ETHDEV_LOG(ERR,
            "Port id=%u not allowed to set both CRC STRIP and KEEP CRC offload flags\n",
            port_id);
        return -EINVAL;
    }

    /* Check that device supports requested rss hash functions. */
    if ((dev_info.flow_type_rss_offloads |
         dev_conf->rx_adv_conf.rss_conf.rss_hf) !=
        dev_info.flow_type_rss_offloads) {
        RTE_ETHDEV_LOG(ERR,
            "Ethdev port_id=%u invalid rss_hf: 0x%"PRIx64", valid value: 0x%"PRIx64"\n",
            port_id, dev_conf->rx_adv_conf.rss_conf.rss_hf,
            dev_info.flow_type_rss_offloads);
        ret = -EINVAL;
        goto rollback;
    }

    /*
     * Setup new number of RX/TX queues and reconfigure device.接收队列的初始化
参数需要指定要初始化的port_id,queue_id,以及描述符的个数,还可以指定接收的配置,如释放和回写的阈值等
     */
    diag = rte_eth_dev_rx_queue_config(dev, nb_rx_q);
    if (diag != 0) {
        RTE_ETHDEV_LOG(ERR,
            "Port%u rte_eth_dev_rx_queue_config = %d\n",
            port_id, diag);
        ret = diag;
        goto rollback;
    }

    diag = rte_eth_dev_tx_queue_config(dev, nb_tx_q);
    if (diag != 0) {
        RTE_ETHDEV_LOG(ERR,
            "Port%u rte_eth_dev_tx_queue_config = %d\n",
            port_id, diag);
        rte_eth_dev_rx_queue_config(dev, 0);
        ret = diag;
        goto rollback;
    }

    diag = (*dev->dev_ops->dev_configure)(dev);
    if (diag != 0) {
        RTE_ETHDEV_LOG(ERR, "Port%u dev_configure = %d\n",
            port_id, diag);
        rte_eth_dev_rx_queue_config(dev, 0);
        rte_eth_dev_tx_queue_config(dev, 0);
        ret = eth_err(port_id, diag);
        goto rollback;
    }

    /* Initialize Rx profiling if enabled at compilation time. */
    diag = __rte_eth_profile_rx_init(port_id, dev);
    if (diag != 0) {
        RTE_ETHDEV_LOG(ERR, "Port%u __rte_eth_profile_rx_init = %d\n",
            port_id, diag);
        rte_eth_dev_rx_queue_config(dev, 0);
        rte_eth_dev_tx_queue_config(dev, 0);
        ret = eth_err(port_id, diag);
        goto rollback;
    }

    return 0;

rollback:
    memcpy(&dev->data->dev_conf, &orig_conf, sizeof(dev->data->dev_conf));

    return ret;
}
static int
rte_eth_dev_rx_queue_config(struct rte_eth_dev *dev, uint16_t nb_queues)
{
    uint16_t old_nb_queues = dev->data->nb_rx_queues;
    void **rxq;
    unsigned i;

    if (dev->data->rx_queues == NULL && nb_queues != 0) { /* first time configuration */
        //分配队列结构体,并填充结构
        dev->data->rx_queues = rte_zmalloc("ethdev->rx_queues",
                sizeof(dev->data->rx_queues[0]) * nb_queues,
                RTE_CACHE_LINE_SIZE);
        if (dev->data->rx_queues == NULL) {
            dev->data->nb_rx_queues = 0;
            return -(ENOMEM);
        }
    }
  ----------------------------------
    dev->data->nb_rx_queues = nb_queues;
    return 0;
}

 

int
rte_eth_rx_queue_setup(uint16_t port_id, uint16_t rx_queue_id,
               uint16_t nb_rx_desc, unsigned int socket_id,
               const struct rte_eth_rxconf *rx_conf,
               struct rte_mempool *mp)
{
    int ret;
    uint32_t mbp_buf_size;
    struct rte_eth_dev *dev;
    struct rte_eth_dev_info dev_info;
    struct rte_eth_rxconf local_conf;
    void **rxq;

    RTE_ETH_VALID_PORTID_OR_ERR_RET(port_id, -EINVAL);
//检查描述符的数量最大不能大于IXGBE_MAX_RING_DESC个,最小不能小于IXGBE_MIN_RING_DESC个
    dev = &rte_eth_devices[port_id];
    if (rx_queue_id >= dev->data->nb_rx_queues) {
        RTE_ETHDEV_LOG(ERR, "Invalid RX queue_id=%u\n", rx_queue_id);
        return -EINVAL;
    }

    RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->dev_infos_get, -ENOTSUP);
    RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->rx_queue_setup, -ENOTSUP);

    /*
     * Check the size of the mbuf data buffer.
     * This value must be provided in the private data of the memory pool.
     * First check that the memory pool has a valid private data.
     */
    rte_eth_dev_info_get(port_id, &dev_info);
    if (mp->private_data_size < sizeof(struct rte_pktmbuf_pool_private)) {
        RTE_ETHDEV_LOG(ERR, "%s private_data_size %d < %d\n",
            mp->name, (int)mp->private_data_size,
            (int)sizeof(struct rte_pktmbuf_pool_private));
        return -ENOSPC;
    }
    mbp_buf_size = rte_pktmbuf_data_room_size(mp);

    if ((mbp_buf_size - RTE_PKTMBUF_HEADROOM) < dev_info.min_rx_bufsize) {
        RTE_ETHDEV_LOG(ERR,
            "%s mbuf_data_room_size %d < %d (RTE_PKTMBUF_HEADROOM=%d + min_rx_bufsize(dev)=%d)\n",
            mp->name, (int)mbp_buf_size,
            (int)(RTE_PKTMBUF_HEADROOM + dev_info.min_rx_bufsize),
            (int)RTE_PKTMBUF_HEADROOM,
            (int)dev_info.min_rx_bufsize);
        return -EINVAL;
    }

    /* Use default specified by driver, if nb_rx_desc is zero */
    if (nb_rx_desc == 0) {
        nb_rx_desc = dev_info.default_rxportconf.ring_size;
        /* If driver default is also zero, fall back on EAL default */
        if (nb_rx_desc == 0)
            nb_rx_desc = RTE_ETH_DEV_FALLBACK_RX_RINGSIZE;
    }

    if (nb_rx_desc > dev_info.rx_desc_lim.nb_max ||
            nb_rx_desc < dev_info.rx_desc_lim.nb_min ||
            nb_rx_desc % dev_info.rx_desc_lim.nb_align != 0) {

        RTE_ETHDEV_LOG(ERR,
            "Invalid value for nb_rx_desc(=%hu), should be: <= %hu, >= %hu, and a product of %hu\n",
            nb_rx_desc, dev_info.rx_desc_lim.nb_max,
            dev_info.rx_desc_lim.nb_min,
            dev_info.rx_desc_lim.nb_align);
        return -EINVAL;
    }

    if (dev->data->dev_started &&
        !(dev_info.dev_capa &
            RTE_ETH_DEV_CAPA_RUNTIME_RX_QUEUE_SETUP))
        return -EBUSY;

    if (dev->data->dev_started &&
        (dev->data->rx_queue_state[rx_queue_id] !=
            RTE_ETH_QUEUE_STATE_STOPPED))
        return -EBUSY;

    rxq = dev->data->rx_queues;
    if (rxq[rx_queue_id]) {
        RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->rx_queue_release,
                    -ENOTSUP);
        (*dev->dev_ops->rx_queue_release)(rxq[rx_queue_id]);
        rxq[rx_queue_id] = NULL;
    }

    if (rx_conf == NULL)
        rx_conf = &dev_info.default_rxconf;

    local_conf = *rx_conf;

    /*
     * If an offloading has already been enabled in
     * rte_eth_dev_configure(), it has been enabled on all queues,
     * so there is no need to enable it in this queue again.
     * The local_conf.offloads input to underlying PMD only carries
     * those offloadings which are only enabled on this queue and
     * not enabled on all queues.
     */
    local_conf.offloads &= ~dev->data->dev_conf.rxmode.offloads;

    /*
     * New added offloadings for this queue are those not enabled in
     * rte_eth_dev_configure() and they must be per-queue type.
     * A pure per-port offloading can't be enabled on a queue while
     * disabled on another queue. A pure per-port offloading can't
     * be enabled for any queue as new added one if it hasn't been
     * enabled in rte_eth_dev_configure().
     */
    if ((local_conf.offloads & dev_info.rx_queue_offload_capa) !=
         local_conf.offloads) {
        RTE_ETHDEV_LOG(ERR,
            "Ethdev port_id=%d rx_queue_id=%d, new added offloads 0x%"PRIx64" must be "
            "within pre-queue offload capabilities 0x%"PRIx64" in %s()\n",
            port_id, rx_queue_id, local_conf.offloads,
            dev_info.rx_queue_offload_capa,
            __func__);
        return -EINVAL;
    }
//队列最终的初始化
    ret = (*dev->dev_ops->rx_queue_setup)(dev, rx_queue_id, nb_rx_desc,
                          socket_id, &local_conf, mp);
    if (!ret) {
        if (!dev->data->min_rx_buf_size ||
            dev->data->min_rx_buf_size > mbp_buf_size)
            dev->data->min_rx_buf_size = mbp_buf_size;
    }

    return eth_err(port_id, ret);
}
int __attribute__((cold))
ixgbe_dev_rx_queue_setup(struct rte_eth_dev *dev,
             uint16_t queue_idx,
             uint16_t nb_desc,
             unsigned int socket_id,
             const struct rte_eth_rxconf *rx_conf,
             struct rte_mempool *mp)
{
    const struct rte_memzone *rz;
    struct ixgbe_rx_queue *rxq;
    struct ixgbe_hw     *hw;
    uint16_t len;
    struct ixgbe_adapter *adapter =
        (struct ixgbe_adapter *)dev->data->dev_private;
    uint64_t offloads;

    PMD_INIT_FUNC_TRACE();
    hw = IXGBE_DEV_PRIVATE_TO_HW(dev->data->dev_private);

    offloads = rx_conf->offloads | dev->data->dev_conf.rxmode.offloads;

    /*
     * Validate number of receive descriptors.
     * It must not exceed hardware maximum, and must be multiple
     * of IXGBE_ALIGN.
     */
    if (nb_desc % IXGBE_RXD_ALIGN != 0 ||
            (nb_desc > IXGBE_MAX_RING_DESC) ||
            (nb_desc < IXGBE_MIN_RING_DESC)) {
        return -EINVAL;
    }

    /* Free memory prior to re-allocation if needed... */
    if (dev->data->rx_queues[queue_idx] != NULL) {
        ixgbe_rx_queue_release(dev->data->rx_queues[queue_idx]);
        dev->data->rx_queues[queue_idx] = NULL;
    }

    /* First allocate the rx queue data structure 
    分配队列结构体,并填充结构 */
    rxq = rte_zmalloc_socket("ethdev RX queue", sizeof(struct ixgbe_rx_queue),
                 RTE_CACHE_LINE_SIZE, socket_id);
    if (rxq == NULL)
        return -ENOMEM;
    //填充结构体的所属内存池,描述符个数,队列号,队列所属接口号等成员
    rxq->mb_pool = mp;
    rxq->nb_rx_desc = nb_desc;
    rxq->rx_free_thresh = rx_conf->rx_free_thresh;
    rxq->queue_id = queue_idx;
    rxq->reg_idx = (uint16_t)((RTE_ETH_DEV_SRIOV(dev).active == 0) ?
        queue_idx : RTE_ETH_DEV_SRIOV(dev).def_pool_q_idx + queue_idx);
    rxq->port_id = dev->data->port_id;
    if (rte_eth_dev_must_keep_crc(dev->data->dev_conf.rxmode.offloads))
        rxq->crc_len = ETHER_CRC_LEN;
    else
        rxq->crc_len = 0;
    rxq->drop_en = rx_conf->rx_drop_en;
    rxq->rx_deferred_start = rx_conf->rx_deferred_start;
    rxq->offloads = offloads;

    /*
     * The packet type in RX descriptor is different for different NICs.
     * Some bits are used for x550 but reserved for other NICS.
     * So set different masks for different NICs.
     */
    if (hw->mac.type == ixgbe_mac_X550 ||
        hw->mac.type == ixgbe_mac_X550EM_x ||
        hw->mac.type == ixgbe_mac_X550EM_a ||
        hw->mac.type == ixgbe_mac_X550_vf ||
        hw->mac.type == ixgbe_mac_X550EM_x_vf ||
        hw->mac.type == ixgbe_mac_X550EM_a_vf)
        rxq->pkt_type_mask = IXGBE_PACKET_TYPE_MASK_X550;
    else
        rxq->pkt_type_mask = IXGBE_PACKET_TYPE_MASK_82599;

    /*
     * Allocate RX ring hardware descriptors. A memzone large enough to
     * handle the maximum ring size is allocated in order to allow for
     * resizing in later calls to the queue setup function.
     */
     //分配描述符队列的空间,按照最大的描述符个数进行分配
    rz = rte_eth_dma_zone_reserve(dev, "rx_ring", queue_idx,
                      RX_RING_SZ, IXGBE_ALIGN, socket_id);
    if (rz == NULL) {
        ixgbe_rx_queue_release(rxq);
        return -ENOMEM;
    }

    /*
     * Zero init all the descriptors in the ring.
     */
    memset(rz->addr, 0, RX_RING_SZ);

    /*
     * Modified to setup VFRDT for Virtual Function
     */
     //获取描述符队列的头和尾寄存器的地址,在收发包后,软件要对这个寄存器进行处理
    if (hw->mac.type == ixgbe_mac_82599_vf ||
        hw->mac.type == ixgbe_mac_X540_vf ||
        hw->mac.type == ixgbe_mac_X550_vf ||
        hw->mac.type == ixgbe_mac_X550EM_x_vf ||
        hw->mac.type == ixgbe_mac_X550EM_a_vf) {
        rxq->rdt_reg_addr =
            IXGBE_PCI_REG_ADDR(hw, IXGBE_VFRDT(queue_idx));
        rxq->rdh_reg_addr =
            IXGBE_PCI_REG_ADDR(hw, IXGBE_VFRDH(queue_idx));
    } else {
        rxq->rdt_reg_addr =
            IXGBE_PCI_REG_ADDR(hw, IXGBE_RDT(rxq->reg_idx));
        rxq->rdh_reg_addr =
            IXGBE_PCI_REG_ADDR(hw, IXGBE_RDH(rxq->reg_idx));
    }
//设置队列的接收描述符ring的物理地址和虚拟地址
    rxq->rx_ring_phys_addr = rz->iova;
    rxq->rx_ring = (union ixgbe_adv_rx_desc *) rz->addr;

    /*
     * Certain constraints must be met in order to use the bulk buffer
     * allocation Rx burst function. If any of Rx queues doesn't meet them
     * the feature should be disabled for the whole port.
     */
    if (check_rx_burst_bulk_alloc_preconditions(rxq)) {
        PMD_INIT_LOG(DEBUG, "queue[%d] doesn't meet Rx Bulk Alloc "
                    "preconditions - canceling the feature for "
                    "the whole port[%d]",
                 rxq->queue_id, rxq->port_id);
        adapter->rx_bulk_alloc_allowed = false;
    }

    /*
     * Allocate software ring. Allow for space at the end of the
     * S/W ring to make sure look-ahead logic in bulk alloc Rx burst
     * function does not access an invalid memory region.
     */
    len = nb_desc;
    if (adapter->rx_bulk_alloc_allowed)
        len += RTE_PMD_IXGBE_RX_MAX_BURST;
//分配sw_ring,这个ring中存储的对象是struct ixgbe_rx_entry,其实里面就是数据包mbuf的指针
    rxq->sw_ring = rte_zmalloc_socket("rxq->sw_ring",
                      sizeof(struct ixgbe_rx_entry) * len,
                      RTE_CACHE_LINE_SIZE, socket_id);
    if (!rxq->sw_ring) {
        ixgbe_rx_queue_release(rxq);
        return -ENOMEM;
    }

    /*
     * Always allocate even if it's not going to be needed in order to
     * simplify the code.
     *
     * This ring is used in LRO and Scattered Rx cases and Scattered Rx may
     * be requested in ixgbe_dev_rx_init(), which is called later from
     * dev_start() flow.
     */
    rxq->sw_sc_ring =
        rte_zmalloc_socket("rxq->sw_sc_ring",
                   sizeof(struct ixgbe_scattered_rx_entry) * len,
                   RTE_CACHE_LINE_SIZE, socket_id);
    if (!rxq->sw_sc_ring) {
        ixgbe_rx_queue_release(rxq);
        return -ENOMEM;
    }

    PMD_INIT_LOG(DEBUG, "sw_ring=%p sw_sc_ring=%p hw_ring=%p "
                "dma_addr=0x%"PRIx64,
             rxq->sw_ring, rxq->sw_sc_ring, rxq->rx_ring,
             rxq->rx_ring_phys_addr);

    if (!rte_is_power_of_2(nb_desc)) {
        PMD_INIT_LOG(DEBUG, "queue[%d] doesn't meet Vector Rx "
                    "preconditions - canceling the feature for "
                    "the whole port[%d]",
                 rxq->queue_id, rxq->port_id);
        adapter->rx_vec_allowed = false;
    } else
        ixgbe_rxq_vec_setup(rxq);

    dev->data->rx_queues[queue_idx] = rxq;

    ixgbe_reset_rx_queue(adapter, rxq);

    return 0;
}
/* Reset dynamic ixgbe_rx_queue fields back to defaults */
static void __attribute__((cold))
ixgbe_reset_rx_queue(struct ixgbe_adapter *adapter, struct ixgbe_rx_queue *rxq)
{
    static const union ixgbe_adv_rx_desc zeroed_desc = {{0}};
    unsigned i;
    uint16_t len = rxq->nb_rx_desc;

    /*
     * By default, the Rx queue setup function allocates enough memory for
     * IXGBE_MAX_RING_DESC.  The Rx Burst bulk allocation function requires
     * extra memory at the end of the descriptor ring to be zero'd out.
     */
    if (adapter->rx_bulk_alloc_allowed)
        /* zero out extra memory */
        len += RTE_PMD_IXGBE_RX_MAX_BURST;

    /*
     * Zero out HW ring memory. Zero out extra memory at the end of
     * the H/W ring so look-ahead logic in Rx Burst bulk alloc function
     * reads extra memory as zeros.
     *///先把分配的描述符队列清空
    for (i = 0; i < len; i++) {
        rxq->rx_ring[i] = zeroed_desc;
    }

    /*
     * initialize extra software ring entries. Space for these extra
     * entries is always allocated
     */
    memset(&rxq->fake_mbuf, 0x0, sizeof(rxq->fake_mbuf));
    for (i = rxq->nb_rx_desc; i < len; ++i) {
        rxq->sw_ring[i].mbuf = &rxq->fake_mbuf;
    }
//初始化队列中 成员
    rxq->rx_nb_avail = 0;
    rxq->rx_next_avail = 0;
    rxq->rx_free_trigger = (uint16_t)(rxq->rx_free_thresh - 1);
    rxq->rx_tail = 0;
    rxq->nb_rx_hold = 0;
    rxq->pkt_first_seg = NULL;
    rxq->pkt_last_seg = NULL;

#ifdef RTE_IXGBE_INC_VECTOR
    rxq->rxrearm_start = 0;
    rxq->rxrearm_nb = 0;
#endif
}

 

 

int
rte_eth_dev_start(uint16_t port_id)
{
    struct rte_eth_dev *dev;
    struct rte_eth_dev_info dev_info;
    int diag;

    RTE_ETH_VALID_PORTID_OR_ERR_RET(port_id, -EINVAL);

    dev = &rte_eth_devices[port_id];

    RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->dev_start, -ENOTSUP);

    if (dev->data->dev_started != 0) {
        RTE_ETHDEV_LOG(INFO,
            "Device with port_id=%"PRIu16" already started\n",
            port_id);
        return 0;
    }

    rte_eth_dev_info_get(port_id, &dev_info);

    /* Lets restore MAC now if device does not support live change */
    if (*dev_info.dev_flags & RTE_ETH_DEV_NOLIVE_MAC_ADDR)
        rte_eth_dev_mac_restore(dev, &dev_info);
//----------   ixgbevf_dev_start
    diag = (*dev->dev_ops->dev_start)(dev);
    if (diag == 0)
        dev->data->dev_started = 1;
    else
        return eth_err(port_id, diag);

    rte_eth_dev_config_restore(dev, &dev_info, port_id);

    if (dev->data->dev_conf.intr_conf.lsc == 0) {
        RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->link_update, -ENOTSUP);
        (*dev->dev_ops->link_update)(dev, 0);
    }
    return 0;
}

 

//nb_rx = (*dev->rx_pkt_burst)(dev->data->rx_queues[queue_id],  rx_pkts, nb_pkts);
/*Rx_queue: 收包队列结构体:我们主要关注两个环形队列rx_ring、sw_ring

Rx_ring:一个地址连续环形队列,存储的是描述符,描述符中包含将来存放数据包的物理地址、DD标志(下面会介绍DD标志)等,上面图中只画了存放数据包的物理地址,物理地址供网卡DMA模块使用,也称为DMA地址(硬件使用物理地址,当网卡收到数据包后会通过DMA操作将数据包拷贝到物理地址,物理地址是通过虚拟地址转换得到的,下面分析源码时会介绍)

Sw_ring: 存储的是将来存放数据包的虚拟地址,虚拟地址供应用使用(软件应用使用虚拟地址,应用往虚拟地址读写数据包)

DD标志:用于标识一个描述符buf是否可用,无论网卡是工作在轮询方式下还是中断方式,判断数据包接收成功或者是否发送成功都需要检查描述符中的完成状态位(Description Done)DD,该状态位由DMA控制器在完成操作后进行回写

Mbuf:对应于Mbuf内存池中的元素,通过alloc或者free操作内存池获取或者释放mbuf对象,这里需要说的一点是mbuf池创建的时候是连续的,但是rx_ring和sw_ring里指向的数据地址不一定是连续的,下面分析收包流程时会介绍

PCIE总线:采用高速串行通信互联标准,自上而下分为事务传输层、数据链路层、物理层,网卡与CPU之间数据包的传输、CPU对网卡寄存器的MMIO操作都通过PCIE进行传输
               */
uint16_t
ixgbe_recv_pkts(void *rx_queue, struct rte_mbuf **rx_pkts,
        uint16_t nb_pkts)
{
    struct ixgbe_rx_queue *rxq;
    volatile union ixgbe_adv_rx_desc *rx_ring;
    volatile union ixgbe_adv_rx_desc *rxdp;
    struct ixgbe_rx_entry *sw_ring;
    struct ixgbe_rx_entry *rxe;
    struct rte_mbuf *rxm;
    struct rte_mbuf *nmb;
    union ixgbe_adv_rx_desc rxd;
    uint64_t dma_addr;
    uint32_t staterr;
    uint32_t pkt_info;
    uint16_t pkt_len;
    uint16_t rx_id;
    uint16_t nb_rx;
    uint16_t nb_hold;
    uint64_t pkt_flags;
    uint64_t vlan_flags;

    nb_rx = 0;
    nb_hold = 0;
    rxq = rx_queue;
    rx_id = rxq->rx_tail;//从队列的tail位置开始取包
    rx_ring = rxq->rx_ring;
    sw_ring = rxq->sw_ring;
    vlan_flags = rxq->vlan_flags;
    while (nb_rx < nb_pkts) {//循环获取nb_pkts个包
        /*
         * The order of operations here is important as the DD status
         * bit must not be read after any other descriptor fields.
         * rx_ring and rxdp are pointing to volatile data so the order
         * of accesses cannot be reordered by the compiler. If they were
         * not volatile, they could be reordered which could lead to
         * using invalid descriptor fields when read from rxd.
         */
        rxdp = &rx_ring[rx_id];
        staterr = rxdp->wb.upper.status_error;
    //检查DD位是否为1,是1则说明该位置已放入数据包,否则表示没有报文,退出
        if (!(staterr & rte_cpu_to_le_32(IXGBE_RXDADV_STAT_DD)))
            break;
        rxd = *rxdp;

        /*
         * End of packet.
         *
         * If the IXGBE_RXDADV_STAT_EOP flag is not set, the RX packet
         * is likely to be invalid and to be dropped by the various
         * validation checks performed by the network stack.
         *
         * Allocate a new mbuf to replenish the RX ring descriptor.
         * If the allocation fails:
         *    - arrange for that RX descriptor to be the first one
         *      being parsed the next time the receive function is
         *      invoked [on the same queue].
         *
         *    - Stop parsing the RX ring and return immediately.
         *
         * This policy do not drop the packet received in the RX
         * descriptor for which the allocation of a new mbuf failed.
         * Thus, it allows that packet to be later retrieved if
         * mbuf have been freed in the mean time.
         * As a side effect, holding RX descriptors instead of
         * systematically giving them back to the NIC may lead to
         * RX ring exhaustion situations.
         * However, the NIC can gracefully prevent such situations
         * to happen by sending specific "back-pressure" flow control
         * frames to its peer(s).
         */
        PMD_RX_LOG(DEBUG, "port_id=%u queue_id=%u rx_id=%u "
               "ext_err_stat=0x%08x pkt_len=%u",
               (unsigned) rxq->port_id, (unsigned) rxq->queue_id,
               (unsigned) rx_id, (unsigned) staterr,
               (unsigned) rte_le_to_cpu_16(rxd.wb.upper.length));
        //mempool的ring中分配一个新的--mbuf 申请一个mbuf(nmb),用于交换
        nmb = rte_mbuf_raw_alloc(rxq->mb_pool);
        if (nmb == NULL) {
            PMD_RX_LOG(DEBUG, "RX mbuf alloc failed port_id=%u "
                   "queue_id=%u", (unsigned) rxq->port_id,
                   (unsigned) rxq->queue_id);
            rte_eth_devices[rxq->port_id].data->rx_mbuf_alloc_failed++;
            break;
        }

        nb_hold++;
       
        rxe = &sw_ring[rx_id];
        rx_id++;
        if (rx_id == rxq->nb_rx_desc)
            rx_id = 0;

        /* Prefetch next mbuf while processing current one. */
        rte_ixgbe_prefetch(sw_ring[rx_id].mbuf);

        /*
         * When next RX descriptor is on a cache-line boundary,
         * prefetch the next 4 RX descriptors and the next 8 pointers
         * to mbufs.
         */
        if ((rx_id & 0x3) == 0) {
            rte_ixgbe_prefetch(&rx_ring[rx_id]);
            rte_ixgbe_prefetch(&sw_ring[rx_id]);
        }
        //从sw_ring中读取一个报文mbuf(存入rxm)
        rxm = rxe->mbuf;
         //往sw_ring中填空一个新报文mbuf(nmb)
        rxe->mbuf = nmb;
         //新mbuf对应的报文数据物理地址填入rx_ring对应位置,并将hdr_addr置0(DD位置0)
        //在驱动读取完数据包后,要把描述符的DD标志位置为0,同时设置新的DMA地址指向新的mbuf空间,这么描述符就可以再次被网卡硬件使用,拷贝数据到mbuf空间了
//将新申请的mbuf的虚拟地址转换为物理地址,为rx_ring的缓冲区填充做准备
         dma_addr =
            rte_cpu_to_le_64(rte_mbuf_data_iova_default(nmb));
        rxdp->read.hdr_addr = 0;
        rxdp->read.pkt_addr = dma_addr;
        //换出来的太子rxm就是我们要取出来的数据包指针,在下面填充一些必要的信息,就可以把包返给接收的用户了

        /*
         * Initialize the returned mbuf.
         * 1) setup generic mbuf fields:
         *    - number of segments,
         *    - next segment,
         *    - packet length,
         *    - RX port identifier.
         * 2) integrate hardware offload data, if any:
         *    - RSS flag & hash,
         *    - IP checksum flag,
         *    - VLAN TCI, if any,
         *    - error flags.
         */
        pkt_len = (uint16_t) (rte_le_to_cpu_16(rxd.wb.upper.length) -
                      rxq->crc_len);
        //对读取mbuf的报文信息进行初始化
        rxm->data_off = RTE_PKTMBUF_HEADROOM;
        rte_packet_prefetch((char *)rxm->buf_addr + rxm->data_off);
        rxm->nb_segs = 1;
        rxm->next = NULL;
        rxm->pkt_len = pkt_len;
        rxm->data_len = pkt_len;
        rxm->port = rxq->port_id;

        pkt_info = rte_le_to_cpu_32(rxd.wb.lower.lo_dword.data);
        /* Only valid if PKT_RX_VLAN set in pkt_flags */
        rxm->vlan_tci = rte_le_to_cpu_16(rxd.wb.upper.vlan);

        pkt_flags = rx_desc_status_to_pkt_flags(staterr, vlan_flags);
        pkt_flags = pkt_flags | rx_desc_error_to_pkt_flags(staterr);
        pkt_flags = pkt_flags |
            ixgbe_rxd_pkt_info_to_pkt_flags((uint16_t)pkt_info);
        rxm->ol_flags = pkt_flags;
        rxm->packet_type =
            ixgbe_rxd_pkt_info_to_pkt_type(pkt_info,
                               rxq->pkt_type_mask);

        if (likely(pkt_flags & PKT_RX_RSS_HASH))
            rxm->hash.rss = rte_le_to_cpu_32(
                        rxd.wb.lower.hi_dword.rss);
        else if (pkt_flags & PKT_RX_FDIR) {
            rxm->hash.fdir.hash = rte_le_to_cpu_16(
                    rxd.wb.lower.hi_dword.csum_ip.csum) &
                    IXGBE_ATR_HASH_MASK;
            rxm->hash.fdir.id = rte_le_to_cpu_16(
                    rxd.wb.lower.hi_dword.csum_ip.ip_id);
        }
        /*
         * Store the mbuf address into the next entry of the array
         * of returned packets.
         *///读取的报文mbuf存入rx_pkts
        rx_pkts[nb_rx++] = rxm;
    }
    rxq->rx_tail = rx_id;

    /*
     * If the number of free RX descriptors is greater than the RX free
     * threshold of the queue, advance the Receive Descriptor Tail (RDT)
     * register.
     * Update the RDT with the value of the last processed RX descriptor
     * minus 1, to guarantee that the RDT register is never equal to the
     * RDH register, which creates a "full" ring situtation from the
     * hardware point of view...
     */
    nb_hold = (uint16_t) (nb_hold + rxq->nb_rx_hold);
    if (nb_hold > rxq->rx_free_thresh) {
        PMD_RX_LOG(DEBUG, "port_id=%u queue_id=%u rx_tail=%u "
               "nb_hold=%u nb_rx=%u",
               (unsigned) rxq->port_id, (unsigned) rxq->queue_id,
               (unsigned) rx_id, (unsigned) nb_hold,
               (unsigned) nb_rx);
        rx_id = (uint16_t) ((rx_id == 0) ?
                     (rxq->nb_rx_desc - 1) : (rx_id - 1));
        IXGBE_PCI_REG_WRITE(rxq->rdt_reg_addr, rx_id);
        nb_hold = 0;
    }
    rxq->nb_rx_hold = nb_hold;
    return nb_rx;
}

 

标签:rx,dev,queue,收发,rxq,ring,id,dpdk
来源: https://www.cnblogs.com/codestack/p/15674144.html