diff --git a/pelab/flexdatasrv.in b/pelab/flexdatasrv.in
index 1fec452e6d6d6bb98e67d119bcf5ce39633d5373..c97b009b3f7a77452bbade08e6eb68eefac477c9 100644
--- a/pelab/flexdatasrv.in
+++ b/pelab/flexdatasrv.in
@@ -14,6 +14,7 @@ import types
 import exceptions
 import traceback
 import threading
+import time
 
 debug = True
 
@@ -38,9 +39,9 @@ class FlexlabInvalidArgumentError(FlexlabError):
 
 class FlexlabDataServer:
     """
-    FlexlabDataServer keeps a cache of the last 24 hours of flexmon data.
+    FlexlabDataServer keeps a cache of the last N seconds of flexmon data.
     """
-    def __init__(self,autorefresh=60):
+    def __init__(self,autorefresh=60,cachetime=86400):
         """
         Set autorefresh to -1 to disable; else, set interval between
         refreshes in seconds.
@@ -60,9 +61,12 @@ class FlexlabDataServer:
                                               user='flexlabdata')
         self.dbs['dpdb'] = None
 
-        if autorefresh > 0:
+        self.cachetime = cachetime
+        self.autorefresh = None
+        if autorefresh != None and autorefresh > 0:
+            self.autorefresh = autorefresh
             # fire off a refresh/age thread:
-            at = threading.Thread(target=self.refreshAndAge,args=())
+            at = threading.Thread(target=self._foreverRefreshAndAge,args=())
             # it really IS a "daemon thread" but we want it to die when the
             # main thread is killed.
             at.setDaemon(0)
@@ -83,30 +87,30 @@ class FlexlabDataServer:
         # Grab a summary of the last 24 hours worth of pair_data from the
         # ops pelab db.  Note that we only remember the most recent measurement
         # that satisfies our criteria, for each site pair.
-        q = "select idx,srcsite_idx,dstsite_idx,latency,max(unixstamp) as ts"+\
-            " from pair_data where (unix_timestamp()-unixstamp) < 86400" + \
-            " and latency is not NULL and latency > 0" + \
-            " group by srcsite_idx,dstsite_idx " + \
-            " order by srcsite_idx,dstsite_idx;"
+        q = "select idx,srcsite_idx,dstsite_idx,latency,max(unixstamp) as ts" \
+            " from pair_data where (unix_timestamp()-unixstamp) < %d" \
+            " and latency is not NULL and latency > 0" \
+            " group by srcsite_idx,dstsite_idx " \
+            " order by srcsite_idx,dstsite_idx;" % self.cachetime
         qres = libdb.DBQueryFatal(q,dbnum=self.dbs['opsdb'])
 
         for (idx,sidx,didx,latency,ts) in qres:
             if idx > mingidx:
-                idx = mingidx
+                mingidx = idx
             self._cacheMeasurement('lat',sidx,didx,latency,ts)
             pass
 
-        q = "select idx,srcsite_idx,dstsite_idx,bw,max(unixstamp) as ts" + \
-            " from pair_data where (unix_timestamp()-unixstamp) < 86400" + \
-            " and bw > 0 " + \
-            " group by srcsite_idx,dstsite_idx " + \
-            " order by srcsite_idx,dstsite_idx;"
+        q = "select idx,srcsite_idx,dstsite_idx,bw,max(unixstamp) as ts" \
+            " from pair_data where (unix_timestamp()-unixstamp) < %d" \
+            " and bw > 0 " \
+            " group by srcsite_idx,dstsite_idx " \
+            " order by srcsite_idx,dstsite_idx;" % self.cachetime
         qres = libdb.DBQueryFatal(q,dbnum=self.dbs['opsdb'])
 
         tidx = 0
         for (idx,sidx,didx,bw,ts) in qres:
             if idx > tidx:
-                idx = tidx
+                tidx = idx
             self._cacheMeasurement('bw',sidx,didx,bw,ts)
             pass
 
@@ -114,8 +118,9 @@ class FlexlabDataServer:
             mingidx = tidx
 
         self._lastdataidx = mingidx
+        print "Updated lastdataidx to %d" % self._lastdataidx
 
-        q = "select site_name,site_idx,node_id,node_idx" + \
+        q = "select site_name,site_idx,node_id,node_idx" \
             " from site_mapping order by site_idx"
         qres = libdb.DBQueryFatal(q,dbnum=self.dbs['opsdb'])
 
@@ -140,12 +145,13 @@ class FlexlabDataServer:
         if not self._cacheinit:
             return
 
-        q = "select idx,srcsite_idx,dstsite_idx,latency,bw,unixstamp as ts" + \
-            " from pair_data where idx > %d" + \
-            " and ((latency is not NULL and latency > 0) or bw > 0)" + \
-            " group by srcsite_idx,dstsite_idx " + \
-            " order by unixstamp" % (self._lastdataidx)
+        q = "select idx,srcsite_idx,dstsite_idx,latency,bw,unixstamp as ts" \
+            " from pair_data where idx > %d" \
+            " and ((latency is not NULL and latency > 0) or bw > 0)" \
+            " group by srcsite_idx,dstsite_idx " \
+            " order by unixstamp" % self._lastdataidx
 
+        count = 0
         qres = libdb.DBQueryFatal(q,dbnum=self.dbs['opsdb'])
         for (idx,sidx,didx,lat,bw,ts) in qres:
             # figure out which it is, bw or lat:
@@ -155,7 +161,10 @@ class FlexlabDataServer:
                 self._cacheMeasurement('lat',sidx,didx,lat,ts)
             if idx > self._lastdataidx:
                 self._lastdataidx = idx
-
+            count += 1
+
+        print "Added %d new measurements." % count
+        print "Updated lastdataidx to %d" % self._lastdataidx
         pass
 
     def _ageCache(self):
@@ -163,14 +172,19 @@ class FlexlabDataServer:
             return
 
         now = time.time()
+        count = 0
 
         for mtype in self.cache.keys():
            sdict = self.cache[mtype]
            for srcsite in sdict.keys():
                ddict = sdict[srcsite]
-               for (dstsite,(val,ts)) in ddict.iteritems():
-                   if (now - ts) > CACHE_LIMIT:
-                       ddict.remove(dstsite)
+               for dstsite in ddict.keys():
+                   (val,ts) = ddict[dstsite]
+                   if (now - ts) > self.cachetime:
+                       del ddict[dstsite]
+                       count += 1
+
+        print "Aged out %d measurements." % count
         pass
 
 
@@ -187,7 +201,7 @@ class FlexlabDataServer:
                 traceback.print_exc()
                 pass
             try:
-                time.sleep(self.autorefresh*1000)
+                time.sleep(self.autorefresh)
             except:
                 traceback.print_exc()
                 pass
@@ -240,6 +254,13 @@ class flexlab:
                             only the nodes in the nodefilter list.  If false, a
                             subset is chosen from all nodes except the nodes in
                             the nodefilter list.
+          searchtype (str)  Type of search to run.  'fastfallible' will search
+                            linearly through known nodes and form a set this
+                            way (it's highly dependent on which nodes are
+                            selected initially into the set).  'maxclique' will
+                            run a max clique heuristic over the set of nodes
+                            that are supplied (or all known nodes, if none are
+                            supplied), and will select the k-best nodes.
         Returns:
           A list of Emulab node_ids.
         """
@@ -248,6 +269,7 @@ class flexlab:
         # check args
         if argdict == None or type(argdict) != types.DictType:
            raise FlexlabInvalidArgumentError("No arguments!")
+
        if argdict.has_key("size"):
            try:
                size = int(argdict["size"])
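
The `_ageCache` hunk above replaces iteration over `ddict.iteritems()` (and a call to the nonexistent `dict.remove()`) with iteration over `ddict.keys()` plus `del`, so entries can be dropped while the dict is being walked. A minimal standalone sketch of that idiom, in the same Python 2 style as the patch; the names `age_cache`, `cache`, and `cachetime` here are illustrative, not taken from the source:

```python
import time

def age_cache(cache, cachetime):
    """Drop (value, timestamp) entries older than cachetime seconds."""
    now = time.time()
    removed = 0
    for mtype in cache.keys():
        for srcsite in cache[mtype].keys():
            ddict = cache[mtype][srcsite]
            # keys() returns a copy (a list) in Python 2, so deleting from
            # ddict inside the loop is safe; deleting while iterating
            # ddict.iteritems() would raise a RuntimeError instead.
            for dstsite in ddict.keys():
                (val, ts) = ddict[dstsite]
                if (now - ts) > cachetime:
                    del ddict[dstsite]
                    removed += 1
    return removed

# Example: one latency entry that is two hours old, aged with a one-hour limit.
cache = {'lat': {3: {7: (42.0, time.time() - 7200)}}}
print age_cache(cache, 3600)   # prints 1; cache['lat'][3] is now empty
```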
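The other behavioral fix is `time.sleep(self.autorefresh)` in place of `self.autorefresh*1000`: `time.sleep()` takes seconds, so the old call stretched a 60-second refresh interval to roughly 16.7 hours. A rough sketch of what the renamed `_foreverRefreshAndAge` loop presumably does; the free function and its `refresh`/`age`/`interval` parameters below are assumptions for illustration, since only the sleep call and its exception handling are visible in the hunk:

```python
import time
import traceback

def forever_refresh_and_age(refresh, age, interval):
    """Call refresh() and age() forever, pausing interval seconds per pass."""
    while True:
        try:
            refresh()
            age()
        except:
            # Keep the loop alive even if one pass fails, as the patch does.
            traceback.print_exc()
        try:
            # time.sleep() takes seconds, not milliseconds.
            time.sleep(interval)
        except:
            traceback.print_exc()
```

In the patch this loop runs on a `threading.Thread` started from `__init__`, so the cache keeps refreshing and aging in the background while the server answers queries.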