Commit ca73f315 authored by Mike Hibler's avatar Mike Hibler
Browse files

Robustness work on the image upload side.

Implement idle ("lack of progress") timeout on the server.

Timeout work on the client side too. Implement idle timeouts (not for
SSL connections yet) and fix up handling of the overall timeout. In
particular, if we are reading from a pipe ('-'), don't start the timeout
until we get some input from the pipe.

Add option to put out progress dots on the client (cuz we are all about
dots in imagezip/frisbee).
parent ef20fcdc
......@@ -54,6 +54,7 @@ extern int debug;
struct emulab_configstate {
int image_maxsize; /* sitevar:images/create/maxsize (in GB) */
int image_maxwait; /* sitevar:images/create/maxwait (in min) */
int image_maxiwait; /* sitevar:images/create/idlewait (in min) */
int image_maxrate_dyn; /* sitevar:images/frisbee/maxrate_dynamic */
int image_maxrate_std; /* sitevar:images/frisbee/maxrate_std (in MB/s) */
int image_maxrate_usr; /* sitevar:images/frisbee/maxrate_usr (in MB/s) */
......@@ -99,6 +100,7 @@ static int INELABINELAB = 0;
static uint64_t put_maxsize = 10000000000ULL; /* zero means no limit */
static uint32_t put_maxwait = 2000; /* zero means no limit */
static uint32_t put_maxiwait = 120; /* zero means no limit */
static uint32_t get_maxrate_dyn = 0; /* non-zero means use dynamic */
static uint32_t get_maxrate_std = 72000000; /* zero means no limit */
static uint32_t get_maxrate_usr = 54000000; /* zero means no limit */
......@@ -187,6 +189,17 @@ emulab_read(void)
FrisLog(" image_put_maxwait = %d min",
(int)(put_maxwait/60));
val = emulab_getsitevar("images/create/idlewait");
if (val) {
ival = atoi(val);
/* in minutes, allow up to about 10TB @ 10MB/sec */
if (ival >= 0 && ival < 20000)
put_maxiwait = (uint32_t)ival * 60;
free(val);
}
FrisLog(" image_put_maxiwait = %d min",
(int)(put_maxiwait/60));
val = emulab_getsitevar("images/frisbee/maxrate_dyn");
if (val) {
ival = atoi(val);
......@@ -234,6 +247,7 @@ emulab_save(void)
cs->image_maxsize = put_maxsize;
cs->image_maxwait = put_maxwait;
cs->image_maxiwait = put_maxiwait;
cs->image_maxrate_dyn = get_maxrate_dyn;
cs->image_maxrate_std = get_maxrate_std;
cs->image_maxrate_usr = get_maxrate_usr;
......@@ -252,6 +266,9 @@ emulab_restore(void *state)
put_maxwait = cs->image_maxwait;
FrisLog(" image_put_maxwait = %d min",
(int)(put_maxwait/60));
put_maxiwait = cs->image_maxiwait;
FrisLog(" image_put_maxiwait = %d min",
(int)(put_maxiwait/60));
get_maxrate_dyn = cs->image_maxrate_dyn;
FrisLog(" image_get_maxrate_dyn = %s",
get_maxrate_dyn ? "true" : "false");
......@@ -354,13 +371,14 @@ set_get_values(struct config_host_authinfo *ai, int ix)
/* and whack the put_* fields */
ii->put_maxsize = 0;
ii->put_timeout = 0;
ii->put_itimeout = 0;
ii->put_options = NULL;
ii->put_oldversion = NULL;
}
/*
* Set the PUT maxsize/options for a particular node/image.
* XXX right now these are completely pulled out of our posterior.
* XXX right now these mostly come from the DB.
*/
static void
set_put_values(struct config_host_authinfo *ai, int ix)
......@@ -372,6 +390,7 @@ set_put_values(struct config_host_authinfo *ai, int ix)
/* put_timeout */
ii->put_timeout = put_maxwait;
ii->put_itimeout = put_maxiwait;
/* put_oldversion */
/*
......
......@@ -156,6 +156,7 @@ set_get_values(struct config_host_authinfo *ai, int ix)
/* and whack the put_* fields */
ai->imageinfo[ix].put_maxsize = 0;
ai->imageinfo[ix].put_timeout = 0;
ai->imageinfo[ix].put_itimeout = 0;
ai->imageinfo[ix].put_oldversion = NULL;
ai->imageinfo[ix].put_options = NULL;
}
......@@ -172,6 +173,7 @@ set_put_values(struct config_host_authinfo *ai, int ix)
/* put_timeout */
ai->imageinfo[ix].put_timeout = 900;
ai->imageinfo[ix].put_itimeout = 120;
/* put_oldversion */
ai->imageinfo[ix].put_oldversion = NULL;
......
......@@ -32,6 +32,7 @@ struct config_imageinfo {
char *put_options; /* command line options for PUT server */
uint64_t put_maxsize; /* maximum size for this image */
int put_timeout; /* max time to allow PUT server to run */
int put_itimeout; /* max time to allow per-socket-op to run */
char *put_oldversion; /* where to save the old version */
void *extra; /* config-type specific info */
};
......
/*
* Copyright (c) 2010-2013 University of Utah and the Flux Group.
* Copyright (c) 2010-2014 University of Utah and the Flux Group.
*
* {{{EMULAB-LICENSE
*
......@@ -47,6 +47,7 @@ static char *path;
static uint64_t maxsize = 0;
static int bufsize = (64 * 1024);;
static int timeout = 0;
static int idletimeout = 0;
static int sock = -1;
/* Globals */
......@@ -71,6 +72,9 @@ main(int argc, char **argv)
FrisLog("%s: listening on port %d for image data from %s (max of %llu bytes)",
path, portnum, inet_ntoa(clientip), maxsize);
if (idletimeout || timeout)
FrisLog("%s: using idletimeout=%ds, timeout=%ds\n",
path, idletimeout, timeout);
rv = recv_file();
close(sock);
......@@ -82,7 +86,7 @@ static void
parse_args(int argc, char **argv)
{
int ch;
while ((ch = getopt(argc, argv, "m:p:i:b:T:s:")) != -1) {
while ((ch = getopt(argc, argv, "m:p:i:b:I:T:s:")) != -1) {
switch (ch) {
case 'm':
if (!inet_aton(optarg, &clientip)) {
......@@ -109,6 +113,14 @@ parse_args(int argc, char **argv)
exit(1);
}
break;
case 'I':
idletimeout = atoi(optarg);
if (idletimeout < 0 || idletimeout > (24 * 60 * 60)) {
fprintf(stderr, "Invalid idle timeout %d\n",
idletimeout);
exit(1);
}
break;
case 'T':
timeout = atoi(optarg);
if (timeout < 0 || timeout > (24 * 60 * 60)) {
......@@ -129,6 +141,17 @@ parse_args(int argc, char **argv)
if (clientip.s_addr == 0 || portnum == 0 || argc < 1)
usage();
/*
* If both timeouts are set, make sure they play nice
*/
if (idletimeout > 0 && timeout > 0) {
/* idletimeout should be <= timeout */
if (idletimeout > timeout)
idletimeout = timeout;
/* if they are equal, no need for the idletimeout */
if (idletimeout == timeout)
idletimeout = 0;
}
path = argv[0];
}
......@@ -146,6 +169,7 @@ usage(void)
" -p <port> TCP port number on which to listen for client.\n"
" -i <iface> Interface on which to listen (specified by local IP).\n"
" -I <timo> Max time (in seconds) to allow connect to be idle (no traffic from client).\n"
" -T <timo> Max time (in seconds) to wait for upload to complete.\n"
" -s <size> Maximum amount of data (in bytes) to upload.\n"
"\n\n";
......@@ -157,7 +181,7 @@ usage(void)
static sigjmp_buf toenv;
/*
 * Signal handler for the overall upload timeout; installed for SIGALRM
 * by recv_file. It does nothing but jump to the context saved in toenv
 * with a value of 1 (presumably saved via sigsetjmp in recv_file; that
 * call is not visible in this hunk).
 */
static void
send_timeout(int sig)
recv_timeout(int sig)
{
siglongjmp(toenv, 1);
}
......@@ -177,6 +201,14 @@ recv_file()
int rv = 1;
char *stat;
gettimeofday(&st, NULL); /* XXX for early errors */
/*
* If we have an overall timeout (timeout > 0) then just set the
* alarm to that and we will longjmp if we hit it.
*
* Idle timeouts are handled by the connection.
*/
if (timeout > 0) {
struct itimerval it;
......@@ -186,13 +218,11 @@ recv_file()
}
it.it_value.tv_sec = timeout;
it.it_value.tv_usec = 0;
it.it_interval = it.it_value;
signal(SIGALRM, send_timeout);
it.it_interval.tv_sec = it.it_interval.tv_usec = 0;
signal(SIGALRM, recv_timeout);
setitimer(ITIMER_REAL, &it, NULL);
}
gettimeofday(&st, NULL); /* XXX for early errors */
wbuf = malloc(bufsize);
if (wbuf == NULL) {
FrisError("Could not allocate %d byte buffer, try using -b",
......@@ -209,7 +239,7 @@ recv_file()
goto done;
}
conn = conn_accept_tcp(sock, &clientip);
conn = conn_accept_tcp(sock, &clientip, idletimeout, idletimeout);
if (conn == NULL) {
FrisError("Error accepting from %s", inet_ntoa(clientip));
goto done;
......@@ -226,7 +256,10 @@ recv_file()
cc = remaining;
ncc = conn_read(conn, wbuf, cc);
if (ncc < 0) {
FrisPwarning("socket read");
if (conn_timeout(conn))
rv = 2;
else
FrisPwarning("socket read");
goto done;
}
if (ncc == 0)
......@@ -252,8 +285,7 @@ recv_file()
if (timeout) {
struct itimerval it;
it.it_value.tv_sec = 0;
it.it_value.tv_usec = 0;
it.it_value.tv_sec = it.it_value.tv_usec = 0;
setitimer(ITIMER_REAL, &it, NULL);
}
......
......@@ -288,6 +288,7 @@ struct uploadextra {
char *realname;
uint64_t isize;
uint32_t mtime;
int itimeout;
};
static struct childinfo *findchild(char *, int, int);
......@@ -370,6 +371,7 @@ copy_imageinfo(struct config_imageinfo *ii)
goto fail;
nii->put_maxsize = ii->put_maxsize;
nii->put_timeout = ii->put_timeout;
nii->put_itimeout = ii->put_itimeout;
/* XXX don't care about extra right now */
return nii;
......@@ -1668,15 +1670,23 @@ startchild(struct childinfo *ci)
switch (ci->ptype) {
case PTYPE_SERVER:
{
int timo = ci->timeout;
/* XXX compensate for 2s sleep in handle_get */
if (timo)
timo += 2;
pname = FRISBEE_SERVER;
opts = ci->imageinfo->get_options ?
ci->imageinfo->get_options : "";
snprintf(argbuf, sizeof argbuf,
"%s -i %s -T %d %s %s -m %s -p %d %s",
pname, ifacestr, ci->timeout, opts,
pname, ifacestr, timo, opts,
ci->method == CONFIG_IMAGE_BCAST ? "-b" : "",
inet_ntoa(in), ci->port, ci->imageinfo->path);
break;
}
case PTYPE_CLIENT:
pname = FRISBEE_CLIENT;
snprintf(argbuf, sizeof argbuf,
......@@ -1689,15 +1699,24 @@ startchild(struct childinfo *ci)
break;
case PTYPE_UPLOADER:
{
int timo = ci->timeout;
int itimo =
((struct uploadextra *)ci->extra)->itimeout;
uint64_t isize =
((struct uploadextra *)ci->extra)->isize;
/* XXX compensate for 2s sleep in handle_put */
if (timo)
timo += 2;
if (itimo)
itimo += 2;
pname = FRISBEE_UPLOAD;
opts = ci->imageinfo->put_options ?
ci->imageinfo->put_options : "";
snprintf(argbuf, sizeof argbuf,
"%s -i %s -T %d %s -s %llu -m %s -p %d %s",
pname, ifacestr, ci->timeout, opts,
"%s -i %s -I %d -T %d %s -s %llu -m %s -p %d %s",
pname, ifacestr, itimo, timo, opts,
(unsigned long long)isize,
inet_ntoa(in), ci->port, ci->imageinfo->path);
break;
......@@ -2009,7 +2028,7 @@ startuploader(struct config_imageinfo *ii, in_addr_t meaddr, in_addr_t youaddr,
struct childinfo *ci;
struct uploadextra *ue;
char *tmpname;
int len;
int len, itimo = 0;
assert(findchild(ii->imageid, PTYPE_UPLOADER, MS_METHOD_ANY) == NULL);
assert(errorp != NULL);
......@@ -2029,6 +2048,12 @@ startuploader(struct config_imageinfo *ii, in_addr_t meaddr, in_addr_t youaddr,
if (timo == 0 || timo > ii->put_timeout)
timo = ii->put_timeout;
/*
* Add per-operation timeout if less than overall timeout.
*/
if (timo == 0 || ii->put_itimeout < timo)
itimo = ii->put_itimeout;
/*
* Find a port to use. Note that with MS_METHOD_UNICAST,
* get_server_address will return 0 as the addr, so we set it
......@@ -2079,6 +2104,7 @@ startuploader(struct config_imageinfo *ii, in_addr_t meaddr, in_addr_t youaddr,
memset(ue, 0, sizeof(*ue));
ue->isize = isize;
ue->mtime = mtime;
ue->itimeout = itimo;
/*
* Arrange to upload the image as <path>.tmp and then
......
/*
* Copyright (c) 2010-2013 University of Utah and the Flux Group.
* Copyright (c) 2010-2014 University of Utah and the Flux Group.
*
* {{{EMULAB-LICENSE
*
......@@ -26,6 +26,7 @@
* frisbee master server.
*/
#include <sys/types.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
......@@ -54,9 +55,11 @@ static uint64_t filesize;
static uint32_t mtime;
static int bufsize = (64 * 1024);;
static int timeout = -1;
static int idletimeout = 0;
static int usessl = 0;
static int verify = 1;
static in_addr_t proxyip = 0;
static int dots = 0;
/* Globals */
int debug = 0;
......@@ -90,6 +93,64 @@ main(int argc, char **argv)
mtime = 0;
if (timeout == -1)
timeout = 0;
/*
* Even more special:
*
* If we are piping in from an imagezip, it can take
* considerable time to start cranking out the image,
* in particular if we are checking/computing a signature.
* So don't start the server timeout ticking until we have
* some data to send.
*
* But first we do make an immediate query of the image to
* ensure it is accessible and attempt to prevent the writer
* of the pipe from doing a whole lotta work and then finding
* out it was all for naught. Note however that for this to
* work, we have to actively inform (i.e., signal) the other
* process. Just exiting is not enough since the writer won't
* notice that we have exited until it tries to write the
* pipe; i.e., until it has done all that work we were trying
* to have it avoid!
*
* So we make the query and bail out if there is an error
* (actively sending a SIGPIPE to our process group is
* currently disabled, see below). We then select on the pipe
* and wait until it has something to offer. Only then will
* we continue and make the official PUT request and start
* the server timeout ticking.
*/
if (imageid && !askonly) {
fd_set set;
if (!put_request(imageid, ntohl(msip.s_addr), msport,
proxyip, filesize, mtime, 0, 1, timo,
&reply))
FrisFatal("Could not get upload info for '%s'",
imageid);
if (reply.error) {
#if 0 /* XXX bad idea as this will kill any wrapper script too */
/*
* Try SIGPIPE-ing our process group to
* see if we can actively kill off the
* writer (who otherwise won't notice we
* have exited until it goes to write the
* pipe. This may fail if we are not running
* as root and the writer is running as
* another uid.
*/
signal(SIGPIPE, SIG_IGN);
kill(0, SIGPIPE);
#endif
FrisFatal("%s: server returned error: %s",
imageid, GetMSError(reply.error));
}
fprintf(stderr, "Upload from stdin, "
"waiting for data availability...\n");
FD_ZERO(&set);
FD_SET(STDIN_FILENO, &set);
(void) select(1, &set, NULL, NULL, NULL);
fprintf(stderr, "...data ready, continuing.\n");
}
}
/* otherwise make sure file exists and see how big it is */
else {
......@@ -102,7 +163,7 @@ main(int argc, char **argv)
filesize = sb.st_size;
mtime = (uint32_t)sb.st_mtime;
/* timeout is a function of file size */
/* overall timeout is a function of file size */
if (timeout == -1) {
timeout = (int)(filesize / MIN_UPLOAD_RATE);
/* file is stupid-huge, no timeout */
......@@ -114,6 +175,12 @@ main(int argc, char **argv)
}
}
/*
* No need for connection timeout if it is >= overall timeout.
*/
if (idletimeout >= timeout)
idletimeout = 0;
if (imageid) {
if (!put_request(imageid, ntohl(msip.s_addr), msport, proxyip,
filesize, mtime, timeout, askonly, timo,
......@@ -161,9 +228,12 @@ main(int argc, char **argv)
FrisLog("%s: upload to %s:%d from %s",
imageid, inet_ntoa(serverip), portnum, uploadpath);
if (idletimeout || timeout)
FrisLog("%s: using idletimeout=%ds, timeout=%ds\n",
imageid, idletimeout, timeout);
rv = send_file();
if (rv == 0 && verify) {
uint64_t isize = 0;
uint32_t mt = 0;
......@@ -241,7 +311,7 @@ static void
parse_args(int argc, char **argv)
{
int ch;
while ((ch = getopt(argc, argv, "S:p:F:Q:sb:T:NP:")) != -1) {
while ((ch = getopt(argc, argv, "S:p:F:Q:sb:I:T:NP:o")) != -1) {
switch (ch) {
case 'S':
mshost = optarg;
......@@ -264,6 +334,14 @@ parse_args(int argc, char **argv)
exit(1);
}
break;
case 'I':
idletimeout = atoi(optarg);
if (idletimeout < 0 || idletimeout > (24 * 60 * 60)) {
fprintf(stderr, "Invalid idle timeout %d\n",
idletimeout);
exit(1);
}
break;
case 'T':
timeout = atoi(optarg);
if (timeout < 0 || timeout > (24 * 60 * 60)) {
......@@ -291,6 +369,9 @@ parse_args(int argc, char **argv)
proxyip = ntohl(in.s_addr);
break;
}
case 'o':
dots++;
break;
default:
break;
}
......@@ -342,12 +423,55 @@ usage(void)
exit(1);
}
static sigjmp_buf toenv;
/* Time at which the upload started; set at the top of send_file. */
static struct timeval sstamp;

/*
 * Progress indicator: emit one '.' on stderr per megabyte (10^6 bytes)
 * transferred, and end each row of dots with the elapsed time in seconds
 * and the running total in MB.
 *
 * cc >= 0: number of additional bytes transferred since the last call.
 * cc <  0: finish up; pad the current row out to the stats column, print
 *          the final elapsed-time/total line and return. The byte total
 *          is not modified.
 *
 * NOTE(review): call sites are not visible here; this assumes the caller
 * passes the byte count of each successful write and a negative value
 * exactly once at the end of the transfer.
 */
static void
dodots(ssize_t cc)
{
	static uint64_t total;		/* bytes accounted for so far */
	static int lastmb, dotcol;	/* MB already dotted, current column */
	struct timeval estamp;
	int count, newmb;

	if (cc < 0) {
		/* pad so the final stats line up with those of full rows */
		while (dotcol++ <= 66)
			fputc(' ', stderr);
		gettimeofday(&estamp, 0);
		estamp.tv_sec -= sstamp.tv_sec;
		fprintf(stderr, "%4ld %6u\n",
			(long)estamp.tv_sec, (unsigned)(total / 1000000));
		dotcol = 0;
		/*
		 * Must not fall through: adding a negative cc to the
		 * unsigned total would corrupt it (and, with total == 0,
		 * wrap to a huge value and print a bogus flood of dots).
		 */
		return;
	}

	/* count the new bytes exactly once */
	total += (uint64_t)cc;
	newmb = (int)(total / 1000000);
	if ((count = newmb - lastmb) <= 0)
		return;
	lastmb = newmb;

	while (count-- > 0) {
		fputc('.', stderr);
		/* row is full: append elapsed seconds and total MB */
		if (dotcol++ > 65) {
			gettimeofday(&estamp, 0);
			estamp.tv_sec -= sstamp.tv_sec;
			fprintf(stderr, "%4ld %6u\n",
				(long)estamp.tv_sec,
				(unsigned)(total / 1000000));
			dotcol = 0;
		}
	}
}
static sigjmp_buf toenv, bpenv;
/*
 * Signal handler used by send_file for both SIGALRM and SIGPIPE.
 * Each signal jumps back to its own sigsetjmp recovery point in
 * send_file: toenv for the overall timeout alarm, bpenv for a
 * broken pipe.
 */
static void
send_timeout(int sig)
send_signal(int sig)
{
siglongjmp(toenv, 1);
/* overall timeout expired */
if (sig == SIGALRM)
siglongjmp(toenv, 1);
/* write on a broken connection */
if (sig == SIGPIPE)
siglongjmp(bpenv, 1);
}
/*
......@@ -361,22 +485,25 @@ send_file(void)
char * volatile rbuf = NULL;
volatile int fd = -1;
volatile uint64_t remaining = filesize;
struct timeval st, et;
struct timeval et;
int rv = 1;
char *stat;
gettimeofday(&sstamp, NULL);
if (timeout > 0) {
struct itimerval it;
if (sigsetjmp(toenv, 1)) {
signal(SIGALRM, SIG_DFL);
rv = 2;
goto done;
}
it.it_value.tv_sec = timeout;
it.it_value.tv_usec = 0;
it.it_interval = it.it_value;
it.it_interval.tv_sec = it.it_interval.tv_usec = 0;
signal(SIGALRM, send_timeout);
signal(SIGALRM, send_signal);
setitimer(ITIMER_REAL, &it, NULL);
}
......@@ -396,14 +523,24 @@ send_file(void)
goto done;
}
conn = conn_open(ntohl(serverip.s_addr), portnum, usessl);
/*
* Catch broken pipe so we can report gracefully
*/
signal(SIGPIPE, send_signal);
if (sigsetjmp(bpenv, 1)) {
signal(SIGPIPE, SIG_DFL);
rv = 3;
goto done;
}
conn = conn_open(ntohl(serverip.s_addr), portnum, usessl,
idletimeout, idletimeout);
if (conn == NULL) {
FrisError("Could not open connection with server %s:%d",
inet_ntoa(serverip), portnum);
goto done;
}
gettimeofday(&st, NULL);
while (filesize == 0 || remaining > 0) {
ssize_t cc, ncc;
......@@ -421,12 +558,23 @@ send_file(void)
cc = conn_write(conn, rbuf, ncc);
if (cc < 0) {
FrisPwarning("socket write");
if (conn_timeout(conn))
rv = 2;