diff --git a/pyof/utils.py b/pyof/utils.py new file mode 100644 index 000000000..a71d8945d --- /dev/null +++ b/pyof/utils.py @@ -0,0 +1,37 @@ +"""General Unpack utils for python-openflow.""" +import pyof.v0x01.common.header +import pyof.v0x01.common.utils +import pyof.v0x04.common.header +import pyof.v0x04.common.utils +from pyof.foundation.exceptions import UnpackException + +pyof_version_libs = {0x01: pyof.v0x01, + 0x04: pyof.v0x04} + + +def unpack(packet): + """Unpack the OpenFlow Packet and returns a message.""" + try: + version = packet[0] + except IndexError: + raise UnpackException('null packet') + + try: + pyof_lib = pyof_version_libs[version] + except KeyError: + raise UnpackException('Version not supported') + + try: + header = pyof_lib.common.header.Header() + header.unpack(packet[:8]) + message = pyof_lib.common.utils.new_message_from_header(header) + binary_data = packet[8:] + if binary_data: + if len(binary_data) == header.length - 8: + message.unpack(binary_data) + else: + raise UnpackException( + 'packet size does not match packet length field') + return message + except (UnpackException, ValueError) as e: + raise UnpackException(e) diff --git a/tests/test_struct.py b/tests/test_struct.py index 057b21033..cee396d7f 100644 --- a/tests/test_struct.py +++ b/tests/test_struct.py @@ -1,6 +1,7 @@ """Automate struct tests.""" import unittest +from pyof.utils import unpack from tests.raw_dump import RawDump @@ -161,3 +162,146 @@ def test_raw_dump_size(self): self.assertEqual(obj.get_size(), unpacked.get_size()) except FileNotFoundError: raise self.skipTest('No raw dump file found.') + + +class TestStructDump(unittest.TestCase): + """Run message pack, unpack and get_size tests using bytes struct dump. + + Test the lib with raw dumps. We assume the raw files are valid according + to the OF specs to check whether our pack, unpack and get_size + implementations are correct. + + Also, check the minimum size of the struct by instantiating an object with + no parameters. + + To run the tests, just extends this class and set the 'dump' and 'obj' + attributes. You can also optionally set the 'min_size' attribute. + + Example: + .. code-block:: python3 + + class TestMatch(TestStructDump): + dump = b'' # needs to be filled + obj = pyof.v0x01.common.flow_match.Match(xid=0) + min_size = 8 + """ + + dump = b'' + obj = None + min_size = 0 + + def __init__(self, *args, **kwargs): + """Constructor to avoid this base class being executed as a test.""" + if self.__class__ == TestStructDump: + self.run = self.run = lambda self, *args, **kwargs: None + super().__init__(*args, **kwargs) + + def setUp(self): + """Setup the instance before testing.""" + self._msg_cls = type(self.obj) + self._unpacked_dump = self._unpack_dump() + super().setUp() + + def _unpack_dump(self): + obj = self._msg_cls() + obj.unpack(self.dump) + return obj + + def test_pack(self): + """Check whether packed objects equals to dump file.""" + self.assertEqual(self.dump, self.obj.pack()) + + def test_unpack(self): + """Check whether the unpacked dump equals to expected object.""" + self.assertEqual(self._unpacked_dump, self.obj) + + def test_get_size(self): + """Check if get_size method return the correct size.""" + self.assertEqual(self.obj.get_size(), len(self.dump)) + + def test_minimum_size(self): + """Test struct minimum size.""" + if not self.min_size: + raise self.skipTest('skipped, no minimum size set.') + obj = self._msg_cls() + self.assertEqual(obj.get_size(), self.min_size) + + def test_raw_dump_size(self): + """Check whether the unpacked dump has the expected size.""" + self.assertEqual(self.obj.get_size(), self._unpacked_dump.get_size()) + + +class TestMsgDump(TestStructDump): + r"""Run message pack, unpack and get_size tests using bytes message dump. + + Test the lib with raw dumps. We assume the raw files are valid according + to the OF specs to check whether our pack, unpack and get_size + implementations are correct. + + Also, check the minimum size of the struct by instantiating an object with + no parameters. + + To run the tests, just extends this class and set the 'dump' and 'obj' + attributes. You can also optionally set the 'min_size' attribute. + + Example: + .. code-block:: python3 + + class TestHello(TestMsgDump): + dump = b'\x01\x00\x00\x08\x00\x00\x00\x00' + obj = pyof.v0x01.symmetric.hello.Hello(xid=0) + min_size = 8 + """ + + def __init__(self, *args, **kwargs): + """Constructor to avoid this base class beeing executed as a test.""" + if self.__class__ == TestMsgDump: + self.run = self.run = lambda self, *args, **kwargs: None + super().__init__(*args, **kwargs) + + def _unpack_dump(self): + return unpack(self.dump) + + +class TestMsgDumpFile(TestMsgDump): + """Run message pack, unpack and get_size tests using message in a dumpfile. + + Test the lib with raw dumps. We assume the raw files are valid according + to the OF specs to check whether our pack, unpack and get_size + implementations are correct. + + Also, check the minimum size of the message by instantiating an object with + no parameters. + + To run the tests, just extends this class and set the 'dumpfile' and 'obj' + attributes. You can also optionally set the 'min_size' attribute. + + Example: + .. code-block:: python3 + + class TestHelloFileDump(TestMsgDumpFile): + dumpfile = 'v0x01/ofpt_hello.dat' + obj = pyof.v0x01.symmetric.hello.Hello(xid=1) + + """ + + dumpfile = None + + def __init__(self, *args, **kwargs): + """Constructor to avoid this base class beeing executed as a test.""" + if self.__class__ == TestMsgDumpFile: + self.run = self.run = lambda self, *args, **kwargs: None + super().__init__(*args, **kwargs) + + def setUp(self): + """Setup the instance before testing.""" + self._read_dump_file() + super().setUp() + + def _read_dump_file(self): + dumpfile = f'raw/{self.dumpfile}' + try: + with open(dumpfile, 'rb') as fd: + self.dump = fd.read() + except FileNotFoundError: + raise self.skipTest(f'No raw dump file found: {dumpfile}')