aoe: become I/O request queue handler for increased user control

To allow users to choose an elevator algorithm for their particular
workloads, change from a make_request-style driver to an
I/O-request-queue-handler-style driver.
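
The difference is in how the driver registers with the block layer: a
make_request-style driver takes bios directly and never sees the
elevator, while a request-queue-handler-style driver supplies a
request_fn and lets the block layer queue, merge, and sort I/O through
whichever scheduler the user selects.  A minimal sketch of the two
registrations; the aoeblk.c side of the patch is not shown in this
series excerpt, so the function names here are assumed:

  /* before: bios bypass the elevator and come straight to us */
  d->blkq = blk_alloc_queue(GFP_KERNEL);
  blk_queue_make_request(d->blkq, aoeblk_make_request);

  /* after: the block layer owns the queue; the user picks the
   * elevator via /sys/block/<dev>/queue/scheduler, and the driver
   * pulls work off with blk_peek_request/blk_start_request
   */
  d->blkq = blk_init_queue(aoeblk_request, &d->lock);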

We have to do a couple of things that might be surprising.  We
manipulate the page _count directly, because as far as we know there
is still no guarantee that users of the block layer are prohibited
from submitting bios containing pages with zero reference counts.[1]
If such a prohibition now exists, I can get rid of the _count
manipulation.
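
Concretely, we bump _count on every page in a bio before its pages
ride out in skb fragments, and drop the references once the network
layer is done.  A sketch of the increment side, assuming the helper
name used in aoecmd.c (which is not part of this excerpt):

  /* like get_page(), but without tripping get_page()'s sanity
   * check on pages whose count is already zero; a matching
   * bio_pagedec (not shown) does the atomic_dec afterward
   */
  static void
  bio_pageinc(struct bio *bio)
  {
          struct bio_vec *bv;
          int i;

          bio_for_each_segment(bv, bio, i)
                  atomic_inc(&bv->bv_page->_count);
  }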

Just as before this patch, we keep track of the sk_buffs that the
network layer hasn't finished with yet, and we cap the resources we
use with a "pool" of skbs.[2]
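
The cap works by recycling: an skb is handed back out of d->skbpool
only once the network layer has dropped all of its references
(dataref back down to 1), and the pool never grows past its limit.  A
sketch of the "get" side, with new_skb and NSKBPOOLMAX assumed from
aoecmd.c/aoe.h (not part of this excerpt):

  static struct sk_buff *
  skb_pool_get(struct aoedev *d)
  {
          struct sk_buff *skb = skb_peek(&d->skbpool);

          /* reusable only once the network layer has let go */
          if (skb && atomic_read(&skb_shinfo(skb)->dataref) == 1) {
                  __skb_unlink(skb, &d->skbpool);
                  return skb;
          }
          /* otherwise allocate, but never beyond the cap */
          if (skb_queue_len(&d->skbpool) < NSKBPOOLMAX
          && (skb = new_skb(ETH_ZLEN)))
                  return skb;
          return NULL;
  }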

Now that the block layer maintains the disk stats, the aoe driver's
diskstats function can go away.
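
The hunks below call aoe_end_request without defining it; it lives in
aoecmd.c.  A sketch of its shape, assuming completion goes through
__blk_end_request, which is where the block layer now does the
accounting that the old aoeblk diskstats code duplicated:

  void
  aoe_end_request(struct aoedev *d, struct request *rq, int fastfail)
  {
          int bok;

          if (rq == d->ip.rq)
                  d->ip.rq = NULL;
          do {
                  bok = !fastfail
                          && test_bit(BIO_UPTODATE, &rq->bio->bi_flags);
                  /* __blk_end_request updates the disk stats for us
                   * and returns true while bytes remain in rq
                   */
          } while (__blk_end_request(rq, bok ? 0 : -EIO,
                  bio_sectors(rq->bio) << 9));
  }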

1. https://lkml.org/lkml/2007/3/1/374
2. https://lkml.org/lkml/2007/7/6/241

Signed-off-by: Ed Cashin <ecashin@coraid.com>
Signed-off-by: Andrew Morton <akpm@linux-foundation.org>
Signed-off-by: Linus Torvalds <torvalds@linux-foundation.org>
diff --git a/drivers/block/aoe/aoedev.c b/drivers/block/aoe/aoedev.c
index 40bae1a..635dc98 100644
--- a/drivers/block/aoe/aoedev.c
+++ b/drivers/block/aoe/aoedev.c
@@ -19,6 +19,17 @@
 static struct aoedev *devlist;
 static DEFINE_SPINLOCK(devlist_lock);
 
+/*
+ * Users who grab a pointer to the device with aoedev_by_aoeaddr or
+ * aoedev_by_sysminor_m automatically get a reference count and must
+ * be responsible for performing an aoedev_put.  With the addition of
+ * async kthread processing I'm no longer confident that we can
+ * guarantee consistency in the face of device flushes.
+ *
+ * For the time being, we only bother to add extra references for
+ * frames sitting on the iocq.  When the kthreads finish processing
+ * these frames, they will aoedev_put the device.
+ */
 struct aoedev *
 aoedev_by_aoeaddr(int maj, int min)
 {
@@ -28,13 +39,25 @@
 	spin_lock_irqsave(&devlist_lock, flags);
 
 	for (d=devlist; d; d=d->next)
-		if (d->aoemajor == maj && d->aoeminor == min)
+		if (d->aoemajor == maj && d->aoeminor == min) {
+			d->ref++;
 			break;
+		}
 
 	spin_unlock_irqrestore(&devlist_lock, flags);
 	return d;
 }
 
+void
+aoedev_put(struct aoedev *d)
+{
+	ulong flags;
+
+	spin_lock_irqsave(&devlist_lock, flags);
+	d->ref--;
+	spin_unlock_irqrestore(&devlist_lock, flags);
+}
+
 static void
 dummy_timer(ulong vp)
 {
@@ -47,21 +70,26 @@
 	add_timer(&d->timer);
 }
 
-void
-aoe_failbuf(struct aoedev *d, struct buf *buf)
+static void
+aoe_failip(struct aoedev *d)
 {
+	struct request *rq;
 	struct bio *bio;
+	unsigned long n;
 
-	if (buf == NULL)
+	aoe_failbuf(d, d->ip.buf);
+
+	rq = d->ip.rq;
+	if (rq == NULL)
 		return;
-	buf->flags |= BUFFL_FAIL;
-	if (buf->nframesout == 0) {
-		if (buf == d->inprocess) /* ensure we only process this once */
-			d->inprocess = NULL;
-		bio = buf->bio;
-		mempool_free(buf, d->bufpool);
-		bio_endio(bio, -EIO);
+	while ((bio = d->ip.nxbio)) {
+		clear_bit(BIO_UPTODATE, &bio->bi_flags);
+		d->ip.nxbio = bio->bi_next;
+		n = (unsigned long) rq->special;
+		rq->special = (void *) --n;
 	}
+	if ((unsigned long) rq->special == 0)
+		aoe_end_request(d, rq, 0);
 }
 
 void
@@ -70,8 +98,11 @@
 	struct aoetgt *t, **tt, **te;
 	struct frame *f;
 	struct list_head *head, *pos, *nx;
+	struct request *rq;
 	int i;
 
+	d->flags &= ~DEVFL_UP;
+
 	/* clean out active buffers on all targets */
 	tt = d->targets;
 	te = tt + NTARGETS;
@@ -92,22 +123,20 @@
 		t->nout = 0;
 	}
 
-	/* clean out the in-process buffer (if any) */
-	aoe_failbuf(d, d->inprocess);
-	d->inprocess = NULL;
+	/* clean out the in-process request (if any) */
+	aoe_failip(d);
 	d->htgt = NULL;
 
-	/* clean out all pending I/O */
-	while (!list_empty(&d->bufq)) {
-		struct buf *buf = container_of(d->bufq.next, struct buf, bufs);
-		list_del(d->bufq.next);
-		aoe_failbuf(d, buf);
+	/* fast fail all pending I/O */
+	if (d->blkq) {
+		while ((rq = blk_peek_request(d->blkq))) {
+			blk_start_request(rq);
+			aoe_end_request(d, rq, 1);
+		}
 	}
 
 	if (d->gd)
 		set_capacity(d->gd, 0);
-
-	d->flags &= ~DEVFL_UP;
 }
 
 static void
@@ -120,6 +149,7 @@
 		aoedisk_rm_sysfs(d);
 		del_gendisk(d->gd);
 		put_disk(d->gd);
+		blk_cleanup_queue(d->blkq);
 	}
 	t = d->targets;
 	e = t + NTARGETS;
@@ -128,7 +158,6 @@
 	if (d->bufpool)
 		mempool_destroy(d->bufpool);
 	skbpoolfree(d);
-	blk_cleanup_queue(d->blkq);
 	kfree(d);
 }
 
@@ -155,7 +184,8 @@
 		spin_lock(&d->lock);
 		if ((!all && (d->flags & DEVFL_UP))
 		|| (d->flags & (DEVFL_GDALLOC|DEVFL_NEWSIZE))
-		|| d->nopen) {
+		|| d->nopen
+		|| d->ref) {
 			spin_unlock(&d->lock);
 			dd = &d->next;
 			continue;
@@ -176,12 +206,15 @@
 	return 0;
 }
 
-/* I'm not really sure that this is a realistic problem, but if the
-network driver goes gonzo let's just leak memory after complaining. */
+/* This has been confirmed to occur once with Tms=3*1000 due to the
+ * driver changing link and not processing its transmit ring.  The
+ * problem is hard enough to solve properly (by returning an error
+ * up the stack) that I'm still punting on "solving" this.
+ */
 static void
 skbfree(struct sk_buff *skb)
 {
-	enum { Sms = 100, Tms = 3*1000};
+	enum { Sms = 250, Tms = 30 * 1000};
 	int i = Tms / Sms;
 
 	if (skb == NULL)
@@ -222,8 +255,10 @@
 	spin_lock_irqsave(&devlist_lock, flags);
 
 	for (d=devlist; d; d=d->next)
-		if (d->sysminor == sysminor)
+		if (d->sysminor == sysminor) {
+			d->ref++;
 			break;
+		}
 	if (d)
 		goto out;
 	d = kcalloc(1, sizeof *d, GFP_ATOMIC);
@@ -231,7 +266,6 @@
 		goto out;
 	INIT_WORK(&d->work, aoecmd_sleepwork);
 	spin_lock_init(&d->lock);
-	skb_queue_head_init(&d->sendq);
 	skb_queue_head_init(&d->skbpool);
 	init_timer(&d->timer);
 	d->timer.data = (ulong) d;
@@ -240,7 +274,7 @@
 	add_timer(&d->timer);
 	d->bufpool = NULL;	/* defer to aoeblk_gdalloc */
 	d->tgt = d->targets;
-	INIT_LIST_HEAD(&d->bufq);
+	d->ref = 1;
 	d->sysminor = sysminor;
 	d->aoemajor = AOEMAJOR(sysminor);
 	d->aoeminor = AOEMINOR(sysminor);
@@ -274,6 +308,7 @@
 	struct aoedev *d;
 	ulong flags;
 
+	aoe_flush_iocq();
 	while ((d = devlist)) {
 		devlist = d->next;