tipc: eliminate struct tipc_subscriber
It is unnecessary to keep two structures, struct tipc_conn and struct
tipc_subscriber, with a one-to-one relationship and still with different
life cycles. The fact that the two often run in different contexts, and
still may access each other via direct pointers constitutes an additional
hazard, something we have experienced at several occasions, and still
see happening.
We have identified at least two remaining problems that are easier to
fix if we simplify the topology server data structure somewhat.
- When there is a race between a subscription up/down event and a
timeout event, it is fully possible that the former might be delivered
after the latter, leading to confusion for the receiver.
- The function tipc_subcrp_timeout() is executing in interrupt context,
while the following call chain is at least theoretically possible:
tipc_subscrp_timeout()
tipc_subscrp_send_event()
tipc_conn_sendmsg()
conn_put()
tipc_conn_kref_release()
sock_release(sock)
I.e., we end up calling a function that might try to sleep in
interrupt context. To eliminate this, we need to ensure that the
tipc_conn structure and the socket, as well as the subscription
instances, only are deleted in work queue context, i.e., after the
timeout event really has been sent out.
We now remove this unnecessary complexity, by merging data and
functionality of the subscriber structure into struct tipc_conn
and the associated file server.c. We thereafter add a spinlock and
a new 'inactive' state to the subscription structure. Using those,
both problems described above can be easily solved.
Acked-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c
index b86fbbf..c6de145 100644
--- a/net/tipc/subscr.c
+++ b/net/tipc/subscr.c
@@ -1,7 +1,7 @@
/*
* net/tipc/subscr.c: TIPC network topology service
*
- * Copyright (c) 2000-2006, Ericsson AB
+ * Copyright (c) 2000-2017, Ericsson AB
* Copyright (c) 2005-2007, 2010-2013, Wind River Systems
* All rights reserved.
*
@@ -39,22 +39,6 @@
#include "subscr.h"
/**
- * struct tipc_subscriber - TIPC network topology subscriber
- * @kref: reference counter to tipc_subscription object
- * @conid: connection identifier to server connecting to subscriber
- * @lock: control access to subscriber
- * @subscrp_list: list of subscription objects for this subscriber
- */
-struct tipc_subscriber {
- struct kref kref;
- int conid;
- spinlock_t lock;
- struct list_head subscrp_list;
-};
-
-static void tipc_subscrb_put(struct tipc_subscriber *subscriber);
-
-/**
* htohl - convert value to endianness used by destination
* @in: value to convert
* @swap: non-zero if endianness must be reversed
@@ -71,9 +55,10 @@ static void tipc_subscrp_send_event(struct tipc_subscription *sub,
u32 event, u32 port_ref, u32 node)
{
struct tipc_net *tn = net_generic(sub->net, tipc_net_id);
- struct tipc_subscriber *subscriber = sub->subscriber;
struct kvec msg_sect;
+ if (sub->inactive)
+ return;
msg_sect.iov_base = (void *)&sub->evt;
msg_sect.iov_len = sizeof(struct tipc_event);
sub->evt.event = htohl(event, sub->swap);
@@ -81,7 +66,7 @@ static void tipc_subscrp_send_event(struct tipc_subscription *sub,
sub->evt.found_upper = htohl(found_upper, sub->swap);
sub->evt.port.ref = htohl(port_ref, sub->swap);
sub->evt.port.node = htohl(node, sub->swap);
- tipc_conn_sendmsg(tn->topsrv, subscriber->conid,
+ tipc_conn_sendmsg(tn->topsrv, sub->conid, event,
msg_sect.iov_base, msg_sect.iov_len);
}
@@ -132,41 +117,22 @@ void tipc_subscrp_report_overlap(struct tipc_subscription *sub, u32 found_lower,
return;
if (filter & TIPC_SUB_NODE_SCOPE && scope != TIPC_NODE_SCOPE)
return;
-
- tipc_subscrp_send_event(sub, found_lower, found_upper, event, port_ref,
- node);
+ spin_lock(&sub->lock);
+ tipc_subscrp_send_event(sub, found_lower, found_upper,
+ event, port_ref, node);
+ spin_unlock(&sub->lock);
}
static void tipc_subscrp_timeout(struct timer_list *t)
{
struct tipc_subscription *sub = from_timer(sub, t, timer);
- struct tipc_subscriber *subscriber = sub->subscriber;
+ struct tipc_subscr *s = &sub->evt.s;
- spin_lock_bh(&subscriber->lock);
- tipc_nametbl_unsubscribe(sub);
- list_del(&sub->subscrp_list);
- spin_unlock_bh(&subscriber->lock);
-
- /* Notify subscriber of timeout */
- tipc_subscrp_send_event(sub, sub->evt.s.seq.lower, sub->evt.s.seq.upper,
+ spin_lock(&sub->lock);
+ tipc_subscrp_send_event(sub, s->seq.lower, s->seq.upper,
TIPC_SUBSCR_TIMEOUT, 0, 0);
-
- tipc_subscrp_put(sub);
-}
-
-static void tipc_subscrb_kref_release(struct kref *kref)
-{
- kfree(container_of(kref,struct tipc_subscriber, kref));
-}
-
-static void tipc_subscrb_put(struct tipc_subscriber *subscriber)
-{
- kref_put(&subscriber->kref, tipc_subscrb_kref_release);
-}
-
-static void tipc_subscrb_get(struct tipc_subscriber *subscriber)
-{
- kref_get(&subscriber->kref);
+ sub->inactive = true;
+ spin_unlock(&sub->lock);
}
static void tipc_subscrp_kref_release(struct kref *kref)
@@ -175,11 +141,9 @@ static void tipc_subscrp_kref_release(struct kref *kref)
struct tipc_subscription,
kref);
struct tipc_net *tn = net_generic(sub->net, tipc_net_id);
- struct tipc_subscriber *subscriber = sub->subscriber;
atomic_dec(&tn->subscription_count);
kfree(sub);
- tipc_subscrb_put(subscriber);
}
void tipc_subscrp_put(struct tipc_subscription *subscription)
@@ -192,68 +156,9 @@ void tipc_subscrp_get(struct tipc_subscription *subscription)
kref_get(&subscription->kref);
}
-/* tipc_subscrb_subscrp_delete - delete a specific subscription or all
- * subscriptions for a given subscriber.
- */
-static void tipc_subscrb_subscrp_delete(struct tipc_subscriber *subscriber,
- struct tipc_subscr *s)
-{
- struct list_head *subscription_list = &subscriber->subscrp_list;
- struct tipc_subscription *sub, *temp;
- u32 timeout;
-
- spin_lock_bh(&subscriber->lock);
- list_for_each_entry_safe(sub, temp, subscription_list, subscrp_list) {
- if (s && memcmp(s, &sub->evt.s, sizeof(struct tipc_subscr)))
- continue;
-
- timeout = htohl(sub->evt.s.timeout, sub->swap);
- if (timeout == TIPC_WAIT_FOREVER || del_timer(&sub->timer)) {
- tipc_nametbl_unsubscribe(sub);
- list_del(&sub->subscrp_list);
- tipc_subscrp_put(sub);
- }
-
- if (s)
- break;
- }
- spin_unlock_bh(&subscriber->lock);
-}
-
-struct tipc_subscriber *tipc_subscrb_create(int conid)
-{
- struct tipc_subscriber *subscriber;
-
- subscriber = kzalloc(sizeof(*subscriber), GFP_ATOMIC);
- if (!subscriber) {
- pr_warn("Subscriber rejected, no memory\n");
- return NULL;
- }
- INIT_LIST_HEAD(&subscriber->subscrp_list);
- kref_init(&subscriber->kref);
- subscriber->conid = conid;
- spin_lock_init(&subscriber->lock);
-
- return subscriber;
-}
-
-void tipc_subscrb_delete(struct tipc_subscriber *subscriber)
-{
- tipc_subscrb_subscrp_delete(subscriber, NULL);
- tipc_subscrb_put(subscriber);
-}
-
-static void tipc_subscrp_cancel(struct tipc_subscr *s,
- struct tipc_subscriber *subscriber)
-{
- tipc_subscrb_get(subscriber);
- tipc_subscrb_subscrp_delete(subscriber, s);
- tipc_subscrb_put(subscriber);
-}
-
static struct tipc_subscription *tipc_subscrp_create(struct net *net,
struct tipc_subscr *s,
- int swap)
+ int conid, bool swap)
{
struct tipc_net *tn = net_generic(net, tipc_net_id);
struct tipc_subscription *sub;
@@ -275,6 +180,8 @@ static struct tipc_subscription *tipc_subscrp_create(struct net *net,
/* Initialize subscription object */
sub->net = net;
+ sub->conid = conid;
+ sub->inactive = false;
if (((filter & TIPC_SUB_PORTS) && (filter & TIPC_SUB_SERVICE)) ||
(htohl(s->seq.lower, swap) > htohl(s->seq.upper, swap))) {
pr_warn("Subscription rejected, illegal request\n");
@@ -284,59 +191,39 @@ static struct tipc_subscription *tipc_subscrp_create(struct net *net,
sub->swap = swap;
memcpy(&sub->evt.s, s, sizeof(*s));
+ spin_lock_init(&sub->lock);
atomic_inc(&tn->subscription_count);
kref_init(&sub->kref);
return sub;
}
-static int tipc_subscrp_subscribe(struct net *net, struct tipc_subscr *s,
- struct tipc_subscriber *subscriber, int swap,
- bool status)
+struct tipc_subscription *tipc_subscrp_subscribe(struct net *net,
+ struct tipc_subscr *s,
+ int conid, bool swap,
+ bool status)
{
struct tipc_subscription *sub = NULL;
u32 timeout;
- sub = tipc_subscrp_create(net, s, swap);
+ sub = tipc_subscrp_create(net, s, conid, swap);
if (!sub)
- return -1;
+ return NULL;
- spin_lock_bh(&subscriber->lock);
- list_add(&sub->subscrp_list, &subscriber->subscrp_list);
- sub->subscriber = subscriber;
tipc_nametbl_subscribe(sub, status);
- tipc_subscrb_get(subscriber);
- spin_unlock_bh(&subscriber->lock);
-
timer_setup(&sub->timer, tipc_subscrp_timeout, 0);
timeout = htohl(sub->evt.s.timeout, swap);
-
if (timeout != TIPC_WAIT_FOREVER)
mod_timer(&sub->timer, jiffies + msecs_to_jiffies(timeout));
- return 0;
+ return sub;
}
-/* Handle one request to create a new subscription for the subscriber
- */
-int tipc_subscrb_rcv(struct net *net, int conid, void *usr_data,
- void *buf, size_t len)
+void tipc_sub_delete(struct tipc_subscription *sub)
{
- struct tipc_subscriber *subscriber = usr_data;
- struct tipc_subscr *s = (struct tipc_subscr *)buf;
- bool status;
- int swap;
-
- /* Determine subscriber's endianness */
- swap = !(s->filter & (TIPC_SUB_PORTS | TIPC_SUB_SERVICE |
- TIPC_SUB_CANCEL));
-
- /* Detect & process a subscription cancellation request */
- if (s->filter & htohl(TIPC_SUB_CANCEL, swap)) {
- s->filter &= ~htohl(TIPC_SUB_CANCEL, swap);
- tipc_subscrp_cancel(s, subscriber);
- return 0;
- }
- status = !(s->filter & htohl(TIPC_SUB_NO_STATUS, swap));
- return tipc_subscrp_subscribe(net, s, subscriber, swap, status);
+ tipc_nametbl_unsubscribe(sub);
+ if (sub->evt.s.timeout != TIPC_WAIT_FOREVER)
+ del_timer_sync(&sub->timer);
+ list_del(&sub->subscrp_list);
+ tipc_subscrp_put(sub);
}
int tipc_topsrv_start(struct net *net)