orangefs: bufmap rewrite
new waiting-for-slot logics:
* make request for slot wait for bufmap to be set up if it
comes before it's installed *OR* while it's running down
* make closing control device wait for all slots to be freed
* waiting itself rewritten to (open-coded) analogues of wait_event_...
primitives - we would need wait_event_locked() and, pardon an obscenely
long name, wait_event_interruptible_exclusive_timeout_locked().
* we never wait for more than slot_timeout_secs in total and,
if during the wait the daemon goes away, we only allow
ORANGEFS_BUFMAP_WAIT_TIMEOUT_SECS for it to come back.
* (cosmetical) bitmap is used instead of an array of zeroes and ones
* old (and only reached if we are about to corrupt memory) waiting
for daemon restart in service_operation() removed.
[Martin's fixes folded]
Signed-off-by: Al Viro <viro@zeniv.linux.org.uk>
Signed-off-by: Mike Marshall <hubcap@omnibond.com>
diff --git a/fs/orangefs/orangefs-bufmap.c b/fs/orangefs/orangefs-bufmap.c
index cd484665..96faf4e 100644
--- a/fs/orangefs/orangefs-bufmap.c
+++ b/fs/orangefs/orangefs-bufmap.c
@@ -7,7 +7,133 @@
#include "orangefs-kernel.h"
#include "orangefs-bufmap.h"
-DECLARE_WAIT_QUEUE_HEAD(orangefs_bufmap_init_waitq);
+struct slot_map {
+ int c;
+ wait_queue_head_t q;
+ int count;
+ unsigned long *map;
+};
+
+static struct slot_map rw_map = {
+ .c = -1,
+ .q = __WAIT_QUEUE_HEAD_INITIALIZER(rw_map.q)
+};
+static struct slot_map readdir_map = {
+ .c = -1,
+ .q = __WAIT_QUEUE_HEAD_INITIALIZER(readdir_map.q)
+};
+
+
+static void install(struct slot_map *m, int count, unsigned long *map)
+{
+ spin_lock(&m->q.lock);
+ m->c = m->count = count;
+ m->map = map;
+ wake_up_all_locked(&m->q);
+ spin_unlock(&m->q.lock);
+}
+
+static void mark_killed(struct slot_map *m)
+{
+ spin_lock(&m->q.lock);
+ m->c -= m->count + 1;
+ spin_unlock(&m->q.lock);
+}
+
+static void run_down(struct slot_map *m)
+{
+ DEFINE_WAIT(wait);
+ spin_lock(&m->q.lock);
+ if (m->c != -1) {
+ for (;;) {
+ if (likely(list_empty(&wait.task_list)))
+ __add_wait_queue_tail(&m->q, &wait);
+ set_current_state(TASK_UNINTERRUPTIBLE);
+
+ if (m->c == -1)
+ break;
+
+ spin_unlock(&m->q.lock);
+ schedule();
+ spin_lock(&m->q.lock);
+ }
+ __remove_wait_queue(&m->q, &wait);
+ __set_current_state(TASK_RUNNING);
+ }
+ m->map = NULL;
+ spin_unlock(&m->q.lock);
+}
+
+static void put(struct slot_map *m, int slot)
+{
+ int v;
+ spin_lock(&m->q.lock);
+ __clear_bit(slot, m->map);
+ v = ++m->c;
+ if (unlikely(v == 1)) /* no free slots -> one free slot */
+ wake_up_locked(&m->q);
+ else if (unlikely(v == -1)) /* finished dying */
+ wake_up_all_locked(&m->q);
+ spin_unlock(&m->q.lock);
+}
+
+static int wait_for_free(struct slot_map *m)
+{
+ long left = slot_timeout_secs * HZ;
+ DEFINE_WAIT(wait);
+
+ do {
+ long n = left, t;
+ if (likely(list_empty(&wait.task_list)))
+ __add_wait_queue_tail_exclusive(&m->q, &wait);
+ set_current_state(TASK_INTERRUPTIBLE);
+
+ if (m->c > 0)
+ break;
+
+ if (m->c < 0) {
+ /* we are waiting for map to be installed */
+ /* it would better be there soon, or we go away */
+ if (n > ORANGEFS_BUFMAP_WAIT_TIMEOUT_SECS * HZ)
+ n = ORANGEFS_BUFMAP_WAIT_TIMEOUT_SECS * HZ;
+ }
+ spin_unlock(&m->q.lock);
+ t = schedule_timeout(n);
+ spin_lock(&m->q.lock);
+ if (unlikely(!t) && n != left && m->c < 0)
+ left = t;
+ else
+ left = t + (left - n);
+ if (unlikely(signal_pending(current)))
+ left = -EINTR;
+ } while (left > 0);
+
+ if (!list_empty(&wait.task_list))
+ list_del(&wait.task_list);
+ else if (left <= 0 && waitqueue_active(&m->q))
+ __wake_up_locked_key(&m->q, TASK_INTERRUPTIBLE, NULL);
+ __set_current_state(TASK_RUNNING);
+
+ if (likely(left > 0))
+ return 0;
+
+ return left < 0 ? -EINTR : -ETIMEDOUT;
+}
+
+static int get(struct slot_map *m)
+{
+ int res = 0;
+ spin_lock(&m->q.lock);
+ if (unlikely(m->c <= 0))
+ res = wait_for_free(m);
+ if (likely(!res)) {
+ m->c--;
+ res = find_first_zero_bit(m->map, m->count);
+ __set_bit(res, m->map);
+ }
+ spin_unlock(&m->q.lock);
+ return res;
+}
/* used to describe mapped buffers */
struct orangefs_bufmap_desc {
@@ -18,8 +144,6 @@
};
static struct orangefs_bufmap {
- atomic_t refcnt;
-
int desc_size;
int desc_shift;
int desc_count;
@@ -30,12 +154,12 @@
struct orangefs_bufmap_desc *desc_array;
/* array to track usage of buffer descriptors */
- int *buffer_index_array;
- spinlock_t buffer_index_lock;
+ unsigned long *buffer_index_array;
/* array to track usage of buffer descriptors for readdir */
- int readdir_index_array[ORANGEFS_READDIR_DEFAULT_DESC_COUNT];
- spinlock_t readdir_index_lock;
+#define N DIV_ROUND_UP(ORANGEFS_READDIR_DEFAULT_DESC_COUNT, BITS_PER_LONG)
+ unsigned long readdir_index_array[N];
+#undef N
} *__orangefs_bufmap;
static DEFINE_SPINLOCK(orangefs_bufmap_lock);
@@ -58,30 +182,6 @@
kfree(bufmap);
}
-static struct orangefs_bufmap *orangefs_bufmap_ref(void)
-{
- struct orangefs_bufmap *bufmap = NULL;
-
- spin_lock(&orangefs_bufmap_lock);
- if (__orangefs_bufmap) {
- bufmap = __orangefs_bufmap;
- atomic_inc(&bufmap->refcnt);
- }
- spin_unlock(&orangefs_bufmap_lock);
- return bufmap;
-}
-
-static void orangefs_bufmap_unref(struct orangefs_bufmap *bufmap)
-{
- if (atomic_dec_and_lock(&bufmap->refcnt, &orangefs_bufmap_lock)) {
- __orangefs_bufmap = NULL;
- spin_unlock(&orangefs_bufmap_lock);
-
- orangefs_bufmap_unmap(bufmap);
- orangefs_bufmap_free(bufmap);
- }
-}
-
/*
* XXX: Can the size and shift change while the caller gives up the
* XXX: lock between calling this and doing something useful?
@@ -137,21 +237,18 @@
if (!bufmap)
goto out;
- atomic_set(&bufmap->refcnt, 1);
bufmap->total_size = user_desc->total_size;
bufmap->desc_count = user_desc->count;
bufmap->desc_size = user_desc->size;
bufmap->desc_shift = ilog2(bufmap->desc_size);
- spin_lock_init(&bufmap->buffer_index_lock);
bufmap->buffer_index_array =
- kcalloc(bufmap->desc_count, sizeof(int), GFP_KERNEL);
+ kzalloc(DIV_ROUND_UP(bufmap->desc_count, BITS_PER_LONG), GFP_KERNEL);
if (!bufmap->buffer_index_array) {
gossip_err("orangefs: could not allocate %d buffer indices\n",
bufmap->desc_count);
goto out_free_bufmap;
}
- spin_lock_init(&bufmap->readdir_index_lock);
bufmap->desc_array =
kcalloc(bufmap->desc_count, sizeof(struct orangefs_bufmap_desc),
@@ -294,24 +391,18 @@
if (__orangefs_bufmap) {
spin_unlock(&orangefs_bufmap_lock);
gossip_err("orangefs: error: bufmap already initialized.\n");
- ret = -EALREADY;
+ ret = -EINVAL;
goto out_unmap_bufmap;
}
__orangefs_bufmap = bufmap;
+ install(&rw_map,
+ bufmap->desc_count,
+ bufmap->buffer_index_array);
+ install(&readdir_map,
+ ORANGEFS_READDIR_DEFAULT_DESC_COUNT,
+ bufmap->readdir_index_array);
spin_unlock(&orangefs_bufmap_lock);
- /*
- * If there are operations in orangefs_bufmap_init_waitq, wake them up.
- * This scenario occurs when the client-core is restarted and I/O
- * requests in the in-progress or waiting tables are restarted. I/O
- * requests cannot be restarted until the shared memory system is
- * completely re-initialized, so we put the I/O requests in this
- * waitq until initialization has completed. NOTE: the I/O requests
- * are also on a timer, so they don't wait forever just in case the
- * client-core doesn't come back up.
- */
- wake_up_interruptible(&orangefs_bufmap_init_waitq);
-
gossip_debug(GOSSIP_BUFMAP_DEBUG,
"orangefs_bufmap_initialize: exiting normally\n");
return 0;
@@ -334,91 +425,28 @@
*/
void orangefs_bufmap_finalize(void)
{
+ struct orangefs_bufmap *bufmap = __orangefs_bufmap;
+ if (!bufmap)
+ return;
gossip_debug(GOSSIP_BUFMAP_DEBUG, "orangefs_bufmap_finalize: called\n");
- BUG_ON(!__orangefs_bufmap);
- orangefs_bufmap_unref(__orangefs_bufmap);
+ mark_killed(&rw_map);
+ mark_killed(&readdir_map);
gossip_debug(GOSSIP_BUFMAP_DEBUG,
"orangefs_bufmap_finalize: exiting normally\n");
}
-struct slot_args {
- int slot_count;
- int *slot_array;
- spinlock_t *slot_lock;
- wait_queue_head_t *slot_wq;
-};
-
-static int wait_for_a_slot(struct slot_args *slargs, int *buffer_index)
+void orangefs_bufmap_run_down(void)
{
- int ret = -1;
- int i = 0;
- DEFINE_WAIT(wait_entry);
-
- while (1) {
- /*
- * check for available desc, slot_lock is the appropriate
- * index_lock
- */
- spin_lock(slargs->slot_lock);
- prepare_to_wait_exclusive(slargs->slot_wq,
- &wait_entry,
- TASK_INTERRUPTIBLE);
- for (i = 0; i < slargs->slot_count; i++)
- if (slargs->slot_array[i] == 0) {
- slargs->slot_array[i] = 1;
- *buffer_index = i;
- ret = 0;
- break;
- }
- spin_unlock(slargs->slot_lock);
-
- /* if we acquired a buffer, then break out of while */
- if (ret == 0)
- break;
-
- if (!signal_pending(current)) {
- gossip_debug(GOSSIP_BUFMAP_DEBUG,
- "[BUFMAP]: waiting %d "
- "seconds for a slot\n",
- slot_timeout_secs);
- if (!schedule_timeout(slot_timeout_secs * HZ)) {
- gossip_debug(GOSSIP_BUFMAP_DEBUG,
- "*** wait_for_a_slot timed out\n");
- ret = -ETIMEDOUT;
- break;
- }
- gossip_debug(GOSSIP_BUFMAP_DEBUG,
- "[BUFMAP]: woken up by a slot becoming available.\n");
- continue;
- }
-
- gossip_debug(GOSSIP_BUFMAP_DEBUG, "orangefs: %s interrupted.\n",
- __func__);
- ret = -EINTR;
- break;
- }
-
- spin_lock(slargs->slot_lock);
- finish_wait(slargs->slot_wq, &wait_entry);
- spin_unlock(slargs->slot_lock);
- return ret;
-}
-
-static void put_back_slot(struct slot_args *slargs, int buffer_index)
-{
- /* slot_lock is the appropriate index_lock */
- spin_lock(slargs->slot_lock);
- if (buffer_index < 0 || buffer_index >= slargs->slot_count) {
- spin_unlock(slargs->slot_lock);
+ struct orangefs_bufmap *bufmap = __orangefs_bufmap;
+ if (!bufmap)
return;
- }
-
- /* put the desc back on the queue */
- slargs->slot_array[buffer_index] = 0;
- spin_unlock(slargs->slot_lock);
-
- /* wake up anyone who may be sleeping on the queue */
- wake_up_interruptible(slargs->slot_wq);
+ run_down(&rw_map);
+ run_down(&readdir_map);
+ spin_lock(&orangefs_bufmap_lock);
+ __orangefs_bufmap = NULL;
+ spin_unlock(&orangefs_bufmap_lock);
+ orangefs_bufmap_unmap(bufmap);
+ orangefs_bufmap_free(bufmap);
}
/*
@@ -431,23 +459,12 @@
*/
int orangefs_bufmap_get(struct orangefs_bufmap **mapp, int *buffer_index)
{
- struct orangefs_bufmap *bufmap = orangefs_bufmap_ref();
- struct slot_args slargs;
- int ret;
-
- if (!bufmap) {
- gossip_err("orangefs: please confirm that pvfs2-client daemon is running.\n");
- return -EIO;
+ int ret = get(&rw_map);
+ if (ret >= 0) {
+ *mapp = __orangefs_bufmap;
+ *buffer_index = ret;
+ ret = 0;
}
-
- slargs.slot_count = bufmap->desc_count;
- slargs.slot_array = bufmap->buffer_index_array;
- slargs.slot_lock = &bufmap->buffer_index_lock;
- slargs.slot_wq = &bufmap_waitq;
- ret = wait_for_a_slot(&slargs, buffer_index);
- if (ret)
- orangefs_bufmap_unref(bufmap);
- *mapp = bufmap;
return ret;
}
@@ -460,15 +477,7 @@
*/
void orangefs_bufmap_put(int buffer_index)
{
- struct slot_args slargs;
- struct orangefs_bufmap *bufmap = __orangefs_bufmap;
-
- slargs.slot_count = bufmap->desc_count;
- slargs.slot_array = bufmap->buffer_index_array;
- slargs.slot_lock = &bufmap->buffer_index_lock;
- slargs.slot_wq = &bufmap_waitq;
- put_back_slot(&slargs, buffer_index);
- orangefs_bufmap_unref(bufmap);
+ put(&rw_map, buffer_index);
}
/*
@@ -484,36 +493,18 @@
*/
int orangefs_readdir_index_get(struct orangefs_bufmap **mapp, int *buffer_index)
{
- struct orangefs_bufmap *bufmap = orangefs_bufmap_ref();
- struct slot_args slargs;
- int ret;
-
- if (!bufmap) {
- gossip_err("orangefs: please confirm that pvfs2-client daemon is running.\n");
- return -EIO;
+ int ret = get(&readdir_map);
+ if (ret >= 0) {
+ *mapp = __orangefs_bufmap;
+ *buffer_index = ret;
+ ret = 0;
}
-
- slargs.slot_count = ORANGEFS_READDIR_DEFAULT_DESC_COUNT;
- slargs.slot_array = bufmap->readdir_index_array;
- slargs.slot_lock = &bufmap->readdir_index_lock;
- slargs.slot_wq = &readdir_waitq;
- ret = wait_for_a_slot(&slargs, buffer_index);
- if (ret)
- orangefs_bufmap_unref(bufmap);
- *mapp = bufmap;
return ret;
}
void orangefs_readdir_index_put(struct orangefs_bufmap *bufmap, int buffer_index)
{
- struct slot_args slargs;
-
- slargs.slot_count = ORANGEFS_READDIR_DEFAULT_DESC_COUNT;
- slargs.slot_array = bufmap->readdir_index_array;
- slargs.slot_lock = &bufmap->readdir_index_lock;
- slargs.slot_wq = &readdir_waitq;
- put_back_slot(&slargs, buffer_index);
- orangefs_bufmap_unref(bufmap);
+ put(&readdir_map, buffer_index);
}
/*