/*
 * Copyright (c) 2015 Oracle. All rights reserved.
 *
 * Support for backward direction RPCs on RPC/RDMA.
 */

#include <linux/module.h>

#include "xprt_rdma.h"

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY	RPCDBG_TRANS
#endif

/* Unlink the rpcrdma_req behind a backchannel rpc_rqst from the
 * transport's request list, release its buffers, then free the
 * rpc_rqst itself.
 */
static void rpcrdma_bc_free_rqst(struct rpcrdma_xprt *r_xprt,
				 struct rpc_rqst *rqst)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);

	spin_lock(&buf->rb_reqslock);
	list_del(&req->rl_all);
	spin_unlock(&buf->rb_reqslock);

	rpcrdma_destroy_req(&r_xprt->rx_ia, req);

	kfree(rqst);
}

/* Allocate the rpcrdma_req and DMA-mapped buffers that back one
 * backchannel rpc_rqst: rl_rdmabuf carries the RPC/RDMA transport
 * header, rl_sendbuf carries the RPC message itself.
 */
static int rpcrdma_bc_setup_rqst(struct rpcrdma_xprt *r_xprt,
				 struct rpc_rqst *rqst)
{
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	struct rpcrdma_regbuf *rb;
	struct rpcrdma_req *req;
	struct xdr_buf *buf;
	size_t size;

	req = rpcrdma_create_req(r_xprt);
	if (!req)
		return -ENOMEM;
	req->rl_backchannel = true;

	size = RPCRDMA_INLINE_WRITE_THRESHOLD(rqst);
	rb = rpcrdma_alloc_regbuf(ia, size, GFP_KERNEL);
	if (IS_ERR(rb))
		goto out_fail;
	req->rl_rdmabuf = rb;

	size += RPCRDMA_INLINE_READ_THRESHOLD(rqst);
	rb = rpcrdma_alloc_regbuf(ia, size, GFP_KERNEL);
	if (IS_ERR(rb))
		goto out_fail;
	rb->rg_owner = req;
	req->rl_sendbuf = rb;
	/* so that rpcr_to_rdmar works when receiving a request */
	rqst->rq_buffer = (void *)req->rl_sendbuf->rg_base;

	buf = &rqst->rq_snd_buf;
	buf->head[0].iov_base = rqst->rq_buffer;
	buf->head[0].iov_len = 0;
	buf->tail[0].iov_base = NULL;
	buf->tail[0].iov_len = 0;
	buf->page_len = 0;
	buf->len = 0;
	buf->buflen = size;

	return 0;

out_fail:
	rpcrdma_bc_free_rqst(r_xprt, rqst);
	return -ENOMEM;
}

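/* Layout note, added here as an explanatory sketch of the function
 * above: each backchannel rpc_rqst ends up backed by two regbufs on
 * its rpcrdma_req:
 *
 *	rl_rdmabuf -- RPCRDMA_INLINE_WRITE_THRESHOLD(rqst) bytes for
 *		      the RPC/RDMA transport header
 *	rl_sendbuf -- RPCRDMA_INLINE_WRITE_THRESHOLD(rqst) +
 *		      RPCRDMA_INLINE_READ_THRESHOLD(rqst) bytes for
 *		      the RPC message
 *
 * rq_buffer points at rl_sendbuf->rg_base, and rg_owner points back
 * at the rpcrdma_req, which is what lets rpcr_to_rdmar() recover the
 * req from an rpc_rqst.
 */
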
/* Allocate and add receive buffers to the rpcrdma_buffer's
 * existing list of reps. These are released when the
 * transport is destroyed.
 */
static int rpcrdma_bc_setup_reps(struct rpcrdma_xprt *r_xprt,
				 unsigned int count)
{
	struct rpcrdma_buffer *buffers = &r_xprt->rx_buf;
	struct rpcrdma_rep *rep;
	unsigned long flags;
	int rc = 0;

	while (count--) {
		rep = rpcrdma_create_rep(r_xprt);
		if (IS_ERR(rep)) {
			pr_err("RPC: %s: reply buffer alloc failed\n",
			       __func__);
			rc = PTR_ERR(rep);
			break;
		}

		spin_lock_irqsave(&buffers->rb_lock, flags);
		list_add(&rep->rr_list, &buffers->rb_recv_bufs);
		spin_unlock_irqrestore(&buffers->rb_lock, flags);
	}

	return rc;
}

/**
 * xprt_rdma_bc_setup - Pre-allocate resources for handling backchannel requests
 * @xprt: transport associated with these backchannel resources
 * @reqs: number of concurrent incoming requests to expect
 *
 * Returns 0 on success; otherwise a negative errno
 */
int xprt_rdma_bc_setup(struct rpc_xprt *xprt, unsigned int reqs)
{
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
	struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
	struct rpc_rqst *rqst;
	unsigned int i;
	int rc;

	/* The backchannel reply path returns each rpc_rqst to the
	 * bc_pa_list _after_ the reply is sent. If the server is
	 * faster than the client, it can send another backward
	 * direction request before the rpc_rqst is returned to the
	 * list. The client rejects the request in this case.
	 *
	 * Twice as many rpc_rqsts are prepared to ensure there is
	 * always an rpc_rqst available as soon as a reply is sent.
	 */
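	/* Worked example (values are illustrative, not from this file):
	 * with reqs = 2, the loop below prepares four rpc_rqsts, while
	 * two reply buffers are allocated and two extra receives are
	 * posted further down.
	 */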
	if (reqs > RPCRDMA_BACKWARD_WRS >> 1)
		goto out_err;

	for (i = 0; i < (reqs << 1); i++) {
		rqst = kzalloc(sizeof(*rqst), GFP_KERNEL);
		if (!rqst) {
			pr_err("RPC: %s: Failed to create bc rpc_rqst\n",
			       __func__);
			goto out_free;
		}

		rqst->rq_xprt = &r_xprt->rx_xprt;
		INIT_LIST_HEAD(&rqst->rq_list);
		INIT_LIST_HEAD(&rqst->rq_bc_list);

		if (rpcrdma_bc_setup_rqst(r_xprt, rqst))
			goto out_free;

		spin_lock_bh(&xprt->bc_pa_lock);
		list_add(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
		spin_unlock_bh(&xprt->bc_pa_lock);
	}

	rc = rpcrdma_bc_setup_reps(r_xprt, reqs);
	if (rc)
		goto out_free;

	rc = rpcrdma_ep_post_extra_recv(r_xprt, reqs);
	if (rc)
		goto out_free;

	buffer->rb_bc_srv_max_requests = reqs;
	request_module("svcrdma");

	return 0;

out_free:
	xprt_rdma_bc_destroy(xprt, reqs);

out_err:
	pr_err("RPC: %s: setup backchannel transport failed\n", __func__);
	return -ENOMEM;
}

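/* For context, a minimal sketch of how these entry points are expected
 * to be wired into the transport's ops table. The actual initializer
 * lives elsewhere (e.g. xprtrdma's transport.c), so the exact layout
 * shown here is an assumption, not taken from this file:
 *
 *	static struct rpc_xprt_ops xprt_rdma_procs = {
 *		...
 *	#if defined(CONFIG_SUNRPC_BACKCHANNEL)
 *		.bc_setup	= xprt_rdma_bc_setup,
 *		.bc_free_rqst	= xprt_rdma_bc_free_rqst,
 *		.bc_destroy	= xprt_rdma_bc_destroy,
 *	#endif
 *		...
 *	};
 */
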
/**
 * rpcrdma_bc_marshal_reply - Send backward direction reply
 * @rqst: buffer containing RPC reply data
 *
 * Returns zero on success.
 */
int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst)
{
	struct rpc_xprt *xprt = rqst->rq_xprt;
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
	struct rpcrdma_msg *headerp;
	size_t rpclen;

	headerp = rdmab_to_msg(req->rl_rdmabuf);
	headerp->rm_xid = rqst->rq_xid;
	headerp->rm_vers = rpcrdma_version;
	headerp->rm_credit =
		cpu_to_be32(r_xprt->rx_buf.rb_bc_srv_max_requests);
	headerp->rm_type = rdma_msg;
	headerp->rm_body.rm_chunks[0] = xdr_zero;
	headerp->rm_body.rm_chunks[1] = xdr_zero;
	headerp->rm_body.rm_chunks[2] = xdr_zero;

	rpclen = rqst->rq_svec[0].iov_len;

	pr_info("RPC: %s: rpclen %zd headerp 0x%p lkey 0x%x\n",
		__func__, rpclen, headerp, rdmab_lkey(req->rl_rdmabuf));
	pr_info("RPC: %s: RPC/RDMA: %*ph\n",
		__func__, (int)RPCRDMA_HDRLEN_MIN, headerp);
	pr_info("RPC: %s: RPC: %*ph\n",
		__func__, (int)rpclen, rqst->rq_svec[0].iov_base);

	req->rl_send_iov[0].addr = rdmab_addr(req->rl_rdmabuf);
	req->rl_send_iov[0].length = RPCRDMA_HDRLEN_MIN;
	req->rl_send_iov[0].lkey = rdmab_lkey(req->rl_rdmabuf);

	req->rl_send_iov[1].addr = rdmab_addr(req->rl_sendbuf);
	req->rl_send_iov[1].length = rpclen;
	req->rl_send_iov[1].lkey = rdmab_lkey(req->rl_sendbuf);

	req->rl_niovs = 2;
	return 0;
}

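/* Wire layout note, added as an explanatory sketch of the function
 * above: the backchannel reply is posted as two send SGEs:
 *
 *	rl_send_iov[0] -- RPCRDMA_HDRLEN_MIN bytes of RPC/RDMA header
 *			  from rl_rdmabuf (xid, version, credit limit,
 *			  message type, and three empty chunk lists)
 *	rl_send_iov[1] -- rpclen bytes of RPC reply from rl_sendbuf
 */
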
/**
 * xprt_rdma_bc_destroy - Release resources for handling backchannel requests
 * @xprt: transport associated with these backchannel resources
 * @reqs: number of incoming requests to destroy; ignored
 */
void xprt_rdma_bc_destroy(struct rpc_xprt *xprt, unsigned int reqs)
{
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
	struct rpc_rqst *rqst, *tmp;

	spin_lock_bh(&xprt->bc_pa_lock);
	list_for_each_entry_safe(rqst, tmp, &xprt->bc_pa_list, rq_bc_pa_list) {
		list_del(&rqst->rq_bc_pa_list);
		spin_unlock_bh(&xprt->bc_pa_lock);

		rpcrdma_bc_free_rqst(r_xprt, rqst);

		spin_lock_bh(&xprt->bc_pa_lock);
	}
	spin_unlock_bh(&xprt->bc_pa_lock);
}

/**
 * xprt_rdma_bc_free_rqst - Release a backchannel rqst
 * @rqst: request to release
 */
void xprt_rdma_bc_free_rqst(struct rpc_rqst *rqst)
{
	struct rpc_xprt *xprt = rqst->rq_xprt;

	smp_mb__before_atomic();
	WARN_ON_ONCE(!test_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state));
	clear_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state);
	smp_mb__after_atomic();

	spin_lock_bh(&xprt->bc_pa_lock);
	list_add_tail(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
	spin_unlock_bh(&xprt->bc_pa_lock);
}