xsk: new descriptor addressing scheme

Currently, AF_XDP only supports a fixed frame-size memory scheme where
each frame is referenced via an index (idx). A user passes the frame
index to the kernel, and the kernel acts upon the data.  Some NICs,
however, do not have a fixed frame-size model, instead they have a
model where a memory window is passed to the hardware and multiple
frames are filled into that window (referred to as the "type-writer"
model).

By changing the descriptor format from the current frame index
addressing scheme, AF_XDP can in the future be extended to support
these kinds of NICs.

In the index-based model, an idx refers to a frame of size
frame_size. Addressing a frame in the UMEM is done by offseting the
UMEM starting address by a global offset, idx * frame_size + offset.
Communicating via the fill- and completion-rings are done by means of
idx.

In this commit, the idx is removed in favor of an address (addr),
which is a relative address ranging over the UMEM. To convert an
idx-based address to the new addr is simply: addr = idx * frame_size +
offset.

We also stop referring to the UMEM "frame" as a frame. Instead it is
simply called a chunk.

To transfer ownership of a chunk to the kernel, the addr of the chunk
is passed in the fill-ring. Note, that the kernel will mask addr to
make it chunk aligned, so there is no need for userspace to do
that. E.g., for a chunk size of 2k, passing an addr of 2048, 2050 or
3000 to the fill-ring will refer to the same chunk.

On the completion-ring, the addr will match that of the Tx descriptor,
passed to the kernel.

Changing the descriptor format to use chunks/addr will allow for
future changes to move to a type-writer based model, where multiple
frames can reside in one chunk. In this model passing one single chunk
into the fill-ring, would potentially result in multiple Rx
descriptors.

This commit changes the uapi of AF_XDP sockets, and updates the
documentation.

Signed-off-by: Björn Töpel <bjorn.topel@intel.com>
Signed-off-by: Daniel Borkmann <daniel@iogearbox.net>
diff --git a/net/xdp/xdp_umem.c b/net/xdp/xdp_umem.c
index 8799881..9ad791f 100644
--- a/net/xdp/xdp_umem.c
+++ b/net/xdp/xdp_umem.c
@@ -14,7 +14,7 @@
 
 #include "xdp_umem.h"
 
