Commit 2e26280f authored by Timothy Stack's avatar Timothy Stack
Browse files

Print some stats when doing a sync, squelch some of rsync's output,

and a few other tweaks.  Also bumped parallelism from 10 to 25 rsyncs
at a time.
parent 9256faf3
......@@ -77,7 +77,7 @@ HOME = pw.pw_dir
CERTIFICATE = os.path.join(HOME, ".ssl", "emulab.pem")
PARALLELIZATION = 10
PARALLELIZATION = 25
DEBUG = 0
......@@ -100,6 +100,30 @@ if len(RSYNC) == 0:
sys.exit(2)
pass
##
# The ssh identity to use when contacting nodes: a tuple of
# (ssh protocol version, private key path), or None when no usable key was
# found.  A key is usable when its public half appears in the user's own
# ~/.ssh/authorized_keys file.
#
IDENTITY = None
try:
    _keyfile = open(os.path.join(HOME, ".ssh", "authorized_keys"), "r")
    try:
        auth = _keyfile.read()
    finally:
        # Close explicitly rather than relying on garbage collection.
        _keyfile.close()

    for (pub, full) in ((os.path.join(HOME, ".ssh", "identity.pub"),
                         (1, os.path.join(HOME, ".ssh", "identity"))),
                        (os.path.join(HOME, ".ssh", "id_rsa.pub"),
                         (2, os.path.join(HOME, ".ssh", "id_rsa")))):
        try:
            _keyfile = open(pub, "r")
            try:
                # Drop surrounding whitespace so that a missing or different
                # line terminator in authorized_keys does not defeat the
                # match.
                idpub = _keyfile.read().strip()
            finally:
                _keyfile.close()
        except IOError:
            # This candidate key does not exist or is unreadable; try the
            # next one.
            continue

        # An empty public key file must not count as a match: find("")
        # succeeds on any string and would select a bogus identity.
        if idpub and auth.find(idpub) != -1:
            IDENTITY = full
            break
except IOError:
    # No readable authorized_keys file; leave IDENTITY as None so that ssh
    # falls back on its own defaults.
    pass
def loghole_include(path):
retval = ""
......@@ -863,7 +887,7 @@ def do_show(args):
# @param *args The list of directories to sync.
# @retval The exit status of the rsync command-line.
#
def rsync(host, base, dlpath, paths = ["/"], port = None):
def rsync(host, base, dlpath, paths = ["/"], port = None, output = False):
global RSYNC, HOME, DEBUG
retval = 0
......@@ -876,6 +900,7 @@ def rsync(host, base, dlpath, paths = ["/"], port = None):
pass
cmd = [RSYNC,
"-rptgoDlz",
"-v", # XXX
"--copy-unsafe-links",
"--include-from=" + dlpath]
if VERBOSITY > VERBOSITY_HUMAN:
......@@ -884,6 +909,9 @@ def rsync(host, base, dlpath, paths = ["/"], port = None):
ssh_args = ("ssh -o BatchMode=yes -o StrictHostKeyChecking=no "
+ "-o UserKnownHostsFile="
+ os.path.join(HOME, ".ssh", "emulab_known_hosts"))
if IDENTITY:
ssh_args += " -o Protocol=%d -i %s" % IDENTITY
pass
if port:
ssh_args += " -p " + str(port)
pass
......@@ -900,10 +928,22 @@ def rsync(host, base, dlpath, paths = ["/"], port = None):
# Parent
pass
else:
os.close(0)
fd = os.open("/dev/null", os.O_RDONLY)
os.execve(RSYNC, cmd, os.environ)
os._exit(127)
try:
os.close(0)
os.open("/dev/null", os.O_RDONLY)
if output:
os.close(1)
fd = os.open(os.path.join(base, ".rsync.out"),
os.O_CREAT|os.O_TRUNC|os.O_WRONLY,
0664)
os.dup2(fd, 2)
pass
os.execve(RSYNC, cmd, os.environ)
os._exit(127)
pass
except:
sys.exit(1)
pass
pass
pass
pass
......@@ -941,7 +981,41 @@ def rmstar(host, port, *args):
pass
return retval
##
# Write a one-line progress summary to stdout, terminated by a carriage
# return so that the next call overwrites it.  Nothing is printed unless
# stdout is a tty and the verbosity is at least VERBOSITY_HUMAN.
#
# @param hosts The collection of hosts to be processed in total.
# @param jobs The jobs currently running (only its length is used).
# @param hoststatus Maps finished hosts to a tuple whose first element is
# the time at which the host finished.
# @param hoststart Maps started hosts to the time at which they were started.
#
def print_status(hosts, jobs, hoststatus, hoststart):
    if not (sys.stdout.isatty() and VERBOSITY >= VERBOSITY_HUMAN):
        return

    # Elapsed time per started host; hosts that have not finished yet are
    # measured against the current time.
    elapsed = [hoststatus.get(host, (time.time(),))[0] - hoststart[host]
               for host in hoststart]
    if len(elapsed) == 0:
        elapsed = [0]
        mean = 0.0
    else:
        mean = sum(elapsed) / len(elapsed)

    running = len(jobs)
    finished = len(hoststatus)
    left = "nodes todo/doing/done %d/%d/%d " % (
        len(hosts) - finished - running, running, finished)
    right = "sync time min/avg/max: %.2f/%.2f/%.2f secs" % (
        min(elapsed), mean, max(elapsed))

    # Right-justify the timing part on a 79 column line; a non-positive
    # repeat count yields an empty string, so long lines are left unpadded.
    gap = " " * (79 - len(left) - len(right))

    sys.stdout.write(left + gap + right + "\r")
    sys.stdout.flush()
    return
