tracing/ring-buffer: Move poll wake ups into ring buffer code

Move the logic to wake up on ring buffer data into the ring buffer
code itself. This simplifies the tracing code a lot and also has the
added benefit that waiters on one of the instance buffers can be woken
only when data is added to that instance instead of data added to
any instance.

Signed-off-by: Steven Rostedt <rostedt@goodmis.org>
diff --git a/kernel/trace/ring_buffer.c b/kernel/trace/ring_buffer.c
index 7244acd..56b6ea3 100644
--- a/kernel/trace/ring_buffer.c
+++ b/kernel/trace/ring_buffer.c
@@ -8,6 +8,7 @@
 #include <linux/trace_clock.h>
 #include <linux/trace_seq.h>
 #include <linux/spinlock.h>
+#include <linux/irq_work.h>
 #include <linux/debugfs.h>
 #include <linux/uaccess.h>
 #include <linux/hardirq.h>
@@ -442,6 +443,12 @@
 	return ret;
 }
 
+struct rb_irq_work {
+	struct irq_work			work;		/* initialised to run rb_wake_up_waiters() */
+	wait_queue_head_t		waiters;	/* NOTE(review): no init_waitqueue_head() in the visible hunks - confirm */
+	bool				waiters_pending; /* set by wait/poll callers, cleared by rb_wakeups() */
+};
+
 /*
  * head_page == tail_page && head == tail then buffer is empty.
  */
@@ -476,6 +483,8 @@
 	struct list_head		new_pages; /* new pages to add */
 	struct work_struct		update_pages_work;
 	struct completion		update_done;
+
+	struct rb_irq_work		irq_work;
 };
 
 struct ring_buffer {
@@ -495,6 +504,8 @@
 	struct notifier_block		cpu_notify;
 #endif
 	u64				(*clock)(void);
+
+	struct rb_irq_work		irq_work;
 };
 
 struct ring_buffer_iter {
@@ -506,6 +517,118 @@
 	u64				read_stamp;
 };
 
+/*
+ * rb_wake_up_waiters - wake up tasks waiting for ring buffer input
+ *
+ * irq_work callback: wakes up every task that is queued on the waiters
+ * queue of the rb_irq_work that embeds @work.
+ */
+static void rb_wake_up_waiters(struct irq_work *work)
+{
+	struct rb_irq_work *rbwork = container_of(work, struct rb_irq_work, work);
+
+	wake_up_all(&rbwork->waiters);
+}
+
+/**
+ * ring_buffer_wait - wait for input to the ring buffer
+ * @buffer: buffer to wait on
+ * @cpu: the cpu buffer to wait on, or RING_BUFFER_ALL_CPUS for any of them
+ *
+ * Sleeps at most once (TASK_INTERRUPTIBLE) if the selected buffer is
+ * empty. It may return while the buffer is still empty, so callers
+ * must recheck. @cpu is used as an index without being validated.
+ */
+void ring_buffer_wait(struct ring_buffer *buffer, int cpu)
+{
+	struct ring_buffer_per_cpu *cpu_buffer;
+	DEFINE_WAIT(wait);
+	struct rb_irq_work *work;
+
+	/*
+	 * Depending on what the caller is waiting for, either any
+	 * data in any cpu buffer, or a specific buffer, put the
+	 * caller on the appropriate wait queue.
+	 */
+	if (cpu == RING_BUFFER_ALL_CPUS)
+		work = &buffer->irq_work;
+	else {
+		cpu_buffer = buffer->buffers[cpu];
+		work = &cpu_buffer->irq_work;
+	}
+
+
+	prepare_to_wait(&work->waiters, &wait, TASK_INTERRUPTIBLE);
+
+	/*
+	 * The events can happen in critical sections where
+	 * checking a wait queue can cause deadlocks.
+	 * After adding a task to the queue, this flag is set
+	 * only to notify events to try to wake up the queue
+	 * using irq_work.
+	 *
+	 * We don't clear it even if the buffer is no longer
+	 * empty. The flag only causes the next event to run
+	 * irq_work to do the wait queue wake up. The worst
+	 * that can happen if we race with the emptiness check below
+	 * is that an event will cause an irq_work to try to wake up
+	 * an empty queue.
+	 *
+	 * There's no reason to protect this flag either, as
+	 * the wait queue and irq_work logic will do the necessary
+	 * synchronization for the wake ups. The only thing
+	 * that is necessary is that the wake up happens after
+	 * a task has been queued. Spurious wake ups are OK.
+	 */
+	work->waiters_pending = true;
+
+	if ((cpu == RING_BUFFER_ALL_CPUS && ring_buffer_empty(buffer)) ||
+	    (cpu != RING_BUFFER_ALL_CPUS && ring_buffer_empty_cpu(buffer, cpu)))
+		schedule();
+
+	finish_wait(&work->waiters, &wait);
+}
+
+/**
+ * ring_buffer_poll_wait - poll on buffer input
+ * @buffer: buffer to wait on
+ * @cpu: the cpu buffer to wait on
+ * @filp: the file being polled
+ * @poll_table: The poll descriptor
+ *
+ * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon
+ * as data is added to any of the @buffer's cpu buffers. Otherwise
+ * it will wait for data to be added to a specific cpu buffer.
+ *
+ * Returns POLLIN | POLLRDNORM if data exists in the buffers,
+ * zero otherwise.
+ */
+int ring_buffer_poll_wait(struct ring_buffer *buffer, int cpu,
+			  struct file *filp, poll_table *poll_table)
+{
+	struct ring_buffer_per_cpu *cpu_buffer;
+	struct rb_irq_work *work;
+
+	if ((cpu == RING_BUFFER_ALL_CPUS && !ring_buffer_empty(buffer)) ||
+	    (cpu != RING_BUFFER_ALL_CPUS && !ring_buffer_empty_cpu(buffer, cpu)))
+		return POLLIN | POLLRDNORM;
+
+	if (cpu == RING_BUFFER_ALL_CPUS)
+		work = &buffer->irq_work;
+	else {
+		cpu_buffer = buffer->buffers[cpu];
+		work = &cpu_buffer->irq_work;
+	}
+
+	work->waiters_pending = true;	/* makes the next rb_wakeups() queue the irq_work */
+	poll_wait(filp, &work->waiters, poll_table);
+	/* Test again now that the flag is set and poll_wait() has been called. */
+	if ((cpu == RING_BUFFER_ALL_CPUS && !ring_buffer_empty(buffer)) ||
+	    (cpu != RING_BUFFER_ALL_CPUS && !ring_buffer_empty_cpu(buffer, cpu)))
+		return POLLIN | POLLRDNORM;
+	return 0;
+}
+
 /* buffer may be either ring_buffer or ring_buffer_per_cpu */
 #define RB_WARN_ON(b, cond)						\
 	({								\
@@ -1061,6 +1184,7 @@
 	cpu_buffer->lock = (arch_spinlock_t)__ARCH_SPIN_LOCK_UNLOCKED;
 	INIT_WORK(&cpu_buffer->update_pages_work, update_pages_handler);
 	init_completion(&cpu_buffer->update_done);
+	init_irq_work(&cpu_buffer->irq_work.work, rb_wake_up_waiters);
 
 	bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
 			    GFP_KERNEL, cpu_to_node(cpu));
@@ -1156,6 +1280,8 @@
 	buffer->clock = trace_clock_local;
 	buffer->reader_lock_key = key;
 
+	init_irq_work(&buffer->irq_work.work, rb_wake_up_waiters);
+
 	/* need at least two pages */
 	if (nr_pages < 2)
 		nr_pages = 2;
@@ -2610,6 +2736,22 @@
 	rb_end_commit(cpu_buffer);
 }
 
+static __always_inline void
+rb_wakeups(struct ring_buffer *buffer, struct ring_buffer_per_cpu *cpu_buffer)
+{
+	if (buffer->irq_work.waiters_pending) {	/* flag set by RING_BUFFER_ALL_CPUS waiters */
+		buffer->irq_work.waiters_pending = false;
+		/* irq_work_queue() supplies its own memory barriers */
+		irq_work_queue(&buffer->irq_work.work);
+	}
+
+	if (cpu_buffer->irq_work.waiters_pending) {	/* flag set by waiters on this cpu buffer */
+		cpu_buffer->irq_work.waiters_pending = false;
+		/* irq_work_queue() supplies its own memory barriers */
+		irq_work_queue(&cpu_buffer->irq_work.work);
+	}
+}
+
 /**
  * ring_buffer_unlock_commit - commit a reserved
  * @buffer: The buffer to commit to
@@ -2629,6 +2771,8 @@
 
 	rb_commit(cpu_buffer, event);
 
+	rb_wakeups(buffer, cpu_buffer);
+
 	trace_recursive_unlock();
 
 	preempt_enable_notrace();
@@ -2801,6 +2945,8 @@
 
 	rb_commit(cpu_buffer, event);
 
+	rb_wakeups(buffer, cpu_buffer);
+
 	ret = 0;
  out:
 	preempt_enable_notrace();