-#define XDP_UMEM_MIN_FRAME_SIZE 2048
+#define XDP_UMEM_MIN_CHUNK_SIZE 2048
 
 static void xdp_umem_unpin_pages(struct xdp_umem *umem)
 {
@@ -151,12 +151,12 @@ static int xdp_umem_account_pages(struct xdp_umem *umem)
 
 static int xdp_umem_reg(struct xdp_umem *umem, struct xdp_umem_reg *mr)
 {
-	u32 frame_size = mr->frame_size, frame_headroom = mr->frame_headroom;
+	u32 chunk_size = mr->chunk_size, headroom = mr->headroom;
+	unsigned int chunks, chunks_per_page;
 	u64 addr = mr->addr, size = mr->len;
-	unsigned int nframes, nfpp;
 	int size_chk, err;
 
-	if (frame_size < XDP_UMEM_MIN_FRAME_SIZE || frame_size > PAGE_SIZE) {
+	if (chunk_size < XDP_UMEM_MIN_CHUNK_SIZE || chunk_size > PAGE_SIZE) {
 		/* Strictly speaking we could support this, if:
 		 * - huge pages, or*
 		 * - using an IOMMU, or
@@ -166,7 +166,7 @@ static int xdp_umem_reg(struct xdp_umem *umem, struct xdp_umem_reg *mr)
 		return -EINVAL;
 	}
 
-	if (!is_power_of_2(frame_size))
+	if (!is_power_of_2(chunk_size))
 		return -EINVAL;
 
 	if (!PAGE_ALIGNED(addr)) {
@@ -179,33 +179,30 @@ static int xdp_umem_reg(struct xdp_umem *umem, struct xdp_umem_reg *mr)
 	if ((addr + size) < addr)
 		return -EINVAL;
 
-	nframes = (unsigned int)div_u64(size, frame_size);
-	if (nframes == 0 || nframes > UINT_MAX)
+	chunks = (unsigned int)div_u64(size, chunk_size);
+	if (chunks == 0)
 		return -EINVAL;
 
-	nfpp = PAGE_SIZE / frame_size;
-	if (nframes < nfpp || nframes % nfpp)
+	chunks_per_page = PAGE_SIZE / chunk_size;
+	if (chunks < chunks_per_page || chunks % chunks_per_page)
 		return -EINVAL;
 
-	frame_headroom = ALIGN(frame_headroom, 64);
+	headroom = ALIGN(headroom, 64);
 
-	size_chk = frame_size - frame_headroom - XDP_PACKET_HEADROOM;
+	size_chk = chunk_size - headroom - XDP_PACKET_HEADROOM;
 	if (size_chk < 0)
 		return -EINVAL;
 
 	umem->pid = get_task_pid(current, PIDTYPE_PID);
-	umem->size = (size_t)size;
 	umem->address = (unsigned long)addr;
-	umem->props.frame_size = frame_size;
-	umem->props.nframes = nframes;
-	umem->frame_headroom = frame_headroom;
+	umem->props.chunk_mask = ~((u64)chunk_size - 1);
+	umem->props.size = size;
+	umem->headroom = headroom;
+	umem->chunk_size_nohr = chunk_size - headroom;
 	umem->npgs = size / PAGE_SIZE;
 	umem->pgs = NULL;
 	umem->user = NULL;
 
-	umem->frame_size_log2 = ilog2(frame_size);
-	umem->nfpp_mask = nfpp - 1;
-	umem->nfpplog2 = ilog2(nfpp);
 	refcount_set(&umem->users, 1);
 
 	err = xdp_umem_account_pages(umem);
diff --git a/net/xdp/xdp_umem.h b/net/xdp/xdp_umem.h
index 0881cf4..aeadd1b 100644
--- a/net/xdp/xdp_umem.h
+++ b/net/xdp/xdp_umem.h
@@ -18,35 +18,20 @@ struct xdp_umem {
 	struct xsk_queue *cq;
 	struct page **pgs;
 	struct xdp_umem_props props;
-	u32 npgs;
-	u32 frame_headroom;
-	u32 nfpp_mask;
-	u32 nfpplog2;
-	u32 frame_size_log2;
+	u32 headroom;
+	u32 chunk_size_nohr;
 	struct user_struct *user;
 	struct pid *pid;
 	unsigned long address;
-	size_t size;
 	refcount_t users;
 	struct work_struct work;
+	u32 npgs;
 };
 
-static inline char *xdp_umem_get_data(struct xdp_umem *umem, u32 idx)
+static inline char *xdp_umem_get_data(struct xdp_umem *umem, u64 addr)
 {
-	u64 pg, off;
-	char *data;
-
-	pg = idx >> umem->nfpplog2;
-	off = (idx & umem->nfpp_mask) << umem->frame_size_log2;
-
-	data = page_address(umem->pgs[pg]);
-	return data + off;
-}
-
-static inline char *xdp_umem_get_data_with_headroom(struct xdp_umem *umem,
-						    u32 idx)
-{
-	return xdp_umem_get_data(umem, idx) + umem->frame_headroom;
+	return page_address(umem->pgs[addr >> PAGE_SHIFT]) +
+		(addr & (PAGE_SIZE - 1));
 }
 
 bool xdp_umem_validate_queues(struct xdp_umem *umem);
diff --git a/net/xdp/xdp_umem_props.h b/net/xdp/xdp_umem_props.h
index 2cf8ec4..40eab10 100644
--- a/net/xdp/xdp_umem_props.h
+++ b/net/xdp/xdp_umem_props.h
@@ -7,8 +7,8 @@
 #define XDP_UMEM_PROPS_H_
 
 struct xdp_umem_props {
-	u32 frame_size;
-	u32 nframes;
+	u64 chunk_mask;
+	u64 size;
 };
 
 #endif /* XDP_UMEM_PROPS_H_ */
diff --git a/net/xdp/xsk.c b/net/xdp/xsk.c
index 966307c..4688c75 100644
--- a/net/xdp/xsk.c
+++ b/net/xdp/xsk.c
@@ -41,24 +41,27 @@ bool xsk_is_setup_for_bpf_map(struct xdp_sock *xs)
 
 static int __xsk_rcv(struct xdp_sock *xs, struct xdp_buff *xdp)
 {
-	u32 id, len = xdp->data_end - xdp->data;
+	u32 len = xdp->data_end - xdp->data;
 	void *buffer;
+	u64 addr;
 	int err;
 
 	if (xs->dev != xdp->rxq->dev || xs->queue_id != xdp->rxq->queue_index)
 		return -EINVAL;
 
-	if (!xskq_peek_id(xs->umem->fq, &id)) {
+	if (!xskq_peek_addr(xs->umem->fq, &addr) ||
+	    len > xs->umem->chunk_size_nohr) {
 		xs->rx_dropped++;
 		return -ENOSPC;
 	}
 
-	buffer = xdp_umem_get_data_with_headroom(xs->umem, id);
+	addr += xs->umem->headroom;
+
+	buffer = xdp_umem_get_data(xs->umem, addr);
 	memcpy(buffer, xdp->data, len);
-	err = xskq_produce_batch_desc(xs->rx, id, len,
-				      xs->umem->frame_headroom);
+	err = xskq_produce_batch_desc(xs->rx, addr, len);
 	if (!err)
-		xskq_discard_id(xs->umem->fq);
+		xskq_discard_addr(xs->umem->fq);
 	else
 		xs->rx_dropped++;
 
@@ -95,10 +98,10 @@ int xsk_generic_rcv(struct xdp_sock *xs, struct xdp_buff *xdp)
 
 static void xsk_destruct_skb(struct sk_buff *skb)
 {
-	u32 id = (u32)(long)skb_shinfo(skb)->destructor_arg;
+	u64 addr = (u64)(long)skb_shinfo(skb)->destructor_arg;
 	struct xdp_sock *xs = xdp_sk(skb->sk);
 
-	WARN_ON_ONCE(xskq_produce_id(xs->umem->cq, id));
+	WARN_ON_ONCE(xskq_produce_addr(xs->umem->cq, addr));
 
 	sock_wfree(skb);
 }
@@ -123,14 +126,15 @@ static int xsk_generic_xmit(struct sock *sk, struct msghdr *m,
 
 	while (xskq_peek_desc(xs->tx, &desc)) {
 		char *buffer;
-		u32 id, len;
+		u64 addr;
+		u32 len;
 
 		if (max_batch-- == 0) {
 			err = -EAGAIN;
 			goto out;
 		}
 
-		if (xskq_reserve_id(xs->umem->cq)) {
+		if (xskq_reserve_addr(xs->umem->cq)) {
 			err = -EAGAIN;
 			goto out;
 		}
@@ -153,8 +157,8 @@ static int xsk_generic_xmit(struct sock *sk, struct msghdr *m,
 		}
 
 		skb_put(skb, len);
-		id = desc.idx;
-		buffer = xdp_umem_get_data(xs->umem, id) + desc.offset;
+		addr = desc.addr;
+		buffer = xdp_umem_get_data(xs->umem, addr);
 		err = skb_store_bits(skb, 0, buffer, len);
 		if (unlikely(err)) {
 			kfree_skb(skb);
@@ -164,7 +168,7 @@ static int xsk_generic_xmit(struct sock *sk, struct msghdr *m,
 		skb->dev = xs->dev;
 		skb->priority = sk->sk_priority;
 		skb->mark = sk->sk_mark;
-		skb_shinfo(skb)->destructor_arg = (void *)(long)id;
+		skb_shinfo(skb)->destructor_arg = (void *)(long)addr;
 		skb->destructor = xsk_destruct_skb;
 
 		err = dev_direct_xmit(skb, xs->queue_id);
diff --git a/net/xdp/xsk_queue.c b/net/xdp/xsk_queue.c
index ebe85e5..6c32e92 100644
--- a/net/xdp/xsk_queue.c
+++ b/net/xdp/xsk_queue.c
@@ -17,7 +17,7 @@ void xskq_set_umem(struct xsk_queue *q, struct xdp_umem_props *umem_props)
 
 static u32 xskq_umem_get_ring_size(struct xsk_queue *q)
 {
-	return sizeof(struct xdp_umem_ring) + q->nentries * sizeof(u32);
+	return sizeof(struct xdp_umem_ring) + q->nentries * sizeof(u64);
 }
 
 static u32 xskq_rxtx_get_ring_size(struct xsk_queue *q)
diff --git a/net/xdp/xsk_queue.h b/net/xdp/xsk_queue.h
index b5924e7..337e5ad 100644
--- a/net/xdp/xsk_queue.h
+++ b/net/xdp/xsk_queue.h
@@ -27,7 +27,7 @@ struct xdp_rxtx_ring {
 /* Used for the fill and completion queues for buffers */
 struct xdp_umem_ring {
 	struct xdp_ring ptrs;
-	u32 desc[0] ____cacheline_aligned_in_smp;
+	u64 desc[0] ____cacheline_aligned_in_smp;
 };
 
 struct xsk_queue {
@@ -76,24 +76,25 @@ static inline u32 xskq_nb_free(struct xsk_queue *q, u32 producer, u32 dcnt)
 
 /* UMEM queue */
 
-static inline bool xskq_is_valid_id(struct xsk_queue *q, u32 idx)
+static inline bool xskq_is_valid_addr(struct xsk_queue *q, u64 addr)
 {
-	if (unlikely(idx >= q->umem_props.nframes)) {
+	if (addr >= q->umem_props.size) {
 		q->invalid_descs++;
 		return false;
 	}
+
 	return true;
 }
 
-static inline u32 *xskq_validate_id(struct xsk_queue *q, u32 *id)
+static inline u64 *xskq_validate_addr(struct xsk_queue *q, u64 *addr)
 {
 	while (q->cons_tail != q->cons_head) {
 		struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
 		unsigned int idx = q->cons_tail & q->ring_mask;
 
-		*id = READ_ONCE(ring->desc[idx]);
-		if (xskq_is_valid_id(q, *id))
-			return id;
+		*addr = READ_ONCE(ring->desc[idx]) & q->umem_props.chunk_mask;
+		if (xskq_is_valid_addr(q, *addr))
+			return addr;
 
 		q->cons_tail++;
 	}
@@ -101,7 +102,7 @@ static inline u32 *xskq_validate_id(struct xsk_queue *q, u32 *id)
 	return NULL;
 }
 
-static inline u32 *xskq_peek_id(struct xsk_queue *q, u32 *id)
+static inline u64 *xskq_peek_addr(struct xsk_queue *q, u64 *addr)
 {
 	if (q->cons_tail == q->cons_head) {
 		WRITE_ONCE(q->ring->consumer, q->cons_tail);
@@ -111,19 +112,19 @@ static inline u32 *xskq_peek_id(struct xsk_queue *q, u32 *id)
 		smp_rmb();
 	}
 
-	return xskq_validate_id(q, id);
+	return xskq_validate_addr(q, addr);
 }
 
-static inline void xskq_discard_id(struct xsk_queue *q)
+static inline void xskq_discard_addr(struct xsk_queue *q)
 {
 	q->cons_tail++;
 }
 
-static inline int xskq_produce_id(struct xsk_queue *q, u32 id)
+static inline int xskq_produce_addr(struct xsk_queue *q, u64 addr)
 {
 	struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
 
-	ring->desc[q->prod_tail++ & q->ring_mask] = id;
+	ring->desc[q->prod_tail++ & q->ring_mask] = addr;
 
 	/* Order producer and data */
 	smp_wmb();
@@ -132,7 +133,7 @@ static inline int xskq_produce_id(struct xsk_queue *q, u32 id)
 	return 0;
 }
 
-static inline int xskq_reserve_id(struct xsk_queue *q)
+static inline int xskq_reserve_addr(struct xsk_queue *q)
 {
 	if (xskq_nb_free(q, q->prod_head, 1) == 0)
 		return -ENOSPC;
@@ -145,16 +146,11 @@ static inline int xskq_reserve_id(struct xsk_queue *q)
 
 static inline bool xskq_is_valid_desc(struct xsk_queue *q, struct xdp_desc *d)
 {
-	u32 buff_len;
-
-	if (unlikely(d->idx >= q->umem_props.nframes)) {
-		q->invalid_descs++;
+	if (!xskq_is_valid_addr(q, d->addr))
 		return false;
-	}
 
-	buff_len = q->umem_props.frame_size;
-	if (unlikely(d->len > buff_len || d->len == 0 ||
-		     d->offset > buff_len || d->offset + d->len > buff_len)) {
+	if (((d->addr + d->len) & q->umem_props.chunk_mask) !=
+	    (d->addr & q->umem_props.chunk_mask)) {
 		q->invalid_descs++;
 		return false;
 	}
@@ -199,7 +195,7 @@ static inline void xskq_discard_desc(struct xsk_queue *q)
 }
 
 static inline int xskq_produce_batch_desc(struct xsk_queue *q,
-					  u32 id, u32 len, u16 offset)
+					  u64 addr, u32 len)
 {
 	struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring;
 	unsigned int idx;
@@ -208,9 +204,8 @@ static inline int xskq_produce_batch_desc(struct xsk_queue *q,
 		return -ENOSPC;
 
 	idx = (q->prod_head++) & q->ring_mask;
-	ring->desc[idx].idx = id;
+	ring->desc[idx].addr = addr;
 	ring->desc[idx].len = len;
-	ring->desc[idx].offset = offset;
 
 	return 0;
 }