blob: e38f471a5402d45a071aab264118915d6add3eac [file] [log] [blame]
Michael S. Tsirkin2e0ab8c2016-06-13 23:54:31 +03001/*
2 * Definitions for the 'struct ptr_ring' datastructure.
3 *
4 * Author:
5 * Michael S. Tsirkin <mst@redhat.com>
6 *
7 * Copyright (C) 2016 Red Hat, Inc.
8 *
9 * This program is free software; you can redistribute it and/or modify it
10 * under the terms of the GNU General Public License as published by the
11 * Free Software Foundation; either version 2 of the License, or (at your
12 * option) any later version.
13 *
14 * This is a limited-size FIFO maintaining pointers in FIFO order, with
15 * one CPU producing entries and another consuming entries from a FIFO.
16 *
17 * This implementation tries to minimize cache-contention when there is a
18 * single producer and a single consumer CPU.
19 */
20
21#ifndef _LINUX_PTR_RING_H
22#define _LINUX_PTR_RING_H 1
23
24#ifdef __KERNEL__
25#include <linux/spinlock.h>
26#include <linux/cache.h>
27#include <linux/types.h>
28#include <linux/compiler.h>
29#include <linux/cache.h>
30#include <linux/slab.h>
31#include <asm/errno.h>
32#endif
33
34struct ptr_ring {
35 int producer ____cacheline_aligned_in_smp;
36 spinlock_t producer_lock;
37 int consumer ____cacheline_aligned_in_smp;
38 spinlock_t consumer_lock;
39 /* Shared consumer/producer data */
40 /* Read-only by both the producer and the consumer */
41 int size ____cacheline_aligned_in_smp; /* max entries in queue */
42 void **queue;
43};
44
45/* Note: callers invoking this in a loop must use a compiler barrier,
Michael S. Tsirkin5d49de52016-06-13 23:54:45 +030046 * for example cpu_relax(). If ring is ever resized, callers must hold
47 * producer_lock - see e.g. ptr_ring_full. Otherwise, if callers don't hold
48 * producer_lock, the next call to __ptr_ring_produce may fail.
Michael S. Tsirkin2e0ab8c2016-06-13 23:54:31 +030049 */
50static inline bool __ptr_ring_full(struct ptr_ring *r)
51{
52 return r->queue[r->producer];
53}
54
55static inline bool ptr_ring_full(struct ptr_ring *r)
56{
Michael S. Tsirkin5d49de52016-06-13 23:54:45 +030057 bool ret;
58
59 spin_lock(&r->producer_lock);
60 ret = __ptr_ring_full(r);
61 spin_unlock(&r->producer_lock);
62
63 return ret;
64}
65
66static inline bool ptr_ring_full_irq(struct ptr_ring *r)
67{
68 bool ret;
69
70 spin_lock_irq(&r->producer_lock);
71 ret = __ptr_ring_full(r);
72 spin_unlock_irq(&r->producer_lock);
73
74 return ret;
75}
76
77static inline bool ptr_ring_full_any(struct ptr_ring *r)
78{
79 unsigned long flags;
80 bool ret;
81
82 spin_lock_irqsave(&r->producer_lock, flags);
83 ret = __ptr_ring_full(r);
84 spin_unlock_irqrestore(&r->producer_lock, flags);
85
86 return ret;
87}
88
89static inline bool ptr_ring_full_bh(struct ptr_ring *r)
90{
91 bool ret;
92
93 spin_lock_bh(&r->producer_lock);
94 ret = __ptr_ring_full(r);
95 spin_unlock_bh(&r->producer_lock);
96
97 return ret;
Michael S. Tsirkin2e0ab8c2016-06-13 23:54:31 +030098}
99
100/* Note: callers invoking this in a loop must use a compiler barrier,
Michael S. Tsirkin5d49de52016-06-13 23:54:45 +0300101 * for example cpu_relax(). Callers must hold producer_lock.
Michael S. Tsirkin8b032bd2017-12-05 21:29:37 +0200102 * Callers are responsible for making sure pointer that is being queued
103 * points to a valid data.
Michael S. Tsirkin2e0ab8c2016-06-13 23:54:31 +0300104 */
105static inline int __ptr_ring_produce(struct ptr_ring *r, void *ptr)
106{
Jason Wang982fb492016-06-30 14:45:31 +0800107 if (unlikely(!r->size) || r->queue[r->producer])
Michael S. Tsirkin2e0ab8c2016-06-13 23:54:31 +0300108 return -ENOSPC;
109
Michael S. Tsirkin8b032bd2017-12-05 21:29:37 +0200110 /* Make sure the pointer we are storing points to a valid data. */
111 /* Pairs with smp_read_barrier_depends in __ptr_ring_consume. */
112 smp_wmb();
113
Michael S. Tsirkin2e0ab8c2016-06-13 23:54:31 +0300114 r->queue[r->producer++] = ptr;
115 if (unlikely(r->producer >= r->size))
116 r->producer = 0;
117 return 0;
118}
119
Michael S. Tsirkin7c560122017-02-19 07:17:17 +0200120/*
121 * Note: resize (below) nests producer lock within consumer lock, so if you
122 * consume in interrupt or BH context, you must disable interrupts/BH when
123 * calling this.
124 */
Michael S. Tsirkin2e0ab8c2016-06-13 23:54:31 +0300125static inline int ptr_ring_produce(struct ptr_ring *r, void *ptr)
126{
127 int ret;
128
129 spin_lock(&r->producer_lock);
130 ret = __ptr_ring_produce(r, ptr);
131 spin_unlock(&r->producer_lock);
132
133 return ret;
134}
135
136static inline int ptr_ring_produce_irq(struct ptr_ring *r, void *ptr)
137{
138 int ret;
139
140 spin_lock_irq(&r->producer_lock);
141 ret = __ptr_ring_produce(r, ptr);
142 spin_unlock_irq(&r->producer_lock);
143
144 return ret;
145}
146
147static inline int ptr_ring_produce_any(struct ptr_ring *r, void *ptr)
148{
149 unsigned long flags;
150 int ret;
151
152 spin_lock_irqsave(&r->producer_lock, flags);
153 ret = __ptr_ring_produce(r, ptr);
154 spin_unlock_irqrestore(&r->producer_lock, flags);
155
156 return ret;
157}
158
159static inline int ptr_ring_produce_bh(struct ptr_ring *r, void *ptr)
160{
161 int ret;
162
163 spin_lock_bh(&r->producer_lock);
164 ret = __ptr_ring_produce(r, ptr);
165 spin_unlock_bh(&r->producer_lock);
166
167 return ret;
168}
169
170/* Note: callers invoking this in a loop must use a compiler barrier,
171 * for example cpu_relax(). Callers must take consumer_lock
172 * if they dereference the pointer - see e.g. PTR_RING_PEEK_CALL.
Michael S. Tsirkin5d49de52016-06-13 23:54:45 +0300173 * If ring is never resized, and if the pointer is merely
174 * tested, there's no need to take the lock - see e.g. __ptr_ring_empty.
Michael S. Tsirkin2e0ab8c2016-06-13 23:54:31 +0300175 */
176static inline void *__ptr_ring_peek(struct ptr_ring *r)
177{
Jason Wang982fb492016-06-30 14:45:31 +0800178 if (likely(r->size))
179 return r->queue[r->consumer];
180 return NULL;
Michael S. Tsirkin2e0ab8c2016-06-13 23:54:31 +0300181}
182
Michael S. Tsirkin5d49de52016-06-13 23:54:45 +0300183/* Note: callers invoking this in a loop must use a compiler barrier,
184 * for example cpu_relax(). Callers must take consumer_lock
185 * if the ring is ever resized - see e.g. ptr_ring_empty.
186 */
187static inline bool __ptr_ring_empty(struct ptr_ring *r)
188{
189 return !__ptr_ring_peek(r);
190}
191
Michael S. Tsirkin2e0ab8c2016-06-13 23:54:31 +0300192static inline bool ptr_ring_empty(struct ptr_ring *r)
193{
Michael S. Tsirkin5d49de52016-06-13 23:54:45 +0300194 bool ret;
195
196 spin_lock(&r->consumer_lock);
197 ret = __ptr_ring_empty(r);
198 spin_unlock(&r->consumer_lock);
199
200 return ret;
201}
202
203static inline bool ptr_ring_empty_irq(struct ptr_ring *r)
204{
205 bool ret;
206
207 spin_lock_irq(&r->consumer_lock);
208 ret = __ptr_ring_empty(r);
209 spin_unlock_irq(&r->consumer_lock);
210
211 return ret;
212}
213
214static inline bool ptr_ring_empty_any(struct ptr_ring *r)
215{
216 unsigned long flags;
217 bool ret;
218
219 spin_lock_irqsave(&r->consumer_lock, flags);
220 ret = __ptr_ring_empty(r);
221 spin_unlock_irqrestore(&r->consumer_lock, flags);
222
223 return ret;
224}
225
226static inline bool ptr_ring_empty_bh(struct ptr_ring *r)
227{
228 bool ret;
229
230 spin_lock_bh(&r->consumer_lock);
231 ret = __ptr_ring_empty(r);
232 spin_unlock_bh(&r->consumer_lock);
233
234 return ret;
Michael S. Tsirkin2e0ab8c2016-06-13 23:54:31 +0300235}
236
237/* Must only be called after __ptr_ring_peek returned !NULL */
238static inline void __ptr_ring_discard_one(struct ptr_ring *r)
239{
240 r->queue[r->consumer++] = NULL;
241 if (unlikely(r->consumer >= r->size))
242 r->consumer = 0;
243}
244
245static inline void *__ptr_ring_consume(struct ptr_ring *r)
246{
247 void *ptr;
248
249 ptr = __ptr_ring_peek(r);
250 if (ptr)
251 __ptr_ring_discard_one(r);
252
Michael S. Tsirkin8b032bd2017-12-05 21:29:37 +0200253 /* Make sure anyone accessing data through the pointer is up to date. */
254 /* Pairs with smp_wmb in __ptr_ring_produce. */
255 smp_read_barrier_depends();
Michael S. Tsirkin2e0ab8c2016-06-13 23:54:31 +0300256 return ptr;
257}
258
Michael S. Tsirkin7c560122017-02-19 07:17:17 +0200259/*
260 * Note: resize (below) nests producer lock within consumer lock, so if you
261 * call this in interrupt or BH context, you must disable interrupts/BH when
262 * producing.
263 */
Michael S. Tsirkin2e0ab8c2016-06-13 23:54:31 +0300264static inline void *ptr_ring_consume(struct ptr_ring *r)
265{
266 void *ptr;
267
268 spin_lock(&r->consumer_lock);
269 ptr = __ptr_ring_consume(r);
270 spin_unlock(&r->consumer_lock);
271
272 return ptr;
273}
274
275static inline void *ptr_ring_consume_irq(struct ptr_ring *r)
276{
277 void *ptr;
278
279 spin_lock_irq(&r->consumer_lock);
280 ptr = __ptr_ring_consume(r);
281 spin_unlock_irq(&r->consumer_lock);
282
283 return ptr;
284}
285
286static inline void *ptr_ring_consume_any(struct ptr_ring *r)
287{
288 unsigned long flags;
289 void *ptr;
290
291 spin_lock_irqsave(&r->consumer_lock, flags);
292 ptr = __ptr_ring_consume(r);
293 spin_unlock_irqrestore(&r->consumer_lock, flags);
294
295 return ptr;
296}
297
298static inline void *ptr_ring_consume_bh(struct ptr_ring *r)
299{
300 void *ptr;
301
302 spin_lock_bh(&r->consumer_lock);
303 ptr = __ptr_ring_consume(r);
304 spin_unlock_bh(&r->consumer_lock);
305
306 return ptr;
307}
308
309/* Cast to structure type and call a function without discarding from FIFO.
310 * Function must return a value.
311 * Callers must take consumer_lock.
312 */
313#define __PTR_RING_PEEK_CALL(r, f) ((f)(__ptr_ring_peek(r)))
314
315#define PTR_RING_PEEK_CALL(r, f) ({ \
316 typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
317 \
318 spin_lock(&(r)->consumer_lock); \
319 __PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
320 spin_unlock(&(r)->consumer_lock); \
321 __PTR_RING_PEEK_CALL_v; \
322})
323
324#define PTR_RING_PEEK_CALL_IRQ(r, f) ({ \
325 typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
326 \
327 spin_lock_irq(&(r)->consumer_lock); \
328 __PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
329 spin_unlock_irq(&(r)->consumer_lock); \
330 __PTR_RING_PEEK_CALL_v; \
331})
332
333#define PTR_RING_PEEK_CALL_BH(r, f) ({ \
334 typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
335 \
336 spin_lock_bh(&(r)->consumer_lock); \
337 __PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
338 spin_unlock_bh(&(r)->consumer_lock); \
339 __PTR_RING_PEEK_CALL_v; \
340})
341
342#define PTR_RING_PEEK_CALL_ANY(r, f) ({ \
343 typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
344 unsigned long __PTR_RING_PEEK_CALL_f;\
345 \
346 spin_lock_irqsave(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \
347 __PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
348 spin_unlock_irqrestore(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \
349 __PTR_RING_PEEK_CALL_v; \
350})
351
Eric Dumazet59af5b82017-08-16 10:36:47 -0700352static inline void **__ptr_ring_init_queue_alloc(unsigned int size, gfp_t gfp)
Michael S. Tsirkin5d49de52016-06-13 23:54:45 +0300353{
Eric Dumazet59af5b82017-08-16 10:36:47 -0700354 return kcalloc(size, sizeof(void *), gfp);
Michael S. Tsirkin5d49de52016-06-13 23:54:45 +0300355}
356
Michael S. Tsirkin2e0ab8c2016-06-13 23:54:31 +0300357static inline int ptr_ring_init(struct ptr_ring *r, int size, gfp_t gfp)
358{
Michael S. Tsirkin5d49de52016-06-13 23:54:45 +0300359 r->queue = __ptr_ring_init_queue_alloc(size, gfp);
Michael S. Tsirkin2e0ab8c2016-06-13 23:54:31 +0300360 if (!r->queue)
361 return -ENOMEM;
362
363 r->size = size;
364 r->producer = r->consumer = 0;
365 spin_lock_init(&r->producer_lock);
366 spin_lock_init(&r->consumer_lock);
367
368 return 0;
369}
370
Michael S. Tsirkin59e6ae52016-06-30 14:45:33 +0800371static inline void **__ptr_ring_swap_queue(struct ptr_ring *r, void **queue,
372 int size, gfp_t gfp,
373 void (*destroy)(void *))
Michael S. Tsirkin2e0ab8c2016-06-13 23:54:31 +0300374{
Michael S. Tsirkin5d49de52016-06-13 23:54:45 +0300375 int producer = 0;
Michael S. Tsirkin5d49de52016-06-13 23:54:45 +0300376 void **old;
377 void *ptr;
378
Michael S. Tsirkin7c560122017-02-19 07:17:17 +0200379 while ((ptr = __ptr_ring_consume(r)))
Michael S. Tsirkin5d49de52016-06-13 23:54:45 +0300380 if (producer < size)
381 queue[producer++] = ptr;
382 else if (destroy)
383 destroy(ptr);
384
385 r->size = size;
386 r->producer = producer;
387 r->consumer = 0;
388 old = r->queue;
389 r->queue = queue;
390
Michael S. Tsirkin59e6ae52016-06-30 14:45:33 +0800391 return old;
392}
393
Michael S. Tsirkin7c560122017-02-19 07:17:17 +0200394/*
395 * Note: producer lock is nested within consumer lock, so if you
396 * resize you must make sure all uses nest correctly.
397 * In particular if you consume ring in interrupt or BH context, you must
398 * disable interrupts/BH when doing so.
399 */
Michael S. Tsirkin59e6ae52016-06-30 14:45:33 +0800400static inline int ptr_ring_resize(struct ptr_ring *r, int size, gfp_t gfp,
401 void (*destroy)(void *))
402{
403 unsigned long flags;
404 void **queue = __ptr_ring_init_queue_alloc(size, gfp);
405 void **old;
406
407 if (!queue)
408 return -ENOMEM;
409
Michael S. Tsirkin7c560122017-02-19 07:17:17 +0200410 spin_lock_irqsave(&(r)->consumer_lock, flags);
411 spin_lock(&(r)->producer_lock);
Michael S. Tsirkin59e6ae52016-06-30 14:45:33 +0800412
413 old = __ptr_ring_swap_queue(r, queue, size, gfp, destroy);
414
Michael S. Tsirkin7c560122017-02-19 07:17:17 +0200415 spin_unlock(&(r)->producer_lock);
416 spin_unlock_irqrestore(&(r)->consumer_lock, flags);
Michael S. Tsirkin5d49de52016-06-13 23:54:45 +0300417
418 kfree(old);
419
420 return 0;
421}
422
Michael S. Tsirkin7c560122017-02-19 07:17:17 +0200423/*
424 * Note: producer lock is nested within consumer lock, so if you
425 * resize you must make sure all uses nest correctly.
426 * In particular if you consume ring in interrupt or BH context, you must
427 * disable interrupts/BH when doing so.
428 */
Eric Dumazet59af5b82017-08-16 10:36:47 -0700429static inline int ptr_ring_resize_multiple(struct ptr_ring **rings,
430 unsigned int nrings,
Michael S. Tsirkin59e6ae52016-06-30 14:45:33 +0800431 int size,
432 gfp_t gfp, void (*destroy)(void *))
433{
434 unsigned long flags;
435 void ***queues;
436 int i;
437
Eric Dumazet59af5b82017-08-16 10:36:47 -0700438 queues = kmalloc_array(nrings, sizeof(*queues), gfp);
Michael S. Tsirkin59e6ae52016-06-30 14:45:33 +0800439 if (!queues)
440 goto noqueues;
441
442 for (i = 0; i < nrings; ++i) {
443 queues[i] = __ptr_ring_init_queue_alloc(size, gfp);
444 if (!queues[i])
445 goto nomem;
446 }
447
448 for (i = 0; i < nrings; ++i) {
Michael S. Tsirkin7c560122017-02-19 07:17:17 +0200449 spin_lock_irqsave(&(rings[i])->consumer_lock, flags);
450 spin_lock(&(rings[i])->producer_lock);
Michael S. Tsirkin59e6ae52016-06-30 14:45:33 +0800451 queues[i] = __ptr_ring_swap_queue(rings[i], queues[i],
452 size, gfp, destroy);
Michael S. Tsirkin7c560122017-02-19 07:17:17 +0200453 spin_unlock(&(rings[i])->producer_lock);
454 spin_unlock_irqrestore(&(rings[i])->consumer_lock, flags);
Michael S. Tsirkin59e6ae52016-06-30 14:45:33 +0800455 }
456
457 for (i = 0; i < nrings; ++i)
458 kfree(queues[i]);
459
460 kfree(queues);
461
462 return 0;
463
464nomem:
465 while (--i >= 0)
466 kfree(queues[i]);
467
468 kfree(queues);
469
470noqueues:
471 return -ENOMEM;
472}
473
Michael S. Tsirkin5d49de52016-06-13 23:54:45 +0300474static inline void ptr_ring_cleanup(struct ptr_ring *r, void (*destroy)(void *))
475{
476 void *ptr;
477
478 if (destroy)
479 while ((ptr = ptr_ring_consume(r)))
480 destroy(ptr);
Michael S. Tsirkin2e0ab8c2016-06-13 23:54:31 +0300481 kfree(r->queue);
482}
483
484#endif /* _LINUX_PTR_RING_H */