summaryrefslogtreecommitdiff
path: root/net/tipc/socket.c
diff options
context:
space:
mode:
authorDavid S. Miller <davem@davemloft.net>2014-06-27 12:51:00 -0700
committerDavid S. Miller <davem@davemloft.net>2014-06-27 12:51:00 -0700
commit0ff9275a4b1d6b1b0439a79c9cf7c03b38dbe24d (patch)
treed8d74d0ae3e05b1228498f5db04ff88a66b03035 /net/tipc/socket.c
parentf5b265039f1381ec048a4d61d47130f71abb119b (diff)
parent60120526c26f42fd658e32bf4a6d548483d09da8 (diff)
downloadlinux-0ff9275a4b1d6b1b0439a79c9cf7c03b38dbe24d.tar.gz
linux-0ff9275a4b1d6b1b0439a79c9cf7c03b38dbe24d.tar.xz
Merge branch 'tipc-next'
Jon Maloy says: ==================== tipc: new unicast transmission code As a step towards making the data transmission code more maintainable and performant, we introduce a number of new functions, both for building, sending and rejecting messages. The new functions will eventually be used for alla data transmission, user data unicast, service internal messaging, and multicast/broadcast. We start with this series, where we introduce the functions, and let user data unicast and the internal connection protocol use them. The remaining users will come in a later series. There are only minor changes to data structures, and no protocol changes, so the older functions can still be used in parallel for some time. Until the old functions are removed, we use temporary names for the new functions, such as tipc_build_msg2, tipc_link_xmit2. It should be noted that the first two commits are unrelated to the rest of the series. ==================== Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net/tipc/socket.c')
-rw-r--r--net/tipc/socket.c487
1 files changed, 280 insertions, 207 deletions
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index ef0475568f9e..ede78b144dcf 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -36,14 +36,17 @@
#include "core.h"
#include "port.h"
+#include "name_table.h"
#include "node.h"
-
+#include "link.h"
#include <linux/export.h>
+#include "link.h"
#define SS_LISTENING -1 /* socket is listening */
#define SS_READY -2 /* socket is connectionless */
#define CONN_TIMEOUT_DEFAULT 8000 /* default connect timeout = 8s */
+#define TIPC_FWD_MSG 1
static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb);
static void tipc_data_ready(struct sock *sk);
@@ -123,9 +126,12 @@ static void advance_rx_queue(struct sock *sk)
static void reject_rx_queue(struct sock *sk)
{
struct sk_buff *buf;
+ u32 dnode;
- while ((buf = __skb_dequeue(&sk->sk_receive_queue)))
- tipc_reject_msg(buf, TIPC_ERR_NO_PORT);
+ while ((buf = __skb_dequeue(&sk->sk_receive_queue))) {
+ if (tipc_msg_reverse(buf, &dnode, TIPC_ERR_NO_PORT))
+ tipc_link_xmit2(buf, dnode, 0);
+ }
}
/**
@@ -201,6 +207,7 @@ static int tipc_sk_create(struct net *net, struct socket *sock,
sk->sk_data_ready = tipc_data_ready;
sk->sk_write_space = tipc_write_space;
tsk->conn_timeout = CONN_TIMEOUT_DEFAULT;
+ tsk->sent_unacked = 0;
atomic_set(&tsk->dupl_rcvcnt, 0);
tipc_port_unlock(port);
@@ -303,6 +310,7 @@ static int tipc_release(struct socket *sock)
struct tipc_sock *tsk;
struct tipc_port *port;
struct sk_buff *buf;
+ u32 dnode;
/*
* Exit if socket isn't fully initialized (occurs when a failed accept()
@@ -331,7 +339,8 @@ static int tipc_release(struct socket *sock)
sock->state = SS_DISCONNECTING;
tipc_port_disconnect(port->ref);
}
- tipc_reject_msg(buf, TIPC_ERR_NO_PORT);
+ if (tipc_msg_reverse(buf, &dnode, TIPC_ERR_NO_PORT))
+ tipc_link_xmit2(buf, dnode, 0);
}
}
@@ -504,12 +513,12 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock,
switch ((int)sock->state) {
case SS_UNCONNECTED:
- if (!tsk->port.congested)
+ if (!tsk->link_cong)
mask |= POLLOUT;
break;
case SS_READY:
case SS_CONNECTED:
- if (!tsk->port.congested)
+ if (!tsk->link_cong && !tipc_sk_conn_cong(tsk))
mask |= POLLOUT;
/* fall thru' */
case SS_CONNECTING:
@@ -526,6 +535,43 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock,
}
/**
+ * tipc_sk_proto_rcv - receive a connection mng protocol message
+ * @tsk: receiving socket
+ * @dnode: node to send response message to, if any
+ * @buf: buffer containing protocol message
+ * Returns 0 (TIPC_OK) if message was consumed, 1 (TIPC_FWD_MSG) if
+ * (CONN_PROBE_REPLY) message should be forwarded.
+ */
+int tipc_sk_proto_rcv(struct tipc_sock *tsk, u32 *dnode, struct sk_buff *buf)
+{
+ struct tipc_msg *msg = buf_msg(buf);
+ struct tipc_port *port = &tsk->port;
+ int conn_cong;
+
+ /* Ignore if connection cannot be validated: */
+ if (!port->connected || !tipc_port_peer_msg(port, msg))
+ goto exit;
+
+ port->probing_state = TIPC_CONN_OK;
+
+ if (msg_type(msg) == CONN_ACK) {
+ conn_cong = tipc_sk_conn_cong(tsk);
+ tsk->sent_unacked -= msg_msgcnt(msg);
+ if (conn_cong)
+ tipc_sock_wakeup(tsk);
+ } else if (msg_type(msg) == CONN_PROBE) {
+ if (!tipc_msg_reverse(buf, dnode, TIPC_OK))
+ return TIPC_OK;
+ msg_set_type(msg, CONN_PROBE_REPLY);
+ return TIPC_FWD_MSG;
+ }
+ /* Do nothing if msg_type() == CONN_PROBE_REPLY */
+exit:
+ kfree_skb(buf);
+ return TIPC_OK;
+}
+
+/**
* dest_name_check - verify user is permitted to send to specified port name
* @dest: destination address
* @m: descriptor for message to be sent
@@ -539,6 +585,8 @@ static int dest_name_check(struct sockaddr_tipc *dest, struct msghdr *m)
{
struct tipc_cfg_msg_hdr hdr;
+ if (unlikely(dest->addrtype == TIPC_ADDR_ID))
+ return 0;
if (likely(dest->addr.name.name.type >= TIPC_RESERVED_TYPES))
return 0;
if (likely(dest->addr.name.name.type == TIPC_TOP_SRV))
@@ -575,19 +623,55 @@ static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p)
return sock_intr_errno(*timeo_p);
prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
- done = sk_wait_event(sk, timeo_p, !tsk->port.congested);
+ done = sk_wait_event(sk, timeo_p, !tsk->link_cong);
finish_wait(sk_sleep(sk), &wait);
} while (!done);
return 0;
}
+/**
+ * tipc_sendmcast - send multicast message
+ * @sock: socket structure
+ * @seq: destination address
+ * @iov: message data to send
+ * @dsz: total length of message data
+ * @timeo: timeout to wait for wakeup
+ *
+ * Called from function tipc_sendmsg(), which has done all sanity checks
+ * Returns the number of bytes sent on success, or errno
+ */
+static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq,
+ struct iovec *iov, size_t dsz, long timeo)
+{
+ struct sock *sk = sock->sk;
+ struct tipc_sock *tsk = tipc_sk(sk);
+ int rc;
+
+ do {
+ if (sock->state != SS_READY) {
+ rc = -EOPNOTSUPP;
+ break;
+ }
+ rc = tipc_port_mcast_xmit(&tsk->port, seq, iov, dsz);
+ if (likely(rc >= 0)) {
+ if (sock->state != SS_READY)
+ sock->state = SS_CONNECTING;
+ break;
+ }
+ if (rc != -ELINKCONG)
+ break;
+ rc = tipc_wait_for_sndmsg(sock, &timeo);
+ } while (!rc);
+
+ return rc;
+}
/**
* tipc_sendmsg - send message in connectionless manner
* @iocb: if NULL, indicates that socket lock is already held
* @sock: socket structure
* @m: message to send
- * @total_len: length of message
+ * @dsz: amount of user data to be sent
*
* Message must have an destination specified explicitly.
* Used for SOCK_RDM and SOCK_DGRAM messages,
@@ -597,100 +681,122 @@ static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p)
* Returns the number of bytes sent on success, or errno otherwise
*/
static int tipc_sendmsg(struct kiocb *iocb, struct socket *sock,
- struct msghdr *m, size_t total_len)
+ struct msghdr *m, size_t dsz)
{
+ DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
struct sock *sk = sock->sk;
struct tipc_sock *tsk = tipc_sk(sk);
struct tipc_port *port = &tsk->port;
- DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
- int needs_conn;
+ struct tipc_msg *mhdr = &port->phdr;
+ struct iovec *iov = m->msg_iov;
+ u32 dnode, dport;
+ struct sk_buff *buf;
+ struct tipc_name_seq *seq = &dest->addr.nameseq;
+ u32 mtu;
long timeo;
- int res = -EINVAL;
+ int rc = -EINVAL;
if (unlikely(!dest))
return -EDESTADDRREQ;
+
if (unlikely((m->msg_namelen < sizeof(*dest)) ||
(dest->family != AF_TIPC)))
return -EINVAL;
- if (total_len > TIPC_MAX_USER_MSG_SIZE)
+
+ if (dsz > TIPC_MAX_USER_MSG_SIZE)
return -EMSGSIZE;
if (iocb)
lock_sock(sk);
- needs_conn = (sock->state != SS_READY);
- if (unlikely(needs_conn)) {
+ if (unlikely(sock->state != SS_READY)) {
if (sock->state == SS_LISTENING) {
- res = -EPIPE;
+ rc = -EPIPE;
goto exit;
}
if (sock->state != SS_UNCONNECTED) {
- res = -EISCONN;
+ rc = -EISCONN;
goto exit;
}
if (tsk->port.published) {
- res = -EOPNOTSUPP;
+ rc = -EOPNOTSUPP;
goto exit;
}
if (dest->addrtype == TIPC_ADDR_NAME) {
tsk->port.conn_type = dest->addr.name.name.type;
tsk->port.conn_instance = dest->addr.name.name.instance;
}
-
- /* Abort any pending connection attempts (very unlikely) */
- reject_rx_queue(sk);
}
+ rc = dest_name_check(dest, m);
+ if (rc)
+ goto exit;
timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
- do {
- if (dest->addrtype == TIPC_ADDR_NAME) {
- res = dest_name_check(dest, m);
- if (res)
- break;
- res = tipc_send2name(port,
- &dest->addr.name.name,
- dest->addr.name.domain,
- m->msg_iov,
- total_len);
- } else if (dest->addrtype == TIPC_ADDR_ID) {
- res = tipc_send2port(port,
- &dest->addr.id,
- m->msg_iov,
- total_len);
- } else if (dest->addrtype == TIPC_ADDR_MCAST) {
- if (needs_conn) {
- res = -EOPNOTSUPP;
- break;
- }
- res = dest_name_check(dest, m);
- if (res)
- break;
- res = tipc_port_mcast_xmit(port,
- &dest->addr.nameseq,
- m->msg_iov,
- total_len);
+
+ if (dest->addrtype == TIPC_ADDR_MCAST) {
+ rc = tipc_sendmcast(sock, seq, iov, dsz, timeo);
+ goto exit;
+ } else if (dest->addrtype == TIPC_ADDR_NAME) {
+ u32 type = dest->addr.name.name.type;
+ u32 inst = dest->addr.name.name.instance;
+ u32 domain = dest->addr.name.domain;
+
+ dnode = domain;
+ msg_set_type(mhdr, TIPC_NAMED_MSG);
+ msg_set_hdr_sz(mhdr, NAMED_H_SIZE);
+ msg_set_nametype(mhdr, type);
+ msg_set_nameinst(mhdr, inst);
+ msg_set_lookup_scope(mhdr, tipc_addr_scope(domain));
+ dport = tipc_nametbl_translate(type, inst, &dnode);
+ msg_set_destnode(mhdr, dnode);
+ msg_set_destport(mhdr, dport);
+ if (unlikely(!dport && !dnode)) {
+ rc = -EHOSTUNREACH;
+ goto exit;
}
- if (likely(res != -ELINKCONG)) {
- if (needs_conn && (res >= 0))
+ } else if (dest->addrtype == TIPC_ADDR_ID) {
+ dnode = dest->addr.id.node;
+ msg_set_type(mhdr, TIPC_DIRECT_MSG);
+ msg_set_lookup_scope(mhdr, 0);
+ msg_set_destnode(mhdr, dnode);
+ msg_set_destport(mhdr, dest->addr.id.ref);
+ msg_set_hdr_sz(mhdr, BASIC_H_SIZE);
+ }
+
+new_mtu:
+ mtu = tipc_node_get_mtu(dnode, tsk->port.ref);
+ rc = tipc_msg_build2(mhdr, iov, 0, dsz, mtu, &buf);
+ if (rc < 0)
+ goto exit;
+
+ do {
+ rc = tipc_link_xmit2(buf, dnode, tsk->port.ref);
+ if (likely(rc >= 0)) {
+ if (sock->state != SS_READY)
sock->state = SS_CONNECTING;
+ rc = dsz;
break;
}
- res = tipc_wait_for_sndmsg(sock, &timeo);
- if (res)
+ if (rc == -EMSGSIZE)
+ goto new_mtu;
+
+ if (rc != -ELINKCONG)
break;
- } while (1);
+
+ rc = tipc_wait_for_sndmsg(sock, &timeo);
+ } while (!rc);
exit:
if (iocb)
release_sock(sk);
- return res;
+
+ return rc;
}
static int tipc_wait_for_sndpkt(struct socket *sock, long *timeo_p)
{
struct sock *sk = sock->sk;
struct tipc_sock *tsk = tipc_sk(sk);
- struct tipc_port *port = &tsk->port;
DEFINE_WAIT(wait);
int done;
@@ -709,37 +815,49 @@ static int tipc_wait_for_sndpkt(struct socket *sock, long *timeo_p)
prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
done = sk_wait_event(sk, timeo_p,
- (!port->congested || !port->connected));
+ (!tsk->link_cong &&
+ !tipc_sk_conn_cong(tsk)) ||
+ !tsk->port.connected);
finish_wait(sk_sleep(sk), &wait);
} while (!done);
return 0;
}
/**
- * tipc_send_packet - send a connection-oriented message
- * @iocb: if NULL, indicates that socket lock is already held
+ * tipc_send_stream - send stream-oriented data
+ * @iocb: (unused)
* @sock: socket structure
- * @m: message to send
- * @total_len: length of message
+ * @m: data to send
+ * @dsz: total length of data to be transmitted
*
- * Used for SOCK_SEQPACKET messages and SOCK_STREAM data.
+ * Used for SOCK_STREAM data.
*
- * Returns the number of bytes sent on success, or errno otherwise
+ * Returns the number of bytes sent on success (or partial success),
+ * or errno if no data sent
*/
-static int tipc_send_packet(struct kiocb *iocb, struct socket *sock,
- struct msghdr *m, size_t total_len)
+static int tipc_send_stream(struct kiocb *iocb, struct socket *sock,
+ struct msghdr *m, size_t dsz)
{
struct sock *sk = sock->sk;
struct tipc_sock *tsk = tipc_sk(sk);
+ struct tipc_port *port = &tsk->port;
+ struct tipc_msg *mhdr = &port->phdr;
+ struct sk_buff *buf;
DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
- int res = -EINVAL;
+ u32 ref = port->ref;
+ int rc = -EINVAL;
long timeo;
+ u32 dnode;
+ uint mtu, send, sent = 0;
/* Handle implied connection establishment */
- if (unlikely(dest))
- return tipc_sendmsg(iocb, sock, m, total_len);
-
- if (total_len > TIPC_MAX_USER_MSG_SIZE)
+ if (unlikely(dest)) {
+ rc = tipc_sendmsg(iocb, sock, m, dsz);
+ if (dsz && (dsz == rc))
+ tsk->sent_unacked = 1;
+ return rc;
+ }
+ if (dsz > (uint)INT_MAX)
return -EMSGSIZE;
if (iocb)
@@ -747,123 +865,64 @@ static int tipc_send_packet(struct kiocb *iocb, struct socket *sock,
if (unlikely(sock->state != SS_CONNECTED)) {
if (sock->state == SS_DISCONNECTING)
- res = -EPIPE;
+ rc = -EPIPE;
else
- res = -ENOTCONN;
+ rc = -ENOTCONN;
goto exit;
}
timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
+ dnode = tipc_port_peernode(port);
+
+next:
+ mtu = port->max_pkt;
+ send = min_t(uint, dsz - sent, TIPC_MAX_USER_MSG_SIZE);
+ rc = tipc_msg_build2(mhdr, m->msg_iov, sent, send, mtu, &buf);
+ if (unlikely(rc < 0))
+ goto exit;
do {
- res = tipc_send(&tsk->port, m->msg_iov, total_len);
- if (likely(res != -ELINKCONG))
- break;
- res = tipc_wait_for_sndpkt(sock, &timeo);
- if (res)
- break;
- } while (1);
+ if (likely(!tipc_sk_conn_cong(tsk))) {
+ rc = tipc_link_xmit2(buf, dnode, ref);
+ if (likely(!rc)) {
+ tsk->sent_unacked++;
+ sent += send;
+ if (sent == dsz)
+ break;
+ goto next;
+ }
+ if (rc == -EMSGSIZE) {
+ port->max_pkt = tipc_node_get_mtu(dnode, ref);
+ goto next;
+ }
+ if (rc != -ELINKCONG)
+ break;
+ }
+ rc = tipc_wait_for_sndpkt(sock, &timeo);
+ } while (!rc);
exit:
if (iocb)
release_sock(sk);
- return res;
+ return sent ? sent : rc;
}
/**
- * tipc_send_stream - send stream-oriented data
- * @iocb: (unused)
+ * tipc_send_packet - send a connection-oriented message
+ * @iocb: if NULL, indicates that socket lock is already held
* @sock: socket structure
- * @m: data to send
- * @total_len: total length of data to be sent
+ * @m: message to send
+ * @dsz: length of data to be transmitted
*
- * Used for SOCK_STREAM data.
+ * Used for SOCK_SEQPACKET messages.
*
- * Returns the number of bytes sent on success (or partial success),
- * or errno if no data sent
+ * Returns the number of bytes sent on success, or errno otherwise
*/
-static int tipc_send_stream(struct kiocb *iocb, struct socket *sock,
- struct msghdr *m, size_t total_len)
+static int tipc_send_packet(struct kiocb *iocb, struct socket *sock,
+ struct msghdr *m, size_t dsz)
{
- struct sock *sk = sock->sk;
- struct tipc_sock *tsk = tipc_sk(sk);
- struct msghdr my_msg;
- struct iovec my_iov;
- struct iovec *curr_iov;
- int curr_iovlen;
- char __user *curr_start;
- u32 hdr_size;
- int curr_left;
- int bytes_to_send;
- int bytes_sent;
- int res;
-
- lock_sock(sk);
-
- /* Handle special cases where there is no connection */
- if (unlikely(sock->state != SS_CONNECTED)) {
- if (sock->state == SS_UNCONNECTED)
- res = tipc_send_packet(NULL, sock, m, total_len);
- else
- res = sock->state == SS_DISCONNECTING ? -EPIPE : -ENOTCONN;
- goto exit;
- }
-
- if (unlikely(m->msg_name)) {
- res = -EISCONN;
- goto exit;
- }
-
- if (total_len > (unsigned int)INT_MAX) {
- res = -EMSGSIZE;
- goto exit;
- }
-
- /*
- * Send each iovec entry using one or more messages
- *
- * Note: This algorithm is good for the most likely case
- * (i.e. one large iovec entry), but could be improved to pass sets
- * of small iovec entries into send_packet().
- */
- curr_iov = m->msg_iov;
- curr_iovlen = m->msg_iovlen;
- my_msg.msg_iov = &my_iov;
- my_msg.msg_iovlen = 1;
- my_msg.msg_flags = m->msg_flags;
- my_msg.msg_name = NULL;
- bytes_sent = 0;
-
- hdr_size = msg_hdr_sz(&tsk->port.phdr);
-
- while (curr_iovlen--) {
- curr_start = curr_iov->iov_base;
- curr_left = curr_iov->iov_len;
-
- while (curr_left) {
- bytes_to_send = tsk->port.max_pkt - hdr_size;
- if (bytes_to_send > TIPC_MAX_USER_MSG_SIZE)
- bytes_to_send = TIPC_MAX_USER_MSG_SIZE;
- if (curr_left < bytes_to_send)
- bytes_to_send = curr_left;
- my_iov.iov_base = curr_start;
- my_iov.iov_len = bytes_to_send;
- res = tipc_send_packet(NULL, sock, &my_msg,
- bytes_to_send);
- if (res < 0) {
- if (bytes_sent)
- res = bytes_sent;
- goto exit;
- }
- curr_left -= bytes_to_send;
- curr_start += bytes_to_send;
- bytes_sent += bytes_to_send;
- }
+ if (dsz > TIPC_MAX_USER_MSG_SIZE)
+ return -EMSGSIZE;
- curr_iov++;
- }
- res = bytes_sent;
-exit:
- release_sock(sk);
- return res;
+ return tipc_send_stream(iocb, sock, m, dsz);
}
/**
@@ -1104,8 +1163,10 @@ restart:
/* Consume received message (optional) */
if (likely(!(flags & MSG_PEEK))) {
if ((sock->state != SS_READY) &&
- (++port->conn_unacked >= TIPC_CONNACK_INTV))
- tipc_acknowledge(port->ref, port->conn_unacked);
+ (++tsk->rcv_unacked >= TIPC_CONNACK_INTV)) {
+ tipc_acknowledge(port->ref, tsk->rcv_unacked);
+ tsk->rcv_unacked = 0;
+ }
advance_rx_queue(sk);
}
exit:
@@ -1213,8 +1274,10 @@ restart:
/* Consume received message (optional) */
if (likely(!(flags & MSG_PEEK))) {
- if (unlikely(++port->conn_unacked >= TIPC_CONNACK_INTV))
- tipc_acknowledge(port->ref, port->conn_unacked);
+ if (unlikely(++tsk->rcv_unacked >= TIPC_CONNACK_INTV)) {
+ tipc_acknowledge(port->ref, tsk->rcv_unacked);
+ tsk->rcv_unacked = 0;
+ }
advance_rx_queue(sk);
}
@@ -1269,17 +1332,16 @@ static void tipc_data_ready(struct sock *sk)
* @tsk: TIPC socket
* @msg: message
*
- * Returns TIPC error status code and socket error status code
- * once it encounters some errors
+ * Returns 0 (TIPC_OK) if everyting ok, -TIPC_ERR_NO_PORT otherwise
*/
-static u32 filter_connect(struct tipc_sock *tsk, struct sk_buff **buf)
+static int filter_connect(struct tipc_sock *tsk, struct sk_buff **buf)
{
struct sock *sk = &tsk->sk;
struct tipc_port *port = &tsk->port;
struct socket *sock = sk->sk_socket;
struct tipc_msg *msg = buf_msg(*buf);
- u32 retval = TIPC_ERR_NO_PORT;
+ int retval = -TIPC_ERR_NO_PORT;
int res;
if (msg_mcast(msg))
@@ -1382,32 +1444,37 @@ static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *buf)
*
* Called with socket lock already taken; port lock may also be taken.
*
- * Returns TIPC error status code (TIPC_OK if message is not to be rejected)
+ * Returns 0 (TIPC_OK) if message was consumed, -TIPC error code if message
+ * to be rejected, 1 (TIPC_FWD_MSG) if (CONN_MANAGER) message to be forwarded
*/
-static u32 filter_rcv(struct sock *sk, struct sk_buff *buf)
+static int filter_rcv(struct sock *sk, struct sk_buff *buf)
{
struct socket *sock = sk->sk_socket;
struct tipc_sock *tsk = tipc_sk(sk);
struct tipc_msg *msg = buf_msg(buf);
unsigned int limit = rcvbuf_limit(sk, buf);
- u32 res = TIPC_OK;
+ u32 onode;
+ int rc = TIPC_OK;
+
+ if (unlikely(msg_user(msg) == CONN_MANAGER))
+ return tipc_sk_proto_rcv(tsk, &onode, buf);
/* Reject message if it is wrong sort of message for socket */
if (msg_type(msg) > TIPC_DIRECT_MSG)
- return TIPC_ERR_NO_PORT;
+ return -TIPC_ERR_NO_PORT;
if (sock->state == SS_READY) {
if (msg_connected(msg))
- return TIPC_ERR_NO_PORT;
+ return -TIPC_ERR_NO_PORT;
} else {
- res = filter_connect(tsk, &buf);
- if (res != TIPC_OK || buf == NULL)
- return res;
+ rc = filter_connect(tsk, &buf);
+ if (rc != TIPC_OK || buf == NULL)
+ return rc;
}
/* Reject message if there isn't room to queue it */
if (sk_rmem_alloc_get(sk) + buf->truesize >= limit)
- return TIPC_ERR_OVERLOAD;
+ return -TIPC_ERR_OVERLOAD;
/* Enqueue message */
TIPC_SKB_CB(buf)->handle = NULL;
@@ -1429,16 +1496,23 @@ static u32 filter_rcv(struct sock *sk, struct sk_buff *buf)
*/
static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *buf)
{
- u32 res;
+ int rc;
+ u32 onode;
struct tipc_sock *tsk = tipc_sk(sk);
uint truesize = buf->truesize;
- res = filter_rcv(sk, buf);
- if (unlikely(res))
- tipc_reject_msg(buf, res);
+ rc = filter_rcv(sk, buf);
+
+ if (likely(!rc)) {
+ if (atomic_read(&tsk->dupl_rcvcnt) < TIPC_CONN_OVERLOAD_LIMIT)
+ atomic_add(truesize, &tsk->dupl_rcvcnt);
+ return 0;
+ }
- if (atomic_read(&tsk->dupl_rcvcnt) < TIPC_CONN_OVERLOAD_LIMIT)
- atomic_add(truesize, &tsk->dupl_rcvcnt);
+ if ((rc < 0) && !tipc_msg_reverse(buf, &onode, -rc))
+ return 0;
+
+ tipc_link_xmit2(buf, onode, 0);
return 0;
}
@@ -1455,19 +1529,14 @@ int tipc_sk_rcv(struct sk_buff *buf)
struct tipc_port *port;
struct sock *sk;
u32 dport = msg_destport(buf_msg(buf));
- int err = TIPC_OK;
+ int rc = TIPC_OK;
uint limit;
+ u32 dnode;
- /* Forward unresolved named message */
- if (unlikely(!dport)) {
- tipc_net_route_msg(buf);
- return 0;
- }
-
- /* Validate destination */
+ /* Validate destination and message */
port = tipc_port_lock(dport);
if (unlikely(!port)) {
- err = TIPC_ERR_NO_PORT;
+ rc = tipc_msg_eval(buf, &dnode);
goto exit;
}
@@ -1478,23 +1547,25 @@ int tipc_sk_rcv(struct sk_buff *buf)
bh_lock_sock(sk);
if (!sock_owned_by_user(sk)) {
- err = filter_rcv(sk, buf);
+ rc = filter_rcv(sk, buf);
} else {
if (sk->sk_backlog.len == 0)
atomic_set(&tsk->dupl_rcvcnt, 0);
limit = rcvbuf_limit(sk, buf) + atomic_read(&tsk->dupl_rcvcnt);
if (sk_add_backlog(sk, buf, limit))
- err = TIPC_ERR_OVERLOAD;
+ rc = -TIPC_ERR_OVERLOAD;
}
-
bh_unlock_sock(sk);
tipc_port_unlock(port);
- if (likely(!err))
+ if (likely(!rc))
return 0;
exit:
- tipc_reject_msg(buf, err);
- return -EHOSTUNREACH;
+ if ((rc < 0) && !tipc_msg_reverse(buf, &dnode, -rc))
+ return -EHOSTUNREACH;
+
+ tipc_link_xmit2(buf, dnode, 0);
+ return (rc < 0) ? -EHOSTUNREACH : 0;
}
static int tipc_wait_for_connect(struct socket *sock, long *timeo_p)
@@ -1758,6 +1829,7 @@ static int tipc_shutdown(struct socket *sock, int how)
struct tipc_sock *tsk = tipc_sk(sk);
struct tipc_port *port = &tsk->port;
struct sk_buff *buf;
+ u32 peer;
int res;
if (how != SHUT_RDWR)
@@ -1778,7 +1850,8 @@ restart:
goto restart;
}
tipc_port_disconnect(port->ref);
- tipc_reject_msg(buf, TIPC_CONN_SHUTDOWN);
+ if (tipc_msg_reverse(buf, &peer, TIPC_CONN_SHUTDOWN))
+ tipc_link_xmit2(buf, peer, 0);
} else {
tipc_port_shutdown(port->ref);
}