summaryrefslogtreecommitdiff
path: root/net/rds
diff options
context:
space:
mode:
Diffstat (limited to 'net/rds')
-rw-r--r--net/rds/cong.c3
-rw-r--r--net/rds/connection.c329
-rw-r--r--net/rds/ib.c1
-rw-r--r--net/rds/ib_cm.c3
-rw-r--r--net/rds/ib_rdma.c3
-rw-r--r--net/rds/ib_recv.c1
-rw-r--r--net/rds/ib_send.c1
-rw-r--r--net/rds/loop.c1
-rw-r--r--net/rds/rdma_transport.c1
-rw-r--r--net/rds/rds.h152
-rw-r--r--net/rds/rds_single_path.h30
-rw-r--r--net/rds/recv.c27
-rw-r--r--net/rds/send.c293
-rw-r--r--net/rds/tcp.c7
-rw-r--r--net/rds/tcp_connect.c4
-rw-r--r--net/rds/tcp_listen.c11
-rw-r--r--net/rds/tcp_recv.c1
-rw-r--r--net/rds/tcp_send.c1
-rw-r--r--net/rds/threads.c95
19 files changed, 614 insertions, 350 deletions
diff --git a/net/rds/cong.c b/net/rds/cong.c
index 6641bcf7c185..8398fee7c866 100644
--- a/net/rds/cong.c
+++ b/net/rds/cong.c
@@ -235,7 +235,8 @@ void rds_cong_queue_updates(struct rds_cong_map *map)
* therefore trigger warnings.
* Defer the xmit to rds_send_worker() instead.
*/
- queue_delayed_work(rds_wq, &conn->c_send_w, 0);
+ queue_delayed_work(rds_wq,
+ &conn->c_path[0].cp_send_w, 0);
}
}
diff --git a/net/rds/connection.c b/net/rds/connection.c
index e3b118cae81d..a4b07c899d89 100644
--- a/net/rds/connection.c
+++ b/net/rds/connection.c
@@ -95,14 +95,16 @@ static struct rds_connection *rds_conn_lookup(struct net *net,
* and receiving over this connection again in the future. It is up to
* the transport to have serialized this call with its send and recv.
*/
-static void rds_conn_reset(struct rds_connection *conn)
+static void rds_conn_path_reset(struct rds_conn_path *cp)
{
+ struct rds_connection *conn = cp->cp_conn;
+
rdsdebug("connection %pI4 to %pI4 reset\n",
&conn->c_laddr, &conn->c_faddr);
rds_stats_inc(s_conn_reset);
- rds_send_reset(conn);
- conn->c_flags = 0;
+ rds_send_path_reset(cp);
+ cp->cp_flags = 0;
/* Do not clear next_rx_seq here, else we cannot distinguish
* retransmitted packets from new packets, and will hand all
@@ -110,6 +112,32 @@ static void rds_conn_reset(struct rds_connection *conn)
* reliability guarantees of RDS. */
}
+static void __rds_conn_path_init(struct rds_connection *conn,
+ struct rds_conn_path *cp, bool is_outgoing)
+{
+ spin_lock_init(&cp->cp_lock);
+ cp->cp_next_tx_seq = 1;
+ init_waitqueue_head(&cp->cp_waitq);
+ INIT_LIST_HEAD(&cp->cp_send_queue);
+ INIT_LIST_HEAD(&cp->cp_retrans);
+
+ cp->cp_conn = conn;
+ atomic_set(&cp->cp_state, RDS_CONN_DOWN);
+ cp->cp_send_gen = 0;
+ /* cp_outgoing is per-path. So we can only set it here
+ * for the single-path transports.
+ */
+ if (!conn->c_trans->t_mp_capable)
+ cp->cp_outgoing = (is_outgoing ? 1 : 0);
+ cp->cp_reconnect_jiffies = 0;
+ INIT_DELAYED_WORK(&cp->cp_send_w, rds_send_worker);
+ INIT_DELAYED_WORK(&cp->cp_recv_w, rds_recv_worker);
+ INIT_DELAYED_WORK(&cp->cp_conn_w, rds_connect_worker);
+ INIT_WORK(&cp->cp_down_w, rds_shutdown_worker);
+ mutex_init(&cp->cp_cm_lock);
+ cp->cp_flags = 0;
+}
+
/*
* There is only every one 'conn' for a given pair of addresses in the
* system at a time. They contain messages to be retransmitted and so
@@ -153,13 +181,8 @@ static struct rds_connection *__rds_conn_create(struct net *net,
INIT_HLIST_NODE(&conn->c_hash_node);
conn->c_laddr = laddr;
conn->c_faddr = faddr;
- spin_lock_init(&conn->c_lock);
- conn->c_next_tx_seq = 1;
- rds_conn_net_set(conn, net);
- init_waitqueue_head(&conn->c_waitq);
- INIT_LIST_HEAD(&conn->c_send_queue);
- INIT_LIST_HEAD(&conn->c_retrans);
+ rds_conn_net_set(conn, net);
ret = rds_cong_get_maps(conn);
if (ret) {
@@ -195,17 +218,6 @@ static struct rds_connection *__rds_conn_create(struct net *net,
goto out;
}
- atomic_set(&conn->c_state, RDS_CONN_DOWN);
- conn->c_send_gen = 0;
- conn->c_outgoing = (is_outgoing ? 1 : 0);
- conn->c_reconnect_jiffies = 0;
- INIT_DELAYED_WORK(&conn->c_send_w, rds_send_worker);
- INIT_DELAYED_WORK(&conn->c_recv_w, rds_recv_worker);
- INIT_DELAYED_WORK(&conn->c_conn_w, rds_connect_worker);
- INIT_WORK(&conn->c_down_w, rds_shutdown_worker);
- mutex_init(&conn->c_cm_lock);
- conn->c_flags = 0;
-
rdsdebug("allocated conn %p for %pI4 -> %pI4 over %s %s\n",
conn, &laddr, &faddr,
trans->t_name ? trans->t_name : "[unknown]",
@@ -222,7 +234,7 @@ static struct rds_connection *__rds_conn_create(struct net *net,
if (parent) {
/* Creating passive conn */
if (parent->c_passive) {
- trans->conn_free(conn->c_transport_data);
+ trans->conn_free(conn->c_path[0].cp_transport_data);
kmem_cache_free(rds_conn_slab, conn);
conn = parent->c_passive;
} else {
@@ -236,10 +248,26 @@ static struct rds_connection *__rds_conn_create(struct net *net,
found = rds_conn_lookup(net, head, laddr, faddr, trans);
if (found) {
- trans->conn_free(conn->c_transport_data);
+ struct rds_conn_path *cp;
+ int i;
+
+ for (i = 0; i < RDS_MPATH_WORKERS; i++) {
+ cp = &conn->c_path[i];
+ trans->conn_free(cp->cp_transport_data);
+ if (!trans->t_mp_capable)
+ break;
+ }
kmem_cache_free(rds_conn_slab, conn);
conn = found;
} else {
+ int i;
+
+ for (i = 0; i < RDS_MPATH_WORKERS; i++) {
+ __rds_conn_path_init(conn, &conn->c_path[i],
+ is_outgoing);
+ conn->c_path[i].cp_index = i;
+ }
+
hlist_add_head_rcu(&conn->c_hash_node, head);
rds_cong_add_conn(conn);
rds_conn_count++;
@@ -267,10 +295,12 @@ struct rds_connection *rds_conn_create_outgoing(struct net *net,
}
EXPORT_SYMBOL_GPL(rds_conn_create_outgoing);
-void rds_conn_shutdown(struct rds_connection *conn)
+void rds_conn_shutdown(struct rds_conn_path *cp)
{
+ struct rds_connection *conn = cp->cp_conn;
+
/* shut it down unless it's down already */
- if (!rds_conn_transition(conn, RDS_CONN_DOWN, RDS_CONN_DOWN)) {
+ if (!rds_conn_path_transition(cp, RDS_CONN_DOWN, RDS_CONN_DOWN)) {
/*
* Quiesce the connection mgmt handlers before we start tearing
* things down. We don't hold the mutex for the entire
@@ -278,35 +308,41 @@ void rds_conn_shutdown(struct rds_connection *conn)
* deadlocking with the CM handler. Instead, the CM event
* handler is supposed to check for state DISCONNECTING
*/
- mutex_lock(&conn->c_cm_lock);
- if (!rds_conn_transition(conn, RDS_CONN_UP, RDS_CONN_DISCONNECTING)
- && !rds_conn_transition(conn, RDS_CONN_ERROR, RDS_CONN_DISCONNECTING)) {
- rds_conn_error(conn, "shutdown called in state %d\n",
- atomic_read(&conn->c_state));
- mutex_unlock(&conn->c_cm_lock);
+ mutex_lock(&cp->cp_cm_lock);
+ if (!rds_conn_path_transition(cp, RDS_CONN_UP,
+ RDS_CONN_DISCONNECTING) &&
+ !rds_conn_path_transition(cp, RDS_CONN_ERROR,
+ RDS_CONN_DISCONNECTING)) {
+ rds_conn_path_error(cp,
+ "shutdown called in state %d\n",
+ atomic_read(&cp->cp_state));
+ mutex_unlock(&cp->cp_cm_lock);
return;
}
- mutex_unlock(&conn->c_cm_lock);
+ mutex_unlock(&cp->cp_cm_lock);
- wait_event(conn->c_waitq,
- !test_bit(RDS_IN_XMIT, &conn->c_flags));
- wait_event(conn->c_waitq,
- !test_bit(RDS_RECV_REFILL, &conn->c_flags));
+ wait_event(cp->cp_waitq,
+ !test_bit(RDS_IN_XMIT, &cp->cp_flags));
+ wait_event(cp->cp_waitq,
+ !test_bit(RDS_RECV_REFILL, &cp->cp_flags));
- conn->c_trans->conn_shutdown(conn);
- rds_conn_reset(conn);
+ if (!conn->c_trans->t_mp_capable)
+ conn->c_trans->conn_shutdown(conn);
+ else
+ conn->c_trans->conn_path_shutdown(cp);
+ rds_conn_path_reset(cp);
- if (!rds_conn_transition(conn, RDS_CONN_DISCONNECTING, RDS_CONN_DOWN)) {
+ if (!rds_conn_path_transition(cp, RDS_CONN_DISCONNECTING,
+ RDS_CONN_DOWN)) {
/* This can happen - eg when we're in the middle of tearing
* down the connection, and someone unloads the rds module.
* Quite reproduceable with loopback connections.
* Mostly harmless.
*/
- rds_conn_error(conn,
- "%s: failed to transition to state DOWN, "
- "current state is %d\n",
- __func__,
- atomic_read(&conn->c_state));
+ rds_conn_path_error(cp, "%s: failed to transition "
+ "to state DOWN, current state "
+ "is %d\n", __func__,
+ atomic_read(&cp->cp_state));
return;
}
}
@@ -315,18 +351,46 @@ void rds_conn_shutdown(struct rds_connection *conn)
* The passive side of an IB loopback connection is never added
* to the conn hash, so we never trigger a reconnect on this
* conn - the reconnect is always triggered by the active peer. */
- cancel_delayed_work_sync(&conn->c_conn_w);
+ cancel_delayed_work_sync(&cp->cp_conn_w);
rcu_read_lock();
if (!hlist_unhashed(&conn->c_hash_node)) {
rcu_read_unlock();
if (conn->c_trans->t_type != RDS_TRANS_TCP ||
- conn->c_outgoing == 1)
- rds_queue_reconnect(conn);
+ cp->cp_outgoing == 1)
+ rds_queue_reconnect(cp);
} else {
rcu_read_unlock();
}
}
+/* destroy a single rds_conn_path. rds_conn_destroy() iterates over
+ * all paths using rds_conn_path_destroy()
+ */
+static void rds_conn_path_destroy(struct rds_conn_path *cp)
+{
+ struct rds_message *rm, *rtmp;
+
+ rds_conn_path_drop(cp);
+ flush_work(&cp->cp_down_w);
+
+ /* make sure lingering queued work won't try to ref the conn */
+ cancel_delayed_work_sync(&cp->cp_send_w);
+ cancel_delayed_work_sync(&cp->cp_recv_w);
+
+ /* tear down queued messages */
+ list_for_each_entry_safe(rm, rtmp,
+ &cp->cp_send_queue,
+ m_conn_item) {
+ list_del_init(&rm->m_conn_item);
+ BUG_ON(!list_empty(&rm->m_sock_item));
+ rds_message_put(rm);
+ }
+ if (cp->cp_xmit_rm)
+ rds_message_put(cp->cp_xmit_rm);
+
+ cp->cp_conn->c_trans->conn_free(cp->cp_transport_data);
+}
+
/*
* Stop and free a connection.
*
@@ -336,7 +400,6 @@ void rds_conn_shutdown(struct rds_connection *conn)
*/
void rds_conn_destroy(struct rds_connection *conn)
{
- struct rds_message *rm, *rtmp;
unsigned long flags;
rdsdebug("freeing conn %p for %pI4 -> "
@@ -350,25 +413,19 @@ void rds_conn_destroy(struct rds_connection *conn)
synchronize_rcu();
/* shut the connection down */
- rds_conn_drop(conn);
- flush_work(&conn->c_down_w);
-
- /* make sure lingering queued work won't try to ref the conn */
- cancel_delayed_work_sync(&conn->c_send_w);
- cancel_delayed_work_sync(&conn->c_recv_w);
+ if (!conn->c_trans->t_mp_capable) {
+ rds_conn_path_destroy(&conn->c_path[0]);
+ BUG_ON(!list_empty(&conn->c_path[0].cp_retrans));
+ } else {
+ int i;
+ struct rds_conn_path *cp;
- /* tear down queued messages */
- list_for_each_entry_safe(rm, rtmp,
- &conn->c_send_queue,
- m_conn_item) {
- list_del_init(&rm->m_conn_item);
- BUG_ON(!list_empty(&rm->m_sock_item));
- rds_message_put(rm);
+ for (i = 0; i < RDS_MPATH_WORKERS; i++) {
+ cp = &conn->c_path[i];
+ rds_conn_path_destroy(cp);
+ BUG_ON(!list_empty(&cp->cp_retrans));
+ }
}
- if (conn->c_xmit_rm)
- rds_message_put(conn->c_xmit_rm);
-
- conn->c_trans->conn_free(conn->c_transport_data);
/*
* The congestion maps aren't freed up here. They're
@@ -377,7 +434,6 @@ void rds_conn_destroy(struct rds_connection *conn)
*/
rds_cong_remove_conn(conn);
- BUG_ON(!list_empty(&conn->c_retrans));
kmem_cache_free(rds_conn_slab, conn);
spin_lock_irqsave(&rds_conn_lock, flags);
@@ -398,6 +454,7 @@ static void rds_conn_message_info(struct socket *sock, unsigned int len,
unsigned int total = 0;
unsigned long flags;
size_t i;
+ int j;
len /= sizeof(struct rds_info_message);
@@ -406,23 +463,32 @@ static void rds_conn_message_info(struct socket *sock, unsigned int len,
for (i = 0, head = rds_conn_hash; i < ARRAY_SIZE(rds_conn_hash);
i++, head++) {
hlist_for_each_entry_rcu(conn, head, c_hash_node) {
- if (want_send)
- list = &conn->c_send_queue;
- else
- list = &conn->c_retrans;
-
- spin_lock_irqsave(&conn->c_lock, flags);
-
- /* XXX too lazy to maintain counts.. */
- list_for_each_entry(rm, list, m_conn_item) {
- total++;
- if (total <= len)
- rds_inc_info_copy(&rm->m_inc, iter,
- conn->c_laddr,
- conn->c_faddr, 0);
+ struct rds_conn_path *cp;
+
+ for (j = 0; j < RDS_MPATH_WORKERS; j++) {
+ cp = &conn->c_path[j];
+ if (want_send)
+ list = &cp->cp_send_queue;
+ else
+ list = &cp->cp_retrans;
+
+ spin_lock_irqsave(&cp->cp_lock, flags);
+
+ /* XXX too lazy to maintain counts.. */
+ list_for_each_entry(rm, list, m_conn_item) {
+ total++;
+ if (total <= len)
+ rds_inc_info_copy(&rm->m_inc,
+ iter,
+ conn->c_laddr,
+ conn->c_faddr,
+ 0);
+ }
+
+ spin_unlock_irqrestore(&cp->cp_lock, flags);
+ if (!conn->c_trans->t_mp_capable)
+ break;
}
-
- spin_unlock_irqrestore(&conn->c_lock, flags);
}
}
rcu_read_unlock();
@@ -484,27 +550,72 @@ void rds_for_each_conn_info(struct socket *sock, unsigned int len,
}
EXPORT_SYMBOL_GPL(rds_for_each_conn_info);
-static int rds_conn_info_visitor(struct rds_connection *conn,
- void *buffer)
+void rds_walk_conn_path_info(struct socket *sock, unsigned int len,
+ struct rds_info_iterator *iter,
+ struct rds_info_lengths *lens,
+ int (*visitor)(struct rds_conn_path *, void *),
+ size_t item_len)
+{
+ u64 buffer[(item_len + 7) / 8];
+ struct hlist_head *head;
+ struct rds_connection *conn;
+ size_t i;
+ int j;
+
+ rcu_read_lock();
+
+ lens->nr = 0;
+ lens->each = item_len;
+
+ for (i = 0, head = rds_conn_hash; i < ARRAY_SIZE(rds_conn_hash);
+ i++, head++) {
+ hlist_for_each_entry_rcu(conn, head, c_hash_node) {
+ struct rds_conn_path *cp;
+
+ for (j = 0; j < RDS_MPATH_WORKERS; j++) {
+ cp = &conn->c_path[j];
+
+ /* XXX no cp_lock usage.. */
+ if (!visitor(cp, buffer))
+ continue;
+ if (!conn->c_trans->t_mp_capable)
+ break;
+ }
+
+ /* We copy as much as we can fit in the buffer,
+ * but we count all items so that the caller
+ * can resize the buffer.
+ */
+ if (len >= item_len) {
+ rds_info_copy(iter, buffer, item_len);
+ len -= item_len;
+ }
+ lens->nr++;
+ }
+ }
+ rcu_read_unlock();
+}
+
+static int rds_conn_info_visitor(struct rds_conn_path *cp, void *buffer)
{
struct rds_info_connection *cinfo = buffer;
- cinfo->next_tx_seq = conn->c_next_tx_seq;
- cinfo->next_rx_seq = conn->c_next_rx_seq;
- cinfo->laddr = conn->c_laddr;
- cinfo->faddr = conn->c_faddr;
- strncpy(cinfo->transport, conn->c_trans->t_name,
+ cinfo->next_tx_seq = cp->cp_next_tx_seq;
+ cinfo->next_rx_seq = cp->cp_next_rx_seq;
+ cinfo->laddr = cp->cp_conn->c_laddr;
+ cinfo->faddr = cp->cp_conn->c_faddr;
+ strncpy(cinfo->transport, cp->cp_conn->c_trans->t_name,
sizeof(cinfo->transport));
cinfo->flags = 0;
- rds_conn_info_set(cinfo->flags, test_bit(RDS_IN_XMIT, &conn->c_flags),
+ rds_conn_info_set(cinfo->flags, test_bit(RDS_IN_XMIT, &cp->cp_flags),
SENDING);
/* XXX Future: return the state rather than these funky bits */
rds_conn_info_set(cinfo->flags,
- atomic_read(&conn->c_state) == RDS_CONN_CONNECTING,
+ atomic_read(&cp->cp_state) == RDS_CONN_CONNECTING,
CONNECTING);
rds_conn_info_set(cinfo->flags,
- atomic_read(&conn->c_state) == RDS_CONN_UP,
+ atomic_read(&cp->cp_state) == RDS_CONN_UP,
CONNECTED);
return 1;
}
@@ -513,7 +624,7 @@ static void rds_conn_info(struct socket *sock, unsigned int len,
struct rds_info_iterator *iter,
struct rds_info_lengths *lens)
{
- rds_for_each_conn_info(sock, len, iter, lens,
+ rds_walk_conn_path_info(sock, len, iter, lens,
rds_conn_info_visitor,
sizeof(struct rds_info_connection));
}
@@ -553,10 +664,16 @@ void rds_conn_exit(void)
/*
* Force a disconnect
*/
+void rds_conn_path_drop(struct rds_conn_path *cp)
+{
+ atomic_set(&cp->cp_state, RDS_CONN_ERROR);
+ queue_work(rds_wq, &cp->cp_down_w);
+}
+EXPORT_SYMBOL_GPL(rds_conn_path_drop);
+
void rds_conn_drop(struct rds_connection *conn)
{
- atomic_set(&conn->c_state, RDS_CONN_ERROR);
- queue_work(rds_wq, &conn->c_down_w);
+ rds_conn_path_drop(&conn->c_path[0]);
}
EXPORT_SYMBOL_GPL(rds_conn_drop);
@@ -564,11 +681,17 @@ EXPORT_SYMBOL_GPL(rds_conn_drop);
* If the connection is down, trigger a connect. We may have scheduled a
* delayed reconnect however - in this case we should not interfere.
*/
+void rds_conn_path_connect_if_down(struct rds_conn_path *cp)
+{
+ if (rds_conn_path_state(cp) == RDS_CONN_DOWN &&
+ !test_and_set_bit(RDS_RECONNECT_PENDING, &cp->cp_flags))
+ queue_delayed_work(rds_wq, &cp->cp_conn_w, 0);
+}
+
void rds_conn_connect_if_down(struct rds_connection *conn)
{
- if (rds_conn_state(conn) == RDS_CONN_DOWN &&
- !test_and_set_bit(RDS_RECONNECT_PENDING, &conn->c_flags))
- queue_delayed_work(rds_wq, &conn->c_conn_w, 0);
+ WARN_ON(conn->c_trans->t_mp_capable);
+ rds_conn_path_connect_if_down(&conn->c_path[0]);
}
EXPORT_SYMBOL_GPL(rds_conn_connect_if_down);
@@ -586,3 +709,15 @@ __rds_conn_error(struct rds_connection *conn, const char *fmt, ...)
rds_conn_drop(conn);
}
+
+void
+__rds_conn_path_error(struct rds_conn_path *cp, const char *fmt, ...)
+{
+ va_list ap;
+
+ va_start(ap, fmt);
+ vprintk(fmt, ap);
+ va_end(ap);
+
+ rds_conn_path_drop(cp);
+}
diff --git a/net/rds/ib.c b/net/rds/ib.c
index b5342fddaf98..44946a681a8c 100644
--- a/net/rds/ib.c
+++ b/net/rds/ib.c
@@ -40,6 +40,7 @@
#include <linux/slab.h>
#include <linux/module.h>
+#include "rds_single_path.h"
#include "rds.h"
#include "ib.h"
#include "ib_mr.h"
diff --git a/net/rds/ib_cm.c b/net/rds/ib_cm.c
index 7c2a65a6af5c..e48bb1ba3dfc 100644
--- a/net/rds/ib_cm.c
+++ b/net/rds/ib_cm.c
@@ -36,6 +36,7 @@
#include <linux/vmalloc.h>
#include <linux/ratelimit.h>
+#include "rds_single_path.h"
#include "rds.h"
#include "ib.h"
@@ -273,7 +274,7 @@ static void rds_ib_tasklet_fn_send(unsigned long data)
if (rds_conn_up(conn) &&
(!test_bit(RDS_LL_SEND_FULL, &conn->c_flags) ||
test_bit(0, &conn->c_map_queued)))
- rds_send_xmit(ic->conn);
+ rds_send_xmit(&ic->conn->c_path[0]);
}
static void poll_rcq(struct rds_ib_connection *ic, struct ib_cq *cq,
diff --git a/net/rds/ib_rdma.c b/net/rds/ib_rdma.c
index f7164ac1ffc1..977f69886c00 100644
--- a/net/rds/ib_rdma.c
+++ b/net/rds/ib_rdma.c
@@ -35,6 +35,7 @@
#include <linux/rculist.h>
#include <linux/llist.h>
+#include "rds_single_path.h"
#include "ib_mr.h"
struct workqueue_struct *rds_ib_mr_wq;
@@ -618,7 +619,7 @@ struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_ibdev,
int rds_ib_mr_init(void)
{
- rds_ib_mr_wq = create_workqueue("rds_mr_flushd");
+ rds_ib_mr_wq = alloc_workqueue("rds_mr_flushd", WQ_MEM_RECLAIM, 0);
if (!rds_ib_mr_wq)
return -ENOMEM;
return 0;
diff --git a/net/rds/ib_recv.c b/net/rds/ib_recv.c
index abc8cc805e8d..4ea8cb17cc7a 100644
--- a/net/rds/ib_recv.c
+++ b/net/rds/ib_recv.c
@@ -36,6 +36,7 @@
#include <linux/dma-mapping.h>
#include <rdma/rdma_cm.h>
+#include "rds_single_path.h"
#include "rds.h"
#include "ib.h"
diff --git a/net/rds/ib_send.c b/net/rds/ib_send.c
index f27d2c82b036..6e4110aa5135 100644
--- a/net/rds/ib_send.c
+++ b/net/rds/ib_send.c
@@ -36,6 +36,7 @@
#include <linux/dmapool.h>
#include <linux/ratelimit.h>
+#include "rds_single_path.h"
#include "rds.h"
#include "ib.h"
diff --git a/net/rds/loop.c b/net/rds/loop.c
index 814173b466d9..15f83db78f0c 100644
--- a/net/rds/loop.c
+++ b/net/rds/loop.c
@@ -34,6 +34,7 @@
#include <linux/slab.h>
#include <linux/in.h>
+#include "rds_single_path.h"
#include "rds.h"
#include "loop.h"
diff --git a/net/rds/rdma_transport.c b/net/rds/rdma_transport.c
index 7220bebcf558..345f09059e9f 100644
--- a/net/rds/rdma_transport.c
+++ b/net/rds/rdma_transport.c
@@ -33,6 +33,7 @@
#include <linux/module.h>
#include <rdma/rdma_cm.h>
+#include "rds_single_path.h"
#include "rdma_transport.h"
#include "ib.h"
diff --git a/net/rds/rds.h b/net/rds/rds.h
index 387df5f32e49..2e35b738176f 100644
--- a/net/rds/rds.h
+++ b/net/rds/rds.h
@@ -84,56 +84,69 @@ enum {
#define RDS_IN_XMIT 2
#define RDS_RECV_REFILL 3
+/* Max number of multipaths per RDS connection. Must be a power of 2 */
+#define RDS_MPATH_WORKERS 1
+
+/* Per mpath connection state */
+struct rds_conn_path {
+ struct rds_connection *cp_conn;
+ struct rds_message *cp_xmit_rm;
+ unsigned long cp_xmit_sg;
+ unsigned int cp_xmit_hdr_off;
+ unsigned int cp_xmit_data_off;
+ unsigned int cp_xmit_atomic_sent;
+ unsigned int cp_xmit_rdma_sent;
+ unsigned int cp_xmit_data_sent;
+
+ spinlock_t cp_lock; /* protect msg queues */
+ u64 cp_next_tx_seq;
+ struct list_head cp_send_queue;
+ struct list_head cp_retrans;
+
+ u64 cp_next_rx_seq;
+
+ void *cp_transport_data;
+
+ atomic_t cp_state;
+ unsigned long cp_send_gen;
+ unsigned long cp_flags;
+ unsigned long cp_reconnect_jiffies;
+ struct delayed_work cp_send_w;
+ struct delayed_work cp_recv_w;
+ struct delayed_work cp_conn_w;
+ struct work_struct cp_down_w;
+ struct mutex cp_cm_lock; /* protect cp_state & cm */
+ wait_queue_head_t cp_waitq;
+
+ unsigned int cp_unacked_packets;
+ unsigned int cp_unacked_bytes;
+ unsigned int cp_outgoing:1,
+ cp_pad_to_32:31;
+ unsigned int cp_index;
+};
+
+/* One rds_connection per RDS address pair */
struct rds_connection {
struct hlist_node c_hash_node;
__be32 c_laddr;
__be32 c_faddr;
unsigned int c_loopback:1,
- c_outgoing:1,
- c_pad_to_32:30;
+ c_pad_to_32:31;
+ int c_npaths;
struct rds_connection *c_passive;
+ struct rds_transport *c_trans;
struct rds_cong_map *c_lcong;
struct rds_cong_map *c_fcong;
- struct rds_message *c_xmit_rm;
- unsigned long c_xmit_sg;
- unsigned int c_xmit_hdr_off;
- unsigned int c_xmit_data_off;
- unsigned int c_xmit_atomic_sent;
- unsigned int c_xmit_rdma_sent;
- unsigned int c_xmit_data_sent;
-
- spinlock_t c_lock; /* protect msg queues */
- u64 c_next_tx_seq;
- struct list_head c_send_queue;
- struct list_head c_retrans;
-
- u64 c_next_rx_seq;
-
- struct rds_transport *c_trans;
- void *c_transport_data;
-
- atomic_t c_state;
- unsigned long c_send_gen;
- unsigned long c_flags;
- unsigned long c_reconnect_jiffies;
- struct delayed_work c_send_w;
- struct delayed_work c_recv_w;
- struct delayed_work c_conn_w;
- struct work_struct c_down_w;
- struct mutex c_cm_lock; /* protect conn state & cm */
- wait_queue_head_t c_waitq;
+ /* Protocol version */
+ unsigned int c_version;
+ possible_net_t c_net;
struct list_head c_map_item;
unsigned long c_map_queued;
- unsigned int c_unacked_packets;
- unsigned int c_unacked_bytes;
-
- /* Protocol version */
- unsigned int c_version;
- possible_net_t c_net;
+ struct rds_conn_path c_path[RDS_MPATH_WORKERS];
};
static inline
@@ -218,6 +231,7 @@ struct rds_incoming {
atomic_t i_refcount;
struct list_head i_item;
struct rds_connection *i_conn;
+ struct rds_conn_path *i_conn_path;
struct rds_header i_hdr;
unsigned long i_rx_jiffies;
__be32 i_saddr;
@@ -433,7 +447,8 @@ struct rds_transport {
char t_name[TRANSNAMSIZ];
struct list_head t_item;
struct module *t_owner;
- unsigned int t_prefer_loopback:1;
+ unsigned int t_prefer_loopback:1,
+ t_mp_capable:1;
unsigned int t_type;
int (*laddr_check)(struct net *net, __be32 addr);
@@ -441,8 +456,11 @@ struct rds_transport {
void (*conn_free)(void *data);
int (*conn_connect)(struct rds_connection *conn);
void (*conn_shutdown)(struct rds_connection *conn);
+ void (*conn_path_shutdown)(struct rds_conn_path *conn);
void (*xmit_prepare)(struct rds_connection *conn);
+ void (*xmit_path_prepare)(struct rds_conn_path *cp);
void (*xmit_complete)(struct rds_connection *conn);
+ void (*xmit_path_complete)(struct rds_conn_path *cp);
int (*xmit)(struct rds_connection *conn, struct rds_message *rm,
unsigned int hdr_off, unsigned int sg, unsigned int off);
int (*xmit_rdma)(struct rds_connection *conn, struct rm_rdma_op *op);
@@ -636,10 +654,12 @@ struct rds_connection *rds_conn_create(struct net *net,
struct rds_connection *rds_conn_create_outgoing(struct net *net,
__be32 laddr, __be32 faddr,
struct rds_transport *trans, gfp_t gfp);
-void rds_conn_shutdown(struct rds_connection *conn);
+void rds_conn_shutdown(struct rds_conn_path *cpath);
void rds_conn_destroy(struct rds_connection *conn);
void rds_conn_drop(struct rds_connection *conn);
+void rds_conn_path_drop(struct rds_conn_path *cpath);
void rds_conn_connect_if_down(struct rds_connection *conn);
+void rds_conn_path_connect_if_down(struct rds_conn_path *cp);
void rds_for_each_conn_info(struct socket *sock, unsigned int len,
struct rds_info_iterator *iter,
struct rds_info_lengths *lens,
@@ -650,28 +670,60 @@ void __rds_conn_error(struct rds_connection *conn, const char *, ...);
#define rds_conn_error(conn, fmt...) \
__rds_conn_error(conn, KERN_WARNING "RDS: " fmt)
+void __rds_conn_path_error(struct rds_conn_path *cp, const char *, ...);
+#define rds_conn_path_error(cp, fmt...) \
+ __rds_conn_path_error(cp, KERN_WARNING "RDS: " fmt)
+
+static inline int
+rds_conn_path_transition(struct rds_conn_path *cp, int old, int new)
+{
+ return atomic_cmpxchg(&cp->cp_state, old, new) == old;
+}
+
static inline int
rds_conn_transition(struct rds_connection *conn, int old, int new)
{
- return atomic_cmpxchg(&conn->c_state, old, new) == old;
+ WARN_ON(conn->c_trans->t_mp_capable);
+ return rds_conn_path_transition(&conn->c_path[0], old, new);
+}
+
+static inline int
+rds_conn_path_state(struct rds_conn_path *cp)
+{
+ return atomic_read(&cp->cp_state);
}
static inline int
rds_conn_state(struct rds_connection *conn)
{
- return atomic_read(&conn->c_state);
+ WARN_ON(conn->c_trans->t_mp_capable);
+ return rds_conn_path_state(&conn->c_path[0]);
+}
+
+static inline int
+rds_conn_path_up(struct rds_conn_path *cp)
+{
+ return atomic_read(&cp->cp_state) == RDS_CONN_UP;
}
static inline int
rds_conn_up(struct rds_connection *conn)
{
- return atomic_read(&conn->c_state) == RDS_CONN_UP;
+ WARN_ON(conn->c_trans->t_mp_capable);
+ return rds_conn_path_up(&conn->c_path[0]);
+}
+
+static inline int
+rds_conn_path_connecting(struct rds_conn_path *cp)
+{
+ return atomic_read(&cp->cp_state) == RDS_CONN_CONNECTING;
}
static inline int
rds_conn_connecting(struct rds_connection *conn)
{
- return atomic_read(&conn->c_state) == RDS_CONN_CONNECTING;
+ WARN_ON(conn->c_trans->t_mp_capable);
+ return rds_conn_path_connecting(&conn->c_path[0]);
}
/* message.c */
@@ -720,6 +772,8 @@ void rds_page_exit(void);
/* recv.c */
void rds_inc_init(struct rds_incoming *inc, struct rds_connection *conn,
__be32 saddr);
+void rds_inc_path_init(struct rds_incoming *inc, struct rds_conn_path *conn,
+ __be32 saddr);
void rds_inc_put(struct rds_incoming *inc);
void rds_recv_incoming(struct rds_connection *conn, __be32 saddr, __be32 daddr,
struct rds_incoming *inc, gfp_t gfp);
@@ -733,16 +787,16 @@ void rds_inc_info_copy(struct rds_incoming *inc,
/* send.c */
int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len);
-void rds_send_reset(struct rds_connection *conn);
-int rds_send_xmit(struct rds_connection *conn);
+void rds_send_path_reset(struct rds_conn_path *conn);
+int rds_send_xmit(struct rds_conn_path *cp);
struct sockaddr_in;
void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in *dest);
typedef int (*is_acked_func)(struct rds_message *rm, uint64_t ack);
void rds_send_drop_acked(struct rds_connection *conn, u64 ack,
is_acked_func is_acked);
-int rds_send_pong(struct rds_connection *conn, __be16 dport);
-struct rds_message *rds_send_get_message(struct rds_connection *,
- struct rm_rdma_op *);
+void rds_send_path_drop_acked(struct rds_conn_path *cp, u64 ack,
+ is_acked_func is_acked);
+int rds_send_pong(struct rds_conn_path *cp, __be16 dport);
/* rdma.c */
void rds_rdma_unuse(struct rds_sock *rs, u32 r_key, int force);
@@ -809,12 +863,12 @@ extern unsigned int rds_sysctl_trace_level;
int rds_threads_init(void);
void rds_threads_exit(void);
extern struct workqueue_struct *rds_wq;
-void rds_queue_reconnect(struct rds_connection *conn);
+void rds_queue_reconnect(struct rds_conn_path *cp);
void rds_connect_worker(struct work_struct *);
void rds_shutdown_worker(struct work_struct *);
void rds_send_worker(struct work_struct *);
void rds_recv_worker(struct work_struct *);
-void rds_connect_path_complete(struct rds_connection *conn, int curr);
+void rds_connect_path_complete(struct rds_conn_path *conn, int curr);
void rds_connect_complete(struct rds_connection *conn);
/* transport.c */
diff --git a/net/rds/rds_single_path.h b/net/rds/rds_single_path.h
new file mode 100644
index 000000000000..e1241af7c1ad
--- /dev/null
+++ b/net/rds/rds_single_path.h
@@ -0,0 +1,30 @@
+#ifndef _RDS_RDS_SINGLE_H
+#define _RDS_RDS_SINGLE_H
+
+#define c_xmit_rm c_path[0].cp_xmit_rm
+#define c_xmit_sg c_path[0].cp_xmit_sg
+#define c_xmit_hdr_off c_path[0].cp_xmit_hdr_off
+#define c_xmit_data_off c_path[0].cp_xmit_data_off
+#define c_xmit_atomic_sent c_path[0].cp_xmit_atomic_sent
+#define c_xmit_rdma_sent c_path[0].cp_xmit_rdma_sent
+#define c_xmit_data_sent c_path[0].cp_xmit_data_sent
+#define c_lock c_path[0].cp_lock
+#define c_next_tx_seq c_path[0].cp_next_tx_seq
+#define c_send_queue c_path[0].cp_send_queue
+#define c_retrans c_path[0].cp_retrans
+#define c_next_rx_seq c_path[0].cp_next_rx_seq
+#define c_transport_data c_path[0].cp_transport_data
+#define c_state c_path[0].cp_state
+#define c_send_gen c_path[0].cp_send_gen
+#define c_flags c_path[0].cp_flags
+#define c_reconnect_jiffies c_path[0].cp_reconnect_jiffies
+#define c_send_w c_path[0].cp_send_w
+#define c_recv_w c_path[0].cp_recv_w
+#define c_conn_w c_path[0].cp_conn_w
+#define c_down_w c_path[0].cp_down_w
+#define c_cm_lock c_path[0].cp_cm_lock
+#define c_waitq c_path[0].cp_waitq
+#define c_unacked_packets c_path[0].cp_unacked_packets
+#define c_unacked_bytes c_path[0].cp_unacked_bytes
+
+#endif /* _RDS_RDS_SINGLE_H */
diff --git a/net/rds/recv.c b/net/rds/recv.c
index 8413f6c99e13..b58f50571782 100644
--- a/net/rds/recv.c
+++ b/net/rds/recv.c
@@ -53,6 +53,20 @@ void rds_inc_init(struct rds_incoming *inc, struct rds_connection *conn,
}
EXPORT_SYMBOL_GPL(rds_inc_init);
+void rds_inc_path_init(struct rds_incoming *inc, struct rds_conn_path *cp,
+ __be32 saddr)
+{
+ atomic_set(&inc->i_refcount, 1);
+ INIT_LIST_HEAD(&inc->i_item);
+ inc->i_conn = cp->cp_conn;
+ inc->i_conn_path = cp;
+ inc->i_saddr = saddr;
+ inc->i_rdma_cookie = 0;
+ inc->i_rx_tstamp.tv_sec = 0;
+ inc->i_rx_tstamp.tv_usec = 0;
+}
+EXPORT_SYMBOL_GPL(rds_inc_path_init);
+
static void rds_inc_addref(struct rds_incoming *inc)
{
rdsdebug("addref inc %p ref %d\n", inc, atomic_read(&inc->i_refcount));
@@ -164,13 +178,18 @@ void rds_recv_incoming(struct rds_connection *conn, __be32 saddr, __be32 daddr,
struct rds_sock *rs = NULL;
struct sock *sk;
unsigned long flags;
+ struct rds_conn_path *cp;
inc->i_conn = conn;
inc->i_rx_jiffies = jiffies;
+ if (conn->c_trans->t_mp_capable)
+ cp = inc->i_conn_path;
+ else
+ cp = &conn->c_path[0];
rdsdebug("conn %p next %llu inc %p seq %llu len %u sport %u dport %u "
"flags 0x%x rx_jiffies %lu\n", conn,
- (unsigned long long)conn->c_next_rx_seq,
+ (unsigned long long)cp->cp_next_rx_seq,
inc,
(unsigned long long)be64_to_cpu(inc->i_hdr.h_sequence),
be32_to_cpu(inc->i_hdr.h_len),
@@ -199,16 +218,16 @@ void rds_recv_incoming(struct rds_connection *conn, __be32 saddr, __be32 daddr,
* XXX we could spend more on the wire to get more robust failure
* detection, arguably worth it to avoid data corruption.
*/
- if (be64_to_cpu(inc->i_hdr.h_sequence) < conn->c_next_rx_seq &&
+ if (be64_to_cpu(inc->i_hdr.h_sequence) < cp->cp_next_rx_seq &&
(inc->i_hdr.h_flags & RDS_FLAG_RETRANSMITTED)) {
rds_stats_inc(s_recv_drop_old_seq);
goto out;
}
- conn->c_next_rx_seq = be64_to_cpu(inc->i_hdr.h_sequence) + 1;
+ cp->cp_next_rx_seq = be64_to_cpu(inc->i_hdr.h_sequence) + 1;
if (rds_sysctl_ping_enable && inc->i_hdr.h_dport == 0) {
rds_stats_inc(s_recv_ping);
- rds_send_pong(conn, inc->i_hdr.h_sport);
+ rds_send_pong(cp, inc->i_hdr.h_sport);
goto out;
}
diff --git a/net/rds/send.c b/net/rds/send.c
index b1962f8e30f7..ee43d6b2ea8f 100644
--- a/net/rds/send.c
+++ b/net/rds/send.c
@@ -62,14 +62,14 @@ static void rds_send_remove_from_sock(struct list_head *messages, int status);
* Reset the send state. Callers must ensure that this doesn't race with
* rds_send_xmit().
*/
-void rds_send_reset(struct rds_connection *conn)
+void rds_send_path_reset(struct rds_conn_path *cp)
{
struct rds_message *rm, *tmp;
unsigned long flags;
- if (conn->c_xmit_rm) {
- rm = conn->c_xmit_rm;
- conn->c_xmit_rm = NULL;
+ if (cp->cp_xmit_rm) {
+ rm = cp->cp_xmit_rm;
+ cp->cp_xmit_rm = NULL;
/* Tell the user the RDMA op is no longer mapped by the
* transport. This isn't entirely true (it's flushed out
* independently) but as the connection is down, there's
@@ -78,37 +78,37 @@ void rds_send_reset(struct rds_connection *conn)
rds_message_put(rm);
}
- conn->c_xmit_sg = 0;
- conn->c_xmit_hdr_off = 0;
- conn->c_xmit_data_off = 0;
- conn->c_xmit_atomic_sent = 0;
- conn->c_xmit_rdma_sent = 0;
- conn->c_xmit_data_sent = 0;
+ cp->cp_xmit_sg = 0;
+ cp->cp_xmit_hdr_off = 0;
+ cp->cp_xmit_data_off = 0;
+ cp->cp_xmit_atomic_sent = 0;
+ cp->cp_xmit_rdma_sent = 0;
+ cp->cp_xmit_data_sent = 0;
- conn->c_map_queued = 0;
+ cp->cp_conn->c_map_queued = 0;
- conn->c_unacked_packets = rds_sysctl_max_unacked_packets;
- conn->c_unacked_bytes = rds_sysctl_max_unacked_bytes;
+ cp->cp_unacked_packets = rds_sysctl_max_unacked_packets;
+ cp->cp_unacked_bytes = rds_sysctl_max_unacked_bytes;
/* Mark messages as retransmissions, and move them to the send q */
- spin_lock_irqsave(&conn->c_lock, flags);
- list_for_each_entry_safe(rm, tmp, &conn->c_retrans, m_conn_item) {
+ spin_lock_irqsave(&cp->cp_lock, flags);
+ list_for_each_entry_safe(rm, tmp, &cp->cp_retrans, m_conn_item) {
set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags);
set_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags);
}
- list_splice_init(&conn->c_retrans, &conn->c_send_queue);
- spin_unlock_irqrestore(&conn->c_lock, flags);
+ list_splice_init(&cp->cp_retrans, &cp->cp_send_queue);
+ spin_unlock_irqrestore(&cp->cp_lock, flags);
}
-EXPORT_SYMBOL_GPL(rds_send_reset);
+EXPORT_SYMBOL_GPL(rds_send_path_reset);
-static int acquire_in_xmit(struct rds_connection *conn)
+static int acquire_in_xmit(struct rds_conn_path *cp)
{
- return test_and_set_bit(RDS_IN_XMIT, &conn->c_flags) == 0;
+ return test_and_set_bit(RDS_IN_XMIT, &cp->cp_flags) == 0;
}
-static void release_in_xmit(struct rds_connection *conn)
+static void release_in_xmit(struct rds_conn_path *cp)
{
- clear_bit(RDS_IN_XMIT, &conn->c_flags);
+ clear_bit(RDS_IN_XMIT, &cp->cp_flags);
smp_mb__after_atomic();
/*
* We don't use wait_on_bit()/wake_up_bit() because our waking is in a
@@ -116,8 +116,8 @@ static void release_in_xmit(struct rds_connection *conn)
* the system-wide hashed waitqueue buckets in the fast path only to
* almost never find waiters.
*/
- if (waitqueue_active(&conn->c_waitq))
- wake_up_all(&conn->c_waitq);
+ if (waitqueue_active(&cp->cp_waitq))
+ wake_up_all(&cp->cp_waitq);
}
/*
@@ -134,8 +134,9 @@ static void release_in_xmit(struct rds_connection *conn)
* - small message latency is higher behind queued large messages
* - large message latency isn't starved by intervening small sends
*/
-int rds_send_xmit(struct rds_connection *conn)
+int rds_send_xmit(struct rds_conn_path *cp)
{
+ struct rds_connection *conn = cp->cp_conn;
struct rds_message *rm;
unsigned long flags;
unsigned int tmp;
@@ -155,7 +156,7 @@ restart:
* avoids blocking the caller and trading per-connection data between
* caches per message.
*/
- if (!acquire_in_xmit(conn)) {
+ if (!acquire_in_xmit(cp)) {
rds_stats_inc(s_send_lock_contention);
ret = -ENOMEM;
goto out;
@@ -169,21 +170,25 @@ restart:
* The acquire_in_xmit() check above ensures that only one
* caller can increment c_send_gen at any time.
*/
- conn->c_send_gen++;
- send_gen = conn->c_send_gen;
+ cp->cp_send_gen++;
+ send_gen = cp->cp_send_gen;
/*
* rds_conn_shutdown() sets the conn state and then tests RDS_IN_XMIT,
* we do the opposite to avoid races.
*/
- if (!rds_conn_up(conn)) {
- release_in_xmit(conn);
+ if (!rds_conn_path_up(cp)) {
+ release_in_xmit(cp);
ret = 0;
goto out;
}
- if (conn->c_trans->xmit_prepare)
+ if (conn->c_trans->t_mp_capable) {
+ if (conn->c_trans->xmit_path_prepare)
+ conn->c_trans->xmit_path_prepare(cp);
+ } else if (conn->c_trans->xmit_prepare) {
conn->c_trans->xmit_prepare(conn);
+ }
/*
* spin trying to push headers and data down the connection until
@@ -191,7 +196,7 @@ restart:
*/
while (1) {
- rm = conn->c_xmit_rm;
+ rm = cp->cp_xmit_rm;
/*
* If between sending messages, we can send a pending congestion
@@ -204,14 +209,16 @@ restart:
break;
}
rm->data.op_active = 1;
+ rm->m_inc.i_conn_path = cp;
+ rm->m_inc.i_conn = cp->cp_conn;
- conn->c_xmit_rm = rm;
+ cp->cp_xmit_rm = rm;
}
/*
* If not already working on one, grab the next message.
*
- * c_xmit_rm holds a ref while we're sending this message down
+ * cp_xmit_rm holds a ref while we're sending this message down
* the connction. We can use this ref while holding the
* send_sem.. rds_send_reset() is serialized with it.
*/
@@ -228,10 +235,10 @@ restart:
if (batch_count >= send_batch_count)
goto over_batch;
- spin_lock_irqsave(&conn->c_lock, flags);
+ spin_lock_irqsave(&cp->cp_lock, flags);
- if (!list_empty(&conn->c_send_queue)) {
- rm = list_entry(conn->c_send_queue.next,
+ if (!list_empty(&cp->cp_send_queue)) {
+ rm = list_entry(cp->cp_send_queue.next,
struct rds_message,
m_conn_item);
rds_message_addref(rm);
@@ -240,10 +247,11 @@ restart:
* Move the message from the send queue to the retransmit
* list right away.
*/
- list_move_tail(&rm->m_conn_item, &conn->c_retrans);
+ list_move_tail(&rm->m_conn_item,
+ &cp->cp_retrans);
}
- spin_unlock_irqrestore(&conn->c_lock, flags);
+ spin_unlock_irqrestore(&cp->cp_lock, flags);
if (!rm)
break;
@@ -257,32 +265,34 @@ restart:
*/
if (rm->rdma.op_active &&
test_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags)) {
- spin_lock_irqsave(&conn->c_lock, flags);
+ spin_lock_irqsave(&cp->cp_lock, flags);
if (test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags))
list_move(&rm->m_conn_item, &to_be_dropped);
- spin_unlock_irqrestore(&conn->c_lock, flags);
+ spin_unlock_irqrestore(&cp->cp_lock, flags);
continue;
}
/* Require an ACK every once in a while */
len = ntohl(rm->m_inc.i_hdr.h_len);
- if (conn->c_unacked_packets == 0 ||
- conn->c_unacked_bytes < len) {
+ if (cp->cp_unacked_packets == 0 ||
+ cp->cp_unacked_bytes < len) {
__set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags);
- conn->c_unacked_packets = rds_sysctl_max_unacked_packets;
- conn->c_unacked_bytes = rds_sysctl_max_unacked_bytes;
+ cp->cp_unacked_packets =
+ rds_sysctl_max_unacked_packets;
+ cp->cp_unacked_bytes =
+ rds_sysctl_max_unacked_bytes;
rds_stats_inc(s_send_ack_required);
} else {
- conn->c_unacked_bytes -= len;
- conn->c_unacked_packets--;
+ cp->cp_unacked_bytes -= len;
+ cp->cp_unacked_packets--;
}
- conn->c_xmit_rm = rm;
+ cp->cp_xmit_rm = rm;
}
/* The transport either sends the whole rdma or none of it */
- if (rm->rdma.op_active && !conn->c_xmit_rdma_sent) {
+ if (rm->rdma.op_active && !cp->cp_xmit_rdma_sent) {
rm->m_final_op = &rm->rdma;
/* The transport owns the mapped memory for now.
* You can't unmap it while it's on the send queue
@@ -294,11 +304,11 @@ restart:
wake_up_interruptible(&rm->m_flush_wait);
break;
}
- conn->c_xmit_rdma_sent = 1;
+ cp->cp_xmit_rdma_sent = 1;
}
- if (rm->atomic.op_active && !conn->c_xmit_atomic_sent) {
+ if (rm->atomic.op_active && !cp->cp_xmit_atomic_sent) {
rm->m_final_op = &rm->atomic;
/* The transport owns the mapped memory for now.
* You can't unmap it while it's on the send queue
@@ -310,7 +320,7 @@ restart:
wake_up_interruptible(&rm->m_flush_wait);
break;
}
- conn->c_xmit_atomic_sent = 1;
+ cp->cp_xmit_atomic_sent = 1;
}
@@ -336,41 +346,42 @@ restart:
rm->data.op_active = 0;
}
- if (rm->data.op_active && !conn->c_xmit_data_sent) {
+ if (rm->data.op_active && !cp->cp_xmit_data_sent) {
rm->m_final_op = &rm->data;
+
ret = conn->c_trans->xmit(conn, rm,
- conn->c_xmit_hdr_off,
- conn->c_xmit_sg,
- conn->c_xmit_data_off);
+ cp->cp_xmit_hdr_off,
+ cp->cp_xmit_sg,
+ cp->cp_xmit_data_off);
if (ret <= 0)
break;
- if (conn->c_xmit_hdr_off < sizeof(struct rds_header)) {
+ if (cp->cp_xmit_hdr_off < sizeof(struct rds_header)) {
tmp = min_t(int, ret,
sizeof(struct rds_header) -
- conn->c_xmit_hdr_off);
- conn->c_xmit_hdr_off += tmp;
+ cp->cp_xmit_hdr_off);
+ cp->cp_xmit_hdr_off += tmp;
ret -= tmp;
}
- sg = &rm->data.op_sg[conn->c_xmit_sg];
+ sg = &rm->data.op_sg[cp->cp_xmit_sg];
while (ret) {
tmp = min_t(int, ret, sg->length -
- conn->c_xmit_data_off);
- conn->c_xmit_data_off += tmp;
+ cp->cp_xmit_data_off);
+ cp->cp_xmit_data_off += tmp;
ret -= tmp;
- if (conn->c_xmit_data_off == sg->length) {
- conn->c_xmit_data_off = 0;
+ if (cp->cp_xmit_data_off == sg->length) {
+ cp->cp_xmit_data_off = 0;
sg++;
- conn->c_xmit_sg++;
- BUG_ON(ret != 0 &&
- conn->c_xmit_sg == rm->data.op_nents);
+ cp->cp_xmit_sg++;
+ BUG_ON(ret != 0 && cp->cp_xmit_sg ==
+ rm->data.op_nents);
}
}
- if (conn->c_xmit_hdr_off == sizeof(struct rds_header) &&
- (conn->c_xmit_sg == rm->data.op_nents))
- conn->c_xmit_data_sent = 1;
+ if (cp->cp_xmit_hdr_off == sizeof(struct rds_header) &&
+ (cp->cp_xmit_sg == rm->data.op_nents))
+ cp->cp_xmit_data_sent = 1;
}
/*
@@ -378,23 +389,27 @@ restart:
* if there is a data op. Thus, if the data is sent (or there was
* none), then we're done with the rm.
*/
- if (!rm->data.op_active || conn->c_xmit_data_sent) {
- conn->c_xmit_rm = NULL;
- conn->c_xmit_sg = 0;
- conn->c_xmit_hdr_off = 0;
- conn->c_xmit_data_off = 0;
- conn->c_xmit_rdma_sent = 0;
- conn->c_xmit_atomic_sent = 0;
- conn->c_xmit_data_sent = 0;
+ if (!rm->data.op_active || cp->cp_xmit_data_sent) {
+ cp->cp_xmit_rm = NULL;
+ cp->cp_xmit_sg = 0;
+ cp->cp_xmit_hdr_off = 0;
+ cp->cp_xmit_data_off = 0;
+ cp->cp_xmit_rdma_sent = 0;
+ cp->cp_xmit_atomic_sent = 0;
+ cp->cp_xmit_data_sent = 0;
rds_message_put(rm);
}
}
over_batch:
- if (conn->c_trans->xmit_complete)
+ if (conn->c_trans->t_mp_capable) {
+ if (conn->c_trans->xmit_path_complete)
+ conn->c_trans->xmit_path_complete(cp);
+ } else if (conn->c_trans->xmit_complete) {
conn->c_trans->xmit_complete(conn);
- release_in_xmit(conn);
+ }
+ release_in_xmit(cp);
/* Nuke any messages we decided not to retransmit. */
if (!list_empty(&to_be_dropped)) {
@@ -422,12 +437,12 @@ over_batch:
if (ret == 0) {
smp_mb();
if ((test_bit(0, &conn->c_map_queued) ||
- !list_empty(&conn->c_send_queue)) &&
- send_gen == conn->c_send_gen) {
+ !list_empty(&cp->cp_send_queue)) &&
+ send_gen == cp->cp_send_gen) {
rds_stats_inc(s_send_lock_queue_raced);
if (batch_count < send_batch_count)
goto restart;
- queue_delayed_work(rds_wq, &conn->c_send_w, 1);
+ queue_delayed_work(rds_wq, &cp->cp_send_w, 1);
}
}
out:
@@ -560,42 +575,6 @@ __rds_send_complete(struct rds_sock *rs, struct rds_message *rm, int status)
}
/*
- * This is called from the IB send completion when we detect
- * a RDMA operation that failed with remote access error.
- * So speed is not an issue here.
- */
-struct rds_message *rds_send_get_message(struct rds_connection *conn,
- struct rm_rdma_op *op)
-{
- struct rds_message *rm, *tmp, *found = NULL;
- unsigned long flags;
-
- spin_lock_irqsave(&conn->c_lock, flags);
-
- list_for_each_entry_safe(rm, tmp, &conn->c_retrans, m_conn_item) {
- if (&rm->rdma == op) {
- atomic_inc(&rm->m_refcount);
- found = rm;
- goto out;
- }
- }
-
- list_for_each_entry_safe(rm, tmp, &conn->c_send_queue, m_conn_item) {
- if (&rm->rdma == op) {
- atomic_inc(&rm->m_refcount);
- found = rm;
- break;
- }
- }
-
-out:
- spin_unlock_irqrestore(&conn->c_lock, flags);
-
- return found;
-}
-EXPORT_SYMBOL_GPL(rds_send_get_message);
-
-/*
* This removes messages from the socket's list if they're on it. The list
* argument must be private to the caller, we must be able to modify it
* without locks. The messages must have a reference held for their
@@ -685,16 +664,16 @@ unlock_and_drop:
* assigned the m_ack_seq yet - but that's fine as long as tcp_is_acked
* checks the RDS_MSG_HAS_ACK_SEQ bit.
*/
-void rds_send_drop_acked(struct rds_connection *conn, u64 ack,
- is_acked_func is_acked)
+void rds_send_path_drop_acked(struct rds_conn_path *cp, u64 ack,
+ is_acked_func is_acked)
{
struct rds_message *rm, *tmp;
unsigned long flags;
LIST_HEAD(list);
- spin_lock_irqsave(&conn->c_lock, flags);
+ spin_lock_irqsave(&cp->cp_lock, flags);
- list_for_each_entry_safe(rm, tmp, &conn->c_retrans, m_conn_item) {
+ list_for_each_entry_safe(rm, tmp, &cp->cp_retrans, m_conn_item) {
if (!rds_send_is_acked(rm, ack, is_acked))
break;
@@ -706,17 +685,26 @@ void rds_send_drop_acked(struct rds_connection *conn, u64 ack,
if (!list_empty(&list))
smp_mb__after_atomic();
- spin_unlock_irqrestore(&conn->c_lock, flags);
+ spin_unlock_irqrestore(&cp->cp_lock, flags);
/* now remove the messages from the sock list as needed */
rds_send_remove_from_sock(&list, RDS_RDMA_SUCCESS);
}
+EXPORT_SYMBOL_GPL(rds_send_path_drop_acked);
+
+void rds_send_drop_acked(struct rds_connection *conn, u64 ack,
+ is_acked_func is_acked)
+{
+ WARN_ON(conn->c_trans->t_mp_capable);
+ rds_send_path_drop_acked(&conn->c_path[0], ack, is_acked);
+}
EXPORT_SYMBOL_GPL(rds_send_drop_acked);
void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in *dest)
{
struct rds_message *rm, *tmp;
struct rds_connection *conn;
+ struct rds_conn_path *cp;
unsigned long flags;
LIST_HEAD(list);
@@ -745,22 +733,26 @@ void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in *dest)
list_for_each_entry(rm, &list, m_sock_item) {
conn = rm->m_inc.i_conn;
+ if (conn->c_trans->t_mp_capable)
+ cp = rm->m_inc.i_conn_path;
+ else
+ cp = &conn->c_path[0];
- spin_lock_irqsave(&conn->c_lock, flags);
+ spin_lock_irqsave(&cp->cp_lock, flags);
/*
* Maybe someone else beat us to removing rm from the conn.
* If we race with their flag update we'll get the lock and
* then really see that the flag has been cleared.
*/
if (!test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags)) {
- spin_unlock_irqrestore(&conn->c_lock, flags);
+ spin_unlock_irqrestore(&cp->cp_lock, flags);
spin_lock_irqsave(&rm->m_rs_lock, flags);
rm->m_rs = NULL;
spin_unlock_irqrestore(&rm->m_rs_lock, flags);
continue;
}
list_del_init(&rm->m_conn_item);
- spin_unlock_irqrestore(&conn->c_lock, flags);
+ spin_unlock_irqrestore(&cp->cp_lock, flags);
/*
* Couldn't grab m_rs_lock in top loop (lock ordering),
@@ -809,6 +801,7 @@ void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in *dest)
* message from the flow with RDS_CANCEL_SENT_TO.
*/
static int rds_send_queue_rm(struct rds_sock *rs, struct rds_connection *conn,
+ struct rds_conn_path *cp,
struct rds_message *rm, __be16 sport,
__be16 dport, int *queued)
{
@@ -852,13 +845,14 @@ static int rds_send_queue_rm(struct rds_sock *rs, struct rds_connection *conn,
trying to minimize the time we hold c_lock */
rds_message_populate_header(&rm->m_inc.i_hdr, sport, dport, 0);
rm->m_inc.i_conn = conn;
+ rm->m_inc.i_conn_path = cp;
rds_message_addref(rm);
- spin_lock(&conn->c_lock);
- rm->m_inc.i_hdr.h_sequence = cpu_to_be64(conn->c_next_tx_seq++);
- list_add_tail(&rm->m_conn_item, &conn->c_send_queue);
+ spin_lock(&cp->cp_lock);
+ rm->m_inc.i_hdr.h_sequence = cpu_to_be64(cp->cp_next_tx_seq++);
+ list_add_tail(&rm->m_conn_item, &cp->cp_send_queue);
set_bit(RDS_MSG_ON_CONN, &rm->m_flags);
- spin_unlock(&conn->c_lock);
+ spin_unlock(&cp->cp_lock);
rdsdebug("queued msg %p len %d, rs %p bytes %d seq %llu\n",
rm, len, rs, rs->rs_snd_bytes,
@@ -990,6 +984,7 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
int queued = 0, allocated_mr = 0;
int nonblock = msg->msg_flags & MSG_DONTWAIT;
long timeo = sock_sndtimeo(sk, nonblock);
+ struct rds_conn_path *cpath;
/* Mirror Linux UDP mirror of BSD error message compatibility */
/* XXX: Perhaps MSG_MORE someday */
@@ -1088,15 +1083,16 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
goto out;
}
- rds_conn_connect_if_down(conn);
+ cpath = &conn->c_path[0];
+
+ rds_conn_path_connect_if_down(cpath);
ret = rds_cong_wait(conn->c_fcong, dport, nonblock, rs);
if (ret) {
rs->rs_seen_congestion = 1;
goto out;
}
-
- while (!rds_send_queue_rm(rs, conn, rm, rs->rs_bound_port,
+ while (!rds_send_queue_rm(rs, conn, cpath, rm, rs->rs_bound_port,
dport, &queued)) {
rds_stats_inc(s_send_queue_full);
@@ -1106,7 +1102,7 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
}
timeo = wait_event_interruptible_timeout(*sk_sleep(sk),
- rds_send_queue_rm(rs, conn, rm,
+ rds_send_queue_rm(rs, conn, cpath, rm,
rs->rs_bound_port,
dport,
&queued),
@@ -1127,9 +1123,9 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
*/
rds_stats_inc(s_send_queued);
- ret = rds_send_xmit(conn);
+ ret = rds_send_xmit(cpath);
if (ret == -ENOMEM || ret == -EAGAIN)
- queue_delayed_work(rds_wq, &conn->c_send_w, 1);
+ queue_delayed_work(rds_wq, &cpath->cp_send_w, 1);
rds_message_put(rm);
return payload_len;
@@ -1150,7 +1146,7 @@ out:
* Reply to a ping packet.
*/
int
-rds_send_pong(struct rds_connection *conn, __be16 dport)
+rds_send_pong(struct rds_conn_path *cp, __be16 dport)
{
struct rds_message *rm;
unsigned long flags;
@@ -1162,31 +1158,32 @@ rds_send_pong(struct rds_connection *conn, __be16 dport)
goto out;
}
- rm->m_daddr = conn->c_faddr;
+ rm->m_daddr = cp->cp_conn->c_faddr;
rm->data.op_active = 1;
- rds_conn_connect_if_down(conn);
+ rds_conn_path_connect_if_down(cp);
- ret = rds_cong_wait(conn->c_fcong, dport, 1, NULL);
+ ret = rds_cong_wait(cp->cp_conn->c_fcong, dport, 1, NULL);
if (ret)
goto out;
- spin_lock_irqsave(&conn->c_lock, flags);
- list_add_tail(&rm->m_conn_item, &conn->c_send_queue);
+ spin_lock_irqsave(&cp->cp_lock, flags);
+ list_add_tail(&rm->m_conn_item, &cp->cp_send_queue);
set_bit(RDS_MSG_ON_CONN, &rm->m_flags);
rds_message_addref(rm);
- rm->m_inc.i_conn = conn;
+ rm->m_inc.i_conn = cp->cp_conn;
+ rm->m_inc.i_conn_path = cp;
rds_message_populate_header(&rm->m_inc.i_hdr, 0, dport,
- conn->c_next_tx_seq);
- conn->c_next_tx_seq++;
- spin_unlock_irqrestore(&conn->c_lock, flags);
+ cp->cp_next_tx_seq);
+ cp->cp_next_tx_seq++;
+ spin_unlock_irqrestore(&cp->cp_lock, flags);
rds_stats_inc(s_send_queued);
rds_stats_inc(s_send_pong);
/* schedule the send work on rds_wq */
- queue_delayed_work(rds_wq, &conn->c_send_w, 1);
+ queue_delayed_work(rds_wq, &cp->cp_send_w, 1);
rds_message_put(rm);
return 0;
diff --git a/net/rds/tcp.c b/net/rds/tcp.c
index 74ee126a6fe6..5217d49ce6d6 100644
--- a/net/rds/tcp.c
+++ b/net/rds/tcp.c
@@ -38,6 +38,7 @@
#include <net/net_namespace.h>
#include <net/netns/generic.h>
+#include "rds_single_path.h"
#include "rds.h"
#include "tcp.h"
@@ -56,8 +57,8 @@ static int rds_tcp_skbuf_handler(struct ctl_table *ctl, int write,
void __user *buffer, size_t *lenp,
loff_t *fpos);
-int rds_tcp_min_sndbuf = SOCK_MIN_SNDBUF;
-int rds_tcp_min_rcvbuf = SOCK_MIN_RCVBUF;
+static int rds_tcp_min_sndbuf = SOCK_MIN_SNDBUF;
+static int rds_tcp_min_rcvbuf = SOCK_MIN_RCVBUF;
static struct ctl_table rds_tcp_sysctl_table[] = {
#define RDS_TCP_SNDBUF 0
@@ -185,7 +186,7 @@ void rds_tcp_reset_callbacks(struct socket *sock,
release_sock(osock->sk);
sock_release(osock);
newsock:
- rds_send_reset(conn);
+ rds_send_path_reset(&conn->c_path[0]);
lock_sock(sock->sk);
write_lock_bh(&sock->sk->sk_callback_lock);
tc->t_sock = sock;
diff --git a/net/rds/tcp_connect.c b/net/rds/tcp_connect.c
index f6e95d60db54..96c2c4d17909 100644
--- a/net/rds/tcp_connect.c
+++ b/net/rds/tcp_connect.c
@@ -34,6 +34,7 @@
#include <linux/in.h>
#include <net/tcp.h>
+#include "rds_single_path.h"
#include "rds.h"
#include "tcp.h"
@@ -60,7 +61,8 @@ void rds_tcp_state_change(struct sock *sk)
case TCP_SYN_RECV:
break;
case TCP_ESTABLISHED:
- rds_connect_path_complete(conn, RDS_CONN_CONNECTING);
+ rds_connect_path_complete(&conn->c_path[0],
+ RDS_CONN_CONNECTING);
break;
case TCP_CLOSE_WAIT:
case TCP_CLOSE:
diff --git a/net/rds/tcp_listen.c b/net/rds/tcp_listen.c
index 245542ca4718..f9cc945a77b3 100644
--- a/net/rds/tcp_listen.c
+++ b/net/rds/tcp_listen.c
@@ -35,6 +35,7 @@
#include <linux/in.h>
#include <net/tcp.h>
+#include "rds_single_path.h"
#include "rds.h"
#include "tcp.h"
@@ -132,17 +133,19 @@ int rds_tcp_accept_one(struct socket *sock)
* c_transport_data.
*/
if (ntohl(inet->inet_saddr) < ntohl(inet->inet_daddr) ||
- !conn->c_outgoing) {
+ !conn->c_path[0].cp_outgoing) {
goto rst_nsk;
} else {
rds_tcp_reset_callbacks(new_sock, conn);
- conn->c_outgoing = 0;
+ conn->c_path[0].cp_outgoing = 0;
/* rds_connect_path_complete() marks RDS_CONN_UP */
- rds_connect_path_complete(conn, RDS_CONN_RESETTING);
+ rds_connect_path_complete(&conn->c_path[0],
+ RDS_CONN_RESETTING);
}
} else {
rds_tcp_set_callbacks(new_sock, conn);
- rds_connect_path_complete(conn, RDS_CONN_CONNECTING);
+ rds_connect_path_complete(&conn->c_path[0],
+ RDS_CONN_CONNECTING);
}
new_sock = NULL;
ret = 0;
diff --git a/net/rds/tcp_recv.c b/net/rds/tcp_recv.c
index 6e6a7111a034..4a87d9ef3084 100644
--- a/net/rds/tcp_recv.c
+++ b/net/rds/tcp_recv.c
@@ -34,6 +34,7 @@
#include <linux/slab.h>
#include <net/tcp.h>
+#include "rds_single_path.h"
#include "rds.h"
#include "tcp.h"
diff --git a/net/rds/tcp_send.c b/net/rds/tcp_send.c
index 618be69c9c3b..710f1aae97ad 100644
--- a/net/rds/tcp_send.c
+++ b/net/rds/tcp_send.c
@@ -34,6 +34,7 @@
#include <linux/in.h>
#include <net/tcp.h>
+#include "rds_single_path.h"
#include "rds.h"
#include "tcp.h"
diff --git a/net/rds/threads.c b/net/rds/threads.c
index 4a323045719b..9fbe95bb14a9 100644
--- a/net/rds/threads.c
+++ b/net/rds/threads.c
@@ -71,30 +71,30 @@
struct workqueue_struct *rds_wq;
EXPORT_SYMBOL_GPL(rds_wq);
-void rds_connect_path_complete(struct rds_connection *conn, int curr)
+void rds_connect_path_complete(struct rds_conn_path *cp, int curr)
{
- if (!rds_conn_transition(conn, curr, RDS_CONN_UP)) {
+ if (!rds_conn_path_transition(cp, curr, RDS_CONN_UP)) {
printk(KERN_WARNING "%s: Cannot transition to state UP, "
"current state is %d\n",
__func__,
- atomic_read(&conn->c_state));
- rds_conn_drop(conn);
+ atomic_read(&cp->cp_state));
+ rds_conn_path_drop(cp);
return;
}
rdsdebug("conn %p for %pI4 to %pI4 complete\n",
- conn, &conn->c_laddr, &conn->c_faddr);
+ cp->cp_conn, &cp->cp_conn->c_laddr, &cp->cp_conn->c_faddr);
- conn->c_reconnect_jiffies = 0;
- set_bit(0, &conn->c_map_queued);
- queue_delayed_work(rds_wq, &conn->c_send_w, 0);
- queue_delayed_work(rds_wq, &conn->c_recv_w, 0);
+ cp->cp_reconnect_jiffies = 0;
+ set_bit(0, &cp->cp_conn->c_map_queued);
+ queue_delayed_work(rds_wq, &cp->cp_send_w, 0);
+ queue_delayed_work(rds_wq, &cp->cp_recv_w, 0);
}
EXPORT_SYMBOL_GPL(rds_connect_path_complete);
void rds_connect_complete(struct rds_connection *conn)
{
- rds_connect_path_complete(conn, RDS_CONN_CONNECTING);
+ rds_connect_path_complete(&conn->c_path[0], RDS_CONN_CONNECTING);
}
EXPORT_SYMBOL_GPL(rds_connect_complete);
@@ -116,70 +116,79 @@ EXPORT_SYMBOL_GPL(rds_connect_complete);
* We should *always* start with a random backoff; otherwise a broken connection
* will always take several iterations to be re-established.
*/
-void rds_queue_reconnect(struct rds_connection *conn)
+void rds_queue_reconnect(struct rds_conn_path *cp)
{
unsigned long rand;
+ struct rds_connection *conn = cp->cp_conn;
rdsdebug("conn %p for %pI4 to %pI4 reconnect jiffies %lu\n",
conn, &conn->c_laddr, &conn->c_faddr,
- conn->c_reconnect_jiffies);
+ cp->cp_reconnect_jiffies);
- set_bit(RDS_RECONNECT_PENDING, &conn->c_flags);
- if (conn->c_reconnect_jiffies == 0) {
- conn->c_reconnect_jiffies = rds_sysctl_reconnect_min_jiffies;
- queue_delayed_work(rds_wq, &conn->c_conn_w, 0);
+ set_bit(RDS_RECONNECT_PENDING, &cp->cp_flags);
+ if (cp->cp_reconnect_jiffies == 0) {
+ cp->cp_reconnect_jiffies = rds_sysctl_reconnect_min_jiffies;
+ queue_delayed_work(rds_wq, &cp->cp_conn_w, 0);
return;
}
get_random_bytes(&rand, sizeof(rand));
rdsdebug("%lu delay %lu ceil conn %p for %pI4 -> %pI4\n",
- rand % conn->c_reconnect_jiffies, conn->c_reconnect_jiffies,
+ rand % cp->cp_reconnect_jiffies, cp->cp_reconnect_jiffies,
conn, &conn->c_laddr, &conn->c_faddr);
- queue_delayed_work(rds_wq, &conn->c_conn_w,
- rand % conn->c_reconnect_jiffies);
+ queue_delayed_work(rds_wq, &cp->cp_conn_w,
+ rand % cp->cp_reconnect_jiffies);
- conn->c_reconnect_jiffies = min(conn->c_reconnect_jiffies * 2,
+ cp->cp_reconnect_jiffies = min(cp->cp_reconnect_jiffies * 2,
rds_sysctl_reconnect_max_jiffies);
}
void rds_connect_worker(struct work_struct *work)
{
- struct rds_connection *conn = container_of(work, struct rds_connection, c_conn_w.work);
+ struct rds_conn_path *cp = container_of(work,
+ struct rds_conn_path,
+ cp_conn_w.work);
+ struct rds_connection *conn = cp->cp_conn;
int ret;
- clear_bit(RDS_RECONNECT_PENDING, &conn->c_flags);
- if (rds_conn_transition(conn, RDS_CONN_DOWN, RDS_CONN_CONNECTING)) {
+ clear_bit(RDS_RECONNECT_PENDING, &cp->cp_flags);
+ if (rds_conn_path_transition(cp, RDS_CONN_DOWN, RDS_CONN_CONNECTING)) {
ret = conn->c_trans->conn_connect(conn);
rdsdebug("conn %p for %pI4 to %pI4 dispatched, ret %d\n",
conn, &conn->c_laddr, &conn->c_faddr, ret);
if (ret) {
- if (rds_conn_transition(conn, RDS_CONN_CONNECTING, RDS_CONN_DOWN))
- rds_queue_reconnect(conn);
+ if (rds_conn_path_transition(cp,
+ RDS_CONN_CONNECTING,
+ RDS_CONN_DOWN))
+ rds_queue_reconnect(cp);
else
- rds_conn_error(conn, "RDS: connect failed\n");
+ rds_conn_path_error(cp,
+ "RDS: connect failed\n");
}
}
}
void rds_send_worker(struct work_struct *work)
{
- struct rds_connection *conn = container_of(work, struct rds_connection, c_send_w.work);
+ struct rds_conn_path *cp = container_of(work,
+ struct rds_conn_path,
+ cp_send_w.work);
int ret;
- if (rds_conn_state(conn) == RDS_CONN_UP) {
- clear_bit(RDS_LL_SEND_FULL, &conn->c_flags);
- ret = rds_send_xmit(conn);
+ if (rds_conn_path_state(cp) == RDS_CONN_UP) {
+ clear_bit(RDS_LL_SEND_FULL, &cp->cp_flags);
+ ret = rds_send_xmit(cp);
cond_resched();
- rdsdebug("conn %p ret %d\n", conn, ret);
+ rdsdebug("conn %p ret %d\n", cp->cp_conn, ret);
switch (ret) {
case -EAGAIN:
rds_stats_inc(s_send_immediate_retry);
- queue_delayed_work(rds_wq, &conn->c_send_w, 0);
+ queue_delayed_work(rds_wq, &cp->cp_send_w, 0);
break;
case -ENOMEM:
rds_stats_inc(s_send_delayed_retry);
- queue_delayed_work(rds_wq, &conn->c_send_w, 2);
+ queue_delayed_work(rds_wq, &cp->cp_send_w, 2);
default:
break;
}
@@ -188,20 +197,22 @@ void rds_send_worker(struct work_struct *work)
void rds_recv_worker(struct work_struct *work)
{
- struct rds_connection *conn = container_of(work, struct rds_connection, c_recv_w.work);
+ struct rds_conn_path *cp = container_of(work,
+ struct rds_conn_path,
+ cp_recv_w.work);
int ret;
- if (rds_conn_state(conn) == RDS_CONN_UP) {
- ret = conn->c_trans->recv(conn);
- rdsdebug("conn %p ret %d\n", conn, ret);
+ if (rds_conn_path_state(cp) == RDS_CONN_UP) {
+ ret = cp->cp_conn->c_trans->recv(cp->cp_conn);
+ rdsdebug("conn %p ret %d\n", cp->cp_conn, ret);
switch (ret) {
case -EAGAIN:
rds_stats_inc(s_recv_immediate_retry);
- queue_delayed_work(rds_wq, &conn->c_recv_w, 0);
+ queue_delayed_work(rds_wq, &cp->cp_recv_w, 0);
break;
case -ENOMEM:
rds_stats_inc(s_recv_delayed_retry);
- queue_delayed_work(rds_wq, &conn->c_recv_w, 2);
+ queue_delayed_work(rds_wq, &cp->cp_recv_w, 2);
default:
break;
}
@@ -210,9 +221,11 @@ void rds_recv_worker(struct work_struct *work)
void rds_shutdown_worker(struct work_struct *work)
{
- struct rds_connection *conn = container_of(work, struct rds_connection, c_down_w);
+ struct rds_conn_path *cp = container_of(work,
+ struct rds_conn_path,
+ cp_down_w);
- rds_conn_shutdown(conn);
+ rds_conn_shutdown(cp);
}
void rds_threads_exit(void)