diff options
Diffstat (limited to 'net/sunrpc/xprtsock.c')
-rw-r--r-- | net/sunrpc/xprtsock.c | 63 |
1 files changed, 42 insertions, 21 deletions
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index 6d0cc3b8f932..a6b8c1f8f92a 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -52,6 +52,8 @@ #include "sunrpc.h" +#define RPC_TCP_READ_CHUNK_SZ (3*512*1024) + static void xs_close(struct rpc_xprt *xprt); static void xs_tcp_set_socket_timeouts(struct rpc_xprt *xprt, struct socket *sock); @@ -805,13 +807,6 @@ static void xs_sock_reset_connection_flags(struct rpc_xprt *xprt) smp_mb__after_atomic(); } -static void xs_sock_mark_closed(struct rpc_xprt *xprt) -{ - xs_sock_reset_connection_flags(xprt); - /* Mark transport as closed and wake up all pending tasks */ - xprt_disconnect_done(xprt); -} - /** * xs_error_report - callback to handle TCP socket state errors * @sk: socket @@ -831,9 +826,6 @@ static void xs_error_report(struct sock *sk) err = -sk->sk_err; if (err == 0) goto out; - /* Is this a reset event? */ - if (sk->sk_state == TCP_CLOSE) - xs_sock_mark_closed(xprt); dprintk("RPC: xs_error_report client %p, error=%d...\n", xprt, -err); trace_rpc_socket_error(xprt, sk->sk_socket, err); @@ -1003,6 +995,7 @@ static void xs_local_data_receive(struct sock_xprt *transport) struct sock *sk; int err; +restart: mutex_lock(&transport->recv_mutex); sk = transport->inet; if (sk == NULL) @@ -1016,6 +1009,11 @@ static void xs_local_data_receive(struct sock_xprt *transport) } if (!test_and_clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state)) break; + if (need_resched()) { + mutex_unlock(&transport->recv_mutex); + cond_resched(); + goto restart; + } } out: mutex_unlock(&transport->recv_mutex); @@ -1070,18 +1068,18 @@ static void xs_udp_data_read_skb(struct rpc_xprt *xprt, /* Suck it into the iovec, verify checksum if not done by hw. */ if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb)) { - __UDPX_INC_STATS(sk, UDP_MIB_INERRORS); spin_lock(&xprt->recv_lock); + __UDPX_INC_STATS(sk, UDP_MIB_INERRORS); goto out_unpin; } - __UDPX_INC_STATS(sk, UDP_MIB_INDATAGRAMS); spin_lock_bh(&xprt->transport_lock); xprt_adjust_cwnd(xprt, task, copied); spin_unlock_bh(&xprt->transport_lock); spin_lock(&xprt->recv_lock); xprt_complete_rqst(task, copied); + __UDPX_INC_STATS(sk, UDP_MIB_INDATAGRAMS); out_unpin: xprt_unpin_rqst(rovr); out_unlock: @@ -1094,6 +1092,7 @@ static void xs_udp_data_receive(struct sock_xprt *transport) struct sock *sk; int err; +restart: mutex_lock(&transport->recv_mutex); sk = transport->inet; if (sk == NULL) @@ -1107,6 +1106,11 @@ static void xs_udp_data_receive(struct sock_xprt *transport) } if (!test_and_clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state)) break; + if (need_resched()) { + mutex_unlock(&transport->recv_mutex); + cond_resched(); + goto restart; + } } out: mutex_unlock(&transport->recv_mutex); @@ -1479,6 +1483,7 @@ static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, uns .offset = offset, .count = len, }; + size_t ret; dprintk("RPC: xs_tcp_data_recv started\n"); do { @@ -1507,9 +1512,14 @@ static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, uns /* Skip over any trailing bytes on short reads */ xs_tcp_read_discard(transport, &desc); } while (desc.count); + ret = len - desc.count; + if (ret < rd_desc->count) + rd_desc->count -= ret; + else + rd_desc->count = 0; trace_xs_tcp_data_recv(transport); dprintk("RPC: xs_tcp_data_recv done\n"); - return len - desc.count; + return ret; } static void xs_tcp_data_receive(struct sock_xprt *transport) @@ -1517,30 +1527,34 @@ static void xs_tcp_data_receive(struct sock_xprt *transport) struct rpc_xprt *xprt = &transport->xprt; struct sock *sk; read_descriptor_t rd_desc = { - .count = 2*1024*1024, .arg.data = xprt, }; unsigned long total = 0; - int loop; int read = 0; +restart: mutex_lock(&transport->recv_mutex); sk = transport->inet; if (sk == NULL) goto out; /* We use rd_desc to pass struct xprt to xs_tcp_data_recv */ - for (loop = 0; loop < 64; loop++) { + for (;;) { + rd_desc.count = RPC_TCP_READ_CHUNK_SZ; lock_sock(sk); read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv); - if (read <= 0) { + if (rd_desc.count != 0 || read < 0) { clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state); release_sock(sk); break; } release_sock(sk); total += read; - rd_desc.count = 65536; + if (need_resched()) { + mutex_unlock(&transport->recv_mutex); + cond_resched(); + goto restart; + } } if (test_bit(XPRT_SOCK_DATA_READY, &transport->sock_state)) queue_work(xprtiod_workqueue, &transport->recv_worker); @@ -1631,9 +1645,11 @@ static void xs_tcp_state_change(struct sock *sk) if (test_and_clear_bit(XPRT_SOCK_CONNECTING, &transport->sock_state)) xprt_clear_connecting(xprt); + clear_bit(XPRT_CLOSING, &xprt->state); if (sk->sk_err) xprt_wake_pending_tasks(xprt, -sk->sk_err); - xs_sock_mark_closed(xprt); + /* Trigger the socket release */ + xs_tcp_force_close(xprt); } out: read_unlock_bh(&sk->sk_callback_lock); @@ -2241,14 +2257,19 @@ static void xs_tcp_shutdown(struct rpc_xprt *xprt) { struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); struct socket *sock = transport->sock; + int skst = transport->inet ? transport->inet->sk_state : TCP_CLOSE; if (sock == NULL) return; - if (xprt_connected(xprt)) { + switch (skst) { + default: kernel_sock_shutdown(sock, SHUT_RDWR); trace_rpc_socket_shutdown(xprt, sock); - } else + break; + case TCP_CLOSE: + case TCP_TIME_WAIT: xs_reset_transport(transport); + } } static void xs_tcp_set_socket_timeouts(struct rpc_xprt *xprt, |