1. Roger Light
  2. mosquitto

Commits

Roger Light  committed dc8bf47 Merge

Merge 1.1 branch.

  • Participants
  • Parent commits aacc57f, ba06f5a
  • Branches default

Comments (0)

Files changed (151)

File .hgtags

View file
 f870aeaa0c3248be76fcf5db9c92308795bb2263 v1.0.3
 c33776067dbbd3a4b1731637bf0070f5bd3e2e7c v1.0.4
 65bbd5a34b20309f2c194ec2aca489630cb123d4 v1.0.5
+1d105f0f49978140c2d9d92619730a2e025b3d27 v1.1

File CMakeLists.txt

View file
 
 cmake_minimum_required(VERSION 2.6)
 
-set (VERSION 1.0.5)
+set (VERSION 1.1)
 
 if (WIN32)
 	execute_process(COMMAND cmd /c echo %DATE% %TIME% OUTPUT_VARIABLE TIMESTAMP

File ChangeLog.txt

View file
+1.1 - 20121219
+==============
+
+Broker:
+- Add $SYS/broker/messages/dropped
+- Add $SYS/broker/clients/expired
+- Replace $SYS/broker/+/per second/+ with moving average versions published at
+  $SYS/broker/load/#
+- Add $SYS/broker/load/sockets/+ and $SYS/broker/load/connections/+
+- Documentation on password file format has been fixed.
+- Disable SSL compression. This reduces memory usage significantly and removes
+  the possibility of CRIME type attacks.
+- Enable SSL_MODE_RELEASE_BUFFERS mode to reduce SSL memory usage further.
+- Add allow_duplicate_messages option.
+- ACL files can now have comment lines with # as the first character.
+- Display message on startup about which config is being loaded.
+- Fix max_inflight_messages and max_queued_messages not being applied.
+- Fix documentation error in mosquitto.conf.
+- Ensure that QoS 2 queued messages are sent out in a timely manner.
+- Local bridges now act on clean_session correctly.
+- Local bridges with clean_session==false now remove unused subscriptions on
+  broker restart.
+- The $SYS/broker/heap/# messages now no longer include "bytes" as part of the
+  string for ease of use.
+
+Client library:
+- Free memory used by OpenSSL in mosquitto_lib_cleanup() where possible.
+- Change WebSocket subprotocol name to mqttv3.1 to make future changes easier
+  and for compatibility with other implementations.
+- mosquitto_loop_read() and mosquitto_loop_write() now handle errors
+  themselves rather than having mosquitto_loop() handle their errors. This
+  makes using them in a separate event loop more straightforward.
+- Add mosquitto_loop_forever() / loop_forever() function call to make simple
+  clients easier.
+- Disable SSL compression. This reduces memory usage significantly and removes
+  the possibility of CRIME type attacks.
+- Enable SSL_MODE_RELEASE_BUFFERS mode to reduce SSL memory usage further.
+- mosquitto_tls_set() will now return an error or raise an exception
+  immediately if the CA certificate or client certificate/key cannot be
+  accessed.
+- Fix potential memory leaks on connection failures.
+- Don't produce return error from mosquitto_loop() if a system call is
+  interrupted. This prevents disconnects/reconnects in threaded mode and
+  simplifies non-threaded client handling.
+- Ignore SIGPIPE to prevent unnecessary client quits in threaded mode.
+- Fix document error for mosquitto_message_retry_set().
+- Fix mosquitto_topic_matches_sub() for subscriptions with + as the final
+  character. Fixes bug #1085797.
+- Rename all "obj" parameters to "userdata" for consistency with other
+  libraries.
+- Reset errno before network read/write to ensure EAGAIN isn't mistakenly
+  returned.
+- The message queue length is now tracked and used to determine the maximum
+  number of packets to process at once. This removes the need for the
+  max_packets parameter which is now unused.
+- Fix incorrect error value in Python error_string() function. Fixes bug
+  #1086777.
+- Reset last message in/out timer in Python module when we send a PINGREQ.
+  Fixes too-early disconnects.
+
+Clients:
+- Clients now display their own version number and library version number in
+  their help messages.
+- Fix "mosquitto_pub -l -q 2" disconnecting before all messages were
+  transmitted.
+- Fix potential out-of-bounds array access with client ids. Fixes bug
+  #1083182.
+
+Other:
+- mosquitto_passwd can now convert password files with plain text files to
+  hashed versions.
+
 1.0.5 - 20121103
 ================
 

File THANKS.txt

View file
+These people have reported bugs / provided patches / done something else to aid
+the mosquitto project. Thanks to you all!
+
+If you think I've missed you off the list, please rest assured that it wasn't
+intentional and get in touch and I'll fix it.
+
+Adam Rudd
+Andrew Elwell
+Andy Piper
+Andy Stanford-Clark
+Bart Van Der Meerssche
+Ben Tobin
+Brad Stancel
+Chris Willing
+Craig Hollabaugh
+Dan Anderson
+Dariusz Suchojad
+David Huang
+David Monro
+Dirk O. Kaar	
+Dominik Obermaier
+Dominik Zajac
+Ed Morris
+Frank Hansen
+Joan Zapata
+Karl Palsson
+Larry Lendo
+Martin Assarsson
+Marty Lee
+Matt Daubney
+Michael C
+Michael Frisch
+Michael Rushton
+Mike Bush
+Neil Bothwick
+Nicholas Humfrey
+Nicholas O'Leary
+Nithin Kumar
+Peter George
+Rob Pridham
+Roland de Boo
+Sebastian Kroll
+Sharon Ben-Asher
+sskaje
+Stefan Hudelmaier
+Stefano Costa
+Steven Lougheed
+Tobias Assarsson
+Toby Jaffey
+Vicente Ruiz
+Wayne Ingram
+Yuvraaj Kelkar

File aclfile.example

View file
+# This is a comment
 topic read $SYS/#
 user roger
 topic foo/bar

File client/pub_client.c

View file
 
 #define STATUS_CONNECTING 0
 #define STATUS_CONNACK_RECVD 1
-#define STATUS_DISCONNECTING 2
+#define STATUS_WAITING 2
 
 /* Global variables for use in callbacks. See sub_client.c for an example of
  * using a struct to hold variables for use in callbacks. */
 static int mode = MSGMODE_NONE;
 static int status = STATUS_CONNECTING;
 static int mid_sent = 0;
+static int last_mid = -1;
 static bool connected = true;
 static char *username = NULL;
 static char *password = NULL;
 
 void my_publish_callback(struct mosquitto *mosq, void *obj, int mid)
 {
-	if(mode != MSGMODE_STDIN_LINE && disconnect_sent == false){
+	if(mode == MSGMODE_STDIN_LINE){
+		if(mid == last_mid){
+			mosquitto_disconnect(mosq);
+			disconnect_sent = true;
+		}
+	}else if(disconnect_sent == false){
 		mosquitto_disconnect(mosq);
 		disconnect_sent = true;
 	}
 
 void print_usage(void)
 {
-	printf("mosquitto_pub is a simple mqtt client that will publish a message on a single topic and exit.\n\n");
+	int major, minor, revision;
+
+	mosquitto_lib_version(&major, &minor, &revision);
+	printf("mosquitto_pub is a simple mqtt client that will publish a message on a single topic and exit.\n");
+	printf("mosquitto_pub version %s running on libmosquitto %d.%d.%d.\n\n", VERSION, major, minor, revision);
 	printf("Usage: mosquitto_pub [-h host] [-p port] [-q qos] [-r] {-f file | -l | -n | -m message} -t topic\n");
 	printf("                     [-i id] [-I id_prefix]\n");
 	printf("                     [-d] [--quiet]\n");
 			return 1;
 		}
 		snprintf(id, len, "mosqpub/%d-%s", getpid(), hostname);
-		id[MOSQ_MQTT_ID_MAX_LENGTH] = '\0';
+		if(strlen(id) > MOSQ_MQTT_ID_MAX_LENGTH){
+			/* Enforce maximum client id length of 23 characters */
+			id[MOSQ_MQTT_ID_MAX_LENGTH] = '\0';
+		}
 	}
 
 	mosq = mosquitto_new(id, true, NULL);
 						if(!quiet) fprintf(stderr, "Error: Publish returned %d, disconnecting.\n", rc2);
 						mosquitto_disconnect(mosq);
 					}
-				}else if(feof(stdin) && disconnect_sent == false){
-					mosquitto_disconnect(mosq);
-					disconnect_sent = true;
-					status = STATUS_DISCONNECTING;
+				}else if(feof(stdin)){
+					last_mid = mid_sent;
+					status = STATUS_WAITING;
 				}
