drbd: detach from frozen backing device

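A "detach --force" must not block behind a frozen (unresponsive) backing
device.  To achieve that, md_io_mutex is replaced by the atomic
md_io_in_use, so that anyone waiting for the meta-data IO buffer can be
woken up once the disk reaches D_FAILED; meta-data IO waits via
wait_until_done_or_disk_failure() instead of wait_for_completion(); and
extra references (ldev, bio, bm_aio_ctx, barrier acked requests) keep
the relevant objects alive while IO may still be in flight on the
frozen device.

Roughly, the new buffer protocol for meta-data IO paths looks like this
(illustrative sketch only; the error code is an example, see the actual
callers in the diff):

	buffer = drbd_md_get_buffer(mdev); /* NULL if the disk failed */
	if (!buffer)
		return -ENODEV;
	/* ... prepare and submit meta-data IO using the buffer ... */
	drbd_md_put_buffer(mdev);          /* wake up the next waiter */
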
* drbd-8.3:
  documentation: Documented detach's --force and disk's --disk-timeout
  drbd: Implemented the disk-timeout option
  drbd: Force flag for the detach operation
  drbd: Allow new IOs while the local disk is in FAILED state
  drbd: Bitmap IO functions cannot return prematurely if the disk breaks
  drbd: Added a kref to bm_aio_ctx
  drbd: Hold a reference to ldev while doing meta-data IO
  drbd: Keep a reference to the bio until the completion handler finished
  drbd: Implemented wait_until_done_or_disk_failure()
  drbd: Replaced md_io_mutex by an atomic: md_io_in_use
  drbd: moved md_io into mdev
  drbd: Immediately allow completion of IOs that wait for IO completions on a failed disk
  drbd: Keep a reference to barrier acked requests

Signed-off-by: Philipp Reisner <philipp.reisner@linbit.com>
Signed-off-by: Lars Ellenberg <lars.ellenberg@linbit.com>
diff --git a/drivers/block/drbd/drbd_actlog.c b/drivers/block/drbd/drbd_actlog.c
index aeb483d..58b5b61 100644
--- a/drivers/block/drbd/drbd_actlog.c
+++ b/drivers/block/drbd/drbd_actlog.c
@@ -114,18 +114,53 @@
 
 static int w_al_write_transaction(struct drbd_work *, int);
 
+void *drbd_md_get_buffer(struct drbd_conf *mdev)
+{
+	int r;
+
+	wait_event(mdev->misc_wait,
+		   (r = atomic_cmpxchg(&mdev->md_io_in_use, 0, 1)) == 0 ||
+		   mdev->state.disk <= D_FAILED);
+
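+	/* atomic_cmpxchg() returns the old value, so r != 0 means the wait
+	 * ended because the disk failed, not because we got the buffer */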
+	return r ? NULL : page_address(mdev->md_io_page);
+}
+
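+/* Release the buffer and wake up anyone waiting in drbd_md_get_buffer() */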
+void drbd_md_put_buffer(struct drbd_conf *mdev)
+{
+	if (atomic_dec_and_test(&mdev->md_io_in_use))
+		wake_up(&mdev->misc_wait);
+}
+
+static bool md_io_allowed(struct drbd_conf *mdev)
+{
+	enum drbd_disk_state ds = mdev->state.disk;
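+	/* D_FAILED and D_DISKLESS abort the wait; D_ATTACHING is allowed
+	 * explicitly so that meta-data IO works while attaching */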
+	return ds >= D_NEGOTIATING || ds == D_ATTACHING;
+}
+
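+/* Replaces wait_for_completion() for meta-data IO: returns as soon as the
+ * IO is done *or* the disk has failed, so a frozen backing device can no
+ * longer block the caller indefinitely. */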
+void wait_until_done_or_disk_failure(struct drbd_conf *mdev, unsigned int *done)
+{
+	wait_event(mdev->misc_wait, *done || !md_io_allowed(mdev));
+}
+
 static int _drbd_md_sync_page_io(struct drbd_conf *mdev,
 				 struct drbd_backing_dev *bdev,
 				 struct page *page, sector_t sector,
 				 int rw, int size)
 {
 	struct bio *bio;
-	struct drbd_md_io md_io;
 	int err;
 
-	md_io.mdev = mdev;
-	init_completion(&md_io.event);
-	md_io.error = 0;
+	mdev->md_io.done = 0;
+	mdev->md_io.error = -ENODEV;
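+	/* stays -ENODEV if the completion handler never runs */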
 
 	if ((rw & WRITE) && !test_bit(MD_NO_FUA, &mdev->flags))
 		rw |= REQ_FUA | REQ_FLUSH;
@@ -137,17 +172,27 @@
 	err = -EIO;
 	if (bio_add_page(bio, page, size, 0) != size)
 		goto out;
-	bio->bi_private = &md_io;
+	bio->bi_private = &mdev->md_io;
 	bio->bi_end_io = drbd_md_io_complete;
 	bio->bi_rw = rw;
 
+	if (!get_ldev_if_state(mdev, D_ATTACHING)) {  /* Corresponding put_ldev in drbd_md_io_complete() */
+		dev_err(DEV, "ASSERT FAILED: get_ldev_if_state() == 1 in _drbd_md_sync_page_io()\n");
+		err = -ENODEV;
+		goto out;
+	}
+
+	bio_get(bio); /* one bio_put() is in the completion handler */
+	atomic_inc(&mdev->md_io_in_use); /* drbd_md_put_buffer() is in the completion handler */
 	if (drbd_insert_fault(mdev, (rw & WRITE) ? DRBD_FAULT_MD_WR : DRBD_FAULT_MD_RD))
 		bio_endio(bio, -EIO);
 	else
 		submit_bio(rw, bio);
-	wait_for_completion(&md_io.event);
+	wait_until_done_or_disk_failure(mdev, &mdev->md_io.done);
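+	/* if the disk failed, the bio may still be in flight; the bio_get()
+	 * above keeps it valid for the bio_put() below */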
 	if (bio_flagged(bio, BIO_UPTODATE))
-		err = md_io.error;
+		err = mdev->md_io.error;
 
  out:
 	bio_put(bio);
@@ -160,7 +205,8 @@
 	int err;
 	struct page *iop = mdev->md_io_page;
 
-	D_ASSERT(mutex_is_locked(&mdev->md_io_mutex));
+	D_ASSERT(atomic_read(&mdev->md_io_in_use) == 1);
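+	/* i.e. the caller got the buffer via drbd_md_get_buffer() */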
 
 	BUG_ON(!bdev->md_bdev);
 
@@ -344,8 +390,14 @@
 		return 0;
 	}
 
-	mutex_lock(&mdev->md_io_mutex); /* protects md_io_buffer, al_tr_cycle, ... */
-	buffer = page_address(mdev->md_io_page);
+	buffer = drbd_md_get_buffer(mdev); /* protects md_io_buffer, al_tr_cycle, ... */
+	if (!buffer) {
+		dev_err(DEV, "disk failed while waiting for md_io buffer\n");
+		((struct update_al_work *)w)->err = -EIO;
+		complete(&((struct update_al_work *)w)->event);
+		put_ldev(mdev);
+		return 1;
+	}
 
 	memset(buffer, 0, sizeof(*buffer));
 	buffer->magic = cpu_to_be32(DRBD_AL_MAGIC);
@@ -415,7 +467,7 @@
 		mdev->al_tr_number++;
 	}
 
-	mutex_unlock(&mdev->md_io_mutex);
+	drbd_md_put_buffer(mdev);
 	complete(&((struct update_al_work *)w)->event);
 	put_ldev(mdev);
 
@@ -506,8 +558,10 @@
 	/* lock out all other meta data io for now,
 	 * and make sure the page is mapped.
 	 */
-	mutex_lock(&mdev->md_io_mutex);
-	b = page_address(mdev->md_io_page);
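+	/* may return NULL if the disk fails while we wait for the buffer */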
+	b = drbd_md_get_buffer(mdev);
+	if (!b)
+		return 0;
 
 	/* Always use the full ringbuffer space for now.
 	 * possible optimization: read in all of it,
@@ -528,7 +582,7 @@
 
 		/* IO error */
 		if (rv == -1) {
-			mutex_unlock(&mdev->md_io_mutex);
+			drbd_md_put_buffer(mdev);
 			return 0;
 		}
 
@@ -558,7 +612,7 @@
 	if (!found_valid) {
 		if (found_initialized != mx)
 			dev_warn(DEV, "No usable activity log found.\n");
-		mutex_unlock(&mdev->md_io_mutex);
+		drbd_md_put_buffer(mdev);
 		return 1;
 	}
 
@@ -573,7 +627,7 @@
 		if (!expect(rv != 0))
 			goto cancel;
 		if (rv == -1) {
-			mutex_unlock(&mdev->md_io_mutex);
+			drbd_md_put_buffer(mdev);
 			return 0;
 		}
 
@@ -643,7 +697,7 @@
 	mdev->al_tr_pos = (to + 1) % (MD_AL_SECTORS*512/MD_BLOCK_SIZE);
 
 	/* ok, we are done with it */
-	mutex_unlock(&mdev->md_io_mutex);
+	drbd_md_put_buffer(mdev);
 
 	dev_info(DEV, "Found %d transactions (%d active extents) in activity log.\n",
 	     transactions, active_extents);