RDS: IB: split send completion handling and do batch ack
Similar to what we did with receive CQ completion handling, we split
the transmit completion handler so that we can implement batched
work completion handling.
We re-use the poll_cq() routine and make use of the RDS_IB_SEND_OP bit
set in wr_id to tell send completions from receive completions when
invoking the completion event handler.
Signed-off-by: Santosh Shilimkar <ssantosh@kernel.org>
Signed-off-by: Santosh Shilimkar <santosh.shilimkar@oracle.com>
diff --git a/net/rds/ib.h b/net/rds/ib.h
index 727759b..3a8cd31 100644
--- a/net/rds/ib.h
+++ b/net/rds/ib.h
@@ -25,6 +25,7 @@
#define RDS_IB_RECYCLE_BATCH_COUNT 32
#define RDS_IB_WC_MAX 32
+#define RDS_IB_SEND_OP BIT_ULL(63)
extern struct rw_semaphore rds_ib_devices_lock;
extern struct list_head rds_ib_devices;
@@ -118,9 +119,11 @@
struct ib_pd *i_pd;
struct ib_cq *i_send_cq;
struct ib_cq *i_recv_cq;
+ struct ib_wc i_send_wc[RDS_IB_WC_MAX];
struct ib_wc i_recv_wc[RDS_IB_WC_MAX];
/* interrupt handling */
+ struct tasklet_struct i_send_tasklet;
struct tasklet_struct i_recv_tasklet;
/* tx */
@@ -217,7 +220,6 @@
struct rds_ib_statistics {
uint64_t s_ib_connect_raced;
uint64_t s_ib_listen_closed_stale;
- uint64_t s_ib_tx_cq_call;
uint64_t s_ib_evt_handler_call;
uint64_t s_ib_tasklet_call;
uint64_t s_ib_tx_cq_event;
@@ -371,7 +373,7 @@
void rds_ib_xmit_complete(struct rds_connection *conn);
int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
unsigned int hdr_off, unsigned int sg, unsigned int off);
-void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context);
+void rds_ib_send_cqe_handler(struct rds_ib_connection *ic, struct ib_wc *wc);
void rds_ib_send_init_ring(struct rds_ib_connection *ic);
void rds_ib_send_clear_ring(struct rds_ib_connection *ic);
int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op);
diff --git a/net/rds/ib_cm.c b/net/rds/ib_cm.c
index 28e0979..8f51d0d 100644
--- a/net/rds/ib_cm.c
+++ b/net/rds/ib_cm.c
@@ -250,11 +250,34 @@
rdsdebug("wc wr_id 0x%llx status %u byte_len %u imm_data %u\n",
(unsigned long long)wc->wr_id, wc->status,
wc->byte_len, be32_to_cpu(wc->ex.imm_data));
- rds_ib_recv_cqe_handler(ic, wc, ack_state);
+
+ if (wc->wr_id & RDS_IB_SEND_OP)
+ rds_ib_send_cqe_handler(ic, wc);
+ else
+ rds_ib_recv_cqe_handler(ic, wc, ack_state);
}
}
}
+static void rds_ib_tasklet_fn_send(unsigned long data)
+{
+ struct rds_ib_connection *ic = (struct rds_ib_connection *)data;
+ struct rds_connection *conn = ic->conn;
+ struct rds_ib_ack_state state;
+
+ rds_ib_stats_inc(s_ib_tasklet_call);
+
+ memset(&state, 0, sizeof(state));
+ poll_cq(ic, ic->i_send_cq, ic->i_send_wc, &state);
+ ib_req_notify_cq(ic->i_send_cq, IB_CQ_NEXT_COMP);
+ poll_cq(ic, ic->i_send_cq, ic->i_send_wc, &state);
+
+ if (rds_conn_up(conn) &&
+ (!test_bit(RDS_LL_SEND_FULL, &conn->c_flags) ||
+ test_bit(0, &conn->c_map_queued)))
+ rds_send_xmit(ic->conn);
+}
+
static void rds_ib_tasklet_fn_recv(unsigned long data)
{
struct rds_ib_connection *ic = (struct rds_ib_connection *)data;
@@ -304,6 +327,18 @@
}
}
+static void rds_ib_cq_comp_handler_send(struct ib_cq *cq, void *context)
+{
+ struct rds_connection *conn = context;
+ struct rds_ib_connection *ic = conn->c_transport_data;
+
+ rdsdebug("conn %p cq %p\n", conn, cq);
+
+ rds_ib_stats_inc(s_ib_evt_handler_call);
+
+ tasklet_schedule(&ic->i_send_tasklet);
+}
+
/*
* This needs to be very careful to not leave IS_ERR pointers around for
* cleanup to trip over.
@@ -337,7 +372,8 @@
ic->i_pd = rds_ibdev->pd;
cq_attr.cqe = ic->i_send_ring.w_nr + 1;
- ic->i_send_cq = ib_create_cq(dev, rds_ib_send_cq_comp_handler,
+
+ ic->i_send_cq = ib_create_cq(dev, rds_ib_cq_comp_handler_send,
rds_ib_cq_event_handler, conn,
&cq_attr);
if (IS_ERR(ic->i_send_cq)) {
@@ -703,6 +739,7 @@
wait_event(rds_ib_ring_empty_wait,
rds_ib_ring_empty(&ic->i_recv_ring) &&
(atomic_read(&ic->i_signaled_sends) == 0));
+ tasklet_kill(&ic->i_send_tasklet);
tasklet_kill(&ic->i_recv_tasklet);
/* first destroy the ib state that generates callbacks */
@@ -809,8 +846,10 @@
}
INIT_LIST_HEAD(&ic->ib_node);
+ tasklet_init(&ic->i_send_tasklet, rds_ib_tasklet_fn_send,
+ (unsigned long)ic);
tasklet_init(&ic->i_recv_tasklet, rds_ib_tasklet_fn_recv,
- (unsigned long) ic);
+ (unsigned long)ic);
mutex_init(&ic->i_recv_mutex);
#ifndef KERNEL_HAS_ATOMIC64
spin_lock_init(&ic->i_ack_lock);
diff --git a/net/rds/ib_send.c b/net/rds/ib_send.c
index 4e88047..670882c 100644
--- a/net/rds/ib_send.c
+++ b/net/rds/ib_send.c
@@ -195,7 +195,7 @@
send->s_op = NULL;
- send->s_wr.wr_id = i;
+ send->s_wr.wr_id = i | RDS_IB_SEND_OP;
send->s_wr.sg_list = send->s_sge;
send->s_wr.ex.imm_data = 0;
@@ -237,81 +237,73 @@
* unallocs the next free entry in the ring it doesn't alter which is
* the next to be freed, which is what this is concerned with.
*/
-void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)
+void rds_ib_send_cqe_handler(struct rds_ib_connection *ic, struct ib_wc *wc)
{
- struct rds_connection *conn = context;
- struct rds_ib_connection *ic = conn->c_transport_data;
struct rds_message *rm = NULL;
- struct ib_wc wc;
+ struct rds_connection *conn = ic->conn;
struct rds_ib_send_work *send;
u32 completed;
u32 oldest;
u32 i = 0;
- int ret;
int nr_sig = 0;
- rdsdebug("cq %p conn %p\n", cq, conn);
- rds_ib_stats_inc(s_ib_tx_cq_call);
- ret = ib_req_notify_cq(cq, IB_CQ_NEXT_COMP);
- if (ret)
- rdsdebug("ib_req_notify_cq send failed: %d\n", ret);
- while (ib_poll_cq(cq, 1, &wc) > 0) {
- rdsdebug("wc wr_id 0x%llx status %u (%s) byte_len %u imm_data %u\n",
- (unsigned long long)wc.wr_id, wc.status,
- ib_wc_status_msg(wc.status), wc.byte_len,
- be32_to_cpu(wc.ex.imm_data));
- rds_ib_stats_inc(s_ib_tx_cq_event);
+ rdsdebug("wc wr_id 0x%llx status %u (%s) byte_len %u imm_data %u\n",
+ (unsigned long long)wc->wr_id, wc->status,
+ ib_wc_status_msg(wc->status), wc->byte_len,
+ be32_to_cpu(wc->ex.imm_data));
+ rds_ib_stats_inc(s_ib_tx_cq_event);
- if (wc.wr_id == RDS_IB_ACK_WR_ID) {
- if (time_after(jiffies, ic->i_ack_queued + HZ/2))
- rds_ib_stats_inc(s_ib_tx_stalled);
- rds_ib_ack_send_complete(ic);
- continue;
- }
+ if (wc->wr_id == RDS_IB_ACK_WR_ID) {
+ if (time_after(jiffies, ic->i_ack_queued + HZ / 2))
+ rds_ib_stats_inc(s_ib_tx_stalled);
+ rds_ib_ack_send_complete(ic);
+ return;
+ }
- oldest = rds_ib_ring_oldest(&ic->i_send_ring);
+ oldest = rds_ib_ring_oldest(&ic->i_send_ring);
- completed = rds_ib_ring_completed(&ic->i_send_ring, wc.wr_id, oldest);
+ completed = rds_ib_ring_completed(&ic->i_send_ring,
+ (wc->wr_id & ~RDS_IB_SEND_OP),
+ oldest);
- for (i = 0; i < completed; i++) {
- send = &ic->i_sends[oldest];
- if (send->s_wr.send_flags & IB_SEND_SIGNALED)
- nr_sig++;
+ for (i = 0; i < completed; i++) {
+ send = &ic->i_sends[oldest];
+ if (send->s_wr.send_flags & IB_SEND_SIGNALED)
+ nr_sig++;
- rm = rds_ib_send_unmap_op(ic, send, wc.status);
+ rm = rds_ib_send_unmap_op(ic, send, wc->status);
- if (time_after(jiffies, send->s_queued + HZ/2))
- rds_ib_stats_inc(s_ib_tx_stalled);
+ if (time_after(jiffies, send->s_queued + HZ / 2))
+ rds_ib_stats_inc(s_ib_tx_stalled);
- if (send->s_op) {
- if (send->s_op == rm->m_final_op) {
- /* If anyone waited for this message to get flushed out, wake
- * them up now */
- rds_message_unmapped(rm);
- }
- rds_message_put(rm);
- send->s_op = NULL;
+ if (send->s_op) {
+ if (send->s_op == rm->m_final_op) {
+ /* If anyone waited for this message to get
+ * flushed out, wake them up now
+ */
+ rds_message_unmapped(rm);
}
-
- oldest = (oldest + 1) % ic->i_send_ring.w_nr;
+ rds_message_put(rm);
+ send->s_op = NULL;
}
- rds_ib_ring_free(&ic->i_send_ring, completed);
- rds_ib_sub_signaled(ic, nr_sig);
- nr_sig = 0;
+ oldest = (oldest + 1) % ic->i_send_ring.w_nr;
+ }
- if (test_and_clear_bit(RDS_LL_SEND_FULL, &conn->c_flags) ||
- test_bit(0, &conn->c_map_queued))
- queue_delayed_work(rds_wq, &conn->c_send_w, 0);
+ rds_ib_ring_free(&ic->i_send_ring, completed);
+ rds_ib_sub_signaled(ic, nr_sig);
+ nr_sig = 0;
- /* We expect errors as the qp is drained during shutdown */
- if (wc.status != IB_WC_SUCCESS && rds_conn_up(conn)) {
- rds_ib_conn_error(conn, "send completion on %pI4 had status "
- "%u (%s), disconnecting and reconnecting\n",
- &conn->c_faddr, wc.status,
- ib_wc_status_msg(wc.status));
- }
+ if (test_and_clear_bit(RDS_LL_SEND_FULL, &conn->c_flags) ||
+ test_bit(0, &conn->c_map_queued))
+ queue_delayed_work(rds_wq, &conn->c_send_w, 0);
+
+ /* We expect errors as the qp is drained during shutdown */
+ if (wc->status != IB_WC_SUCCESS && rds_conn_up(conn)) {
+ rds_ib_conn_error(conn, "send completion on %pI4 had status %u (%s), disconnecting and reconnecting\n",
+ &conn->c_faddr, wc->status,
+ ib_wc_status_msg(wc->status));
}
}
diff --git a/net/rds/ib_stats.c b/net/rds/ib_stats.c
index bdf6115..8c8b84f 100644
--- a/net/rds/ib_stats.c
+++ b/net/rds/ib_stats.c
@@ -43,7 +43,6 @@
"ib_connect_raced",
"ib_listen_closed_stale",
"s_ib_evt_handler_call",
- "ib_tx_cq_call",
"ib_tasklet_call",
"ib_tx_cq_event",
"ib_tx_ring_full",
diff --git a/net/rds/send.c b/net/rds/send.c
index a081a64..ee49c25 100644
--- a/net/rds/send.c
+++ b/net/rds/send.c
@@ -432,6 +432,7 @@
out:
return ret;
}
+EXPORT_SYMBOL_GPL(rds_send_xmit);
static void rds_send_sndbuf_remove(struct rds_sock *rs, struct rds_message *rm)
{