Commits

Andy Gross  committed cf23610

separate message_queue from underlying storage implementation

  • Participants
  • Parent commits a3c875c

Comments (0)

Files changed (12)

 src/message_queue.cpp \
 src/server.cpp \
 src/tcp_client.cpp \
-src/tinymq.cpp 
+src/tinymq.cpp \
+src/durable_store.cpp
 
 OBJS += \
 src/client.o \
 src/message_queue.o \
 src/server.o \
 src/tcp_client.o \
-src/tinymq.o 
+src/tinymq.o \
+src/durable_store.o
 
 CPP_DEPS += \
 src/client.d \
 src/message_queue.d \
 src/server.d \
 src/tcp_client.d \
-src/tinymq.d 
+src/tinymq.d \
+src/durable_store.d
 
-LIBS := -lboost_thread-xgcc40-mt -lboost_system-xgcc40-mt -lboost_program_options-xgcc40-mt
-CFLAGS := -I/usr/local/include/boost-1_38 -O0 -gdwarf-2 -Wall 
+LIBS := -lboost_thread-xgcc40-mt -lboost_system-xgcc40-mt -lboost_program_options-xgcc40-mt -lboost_filesystem-xgcc40-mt 
+CFLAGS := -I/usr/local/include/boost-1_38 -Iextlib -O0 -gdwarf-2 -Wall 
 all: tinymq
 
 tinymq: $(OBJS)
-	g++ -o "tinymq" $(OBJS) $(USER_OBJS) $(LIBS)
+	g++  -o "tinymq" $(OBJS) $(USER_OBJS) $(LIBS)
 
 src/%.o: src/%.cpp
 	@echo 'Building file: $<'

File client/tinymq_client.py

         return msg_id
 
 def test():
-    c = tinymq_client("127.0.0.1", 5673)
+    c = tinymq_client("127.0.0.1", 5674)
     c.connect()
-    for i in range(0, 1000):
-        c.put("test%s" % i)
-    for i in range(0, 1000):
+    msg = "a"*1024
+    for i in range(0, 10):
+        print c.put(msg)
+    for i in range(0, 10):
         msg_id, msg_data = c.get()
         print msg_id, msg_data
         c.ack(msg_id)

File src/client.hpp

 class client : public boost::enable_shared_from_this<client>,
                private boost::noncopyable {
 public:
+    typedef boost::shared_ptr<client> shared_ptr;
+    typedef boost::weak_ptr<client> weak_ptr;
+public:
     typedef std::pair<std::time_t, message_shared_ptr> message_receipt;
     typedef std::map<message_id, message_receipt> receipt_map;
 private:
     virtual void notify();
 };
 
-typedef boost::shared_ptr<client> client_ptr;
-typedef boost::weak_ptr<client> client_wref;
 
 } // namespace tinymq
 

File src/connection.cpp

 #include <vector>
 #include <boost/bind.hpp>
 #include <boost/lexical_cast.hpp>
-#include <boost/asio/local/stream_protocol.hpp>
-#include <boost/asio/local/connect_pair.hpp>
 #include <boost/asio/buffer.hpp>
 #include "connection.hpp"
 #include "server.hpp"
+#include "tinymq_log.hpp"
 
