NFSv4: Add functions to order RPC calls
NFSv4 operations that change file state, such as OPEN, CLOSE, LOCK, ...,
are all labelled with "sequence identifiers" in order to prevent the
server from reordering RPC requests, as this could cause the server's
view of the file state to become out of sync with the client's.
Currently the NFS client code enforces this ordering locally using
semaphores to restrict access to structures until the RPC call is done.
This, of course, only works with synchronous RPC calls, since the
user process must first grab the semaphore.
By dropping semaphores, and instead teaching the RPC engine to hold
the RPC calls until they are ready to be sent, we can extend this
process to work nicely with asynchronous RPC calls too.
This patch adds a new list called "rpc_sequence" that defines the order
of the RPC calls to be sent. We add one such list for each state_owner.
When an RPC call is ready to be sent, it checks whether it is at the top
of the rpc_sequence list. If so, it proceeds. If not, it goes back to
sleep, and loops until it reaches the top of the list.
Once the RPC call has completed, it can bump the sequence id counter,
remove itself from the rpc_sequence list, and then wake up the next
sleeper.
Note that the state_owner sequence ids and lock_owner sequence ids are
all indexed to the same rpc_sequence list, so OPEN, LOCK,... requests
are all ordered w.r.t. each other.
Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com>
diff --git a/fs/nfs/nfs4xdr.c b/fs/nfs/nfs4xdr.c
index 6c564ef..fcd28a2 100644
--- a/fs/nfs/nfs4xdr.c
+++ b/fs/nfs/nfs4xdr.c
@@ -604,7 +604,7 @@
RESERVE_SPACE(8+sizeof(arg->stateid.data));
WRITE32(OP_CLOSE);
- WRITE32(arg->seqid);
+ WRITE32(arg->seqid->sequence->counter);
WRITEMEM(arg->stateid.data, sizeof(arg->stateid.data));
return 0;
@@ -732,9 +732,9 @@
struct nfs_open_to_lock *ol = opargs->u.open_lock;
RESERVE_SPACE(40);
- WRITE32(ol->open_seqid);
+ WRITE32(ol->open_seqid->sequence->counter);
WRITEMEM(&ol->open_stateid, sizeof(ol->open_stateid));
- WRITE32(ol->lock_seqid);
+ WRITE32(ol->lock_seqid->sequence->counter);
WRITE64(ol->lock_owner.clientid);
WRITE32(4);
WRITE32(ol->lock_owner.id);
@@ -744,7 +744,7 @@
RESERVE_SPACE(20);
WRITEMEM(&el->stateid, sizeof(el->stateid));
- WRITE32(el->seqid);
+ WRITE32(el->seqid->sequence->counter);
}
return 0;
@@ -775,7 +775,7 @@
RESERVE_SPACE(44);
WRITE32(OP_LOCKU);
WRITE32(arg->type);
- WRITE32(opargs->seqid);
+ WRITE32(opargs->seqid->sequence->counter);
WRITEMEM(&opargs->stateid, sizeof(opargs->stateid));
WRITE64(arg->offset);
WRITE64(arg->length);
@@ -826,7 +826,7 @@
*/
RESERVE_SPACE(8);
WRITE32(OP_OPEN);
- WRITE32(arg->seqid);
+ WRITE32(arg->seqid->sequence->counter);
encode_share_access(xdr, arg->open_flags);
RESERVE_SPACE(16);
WRITE64(arg->clientid);
@@ -941,7 +941,7 @@
RESERVE_SPACE(8+sizeof(arg->stateid.data));
WRITE32(OP_OPEN_CONFIRM);
WRITEMEM(arg->stateid.data, sizeof(arg->stateid.data));
- WRITE32(arg->seqid);
+ WRITE32(arg->seqid->sequence->counter);
return 0;
}
@@ -953,7 +953,7 @@
RESERVE_SPACE(8+sizeof(arg->stateid.data));
WRITE32(OP_OPEN_DOWNGRADE);
WRITEMEM(arg->stateid.data, sizeof(arg->stateid.data));
- WRITE32(arg->seqid);
+ WRITE32(arg->seqid->sequence->counter);
encode_share_access(xdr, arg->open_flags);
return 0;
}
@@ -1416,6 +1416,9 @@
};
int status;
+ status = nfs_wait_on_sequence(args->seqid, req->rq_task);
+ if (status != 0)
+ goto out;
xdr_init_encode(&xdr, &req->rq_snd_buf, p);
encode_compound_hdr(&xdr, &hdr);
status = encode_putfh(&xdr, args->fh);
@@ -1437,6 +1440,9 @@
};
int status;
+ status = nfs_wait_on_sequence(args->seqid, req->rq_task);
+ if (status != 0)
+ goto out;
xdr_init_encode(&xdr, &req->rq_snd_buf, p);
encode_compound_hdr(&xdr, &hdr);
status = encode_putfh(&xdr, args->fh);
@@ -1464,6 +1470,9 @@
};
int status;
+ status = nfs_wait_on_sequence(args->seqid, req->rq_task);
+ if (status != 0)
+ goto out;
xdr_init_encode(&xdr, &req->rq_snd_buf, p);
encode_compound_hdr(&xdr, &hdr);
status = encode_putfh(&xdr, args->fh);
@@ -1485,6 +1494,9 @@
};
int status;
+ status = nfs_wait_on_sequence(args->seqid, req->rq_task);
+ if (status != 0)
+ goto out;
xdr_init_encode(&xdr, &req->rq_snd_buf, p);
encode_compound_hdr(&xdr, &hdr);
status = encode_putfh(&xdr, args->fh);
@@ -1506,6 +1518,9 @@
};
int status;
+ status = nfs_wait_on_sequence(args->seqid, req->rq_task);
+ if (status != 0)
+ goto out;
xdr_init_encode(&xdr, &req->rq_snd_buf, p);
encode_compound_hdr(&xdr, &hdr);
status = encode_putfh(&xdr, args->fh);
@@ -1525,8 +1540,17 @@
struct compound_hdr hdr = {
.nops = 2,
};
+ struct nfs_lock_opargs *opargs = args->u.lock;
+ struct nfs_seqid *seqid;
int status;
+ if (opargs->new_lock_owner)
+ seqid = opargs->u.open_lock->lock_seqid;
+ else
+ seqid = opargs->u.exist_lock->seqid;
+ status = nfs_wait_on_sequence(seqid, req->rq_task);
+ if (status != 0)
+ goto out;
xdr_init_encode(&xdr, &req->rq_snd_buf, p);
encode_compound_hdr(&xdr, &hdr);
status = encode_putfh(&xdr, args->fh);
@@ -1569,6 +1593,9 @@
};
int status;
+ status = nfs_wait_on_sequence(args->u.locku->seqid, req->rq_task);
+ if (status != 0)
+ goto out;
xdr_init_encode(&xdr, &req->rq_snd_buf, p);
encode_compound_hdr(&xdr, &hdr);
status = encode_putfh(&xdr, args->fh);