Commit 800d36b4 authored by David Johnson

Hah, make the cache update thread actually do stuff.

parent dd20b23c
@@ -14,6 +14,7 @@ import types
 import exceptions
 import traceback
 import threading
+import time
 debug = True
@@ -38,9 +39,9 @@ class FlexlabInvalidArgumentError(FlexlabError):
 class FlexlabDataServer:
     """
-    FlexlabDataServer keeps a cache of the last 24 hours of flexmon data.
+    FlexlabDataServer keeps a cache of the last N seconds of flexmon data.
     """
-    def __init__(self,autorefresh=60):
+    def __init__(self,autorefresh=60,cachetime=86400):
         """
         Set autorefresh to -1 to disable; else, set interval between refreshes
         in seconds.
@@ -60,9 +61,12 @@ class FlexlabDataServer:
                                       user='flexlabdata')
         self.dbs['dpdb'] = None
-        if autorefresh > 0:
+        self.cachetime = cachetime
+        self.autorefresh = None
+        if autorefresh != None and autorefresh > 0:
+            self.autorefresh = autorefresh
             # fire off a refresh/age thread:
-            at = threading.Thread(target=self.refreshAndAge,args=())
+            at = threading.Thread(target=self._foreverRefreshAndAge,args=())
             # it really IS a "daemon thread" but we want it to die when the
             # main thread is killed.
             at.setDaemon(0)
@@ -83,30 +87,30 @@ class FlexlabDataServer:
         # Grab a summary of the last 24 hours worth of pair_data from the
         # ops pelab db. Note that we only remember the most recent measurement
         # that satisfies our criteria, for each site pair.
-        q = "select idx,srcsite_idx,dstsite_idx,latency,max(unixstamp) as ts"+\
-            " from pair_data where (unix_timestamp()-unixstamp) < 86400" + \
-            " and latency is not NULL and latency > 0" + \
-            " group by srcsite_idx,dstsite_idx " + \
-            " order by srcsite_idx,dstsite_idx;"
+        q = "select idx,srcsite_idx,dstsite_idx,latency,max(unixstamp) as ts" \
+            " from pair_data where (unix_timestamp()-unixstamp) < %d" \
+            " and latency is not NULL and latency > 0" \
+            " group by srcsite_idx,dstsite_idx " \
+            " order by srcsite_idx,dstsite_idx;" % self.cachetime
         qres = libdb.DBQueryFatal(q,dbnum=self.dbs['opsdb'])
         for (idx,sidx,didx,latency,ts) in qres:
             if idx > mingidx:
-                idx = mingidx
+                mingidx = idx
             self._cacheMeasurement('lat',sidx,didx,latency,ts)
             pass
-        q = "select idx,srcsite_idx,dstsite_idx,bw,max(unixstamp) as ts" + \
-            " from pair_data where (unix_timestamp()-unixstamp) < 86400" + \
-            " and bw > 0 " + \
-            " group by srcsite_idx,dstsite_idx " + \
-            " order by srcsite_idx,dstsite_idx;"
+        q = "select idx,srcsite_idx,dstsite_idx,bw,max(unixstamp) as ts" \
+            " from pair_data where (unix_timestamp()-unixstamp) < %d" \
+            " and bw > 0 " \
+            " group by srcsite_idx,dstsite_idx " \
+            " order by srcsite_idx,dstsite_idx;" % self.cachetime
         qres = libdb.DBQueryFatal(q,dbnum=self.dbs['opsdb'])
         tidx = 0
         for (idx,sidx,didx,bw,ts) in qres:
             if idx > tidx:
-                idx = tidx
+                tidx = idx
             self._cacheMeasurement('bw',sidx,didx,bw,ts)
             pass
@@ -114,8 +118,9 @@ class FlexlabDataServer:
             mingidx = tidx
         self._lastdataidx = mingidx
-        q = "select site_name,site_idx,node_id,node_idx" + \
+        print "Updated lastdataidx to %d" % self._lastdataidx
+        q = "select site_name,site_idx,node_id,node_idx" \
             " from site_mapping order by site_idx"
         qres = libdb.DBQueryFatal(q,dbnum=self.dbs['opsdb'])
@@ -140,12 +145,13 @@ class FlexlabDataServer:
         if not self._cacheinit:
             return
-        q = "select idx,srcsite_idx,dstsite_idx,latency,bw,unixstamp as ts" + \
-            " from pair_data where idx > %d" + \
-            " and ((latency is not NULL and latency > 0) or bw > 0)" + \
-            " group by srcsite_idx,dstsite_idx " + \
-            " order by unixstamp" % (self._lastdataidx)
+        q = "select idx,srcsite_idx,dstsite_idx,latency,bw,unixstamp as ts" \
+            " from pair_data where idx > %d" \
+            " and ((latency is not NULL and latency > 0) or bw > 0)" \
+            " group by srcsite_idx,dstsite_idx " \
+            " order by unixstamp" % self._lastdataidx
+        count = 0
         qres = libdb.DBQueryFatal(q,dbnum=self.dbs['opsdb'])
         for (idx,sidx,didx,lat,bw,ts) in qres:
             # figure out which it is, bw or lat:
@@ -155,7 +161,10 @@ class FlexlabDataServer:
                 self._cacheMeasurement('lat',sidx,didx,lat,ts)
             if idx > self._lastdataidx:
                 self._lastdataidx = idx
+            count += 1
+        print "Added %d new measurements." % count
+        print "Updated lastdataidx to %d" % self._lastdataidx
         pass

     def _ageCache(self):
@@ -163,14 +172,19 @@ class FlexlabDataServer:
             return
         now = time.time()
+        count = 0
         for mtype in self.cache.keys():
             sdict = self.cache[mtype]
             for srcsite in sdict.keys():
                 ddict = sdict[srcsite]
-                for (dstsite,(val,ts)) in ddict.iteritems():
-                    if (now - ts) > CACHE_LIMIT:
-                        ddict.remove(dstsite)
+                for dstsite in ddict.keys():
+                    (val,ts) = ddict[dstsite]
+                    if (now - ts) > self.cachetime:
+                        del ddict[dstsite]
+                        count += 1
+        print "Aged out %d measurements." % count
         pass
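
The rewritten age-out loop no longer mutates ddict while iterating over its live items (the old code also called a nonexistent dict.remove()). A minimal standalone sketch of the same pattern, with hypothetical names and toy data, not the project's code:

    import time

    def age_cache(cache, max_age):
        # Drop (value, timestamp) entries older than max_age seconds.
        # Iterate over snapshots of the keys so deletion is safe.
        now = time.time()
        aged = 0
        for mtype in list(cache.keys()):
            for src in list(cache[mtype].keys()):
                dstdict = cache[mtype][src]
                for dst in list(dstdict.keys()):
                    val, ts = dstdict[dst]
                    if (now - ts) > max_age:
                        del dstdict[dst]
                        aged += 1
        return aged

    # Toy usage: one 'lat' entry that is ~25 hours old gets aged out.
    cache = {'lat': {1: {2: (42.0, time.time() - 90000)}}}
    print(age_cache(cache, 86400))   # -> 1
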
@@ -187,7 +201,7 @@ class FlexlabDataServer:
                 traceback.print_exc()
                 pass
             try:
-                time.sleep(self.autorefresh*1000)
+                time.sleep(self.autorefresh)
             except:
                 traceback.print_exc()
                 pass
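
time.sleep() takes seconds, not milliseconds, so the old autorefresh*1000 slept far longer than intended between refreshes. A hedged sketch of the periodic refresh-and-age worker this diff implies; the function and argument names here are placeholders, not the real Flexlab API:

    import threading
    import time
    import traceback

    def forever_refresh_and_age(refresh, age, interval):
        # Periodically refresh the cache and age out stale entries.
        while True:
            try:
                refresh()
                age()
            except Exception:
                traceback.print_exc()
            # seconds, not milliseconds
            time.sleep(interval)

    worker = threading.Thread(target=forever_refresh_and_age,
                              args=(lambda: None, lambda: None, 60))
    worker.daemon = True   # a daemon thread exits when the main thread does
    worker.start()
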
@@ -240,6 +254,13 @@ class flexlab:
                             only the nodes in the nodefilter list. If false,
                             a subset is chosen from all nodes except the nodes
                             in the nodefilter list.
+          searchtype (str)  Type of search to run. 'fastfallible' will search
+                            linearly through known nodes and form a set this
+                            way (it's highly dependent on which nodes are
+                            selected initially into the set). 'maxclique' will
+                            run a max clique heuristic over the set of nodes
+                            that are supplied (or all known nodes, if none are
+                            supplied), and will select the k-best nodes.
         Returns:
           A list of Emulab node_ids.
         """
@@ -248,6 +269,7 @@ class flexlab:
         # check args
         if argdict == None or type(argdict) != types.DictType:
             raise FlexlabInvalidArgumentError("No arguments!")
+
         if argdict.has_key("size"):
             try:
                 size = int(argdict["size"])
...