Commit 272c350b authored by Timothy Stack's avatar Timothy Stack
Browse files

Checkpoint some nfstrace fixes. It should behave much better now, but it still needs some work.
parent a467a28a
/*
* $Id: nfs_v2.c,v 1.1 2005-11-28 15:44:00 stack Exp $
* $Id: nfs_v2.c,v 1.2 2005-11-30 17:30:57 stack Exp $
*/
#ifdef HAVE_CONFIG_H
......@@ -32,7 +32,7 @@ static int compute_pct (int n, int d);
#define PRINT_STATUS(s, p) \
if (p) { if ((s) == NFS_OK) { fprintf (OutFile, "OK "); } \
else { fprintf (OutFile, "%x ", (s)); } \
else { fprintf (OutFile, "%d ", (s)); } \
}
/*
......@@ -445,13 +445,13 @@ u_int32_t *print_sattr2 (u_int32_t *p, u_int32_t *e, int print)
fprintf (OutFile, "mode %x ", ntohl (p [0]));
}
if (ntohl (p [1]) != -1) {
fprintf (OutFile, "uid %x ", ntohl (p [1]));
fprintf (OutFile, "uid %d ", ntohl (p [1]));
}
if (ntohl (p [2]) != -1) {
fprintf (OutFile, "gid %x ", ntohl (p [2]));
fprintf (OutFile, "gid %d ", ntohl (p [2]));
}
if (ntohl (p [3]) != -1) {
fprintf (OutFile, "size %x ", ntohl (p [3]));
fprintf (OutFile, "size %d ", ntohl (p [3]));
}
/*
......@@ -570,8 +570,8 @@ u_int32_t *print_fattr2 (u_int32_t *p, u_int32_t *e, int print)
fprintf (OutFile, "ftype %x ", ntohl (p [0])); /* ftype */
fprintf (OutFile, "mode %x ", ntohl (p [1])); /* mode */
fprintf (OutFile, "nlink %x ", ntohl (p [2])); /* nlink */
fprintf (OutFile, "uid %x ", ntohl (p [3])); /* uid */
fprintf (OutFile, "gid %x ", ntohl (p [4])); /* gid */
fprintf (OutFile, "uid %d ", ntohl (p [3])); /* uid */
fprintf (OutFile, "gid %d ", ntohl (p [4])); /* gid */
fprintf (OutFile, "size %x ", ntohl (p [5])); /* size */
fprintf (OutFile, "blksize %x ", ntohl (p [6])); /* blksize */
fprintf (OutFile, "rdev %x ", ntohl (p [7])); /* rdev */
......
/*
* $Id: nfs_v3.c,v 1.1 2005-11-28 15:44:00 stack Exp $
* $Id: nfs_v3.c,v 1.2 2005-11-30 17:30:57 stack Exp $
*
*/
......@@ -959,10 +959,10 @@ int print_sattr3_x (sattr3 *s)
fprintf (OutFile, "mode %x ", s->mode.set_mode3_u.mode);
}
if (s->uid.set_it) {
fprintf (OutFile, "uid %x ", s->uid.set_uid3_u.uid);
fprintf (OutFile, "uid %d ", s->uid.set_uid3_u.uid);
}
if (s->gid.set_it) {
fprintf (OutFile, "gid %x ", s->gid.set_gid3_u.gid);
fprintf (OutFile, "gid %d ", s->gid.set_gid3_u.gid);
}
if (s->size.set_it) {
print_uint64_x ((u_int32_t *) &(s->size.set_size3_u.size),
......
/*
* $Id: nfsrecord.c,v 1.1 2005-11-28 15:44:00 stack Exp $
* $Id: nfsrecord.c,v 1.2 2005-11-30 17:30:57 stack Exp $
*/
#ifdef HAVE_CONFIG_H
......@@ -471,9 +471,9 @@ int processPacket (struct pcap_pkthdr *h, /* Captured stuff */
fprintf (OutFile, ".%.4x ", 0xffff & record->dstPort);
fprintf (OutFile, "%c ", proto == IPPROTO_TCP ? 'T' : 'U');
fprintf (OutFile, "C%d %x 1 mnt fn \"%s\" ", record->nfsVersion, ntohl (rpc_b->rm_xid), payload_data);
fprintf (OutFile, "C%d %d 1 mnt fn \"%s\" ", record->nfsVersion, ntohl (rpc_b->rm_xid), payload_data);
if (euid != -1 && egid != -1) {
fprintf (OutFile, "euid %x egid %x ",
fprintf (OutFile, "euid %d egid %d ",
euid, egid);
}
fprintf (OutFile, "con = %d len = %d",
......@@ -490,7 +490,7 @@ int processPacket (struct pcap_pkthdr *h, /* Captured stuff */
fprintf (OutFile, ".%.4x ", 0xffff & record->dstPort);
fprintf (OutFile, "%c ", proto == IPPROTO_TCP ? 'T' : 'U');
fprintf (OutFile, "R%d %x ", record->nfsVersion, ntohl (rpc_b->rm_xid));
fprintf (OutFile, "R%d %d ", record->nfsVersion, ntohl (rpc_b->rm_xid));
if (record->rpcStatus == 0) {
fprintf(OutFile, "1 mnt OK ");
if (record->nfsVersion == 1 || record->nfsVersion == 2) {
......@@ -502,9 +502,9 @@ int processPacket (struct pcap_pkthdr *h, /* Captured stuff */
}
}
else {
fprintf(OutFile, "1 mnt %x ", record->rpcStatus);
fprintf(OutFile, "1 mnt %d ", record->rpcStatus);
}
fprintf (OutFile, "status=%x ", record->rpcStatus);
fprintf (OutFile, "status=%d ", record->rpcStatus);
fprintf (OutFile, "pl = %d ", payload_len);
fprintf (OutFile, "con = %d len = %d",
......@@ -520,7 +520,7 @@ int processPacket (struct pcap_pkthdr *h, /* Captured stuff */
tot_len - consumed);
if (euid != -1 && egid != -1) {
fprintf (OutFile, "euid %x egid %x ",
fprintf (OutFile, "euid %d egid %d ",
euid, egid);
}
......@@ -623,7 +623,7 @@ void printRecord (nfs_pkt_t *record, void *xdr, u_int32_t payload_len,
fprintf (OutFile, "RU%d\n", record->nfsVersion);
}
fprintf (OutFile, "status=%x ", record->rpcStatus);
fprintf (OutFile, "status=%d ", record->rpcStatus);
fprintf (OutFile, "pl = %d ", payload_len);
}
......
......@@ -16,7 +16,14 @@ DBDIR = "/var/db/nfstrace"
DBNAME = "nfsdb"
DBUSER = "nfstrace"
DBPASS = open(os.path.join(BASEDIR, "dbpass")).read().strip()
try:
DBPASS = open(os.path.join(BASEDIR, "dbpass")).read().strip()
pass
except IOError:
sys.stderr.write("error: permission denied\n")
sys.exit(1)
pass
INTERVAL = 5
VERBOSITY = 1
......@@ -67,6 +74,8 @@ IDX_ID = 5
IDX_COMMAND = 7
IDX_BODY_START = 8
fh_cache = None
TABLES = [
"mounts",
"mount_replies",
......@@ -75,10 +84,14 @@ TABLES = [
"creates",
"create_replies",
"removes",
"renames",
"rename_replies",
"lookups",
"lookup_replies",
]
MAX_FH_LENGTH = 64
con = MySQLdb.connect(db = DBNAME, user = DBUSER, passwd = DBPASS)
cur = con.cursor()
......@@ -101,27 +114,48 @@ def body_to_dict(line):
return retval
def padfh(fh, length=None):
    """Right-pad a file-handle string with "0" characters to a fixed width.

    fh     -- the handle string to pad.
    length -- target width; when omitted, MAX_FH_LENGTH is used.

    Returns fh followed by enough "0" characters to reach `length`.  A
    handle that is already `length` characters or longer is returned
    unchanged; it is never truncated.
    """
    if length is None:
        # Looked up at call time so the module constant remains the default.
        length = MAX_FH_LENGTH
    # Multiplying a string by a non-positive count yields "", so handles
    # that are already long enough come back as-is.
    return fh + "0" * (length - len(fh))
class Parser:
def __init__(self):
    """Initialise parser state and open one empty data file per table.

    For every name in TABLES a file of that name under DBDIR is opened
    for writing and truncated, and the table's row counter starts at 0.
    """
    self.first_timestamp = 0
    self.lookups = {}
    self.data_files = {}
    self.row_counts = {}
    for tab in TABLES:
        path = os.path.join(DBDIR, tab)
        handle = open(path, 'w')
        self.data_files[tab] = handle
        handle.truncate(0)
        self.row_counts[tab] = 0
    return
def load_files(self):
    """Load every table's data file into the database, then reset it.

    Lookups still held in self.lookups are first written to the
    "lookups" data file and the pending set is cleared.  Then, for each
    table in TABLES: the data file is flushed, its row count is printed,
    the file is loaded with LOAD DATA INFILE ... REPLACE, and both the
    file and the row counter are reset to empty.
    """
    for pending in self.lookups.values():
        self.write_row("lookups", pending)
    self.lookups.clear()

    for tab in TABLES:
        out = self.data_files[tab]
        out.flush()
        print (" Table: " + tab.ljust(18) + " \t"
               + repr(self.row_counts[tab]) + " record(s)")
        # The table name is spliced in from the TABLES list; the data
        # file's name is handed over as a query parameter.
        statement = ("LOAD DATA INFILE %s REPLACE INTO TABLE " + tab +
                     " FIELDS TERMINATED BY ','")
        cur.execute(statement, (out.name,))
        # Rewind and empty the file so the next batch starts clean.
        out.seek(0)
        out.truncate(0)
        self.row_counts[tab] = 0
    return
......@@ -135,7 +169,7 @@ class Parser:
cmd = sl[IDX_COMMAND]
try:
if not self.first_timestamp:
self.first_timestamp = int(sl[IDX_TIMESTAMP].split(".")[0])
self.first_timestamp = sl[IDX_TIMESTAMP]
pass
if sl[IDX_REQ_REPLY] in ("C1", "C2", "C3") :
body = body_to_dict(sl[IDX_BODY_START:-6])
......@@ -156,9 +190,9 @@ class Parser:
return
def write_row(self, tab, data):
data = map(lambda x: re.sub(r',', '\\,', str(x)), data)
self.data_files[tab].write(",".join(data))
self.data_files[tab].write("\n")
self.row_counts[tab] += 1
return
def dispatch(self, method_name, sl, body):
......@@ -176,7 +210,7 @@ class Parser:
def update_file_checkpoint(self, sl, body):
if False and body.has_key("fh"):
data = (
int(sl[IDX_TIMESTAMP].split(".")[0]),
sl[IDX_TIMESTAMP],
body["fh"],
int(body["ftype"], 16),
int(body["mode"], 16),
......@@ -198,12 +232,12 @@ class Parser:
def handle_mnt(self, sl, body):
data = (
int(sl[IDX_TIMESTAMP].split(".")[0]),
str(int(sl[IDX_ID], 16)),
sl[IDX_TIMESTAMP],
sl[IDX_ID],
convert_ip(sl[IDX_SRC_IP]),
body["fn"][1:-1],
int(body["euid"], 16),
int(body["egid"], 16))
body["euid"],
body["egid"])
self.write_row("mounts", data)
return
......@@ -211,58 +245,55 @@ class Parser:
def handle_mnt_reply(self, sl, body):
status = sl[IDX_BODY_START]
if status == "OK":
status = 0
pass
else:
status = int(status, 16)
status = "0"
pass
data = (
int(sl[IDX_TIMESTAMP].split(".")[0]),
str(int(sl[IDX_ID], 16)),
sl[IDX_TIMESTAMP],
sl[IDX_ID],
convert_ip(sl[IDX_DST_IP]),
status,
body.get("fh", ''))
padfh(body.get("fh", '')))
self.write_row("mount_replies", data)
return
def handle_read(self, sl, body):
data = (
int(sl[IDX_TIMESTAMP].split(".")[0]),
str(int(sl[IDX_ID], 16)),
sl[IDX_TIMESTAMP],
sl[IDX_ID],
convert_ip(sl[IDX_SRC_IP]),
body["fh"],
int(body["count"], 16),
int(body["euid"], 16),
int(body["egid"], 16))
padfh(body["fh"]),
str(int(body["count"], 16)),
body["euid"],
body["egid"])
self.write_row("reads", data)
return
def handle_write(self, sl, body):
data = (
str(int(sl[IDX_TIMESTAMP].split(".")[0])),
str(int(sl[IDX_ID], 16)),
sl[IDX_TIMESTAMP],
sl[IDX_ID],
convert_ip(sl[IDX_SRC_IP]),
body["fh"],
int((body["tcount"] or body["count"]), 16),
int(body["euid"], 16),
int(body["egid"], 16))
padfh(body["fh"]),
str(int((body["tcount"] or body["count"]), 16)),
body["euid"],
body["egid"])
self.write_row("writes", data)
return
def handle_create(self, sl, body):
data = (
int(sl[IDX_TIMESTAMP].split(".")[0]),
str(int(sl[IDX_ID], 16)),
sl[IDX_TIMESTAMP],
sl[IDX_ID],
convert_ip(sl[IDX_SRC_IP]),
body["fh"],
padfh(body["fh"]),
(body["fn"] or body["name"])[1:-1],
int(body.get("mode", "0"), 16), # XXX
int(body["euid"], 16),
int(body["egid"], 16))
str(int(body.get("mode", "0"), 16)), # XXX
body["euid"],
body["egid"])
self.write_row("creates", data)
return
......@@ -270,65 +301,101 @@ class Parser:
def handle_create_reply(self, sl, body):
status = sl[IDX_BODY_START]
if status == "OK":
status = 0
pass
else:
status = int(status, 16)
status = "0"
pass
data = (
int(sl[IDX_TIMESTAMP].split(".")[0]),
str(int(sl[IDX_ID], 16)),
sl[IDX_TIMESTAMP],
sl[IDX_ID],
convert_ip(sl[IDX_DST_IP]),
status,
body.get("fh", ''))
padfh(body.get("fh", '')))
self.write_row("create_replies", data)
return
def handle_remove(self, sl, body):
data = (
int(sl[IDX_TIMESTAMP].split(".")[0]),
str(int(sl[IDX_ID], 16)),
sl[IDX_TIMESTAMP],
sl[IDX_ID],
convert_ip(sl[IDX_SRC_IP]),
body["fh"],
padfh(body["fh"]),
(body["fn"] or body["name"])[1:-1],
int(body["euid"], 16),
int(body["egid"], 16))
body["euid"],
body["egid"])
self.write_row("removes", data)
return
def handle_rename(self, sl, body):
    """Write one row describing a rename call to the "renames" data file.

    sl   -- the split trace line; the timestamp, id and source IP are
            taken from the IDX_TIMESTAMP, IDX_ID and IDX_SRC_IP slots.
    body -- mapping of the call's fields.  Two handle/name pairs are
            read ("fh" with "fn"/"name", and "fh2" with "fn2"/"name2";
            presumably the old and new locations -- confirm against the
            trace format), plus "euid" and "egid".
    """
    timestamp = sl[IDX_TIMESTAMP]
    xid = sl[IDX_ID]
    node_ip = convert_ip(sl[IDX_SRC_IP])
    first_fh = padfh(body["fh"])
    # [1:-1] drops the first and last character of the name field.
    first_name = (body["fn"] or body["name"])[1:-1]
    second_fh = padfh(body["fh2"])
    second_name = (body["fn2"] or body["name2"])[1:-1]
    row = (timestamp, xid, node_ip,
           first_fh, first_name,
           second_fh, second_name,
           body["euid"], body["egid"])
    self.write_row("renames", row)
    return
def handle_rename_reply(self, sl, body):
    """Write one row describing a rename reply to "rename_replies".

    sl   -- the split trace line; the timestamp, id, destination IP and
            status are taken from the IDX_TIMESTAMP, IDX_ID, IDX_DST_IP
            and IDX_BODY_START slots.
    body -- mapping of the reply's fields; not used by this handler.

    A status of "OK" is stored as "0"; any other status is stored as it
    appears in the trace.
    """
    status = sl[IDX_BODY_START]
    if status == "OK":
        status = "0"
    data = (
        sl[IDX_TIMESTAMP],
        sl[IDX_ID],
        # The node that issued the call is the destination of the reply,
        # so the destination IP is recorded here, as the other *_reply
        # handlers do.  This keeps the (id, ip) pair equal to the one
        # written for the matching rename call.
        convert_ip(sl[IDX_DST_IP]),
        status)
    self.write_row("rename_replies", data)
    return
def handle_lookup(self, sl, body):
data = (
int(sl[IDX_TIMESTAMP].split(".")[0]),
str(int(sl[IDX_ID], 16)),
sl[IDX_TIMESTAMP],
sl[IDX_ID],
convert_ip(sl[IDX_SRC_IP]),
body["fh"],
padfh(body["fh"]),
(body["fn"] or body["name"])[1:-1],
int(body["euid"], 16),
int(body["egid"], 16))
self.write_row("lookups", data)
body["euid"],
body["egid"])
self.lookups[(data[1], data[2])] = data
return
def handle_lookup_reply(self, sl, body):
status = sl[IDX_BODY_START]
if status == "OK":
status = 0
status = "0"
data = (
sl[IDX_TIMESTAMP],
sl[IDX_ID],
convert_ip(sl[IDX_DST_IP]),
status,
padfh(body.get("fh", '')))
key = (data[1], data[2])
if self.lookups.has_key(key):
self.write_row("lookups", self.lookups[key])
del self.lookups[key]
pass
self.write_row("lookup_replies", data)
pass
else:
status = int(status, 16)
key = (sl[IDX_ID], convert_ip(sl[IDX_DST_IP]))
if self.lookups.has_key(key):
del self.lookups[key]
pass
pass
data = (
int(sl[IDX_TIMESTAMP].split(".")[0]),
str(int(sl[IDX_ID], 16)),
convert_ip(sl[IDX_DST_IP]),
status,
body.get("fh", ''))
self.write_row("lookup_replies", data)
return
pass
......@@ -410,23 +477,23 @@ class LRU:
def keys(self):
# Return whatever self.d.keys() yields, i.e. the keys of the backing
# container self.d (presumably a dict -- confirm in the class body).
return self.d.keys()
fh_cache = LRU(1024)
def resolve_fh(fh, depth=0):
global fh_cache
retval = None
if depth > 50:
print "exceeded depth on " + fh
return (0, "<unknown>")
prefix = " " * depth
# prefix = " " * depth
if fh not in fh_cache:
cur.execute("SELECT complete,fn FROM handle_map WHERE fh=%s", (fh,))
fh_cache[fh] = cur.fetchone()
pass
fn = fh_cache[fh]
print prefix + " cache " + `fn` + " " + fh
# print prefix + " cache " + `fn` + " " + fh
if fn:
retval = (fn[0], fn[1])
pass
......@@ -436,26 +503,16 @@ def resolve_fh(fh, depth=0):
" l.id=lr.id and l.fh!=%s) "
"where lr.fh=%s and lr.status=0 limit 1", (fh, fh,))
res = cur.fetchone()
print prefix + " lookup " + `res` + " " + fh
# print prefix + " lookup " + `res` + " " + fh
if res and res[0]:
comp = resolve_fh(res[0], depth + 1)
retval = (comp[0], comp[1] + "/" + res[1])
pass
else:
cur.execute("SELECT m.fn FROM mount_replies as mr "
"left join mounts as m on (m.node_ip=mr.node_ip and "
" m.id=mr.id) "
"where mr.fh=%s and mr.status=0 limit 1", (fh,))
res = cur.fetchone()
print prefix + " mount " + `res` + " " + fh
if res and res[0]:
retval = (1, res[0])
pass
pass
if retval:
cur.execute("REPLACE INTO handle_map (fh, complete, fn) "
"VALUES (%s, %s, %s)", (fh, retval[0], retval[1]))
pass
else:
retval = (0, "<unknown>")
pass
......@@ -466,24 +523,49 @@ def resolve_fh(fh, depth=0):
return retval
def update_handle_map(ts):
global fh_cache
cur.execute("SELECT distinct m.fn,mr.fh FROM mount_replies as mr "
"left join mounts as m on (m.node_ip=mr.node_ip and "
" m.id=mr.id) "
"left join handle_map as hm on hm.fh=mr.fh "
"where mr.status=0 and mr.timestamp>=%s and hm.fh is NULL",
(ts,))
count = 0
for (fn, fh) in cur:
count += 1
fh_cache[fh] = (1, fn)
cur.execute("REPLACE INTO handle_map (fh, complete, fn) "
"VALUES (%s, %s, %s)", (fh, 1, fn))
pass
print " " + `count` + " mount(s)"
cur.execute("SELECT distinct lr.fh FROM lookup_replies as lr "
"left join handle_map as hm on hm.fh=lr.fh "
"where lr.status=0 and lr.timestamp>=%s and hm.fh is NULL",
"where lr.status=0 and lr.timestamp>=%s and "
" (hm.fh is NULL or hm.complete=0)",
(ts,))
count = 0
for (fh,) in cur:
print " lookup " + fh
count += 1
resolve_fh(fh)
pass
print " " + `count` + " lookup(s)"
cur.execute("SELECT distinct cr.fh FROM create_replies as cr "
"left join handle_map as hm on hm.fh=cr.fh "
"where cr.status=0 and cr.timestamp>=%s and hm.fh is NULL",
"where cr.status=0 and cr.timestamp>=%s",
(ts,))
count = 0
for (fh,) in cur:
print " create " + fh
count += 1
resolve_fh(fh)
pass
print " " + `count` + " create(s)"
return
if update_only:
......@@ -503,7 +585,7 @@ def update_files(files):
print "Reading " + fn
name2stamp[fn] = st[stat.ST_MTIME]
fh = open(fn)
for line in fh.readlines():
for line in fh:
parser.nextline(line)
pass
fh.close()
......@@ -543,10 +625,11 @@ def update_files(files):
print " Delete old data..."
for tab in TABLES:
cur.execute("DELETE FROM " + tab + " WHERE timestamp<%s",
(parser.first_timestamp - (5 * 60),))
(float(parser.first_timestamp) - (5 * 60),))
pass
print " Done."
parser.clear_timestamp()
pass
pass
......@@ -558,12 +641,21 @@ def update_files(files):
return
fh_cache = LRU(1024)
try:
lpc = 0
while (count == 0) or (lpc < count):
update_files(req_args)
con.commit()
for key in fh_cache.keys():
complete, fh = fh_cache[key]
if not complete:
del fh_cache[key]
pass