Commit 572da456 authored by Timothy Stack's avatar Timothy Stack
Browse files

Hack loghole to retry any failed rsyncs, for example, if an ssh

failed because of an NFS hiccup.  Also added a little activity
display.
parent 26ff11b6
......@@ -222,11 +222,23 @@ Create a directory for each link.
Setup symbolic links to the pcap(3) files retrieved from the delay nodes.
.TP
3.
Run the pre-shaped pcap(3) files (e.g. .recv.0) through tcptrace(1) to generate
Run the pre-shaped pcap(3) files (e.g. *.recv.0) through tcptrace(1) to generate
graphs viewable with xplot(1). Note that the processing will only be done for
pcap files that were generated by a SNAPSHOT event sent to the tracemon agent.
The "-s" option is provided to automatically send this event for each agent.
.P
While the download is in progress,
.B loghole
will display some simple statistics. The left-hand side of the display shows
the number of nodes remaining to be synced, in progress, and completed. The
right-hand side shows the minimum, average, and maximum amounts of time needed to
sync a node. Finally, a "spinner" on the far right is updated when the
currently active rsync log files have grown, which usually happens when more
files are being synced.
.P
If rsync(1) encounters an error while running, it will automatically be rerun
after a short delay.
.P
Optional arguments:
.TP
\fB-r\fR, \fB--remote\fR=\fIremotedir
......
......@@ -40,6 +40,7 @@ XMLRPC_PORT = 3069
RSYNC = "@RSYNC@"
ACCEPTABLE_RSYNC_STATUS = [ 0 ]
NONRETRIABLE_RSYNC_STATUS = [ 0, 1, 2, 4 ]
DIRS = {
"proj" : "proj",
......@@ -94,6 +95,10 @@ SERVER = None
OSIDS = {}
JOB_TRIES = 3
JOB_PROGRESS = [ "-", "\\", "|", "/" ]
JOB_PROGRESS_INDEX = 0
if len(XMLRPC_SERVER) == 0:
sys.stderr.write("internal error: XMLRPC_SERVER is empty\n")
sys.exit(2)
......@@ -895,7 +900,8 @@ def do_show(args):
# @param *args The list of directories to sync.
# @retval The exit status of the rsync command-line.
#
def rsync(host, base, dlpath, paths = ["/"], port = None, output = False):
def rsync(host, base, dlpath, paths = ["/"], port = None, output = False,
delay = 0):
global RSYNC, HOME, DEBUG
retval = 0
......@@ -937,6 +943,9 @@ def rsync(host, base, dlpath, paths = ["/"], port = None, output = False):
pass
else:
try:
if delay > 0:
time.sleep(delay)
pass
os.close(0)
os.open("/dev/null", os.O_RDONLY)
if output:
......@@ -958,7 +967,7 @@ def rsync(host, base, dlpath, paths = ["/"], port = None, output = False):
return retval
def rmstar(host, port, *args):
def rmstar(host, port, delay, *args):
global HOME, DEBUG
retval = 0
......@@ -982,6 +991,9 @@ def rmstar(host, port, *args):
# Parent
pass
else:
if delay > 0:
time.sleep(delay)
pass
os.execve("/usr/bin/ssh", cmd, os.environ)
os._exit(127)
pass
......@@ -999,12 +1011,12 @@ def print_status(msg):
return
def print_job_status(hosts, jobs, hoststatus, hoststart):
def print_job_status(mode, hosts, jobs, hoststatus, hoststart):
if not sys.stdout.isatty() or VERBOSITY < VERBOSITY_HUMAN:
return
lmsg = "nodes todo/doing/done %d/%d/%d "
rmsg = "sync time min/avg/max: %.2f/%.2f/%.2f secs"
rmsg = mode + " time min/avg/max: %.2f/%.2f/%.2f secs"
hosttimes = map(lambda x:
hoststatus.get(x, (time.time(),))[0] - hoststart[x],
......@@ -1025,14 +1037,40 @@ def print_job_status(hosts, jobs, hoststatus, hoststart):
min(hosttimes),
avg,
max(hosttimes))
line_len = len(lmsg) + len(rmsg)
line_len = len(lmsg) + len(rmsg) + 2
if line_len < 79:
lmsg += " " * (79 - line_len)
pass
print_status(lmsg + rmsg + "\r")
print_status(lmsg + rmsg + " "
+ JOB_PROGRESS[JOB_PROGRESS_INDEX % len(JOB_PROGRESS)] + "\r")
return
##
# Detect changes in the rsync log file sizes.  Used to drive the "spinner"
# in the status display: growth of any active rsync log means files are
# still being transferred.
#
# @param jobs The hashtable of jobs; maps a child pid to a (phost, vhost)
#   tuple, where vhost names the per-node log subdirectory.
# @param logsize The hashtable of previous log file sizes, keyed by pid.
#   Updated in place when a log file has grown.
# @param logdir The root directory for log files.
# @return True if the size of any of the log files has changed.
#
def making_progress(jobs, logsize, logdir):
    retval = False
    for pid in jobs:
        # Only the virtual host name matters here; it names the log subdir.
        vhost = jobs[pid][1]
        try:
            size = os.stat(os.path.join(logdir, vhost, ".rsync.out")).st_size
            pass
        except OSError:
            # Log file may not exist yet (rsync not started); no progress.
            continue
        # Default to 0 so a pid missing from logsize cannot raise KeyError.
        if size > logsize.get(pid, 0):
            retval = True
            logsize[pid] = size
            pass
        pass
    return retval
##
# Trace preprocessing.
#
......@@ -1127,11 +1165,13 @@ def trace_post(link2trace):
# Performs the "sync" action, which synchronizes the logholes for a group of
# nodes.
#
# XXX This method is in dire need of a rewrite.
#
# @param args Action-specific command line arguments.
#
def do_sync(args, mode="sync", must_be_active=1):
from emulabclient import RESPONSE_SUCCESS
global OSIDS, SERVER, DOT_LOGHOLES
global OSIDS, SERVER, DOT_LOGHOLES, JOB_PROGRESS_INDEX
retval = 0
......@@ -1220,6 +1260,7 @@ def do_sync(args, mode="sync", must_be_active=1):
osinfo_method = getattr(SERVER, "osid.info")
ssh_method = getattr(SERVER, "node.sshdescription")
# Get any bits of information we need from the RPC server.
physical = info_method(PACKAGE_VERSION, {
"proj" : PID,
"exp" : EID,
......@@ -1252,13 +1293,15 @@ def do_sync(args, mode="sync", must_be_active=1):
if snapshot_traces:
trace_pre(traces)
pass
# Separate the nodes into those that are up
hosts = [(x["node"], x["name"]) for x in physical.values()
if ((x["erole"] == "virthost" or
x["erole"] == "node" or
x["erole"] == "delaynode") and
(x["status"] == "up" and x["eventstatus"] == "ISUP"))]
# ... or down.
downhosts = [(x["node"], x["name"]) for x in physical.values()
if ((x["erole"] == "virthost" or
x["erole"] == "node" or
......@@ -1284,10 +1327,19 @@ def do_sync(args, mode="sync", must_be_active=1):
jobs = {}
hoststatus = {}
hoststart = {}
for phost, vhost in hosts:
hosttries = {}
logsize = {}
while len(hosts) > 0 or len(jobs) > 0:
if len(hosts) > 0:
ht = hosts.pop()
phost, vhost = ht
pass
else:
phost = ""
vhost = ""
pass
if (vhost in node_list) or (phost in node_list):
print_job_status(node_list, jobs, hoststatus, hoststart)
# Start a job for the next host.
osid = physical[vhost]["osid"]
if osid not in OSIDS:
oi = osinfo_method(PACKAGE_VERSION, {
......@@ -1300,6 +1352,7 @@ def do_sync(args, mode="sync", must_be_active=1):
pass
pass
# Make sure we can talk to it.
if not OSIDS[osid]:
sys.stderr.write("warning: unknown OSID %s for %s\n" %
(physical[vhost]["osid"], vhost))
......@@ -1315,55 +1368,81 @@ def do_sync(args, mode="sync", must_be_active=1):
continue
pass
# Start the job.
if ht in hosttries:
delay = 3
pass
else:
delay = 0
pass
if mode == "sync":
pid = rsync(phost, os.path.join(logdir, vhost),
dlhandle.name,
port = ssh_data[phost].get("port", None),
output = True)
output = True,
delay = delay)
pass
elif mode == "clean":
pid = rmstar(phost, ssh_data[phost].get("port", None),
pid = rmstar(phost,
ssh_data[phost].get("port", None),
delay,
"/local/logs/*")
pass
else:
assert 0
jobs[pid] = vhost
hoststart[vhost] = time.time()
jobs[pid] = ht
logsize[pid] = 0
hoststart[ht] = time.time()
try:
while len(jobs) == PARALLELIZATION:
pid, sts = os.waitpid(0, 0)
if pid in jobs:
if ht not in hosttries:
hosttries[ht] = 0
pass
hosttries[ht] += 1
pass
try:
# Wait for the children and update the status.
while len(jobs) == PARALLELIZATION or len(hosts) == 0:
print_job_status(mode, node_list, jobs,
hoststatus, hoststart)
# Check for any changes in the log files.
if (mode == "sync" and
making_progress(jobs, logsize, logdir)):
JOB_PROGRESS_INDEX += 1
pass
# Check for processes that have ended.
pid, sts = os.waitpid(0, os.WNOHANG)
if pid in jobs:
# Retry on some errors
if (mode == "clean" and sts == 255 and
hosttries[jobs[pid]] <= JOB_TRIES):
hosts.append(jobs[pid])
pass
elif (mode == "sync" and
sts not in NONRETRIABLE_RSYNC_STATUS and
hosttries[jobs[pid]] <= JOB_TRIES):
hosts.append(jobs[pid])
pass
else:
hoststatus[jobs[pid]] = (time.time(), sts)
del jobs[pid]
pass
del jobs[pid]
pass
pass
except OSError:
pass
pass
pass
try:
while len(jobs) > 0:
pid, sts = os.waitpid(0, 0)
if pid in jobs:
hoststatus[jobs[pid]] = (time.time(), sts)
del jobs[pid]
print_job_status(node_list,
jobs,
hoststatus,
hoststart)
if len(jobs) == PARALLELIZATION or len(hosts) == 0:
time.sleep(0.5)
pass
pass
pass
pass
except OSError:
except OSError:
pass
pass
sys.stdout.write("\n")
# Clue the user in if they specified nodes
# we don't know about.
if len(unknown_nodes) > 0:
for vhost in unknown_nodes:
if vhost in dvhosts or vhost in dphosts:
......@@ -1378,10 +1457,14 @@ def do_sync(args, mode="sync", must_be_active=1):
retval = 2
pass
for vhost, (etime, sts) in hoststatus.items():
# Check the status of the jobs and dump logs for any that failed.
for ht, (etime, sts) in hoststatus.items():
phost, vhost = ht
if VERBOSITY > VERBOSITY_HUMAN:
print "info: %s - %.2f" % (vhost, etime - hoststart[vhost])
print "info: %s - %.2f" % (vhost, etime - hoststart[ht])
pass
if mode == "clean":
continue
if sts not in ACCEPTABLE_RSYNC_STATUS:
sys.stderr.write("error: failed to %s node '%s'\n" %
(mode, vhost))
......@@ -1390,12 +1473,15 @@ def do_sync(args, mode="sync", must_be_active=1):
sys.stderr.write(
vhost + ": " + re.sub(r'\n',
"\n" + vhost + ": ",
eout[:-1]))
eout[:-1]) + eout[-1])
retval = 3
pass
else:
os.chdir(os.path.join(vhost, "var", "emulab", "logs"))
if os.path.exists("delayagent.debug"):
# Post process the delay-agent log files, if there are any.
vdir = os.path.join(vhost, "var", "emulab", "logs")
if (os.path.exists(vdir) and
os.path.exists("delayagent.debug")):
os.chdir(vdir)
os.system("%s %s" %
(DELAYLOG2XPLOT, "delayagent.debug"))
pass
......@@ -1441,6 +1527,8 @@ def do_sync(args, mode="sync", must_be_active=1):
pass
pass
# Add some links to the xplot log files created by
# delaylog2xplot
src = os.path.join(trace["delayvname"],
"var", "emulab", "logs",
"%s-%s-*.xpl" % (trace["linkvname"],
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment