Commit 82024d06 authored by Timothy Stack

Another checkpoint of nfs tracing stuff. Finally flogged myself with the sql cluestick, so the queries shouldn't be as bad as before.
parent 3813b0da
/*
* $Id: nfs_v2.c,v 1.2 2005-11-30 17:30:57 stack Exp $
* $Id: nfs_v2.c,v 1.3 2005-12-02 00:43:29 stack Exp $
*/
#ifdef HAVE_CONFIG_H
@@ -208,7 +208,7 @@ int nfs_v2_print_call (u_int32_t op, u_int32_t xid,
}
if (new_p == NULL) {
fprintf (OutFile, "+|+");
fprintf (OutFile, " error 1 ");
}
return (0);
......
/*
* $Id: nfs_v3.c,v 1.2 2005-11-30 17:30:57 stack Exp $
* $Id: nfs_v3.c,v 1.3 2005-12-02 00:43:29 stack Exp $
*
*/
@@ -505,7 +505,7 @@ int nfs_v3_print_call (u_int32_t op, u_int32_t xid,
xdr_destroy (&xdr);
if (! got_all) {
fprintf (OutFile, "SHORT PACKET");
fprintf (OutFile, " error 1 ");
}
return (rc);
@@ -1069,7 +1069,7 @@ u_int32_t *print_fn3 (u_int32_t *p, u_int32_t *e, int print)
}
if (e < (u_int32_t *) (str + tot_len)) {
fprintf (OutFile, " ... ");
fprintf (OutFile, " \"...\" ");
return (NULL);
}
......
This diff is collapsed.
@@ -31,6 +31,7 @@ export PYTHONUNBUFFERED
case "$1" in
start)
rm -f $ND_TRACES
echo -n "Starting nfstrace daemons:"
${AS_NT} \
${INSTALL_SBIN}/nfsdump $ND_FLAGS $ND_FILTER \
......
#! /usr/bin/env python
#
# nfstrace.proxy - Tool used to query the nfstrace infrastructure.
#
import re
import sys
import time
import math
import socket
import getopt
import os, os.path
@@ -20,56 +25,101 @@ except IOError:
sys.exit(1)
pass
if os.path.exists("/var/nfstrace/maint"):
MAINT_PATH = "/var/nfstrace/maint"
if os.path.exists(MAINT_PATH):
# sys.stderr.write("not available\n")
sys.exit(0)
# sys.exit(0)
pass
VERBOSITY = 1
# Tables that should be cleaned when a node transitions from one experiment
# to another.
GC_TABLES = [
"node_ids",
"mounts",
"mount_replies",
"lookups",
"lookup_replies",
"reads",
"writes",
"readlinks",
"readlink_replies",
"creates",
"create_replies",
"mkdirs",
"mkdir_replies",
"mknods",
"removes",
"remove_replies",
"rmdirs",
"rmdir_replies",
"renames",
"rename_replies",
"file_access",
"link_access",
"file_dropped", ]
def human_readable(n, suffix='B', places=2):
'''Return a human friendly approximation of n, using SI prefixes'''
prefixes = [' ','K','M','G','T']
base, step, limit = 10, 3, 100
if n == 0:
magnitude = 0 #cannot take log(0)
else:
magnitude = math.log(n, base)
order = int(round(magnitude)) // step
return '%.1f %s%s' % (float(n)/base**(order*step), \
prefixes[order], suffix)
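For reference, a few values as the helper above formats them (computed by hand from the formula: log base 10, rounded, then grouped into steps of 10^3):

    >>> human_readable(0)
    '0.0  B'
    >>> human_readable(2048)
    '2.0 KB'
    >>> human_readable(123456789)
    '123.5 MB'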
def usage():
print "Usage: nfstrace.proxy [-h] <action> [...]"
print
print "Query the nfstrace infrastructure."
print
print "Optional arguments:"
print " -h, --help Print this message, or the action's"
print " usage message."
print
print "Actions:"
print " gc Garbage collect old data and update the node_ids table."
print " get Get the list of files accessed by an experiment."
print " stats Print out NFS statistics."
return
##
# Print the usage statement for the "gc" action.
#
def gc_usage():
print "Usage: nfstrace gc <host0> [<host1> ...]"
print "Usage: nfstrace gc [-e pid/eid] <host0> [<host1> ...]"
print
print "Garbage collect old NFS trace data for the given hosts and update"
print "the node_ids table."
print
print "Garbage collect old NFS trace data."
print "Optional arguments:"
print " -e, --experiment=PID/EID"
print " Specify a project ID and experiment ID for the nodes."
print
print "Examples:"
print " $ nfstrace gc pc10 pc11"
print " $ nfstrace gc -e foo/bar node0 node1"
return
##
# Print the usage statement for the "get" action.
#
def get_usage():
print "Usage: nfstrace get <host0> [<host1> ...]"
print "Usage: nfstrace get [-m sdir:cdir] <pid> <eid>"
print
print "Get a file listing for the given hosts."
print "Get a file listing for the given experiment."
print
print "Optional arguments:"
print " -m, --mount=SDIR:CDIR"
print " Add a mount mapping to use when translating links with"
print " absolute paths. This option can be used multiple times."
print
print "Examples:"
print " $ nfstrace get pc10 pc11"
print " $ nfstrace get -m /q/proj/foo:/proj/foo foo bar"
return
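The -m option boils down to a leading-path substitution applied when a link target is printed; a rough sketch of that translation (same startswith/replace logic as in do_get() below, example path made up):

    # mount_map holds [server_dir, client_dir] pairs parsed from -m SDIR:CDIR.
    def translate(path, mount_map):
        for sdir, cdir in mount_map:
            if path.startswith(cdir):
                # Rewrite only the leading client-side prefix.
                return path.replace(cdir, sdir, 1)
        return path

    # translate("/proj/foo/data", [["/q/proj/foo", "/proj/foo"]])
    #   -> "/q/proj/foo/data"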
##
@@ -78,17 +128,23 @@ def get_usage():
def stats_usage():
print "Usage: nfstrace stats"
print
print "Print out statistics."
print "Print out statistics on NFS traffic."
print
print "Examples:"
print " $ nfstrace stats"
return
def resolve_hosts(hosts):
def resolve_hosts(pid, eid, hosts):
retval = []
for lpc in range(0, len(hosts)):
try:
retval.append(socket.gethostbyname(hosts[lpc]))
if pid and eid:
name = hosts[lpc] + "." + eid + "." + pid
pass
else:
name = hosts[lpc]
pass
retval.append(socket.gethostbyname(name))
pass
except socket.gaierror, e:
raise getopt.error(hosts[lpc] + ":" + e.args[1])
@@ -104,23 +160,32 @@ def resolve_hosts(hosts):
def do_gc(args):
retval = 0
pid = None
eid = None
try:
opts, args = getopt.getopt(args, "", [])
opts, args = getopt.getopt(args, "e:", [ "experiment=" ])
for opt, val in opts:
if opt in ("-n", "--dry-run"):
if opt in ("-e", "--experiment"):
l = val.split('/')
if len(l) != 2:
raise getopt.error("Invalid experiment name: " + val)
pid, eid = l
pass
pass
if len(args) == 0:
raise getopt.error("Not enough arguments")
ips = resolve_hosts(args)
ips = resolve_hosts(pid, eid, args)
pass
except getopt.error, e:
print e.args[0]
gc_usage()
return 2
cur.execute("DELETE FROM node_ids WHERE pid=%s and eid=%s", (pid, eid))
for tab in GC_TABLES:
cur.execute("DELETE FROM " + tab + " where node_ip in "
"(" + ",".join(("%s",) * len(ips)) + ")", ips)
@@ -128,9 +193,9 @@ def do_gc(args):
for lpc in range(0, len(ips)):
if args[lpc] != ips[lpc]:
cur.execute("REPLACE INTO node_ids (node_id, node_ip) VALUES "
"(%s,%s)",
(args[lpc], ips[lpc]))
cur.execute("REPLACE INTO node_ids (pid, eid, node_id, node_ip) "
"VALUES (%s,%s,%s,%s)",
(pid, eid, args[lpc], ips[lpc]))
pass
pass
@@ -138,6 +203,135 @@ def do_gc(args):
return retval
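Rough walk-through of the new gc flow (hostname qualification plus the pid/eid-aware node_ids row); the IP address is made up for illustration:

    # nfstrace gc -e foo/bar node0
    #   resolve_hosts("foo", "bar", ["node0"]) resolves "node0.bar.foo";
    #   assuming that comes back as 10.1.2.3, do_gc() ends up issuing:
    #     DELETE FROM node_ids WHERE pid='foo' AND eid='bar'
    #     DELETE FROM <each table in GC_TABLES> WHERE node_ip IN ('10.1.2.3')
    #     REPLACE INTO node_ids (pid, eid, node_id, node_ip)
    #             VALUES ('foo', 'bar', 'node0', '10.1.2.3')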
class LRU:
"""
Implementation of a length-limited O(1) LRU queue.
Built for and used by PyPE:
http://pype.sourceforge.net
Copyright 2003 Josiah Carlson.
"""
class Node:
def __init__(self, prev, me):
self.prev = prev
self.me = me
self.next = None
def __init__(self, count, pairs=[]):
self.count = max(count, 1)
self.d = {}
self.first = None
self.last = None
for key, value in pairs:
self[key] = value
def __contains__(self, obj):
return obj in self.d
def __getitem__(self, obj):
a = self.d[obj].me
self[a[0]] = a[1]
return a[1]
def __setitem__(self, obj, val):
if obj in self.d:
del self[obj]
nobj = self.Node(self.last, (obj, val))
if self.first is None:
self.first = nobj
if self.last:
self.last.next = nobj
self.last = nobj
self.d[obj] = nobj
if len(self.d) > self.count:
if self.first == self.last:
self.first = None
self.last = None
return
a = self.first
a.next.prev = None
self.first = a.next
a.next = None
del self.d[a.me[0]]
del a
def __delitem__(self, obj):
nobj = self.d[obj]
if nobj.prev:
nobj.prev.next = nobj.next
else:
self.first = nobj.next
if nobj.next:
nobj.next.prev = nobj.prev
else:
self.last = nobj.prev
del self.d[obj]
def __iter__(self):
cur = self.first
while cur != None:
cur2 = cur.next
yield cur.me[1]
cur = cur2
def iteritems(self):
cur = self.first
while cur != None:
cur2 = cur.next
yield cur.me
cur = cur2
def iterkeys(self):
return iter(self.d)
def itervalues(self):
for i,j in self.iteritems():
yield j
def keys(self):
return self.d.keys()
pass
# Cache for file handles we pull from the DB when doing a resolve.
fh_cache = LRU(1024)
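A quick illustration of the LRU behaviour the cache relies on: once count is exceeded, the least recently set or read key gets evicted (values made up):

    >>> lru = LRU(2)
    >>> lru['a'] = 1
    >>> lru['b'] = 2
    >>> lru['c'] = 3        # over capacity: 'a' is the oldest and is dropped
    >>> 'a' in lru
    False
    >>> lru['b']            # reading 'b' marks it most recently used
    2
    >>> lru['d'] = 4        # now 'c' is the oldest and gets evicted
    >>> sorted(lru.keys())
    ['b', 'd']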
def resolve_fh(fh, depth=0):
global fh_cache
retval = None
if depth > 50:
print "exceeded depth on " + fh
return (0, "<u:" + fh + ">")
# prefix = " " * depth
if fh in fh_cache:
fn = fh_cache[fh]
pass
else:
fn = None
pass
# print prefix + " cache " + `fn` + " " + fh
if fn:
retval = (fn[0], fn[1], fn[2])
pass
else:
cur.execute("SELECT parent,fn,valid from handle_map "
"WHERE fh=%s and fn!='.' "
"ORDER BY valid DESC",
(fh,))
res = cur.fetchone()
# print prefix + " lookup " + `res` + " " + fh
if res:
if res[0] == '':
retval = (1, res[1], res[2])
pass
else:
comp = resolve_fh(res[0], depth + 1)
retval = (comp[0], os.path.join(comp[1], res[1]),
res[2] and comp[2])
pass
pass
if not retval:
retval = (0, "<u:" + fh + ">", 1)
pass
fh_cache[fh] = retval
pass
return retval
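To make the recursion concrete, a hypothetical handle_map and the tuples resolve_fh() would return for it; handles and names are invented, and the returned tuple is (complete, path, valid):

    # handle_map rows (fh, parent, fn, valid):
    #   ('h1', '',   '/q/proj',  1)    <- empty parent marks a mount root
    #   ('h2', 'h1', 'foo',      1)
    #   ('h3', 'h2', 'data.tar', 1)
    #
    # resolve_fh('h3') walks h3 -> h2 -> h1 and joins names on the way back:
    #   resolve_fh('h1') == (1, '/q/proj', 1)
    #   resolve_fh('h2') == (1, '/q/proj/foo', 1)
    #   resolve_fh('h3') == (1, '/q/proj/foo/data.tar', 1)
    #
    # An unknown handle falls through to (0, '<u:fh>', 1), and the LRU cache
    # above keeps recently resolved handles from hitting the DB again.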
##
# Performs the "get" action, which prints out the files accessed by a set of
# nodes.
@@ -147,17 +341,26 @@ def do_gc(args):
def do_get(args):
retval = 0
mount_map = []
try:
opts, args = getopt.getopt(args, "", [])
opts, args = getopt.getopt(args, "m:", [ "map=" ])
for opt, val in opts:
if opt in ("-n", "--dry-run"):
if opt in ("-m", "--map="):
l = val.split(':')
if len(l) != 2:
raise getopt.error("Invalid mount map: " + val)
mount_map.append(l)
pass
pass
if len(args) == 0:
raise getopt.error("Not enough arguments")
args = resolve_hosts(args)
if len(args) < 2:
raise getopt.error("No pid/eid given.")
if len(args) > 2:
raise getopt.error("Too many arguments.")
pid = args[0]
eid = args[1]
pass
except getopt.error, e:
print e.args[0]
@@ -165,25 +368,82 @@ def do_get(args):
return 2
missing = 0
cur.execute("select hm.fn,"
# Get the accessed links first. We don't print them out yet, instead we
# wait to see if the file they reference was accessed.
links = {}
used_links = {}
cur.execute("SELECT hm.parent,la.fh,la.fn,"
" IFNULL(MAX(la.last_access)>MAX(fd.last_remove),1) "
"FROM link_access as la "
"INNER JOIN node_ids as ni on (ni.node_ip=la.node_ip and "
" ni.pid=%s and ni.eid=%s) "
"LEFT JOIN file_dropped as fd on (la.fh=fd.fh and "
" ni.node_ip=fd.node_ip) "
"LEFT JOIN handle_map as hm on (hm.fh=la.fh) "
"GROUP BY hm.parent,la.fh,la.fn", (pid, eid))
for (parent,fh,link_fn,alive) in cur:
if not alive:
continue
lcomplete, lfn, lvalid = resolve_fh(fh)
complete, fn, valid = resolve_fh(parent)
if valid and lvalid:
full_fn = os.path.join(fn, link_fn)
full_fn = os.path.normpath(full_fn)
for sdir, cdir in mount_map:
if full_fn.startswith(cdir):
full_fn = full_fn.replace(cdir, sdir, 1)
break
pass
links[full_fn] = (lcomplete, lfn)
pass
pass
# Find all the accessed files and print them out.
seen = {}
dirs = {}
cur.execute("SELECT fa.fh,"
" IFNULL(MAX(fa.last_access)>MAX(fd.last_remove),1) "
" from file_access as fa "
"left join file_dropped as fd on fa.fh=fd.fh "
"left join handle_map as hm on hm.fh=fa.fh "
"where fa.node_ip in (" + ",".join(("%s",) * len(args))
+ ") group by hm.fn", args)
for (fn, alive) in cur:
if alive and fn:
"FROM file_access as fa "
"INNER JOIN node_ids as ni on (ni.node_ip=fa.node_ip and "
" ni.pid=%s and ni.eid=%s) "
"LEFT JOIN file_dropped as fd on (fa.fh=fd.fh and "
" ni.node_ip=fd.node_ip) "
"GROUP BY fa.fh", (pid, eid))
for (fh,alive) in cur:
if not alive:
continue
complete, fn, valid = resolve_fh(fh)
if valid and fn not in seen:
print fn
if fn in links:
used_links[links[fn]] = 1
pass
seen[fn] = 1
if not complete:
missing += 1
pass
# Check if the directory was accessed through a link.
dir, file = os.path.split(fn)
if dir in links:
used_links[links[dir]] = 1
pass
pass
elif not fn:
pass
# Finally, print out the used links.
for (complete, fn) in used_links.keys():
print fn
if not complete:
missing += 1
pass
pass
if missing > 0:
print `missing` + " unknown file(s) accessed."
sys.stderr.write(`missing` + " unknown file(s) accessed.\n")
pass
return retval
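The IFNULL(MAX(last_access) > MAX(last_remove), 1) expression in the two queries above is the liveness test: a handle still counts as present unless a recorded removal is newer than its last access. In Python terms, roughly:

    # Sketch of the SQL liveness predicate; last_remove is None when
    # file_dropped has no row for the handle (the IFNULL fallback case).
    def alive(last_access, last_remove):
        if last_remove is None:
            return True     # no remove ever recorded: assume it still exists
        return last_access > last_remove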
@@ -219,34 +479,33 @@ def do_stats(args):
(over_last, row_count))
readers = cur.fetchall()
for (node_ip,count) in readers:
cur.execute("select node_id from node_ids where node_ip=%s",
cur.execute("select node_id,eid,pid from node_ids where node_ip=%s",
(node_ip,))
node_id = cur.fetchone()
if node_id:
node_id = node_id[0];
node_id = node_id[0] + "." + node_id[1] + "." + node_id[2];
pass
else:
he = socket.gethostbyaddr(node_ip)
node_id = he[0].split('.')[0]
pass
print " %8d\t%s" % (count, node_id)
print " %8d pkts\t %s" % (count, node_id)
pass
print "Top read files:"
cur.execute("select hm.fn,count(1) as rc from reads as r "
"left join handle_map as hm on hm.fh=r.fh "
"where timestamp > (UNIX_TIMESTAMP() - (60 * %s)) "
cur.execute("select r.fh,count(1) as rc from reads as r "
"where r.timestamp > (UNIX_TIMESTAMP() - (60 * %s)) "
"group by r.fh order by rc desc limit %s",
(over_last, row_count))
for (fn,count) in cur:
print " %8d\t%s" % (count, fn or "<unknown>")
for (fh,count) in cur:
print " %8d pkts\t %s" % (count, resolve_fh(fh)[1])
pass
print "Top writers:"
cur.execute("select w.node_ip,ni.node_id,count(1) as wc,sum(amount) "
"from writes as w "
"left join node_ids as ni on ni.node_ip=w.node_ip "
"where timestamp > (UNIX_TIMESTAMP() - (60 * %s)) "
"where w.timestamp > (UNIX_TIMESTAMP() - (60 * %s)) "
"group by w.node_ip order by wc desc limit %s",
(over_last, row_count))
for (node_ip,node_id,count,amount) in cur:
@@ -254,17 +513,16 @@ def do_stats(args):
he = socket.gethostbyaddr(node_ip)
node_id = he[0].split('.')[0]
pass
print " %8d\t%16d\t%s" % (count, amount, node_id)
print " %8d pkts\t %8s\t%s" % (count, human_readable(amount), node_id)
pass
print "Top written files:"
cur.execute("select hm.fn,count(1) as wc from writes as w "
"left join handle_map as hm on hm.fh=w.fh "
"where timestamp > (UNIX_TIMESTAMP() - (60 * %s)) "
cur.execute("select w.fh,count(1) as wc from writes as w "
"where w.timestamp > (UNIX_TIMESTAMP() - (60 * %s)) "
"group by w.fh order by wc desc limit %s",
(over_last, row_count))
for (fn,count) in cur:
print " %8d\t%s" % (count, fn or "<unknown>")
for (fh,count) in cur:
print " %8d pkts\t %s" % (count, resolve_fh(fh)[1])
pass
return retval
@@ -281,9 +539,18 @@ try:
"h",
[ "help", ])
if len(req_args) > 0:
action = req_args[0].lower()
pass
for opt, val in opts:
if opt in ("-h", "--help"):
usage()
if action in ACTIONS:
ACTIONS[action][1]()
pass
else:
usage()
pass
sys.exit()
pass
pass
@@ -291,7 +558,6 @@ try:
if len(req_args) < 1:
raise getopt.error('error: too few arguments')
action = req_args[0].lower()
if action not in ACTIONS:
raise getopt.error('error: unknown action - ' + req_args[0] + '\n'
'error: action must be one of: '
......
@@ -17,6 +17,11 @@ my $TBOPS = "@TBOPSEMAIL@";
my $TESTMODE = @TESTMODE@;
my $FSNODE = "@FSNODE@";
my $FSDIR_GROUPS = "@FSDIR_GROUPS@";
my $FSDIR_PROJ = "@FSDIR_PROJ@";
my $FSDIR_USERS = "@FSDIR_USERS@";
my $FSDIR_SHARE = "@FSDIR_SHARE@";
# Note no -n option. We redirect stdin from the new exports file below.