add meter band scale up changes; will increase the meter when the usage is...

add meter band scale up changes; will increase the meter when the usage is low; still error handling is pending
parent 3b694d43
......@@ -205,7 +205,7 @@ class TapPolicyController(ControllerBase):
tapDbEntry= {
'dpid': dpid_pd, 'monitorPort':tap_port, 'bandwidth':10,
'trafficType':'all', 'tunnelVlan':int(vlan_id),
'destinationVlan':int(vlan_id)
'destinationVlan':int(vlan_id), 'ratelimit':10000
}
......
from operator import attrgetter
from ryu.app import simple_switch_13
from ryu.controller import ofp_event
from ryu.controller import dpset
from ryu.controller.handler import (MAIN_DISPATCHER, DEAD_DISPATCHER,
CONFIG_DISPATCHER)
from ryu.controller.handler import set_ev_cls
from ryu.lib.dpid import str_to_dpid,dpid_to_str
from ryu.lib import hub
from ryu.lib import tapDB
from ryu.lib import bwDB
#get all the tap points from the tap DB.
# for eac of the metering check the bandwidth associated with it.
# if there is not error seen in the ports increase the metering.
class dynamicRateLimiter(simple_switch_13.SimpleSwitch13):
def __init__(self, *args, **kwargs):
super(dynamicRateLimiter,
self).__init__(*args, **kwargs)
self.datapaths = {}
self.monitor_thread = hub.spawn(self._monitor)
@set_ev_cls(ofp_event.EventOFPStateChange,
[MAIN_DISPATCHER, DEAD_DISPATCHER])
def _state_change_handler(self, ev):
datapath = ev.datapath
if ev.state == MAIN_DISPATCHER:
if not datapath.id in self.datapaths:
self.logger.debug('register datapath: %016x', datapath.id)
self.datapaths[datapath.id] = datapath
elif ev.state == DEAD_DISPATCHER:
if datapath.id in self.datapaths:
self.logger.debug('unregister datapath: %016x', datapath.id)
del self.datapaths[datapath.id]
def _monitor(self):
while True:
tapList = tapDB.getAllTapMeters();
for tap in tapList:
print tap;
self._reconfigure_metering(tap)
hub.sleep(10)
def _reconfigure_metering(self, tap):
bwList = bwDB.getBWEntry(tap)
for bw in bwList:
print bw
dpid = str_to_dpid(tap.get('dpid'))
datapath = self.datapaths[dpid]
'''
for dp in self.datapaths.values():
print dpid_str
print dpid_to_str(dp.id)
if dpid_str == dpid_to_str(dp.id):
print "Datapath found %d" % dp.id
datapath = self.datapaths[dp.id]
break;
'''
bandwidth = bw.get('bandwidth')
usedbw = bw.get('usedbw')
pktError = bw.get('pktError')
limit = tap.get('limit')
tapid = tap.get('tapID')
max_rate_limit = (bandwidth / 2) #max limit = 50%
increment = (bandwidth / 20 ) # 5% increment
if (pktError > 0):
print "Reduce the rate limiting "
return ;
if limit == max_rate_limit:
print "Already at maximum time limit"
return ;
if (usedbw < 50): #less than 50%
new_limit = limit + increment;
if new_limit > max_rate_limit:
new_limit = max_rate_limit;
of = datapath.ofproto
ofp = datapath.ofproto_parser
burst_size = 0
bands = []
bands.append(ofp.OFPMeterBandDrop(new_limit, burst_size))
meter_mod = ofp.OFPMeterMod(
datapath, of.OFPMC_MODIFY, of.OFPMF_KBPS, int(tapid), bands)
datapath.send_msg(meter_mod)
rateEntry = {
'ratelimit' : new_limit,
'tapID' : tapid
}
tapDB.udpateRateLimit(rateEntry)
......@@ -5,6 +5,7 @@ from ryu.controller import dpset
from ryu.controller.handler import (MAIN_DISPATCHER, DEAD_DISPATCHER,
CONFIG_DISPATCHER)
from ryu.controller.handler import set_ev_cls
from ryu.lib.dpid import dpid_to_str
from ryu.lib import hub
from ryu.lib import bwDB
......@@ -20,7 +21,8 @@ class SimpleMonitor(simple_switch_13.SimpleSwitch13):
[MAIN_DISPATCHER, DEAD_DISPATCHER])
def _state_change_handler(self, ev):
datapath = ev.datapath
bwDBEntry = {'dpid':datapath.id,
dp_str = dpid_to_str(datapath.id)
bwDBEntry = {'dpid':dp_str,
'usedbw':0,
'rxBytes':0,
'txBytes':0,
......@@ -63,21 +65,23 @@ class SimpleMonitor(simple_switch_13.SimpleSwitch13):
@set_ev_cls(ofp_event.EventOFPPortStatsReply,[MAIN_DISPATCHER])
def _port_stats_reply_handler(self, ev):
body = ev.msg.body
self.logger.info('datapath port '
self.logger.info('datapath port timestamp '
'rx-pkts rx-bytes rx-error '
'tx-pkts tx-bytes tx-error '
'rx-drop tx-drop')
self.logger.info('---------------- -------- '
'-------- -------- -------- '
'-------- -------- -------- '
'-------- -------- --------')
for stat in sorted(body, key=attrgetter('port_no')):
self.logger.info('%016x %8x %8d %8d %8d %8d %8d %8d %d %d',
ev.msg.datapath.id, stat.port_no,
self.logger.info('%016x %8x %8d %8d %8d %8d %8d %8d %8d %8d %8d',
ev.msg.datapath.id, stat.port_no, stat.duration_sec,
stat.rx_packets, stat.rx_bytes, stat.rx_errors,
stat.tx_packets, stat.tx_bytes, stat.tx_errors,
stat.rx_dropped, stat.tx_dropped)
dp_str = dpid_to_str(ev.msg.datapath.id)
bwDbEntry= {
'dpid': ev.msg.datapath.id,
'dpid': dp_str,
'port':stat.port_no,
'usedbw':0,
'rxBytes':stat.rx_bytes,
......
......@@ -70,20 +70,21 @@ def checkDuplicateEntry(bwDBEntry):
return rc;
def getBWEntry(bwDBEntry):
#print "getBWEntry query for flowTable"
#print tapID
print "getBWEntry query for flowTable"
#print bwDBEntry
conn = sql.connect(bwdb)
cursor = conn.cursor()
cursor.execute("""select uniqID,bandwidth, usedbw, rxBytes, txBytes,
pktErro, timeStamp from bwTable where dpid=:dpid and
pktError, timeStamp from bwTable where dpid=:dpid and
port=:port """,bwDBEntry);
bwList = []
for row in cursor.fetchall():
print row;
rowDict= { 'uniqID' : row[0],
#print row;
rowDict = {};
rowDict = { 'uniqID' : row[0],
'bandwidth':row[1],
'usedbw':row[2],
'rxBytes':row[3],
......@@ -91,7 +92,7 @@ def getBWEntry(bwDBEntry):
'pktError':row[5],
'timeStamp':row[6]
}
print rowDict
#print rowDict
bwList.append(rowDict)
cursor.close();
......@@ -120,7 +121,7 @@ def deleteBWDict(bwDBEntry):
#assuming that the entries are always legit to avoid too much
#sanity checks..
def insertBWDBDict(bwDBEntry):
#print "insertBWDBDict: print entry for bandwidth table insert query"
print "insertBWDBDict: print entry for bandwidth table insert query"
#print bwDBEntry
if checkDuplicateEntry(bwDBEntry) is True:
......@@ -160,7 +161,7 @@ def dumpBWTable():
conn.close();
def updateBWDict(bwDBEntry):
#print "updateBWDict: print entry for bandwidth table update query"
print "updateBWDict: print entry for bandwidth table update query"
#print bwDBEntry
conn = sql.connect(bwdb)
......
......@@ -22,6 +22,7 @@ def createTapDBConnection():
path VARCHAR(255),
trafficType VARCHAR(255),
tunnelVlan INTEGER,
ratelimit INTEGER,
destinationVlan INTEGER
);
""");
......@@ -204,8 +205,8 @@ def deleteTapEntry(tapID):
""", {'tapID':tapID});
conn.commit();
rc = True;
except sql.Error as e:
print "An error occured ",e.args[0];
except sql.error as e:
print "an error occured ",e.args[0];
cursor.close();
conn.close();
......@@ -228,9 +229,9 @@ def insertTapDBDict(tapDBEntry):
try:
cursor.execute("""
insert into tapTable( dpid, monitorPort, bandwidth, path,
trafficType, tunnelVlan, destinationVlan)
trafficType, tunnelVlan, destinationVlan, ratelimit)
values (:dpid, :monitorPort, :bandwidth, :path,
:trafficType, :tunnelVlan, :destinationVlan)
:trafficType, :tunnelVlan, :destinationVlan, :ratelimit)
""", tapDBEntry);
conn.commit();
rc = cursor.lastrowid
......@@ -240,29 +241,31 @@ def insertTapDBDict(tapDBEntry):
conn.close();
return rc;
'''
cursor.execute("""
select tapID from tapTable where dpid=:dpid and monitorPort=:monitorPort
and trafficType=:trafficType and tunnelVlan=:tunnelVlan and
destinationVlan=:destinationVlan""", tapDBEntry);
cursor.close();
conn.close();
return rc;
print "insertTapDBDict: all inserted records for select"
len = 0;
for row in cursor.fetchall():
print row
len = len + 1
rc = int(row[0])
#input : dict(tapID, rate)
def udpateRateLimit(rateEntry):
print "udpateRateLimit: update rate entry query"
print rateEntry
print "insertTapDBDict: length of select query:"
print len
if len != 1:
rc = -1
'''
conn = sql.connect(db)
cursor = conn.cursor()
rc = True;
try:
cursor.execute("""
update tapTable set ratelimit=:ratelimit
where tapID=:tapID """, rateEntry);
conn.commit();
print "rate update done"
except sql.Error as e:
print "An error occured ",e.args[0];
rc = False
cursor.close();
conn.close();
return rc;
def insertFlowDBDict(flowEntry):
print "insertFlowDBDict: print entry for flowtable insert query"
print flowEntry
......@@ -363,6 +366,33 @@ def insertFlowDBDict(flowEntry):
print "insertFlowDBDict: Insert successful"
return rc;
def getAllTapMeters():
tapList = []
conn = sql.connect(db)
cursor = conn.cursor()
try:
cursor.execute("""
select tapID, dpid, monitorPort, ratelimit
from tapTable""");
for row in cursor.fetchall():
#print row;
rowDict = {
'tapID':row[0],
'dpid':row[1],
'port':row[2],
'limit':row[3]
}
#print rowDict;
tapList.append(rowDict);
except sql.error as e:
print "an error occured ",e.args[0];
cursor.close()
conn.close()
return tapList;
def main():
tapDbEntry= {
'dpid': '000000001', 'monitorPort':3, 'bandwidth':10,
......
......@@ -3,5 +3,5 @@ WSAPI_HOST=$1
#./bin/ryu-manager --verbose --app-lists ryu/app/rest,ryu/app/rest_topology,ryu/topology/dumper,ryu/app/ofctl_rest,ryu/app/cnac_rest
#./bin/ryu-manager --verbose --observe-links --wsapi-host=$WSAPI_HOST --app-lists ryu.topology.switches,ryu.app.rest,ryu.app.rest_topology,ryu.app.ofctl_rest,ryu/app/cnac_rest,ryu/app/rest_net_topology,ryu.app.simple_switch_lacp
#./bin/ryu-manager --verbose --observe-links --wsapi-host=$WSAPI_HOST --app-lists ryu/app/rest,ryu/app/rest_topology,ryu/topology/dumper,ryu/app/ofctl_rest,ryu/app/cnac_rest,ryu/app/rest_net_topology,ryu/app/simple_switch_13,ryu/app/simple_monitor
./bin/ryu-manager --observe-links --wsapi-host=$WSAPI_HOST --app-lists ryu.topology.switches,ryu.app.rest,ryu.app.rest_topology,ryu.app.ofctl_rest,ryu/app/cnac_rest,ryu/app/rest_net_topology,ryu/app/simple_switch_13,ryu/app/simple_monitor
./bin/ryu-manager --observe-links --wsapi-host=$WSAPI_HOST --app-lists ryu.topology.switches,ryu.app.rest,ryu.app.rest_topology,ryu.app.ofctl_rest,ryu/app/cnac_rest,ryu/app/rest_net_topology,ryu/app/simple_switch_13,ryu/app/simple_monitor,ryu/app/dynamic_rate_limiter
#./bin/ryu-manager --verbose --observe-links --app-lists ryu.topology.switches,ryu.app.rest,ryu.app.rest_topology,ryu.app.ofctl_rest,ryu/app/cnac_rest,ryu/app/rest_net_topology
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment