server.c 28 KB
Newer Older
Leigh B. Stoller's avatar
Leigh B. Stoller committed
1
2
/*
 * EMULAB-COPYRIGHT
3
 * Copyright (c) 2000-2004 University of Utah and the Flux Group.
Leigh B. Stoller's avatar
Leigh B. Stoller committed
4
5
6
 * All rights reserved.
 */

7
8
9
10
11
12
13
14
/*
 * Frisbee server
 */
#include <sys/types.h>
#include <sys/param.h>
#include <sys/time.h>
#include <sys/fcntl.h>
#include <sys/socket.h>
Mike Hibler's avatar
Mike Hibler committed
15
#include <sys/resource.h>
16
17
18
19
20
21
22
23
24
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <signal.h>
#include <pthread.h>
Mike Hibler's avatar
Mike Hibler committed
25
#include <assert.h>
26
27
#include "decls.h"
#include "queue.h"
Mike Hibler's avatar
Mike Hibler committed
28
#include "utils.h"
29

30
31
#include "trace.h"

32
33
/* Globals */
int		debug = 0;
34
int		tracing = 0;
35
36
37
38
int		dynburst = 0;
int		timeout = SERVER_INACTIVE_SECONDS;
int		readsize = SERVER_READ_SIZE;
volatile int	burstsize = SERVER_BURST_SIZE;
Mike Hibler's avatar
Mike Hibler committed
39
40
41
int		maxburstsize = SERVER_DYNBURST_SIZE;
int		burstinterval = SERVER_BURST_GAP;
unsigned long	bandwidth;
42
int		portnum;
43
44
int		killme;
int		blockslost;
Mike Hibler's avatar
Mike Hibler committed
45
46
int		clientretries;
char		*lostmap;
47
int		sendretries;
48
49
50
struct in_addr	mcastaddr;
struct in_addr	mcastif;
char	       *filename;
Mike Hibler's avatar
Mike Hibler committed
51
52
struct timeval  IdleTimeStamp, FirstReq, LastReq;
volatile int	activeclients;
53

54
55
56
/* Forward decls */
void		quit(int);
void		reinit(int);
57
static ssize_t	mypread(int fd, void *buf, size_t nbytes, off_t offset);
Mike Hibler's avatar
Mike Hibler committed
58
59
static void	calcburst(void);
static void	compute_sendrate(void);
60

Mike Hibler's avatar
Mike Hibler committed
61
#ifdef STATS
/*
 * Track duplicate chunks/joins for stats gathering
 */
char		*chunkmap;

#define MAXCLIENTS 256	/* not a real limit, just for stats */
struct {
	unsigned int id;
	unsigned int ip;
} clients[MAXCLIENTS];

/*
 * Stats gathering.  Counters are bumped through DOSTAT() so that the
 * updates vanish entirely when STATS is not defined.
 */
struct {
	unsigned long	msgin;
	unsigned long	joins;
	unsigned long	leaves;
	unsigned long	requests;
	unsigned long	joinrep;
	unsigned long	blockssent;
	unsigned long	filereads;
	unsigned long long filebytes;
	unsigned long	partialreq;
	unsigned long   dupsent;
	unsigned long	qmerges;
	unsigned long	badpackets;
	unsigned long   blockslost;
	unsigned long	clientlost;
	unsigned long	goesidle;
	unsigned long	wakeups;
	unsigned long	intervals;
	unsigned long	missed;
} Stats;
#define DOSTAT(x)	(Stats.x)
#else
#define DOSTAT(x)
#endif

101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
/*
 * This structure defines the file we are spitting back.
 * The single instance below is filled in by main() once the file has
 * been opened and its size determined.
 */
struct FileInfo {
	int	fd;		/* Open file descriptor */
	int	blocks;		/* Number of BLOCKSIZE blocks */
	int	chunks;		/* Number of CHUNKSIZE chunks */
};
static struct FileInfo FileInfo;

/*
 * The work queue of regions a client has requested.
 */
typedef struct {
	queue_chain_t	chain;
	int		chunk;		/* Which chunk */
Mike Hibler's avatar
Mike Hibler committed
117
118
	int		nblocks;	/* Number of blocks in map */
	BlockMap_t	blockmap;	/* Which blocks of the chunk */
119
120
121
} WQelem_t;
static queue_head_t     WorkQ;
static pthread_mutex_t	WorkQLock;
122
123
static int		WorkQDelay = -1;
static int		WorkQSize = 0;
Mike Hibler's avatar
Mike Hibler committed
124
static int		WorkChunk, WorkBlock, WorkCount;
125
126
127
#ifdef STATS
static int		WorkQMax = 0;
#endif
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149

/*
 * Work queue routines. The work queue is a time ordered list of chunk/blocks
 * pieces that a client is missing. When a request comes in, lock the list
 * and scan it for an existing work item that covers the new request. The new
 * request can be dropped if there already exists a Q item, since the client
 * is going to see that piece eventually.
 *
 * We use a spinlock to guard the work queue, which incidentally will protect
 * malloc/free.
 *
 * XXX - Clients make requests for chunk/block pieces they are
 * missing. For now, map that into an entire chunk and add it to the
 * work queue. This is going to result in a lot more data being sent
 * than is needed by the client, but lets wait and see if that
 * matters.
 */
static void
WorkQueueInit(void)
{
	pthread_mutex_init(&WorkQLock, NULL);
	queue_init(&WorkQ);
150
151

	if (WorkQDelay < 0)
Mike Hibler's avatar
Mike Hibler committed
152
		WorkQDelay = sleeptime(1, NULL, 1);
153

154
#ifdef STATS
Mike Hibler's avatar
Mike Hibler committed
155
	chunkmap = calloc(FileInfo.chunks, 1);
156
#endif
157
158
}

Mike Hibler's avatar
Mike Hibler committed
159
160
161
162
/*
 * Enqueue a work request.
 * If map==NULL, then we want the entire chunk.
 */
163
static int
Mike Hibler's avatar
Mike Hibler committed
164
WorkQueueEnqueue(int chunk, BlockMap_t *map, int count)
165
166
{
	WQelem_t	*wqel;
Mike Hibler's avatar
Mike Hibler committed
167
168
	int		elt, blocks;

169
170
	if (count == 0)
		return 0;
171

172
173
	pthread_mutex_lock(&WorkQLock);

Mike Hibler's avatar
Mike Hibler committed
174
175
176
177
178
179
180
181
182
183
184
185
	/*
	 * Common case: a full chunk request for the full block we are
	 * currently sending.  Don't queue.
	 */
	if (count == CHUNKSIZE && chunk == WorkChunk && count == WorkCount) {
		EVENT(1, EV_WORKMERGE, mcastaddr, chunk, count, count, ~0);
		pthread_mutex_unlock(&WorkQLock);
		return 0;
	}

	elt = WorkQSize - 1;
	queue_riterate(&WorkQ, wqel, WQelem_t *, chain) {
186
187
		if (wqel->chunk == chunk) {
			/*
Mike Hibler's avatar
Mike Hibler committed
188
189
190
			 * If this is the head element of the queue
			 * we can only merge if the request is beyond
			 * the range being currently processed.
191
			 */
Mike Hibler's avatar
Mike Hibler committed
192
193
194
195
196
			if ((WQelem_t *)queue_first(&WorkQ) == wqel &&
			    chunk == WorkChunk &&
			    BlockMapFirst(map) < WorkBlock + WorkCount) {
				elt--;
				continue;
197
			}
Mike Hibler's avatar
Mike Hibler committed
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212

			/*
			 * We have a queued request for the entire chunk
			 * already, nothing to do.
			 */
			if (wqel->nblocks == CHUNKSIZE)
				blocks = 0;
			else
				blocks = BlockMapMerge(map, &wqel->blockmap);
			EVENT(1, EV_WORKMERGE, mcastaddr,
			      chunk, wqel->nblocks, blocks, elt);
			wqel->nblocks += blocks;
			assert(wqel->nblocks <= CHUNKSIZE);
			pthread_mutex_unlock(&WorkQLock);
			return 0;
213
		}
Mike Hibler's avatar
Mike Hibler committed
214
		elt--;
215
216
	}

Mike Hibler's avatar
Mike Hibler committed
217
218
	wqel = calloc(1, sizeof(WQelem_t));
	if (wqel == NULL)
219
220
221
		fatal("WorkQueueEnqueue: No more memory");

	wqel->chunk = chunk;
Mike Hibler's avatar
Mike Hibler committed
222
223
	wqel->nblocks = count;
	wqel->blockmap = *map;
224
	queue_enter(&WorkQ, wqel, WQelem_t *, chain);
225
226
227
228
229
	WorkQSize++;
#ifdef STATS
	if (WorkQSize > WorkQMax)
		WorkQMax = WorkQSize;
#endif
230
231

	pthread_mutex_unlock(&WorkQLock);
232

Mike Hibler's avatar
Mike Hibler committed
233
	EVENT(1, EV_WORKENQ, mcastaddr, chunk, count, WorkQSize, 0);
234
235
236
237
	return 1;
}

static int
Mike Hibler's avatar
Mike Hibler committed
238
WorkQueueDequeue(int *chunkp, int *blockp, int *countp)
239
240
{
	WQelem_t	*wqel;
Mike Hibler's avatar
Mike Hibler committed
241
	int		chunk, block, count;
242

243
244
	pthread_mutex_lock(&WorkQLock);

245
	/*
246
247
	 * Condvars broken in linux threads impl, so use this rather bogus
	 * sleep to keep from churning cycles. 
248
	 */
249
	if (queue_empty(&WorkQ)) {
Mike Hibler's avatar
Mike Hibler committed
250
		WorkChunk = -1;
251
252
253
		pthread_mutex_unlock(&WorkQLock);
		fsleep(WorkQDelay);
		return 0;
254
	}
255
	
Mike Hibler's avatar
Mike Hibler committed
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
	wqel = (WQelem_t *) queue_first(&WorkQ);
	chunk = wqel->chunk;
	if (wqel->nblocks == CHUNKSIZE) {
		block = 0;
		count = CHUNKSIZE;
	} else
		count = BlockMapExtract(&wqel->blockmap, &block);
	assert(count <= wqel->nblocks);
	wqel->nblocks -= count;
	if (wqel->nblocks == 0) {
		queue_remove(&WorkQ, wqel, WQelem_t *, chain);
		free(wqel);
		WorkQSize--;
	}
	WorkChunk = chunk;
	WorkBlock = block;
	WorkCount = count;
273
274

	pthread_mutex_unlock(&WorkQLock);
275

Mike Hibler's avatar
Mike Hibler committed
276
277
278
279
280
	*chunkp = chunk;
	*blockp = block;
	*countp = count;

	EVENT(1, EV_WORKDEQ, mcastaddr, chunk, block, count, WorkQSize);
281
282
283
	return 1;
}

Mike Hibler's avatar
Mike Hibler committed
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
/*
 * Common back end for client block requests: account for the request,
 * hand it to the work queue and update the loss bookkeeping.
 *
 * chunk/map/count describe the blocks wanted; isretry is non-zero when
 * the client flagged the request as a retry.
 */
static void
ClientEnqueueMap(int chunk, BlockMap_t *map, int count, int isretry)
{
	int		queued;

	/* Anything short of a whole chunk is counted as lost blocks. */
	if (count != CHUNKSIZE) {
		DOSTAT(blockslost+=count);
		blockslost += count;
		DOSTAT(partialreq++);
	}

	queued = WorkQueueEnqueue(chunk, map, count);
	if (queued == 0) {
		DOSTAT(qmerges++);
	}
#ifdef STATS
	else if (chunkmap != 0 && count == CHUNKSIZE) {
		/* First full request marks the chunk; later ones are dups. */
		if (!chunkmap[chunk]) {
			chunkmap[chunk] = 1;
		} else {
			if (debug > 1)
				log("Duplicate chunk request: %d", chunk);
			EVENT(1, EV_DUPCHUNK, mcastaddr, chunk, 0, 0, 0);
			DOSTAT(dupsent++);
		}
	}
#endif

	if (!isretry)
		return;

	clientretries++;

	/*
	 * Only treat the chunk as lost when the request was not already
	 * on the server queue; this filters out re-requests that were
	 * merely prompted by a long queue.  Tracked per chunk only.
	 */
	if (queued) {
		if (lostmap)
			lostmap[chunk]++;
		DOSTAT(clientlost++);
	}
}

326
/*
 * A client joins. We print out the time at which the client joins, and
 * return a reply packet with the number of blocks in the file so that
 * the client knows how much to ask for. We do not do anything else with
 * this info; clients can crash and go away and it does not matter. If they
 * crash they will start up again later. Inactivity is defined as a period
 * with no data block requests. The client will resend its join message
 * until it gets a reply back; duplicates of either the request or the
 * reply are harmless.
 *
 * With STATS defined, joins are also recorded in the clients[] table so
 * that activeclients only counts a given client id / IP pair once.
 * Without STATS, every join message bumps activeclients.
 */
static void
ClientJoin(Packet_t *p)
{
	struct in_addr	ipaddr   = { p->hdr.srcip };
	unsigned int    clientid = p->msg.join.clientid;

	/*
	 * Return fileinfo. Duplicates are harmless.
	 */
	EVENT(1, EV_JOINREQ, ipaddr, clientid, 0, 0, 0);
	p->hdr.type            = PKTTYPE_REPLY;
	p->hdr.datalen         = sizeof(p->msg.join);
	p->msg.join.blockcount = FileInfo.blocks;
	PacketReply(p);
#ifdef STATS
	{
		int i, j = -1;	/* j: first free slot seen, -1 if none */

		for (i = 0; i < MAXCLIENTS; i++) {
			if (clients[i].id == clientid) {
				if (clients[i].ip != ipaddr.s_addr) {
					log("%s reuses active client id",
					    inet_ntoa(ipaddr));
					clients[i].ip = ipaddr.s_addr;
				}
				break;
			}
			if (clients[i].ip == ipaddr.s_addr) {
				log("%s rejoins with different cid, ocid=%u",
				    inet_ntoa(ipaddr), clients[i].id);
				clients[i].id = clientid;
				break;
			}
			if (j == -1 && clients[i].id == 0)
				j = i;
		}

		/* Ran off the end: not a known client, so count it. */
		if (i == MAXCLIENTS) {
			activeclients++;
			if (j != -1) {
				clients[j].id = clientid;
				clients[j].ip = ipaddr.s_addr;
			}
		}
	}
	DOSTAT(joinrep++);
#else
	activeclients++;
#endif

	EVENT(1, EV_JOINREP, ipaddr, FileInfo.blocks, 0, 0, 0);

	/*
	 * Log after we send reply so that we get the packet off as
	 * quickly as possible!
	 */
	log("%s (id %u) joins at %s!  %d active clients.",
	    inet_ntoa(ipaddr), clientid, CurrentTimeString(), activeclients);
}

/*
 * A client leaves. Not much to it. All we do is print out a log statement
 * about it so that we can see the time. If the packet is lost, no big deal.
 *
 * With STATS defined the client is looked up in clients[] by id; a
 * leave from an unknown id is logged as spurious and does not change
 * activeclients.  Without STATS every leave decrements activeclients.
 */
static void
ClientLeave(Packet_t *p)
{
	struct in_addr	ipaddr = { p->hdr.srcip };
	unsigned int clientid = p->msg.leave.clientid;

	EVENT(1, EV_LEAVEMSG, ipaddr, clientid, p->msg.leave.elapsed, 0, 0);

#ifdef STATS
	{
		int i;

		for (i = 0; i < MAXCLIENTS; i++) {
			if (clients[i].id == clientid) {
				activeclients--;
				clients[i].id = 0;
				clients[i].ip = 0;
				log("%s (id %u): leaves at %s, "
				    "ran for %d seconds.  %d active clients",
				    inet_ntoa(ipaddr), clientid,
				    CurrentTimeString(),
				    p->msg.leave.elapsed, activeclients);
				break;
			}
		}
		if (i == MAXCLIENTS)
			log("%s (id %u): spurious leave ignored",
			    inet_ntoa(ipaddr), clientid);
	}
#else
	activeclients--;
	log("%s (id %u): leaves at %s, ran for %d seconds.  %d active clients",
	    inet_ntoa(ipaddr), clientid, CurrentTimeString(),
	    p->msg.leave.elapsed, activeclients);
#endif
}

435
436
437
438
439
440
441
442
/*
 * A client leaves, using the LEAVE2 message which also carries the
 * client's own statistics.  Handled like ClientLeave(); in addition,
 * when STATS is defined and the client is known, its statistics are
 * passed to ClientStatsDump().  If the packet is lost, no big deal.
 */
static void
ClientLeave2(Packet_t *p)
{
	struct in_addr	ipaddr = { p->hdr.srcip };
	unsigned int clientid = p->msg.leave2.clientid;

	EVENT(1, EV_LEAVEMSG, ipaddr, clientid, p->msg.leave2.elapsed, 0, 0);

#ifdef STATS
	{
		int i;

		for (i = 0; i < MAXCLIENTS; i++) {
			if (clients[i].id == clientid) {
				clients[i].id = 0;
				clients[i].ip = 0;
				activeclients--;
				log("%s (id %u): leaves at %s, "
				    "ran for %d seconds.  %d active clients",
				    inet_ntoa(ipaddr), clientid,
				    CurrentTimeString(),
				    p->msg.leave2.elapsed, activeclients);
				ClientStatsDump(clientid, &p->msg.leave2.stats);
				break;
			}
		}
		if (i == MAXCLIENTS)
			log("%s (id %u): spurious leave ignored",
			    inet_ntoa(ipaddr), clientid);
	}
#else
	activeclients--;
	log("%s (id %u): leaves at %s, ran for %d seconds.  %d active clients",
	    inet_ntoa(ipaddr), clientid, CurrentTimeString(),
	    p->msg.leave2.elapsed, activeclients);
#endif
}

476
477
478
479
480
481
482
483
484
485
486
487
/*
 * A client requests a contiguous range of blocks within a chunk. Add it
 * to the workqueue, but do not send a reply. The client will make a new
 * request later if the packet got lost.
 *
 * A request whose block range does not fit inside a chunk is logged and
 * dropped rather than taking the whole server down: the packet comes
 * off the network, so one bad sender must not be able to stop service
 * for everyone else.
 */
static void
ClientRequest(Packet_t *p)
{
	struct in_addr	ipaddr = { p->hdr.srcip };
	int		chunk = p->msg.request.chunk;
	int		block = p->msg.request.block;
	int		count = p->msg.request.count;
	BlockMap_t	tmp;

	if (count == 0)
		log("WARNING: ClientRequest with zero count");

	EVENT(1, EV_REQMSG, ipaddr, chunk, block, count, 0);

	/* Written so that block + count can never overflow. */
	if (block < 0 || count < 0 || count > CHUNKSIZE ||
	    block > CHUNKSIZE - count) {
		DOSTAT(badpackets++);
		log("Bad request from %s - chunk:%d block:%d size:%d, ignored",
		    inet_ntoa(ipaddr), chunk, block, count);
		return;
	}

	BlockMapInit(&tmp, block, count);
	ClientEnqueueMap(chunk, &tmp, count, 0);

	if (debug > 1) {
		log("Client %s requests chunk:%d block:%d size:%d",
		    inet_ntoa(ipaddr), chunk, block, count);
	}
}

/*
 * A client requests an arbitrary set of blocks within a chunk, given as
 * a block map in the packet. Add it to the workqueue, but do not send a
 * reply. The client will make a new request later if the packet got
 * lost.
 *
 * The block count is obtained from BlockMapIsAlloc() over the whole
 * chunk, and the packet's "retries" field is passed through as the
 * retry indication.
 */
static void
ClientPartialRequest(Packet_t *p)
{
	struct in_addr	ipaddr = { p->hdr.srcip };
	int		chunk = p->msg.prequest.chunk;
	int		count;

	count = BlockMapIsAlloc(&p->msg.prequest.blockmap, 0, CHUNKSIZE);

	if (count == 0)
		log("WARNING: ClientPartialRequest with zero count");

	EVENT(1, EV_PREQMSG, ipaddr, chunk, count, p->msg.prequest.retries, 0);
	ClientEnqueueMap(chunk, &p->msg.prequest.blockmap, count,
			 p->msg.prequest.retries);

	if (debug > 1) {
		log("Client %s requests %d blocks of chunk:%d",
		    inet_ntoa(ipaddr), count, chunk);
	}
}

/*
 * The server receive thread. This thread does nothing more than receive
 * request packets from the clients, and add to the work queue.
 */
void *
ServerRecvThread(void *arg)
{
	Packet_t	packet, *p = &packet;
Mike Hibler's avatar
Mike Hibler committed
542
	static int	gotone;
543

Mike Hibler's avatar
Mike Hibler committed
544
	if (debug > 1)
545
546
547
		log("Server pthread starting up ...");
	
	while (1) {
548
		pthread_testcancel();
Mike Hibler's avatar
Mike Hibler committed
549
		if (PacketReceive(p) != 0) {
550
551
			continue;
		}
Mike Hibler's avatar
Mike Hibler committed
552
		DOSTAT(msgin++);
553

554
		if (! PacketValid(p, FileInfo.chunks)) {
555
			struct in_addr ipaddr = { p->hdr.srcip };
556
			DOSTAT(badpackets++);
Mike Hibler's avatar
Mike Hibler committed
557
			log("bad packet %d/%d from %s, ignored",
558
559
			    p->hdr.type, p->hdr.subtype, inet_ntoa(ipaddr));
			if (p->hdr.type == PKTTYPE_REQUEST &&
Mike Hibler's avatar
Mike Hibler committed
560
561
562
			    (p->hdr.subtype == PKTSUBTYPE_REQUEST ||
			     p->hdr.subtype == PKTSUBTYPE_PREQUEST))
				log("  len=%d, chunk=%d(%d), word2=%d",
563
				    p->hdr.datalen, p->msg.request.chunk,
Mike Hibler's avatar
Mike Hibler committed
564
				    FileInfo.chunks, p->msg.request.block);
565
566
			continue;
		}
Mike Hibler's avatar
Mike Hibler committed
567
568
569
570
571
		gettimeofday(&LastReq, 0);
		if (!gotone) {
			FirstReq = LastReq;
			gotone = 1;
		}
572

573
574
		switch (p->hdr.subtype) {
		case PKTSUBTYPE_JOIN:
Mike Hibler's avatar
Mike Hibler committed
575
			DOSTAT(joins++);
576
577
578
			ClientJoin(p);
			break;
		case PKTSUBTYPE_LEAVE:
Mike Hibler's avatar
Mike Hibler committed
579
			DOSTAT(leaves++);
580
581
			ClientLeave(p);
			break;
582
583
584
585
		case PKTSUBTYPE_LEAVE2:
			DOSTAT(leaves++);
			ClientLeave2(p);
			break;
586
		case PKTSUBTYPE_REQUEST:
Mike Hibler's avatar
Mike Hibler committed
587
			DOSTAT(requests++);
588
589
			ClientRequest(p);
			break;
Mike Hibler's avatar
Mike Hibler committed
590
591
592
593
594
		case PKTSUBTYPE_PREQUEST:
			DOSTAT(requests++);
			ClientPartialRequest(p);
			break;

595
596
597
598
599
600
601
602
603
604
605
606
		}
	}
}

/*
 * The main thread spits out blocks. 
 *
 * NOTES: Perhaps use readv into a vector of packet buffers?
 */
static void
PlayFrisbee(void)
{
607
	int		chunk, block, blockcount, cc, j, idlelastloop = 0;
608
	int		startblock, lastblock, throttle = 0;
609
	Packet_t	packet, *p = &packet;
610
	char		*databuf;
611
	off_t		offset;
Mike Hibler's avatar
Mike Hibler committed
612
	struct timeval	startnext;
613

614
615
616
	if ((databuf = malloc(readsize * BLOCKSIZE)) == NULL)
		fatal("could not allocate read buffer");

617
	while (1) {
618
619
620
		if (killme)
			return;
		
621
		/*
622
623
624
625
626
		 * Look for a WorkQ item to process. When there is nothing
		 * to process, check for being idle too long, and exit if
		 * no one asks for anything for a long time. Note that
		 * WorkQueueDequeue will delay for a while, so this will not
		 * spin.
627
		 */
628
		if (! WorkQueueDequeue(&chunk, &startblock, &blockcount)) {
Mike Hibler's avatar
Mike Hibler committed
629
630
631
632
633
634
635
636
637
638
639
			struct timeval stamp;

			gettimeofday(&stamp, 0);

			/*
			 * Restart an interval on every idle
			 */
			if (burstinterval > 0) {
				addusec(&startnext, &stamp, burstinterval);
				throttle = 0;
			}
640

641
			/* If zero, never exit */
642
			if (timeout == 0)
643
644
				continue;
			
Mike Hibler's avatar
Mike Hibler committed
645
646
647
648
649
650
651
652
#ifdef STATS
			/* If less than zero, exit when last cilent leaves */
			if (timeout < 0 &&
			    Stats.joins > 0 && activeclients == 0) {
				log("Last client left!");
				break;
			}
#endif
653

Mike Hibler's avatar
Mike Hibler committed
654
655
656
			if (idlelastloop) {
				if (timeout > 0 &&
				    stamp.tv_sec - IdleTimeStamp.tv_sec >
657
658
659
660
				    timeout) {
					log("No requests for %d seconds!",
					    timeout);
					break;
661
				}
Mike Hibler's avatar
Mike Hibler committed
662
			} else {
663
				DOSTAT(goesidle++);
Mike Hibler's avatar
Mike Hibler committed
664
				IdleTimeStamp = stamp;
665
				idlelastloop = 1;
666
667
668
			}
			continue;
		}
669
670
		idlelastloop = 0;
		
671
672
673
674
675
676
677
678
		lastblock = startblock + blockcount;

		/* Offset within the file */
		offset = (((off_t) BLOCKSIZE * chunk * CHUNKSIZE) +
			  ((off_t) BLOCKSIZE * startblock));

		for (block = startblock; block < lastblock; ) {
			int	readcount;
679
680
			int	readbytes;
			int	resends;
Mike Hibler's avatar
Mike Hibler committed
681
682
			int	thisburst = 0;

683
684
685
			/*
			 * Read blocks of data from disk.
			 */
686
687
			if (lastblock - block > readsize)
				readcount = readsize;
688
689
			else
				readcount = lastblock - block;
690
			readbytes = readcount * BLOCKSIZE;
691

692
			if ((cc = mypread(FileInfo.fd, databuf,
693
					  readbytes, offset)) <= 0) {
694
695
696
697
				if (cc < 0)
					pfatal("Reading File");
				fatal("EOF on file");
			}
Mike Hibler's avatar
Mike Hibler committed
698
699
			DOSTAT(filereads++);
			DOSTAT(filebytes += cc);
700
			EVENT(2, EV_READFILE, mcastaddr,
701
702
703
			      offset, readbytes, cc, 0);
			if (cc != readbytes)
				fatal("Short read: %d!=%d", cc, readbytes);
704
705
706
707
708
709
710
711
712
713
714

			for (j = 0; j < readcount; j++) {
				p->hdr.type    = PKTTYPE_REQUEST;
				p->hdr.subtype = PKTSUBTYPE_BLOCK;
				p->hdr.datalen = sizeof(p->msg.block);
				p->msg.block.chunk = chunk;
				p->msg.block.block = block + j;
				memcpy(p->msg.block.buf,
				       &databuf[j * BLOCKSIZE],
				       BLOCKSIZE);

715
716
				PacketSend(p, &resends);
				sendretries += resends;
Mike Hibler's avatar
Mike Hibler committed
717
				DOSTAT(blockssent++);
718
719
				EVENT(3, EV_BLOCKMSG, mcastaddr,
				      chunk, block+j, 0, 0);
720
721
722
723
724

				/*
				 * Completed a burst.  Adjust the busrtsize
				 * if necessary and delay as required.
				 */
Mike Hibler's avatar
Mike Hibler committed
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
				if (burstinterval > 0 &&
				    ++throttle >= burstsize) {
					thisburst += throttle;

					/*
					 * XXX if we overran our interval, we
					 * reset the base time so we don't
					 * accumulate error.
					 */
					if (!sleeptil(&startnext)) {
						EVENT(1, EV_OVERRUN, mcastaddr,
						      startnext.tv_sec,
						      startnext.tv_usec,
						      chunk, block+j);
						gettimeofday(&startnext, 0);
						DOSTAT(missed++);
					} else {
						if (thisburst > burstsize)
							EVENT(1, EV_LONGBURST,
							      mcastaddr,
							      thisburst,
							      burstsize,
							      chunk, block+j);
						thisburst = 0;
					}
750
751
					if (dynburst)
						calcburst();
Mike Hibler's avatar
Mike Hibler committed
752
753
					addusec(&startnext, &startnext,
						burstinterval);
754
					throttle = 0;
Mike Hibler's avatar
Mike Hibler committed
755
					DOSTAT(intervals++);
756
				}
757
			}
758
			offset   += readbytes;
759
760
761
			block    += readcount;
		}
	}
762
	free(databuf);
763
764
765
766
767
}

char *usagestr =
 "usage: frisbeed [-d] <-p #> <-m mcastaddr> <filename>\n"
 " -d              Turn on debugging. Multiple -d options increase output.\n"
 " -p portnum      Specify a port number to listen on.\n"
 " -m mcastaddr    Specify a multicast address in dotted notation.\n"
 " -i mcastif      Specify a multicast interface in dotted notation.\n"
 " -b              Use broadcast instead of multicast\n"
 "\n";

/*
 * Print the usage message to stderr and exit with status 1.
 * Does not return.
 */
void
usage(void)
{
	/* fputs, not fprintf: usagestr is data, not a format string. */
	fputs(usagestr, stderr);
	exit(1);
}

int
main(int argc, char **argv)
{
	int		ch, fd;
	pthread_t	child_pid;
	off_t		fsize;
787
	void		*ignored;
788

Mike Hibler's avatar
Mike Hibler committed
789
	while ((ch = getopt(argc, argv, "dhp:m:i:tbDT:R:B:G:L:W:")) != -1)
790
		switch(ch) {
791
792
793
794
		case 'b':
			broadcast++;
			break;
			
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
		case 'd':
			debug++;
			break;
			
		case 'p':
			portnum = atoi(optarg);
			break;
			
		case 'm':
			inet_aton(optarg, &mcastaddr);
			break;

		case 'i':
			inet_aton(optarg, &mcastif);
			break;
810
811
812
		case 't':
			tracing++;
			break;
813
814
815
816
817
818
819
820
821
822
823
824
825
		case 'D':
			dynburst = 1;
			break;
		case 'T':
			timeout = atoi(optarg);
			break;
		case 'R':
			readsize = atoi(optarg);
			break;
		case 'B':
			burstsize = atoi(optarg);
			break;
		case 'G':
Mike Hibler's avatar
Mike Hibler committed
826
827
828
829
830
			burstinterval = atoi(optarg);
			break;
		case 'W':
			bandwidth = atol(optarg);
			break;
831
832
833
834
835
836
837
838
839
840
841
842
843
		case 'h':
		case '?':
		default:
			usage();
		}
	argc -= optind;
	argv += optind;
	if (argc != 1)
		usage();

	if (!portnum || ! mcastaddr.s_addr)
		usage();

844
845
846
847
	signal(SIGINT, quit);
	signal(SIGTERM, quit);
	signal(SIGHUP, reinit);

848
	ServerLogInit();
849
	
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
	filename = argv[0];
	if (access(filename, R_OK) < 0)
		pfatal("Cannot read %s", filename);

	/*
	 * Open the file and get its size so that we can tell clients how
	 * much to expect/require.
	 */
	if ((fd = open(filename, O_RDONLY)) < 0)
		pfatal("Cannot open %s", filename);

	if ((fsize = lseek(fd, (off_t)0, SEEK_END)) < 0)
		pfatal("Cannot lseek to end of file");

	FileInfo.fd     = fd;
865
	FileInfo.blocks = (int) (roundup(fsize, (off_t)BLOCKSIZE) / BLOCKSIZE);
866
867
868
	FileInfo.chunks = FileInfo.blocks / CHUNKSIZE;
	log("Opened %s: %d blocks", filename, FileInfo.blocks);

Mike Hibler's avatar
Mike Hibler committed
869
870
	compute_sendrate();

871
	WorkQueueInit();
Mike Hibler's avatar
Mike Hibler committed
872
	lostmap = calloc(FileInfo.chunks, 1);
873

874
875
876
877
878
	/*
	 * Everything else done, now init the network.
	 */
	ServerNetInit();

879
880
881
882
883
	if (tracing) {
		ServerTraceInit("frisbeed");
		TraceStart(tracing);
	}

884
885
886
887
888
889
	/*
	 * Create the subthread to listen for packets.
	 */
	if (pthread_create(&child_pid, NULL, ServerRecvThread, (void *)0)) {
		fatal("Failed to create pthread!");
	}
890
	gettimeofday(&IdleTimeStamp, 0);
891
892
	
	PlayFrisbee();
893
894
	pthread_cancel(child_pid);
	pthread_join(child_pid, &ignored);
895

896
897
898
899
	if (tracing) {
		TraceStop();
		TraceDump();
	}
Mike Hibler's avatar
Mike Hibler committed
900
	subtime(&LastReq, &LastReq, &FirstReq);
901

Mike Hibler's avatar
Mike Hibler committed
902
#ifdef  STATS
903
	{
Mike Hibler's avatar
Mike Hibler committed
904
		struct rusage ru;
905
906
		extern unsigned long nonetbufs;

Mike Hibler's avatar
Mike Hibler committed
907
		getrusage(RUSAGE_SELF, &ru);
908
		log("Params:");
Mike Hibler's avatar
Mike Hibler committed
909
910
911
912
		log("  chunk/block size    %d/%d", CHUNKSIZE, BLOCKSIZE);
		log("  burst size/interval %d/%d", burstsize, burstinterval);
		log("  file read size      %d", readsize);
		log("  file:size           %s:%qd",
913
914
		    filename, (long long)fsize);
		log("Stats:");
Mike Hibler's avatar
Mike Hibler committed
915
916
917
918
919
920
		log("  service time:      %d.%03d sec",
		    LastReq.tv_sec, LastReq.tv_usec/1000);
		log("  user/sys CPU time: %d.%03d/%d.%03d",
		    ru.ru_utime.tv_sec, ru.ru_utime.tv_usec/1000,
		    ru.ru_stime.tv_sec, ru.ru_stime.tv_usec/1000);
		log("  msgs in/out:       %d/%d",
921
		    Stats.msgin, Stats.joinrep + Stats.blockssent);
Mike Hibler's avatar
Mike Hibler committed
922
923
		log("  joins/leaves:      %d/%d", Stats.joins, Stats.leaves);
		log("  requests:          %d (%d merged in queue)",
924
		    Stats.requests, Stats.qmerges);
Mike Hibler's avatar
Mike Hibler committed
925
926
927
928
929
930
931
		log("  partial req/blks:  %d/%d",
		    Stats.partialreq, Stats.blockslost);
		log("  duplicate req:     %d",
		    Stats.dupsent);
		log("  client re-req:     %d",
		    Stats.clientlost);
		log("  1k blocks sent:    %d (%d repeated)",
932
		    Stats.blockssent, Stats.blockssent - FileInfo.blocks);
Mike Hibler's avatar
Mike Hibler committed
933
		log("  file reads:        %d (%qu bytes, %qu repeated)",
934
935
		    Stats.filereads, Stats.filebytes,
		    Stats.filebytes - FileInfo.blocks * BLOCKSIZE);
Mike Hibler's avatar
Mike Hibler committed
936
937
938
939
940
		log("  net idle/blocked:  %d/%d", Stats.goesidle, nonetbufs);
		log("  send intvl/missed: %d/%d",
		    Stats.intervals, Stats.missed);
		log("  spurious wakeups:  %d", Stats.wakeups);
		log("  max workq size:    %d", WorkQMax);
941
	}
Mike Hibler's avatar
Mike Hibler committed
942
943
#endif

944
945
946
947
948
949
950
	/*
	 * Exit from main thread will kill all the children.
	 */
	log("Exiting!");
	exit(0);
}

