CIFS: SMBD: Implement RDMA memory registration

Memory registration is used for transferring payload via RDMA read or write.
After I/O is done, memory registrations are recovered and reused. This
process can be time consuming and is done in a work queue.

Signed-off-by: Long Li <longli@microsoft.com>
Reviewed-by: Pavel Shilovsky <pshilov@microsoft.com>
Reviewed-by: Ronnie Sahlberg <lsahlber@redhat.com>
Signed-off-by: Steve French <smfrench@gmail.com>
diff --git a/fs/cifs/smbdirect.c b/fs/cifs/smbdirect.c
index 3351873..731577d 100644
--- a/fs/cifs/smbdirect.c
+++ b/fs/cifs/smbdirect.c
@@ -48,6 +48,9 @@ static int smbd_post_send_page(struct smbd_connection *info,
 		struct page *page, unsigned long offset,
 		size_t size, int remaining_data_length);
 
+static void destroy_mr_list(struct smbd_connection *info);
+static int allocate_mr_list(struct smbd_connection *info);
+
 /* SMBD version number */
 #define SMBD_V1	0x0100
 
@@ -198,6 +201,12 @@ static void smbd_destroy_rdma_work(struct work_struct *work)
 	wait_event(info->wait_send_payload_pending,
 		atomic_read(&info->send_payload_pending) == 0);
 
+	log_rdma_event(INFO, "freeing mr list\n");
+	wake_up_interruptible_all(&info->wait_mr);
+	wait_event(info->wait_for_mr_cleanup,
+		atomic_read(&info->mr_used_count) == 0);
+	destroy_mr_list(info);
+
 	/* It's not posssible for upper layer to get to reassembly */
 	log_rdma_event(INFO, "drain the reassembly queue\n");
 	do {
@@ -453,6 +462,16 @@ static bool process_negotiation_response(
 	}
 	info->max_fragmented_send_size =
 		le32_to_cpu(packet->max_fragmented_size);
+	info->rdma_readwrite_threshold =
+		rdma_readwrite_threshold > info->max_fragmented_send_size ?
+		info->max_fragmented_send_size :
+		rdma_readwrite_threshold;
+
+
+	info->max_readwrite_size = min_t(u32,
+			le32_to_cpu(packet->max_readwrite_size),
+			info->max_frmr_depth * PAGE_SIZE);
+	info->max_frmr_depth = info->max_readwrite_size / PAGE_SIZE;
 
 	return true;
 }
@@ -748,6 +767,12 @@ static int smbd_ia_open(
 		rc = -EPROTONOSUPPORT;
 		goto out2;
 	}
+	info->max_frmr_depth = min_t(int,
+		smbd_max_frmr_depth,
+		info->id->device->attrs.max_fast_reg_page_list_len);
+	info->mr_type = IB_MR_TYPE_MEM_REG;
+	if (info->id->device->attrs.device_cap_flags & IB_DEVICE_SG_GAPS_REG)
+		info->mr_type = IB_MR_TYPE_SG_GAPS;
 
 	info->pd = ib_alloc_pd(info->id->device, 0);
 	if (IS_ERR(info->pd)) {
@@ -1582,6 +1607,8 @@ struct smbd_connection *_smbd_get_connection(
 	struct rdma_conn_param conn_param;
 	struct ib_qp_init_attr qp_attr;
 	struct sockaddr_in *addr_in = (struct sockaddr_in *) dstaddr;
+	struct ib_port_immutable port_immutable;
+	u32 ird_ord_hdr[2];
 
 	info = kzalloc(sizeof(struct smbd_connection), GFP_KERNEL);
 	if (!info)
@@ -1670,6 +1697,28 @@ struct smbd_connection *_smbd_get_connection(
 	memset(&conn_param, 0, sizeof(conn_param));
 	conn_param.initiator_depth = 0;
 
+	conn_param.responder_resources =
+		info->id->device->attrs.max_qp_rd_atom
+			< SMBD_CM_RESPONDER_RESOURCES ?
+		info->id->device->attrs.max_qp_rd_atom :
+		SMBD_CM_RESPONDER_RESOURCES;
+	info->responder_resources = conn_param.responder_resources;
+	log_rdma_mr(INFO, "responder_resources=%d\n",
+		info->responder_resources);
+
+	/* Need to send IRD/ORD in private data for iWARP */
+	info->id->device->get_port_immutable(
+		info->id->device, info->id->port_num, &port_immutable);
+	if (port_immutable.core_cap_flags & RDMA_CORE_PORT_IWARP) {
+		ird_ord_hdr[0] = info->responder_resources;
+		ird_ord_hdr[1] = 1;
+		conn_param.private_data = ird_ord_hdr;
+		conn_param.private_data_len = sizeof(ird_ord_hdr);
+	} else {
+		conn_param.private_data = NULL;
+		conn_param.private_data_len = 0;
+	}
+
 	conn_param.retry_count = SMBD_CM_RETRY;
 	conn_param.rnr_retry_count = SMBD_CM_RNR_RETRY;
 	conn_param.flow_control = 0;
@@ -1734,8 +1783,19 @@ struct smbd_connection *_smbd_get_connection(
 		goto negotiation_failed;
 	}
 
+	rc = allocate_mr_list(info);
+	if (rc) {
+		log_rdma_mr(ERR, "memory registration allocation failed\n");
+		goto allocate_mr_failed;
+	}
+
 	return info;
 
+allocate_mr_failed:
+	/* At this point, need to a full transport shutdown */
+	smbd_destroy(info);
+	return NULL;
+
 negotiation_failed:
 	cancel_delayed_work_sync(&info->idle_timer_work);
 	destroy_caches_and_workqueue(info);
@@ -2189,3 +2249,364 @@ int smbd_send(struct smbd_connection *info, struct smb_rqst *rqst)
 
 	return rc;
 }
+
+static void register_mr_done(struct ib_cq *cq, struct ib_wc *wc)
+{
+	struct smbd_mr *mr;
+	struct ib_cqe *cqe;
+
+	if (wc->status) {
+		log_rdma_mr(ERR, "status=%d\n", wc->status);
+		cqe = wc->wr_cqe;
+		mr = container_of(cqe, struct smbd_mr, cqe);
+		smbd_disconnect_rdma_connection(mr->conn);
+	}
+}
+
+/*
+ * The work queue function that recovers MRs
+ * We need to call ib_dereg_mr() and ib_alloc_mr() before this MR can be used
+ * again. Both calls are slow, so finish them in a workqueue. This will not
+ * block I/O path.
+ * There is one workqueue that recovers MRs, there is no need to lock as the
+ * I/O requests calling smbd_register_mr will never update the links in the
+ * mr_list.
+ */
+static void smbd_mr_recovery_work(struct work_struct *work)
+{
+	struct smbd_connection *info =
+		container_of(work, struct smbd_connection, mr_recovery_work);
+	struct smbd_mr *smbdirect_mr;
+	int rc;
+
+	list_for_each_entry(smbdirect_mr, &info->mr_list, list) {
+		if (smbdirect_mr->state == MR_INVALIDATED ||
+			smbdirect_mr->state == MR_ERROR) {
+
+			if (smbdirect_mr->state == MR_INVALIDATED) {
+				ib_dma_unmap_sg(
+					info->id->device, smbdirect_mr->sgl,
+					smbdirect_mr->sgl_count,
+					smbdirect_mr->dir);
+				smbdirect_mr->state = MR_READY;
+			} else if (smbdirect_mr->state == MR_ERROR) {
+
+				/* recover this MR entry */
+				rc = ib_dereg_mr(smbdirect_mr->mr);
+				if (rc) {
+					log_rdma_mr(ERR,
+						"ib_dereg_mr faield rc=%x\n",
+						rc);
+					smbd_disconnect_rdma_connection(info);
+				}
+
+				smbdirect_mr->mr = ib_alloc_mr(
+					info->pd, info->mr_type,
+					info->max_frmr_depth);
+				if (IS_ERR(smbdirect_mr->mr)) {
+					log_rdma_mr(ERR,
+						"ib_alloc_mr failed mr_type=%x "
+						"max_frmr_depth=%x\n",
+						info->mr_type,
+						info->max_frmr_depth);
+					smbd_disconnect_rdma_connection(info);
+				}
+
+				smbdirect_mr->state = MR_READY;
+			}
+			/* smbdirect_mr->state is updated by this function
+			 * and is read and updated by I/O issuing CPUs trying
+			 * to get a MR, the call to atomic_inc_return
+			 * implicates a memory barrier and guarantees this
+			 * value is updated before waking up any calls to
+			 * get_mr() from the I/O issuing CPUs
+			 */
+			if (atomic_inc_return(&info->mr_ready_count) == 1)
+				wake_up_interruptible(&info->wait_mr);
+		}
+	}
+}
+
+static void destroy_mr_list(struct smbd_connection *info)
+{
+	struct smbd_mr *mr, *tmp;
+
+	cancel_work_sync(&info->mr_recovery_work);
+	list_for_each_entry_safe(mr, tmp, &info->mr_list, list) {
+		if (mr->state == MR_INVALIDATED)
+			ib_dma_unmap_sg(info->id->device, mr->sgl,
+				mr->sgl_count, mr->dir);
+		ib_dereg_mr(mr->mr);
+		kfree(mr->sgl);
+		kfree(mr);
+	}
+}
+
+/*
+ * Allocate MRs used for RDMA read/write
+ * The number of MRs will not exceed hardware capability in responder_resources
+ * All MRs are kept in mr_list. The MR can be recovered after it's used
+ * Recovery is done in smbd_mr_recovery_work. The content of list entry changes
+ * as MRs are used and recovered for I/O, but the list links will not change
+ */
+static int allocate_mr_list(struct smbd_connection *info)
+{
+	int i;
+	struct smbd_mr *smbdirect_mr, *tmp;
+
+	INIT_LIST_HEAD(&info->mr_list);
+	init_waitqueue_head(&info->wait_mr);
+	spin_lock_init(&info->mr_list_lock);
+	atomic_set(&info->mr_ready_count, 0);
+	atomic_set(&info->mr_used_count, 0);
+	init_waitqueue_head(&info->wait_for_mr_cleanup);
+	/* Allocate more MRs (2x) than hardware responder_resources */
+	for (i = 0; i < info->responder_resources * 2; i++) {
+		smbdirect_mr = kzalloc(sizeof(*smbdirect_mr), GFP_KERNEL);
+		if (!smbdirect_mr)
+			goto out;
+		smbdirect_mr->mr = ib_alloc_mr(info->pd, info->mr_type,
+					info->max_frmr_depth);
+		if (IS_ERR(smbdirect_mr->mr)) {
+			log_rdma_mr(ERR, "ib_alloc_mr failed mr_type=%x "
+				"max_frmr_depth=%x\n",
+				info->mr_type, info->max_frmr_depth);
+			goto out;
+		}
+		smbdirect_mr->sgl = kcalloc(
+					info->max_frmr_depth,
+					sizeof(struct scatterlist),
+					GFP_KERNEL);
+		if (!smbdirect_mr->sgl) {
+			log_rdma_mr(ERR, "failed to allocate sgl\n");
+			ib_dereg_mr(smbdirect_mr->mr);
+			goto out;
+		}
+		smbdirect_mr->state = MR_READY;
+		smbdirect_mr->conn = info;
+
+		list_add_tail(&smbdirect_mr->list, &info->mr_list);
+		atomic_inc(&info->mr_ready_count);
+	}
+	INIT_WORK(&info->mr_recovery_work, smbd_mr_recovery_work);
+	return 0;
+
+out:
+	kfree(smbdirect_mr);
+
+	list_for_each_entry_safe(smbdirect_mr, tmp, &info->mr_list, list) {
+		ib_dereg_mr(smbdirect_mr->mr);
+		kfree(smbdirect_mr->sgl);
+		kfree(smbdirect_mr);
+	}
+	return -ENOMEM;
+}
+
+/*
+ * Get a MR from mr_list. This function waits until there is at least one
+ * MR available in the list. It may access the list while the
+ * smbd_mr_recovery_work is recovering the MR list. This doesn't need a lock
+ * as they never modify the same places. However, there may be several CPUs
+ * issueing I/O trying to get MR at the same time, mr_list_lock is used to
+ * protect this situation.
+ */
+static struct smbd_mr *get_mr(struct smbd_connection *info)
+{
+	struct smbd_mr *ret;
+	int rc;
+again:
+	rc = wait_event_interruptible(info->wait_mr,
+		atomic_read(&info->mr_ready_count) ||
+		info->transport_status != SMBD_CONNECTED);
+	if (rc) {
+		log_rdma_mr(ERR, "wait_event_interruptible rc=%x\n", rc);
+		return NULL;
+	}
+
+	if (info->transport_status != SMBD_CONNECTED) {
+		log_rdma_mr(ERR, "info->transport_status=%x\n",
+			info->transport_status);
+		return NULL;
+	}
+
+	spin_lock(&info->mr_list_lock);
+	list_for_each_entry(ret, &info->mr_list, list) {
+		if (ret->state == MR_READY) {
+			ret->state = MR_REGISTERED;
+			spin_unlock(&info->mr_list_lock);
+			atomic_dec(&info->mr_ready_count);
+			atomic_inc(&info->mr_used_count);
+			return ret;
+		}
+	}
+
+	spin_unlock(&info->mr_list_lock);
+	/*
+	 * It is possible that we could fail to get MR because other processes may
+	 * try to acquire a MR at the same time. If this is the case, retry it.
+	 */
+	goto again;
+}
+
+/*
+ * Register memory for RDMA read/write
+ * pages[]: the list of pages to register memory with
+ * num_pages: the number of pages to register
+ * tailsz: if non-zero, the bytes to register in the last page
+ * writing: true if this is a RDMA write (SMB read), false for RDMA read
+ * need_invalidate: true if this MR needs to be locally invalidated after I/O
+ * return value: the MR registered, NULL if failed.
+ */
+struct smbd_mr *smbd_register_mr(
+	struct smbd_connection *info, struct page *pages[], int num_pages,
+	int tailsz, bool writing, bool need_invalidate)
+{
+	struct smbd_mr *smbdirect_mr;
+	int rc, i;
+	enum dma_data_direction dir;
+	struct ib_reg_wr *reg_wr;
+	struct ib_send_wr *bad_wr;
+
+	if (num_pages > info->max_frmr_depth) {
+		log_rdma_mr(ERR, "num_pages=%d max_frmr_depth=%d\n",
+			num_pages, info->max_frmr_depth);
+		return NULL;
+	}
+
+	smbdirect_mr = get_mr(info);
+	if (!smbdirect_mr) {
+		log_rdma_mr(ERR, "get_mr returning NULL\n");
+		return NULL;
+	}
+	smbdirect_mr->need_invalidate = need_invalidate;
+	smbdirect_mr->sgl_count = num_pages;
+	sg_init_table(smbdirect_mr->sgl, num_pages);
+
+	for (i = 0; i < num_pages - 1; i++)
+		sg_set_page(&smbdirect_mr->sgl[i], pages[i], PAGE_SIZE, 0);
+
+	sg_set_page(&smbdirect_mr->sgl[i], pages[i],
+		tailsz ? tailsz : PAGE_SIZE, 0);
+
+	dir = writing ? DMA_FROM_DEVICE : DMA_TO_DEVICE;
+	smbdirect_mr->dir = dir;
+	rc = ib_dma_map_sg(info->id->device, smbdirect_mr->sgl, num_pages, dir);
+	if (!rc) {
+		log_rdma_mr(INFO, "ib_dma_map_sg num_pages=%x dir=%x rc=%x\n",
+			num_pages, dir, rc);
+		goto dma_map_error;
+	}
+
+	rc = ib_map_mr_sg(smbdirect_mr->mr, smbdirect_mr->sgl, num_pages,
+		NULL, PAGE_SIZE);
+	if (rc != num_pages) {
+		log_rdma_mr(INFO,
+			"ib_map_mr_sg failed rc = %x num_pages = %x\n",
+			rc, num_pages);
+		goto map_mr_error;
+	}
+
+	ib_update_fast_reg_key(smbdirect_mr->mr,
+		ib_inc_rkey(smbdirect_mr->mr->rkey));
+	reg_wr = &smbdirect_mr->wr;
+	reg_wr->wr.opcode = IB_WR_REG_MR;
+	smbdirect_mr->cqe.done = register_mr_done;
+	reg_wr->wr.wr_cqe = &smbdirect_mr->cqe;
+	reg_wr->wr.num_sge = 0;
+	reg_wr->wr.send_flags = IB_SEND_SIGNALED;
+	reg_wr->mr = smbdirect_mr->mr;
+	reg_wr->key = smbdirect_mr->mr->rkey;
+	reg_wr->access = writing ?
+			IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :
+			IB_ACCESS_REMOTE_READ;
+
+	/*
+	 * There is no need for waiting for complemtion on ib_post_send
+	 * on IB_WR_REG_MR. Hardware enforces a barrier and order of execution
+	 * on the next ib_post_send when we actaully send I/O to remote peer
+	 */
+	rc = ib_post_send(info->id->qp, &reg_wr->wr, &bad_wr);
+	if (!rc)
+		return smbdirect_mr;
+
+	log_rdma_mr(ERR, "ib_post_send failed rc=%x reg_wr->key=%x\n",
+		rc, reg_wr->key);
+
+	/* If all failed, attempt to recover this MR by setting it MR_ERROR*/
+map_mr_error:
+	ib_dma_unmap_sg(info->id->device, smbdirect_mr->sgl,
+		smbdirect_mr->sgl_count, smbdirect_mr->dir);
+
+dma_map_error:
+	smbdirect_mr->state = MR_ERROR;
+	if (atomic_dec_and_test(&info->mr_used_count))
+		wake_up(&info->wait_for_mr_cleanup);
+
+	return NULL;
+}
+
+static void local_inv_done(struct ib_cq *cq, struct ib_wc *wc)
+{
+	struct smbd_mr *smbdirect_mr;
+	struct ib_cqe *cqe;
+
+	cqe = wc->wr_cqe;
+	smbdirect_mr = container_of(cqe, struct smbd_mr, cqe);
+	smbdirect_mr->state = MR_INVALIDATED;
+	if (wc->status != IB_WC_SUCCESS) {
+		log_rdma_mr(ERR, "invalidate failed status=%x\n", wc->status);
+		smbdirect_mr->state = MR_ERROR;
+	}
+	complete(&smbdirect_mr->invalidate_done);
+}
+
+/*
+ * Deregister a MR after I/O is done
+ * This function may wait if remote invalidation is not used
+ * and we have to locally invalidate the buffer to prevent data is being
+ * modified by remote peer after upper layer consumes it
+ */
+int smbd_deregister_mr(struct smbd_mr *smbdirect_mr)
+{
+	struct ib_send_wr *wr, *bad_wr;
+	struct smbd_connection *info = smbdirect_mr->conn;
+	int rc = 0;
+
+	if (smbdirect_mr->need_invalidate) {
+		/* Need to finish local invalidation before returning */
+		wr = &smbdirect_mr->inv_wr;
+		wr->opcode = IB_WR_LOCAL_INV;
+		smbdirect_mr->cqe.done = local_inv_done;
+		wr->wr_cqe = &smbdirect_mr->cqe;
+		wr->num_sge = 0;
+		wr->ex.invalidate_rkey = smbdirect_mr->mr->rkey;
+		wr->send_flags = IB_SEND_SIGNALED;
+
+		init_completion(&smbdirect_mr->invalidate_done);
+		rc = ib_post_send(info->id->qp, wr, &bad_wr);
+		if (rc) {
+			log_rdma_mr(ERR, "ib_post_send failed rc=%x\n", rc);
+			smbd_disconnect_rdma_connection(info);
+			goto done;
+		}
+		wait_for_completion(&smbdirect_mr->invalidate_done);
+		smbdirect_mr->need_invalidate = false;
+	} else
+		/*
+		 * For remote invalidation, just set it to MR_INVALIDATED
+		 * and defer to mr_recovery_work to recover the MR for next use
+		 */
+		smbdirect_mr->state = MR_INVALIDATED;
+
+	/*
+	 * Schedule the work to do MR recovery for future I/Os
+	 * MR recovery is slow and we don't want it to block the current I/O
+	 */
+	queue_work(info->workqueue, &info->mr_recovery_work);
+
+done:
+	if (atomic_dec_and_test(&info->mr_used_count))
+		wake_up(&info->wait_for_mr_cleanup);
+
+	return rc;
+}