Commits

bitblaze committed 78d56a2 Merge
  • Participants
  • Parent commits 988eb88, f66e416

Comments (0)

Files changed (5)

File client/configure.ac

 # Process this file with autoconf to produce a configure script.
 
 AC_PREREQ([2.68])
-AC_INIT([libringmaster], [0.1], [d.zhekai.chen@gmail.com])
+AC_INIT([libringmaster], [1], [d.zhekai.chen@gmail.com])
 AC_CONFIG_SRCDIR([src/ringmaster.c])
 AC_CONFIG_HEADERS([config.h])
 AC_CONFIG_MACRO_DIR([m4])
 AC_USE_SYSTEM_EXTENSIONS
-AM_INIT_AUTOMAKE
-
+AM_INIT_AUTOMAKE([silent-rules subdir-objects])
+AM_SILENT_RULES([yes])
 LT_INIT
 
 # Checks for programs.

File client/src/client.h

 #ifndef RINGMASTER_CLIENT_H_
 #define RINGMASTER_CLIENT_H
 
+#include "list.h"
+
 struct threads {
 	pthread_t io_thread;
 	pthread_t event_thread;
 
 struct buffer {
 	uint32_t length;
-	uint32_t off;
+	uint32_t offset;
 	char *buffer;
 	struct list_head list_node;
 };

File client/src/mtio.c

 #include <sys/socket.h>
 #include <sys/epoll.h>
 #include <netinet/in.h>
+#include <netinet/tcp.h>
 #include <pthread.h>
 
 #include <ringmaster.h>
 	if (buf == NULL)
 		return NULL;
 
-	buf->len = len;
-	buf->off = 0;
+	buf->length = len;
+	buf->offset = 0;
 	buf->buffer = (char *) malloc(len);
 	if (buf->buffer == NULL) {
 		free(buf);
 
 }
 
-static void event_main(RgHandle *handle)
+static void *event_main(void *h)
 {
-	struct outbound_completions *outbound_completions;
+	RgHandle *handle = (void *) h;
+	struct completion_queue *inbound_completions;
 
-	outbound_completions = &(handle->outbound_completions);
+	inbound_completions = &(handle->inbound_completions);
 
 	/* Event dispatch thread loop. */
 	while (handle->close_requested == 0) {
-		pthread_mutex_lock(&(outbound_completions->lock));
-		while (list_empty(outbound_completions->list)) {
-			pthread_cond_wait(&(outbound_completions->cond));
+		pthread_mutex_lock(&(inbound_completions->lock));
+		while (list_empty(inbound_completions->list)) {
+			pthread_cond_wait(&(inbound_completions->cond),
+			                  &(inbound_completions->lock));
 		}
-		pthread_mutex_unlock(&(outbound_completions->lock));
+		pthread_mutex_unlock(&(inbound_completions->lock));
 		event_process(handle);
 	}
 }
 		return -1;
 	}
 
-	retval = setsockopt(socket_fd, IPROTO, TCP, TCP_NODELAY, 
+	retval = setsockopt(socket_fd, IPPROTO_TCP, TCP_NODELAY, 
 	                    &enable_tcp_nodelay, 
 			    sizeof(enable_tcp_nodelay));
 	if (retval != 0) {
 	}
 
 	handle->socket_fd = socket_fd;
-	*fd = socket_fd;
 
 	retval = connect(socket_fd, &(handle->addr), sizeof(struct sockaddr));
 	if (retval == -1) {
-		if (errno == EWOULDBLOCK || errno = EINPROGRESS) {
+		if (errno == EWOULDBLOCK || errno == EINPROGRESS) {
 			handle->state = RG_STATE_CONNECTING;
 		} else {
 			perror("connect");
 		if (buffer == NULL) {
 			return -1;
 		}
-		list_node = buffer->list_node;
+		list_node = &(buffer->list_node);
 		// XXX We need to enqueue it!
 	} else {
 		list_node = recv_buffers->list->prev;
 
 	/* Receive the header first so we know how much to realloc for the 
 	   message body because it might be variable length. */
-	if (buffer->off < response_header_length()) {
-		retval = recv(socket_fd, buffer->buffer + buffer->off, 
-		              response_header_length() - buffer->off, 0);
+	if (buffer->offset < response_header_length()) {
+		retval = recv(socket_fd, buffer->buffer + buffer->offset, 
+		              response_header_length() - buffer->offset, 0);
 		switch (retval) {
 		case 0:
 			errno = EHOSTDOWN;
 			}
 			return -1;
 		default:
-			buffer->off += retval;
+			buffer->offset += retval;
 		}
 
-		if (recv_buffer->off == response_header_length()) {
+		if (buffer->offset == response_header_length()) {
 			struct response_header *header = (struct response_header *)
 				buffer->buffer;
 			int length = header->length;
 		}
 	}
 
-	if (buffer->off >= response_header_length() &&
-	    buffer->off < buffer->length) {
-		retval = recv(socket_fd, buffer->buffer + buffer->off,
-		              buffer->length - buffer->off, 0);
+	if (buffer->offset >= response_header_length() &&
+	    buffer->offset < buffer->length) {
+		retval = recv(socket_fd, buffer->buffer + buffer->offset,
+		              buffer->length - buffer->offset, 0);
 		switch (retval) {
 		case 0:
 			errno = EHOSTDOWN;
 			}
 			return -1;
 		default:
-			buffer->off += retval;
+			buffer->offset += retval;
 		}
 	}
 
-	return buffer->off == buffer->length;
+	return buffer->offset == buffer->length;
 }
 
 static void io_handle_read(RgHandle *handle, int interest)
 
 	int retval;
 
-	retval = io_recv(handle->socket_fd, handle->recv_buffers);
+	retval = io_recv(handle->socket_fd, &(handle->recv_buffers));
 
 	switch (retval) {
 	case -1:
 
 static int io_send(int socket_fd, struct buffer_queue *send_buffers)
 {
-	struct buffer *send_buffer;
+	struct buffer *buffer;
 	struct list_head *list_node;
 	int retval;
 
 	list_node = send_buffers->list->prev;
 	buffer = list_entry(list_node, struct buffer, list_node);
 
-	if (buffer->off < buffer->length) {
-		retval = send(socket_fd, buffer->buffer + buffer->off,
-		              buffer->length - buffer->off, 0);
+	if (buffer->offset < buffer->length) {
+		retval = send(socket_fd, buffer->buffer + buffer->offset,
+		              buffer->length - buffer->offset, 0);
 		switch (retval) {
 		case 0:
 			errno = EHOSTDOWN;
 			return -1;
 			break;
 		default:
-			buffer->off += retval;
+			buffer->offset += retval;
 		}
 		
 	}
 
-	return buffer->off == buffer->length;
+	return buffer->offset == buffer->length;
 }
 
 static void io_handle_write(RgHandle *handle, int interest)
 	if (!(interest & INTEREST_WRITE))
 		return;
 
-	retval = io_read(handle->socket_fd, handle->send_buffers);
+	retval = io_send(handle->socket_fd, &(handle->send_buffers));
 	
 	switch (retval) {
 	case -1:
 	}
 }
 
-static void io_main(RgHandle *handle)
+static void *io_main(void *h)
 {
+	RgHandle *handle = (RgHandle *) h;
 	struct epoll_event events[2];
 	int max_events;
 	int interest;
 	/* Main event loop. */
 	while (handle->close_requested == 0) {
 		interest = 0;
-		num_events = epoll_wait(epoll_fd, events, max_events, -1);
-		if (num_events < 0) {
+		retval = epoll_wait(epoll_fd, events, max_events, -1);
+		if (retval < 0) {
 			perror("epoll_wait");
 			return;
 		}
 			interest |= INTEREST_WRITE;
 
 		if (interest & INTEREST_READ) {
-			io_handle_read(handle);	
+			io_handle_read(handle, interest);	
 		}
 		if (interest & INTEREST_WRITE) {
-			io_handle_write(handle);
+			io_handle_write(handle, interest);
 		}
 	}
 }
 		: RG_ESYSTEM;
 }
 int mtio_queue_completion(RgHandle *handle, int type, void *callback,
-                          const void *callback_data)
+                          void *callback_data)
 {
 	struct completion *completion;
 
 	completion->callback = callback;
 	completion->callback_data = callback_data;
 
-	completion_queue_lock(handle->outbound_completions);
+	completion_queue_lock(&(handle->outbound_completions));
 	list_add(&(completion->list_node), &(handle->outbound_completions.list));
-	completion_queue_unlock(handle->outbound_completions);
+	completion_queue_unlock(&(handle->outbound_completions));
 
 	return RG_OK;
 }
 	struct buffer *send_buffer;
 
 	send_buffer = (struct buffer *) malloc(sizeof(*send_buffer));
-	send_buffer->len = packed_len;
+	send_buffer->length = packed_len;
 	send_buffer->buffer = packed_buf;
 
-	buffer_queue_lock(handle->send_buffers);
+	buffer_queue_lock(&(handle->send_buffers));
 	list_add(&(send_buffer->list_node), &(handle->send_buffers.list));
-	buffer_queue_unlock(handle->send_buffers);
+	buffer_queue_unlock(&(handle->send_buffers));
 
 	return RG_OK;
 }
 
 	/* Create I/O and event dispatch threads. */
 	pthread_t *io_thread = &(handle->threads.io_thread);
-	pthread_t *event_thread = &(handle->threads.events_thread);
+	pthread_t *event_thread = &(handle->threads.event_thread);
 
-	retval = pthread_create(io_thread, NULL, io_main, NULL);
+	retval = pthread_create(io_thread, NULL, io_main, (void *) handle);
 	if (retval < 0) {
 		perror("pthread_create");
 		return -1;
 	}
-	retval = pthread_create(event_thread, NULL, event_main, NULL);
+	retval = pthread_create(event_thread, NULL, event_main, (void *) handle);
 	if (retval < 0) {
 		perror("pthread_create");
 		return -1;
 	close(handle->socket_fd);
 	close(handle->epoll_fd);
 
-	pthread_join(&(handle->threads.io_thread));
-	pthread_join(&(handle->threads.event_thread));
+	void *status;
+
+	pthread_join(handle->threads.io_thread, &status);
+	pthread_join(handle->threads.event_thread, &status);
 }

File client/src/mtio.h

 int mtio_next_xid();
 
 int mtio_queue_completion(RgHandle *handle, int type, void *callback,
-                          const void *callback_data);
+                          void *callback_data);
 
 int mtio_queue_buffer(RgHandle *handle, char *packed_buf, size_t packed_len);
 

File client/src/ringmaster.c

 	struct sockaddr_in *sin = (struct sockaddr_in *) &handle->addr;
 	memset(sin, 0, sizeof(struct sockaddr_in));
 	sin->sin_family      = AF_INET;
-	sin->sin_port        = htons(port);
-	sin->sin_addr.s_addr = ringmaster_name_resolve(address);
+	sin->sin_port        = htons(handle->port);
+	sin->sin_addr.s_addr = ringmaster_name_resolve(handle->host);
 	if (sin->sin_addr.s_addr == NULL)
-		return -1;
+		return NULL;
 
 	/* Start threads. */
 	retval = mtio_init(handle);
 		return RG_ESYSTEM;
 
 	append_request->header.xid = mtio_next_xid();
-	if (append_request_set_data(append_request, buffer, len) < 0 )
+	if (append_request_set_data(&append_request, buffer, len) < 0 )
 		return RG_ESYSTEM;
 
 	mtio_queue_completion(handle, RG_COMPLETION_STAT, callback, 
 	                      callback_data);
 	mtio_queue_buffer(handle, (char *) append_request,
-	                  append_request_length(request));
+	                  append_request_length(append_request));
 	mtio_send(handle);
 	
 	return RG_OK;
 {
 	char *packed_buf;
 	size_t packed_len;
-	RequestHeader request_header;
-	RetrRequest retr_request;
+	struct retr_request *retr_request;
 
 	if (handle == NULL || callback == NULL)
 		return RG_EBADARG;
 
 int ringmaster_close(RgHandle *handle)
 {
-	close(handle->epfd);
+	close(handle->epoll_fd);
 	mtio_close(handle);
 	free(handle);
 }