tipc: message reassembly using fragment chain

When the first fragment of a long data message is received on a link, a
reassembly buffer large enough to hold the data from this and all subsequent
fragments of the message is allocated. The payload of each new fragment is
copied into this buffer upon arrival. When the last fragment is received, the
reassembled message is delivered upwards to the port/socket layer.

Not only is this an inefficient approach, but it may also cause bursts of
reassembly failures in low-memory situations, since we may fail to allocate
the necessary large buffer in the first place. Furthermore, after 100
consecutive such failures the link will be reset, which in reality aggravates
the situation.

To remedy this problem, this patch introduces a different approach. Instead of
allocating a big reassembly buffer, we now append the arriving fragments
to a reassembly chain on the link, and deliver the whole chain up to the
socket layer once the last fragment has been received. This is safe because
the retransmission layer of a TIPC link always delivers packets in strict
uninterrupted order, to the reassembly layer as to all other upper layers.
Hence there can never be more than one fragment chain pending reassembly at
any given time in a link, and we can trust (but still verify) that the
fragments will be chained up in the correct order.
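
As a rough illustration of the chaining idea, here is a minimal userspace
sketch. It is not the code in this patch: struct frag, struct reasm,
reasm_recv() and the REASM_* codes are hypothetical stand-ins for the socket
buffers, the reasm_head/reasm_tail pointers and the LINK_REASM_* results, and
the sketch assumes each fragment carries a first/middle/last type.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical fragment types and result codes, for illustration only. */
enum frag_type { FRAG_FIRST, FRAG_MIDDLE, FRAG_LAST };
enum reasm_result { REASM_ERROR = -1, REASM_PENDING = 0, REASM_COMPLETE = 1 };

/* Stand-in for a buffer holding one fragment. */
struct frag {
	enum frag_type type;
	const char *data;
	size_t len;
	struct frag *next;
};

/* Per-link reassembly state: at most one chain is pending at a time. */
struct reasm {
	struct frag *head;
	struct frag *tail;
};

static void reasm_reset(struct reasm *r)
{
	r->head = NULL;
	r->tail = NULL;
}

/*
 * Append one fragment, assumed to arrive in order, to the pending chain.
 * No payload is copied and no large buffer is allocated. On REASM_COMPLETE,
 * *out is set to the head of the finished chain and the state is cleared.
 * Out-of-place fragments are still verified and rejected; a real
 * implementation would also free the dropped buffers at that point.
 */
static enum reasm_result reasm_recv(struct reasm *r, struct frag *f,
				    struct frag **out)
{
	assert(r && f && out);
	f->next = NULL;

	if (f->type == FRAG_FIRST) {
		if (r->head)		/* a chain is already pending */
			goto err;
		r->head = f;
		r->tail = f;
		return REASM_PENDING;
	}
	if (!r->head)			/* middle/last without a first */
		goto err;

	r->tail->next = f;
	r->tail = f;
	if (f->type == FRAG_LAST) {
		*out = r->head;
		reasm_reset(r);
		return REASM_COMPLETE;
	}
	return REASM_PENDING;
err:
	reasm_reset(r);
	return REASM_ERROR;
}

/* Total payload length of a finished chain. */
static size_t chain_payload_len(const struct frag *f)
{
	size_t len = 0;

	for (; f; f = f->next)
		len += f->len;
	return len;
}
```

The receiver only links buffers together, so a failure can no longer be
caused by the inability to allocate one large reassembly buffer.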

Signed-off-by: Erik Hugne <erik.hugne@ericsson.com>
Reviewed-by: Paul Gortmaker <paul.gortmaker@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index 766a6eb..0d44025 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -480,15 +480,19 @@
 			tipc_node_unlock(node);
 			tipc_link_recv_bundle(buf);
 		} else if (msg_user(msg) == MSG_FRAGMENTER) {
-			int ret = tipc_link_recv_fragment(&node->bclink.defragm,
-						      &buf, &msg);
-			if (ret < 0)
+			int ret;
+			ret = tipc_link_recv_fragment(&node->bclink.reasm_head,
+						      &node->bclink.reasm_tail,
+						      &buf);
+			if (ret == LINK_REASM_ERROR)
 				goto unlock;
 			spin_lock_bh(&bc_lock);
 			bclink_accept_pkt(node, seqno);
 			bcl->stats.recv_fragments++;
-			if (ret > 0) {
+			if (ret == LINK_REASM_COMPLETE) {
 				bcl->stats.recv_fragmented++;
+				/* Point msg to inner header */
+				msg = buf_msg(buf);
 				spin_unlock_bh(&bc_lock);
 				goto receive;
 			}