blob: 52e7c4ffd2282e7515fbefd6d519702a344113df [file] [log] [blame]
Cristian Dumitrescu35149b22020-08-28 10:26:28 +02001// SPDX-License-Identifier: GPL-2.0
2/* Copyright(c) 2020 Intel Corporation. */
3
4#define _GNU_SOURCE
5#include <poll.h>
6#include <pthread.h>
7#include <signal.h>
8#include <sched.h>
9#include <stdio.h>
10#include <stdlib.h>
11#include <string.h>
12#include <sys/mman.h>
13#include <sys/resource.h>
14#include <sys/socket.h>
15#include <sys/types.h>
16#include <time.h>
17#include <unistd.h>
18#include <getopt.h>
19#include <netinet/ether.h>
20#include <net/if.h>
21
22#include <linux/bpf.h>
23#include <linux/if_link.h>
24#include <linux/if_xdp.h>
25
26#include <bpf/libbpf.h>
27#include <bpf/xsk.h>
28#include <bpf/bpf.h>
29
Andrii Nakryikoc58f9812021-12-01 15:28:23 -080030/* libbpf APIs for AF_XDP are deprecated starting from v0.7 */
31#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
32
/* Element count of a true array; must not be applied to a pointer. */
#define ARRAY_SIZE(arr) (sizeof(arr) / sizeof((arr)[0]))
34
35typedef __u64 u64;
36typedef __u32 u32;
37typedef __u16 u16;
38typedef __u8 u8;
39
40/* This program illustrates the packet forwarding between multiple AF_XDP
41 * sockets in multi-threaded environment. All threads are sharing a common
42 * buffer pool, with each socket having its own private buffer cache.
43 *
44 * Example 1: Single thread handling two sockets. The packets received by socket
45 * A (interface IFA, queue QA) are forwarded to socket B (interface IFB, queue
46 * QB), while the packets received by socket B are forwarded to socket A. The
47 * thread is running on CPU core X:
48 *
49 * ./xsk_fwd -i IFA -q QA -i IFB -q QB -c X
50 *
51 * Example 2: Two threads, each handling two sockets. The thread running on CPU
52 * core X forwards all the packets received by socket A to socket B, and all the
53 * packets received by socket B to socket A. The thread running on CPU core Y is
54 * performing the same packet forwarding between sockets C and D:
55 *
56 * ./xsk_fwd -i IFA -q QA -i IFB -q QB -i IFC -q QC -i IFD -q QD
57 * -c CX -c CY
58 */
59
60/*
61 * Buffer pool and buffer cache
62 *
63 * For packet forwarding, the packet buffers are typically allocated from the
64 * pool for packet reception and freed back to the pool for further reuse once
65 * the packet transmission is completed.
66 *
 * The buffer pool is shared between multiple threads. In order to minimize the
 * access latency to the shared buffer pool, each thread creates one (or
 * several) buffer caches, which, unlike the buffer pool, are private to the
 * thread that creates them and therefore cannot be shared with other threads.
 * The access to the shared pool is only needed either (A) when the cache gets
 * empty due to repeated buffer allocations and it needs to be replenished from
 * the pool, or (B) when the cache gets full due to repeated buffer free and it
 * needs to be flushed back to the pool.
75 *
 * In a packet forwarding system, a packet received on any input port can
 * potentially be transmitted on any output port, depending on the forwarding
 * configuration. For AF_XDP sockets, for this to work with zero-copy of the
 * packet buffers, it is required that the buffer pool memory fits into the
 * UMEM area shared by all the sockets.
81 */
82
83struct bpool_params {
84 u32 n_buffers;
85 u32 buffer_size;
86 int mmap_flags;
87
88 u32 n_users_max;
89 u32 n_buffers_per_slab;
90};
91
92/* This buffer pool implementation organizes the buffers into equally sized
93 * slabs of *n_buffers_per_slab*. Initially, there are *n_slabs* slabs in the
94 * pool that are completely filled with buffer pointers (full slabs).
95 *
96 * Each buffer cache has a slab for buffer allocation and a slab for buffer
97 * free, with both of these slabs initially empty. When the cache's allocation
98 * slab goes empty, it is swapped with one of the available full slabs from the
99 * pool, if any is available. When the cache's free slab goes full, it is
100 * swapped for one of the empty slabs from the pool, which is guaranteed to
101 * succeed.
102 *
103 * Partially filled slabs never get traded between the cache and the pool
104 * (except when the cache itself is destroyed), which enables fast operation
105 * through pointer swapping.
106 */
107struct bpool {
108 struct bpool_params params;
109 pthread_mutex_t lock;
110 void *addr;
111
112 u64 **slabs;
113 u64 **slabs_reserved;
114 u64 *buffers;
115 u64 *buffers_reserved;
116
117 u64 n_slabs;
118 u64 n_slabs_reserved;
119 u64 n_buffers;
120
121 u64 n_slabs_available;
122 u64 n_slabs_reserved_available;
123
124 struct xsk_umem_config umem_cfg;
125 struct xsk_ring_prod umem_fq;
126 struct xsk_ring_cons umem_cq;
127 struct xsk_umem *umem;
128};
129
130static struct bpool *
131bpool_init(struct bpool_params *params,
132 struct xsk_umem_config *umem_cfg)
133{
134 struct rlimit r = {RLIM_INFINITY, RLIM_INFINITY};
135 u64 n_slabs, n_slabs_reserved, n_buffers, n_buffers_reserved;
136 u64 slabs_size, slabs_reserved_size;
137 u64 buffers_size, buffers_reserved_size;
138 u64 total_size, i;
139 struct bpool *bp;
140 u8 *p;
141 int status;
142
143 /* mmap prep. */
144 if (setrlimit(RLIMIT_MEMLOCK, &r))
145 return NULL;
146
147 /* bpool internals dimensioning. */
148 n_slabs = (params->n_buffers + params->n_buffers_per_slab - 1) /
149 params->n_buffers_per_slab;
150 n_slabs_reserved = params->n_users_max * 2;
151 n_buffers = n_slabs * params->n_buffers_per_slab;
152 n_buffers_reserved = n_slabs_reserved * params->n_buffers_per_slab;
153
154 slabs_size = n_slabs * sizeof(u64 *);
155 slabs_reserved_size = n_slabs_reserved * sizeof(u64 *);
156 buffers_size = n_buffers * sizeof(u64);
157 buffers_reserved_size = n_buffers_reserved * sizeof(u64);
158
159 total_size = sizeof(struct bpool) +
160 slabs_size + slabs_reserved_size +
161 buffers_size + buffers_reserved_size;
162
163 /* bpool memory allocation. */
164 p = calloc(total_size, sizeof(u8));
165 if (!p)
166 return NULL;
167
168 /* bpool memory initialization. */
169 bp = (struct bpool *)p;
170 memcpy(&bp->params, params, sizeof(*params));
171 bp->params.n_buffers = n_buffers;
172
173 bp->slabs = (u64 **)&p[sizeof(struct bpool)];
174 bp->slabs_reserved = (u64 **)&p[sizeof(struct bpool) +
175 slabs_size];
176 bp->buffers = (u64 *)&p[sizeof(struct bpool) +
177 slabs_size + slabs_reserved_size];
178 bp->buffers_reserved = (u64 *)&p[sizeof(struct bpool) +
179 slabs_size + slabs_reserved_size + buffers_size];
180
181 bp->n_slabs = n_slabs;
182 bp->n_slabs_reserved = n_slabs_reserved;
183 bp->n_buffers = n_buffers;
184
185 for (i = 0; i < n_slabs; i++)
186 bp->slabs[i] = &bp->buffers[i * params->n_buffers_per_slab];
187 bp->n_slabs_available = n_slabs;
188
189 for (i = 0; i < n_slabs_reserved; i++)
190 bp->slabs_reserved[i] = &bp->buffers_reserved[i *
191 params->n_buffers_per_slab];
192 bp->n_slabs_reserved_available = n_slabs_reserved;
193
194 for (i = 0; i < n_buffers; i++)
195 bp->buffers[i] = i * params->buffer_size;
196
197 /* lock. */
198 status = pthread_mutex_init(&bp->lock, NULL);
199 if (status) {
200 free(p);
201 return NULL;
202 }
203
204 /* mmap. */
205 bp->addr = mmap(NULL,
206 n_buffers * params->buffer_size,
207 PROT_READ | PROT_WRITE,
208 MAP_PRIVATE | MAP_ANONYMOUS | params->mmap_flags,
209 -1,
210 0);
211 if (bp->addr == MAP_FAILED) {
212 pthread_mutex_destroy(&bp->lock);
213 free(p);
214 return NULL;
215 }
216
217 /* umem. */
218 status = xsk_umem__create(&bp->umem,
219 bp->addr,
220 bp->params.n_buffers * bp->params.buffer_size,
221 &bp->umem_fq,
222 &bp->umem_cq,
223 umem_cfg);
224 if (status) {
225 munmap(bp->addr, bp->params.n_buffers * bp->params.buffer_size);
226 pthread_mutex_destroy(&bp->lock);
227 free(p);
228 return NULL;
229 }
230 memcpy(&bp->umem_cfg, umem_cfg, sizeof(*umem_cfg));
231
232 return bp;
233}
234
235static void
236bpool_free(struct bpool *bp)
237{
238 if (!bp)
239 return;
240
241 xsk_umem__delete(bp->umem);
242 munmap(bp->addr, bp->params.n_buffers * bp->params.buffer_size);
243 pthread_mutex_destroy(&bp->lock);
244 free(bp);
245}
246
247struct bcache {
248 struct bpool *bp;
249
250 u64 *slab_cons;
251 u64 *slab_prod;
252
253 u64 n_buffers_cons;
254 u64 n_buffers_prod;
255};
256
257static u32
258bcache_slab_size(struct bcache *bc)
259{
260 struct bpool *bp = bc->bp;
261
262 return bp->params.n_buffers_per_slab;
263}
264
265static struct bcache *
266bcache_init(struct bpool *bp)
267{
268 struct bcache *bc;
269
270 bc = calloc(1, sizeof(struct bcache));
271 if (!bc)
272 return NULL;
273
274 bc->bp = bp;
275 bc->n_buffers_cons = 0;
276 bc->n_buffers_prod = 0;
277
278 pthread_mutex_lock(&bp->lock);
279 if (bp->n_slabs_reserved_available == 0) {
280 pthread_mutex_unlock(&bp->lock);
281 free(bc);
282 return NULL;
283 }
284
285 bc->slab_cons = bp->slabs_reserved[bp->n_slabs_reserved_available - 1];
286 bc->slab_prod = bp->slabs_reserved[bp->n_slabs_reserved_available - 2];
287 bp->n_slabs_reserved_available -= 2;
288 pthread_mutex_unlock(&bp->lock);
289
290 return bc;
291}
292
293static void
294bcache_free(struct bcache *bc)
295{
296 struct bpool *bp;
297
298 if (!bc)
299 return;
300
301 /* In order to keep this example simple, the case of freeing any
302 * existing buffers from the cache back to the pool is ignored.
303 */
304
305 bp = bc->bp;
306 pthread_mutex_lock(&bp->lock);
307 bp->slabs_reserved[bp->n_slabs_reserved_available] = bc->slab_prod;
308 bp->slabs_reserved[bp->n_slabs_reserved_available + 1] = bc->slab_cons;
309 bp->n_slabs_reserved_available += 2;
310 pthread_mutex_unlock(&bp->lock);
311
312 free(bc);
313}
314
315/* To work correctly, the implementation requires that the *n_buffers* input
316 * argument is never greater than the buffer pool's *n_buffers_per_slab*. This
317 * is typically the case, with one exception taking place when large number of
318 * buffers are allocated at init time (e.g. for the UMEM fill queue setup).
319 */
320static inline u32
321bcache_cons_check(struct bcache *bc, u32 n_buffers)
322{
323 struct bpool *bp = bc->bp;
324 u64 n_buffers_per_slab = bp->params.n_buffers_per_slab;
325 u64 n_buffers_cons = bc->n_buffers_cons;
326 u64 n_slabs_available;
327 u64 *slab_full;
328
329 /*
330 * Consumer slab is not empty: Use what's available locally. Do not
331 * look for more buffers from the pool when the ask can only be
332 * partially satisfied.
333 */
334 if (n_buffers_cons)
335 return (n_buffers_cons < n_buffers) ?
336 n_buffers_cons :
337 n_buffers;
338
339 /*
340 * Consumer slab is empty: look to trade the current consumer slab
341 * (full) for a full slab from the pool, if any is available.
342 */
343 pthread_mutex_lock(&bp->lock);
344 n_slabs_available = bp->n_slabs_available;
345 if (!n_slabs_available) {
346 pthread_mutex_unlock(&bp->lock);
347 return 0;
348 }
349
350 n_slabs_available--;
351 slab_full = bp->slabs[n_slabs_available];
352 bp->slabs[n_slabs_available] = bc->slab_cons;
353 bp->n_slabs_available = n_slabs_available;
354 pthread_mutex_unlock(&bp->lock);
355
356 bc->slab_cons = slab_full;
357 bc->n_buffers_cons = n_buffers_per_slab;
358 return n_buffers;
359}
360
361static inline u64
362bcache_cons(struct bcache *bc)
363{
364 u64 n_buffers_cons = bc->n_buffers_cons - 1;
365 u64 buffer;
366
367 buffer = bc->slab_cons[n_buffers_cons];
368 bc->n_buffers_cons = n_buffers_cons;
369 return buffer;
370}
371
372static inline void
373bcache_prod(struct bcache *bc, u64 buffer)
374{
375 struct bpool *bp = bc->bp;
376 u64 n_buffers_per_slab = bp->params.n_buffers_per_slab;
377 u64 n_buffers_prod = bc->n_buffers_prod;
378 u64 n_slabs_available;
379 u64 *slab_empty;
380
381 /*
382 * Producer slab is not yet full: store the current buffer to it.
383 */
384 if (n_buffers_prod < n_buffers_per_slab) {
385 bc->slab_prod[n_buffers_prod] = buffer;
386 bc->n_buffers_prod = n_buffers_prod + 1;
387 return;
388 }
389
390 /*
391 * Producer slab is full: trade the cache's current producer slab
392 * (full) for an empty slab from the pool, then store the current
393 * buffer to the new producer slab. As one full slab exists in the
394 * cache, it is guaranteed that there is at least one empty slab
395 * available in the pool.
396 */
397 pthread_mutex_lock(&bp->lock);
398 n_slabs_available = bp->n_slabs_available;
399 slab_empty = bp->slabs[n_slabs_available];
400 bp->slabs[n_slabs_available] = bc->slab_prod;
401 bp->n_slabs_available = n_slabs_available + 1;
402 pthread_mutex_unlock(&bp->lock);
403
404 slab_empty[0] = buffer;
405 bc->slab_prod = slab_empty;
406 bc->n_buffers_prod = 1;
407}
408
409/*
410 * Port
411 *
412 * Each of the forwarding ports sits on top of an AF_XDP socket. In order for
413 * packet forwarding to happen with no packet buffer copy, all the sockets need
414 * to share the same UMEM area, which is used as the buffer pool memory.
415 */
416#ifndef MAX_BURST_RX
417#define MAX_BURST_RX 64
418#endif
419
420#ifndef MAX_BURST_TX
421#define MAX_BURST_TX 64
422#endif
423
424struct burst_rx {
425 u64 addr[MAX_BURST_RX];
426 u32 len[MAX_BURST_RX];
427};
428
429struct burst_tx {
430 u64 addr[MAX_BURST_TX];
431 u32 len[MAX_BURST_TX];
432 u32 n_pkts;
433};
434
435struct port_params {
436 struct xsk_socket_config xsk_cfg;
437 struct bpool *bp;
438 const char *iface;
439 u32 iface_queue;
440};
441
442struct port {
443 struct port_params params;
444
445 struct bcache *bc;
446
447 struct xsk_ring_cons rxq;
448 struct xsk_ring_prod txq;
449 struct xsk_ring_prod umem_fq;
450 struct xsk_ring_cons umem_cq;
451 struct xsk_socket *xsk;
452 int umem_fq_initialized;
453
454 u64 n_pkts_rx;
455 u64 n_pkts_tx;
456};
457
458static void
459port_free(struct port *p)
460{
461 if (!p)
462 return;
463
464 /* To keep this example simple, the code to free the buffers from the
465 * socket's receive and transmit queues, as well as from the UMEM fill
466 * and completion queues, is not included.
467 */
468
469 if (p->xsk)
470 xsk_socket__delete(p->xsk);
471
472 bcache_free(p->bc);
473
474 free(p);
475}
476
477static struct port *
478port_init(struct port_params *params)
479{
480 struct port *p;
481 u32 umem_fq_size, pos = 0;
482 int status, i;
483
484 /* Memory allocation and initialization. */
485 p = calloc(sizeof(struct port), 1);
486 if (!p)
487 return NULL;
488
489 memcpy(&p->params, params, sizeof(p->params));
490 umem_fq_size = params->bp->umem_cfg.fill_size;
491
492 /* bcache. */
493 p->bc = bcache_init(params->bp);
494 if (!p->bc ||
495 (bcache_slab_size(p->bc) < umem_fq_size) ||
496 (bcache_cons_check(p->bc, umem_fq_size) < umem_fq_size)) {
497 port_free(p);
498 return NULL;
499 }
500
501 /* xsk socket. */
502 status = xsk_socket__create_shared(&p->xsk,
503 params->iface,
504 params->iface_queue,
505 params->bp->umem,
506 &p->rxq,
507 &p->txq,
508 &p->umem_fq,
509 &p->umem_cq,
510 &params->xsk_cfg);
511 if (status) {
512 port_free(p);
513 return NULL;
514 }
515
516 /* umem fq. */
517 xsk_ring_prod__reserve(&p->umem_fq, umem_fq_size, &pos);
518
519 for (i = 0; i < umem_fq_size; i++)
520 *xsk_ring_prod__fill_addr(&p->umem_fq, pos + i) =
521 bcache_cons(p->bc);
522
523 xsk_ring_prod__submit(&p->umem_fq, umem_fq_size);
524 p->umem_fq_initialized = 1;
525
526 return p;
527}
528
529static inline u32
530port_rx_burst(struct port *p, struct burst_rx *b)
531{
532 u32 n_pkts, pos, i;
533
534 /* Free buffers for FQ replenish. */
535 n_pkts = ARRAY_SIZE(b->addr);
536
537 n_pkts = bcache_cons_check(p->bc, n_pkts);
538 if (!n_pkts)
539 return 0;
540
541 /* RXQ. */
542 n_pkts = xsk_ring_cons__peek(&p->rxq, n_pkts, &pos);
543 if (!n_pkts) {
544 if (xsk_ring_prod__needs_wakeup(&p->umem_fq)) {
545 struct pollfd pollfd = {
546 .fd = xsk_socket__fd(p->xsk),
547 .events = POLLIN,
548 };
549
550 poll(&pollfd, 1, 0);
551 }
552 return 0;
553 }
554
555 for (i = 0; i < n_pkts; i++) {
556 b->addr[i] = xsk_ring_cons__rx_desc(&p->rxq, pos + i)->addr;
557 b->len[i] = xsk_ring_cons__rx_desc(&p->rxq, pos + i)->len;
558 }
559
560 xsk_ring_cons__release(&p->rxq, n_pkts);
561 p->n_pkts_rx += n_pkts;
562
563 /* UMEM FQ. */
564 for ( ; ; ) {
565 int status;
566
567 status = xsk_ring_prod__reserve(&p->umem_fq, n_pkts, &pos);
568 if (status == n_pkts)
569 break;
570
571 if (xsk_ring_prod__needs_wakeup(&p->umem_fq)) {
572 struct pollfd pollfd = {
573 .fd = xsk_socket__fd(p->xsk),
574 .events = POLLIN,
575 };
576
577 poll(&pollfd, 1, 0);
578 }
579 }
580
581 for (i = 0; i < n_pkts; i++)
582 *xsk_ring_prod__fill_addr(&p->umem_fq, pos + i) =
583 bcache_cons(p->bc);
584
585 xsk_ring_prod__submit(&p->umem_fq, n_pkts);
586
587 return n_pkts;
588}
589
590static inline void
591port_tx_burst(struct port *p, struct burst_tx *b)
592{
593 u32 n_pkts, pos, i;
594 int status;
595
596 /* UMEM CQ. */
597 n_pkts = p->params.bp->umem_cfg.comp_size;
598
599 n_pkts = xsk_ring_cons__peek(&p->umem_cq, n_pkts, &pos);
600
601 for (i = 0; i < n_pkts; i++) {
602 u64 addr = *xsk_ring_cons__comp_addr(&p->umem_cq, pos + i);
603
604 bcache_prod(p->bc, addr);
605 }
606
607 xsk_ring_cons__release(&p->umem_cq, n_pkts);
608
609 /* TXQ. */
610 n_pkts = b->n_pkts;
611
612 for ( ; ; ) {
613 status = xsk_ring_prod__reserve(&p->txq, n_pkts, &pos);
614 if (status == n_pkts)
615 break;
616
617 if (xsk_ring_prod__needs_wakeup(&p->txq))
618 sendto(xsk_socket__fd(p->xsk), NULL, 0, MSG_DONTWAIT,
619 NULL, 0);
620 }
621
622 for (i = 0; i < n_pkts; i++) {
623 xsk_ring_prod__tx_desc(&p->txq, pos + i)->addr = b->addr[i];
624 xsk_ring_prod__tx_desc(&p->txq, pos + i)->len = b->len[i];
625 }
626
627 xsk_ring_prod__submit(&p->txq, n_pkts);
628 if (xsk_ring_prod__needs_wakeup(&p->txq))
629 sendto(xsk_socket__fd(p->xsk), NULL, 0, MSG_DONTWAIT, NULL, 0);
630 p->n_pkts_tx += n_pkts;
631}
632
633/*
634 * Thread
635 *
636 * Packet forwarding threads.
637 */
638#ifndef MAX_PORTS_PER_THREAD
639#define MAX_PORTS_PER_THREAD 16
640#endif
641
642struct thread_data {
643 struct port *ports_rx[MAX_PORTS_PER_THREAD];
644 struct port *ports_tx[MAX_PORTS_PER_THREAD];
645 u32 n_ports_rx;
646 struct burst_rx burst_rx;
647 struct burst_tx burst_tx[MAX_PORTS_PER_THREAD];
648 u32 cpu_core_id;
649 int quit;
650};
651
652static void swap_mac_addresses(void *data)
653{
654 struct ether_header *eth = (struct ether_header *)data;
655 struct ether_addr *src_addr = (struct ether_addr *)&eth->ether_shost;
656 struct ether_addr *dst_addr = (struct ether_addr *)&eth->ether_dhost;
657 struct ether_addr tmp;
658
659 tmp = *src_addr;
660 *src_addr = *dst_addr;
661 *dst_addr = tmp;
662}
663
664static void *
665thread_func(void *arg)
666{
667 struct thread_data *t = arg;
668 cpu_set_t cpu_cores;
669 u32 i;
670
671 CPU_ZERO(&cpu_cores);
672 CPU_SET(t->cpu_core_id, &cpu_cores);
673 pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpu_cores);
674
675 for (i = 0; !t->quit; i = (i + 1) & (t->n_ports_rx - 1)) {
676 struct port *port_rx = t->ports_rx[i];
677 struct port *port_tx = t->ports_tx[i];
678 struct burst_rx *brx = &t->burst_rx;
679 struct burst_tx *btx = &t->burst_tx[i];
680 u32 n_pkts, j;
681
682 /* RX. */
683 n_pkts = port_rx_burst(port_rx, brx);
684 if (!n_pkts)
685 continue;
686
687 /* Process & TX. */
688 for (j = 0; j < n_pkts; j++) {
689 u64 addr = xsk_umem__add_offset_to_addr(brx->addr[j]);
690 u8 *pkt = xsk_umem__get_data(port_rx->params.bp->addr,
691 addr);
692
693 swap_mac_addresses(pkt);
694
695 btx->addr[btx->n_pkts] = brx->addr[j];
696 btx->len[btx->n_pkts] = brx->len[j];
697 btx->n_pkts++;
698
699 if (btx->n_pkts == MAX_BURST_TX) {
700 port_tx_burst(port_tx, btx);
701 btx->n_pkts = 0;
702 }
703 }
704 }
705
706 return NULL;
707}
708
709/*
710 * Process
711 */
712static const struct bpool_params bpool_params_default = {
713 .n_buffers = 64 * 1024,
714 .buffer_size = XSK_UMEM__DEFAULT_FRAME_SIZE,
715 .mmap_flags = 0,
716
717 .n_users_max = 16,
718 .n_buffers_per_slab = XSK_RING_PROD__DEFAULT_NUM_DESCS * 2,
719};
720
721static const struct xsk_umem_config umem_cfg_default = {
722 .fill_size = XSK_RING_PROD__DEFAULT_NUM_DESCS * 2,
723 .comp_size = XSK_RING_CONS__DEFAULT_NUM_DESCS,
724 .frame_size = XSK_UMEM__DEFAULT_FRAME_SIZE,
725 .frame_headroom = XSK_UMEM__DEFAULT_FRAME_HEADROOM,
726 .flags = 0,
727};
728
729static const struct port_params port_params_default = {
730 .xsk_cfg = {
731 .rx_size = XSK_RING_CONS__DEFAULT_NUM_DESCS,
732 .tx_size = XSK_RING_PROD__DEFAULT_NUM_DESCS,
733 .libbpf_flags = 0,
734 .xdp_flags = XDP_FLAGS_DRV_MODE,
735 .bind_flags = XDP_USE_NEED_WAKEUP | XDP_ZEROCOPY,
736 },
737
738 .bp = NULL,
739 .iface = NULL,
740 .iface_queue = 0,
741};
742
743#ifndef MAX_PORTS
744#define MAX_PORTS 64
745#endif
746
747#ifndef MAX_THREADS
748#define MAX_THREADS 64
749#endif
750
751static struct bpool_params bpool_params;
752static struct xsk_umem_config umem_cfg;
753static struct bpool *bp;
754
755static struct port_params port_params[MAX_PORTS];
756static struct port *ports[MAX_PORTS];
757static u64 n_pkts_rx[MAX_PORTS];
758static u64 n_pkts_tx[MAX_PORTS];
759static int n_ports;
760
761static pthread_t threads[MAX_THREADS];
762static struct thread_data thread_data[MAX_THREADS];
763static int n_threads;
764
765static void
766print_usage(char *prog_name)
767{
768 const char *usage =
769 "Usage:\n"
770 "\t%s [ -b SIZE ] -c CORE -i INTERFACE [ -q QUEUE ]\n"
771 "\n"
772 "-c CORE CPU core to run a packet forwarding thread\n"
773 " on. May be invoked multiple times.\n"
774 "\n"
775 "-b SIZE Number of buffers in the buffer pool shared\n"
776 " by all the forwarding threads. Default: %u.\n"
777 "\n"
778 "-i INTERFACE Network interface. Each (INTERFACE, QUEUE)\n"
779 " pair specifies one forwarding port. May be\n"
780 " invoked multiple times.\n"
781 "\n"
782 "-q QUEUE Network interface queue for RX and TX. Each\n"
783 " (INTERFACE, QUEUE) pair specified one\n"
784 " forwarding port. Default: %u. May be invoked\n"
785 " multiple times.\n"
786 "\n";
787 printf(usage,
788 prog_name,
789 bpool_params_default.n_buffers,
790 port_params_default.iface_queue);
791}
792
793static int
794parse_args(int argc, char **argv)
795{
796 struct option lgopts[] = {
797 { NULL, 0, 0, 0 }
798 };
799 int opt, option_index;
800
801 /* Parse the input arguments. */
802 for ( ; ;) {
803 opt = getopt_long(argc, argv, "c:i:q:", lgopts, &option_index);
804 if (opt == EOF)
805 break;
806
807 switch (opt) {
808 case 'b':
809 bpool_params.n_buffers = atoi(optarg);
810 break;
811
812 case 'c':
813 if (n_threads == MAX_THREADS) {
814 printf("Max number of threads (%d) reached.\n",
815 MAX_THREADS);
816 return -1;
817 }
818
819 thread_data[n_threads].cpu_core_id = atoi(optarg);
820 n_threads++;
821 break;
822
823 case 'i':
824 if (n_ports == MAX_PORTS) {
825 printf("Max number of ports (%d) reached.\n",
826 MAX_PORTS);
827 return -1;
828 }
829
830 port_params[n_ports].iface = optarg;
831 port_params[n_ports].iface_queue = 0;
832 n_ports++;
833 break;
834
835 case 'q':
836 if (n_ports == 0) {
837 printf("No port specified for queue.\n");
838 return -1;
839 }
840 port_params[n_ports - 1].iface_queue = atoi(optarg);
841 break;
842
843 default:
844 printf("Illegal argument.\n");
845 return -1;
846 }
847 }
848
849 optind = 1; /* reset getopt lib */
850
851 /* Check the input arguments. */
852 if (!n_ports) {
853 printf("No ports specified.\n");
854 return -1;
855 }
856
857 if (!n_threads) {
858 printf("No threads specified.\n");
859 return -1;
860 }
861
862 if (n_ports % n_threads) {
863 printf("Ports cannot be evenly distributed to threads.\n");
864 return -1;
865 }
866
867 return 0;
868}
869
870static void
871print_port(u32 port_id)
872{
873 struct port *port = ports[port_id];
874
875 printf("Port %u: interface = %s, queue = %u\n",
876 port_id, port->params.iface, port->params.iface_queue);
877}
878
879static void
880print_thread(u32 thread_id)
881{
882 struct thread_data *t = &thread_data[thread_id];
883 u32 i;
884
885 printf("Thread %u (CPU core %u): ",
886 thread_id, t->cpu_core_id);
887
888 for (i = 0; i < t->n_ports_rx; i++) {
889 struct port *port_rx = t->ports_rx[i];
890 struct port *port_tx = t->ports_tx[i];
891
892 printf("(%s, %u) -> (%s, %u), ",
893 port_rx->params.iface,
894 port_rx->params.iface_queue,
895 port_tx->params.iface,
896 port_tx->params.iface_queue);
897 }
898
899 printf("\n");
900}
901
/* Print one horizontal rule of the statistics table. */
static void
print_port_stats_separator(void)
{
	static const char rule4[] = "----";
	static const char rule12[] = "------------";
	static const char rule13[] = "-------------";

	printf("+-%4s-+-%12s-+-%13s-+-%12s-+-%13s-+\n",
	       rule4, rule12, rule13, rule12, rule13);
}
912
/* Print the statistics table header: a rule, the column titles, a rule. */
static void
print_port_stats_header(void)
{
	static const char *const title[] = {
		"Port",
		"RX packets",
		"RX rate (pps)",
		"TX packets",
		"TX_rate (pps)",
	};

	print_port_stats_separator();
	printf("| %4s | %12s | %13s | %12s | %13s |\n",
	       title[0], title[1], title[2], title[3], title[4]);
	print_port_stats_separator();
}
925
/* Close the statistics table: a final rule followed by an empty line. */
static void
print_port_stats_trailer(void)
{
	print_port_stats_separator();
	printf("\n");
}
932
933static void
934print_port_stats(int port_id, u64 ns_diff)
935{
936 struct port *p = ports[port_id];
937 double rx_pps, tx_pps;
938
939 rx_pps = (p->n_pkts_rx - n_pkts_rx[port_id]) * 1000000000. / ns_diff;
940 tx_pps = (p->n_pkts_tx - n_pkts_tx[port_id]) * 1000000000. / ns_diff;
941
942 printf("| %4d | %12llu | %13.0f | %12llu | %13.0f |\n",
943 port_id,
944 p->n_pkts_rx,
945 rx_pps,
946 p->n_pkts_tx,
947 tx_pps);
948
949 n_pkts_rx[port_id] = p->n_pkts_rx;
950 n_pkts_tx[port_id] = p->n_pkts_tx;
951}
952
953static void
954print_port_stats_all(u64 ns_diff)
955{
956 int i;
957
958 print_port_stats_header();
959 for (i = 0; i < n_ports; i++)
960 print_port_stats(i, ns_diff);
961 print_port_stats_trailer();
962}
963
/* Set by the signal handler to ask the main loop to terminate. It is
 * volatile sig_atomic_t because it is written from a signal handler and read
 * from normal program flow.
 */
static volatile sig_atomic_t quit;

/* Signal handler: only records that termination was requested. */
static void
signal_handler(int sig)
{
	(void)sig;
	quit = 1;
}
971
972static void remove_xdp_program(void)
973{
974 int i;
975
976 for (i = 0 ; i < n_ports; i++)
977 bpf_set_link_xdp_fd(if_nametoindex(port_params[i].iface), -1,
978 port_params[i].xsk_cfg.xdp_flags);
979}
980
981int main(int argc, char **argv)
982{
983 struct timespec time;
984 u64 ns0;
985 int i;
986
987 /* Parse args. */
988 memcpy(&bpool_params, &bpool_params_default,
989 sizeof(struct bpool_params));
990 memcpy(&umem_cfg, &umem_cfg_default,
991 sizeof(struct xsk_umem_config));
992 for (i = 0; i < MAX_PORTS; i++)
993 memcpy(&port_params[i], &port_params_default,
994 sizeof(struct port_params));
995
996 if (parse_args(argc, argv)) {
997 print_usage(argv[0]);
998 return -1;
999 }
1000
1001 /* Buffer pool initialization. */
1002 bp = bpool_init(&bpool_params, &umem_cfg);
1003 if (!bp) {
1004 printf("Buffer pool initialization failed.\n");
1005 return -1;
1006 }
1007 printf("Buffer pool created successfully.\n");
1008
1009 /* Ports initialization. */
1010 for (i = 0; i < MAX_PORTS; i++)
1011 port_params[i].bp = bp;
1012
1013 for (i = 0; i < n_ports; i++) {
1014 ports[i] = port_init(&port_params[i]);
1015 if (!ports[i]) {
1016 printf("Port %d initialization failed.\n", i);
1017 return -1;
1018 }
1019 print_port(i);
1020 }
1021 printf("All ports created successfully.\n");
1022
1023 /* Threads. */
1024 for (i = 0; i < n_threads; i++) {
1025 struct thread_data *t = &thread_data[i];
1026 u32 n_ports_per_thread = n_ports / n_threads, j;
1027
1028 for (j = 0; j < n_ports_per_thread; j++) {
1029 t->ports_rx[j] = ports[i * n_ports_per_thread + j];
1030 t->ports_tx[j] = ports[i * n_ports_per_thread +
1031 (j + 1) % n_ports_per_thread];
1032 }
1033
1034 t->n_ports_rx = n_ports_per_thread;
1035
1036 print_thread(i);
1037 }
1038
1039 for (i = 0; i < n_threads; i++) {
1040 int status;
1041
1042 status = pthread_create(&threads[i],
1043 NULL,
1044 thread_func,
1045 &thread_data[i]);
1046 if (status) {
1047 printf("Thread %d creation failed.\n", i);
1048 return -1;
1049 }
1050 }
1051 printf("All threads created successfully.\n");
1052
1053 /* Print statistics. */
1054 signal(SIGINT, signal_handler);
1055 signal(SIGTERM, signal_handler);
1056 signal(SIGABRT, signal_handler);
1057
1058 clock_gettime(CLOCK_MONOTONIC, &time);
1059 ns0 = time.tv_sec * 1000000000UL + time.tv_nsec;
1060 for ( ; !quit; ) {
1061 u64 ns1, ns_diff;
1062
1063 sleep(1);
1064 clock_gettime(CLOCK_MONOTONIC, &time);
1065 ns1 = time.tv_sec * 1000000000UL + time.tv_nsec;
1066 ns_diff = ns1 - ns0;
1067 ns0 = ns1;
1068
1069 print_port_stats_all(ns_diff);
1070 }
1071
1072 /* Threads completion. */
1073 printf("Quit.\n");
1074 for (i = 0; i < n_threads; i++)
1075 thread_data[i].quit = 1;
1076
1077 for (i = 0; i < n_threads; i++)
1078 pthread_join(threads[i], NULL);
1079
1080 for (i = 0; i < n_ports; i++)
1081 port_free(ports[i]);
1082
1083 bpool_free(bp);
1084
1085 remove_xdp_program();
1086
1087 return 0;
1088}