+			}else if(status == STATUS_WAITING){
+#ifdef WIN32
+				Sleep(1000);
+#else
+				usleep(1000000);
+#endif
 			}
 			rc = MOSQ_ERR_SUCCESS;
 		}else{

File client/sub_client.c

View file
 
 void print_usage(void)
 {
-	printf("mosquitto_sub is a simple mqtt client that will subscribe to a single topic and print all messages it receives.\n\n");
+	int major, minor, revision;
+
+	mosquitto_lib_version(&major, &minor, &revision);
+	printf("mosquitto_sub is a simple mqtt client that will subscribe to a single topic and print all messages it receives.\n");
+	printf("mosquitto_sub version %s running on libmosquitto %d.%d.%d.\n\n", VERSION, major, minor, revision);
 	printf("Usage: mosquitto_sub [-c] [-h host] [-k keepalive] [-p port] [-q qos] [-v] -t topic ...\n");
 	printf("                     [-i id] [-I id_prefix]\n");
 	printf("                     [-d] [--quiet]\n");
 			return 1;
 		}
 		snprintf(id, len, "mosqsub/%d-%s", getpid(), hostname);
-		id[MOSQ_MQTT_ID_MAX_LENGTH] = '\0';
+		if(strlen(id) > MOSQ_MQTT_ID_MAX_LENGTH){
+			/* Enforce maximum client id length of 23 characters */
+			id[MOSQ_MQTT_ID_MAX_LENGTH] = '\0';
+		}
 	}
 
 	mosq = mosquitto_new(id, clean_session, &ud);
 		mosquitto_lib_cleanup();
 	}
 
-	do{
-		rc = mosquitto_loop(mosq, -1, 100);
-	}while(rc == MOSQ_ERR_SUCCESS);
+	rc = mosquitto_loop_forever(mosq, -1, 1);
 
 	mosquitto_destroy(mosq);
 	mosquitto_lib_cleanup();

File config.mk

View file
 
 # Also bump lib/mosquitto.h, lib/python/setup.py, CMakeLists.txt,
 # installer/mosquitto.nsi, installer/mosquitto-cygwin.nsi
-VERSION=1.0.5
+VERSION=1.1
 TIMESTAMP:=$(shell date "+%F %T%z")
 
 # Client library SO version. Bump if incompatible API/ABI changes are made.
 LIB_CXXFLAGS:=$(LIB_CFLAGS)
 
 BROKER_CFLAGS:=${LIB_CFLAGS} -DVERSION="\"${VERSION}\"" -DTIMESTAMP="\"${TIMESTAMP}\"" -DWITH_BROKER
-CLIENT_CFLAGS:=${CFLAGS} -I../lib
+CLIENT_CFLAGS:=${CFLAGS} -I../lib -DVERSION="\"${VERSION}\""
 
 ifeq ($(UNAME),FreeBSD)
-	BROKER_LIBS:=
+	BROKER_LIBS:=-lm
 else
-	BROKER_LIBS:=-ldl
+	BROKER_LIBS:=-ldl -lm
 endif
 LIB_LIBS:=
 PASSWD_LIBS:=
 	BROKER_CFLAGS:=$(BROKER_CFLAGS) -DWITH_TLS
 	LIB_CFLAGS:=$(LIB_CFLAGS) -DWITH_TLS
 	PASSWD_LIBS:=-lcrypto
-	CLIENT_CFLAGS:=$(LIB_CFLAGS) -DWITH_TLS
+	CLIENT_CFLAGS:=$(CLIENT_CFLAGS) -DWITH_TLS
 
 	ifeq ($(WITH_TLS_PSK),yes)
 		BROKER_CFLAGS:=$(BROKER_CFLAGS) -DWITH_TLS_PSK
 		LIB_CFLAGS:=$(LIB_CFLAGS) -DWITH_TLS_PSK
-		CLIENT_CFLAGS:=$(LIB_CFLAGS) -DWITH_TLS_PSK
+		CLIENT_CFLAGS:=$(CLIENT_CFLAGS) -DWITH_TLS_PSK
 	endif
 endif
 

File doc/historical/old-regex.txt

