blob: 3d01b328f777a3befc23d07898c610c28bc337fd [file] [log] [blame]
Chuck Leverf531a5d2015-10-24 17:27:43 -04001/*
2 * Copyright (c) 2015 Oracle. All rights reserved.
3 *
4 * Support for backward direction RPCs on RPC/RDMA.
5 */
6
7#include <linux/module.h>
8
9#include "xprt_rdma.h"
10
11#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
12# define RPCDBG_FACILITY RPCDBG_TRANS
13#endif
14
15static void rpcrdma_bc_free_rqst(struct rpcrdma_xprt *r_xprt,
16 struct rpc_rqst *rqst)
17{
18 struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
19 struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
20
21 spin_lock(&buf->rb_reqslock);
22 list_del(&req->rl_all);
23 spin_unlock(&buf->rb_reqslock);
24
25 rpcrdma_destroy_req(&r_xprt->rx_ia, req);
26
27 kfree(rqst);
28}
29
30static int rpcrdma_bc_setup_rqst(struct rpcrdma_xprt *r_xprt,
31 struct rpc_rqst *rqst)
32{
33 struct rpcrdma_ia *ia = &r_xprt->rx_ia;
34 struct rpcrdma_regbuf *rb;
35 struct rpcrdma_req *req;
36 struct xdr_buf *buf;
37 size_t size;
38
39 req = rpcrdma_create_req(r_xprt);
40 if (!req)
41 return -ENOMEM;
42 req->rl_backchannel = true;
43
44 size = RPCRDMA_INLINE_WRITE_THRESHOLD(rqst);
45 rb = rpcrdma_alloc_regbuf(ia, size, GFP_KERNEL);
46 if (IS_ERR(rb))
47 goto out_fail;
48 req->rl_rdmabuf = rb;
49
50 size += RPCRDMA_INLINE_READ_THRESHOLD(rqst);
51 rb = rpcrdma_alloc_regbuf(ia, size, GFP_KERNEL);
52 if (IS_ERR(rb))
53 goto out_fail;
54 rb->rg_owner = req;
55 req->rl_sendbuf = rb;
56 /* so that rpcr_to_rdmar works when receiving a request */
57 rqst->rq_buffer = (void *)req->rl_sendbuf->rg_base;
58
59 buf = &rqst->rq_snd_buf;
60 buf->head[0].iov_base = rqst->rq_buffer;
61 buf->head[0].iov_len = 0;
62 buf->tail[0].iov_base = NULL;
63 buf->tail[0].iov_len = 0;
64 buf->page_len = 0;
65 buf->len = 0;
66 buf->buflen = size;
67
68 return 0;
69
70out_fail:
71 rpcrdma_bc_free_rqst(r_xprt, rqst);
72 return -ENOMEM;
73}
74
75/* Allocate and add receive buffers to the rpcrdma_buffer's
76 * existing list of rep's. These are released when the
77 * transport is destroyed.
78 */
79static int rpcrdma_bc_setup_reps(struct rpcrdma_xprt *r_xprt,
80 unsigned int count)
81{
82 struct rpcrdma_buffer *buffers = &r_xprt->rx_buf;
83 struct rpcrdma_rep *rep;
84 unsigned long flags;
85 int rc = 0;
86
87 while (count--) {
88 rep = rpcrdma_create_rep(r_xprt);
89 if (IS_ERR(rep)) {
90 pr_err("RPC: %s: reply buffer alloc failed\n",
91 __func__);
92 rc = PTR_ERR(rep);
93 break;
94 }
95
96 spin_lock_irqsave(&buffers->rb_lock, flags);
97 list_add(&rep->rr_list, &buffers->rb_recv_bufs);
98 spin_unlock_irqrestore(&buffers->rb_lock, flags);
99 }
100
101 return rc;
102}
103
104/**
105 * xprt_rdma_bc_setup - Pre-allocate resources for handling backchannel requests
106 * @xprt: transport associated with these backchannel resources
107 * @reqs: number of concurrent incoming requests to expect
108 *
109 * Returns 0 on success; otherwise a negative errno
110 */
111int xprt_rdma_bc_setup(struct rpc_xprt *xprt, unsigned int reqs)
112{
113 struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
114 struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
115 struct rpc_rqst *rqst;
116 unsigned int i;
117 int rc;
118
119 /* The backchannel reply path returns each rpc_rqst to the
120 * bc_pa_list _after_ the reply is sent. If the server is
121 * faster than the client, it can send another backward
122 * direction request before the rpc_rqst is returned to the
123 * list. The client rejects the request in this case.
124 *
125 * Twice as many rpc_rqsts are prepared to ensure there is
126 * always an rpc_rqst available as soon as a reply is sent.
127 */
128 for (i = 0; i < (reqs << 1); i++) {
129 rqst = kzalloc(sizeof(*rqst), GFP_KERNEL);
130 if (!rqst) {
131 pr_err("RPC: %s: Failed to create bc rpc_rqst\n",
132 __func__);
133 goto out_free;
134 }
135
136 rqst->rq_xprt = &r_xprt->rx_xprt;
137 INIT_LIST_HEAD(&rqst->rq_list);
138 INIT_LIST_HEAD(&rqst->rq_bc_list);
139
140 if (rpcrdma_bc_setup_rqst(r_xprt, rqst))
141 goto out_free;
142
143 spin_lock_bh(&xprt->bc_pa_lock);
144 list_add(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
145 spin_unlock_bh(&xprt->bc_pa_lock);
146 }
147
148 rc = rpcrdma_bc_setup_reps(r_xprt, reqs);
149 if (rc)
150 goto out_free;
151
152 rc = rpcrdma_ep_post_extra_recv(r_xprt, reqs);
153 if (rc)
154 goto out_free;
155
156 buffer->rb_bc_srv_max_requests = reqs;
157 request_module("svcrdma");
158
159 return 0;
160
161out_free:
162 xprt_rdma_bc_destroy(xprt, reqs);
163
164 pr_err("RPC: %s: setup backchannel transport failed\n", __func__);
165 return -ENOMEM;
166}
167
168/**
169 * xprt_rdma_bc_destroy - Release resources for handling backchannel requests
170 * @xprt: transport associated with these backchannel resources
171 * @reqs: number of incoming requests to destroy; ignored
172 */
173void xprt_rdma_bc_destroy(struct rpc_xprt *xprt, unsigned int reqs)
174{
175 struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
176 struct rpc_rqst *rqst, *tmp;
177
178 spin_lock_bh(&xprt->bc_pa_lock);
179 list_for_each_entry_safe(rqst, tmp, &xprt->bc_pa_list, rq_bc_pa_list) {
180 list_del(&rqst->rq_bc_pa_list);
181 spin_unlock_bh(&xprt->bc_pa_lock);
182
183 rpcrdma_bc_free_rqst(r_xprt, rqst);
184
185 spin_lock_bh(&xprt->bc_pa_lock);
186 }
187 spin_unlock_bh(&xprt->bc_pa_lock);
188}
189
190/**
191 * xprt_rdma_bc_free_rqst - Release a backchannel rqst
192 * @rqst: request to release
193 */
194void xprt_rdma_bc_free_rqst(struct rpc_rqst *rqst)
195{
196 struct rpc_xprt *xprt = rqst->rq_xprt;
197
198 smp_mb__before_atomic();
199 WARN_ON_ONCE(!test_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state));
200 clear_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state);
201 smp_mb__after_atomic();
202
203 spin_lock_bh(&xprt->bc_pa_lock);
204 list_add_tail(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
205 spin_unlock_bh(&xprt->bc_pa_lock);
206}