951
952
953
954
955
956
957
/*
 * We catch the signals, but do not do anything. We exit with 0 status
 * for these, since it indicates a desired shutdown.
 */
void
quit(int sig)
{
958
	killme = 1;
959
960
961
962
963
964
965
966
967
968
969
970
}

/*
 * We cannot reinit, so exit with non-zero to indicate it was unexpected.
 * Installed as the SIGHUP handler in main().
 *
 * NOTE(review): exit() is not on the POSIX async-signal-safe list, and
 * log() is defined elsewhere and presumably is not either; calling them
 * from a signal handler is technically unsafe -- confirm whether this
 * matters in practice.
 */
void
reinit(int sig)
{
	log("Caught signal %d. Exiting ...", sig);
	exit(1);
}

971
972
#define NFS_READ_DELAY	100000

973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
 * Wrap up pread with a retry mechanism to help protect against
 * transient NFS errors.
 */
static ssize_t
mypread(int fd, void *buf, size_t nbytes, off_t offset)
{
	int		cc, i, count = 0;

	while (nbytes) {
		int	maxretries = 100;

		for (i = 0; i < maxretries; i++) {
			cc = pread(fd, buf, nbytes, offset);
			if (cc == 0)
				fatal("EOF on file");

			if (cc > 0) {
				nbytes -= cc;
				buf    += cc;
				offset += cc;
				count  += cc;
				goto again;
			}

			if (i == 0)
				pwarning("read error: will retry");

1001
			fsleep(NFS_READ_DELAY);
1002
1003
1004
1005
		}
		pfatal("read error: busted for too long");
		return -1;
	again:
1006
		;
1007
1008
1009
	}
	return count;
}
1010

Mike Hibler's avatar
Mike Hibler committed
1011
1012
1013
1014
1015
/*
 * Tunables for the dynamic burst size adjustment done by calcburst().
 */
#define LOSS_INTERVAL	250	/* interval in which we collect data (ms) */
#define MULT_DECREASE	0.95	/* mult factor to decrease burst rate */
#define ADD_INCREASE	1	/* additive factor to increase burst rate */

/* more than this many lost chunks in an interval counts as loss */
#define CHUNK_LIMIT	0
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027

/*
 * Should we consider PacketSend retries?  They indicate that we are
 * overdriving the socket.  Even though they have a built-in delay between
 * retries, we might be better off detecting the case and avoiding the delays.
 *
 * From Dave:
 *
 * A smoother one that is still fair with TCP is:
 *    W_{next} = W_{cur} - sqrt( W_{cur} ) if loss
 *    W_{next} = W_{cur} + 1 / sqrt( W_{cur} )  if no loss
 */
