# Source
#
# txMysql / txmysql / test / test_packet.py
#
# Full commit
"""Tests for the MySQL packet pack/unpack helpers in txmysql.packet.

"""
import struct

from twisted.trial import unittest

from txmysql import packet
from txmysql import imysql


class TestUint8(unittest.TestCase):
    """Exercise packet.pack_uint8 / packet.unpack_uint8 on fixed values."""

    def test_pack(self):
        """The integer 10 packs to the single byte 0x0a."""
        self.assertEqual(packet.pack_uint8(10), "\x0a")

    def test_unpack(self):
        """The single byte 0x0a unpacks back to the integer 10."""
        self.assertEqual(packet.unpack_uint8("\x0a"), 10)

    def test_overflow(self):
        """Packing 256 (one past the 8-bit maximum) raises struct.error."""
        too_big = 256
        self.assertRaises(struct.error, packet.pack_uint8, too_big)

    def test_negative(self):
        """Packing a negative number raises struct.error."""
        below_zero = -10
        self.assertRaises(struct.error, packet.pack_uint8, below_zero)


class TestPackUint24(unittest.TestCase):
    """Exercise packet.pack_uint24 / packet.unpack_uint24 on one value.

    The expected byte string lists the least significant byte first.
    """

    def test_pack(self):
        """0xFFFFFE packs to the three bytes fe ff ff."""
        # 0xFFFFFF - 1 == 16777214, one below the largest value that fits
        # in 24 unsigned bits.
        value = 0xFFFFFF - 1
        self.assertEqual(packet.pack_uint24(value), "\xfe\xff\xff")

    def test_unpack(self):
        """The three bytes fe ff ff unpack to 16777214."""
        unpacked = packet.unpack_uint24("\xfe\xff\xff")
        self.assertEqual(unpacked, 16777214)


class TestPacketHeader(unittest.TestCase):
    """Exercise packet.packHeader / packet.unpackHeader on a 4-byte header."""

    def test_pack(self):
        """Convert (length, packetOrder) to the packet header.

        The header is four bytes (BBBB); length 12 with order 0 is
        expected to give 0c 00 00 00.
        """
        expected = "\x0c\x00\x00\x00"
        self.assertEqual(packet.packHeader(12, 0), expected)

    def test_unpack(self):
        """Convert the header bytes 0c 00 00 00 back to (12, 0)."""
        header = "\x0c\x00\x00\x00"
        length, order = packet.unpackHeader(header)
        self.assertEqual(length, 12)
        self.assertEqual(order, 0)
                         

class TestHandshakeInitialization(unittest.TestCase):
    """Check the handshake initialization body against a fixed byte string."""

    def test_pack(self):
        """Packing the eight handshake fields yields the expected bytes."""
        # Positional order matters: it is the order the pack function is
        # called with.
        fields = (
            8,                # protocolVersion
            "5.1b1",          # serverVersion
            1000,             # threadId
            "abcdefgh",       # scrambleBuffer
            0x0000,           # serverCapabilities
            33,               # serverLanguage (language code 33 is unicode)
            0x0000,           # serverStatus
            "abcdefghijklm",  # restOfScramble
        )
        packed = packet.packHandshakeInitialization(*fields)
        self.assertEqual(packed, "\x085.1b1\x00\xe8\x03\x00\x00abcdefgh\x00\x00\x00!\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00abcdefghijklm")

    def test_unpack(self):
        """Unpacking the fixed bytes yields the eight original field values."""
        raw = "\x085.1b1\x00\xe8\x03\x00\x00abcdefgh\x00\x00\x00!\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00abcdefghijklm"
        expected = (8, "5.1b1",
                    1000, "abcdefgh",
                    0x0000, 33,
                    0x0000, "abcdefghijklm")
        unpacked = packet.unpackHandshakeInitialization(raw)
        self.assertEqual(len(unpacked), 8)
        self.assertEqual(unpacked, expected)
                                  

class TestUnpackingPacket(unittest.TestCase):
    """Check that packet.Packet.fromBytes parses raw packet bytes."""

    def setUp(self):
        """Nothing to prepare; these tests share no fixtures."""

    def test_packetHeader(self):
        """Parse a message that consists of nothing but the packet header."""
        header_only = "\x11\x00\x00\x01"
        parsed = packet.Packet.fromBytes(header_only)
        self.assertEqual(parsed.length, 17)
        self.assertEqual(parsed.order, 1)

    def test_handshakeInitializationPacket(self):
        """Parse a header followed by a handshake initialization body.

        The parsed packet is adapted through
        imysql.IHandshakeInitializationPacket before its fields are checked.
        """
        body = "\x085.1b1\x00\xe8\x03\x00\x00abcdefgh\x00\x00\x00!\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00abcdefghijklm"
        # Header carries the body length and packet order 0.
        raw = packet.packHeader(len(body), 0) + body
        parsed = packet.Packet.fromBytes(raw)
        handshake = imysql.IHandshakeInitializationPacket(parsed)
        self.assertEqual(handshake.serverVersion, "5.1b1")
        self.assertEqual(handshake.scrambleBuffer, "abcdefgh")
        self.assertEqual(handshake.restOfScrambleBuffer, "abcdefghijklm")