dynamic_rate_limiter.py 5.11 KB
Newer Older
1 2 3 4 5 6 7 8 9
from operator import attrgetter
from ryu.app import simple_switch_13
from ryu.controller import ofp_event
from ryu.controller import dpset
from ryu.controller.handler import (MAIN_DISPATCHER, DEAD_DISPATCHER,
                                    CONFIG_DISPATCHER)
from ryu.controller.handler import set_ev_cls 
from ryu.lib.dpid import str_to_dpid,dpid_to_str
from ryu.lib import hub
10 11 12
import os
import sys
#sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
13 14
from ryu.lib import tapDB
from ryu.lib import bwDB
15
from ryu.lib import meterDB
16
from subprocess import Popen
17
import logging
18 19 20 21 22 23 24 25 26 27 28

#get all the tap points from the tap DB.
# for eac of the metering check the bandwidth associated with it.
# if there is not error seen in the ports increase the metering.

class dynamicRateLimiter(simple_switch_13.SimpleSwitch13):
    def __init__(self, *args, **kwargs): 
        super(dynamicRateLimiter,
                self).__init__(*args, **kwargs)
        self.datapaths = {}
        self.monitor_thread = hub.spawn(self._monitor)
29 30
        log = logging.getLogger()
        log.setLevel(logging.INFO)
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55

    @set_ev_cls(ofp_event.EventOFPStateChange,
                        [MAIN_DISPATCHER, DEAD_DISPATCHER])
    def _state_change_handler(self, ev):
        datapath = ev.datapath
        if ev.state == MAIN_DISPATCHER:
            if not datapath.id in self.datapaths:
                self.logger.debug('register datapath: %016x', datapath.id)
                self.datapaths[datapath.id] = datapath
        elif ev.state == DEAD_DISPATCHER:
            if datapath.id in self.datapaths:
                self.logger.debug('unregister datapath: %016x', datapath.id)
                del self.datapaths[datapath.id]

    def _monitor(self):
        while True:
            tapList = tapDB.getAllTapMeters();
            for tap in tapList:
                print tap;
                self._reconfigure_metering(tap)
            hub.sleep(10)

    def _reconfigure_metering(self, tap):
        bwList = bwDB.getBWEntry(tap)
        for bw in bwList:
56
            self.logger.info('reconfiguring : %s', str(bw))
57
            dpid     = str_to_dpid(tap.get('dpid'))
58
            datapath = self.datapaths[dpid]
59

60 61 62
            bandwidth   = bw.get('bandwidth')
            usedbw      = bw.get('usedbw')
            pktError    = bw.get('pktError')
63 64
            ratelimit   = int(tap.get('ratelimit'))
            rateKBps = ratelimit;
65 66
            tapid       = int(tap.get('tapID'))
            meterid     = tapid +1
67

68 69
            self.logger.info('usedbw: %d', usedbw)

70
            max_rate_limit = (bandwidth / 2) #max limit = 50%
71
            increment = (bandwidth *(0.1))  # 10% increment [bandwidth is in KBps]
72

73 74 75 76 77 78 79 80
            meterdbEntry= {
                    'dpid':tap.get('dpid'),
                    'meterID': meterid,
                    'rxBytes': 0,
                    'rxRate': 0,
                    'timeStamp':0
                    };

81
            if (95 <= usedbw <= 100):
82 83 84 85 86
                self.logger.info("Reduce the rate limiting rapidly by dividing it by 2")
                new_limit = round(ratelimit/2 )
                new_limitKbps = new_limit * 8 #KBps -> Kbps
                self.logger.info("meterid[%d] Increase the rate[%d] for tunnel traffic",
                        meterid, new_limit)
87 88 89 90
                of  = datapath.ofproto
                ofp = datapath.ofproto_parser
                burst_size = 0
                bands = []
91
                bands.append(ofp.OFPMeterBandDrop(new_limitKbps, burst_size))
92
                meter_mod = ofp.OFPMeterMod(datapath, of.OFPMC_MODIFY, 
93
                        (of.OFPMF_KBPS), meterid, bands)
94
                datapath.send_msg(meter_mod)
95

96 97 98 99 100
                rateEntry = {
                        'ratelimit' : new_limit,
                        'tapID' : tapid
                        }
                tapDB.udpateRateLimit(rateEntry)
101
                #meterDB.resetMeterEntry(meterdbEntry);
102
                return ;
103 104 105 106
            else:
                self.logger.info("Increasing the rate limit additively..")
                if rateKBps == max_rate_limit:
                    self.logger.info("Already at maximum time ratelimit")
107
                    return ;
108 109

                new_limit = round(rateKBps + increment);
110 111
                if new_limit > max_rate_limit:
                    new_limit = max_rate_limit;
112
                new_limitKbps = new_limit * 8 #KBps -> Kbps
113 114
                self.logger.info("meterid[%d] Increase the rate[%d] for tunnel traffic",
                        meterid, new_limit)
115 116 117 118
                of  = datapath.ofproto
                ofp = datapath.ofproto_parser
                burst_size = 0
                bands = []
119
                bands.append(ofp.OFPMeterBandDrop(new_limitKbps, burst_size))
120 121
                meter_mod = ofp.OFPMeterMod(datapath, of.OFPMC_MODIFY, 
                        of.OFPMF_KBPS, meterid, bands)
122
                datapath.send_msg(meter_mod)
123

124 125 126 127 128
                rateEntry = {
                        'ratelimit' : new_limit,
                        'tapID' : tapid
                        }
                tapDB.udpateRateLimit(rateEntry)
129
                #meterDB.resetMeterEntry(meterdbEntry);