Commits

Roger Light committed 0e13a8b Merge

Merge 1.0.4 branch.

  • Participants
  • Parent commits ddb96ed, 9de2130

Comments (0)

Files changed (12)

 ca1469365dfe432d4285b2cf4039ce257d3f8886 v1.0.1
 9ab851b666cd770923532bcd862af838e0c7df3d v1.0.2
 f870aeaa0c3248be76fcf5db9c92308795bb2263 v1.0.3
+c33776067dbbd3a4b1731637bf0070f5bd3e2e7c v1.0.4

File CMakeLists.txt

 
 cmake_minimum_required(VERSION 2.6)
 
-set (VERSION 1.0.3)
+set (VERSION 1.0.4)
 
 if (WIN32)
 	execute_process(COMMAND cmd /c echo %DATE% %TIME% OUTPUT_VARIABLE TIMESTAMP

File ChangeLog.txt

+1.0.4 - 20121017
+================
+
+Broker:
+- Deal with poll() POLLIN/POLLOUT before POLL[RD]HUP to correctly handle the
+  case where a client sends data and immediately closes its socket.
+
+Library:
+- Fix memory leak with messages of QoS=2. Fixes bug #1064981.
+- Fix potential thread synchronisation problem with outgoing packets in the
+  Python module. Fixes bug #1064977.
+
+Clients:
+- Fix "mosquitto_sub -l" incorrectly only sending one message per second.
+
 1.0.3 - 20120927
 ================
 

File client/pub_client.c

 
 #define STATUS_CONNECTING 0
 #define STATUS_CONNACK_RECVD 1
+#define STATUS_DISCONNECTING 2
 
 /* Global variables for use in callbacks. See sub_client.c for an example of
  * using a struct to hold variables for use in callbacks. */
 	char *host = "localhost";
 	int port = 1883;
 	int keepalive = 60;
-#ifndef WIN32
-	int opt;
-#endif
 	char buf[1024];
 	bool debug = false;
 	struct mosquitto *mosq = NULL;
 				return 1;
 			}else{
 				mode = MSGMODE_STDIN_LINE;
-#ifndef WIN32
-				opt = fcntl(fileno(stdin), F_GETFL, 0);
-				if(opt == -1 || fcntl(fileno(stdin), F_SETFL, opt | O_NONBLOCK) == -1){
-					fprintf(stderr, "Error: Unable to set stdin to non-blocking.\n");
-					return 1;
-				}
-#endif
 			}
 		}else if(!strcmp(argv[i], "-m") || !strcmp(argv[i], "--message")){
 			if(mode != MSGMODE_NONE){
 		return rc;
 	}
 
+	if(mode == MSGMODE_STDIN_LINE){
+		mosquitto_loop_start(mosq);
+	}
+
 	do{
-		if(mode == MSGMODE_STDIN_LINE && status == STATUS_CONNACK_RECVD){
-			if(fgets(buf, 1024, stdin)){
-				buf[strlen(buf)-1] = '\0';
-				rc2 = mosquitto_publish(mosq, &mid_sent, topic, strlen(buf), buf, qos, retain);
-				if(rc2){
-					if(!quiet) fprintf(stderr, "Error: Publish returned %d, disconnecting.\n", rc2);
+		if(mode == MSGMODE_STDIN_LINE){
+			if(status == STATUS_CONNACK_RECVD){
+				if(fgets(buf, 1024, stdin)){
+					buf[strlen(buf)-1] = '\0';
+					rc2 = mosquitto_publish(mosq, &mid_sent, topic, strlen(buf), buf, qos, retain);
+					if(rc2){
+						if(!quiet) fprintf(stderr, "Error: Publish returned %d, disconnecting.\n", rc2);
+						mosquitto_disconnect(mosq);
+					}
+				}else if(feof(stdin) && disconnect_sent == false){
 					mosquitto_disconnect(mosq);
+					disconnect_sent = true;
+					status = STATUS_DISCONNECTING;
 				}
-			}else if(feof(stdin) && disconnect_sent == false){
-				mosquitto_disconnect(mosq);
-				disconnect_sent = true;
 			}
+			rc = MOSQ_ERR_SUCCESS;
+		}else{
+			rc = mosquitto_loop(mosq, -1, 1);
 		}
-		rc = mosquitto_loop(mosq, -1, 1);
 	}while(rc == MOSQ_ERR_SUCCESS && connected);
 
+	if(mode == MSGMODE_STDIN_LINE){
+		mosquitto_loop_stop(mosq, false);
+	}
+
 	if(message && mode == MSGMODE_FILE){
 		free(message);
 	}
 
 # Also bump lib/mosquitto.h, lib/python/setup.py, CMakeLists.txt,
 # installer/mosquitto.nsi, installer/mosquitto-cygwin.nsi
-VERSION=1.0.3
+VERSION=1.0.4
 TIMESTAMP:=$(shell date "+%F %T%z")
 
 # Client library SO version. Bump if incompatible API/ABI changes are made.

File installer/mosquitto-cygwin.nsi

 !define env_hklm 'HKLM "SYSTEM\CurrentControlSet\Control\Session Manager\Environment"'
 
 Name "mosquitto"
-!define VERSION 1.0.3
+!define VERSION 1.0.4
 OutFile "mosquitto-${VERSION}-install-cygwin.exe"
 
 InstallDir "$PROGRAMFILES\mosquitto"

File installer/mosquitto.nsi

 !define env_hklm 'HKLM "SYSTEM\CurrentControlSet\Control\Session Manager\Environment"'
 
 Name "mosquitto"
-!define VERSION 1.0.3
+!define VERSION 1.0.4
 OutFile "mosquitto-${VERSION}-install-win32.exe"
 
 InstallDir "$PROGRAMFILES\mosquitto"

File lib/mosquitto.h

 
 #define LIBMOSQUITTO_MAJOR 1
 #define LIBMOSQUITTO_MINOR 0
-#define LIBMOSQUITTO_REVISION 3
+#define LIBMOSQUITTO_REVISION 4
 #define LIBMOSQUITTO_VERSION_NUMBER (LIBMOSQUITTO_MAJOR*1000000+LIBMOSQUITTO_MINOR*1000+LIBMOSQUITTO_REVISION)
 
 /* Log types */

File lib/python/mosquitto.py

         self._password = ""
         self._in_packet = MosquittoInPacket()
         self._out_packet = []
+        self._current_out_packet = None
         self._last_msg_in = time.time()
         self._last_msg_out = time.time()
         self._ping_t = 0
         self._out_packet = []
         self._out_packet_mutex.release()
 
+        self._current_out_packet_mutex.acquire()
+        self._current_out_packet = None
+        self._current_out_packet_mutex.release()
+
         self._msgtime_mutex.acquire()
         self._last_msg_in = time.time()
         self._last_msg_out = time.time()
         if max_packets < 1:
             raise ValueError('Invalid max_packets.')
 
-        if len(self._out_packet) > 0:
+        self._current_out_packet_mutex.acquire()
+        self._out_packet_mutex.acquire()
+        if self._current_out_packet == None and len(self._out_packet) > 0:
+            self._current_out_packet = self._out_packet.pop(0)
+
+        if self._current_out_packet:
             wlist = [self.socket()]
         else:
             wlist = []
+        self._out_packet_mutex.release()
+        self._current_out_packet_mutex.release()
 
         rlist = [self.socket()]
         try:
         """Call to determine if there is network data waiting to be written.
         Useful if you are calling select() yourself rather than using loop().
         """
-        if self._out_packet == None:
+        if self._current_out_packet or len(self._out_packet) > 0:
+            return True
+        else:
             return False
-        else:
-            return True
 
     def loop_misc(self):
         """Process miscellaneous network events. Use in place of calling loop() if you
         return rc
 
     def _packet_write(self):
-        while len(self._out_packet) > 0:
-            packet = self._out_packet[0]
+        self._current_out_packet_mutex.acquire()
+
+        while self._current_out_packet:
+            packet = self._current_out_packet
 
             if self._ssl:
                 try:
                     write_length = self._ssl.write(packet.packet[packet.pos:])
                 except AttributeError:
+                    self._current_out_packet_mutex.release()
                     return MOSQ_ERR_SUCCESS
             else:
                 write_length = self._sock.send(packet.packet[packet.pos:])
 
                         self._callback_mutex.release()
 
-                    self._out_packet.pop(0)
+                    self._out_packet_mutex.acquire()
+                    if len(self._out_packet) > 0:
+                        self._current_out_packet = self._out_packet.pop(0)
+                    else:
+                        self._current_out_packet = None
+                    self._out_packet_mutex.release()
             else:
                 pass # FIXME
         
+        self._current_out_packet_mutex.release()
+
         self._msgtime_mutex.acquire()
         self._last_msg_out = time.time()
         self._msgtime_mutex.release()
 
     def _packet_queue(self, command, packet, mid, qos):
         mpkt = MosquittoPacket(command, packet, mid, qos)
+
         self._out_packet_mutex.acquire()
         self._out_packet.append(mpkt)
+        if self._current_out_packet_mutex.acquire(False) == True:
+            if self._current_out_packet == None and len(self._out_packet) > 0:
+                self._current_out_packet = self._out_packet.pop(0)
+            self._current_out_packet_mutex.release()
         self._out_packet_mutex.release()
 
         if self._in_callback == False:

File lib/python/setup.py

 
 from distutils.core import setup
 setup(name='mosquitto',
-	version='1.0.3',
+	version='1.0.4',
 	description='MQTT version 3.1 client class',
 	author='Roger Light',
 	author_email='roger@atchoo.org',

File lib/read_handle_shared.c

 			mosq->in_callback = true;
 			mosq->on_message(mosq, mosq->obj, &message->msg);
 			mosq->in_callback = false;
-		}else{
-			_mosquitto_message_cleanup(&message);
 		}
 		pthread_mutex_unlock(&mosq->callback_mutex);
+		_mosquitto_message_cleanup(&message);
 	}
 #endif
 	rc = _mosquitto_send_pubcomp(mosq, mid);
 
 	for(i=0; i<db->context_count; i++){
 		if(db->contexts[i] && db->contexts[i]->sock != INVALID_SOCKET){
-			if(pollfds[db->contexts[i]->pollfd_index].revents & (POLLHUP | POLLRDHUP | POLLERR | POLLNVAL)){
-				do_disconnect(db, i);
-			}
-		}
-		if(db->contexts[i] && db->contexts[i]->sock != INVALID_SOCKET){
 			assert(pollfds[db->contexts[i]->pollfd_index].fd == db->contexts[i]->sock);
 #ifdef WITH_TLS
 			if(pollfds[db->contexts[i]->pollfd_index].revents & POLLOUT ||
 				}
 			}
 		}
+		if(db->contexts[i] && db->contexts[i]->sock != INVALID_SOCKET){
+			if(pollfds[db->contexts[i]->pollfd_index].revents & (POLLHUP | POLLRDHUP | POLLERR | POLLNVAL)){
+				do_disconnect(db, i);
+			}
+		}
 	}
 }