/*
 * linux/net/sunrpc/xprtsock.c
 *
 * Client-side transport implementation for sockets.
 *
 * TCP callback races fixes (C) 1998 Red Hat
 * TCP send fixes (C) 1998 Red Hat
 * TCP NFS related read + write fixes
 *  (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
 *
 * Rewrite of large parts of the code in order to stabilize TCP stuff.
 * Fix behaviour when socket buffer is full.
 *  (C) 1999 Trond Myklebust <trond.myklebust@fys.uio.no>
 *
 * IP socket transport implementation, (C) 2005 Chuck Lever <cel@netapp.com>
 *
 * IPv6 support contributed by Gilles Quillard, Bull Open Source, 2005.
 *   <gilles.quillard@bull.net>
 */

#include <linux/types.h>
#include <linux/slab.h>
#include <linux/module.h>
#include <linux/capability.h>
#include <linux/pagemap.h>
#include <linux/errno.h>
#include <linux/socket.h>
#include <linux/in.h>
#include <linux/net.h>
#include <linux/mm.h>
#include <linux/udp.h>
#include <linux/tcp.h>
#include <linux/sunrpc/clnt.h>
#include <linux/sunrpc/sched.h>
#include <linux/sunrpc/svcsock.h>
#include <linux/sunrpc/xprtsock.h>
#include <linux/file.h>
#ifdef CONFIG_NFS_V4_1
#include <linux/sunrpc/bc_xprt.h>
#endif

#include <net/sock.h>
#include <net/checksum.h>
#include <net/udp.h>
#include <net/tcp.h>

#include "sunrpc.h"

/*
 * xprtsock tunables
 */
unsigned int xprt_udp_slot_table_entries = RPC_DEF_SLOT_TABLE;
unsigned int xprt_tcp_slot_table_entries = RPC_DEF_SLOT_TABLE;

unsigned int xprt_min_resvport = RPC_DEF_MIN_RESVPORT;
unsigned int xprt_max_resvport = RPC_DEF_MAX_RESVPORT;

57
#define XS_TCP_LINGER_TO	(15U * HZ)
58
static unsigned int xs_tcp_fin_timeout __read_mostly = XS_TCP_LINGER_TO;
59

/*
 * We can register our own files under /proc/sys/sunrpc by
 * calling register_sysctl_table() again.  The files in that
 * directory become the union of all files registered there.
 *
 * We simply need to make sure that we don't collide with
 * someone else's file names!
 */

#ifdef RPC_DEBUG

static unsigned int min_slot_table_size = RPC_MIN_SLOT_TABLE;
static unsigned int max_slot_table_size = RPC_MAX_SLOT_TABLE;
static unsigned int xprt_min_resvport_limit = RPC_MIN_RESVPORT;
static unsigned int xprt_max_resvport_limit = RPC_MAX_RESVPORT;

static struct ctl_table_header *sunrpc_table_header;

/*
 * FIXME: changing the UDP slot table size should also resize the UDP
 *        socket buffers for existing UDP transports
 */
static ctl_table xs_tunables_table[] = {
	{
		.procname	= "udp_slot_table_entries",
		.data		= &xprt_udp_slot_table_entries,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_minmax,
		.extra1		= &min_slot_table_size,
		.extra2		= &max_slot_table_size
	},
	{
		.procname	= "tcp_slot_table_entries",
		.data		= &xprt_tcp_slot_table_entries,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_minmax,
		.extra1		= &min_slot_table_size,
		.extra2		= &max_slot_table_size
	},
	{
		.procname	= "min_resvport",
		.data		= &xprt_min_resvport,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_minmax,
		.extra1		= &xprt_min_resvport_limit,
		.extra2		= &xprt_max_resvport_limit
	},
	{
		.procname	= "max_resvport",
		.data		= &xprt_max_resvport,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_minmax,
		.extra1		= &xprt_min_resvport_limit,
		.extra2		= &xprt_max_resvport_limit
	},
	{
		.procname	= "tcp_fin_timeout",
		.data		= &xs_tcp_fin_timeout,
		.maxlen		= sizeof(xs_tcp_fin_timeout),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_jiffies,
	},
	{ },
};

static ctl_table sunrpc_table[] = {
	{
		.procname	= "sunrpc",
		.mode		= 0555,
		.child		= xs_tunables_table
	},
	{ },
};

#endif

/*
 * Wait duration for a reply from the RPC portmapper.
 */
#define XS_BIND_TO		(60U * HZ)

/*
 * Delay if a UDP socket connect error occurs.  This is most likely some
 * kind of resource problem on the local host.
 */
#define XS_UDP_REEST_TO		(2U * HZ)

/*
 * The reestablish timeout allows clients to delay for a bit before attempting
 * to reconnect to a server that just dropped our connection.
 *
 * We implement an exponential backoff when trying to reestablish a TCP
 * transport connection with the server.  Some servers like to drop a TCP
 * connection when they are overworked, so we start with a short timeout and
 * increase over time if the server is down or not responding.
 */
#define XS_TCP_INIT_REEST_TO	(3U * HZ)
#define XS_TCP_MAX_REEST_TO	(5U * 60 * HZ)

/*
 * TCP idle timeout; client drops the transport socket if it is idle
 * for this long.  Note that we also timeout UDP sockets to prevent
 * holding port numbers when there is no RPC traffic.
 */
#define XS_IDLE_DISC_TO		(5U * 60 * HZ)

#ifdef RPC_DEBUG
# undef  RPC_DEBUG_DATA
# define RPCDBG_FACILITY	RPCDBG_TRANS
#endif

#ifdef RPC_DEBUG_DATA
static void xs_pktdump(char *msg, u32 *packet, unsigned int count)
{
	u8 *buf = (u8 *) packet;
	int j;

	dprintk("RPC:       %s\n", msg);
	for (j = 0; j < count && j < 128; j += 4) {
		if (!(j & 31)) {
			if (j)
				dprintk("\n");
			dprintk("0x%04x ", j);
		}
		dprintk("%02x%02x%02x%02x ",
			buf[j], buf[j+1], buf[j+2], buf[j+3]);
	}
	dprintk("\n");
}
#else
static inline void xs_pktdump(char *msg, u32 *packet, unsigned int count)
{
	/* NOP */
}
#endif

struct sock_xprt {
	struct rpc_xprt		xprt;

	/*
	 * Network layer
	 */
	struct socket *		sock;
	struct sock *		inet;

	/*
	 * State of TCP reply receive
	 */
	__be32			tcp_fraghdr,
				tcp_xid,
				tcp_calldir;

	u32			tcp_offset,
				tcp_reclen;

	unsigned long		tcp_copied,
				tcp_flags;

	/*
	 * Connection of transports
	 */
	struct delayed_work	connect_worker;
	struct sockaddr_storage	srcaddr;
	unsigned short		srcport;

	/*
	 * UDP socket buffer size parameters
	 */
	size_t			rcvsize,
				sndsize;

	/*
	 * Saved socket callback addresses
	 */
	void			(*old_data_ready)(struct sock *, int);
	void			(*old_state_change)(struct sock *);
	void			(*old_write_space)(struct sock *);
	void			(*old_error_report)(struct sock *);
};

/*
 * TCP receive state flags
 */
#define TCP_RCV_LAST_FRAG	(1UL << 0)
#define TCP_RCV_COPY_FRAGHDR	(1UL << 1)
#define TCP_RCV_COPY_XID	(1UL << 2)
#define TCP_RCV_COPY_DATA	(1UL << 3)
#define TCP_RCV_READ_CALLDIR	(1UL << 4)
#define TCP_RCV_COPY_CALLDIR	(1UL << 5)

/*
 * TCP RPC flags
 */
#define TCP_RPC_REPLY		(1UL << 6)

static inline struct sockaddr *xs_addr(struct rpc_xprt *xprt)
{
	return (struct sockaddr *) &xprt->addr;
}

static inline struct sockaddr_in *xs_addr_in(struct rpc_xprt *xprt)
{
	return (struct sockaddr_in *) &xprt->addr;
}

static inline struct sockaddr_in6 *xs_addr_in6(struct rpc_xprt *xprt)
{
	return (struct sockaddr_in6 *) &xprt->addr;
}

static void xs_format_common_peer_addresses(struct rpc_xprt *xprt)
{
	struct sockaddr *sap = xs_addr(xprt);
	struct sockaddr_in6 *sin6;
	struct sockaddr_in *sin;
	char buf[128];

	(void)rpc_ntop(sap, buf, sizeof(buf));
	xprt->address_strings[RPC_DISPLAY_ADDR] = kstrdup(buf, GFP_KERNEL);

	switch (sap->sa_family) {
	case AF_INET:
		sin = xs_addr_in(xprt);
		snprintf(buf, sizeof(buf), "%08x", ntohl(sin->sin_addr.s_addr));
		break;
	case AF_INET6:
		sin6 = xs_addr_in6(xprt);
		snprintf(buf, sizeof(buf), "%pi6", &sin6->sin6_addr);
		break;
	default:
		BUG();
	}
	xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = kstrdup(buf, GFP_KERNEL);
}

static void xs_format_common_peer_ports(struct rpc_xprt *xprt)
{
	struct sockaddr *sap = xs_addr(xprt);
	char buf[128];

	snprintf(buf, sizeof(buf), "%u", rpc_get_port(sap));
	xprt->address_strings[RPC_DISPLAY_PORT] = kstrdup(buf, GFP_KERNEL);

	snprintf(buf, sizeof(buf), "%4hx", rpc_get_port(sap));
	xprt->address_strings[RPC_DISPLAY_HEX_PORT] = kstrdup(buf, GFP_KERNEL);
}

static void xs_format_peer_addresses(struct rpc_xprt *xprt,
				     const char *protocol,
				     const char *netid)
{
	xprt->address_strings[RPC_DISPLAY_PROTO] = protocol;
	xprt->address_strings[RPC_DISPLAY_NETID] = netid;
	xs_format_common_peer_addresses(xprt);
	xs_format_common_peer_ports(xprt);
}

static void xs_update_peer_port(struct rpc_xprt *xprt)
{
	kfree(xprt->address_strings[RPC_DISPLAY_HEX_PORT]);
	kfree(xprt->address_strings[RPC_DISPLAY_PORT]);

	xs_format_common_peer_ports(xprt);
}

static void xs_free_peer_addresses(struct rpc_xprt *xprt)
{
	unsigned int i;

	for (i = 0; i < RPC_DISPLAY_MAX; i++)
		switch (i) {
		case RPC_DISPLAY_PROTO:
		case RPC_DISPLAY_NETID:
			continue;
		default:
			kfree(xprt->address_strings[i]);
		}
}

#define XS_SENDMSG_FLAGS	(MSG_DONTWAIT | MSG_NOSIGNAL)

static int xs_send_kvec(struct socket *sock, struct sockaddr *addr, int addrlen, struct kvec *vec, unsigned int base, int more)
{
	struct msghdr msg = {
		.msg_name	= addr,
		.msg_namelen	= addrlen,
		.msg_flags	= XS_SENDMSG_FLAGS | (more ? MSG_MORE : 0),
	};
	struct kvec iov = {
		.iov_base	= vec->iov_base + base,
		.iov_len	= vec->iov_len - base,
	};

	if (iov.iov_len != 0)
		return kernel_sendmsg(sock, &msg, &iov, 1, iov.iov_len);
	return kernel_sendmsg(sock, &msg, NULL, 0, 0);
}

static int xs_send_pagedata(struct socket *sock, struct xdr_buf *xdr, unsigned int base, int more)
{
	struct page **ppage;
	unsigned int remainder;
	int err, sent = 0;

	remainder = xdr->page_len - base;
	base += xdr->page_base;
	ppage = xdr->pages + (base >> PAGE_SHIFT);
	base &= ~PAGE_MASK;
	for (;;) {
		unsigned int len = min_t(unsigned int, PAGE_SIZE - base, remainder);
		int flags = XS_SENDMSG_FLAGS;

		remainder -= len;
		if (remainder != 0 || more)
			flags |= MSG_MORE;
		err = sock->ops->sendpage(sock, *ppage, base, len, flags);
		if (remainder == 0 || err != len)
			break;
		sent += err;
		ppage++;
		base = 0;
	}
	if (sent == 0)
		return err;
	if (err > 0)
		sent += err;
	return sent;
}

/**
 * xs_sendpages - write pages directly to a socket
 * @sock: socket to send on
 * @addr: UDP only -- address of destination
 * @addrlen: UDP only -- length of destination address
 * @xdr: buffer containing this request
 * @base: starting position in the buffer
 *
 */
static int xs_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen, struct xdr_buf *xdr, unsigned int base)
{
	unsigned int remainder = xdr->len - base;
	int err, sent = 0;

	if (unlikely(!sock))
		return -ENOTSOCK;

	clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags);
	if (base != 0) {
		addr = NULL;
		addrlen = 0;
	}

	if (base < xdr->head[0].iov_len || addr != NULL) {
		unsigned int len = xdr->head[0].iov_len - base;
		remainder -= len;
		err = xs_send_kvec(sock, addr, addrlen, &xdr->head[0], base, remainder != 0);
		if (remainder == 0 || err != len)
			goto out;
		sent += err;
		base = 0;
	} else
		base -= xdr->head[0].iov_len;

	if (base < xdr->page_len) {
		unsigned int len = xdr->page_len - base;
		remainder -= len;
		err = xs_send_pagedata(sock, xdr, base, remainder != 0);
		if (remainder == 0 || err != len)
			goto out;
		sent += err;
		base = 0;
	} else
		base -= xdr->page_len;

	if (base >= xdr->tail[0].iov_len)
		return sent;
	err = xs_send_kvec(sock, NULL, 0, &xdr->tail[0], base, 0);
out:
	if (sent == 0)
		return err;
	if (err > 0)
		sent += err;
	return sent;
}

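/*
 * Callback run when a task that went to sleep in xs_nospace() is
 * woken again: undo the write-pending accounting and the NOSPACE
 * flag that were set up before sleeping.
 */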
static void xs_nospace_callback(struct rpc_task *task)
{
	struct sock_xprt *transport = container_of(task->tk_rqstp->rq_xprt, struct sock_xprt, xprt);

	transport->inet->sk_write_pending--;
	clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
}

/**
 * xs_nospace - place task on wait queue if transmit was incomplete
 * @task: task to put to sleep
 *
 */
static int xs_nospace(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	int ret = 0;

	dprintk("RPC: %5u xmit incomplete (%u left of %u)\n",
			task->tk_pid, req->rq_slen - req->rq_bytes_sent,
			req->rq_slen);

	/* Protect against races with write_space */
	spin_lock_bh(&xprt->transport_lock);

	/* Don't race with disconnect */
	if (xprt_connected(xprt)) {
		if (test_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags)) {
			ret = -EAGAIN;
			/*
			 * Notify TCP that we're limited by the application
			 * window size
			 */
			set_bit(SOCK_NOSPACE, &transport->sock->flags);
			transport->inet->sk_write_pending++;
			/* ...and wait for more buffer space */
			xprt_wait_for_buffer_space(task, xs_nospace_callback);
		}
	} else {
		clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
		ret = -ENOTCONN;
	}

	spin_unlock_bh(&xprt->transport_lock);
	return ret;
}

/**
 * xs_udp_send_request - write an RPC request to a UDP socket
 * @task: address of RPC task that manages the state of an RPC request
 *
 * Return values:
 *        0:	The request has been sent
 *   EAGAIN:	The socket was blocked, please call again later to
 *		complete the request
 * ENOTCONN:	Caller needs to invoke connect logic then call again
 *    other:	Some other error occurred, the request was not sent
 */
static int xs_udp_send_request(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	struct xdr_buf *xdr = &req->rq_snd_buf;
	int status;

	xs_pktdump("packet data:",
				req->rq_svec->iov_base,
				req->rq_svec->iov_len);

	if (!xprt_bound(xprt))
		return -ENOTCONN;
	status = xs_sendpages(transport->sock,
			      xs_addr(xprt),
			      xprt->addrlen, xdr,
			      req->rq_bytes_sent);

	dprintk("RPC:       xs_udp_send_request(%u) = %d\n",
			xdr->len - req->rq_bytes_sent, status);

	if (status >= 0) {
		req->rq_xmit_bytes_sent += status;
		if (status >= req->rq_slen)
			return 0;
		/* Still some bytes left; set up for a retry later. */
		status = -EAGAIN;
	}

	switch (status) {
	case -ENOTSOCK:
		status = -ENOTCONN;
		/* Should we call xs_close() here? */
		break;
	case -EAGAIN:
		status = xs_nospace(task);
		break;
	default:
		dprintk("RPC:       sendmsg returned unrecognized error %d\n",
			-status);
	case -ENETUNREACH:
	case -EPIPE:
	case -ECONNREFUSED:
		/* When the server has died, an ICMP port unreachable message
		 * prompts ECONNREFUSED. */
		clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
	}

	return status;
}

/**
 * xs_tcp_shutdown - gracefully shut down a TCP socket
 * @xprt: transport
 *
 * Initiates a graceful shutdown of the TCP socket by calling the
 * equivalent of shutdown(SHUT_WR);
 */
static void xs_tcp_shutdown(struct rpc_xprt *xprt)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	struct socket *sock = transport->sock;

	if (sock != NULL)
		kernel_sock_shutdown(sock, SHUT_WR);
}

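/*
 * Fill in the 4-byte RPC record marker at the head of the send buffer:
 * the top bit marks this as the last fragment, the remaining 31 bits
 * carry the fragment length.
 */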
static inline void xs_encode_tcp_record_marker(struct xdr_buf *buf)
{
	u32 reclen = buf->len - sizeof(rpc_fraghdr);
	rpc_fraghdr *base = buf->head[0].iov_base;
	*base = htonl(RPC_LAST_STREAM_FRAGMENT | reclen);
}

/**
 * xs_tcp_send_request - write an RPC request to a TCP socket
 * @task: address of RPC task that manages the state of an RPC request
 *
 * Return values:
 *        0:	The request has been sent
 *   EAGAIN:	The socket was blocked, please call again later to
 *		complete the request
 * ENOTCONN:	Caller needs to invoke connect logic then call again
 *    other:	Some other error occurred, the request was not sent
 *
 * XXX: In the case of soft timeouts, should we eventually give up
 *	if sendmsg is not able to make progress?
 */
static int xs_tcp_send_request(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	struct xdr_buf *xdr = &req->rq_snd_buf;
	int status;

	xs_encode_tcp_record_marker(&req->rq_snd_buf);

	xs_pktdump("packet data:",
				req->rq_svec->iov_base,
				req->rq_svec->iov_len);

	/* Continue transmitting the packet/record. We must be careful
	 * to cope with writespace callbacks arriving _after_ we have
	 * called sendmsg(). */
	while (1) {
		status = xs_sendpages(transport->sock,
					NULL, 0, xdr, req->rq_bytes_sent);

		dprintk("RPC:       xs_tcp_send_request(%u) = %d\n",
				xdr->len - req->rq_bytes_sent, status);

		if (unlikely(status < 0))
			break;

		/* If we've sent the entire packet, immediately
		 * reset the count of bytes sent. */
		req->rq_bytes_sent += status;
		req->rq_xmit_bytes_sent += status;
		if (likely(req->rq_bytes_sent >= req->rq_slen)) {
			req->rq_bytes_sent = 0;
			return 0;
		}

		if (status != 0)
			continue;
		status = -EAGAIN;
		break;
	}

	switch (status) {
	case -ENOTSOCK:
		status = -ENOTCONN;
		/* Should we call xs_close() here? */
		break;
	case -EAGAIN:
		status = xs_nospace(task);
		break;
	default:
		dprintk("RPC:       sendmsg returned unrecognized error %d\n",
			-status);
	case -ECONNRESET:
	case -EPIPE:
		xs_tcp_shutdown(xprt);
	case -ECONNREFUSED:
	case -ENOTCONN:
		clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
	}

	return status;
}

/**
 * xs_tcp_release_xprt - clean up after a tcp transmission
 * @xprt: transport
 * @task: rpc task
 *
 * This cleans up if an error causes us to abort the transmission of a request.
 * In this case, the socket may need to be reset in order to avoid confusing
 * the server.
 */
static void xs_tcp_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
{
	struct rpc_rqst *req;

	if (task != xprt->snd_task)
		return;
	if (task == NULL)
		goto out_release;
	req = task->tk_rqstp;
	if (req->rq_bytes_sent == 0)
		goto out_release;
	if (req->rq_bytes_sent == req->rq_snd_buf.len)
		goto out_release;
	set_bit(XPRT_CLOSE_WAIT, &task->tk_xprt->state);
out_release:
	xprt_release_xprt(xprt, task);
}

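/*
 * Save and restore the socket's original sk_* callbacks around the
 * lifetime of this transport's own callback hooks.
 */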
static void xs_save_old_callbacks(struct sock_xprt *transport, struct sock *sk)
{
	transport->old_data_ready = sk->sk_data_ready;
	transport->old_state_change = sk->sk_state_change;
	transport->old_write_space = sk->sk_write_space;
	transport->old_error_report = sk->sk_error_report;
}

static void xs_restore_old_callbacks(struct sock_xprt *transport, struct sock *sk)
{
	sk->sk_data_ready = transport->old_data_ready;
	sk->sk_state_change = transport->old_state_change;
	sk->sk_write_space = transport->old_write_space;
	sk->sk_error_report = transport->old_error_report;
}

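/*
 * Detach the transport from its socket: clear sk_user_data, restore
 * the saved sk_* callbacks under sk_callback_lock, then release the
 * socket itself.
 */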
static void xs_reset_transport(struct sock_xprt *transport)
{
	struct socket *sock = transport->sock;
	struct sock *sk = transport->inet;

	if (sk == NULL)
		return;

	write_lock_bh(&sk->sk_callback_lock);
	transport->inet = NULL;
	transport->sock = NULL;

	sk->sk_user_data = NULL;

	xs_restore_old_callbacks(transport, sk);
	write_unlock_bh(&sk->sk_callback_lock);

	sk->sk_no_check = 0;

	sock_release(sock);
}

/**
 * xs_close - close a socket
 * @xprt: transport
 *
 * This is used when all requests are complete; ie, no DRC state remains
 * on the server we want to save.
 *
 * The caller _must_ be holding XPRT_LOCKED in order to avoid issues with
 * xs_reset_transport() zeroing the socket from underneath a writer.
 */
static void xs_close(struct rpc_xprt *xprt)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);

	dprintk("RPC:       xs_close xprt %p\n", xprt);

	xs_reset_transport(transport);
	xprt->reestablish_timeout = 0;

	smp_mb__before_clear_bit();
	clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);
	clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
	clear_bit(XPRT_CLOSING, &xprt->state);
	smp_mb__after_clear_bit();
	xprt_disconnect_done(xprt);
}

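/*
 * If a full close was requested via XPRT_CONNECTION_CLOSE, tear the
 * socket down immediately; otherwise just initiate a graceful
 * shutdown of the write side.
 */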
static void xs_tcp_close(struct rpc_xprt *xprt)
{
	if (test_and_clear_bit(XPRT_CONNECTION_CLOSE, &xprt->state))
		xs_close(xprt);
	else
		xs_tcp_shutdown(xprt);
}

/**
 * xs_destroy - prepare to shutdown a transport
 * @xprt: doomed transport
 *
 */
static void xs_destroy(struct rpc_xprt *xprt)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);

	dprintk("RPC:       xs_destroy xprt %p\n", xprt);

	cancel_rearming_delayed_work(&transport->connect_worker);

	xs_close(xprt);
	xs_free_peer_addresses(xprt);
	xprt_free(xprt);
	module_put(THIS_MODULE);
}

static inline struct rpc_xprt *xprt_from_sock(struct sock *sk)
{
	return (struct rpc_xprt *) sk->sk_user_data;
}

/**
 * xs_udp_data_ready - "data ready" callback for UDP sockets
 * @sk: socket with data to read
 * @len: how much data to read
 *
 */
static void xs_udp_data_ready(struct sock *sk, int len)
{
	struct rpc_task *task;
	struct rpc_xprt *xprt;
	struct rpc_rqst *rovr;
	struct sk_buff *skb;
	int err, repsize, copied;
	u32 _xid;
	__be32 *xp;

	read_lock_bh(&sk->sk_callback_lock);
	dprintk("RPC:       xs_udp_data_ready...\n");
	if (!(xprt = xprt_from_sock(sk)))
		goto out;

	if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL)
		goto out;

	if (xprt->shutdown)
		goto dropit;

	repsize = skb->len - sizeof(struct udphdr);
	if (repsize < 4) {
		dprintk("RPC:       impossible RPC reply size %d!\n", repsize);
		goto dropit;
	}

	/* Copy the XID from the skb... */
	xp = skb_header_pointer(skb, sizeof(struct udphdr),
				sizeof(_xid), &_xid);
	if (xp == NULL)
		goto dropit;

	/* Look up and lock the request corresponding to the given XID */
	spin_lock(&xprt->transport_lock);
	rovr = xprt_lookup_rqst(xprt, *xp);
	if (!rovr)
		goto out_unlock;
	task = rovr->rq_task;

	if ((copied = rovr->rq_private_buf.buflen) > repsize)
		copied = repsize;

	/* Suck it into the iovec, verify checksum if not done by hw. */
	if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb)) {
		UDPX_INC_STATS_BH(sk, UDP_MIB_INERRORS);
		goto out_unlock;
	}

	UDPX_INC_STATS_BH(sk, UDP_MIB_INDATAGRAMS);

	/* Something worked... */
	dst_confirm(skb_dst(skb));

	xprt_adjust_cwnd(task, copied);
	xprt_complete_rqst(task, copied);

 out_unlock:
	spin_unlock(&xprt->transport_lock);
 dropit:
	skb_free_datagram(sk, skb);
 out:
	read_unlock_bh(&sk->sk_callback_lock);
}

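/*
 * TCP receive state machine: read the 4-byte record marker that
 * precedes each RPC fragment on the stream, coping with markers that
 * straddle skb boundaries.
 */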
static inline void xs_tcp_read_fraghdr(struct rpc_xprt *xprt, struct xdr_skb_reader *desc)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	size_t len, used;
	char *p;

	p = ((char *) &transport->tcp_fraghdr) + transport->tcp_offset;
	len = sizeof(transport->tcp_fraghdr) - transport->tcp_offset;
	used = xdr_skb_read_bits(desc, p, len);
	transport->tcp_offset += used;
	if (used != len)
		return;

	transport->tcp_reclen = ntohl(transport->tcp_fraghdr);
	if (transport->tcp_reclen & RPC_LAST_STREAM_FRAGMENT)
		transport->tcp_flags |= TCP_RCV_LAST_FRAG;
	else
		transport->tcp_flags &= ~TCP_RCV_LAST_FRAG;
	transport->tcp_reclen &= RPC_FRAGMENT_SIZE_MASK;

	transport->tcp_flags &= ~TCP_RCV_COPY_FRAGHDR;
	transport->tcp_offset = 0;

	/* Sanity check of the record length */
	if (unlikely(transport->tcp_reclen < 8)) {
		dprintk("RPC:       invalid TCP record fragment length\n");
		xprt_force_disconnect(xprt);
		return;
	}
	dprintk("RPC:       reading TCP record fragment of length %d\n",
			transport->tcp_reclen);
}

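/*
 * Once the current fragment has been fully consumed, rearm the state
 * machine to read the next record marker; if this was the last
 * fragment of a record, also expect a fresh XID.
 */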
static void xs_tcp_check_fraghdr(struct sock_xprt *transport)
{
	if (transport->tcp_offset == transport->tcp_reclen) {
		transport->tcp_flags |= TCP_RCV_COPY_FRAGHDR;
		transport->tcp_offset = 0;
		if (transport->tcp_flags & TCP_RCV_LAST_FRAG) {
			transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
			transport->tcp_flags |= TCP_RCV_COPY_XID;
			transport->tcp_copied = 0;
		}
	}
}

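/*
 * Read the XID that opens each RPC message, so the incoming data can
 * later be matched against a pending request.
 */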
static inline void xs_tcp_read_xid(struct sock_xprt *transport, struct xdr_skb_reader *desc)
{
	size_t len, used;
	char *p;

	len = sizeof(transport->tcp_xid) - transport->tcp_offset;
	dprintk("RPC:       reading XID (%Zu bytes)\n", len);
	p = ((char *) &transport->tcp_xid) + transport->tcp_offset;
	used = xdr_skb_read_bits(desc, p, len);
	transport->tcp_offset += used;
	if (used != len)
		return;
	transport->tcp_flags &= ~TCP_RCV_COPY_XID;
	transport->tcp_flags |= TCP_RCV_READ_CALLDIR;
	transport->tcp_copied = 4;
	dprintk("RPC:       reading %s XID %08x\n",
			(transport->tcp_flags & TCP_RPC_REPLY) ? "reply for"
							      : "request with",
			ntohl(transport->tcp_xid));
	xs_tcp_check_fraghdr(transport);
}

static inline void xs_tcp_read_calldir(struct sock_xprt *transport,
				       struct xdr_skb_reader *desc)
{
	size_t len, used;
	u32 offset;
	char *p;

	/*
	 * We want transport->tcp_offset to be 8 at the end of this routine
	 * (4 bytes for the xid and 4 bytes for the call/reply flag).
	 * When this function is called for the first time,
	 * transport->tcp_offset is 4 (after having already read the xid).
	 */
	offset = transport->tcp_offset - sizeof(transport->tcp_xid);
	len = sizeof(transport->tcp_calldir) - offset;
	dprintk("RPC:       reading CALL/REPLY flag (%Zu bytes)\n", len);
	p = ((char *) &transport->tcp_calldir) + offset;
	used = xdr_skb_read_bits(desc, p, len);
	transport->tcp_offset += used;
	if (used != len)
		return;
	transport->tcp_flags &= ~TCP_RCV_READ_CALLDIR;
	/*
	 * We don't yet have the XDR buffer, so we will write the calldir
	 * out after we get the buffer from the 'struct rpc_rqst'
	 */
	switch (ntohl(transport->tcp_calldir)) {
	case RPC_REPLY:
		transport->tcp_flags |= TCP_RCV_COPY_CALLDIR;
		transport->tcp_flags |= TCP_RCV_COPY_DATA;
		transport->tcp_flags |= TCP_RPC_REPLY;
		break;
	case RPC_CALL:
		transport->tcp_flags |= TCP_RCV_COPY_CALLDIR;
		transport->tcp_flags |= TCP_RCV_COPY_DATA;
		transport->tcp_flags &= ~TCP_RPC_REPLY;
		break;
	default:
		dprintk("RPC:       invalid request message type\n");
		xprt_force_disconnect(&transport->xprt);
	}
	xs_tcp_check_fraghdr(transport);
}

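/*
 * Copy the record payload from the skb into the request's XDR receive
 * buffer, honouring both the fragment boundary and the space left in
 * the buffer.
 */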
static inline void xs_tcp_read_common(struct rpc_xprt *xprt,
				     struct xdr_skb_reader *desc,
				     struct rpc_rqst *req)
{
	struct sock_xprt *transport =
				container_of(xprt, struct sock_xprt, xprt);
	struct xdr_buf *rcvbuf;
	size_t len;
	ssize_t r;

	rcvbuf = &req->rq_private_buf;

	if (transport->tcp_flags & TCP_RCV_COPY_CALLDIR) {
		/*
		 * Save the RPC direction in the XDR buffer
		 */
		memcpy(rcvbuf->head[0].iov_base + transport->tcp_copied,
			&transport->tcp_calldir,
			sizeof(transport->tcp_calldir));
		transport->tcp_copied += sizeof(transport->tcp_calldir);
		transport->tcp_flags &= ~TCP_RCV_COPY_CALLDIR;
	}

	len = desc->count;
	if (len > transport->tcp_reclen - transport->tcp_offset) {
		struct xdr_skb_reader my_desc;

		len = transport->tcp_reclen - transport->tcp_offset;
		memcpy(&my_desc, desc, sizeof(my_desc));
		my_desc.count = len;
		r = xdr_partial_copy_from_skb(rcvbuf, transport->tcp_copied,
					  &my_desc, xdr_skb_read_bits);
		desc->count -= r;
		desc->offset += r;
	} else
		r = xdr_partial_copy_from_skb(rcvbuf, transport->tcp_copied,
					  desc, xdr_skb_read_bits);

	if (r > 0) {
		transport->tcp_copied += r;
		transport->tcp_offset += r;
	}
	if (r != len) {
		/* Error when copying to the receive buffer,
		 * usually because we weren't able to allocate
		 * additional buffer pages. All we can do now
		 * is turn off TCP_RCV_COPY_DATA, so the request
		 * will not receive any additional updates,
		 * and time out.
		 * Any remaining data from this record will
		 * be discarded.
		 */
		transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
		dprintk("RPC:       XID %08x truncated request\n",
				ntohl(transport->tcp_xid));
		dprintk("RPC:       xprt = %p, tcp_copied = %lu, "
				"tcp_offset = %u, tcp_reclen = %u\n",
				xprt, transport->tcp_copied,
				transport->tcp_offset, transport->tcp_reclen);
		return;
	}

	dprintk("RPC:       XID %08x read %Zd bytes\n",
			ntohl(transport->tcp_xid), r);
	dprintk("RPC:       xprt = %p, tcp_copied = %lu, tcp_offset = %u, "
			"tcp_reclen = %u\n", xprt, transport->tcp_copied,
			transport->tcp_offset, transport->tcp_reclen);

	if (transport->tcp_copied == req->rq_private_buf.buflen)
		transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
	else if (transport->tcp_offset == transport->tcp_reclen) {
		if (transport->tcp_flags & TCP_RCV_LAST_FRAG)
			transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
	}
}

/*
 * Finds the request corresponding to the RPC xid and invokes the common
 * tcp read code to read the data.
 */
static inline int xs_tcp_read_reply(struct rpc_xprt *xprt,
				    struct xdr_skb_reader *desc)
{
	struct sock_xprt *transport =
				container_of(xprt, struct sock_xprt, xprt);
	struct rpc_rqst *req;

	dprintk("RPC:       read reply XID %08x\n", ntohl(transport->tcp_xid));

	/* Find and lock the request corresponding to this xid */
	spin_lock(&xprt->transport_lock);
	req = xprt_lookup_rqst(xprt, transport->tcp_xid);
	if (!req) {
		dprintk("RPC:       XID %08x request not found!\n",
				ntohl(transport->tcp_xid));
		spin_unlock(&xprt->transport_lock);
		return -1;
	}

	xs_tcp_read_common(xprt, desc, req);

	if (!(transport->tcp_flags & TCP_RCV_COPY_DATA))
		xprt_complete_rqst(req->rq_task, transport->tcp_copied);

	spin_unlock(&xprt->transport_lock);