fs: dlm: remove wq_alloc mutex
This patch cleans up the code for allocating a new buffer in the dlm
writequeue mechanism. There was a possible tune-up to allow scheduling
while a new writequeue entry needs to be allocated because either no
sending page is available or the available pages are full. To avoid
multiple concurrent users checking at the same time whether an entry is
available or full, the wq_alloc mutex was introduced so that those users
wait while a new writequeue entry is in the process of being queued;
further users will then check whether the newly allocated writequeue
entry is full.
To simplify the code we just remove this mutex and instead hold the
already introduced spin lock during the writequeue check, allocation and
queueing. As a result, other users can never check for available
writequeue entries while a new one is in process but not yet queued.
Signed-off-by: Alexander Aring <aahringo@redhat.com>
Signed-off-by: David Teigland <teigland@redhat.com>
diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index 6d500eb..4919faf 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -86,7 +86,6 @@ struct connection {
struct list_head writequeue; /* List of outgoing writequeue_entries */
spinlock_t writequeue_lock;
atomic_t writequeue_cnt;
- struct mutex wq_alloc;
int retries;
#define MAX_CONNECT_RETRIES 3
struct hlist_node list;
@@ -270,8 +269,6 @@ static struct connection *nodeid2con(int nodeid, gfp_t alloc)
return NULL;
}
- mutex_init(&con->wq_alloc);
-
spin_lock(&connections_lock);
/* Because multiple workqueues/threads calls this function it can
* race on multiple cpu's. Instead of locking hot path __find_con()
@@ -1176,16 +1173,15 @@ static void deinit_local(void)
kfree(dlm_local_addr[i]);
}
-static struct writequeue_entry *new_writequeue_entry(struct connection *con,
- gfp_t allocation)
+static struct writequeue_entry *new_writequeue_entry(struct connection *con)
{
struct writequeue_entry *entry;
- entry = kzalloc(sizeof(*entry), allocation);
+ entry = kzalloc(sizeof(*entry), GFP_ATOMIC);
if (!entry)
return NULL;
- entry->page = alloc_page(allocation | __GFP_ZERO);
+ entry->page = alloc_page(GFP_ATOMIC | __GFP_ZERO);
if (!entry->page) {
kfree(entry);
return NULL;
@@ -1200,8 +1196,8 @@ static struct writequeue_entry *new_writequeue_entry(struct connection *con,
}
static struct writequeue_entry *new_wq_entry(struct connection *con, int len,
- gfp_t allocation, char **ppc,
- void (*cb)(void *data), void *data)
+ char **ppc, void (*cb)(void *data),
+ void *data)
{
struct writequeue_entry *e;
@@ -1217,29 +1213,25 @@ static struct writequeue_entry *new_wq_entry(struct connection *con, int len,
e->end += len;
e->users++;
- spin_unlock(&con->writequeue_lock);
-
- return e;
+ goto out;
}
}
- spin_unlock(&con->writequeue_lock);
- e = new_writequeue_entry(con, allocation);
+ e = new_writequeue_entry(con);
if (!e)
- return NULL;
+ goto out;
kref_get(&e->ref);
*ppc = page_address(e->page);
e->end += len;
atomic_inc(&con->writequeue_cnt);
-
- spin_lock(&con->writequeue_lock);
if (cb)
cb(data);
list_add_tail(&e->list, &con->writequeue);
- spin_unlock(&con->writequeue_lock);
+out:
+ spin_unlock(&con->writequeue_lock);
return e;
};
@@ -1250,37 +1242,19 @@ static struct dlm_msg *dlm_lowcomms_new_msg_con(struct connection *con, int len,
{
struct writequeue_entry *e;
struct dlm_msg *msg;
- bool sleepable;
msg = kzalloc(sizeof(*msg), allocation);
if (!msg)
return NULL;
- /* this mutex is being used as a wait to avoid multiple "fast"
- * new writequeue page list entry allocs in new_wq_entry in
- * normal operation which is sleepable context. Without it
- * we could end in multiple writequeue entries with one
- * dlm message because multiple callers were waiting at
- * the writequeue_lock in new_wq_entry().
- */
- sleepable = gfpflags_normal_context(allocation);
- if (sleepable)
- mutex_lock(&con->wq_alloc);
-
kref_init(&msg->ref);
- e = new_wq_entry(con, len, allocation, ppc, cb, data);
+ e = new_wq_entry(con, len, ppc, cb, data);
if (!e) {
- if (sleepable)
- mutex_unlock(&con->wq_alloc);
-
kfree(msg);
return NULL;
}
- if (sleepable)
- mutex_unlock(&con->wq_alloc);
-
msg->ppc = *ppc;
msg->len = len;
msg->entry = e;