View file
+This is the description of the regex used previously for topic/subscription
+matching. It is reproduced here for posterity.
+
+When a message is ready to be published at the broker, we need to check all
+of the subscriptions to see which ones the message should be sent to. This
+would be easy without wildcards, but requires a bit more work with them.
+
+The regex used to do the matching is of the form below for a topic of a/b/c:
+
+^(?:(?:(a|\+)(?!$))(?:(?:/(?:(b|\+)(?!$)))(?:(?:/(?:c|\+))|/#)?|/#)?|#)$
+
+In general, we're matching (a or +) followed by (the next levels of
+hierarchy or #).
+More specifically, all the levels of hierarchy must match, unless the last
+level is #.
+
+^(?:							# Must start at beginning of string
+		(?:						# (Level 1 hierarchy)
+			(a|\+)(?!$) 		# Match a or +, but only if not EOL.
+		)						# AND 
+		(?:
+			(?:					# (Level 2 hierarchy)
+				/				# Match /
+				(?:				# AND
+					(b|\+)(?!$)	# Match b or +, but only if not EOL.
+				)
+			)					# AND
+			(?:
+				(?:				# (Level 3 hierarchy)
+					/			# Match /
+					(?:			# AND
+						c|\+	# Match c or +.
+					)
+				)
+				|				# OR (instead of level 3)
+				/#				# Match /# at level 3
+			)?					# Level 3 exist 1/0 times
+			|					# OR (instead of level 2)
+			/#					# Match /# at level 2
+		)?						# Level 2 exist 1/0 times
+		|						# OR (instead of level 1)
+		#						# Match # at level 1
+	)$							# Must end on EOL.

File doc/historical/topic-match.kds

View file
+S'^(?:(?:(a|\\+)(?!$))(?:(?:/(?:(b|\\+)(?!$)))(?:(?:/(?:c|\\+))|/#)?|/#)?|#)$'
+p1
+.S'a/#\na/b/c\na/b/+\na/b\na/+\n+\n+/b\n+/+/+\n+/b/c\na/c'
+p2
+.I8
+.S''
+.

File doc/old-regex.txt

-This is the description of the regex used previously for topic/subscription
-matching. It is reproduced here for posterity.
-
-When a message is ready to be published at the broker, we need to check all
-of the subscriptions to see which ones the message should be sent to. This
-would be easy without wildcards, but requires a bit more work with them.
-
-The regex used to do the matching is of the form below for a topic of a/b/c:
-
-^(?:(?:(a|\+)(?!$))(?:(?:/(?:(b|\+)(?!$)))(?:(?:/(?:c|\+))|/#)?|/#)?|#)$
-
-In general, we're matching (a or +) followed by (the next levels of
-hierarchy or #).
-More specifically, all the levels of hierarchy must match, unless the last
-level is #.
-
-^(?:							# Must start at beginning of string
-		(?:						# (Level 1 hierarchy)
-			(a|\+)(?!$) 		# Match a or +, but only if not EOL.
-		)						# AND 
-		(?:
-			(?:					# (Level 2 hierarchy)
-				/				# Match /
-				(?:				# AND
-					(b|\+)(?!$)	# Match b or +, but only if not EOL.
-				)
-			)					# AND
-			(?:
-				(?:				# (Level 3 hierarchy)
-					/			# Match /
-					(?:			# AND
-						c|\+	# Match c or +.
-					)
-				)
-				|				# OR (instead of level 3)
-				/#				# Match /# at level 3
-			)?					# Level 3 exist 1/0 times
-			|					# OR (instead of level 2)
-			/#					# Match /# at level 2
-		)?						# Level 2 exist 1/0 times
-		|						# OR (instead of level 1)
-		#						# Match # at level 1
-	)$							# Must end on EOL.

File doc/topic-match.kds

-S'^(?:(?:(a|\\+)(?!$))(?:(?:/(?:(b|\\+)(?!$)))(?:(?:/(?:c|\\+))|/#)?|/#)?|#)$'
-p1
-.S'a/#\na/b/c\na/b/+\na/b\na/+\n+\n+/b\n+/+/+\n+/b/c\na/c'
-p2
-.I8
-.S''
-.

File installer/mosquitto-cygwin.nsi

View file
 !define env_hklm 'HKLM "SYSTEM\CurrentControlSet\Control\Session Manager\Environment"'
 
 Name "mosquitto"
-!define VERSION 1.0.5
+!define VERSION 1.1
 OutFile "mosquitto-${VERSION}-install-cygwin.exe"
 
 InstallDir "$PROGRAMFILES\mosquitto"

File installer/mosquitto.nsi

View file
 !define env_hklm 'HKLM "SYSTEM\CurrentControlSet\Control\Session Manager\Environment"'
 
 Name "mosquitto"
-!define VERSION 1.0.5
+!define VERSION 1.1
 OutFile "mosquitto-${VERSION}-install-win32.exe"
 
 InstallDir "$PROGRAMFILES\mosquitto"

File lib/cpp/mosquittopp.cpp

View file
 
 namespace mosqpp {
 
-static void on_connect_wrapper(struct mosquitto *mosq, void *obj, int rc)
+static void on_connect_wrapper(struct mosquitto *mosq, void *userdata, int rc)
 {
-	class mosquittopp *m = (class mosquittopp *)obj;
+	class mosquittopp *m = (class mosquittopp *)userdata;
 	m->on_connect(rc);
 }
 
-static void on_disconnect_wrapper(struct mosquitto *mosq, void *obj, int rc)
+static void on_disconnect_wrapper(struct mosquitto *mosq, void *userdata, int rc)
 {
-	class mosquittopp *m = (class mosquittopp *)obj;
+	class mosquittopp *m = (class mosquittopp *)userdata;
 	m->on_disconnect(rc);
 }
 
-static void on_publish_wrapper(struct mosquitto *mosq, void *obj, int mid)
+static void on_publish_wrapper(struct mosquitto *mosq, void *userdata, int mid)
 {
-	class mosquittopp *m = (class mosquittopp *)obj;
+	class mosquittopp *m = (class mosquittopp *)userdata;
 	m->on_publish(mid);
 }
 
-static void on_message_wrapper(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message)
+static void on_message_wrapper(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *message)
 {
-	class mosquittopp *m = (class mosquittopp *)obj;
+	class mosquittopp *m = (class mosquittopp *)userdata;
 	m->on_message(message);
 }
 
-static void on_subscribe_wrapper(struct mosquitto *mosq, void *obj, int mid, int qos_count, const int *granted_qos)
+static void on_subscribe_wrapper(struct mosquitto *mosq, void *userdata, int mid, int qos_count, const int *granted_qos)
 {
-	class mosquittopp *m = (class mosquittopp *)obj;
+	class mosquittopp *m = (class mosquittopp *)userdata;
 	m->on_subscribe(mid, qos_count, granted_qos);
 }
 
-static void on_unsubscribe_wrapper(struct mosquitto *mosq, void *obj, int mid)
+static void on_unsubscribe_wrapper(struct mosquitto *mosq, void *userdata, int mid)
 {
-	class mosquittopp *m = (class mosquittopp *)obj;
+	class mosquittopp *m = (class mosquittopp *)userdata;
 	m->on_unsubscribe(mid);
 }
 
 
-static void on_log_wrapper(struct mosquitto *mosq, void *obj, int level, const char *str)
+static void on_log_wrapper(struct mosquitto *mosq, void *userdata, int level, const char *str)
 {
-	class mosquittopp *m = (class mosquittopp *)obj;
+	class mosquittopp *m = (class mosquittopp *)userdata;
 	m->on_log(level, str);
 }
 
 	return mosquitto_loop_write(m_mosq, max_packets);
 }
 
+int mosquittopp::loop_forever(int timeout, int max_packets)
+{
+	return mosquitto_loop_forever(m_mosq, timeout, max_packets);
+}
+
 int mosquittopp::loop_start()
 {
 	return mosquitto_loop_start(m_mosq);
 	return mosquitto_want_write(m_mosq);
 }
 
-void mosquittopp::user_data_set(void *obj)
+void mosquittopp::user_data_set(void *userdata)
 {
-	mosquitto_user_data_set(m_mosq, obj);
+	mosquitto_user_data_set(m_mosq, userdata);
 }
 
 int mosquittopp::tls_set(const char *cafile, const char *capath, const char *certfile, const char *keyfile, int (*pw_callback)(char *buf, int size, int rwflag, void *userdata))

File lib/cpp/mosquittopp.h

View file
 		int subscribe(int *mid, const char *sub, int qos=0);
 		int unsubscribe(int *mid, const char *sub);
 		void message_retry_set(unsigned int message_retry);
-		void user_data_set(void *obj);
+		void user_data_set(void *userdata);
 		int tls_set(const char *cafile, const char *capath=NULL, const char *certfile=NULL, const char *keyfile=NULL, int (*pw_callback)(char *buf, int size, int rwflag, void *userdata)=NULL);
 		int tls_opts_set(int cert_reqs, const char *tls_version=NULL, const char *ciphers=NULL);
 		int tls_psk_set(const char *psk, const char *identity, const char *ciphers=NULL);
 		int loop_misc();
 		int loop_read(int max_packets=1);
 		int loop_write(int max_packets=1);
+		int loop_forever(int timeout=-1, int max_packets=1);
 		int loop_start();
 		int loop_stop(bool force=false);
 		bool want_write();

File lib/jsws/mosquitto.js

View file
 		this.mid = 1;
 		this.out_queue = new Array();
 
-		this.ws = new WebSocket(url, 'mqtt');
+		this.ws = new WebSocket(url, 'mqttv3.1');
 		this.ws.binaryType = "arraybuffer";
 		this.ws.onopen = this.ws_onopen;
 		this.ws.onclose = this.ws_onclose;

File lib/linker.version

View file
 	local: *;
 };
 
+MOSQ_1.1 {
+	global:
+		mosquitto_loop_forever;
+} MOSQ_1.0;

File lib/logging_mosq.c

View file
 		va_end(va);
 		s[len-1] = '\0'; /* Ensure string is null terminated. */
 
-		mosq->on_log(mosq, mosq->obj, priority, s);
+		mosq->on_log(mosq, mosq->userdata, priority, s);
 
 		_mosquitto_free(s);
 	}

File lib/messages_mosq.c

View file
 	assert(mosq);
 	assert(message);
 
+	mosq->queue_len++;
 	message->next = NULL;
 	if(mosq->messages){
 		tail = mosq->messages;
 	struct mosquitto_message_all *prev = NULL;
 	assert(mosq);
 
+	mosq->queue_len = 0;
 	message = mosq->messages;
 	while(message){
 		message->timestamp = 0;
 			}else if(message->msg.qos == 2){
 				message->state = mosq_ms_wait_pubrec;
 			}
+			mosq->queue_len++;
 		}else{
 			if(prev){
 				prev->next = message->next;
 				mosq->messages = cur->next;
 			}
 			*message = cur;
+			mosq->queue_len--;
 			return MOSQ_ERR_SUCCESS;
 		}
 		prev = cur;

File lib/mosquitto.c

View file
 
 #include <assert.h>
 #include <errno.h>
+#include <signal.h>
 #include <stdio.h>
 #include <string.h>
 #ifndef WIN32
 	return MOSQ_ERR_SUCCESS;
 }
 
-struct mosquitto *mosquitto_new(const char *id, bool clean_session, void *obj)
+struct mosquitto *mosquitto_new(const char *id, bool clean_session, void *userdata)
 {
 	struct mosquitto *mosq = NULL;
 	int rc;
 		return NULL;
 	}
 
+#ifndef WIN32
+	signal(SIGPIPE, SIG_IGN);
+#endif
+
 	mosq = (struct mosquitto *)_mosquitto_calloc(1, sizeof(struct mosquitto));
 	if(mosq){
 		mosq->sock = INVALID_SOCKET;
 #ifdef WITH_THREADING
 		mosq->thread_id = pthread_self();
 #endif
-		rc = mosquitto_reinitialise(mosq, id, clean_session, obj);
+		rc = mosquitto_reinitialise(mosq, id, clean_session, userdata);
 		if(rc){
 			mosquitto_destroy(mosq);
 			if(rc == MOSQ_ERR_INVAL){
 	return mosq;
 }
 
-int mosquitto_reinitialise(struct mosquitto *mosq, const char *id, bool clean_session, void *obj)
+int mosquitto_reinitialise(struct mosquitto *mosq, const char *id, bool clean_session, void *userdata)
 {
 	int i;
 
 	_mosquitto_destroy(mosq);
 	memset(mosq, 0, sizeof(struct mosquitto));
 
-	if(obj){
-		mosq->obj = obj;
+	if(userdata){
+		mosq->userdata = userdata;
 	}else{
-		mosq->obj = mosq;
+		mosq->userdata = mosq;
 	}
 	mosq->sock = INVALID_SOCKET;
 	mosq->keepalive = 60;
 	mosq->host = NULL;
 	mosq->port = 1883;
 	mosq->in_callback = false;
+	mosq->queue_len = 0;
 #ifdef WITH_TLS
 	mosq->ssl = NULL;
 #endif
 int mosquitto_tls_set(struct mosquitto *mosq, const char *cafile, const char *capath, const char *certfile, const char *keyfile, int (*pw_callback)(char *buf, int size, int rwflag, void *userdata))
 {
 #ifdef WITH_TLS
+	FILE *fptr;
+
 	if(!mosq || (!cafile && !capath) || (certfile && !keyfile) || (!certfile && keyfile)) return MOSQ_ERR_INVAL;
 
 	if(cafile){
+		fptr = fopen(cafile, "rt");
+		if(fptr){
+			fclose(fptr);
+		}else{
+			return MOSQ_ERR_INVAL;
+		}
 		mosq->tls_cafile = _mosquitto_strdup(cafile);
+
 		if(!mosq->tls_cafile){
 			return MOSQ_ERR_NOMEM;
 		}
 	}
 
 	if(certfile){
+		fptr = fopen(certfile, "rt");
+		if(fptr){
+			fclose(fptr);
+		}else{
+			if(mosq->tls_cafile){
+				_mosquitto_free(mosq->tls_cafile);
+				mosq->tls_cafile = NULL;
+			}
+			if(mosq->tls_capath){
+				_mosquitto_free(mosq->tls_capath);
+				mosq->tls_capath = NULL;
+			}
+			return MOSQ_ERR_INVAL;
+		}
 		mosq->tls_certfile = _mosquitto_strdup(certfile);
 		if(!mosq->tls_certfile){
 			return MOSQ_ERR_NOMEM;
 	}
 
 	if(keyfile){
+		fptr = fopen(keyfile, "rt");
+		if(fptr){
+			fclose(fptr);
+		}else{
+			if(mosq->tls_cafile){
+				_mosquitto_free(mosq->tls_cafile);
+				mosq->tls_cafile = NULL;
+			}
+			if(mosq->tls_capath){
+				_mosquitto_free(mosq->tls_capath);
+				mosq->tls_capath = NULL;
+			}
+			if(mosq->tls_certfile){
+				_mosquitto_free(mosq->tls_certfile);
+				mosq->tls_capath = NULL;
+			}
+			return MOSQ_ERR_INVAL;
+		}
 		mosq->tls_keyfile = _mosquitto_strdup(keyfile);
 		if(!mosq->tls_keyfile){
 			return MOSQ_ERR_NOMEM;
 #ifdef WIN32
 		errno = WSAGetLastError();
 #endif
-		return MOSQ_ERR_ERRNO;
+		if(errno == EINTR){
+			return MOSQ_ERR_SUCCESS;
+		}else{
+			return MOSQ_ERR_ERRNO;
+		}
 	}else{
 		if(FD_ISSET(mosq->sock, &readfds)){
 			rc = mosquitto_loop_read(mosq, max_packets);
-			if(rc){
-				_mosquitto_socket_close(mosq);
-				pthread_mutex_lock(&mosq->state_mutex);
-				if(mosq->state == mosq_cs_disconnecting){
-					rc = MOSQ_ERR_SUCCESS;
-				}
-				pthread_mutex_unlock(&mosq->state_mutex);
-				pthread_mutex_lock(&mosq->callback_mutex);
-				if(mosq->on_disconnect){
-					mosq->in_callback = true;
-					mosq->on_disconnect(mosq, mosq->obj, rc);
-					mosq->in_callback = false;
-				}
-				pthread_mutex_unlock(&mosq->callback_mutex);
+			if(rc || mosq->sock == INVALID_SOCKET){
 				return rc;
 			}
 		}
 		if(FD_ISSET(mosq->sock, &writefds)){
 			rc = mosquitto_loop_write(mosq, max_packets);
-			if(rc){
-				_mosquitto_socket_close(mosq);
-				pthread_mutex_lock(&mosq->state_mutex);
-				if(mosq->state == mosq_cs_disconnecting){
-					rc = MOSQ_ERR_SUCCESS;
-				}
-				pthread_mutex_unlock(&mosq->state_mutex);
-				pthread_mutex_lock(&mosq->callback_mutex);
-				if(mosq->on_disconnect){
-					mosq->in_callback = true;
-					mosq->on_disconnect(mosq, mosq->obj, rc);
-					mosq->in_callback = false;
-				}
-				pthread_mutex_unlock(&mosq->callback_mutex);
+			if(rc || mosq->sock == INVALID_SOCKET){
 				return rc;
 			}
 		}
 	return mosquitto_loop_misc(mosq);
 }
 
+int mosquitto_loop_forever(struct mosquitto *mosq, int timeout, int max_packets)
+{
+	int run = 1;
+	int rc;
+
+	if(!mosq) return MOSQ_ERR_INVAL;
+
+	if(mosq->state == mosq_cs_connect_async){
+		mosquitto_reconnect(mosq);
+	}
+
+	while(run){
+		do{
+			rc = mosquitto_loop(mosq, timeout, max_packets);
+		}while(rc == MOSQ_ERR_SUCCESS);
+		if(mosq->state == mosq_cs_disconnecting){
+			run = 0;
+		}else{
+#ifdef WIN32
+			Sleep(1000);
+#else
+			sleep(1);
+#endif
+			mosquitto_reconnect(mosq);
+		}
+	}
+	return rc;
+}
+
 int mosquitto_loop_misc(struct mosquitto *mosq)
 {
 	time_t now = time(NULL);
 		pthread_mutex_lock(&mosq->callback_mutex);
 		if(mosq->on_disconnect){
 			mosq->in_callback = true;
-			mosq->on_disconnect(mosq, mosq->obj, rc);
+			mosq->on_disconnect(mosq, mosq->userdata, rc);
 			mosq->in_callback = false;
 		}
 		pthread_mutex_unlock(&mosq->callback_mutex);
 	return MOSQ_ERR_SUCCESS;
 }
 
+static int _mosquitto_loop_rc_handle(struct mosquitto *mosq, int rc)
+{
+	if(rc){
+		_mosquitto_socket_close(mosq);
+		pthread_mutex_lock(&mosq->state_mutex);
+		if(mosq->state == mosq_cs_disconnecting){
+			rc = MOSQ_ERR_SUCCESS;
+		}
+		pthread_mutex_unlock(&mosq->state_mutex);
+		pthread_mutex_lock(&mosq->callback_mutex);
+		if(mosq->on_disconnect){
+			mosq->in_callback = true;
+			mosq->on_disconnect(mosq, mosq->userdata, rc);
+			mosq->in_callback = false;
+		}
+		pthread_mutex_unlock(&mosq->callback_mutex);
+		return rc;
+	}
+	return rc;
+}
+
 int mosquitto_loop_read(struct mosquitto *mosq, int max_packets)
 {
 	int rc;
 	int i;
 	if(max_packets < 1) return MOSQ_ERR_INVAL;
 
+	max_packets = mosq->queue_len;
+	if(max_packets < 1) max_packets = 1;
+	/* Queue len here tells us how many messages are awaiting processing and
+	 * have QoS > 0. We should try to deal with that many in this loop in order
+	 * to keep up. */
 	for(i=0; i<max_packets; i++){
 		rc = _mosquitto_packet_read(mosq);
 		if(rc || errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){
-			return rc;
+			return _mosquitto_loop_rc_handle(mosq, rc);
 		}
 	}
 	return rc;
 	int i;
 	if(max_packets < 1) return MOSQ_ERR_INVAL;
 
+	max_packets = mosq->queue_len;
+	if(max_packets < 1) max_packets = 1;
+	/* Queue len here tells us how many messages are awaiting processing and
+	 * have QoS > 0. We should try to deal with that many in this loop in order
+	 * to keep up. */
 	for(i=0; i<max_packets; i++){
 		rc = _mosquitto_packet_write(mosq);
 		if(rc || errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){
-			return rc;
+			return _mosquitto_loop_rc_handle(mosq, rc);
 		}
 	}
 	return rc;
 	pthread_mutex_unlock(&mosq->log_callback_mutex);
 }
 
-void mosquitto_user_data_set(struct mosquitto *mosq, void *obj)
+void mosquitto_user_data_set(struct mosquitto *mosq, void *userdata)
 {
 	if(mosq){
-		mosq->obj = obj;
+		mosq->userdata = userdata;
 	}
 }
 

File lib/mosquitto.h

View file
 #endif
 
 #define LIBMOSQUITTO_MAJOR 1
-#define LIBMOSQUITTO_MINOR 0
-#define LIBMOSQUITTO_REVISION 5
+#define LIBMOSQUITTO_MINOR 1
+#define LIBMOSQUITTO_REVISION 0
 #define LIBMOSQUITTO_VERSION_NUMBER (LIBMOSQUITTO_MAJOR*1000000+LIBMOSQUITTO_MINOR*1000+LIBMOSQUITTO_REVISION)
 
 /* Log types */
  * Function: mosquitto_loop
  *
  * The main network loop for the client. You must call this frequently in order
- * to keep communications between the client and broker working. An alternative
- * approach is to use <mosquitto_loop_start> to run the client loop in its own
- * thread.
+ * to keep communications between the client and broker working. If incoming
+ * data is present it will then be processed. Outgoing commands, from e.g.
+ * <mosquitto_publish>, are normally sent immediately that their function is
+ * called, but this is not always possible. <mosquitto_loop> will also attempt
+ * to send any remaining outgoing messages, which also includes commands that
+ * are part of the flow for messages with QoS>0.
+ *
+ * An alternative approach is to use <mosquitto_loop_start> to run the client
+ * loop in its own thread.
  *
  * This calls select() to monitor the client network socket. If you want to
  * integrate mosquitto client operation with your own select() call, use
  *	timeout -     Maximum number of milliseconds to wait for network activity
  *	              in the select() call before timing out. Set to 0 for instant
  *	              return.  Set negative to use the default of 1000ms.
- *	max_packets - the maximum number of packets to process in this call. Must
- *	              be >0. If set to 1, only a single packet will be processed
- *	              per call. Avoid setting too high if you have a high incoming
- *	              message rate.
+ *	max_packets - this parameter is currently unused.
  * 
  * Returns:
  *	MOSQ_ERR_SUCCESS -   on success.
  *                       Use strerror_r() where available or FormatMessage() on
  *                       Windows.
  * See Also:
- *	<mosquitto_loop_start>, <mosquitto_loop_stop>
+ *	<mosquitto_loop_forever>, <mosquitto_loop_start>, <mosquitto_loop_stop>
  */
 libmosq_EXPORT int mosquitto_loop(struct mosquitto *mosq, int timeout, int max_packets);
 
 /*
+ * Function: mosquitto_loop_forever
+ *
+ * This function call loop() for you in an infinite blocking loop. It is useful
+ * for the case where you only want to run the MQTT client loop in your
+ * program.
+ *
+ * It handles reconnecting in case server connection is lost. If you call
+ * mosquitto_disconnect() in a callback it will return.
+ *
+ * Parameters:
+ *  mosq - a valid mosquitto instance.
+ *	timeout -     Maximum number of milliseconds to wait for network activity
+ *	              in the select() call before timing out. Set to 0 for instant
+ *	              return.  Set negative to use the default of 1000ms.
+ *	max_packets - this parameter is currently unused.
+ *
+ * Returns:
+ *	MOSQ_ERR_SUCCESS -   on success.
+ * 	MOSQ_ERR_INVAL -     if the input parameters were invalid.
+ * 	MOSQ_ERR_NOMEM -     if an out of memory condition occurred.
+ * 	MOSQ_ERR_NO_CONN -   if the client isn't connected to a broker.
+ *  MOSQ_ERR_CONN_LOST - if the connection to the broker was lost.
+ *	MOSQ_ERR_PROTOCOL -  if there is a protocol error communicating with the
+ *                       broker.
+ * 	MOSQ_ERR_ERRNO -     if a system call returned an error. The variable errno
+ *                       contains the error code, even on Windows.
+ *                       Use strerror_r() where available or FormatMessage() on
+ *                       Windows.
+ *
+ * See Also:
+ *	<mosquitto_loop>, <mosquitto_loop_start>
+ */
+libmosq_EXPORT int mosquitto_loop_forever(struct mosquitto *mosq, int timeout, int max_packets);
+
+/*
  * Function: mosquitto_loop_start
  *
  * This is part of the threaded client interface. Call this once to start a new
  *	MOSQ_ERR_NOT_SUPPORTED - if thread support is not available.
  *
  * See Also:
- *	<mosquitto_connect_async>, <mosquitto_loop>, <mosquitto_loop_stop>
+ *	<mosquitto_connect_async>, <mosquitto_loop>, <mosquitto_loop_forever>, <mosquitto_loop_stop>
  */
 libmosq_EXPORT int mosquitto_loop_start(struct mosquitto *mosq);
 
  *
  * Parameters:
  *	mosq -        a valid mosquitto instance.
- *	max_packets - the maximum number of packets to process in this call. Must
- *	              be >0. If set to 1, only a single packet will be processed
- *	              per call. Avoid setting too high if you have a high incoming
- *	              message rate.
+ *	max_packets - this parameter is currently unused.
  *
  * Returns:
  *	MOSQ_ERR_SUCCESS -   on success.
  *
  * Parameters:
  *	mosq -        a valid mosquitto instance.
- *	max_packets - the maximum number of packets to process in this call. Must
- *	              be >0. If set to 1, only a single packet will be processed
- *	              per call.
+ *	max_packets - this parameter is currently unused.
  *
  * Returns:
  *	MOSQ_ERR_SUCCESS -   on success.
  * Parameters:
  *  mosq -          a valid mosquitto instance.
  *  message_retry - the number of seconds to wait for a response before
- *                  retrying. Defaults to 60.
+ *                  retrying. Defaults to 20.
  */
 libmosq_EXPORT void mosquitto_message_retry_set(struct mosquitto *mosq, unsigned int message_retry);
 

File lib/mosquitto_internal.h

View file
 
 #include <mosquitto.h>
 #ifdef WITH_BROKER
-struct _mosquitto_client_msg;
+struct mosquitto_client_msg;
 #endif
 
 enum mosquitto_msg_direction {
 #ifdef WITH_BROKER
 	bool is_bridge;
 	struct _mqtt3_bridge *bridge;
-	struct _mosquitto_client_msg *msgs;
+	struct mosquitto_client_msg *msgs;
 	struct _mosquitto_acl_user *acl_list;
 	struct _mqtt3_listener *listener;
 	time_t disconnect_t;
 	int pollfd_index;
 #else
-	void *obj;
+	void *userdata;
 	bool in_callback;
 	unsigned int message_retry;
 	time_t last_retry_check;
 	struct mosquitto_message_all *messages;
-	void (*on_connect)(struct mosquitto *, void *obj, int rc);
-	void (*on_disconnect)(struct mosquitto *, void *obj, int rc);
-	void (*on_publish)(struct mosquitto *, void *obj, int mid);
-	void (*on_message)(struct mosquitto *, void *obj, const struct mosquitto_message *message);
-	void (*on_subscribe)(struct mosquitto *, void *obj, int mid, int qos_count, const int *granted_qos);
-	void (*on_unsubscribe)(struct mosquitto *, void *obj, int mid);
-	void (*on_log)(struct mosquitto *, void *obj, int level, const char *str);
+	void (*on_connect)(struct mosquitto *, void *userdata, int rc);
+	void (*on_disconnect)(struct mosquitto *, void *userdata, int rc);
+	void (*on_publish)(struct mosquitto *, void *userdata, int mid);
+	void (*on_message)(struct mosquitto *, void *userdata, const struct mosquitto_message *message);
+	void (*on_subscribe)(struct mosquitto *, void *userdata, int mid, int qos_count, const int *granted_qos);
+	void (*on_unsubscribe)(struct mosquitto *, void *userdata, int mid);
+	void (*on_log)(struct mosquitto *, void *userdata, int level, const char *str);
 	//void (*on_error)();
 	char *host;
 	int port;
+	int queue_len;
 #endif
 };
 

File lib/net_mosq.c

View file
 
 void _mosquitto_net_cleanup(void)
 {
+#ifdef WITH_TLS
+	ERR_free_strings();
+	EVP_cleanup();
+#endif
+
 #ifdef WIN32
 	WSACleanup();
 #endif
 #endif
 		COMPAT_CLOSE(sock);
 	}
+	freeaddrinfo(ainfo);
 	if(!rp){
 		return MOSQ_ERR_ERRNO;
 	}
-	freeaddrinfo(ainfo);
 
 	/* Set non-blocking */
 #ifndef WIN32
 			return MOSQ_ERR_INVAL;
 		}
 
+#if OPENSSL_VERSION_NUMBER >= 0x10000000
+		/* Disable compression */
+		SSL_CTX_set_options(mosq->ssl_ctx, SSL_OP_NO_COMPRESSION);
+#endif
+#ifdef SSL_MODE_RELEASE_BUFFERS
+			/* Use even less memory per SSL connection. */
+			SSL_CTX_set_mode(mosq->ssl_ctx, SSL_MODE_RELEASE_BUFFERS);
+#endif
+
 		if(mosq->tls_ciphers){
 			ret = SSL_CTX_set_cipher_list(mosq->ssl_ctx, mosq->tls_ciphers);
 			if(ret == 0){
 	unsigned long e;
 #endif
 	assert(mosq);
+	errno = 0;
 #ifdef WITH_TLS
 	if(mosq->ssl){
 		ret = SSL_read(mosq->ssl, buf, count);
 #endif
 	assert(mosq);
 
+	errno = 0;
 #ifdef WITH_TLS
 	if(mosq->ssl){
 		ret = SSL_write(mosq->ssl, buf, count);
 			if(mosq->on_publish){
 				/* This is a QoS=0 message */
 				mosq->in_callback = true;
-				mosq->on_publish(mosq, mosq->obj, packet->mid);
+				mosq->on_publish(mosq, mosq->userdata, packet->mid);
 				mosq->in_callback = false;
 			}
 			pthread_mutex_unlock(&mosq->callback_mutex);
 }
 
 #ifdef WITH_BROKER
-int _mosquitto_packet_read(mosquitto_db *db, struct mosquitto *mosq)
+int _mosquitto_packet_read(struct mosquitto_db *db, struct mosquitto *mosq)
 #else
 int _mosquitto_packet_read(struct mosquitto *mosq)
 #endif

File lib/net_mosq.h

View file
 #include <mosquitto.h>
 
 #ifdef WITH_BROKER
-struct _mosquitto_db;
+struct mosquitto_db;
 #endif
 
 #ifdef WIN32
 
 int _mosquitto_packet_write(struct mosquitto *mosq);
 #ifdef WITH_BROKER
-int _mosquitto_packet_read(struct _mosquitto_db *db, struct mosquitto *mosq);
+int _mosquitto_packet_read(struct mosquitto_db *db, struct mosquitto *mosq);
 #else
 int _mosquitto_packet_read(struct mosquitto *mosq);
 #endif

File lib/python/mosquitto.py

View file
 
 def error_string(mosq_errno):
     """Return the error string associated with a mosquitto error number."""
-    if mosq_errno == MOSQ_ERR_SUCESS:
+    if mosq_errno == MOSQ_ERR_SUCCESS:
         return "No error."
     elif mosq_errno == MOSQ_ERR_NOMEM:
         return "Out of memory."
                 spos += 1
                 while tpos < tlen and local_topic[tpos] != '/':
                     tpos += 1
+                if tpos == tlen and spos == slen:
+                    result = True
+                    break
 
             elif local_sub[spos] == '#':
                 multilevel_wildcard = True
     * Use connect()/connect_async() to connect to a broker
     * Call loop() frequently to maintain network traffic flow with the broker
     * Or use loop_start() to set a thread running to call loop() for you.
+    * Or use loop_forever() to handle calling loop() for you in a blocking
+    * function.
     * Use subscribe() to subscribe to a topic and receive messages
     * Use publish() to send messages
     * Use disconnect() to disconnect from the broker
     broker. To use a callback, define a function and then assign it to the
     client:
     
-    def on_connect(mosq, obj, rc):
+    def on_connect(mosq, userdata, rc):
         print("Connection returned " + str(rc))
 
     client.on_connect = on_connect
 
-    All of the callbacks as described below have a "mosq" and an "obj"
+    All of the callbacks as described below have a "mosq" and an "userdata"
     argument. "mosq" is the Mosquitto instance that is calling the callback.
-    "obj" is user data of any type and can be set when creating a new client
-    instance or with user_data_set(obj).
+    "userdata" is user data of any type and can be set when creating a new client
+    instance or with user_data_set(userdata).
     
     The callbacks:
 
-    on_connect(mosq, obj, rc): called when the broker responds to our connection
+    on_connect(mosq, userdata, rc): called when the broker responds to our connection
       request. The value of rc determines success or not:
       0: Connection successful
       1: Connection refused - incorrect protocol version
       5: Connection refused - not authorised
       6-255: Currently unused.
 
-    on_disconnect(mosq, obj, rc): called when the client disconnects from the broker.
+    on_disconnect(mosq, userdata, rc): called when the client disconnects from the broker.
       The rc parameter indicates the disconnection state. If MOSQ_ERR_SUCCESS
       (0), the callback was called in response to a disconnect() call. If any
       other value the disconnection was unexpected, such as might be caused by
       a network error.
 
-    on_message(mosq, obj, message): called when a message has been received on a
+    on_message(mosq, userdata, message): called when a message has been received on a
       topic that the client subscribes to. The message variable is a
       MosquittoMessage that describes all of the message parameters.
 
-    on_publish(mosq, obj, mid): called when a message that was to be sent using the
+    on_publish(mosq, userdata, mid): called when a message that was to be sent using the
       publish() call has completed transmission to the broker. For messages
       with QoS levels 1 and 2, this means that the appropriate handshakes have
       completed. For QoS 0, this simply means that the message has left the
       This callback is important because even if the publish() call returns
       success, it does not always mean that the message has been sent.
 
-    on_subscribe(mosq, obj, mid, granted_qos): called when the broker responds to a
+    on_subscribe(mosq, userdata, mid, granted_qos): called when the broker responds to a
       subscribe request. The mid variable matches the mid variable returned
       from the corresponding subscribe() call. The granted_qos variable is a
       list of integers that give the QoS level the broker has granted for each
       of the different subscription requests.
 
-    on_unsubscribe(mosq, obj, mid): called when the broker responds to an unsubscribe
+    on_unsubscribe(mosq, userdata, mid): called when the broker responds to an unsubscribe
       request. The mid variable matches the mid variable returned from the
       corresponding unsubscribe() call.
 
-    on_log(mosq, obj, level, buf): called when the client has log information. Define
+    on_log(mosq, userdata, level, buf): called when the client has log information. Define
       to allow debugging. The level variable gives the severity of the message
       and will be one of MOSQ_LOG_INFO, MOSQ_LOG_NOTICE, MOSQ_LOG_WARNING,
       MOSQ_LOG_ERR, and MOSQ_LOG_DEBUG. The message itself is in buf.
 
     """
-    def __init__(self, client_id="", clean_session=True, obj=None):
+    def __init__(self, client_id="", clean_session=True, userdata=None):
         """client_id is the unique client id string used when connecting to the
         broker. If client_id is zero length or None, then one will be randomly
         generated. In this case, clean_session must be True. If this is not the
         disconnect. Calling connect() or reconnect() will cause the messages to
         be resent.  Use reinitialise() to reset a client to its original state.
 
-        obj is user defined data of any type that is passed as the "obj"
+        userdata is user defined data of any type that is passed as the "userdata"
         parameter to callbacks. It may be updated at a later point with the
         user_data_set() function.
         """
         if clean_session == False and (client_id == "" or client_id == None):
             raise ValueError('A client id must be provided if clean session is False.')
 
-        self._obj = obj
+        self._userdata = userdata
         self._sock = None
         self._keepalive = 60
         self._message_retry = 20
     def __del__(self):
         pass
 
-    def reinitialise(self, client_id="", clean_session=True, obj=None):
+    def reinitialise(self, client_id="", clean_session=True, userdata=None):
         if self._ssl:
             self._ssl.close()
             self._ssl = None
         elif self._sock:
             self._sock.close()
             self._sock = None
-        self.__init__(client_id, clean_session, obj)
+        self.__init__(client_id, clean_session, userdata)
 
     def tls_set(self, ca_certs, certfile=None, keyfile=None, cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_TLSv1, ciphers=None):
         """Configure network encryption and authentication options. Enables SSL/TLS support.
         if ca_certs == None:
             raise ValueError('ca_certs must not be None.')
 
+        try:
+            f = open(ca_certs, "r")
+        except IOError as err:
+            raise IOError(ca_certs+": "+err.strerror)
+        else:
+            f.close()
+        if certfile != None:
+            try:
+                f = open(certfile, "r")
+            except IOError as err:
+                raise IOError(certfile+": "+err.strerror)
+            else:
+                f.close()
+        if keyfile != None:
+            try:
+                f = open(keyfile, "r")
+            except IOError as err:
+                raise IOError(keyfile+": "+err.strerror)
+            else:
+                f.close()
+
         self._tls_ca_certs = ca_certs
         self._tls_certfile = certfile
         self._tls_keyfile = keyfile
     def loop(self, timeout=1.0, max_packets=1):
         """Process network events.
 
-        This function must be called regularly to ensure communication with the broker is carried out.
+        This function must be called regularly to ensure communication with the
+        broker is carried out. It calls select() on the network socket to wait
+        for network events. If incoming data is present it will then be
+        processed. Outgoing commands, from e.g. publish(), are normally sent
+        immediately that their function is called, but this is not always
+        possible. loop() will also attempt to send any remaining outgoing
+        messages, which also includes commands that are part of the flow for
+        messages with QoS>0.
 
         timeout: The time in seconds to wait for incoming/outgoing network
           traffic before timing out and returning. 
-        max_packets: The maximum number of packets to process before returning.
-          Must be >0. If set to 1, only a single packet will be processed per
-          call. Avoid setting too high if you have a high incoming message rate.
+        max_packets: Not currently used.
 
         Returns MOSQ_ERR_SUCCESS on success.
         Returns >0 on error.
 
-        A ValueError will be raised if timeout < 0 or if max_packets < 1"""
+        A ValueError will be raised if timeout < 0"""
         if timeout < 0.0:
             raise ValueError('Invalid timeout.')
-        if max_packets < 1:
-            raise ValueError('Invalid max_packets.')
 
         self._current_out_packet_mutex.acquire()
         self._out_packet_mutex.acquire()
 
         if self.socket() in socklist[0]:
             rc = self.loop_read(max_packets)
-            if rc != MOSQ_ERR_SUCCESS:
-                if self._ssl:
-                    self._ssl.close()
-                    self._ssl = None
-                elif self._sock:
-                    self._sock.close()
-                    self._sock = None
-
-                self._state_mutex.acquire()
-                if self._state == mosq_cs_disconnecting:
-                    rc = MOSQ_ERR_SUCCESS
-                self._state_mutex.release()
-                self._callback_mutex.acquire()
-                if self.on_disconnect:
-                    self._in_callback = True
-                    self.on_disconnect(self, self._obj, rc)
-                    self._in_callback = False
-
-                self._callback_mutex.release()
+            if rc or (self._ssl == None and self._sock == None):
                 return rc
 
         if self.socket() in socklist[1]:
             rc = self.loop_write(max_packets)
-            if rc != MOSQ_ERR_SUCCESS:
-                if self._ssl:
-                    self._ssl.close()
-                    self._ssl = None
-                else:
-                    self._sock.close()
-                    self._sock = None
-
-                self._state_mutex.acquire()
-                if self._state == mosq_cs_disconnecting:
-                    rc = MOSQ_ERR_SUCCESS
-                self._state_mutex.release()
-                self._callback_mutex.acquire()
-                if self.on_disconnect:
-                    self._in_callback = True
-                    self.on_disconnect(self, self._obj, rc)
-                    self._in_callback = False
-                self._callback_mutex.release()
+            if rc or (self._ssl == None and self._sock == None):
                 return rc
 
         return self.loop_misc()
         if self._sock == None and self._ssl == None:
             return MOSQ_ERR_NO_CONN
 
+        max_packets = len(self._messages)
         if max_packets < 1:
-            raise ValueError('Invalid max_packets.')
+            max_packets = 1
 
         for i in range(0, max_packets):
             rc = self._packet_read()
             if rc > 0:
-                return rc
+                return self._loop_rc_handle(rc)
             elif rc == MOSQ_ERR_AGAIN:
                 return MOSQ_ERR_SUCCESS
         return MOSQ_ERR_SUCCESS
         if self._sock == None and self._ssl == None:
             return MOSQ_ERR_NO_CONN
 
+        max_packets = len(self._messages)
         if max_packets < 1:
-            raise ValueError('Invalid max_packets.')
+            max_packets = 1
 
         for i in range(0, max_packets):
             rc = self._packet_write()
             if rc > 0:
-                return rc
+                return self._loop_rc_handle(rc)
             elif rc == MOSQ_ERR_AGAIN:
                 return MOSQ_ERR_SUCCESS
         return MOSQ_ERR_SUCCESS
                 rc = 1
             if self.on_disconnect:
                 self._in_callback = True
-                self.on_disconnect(self, self._obj, rc)
+                self.on_disconnect(self, self._userdata, rc)
                 self._in_callback = False
             self._callback_mutex.release()
             return MOSQ_ERR_CONN_LOST
 
         self._message_retry = retry
 
-    def user_data_set(self, obj):
+    def user_data_set(self, userdata):
         """Set the user data variable passed to callbacks. May be any data type."""
-        self._obj = obj
+        self._userdata = userdata
 
     def will_set(self, topic, payload=None, qos=0, retain=False):
         """Set a Will to be sent by the broker in case the client disconnects unexpectedly.
         else:
             return self._sock
 
+    def loop_forever(self, timeout=1.0, max_packets=1):
+        """This function call loop() for you in an infinite blocking loop. It
+        is useful for the case where you only want to run the MQTT client loop
+        in your program.
+
+        loop_forever() will handle reconnecting for you. If you call
+        disconnect() in a callback it will return."""
+
+        run = True
+        if self._state == mosq_cs_connect_async:
+            self.reconnect()
+
+        while run == True:
+            rc = MOSQ_ERR_SUCCESS
+            while rc == MOSQ_ERR_SUCCESS:
+                rc = self.loop(timeout, max_packets)
+
+            if self._state == mosq_cs_disconnecting:
+                run = False
+            else:
+                time.sleep(1)
+                self.reconnect()
+        return rc
+
     def loop_start(self):
         """This is part of the threaded client interface. Call this once to
         start a new thread to process network traffic. This provides an
     # Private functions
     # ============================================================
 
+    def _loop_rc_handle(self, rc):
+        if rc:
+            if self._ssl:
+                self._ssl.close()
+                self._ssl = None
+            elif self._sock:
+                self._sock.close()
+                self._sock = None
+
+            self._state_mutex.acquire()
+            if self._state == mosq_cs_disconnecting:
+                rc = MOSQ_ERR_SUCCESS
+            self._state_mutex.release()
+            self._callback_mutex.acquire()
+            if self.on_disconnect:
+                self._in_callback = True
+                self.on_disconnect(self, self._userdata, rc)
+                self._in_callback = False
+
+            self._callback_mutex.release()
+        return rc
+
     def _packet_read(self):
         # This gets called if pselect() indicates that there is network data
         # available - ie. at least one byte.  What we do depends on what data we
         while self._current_out_packet:
             packet = self._current_out_packet
 
-            if self._ssl:
-                try:
+            try:
+                if self._ssl:
                     write_length = self._ssl.write(packet.packet[packet.pos:])
-                except AttributeError:
-                    self._current_out_packet_mutex.release()
-                    return MOSQ_ERR_SUCCESS
-            else:
-                write_length = self._sock.send(packet.packet[packet.pos:])
+                else:
+                    write_length = self._sock.send(packet.packet[packet.pos:])
+            except AttributeError:
+                self._current_out_packet_mutex.release()
+                return MOSQ_ERR_SUCCESS
+            except socket.error as err:
+                self._current_out_packet_mutex.release()
+                (msg) = err
+                if self._ssl and (msg.errno == ssl.SSL_ERROR_WANT_READ or msg.errno == ssl.SSL_ERROR_WANT_WRITE):
+                    return MOSQ_ERR_AGAIN
+                if msg.errno == errno.EAGAIN:
+                    return MOSQ_ERR_AGAIN
+                print(msg)
+                return 1
+
             if write_length > 0:
                 packet.to_process = packet.to_process - write_length
                 packet.pos = packet.pos + write_length
                         self._callback_mutex.acquire()
                         if self.on_publish:
                             self._in_callback = True
-                            self.on_publish(self, self._obj, packet.mid)
+                            self.on_publish(self, self._userdata, packet.mid)
                             self._in_callback = False
 
                         self._callback_mutex.release()
 
     def _easy_log(self, level, buf):
         if self.on_log:
-            self.on_log(self, self._obj, level, buf)
+            self.on_log(self, self._userdata, level, buf)
 
     def _check_keepalive(self):
         now = time.time()
         if (self._sock != None or self._ssl != None) and (now - last_msg_out >= self._keepalive or now - last_msg_in >= self._keepalive):
             if self._state == mosq_cs_connected and self._ping_t == 0:
                 self._send_pingreq()
+                self._msgtime_mutex.acquire()
+                self._last_msg_out = now
+                self._last_msg_in = now
+                self._msgtime_mutex.release()
             else:
                 if self._ssl:
                     self._ssl.close()
                 self._callback_mutex.acquire()
                 if self.on_disconnect:
                     self._in_callback = True
-                    self.on_disconnect(self, self._obj, rc)
+                    self.on_disconnect(self, self._userdata, rc)
                     self._in_callback = False
                 self._callback_mutex.release()
 
         self._callback_mutex.acquire()
         if self.on_connect:
             self._in_callback = True
-            self.on_connect(self, self._obj, result)
+            self.on_connect(self, self._userdata, result)
             self._in_callback = False
         self._callback_mutex.release()
         if result == 0:
         self._callback_mutex.acquire()
         if self.on_subscribe:
             self._in_callback = True
-            self.on_subscribe(self, self._obj, mid, granted_qos)
+            self.on_subscribe(self, self._userdata, mid, granted_qos)
             self._in_callback = False
         self._callback_mutex.release()
 
             self._callback_mutex.acquire()
             if self.on_message:
                 self._in_callback = True
-                self.on_message(self, self._obj, message)
+                self.on_message(self, self._userdata, message)
                 self._in_callback = False
 
             self._callback_mutex.release()
             self._callback_mutex.acquire()
             if self.on_message:
                 self._in_callback = True
-                self.on_message(self, self._obj, message)
+                self.on_message(self, self._userdata, message)
                 self._in_callback = False
 
             self._callback_mutex.release()
                 self._callback_mutex.acquire()
                 if self.on_message:
                     self._in_callback = True
-                    self.on_message(self, self._obj, self._messages[i])
+                    self.on_message(self, self._userdata, self._messages[i])
                     self._in_callback = False
                 self._callback_mutex.release()
                 self._messages.pop(i)
         self._callback_mutex.acquire()
         if self.on_unsubscribe:
             self._in_callback = True
-            self.on_unsubscribe(self, self._obj, mid)
+            self.on_unsubscribe(self, self._userdata, mid)
             self._in_callback = False
         self._callback_mutex.release()
         return MOSQ_ERR_SUCCESS
                     self._callback_mutex.acquire()
                     if self.on_publish:
                         self._in_callback = True
-                        self.on_publish(self, self._obj, mid)
+                        self.on_publish(self, self._userdata, mid)
                         self._in_callback = False
 
                     self._callback_mutex.release()

File lib/python/setup.py

View file
 
 from distutils.core import setup
 setup(name='mosquitto',
-	version='1.0.5',
+	version='1.1',
 	description='MQTT version 3.1 client class',
 	author='Roger Light',
 	author_email='roger@atchoo.org',

File lib/read_handle.c

View file
 			pthread_mutex_lock(&mosq->callback_mutex);
 			if(mosq->on_message){
 				mosq->in_callback = true;
-				mosq->on_message(mosq, mosq->obj, &message->msg);
+				mosq->on_message(mosq, mosq->userdata, &message->msg);
 				mosq->in_callback = false;
 			}
 			pthread_mutex_unlock(&mosq->callback_mutex);
 			pthread_mutex_lock(&mosq->callback_mutex);
 			if(mosq->on_message){
 				mosq->in_callback = true;
-				mosq->on_message(mosq, mosq->obj, &message->msg);
+				mosq->on_message(mosq, mosq->userdata, &message->msg);
 				mosq->in_callback = false;
 			}
 			pthread_mutex_unlock(&mosq->callback_mutex);

File lib/read_handle.h

View file
 #define _READ_HANDLE_H_
 
 #include <mosquitto.h>
-struct _mosquitto_db;
+struct mosquitto_db;
 
 int _mosquitto_packet_handle(struct mosquitto *mosq);
 int _mosquitto_handle_connack(struct mosquitto *mosq);
 int _mosquitto_handle_pubackcomp(struct mosquitto *mosq, const char *type);
 int _mosquitto_handle_publish(struct mosquitto *mosq);
 int _mosquitto_handle_pubrec(struct mosquitto *mosq);
-int _mosquitto_handle_pubrel(struct _mosquitto_db *db, struct mosquitto *mosq);
+int _mosquitto_handle_pubrel(struct mosquitto_db *db, struct mosquitto *mosq);
 int _mosquitto_handle_suback(struct mosquitto *mosq);
 int _mosquitto_handle_unsuback(struct mosquitto *mosq);
 

File lib/read_handle_client.c

View file
 	pthread_mutex_lock(&mosq->callback_mutex);
 	if(mosq->on_connect){
 		mosq->in_callback = true;
-		mosq->on_connect(mosq, mosq->obj, result);
+		mosq->on_connect(mosq, mosq->userdata, result);
 		mosq->in_callback = false;
 	}
 	pthread_mutex_unlock(&mosq->callback_mutex);

File lib/read_handle_shared.c

View file
 		pthread_mutex_lock(&mosq->callback_mutex);
 		if(mosq->on_publish){
 			mosq->in_callback = true;
-			mosq->on_publish(mosq, mosq->obj, mid);
+			mosq->on_publish(mosq, mosq->userdata, mid);
 			mosq->in_callback = false;
 		}
 		pthread_mutex_unlock(&mosq->callback_mutex);
 	return MOSQ_ERR_SUCCESS;
 }
 
-int _mosquitto_handle_pubrel(struct _mosquitto_db *db, struct mosquitto *mosq)
+int _mosquitto_handle_pubrel(struct mosquitto_db *db, struct mosquitto *mosq)
 {
 	uint16_t mid;
 #ifndef WITH_BROKER
 		pthread_mutex_lock(&mosq->callback_mutex);
 		if(mosq->on_message){
 			mosq->in_callback = true;
-			mosq->on_message(mosq, mosq->obj, &message->msg);
+			mosq->on_message(mosq, mosq->userdata, &message->msg);
 			mosq->in_callback = false;
 		}
 		pthread_mutex_unlock(&mosq->callback_mutex);
 	pthread_mutex_lock(&mosq->callback_mutex);
 	if(mosq->on_subscribe){
 		mosq->in_callback = true;
-		mosq->on_subscribe(mosq, mosq->obj, mid, qos_count, granted_qos);
+		mosq->on_subscribe(mosq, mosq->userdata, mid, qos_count, granted_qos);
 		mosq->in_callback = false;
 	}
 	pthread_mutex_unlock(&mosq->callback_mutex);
 	pthread_mutex_lock(&mosq->callback_mutex);
 	if(mosq->on_unsubscribe){
 		mosq->in_callback = true;
-	   	mosq->on_unsubscribe(mosq, mosq->obj, mid);
+	   	mosq->on_unsubscribe(mosq, mosq->userdata, mid);
 		mosq->in_callback = false;
 	}
 	pthread_mutex_unlock(&mosq->callback_mutex);

File lib/send_mosq.c

View file
 int _mosquitto_send_publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint32_t payloadlen, const void *payload, int qos, bool retain, bool dup)
 {
 #ifdef WITH_BROKER
-	int len;
+	size_t len;
 #ifdef WITH_BRIDGE
 	int i;
 	struct _mqtt3_bridge_topic *cur_topic;

File lib/util_mosq.c

View file
 			pthread_mutex_lock(&mosq->callback_mutex);
 			if(mosq->on_disconnect){
 				mosq->in_callback = true;
-				mosq->on_disconnect(mosq, mosq->obj, rc);
+				mosq->on_disconnect(mosq, mosq->userdata, rc);
 				mosq->in_callback = false;
 			}
 			pthread_mutex_unlock(&mosq->callback_mutex);
 				while(tpos < tlen && local_topic[tpos] != '/'){
 					tpos++;
 				}
+				if(tpos == tlen && spos == slen){
+					*result = true;
+					break;
+				}
 			}else if(local_sub[spos] == '#'){
 				multilevel_wildcard = true;
 				if(spos+1 != slen){

File man/libmosquitto.3.xml

View file
 			<funcsynopsis><funcprototype><funcdef>struct mosquitto *<function>mosquitto_new</function></funcdef>
 					<paramdef>const char *<parameter>id</parameter></paramdef>
 					<paramdef>bool <parameter>clean_session</parameter></paramdef>
-					<paramdef>void *<parameter>obj</parameter></paramdef>
+					<paramdef>void *<parameter>userdata</parameter></paramdef>
 			</funcprototype></funcsynopsis>
 			<para>Create a new mosquitto client instance.</para>
 
 					<paramdef>struct mosquitto *<parameter>mosq</parameter></paramdef>
 					<paramdef>const char *<parameter>id</parameter></paramdef>
 					<paramdef>bool <parameter>clean_session</parameter></paramdef>
-					<paramdef>void *<parameter>obj</parameter></paramdef>
+					<paramdef>void *<parameter>userdata</parameter></paramdef>
 			</funcprototype></funcsynopsis>
 		</refsect2>
 
 					<paramdef>struct mosquitto *<parameter>mosq</parameter></paramdef>
 			</funcprototype></funcsynopsis>
 
+			<funcsynopsis><funcprototype><funcdef>int <function>mosquitto_loop_forever</function></funcdef>
+					<paramdef>struct mosquitto *<parameter>mosq</parameter></paramdef>
+					<paramdef>int <parameter>timeout</parameter></paramdef>
+					<paramdef>int <parameter>max_packets</parameter></paramdef>
+			</funcprototype></funcsynopsis>
+
 			<funcsynopsis><funcprototype><funcdef>int <function>mosquitto_socket</function></funcdef>
 					<paramdef>struct mosquitto *<parameter>mosq</parameter></paramdef>
 			</funcprototype></funcsynopsis>
 
 			<funcsynopsis><funcprototype><funcdef>int <function>mosquitto_user_data_set</function></funcdef>
 					<paramdef>struct mosquitto *<parameter>mosq</parameter></paramdef>
-					<paramdef>void *<parameter>obj</parameter></paramdef>
+					<paramdef>void *<parameter>userdata</parameter></paramdef>
 			</funcprototype></funcsynopsis>
 		</refsect2>
 
 #include &lt;stdio.h&gt;
 #include &lt;mosquitto.h&gt;
 
-void my_message_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message)
+void my_message_callback(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *message)
 {
 	if(message->payloadlen){
 		printf("%s %s\n", message->topic, message->payload);
 	fflush(stdout);
 }
 
-void my_connect_callback(struct mosquitto *mosq, void *obj, int result)
+void my_connect_callback(struct mosquitto *mosq, void *userdata, int result)
 {
 	int i;
 	if(!result){
 	}
 }
 
-void my_subscribe_callback(struct mosquitto *mosq, void *obj, int mid, int qos_count, const int *granted_qos)
+void my_subscribe_callback(struct mosquitto *mosq, void *userdata, int mid, int qos_count, const int *granted_qos)