modified bw link monitoring module to poll only the tap ports

added framework to calculate the meter rate for tap flow management
        - still the dynamic rate limiting module to be updated

bandwidth calculation is much better from previous readings using mininet,
but still lacks accuracy to a greater extent. [Think through]
parent b2ca0113
tapDataBase
meterDataBase
bwDataBase
tags
build/*
......
......@@ -186,6 +186,7 @@ def bandwidthTest(bw=5, controllerip="localhost"):
i = 10;
while ( i < sleep_time):
print s1.cmd('dpctl unix:/tmp/s1 meter-config')
#print s1.cmd('dpctl unix:/tmp/s1 stats-meter')
print "Sleeping for %d seconds for rate-limiters to grow" % i
sleep(i)
sleep_time=sleep_time - i;
......
......@@ -22,6 +22,7 @@ from ryu.ofproto import ofproto_v1_3
from ryu.ofproto import ofproto_common, ofproto_parser
from ryu.lib import ofctl_v1_3
from ryu.lib import dpid as dpid_lib
from ryu.lib.dpid import dpid_to_str
from ryu.app.wsgi import ControllerBase, WSGIApplication
from ryu.topology.switches import (get_switch,
get_all_switch,
......@@ -31,6 +32,7 @@ import networkx as nx
import matplotlib.pyplot as plt
from ryu.lib import tapDB
from ryu.lib import objectTapDB as oDB
from ryu.lib import meterDB
LOG = logging.getLogger('ryu.app.cnac_rest')
LOG.setLevel(logging.DEBUG)
......@@ -555,6 +557,15 @@ class TapPolicyController(ControllerBase):
meter_mod = ofp.OFPMeterMod(
dp, of.OFPMC_MODIFY, of.OFPMF_KBPS, int(tap_id), bands)
dp.send_msg(meter_mod)
meterdbEntry={
'dpid': dpid_to_str(dp.id),
'port': int(tap_port),
'meterID':int(tap_id),
'rxBytes':0,
'timeStamp':0,
'rxRate':0
};
meterDB.insertmeterdbDict(meterdbEntry);
for i in flows[str(dpid)]:
for j in i['actions']:
......@@ -1036,6 +1047,7 @@ class RestCnacApi(app_manager.RyuApp):
super(RestCnacApi, self).__init__(*args, **kwargs)
self.dpset = kwargs['dpset']
tapDB.createTapDBConnection()
meterDB.createmeterdbConnection()
wsgi = kwargs['wsgi']
self.waiters = {}
self.data = {}
......
......@@ -12,6 +12,7 @@ import sys
#sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from ryu.lib import tapDB
from ryu.lib import bwDB
from ryu.lib import meterDB
from subprocess import Popen
import logging
......@@ -56,16 +57,6 @@ class dynamicRateLimiter(simple_switch_13.SimpleSwitch13):
dpid = str_to_dpid(tap.get('dpid'))
datapath = self.datapaths[dpid]
'''
for dp in self.datapaths.values():
print dpid_str
print dpid_to_str(dp.id)
if dpid_str == dpid_to_str(dp.id):
print "Datapath found %d" % dp.id
datapath = self.datapaths[dp.id]
break;
'''
bandwidth = bw.get('bandwidth')
usedbw = bw.get('usedbw')
pktError = bw.get('pktError')
......@@ -78,8 +69,16 @@ class dynamicRateLimiter(simple_switch_13.SimpleSwitch13):
max_rate_limit = (bandwidth / 2) #max limit = 50%
increment = (bandwidth / (20)) # 5% increment [bandwidth is in Kbps]
meterdbEntry= {
'dpid':tap.get('dpid'),
'meterID': meterid,
'rxBytes': 0,
'rxRate': 0,
'timeStamp':0
};
if (pktError > 0):
print "Reduce the rate limiting in multiples of increment"
self.logger.info("Reduce the rate limiting in multiples of increment")
no_of_increments = (pktError/increment) + 1; #always have higher allocation
new_limit = increment * no_of_increments;
of = datapath.ofproto
......@@ -96,17 +95,19 @@ class dynamicRateLimiter(simple_switch_13.SimpleSwitch13):
'tapID' : tapid
}
tapDB.udpateRateLimit(rateEntry)
meterDB.resetMeterEntry(meterdbEntry);
return ;
if ratelimit == max_rate_limit:
print "Already at maximum time ratelimit"
self.logger.info("Already at maximum time ratelimit")
return ;
if (usedbw < 50): #less than 50%
print "Increase the rate for tunnel traffic"
new_limit = ratelimit + increment;
if new_limit > max_rate_limit:
new_limit = max_rate_limit;
self.logger.info("meterid[%d] Increase the rate[%d] for tunnel traffic",
meterid, new_limit)
of = datapath.ofproto
ofp = datapath.ofproto_parser
burst_size = 0
......@@ -121,3 +122,4 @@ class dynamicRateLimiter(simple_switch_13.SimpleSwitch13):
'tapID' : tapid
}
tapDB.udpateRateLimit(rateEntry)
meterDB.resetMeterEntry(meterdbEntry);
......@@ -11,6 +11,8 @@ import os
import sys
#sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from ryu.lib import bwDB
from ryu.lib import tapDB
from ryu.lib import meterDB
import logging
class SimpleMonitor(simple_switch_13.SimpleSwitch13):
......@@ -56,20 +58,54 @@ class SimpleMonitor(simple_switch_13.SimpleSwitch13):
def _monitor(self):
while True:
for dp in self.datapaths.values():
self._request_stats(dp)
portList = tapDB.getTapPortsForSwitch({'dpid':dpid_to_str(dp.id)})
for port in portList:
self._request_stats(dp, port)
meterList = tapDB.getMeterIDs({'dpid':dpid_to_str(dp.id)})
self.logger.debug('meter: %s', str(meterList))
for meterID in meterList:
self.logger.debug('meterID: %d', int(meterID))
self._request_mstats(dp, meterID);
hub.sleep(10)
def _request_stats(self, datapath):
self.logger.debug('send stats request: %016x', datapath.id)
def _request_mstats(self, datapath, meterID):
self.logger.debug('send meter stats request: %016x with meterID: %d',
datapath.id, meterID)
ofproto = datapath.ofproto
parser = datapath.ofproto_parser
#no need for stat request for now
#req = parser.OFPFlowStatsRequest(datapath)
#datapath.send_msg(req)
req = parser.OFPPortStatsRequest(datapath, 0,
ofproto.OFPP_ANY)
req = parser.OFPMeterStatsRequest(datapath, 0, meterID)
datapath.send_msg(req)
def _request_stats(self, datapath, port):
self.logger.debug('send stats request: %016x for port: %d',
datapath.id, port)
ofproto = datapath.ofproto
parser = datapath.ofproto_parser
req = parser.OFPPortStatsRequest(datapath, 0, port)
#ofproto.OFPP_ANY)
datapath.send_msg(req)
@set_ev_cls(ofp_event.EventOFPMeterStatsReply, MAIN_DISPATCHER)
def _meter_stats_reply_handler(self, ev):
self.logger.debug("_meter_stats_reply_handler : %s", str(ev.msg.body))
dp_str = dpid_to_str(ev.msg.datapath.id)
for stat in ev.msg.body:
self.logger.debug('meter_id=0x%08x len=%d flow_count=%d '
'packet_in_count=%d byte_in_count=%d '
'duration_sec=%d duration_nsec=%d '
'band_stats=%s' %
(stat.meter_id, stat.len, stat.flow_count,
stat.packet_in_count, stat.byte_in_count,
stat.duration_sec, stat.duration_nsec,
stat.band_stats))
meterdbEntry= {
'dpid': dp_str,
'meterID':stat.meter_id,
'rxBytes':stat.byte_in_count,
'timeStamp':stat.duration_sec
};
meterDB.updateMeterDict(meterdbEntry);
@set_ev_cls(ofp_event.EventOFPPortStatsReply,[MAIN_DISPATCHER])
def _port_stats_reply_handler(self, ev):
body = ev.msg.body
......
......@@ -4,6 +4,7 @@ from os.path import isfile, getsize
import sqlite3 as sql
import logging
from ryu.lib import meterDB
#single write and single read so no locks are implemented.
global bwdb
......@@ -176,6 +177,7 @@ def updateBWDict(bwDBEntry):
cursor = conn.cursor()
rc = True;
meterRxRate = meterDB.getRateBySwitchPort(bwDBEntry);
try:
cursor.execute("""
select txBytes, txDrops, timeStamp, bandwidth, pktError
......@@ -194,29 +196,32 @@ def updateBWDict(bwDBEntry):
bandwidth = row[3];
txByte = (bwDBEntry.get('txBytes'));
timeStamp = bwDBEntry.get('timeStamp');
errMultiplier = 1;
newBytes = (txByte - prevTxBytes);
sec = timeStamp - prevTimeStamp;
try:
KB = ((float(newBytes))/(1024)); #bytes -> KB
rate = (KB/sec); #KBps
utilization = ((rate/float(bandwidth)) * 100) #in percentage
serviceTrafficRate = rate - meterRxRate ;
utilization = ((serviceTrafficRate/float(bandwidth)) * 100) #in percentage
except ZeroDivisionError:
log.info("division by zero error setting to 0")
log.info("division by zero error skip update")
rate = 0;
utilization = 0;
cursor.close();
conn.close();
rc = False;
return rc;
#utilization = (rate * 100 / bandwidth) #in percentage
log.info("switch:%s port:%d bw[%d]: newBytes[%d], sec[%d], KB[%f], rate[%f], "
"utilization:[%d]", bwDBEntry.get('dpid'),
"meterRate[%d], service[%d], utilization:[%d]", bwDBEntry.get('dpid'),
int(bwDBEntry.get('port')), bandwidth, newBytes,
sec, KB, rate, utilization)
if utilization != 0:
print ('switch:', bwDBEntry.get('dpid'),
'port:' , bwDBEntry.get('port'),
'[usage]:',utilization)
sec, KB, rate, meterRxRate, serviceTrafficRate, utilization)
bwDBEntry['usedbw']= int(utilization)
bwDBEntry['usedbw']= int(utilization) * errMultiplier
cursor.execute("""
update bwTable set usedbw=:usedbw, rxBytes=:rxBytes,
......
import os
import numpy as np
from os.path import isfile, getsize
import sqlite3 as sql
import logging
#single write and single read so no locks are implemented.
global meterdb
meterdb ='meterDataBase'
log = logging.getLogger()
log.setLevel(logging.INFO)
def createmeterdbConnection():
if os.path.exists(meterdb):
os.remove(meterdb) #clear the sqllite file
conn = sql.connect(meterdb)
cursor = conn.cursor()
print "createmeterdbConnection: Create meterdb trace.."
#cursor.execute(""" PRAGMA foreign_keys=ON;""");
cursor.execute("""
create table if not exists meterTable(
uniqID INTEGER PRIMARY KEY AUTOINCREMENT,
dpid VARCHAR(255),
meterID INTEGER,
port INTEGER,
rxBytes UNSIGNED BIGINT DEFAULT 0,
timeStamp UNSIGNED BIG INT DEFAULT 0,
rxRate UNSIGNED INT DEFAULT 0
);
""");
print "createmeterdbConnection: meterdb Creation Done.. "
print "createmeterdbConnection: Clear all table entries.. "
conn.execute("""delete from meterTable;""");
conn.commit()
cursor.close()
conn.close()
return True
def checkDuplicateEntry(meterdbEntry):
#print "checkDuplicateEntry: print entry for checkDuplicate query"
#print meterdbEntry
rc = False; #no duplicates found
conn = sql.connect(meterdb)
cursor = conn.cursor()
cursor.execute("""select * from meterTable where dpid=:dpid and
meterID=:meterID """,meterdbEntry);
#print "checkDuplicateEntry: select query output"
len = 0
for row in cursor.fetchall():
print row;
len = len + 1
if len != 0:
print "checkDuplicateEntry: Duplicates found.."
rc = True; #duplicates found
cursor.close()
conn.close()
return rc;
def getRateBySwitchPort(meterdbEntry):
log.info("getRateBySwitchPort query for currentRate")
log.info("%s", str(meterdbEntry))
conn = sql.connect(meterdb)
cursor = conn.cursor()
cursor.execute("""select rxRate from meterTable where
dpid=:dpid and port=:port""",
meterdbEntry);
row = cursor.fetchone();
if row is None:
rxRate = 0;
else:
rxRate = int(row[0])
log.info("switch[%s] port[%d] Rxrate[%d]",
meterdbEntry.get('dpid'),
meterdbEntry.get('port'), rxRate)
cursor.close();
conn.close();
return rxRate;
def getMeterEntry(meterdbEntry):
print "getMeterEntry query for flowTable"
print meterdbEntry
conn = sql.connect(meterdb)
cursor = conn.cursor()
cursor.execute("""select uniqID, meterID, dpid, rxBytes, timeStamp
dpid=:dpid and meterID=:meterID """, meterdbEntry);
row = cursor.fetchone();
if row is None:
meterDict = None;
else:
meterDict = { 'uniqID' : row[0],
'meterID':row[1],
'dpid':row[2],
'rxBytes':row[3],
'timeStamp':row[4]
}
print rowDict
cursor.close();
conn.close();
return meterDict;
def deleteMeterDict(meterdbEntry):
print "deleteMeterDict: print delete tap entry"
conn = sql.connect(meterdb)
cursor = conn.cursor()
rc = False;
try:
cursor.execute("""delete from meterTable where dpid=:dpid and
meterID=:meterID""",meterdbEntry);
conn.commit();
rc = True;
except sql.Error as e:
print "An error occured ",e.args[0];
cursor.close();
conn.close();
return rc;
#assuming that the entries are always legit to avoid too much
#sanity checks..
def insertmeterdbDict(meterdbEntry):
print "insertmeterdbDict: print entry for bandwidth table insert query"
#print meterdbEntry
if checkDuplicateEntry(meterdbEntry) is True:
print "insertmeterdbDict insertion failed[duplicate error]"
return None;
conn = sql.connect(meterdb)
cursor = conn.cursor()
rc = None;
try:
cursor.execute("""
insert into meterTable( dpid, meterID, port, rxBytes, timeStamp)
values (:dpid, :meterID, :port, :rxBytes, :timeStamp)
""", meterdbEntry);
conn.commit();
rc = cursor.lastrowid
except sql.Error as e:
print "An error occured ",e.args[0];
rc = -1;
cursor.close();
conn.close();
return rc;
def dumpMeterTable():
conn = sql.connect(meterdb)
cursor = conn.cursor()
cursor.execute("""select * from meterTable""");
for row in cursor.fetchall():
print row;
cursor.close();
conn.close();
def resetMeterEntry(meterdbEntry):
log.info("resetMeterEntry : print entry for bandwidth table reset query")
log.info("resetMeterEntry: %s", str(meterdbEntry))
conn = sql.connect(meterdb)
cursor = conn.cursor()
rc = True;
try:
cursor.execute("""
update meterTable set rxBytes=:rxBytes,
timeStamp=:timeStamp, rxRate=:rxRate
where dpid=:dpid and meterID=:meterID""", meterdbEntry);
conn.commit();
except sql.Error as e:
log.info("An error occured %s ", e.args[0]);
rc = False
cursor.close();
conn.close();
return rc;
def updateMeterDict(meterdbEntry):
log.info("updateMeterDict: print entry for bandwidth table update query")
log.info("updateMeterDict: %s", str(meterdbEntry))
conn = sql.connect(meterdb)
cursor = conn.cursor()
rc = True;
try:
cursor.execute("""
select rxBytes, timeStamp from meterTable
where dpid=:dpid and meterID=:meterID
""",meterdbEntry)
row = cursor.fetchone();
if (row is not None) and (row[1] != 0):
prevRxBytes = row[0];
prevTimeStamp = row[1];
rxByte = (meterdbEntry.get('rxBytes'));
timeStamp = meterdbEntry.get('timeStamp');
errMultiplier = 8;
newBytes = (rxByte - prevRxBytes);
sec = timeStamp - prevTimeStamp;
try:
KB = ((float(newBytes))/(1024)); #bytes -> KB
rxRate = (KB/sec); #KBps
except ZeroDivisionError:
log.info("division by zero error setting to 0")
rxRate = 0;
meterdbEntry['rxRate'] = int(rxRate) #* errorMultiplier;
log.info("meterDBupdate: switch:%s meterID:%d newBytes[%d], sec[%d], "
"KB[%f], rate[%.2f]", meterdbEntry.get('dpid'),
int(meterdbEntry.get('meterID')), newBytes,
sec, KB, rxRate)
cursor.execute("""
update meterTable set rxBytes=:rxBytes,
timeStamp=:timeStamp, rxRate=:rxRate
where dpid=:dpid and meterID=:meterID""", meterdbEntry);
conn.commit();
#print "update done"
elif row is not None:
rxByte = (meterdbEntry.get('rxBytes'));
timeStamp = meterdbEntry.get('timeStamp');
if timeStamp == 0:
meterdbEntry['rxRate']= 0;
else:
meterdbEntry['rxRate']= (rxByte/(timeStamp *1024)) #bytes -> KB
log.info("meterDBupdate: switch:%s meterID:%d newBytes[%d], sec[%d], "
"rate[%.2f]", meterdbEntry.get('dpid'),
int(meterdbEntry.get('meterID')), rxByte,
timeStamp, meterdbEntry.get('rxRate'))
cursor.execute("""
update meterTable set rxBytes=:rxBytes,
timeStamp=:timeStamp, rxRate=:rxRate
where dpid=:dpid and meterID=:meterID""", meterdbEntry);
conn.commit();
else:
log.info("Nothing to update --- WRONG ---!!")
rc = False
except sql.Error as e:
log.info("An error occured %s ", e.args[0]);
rc = False
cursor.close();
conn.close();
return rc;
def main():
meterdbEntry= {
'dpid': '000000001', 'meterID':3, 'rxBytes':204034,
'timeStamp':10, 'rxRate':0
}
print meterdbEntry
createmeterdbConnection()
uniqID = insertmeterdbDict(meterdbEntry)
if uniqID is None:
print "insert failed"
meterdbEntry['rxBytes']= 2*204034
meterdbEntry['timeStamp']= 2*10
rc = updateMeterDict(meterdbEntry);
print rc;
dumpMeterTable();
if os.path.exists(meterdb):
os.remove(meterdb) #clear the sqllite file
'''
if __name__ == "__main__":
main()
'''
......@@ -2,9 +2,13 @@ import os
from os.path import isfile, getsize
import sqlite3 as sql
import logging
global db
db ='tapDataBase'
log = logging.getLogger()
log.setLevel(logging.INFO)
def createTapDBConnection():
if os.path.exists(db):
os.remove(db) #clear the sqllite file
......@@ -119,6 +123,25 @@ def getTapEntry(tapID):
conn.close();
return tapList;
def getMeterIDs(tapDBEntry):
log.debug("getMeterID query for tapTable")
conn = sql.connect(db);
cursor = conn.cursor();
cursor.execute("""
select tapID from tapTable where dpid=:dpid""",
tapDBEntry);
log.debug("getMeterIDs: select query output")
rc = []
for row in cursor.fetchall():
rc.append(int(row[0])+1) #meterID = tapID +1
cursor.close();
conn.close();
return rc;
def getTapEntryID(tapDBEntry):
print "getTapEntryID query for tapTable"
print tapDBEntry
......@@ -366,6 +389,27 @@ def insertFlowDBDict(flowEntry):
print "insertFlowDBDict: Insert successful"
return rc;
def getTapPortsForSwitch(dpidDict):
portList = []
conn = sql.connect(db)
cursor = conn.cursor()
try:
cursor.execute("""
select monitorPort from tapTable where dpid=:dpid
""",dpidDict);
for row in cursor.fetchall():
portList.append(row[0]);
except sql.error as e:
print "an error occured ",e.args[0];
log.debug("portlist : %s", str(portList))
cursor.close()
conn.close()
return portList;
def getAllTapMeters():
tapList = []
conn = sql.connect(db)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment