Commits

Roger Light  committed 57c0342

Break out of select() on publish(), subscribe() etc. when using the threaded
interface. Fixes bug #1270062.

  • Participants
  • Parent commits 1e4c021
  • Branches 1.3

Comments (0)

Files changed (11)

File ChangeLog.txt

   the behaviour as clarified by the Oasis MQTT spec. This will lead to
   unexpected behaviour if you were using topics of this form.
 - Add support for SRV lookups.
+- Break out of select() on publish(), subscribe() etc. when using the threaded
+  interface. Fixes bug #1270062.
 
 Clients:
 - Add --ciphers to allow specifying which TLS ciphers to support.

File client/pub_client.c

 	char *host = "localhost";
 	int port = 1883;
 	char *bind_address = NULL;
-	int keepalive = 60;
+	int keepalive = 1;
 	char buf[1024];
 	bool debug = false;
 	struct mosquitto *mosq = NULL;

File lib/memory_mosq.h

 #ifndef _MEMORY_MOSQ_H_
 #define _MEMORY_MOSQ_H_
 
+#include <stdio.h>
 #include <sys/types.h>
 
 #if defined(WITH_MEMORY_TRACKING) && defined(WITH_BROKER) && !defined(WIN32) && !defined(__SYMBIAN32__) && !defined(__ANDROID__)

File lib/mosquitto.c

 	mosq = (struct mosquitto *)_mosquitto_calloc(1, sizeof(struct mosquitto));
 	if(mosq){
 		mosq->sock = INVALID_SOCKET;
+		mosq->sockpairR = INVALID_SOCKET;
+		mosq->sockpairW = INVALID_SOCKET;
 #ifdef WITH_THREADING
 		mosq->thread_id = pthread_self();
 #endif
 		mosq->userdata = mosq;
 	}
 	mosq->sock = INVALID_SOCKET;
+	mosq->sockpairR = INVALID_SOCKET;
+	mosq->sockpairW = INVALID_SOCKET;
 	mosq->keepalive = 60;
 	mosq->message_retry = 20;
 	mosq->last_retry_check = 0;
 	}
 
 	_mosquitto_packet_cleanup(&mosq->in_packet);
+	if(mosq->sockpairR != INVALID_SOCKET){
+		COMPAT_CLOSE(mosq->sockpairR);
+		mosq->sockpairR = INVALID_SOCKET;
+	}
+	if(mosq->sockpairW != INVALID_SOCKET){
+		COMPAT_CLOSE(mosq->sockpairW);
+		mosq->sockpairW = INVALID_SOCKET;
+	}
 }
 
 void mosquitto_destroy(struct mosquitto *mosq)
 
 	mosq->keepalive = keepalive;
 
+	_mosquitto_socketpair(&mosq->sockpairR, &mosq->sockpairW);
+
 	return MOSQ_ERR_SUCCESS;
 }
 
 	fd_set readfds, writefds;
 	int fdcount;
 	int rc;
+	char pairbuf;
+	int maxfd = 0;
 
 	if(!mosq || max_packets < 1) return MOSQ_ERR_INVAL;
 
 	FD_ZERO(&readfds);
 	FD_ZERO(&writefds);
 	if(mosq->sock != INVALID_SOCKET){
+		maxfd = mosq->sock;
 		FD_SET(mosq->sock, &readfds);
 		pthread_mutex_lock(&mosq->current_out_packet_mutex);
 		pthread_mutex_lock(&mosq->out_packet_mutex);
 		if(mosq->achan){
 			pthread_mutex_lock(&mosq->state_mutex);
 			if(mosq->state == mosq_cs_connect_srv){
-				ares_fds(mosq->achan, &readfds, &writefds);
+				rc = ares_fds(mosq->achan, &readfds, &writefds);
+				if(rc > maxfd){
+					maxfd = rc;
+				}
 			}else{
 				return MOSQ_ERR_NO_CONN;
 			}
 		return MOSQ_ERR_NO_CONN;
 #endif
 	}
+	if(mosq->sockpairR != INVALID_SOCKET){
+		/* sockpairR is used to break out of select() before the timeout, on a
+		 * call to publish() etc. */
+		FD_SET(mosq->sockpairR, &readfds);
+		if(mosq->sockpairR > maxfd){
+			maxfd = mosq->sockpairR;
+		}
+	}
 
 	if(timeout >= 0){
 		local_timeout.tv_sec = timeout/1000;
 	}
 
 #ifdef HAVE_PSELECT
-	fdcount = pselect(mosq->sock+1, &readfds, &writefds, NULL, &local_timeout, NULL);
+	fdcount = pselect(maxfd+1, &readfds, &writefds, NULL, &local_timeout, NULL);
 #else
-	fdcount = select(mosq->sock+1, &readfds, &writefds, NULL, &local_timeout);
+	fdcount = select(maxfd+1, &readfds, &writefds, NULL, &local_timeout);
 #endif
 	if(fdcount == -1){
 #ifdef WIN32
 					return rc;
 				}
 			}
+			if(mosq->sockpairR >= 0 && FD_ISSET(mosq->sockpairR, &readfds)){
+#ifndef WIN32
+				if(read(mosq->sockpairR, &pairbuf, 1) == 0){
+				}
+#else
+				recv(mosq->sockpairR, &pairbuf, 1, 0);
+#endif
+				/* Fake write possible, to stimulate output write even though
+				 * we didn't ask for it, because at that point the publish or
+				 * other command wasn't present. */
+				FD_SET(mosq->sock, &writefds);
+			}
 			if(FD_ISSET(mosq->sock, &writefds)){
 				rc = mosquitto_loop_write(mosq, max_packets);
 				if(rc || mosq->sock == INVALID_SOCKET){

File lib/mosquitto_internal.h

 struct mosquitto {
 #ifndef WIN32
 	int sock;
+#  ifndef WITH_BROKER
+	int sockpairR, sockpairW;
+#  endif
 #else
 	SOCKET sock;
+#  ifndef WITH_BROKER
+	SOCKET sockpairR, sockpairW;
+#  endif
 #endif
 	enum _mosquitto_protocol protocol;
 	char *address;

File lib/net_mosq.c

 
 int _mosquitto_packet_queue(struct mosquitto *mosq, struct _mosquitto_packet *packet)
 {
+#ifndef WITH_BROKER
+	char sockpair_data = 0;
+#endif
 	assert(mosq);
 	assert(packet);
 
 #ifdef WITH_BROKER
 	return _mosquitto_packet_write(mosq);
 #else
+
+	/* Write a single byte to sockpairW (connected to sockpairR) to break out
+	 * of select() if in threaded mode. */
+	if(mosq->sockpairW != INVALID_SOCKET){
+#ifndef WIN32
+		if(write(mosq->sockpairW, &sockpair_data, 1)){
+		}
+#else
+		send(mosq->sockpairW, &sockpair_data, 1, 0);
+#endif
+	}
+
 	if(mosq->in_callback == false && mosq->threaded == false){
 		return _mosquitto_packet_write(mosq);
 	}else{
 	return rc;
 }
 
+int _mosquitto_socket_nonblock(int sock)
+{
+	int opt = 1;
+
+#ifndef WIN32
+	/* Set non-blocking */
+	opt = fcntl(sock, F_GETFL, 0);
+	if(opt == -1 || fcntl(sock, F_SETFL, opt | O_NONBLOCK) == -1){
+		/* If either fcntl fails, don't want to allow this client to connect. */
+		COMPAT_CLOSE(sock);
+		return 1;
+	}
+#else
+	if(ioctlsocket(sock, FIONBIO, &opt)){
+		COMPAT_CLOSE(sock);
+		return 1;
+	}
+#endif
+	return 0;
+}
+
+
+#ifndef WITH_BROKER
+int _mosquitto_socketpair(int *pairR, int *pairW)
+{
+	int family;
+	struct sockaddr_storage ss;
+	struct sockaddr_in *sa = (struct sockaddr_in *)&ss;
+	struct sockaddr_in6 *sa6 = (struct sockaddr_in6 *)&ss;
+	socklen_t ss_len;
+	int spR, spW;
+
+#ifdef WIN32
+	char ss_opt;
+#else
+	int ss_opt;
+#endif
+	int listensock;
+
+	*pairR = -1;
+	*pairW = -1;
+
+	for(family=AF_INET; ; family=AF_INET6){
+		memset(&ss, 0, sizeof(ss));
+		if(family == AF_INET){
+			sa->sin_family = family;
+			sa->sin_addr.s_addr = htonl(INADDR_LOOPBACK);
+			sa->sin_port = 0;
+		}else{
+			sa6->sin6_family = family;
+			sa6->sin6_addr = in6addr_loopback;
+			sa6->sin6_port = 0;
+		}
+
+		listensock = socket(family, SOCK_STREAM, IPPROTO_IP);
+		if(listensock == -1){
+			continue;
+		}
+
+#ifndef WIN32
+		ss_opt = 1;
+		setsockopt(listensock, SOL_SOCKET, SO_REUSEADDR, &ss_opt, sizeof(ss_opt));
+#endif
+		ss_opt = 1;
+		setsockopt(listensock, IPPROTO_IPV6, IPV6_V6ONLY, &ss_opt, sizeof(ss_opt));
+
+		if(bind(listensock, (struct sockaddr *)&ss, sizeof(ss)) == -1){
+			COMPAT_CLOSE(listensock);
+			continue;
+		}
+
+		if(listen(listensock, 1) == -1){
+			COMPAT_CLOSE(listensock);
+			continue;
+		}
+		memset(&ss, 0, sizeof(ss));
+		ss_len = sizeof(ss);
+		if(getsockname(listensock, (struct sockaddr *)&ss, &ss_len) < 0){
+			COMPAT_CLOSE(listensock);
+			continue;
+		}
+		
+		if(_mosquitto_socket_nonblock(listensock)){
+			continue;
+		}
+
+		if(family == AF_INET){
+			sa->sin_family = family;
+			sa->sin_addr.s_addr = htonl(INADDR_LOOPBACK);
+		}else{
+			sa6->sin6_family = family;
+			sa6->sin6_addr = in6addr_loopback;
+		}
+
+		spR = socket(family, SOCK_STREAM, IPPROTO_TCP);
+		if(spR == -1){
+			COMPAT_CLOSE(listensock);
+			continue;
+		}
+		if(_mosquitto_socket_nonblock(spR)){
+			COMPAT_CLOSE(listensock);
+			continue;
+		}
+		if(connect(spR, (struct sockaddr *)&ss, sizeof(ss)) < 0){
+#ifdef WIN32
+			errno = WSAGetLastError();
+#endif
+			if(errno != EINPROGRESS && errno != COMPAT_EWOULDBLOCK){
+				COMPAT_CLOSE(spR);
+				COMPAT_CLOSE(listensock);
+				continue;
+			}
+		}
+		spW = accept(listensock, NULL, 0);
+		if(spW == -1){
+#ifdef WIN32
+			errno = WSAGetLastError();
+#endif
+			if(errno != EINPROGRESS && errno != COMPAT_EWOULDBLOCK){
+				COMPAT_CLOSE(spR);
+				COMPAT_CLOSE(listensock);
+				continue;
+			}
+		}
+
+		if(_mosquitto_socket_nonblock(spW)){
+			COMPAT_CLOSE(spR);
+			COMPAT_CLOSE(listensock);
+			continue;
+		}
+		COMPAT_CLOSE(listensock);
+
+		*pairR = spR;
+		*pairW = spW;
+		return 0;
+	}
+	return 1;
+}
+#endif

File lib/net_mosq.h

 int _mosquitto_socket_connect(struct mosquitto *mosq, const char *host, uint16_t port, const char *bind_address, bool blocking);
 int _mosquitto_socket_close(struct mosquitto *mosq);
 int _mosquitto_try_connect(const char *host, uint16_t port, int *sock, const char *bind_address, bool blocking);
+int _mosquitto_socket_nonblock(int sock);
+int _mosquitto_socketpair(int *sp1, int *sp2);
 
 int _mosquitto_read_byte(struct _mosquitto_packet *packet, uint8_t *byte);
 int _mosquitto_read_bytes(struct _mosquitto_packet *packet, void *bytes, uint32_t count);

File lib/srv_mosq.c

 
 #ifdef WITH_SRV
 #  include <ares.h>
+
+#  include <arpa/nameser.h>
+#  include <stdio.h>
+#  include <string.h>
 #endif
 
-#include <arpa/nameser.h>
-#include <stdio.h>
-#include <string.h>
-
 #include "logging_mosq.h"
 #include "memory_mosq.h"
 #include "mosquitto_internal.h"

File lib/thread_mosq.c

 		pthread_mutex_unlock(&mosq->state_mutex);
 	}
 
-	mosquitto_loop_forever(mosq, 10000, 1);
+	if(!mosq->keepalive){
+		/* Sleep for a day if keepalive disabled. */
+		mosquitto_loop_forever(mosq, mosq->keepalive*1000*86400, 1);
+	}else{
+		/* Sleep for our keepalive value. publish() etc. will wake us up. */
+		mosquitto_loop_forever(mosq, mosq->keepalive*1000, 1);
+	}
 
 	mosq->threaded = false;
 	return obj;

File lib/util_mosq.c

 	last_msg_out = mosq->last_msg_out;
 	last_msg_in = mosq->last_msg_in;
 	pthread_mutex_unlock(&mosq->msgtime_mutex);
-	if(mosq->sock != INVALID_SOCKET &&
+	if(mosq->keepalive && mosq->sock != INVALID_SOCKET &&
 			(now - last_msg_out >= mosq->keepalive || now - last_msg_in >= mosq->keepalive)){
 
 		if(mosq->state == mosq_cs_connected && mosq->ping_t == 0){
 	g_socket_connections++;
 #endif
 
-#ifndef WIN32
-	/* Set non-blocking */
-	opt = fcntl(new_sock, F_GETFL, 0);
-	if(opt == -1 || fcntl(new_sock, F_SETFL, opt | O_NONBLOCK) == -1){
-		/* If either fcntl fails, don't want to allow this client to connect. */
-		close(new_sock);
-		return -1;
-	}
-#else
-	if(ioctlsocket(new_sock, FIONBIO, &opt)){
-		closesocket(new_sock);
+	if(_mosquitto_socket_nonblock(new_sock)){
 		return INVALID_SOCKET;
 	}
-#endif
 
 #ifdef WITH_WRAP
 	/* Use tcpd / libwrap to determine whether a connection is allowed. */
 	struct addrinfo hints;
 	struct addrinfo *ainfo, *rp;
 	char service[10];
-	int opt = 1;
 #ifndef WIN32
 	int ss_opt = 1;
 #else
 		ss_opt = 1;
 		setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, &ss_opt, sizeof(ss_opt));
 
-
-#ifndef WIN32
-		/* Set non-blocking */
-		opt = fcntl(sock, F_GETFL, 0);
-		if(opt == -1 || fcntl(sock, F_SETFL, opt | O_NONBLOCK) == -1){
-			/* If either fcntl fails, don't want to allow this client to connect. */
-			COMPAT_CLOSE(sock);
+		if(_mosquitto_socket_nonblock(sock)){
 			return 1;
 		}
-#else
-		if(ioctlsocket(sock, FIONBIO, &opt)){
-			COMPAT_CLOSE(sock);
-			return 1;
-		}
-#endif
 
 		if(bind(sock, rp->ai_addr, rp->ai_addrlen) == -1){
 			strerror_r(errno, err, 256);