Integrated the BW moniotoring modules with DB to track the usage.

Flow monitoring is still pending.
parent 40b30562
tapDataBase
bwDataBase
tags
build/*
cscope*
......
from operator import attrgetter
from ryu.app import simple_switch_13
from ryu.controller import ofp_event
from ryu.controller.handler import MAIN_DISPATCHER, DEAD_DISPATCHER
from ryu.controller import dpset
from ryu.controller.handler import (MAIN_DISPATCHER, DEAD_DISPATCHER,
CONFIG_DISPATCHER)
from ryu.controller.handler import set_ev_cls
from ryu.lib import hub
from ryu.lib import bwDB
class SimpleMonitor(simple_switch_13.SimpleSwitch13):
def __init__(self, *args, **kwargs):
......@@ -11,18 +14,27 @@ class SimpleMonitor(simple_switch_13.SimpleSwitch13):
self).__init__(*args, **kwargs)
self.datapaths = {}
self.monitor_thread = hub.spawn(self._monitor)
bwDB.createBWDBConnection();
@set_ev_cls(ofp_event.EventOFPStateChange,
[MAIN_DISPATCHER, DEAD_DISPATCHER])
def _state_change_handler(self, ev):
datapath = ev.datapath
bwDBEntry = {'dpid':datapath.id}
if ev.state == MAIN_DISPATCHER:
if not datapath.id in self.datapaths:
self.logger.debug('register datapath: %016x', datapath.id)
self.datapaths[datapath.id] = datapath
for port,desc in datapath.ports.items():
bwDBEntry['port']=port
bwDBEntry['bandwidth']=desc.curr_speed
bwDB.insertBWDBDict(bwDBEntry);
elif ev.state == DEAD_DISPATCHER:
if datapath.id in self.datapaths:
self.logger.debug('unregister datapath: %016x', datapath.id)
for port in datapath.ports:
bwDBEntry['port']=port
bwDB.deleteBWDict(bwDBEntry);
del self.datapaths[datapath.id]
def _monitor(self):
......@@ -47,12 +59,24 @@ class SimpleMonitor(simple_switch_13.SimpleSwitch13):
body = ev.msg.body
self.logger.info('datapath port '
'rx-pkts rx-bytes rx-error '
'tx-pkts tx-bytes tx-error')
'tx-pkts tx-bytes tx-error '
'rx-drop tx-drop')
self.logger.info('---------------- -------- '
'-------- -------- -------- '
'-------- -------- --------')
for stat in sorted(body, key=attrgetter('port_no')):
self.logger.info('%016x %8x %8d %8d %8d %8d %8d %8d',
self.logger.info('%016x %8x %8d %8d %8d %8d %8d %8d %d %d',
ev.msg.datapath.id, stat.port_no,
stat.rx_packets, stat.rx_bytes, stat.rx_errors,
stat.tx_packets, stat.tx_bytes, stat.tx_errors)
stat.tx_packets, stat.tx_bytes, stat.tx_errors,
stat.rx_dropped, stat.tx_dropped)
bwDbEntry= {
'dpid': ev.msg.datapath.id,
'port':stat.port_no,
'usedbw':0,
'rxPkts':stat.rx_packets,
'txPkts':stat.tx_packets,
'pktError':stat.tx_dropped,
'timeStamp':stat.duration_nsec
};
bwDB.updateBWDict(bwDbEntry);
import os
from os.path import isfile, getsize
import sqlite3 as sql
#single write and single read so no locks are implemented.
global bwdb
bwdb ='bwDataBase'
def createBWDBConnection():
if os.path.exists(bwdb):
os.remove(bwdb) #clear the sqllite file
conn = sql.connect(bwdb)
cursor = conn.cursor()
print "createBWDBConnection: Create bwdb trace.."
#cursor.execute(""" PRAGMA foreign_keys=ON;""");
cursor.execute("""
create table if not exists bwTable(
uniqID INTEGER PRIMARY KEY AUTOINCREMENT,
dpid VARCHAR(255),
port INTEGER,
bandwidth INTEGER,
usedbw INTEGER,
rxPkts BIGINT,
txPkts BIGINT,
pktError BIGINT,
timeStamp DOUBLE PRECISION
);
""");
print "createBWDBConnection: bwdb Creation Done.. "
print "createBWDBConnection: Clear all table entries.. "
conn.execute("""delete from bwTable;""");
conn.commit()
cursor.close()
conn.close()
return True
#check if there is already a tap entry for the same monitor inputs
#primary condition are the dpid and port
#if there exists an entry check for traffic type whether to split
#also we need to consider the match whether the pipeline to be altered or
#extended [for now extension alone is to be done] no need to do complex
#stuffs
def checkDuplicateEntry(bwDBEntry):
#print "checkDuplicateEntry: print entry for checkDuplicate query"
#print bwDBEntry
rc = False; #no duplicates found
conn = sql.connect(bwdb)
cursor = conn.cursor()
cursor.execute("""select * from bwTable where dpid=:dpid and
port=:port """,bwDBEntry);
#print "checkDuplicateEntry: select query output"
len = 0
for row in cursor.fetchall():
print row;
len = len + 1
if len != 0:
print "checkDuplicateEntry: Duplicates found.."
rc = True; #duplicates found
cursor.close()
conn.close()
return rc;
def getBWEntry(bwDBEntry):
print "getBWEntry query for flowTable"
print tapID
conn = sql.connect(bwdb)
cursor = conn.cursor()
cursor.execute("""select uniqID,bandwidth, usedbw, rxPkts, txPkts,
pktErro, timeStamp from bwTable where dpid=:dpid and
port=:port """,bwDBEntry);
bwList = []
for row in cursor.fetchall():
print row;
rowDict= { 'uniqID' : row[0],
'bandwidth':row[1],
'usedbw':row[2],
'rxPkts':row[3],
'txPkts':row[4],
'pktError':row[5],
'timeStamp':row[6]
}
print rowDict
bwList.append(rowDict)
cursor.close();
conn.close();
return bwList;
def deleteBWDict(bwDBEntry):
print "deleteBWDict: print delete tap entry"
conn = sql.connect(bwdb)
cursor = conn.cursor()
rc = False;
try:
cursor.execute("""delete from bwTable where dpid=:dpid and
port=:port """,bwDBEntry);
conn.commit();
rc = True;
except sql.Error as e:
print "An error occured ",e.args[0];
cursor.close();
conn.close();
return rc;
#assuming that the entries are always legit to avoid too much
#sanity checks..
def insertBWDBDict(bwDBEntry):
print "insertBWDBDict: print entry for bandwidth table insert query"
print bwDBEntry
if checkDuplicateEntry(bwDBEntry) is True:
print "insertBWDBDict insertion failed[duplicate error]"
return None;
conn = sql.connect(bwdb)
cursor = conn.cursor()
rc = None;
try:
cursor.execute("""
insert into bwTable( dpid, port, bandwidth)
values (:dpid, :port, :bandwidth)
""", bwDBEntry);
conn.commit();
rc = cursor.lastrowid
except sql.Error as e:
print "An error occured ",e.args[0];
cursor.close();
conn.close();
return rc;
cursor.close();
conn.close();
return rc;
def dumpBWTable():
conn = sql.connect(bwdb)
cursor = conn.cursor()
cursor.execute("""select * from bwTable""");
for row in cursor.fetchall():
print row;
cursor.close();
conn.close();
def updateBWDict(bwDBEntry):
print "updateBWDict: print entry for bandwidth table update query"
print bwDBEntry
conn = sql.connect(bwdb)
cursor = conn.cursor()
rc = None;
try:
cursor.execute("""
update bwTable set usedbw=:usedbw,rxPkts=:rxPkts,
txPkts=:txPkts, pktError=:pktError, timeStamp=:timeStamp
where dpid=:dpid and port=:port""", bwDBEntry);
conn.commit();
except sql.Error as e:
print "An error occured ",e.args[0];
cursor.close();
conn.close();
return False;
return True;
def main():
bwDbEntry= {
'dpid': '000000001', 'port':3, 'bandwidth':10,
'usedbw':10, 'rxPkts':204034, 'txPkts':432432,
'pktError':5, 'timeStamp':12.4
}
#print bwDbEntry
#print flowDbEntry
createBWDBConnection()
uniqID = insertBWDBDict(bwDbEntry)
if uniqID is None:
print "insert failed"
rc = updateBWDict(bwDbEntry);
print rc;
dumpBWTable();
if os.path.exists(bwdb):
os.remove(bwdb) #clear the sqllite file
'''
if __name__ == "__main__":
main()
'''
......@@ -6,7 +6,8 @@ import sqlite3 as sql
global db
db ='tapDataBase'
def createTapDBConnection():
os.remove(db) #clear the sqllite file
if os.path.exists(db):
os.remove(db) #clear the sqllite file
conn = sql.connect(db)
cursor = conn.cursor()
print "createTapDBConnection: Create DB trace.."
......
......@@ -2,6 +2,6 @@
WSAPI_HOST=$1
#./bin/ryu-manager --verbose --app-lists ryu/app/rest,ryu/app/rest_topology,ryu/topology/dumper,ryu/app/ofctl_rest,ryu/app/cnac_rest
#./bin/ryu-manager --verbose --observe-links --wsapi-host=$WSAPI_HOST --app-lists ryu.topology.switches,ryu.app.rest,ryu.app.rest_topology,ryu.app.ofctl_rest,ryu/app/cnac_rest,ryu/app/rest_net_topology,ryu.app.simple_switch_lacp
#./bin/ryu-manager --verbose --observe-links --wsapi-host=$WSAPI_HOST --app-lists ryu/app/rest,ryu/app/rest_topology,ryu/topology/dumper,ryu/app/ofctl_rest,ryu/app/cnac_rest,ryu/app/rest_net_topology,ryu/app/simple_switch_13
#./bin/ryu-manager --verbose --observe-links --wsapi-host=$WSAPI_HOST --app-lists ryu/app/rest,ryu/app/rest_topology,ryu/topology/dumper,ryu/app/ofctl_rest,ryu/app/cnac_rest,ryu/app/rest_net_topology,ryu/app/simple_switch_13,ryu/app/simple_monitor
./bin/ryu-manager --observe-links --wsapi-host=$WSAPI_HOST --app-lists ryu.topology.switches,ryu.app.rest,ryu.app.rest_topology,ryu.app.ofctl_rest,ryu/app/cnac_rest,ryu/app/rest_net_topology,ryu/app/simple_switch_13,ryu/app/simple_monitor
#./bin/ryu-manager --verbose --observe-links --app-lists ryu.topology.switches,ryu.app.rest,ryu.app.rest_topology,ryu.app.ofctl_rest,ryu/app/cnac_rest,ryu/app/rest_net_topology
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment