libceph: separate msgr1 protocol implementation

In preparation for msgr2, define an internal messenger <-> protocol
interface (as opposed to the external messenger <-> client interface,
which is struct ceph_connection_operations) consisting of try_read(),
try_write(), revoke(), revoke_incoming(), opened(), reset_session() and
reset_protocol() ops.  The semantics are exactly the same as they are
now.

Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 85d2037..4ca7d9b 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -593,6 +593,11 @@ int ceph_con_close_socket(struct ceph_connection *con)
 	return rc;
 }
 
+void ceph_con_v1_reset_protocol(struct ceph_connection *con)
+{
+	con->out_skip = 0;
+}
+
 static void ceph_con_reset_protocol(struct ceph_connection *con)
 {
 	dout("%s con %p\n", __func__, con);
@@ -609,7 +614,7 @@ static void ceph_con_reset_protocol(struct ceph_connection *con)
 		con->out_msg = NULL;
 	}
 
-	con->out_skip = 0;
+	ceph_con_v1_reset_protocol(con);
 }
 
 /*
@@ -631,6 +636,12 @@ static void ceph_msg_remove_list(struct list_head *head)
 	}
 }
 
+void ceph_con_v1_reset_session(struct ceph_connection *con)
+{
+	con->connect_seq = 0;
+	con->peer_global_seq = 0;
+}
+
 void ceph_con_reset_session(struct ceph_connection *con)
 {
 	dout("%s con %p\n", __func__, con);
@@ -643,8 +654,7 @@ void ceph_con_reset_session(struct ceph_connection *con)
 	con->in_seq = 0;
 	con->in_seq_acked = 0;
 
-	con->connect_seq = 0;
-	con->peer_global_seq = 0;
+	ceph_con_v1_reset_session(con);
 }
 
 /*
@@ -692,12 +702,17 @@ void ceph_con_open(struct ceph_connection *con,
 }
 EXPORT_SYMBOL(ceph_con_open);
 
+bool ceph_con_v1_opened(struct ceph_connection *con)
+{
+	return con->connect_seq;
+}
+
 /*
  * return true if this connection ever successfully opened
  */
 bool ceph_con_opened(struct ceph_connection *con)
 {
-	return con->connect_seq > 0;
+	return ceph_con_v1_opened(con);
 }
 
 /*
@@ -2552,7 +2567,7 @@ static int read_keepalive_ack(struct ceph_connection *con)
  * Write something to the socket.  Called in a worker thread when the
  * socket appears to be writeable and we have something ready to send.
  */
-static int try_write(struct ceph_connection *con)
+int ceph_con_v1_try_write(struct ceph_connection *con)
 {
 	int ret = 1;
 
@@ -2649,7 +2664,7 @@ static int try_write(struct ceph_connection *con)
 /*
  * Read what we can from the socket.
  */
-static int try_read(struct ceph_connection *con)
+int ceph_con_v1_try_read(struct ceph_connection *con)
 {
 	int ret = -1;
 
@@ -2930,7 +2945,7 @@ static void ceph_con_workfn(struct work_struct *work)
 			BUG_ON(con->sock);
 		}
 
-		ret = try_read(con);
+		ret = ceph_con_v1_try_read(con);
 		if (ret < 0) {
 			if (ret == -EAGAIN)
 				continue;
@@ -2940,7 +2955,7 @@ static void ceph_con_workfn(struct work_struct *work)
 			break;
 		}
 
-		ret = try_write(con);
+		ret = ceph_con_v1_try_write(con);
 		if (ret < 0) {
 			if (ret == -EAGAIN)
 				continue;
@@ -3116,6 +3131,29 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
 }
 EXPORT_SYMBOL(ceph_con_send);
 
+void ceph_con_v1_revoke(struct ceph_connection *con)
+{
+	struct ceph_msg *msg = con->out_msg;
+
+	WARN_ON(con->out_skip);
+	/* footer */
+	if (con->out_msg_done) {
+		con->out_skip += con_out_kvec_skip(con);
+	} else {
+		WARN_ON(!msg->data_length);
+		con->out_skip += sizeof_footer(con);
+	}
+	/* data, middle, front */
+	if (msg->data_length)
+		con->out_skip += msg->cursor.total_resid;
+	if (msg->middle)
+		con->out_skip += con_out_kvec_skip(con);
+	con->out_skip += con_out_kvec_skip(con);
+
+	dout("%s con %p out_kvec_bytes %d out_skip %d\n", __func__, con,
+	     con->out_kvec_bytes, con->out_skip);
+}
+
 /*
  * Revoke a message that was previously queued for send
  */
@@ -3129,39 +3167,50 @@ void ceph_msg_revoke(struct ceph_msg *msg)
 	}
 
 	mutex_lock(&con->mutex);
-	if (!list_empty(&msg->list_head)) {
-		dout("%s %p msg %p - was on queue\n", __func__, con, msg);
-		list_del_init(&msg->list_head);
-		msg->hdr.seq = 0;
-
-		ceph_msg_put(msg);
+	if (list_empty(&msg->list_head)) {
+		WARN_ON(con->out_msg == msg);
+		dout("%s con %p msg %p not linked\n", __func__, con, msg);
+		mutex_unlock(&con->mutex);
+		return;
 	}
+
+	dout("%s con %p msg %p was linked\n", __func__, con, msg);
+	msg->hdr.seq = 0;
+	ceph_msg_remove(msg);
+
 	if (con->out_msg == msg) {
-		BUG_ON(con->out_skip);
-		/* footer */
-		if (con->out_msg_done) {
-			con->out_skip += con_out_kvec_skip(con);
-		} else {
-			BUG_ON(!msg->data_length);
-			con->out_skip += sizeof_footer(con);
-		}
-		/* data, middle, front */
-		if (msg->data_length)
-			con->out_skip += msg->cursor.total_resid;
-		if (msg->middle)
-			con->out_skip += con_out_kvec_skip(con);
-		con->out_skip += con_out_kvec_skip(con);
-
-		dout("%s %p msg %p - was sending, will write %d skip %d\n",
-		     __func__, con, msg, con->out_kvec_bytes, con->out_skip);
-		msg->hdr.seq = 0;
+		WARN_ON(con->state != CEPH_CON_S_OPEN);
+		dout("%s con %p msg %p was sending\n", __func__, con, msg);
+		ceph_con_v1_revoke(con);
+		ceph_msg_put(con->out_msg);
 		con->out_msg = NULL;
-		ceph_msg_put(msg);
+	} else {
+		dout("%s con %p msg %p not current, out_msg %p\n", __func__,
+		     con, msg, con->out_msg);
 	}
-
 	mutex_unlock(&con->mutex);
 }
 
+void ceph_con_v1_revoke_incoming(struct ceph_connection *con)
+{
+	unsigned int front_len = le32_to_cpu(con->in_hdr.front_len);
+	unsigned int middle_len = le32_to_cpu(con->in_hdr.middle_len);
+	unsigned int data_len = le32_to_cpu(con->in_hdr.data_len);
+
+	/* skip rest of message */
+	con->in_base_pos = con->in_base_pos -
+			sizeof(struct ceph_msg_header) -
+			front_len -
+			middle_len -
+			data_len -
+			sizeof(struct ceph_msg_footer);
+
+	con->in_tag = CEPH_MSGR_TAG_READY;
+	con->in_seq++;
+
+	dout("%s con %p in_base_pos %d\n", __func__, con, con->in_base_pos);
+}
+
 /*
  * Revoke a message that we may be reading data into
  */
@@ -3176,25 +3225,14 @@ void ceph_msg_revoke_incoming(struct ceph_msg *msg)
 
 	mutex_lock(&con->mutex);
 	if (con->in_msg == msg) {
-		unsigned int front_len = le32_to_cpu(con->in_hdr.front_len);
-		unsigned int middle_len = le32_to_cpu(con->in_hdr.middle_len);
-		unsigned int data_len = le32_to_cpu(con->in_hdr.data_len);
-
-		/* skip rest of message */
-		dout("%s %p msg %p revoked\n", __func__, con, msg);
-		con->in_base_pos = con->in_base_pos -
-				sizeof(struct ceph_msg_header) -
-				front_len -
-				middle_len -
-				data_len -
-				sizeof(struct ceph_msg_footer);
+		WARN_ON(con->state != CEPH_CON_S_OPEN);
+		dout("%s con %p msg %p was recving\n", __func__, con, msg);
+		ceph_con_v1_revoke_incoming(con);
 		ceph_msg_put(con->in_msg);
 		con->in_msg = NULL;
-		con->in_tag = CEPH_MSGR_TAG_READY;
-		con->in_seq++;
 	} else {
-		dout("%s %p in_msg %p msg %p no-op\n",
-		     __func__, con, con->in_msg, msg);
+		dout("%s con %p msg %p not current, in_msg %p\n", __func__,
+		     con, msg, con->in_msg);
 	}
 	mutex_unlock(&con->mutex);
 }