Commits

Can Xue committed d91cd7d

0.1.2 add ipv4addr.py and tests

  • Participants
  • Parent commits 315410e

Comments (0)

Files changed (4)

 
 Kahgean is a set of handy utils for daily development.
 
-The ``kahgean.options`` module makes it easy to parse command-line
+``kahgean.options`` makes it easy to parse command-line
 arguments and configuration file by just one set of defines.
 
-The ``kahgean.logkit`` make configuring log handlers become easy.
+``kahgean.logkit`` makes configuring log handlers become easy.
 
-The ``kahgean.daemonize`` can help developers to write daemon programs.
+``kahgean.daemonize`` can help developers to write daemon programs.
+
+``kahgean.initial`` provides some functions that should be invoked
+as soon as possible before doing any meaningful processing.
+
+``kahgean.ipv4addr`` provides handy classes for describing IPv4
+addresses and/or subnetworks.

File kahgean/ipv4addr.py

+# Copyright (C) 2012 Xue Can <xuecan@gmail.com> and contributors.
+# Licensed under the MIT license: http://www.opensource.org/licenses/mit-license
+
+from collections import Sequence
+
+
+class Address(long):
+    
+    def __new__(cls, address):
+        if isinstance(address, basestring):
+            value = Address._init_from_seq(address.strip().split('.'))
+        elif isinstance(address, Sequence):
+            value = Address._init_from_seq(address)
+        elif type(address) in [int, long]:
+            value = Address._init_from_int(address)
+        else:
+            raise ValueError('unsupported address type')
+        return super(Address, cls).__new__(cls, value)
+    
+    def __init__(self, address):
+        value = long(self)
+        v0 = value/2**24
+        value = value-v0*2**24
+        v1 = value/2**16
+        value = value-v1*2**16
+        v2 = value/2**8
+        v3 = value-v2*2**8
+        self._tuple_value = (v0, v1, v2, v3)
+            
+    @staticmethod
+    def _init_from_int(value):
+        if value<0 or value>2**32-1:
+            raise ValueError('value should be in [0, 2**32-1]')
+        return value
+    
+    @staticmethod
+    def _init_from_seq(seq):
+        if len(seq)!=4:
+            raise ValueError('length of seq should be 4')
+        v = [int(i) for i in seq]
+        for i in v:
+            if i<0 or i>2**8-1:
+                raise ValueError('each piece in a IPv4 address should be in '
+                                 '[0, 255]')
+        return v[0]*2**24 + v[1]*2**16 + v[2]*2**8 + v[3]
+    
+    @property
+    def tuple_value(self):
+        return self._tuple_value
+    
+    def __str__(self):
+        return '%d.%d.%d.%d' % self.tuple_value
+    
+    def __repr__(self):
+        return 'Address <%s>' % self.__str__()
+
+
+# Address(MASKS[24]) == Address('255.255.255.0')
+MASKS = [long((2**i-1)<<(32-i)) for i in range(0,33)]
+COMPS = [long((2**32-1)-i) for i in MASKS]
+
+
+class Network(Sequence):
+    """
+
+    Network('192.168.1.0/24')
+    Network('192.168.1.0', '24')
+    Network('192.168.1.0', '255.255.255.0')
+    Network('192.168.1.0', '192.168.1.255')
+    """
+    
+    def __init__(self, address, extra=None):
+        if not extra:
+            # '192.168.1.0/24' => '192.168.1.0', '24'
+            address, extra = address.split('/')
+        address = Address(address)
+        try:
+            prefix_size = int(extra)
+            if prefix_size<0 or prefix_size>32:
+                raise ValueError
+        except ValueError:
+            prefix_size = -1
+        if prefix_size >= 0:
+            # extra is a prefix size
+            mask = MASKS[prefix_size]
+        else:
+            extra = Address(extra)
+            if extra in MASKS:
+                # extra is a subnetwork mask 
+                mask = long(extra)
+            elif extra-address in COMPS:
+                # extra is a broadcast address
+                mask = MASKS[COMPS.index(extra-address)]
+            else:
+                mask = -1
+        if mask==-1:
+            raise ValueError('subnetwork mask not found from <%s/%s>' % (
+                str(address), str(extra)))
+        index = MASKS.index(mask)
+        comp = COMPS[index]
+        if address & comp:
+            raise ValueError('<%s/%s> is not a valid network prefix' % (
+                str(address), str(extra)))
+        self._network = address # network prefix
+        self._size = index      # prefix size, e.g. 24 means a /24 subnetwork
+        self._length = comp+1   # length of the network, for example there
+                                #    are 256 addresses in a /24 subnetwork
+        self._broadcast = Address(address+comp) # broadcast address
+        self._mask = Address(mask)              # subnetwork mask
+
+    @property
+    def network(self):
+        return self._network
+
+    @property
+    def broadcast(self):
+        return self._broadcast
+
+    @property
+    def size(self):
+        return self._size
+
+    @property
+    def mask(self):
+        return self._mask
+    
+    def __contains__(self, value):
+        if type(value) in [int, long, Address]:
+            return value>=self._network and value<=self._broadcast
+        elif type(value) in [Network]:
+            return self.network<=value.network \
+                   and self.broadcast>=value.broadcast
+    
+    def __len__(self):
+        return self._length
+
+    def __getitem__(self, index):
+        index = long(index)
+        if index<0 and index>=self._length:
+            raise IndexError('index out of range')
+        return Address(self._network+index)
+
+    def __iter__(self):
+        i = 0
+        while i<self._length:
+            yield self[i]
+            i+=1
+
+    def index(self, value):
+        if Address(value) in self:
+            return value-self._network
+        raise ValueError('%s not in this network' % str(value))
+
+    def count(self, value):
+        return 1 if Address(value) in self else 0
+
+    def __reversed__(self):
+        i = self._length
+        while i>0:
+            i -=1
+            yield self[i]
+    
+    def __eq__(self, other):
+        return self.network==other.network and self.mask==other.mask
+    
+    def __ne__(self, other):
+        return not self==other
+    
+    def __str__(self):
+        return '%s/%d' % (str(self._network), self.size)
+    
+    def __repr__(self):
+        return 'Network <%s>' % self.__str__()
 
 from distutils.core import setup
 
-__version__ = "0.1.1"
+__version__ = "0.1.2"
 
 with open('README.rst', 'r') as f:
     long_description = f.read()

File test/test_ipv4addr.py

+# Copyright (C) 2012 Xue Can <xuecan@gmail.com> and contributors.
+# Licensed under the MIT license: http://www.opensource.org/licenses/mit-license
+
+from kahgean.ipv4addr import Address, Network, MASKS
+from nose.tools import *
+
+def test_address():
+    a1 = Address('192.168.1.1')
+    eq_(a1, 3232235777)
+    a2 = Address(a1+1)
+    eq_(str(a2), '192.168.1.2')
+
+
+def test_network():
+    n1 = Network('192.168.1.0/24')
+    n2 = Network('192.168.1.0', 24)
+    n3 = Network('192.168.1.0', '255.255.255.0')
+    n4 = Network('192.168.1.0', '192.168.1.255')
+    eq_(n1, n2)
+    eq_(n1, n3)
+    eq_(n1, n4)
+    a1 = Address('192.168.1.1')
+    ok_(a1 in n1)
+    a2 = Address('192.168.0.1')
+    ok_(a2 not in n1)
+
+
+def test_masks():
+    for i in range(33):
+        base2 = '1'*i+'0'*(32-i)
+        ok_(int(base2, base=2) in MASKS)
+
+
+@raises(ValueError)
+def test_invalid_addr():
+    Address('192.168.1.256')
+
+
+@raises(ValueError)
+def test_invalid_net1():
+    Network('192.168.1.0')
+
+
+@raises(ValueError)
+def test_invalid_net2():
+    Network('192.168.1.0/33')
+
+
+@raises(ValueError)
+def test_invalid_net3():
+    Network('192.168.1.0', '192.168.1.128')
+
+
+@raises(ValueError)
+def test_invalid_net3():
+    Network('192.168.1.1', '192.168.1.128')