blob: f0f228c9589a0f932452d2856e1c938eaaa1413d [file] [log] [blame]
Per Lidenb97bf3f2006-01-02 19:04:38 +01001/*
2 * net/tipc/port.c: TIPC port code
3 *
4 * Copyright (c) 2003-2005, Ericsson Research Canada
5 * Copyright (c) 2004-2005, Wind River Systems
6 * Copyright (c) 2005-2006, Ericsson AB
7 * All rights reserved.
8 *
9 * Redistribution and use in source and binary forms, with or without
10 * modification, are permitted provided that the following conditions are met:
11 *
12 * Redistributions of source code must retain the above copyright notice, this
13 * list of conditions and the following disclaimer.
14 * Redistributions in binary form must reproduce the above copyright notice,
15 * this list of conditions and the following disclaimer in the documentation
16 * and/or other materials provided with the distribution.
17 * Neither the names of the copyright holders nor the names of its
18 * contributors may be used to endorse or promote products derived from this
19 * software without specific prior written permission.
20 *
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
22 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
23 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
24 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
25 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
26 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
27 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
28 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
29 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
30 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
31 * POSSIBILITY OF SUCH DAMAGE.
32 */
33
34#include "core.h"
35#include "config.h"
36#include "dbg.h"
37#include "port.h"
38#include "addr.h"
39#include "link.h"
40#include "node.h"
41#include "port.h"
42#include "name_table.h"
43#include "user_reg.h"
44#include "msg.h"
45#include "bcast.h"
46
47/* Connection management: */
48#define PROBING_INTERVAL 3600000 /* [ms] => 1 h */
49#define CONFIRMED 0
50#define PROBING 1
51
52#define MAX_REJECT_SIZE 1024
53
54static struct sk_buff *msg_queue_head = 0;
55static struct sk_buff *msg_queue_tail = 0;
56
57spinlock_t port_list_lock = SPIN_LOCK_UNLOCKED;
58static spinlock_t queue_lock = SPIN_LOCK_UNLOCKED;
59
60LIST_HEAD(ports);
61static void port_handle_node_down(unsigned long ref);
62static struct sk_buff* port_build_self_abort_msg(struct port *,u32 err);
63static struct sk_buff* port_build_peer_abort_msg(struct port *,u32 err);
64static void port_timeout(unsigned long ref);
65
66
67static inline u32 port_peernode(struct port *p_ptr)
68{
69 return msg_destnode(&p_ptr->publ.phdr);
70}
71
72static inline u32 port_peerport(struct port *p_ptr)
73{
74 return msg_destport(&p_ptr->publ.phdr);
75}
76
77static inline u32 port_out_seqno(struct port *p_ptr)
78{
79 return msg_transp_seqno(&p_ptr->publ.phdr);
80}
81
82static inline void port_set_out_seqno(struct port *p_ptr, u32 seqno)
83{
84 msg_set_transp_seqno(&p_ptr->publ.phdr,seqno);
85}
86
87static inline void port_incr_out_seqno(struct port *p_ptr)
88{
89 struct tipc_msg *m = &p_ptr->publ.phdr;
90
91 if (likely(!msg_routed(m)))
92 return;
93 msg_set_transp_seqno(m, (msg_transp_seqno(m) + 1));
94}
95
96/**
97 * tipc_multicast - send a multicast message to local and remote destinations
98 */
99
100int tipc_multicast(u32 ref, struct tipc_name_seq const *seq, u32 domain,
101 u32 num_sect, struct iovec const *msg_sect)
102{
103 struct tipc_msg *hdr;
104 struct sk_buff *buf;
105 struct sk_buff *ibuf = NULL;
106 struct port_list dports = {0, NULL, };
107 struct port *oport = port_deref(ref);
108 int ext_targets;
109 int res;
110
111 if (unlikely(!oport))
112 return -EINVAL;
113
114 /* Create multicast message */
115
116 hdr = &oport->publ.phdr;
117 msg_set_type(hdr, TIPC_MCAST_MSG);
118 msg_set_nametype(hdr, seq->type);
119 msg_set_namelower(hdr, seq->lower);
120 msg_set_nameupper(hdr, seq->upper);
121 msg_set_hdr_sz(hdr, MCAST_H_SIZE);
122 res = msg_build(hdr, msg_sect, num_sect, MAX_MSG_SIZE,
123 !oport->user_port, &buf);
124 if (unlikely(!buf))
125 return res;
126
127 /* Figure out where to send multicast message */
128
129 ext_targets = nametbl_mc_translate(seq->type, seq->lower, seq->upper,
130 TIPC_NODE_SCOPE, &dports);
131
132 /* Send message to destinations (duplicate it only if necessary) */
133
134 if (ext_targets) {
135 if (dports.count != 0) {
136 ibuf = skb_copy(buf, GFP_ATOMIC);
137 if (ibuf == NULL) {
138 port_list_free(&dports);
139 buf_discard(buf);
140 return -ENOMEM;
141 }
142 }
143 res = bclink_send_msg(buf);
144 if ((res < 0) && (dports.count != 0)) {
145 buf_discard(ibuf);
146 }
147 } else {
148 ibuf = buf;
149 }
150
151 if (res >= 0) {
152 if (ibuf)
153 port_recv_mcast(ibuf, &dports);
154 } else {
155 port_list_free(&dports);
156 }
157 return res;
158}
159
160/**
161 * port_recv_mcast - deliver multicast message to all destination ports
162 *
163 * If there is no port list, perform a lookup to create one
164 */
165
166void port_recv_mcast(struct sk_buff *buf, struct port_list *dp)
167{
168 struct tipc_msg* msg;
169 struct port_list dports = {0, NULL, };
170 struct port_list *item = dp;
171 int cnt = 0;
172
173 assert(buf);
174 msg = buf_msg(buf);
175
176 /* Create destination port list, if one wasn't supplied */
177
178 if (dp == NULL) {
179 nametbl_mc_translate(msg_nametype(msg),
180 msg_namelower(msg),
181 msg_nameupper(msg),
182 TIPC_CLUSTER_SCOPE,
183 &dports);
184 item = dp = &dports;
185 }
186
187 /* Deliver a copy of message to each destination port */
188
189 if (dp->count != 0) {
190 if (dp->count == 1) {
191 msg_set_destport(msg, dp->ports[0]);
192 port_recv_msg(buf);
193 port_list_free(dp);
194 return;
195 }
196 for (; cnt < dp->count; cnt++) {
197 int index = cnt % PLSIZE;
198 struct sk_buff *b = skb_clone(buf, GFP_ATOMIC);
199
200 if (b == NULL) {
201 warn("Buffer allocation failure\n");
202 msg_dbg(msg, "LOST:");
203 goto exit;
204 }
205 if ((index == 0) && (cnt != 0)) {
206 item = item->next;
207 }
208 msg_set_destport(buf_msg(b),item->ports[index]);
209 port_recv_msg(b);
210 }
211 }
212exit:
213 buf_discard(buf);
214 port_list_free(dp);
215}
216
217/**
218 * tipc_createport_raw - create a native TIPC port
219 *
220 * Returns local port reference
221 */
222
223u32 tipc_createport_raw(void *usr_handle,
224 u32 (*dispatcher)(struct tipc_port *, struct sk_buff *),
225 void (*wakeup)(struct tipc_port *),
226 const u32 importance)
227{
228 struct port *p_ptr;
229 struct tipc_msg *msg;
230 u32 ref;
231
232 p_ptr = kmalloc(sizeof(*p_ptr), GFP_ATOMIC);
233 if (p_ptr == NULL) {
234 warn("Memory squeeze; failed to create port\n");
235 return 0;
236 }
237 memset(p_ptr, 0, sizeof(*p_ptr));
238 ref = ref_acquire(p_ptr, &p_ptr->publ.lock);
239 if (!ref) {
240 warn("Reference Table Exhausted\n");
241 kfree(p_ptr);
242 return 0;
243 }
244
245 port_lock(ref);
246 p_ptr->publ.ref = ref;
247 msg = &p_ptr->publ.phdr;
248 msg_init(msg, DATA_LOW, TIPC_NAMED_MSG, TIPC_OK, LONG_H_SIZE, 0);
249 msg_set_orignode(msg, tipc_own_addr);
250 msg_set_prevnode(msg, tipc_own_addr);
251 msg_set_origport(msg, ref);
252 msg_set_importance(msg,importance);
253 p_ptr->last_in_seqno = 41;
254 p_ptr->sent = 1;
255 p_ptr->publ.usr_handle = usr_handle;
256 INIT_LIST_HEAD(&p_ptr->wait_list);
257 INIT_LIST_HEAD(&p_ptr->subscription.nodesub_list);
258 p_ptr->congested_link = 0;
259 p_ptr->max_pkt = MAX_PKT_DEFAULT;
260 p_ptr->dispatcher = dispatcher;
261 p_ptr->wakeup = wakeup;
262 p_ptr->user_port = 0;
263 k_init_timer(&p_ptr->timer, (Handler)port_timeout, ref);
264 spin_lock_bh(&port_list_lock);
265 INIT_LIST_HEAD(&p_ptr->publications);
266 INIT_LIST_HEAD(&p_ptr->port_list);
267 list_add_tail(&p_ptr->port_list, &ports);
268 spin_unlock_bh(&port_list_lock);
269 port_unlock(p_ptr);
270 return ref;
271}
272
273int tipc_deleteport(u32 ref)
274{
275 struct port *p_ptr;
276 struct sk_buff *buf = 0;
277
278 tipc_withdraw(ref, 0, 0);
279 p_ptr = port_lock(ref);
280 if (!p_ptr)
281 return -EINVAL;
282
283 ref_discard(ref);
284 port_unlock(p_ptr);
285
286 k_cancel_timer(&p_ptr->timer);
287 if (p_ptr->publ.connected) {
288 buf = port_build_peer_abort_msg(p_ptr, TIPC_ERR_NO_PORT);
289 nodesub_unsubscribe(&p_ptr->subscription);
290 }
291 if (p_ptr->user_port) {
292 reg_remove_port(p_ptr->user_port);
293 kfree(p_ptr->user_port);
294 }
295
296 spin_lock_bh(&port_list_lock);
297 list_del(&p_ptr->port_list);
298 list_del(&p_ptr->wait_list);
299 spin_unlock_bh(&port_list_lock);
300 k_term_timer(&p_ptr->timer);
301 kfree(p_ptr);
302 dbg("Deleted port %u\n", ref);
303 net_route_msg(buf);
304 return TIPC_OK;
305}
306
307/**
308 * tipc_get_port() - return port associated with 'ref'
309 *
310 * Note: Port is not locked.
311 */
312
313struct tipc_port *tipc_get_port(const u32 ref)
314{
315 return (struct tipc_port *)ref_deref(ref);
316}
317
318/**
319 * tipc_get_handle - return user handle associated to port 'ref'
320 */
321
322void *tipc_get_handle(const u32 ref)
323{
324 struct port *p_ptr;
325 void * handle;
326
327 p_ptr = port_lock(ref);
328 if (!p_ptr)
329 return 0;
330 handle = p_ptr->publ.usr_handle;
331 port_unlock(p_ptr);
332 return handle;
333}
334
335static inline int port_unreliable(struct port *p_ptr)
336{
337 return msg_src_droppable(&p_ptr->publ.phdr);
338}
339
340int tipc_portunreliable(u32 ref, unsigned int *isunreliable)
341{
342 struct port *p_ptr;
343
344 p_ptr = port_lock(ref);
345 if (!p_ptr)
346 return -EINVAL;
347 *isunreliable = port_unreliable(p_ptr);
348 spin_unlock_bh(p_ptr->publ.lock);
349 return TIPC_OK;
350}
351
352int tipc_set_portunreliable(u32 ref, unsigned int isunreliable)
353{
354 struct port *p_ptr;
355
356 p_ptr = port_lock(ref);
357 if (!p_ptr)
358 return -EINVAL;
359 msg_set_src_droppable(&p_ptr->publ.phdr, (isunreliable != 0));
360 port_unlock(p_ptr);
361 return TIPC_OK;
362}
363
364static inline int port_unreturnable(struct port *p_ptr)
365{
366 return msg_dest_droppable(&p_ptr->publ.phdr);
367}
368
369int tipc_portunreturnable(u32 ref, unsigned int *isunrejectable)
370{
371 struct port *p_ptr;
372
373 p_ptr = port_lock(ref);
374 if (!p_ptr)
375 return -EINVAL;
376 *isunrejectable = port_unreturnable(p_ptr);
377 spin_unlock_bh(p_ptr->publ.lock);
378 return TIPC_OK;
379}
380
381int tipc_set_portunreturnable(u32 ref, unsigned int isunrejectable)
382{
383 struct port *p_ptr;
384
385 p_ptr = port_lock(ref);
386 if (!p_ptr)
387 return -EINVAL;
388 msg_set_dest_droppable(&p_ptr->publ.phdr, (isunrejectable != 0));
389 port_unlock(p_ptr);
390 return TIPC_OK;
391}
392
393/*
394 * port_build_proto_msg(): build a port level protocol
395 * or a connection abortion message. Called with
396 * tipc_port lock on.
397 */
398static struct sk_buff *port_build_proto_msg(u32 destport, u32 destnode,
399 u32 origport, u32 orignode,
400 u32 usr, u32 type, u32 err,
401 u32 seqno, u32 ack)
402{
403 struct sk_buff *buf;
404 struct tipc_msg *msg;
405
406 buf = buf_acquire(LONG_H_SIZE);
407 if (buf) {
408 msg = buf_msg(buf);
409 msg_init(msg, usr, type, err, LONG_H_SIZE, destnode);
410 msg_set_destport(msg, destport);
411 msg_set_origport(msg, origport);
412 msg_set_destnode(msg, destnode);
413 msg_set_orignode(msg, orignode);
414 msg_set_transp_seqno(msg, seqno);
415 msg_set_msgcnt(msg, ack);
416 msg_dbg(msg, "PORT>SEND>:");
417 }
418 return buf;
419}
420
421int tipc_set_msg_option(struct tipc_port *tp_ptr, const char *opt, const u32 sz)
422{
423 msg_expand(&tp_ptr->phdr, msg_destnode(&tp_ptr->phdr));
424 msg_set_options(&tp_ptr->phdr, opt, sz);
425 return TIPC_OK;
426}
427
428int tipc_reject_msg(struct sk_buff *buf, u32 err)
429{
430 struct tipc_msg *msg = buf_msg(buf);
431 struct sk_buff *rbuf;
432 struct tipc_msg *rmsg;
433 int hdr_sz;
434 u32 imp = msg_importance(msg);
435 u32 data_sz = msg_data_sz(msg);
436
437 if (data_sz > MAX_REJECT_SIZE)
438 data_sz = MAX_REJECT_SIZE;
439 if (msg_connected(msg) && (imp < TIPC_CRITICAL_IMPORTANCE))
440 imp++;
441 msg_dbg(msg, "port->rej: ");
442
443 /* discard rejected message if it shouldn't be returned to sender */
444 if (msg_errcode(msg) || msg_dest_droppable(msg)) {
445 buf_discard(buf);
446 return data_sz;
447 }
448
449 /* construct rejected message */
450 if (msg_mcast(msg))
451 hdr_sz = MCAST_H_SIZE;
452 else
453 hdr_sz = LONG_H_SIZE;
454 rbuf = buf_acquire(data_sz + hdr_sz);
455 if (rbuf == NULL) {
456 buf_discard(buf);
457 return data_sz;
458 }
459 rmsg = buf_msg(rbuf);
460 msg_init(rmsg, imp, msg_type(msg), err, hdr_sz, msg_orignode(msg));
461 msg_set_destport(rmsg, msg_origport(msg));
462 msg_set_prevnode(rmsg, tipc_own_addr);
463 msg_set_origport(rmsg, msg_destport(msg));
464 if (msg_short(msg))
465 msg_set_orignode(rmsg, tipc_own_addr);
466 else
467 msg_set_orignode(rmsg, msg_destnode(msg));
468 msg_set_size(rmsg, data_sz + hdr_sz);
469 msg_set_nametype(rmsg, msg_nametype(msg));
470 msg_set_nameinst(rmsg, msg_nameinst(msg));
471 memcpy(rbuf->data + hdr_sz, msg_data(msg), data_sz);
472
473 /* send self-abort message when rejecting on a connected port */
474 if (msg_connected(msg)) {
475 struct sk_buff *abuf = 0;
476 struct port *p_ptr = port_lock(msg_destport(msg));
477
478 if (p_ptr) {
479 if (p_ptr->publ.connected)
480 abuf = port_build_self_abort_msg(p_ptr, err);
481 port_unlock(p_ptr);
482 }
483 net_route_msg(abuf);
484 }
485
486 /* send rejected message */
487 buf_discard(buf);
488 net_route_msg(rbuf);
489 return data_sz;
490}
491
492int port_reject_sections(struct port *p_ptr, struct tipc_msg *hdr,
493 struct iovec const *msg_sect, u32 num_sect,
494 int err)
495{
496 struct sk_buff *buf;
497 int res;
498
499 res = msg_build(hdr, msg_sect, num_sect, MAX_MSG_SIZE,
500 !p_ptr->user_port, &buf);
501 if (!buf)
502 return res;
503
504 return tipc_reject_msg(buf, err);
505}
506
507static void port_timeout(unsigned long ref)
508{
509 struct port *p_ptr = port_lock(ref);
510 struct sk_buff *buf = 0;
511
512 if (!p_ptr || !p_ptr->publ.connected)
513 return;
514
515 /* Last probe answered ? */
516 if (p_ptr->probing_state == PROBING) {
517 buf = port_build_self_abort_msg(p_ptr, TIPC_ERR_NO_PORT);
518 } else {
519 buf = port_build_proto_msg(port_peerport(p_ptr),
520 port_peernode(p_ptr),
521 p_ptr->publ.ref,
522 tipc_own_addr,
523 CONN_MANAGER,
524 CONN_PROBE,
525 TIPC_OK,
526 port_out_seqno(p_ptr),
527 0);
528 port_incr_out_seqno(p_ptr);
529 p_ptr->probing_state = PROBING;
530 k_start_timer(&p_ptr->timer, p_ptr->probing_interval);
531 }
532 port_unlock(p_ptr);
533 net_route_msg(buf);
534}
535
536
537static void port_handle_node_down(unsigned long ref)
538{
539 struct port *p_ptr = port_lock(ref);
540 struct sk_buff* buf = 0;
541
542 if (!p_ptr)
543 return;
544 buf = port_build_self_abort_msg(p_ptr, TIPC_ERR_NO_NODE);
545 port_unlock(p_ptr);
546 net_route_msg(buf);
547}
548
549
550static struct sk_buff *port_build_self_abort_msg(struct port *p_ptr, u32 err)
551{
552 u32 imp = msg_importance(&p_ptr->publ.phdr);
553
554 if (!p_ptr->publ.connected)
555 return 0;
556 if (imp < TIPC_CRITICAL_IMPORTANCE)
557 imp++;
558 return port_build_proto_msg(p_ptr->publ.ref,
559 tipc_own_addr,
560 port_peerport(p_ptr),
561 port_peernode(p_ptr),
562 imp,
563 TIPC_CONN_MSG,
564 err,
565 p_ptr->last_in_seqno + 1,
566 0);
567}
568
569
570static struct sk_buff *port_build_peer_abort_msg(struct port *p_ptr, u32 err)
571{
572 u32 imp = msg_importance(&p_ptr->publ.phdr);
573
574 if (!p_ptr->publ.connected)
575 return 0;
576 if (imp < TIPC_CRITICAL_IMPORTANCE)
577 imp++;
578 return port_build_proto_msg(port_peerport(p_ptr),
579 port_peernode(p_ptr),
580 p_ptr->publ.ref,
581 tipc_own_addr,
582 imp,
583 TIPC_CONN_MSG,
584 err,
585 port_out_seqno(p_ptr),
586 0);
587}
588
589void port_recv_proto_msg(struct sk_buff *buf)
590{
591 struct tipc_msg *msg = buf_msg(buf);
592 struct port *p_ptr = port_lock(msg_destport(msg));
593 u32 err = TIPC_OK;
594 struct sk_buff *r_buf = 0;
595 struct sk_buff *abort_buf = 0;
596
597 msg_dbg(msg, "PORT<RECV<:");
598
599 if (!p_ptr) {
600 err = TIPC_ERR_NO_PORT;
601 } else if (p_ptr->publ.connected) {
602 if (port_peernode(p_ptr) != msg_orignode(msg))
603 err = TIPC_ERR_NO_PORT;
604 if (port_peerport(p_ptr) != msg_origport(msg))
605 err = TIPC_ERR_NO_PORT;
606 if (!err && msg_routed(msg)) {
607 u32 seqno = msg_transp_seqno(msg);
608 u32 myno = ++p_ptr->last_in_seqno;
609 if (seqno != myno) {
610 err = TIPC_ERR_NO_PORT;
611 abort_buf = port_build_self_abort_msg(p_ptr, err);
612 }
613 }
614 if (msg_type(msg) == CONN_ACK) {
615 int wakeup = port_congested(p_ptr) &&
616 p_ptr->publ.congested &&
617 p_ptr->wakeup;
618 p_ptr->acked += msg_msgcnt(msg);
619 if (port_congested(p_ptr))
620 goto exit;
621 p_ptr->publ.congested = 0;
622 if (!wakeup)
623 goto exit;
624 p_ptr->wakeup(&p_ptr->publ);
625 goto exit;
626 }
627 } else if (p_ptr->publ.published) {
628 err = TIPC_ERR_NO_PORT;
629 }
630 if (err) {
631 r_buf = port_build_proto_msg(msg_origport(msg),
632 msg_orignode(msg),
633 msg_destport(msg),
634 tipc_own_addr,
635 DATA_HIGH,
636 TIPC_CONN_MSG,
637 err,
638 0,
639 0);
640 goto exit;
641 }
642
643 /* All is fine */
644 if (msg_type(msg) == CONN_PROBE) {
645 r_buf = port_build_proto_msg(msg_origport(msg),
646 msg_orignode(msg),
647 msg_destport(msg),
648 tipc_own_addr,
649 CONN_MANAGER,
650 CONN_PROBE_REPLY,
651 TIPC_OK,
652 port_out_seqno(p_ptr),
653 0);
654 }
655 p_ptr->probing_state = CONFIRMED;
656 port_incr_out_seqno(p_ptr);
657exit:
658 if (p_ptr)
659 port_unlock(p_ptr);
660 net_route_msg(r_buf);
661 net_route_msg(abort_buf);
662 buf_discard(buf);
663}
664
665static void port_print(struct port *p_ptr, struct print_buf *buf, int full_id)
666{
667 struct publication *publ;
668
669 if (full_id)
670 tipc_printf(buf, "<%u.%u.%u:%u>:",
671 tipc_zone(tipc_own_addr), tipc_cluster(tipc_own_addr),
672 tipc_node(tipc_own_addr), p_ptr->publ.ref);
673 else
674 tipc_printf(buf, "%-10u:", p_ptr->publ.ref);
675
676 if (p_ptr->publ.connected) {
677 u32 dport = port_peerport(p_ptr);
678 u32 destnode = port_peernode(p_ptr);
679
680 tipc_printf(buf, " connected to <%u.%u.%u:%u>",
681 tipc_zone(destnode), tipc_cluster(destnode),
682 tipc_node(destnode), dport);
683 if (p_ptr->publ.conn_type != 0)
684 tipc_printf(buf, " via {%u,%u}",
685 p_ptr->publ.conn_type,
686 p_ptr->publ.conn_instance);
687 }
688 else if (p_ptr->publ.published) {
689 tipc_printf(buf, " bound to");
690 list_for_each_entry(publ, &p_ptr->publications, pport_list) {
691 if (publ->lower == publ->upper)
692 tipc_printf(buf, " {%u,%u}", publ->type,
693 publ->lower);
694 else
695 tipc_printf(buf, " {%u,%u,%u}", publ->type,
696 publ->lower, publ->upper);
697 }
698 }
699 tipc_printf(buf, "\n");
700}
701
702#define MAX_PORT_QUERY 32768
703
704struct sk_buff *port_get_ports(void)
705{
706 struct sk_buff *buf;
707 struct tlv_desc *rep_tlv;
708 struct print_buf pb;
709 struct port *p_ptr;
710 int str_len;
711
712 buf = cfg_reply_alloc(TLV_SPACE(MAX_PORT_QUERY));
713 if (!buf)
714 return NULL;
715 rep_tlv = (struct tlv_desc *)buf->data;
716
717 printbuf_init(&pb, TLV_DATA(rep_tlv), MAX_PORT_QUERY);
718 spin_lock_bh(&port_list_lock);
719 list_for_each_entry(p_ptr, &ports, port_list) {
720 spin_lock_bh(p_ptr->publ.lock);
721 port_print(p_ptr, &pb, 0);
722 spin_unlock_bh(p_ptr->publ.lock);
723 }
724 spin_unlock_bh(&port_list_lock);
725 str_len = printbuf_validate(&pb);
726
727 skb_put(buf, TLV_SPACE(str_len));
728 TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len);
729
730 return buf;
731}
732
733#if 0
734
735#define MAX_PORT_STATS 2000
736
737struct sk_buff *port_show_stats(const void *req_tlv_area, int req_tlv_space)
738{
739 u32 ref;
740 struct port *p_ptr;
741 struct sk_buff *buf;
742 struct tlv_desc *rep_tlv;
743 struct print_buf pb;
744 int str_len;
745
746 if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_PORT_REF))
747 return cfg_reply_error_string(TIPC_CFG_TLV_ERROR);
748
749 ref = *(u32 *)TLV_DATA(req_tlv_area);
750 ref = ntohl(ref);
751
752 p_ptr = port_lock(ref);
753 if (!p_ptr)
754 return cfg_reply_error_string("port not found");
755
756 buf = cfg_reply_alloc(TLV_SPACE(MAX_PORT_STATS));
757 if (!buf) {
758 port_unlock(p_ptr);
759 return NULL;
760 }
761 rep_tlv = (struct tlv_desc *)buf->data;
762
763 printbuf_init(&pb, TLV_DATA(rep_tlv), MAX_PORT_STATS);
764 port_print(p_ptr, &pb, 1);
765 /* NEED TO FILL IN ADDITIONAL PORT STATISTICS HERE */
766 port_unlock(p_ptr);
767 str_len = printbuf_validate(&pb);
768
769 skb_put(buf, TLV_SPACE(str_len));
770 TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len);
771
772 return buf;
773}
774
775#endif
776
777void port_reinit(void)
778{
779 struct port *p_ptr;
780 struct tipc_msg *msg;
781
782 spin_lock_bh(&port_list_lock);
783 list_for_each_entry(p_ptr, &ports, port_list) {
784 msg = &p_ptr->publ.phdr;
785 if (msg_orignode(msg) == tipc_own_addr)
786 break;
787 msg_set_orignode(msg, tipc_own_addr);
788 }
789 spin_unlock_bh(&port_list_lock);
790}
791
792
793/*
794 * port_dispatcher_sigh(): Signal handler for messages destinated
795 * to the tipc_port interface.
796 */
797
798static void port_dispatcher_sigh(void *dummy)
799{
800 struct sk_buff *buf;
801
802 spin_lock_bh(&queue_lock);
803 buf = msg_queue_head;
804 msg_queue_head = 0;
805 spin_unlock_bh(&queue_lock);
806
807 while (buf) {
808 struct port *p_ptr;
809 struct user_port *up_ptr;
810 struct tipc_portid orig;
811 struct tipc_name_seq dseq;
812 void *usr_handle;
813 int connected;
814 int published;
815
816 struct sk_buff *next = buf->next;
817 struct tipc_msg *msg = buf_msg(buf);
818 u32 dref = msg_destport(msg);
819
820 p_ptr = port_lock(dref);
821 if (!p_ptr) {
822 /* Port deleted while msg in queue */
823 tipc_reject_msg(buf, TIPC_ERR_NO_PORT);
824 buf = next;
825 continue;
826 }
827 orig.ref = msg_origport(msg);
828 orig.node = msg_orignode(msg);
829 up_ptr = p_ptr->user_port;
830 usr_handle = up_ptr->usr_handle;
831 connected = p_ptr->publ.connected;
832 published = p_ptr->publ.published;
833
834 if (unlikely(msg_errcode(msg)))
835 goto err;
836
837 switch (msg_type(msg)) {
838
839 case TIPC_CONN_MSG:{
840 tipc_conn_msg_event cb = up_ptr->conn_msg_cb;
841 u32 peer_port = port_peerport(p_ptr);
842 u32 peer_node = port_peernode(p_ptr);
843
844 spin_unlock_bh(p_ptr->publ.lock);
845 if (unlikely(!connected)) {
846 if (unlikely(published))
847 goto reject;
848 tipc_connect2port(dref,&orig);
849 }
850 if (unlikely(msg_origport(msg) != peer_port))
851 goto reject;
852 if (unlikely(msg_orignode(msg) != peer_node))
853 goto reject;
854 if (unlikely(!cb))
855 goto reject;
856 if (unlikely(++p_ptr->publ.conn_unacked >=
857 TIPC_FLOW_CONTROL_WIN))
858 tipc_acknowledge(dref,
859 p_ptr->publ.conn_unacked);
860 skb_pull(buf, msg_hdr_sz(msg));
861 cb(usr_handle, dref, &buf, msg_data(msg),
862 msg_data_sz(msg));
863 break;
864 }
865 case TIPC_DIRECT_MSG:{
866 tipc_msg_event cb = up_ptr->msg_cb;
867
868 spin_unlock_bh(p_ptr->publ.lock);
869 if (unlikely(connected))
870 goto reject;
871 if (unlikely(!cb))
872 goto reject;
873 skb_pull(buf, msg_hdr_sz(msg));
874 cb(usr_handle, dref, &buf, msg_data(msg),
875 msg_data_sz(msg), msg_importance(msg),
876 &orig);
877 break;
878 }
879 case TIPC_NAMED_MSG:{
880 tipc_named_msg_event cb = up_ptr->named_msg_cb;
881
882 spin_unlock_bh(p_ptr->publ.lock);
883 if (unlikely(connected))
884 goto reject;
885 if (unlikely(!cb))
886 goto reject;
887 if (unlikely(!published))
888 goto reject;
889 dseq.type = msg_nametype(msg);
890 dseq.lower = msg_nameinst(msg);
891 dseq.upper = dseq.lower;
892 skb_pull(buf, msg_hdr_sz(msg));
893 cb(usr_handle, dref, &buf, msg_data(msg),
894 msg_data_sz(msg), msg_importance(msg),
895 &orig, &dseq);
896 break;
897 }
898 }
899 if (buf)
900 buf_discard(buf);
901 buf = next;
902 continue;
903err:
904 switch (msg_type(msg)) {
905
906 case TIPC_CONN_MSG:{
907 tipc_conn_shutdown_event cb =
908 up_ptr->conn_err_cb;
909 u32 peer_port = port_peerport(p_ptr);
910 u32 peer_node = port_peernode(p_ptr);
911
912 spin_unlock_bh(p_ptr->publ.lock);
913 if (!connected || !cb)
914 break;
915 if (msg_origport(msg) != peer_port)
916 break;
917 if (msg_orignode(msg) != peer_node)
918 break;
919 tipc_disconnect(dref);
920 skb_pull(buf, msg_hdr_sz(msg));
921 cb(usr_handle, dref, &buf, msg_data(msg),
922 msg_data_sz(msg), msg_errcode(msg));
923 break;
924 }
925 case TIPC_DIRECT_MSG:{
926 tipc_msg_err_event cb = up_ptr->err_cb;
927
928 spin_unlock_bh(p_ptr->publ.lock);
929 if (connected || !cb)
930 break;
931 skb_pull(buf, msg_hdr_sz(msg));
932 cb(usr_handle, dref, &buf, msg_data(msg),
933 msg_data_sz(msg), msg_errcode(msg), &orig);
934 break;
935 }
936 case TIPC_NAMED_MSG:{
937 tipc_named_msg_err_event cb =
938 up_ptr->named_err_cb;
939
940 spin_unlock_bh(p_ptr->publ.lock);
941 if (connected || !cb)
942 break;
943 dseq.type = msg_nametype(msg);
944 dseq.lower = msg_nameinst(msg);
945 dseq.upper = dseq.lower;
946 skb_pull(buf, msg_hdr_sz(msg));
947 cb(usr_handle, dref, &buf, msg_data(msg),
948 msg_data_sz(msg), msg_errcode(msg), &dseq);
949 break;
950 }
951 }
952 if (buf)
953 buf_discard(buf);
954 buf = next;
955 continue;
956reject:
957 tipc_reject_msg(buf, TIPC_ERR_NO_PORT);
958 buf = next;
959 }
960}
961
962/*
963 * port_dispatcher(): Dispatcher for messages destinated
964 * to the tipc_port interface. Called with port locked.
965 */
966
967static u32 port_dispatcher(struct tipc_port *dummy, struct sk_buff *buf)
968{
969 buf->next = NULL;
970 spin_lock_bh(&queue_lock);
971 if (msg_queue_head) {
972 msg_queue_tail->next = buf;
973 msg_queue_tail = buf;
974 } else {
975 msg_queue_tail = msg_queue_head = buf;
976 k_signal((Handler)port_dispatcher_sigh, 0);
977 }
978 spin_unlock_bh(&queue_lock);
979 return TIPC_OK;
980}
981
982/*
983 * Wake up port after congestion: Called with port locked,
984 *
985 */
986
987static void port_wakeup_sh(unsigned long ref)
988{
989 struct port *p_ptr;
990 struct user_port *up_ptr;
991 tipc_continue_event cb = 0;
992 void *uh = 0;
993
994 p_ptr = port_lock(ref);
995 if (p_ptr) {
996 up_ptr = p_ptr->user_port;
997 if (up_ptr) {
998 cb = up_ptr->continue_event_cb;
999 uh = up_ptr->usr_handle;
1000 }
1001 port_unlock(p_ptr);
1002 }
1003 if (cb)
1004 cb(uh, ref);
1005}
1006
1007
1008static void port_wakeup(struct tipc_port *p_ptr)
1009{
1010 k_signal((Handler)port_wakeup_sh, p_ptr->ref);
1011}
1012
1013void tipc_acknowledge(u32 ref, u32 ack)
1014{
1015 struct port *p_ptr;
1016 struct sk_buff *buf = 0;
1017
1018 p_ptr = port_lock(ref);
1019 if (!p_ptr)
1020 return;
1021 if (p_ptr->publ.connected) {
1022 p_ptr->publ.conn_unacked -= ack;
1023 buf = port_build_proto_msg(port_peerport(p_ptr),
1024 port_peernode(p_ptr),
1025 ref,
1026 tipc_own_addr,
1027 CONN_MANAGER,
1028 CONN_ACK,
1029 TIPC_OK,
1030 port_out_seqno(p_ptr),
1031 ack);
1032 }
1033 port_unlock(p_ptr);
1034 net_route_msg(buf);
1035}
1036
1037/*
1038 * tipc_createport(): user level call. Will add port to
1039 * registry if non-zero user_ref.
1040 */
1041
1042int tipc_createport(u32 user_ref,
1043 void *usr_handle,
1044 unsigned int importance,
1045 tipc_msg_err_event error_cb,
1046 tipc_named_msg_err_event named_error_cb,
1047 tipc_conn_shutdown_event conn_error_cb,
1048 tipc_msg_event msg_cb,
1049 tipc_named_msg_event named_msg_cb,
1050 tipc_conn_msg_event conn_msg_cb,
1051 tipc_continue_event continue_event_cb,/* May be zero */
1052 u32 *portref)
1053{
1054 struct user_port *up_ptr;
1055 struct port *p_ptr;
1056 u32 ref;
1057
1058 up_ptr = (struct user_port *)kmalloc(sizeof(*up_ptr), GFP_ATOMIC);
1059 if (up_ptr == NULL) {
1060 return -ENOMEM;
1061 }
1062 ref = tipc_createport_raw(0, port_dispatcher, port_wakeup, importance);
1063 p_ptr = port_lock(ref);
1064 if (!p_ptr) {
1065 kfree(up_ptr);
1066 return -ENOMEM;
1067 }
1068
1069 p_ptr->user_port = up_ptr;
1070 up_ptr->user_ref = user_ref;
1071 up_ptr->usr_handle = usr_handle;
1072 up_ptr->ref = p_ptr->publ.ref;
1073 up_ptr->err_cb = error_cb;
1074 up_ptr->named_err_cb = named_error_cb;
1075 up_ptr->conn_err_cb = conn_error_cb;
1076 up_ptr->msg_cb = msg_cb;
1077 up_ptr->named_msg_cb = named_msg_cb;
1078 up_ptr->conn_msg_cb = conn_msg_cb;
1079 up_ptr->continue_event_cb = continue_event_cb;
1080 INIT_LIST_HEAD(&up_ptr->uport_list);
1081 reg_add_port(up_ptr);
1082 *portref = p_ptr->publ.ref;
1083 dbg(" tipc_createport: %x with ref %u\n", p_ptr, p_ptr->publ.ref);
1084 port_unlock(p_ptr);
1085 return TIPC_OK;
1086}
1087
1088int tipc_ownidentity(u32 ref, struct tipc_portid *id)
1089{
1090 id->ref = ref;
1091 id->node = tipc_own_addr;
1092 return TIPC_OK;
1093}
1094
1095int tipc_portimportance(u32 ref, unsigned int *importance)
1096{
1097 struct port *p_ptr;
1098
1099 p_ptr = port_lock(ref);
1100 if (!p_ptr)
1101 return -EINVAL;
1102 *importance = (unsigned int)msg_importance(&p_ptr->publ.phdr);
1103 spin_unlock_bh(p_ptr->publ.lock);
1104 return TIPC_OK;
1105}
1106
1107int tipc_set_portimportance(u32 ref, unsigned int imp)
1108{
1109 struct port *p_ptr;
1110
1111 if (imp > TIPC_CRITICAL_IMPORTANCE)
1112 return -EINVAL;
1113
1114 p_ptr = port_lock(ref);
1115 if (!p_ptr)
1116 return -EINVAL;
1117 msg_set_importance(&p_ptr->publ.phdr, (u32)imp);
1118 spin_unlock_bh(p_ptr->publ.lock);
1119 return TIPC_OK;
1120}
1121
1122
1123int tipc_publish(u32 ref, unsigned int scope, struct tipc_name_seq const *seq)
1124{
1125 struct port *p_ptr;
1126 struct publication *publ;
1127 u32 key;
1128 int res = -EINVAL;
1129
1130 p_ptr = port_lock(ref);
1131 dbg("tipc_publ %u, p_ptr = %x, conn = %x, scope = %x, "
1132 "lower = %u, upper = %u\n",
1133 ref, p_ptr, p_ptr->publ.connected, scope, seq->lower, seq->upper);
1134 if (!p_ptr)
1135 return -EINVAL;
1136 if (p_ptr->publ.connected)
1137 goto exit;
1138 if (seq->lower > seq->upper)
1139 goto exit;
1140 if ((scope < TIPC_ZONE_SCOPE) || (scope > TIPC_NODE_SCOPE))
1141 goto exit;
1142 key = ref + p_ptr->pub_count + 1;
1143 if (key == ref) {
1144 res = -EADDRINUSE;
1145 goto exit;
1146 }
1147 publ = nametbl_publish(seq->type, seq->lower, seq->upper,
1148 scope, p_ptr->publ.ref, key);
1149 if (publ) {
1150 list_add(&publ->pport_list, &p_ptr->publications);
1151 p_ptr->pub_count++;
1152 p_ptr->publ.published = 1;
1153 res = TIPC_OK;
1154 }
1155exit:
1156 port_unlock(p_ptr);
1157 return res;
1158}
1159
1160int tipc_withdraw(u32 ref, unsigned int scope, struct tipc_name_seq const *seq)
1161{
1162 struct port *p_ptr;
1163 struct publication *publ;
1164 struct publication *tpubl;
1165 int res = -EINVAL;
1166
1167 p_ptr = port_lock(ref);
1168 if (!p_ptr)
1169 return -EINVAL;
1170 if (!p_ptr->publ.published)
1171 goto exit;
1172 if (!seq) {
1173 list_for_each_entry_safe(publ, tpubl,
1174 &p_ptr->publications, pport_list) {
1175 nametbl_withdraw(publ->type, publ->lower,
1176 publ->ref, publ->key);
1177 }
1178 res = TIPC_OK;
1179 } else {
1180 list_for_each_entry_safe(publ, tpubl,
1181 &p_ptr->publications, pport_list) {
1182 if (publ->scope != scope)
1183 continue;
1184 if (publ->type != seq->type)
1185 continue;
1186 if (publ->lower != seq->lower)
1187 continue;
1188 if (publ->upper != seq->upper)
1189 break;
1190 nametbl_withdraw(publ->type, publ->lower,
1191 publ->ref, publ->key);
1192 res = TIPC_OK;
1193 break;
1194 }
1195 }
1196 if (list_empty(&p_ptr->publications))
1197 p_ptr->publ.published = 0;
1198exit:
1199 port_unlock(p_ptr);
1200 return res;
1201}
1202
1203int tipc_connect2port(u32 ref, struct tipc_portid const *peer)
1204{
1205 struct port *p_ptr;
1206 struct tipc_msg *msg;
1207 int res = -EINVAL;
1208
1209 p_ptr = port_lock(ref);
1210 if (!p_ptr)
1211 return -EINVAL;
1212 if (p_ptr->publ.published || p_ptr->publ.connected)
1213 goto exit;
1214 if (!peer->ref)
1215 goto exit;
1216
1217 msg = &p_ptr->publ.phdr;
1218 msg_set_destnode(msg, peer->node);
1219 msg_set_destport(msg, peer->ref);
1220 msg_set_orignode(msg, tipc_own_addr);
1221 msg_set_origport(msg, p_ptr->publ.ref);
1222 msg_set_transp_seqno(msg, 42);
1223 msg_set_type(msg, TIPC_CONN_MSG);
1224 if (!may_route(peer->node))
1225 msg_set_hdr_sz(msg, SHORT_H_SIZE);
1226 else
1227 msg_set_hdr_sz(msg, LONG_H_SIZE);
1228
1229 p_ptr->probing_interval = PROBING_INTERVAL;
1230 p_ptr->probing_state = CONFIRMED;
1231 p_ptr->publ.connected = 1;
1232 k_start_timer(&p_ptr->timer, p_ptr->probing_interval);
1233
1234 nodesub_subscribe(&p_ptr->subscription,peer->node, (void *)ref,
1235 (net_ev_handler)port_handle_node_down);
1236 res = TIPC_OK;
1237exit:
1238 port_unlock(p_ptr);
1239 p_ptr->max_pkt = link_get_max_pkt(peer->node, ref);
1240 return res;
1241}
1242
1243/*
1244 * tipc_disconnect(): Disconnect port form peer.
1245 * This is a node local operation.
1246 */
1247
1248int tipc_disconnect(u32 ref)
1249{
1250 struct port *p_ptr;
1251 int res = -ENOTCONN;
1252
1253 p_ptr = port_lock(ref);
1254 if (!p_ptr)
1255 return -EINVAL;
1256 if (p_ptr->publ.connected) {
1257 p_ptr->publ.connected = 0;
1258 /* let timer expire on it's own to avoid deadlock! */
1259 nodesub_unsubscribe(&p_ptr->subscription);
1260 res = TIPC_OK;
1261 }
1262 port_unlock(p_ptr);
1263 return res;
1264}
1265
1266/*
1267 * tipc_shutdown(): Send a SHUTDOWN msg to peer and disconnect
1268 */
1269int tipc_shutdown(u32 ref)
1270{
1271 struct port *p_ptr;
1272 struct sk_buff *buf = 0;
1273
1274 p_ptr = port_lock(ref);
1275 if (!p_ptr)
1276 return -EINVAL;
1277
1278 if (p_ptr->publ.connected) {
1279 u32 imp = msg_importance(&p_ptr->publ.phdr);
1280 if (imp < TIPC_CRITICAL_IMPORTANCE)
1281 imp++;
1282 buf = port_build_proto_msg(port_peerport(p_ptr),
1283 port_peernode(p_ptr),
1284 ref,
1285 tipc_own_addr,
1286 imp,
1287 TIPC_CONN_MSG,
1288 TIPC_CONN_SHUTDOWN,
1289 port_out_seqno(p_ptr),
1290 0);
1291 }
1292 port_unlock(p_ptr);
1293 net_route_msg(buf);
1294 return tipc_disconnect(ref);
1295}
1296
1297int tipc_isconnected(u32 ref, int *isconnected)
1298{
1299 struct port *p_ptr;
1300
1301 p_ptr = port_lock(ref);
1302 if (!p_ptr)
1303 return -EINVAL;
1304 *isconnected = p_ptr->publ.connected;
1305 port_unlock(p_ptr);
1306 return TIPC_OK;
1307}
1308
1309int tipc_peer(u32 ref, struct tipc_portid *peer)
1310{
1311 struct port *p_ptr;
1312 int res;
1313
1314 p_ptr = port_lock(ref);
1315 if (!p_ptr)
1316 return -EINVAL;
1317 if (p_ptr->publ.connected) {
1318 peer->ref = port_peerport(p_ptr);
1319 peer->node = port_peernode(p_ptr);
1320 res = TIPC_OK;
1321 } else
1322 res = -ENOTCONN;
1323 port_unlock(p_ptr);
1324 return res;
1325}
1326
1327int tipc_ref_valid(u32 ref)
1328{
1329 /* Works irrespective of type */
1330 return !!ref_deref(ref);
1331}
1332
1333
1334/*
1335 * port_recv_sections(): Concatenate and deliver sectioned
1336 * message for this node.
1337 */
1338
1339int port_recv_sections(struct port *sender, unsigned int num_sect,
1340 struct iovec const *msg_sect)
1341{
1342 struct sk_buff *buf;
1343 int res;
1344
1345 res = msg_build(&sender->publ.phdr, msg_sect, num_sect,
1346 MAX_MSG_SIZE, !sender->user_port, &buf);
1347 if (likely(buf))
1348 port_recv_msg(buf);
1349 return res;
1350}
1351
1352/**
1353 * tipc_send - send message sections on connection
1354 */
1355
1356int tipc_send(u32 ref, unsigned int num_sect, struct iovec const *msg_sect)
1357{
1358 struct port *p_ptr;
1359 u32 destnode;
1360 int res;
1361
1362 p_ptr = port_deref(ref);
1363 if (!p_ptr || !p_ptr->publ.connected)
1364 return -EINVAL;
1365
1366 p_ptr->publ.congested = 1;
1367 if (!port_congested(p_ptr)) {
1368 destnode = port_peernode(p_ptr);
1369 if (likely(destnode != tipc_own_addr))
1370 res = link_send_sections_fast(p_ptr, msg_sect, num_sect,
1371 destnode);
1372 else
1373 res = port_recv_sections(p_ptr, num_sect, msg_sect);
1374
1375 if (likely(res != -ELINKCONG)) {
1376 port_incr_out_seqno(p_ptr);
1377 p_ptr->publ.congested = 0;
1378 p_ptr->sent++;
1379 return res;
1380 }
1381 }
1382 if (port_unreliable(p_ptr)) {
1383 p_ptr->publ.congested = 0;
1384 /* Just calculate msg length and return */
1385 return msg_calc_data_size(msg_sect, num_sect);
1386 }
1387 return -ELINKCONG;
1388}
1389
1390/**
1391 * tipc_send_buf - send message buffer on connection
1392 */
1393
1394int tipc_send_buf(u32 ref, struct sk_buff *buf, unsigned int dsz)
1395{
1396 struct port *p_ptr;
1397 struct tipc_msg *msg;
1398 u32 destnode;
1399 u32 hsz;
1400 u32 sz;
1401 u32 res;
1402
1403 p_ptr = port_deref(ref);
1404 if (!p_ptr || !p_ptr->publ.connected)
1405 return -EINVAL;
1406
1407 msg = &p_ptr->publ.phdr;
1408 hsz = msg_hdr_sz(msg);
1409 sz = hsz + dsz;
1410 msg_set_size(msg, sz);
1411 if (skb_cow(buf, hsz))
1412 return -ENOMEM;
1413
1414 skb_push(buf, hsz);
1415 memcpy(buf->data, (unchar *)msg, hsz);
1416 destnode = msg_destnode(msg);
1417 p_ptr->publ.congested = 1;
1418 if (!port_congested(p_ptr)) {
1419 if (likely(destnode != tipc_own_addr))
1420 res = tipc_send_buf_fast(buf, destnode);
1421 else {
1422 port_recv_msg(buf);
1423 res = sz;
1424 }
1425 if (likely(res != -ELINKCONG)) {
1426 port_incr_out_seqno(p_ptr);
1427 p_ptr->sent++;
1428 p_ptr->publ.congested = 0;
1429 return res;
1430 }
1431 }
1432 if (port_unreliable(p_ptr)) {
1433 p_ptr->publ.congested = 0;
1434 return dsz;
1435 }
1436 return -ELINKCONG;
1437}
1438
1439/**
1440 * tipc_forward2name - forward message sections to port name
1441 */
1442
1443int tipc_forward2name(u32 ref,
1444 struct tipc_name const *name,
1445 u32 domain,
1446 u32 num_sect,
1447 struct iovec const *msg_sect,
1448 struct tipc_portid const *orig,
1449 unsigned int importance)
1450{
1451 struct port *p_ptr;
1452 struct tipc_msg *msg;
1453 u32 destnode = domain;
1454 u32 destport = 0;
1455 int res;
1456
1457 p_ptr = port_deref(ref);
1458 if (!p_ptr || p_ptr->publ.connected)
1459 return -EINVAL;
1460
1461 msg = &p_ptr->publ.phdr;
1462 msg_set_type(msg, TIPC_NAMED_MSG);
1463 msg_set_orignode(msg, orig->node);
1464 msg_set_origport(msg, orig->ref);
1465 msg_set_hdr_sz(msg, LONG_H_SIZE);
1466 msg_set_nametype(msg, name->type);
1467 msg_set_nameinst(msg, name->instance);
1468 msg_set_lookup_scope(msg, addr_scope(domain));
1469 if (importance <= TIPC_CRITICAL_IMPORTANCE)
1470 msg_set_importance(msg,importance);
1471 destport = nametbl_translate(name->type, name->instance, &destnode);
1472 msg_set_destnode(msg, destnode);
1473 msg_set_destport(msg, destport);
1474
1475 if (likely(destport || destnode)) {
1476 p_ptr->sent++;
1477 if (likely(destnode == tipc_own_addr))
1478 return port_recv_sections(p_ptr, num_sect, msg_sect);
1479 res = link_send_sections_fast(p_ptr, msg_sect, num_sect,
1480 destnode);
1481 if (likely(res != -ELINKCONG))
1482 return res;
1483 if (port_unreliable(p_ptr)) {
1484 /* Just calculate msg length and return */
1485 return msg_calc_data_size(msg_sect, num_sect);
1486 }
1487 return -ELINKCONG;
1488 }
1489 return port_reject_sections(p_ptr, msg, msg_sect, num_sect,
1490 TIPC_ERR_NO_NAME);
1491}
1492
1493/**
1494 * tipc_send2name - send message sections to port name
1495 */
1496
1497int tipc_send2name(u32 ref,
1498 struct tipc_name const *name,
1499 unsigned int domain,
1500 unsigned int num_sect,
1501 struct iovec const *msg_sect)
1502{
1503 struct tipc_portid orig;
1504
1505 orig.ref = ref;
1506 orig.node = tipc_own_addr;
1507 return tipc_forward2name(ref, name, domain, num_sect, msg_sect, &orig,
1508 TIPC_PORT_IMPORTANCE);
1509}
1510
1511/**
1512 * tipc_forward_buf2name - forward message buffer to port name
1513 */
1514
1515int tipc_forward_buf2name(u32 ref,
1516 struct tipc_name const *name,
1517 u32 domain,
1518 struct sk_buff *buf,
1519 unsigned int dsz,
1520 struct tipc_portid const *orig,
1521 unsigned int importance)
1522{
1523 struct port *p_ptr;
1524 struct tipc_msg *msg;
1525 u32 destnode = domain;
1526 u32 destport = 0;
1527 int res;
1528
1529 p_ptr = (struct port *)ref_deref(ref);
1530 if (!p_ptr || p_ptr->publ.connected)
1531 return -EINVAL;
1532
1533 msg = &p_ptr->publ.phdr;
1534 if (importance <= TIPC_CRITICAL_IMPORTANCE)
1535 msg_set_importance(msg, importance);
1536 msg_set_type(msg, TIPC_NAMED_MSG);
1537 msg_set_orignode(msg, orig->node);
1538 msg_set_origport(msg, orig->ref);
1539 msg_set_nametype(msg, name->type);
1540 msg_set_nameinst(msg, name->instance);
1541 msg_set_lookup_scope(msg, addr_scope(domain));
1542 msg_set_hdr_sz(msg, LONG_H_SIZE);
1543 msg_set_size(msg, LONG_H_SIZE + dsz);
1544 destport = nametbl_translate(name->type, name->instance, &destnode);
1545 msg_set_destnode(msg, destnode);
1546 msg_set_destport(msg, destport);
1547 msg_dbg(msg, "forw2name ==> ");
1548 if (skb_cow(buf, LONG_H_SIZE))
1549 return -ENOMEM;
1550 skb_push(buf, LONG_H_SIZE);
1551 memcpy(buf->data, (unchar *)msg, LONG_H_SIZE);
1552 msg_dbg(buf_msg(buf),"PREP:");
1553 if (likely(destport || destnode)) {
1554 p_ptr->sent++;
1555 if (destnode == tipc_own_addr)
1556 return port_recv_msg(buf);
1557 res = tipc_send_buf_fast(buf, destnode);
1558 if (likely(res != -ELINKCONG))
1559 return res;
1560 if (port_unreliable(p_ptr))
1561 return dsz;
1562 return -ELINKCONG;
1563 }
1564 return tipc_reject_msg(buf, TIPC_ERR_NO_NAME);
1565}
1566
1567/**
1568 * tipc_send_buf2name - send message buffer to port name
1569 */
1570
1571int tipc_send_buf2name(u32 ref,
1572 struct tipc_name const *dest,
1573 u32 domain,
1574 struct sk_buff *buf,
1575 unsigned int dsz)
1576{
1577 struct tipc_portid orig;
1578
1579 orig.ref = ref;
1580 orig.node = tipc_own_addr;
1581 return tipc_forward_buf2name(ref, dest, domain, buf, dsz, &orig,
1582 TIPC_PORT_IMPORTANCE);
1583}
1584
1585/**
1586 * tipc_forward2port - forward message sections to port identity
1587 */
1588
1589int tipc_forward2port(u32 ref,
1590 struct tipc_portid const *dest,
1591 unsigned int num_sect,
1592 struct iovec const *msg_sect,
1593 struct tipc_portid const *orig,
1594 unsigned int importance)
1595{
1596 struct port *p_ptr;
1597 struct tipc_msg *msg;
1598 int res;
1599
1600 p_ptr = port_deref(ref);
1601 if (!p_ptr || p_ptr->publ.connected)
1602 return -EINVAL;
1603
1604 msg = &p_ptr->publ.phdr;
1605 msg_set_type(msg, TIPC_DIRECT_MSG);
1606 msg_set_orignode(msg, orig->node);
1607 msg_set_origport(msg, orig->ref);
1608 msg_set_destnode(msg, dest->node);
1609 msg_set_destport(msg, dest->ref);
1610 msg_set_hdr_sz(msg, DIR_MSG_H_SIZE);
1611 if (importance <= TIPC_CRITICAL_IMPORTANCE)
1612 msg_set_importance(msg, importance);
1613 p_ptr->sent++;
1614 if (dest->node == tipc_own_addr)
1615 return port_recv_sections(p_ptr, num_sect, msg_sect);
1616 res = link_send_sections_fast(p_ptr, msg_sect, num_sect, dest->node);
1617 if (likely(res != -ELINKCONG))
1618 return res;
1619 if (port_unreliable(p_ptr)) {
1620 /* Just calculate msg length and return */
1621 return msg_calc_data_size(msg_sect, num_sect);
1622 }
1623 return -ELINKCONG;
1624}
1625
1626/**
1627 * tipc_send2port - send message sections to port identity
1628 */
1629
1630int tipc_send2port(u32 ref,
1631 struct tipc_portid const *dest,
1632 unsigned int num_sect,
1633 struct iovec const *msg_sect)
1634{
1635 struct tipc_portid orig;
1636
1637 orig.ref = ref;
1638 orig.node = tipc_own_addr;
1639 return tipc_forward2port(ref, dest, num_sect, msg_sect, &orig,
1640 TIPC_PORT_IMPORTANCE);
1641}
1642
1643/**
1644 * tipc_forward_buf2port - forward message buffer to port identity
1645 */
1646int tipc_forward_buf2port(u32 ref,
1647 struct tipc_portid const *dest,
1648 struct sk_buff *buf,
1649 unsigned int dsz,
1650 struct tipc_portid const *orig,
1651 unsigned int importance)
1652{
1653 struct port *p_ptr;
1654 struct tipc_msg *msg;
1655 int res;
1656
1657 p_ptr = (struct port *)ref_deref(ref);
1658 if (!p_ptr || p_ptr->publ.connected)
1659 return -EINVAL;
1660
1661 msg = &p_ptr->publ.phdr;
1662 msg_set_type(msg, TIPC_DIRECT_MSG);
1663 msg_set_orignode(msg, orig->node);
1664 msg_set_origport(msg, orig->ref);
1665 msg_set_destnode(msg, dest->node);
1666 msg_set_destport(msg, dest->ref);
1667 msg_set_hdr_sz(msg, DIR_MSG_H_SIZE);
1668 if (importance <= TIPC_CRITICAL_IMPORTANCE)
1669 msg_set_importance(msg, importance);
1670 msg_set_size(msg, DIR_MSG_H_SIZE + dsz);
1671 if (skb_cow(buf, DIR_MSG_H_SIZE))
1672 return -ENOMEM;
1673
1674 skb_push(buf, DIR_MSG_H_SIZE);
1675 memcpy(buf->data, (unchar *)msg, DIR_MSG_H_SIZE);
1676 msg_dbg(msg, "buf2port: ");
1677 p_ptr->sent++;
1678 if (dest->node == tipc_own_addr)
1679 return port_recv_msg(buf);
1680 res = tipc_send_buf_fast(buf, dest->node);
1681 if (likely(res != -ELINKCONG))
1682 return res;
1683 if (port_unreliable(p_ptr))
1684 return dsz;
1685 return -ELINKCONG;
1686}
1687
1688/**
1689 * tipc_send_buf2port - send message buffer to port identity
1690 */
1691
1692int tipc_send_buf2port(u32 ref,
1693 struct tipc_portid const *dest,
1694 struct sk_buff *buf,
1695 unsigned int dsz)
1696{
1697 struct tipc_portid orig;
1698
1699 orig.ref = ref;
1700 orig.node = tipc_own_addr;
1701 return tipc_forward_buf2port(ref, dest, buf, dsz, &orig,
1702 TIPC_PORT_IMPORTANCE);
1703}
1704