Commit eb726bfe authored by Peter V. Saveliev's avatar Peter V. Saveliev Committed by GitHub

Merge pull request #282 from ffourcot/ipset-options

ipset: add some options
parents 25f88989 57f80f8f
......@@ -25,6 +25,10 @@ from pyroute2.netlink.nfnetlink.ipset import IPSET_CMD_LIST
from pyroute2.netlink.nfnetlink.ipset import IPSET_CMD_ADD
from pyroute2.netlink.nfnetlink.ipset import IPSET_CMD_DEL
from pyroute2.netlink.nfnetlink.ipset import ipset_msg
from pyroute2.netlink.nfnetlink.ipset import IPSET_FLAG_WITH_COUNTERS
from pyroute2.netlink.nfnetlink.ipset import IPSET_FLAG_WITH_COMMENT
from pyroute2.netlink.nfnetlink.ipset import IPSET_FLAG_WITH_FORCEADD
from pyroute2.netlink.nfnetlink.ipset import IPSET_DEFAULT_MAXELEM
def _nlmsg_error(msg):
......@@ -41,7 +45,7 @@ class IPSet(NetlinkSocket):
policy = {IPSET_CMD_PROTOCOL: ipset_msg,
IPSET_CMD_LIST: ipset_msg}
def __init__(self, version=6, attr_revision=0, nfgen_family=2):
def __init__(self, version=6, attr_revision=3, nfgen_family=2):
super(IPSet, self).__init__(family=NETLINK_NETFILTER)
policy = dict([(x | (NFNL_SUBSYS_IPSET << 8), y)
for (x, y) in self.policy.items()])
......@@ -84,27 +88,43 @@ class IPSet(NetlinkSocket):
terminate=_nlmsg_error)
def create(self, name, stype='hash:ip', family=socket.AF_INET,
exclusive=True):
exclusive=True, counters=False, comment=False,
maxelem=IPSET_DEFAULT_MAXELEM, forceadd=False,
hashsize=None):
'''
Create an ipset `name` of type `stype`, by default
`hash:ip`.
Very simple and stupid method, should be extended
to support ipset options.
to support more ipset options.
'''
excl_flag = NLM_F_EXCL if exclusive else 0
msg = ipset_msg()
cadt_flags = 0
if counters:
cadt_flags |= IPSET_FLAG_WITH_COUNTERS
if comment:
cadt_flags |= IPSET_FLAG_WITH_COMMENT
if forceadd:
cadt_flags |= IPSET_FLAG_WITH_FORCEADD
data = {"attrs": [["IPSET_ATTR_CADT_FLAGS", cadt_flags],
["IPSET_ATTR_MAXELEM", maxelem]]}
if hashsize is not None:
data['attrs'] += [["IPSET_ATTR_HASHSIZE", hashsize]]
msg['attrs'] = [['IPSET_ATTR_PROTOCOL', self._proto_version],
['IPSET_ATTR_SETNAME', name],
['IPSET_ATTR_TYPENAME', stype],
['IPSET_ATTR_FAMILY', family],
['IPSET_ATTR_REVISION', self._attr_revision]]
['IPSET_ATTR_REVISION', self._attr_revision],
["IPSET_ATTR_DATA", data]]
return self.request(msg, IPSET_CMD_CREATE,
msg_flags=NLM_F_REQUEST | NLM_F_ACK | excl_flag,
terminate=_nlmsg_error)
def _add_delete(self, name, entry, family, cmd, exclusive):
def _add_delete(self, name, entry, family, cmd, exclusive, comment=None):
if family == socket.AF_INET:
entry_type = 'IPSET_ATTR_IPADDR_IPV4'
elif family == socket.AF_INET6:
......@@ -113,21 +133,26 @@ class IPSet(NetlinkSocket):
raise TypeError('unknown family')
excl_flag = NLM_F_EXCL if exclusive else 0
data_attrs = [['IPSET_ATTR_IP', {'attrs': [[entry_type, entry]]}]]
if comment is not None:
data_attrs += [["IPSET_ATTR_COMMENT", comment],
["IPSET_ATTR_CADT_LINENO", 0]]
msg = ipset_msg()
msg['attrs'] = [['IPSET_ATTR_PROTOCOL', self._proto_version],
['IPSET_ATTR_SETNAME', name],
['IPSET_ATTR_DATA',
{'attrs': [['IPSET_ATTR_IP',
{'attrs': [[entry_type, entry]]}]]}]]
['IPSET_ATTR_DATA', {'attrs': data_attrs}]]
return self.request(msg, cmd,
msg_flags=NLM_F_REQUEST | NLM_F_ACK | excl_flag,
terminate=_nlmsg_error)
def add(self, name, entry, family=socket.AF_INET, exclusive=True):
def add(self, name, entry, family=socket.AF_INET, exclusive=True,
comment=None):
'''
Add a member to the ipset
'''
return self._add_delete(name, entry, family, IPSET_CMD_ADD, exclusive)
return self._add_delete(name, entry, family, IPSET_CMD_ADD, exclusive,
comment=comment)
def delete(self, name, entry, family=socket.AF_INET, exclusive=True):
'''
......
......@@ -5,6 +5,7 @@ from pyroute2.netlink.nfnetlink import nfgen_msg
IPSET_MAXNAMELEN = 32
IPSET_DEFAULT_MAXELEM = 65536
IPSET_CMD_NONE = 0
IPSET_CMD_PROTOCOL = 1 # Return protocol version
......@@ -22,6 +23,12 @@ IPSET_CMD_HEADER = 12 # Get set header data only
IPSET_CMD_TYPE = 13 # 13: Get set type
IPSET_FLAG_WITH_COUNTERS = 1 << 3
IPSET_FLAG_WITH_COMMENT = 1 << 4
IPSET_FLAG_WITH_FORCEADD = 1 << 5
IPSET_FLAG_WITH_SKBINFO = 1 << 6
class ipset_msg(nfgen_msg):
'''
Since the support just begins to be developed,
......@@ -61,20 +68,23 @@ class ipset_msg(nfgen_msg):
(5, 'IPSET_ATTR_PORT_TO', 'hex'),
(6, 'IPSET_ATTR_TIMEOUT', 'hex'),
(7, 'IPSET_ATTR_PROTO', 'recursive'),
(8, 'IPSET_ATTR_CADT_FLAGS', 'hex'),
(8, 'IPSET_ATTR_CADT_FLAGS', 'be32', NLA_F_NET_BYTEORDER),
(9, 'IPSET_ATTR_CADT_LINENO', 'be32'),
(10, 'IPSET_ATTR_MARK', 'hex'),
(11, 'IPSET_ATTR_MARKMASK', 'hex'),
(17, 'IPSET_ATTR_GC', 'hex'),
(18, 'IPSET_ATTR_HASHSIZE', 'be32'),
(19, 'IPSET_ATTR_MAXELEM', 'be32'),
(18, 'IPSET_ATTR_HASHSIZE', 'be32', NLA_F_NET_BYTEORDER),
(19, 'IPSET_ATTR_MAXELEM', 'be32', NLA_F_NET_BYTEORDER),
(20, 'IPSET_ATTR_NETMASK', 'hex'),
(21, 'IPSET_ATTR_PROBES', 'hex'),
(22, 'IPSET_ATTR_RESIZE', 'hex'),
(23, 'IPSET_ATTR_SIZE', 'hex'),
(24, 'IPSET_ATTR_ELEMENTS', 'hex'),
(25, 'IPSET_ATTR_REFERENCES', 'be32'),
(26, 'IPSET_ATTR_MEMSIZE', 'be32'))
(24, 'IPSET_ATTR_BYTES', 'be64'),
(25, 'IPSET_ATTR_PACKETS', 'be64'),
(26, 'IPSET_ATTR_COMMENT', 'asciiz'),
(27, 'IPSET_ATTR_SKBMARK', 'hex'),
(28, 'IPSET_ATTR_SKBPRIO', 'be32'),
(29, 'IPSET_ATTR_SKBQUEUE', 'hex'))
class adt_data(ipset_generic):
nla_flags = NLA_F_NESTED
......@@ -88,7 +98,7 @@ class ipset_msg(nfgen_msg):
(5, 'IPSET_ATTR_PORT_TO', 'hex'),
(6, 'IPSET_ATTR_TIMEOUT', 'hex'),
(7, 'IPSET_ATTR_PROTO', 'recursive'),
(8, 'IPSET_ATTR_CADT_FLAGS', 'hex'),
(8, 'IPSET_ATTR_CADT_FLAGS', 'be32', NLA_F_NET_BYTEORDER),
(9, 'IPSET_ATTR_CADT_LINENO', 'be32'),
(10, 'IPSET_ATTR_MARK', 'hex'),
(11, 'IPSET_ATTR_MARKMASK', 'hex'),
......@@ -101,7 +111,7 @@ class ipset_msg(nfgen_msg):
(23, 'IPSET_ATTR_IFACE', 'hex'),
(24, 'IPSET_ATTR_BYTES', 'be64'),
(25, 'IPSET_ATTR_PACKETS', 'be64'),
(27, 'IPSET_ATTR_COMMENT', 'hex'),
(26, 'IPSET_ATTR_COMMENT', 'asciiz'),
(27, 'IPSET_ATTR_SKBMARK', 'hex'),
(28, 'IPSET_ATTR_SKBPRIO', 'be32'),
(29, 'IPSET_ATTR_SKBQUEUE', 'hex'))
......@@ -15,14 +15,20 @@ class TestIPSet(object):
def list_ipset(self, name):
try:
return [x.get_attr('IPSET_ATTR_IP_FROM').
get_attr('IPSET_ATTR_IPADDR_IPV4')
for x in
self.ip.list(name)[0].
get_attr('IPSET_ATTR_ADT').
get_attrs('IPSET_ATTR_PROTO')]
res = {}
msg_list = self.ip.list(name)
adt = 'IPSET_ATTR_ADT'
proto = 'IPSET_ATTR_PROTO'
ipaddr = 'IPSET_ATTR_IPADDR_IPV4'
for msg in msg_list:
for x in msg.get_attr(adt).get_attrs(proto):
ip = x.get_attr('IPSET_ATTR_IP_FROM').get_attr(ipaddr)
res[ip] = (x.get_attr("IPSET_ATTR_PACKETS"),
x.get_attr("IPSET_ATTR_BYTES"),
x.get_attr("IPSET_ATTR_COMMENT"))
return res
except:
return []
return {}
def get_ipset(self, name):
return [x for x in self.ip.list()
......@@ -132,3 +138,85 @@ class TestIPSet(object):
self.ip.destroy(name_b)
assert not self.get_ipset(name_a)
assert not self.get_ipset(name_b)
def test_counters(self):
require_user('root')
name = str(uuid4())[:16]
ipaddr = '172.16.202.202'
self.ip.create(name, counters=True)
self.ip.add(name, ipaddr)
assert ipaddr in self.list_ipset(name)
assert self.list_ipset(name)[ipaddr][0] == 0 # Bytes
assert self.list_ipset(name)[ipaddr][1] == 0 # Packets
self.ip.destroy(name)
self.ip.create(name, counters=False)
self.ip.add(name, ipaddr)
assert ipaddr in self.list_ipset(name)
assert self.list_ipset(name)[ipaddr][0] is None
assert self.list_ipset(name)[ipaddr][1] is None
self.ip.destroy(name)
def test_comments(self):
require_user('root')
name = str(uuid4())[:16]
ipaddr = '172.16.202.202'
comment = 'a very simple comment'
self.ip.create(name, comment=True)
self.ip.add(name, ipaddr, comment=comment)
assert ipaddr in self.list_ipset(name)
assert self.list_ipset(name)[ipaddr][2] == comment
self.ip.destroy(name)
def test_maxelem(self):
require_user('root')
name = str(uuid4())[:16]
self.ip.create(name, maxelem=1)
data = self.get_ipset(name)[0].get_attr("IPSET_ATTR_DATA")
maxelem = data.get_attr("IPSET_ATTR_MAXELEM")
self.ip.destroy(name)
assert maxelem == 1
def test_hashsize(self):
require_user('root')
name = str(uuid4())[:16]
min_size = 64
self.ip.create(name, hashsize=min_size)
data = self.get_ipset(name)[0].get_attr("IPSET_ATTR_DATA")
hashsize = data.get_attr("IPSET_ATTR_HASHSIZE")
self.ip.destroy(name)
assert hashsize == min_size
def test_forceadd(self):
require_user('root')
name = str(uuid4())[:16]
# The forceadd option only works when the entry that we try to add
# share the same hash in the kernel hashtable than an entry already in
# the IPSet. That is not so easy to test: we must create collisions
# We achieve this goal with a very short hashsize (the kernel minimum)
# and many entries.
maxelem = 16384
ipaddr = "172.16.202.202"
self.ip.create(name, maxelem=maxelem, hashsize=64)
def fill_to_max_entries(name):
ip_start = "10.10.%d.%d"
for i in range(0, maxelem):
self.ip.add(name, ip_start % (i / 255, i % 255))
fill_to_max_entries(name)
try:
self.ip.add(name, ipaddr)
assert False
except NetlinkError:
pass
finally:
assert ipaddr not in self.list_ipset(name)
self.ip.destroy(name)
self.ip.create(name, hashsize=64, maxelem=maxelem, forceadd=True)
fill_to_max_entries(name)
self.ip.add(name, ipaddr)
assert ipaddr in self.list_ipset(name)
self.ip.destroy(name)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment