tipc: add ability to order and receive topology events in driver
As preparation for introducing communication groups, we add the ability
to issue topology subscriptions and receive topology events from kernel
space. This will make it possible for group member sockets to keep track
of other group members.
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Acked-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
diff --git a/net/tipc/core.h b/net/tipc/core.h
index 5cc5398..9643426 100644
--- a/net/tipc/core.h
+++ b/net/tipc/core.h
@@ -132,6 +132,11 @@ static inline struct list_head *tipc_nodes(struct net *net)
return &tipc_net(net)->node_list;
}
+static inline struct tipc_server *tipc_topsrv(struct net *net)
+{
+ return tipc_net(net)->topsrv;
+}
+
static inline unsigned int tipc_hashfn(u32 addr)
{
return addr & (NODE_HTABLE_SIZE - 1);
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index c843fd2..d058b1c 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -78,6 +78,7 @@ struct plist;
#define MSG_FRAGMENTER 12
#define LINK_CONFIG 13
#define SOCK_WAKEUP 14 /* pseudo user */
+#define TOP_SRV 15 /* pseudo user */
/*
* Message header sizes
diff --git a/net/tipc/server.c b/net/tipc/server.c
index 3cd6402..7130775 100644
--- a/net/tipc/server.c
+++ b/net/tipc/server.c
@@ -36,6 +36,8 @@
#include "server.h"
#include "core.h"
#include "socket.h"
+#include "addr.h"
+#include "msg.h"
#include <net/sock.h>
#include <linux/module.h>
@@ -105,13 +107,11 @@ static void tipc_conn_kref_release(struct kref *kref)
kernel_bind(sock, (struct sockaddr *)saddr, sizeof(*saddr));
sock_release(sock);
con->sock = NULL;
-
- spin_lock_bh(&s->idr_lock);
- idr_remove(&s->conn_idr, con->conid);
- s->idr_in_use--;
- spin_unlock_bh(&s->idr_lock);
}
-
+ spin_lock_bh(&s->idr_lock);
+ idr_remove(&s->conn_idr, con->conid);
+ s->idr_in_use--;
+ spin_unlock_bh(&s->idr_lock);
tipc_clean_outqueues(con);
kfree(con);
}
@@ -197,7 +197,8 @@ static void tipc_close_conn(struct tipc_conn *con)
struct tipc_server *s = con->server;
if (test_and_clear_bit(CF_CONNECTED, &con->flags)) {
- tipc_unregister_callbacks(con);
+ if (con->sock)
+ tipc_unregister_callbacks(con);
if (con->conid)
s->tipc_conn_release(con->conid, con->usr_data);
@@ -207,8 +208,8 @@ static void tipc_close_conn(struct tipc_conn *con)
* are harmless for us here as we have already deleted this
* connection from server connection list.
*/
- kernel_sock_shutdown(con->sock, SHUT_RDWR);
-
+ if (con->sock)
+ kernel_sock_shutdown(con->sock, SHUT_RDWR);
conn_put(con);
}
}
@@ -487,38 +488,104 @@ void tipc_conn_terminate(struct tipc_server *s, int conid)
}
}
+bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type,
+ u32 lower, u32 upper, int *conid)
+{
+ struct tipc_subscriber *scbr;
+ struct tipc_subscr sub;
+ struct tipc_server *s;
+ struct tipc_conn *con;
+
+ sub.seq.type = type;
+ sub.seq.lower = lower;
+ sub.seq.upper = upper;
+ sub.timeout = TIPC_WAIT_FOREVER;
+ sub.filter = TIPC_SUB_PORTS;
+ *(u32 *)&sub.usr_handle = port;
+
+ con = tipc_alloc_conn(tipc_topsrv(net));
+ if (!con)
+ return false;
+
+ *conid = con->conid;
+ s = con->server;
+ scbr = s->tipc_conn_new(*conid);
+ if (!scbr) {
+ tipc_close_conn(con);
+ return false;
+ }
+
+ con->usr_data = scbr;
+ con->sock = NULL;
+ s->tipc_conn_recvmsg(net, *conid, NULL, scbr, &sub, sizeof(sub));
+ return true;
+}
+
+void tipc_topsrv_kern_unsubscr(struct net *net, int conid)
+{
+ struct tipc_conn *con;
+
+ con = tipc_conn_lookup(tipc_topsrv(net), conid);
+ if (!con)
+ return;
+ tipc_close_conn(con);
+ conn_put(con);
+}
+
+static void tipc_send_kern_top_evt(struct net *net, struct tipc_event *evt)
+{
+ u32 port = *(u32 *)&evt->s.usr_handle;
+ u32 self = tipc_own_addr(net);
+ struct sk_buff_head evtq;
+ struct sk_buff *skb;
+
+ skb = tipc_msg_create(TOP_SRV, 0, INT_H_SIZE, sizeof(*evt),
+ self, self, port, port, 0);
+ if (!skb)
+ return;
+ msg_set_dest_droppable(buf_msg(skb), true);
+ memcpy(msg_data(buf_msg(skb)), evt, sizeof(*evt));
+ skb_queue_head_init(&evtq);
+ __skb_queue_tail(&evtq, skb);
+ tipc_sk_rcv(net, &evtq);
+}
+
static void tipc_send_to_sock(struct tipc_conn *con)
{
- int count = 0;
struct tipc_server *s = con->server;
struct outqueue_entry *e;
+ struct tipc_event *evt;
struct msghdr msg;
+ int count = 0;
int ret;
spin_lock_bh(&con->outqueue_lock);
while (test_bit(CF_CONNECTED, &con->flags)) {
- e = list_entry(con->outqueue.next, struct outqueue_entry,
- list);
+ e = list_entry(con->outqueue.next, struct outqueue_entry, list);
if ((struct list_head *) e == &con->outqueue)
break;
+
spin_unlock_bh(&con->outqueue_lock);
- memset(&msg, 0, sizeof(msg));
- msg.msg_flags = MSG_DONTWAIT;
-
- if (s->type == SOCK_DGRAM || s->type == SOCK_RDM) {
- msg.msg_name = &e->dest;
- msg.msg_namelen = sizeof(struct sockaddr_tipc);
+ if (con->sock) {
+ memset(&msg, 0, sizeof(msg));
+ msg.msg_flags = MSG_DONTWAIT;
+ if (s->type == SOCK_DGRAM || s->type == SOCK_RDM) {
+ msg.msg_name = &e->dest;
+ msg.msg_namelen = sizeof(struct sockaddr_tipc);
+ }
+ ret = kernel_sendmsg(con->sock, &msg, &e->iov, 1,
+ e->iov.iov_len);
+ if (ret == -EWOULDBLOCK || ret == 0) {
+ cond_resched();
+ goto out;
+ } else if (ret < 0) {
+ goto send_err;
+ }
+ } else {
+ evt = e->iov.iov_base;
+ tipc_send_kern_top_evt(s->net, evt);
}
- ret = kernel_sendmsg(con->sock, &msg, &e->iov, 1,
- e->iov.iov_len);
- if (ret == -EWOULDBLOCK || ret == 0) {
- cond_resched();
- goto out;
- } else if (ret < 0) {
- goto send_err;
- }
-
/* Don't starve users filling buffers */
if (++count >= MAX_SEND_MSG_COUNT) {
cond_resched();
diff --git a/net/tipc/server.h b/net/tipc/server.h
index 34f8055..2113c91 100644
--- a/net/tipc/server.h
+++ b/net/tipc/server.h
@@ -83,13 +83,16 @@ struct tipc_server {
int tipc_conn_sendmsg(struct tipc_server *s, int conid,
struct sockaddr_tipc *addr, void *data, size_t len);
+bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type,
+ u32 lower, u32 upper, int *conid);
+void tipc_topsrv_kern_unsubscr(struct net *net, int conid);
+
/**
* tipc_conn_terminate - terminate connection with server
*
* Note: Must call it in process context since it might sleep
*/
void tipc_conn_terminate(struct tipc_server *s, int conid);
-
int tipc_server_start(struct tipc_server *s);
void tipc_server_stop(struct tipc_server *s);
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index d50edd6..9a7e7b5c 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -896,6 +896,10 @@ static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb,
kfree_skb(skb);
}
+static void tipc_sk_top_evt(struct tipc_sock *tsk, struct tipc_event *evt)
+{
+}
+
/**
* tipc_sendmsg - send message in connectionless manner
* @sock: socket structure
@@ -1671,20 +1675,24 @@ static bool filter_rcv(struct sock *sk, struct sk_buff *skb,
struct tipc_msg *hdr = buf_msg(skb);
unsigned int limit = rcvbuf_limit(sk, skb);
int err = TIPC_OK;
- int usr = msg_user(hdr);
- u32 onode;
- if (unlikely(msg_user(hdr) == CONN_MANAGER)) {
- tipc_sk_proto_rcv(tsk, skb, xmitq);
- return false;
- }
-
- if (unlikely(usr == SOCK_WAKEUP)) {
- onode = msg_orignode(hdr);
+ if (unlikely(!msg_isdata(hdr))) {
+ switch (msg_user(hdr)) {
+ case CONN_MANAGER:
+ tipc_sk_proto_rcv(tsk, skb, xmitq);
+ return false;
+ case SOCK_WAKEUP:
+ u32_del(&tsk->cong_links, msg_orignode(hdr));
+ tsk->cong_link_cnt--;
+ sk->sk_write_space(sk);
+ break;
+ case TOP_SRV:
+ tipc_sk_top_evt(tsk, (void *)msg_data(hdr));
+ break;
+ default:
+ break;
+ }
kfree_skb(skb);
- u32_del(&tsk->cong_links, onode);
- tsk->cong_link_cnt--;
- sk->sk_write_space(sk);
return false;
}