Mike Hibler's avatar
Mike Hibler committed
1028
static void
1029
1030
calcburst(void)
{
Mike Hibler's avatar
Mike Hibler committed
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
	static int		lastsendretries, bursts, lastclientretries;
	static struct timeval	nextstamp;
	struct timeval		stamp;
	int			clientlost, lostchunks, hadloss = 0;

	gettimeofday(&stamp, 0);
	if (nextstamp.tv_sec == 0) {
		addusec(&nextstamp, &stamp, LOSS_INTERVAL * 1000);
		return;
	}
1041

Mike Hibler's avatar
Mike Hibler committed
1042
	bursts++;
1043

Mike Hibler's avatar
Mike Hibler committed
1044
1045
1046
1047
1048
	/*
	 * Has a full interval passed?
	 */
	if (!pasttime(&stamp, &nextstamp))
		return;
1049

Mike Hibler's avatar
Mike Hibler committed
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
	/*
	 * An interval has past, now what constitiues a significant loss?
	 * The number of explicit client retry requests during the interval
	 * is the basis right now.
	 */
	clientlost = clientretries - lastclientretries;

	/*
	 * If we are overrunning our UDP socket then we are certainly
	 * transmitting too fast.  Allow one overrun per burst.
	 */
	if (sendretries - lastsendretries > bursts)
		hadloss = 1;

	lostchunks = 0;
	if (lostmap) {
		int i;
		for (i = 0; i < FileInfo.chunks; i++)
			if (lostmap[i]) {
				lostchunks++;
				lostmap[i] = 0;
			}
	}

	if (lostchunks > CHUNK_LIMIT)
		hadloss = 1;

	if (debug && hadloss)
		log("%d client retries for %d chunks from %d clients, "
		    "%d overruns in %d bursts",
		    clientlost, lostchunks, activeclients,
		    sendretries-lastsendretries, bursts);

	if (hadloss) {
1084
1085
1086
		/*
		 * Decrement the burstsize slowly.
		 */
Mike Hibler's avatar
Mike Hibler committed
1087
1088
1089
1090
		if (burstsize > 1) {
			burstsize = (int)(burstsize * MULT_DECREASE);
			if (burstsize < 1)
				burstsize = 1;
1091
			if (debug)
Mike Hibler's avatar
Mike Hibler committed
1092
				log("Decrement burstsize to %d", burstsize);
1093
		}
Mike Hibler's avatar
Mike Hibler committed
1094
	} else {
1095
1096
1097
		/*
		 * Increment the burstsize even more slowly.
		 */
Mike Hibler's avatar
Mike Hibler committed
1098
1099
1100
1101
		if (burstsize < maxburstsize) {
			burstsize += ADD_INCREASE;
			if (burstsize > maxburstsize)
				burstsize = maxburstsize;
1102
			if (debug)
Mike Hibler's avatar
Mike Hibler committed
1103
				log("Increment burstsize to %d", burstsize);
1104
1105
1106
		}
	}

Mike Hibler's avatar
Mike Hibler committed
1107
1108
1109
1110
1111
1112
1113
1114
1115
	/*
	 * Update for next time
	 */
	addusec(&nextstamp, &nextstamp, LOSS_INTERVAL * 1000);
	lastclientretries = clientretries;
	lastsendretries = sendretries;
	bursts = 0;
}

Mike Hibler's avatar
Mike Hibler committed
1116
1117
/*
 * Per-packet overhead, in bytes, that compute_sendrate() adds to
 * sizeof(Packet_t) when estimating the on-the-wire size of a block.
 */
#define LINK_OVERHEAD	(14+4+8+12)	/* ethernet (hdr+CRC+preamble+gap) */
#define IP_OVERHEAD	(20+8)		/* IP + UDP hdrs */
Mike Hibler's avatar
Mike Hibler committed
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185

/*
 * Compute the approximate send rate.  Due to typically coarse grained
 * timers, send rate is implemented as a burst rate and a burst interval;
 * i.e. we put out "burst size" blocks every "burst interval" microseconds.
 * The user can specify either an aggregate bandwidth (bandwidth) or the
 * individual components (burstsize, burstinterval).
 */
static void
compute_sendrate(void)
{
	double blockspersec, burstspersec;
	int clockres, wireblocksize, minburst;

	if (burstinterval == 0) {
		burstsize = 1;
		log("Maximum send bandwidth unlimited");
		return;
	}

	/* clock resolution in usec */
	clockres = sleeptime(1, 0, 1);

	burstspersec = 1000000.0 / clockres;
	wireblocksize = (sizeof(Packet_t) + IP_OVERHEAD + LINK_OVERHEAD) * 8;

	if (bandwidth != 0) {
		/*
		 * Convert Mbits/sec to blocks/sec
		 */
		blockspersec = bandwidth / wireblocksize;

		/*
		 * If blocks/sec less than maximum bursts/sec,
		 * crank down the clock.
		 */
		if (blockspersec < burstspersec)
			burstspersec = blockspersec;

		burstsize = blockspersec / burstspersec;
		burstinterval = (int)(1000000 / burstspersec);
	}

	burstinterval = sleeptime(burstinterval, 0, 1);
	burstspersec = 1000000.0 / burstinterval;
	bandwidth = (unsigned long)(burstspersec*burstsize*wireblocksize);

	/*
	 * For the dynamic rate throttle, we use the standard parameters
	 * as a cap.  We adjust the burstsize to ensure it is large
	 * enough to ensure a reasonable starting multiplicitive decrement.
	 * If we cannot do that while still maintaining a reasonable
	 * burstinterval (< 0.5 seconds), just cancel the dynamic behavior.
	 */
	minburst = (int)(4.0 / (1.0 - MULT_DECREASE));
	if (dynburst && burstsize < minburst) {
		double burstfactor = (double)minburst / burstsize;

		if (burstinterval * burstfactor < 500000) {
			burstsize = minburst;
			burstinterval =
				sleeptime((int)burstinterval*burstfactor,
					  0, 1);
			burstspersec = (double)1000000.0 / burstinterval;
			bandwidth = (unsigned long)
				(burstspersec*burstsize*wireblocksize);
		} else
			dynburst = 0;
1186
	}
Mike Hibler's avatar
Mike Hibler committed
1187
1188
1189
1190
1191
1192
1193
1194
	if (dynburst)
		maxburstsize = burstsize;

	log("Maximum send bandwidth %.3f Mbits/sec (%d blocks/sec)",
	    bandwidth / 1000000.0, bandwidth / wireblocksize);
	if (debug)
		log("  burstsize=%d, burstinterval=%dus",
		    burstsize, burstinterval);
1195
}