-namespace asio = boost::asio;
+namespace a = boost::asio;
 
 namespace tinymq {
 
-connection::connection(asio::io_service& io_service, server *server)
+connection::connection(a::io_service& io_service, server *server)
   : io_service_(io_service),
     socket_(io_service),
     server_(server),
     client_(new tcp_client(&server->queue(), io_service)),
     curmsg_() {}
 
-connection::~connection() {}
+connection::~connection() {
 
-asio::ip::tcp::socket& connection::socket() {
+}
+
+a::ip::tcp::socket& connection::socket() {
     return socket_;
 }
 
 void connection::read_next_command() {
-    asio::async_read(socket_, asio::buffer(buffer_, command_length),
+    a::async_read(socket_, a::buffer(buffer_, command_length),
         boost::bind(&connection::handle_read_command, shared_from_this(),
-          asio::placeholders::error,
-          asio::placeholders::bytes_transferred));
+          a::placeholders::error,
+          a::placeholders::bytes_transferred));
 }
 
 void connection::wait() {
     client_->wait();
-    asio::async_read(client_->waker_sock(), asio::buffer(buffer_, command_length),
+    a::async_read(client_->waker_sock(), a::buffer(buffer_, command_length),
         boost::bind(&connection::handle_resume, shared_from_this(),
-                asio::placeholders::error,
-                asio::placeholders::bytes_transferred));
+                a::placeholders::error,
+                a::placeholders::bytes_transferred));
 }
 
 void connection::start() {
     std::string client_ip = socket_.remote_endpoint().address().to_string();
     std::string client_port = boost::lexical_cast<std::string>(
             socket_.remote_endpoint().port());
+    LAPP_ << "got TCP connection from " << client_ip << ":" << client_port;
     read_next_command();
 }
 
 void connection::cmd_get() {
+    LAPP_ << "dequeueing message, qsize=" << client_->qsize();
     curmsg_ = client_->dequeue_message();
     if (!curmsg_) wait();
     else {
         const message_header& hdr(curmsg_->header());
-        //curid_ = curmsg_->id();
-        //cursize_ = curmsg_->data()->size();
-        boost::array<boost::asio::const_buffer, 3> bufs = {
-            asio::buffer((void*)&hdr.id, sizeof(message_id)),
-            asio::buffer((void*)&hdr.len, sizeof(message_len_t)),
-            asio::buffer((void*)&curmsg_->data()->front(), curmsg_->size())
+        boost::array<a::const_buffer, 3> bufs = {
+            a::buffer((void*)&hdr.id, sizeof(message_id)),
+            a::buffer((void*)&hdr.len, sizeof(message_len_t)),
+            a::buffer((void*)&curmsg_->data()->front(), curmsg_->size())
         };
-        asio::async_write(socket_, bufs,
+        a::async_write(socket_, bufs,
                 boost::bind(&connection::handle_write, shared_from_this(),
-                        asio::placeholders::error,
-                        asio::placeholders::bytes_transferred));
+                        a::placeholders::error,
+                        a::placeholders::bytes_transferred));
     }
 }
 
 void connection::handle_read_command(const boost::system::error_code& e,
     std::size_t bytes_transferred) {
     if (!e) {
-        asio::const_buffer b = asio::buffer(buffer_);
-        const command *command_type = asio::buffer_cast<const command *>(b);
+        a::const_buffer b = a::buffer(buffer_);
+        const command *command_type = a::buffer_cast<const command *>(b);
         if (*command_type == get_msg)  {
             cmd_get();
         }
         else if (*command_type == ack_msg) {
-            asio::async_read(socket_, asio::buffer(buffer_, sizeof(message_id)),
+            a::async_read(socket_, a::buffer(buffer_, sizeof(message_id)),
                  bind(&connection::handle_read_message_id, shared_from_this(),
-                   asio::placeholders::error,
-                   asio::placeholders::bytes_transferred));
+                   a::placeholders::error,
+                   a::placeholders::bytes_transferred));
         }
         else if (*command_type == put_msg) {
-            asio::async_read(socket_, asio::buffer(buffer_, sizeof(message_len_t)),
+            a::async_read(socket_, a::buffer(buffer_, sizeof(message_len_t)),
                  boost::bind(&connection::handle_read_length, shared_from_this(),
-                   asio::placeholders::error,
-                   asio::placeholders::bytes_transferred));
+                   a::placeholders::error,
+                   a::placeholders::bytes_transferred));
         }
         else {
             printf("ERROR: unknown message type %d, closing connection\n", *command_type);
             socket_.shutdown(
-                    asio::ip::tcp::socket::shutdown_both);
+                    a::ip::tcp::socket::shutdown_both);
         }
     }
 }
 void connection::handle_read_message_id(const boost::system::error_code& e,
         std::size_t bytes_transferred) {
     if (!e) {
-        asio::const_buffer b = asio::buffer(buffer_);
-        const message_id *id = asio::buffer_cast<const message_id*>(b);
+        a::const_buffer b = a::buffer(buffer_);
+        const message_id *id = a::buffer_cast<const message_id*>(b);
         cmd_ack(*id);
     }
     else {
 void connection::handle_read_length(const boost::system::error_code& e,
         std::size_t bytes_transferred) {
     if (!e) {
-        asio::const_buffer b = asio::buffer(buffer_);
-        const std::size_t *msg_size = asio::buffer_cast<const std::size_t*>(b);
+        a::const_buffer b = a::buffer(buffer_);
+        const std::size_t *msg_size = a::buffer_cast<const std::size_t*>(b);
         incoming_.reserve(*msg_size);
         incoming_.insert(incoming_.begin(), *msg_size, 0);
-        asio::async_read(socket_, asio::buffer(incoming_),
+        a::async_read(socket_, a::buffer(incoming_),
              boost::bind(&connection::handle_read_message, shared_from_this(),
-               asio::placeholders::error,
-               asio::placeholders::bytes_transferred));
+               a::placeholders::error,
+               a::placeholders::bytes_transferred));
 
     }
     else {
 void connection::handle_read_message(const boost::system::error_code& e,
     std::size_t bytes_transferred) {
     if (!e) {
-        asio::const_buffer b = asio::buffer(incoming_);
-        const char *data = asio::buffer_cast<const char*>(b);
+        a::const_buffer b = a::buffer(incoming_);
+        const char *data = a::buffer_cast<const char*>(b);
         message_id mid = client_->enqueue_message((unsigned char *)data, incoming_.size());
+        LAPP_ << "received message id " << mid << " (" << incoming_.size() << " bytes)";
         incoming_.resize(0);
-        asio::async_write(socket_, asio::buffer((void*)&mid, sizeof(message_id)),
+        a::async_write(socket_, a::buffer((void*)&mid, sizeof(message_id)),
                  boost::bind(&connection::handle_write, shared_from_this(),
-                         asio::placeholders::error,
-                         asio::placeholders::bytes_transferred));
+                         a::placeholders::error,
+                         a::placeholders::bytes_transferred));
 
     }
     else {

File src/durable_store.cpp

+#include "durable_store.hpp"
+
+namespace tinymq {
+
+
+
+
+} // namespace tinymq

File src/durable_store.hpp

+#ifndef STORE_HPP_
+#define STORE_HPP_
+
+#include <string>
+#include <boost/noncopyable.hpp>
+#include <boost/interprocess/shared_memory_object.hpp>
+#include <boost/interprocess/managed_mapped_file.hpp>
+#include <boost/interprocess/allocators/allocator.hpp>
+#include <boost/interprocess/mem_algo/rbtree_best_fit.hpp>
+#include <boost/interprocess/offset_ptr.hpp>
+#include <boost/interprocess/indexes/flat_map_index.hpp>
+#include "tinymq_log.hpp"
+
+
+namespace tinymq { 
+
+namespace bip = boost::interprocess;
+
+template <template <class> class index_type=bip::flat_map_index>
+class store : public bip::basic_managed_mapped_file<
+    char, 
+    bip::rbtree_best_fit<bip::mutex_family, bip::offset_ptr<void> >,
+    index_type > {
+public:
+    typedef typename bip::basic_managed_mapped_file<
+        char,
+        bip::rbtree_best_fit<bip::mutex_family, bip::offset_ptr<void> >,
+        index_type>                          base_t;
+    typedef typename base_t::segment_manager segment_manager_type;
+    typedef bip::allocator<void, segment_manager_type> void_allocator_type;
+private:
+    std::string filename_;
+    std::size_t filesize_;
+public:
+    store(const std::string& filename, std::size_t init_size) :
+        base_t(bip::open_or_create, filename.c_str(), init_size),
+        filename_(filename),
+        filesize_(init_size) {
+        if (!base_t::check_sanity()) {
+            throw std::runtime_error("corrupt data file");
+        }
+    }
+    virtual ~store() { 
+        sync();
+        bip::shared_memory_object::remove(filename_.c_str());
+
+    }
+public:
+    const std::string& filename() const { return filename_; }
+    std::size_t  filesize() { return filesize_; }
+
+    virtual std::size_t free_memory() const { 
+        return base_t::get_free_memory(); 
+    }
+
+    virtual void grow(std::size_t by) {
+        //L_(info) << "growing store by " << by << " bytes";
+        sync();
+        filesize_ += by;
+        bip::shared_memory_object::remove(filename_.c_str());
+        base_t::grow(filename_.c_str(), filesize_);
+        base_t newstore(bip::open_only, filename_.c_str());
+        base_t::swap(newstore);
+    }
+
+    virtual void sync() { 
+        base_t::flush();
+    }
+};
+
+} // namespace tinymq
+
+#endif  // include guard

File src/message.cpp

 namespace tinymq {
 
 message::message(message_id id, message_len_t len, unsigned char* data,
-        void_allocator *void_alloc) :
+        void_allocator_type *void_alloc) :
     header_(id, len),
     data_(*void_alloc) {
     data_.reserve(len);
     return header_.len;
 }
 
-uchar_vector *message::data() {
+message::uchar_vector *message::data() {
     return &data_;
 }
 

File src/message.hpp

 #include <vector>
 #include <stdint.h>
 #include <stdexcept>
-#include <boost/interprocess/managed_shared_memory.hpp>
-#include <boost/interprocess/managed_mapped_file.hpp>
 #include <boost/interprocess/containers/vector.hpp>
 #include <boost/interprocess/allocators/allocator.hpp>
-#include <boost/interprocess/offset_ptr.hpp>
 #include <boost/interprocess/smart_ptr/shared_ptr.hpp>
 #include <boost/interprocess/smart_ptr/weak_ptr.hpp>
+#include "durable_store.hpp"
 
 namespace tinymq {
 
 typedef uint64_t message_id;
 typedef uint32_t message_len_t;
 
-typedef bip::managed_shared_memory::segment_manager      segment_manager_t;
-typedef bip::allocator<void, segment_manager_t>          void_allocator;
-typedef bip::allocator<unsigned char, segment_manager_t> uchar_allocator;
-typedef bip::vector<unsigned char, uchar_allocator>      uchar_vector;
-typedef bip::allocator<uchar_vector, segment_manager_t>  uchar_vector_allocator;
-
-
 struct message_header {
     message_id id;
     message_len_t len;
 };
 
 class message {
+public:
+    typedef store<> store_type;
+    typedef store_type::segment_manager_type segment_manager_type;
+    typedef store_type::void_allocator_type void_allocator_type;
+    typedef bip::allocator<unsigned char, segment_manager_type> uchar_allocator;
+    typedef bip::vector<unsigned char, uchar_allocator>      uchar_vector;
+    typedef bip::allocator<uchar_vector, segment_manager_type>  uchar_vector_allocator;
+private:
     message_header header_;
     uchar_vector data_;
 public:
     message(message_id id, message_len_t len, unsigned char* data,
-            void_allocator *void_alloc);
+            void_allocator_type *void_alloc);
     const message_header header() const { return header_; }
     const message_id id() const;
     const message_len_t size() const;
     uchar_vector* data();
-
 };
 
 class message_id_generator {
     message_id next_id();
 };
 
-typedef bip::managed_shared_ptr<message, bip::managed_mapped_file>::type message_shared_ptr;
-typedef bip::managed_weak_ptr<message, bip::managed_mapped_file>::type message_weak_ptr;
+typedef bip::managed_shared_ptr<message, message::store_type>::type message_shared_ptr;
+typedef bip::managed_weak_ptr<message, message::store_type>::type message_weak_ptr;
 
 } // namespace tinymq
 

File src/message_queue.cpp

 */
 
 #include "message_queue.hpp"
+#include "tinymq_log.hpp"
 
 namespace tinymq {
 
 message_queue::message_queue(const char *mfile_name, std::size_t maxsize) :
-     file_name_(mfile_name),
-     maxsize_(maxsize),
-     file_(bip::open_or_create, file_name_.c_str(), maxsize_),
-     valloc_(new void_allocator(file_.get_segment_manager())),
-     idgen_() {
-     if (!file_.check_sanity())
-         throw std::runtime_error("corrupt data file");
-     deque_ = file_.find_or_construct<message_deque>("mq")(*valloc_);
+    store_type(mfile_name, maxsize),
+    valloc_(new base_t::void_allocator_type(get_segment_manager())),
+    idgen_() {
+    deque_ = find_or_construct<message_deque>("mq")(*valloc_);
 }
 
-message_queue::~message_queue() {
-    sync();
-    bip::shared_memory_object::remove(file_name_.c_str());
-}
-
- void message_queue::add_waiter(client_ptr client) {
-     waiters_.insert(client_wref(client));
+void message_queue::add_waiter(client::shared_ptr client) {
+    waiters_.insert(client::weak_ptr(client));
  }
 
 std::size_t message_queue::qsize() const {
 
 void message_queue::notify_one() {
     if (waiters_.empty()) return;
-    client_wref c(*waiters_.begin());
+    client::weak_ptr c(*waiters_.begin());
     if (!c.expired()) {
-        client_ptr cp =c.lock();
+        client::shared_ptr cp =c.lock();
         cp->notify();
     }
     waiters_.erase(c);
 }
 
 void message_queue::grow(size_t by) {
-    sync();
-    maxsize_ += by;
-    bip::shared_memory_object::remove(file_name_.c_str());
-    file_.grow(file_name_.c_str(), maxsize_);
-    tinymq_message_file m_newfile(bip::open_only, file_name_.c_str());
-    file_.swap(m_newfile);
-    valloc_ = boost::shared_ptr<void_allocator>(new void_allocator(file_.get_segment_manager()));
-    deque_ = file_.find_or_construct<message_deque>("mq")(*valloc_);
+    store_type::grow(by);
+    valloc_ = boost::shared_ptr<base_t::void_allocator_type>(
+                new base_t::void_allocator_type(get_segment_manager()));
+    deque_ = find_or_construct<message_deque>("mq")(*valloc_);
 }
 
 message_id message_queue::enqueue_message(unsigned char *data, std::size_t len) {
     message_id id(idgen_.next_id());
     try {
         message_shared_ptr m = 
-            bip::make_managed_shared_ptr(
-                file_.construct<message>(
-                bip::anonymous_instance)(id, len, data, valloc_.get()),
-                file_
-            );
+            bip::make_managed_shared_ptr
+            (construct<message>(bip::anonymous_instance)(id, len, data, valloc_.get()),
+             *this);
         deque_->push_back(m);
         if (was_empty) notify_one();
         return id;
     }
     catch (const bip::bad_alloc&) {
-        grow(maxsize_);
+        grow(filesize());
         message_shared_ptr m = 
-            bip::make_managed_shared_ptr(
-                file_.construct<message>(
+            bip::make_managed_shared_ptr(construct<message>(
                 bip::anonymous_instance)(id, len, data, valloc_.get()),
-                file_
+                                         *this
             );
         deque_->push_back(m);
         if (was_empty) notify_one();
         if (was_empty) notify_one();
     }
     catch (const bip::bad_alloc&) {
-        grow(maxsize_);
+        grow(filesize());
         deque_->push_back(m);
         if (was_empty) notify_one();
     }
     else {
         return message_shared_ptr();
     }
-
 }
-
-void message_queue::sync() {
-    file_.flush();
-}
-
-void message_queue::print_stats() {
-    printf("named objects: %lu\n", file_.get_num_named_objects());
-    printf("free memory: %lu\n", file_.get_free_memory());
-}
-
 } // namespace tinymq

File src/message_queue.hpp

 #define MESSAGE_QUEUE_HPP_
 
 #include <set>
-#include <boost/noncopyable.hpp>
-#include <boost/interprocess/shared_memory_object.hpp>
-#include <boost/interprocess/managed_shared_memory.hpp>
-#include <boost/interprocess/managed_mapped_file.hpp>
-#include <boost/interprocess/mem_algo/rbtree_best_fit.hpp>
-#include <boost/interprocess/offset_ptr.hpp>
-#include <boost/interprocess/smart_ptr/shared_ptr.hpp>
-#include <boost/interprocess/smart_ptr/weak_ptr.hpp>
-#include <boost/interprocess/smart_ptr/deleter.hpp>
-#include <boost/interprocess/indexes/iset_index.hpp>
 #include <boost/interprocess/containers/deque.hpp>
 #include <boost/interprocess/allocators/allocator.hpp>
-#include <boost/thread.hpp>
 #include <boost/shared_ptr.hpp>
 #include "message.hpp"
 #include "client.hpp"
+#include "durable_store.hpp"
 
 namespace bip = boost::interprocess;
 
 namespace tinymq {
 
-typedef bip::basic_managed_mapped_file<
-    char,
-    bip::rbtree_best_fit<bip::mutex_family, bip::offset_ptr<void> >,
-    bip::iset_index> tinymq_message_file;
-
-typedef bip::managed_mapped_file::segment_manager segment_manager_t;
-typedef bip::allocator<message_shared_ptr, segment_manager_t> message_allocator;
-typedef bip::deque<message_shared_ptr, message_allocator> message_deque;
-
-class message_queue : private boost::noncopyable {
-    std::string file_name_;
-    std::size_t maxsize_;
-    tinymq_message_file file_;
-    boost::shared_ptr<void_allocator> valloc_;
+class message_queue : public virtual store<> { 
+public:
+    typedef store<> base_t;
+    typedef bip::allocator<message_shared_ptr, base_t::segment_manager_type> message_allocator;
+    typedef bip::deque<message_shared_ptr, message_allocator> message_deque;
+private:
+    typedef store<> store_type;
+    boost::shared_ptr<base_t::void_allocator_type> valloc_;
     message_id_generator idgen_;
     message_deque *deque_;
-    std::set<client_wref> waiters_;
-public:
+    std::set<client::weak_ptr> waiters_;
+public: // construction/destruction
     message_queue(const char *mfile_name, std::size_t maxsize);
-    virtual ~message_queue();
-    void add_waiter(client_ptr client);
-    void notify_one();
-    std::size_t qsize() const ;
-    std::size_t free_memory() const { return file_.get_free_memory(); }
-    void grow(size_t by);
-    message_id enqueue_message(unsigned char *data, std::size_t len);
-    void enqueue_message(message_shared_ptr m);
+public: // api 
+    virtual void       grow(std::size_t by);
+    message_id         enqueue_message(unsigned char *data, std::size_t len);
+    void               enqueue_message(message_shared_ptr m);
     message_shared_ptr dequeue_message();
-    void sync();
-    void print_stats();
+    void               add_waiter(client::shared_ptr client);
+    void               notify_one();
+    std::size_t        qsize() const ;
 };
 
 } // namespace tinymq
 
-#endif /* MESSAGE_QUEUE_HPP_ */
+#endif // include guard

File src/server.cpp

  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
 */
 
-#include "server.hpp"
+#include <vector>
 #include <boost/bind.hpp>
 #include <boost/shared_ptr.hpp>
-#include <vector>
+#include "server.hpp"
+#include "tinymq_log.hpp"
 
 namespace tinymq {
 
     acceptor_(io_service_),
     new_connection_(new connection(io_service_, this)),
     message_queue_("tinymq.data", 52428800L) {
+    LAPP_ << "queue initialized, now accepting connections on port " << port;
     boost::asio::ip::tcp::resolver resolver(io_service_);
     boost::asio::ip::tcp::resolver::query query(address, port);
     boost::asio::ip::tcp::endpoint endpoint = *resolver.resolve(query);

File src/tcp_client.hpp

 namespace tinymq {
 
 class tcp_client : public virtual client {
+public: // typedefs
     typedef boost::asio::local::stream_protocol::socket local_sock;
+private:
     boost::asio::io_service& io_service_;
     local_sock notifier_;
     local_sock waker_;
-public:
+public: // construction/destruction
     tcp_client(message_queue *mq, boost::asio::io_service& io_service);
     virtual ~tcp_client();
+public:  // api
     local_sock& waker_sock();
     local_sock& notifier_sock();
     virtual void notify();