Commits

Roger Light committed 29dbe6e

SSL support for mosquitto.py.

Comments (0)

Files changed (2)

 - Rename C++ namespace from mosquittopp to mosqpp to remove ambiguity.
 - C++ lib_init(), lib_version() and lib_cleanup() are now in the mosqpp
   namespace directly, not mosquittopp class members.
-- The Python library has modified to match the libmosquitto changes and
-  simplified at the same time.
+- The Python library is now written in pure Python and so no longer depends on
+  libmosquitto. 
+- The Python library includes SSL support.
 - The Python library should now be compatible with Python 3.
 
 Other:

lib/python/mosquitto.py

 
 import select
 import socket
+import ssl
 import struct
 import sys
 import threading
         self._msgtime_mutex = threading.Lock()
         self._thread = None
         self._thread_terminate = False
+        self._ssl = None
+        self._ssl_certfile = None
+        self._ssl_keyfile = None
+        self._ssl_ca_certs = None
 
     def __del__(self):
         pass
 
+    def ssl_set(self, ca_certs, certfile=None, keyfile=None, verify=2):
+        if ca_certs == None:
+            raise ValueError('ca_certs must not be None.')
+
+        self._ssl_ca_certs = ca_certs
+        self._ssl_certfile = certfile
+        self._ssl_keyfile = keyfile
+        self._ssl_verify = verify
+
     def connect(self, host, port=1883, keepalive=60):
         self.connect_async(host, port, keepalive)
         return self.reconnect()
         self._state_mutex.acquire()
         self._state = mosq_cs_new
         self._state_mutex.release()
-        if self._sock:
+        if self._ssl:
+            self._ssl.close()
+            self._ssl = None
+            self._sock = None
+        elif self._sock:
             self._sock.close()
             self._sock = None
 
         self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
         # FIXME use create_connection here
 
-        self._sock.setblocking(0)
+
+        if self._ssl_ca_certs != None:
+            self._ssl = ssl.wrap_socket(self._sock,
+                    certfile=self._ssl_certfile,
+                    keyfile=self._ssl_keyfile,
+                    ca_certs=self._ssl_ca_certs,
+                    cert_reqs=ssl.CERT_REQUIRED,
+                    ssl_version=ssl.PROTOCOL_TLSv1)
+
         try:
-            self._sock.connect((self._host, self._port))
+            self.socket().connect((self._host, self._port))
         except socket.error as err:
             (msg) = err
             if msg.errno != 115:
                 print(msg)
                 return 1
+
+        self._sock.setblocking(0)
+
         return self._send_connect(self._keepalive, self._clean_session)
 
     def loop(self, timeout=1.0, max_packets=1):
             raise ValueError('Invalid timeout.')
 
         if len(self._out_packet) > 0:
-            wlist = [self._sock]
+            wlist = [self.socket()]
         else:
             wlist = []
 
-        rlist = [self._sock]
+        rlist = [self.socket()]
         socklist = select.select(rlist, wlist, [], timeout)
 
-        if self._sock in socklist[0]:
+        if self.socket() in socklist[0]:
             rc = self.loop_read(max_packets)
             if rc != MOSQ_ERR_SUCCESS:
-                self._sock.close()
+                if self._ssl:
+                    self._ssl.close()
+                    self._ssl = None
+                else:
+                    self._sock.close()
+
                 self._sock = None
                 self._state_mutex.acquire()
                 if self._state == mosq_cs_disconnecting:
                 self._callback_mutex.release()
                 return rc
 
-        if self._sock in socklist[1]:
+        if self.socket() in socklist[1]:
             rc = self.loop_write(max_packets)
             if rc != MOSQ_ERR_SUCCESS:
-                self._sock.close()
+                if self._ssl:
+                    self._ssl.close()
+                    self._ssl = None
+                else:
+                    self._sock.close()
+
                 self._sock = None
                 self._state_mutex.acquire()
                 if self._state == mosq_cs_disconnecting:
         self._password = password
 
     def disconnect(self):
-        if self._sock == None:
+        if self._sock == None and self._ssl == None:
             return MOSQ_ERR_NO_CONN
 
         self._state_mutex.acquire()
         if topic == None or len(topic) == 0:
             raise ValueError('Invalid topic.')
 
-        if self._sock == None:
+        if self._sock == None and self._ssl == None:
             return MOSQ_ERR_NO_CONN
 
         return self._send_subscribe(False, topic, qos)
     def unsubscribe(self, topic):
         if topic == None or len(topic) == 0:
             raise ValueError('Invalid topic.')
-        if self._sock == None:
+        if self._sock == None and self._ssl == None:
             return MOSQ_ERR_NO_CONN
 
         return self._send_unsubscribe(False, topic)
 
     def loop_read(self, max_packets=1):
-        if self._sock == None:
+        if self._sock == None and self._ssl == None:
             return MOSQ_ERR_NO_CONN
 
         if max_packets < 1:
         return MOSQ_ERR_SUCCESS
 
     def loop_write(self, max_packets=1):
-        if self._sock == None:
+        if self._sock == None and self._ssl == None:
             return MOSQ_ERR_NO_CONN
 
         if max_packets < 1:
             return True
 
     def loop_misc(self):
-        if self._sock == None:
+        if self._sock == None and self._ssl == None:
             return MOSQ_ERR_NO_CONN
 
         now = time.time()
 
         if self._ping_t > 0 and now - self._ping_t >= self._keepalive:
             # mosq->ping_t != 0 means we are waiting for a pingresp.
-            # This hasn''t happened in the keepalive time so we should disconnect.
-            self._sock.close()
+            # This hasn't happened in the keepalive time so we should disconnect.
+            if self._ssl:
+                self._ssl.close()
+                self._ssl = None
+            else:
+                self._sock.close()
             self._sock = None
             return MOSQ_ERR_CONN_LOST
 
             return "Connection Refused: unknown reason."
 
     def socket(self):
-        return self._sock
+        if self._ssl:
+            return self._ssl
+        else:
+            return self._sock
 
     def loop_start(self):
         if self._thread != None:
         # Finally, free the memory and reset everything to starting conditions.
         if self._in_packet.command == 0:
             try:
-                command = self._sock.recv(1)
+                if self._ssl:
+                    try:
+                        command = self._ssl.read(1)
+                    except AttributeError:
+                        return MOSQ_ERR_SUCCESS
+                else:
+                    command = self._sock.recv(1)
             except socket.error as err:
                 (msg) = err
                 if msg.errno == 11:
             # http://publib.boulder.ibm.com/infocenter/wmbhelp/v6r0m0/topic/com.ibm.etools.mft.doc/ac10870_.htm
             while True:
                 try:
-                    byte = self._sock.recv(1)
+                    if self._ssl:
+                        byte = self._ssl.read(1)
+                    else:
+                        byte = self._sock.recv(1)
                 except socket.error as err:
                     (msg) = err
                     if msg.errno == 11:
 
         while self._in_packet.to_process > 0:
             try:
-                data = self._sock.recv(self._in_packet.to_process)
+                if self._ssl:
+                    data = self._ssl.read(self._in_packet.to_process)
+                else:
+                    data = self._sock.recv(self._in_packet.to_process)
             except socket.error as err:
                 (msg) = err
                 if msg.errno == 11:
         while len(self._out_packet) > 0:
             packet = self._out_packet[0]
 
-            write_length = self._sock.send(packet.packet[packet.pos:])
+            if self._ssl:
+                try:
+                    write_length = self._ssl.write(packet.packet[packet.pos:])
+                except AttributeError:
+                    return MOSQ_ERR_SUCCESS
+            else:
+                write_length = self._sock.send(packet.packet[packet.pos:])
             if write_length > 0:
                 packet.to_process = packet.to_process - write_length
                 packet.pos = packet.pos + write_length
             else:
                 pass # FIXME
         
+        self._msgtime_mutex.acquire()
+        self._last_msg_out = time.time()
+        self._msgtime_mutex.release()
+
         return MOSQ_ERR_SUCCESS
 
     def _easy_log(self, level, buf):
         last_msg_out = self._last_msg_out
         last_msg_in = self._last_msg_in
         self._msgtime_mutex.release()
-        if self._sock != None and (now - last_msg_out >= self._keepalive or now - last_msg_in >= self._keepalive):
+        if (self._sock != None or self._ssl != None) and (now - last_msg_out >= self._keepalive or now - last_msg_in >= self._keepalive):
             if self._state == mosq_cs_connected and self._ping_t == 0:
                 self._send_pingreq()
             else:
-                self._sock.close()
+                if self._ssl:
+                    self._ssl.close()
+                    self._ssl = None
+                else:
+                    self._sock.close()
                 self._sock = None
 
     def _fix_sub_topic(self, subtopic):
                 raise TypeError
 
     def _send_publish(self, mid, topic, payload=None, qos=0, retain=False, dup=False):
-        if self._sock == None:
+        if self._sock == None and self._ssl == None:
             return MOSQ_ERR_NO_CONN
 
         command = PUBLISH | ((dup&0x1)<<3) | (qos<<1) | retain