blob: 95ca9338cb6c090c3fdefa9998b111542965d1d4 [file] [log] [blame]
Steven Rostedt7a8e76a2008-09-29 23:02:38 -04001/*
2 * Generic ring buffer
3 *
4 * Copyright (C) 2008 Steven Rostedt <srostedt@redhat.com>
5 */
6#include <linux/ring_buffer.h>
7#include <linux/spinlock.h>
8#include <linux/debugfs.h>
9#include <linux/uaccess.h>
10#include <linux/module.h>
11#include <linux/percpu.h>
12#include <linux/mutex.h>
13#include <linux/sched.h> /* used for sched_clock() (for now) */
14#include <linux/init.h>
15#include <linux/hash.h>
16#include <linux/list.h>
17#include <linux/fs.h>
18
19/* Up this if you want to test the TIME_EXTENTS and normalization */
20#define DEBUG_SHIFT 0
21
22/* FIXME!!! */
23u64 ring_buffer_time_stamp(int cpu)
24{
25 /* shift to debug/test normalization and TIME_EXTENTS */
26 return sched_clock() << DEBUG_SHIFT;
27}
28
29void ring_buffer_normalize_time_stamp(int cpu, u64 *ts)
30{
31 /* Just stupid testing the normalize function and deltas */
32 *ts >>= DEBUG_SHIFT;
33}
34
35#define RB_EVNT_HDR_SIZE (sizeof(struct ring_buffer_event))
36#define RB_ALIGNMENT_SHIFT 2
37#define RB_ALIGNMENT (1 << RB_ALIGNMENT_SHIFT)
38#define RB_MAX_SMALL_DATA 28
39
40enum {
41 RB_LEN_TIME_EXTEND = 8,
42 RB_LEN_TIME_STAMP = 16,
43};
44
45/* inline for ring buffer fast paths */
46static inline unsigned
47rb_event_length(struct ring_buffer_event *event)
48{
49 unsigned length;
50
51 switch (event->type) {
52 case RINGBUF_TYPE_PADDING:
53 /* undefined */
54 return -1;
55
56 case RINGBUF_TYPE_TIME_EXTEND:
57 return RB_LEN_TIME_EXTEND;
58
59 case RINGBUF_TYPE_TIME_STAMP:
60 return RB_LEN_TIME_STAMP;
61
62 case RINGBUF_TYPE_DATA:
63 if (event->len)
64 length = event->len << RB_ALIGNMENT_SHIFT;
65 else
66 length = event->array[0];
67 return length + RB_EVNT_HDR_SIZE;
68 default:
69 BUG();
70 }
71 /* not hit */
72 return 0;
73}
74
75/**
76 * ring_buffer_event_length - return the length of the event
77 * @event: the event to get the length of
78 */
79unsigned ring_buffer_event_length(struct ring_buffer_event *event)
80{
81 return rb_event_length(event);
82}
83
84/* inline for ring buffer fast paths */
85static inline void *
86rb_event_data(struct ring_buffer_event *event)
87{
88 BUG_ON(event->type != RINGBUF_TYPE_DATA);
89 /* If length is in len field, then array[0] has the data */
90 if (event->len)
91 return (void *)&event->array[0];
92 /* Otherwise length is in array[0] and array[1] has the data */
93 return (void *)&event->array[1];
94}
95
96/**
97 * ring_buffer_event_data - return the data of the event
98 * @event: the event to get the data from
99 */
100void *ring_buffer_event_data(struct ring_buffer_event *event)
101{
102 return rb_event_data(event);
103}
104
105#define for_each_buffer_cpu(buffer, cpu) \
106 for_each_cpu_mask(cpu, buffer->cpumask)
107
108#define TS_SHIFT 27
109#define TS_MASK ((1ULL << TS_SHIFT) - 1)
110#define TS_DELTA_TEST (~TS_MASK)
111
112/*
113 * This hack stolen from mm/slob.c.
114 * We can store per page timing information in the page frame of the page.
115 * Thanks to Peter Zijlstra for suggesting this idea.
116 */
117struct buffer_page {
118 union {
119 struct {
120 unsigned long flags; /* mandatory */
121 atomic_t _count; /* mandatory */
122 u64 time_stamp; /* page time stamp */
123 unsigned size; /* size of page data */
124 struct list_head list; /* list of free pages */
125 };
126 struct page page;
127 };
128};
129
130/*
131 * We need to fit the time_stamp delta into 27 bits.
132 */
133static inline int test_time_stamp(u64 delta)
134{
135 if (delta & TS_DELTA_TEST)
136 return 1;
137 return 0;
138}
139
140#define BUF_PAGE_SIZE PAGE_SIZE
141
142/*
143 * head_page == tail_page && head == tail then buffer is empty.
144 */
145struct ring_buffer_per_cpu {
146 int cpu;
147 struct ring_buffer *buffer;
148 spinlock_t lock;
149 struct lock_class_key lock_key;
150 struct list_head pages;
151 unsigned long head; /* read from head */
152 unsigned long tail; /* write to tail */
153 struct buffer_page *head_page;
154 struct buffer_page *tail_page;
155 unsigned long overrun;
156 unsigned long entries;
157 u64 write_stamp;
158 u64 read_stamp;
159 atomic_t record_disabled;
160};
161
162struct ring_buffer {
163 unsigned long size;
164 unsigned pages;
165 unsigned flags;
166 int cpus;
167 cpumask_t cpumask;
168 atomic_t record_disabled;
169
170 struct mutex mutex;
171
172 struct ring_buffer_per_cpu **buffers;
173};
174
175struct ring_buffer_iter {
176 struct ring_buffer_per_cpu *cpu_buffer;
177 unsigned long head;
178 struct buffer_page *head_page;
179 u64 read_stamp;
180};
181
182#define RB_WARN_ON(buffer, cond) \
183 if (unlikely(cond)) { \
184 atomic_inc(&buffer->record_disabled); \
185 WARN_ON(1); \
186 return -1; \
187 }
188
189/**
190 * check_pages - integrity check of buffer pages
191 * @cpu_buffer: CPU buffer with pages to test
192 *
193 * As a safty measure we check to make sure the data pages have not
194 * been corrupted.
195 */
196static int rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer)
197{
198 struct list_head *head = &cpu_buffer->pages;
199 struct buffer_page *page, *tmp;
200
201 RB_WARN_ON(cpu_buffer, head->next->prev != head);
202 RB_WARN_ON(cpu_buffer, head->prev->next != head);
203
204 list_for_each_entry_safe(page, tmp, head, list) {
205 RB_WARN_ON(cpu_buffer, page->list.next->prev != &page->list);
206 RB_WARN_ON(cpu_buffer, page->list.prev->next != &page->list);
207 }
208
209 return 0;
210}
211
212static unsigned rb_head_size(struct ring_buffer_per_cpu *cpu_buffer)
213{
214 return cpu_buffer->head_page->size;
215}
216
217static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
218 unsigned nr_pages)
219{
220 struct list_head *head = &cpu_buffer->pages;
221 struct buffer_page *page, *tmp;
222 unsigned long addr;
223 LIST_HEAD(pages);
224 unsigned i;
225
226 for (i = 0; i < nr_pages; i++) {
227 addr = __get_free_page(GFP_KERNEL);
228 if (!addr)
229 goto free_pages;
230 page = (struct buffer_page *)virt_to_page(addr);
231 list_add(&page->list, &pages);
232 }
233
234 list_splice(&pages, head);
235
236 rb_check_pages(cpu_buffer);
237
238 return 0;
239
240 free_pages:
241 list_for_each_entry_safe(page, tmp, &pages, list) {
242 list_del_init(&page->list);
243 __free_page(&page->page);
244 }
245 return -ENOMEM;
246}
247
248static struct ring_buffer_per_cpu *
249rb_allocate_cpu_buffer(struct ring_buffer *buffer, int cpu)
250{
251 struct ring_buffer_per_cpu *cpu_buffer;
252 int ret;
253
254 cpu_buffer = kzalloc_node(ALIGN(sizeof(*cpu_buffer), cache_line_size()),
255 GFP_KERNEL, cpu_to_node(cpu));
256 if (!cpu_buffer)
257 return NULL;
258
259 cpu_buffer->cpu = cpu;
260 cpu_buffer->buffer = buffer;
261 spin_lock_init(&cpu_buffer->lock);
262 INIT_LIST_HEAD(&cpu_buffer->pages);
263
264 ret = rb_allocate_pages(cpu_buffer, buffer->pages);
265 if (ret < 0)
266 goto fail_free_buffer;
267
268 cpu_buffer->head_page
269 = list_entry(cpu_buffer->pages.next, struct buffer_page, list);
270 cpu_buffer->tail_page
271 = list_entry(cpu_buffer->pages.next, struct buffer_page, list);
272
273 return cpu_buffer;
274
275 fail_free_buffer:
276 kfree(cpu_buffer);
277 return NULL;
278}
279
280static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer)
281{
282 struct list_head *head = &cpu_buffer->pages;
283 struct buffer_page *page, *tmp;
284
285 list_for_each_entry_safe(page, tmp, head, list) {
286 list_del_init(&page->list);
287 __free_page(&page->page);
288 }
289 kfree(cpu_buffer);
290}
291
Steven Rostedta7b13742008-09-29 23:02:39 -0400292/*
293 * Causes compile errors if the struct buffer_page gets bigger
294 * than the struct page.
295 */
296extern int ring_buffer_page_too_big(void);
297
Steven Rostedt7a8e76a2008-09-29 23:02:38 -0400298/**
299 * ring_buffer_alloc - allocate a new ring_buffer
300 * @size: the size in bytes that is needed.
301 * @flags: attributes to set for the ring buffer.
302 *
303 * Currently the only flag that is available is the RB_FL_OVERWRITE
304 * flag. This flag means that the buffer will overwrite old data
305 * when the buffer wraps. If this flag is not set, the buffer will
306 * drop data when the tail hits the head.
307 */
308struct ring_buffer *ring_buffer_alloc(unsigned long size, unsigned flags)
309{
310 struct ring_buffer *buffer;
311 int bsize;
312 int cpu;
313
Steven Rostedta7b13742008-09-29 23:02:39 -0400314 /* Paranoid! Optimizes out when all is well */
315 if (sizeof(struct buffer_page) > sizeof(struct page))
316 ring_buffer_page_too_big();
317
318
Steven Rostedt7a8e76a2008-09-29 23:02:38 -0400319 /* keep it in its own cache line */
320 buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()),
321 GFP_KERNEL);
322 if (!buffer)
323 return NULL;
324
325 buffer->pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
326 buffer->flags = flags;
327
328 /* need at least two pages */
329 if (buffer->pages == 1)
330 buffer->pages++;
331
332 buffer->cpumask = cpu_possible_map;
333 buffer->cpus = nr_cpu_ids;
334
335 bsize = sizeof(void *) * nr_cpu_ids;
336 buffer->buffers = kzalloc(ALIGN(bsize, cache_line_size()),
337 GFP_KERNEL);
338 if (!buffer->buffers)
339 goto fail_free_buffer;
340
341 for_each_buffer_cpu(buffer, cpu) {
342 buffer->buffers[cpu] =
343 rb_allocate_cpu_buffer(buffer, cpu);
344 if (!buffer->buffers[cpu])
345 goto fail_free_buffers;
346 }
347
348 mutex_init(&buffer->mutex);
349
350 return buffer;
351
352 fail_free_buffers:
353 for_each_buffer_cpu(buffer, cpu) {
354 if (buffer->buffers[cpu])
355 rb_free_cpu_buffer(buffer->buffers[cpu]);
356 }
357 kfree(buffer->buffers);
358
359 fail_free_buffer:
360 kfree(buffer);
361 return NULL;
362}
363
364/**
365 * ring_buffer_free - free a ring buffer.
366 * @buffer: the buffer to free.
367 */
368void
369ring_buffer_free(struct ring_buffer *buffer)
370{
371 int cpu;
372
373 for_each_buffer_cpu(buffer, cpu)
374 rb_free_cpu_buffer(buffer->buffers[cpu]);
375
376 kfree(buffer);
377}
378
379static void rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer);
380
381static void
382rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned nr_pages)
383{
384 struct buffer_page *page;
385 struct list_head *p;
386 unsigned i;
387
388 atomic_inc(&cpu_buffer->record_disabled);
389 synchronize_sched();
390
391 for (i = 0; i < nr_pages; i++) {
392 BUG_ON(list_empty(&cpu_buffer->pages));
393 p = cpu_buffer->pages.next;
394 page = list_entry(p, struct buffer_page, list);
395 list_del_init(&page->list);
396 __free_page(&page->page);
397 }
398 BUG_ON(list_empty(&cpu_buffer->pages));
399
400 rb_reset_cpu(cpu_buffer);
401
402 rb_check_pages(cpu_buffer);
403
404 atomic_dec(&cpu_buffer->record_disabled);
405
406}
407
408static void
409rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer,
410 struct list_head *pages, unsigned nr_pages)
411{
412 struct buffer_page *page;
413 struct list_head *p;
414 unsigned i;
415
416 atomic_inc(&cpu_buffer->record_disabled);
417 synchronize_sched();
418
419 for (i = 0; i < nr_pages; i++) {
420 BUG_ON(list_empty(pages));
421 p = pages->next;
422 page = list_entry(p, struct buffer_page, list);
423 list_del_init(&page->list);
424 list_add_tail(&page->list, &cpu_buffer->pages);
425 }
426 rb_reset_cpu(cpu_buffer);
427
428 rb_check_pages(cpu_buffer);
429
430 atomic_dec(&cpu_buffer->record_disabled);
431}
432
433/**
434 * ring_buffer_resize - resize the ring buffer
435 * @buffer: the buffer to resize.
436 * @size: the new size.
437 *
438 * The tracer is responsible for making sure that the buffer is
439 * not being used while changing the size.
440 * Note: We may be able to change the above requirement by using
441 * RCU synchronizations.
442 *
443 * Minimum size is 2 * BUF_PAGE_SIZE.
444 *
445 * Returns -1 on failure.
446 */
447int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size)
448{
449 struct ring_buffer_per_cpu *cpu_buffer;
450 unsigned nr_pages, rm_pages, new_pages;
451 struct buffer_page *page, *tmp;
452 unsigned long buffer_size;
453 unsigned long addr;
454 LIST_HEAD(pages);
455 int i, cpu;
456
457 size = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
458 size *= BUF_PAGE_SIZE;
459 buffer_size = buffer->pages * BUF_PAGE_SIZE;
460
461 /* we need a minimum of two pages */
462 if (size < BUF_PAGE_SIZE * 2)
463 size = BUF_PAGE_SIZE * 2;
464
465 if (size == buffer_size)
466 return size;
467
468 mutex_lock(&buffer->mutex);
469
470 nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
471
472 if (size < buffer_size) {
473
474 /* easy case, just free pages */
475 BUG_ON(nr_pages >= buffer->pages);
476
477 rm_pages = buffer->pages - nr_pages;
478
479 for_each_buffer_cpu(buffer, cpu) {
480 cpu_buffer = buffer->buffers[cpu];
481 rb_remove_pages(cpu_buffer, rm_pages);
482 }
483 goto out;
484 }
485
486 /*
487 * This is a bit more difficult. We only want to add pages
488 * when we can allocate enough for all CPUs. We do this
489 * by allocating all the pages and storing them on a local
490 * link list. If we succeed in our allocation, then we
491 * add these pages to the cpu_buffers. Otherwise we just free
492 * them all and return -ENOMEM;
493 */
494 BUG_ON(nr_pages <= buffer->pages);
495 new_pages = nr_pages - buffer->pages;
496
497 for_each_buffer_cpu(buffer, cpu) {
498 for (i = 0; i < new_pages; i++) {
499 addr = __get_free_page(GFP_KERNEL);
500 if (!addr)
501 goto free_pages;
502 page = (struct buffer_page *)virt_to_page(addr);
503 list_add(&page->list, &pages);
504 }
505 }
506
507 for_each_buffer_cpu(buffer, cpu) {
508 cpu_buffer = buffer->buffers[cpu];
509 rb_insert_pages(cpu_buffer, &pages, new_pages);
510 }
511
512 BUG_ON(!list_empty(&pages));
513
514 out:
515 buffer->pages = nr_pages;
516 mutex_unlock(&buffer->mutex);
517
518 return size;
519
520 free_pages:
521 list_for_each_entry_safe(page, tmp, &pages, list) {
522 list_del_init(&page->list);
523 __free_page(&page->page);
524 }
525 return -ENOMEM;
526}
527
528static inline int rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer)
529{
530 return cpu_buffer->head_page == cpu_buffer->tail_page &&
531 cpu_buffer->head == cpu_buffer->tail;
532}
533
534static inline int rb_null_event(struct ring_buffer_event *event)
535{
536 return event->type == RINGBUF_TYPE_PADDING;
537}
538
539static inline void *rb_page_index(struct buffer_page *page, unsigned index)
540{
541 void *addr = page_address(&page->page);
542
543 return addr + index;
544}
545
546static inline struct ring_buffer_event *
547rb_head_event(struct ring_buffer_per_cpu *cpu_buffer)
548{
549 return rb_page_index(cpu_buffer->head_page,
550 cpu_buffer->head);
551}
552
553static inline struct ring_buffer_event *
554rb_iter_head_event(struct ring_buffer_iter *iter)
555{
556 return rb_page_index(iter->head_page,
557 iter->head);
558}
559
560/*
561 * When the tail hits the head and the buffer is in overwrite mode,
562 * the head jumps to the next page and all content on the previous
563 * page is discarded. But before doing so, we update the overrun
564 * variable of the buffer.
565 */
566static void rb_update_overflow(struct ring_buffer_per_cpu *cpu_buffer)
567{
568 struct ring_buffer_event *event;
569 unsigned long head;
570
571 for (head = 0; head < rb_head_size(cpu_buffer);
572 head += rb_event_length(event)) {
573
574 event = rb_page_index(cpu_buffer->head_page, head);
575 BUG_ON(rb_null_event(event));
576 /* Only count data entries */
577 if (event->type != RINGBUF_TYPE_DATA)
578 continue;
579 cpu_buffer->overrun++;
580 cpu_buffer->entries--;
581 }
582}
583
584static inline void rb_inc_page(struct ring_buffer_per_cpu *cpu_buffer,
585 struct buffer_page **page)
586{
587 struct list_head *p = (*page)->list.next;
588
589 if (p == &cpu_buffer->pages)
590 p = p->next;
591
592 *page = list_entry(p, struct buffer_page, list);
593}
594
595static inline void
596rb_add_stamp(struct ring_buffer_per_cpu *cpu_buffer, u64 *ts)
597{
598 cpu_buffer->tail_page->time_stamp = *ts;
599 cpu_buffer->write_stamp = *ts;
600}
601
602static void rb_reset_read_page(struct ring_buffer_per_cpu *cpu_buffer)
603{
604 cpu_buffer->read_stamp = cpu_buffer->head_page->time_stamp;
605 cpu_buffer->head = 0;
606}
607
608static void
609rb_reset_iter_read_page(struct ring_buffer_iter *iter)
610{
611 iter->read_stamp = iter->head_page->time_stamp;
612 iter->head = 0;
613}
614
615/**
616 * ring_buffer_update_event - update event type and data
617 * @event: the even to update
618 * @type: the type of event
619 * @length: the size of the event field in the ring buffer
620 *
621 * Update the type and data fields of the event. The length
622 * is the actual size that is written to the ring buffer,
623 * and with this, we can determine what to place into the
624 * data field.
625 */
626static inline void
627rb_update_event(struct ring_buffer_event *event,
628 unsigned type, unsigned length)
629{
630 event->type = type;
631
632 switch (type) {
633
634 case RINGBUF_TYPE_PADDING:
635 break;
636
637 case RINGBUF_TYPE_TIME_EXTEND:
638 event->len =
639 (RB_LEN_TIME_EXTEND + (RB_ALIGNMENT-1))
640 >> RB_ALIGNMENT_SHIFT;
641 break;
642
643 case RINGBUF_TYPE_TIME_STAMP:
644 event->len =
645 (RB_LEN_TIME_STAMP + (RB_ALIGNMENT-1))
646 >> RB_ALIGNMENT_SHIFT;
647 break;
648
649 case RINGBUF_TYPE_DATA:
650 length -= RB_EVNT_HDR_SIZE;
651 if (length > RB_MAX_SMALL_DATA) {
652 event->len = 0;
653 event->array[0] = length;
654 } else
655 event->len =
656 (length + (RB_ALIGNMENT-1))
657 >> RB_ALIGNMENT_SHIFT;
658 break;
659 default:
660 BUG();
661 }
662}
663
664static inline unsigned rb_calculate_event_length(unsigned length)
665{
666 struct ring_buffer_event event; /* Used only for sizeof array */
667
668 /* zero length can cause confusions */
669 if (!length)
670 length = 1;
671
672 if (length > RB_MAX_SMALL_DATA)
673 length += sizeof(event.array[0]);
674
675 length += RB_EVNT_HDR_SIZE;
676 length = ALIGN(length, RB_ALIGNMENT);
677
678 return length;
679}
680
681static struct ring_buffer_event *
682__rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
683 unsigned type, unsigned long length, u64 *ts)
684{
685 struct buffer_page *head_page, *tail_page;
686 unsigned long tail;
687 struct ring_buffer *buffer = cpu_buffer->buffer;
688 struct ring_buffer_event *event;
689
690 tail_page = cpu_buffer->tail_page;
691 head_page = cpu_buffer->head_page;
692 tail = cpu_buffer->tail;
693
694 if (tail + length > BUF_PAGE_SIZE) {
695 struct buffer_page *next_page = tail_page;
696
697 rb_inc_page(cpu_buffer, &next_page);
698
699 if (next_page == head_page) {
700 if (!(buffer->flags & RB_FL_OVERWRITE))
701 return NULL;
702
703 /* count overflows */
704 rb_update_overflow(cpu_buffer);
705
706 rb_inc_page(cpu_buffer, &head_page);
707 cpu_buffer->head_page = head_page;
708 rb_reset_read_page(cpu_buffer);
709 }
710
711 if (tail != BUF_PAGE_SIZE) {
712 event = rb_page_index(tail_page, tail);
713 /* page padding */
714 event->type = RINGBUF_TYPE_PADDING;
715 }
716
717 tail_page->size = tail;
718 tail_page = next_page;
719 tail_page->size = 0;
720 tail = 0;
721 cpu_buffer->tail_page = tail_page;
722 cpu_buffer->tail = tail;
723 rb_add_stamp(cpu_buffer, ts);
724 }
725
726 BUG_ON(tail + length > BUF_PAGE_SIZE);
727
728 event = rb_page_index(tail_page, tail);
729 rb_update_event(event, type, length);
730
731 return event;
732}
733
734static int
735rb_add_time_stamp(struct ring_buffer_per_cpu *cpu_buffer,
736 u64 *ts, u64 *delta)
737{
738 struct ring_buffer_event *event;
739 static int once;
740
741 if (unlikely(*delta > (1ULL << 59) && !once++)) {
742 printk(KERN_WARNING "Delta way too big! %llu"
743 " ts=%llu write stamp = %llu\n",
744 *delta, *ts, cpu_buffer->write_stamp);
745 WARN_ON(1);
746 }
747
748 /*
749 * The delta is too big, we to add a
750 * new timestamp.
751 */
752 event = __rb_reserve_next(cpu_buffer,
753 RINGBUF_TYPE_TIME_EXTEND,
754 RB_LEN_TIME_EXTEND,
755 ts);
756 if (!event)
757 return -1;
758
759 /* check to see if we went to the next page */
760 if (cpu_buffer->tail) {
761 /* Still on same page, update timestamp */
762 event->time_delta = *delta & TS_MASK;
763 event->array[0] = *delta >> TS_SHIFT;
764 /* commit the time event */
765 cpu_buffer->tail +=
766 rb_event_length(event);
767 cpu_buffer->write_stamp = *ts;
768 *delta = 0;
769 }
770
771 return 0;
772}
773
774static struct ring_buffer_event *
775rb_reserve_next_event(struct ring_buffer_per_cpu *cpu_buffer,
776 unsigned type, unsigned long length)
777{
778 struct ring_buffer_event *event;
779 u64 ts, delta;
780
781 ts = ring_buffer_time_stamp(cpu_buffer->cpu);
782
783 if (cpu_buffer->tail) {
784 delta = ts - cpu_buffer->write_stamp;
785
786 if (test_time_stamp(delta)) {
787 int ret;
788
789 ret = rb_add_time_stamp(cpu_buffer, &ts, &delta);
790 if (ret < 0)
791 return NULL;
792 }
793 } else {
794 rb_add_stamp(cpu_buffer, &ts);
795 delta = 0;
796 }
797
798 event = __rb_reserve_next(cpu_buffer, type, length, &ts);
799 if (!event)
800 return NULL;
801
802 /* If the reserve went to the next page, our delta is zero */
803 if (!cpu_buffer->tail)
804 delta = 0;
805
806 event->time_delta = delta;
807
808 return event;
809}
810
811/**
812 * ring_buffer_lock_reserve - reserve a part of the buffer
813 * @buffer: the ring buffer to reserve from
814 * @length: the length of the data to reserve (excluding event header)
815 * @flags: a pointer to save the interrupt flags
816 *
817 * Returns a reseverd event on the ring buffer to copy directly to.
818 * The user of this interface will need to get the body to write into
819 * and can use the ring_buffer_event_data() interface.
820 *
821 * The length is the length of the data needed, not the event length
822 * which also includes the event header.
823 *
824 * Must be paired with ring_buffer_unlock_commit, unless NULL is returned.
825 * If NULL is returned, then nothing has been allocated or locked.
826 */
827struct ring_buffer_event *
828ring_buffer_lock_reserve(struct ring_buffer *buffer,
829 unsigned long length,
830 unsigned long *flags)
831{
832 struct ring_buffer_per_cpu *cpu_buffer;
833 struct ring_buffer_event *event;
834 int cpu;
835
836 if (atomic_read(&buffer->record_disabled))
837 return NULL;
838
839 raw_local_irq_save(*flags);
840 cpu = raw_smp_processor_id();
841
842 if (!cpu_isset(cpu, buffer->cpumask))
843 goto out_irq;
844
845 cpu_buffer = buffer->buffers[cpu];
846 spin_lock(&cpu_buffer->lock);
847
848 if (atomic_read(&cpu_buffer->record_disabled))
849 goto no_record;
850
851 length = rb_calculate_event_length(length);
852 if (length > BUF_PAGE_SIZE)
853 return NULL;
854
855 event = rb_reserve_next_event(cpu_buffer, RINGBUF_TYPE_DATA, length);
856 if (!event)
857 goto no_record;
858
859 return event;
860
861 no_record:
862 spin_unlock(&cpu_buffer->lock);
863 out_irq:
864 local_irq_restore(*flags);
865 return NULL;
866}
867
868static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer,
869 struct ring_buffer_event *event)
870{
871 cpu_buffer->tail += rb_event_length(event);
872 cpu_buffer->tail_page->size = cpu_buffer->tail;
873 cpu_buffer->write_stamp += event->time_delta;
874 cpu_buffer->entries++;
875}
876
877/**
878 * ring_buffer_unlock_commit - commit a reserved
879 * @buffer: The buffer to commit to
880 * @event: The event pointer to commit.
881 * @flags: the interrupt flags received from ring_buffer_lock_reserve.
882 *
883 * This commits the data to the ring buffer, and releases any locks held.
884 *
885 * Must be paired with ring_buffer_lock_reserve.
886 */
887int ring_buffer_unlock_commit(struct ring_buffer *buffer,
888 struct ring_buffer_event *event,
889 unsigned long flags)
890{
891 struct ring_buffer_per_cpu *cpu_buffer;
892 int cpu = raw_smp_processor_id();
893
894 cpu_buffer = buffer->buffers[cpu];
895
896 assert_spin_locked(&cpu_buffer->lock);
897
898 rb_commit(cpu_buffer, event);
899
900 spin_unlock(&cpu_buffer->lock);
901 raw_local_irq_restore(flags);
902
903 return 0;
904}
905
906/**
907 * ring_buffer_write - write data to the buffer without reserving
908 * @buffer: The ring buffer to write to.
909 * @length: The length of the data being written (excluding the event header)
910 * @data: The data to write to the buffer.
911 *
912 * This is like ring_buffer_lock_reserve and ring_buffer_unlock_commit as
913 * one function. If you already have the data to write to the buffer, it
914 * may be easier to simply call this function.
915 *
916 * Note, like ring_buffer_lock_reserve, the length is the length of the data
917 * and not the length of the event which would hold the header.
918 */
919int ring_buffer_write(struct ring_buffer *buffer,
920 unsigned long length,
921 void *data)
922{
923 struct ring_buffer_per_cpu *cpu_buffer;
924 struct ring_buffer_event *event;
925 unsigned long event_length, flags;
926 void *body;
927 int ret = -EBUSY;
928 int cpu;
929
930 if (atomic_read(&buffer->record_disabled))
931 return -EBUSY;
932
933 local_irq_save(flags);
934 cpu = raw_smp_processor_id();
935
936 if (!cpu_isset(cpu, buffer->cpumask))
937 goto out_irq;
938
939 cpu_buffer = buffer->buffers[cpu];
940 spin_lock(&cpu_buffer->lock);
941
942 if (atomic_read(&cpu_buffer->record_disabled))
943 goto out;
944
945 event_length = rb_calculate_event_length(length);
946 event = rb_reserve_next_event(cpu_buffer,
947 RINGBUF_TYPE_DATA, event_length);
948 if (!event)
949 goto out;
950
951 body = rb_event_data(event);
952
953 memcpy(body, data, length);
954
955 rb_commit(cpu_buffer, event);
956
957 ret = 0;
958 out:
959 spin_unlock(&cpu_buffer->lock);
960 out_irq:
961 local_irq_restore(flags);
962
963 return ret;
964}
965
966/**
967 * ring_buffer_lock - lock the ring buffer
968 * @buffer: The ring buffer to lock
969 * @flags: The place to store the interrupt flags
970 *
971 * This locks all the per CPU buffers.
972 *
973 * Must be unlocked by ring_buffer_unlock.
974 */
975void ring_buffer_lock(struct ring_buffer *buffer, unsigned long *flags)
976{
977 struct ring_buffer_per_cpu *cpu_buffer;
978 int cpu;
979
980 local_irq_save(*flags);
981
982 for_each_buffer_cpu(buffer, cpu) {
983 cpu_buffer = buffer->buffers[cpu];
984 spin_lock(&cpu_buffer->lock);
985 }
986}
987
988/**
989 * ring_buffer_unlock - unlock a locked buffer
990 * @buffer: The locked buffer to unlock
991 * @flags: The interrupt flags received by ring_buffer_lock
992 */
993void ring_buffer_unlock(struct ring_buffer *buffer, unsigned long flags)
994{
995 struct ring_buffer_per_cpu *cpu_buffer;
996 int cpu;
997
998 for (cpu = buffer->cpus - 1; cpu >= 0; cpu--) {
999 if (!cpu_isset(cpu, buffer->cpumask))
1000 continue;
1001 cpu_buffer = buffer->buffers[cpu];
1002 spin_unlock(&cpu_buffer->lock);
1003 }
1004
1005 local_irq_restore(flags);
1006}
1007
1008/**
1009 * ring_buffer_record_disable - stop all writes into the buffer
1010 * @buffer: The ring buffer to stop writes to.
1011 *
1012 * This prevents all writes to the buffer. Any attempt to write
1013 * to the buffer after this will fail and return NULL.
1014 *
1015 * The caller should call synchronize_sched() after this.
1016 */
1017void ring_buffer_record_disable(struct ring_buffer *buffer)
1018{
1019 atomic_inc(&buffer->record_disabled);
1020}
1021
1022/**
1023 * ring_buffer_record_enable - enable writes to the buffer
1024 * @buffer: The ring buffer to enable writes
1025 *
1026 * Note, multiple disables will need the same number of enables
1027 * to truely enable the writing (much like preempt_disable).
1028 */
1029void ring_buffer_record_enable(struct ring_buffer *buffer)
1030{
1031 atomic_dec(&buffer->record_disabled);
1032}
1033
1034/**
1035 * ring_buffer_record_disable_cpu - stop all writes into the cpu_buffer
1036 * @buffer: The ring buffer to stop writes to.
1037 * @cpu: The CPU buffer to stop
1038 *
1039 * This prevents all writes to the buffer. Any attempt to write
1040 * to the buffer after this will fail and return NULL.
1041 *
1042 * The caller should call synchronize_sched() after this.
1043 */
1044void ring_buffer_record_disable_cpu(struct ring_buffer *buffer, int cpu)
1045{
1046 struct ring_buffer_per_cpu *cpu_buffer;
1047
1048 if (!cpu_isset(cpu, buffer->cpumask))
1049 return;
1050
1051 cpu_buffer = buffer->buffers[cpu];
1052 atomic_inc(&cpu_buffer->record_disabled);
1053}
1054
1055/**
1056 * ring_buffer_record_enable_cpu - enable writes to the buffer
1057 * @buffer: The ring buffer to enable writes
1058 * @cpu: The CPU to enable.
1059 *
1060 * Note, multiple disables will need the same number of enables
1061 * to truely enable the writing (much like preempt_disable).
1062 */
1063void ring_buffer_record_enable_cpu(struct ring_buffer *buffer, int cpu)
1064{
1065 struct ring_buffer_per_cpu *cpu_buffer;
1066
1067 if (!cpu_isset(cpu, buffer->cpumask))
1068 return;
1069
1070 cpu_buffer = buffer->buffers[cpu];
1071 atomic_dec(&cpu_buffer->record_disabled);
1072}
1073
1074/**
1075 * ring_buffer_entries_cpu - get the number of entries in a cpu buffer
1076 * @buffer: The ring buffer
1077 * @cpu: The per CPU buffer to get the entries from.
1078 */
1079unsigned long ring_buffer_entries_cpu(struct ring_buffer *buffer, int cpu)
1080{
1081 struct ring_buffer_per_cpu *cpu_buffer;
1082
1083 if (!cpu_isset(cpu, buffer->cpumask))
1084 return 0;
1085
1086 cpu_buffer = buffer->buffers[cpu];
1087 return cpu_buffer->entries;
1088}
1089
1090/**
1091 * ring_buffer_overrun_cpu - get the number of overruns in a cpu_buffer
1092 * @buffer: The ring buffer
1093 * @cpu: The per CPU buffer to get the number of overruns from
1094 */
1095unsigned long ring_buffer_overrun_cpu(struct ring_buffer *buffer, int cpu)
1096{
1097 struct ring_buffer_per_cpu *cpu_buffer;
1098
1099 if (!cpu_isset(cpu, buffer->cpumask))
1100 return 0;
1101
1102 cpu_buffer = buffer->buffers[cpu];
1103 return cpu_buffer->overrun;
1104}
1105
1106/**
1107 * ring_buffer_entries - get the number of entries in a buffer
1108 * @buffer: The ring buffer
1109 *
1110 * Returns the total number of entries in the ring buffer
1111 * (all CPU entries)
1112 */
1113unsigned long ring_buffer_entries(struct ring_buffer *buffer)
1114{
1115 struct ring_buffer_per_cpu *cpu_buffer;
1116 unsigned long entries = 0;
1117 int cpu;
1118
1119 /* if you care about this being correct, lock the buffer */
1120 for_each_buffer_cpu(buffer, cpu) {
1121 cpu_buffer = buffer->buffers[cpu];
1122 entries += cpu_buffer->entries;
1123 }
1124
1125 return entries;
1126}
1127
1128/**
1129 * ring_buffer_overrun_cpu - get the number of overruns in buffer
1130 * @buffer: The ring buffer
1131 *
1132 * Returns the total number of overruns in the ring buffer
1133 * (all CPU entries)
1134 */
1135unsigned long ring_buffer_overruns(struct ring_buffer *buffer)
1136{
1137 struct ring_buffer_per_cpu *cpu_buffer;
1138 unsigned long overruns = 0;
1139 int cpu;
1140
1141 /* if you care about this being correct, lock the buffer */
1142 for_each_buffer_cpu(buffer, cpu) {
1143 cpu_buffer = buffer->buffers[cpu];
1144 overruns += cpu_buffer->overrun;
1145 }
1146
1147 return overruns;
1148}
1149
1150/**
1151 * ring_buffer_iter_reset - reset an iterator
1152 * @iter: The iterator to reset
1153 *
1154 * Resets the iterator, so that it will start from the beginning
1155 * again.
1156 */
1157void ring_buffer_iter_reset(struct ring_buffer_iter *iter)
1158{
1159 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
1160
1161 iter->head_page = cpu_buffer->head_page;
1162 iter->head = cpu_buffer->head;
1163 rb_reset_iter_read_page(iter);
1164}
1165
1166/**
1167 * ring_buffer_iter_empty - check if an iterator has no more to read
1168 * @iter: The iterator to check
1169 */
1170int ring_buffer_iter_empty(struct ring_buffer_iter *iter)
1171{
1172 struct ring_buffer_per_cpu *cpu_buffer;
1173
1174 cpu_buffer = iter->cpu_buffer;
1175
1176 return iter->head_page == cpu_buffer->tail_page &&
1177 iter->head == cpu_buffer->tail;
1178}
1179
1180static void
1181rb_update_read_stamp(struct ring_buffer_per_cpu *cpu_buffer,
1182 struct ring_buffer_event *event)
1183{
1184 u64 delta;
1185
1186 switch (event->type) {
1187 case RINGBUF_TYPE_PADDING:
1188 return;
1189
1190 case RINGBUF_TYPE_TIME_EXTEND:
1191 delta = event->array[0];
1192 delta <<= TS_SHIFT;
1193 delta += event->time_delta;
1194 cpu_buffer->read_stamp += delta;
1195 return;
1196
1197 case RINGBUF_TYPE_TIME_STAMP:
1198 /* FIXME: not implemented */
1199 return;
1200
1201 case RINGBUF_TYPE_DATA:
1202 cpu_buffer->read_stamp += event->time_delta;
1203 return;
1204
1205 default:
1206 BUG();
1207 }
1208 return;
1209}
1210
1211static void
1212rb_update_iter_read_stamp(struct ring_buffer_iter *iter,
1213 struct ring_buffer_event *event)
1214{
1215 u64 delta;
1216
1217 switch (event->type) {
1218 case RINGBUF_TYPE_PADDING:
1219 return;
1220
1221 case RINGBUF_TYPE_TIME_EXTEND:
1222 delta = event->array[0];
1223 delta <<= TS_SHIFT;
1224 delta += event->time_delta;
1225 iter->read_stamp += delta;
1226 return;
1227
1228 case RINGBUF_TYPE_TIME_STAMP:
1229 /* FIXME: not implemented */
1230 return;
1231
1232 case RINGBUF_TYPE_DATA:
1233 iter->read_stamp += event->time_delta;
1234 return;
1235
1236 default:
1237 BUG();
1238 }
1239 return;
1240}
1241
1242static void rb_advance_head(struct ring_buffer_per_cpu *cpu_buffer)
1243{
1244 struct ring_buffer_event *event;
1245 unsigned length;
1246
1247 /*
1248 * Check if we are at the end of the buffer.
1249 */
1250 if (cpu_buffer->head >= cpu_buffer->head_page->size) {
1251 BUG_ON(cpu_buffer->head_page == cpu_buffer->tail_page);
1252 rb_inc_page(cpu_buffer, &cpu_buffer->head_page);
1253 rb_reset_read_page(cpu_buffer);
1254 return;
1255 }
1256
1257 event = rb_head_event(cpu_buffer);
1258
1259 if (event->type == RINGBUF_TYPE_DATA)
1260 cpu_buffer->entries--;
1261
1262 length = rb_event_length(event);
1263
1264 /*
1265 * This should not be called to advance the header if we are
1266 * at the tail of the buffer.
1267 */
1268 BUG_ON((cpu_buffer->head_page == cpu_buffer->tail_page) &&
1269 (cpu_buffer->head + length > cpu_buffer->tail));
1270
1271 rb_update_read_stamp(cpu_buffer, event);
1272
1273 cpu_buffer->head += length;
1274
1275 /* check for end of page */
1276 if ((cpu_buffer->head >= cpu_buffer->head_page->size) &&
1277 (cpu_buffer->head_page != cpu_buffer->tail_page))
1278 rb_advance_head(cpu_buffer);
1279}
1280
1281static void rb_advance_iter(struct ring_buffer_iter *iter)
1282{
1283 struct ring_buffer *buffer;
1284 struct ring_buffer_per_cpu *cpu_buffer;
1285 struct ring_buffer_event *event;
1286 unsigned length;
1287
1288 cpu_buffer = iter->cpu_buffer;
1289 buffer = cpu_buffer->buffer;
1290
1291 /*
1292 * Check if we are at the end of the buffer.
1293 */
1294 if (iter->head >= iter->head_page->size) {
1295 BUG_ON(iter->head_page == cpu_buffer->tail_page);
1296 rb_inc_page(cpu_buffer, &iter->head_page);
1297 rb_reset_iter_read_page(iter);
1298 return;
1299 }
1300
1301 event = rb_iter_head_event(iter);
1302
1303 length = rb_event_length(event);
1304
1305 /*
1306 * This should not be called to advance the header if we are
1307 * at the tail of the buffer.
1308 */
1309 BUG_ON((iter->head_page == cpu_buffer->tail_page) &&
1310 (iter->head + length > cpu_buffer->tail));
1311
1312 rb_update_iter_read_stamp(iter, event);
1313
1314 iter->head += length;
1315
1316 /* check for end of page padding */
1317 if ((iter->head >= iter->head_page->size) &&
1318 (iter->head_page != cpu_buffer->tail_page))
1319 rb_advance_iter(iter);
1320}
1321
1322/**
1323 * ring_buffer_peek - peek at the next event to be read
1324 * @buffer: The ring buffer to read
1325 * @cpu: The cpu to peak at
1326 * @ts: The timestamp counter of this event.
1327 *
1328 * This will return the event that will be read next, but does
1329 * not consume the data.
1330 */
1331struct ring_buffer_event *
1332ring_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts)
1333{
1334 struct ring_buffer_per_cpu *cpu_buffer;
1335 struct ring_buffer_event *event;
1336
1337 if (!cpu_isset(cpu, buffer->cpumask))
1338 return NULL;
1339
1340 cpu_buffer = buffer->buffers[cpu];
1341
1342 again:
1343 if (rb_per_cpu_empty(cpu_buffer))
1344 return NULL;
1345
1346 event = rb_head_event(cpu_buffer);
1347
1348 switch (event->type) {
1349 case RINGBUF_TYPE_PADDING:
1350 rb_inc_page(cpu_buffer, &cpu_buffer->head_page);
1351 rb_reset_read_page(cpu_buffer);
1352 goto again;
1353
1354 case RINGBUF_TYPE_TIME_EXTEND:
1355 /* Internal data, OK to advance */
1356 rb_advance_head(cpu_buffer);
1357 goto again;
1358
1359 case RINGBUF_TYPE_TIME_STAMP:
1360 /* FIXME: not implemented */
1361 rb_advance_head(cpu_buffer);
1362 goto again;
1363
1364 case RINGBUF_TYPE_DATA:
1365 if (ts) {
1366 *ts = cpu_buffer->read_stamp + event->time_delta;
1367 ring_buffer_normalize_time_stamp(cpu_buffer->cpu, ts);
1368 }
1369 return event;
1370
1371 default:
1372 BUG();
1373 }
1374
1375 return NULL;
1376}
1377
1378/**
1379 * ring_buffer_iter_peek - peek at the next event to be read
1380 * @iter: The ring buffer iterator
1381 * @ts: The timestamp counter of this event.
1382 *
1383 * This will return the event that will be read next, but does
1384 * not increment the iterator.
1385 */
1386struct ring_buffer_event *
1387ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
1388{
1389 struct ring_buffer *buffer;
1390 struct ring_buffer_per_cpu *cpu_buffer;
1391 struct ring_buffer_event *event;
1392
1393 if (ring_buffer_iter_empty(iter))
1394 return NULL;
1395
1396 cpu_buffer = iter->cpu_buffer;
1397 buffer = cpu_buffer->buffer;
1398
1399 again:
1400 if (rb_per_cpu_empty(cpu_buffer))
1401 return NULL;
1402
1403 event = rb_iter_head_event(iter);
1404
1405 switch (event->type) {
1406 case RINGBUF_TYPE_PADDING:
1407 rb_inc_page(cpu_buffer, &iter->head_page);
1408 rb_reset_iter_read_page(iter);
1409 goto again;
1410
1411 case RINGBUF_TYPE_TIME_EXTEND:
1412 /* Internal data, OK to advance */
1413 rb_advance_iter(iter);
1414 goto again;
1415
1416 case RINGBUF_TYPE_TIME_STAMP:
1417 /* FIXME: not implemented */
1418 rb_advance_iter(iter);
1419 goto again;
1420
1421 case RINGBUF_TYPE_DATA:
1422 if (ts) {
1423 *ts = iter->read_stamp + event->time_delta;
1424 ring_buffer_normalize_time_stamp(cpu_buffer->cpu, ts);
1425 }
1426 return event;
1427
1428 default:
1429 BUG();
1430 }
1431
1432 return NULL;
1433}
1434
1435/**
1436 * ring_buffer_consume - return an event and consume it
1437 * @buffer: The ring buffer to get the next event from
1438 *
1439 * Returns the next event in the ring buffer, and that event is consumed.
1440 * Meaning, that sequential reads will keep returning a different event,
1441 * and eventually empty the ring buffer if the producer is slower.
1442 */
1443struct ring_buffer_event *
1444ring_buffer_consume(struct ring_buffer *buffer, int cpu, u64 *ts)
1445{
1446 struct ring_buffer_per_cpu *cpu_buffer;
1447 struct ring_buffer_event *event;
1448
1449 if (!cpu_isset(cpu, buffer->cpumask))
1450 return NULL;
1451
1452 event = ring_buffer_peek(buffer, cpu, ts);
1453 if (!event)
1454 return NULL;
1455
1456 cpu_buffer = buffer->buffers[cpu];
1457 rb_advance_head(cpu_buffer);
1458
1459 return event;
1460}
1461
1462/**
1463 * ring_buffer_read_start - start a non consuming read of the buffer
1464 * @buffer: The ring buffer to read from
1465 * @cpu: The cpu buffer to iterate over
1466 *
1467 * This starts up an iteration through the buffer. It also disables
1468 * the recording to the buffer until the reading is finished.
1469 * This prevents the reading from being corrupted. This is not
1470 * a consuming read, so a producer is not expected.
1471 *
1472 * Must be paired with ring_buffer_finish.
1473 */
1474struct ring_buffer_iter *
1475ring_buffer_read_start(struct ring_buffer *buffer, int cpu)
1476{
1477 struct ring_buffer_per_cpu *cpu_buffer;
1478 struct ring_buffer_iter *iter;
1479
1480 if (!cpu_isset(cpu, buffer->cpumask))
1481 return NULL;
1482
1483 iter = kmalloc(sizeof(*iter), GFP_KERNEL);
1484 if (!iter)
1485 return NULL;
1486
1487 cpu_buffer = buffer->buffers[cpu];
1488
1489 iter->cpu_buffer = cpu_buffer;
1490
1491 atomic_inc(&cpu_buffer->record_disabled);
1492 synchronize_sched();
1493
1494 spin_lock(&cpu_buffer->lock);
1495 iter->head = cpu_buffer->head;
1496 iter->head_page = cpu_buffer->head_page;
1497 rb_reset_iter_read_page(iter);
1498 spin_unlock(&cpu_buffer->lock);
1499
1500 return iter;
1501}
1502
1503/**
1504 * ring_buffer_finish - finish reading the iterator of the buffer
1505 * @iter: The iterator retrieved by ring_buffer_start
1506 *
1507 * This re-enables the recording to the buffer, and frees the
1508 * iterator.
1509 */
1510void
1511ring_buffer_read_finish(struct ring_buffer_iter *iter)
1512{
1513 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
1514
1515 atomic_dec(&cpu_buffer->record_disabled);
1516 kfree(iter);
1517}
1518
1519/**
1520 * ring_buffer_read - read the next item in the ring buffer by the iterator
1521 * @iter: The ring buffer iterator
1522 * @ts: The time stamp of the event read.
1523 *
1524 * This reads the next event in the ring buffer and increments the iterator.
1525 */
1526struct ring_buffer_event *
1527ring_buffer_read(struct ring_buffer_iter *iter, u64 *ts)
1528{
1529 struct ring_buffer_event *event;
1530
1531 event = ring_buffer_iter_peek(iter, ts);
1532 if (!event)
1533 return NULL;
1534
1535 rb_advance_iter(iter);
1536
1537 return event;
1538}
1539
1540/**
1541 * ring_buffer_size - return the size of the ring buffer (in bytes)
1542 * @buffer: The ring buffer.
1543 */
1544unsigned long ring_buffer_size(struct ring_buffer *buffer)
1545{
1546 return BUF_PAGE_SIZE * buffer->pages;
1547}
1548
1549static void
1550rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
1551{
1552 cpu_buffer->head_page
1553 = list_entry(cpu_buffer->pages.next, struct buffer_page, list);
1554 cpu_buffer->tail_page
1555 = list_entry(cpu_buffer->pages.next, struct buffer_page, list);
1556
1557 cpu_buffer->head = cpu_buffer->tail = 0;
1558 cpu_buffer->overrun = 0;
1559 cpu_buffer->entries = 0;
1560}
1561
1562/**
1563 * ring_buffer_reset_cpu - reset a ring buffer per CPU buffer
1564 * @buffer: The ring buffer to reset a per cpu buffer of
1565 * @cpu: The CPU buffer to be reset
1566 */
1567void ring_buffer_reset_cpu(struct ring_buffer *buffer, int cpu)
1568{
1569 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
1570 unsigned long flags;
1571
1572 if (!cpu_isset(cpu, buffer->cpumask))
1573 return;
1574
1575 raw_local_irq_save(flags);
1576 spin_lock(&cpu_buffer->lock);
1577
1578 rb_reset_cpu(cpu_buffer);
1579
1580 spin_unlock(&cpu_buffer->lock);
1581 raw_local_irq_restore(flags);
1582}
1583
1584/**
1585 * ring_buffer_reset - reset a ring buffer
1586 * @buffer: The ring buffer to reset all cpu buffers
1587 */
1588void ring_buffer_reset(struct ring_buffer *buffer)
1589{
1590 unsigned long flags;
1591 int cpu;
1592
1593 ring_buffer_lock(buffer, &flags);
1594
1595 for_each_buffer_cpu(buffer, cpu)
1596 rb_reset_cpu(buffer->buffers[cpu]);
1597
1598 ring_buffer_unlock(buffer, flags);
1599}
1600
1601/**
1602 * rind_buffer_empty - is the ring buffer empty?
1603 * @buffer: The ring buffer to test
1604 */
1605int ring_buffer_empty(struct ring_buffer *buffer)
1606{
1607 struct ring_buffer_per_cpu *cpu_buffer;
1608 int cpu;
1609
1610 /* yes this is racy, but if you don't like the race, lock the buffer */
1611 for_each_buffer_cpu(buffer, cpu) {
1612 cpu_buffer = buffer->buffers[cpu];
1613 if (!rb_per_cpu_empty(cpu_buffer))
1614 return 0;
1615 }
1616 return 1;
1617}
1618
1619/**
1620 * ring_buffer_empty_cpu - is a cpu buffer of a ring buffer empty?
1621 * @buffer: The ring buffer
1622 * @cpu: The CPU buffer to test
1623 */
1624int ring_buffer_empty_cpu(struct ring_buffer *buffer, int cpu)
1625{
1626 struct ring_buffer_per_cpu *cpu_buffer;
1627
1628 if (!cpu_isset(cpu, buffer->cpumask))
1629 return 1;
1630
1631 cpu_buffer = buffer->buffers[cpu];
1632 return rb_per_cpu_empty(cpu_buffer);
1633}
1634
1635/**
1636 * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers
1637 * @buffer_a: One buffer to swap with
1638 * @buffer_b: The other buffer to swap with
1639 *
1640 * This function is useful for tracers that want to take a "snapshot"
1641 * of a CPU buffer and has another back up buffer lying around.
1642 * it is expected that the tracer handles the cpu buffer not being
1643 * used at the moment.
1644 */
1645int ring_buffer_swap_cpu(struct ring_buffer *buffer_a,
1646 struct ring_buffer *buffer_b, int cpu)
1647{
1648 struct ring_buffer_per_cpu *cpu_buffer_a;
1649 struct ring_buffer_per_cpu *cpu_buffer_b;
1650
1651 if (!cpu_isset(cpu, buffer_a->cpumask) ||
1652 !cpu_isset(cpu, buffer_b->cpumask))
1653 return -EINVAL;
1654
1655 /* At least make sure the two buffers are somewhat the same */
1656 if (buffer_a->size != buffer_b->size ||
1657 buffer_a->pages != buffer_b->pages)
1658 return -EINVAL;
1659
1660 cpu_buffer_a = buffer_a->buffers[cpu];
1661 cpu_buffer_b = buffer_b->buffers[cpu];
1662
1663 /*
1664 * We can't do a synchronize_sched here because this
1665 * function can be called in atomic context.
1666 * Normally this will be called from the same CPU as cpu.
1667 * If not it's up to the caller to protect this.
1668 */
1669 atomic_inc(&cpu_buffer_a->record_disabled);
1670 atomic_inc(&cpu_buffer_b->record_disabled);
1671
1672 buffer_a->buffers[cpu] = cpu_buffer_b;
1673 buffer_b->buffers[cpu] = cpu_buffer_a;
1674
1675 cpu_buffer_b->buffer = buffer_a;
1676 cpu_buffer_a->buffer = buffer_b;
1677
1678 atomic_dec(&cpu_buffer_a->record_disabled);
1679 atomic_dec(&cpu_buffer_b->record_disabled);
1680
1681 return 0;
1682}
1683