##
# Performs the "sync" action, which synchronizes the logholes for a group of
......@@ -1043,9 +1117,15 @@ def do_sync(args, mode="sync", must_be_active=1):
"exp" : EID,
"aspect" : "traces",
})
ssh_data = ssh_method(PACKAGE_VERSION, {
"proj" : PID,
"exp" : EID
})
if ((physical["code"] != RESPONSE_SUCCESS) or
(traces["code"] != RESPONSE_SUCCESS)):
(traces["code"] != RESPONSE_SUCCESS) or
(ssh_data["code"] != RESPONSE_SUCCESS)):
sys.stderr.write("error: xmlrpc failed - %s\n" %
(physical["output"],))
retval = 1
......@@ -1053,6 +1133,7 @@ def do_sync(args, mode="sync", must_be_active=1):
else:
physical = physical["value"]
traces = traces["value"]
ssh_data = ssh_data["value"]
hosts = [(x["node"], x["name"]) for x in physical.values()
if ((x["erole"] == "virthost" or
......@@ -1060,10 +1141,23 @@ def do_sync(args, mode="sync", must_be_active=1):
x["erole"] == "delaynode") and
(x["status"] == "up" and x["eventstatus"] == "ISUP"))]
downhosts = [(x["node"], x["name"]) for x in physical.values()
if ((x["erole"] == "virthost" or
x["erole"] == "node" or
x["erole"] == "delaynode") and
(x["status"] != "up" or x["eventstatus"] != "ISUP"))]
phosts = sets.Set([x[0] for x in hosts])
vhosts = sets.Set([x[1] for x in hosts])
dphosts = sets.Set([x[0] for x in downhosts])
dvhosts = sets.Set([x[1] for x in downhosts])
if len(node_list) == 0:
node_list = vhosts
for host in downhosts:
sys.stderr.write("warning: '%s' is down, its logs will "
"not be retrieved\n" % (host[1],))
pass
pass
known_nodes = sets.Set(node_list) & (phosts | vhosts)
......@@ -1071,8 +1165,11 @@ def do_sync(args, mode="sync", must_be_active=1):
jobs = {}
hoststatus = {}
hoststart = {}
for phost, vhost in hosts:
if (vhost in node_list) or (phost in node_list):
print_status(node_list, jobs, hoststatus, hoststart)
osid = physical[vhost]["osid"]
if osid not in OSIDS:
oi = osinfo_method(PACKAGE_VERSION, {
......@@ -1100,32 +1197,27 @@ def do_sync(args, mode="sync", must_be_active=1):
continue
pass
sd = ssh_method(PACKAGE_VERSION, { "node" : phost })
if sd["code"] != RESPONSE_SUCCESS:
sys.stderr.write("error: xmlrpc failed - %s\n" %
(sd["output"],))
retval = 1
return retval
sd = sd["value"]
if mode == "sync":
pid = rsync(phost, os.path.join(logdir, vhost),
dlhandle.name,
port = sd.get("port", None))
port = ssh_data[phost].get("port", None),
output = True)
pass
elif mode == "clean":
pid = rmstar(phost, sd.get("port", None),
pid = rmstar(phost, ssh_data[phost].get("port", None),
"/local/logs/*")
pass
else:
assert 0
jobs[pid] = vhost
hoststart[vhost] = time.time()
try:
while len(jobs) == PARALLELIZATION:
pid, sts = os.waitpid(0, 0)
if pid in jobs:
hoststatus[jobs[pid]] = sts
hoststatus[jobs[pid]] = (time.time(), sts)
del jobs[pid]
pass
pass
......@@ -1136,25 +1228,48 @@ def do_sync(args, mode="sync", must_be_active=1):
pass
try:
for job in jobs:
pid, sts = os.waitpid(job, 0)
hoststatus[jobs[pid]] = sts
while len(jobs) > 0:
pid, sts = os.waitpid(0, 0)
if pid in jobs:
hoststatus[jobs[pid]] = (time.time(), sts)
del jobs[pid]
print_status(node_list, jobs, hoststatus, hoststart)
pass
pass
pass
except OSError:
pass
sys.stdout.write("\n")
if len(unknown_nodes) > 0:
for vhost in unknown_nodes:
sys.stderr.write("error: unknown node - %s\n" % vhost)
if vhost in dvhosts or vhost in dphosts:
sys.stderr.write("error: node '%s' is down\n" %
(vhost,))
pass
else:
sys.stderr.write("error: unknown node - %s\n" %
(vhost,))
pass
pass
retval = 2
pass
for vhost, sts in hoststatus.items():
for vhost, (etime, sts) in hoststatus.items():
if VERBOSITY > VERBOSITY_HUMAN:
print "info: %s - %.2f" % (vhost, etime - hoststart[vhost])
pass
if sts not in ACCEPTABLE_RSYNC_STATUS:
sys.stderr.write("error: failed to %s node '%s'\n" %
(mode, vhost))
eout = open(os.path.join(logdir, vhost, ".rsync.out"),
"r").read()
sys.stderr.write(
vhost + ": " + re.sub(r'\n',
"\n" + vhost + ": ",
eout[:-1]))
retval = 3
pass
pass
......@@ -1806,13 +1921,18 @@ except getopt.error, e:
sys.exit(2)
pass
for PID, EID in exp_list:
if DEBUG:
sys.stderr.write("loghole: pid/eid - " + PID + "/" + EID + "\n")
pass
rc = ACTIONS[action][0](req_args[1:])
if rc != 0:
sys.exit(rc)
try:
for PID, EID in exp_list:
if DEBUG:
sys.stderr.write("loghole: pid/eid - " + PID + "/" + EID + "\n")
pass
rc = ACTIONS[action][0](req_args[1:])
if rc != 0:
sys.exit(rc)
pass
pass
pass
except KeyboardInterrupt:
sys.exit(1)
pass
......@@ -1030,7 +1030,7 @@ class experiment:
expdata = eid;
pass
elif format == "full":
expdata = {
expdata = scrubdict({
"pid" : pid,
"gid" : gid,
"name" : eid,
......@@ -1040,11 +1040,11 @@ class experiment:
"minimum_nodes" : minimum_nodes,
"maximum_nodes" : maximum_nodes,
"actual_nodes" : actual_nodes,
}
})
pass
# ... append it to the group list.
result[pid][gid].append(scrubdict(expdata))
result[pid][gid].append(expdata)
pass
return EmulabResponse(RESPONSE_SUCCESS,
......@@ -1357,6 +1357,63 @@ class experiment:
return EmulabResponse(RESPONSE_SUCCESS, output=output)
#
# Get the experiment's stats record and its resource history and send them back as a dictionary
#
def history(self, version, argdict):
if version != self.VERSION:
return EmulabResponse(RESPONSE_BADVERSION,
output="Client version mismatch!")
if self.readonly:
return EmulabResponse(RESPONSE_FORBIDDEN,
output="Insufficient privledge to invoke method")
try:
checknologins()
pass
except NoLoginsError, e:
return EmulabResponse(RESPONSE_REFUSED, output=str(e))
argerror = CheckRequiredArgs(argdict, ("proj", "exp"))
if (argerror):
return argerror
#
# Check permission. This will check proj/exp for illegal chars.
#
permerror = CheckExptPermission(self.uid,
argdict["proj"], argdict["exp"])
if (permerror):
return permerror
res = {}
dbres = DBQuery(
"SELECT * FROM experiment_stats "
"WHERE pid=%s and eid=%s ORDER by exptidx desc",
(argdict["proj"], argdict["exp"]),
asDict=True)
if len(dbres) == 0:
return EmulabResponse(RESPONSE_ERROR,
output="No such experiment!")
res["stats"] = scrubdict(dbres[0])
dbres = DBQuery(
"SELECT er.*,ts.start_time,ts.end_time,ts.action,ts.idx as tidx "
"FROM experiments as e "
"LEFT JOIN experiment_resources as er on e.idx=er.exptidx "
"LEFT JOIN testbed_stats as ts on ts.rsrcidx=er.idx "
"WHERE e.pid=%s and e.eid=%s",
(argdict["proj"], argdict["exp"]),
asDict=True)
for er in dbres:
er["thumbnail"] = xmlrpclib.Binary(er["thumbnail"])
pass
res["resources"] = [scrubdict(x) for x in dbres]
return EmulabResponse(RESPONSE_SUCCESS, value=res)
#
# Get textual info from tbreport and send back as string
#
......@@ -3207,9 +3264,10 @@ class node:
return EmulabResponse(RESPONSE_BADVERSION,
output="Client version mismatch!")
argerror = CheckRequiredArgs(argdict, ("node",))
if (argerror):
return argerror
argerror1 = CheckRequiredArgs(argdict, ("node",))
argerror2 = CheckRequiredArgs(argdict, ("proj", "exp"))
if (argerror1 and argerror2):
return argerror1
try:
checknologins()
......@@ -3217,24 +3275,55 @@ class node:
except NoLoginsError, e:
return EmulabResponse(RESPONSE_REFUSED, output=str(e))
if not re.match("^[-\w]*$", str(argdict["node"])):
if (argdict.has_key("node") and
not re.match("^[-\w]*$", str(argdict["node"]))):
return EmulabResponse(RESPONSE_BADARGS,
output="Improperly formed node value!")
elif argdict.has_key("node"):
# XXX Refactor the trust stuff
res = DBQueryFatal("SELECT e.pid,e.gid,e.eid FROM reserved AS r "
"left join experiments as e on "
" e.pid=r.pid and e.eid=r.eid "
"WHERE r.node_id=%s",
(argdict["node"],))
if len(res) == 0:
return EmulabResponse(RESPONSE_ERROR,
output="No such node: " +
argdict["node"])
pid = res[0][0]
gid = res[0][1]
eid = res[0][2]
clause = "n.node_id=%s"
clause_args = (argdict["node"],)
pass
# XXX Refactor the trust stuff
res = DBQueryFatal("SELECT e.pid,e.gid FROM reserved AS r "
"left join experiments as e on "
" e.pid=r.pid and e.eid=r.eid "
"WHERE r.node_id=%s",
(argdict["node"],))
if len(res) == 0:
return EmulabResponse(RESPONSE_ERROR,
output="No such node: " + argdict["node"])
if (argdict.has_key("proj") and
not (re.match("^[-\w]*$", argdict["proj"]) and
re.match("^[-\w]*$", argdict["exp"]))):
return EmulabResponse(RESPONSE_BADARGS,
output="Improperly formed proj/exp!")
elif argdict.has_key("proj"):
pid = argdict["proj"]
eid = argdict["exp"]
res = DBQueryFatal("SELECT gid,state FROM experiments "
"WHERE pid=%s and eid=%s",
(pid, eid))
if len(res) == 0:
return EmulabResponse(RESPONSE_ERROR,
output="No such experiment: %s/%s" %
(pid, eid))
if res[0][1] not in ("activating", "active", "modify_reparse"):
return EmulabResponse(RESPONSE_ERROR,
output="Experiment is not active")
gid = res[0][0]
clause = "r.pid=%s and r.eid=%s"
clause_args = (pid, eid)
pass
trust = DBQueryFatal("SELECT trust FROM group_membership "
"WHERE uid=%s and pid=%s and gid=%s",
(self.uid, res[0][0], res[0][1]))
(self.uid, pid, gid))
if len(trust) == 0:
return EmulabResponse(
......@@ -3242,41 +3331,50 @@ class node:
output=("You do not have permission to access: "
+ argdict["node"]))
res = DBQueryFatal("select n.jailflag,n.sshdport, "
res = DBQueryFatal("select n.node_id,n.jailflag,n.sshdport, "
" r.vname,r.pid,r.eid, "
" t.isvirtnode,t.isremotenode,t.isplabdslice "
" from nodes as n "
"left join reserved as r on n.node_id=r.node_id "
"left join node_types as t on t.type=n.type "
"where n.node_id=%s",
(argdict["node"],))
"where " + clause,
clause_args)
if len(res) == 0:
return EmulabResponse(RESPONSE_ERROR,
output="No such node: " + argdict["node"])
jailflag = res[0][0]
sshdport = res[0][1]
vname = res[0][2]
pid = res[0][3]
eid = res[0][4]
isvirt = res[0][5]
isremote = res[0][6]
isplab = res[0][7]
result = {
"hostname" : vname + "." + eid + "." + pid + "." + OURDOMAIN,
}
if isvirt:
if isremote:
if jailflag or isplab:
result["port"] = sshdport
result = {}
for node in res:
node_id = node[0]
jailflag = node[1]
sshdport = node[2]
vname = node[3]
pid = node[4]
eid = node[5]
isvirt = node[6]
isremote = node[7]
isplab = node[8]
node_data = {
"hostname" : vname + "." + eid + "." + pid + "." + OURDOMAIN,
}
if isvirt:
if isremote:
if jailflag or isplab:
node_data["port"] = sshdport
pass
pass
else:
node_data["gateway"] = USERNODE
pass
pass
else:
result["gateway"] = USERNODE
pass
result[node_id] = node_data
pass
if argdict.has_key("node"):
result = result[argdict["node"]]
pass
return EmulabResponse(RESPONSE_SUCCESS,
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment