summaryrefslogtreecommitdiff
path: root/net/sunrpc/xprtsock.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/sunrpc/xprtsock.c')
-rw-r--r--net/sunrpc/xprtsock.c63
1 files changed, 42 insertions, 21 deletions
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index 6d0cc3b8f932..a6b8c1f8f92a 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -52,6 +52,8 @@
#include "sunrpc.h"
+#define RPC_TCP_READ_CHUNK_SZ (3*512*1024)
+
static void xs_close(struct rpc_xprt *xprt);
static void xs_tcp_set_socket_timeouts(struct rpc_xprt *xprt,
struct socket *sock);
@@ -805,13 +807,6 @@ static void xs_sock_reset_connection_flags(struct rpc_xprt *xprt)
smp_mb__after_atomic();
}
-static void xs_sock_mark_closed(struct rpc_xprt *xprt)
-{
- xs_sock_reset_connection_flags(xprt);
- /* Mark transport as closed and wake up all pending tasks */
- xprt_disconnect_done(xprt);
-}
-
/**
* xs_error_report - callback to handle TCP socket state errors
* @sk: socket
@@ -831,9 +826,6 @@ static void xs_error_report(struct sock *sk)
err = -sk->sk_err;
if (err == 0)
goto out;
- /* Is this a reset event? */
- if (sk->sk_state == TCP_CLOSE)
- xs_sock_mark_closed(xprt);
dprintk("RPC: xs_error_report client %p, error=%d...\n",
xprt, -err);
trace_rpc_socket_error(xprt, sk->sk_socket, err);
@@ -1003,6 +995,7 @@ static void xs_local_data_receive(struct sock_xprt *transport)
struct sock *sk;
int err;
+restart:
mutex_lock(&transport->recv_mutex);
sk = transport->inet;
if (sk == NULL)
@@ -1016,6 +1009,11 @@ static void xs_local_data_receive(struct sock_xprt *transport)
}
if (!test_and_clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state))
break;
+ if (need_resched()) {
+ mutex_unlock(&transport->recv_mutex);
+ cond_resched();
+ goto restart;
+ }
}
out:
mutex_unlock(&transport->recv_mutex);
@@ -1070,18 +1068,18 @@ static void xs_udp_data_read_skb(struct rpc_xprt *xprt,
/* Suck it into the iovec, verify checksum if not done by hw. */
if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb)) {
- __UDPX_INC_STATS(sk, UDP_MIB_INERRORS);
spin_lock(&xprt->recv_lock);
+ __UDPX_INC_STATS(sk, UDP_MIB_INERRORS);
goto out_unpin;
}
- __UDPX_INC_STATS(sk, UDP_MIB_INDATAGRAMS);
spin_lock_bh(&xprt->transport_lock);
xprt_adjust_cwnd(xprt, task, copied);
spin_unlock_bh(&xprt->transport_lock);
spin_lock(&xprt->recv_lock);
xprt_complete_rqst(task, copied);
+ __UDPX_INC_STATS(sk, UDP_MIB_INDATAGRAMS);
out_unpin:
xprt_unpin_rqst(rovr);
out_unlock:
@@ -1094,6 +1092,7 @@ static void xs_udp_data_receive(struct sock_xprt *transport)
struct sock *sk;
int err;
+restart:
mutex_lock(&transport->recv_mutex);
sk = transport->inet;
if (sk == NULL)
@@ -1107,6 +1106,11 @@ static void xs_udp_data_receive(struct sock_xprt *transport)
}
if (!test_and_clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state))
break;
+ if (need_resched()) {
+ mutex_unlock(&transport->recv_mutex);
+ cond_resched();
+ goto restart;
+ }
}
out:
mutex_unlock(&transport->recv_mutex);
@@ -1479,6 +1483,7 @@ static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, uns
.offset = offset,
.count = len,
};
+ size_t ret;
dprintk("RPC: xs_tcp_data_recv started\n");
do {
@@ -1507,9 +1512,14 @@ static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, uns
/* Skip over any trailing bytes on short reads */
xs_tcp_read_discard(transport, &desc);
} while (desc.count);
+ ret = len - desc.count;
+ if (ret < rd_desc->count)
+ rd_desc->count -= ret;
+ else
+ rd_desc->count = 0;
trace_xs_tcp_data_recv(transport);
dprintk("RPC: xs_tcp_data_recv done\n");
- return len - desc.count;
+ return ret;
}
static void xs_tcp_data_receive(struct sock_xprt *transport)
@@ -1517,30 +1527,34 @@ static void xs_tcp_data_receive(struct sock_xprt *transport)
struct rpc_xprt *xprt = &transport->xprt;
struct sock *sk;
read_descriptor_t rd_desc = {
- .count = 2*1024*1024,
.arg.data = xprt,
};
unsigned long total = 0;
- int loop;
int read = 0;
+restart:
mutex_lock(&transport->recv_mutex);
sk = transport->inet;
if (sk == NULL)
goto out;
/* We use rd_desc to pass struct xprt to xs_tcp_data_recv */
- for (loop = 0; loop < 64; loop++) {
+ for (;;) {
+ rd_desc.count = RPC_TCP_READ_CHUNK_SZ;
lock_sock(sk);
read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv);
- if (read <= 0) {
+ if (rd_desc.count != 0 || read < 0) {
clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state);
release_sock(sk);
break;
}
release_sock(sk);
total += read;
- rd_desc.count = 65536;
+ if (need_resched()) {
+ mutex_unlock(&transport->recv_mutex);
+ cond_resched();
+ goto restart;
+ }
}
if (test_bit(XPRT_SOCK_DATA_READY, &transport->sock_state))
queue_work(xprtiod_workqueue, &transport->recv_worker);
@@ -1631,9 +1645,11 @@ static void xs_tcp_state_change(struct sock *sk)
if (test_and_clear_bit(XPRT_SOCK_CONNECTING,
&transport->sock_state))
xprt_clear_connecting(xprt);
+ clear_bit(XPRT_CLOSING, &xprt->state);
if (sk->sk_err)
xprt_wake_pending_tasks(xprt, -sk->sk_err);
- xs_sock_mark_closed(xprt);
+ /* Trigger the socket release */
+ xs_tcp_force_close(xprt);
}
out:
read_unlock_bh(&sk->sk_callback_lock);
@@ -2241,14 +2257,19 @@ static void xs_tcp_shutdown(struct rpc_xprt *xprt)
{
struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
struct socket *sock = transport->sock;
+ int skst = transport->inet ? transport->inet->sk_state : TCP_CLOSE;
if (sock == NULL)
return;
- if (xprt_connected(xprt)) {
+ switch (skst) {
+ default:
kernel_sock_shutdown(sock, SHUT_RDWR);
trace_rpc_socket_shutdown(xprt, sock);
- } else
+ break;
+ case TCP_CLOSE:
+ case TCP_TIME_WAIT:
xs_reset_transport(transport);
+ }
}
static void xs_tcp_set_socket_timeouts(struct rpc_xprt *xprt,