Commits

Andy Gross  committed 251fcc0

use shared_ptrs for messages

  • Participants
  • Parent commits 9df8543

Comments (0)

Files changed (7)

File src/client.cpp

     queue_->add_waiter(shared_from_this());
 }
 
-message *client::dequeue_message() {
-    message *m = queue_->dequeue_message();
+message_shared_ptr client::dequeue_message() {
+    message_shared_ptr m = queue_->dequeue_message();
     if (m) {
         message_receipt r(std::time(0), m);
         outstanding_[m->id()] = r;
 bool client::ack_message(message_id id) {
     receipt_map::iterator it = outstanding_.find(id);
     if (it != outstanding_.end()) {
-        queue_->ack_message(it->second.second);
         outstanding_.erase(it);
         return true;
     }

File src/client.hpp

 class client : public boost::enable_shared_from_this<client>,
                private boost::noncopyable {
 public:
-    typedef std::pair<std::time_t, message *> message_receipt;
+    typedef std::pair<std::time_t, message_shared_ptr> message_receipt;
     typedef std::map<message_id, message_receipt> receipt_map;
 private:
     receipt_map outstanding_;
     message_queue *queue();
     std::size_t qsize() const;
     receipt_map& outstanding();
-    message *dequeue_message();
+    message_shared_ptr dequeue_message();
     message_id enqueue_message(unsigned char *data, std::size_t len);
     void wait();
     bool ack_message(message_id id);

File src/connection.cpp

 }
 
 void connection::cmd_get() {
-    message *m = client_->dequeue_message();
+    message_shared_ptr m = client_->dequeue_message();
     if (!m) wait();
     else {
         message_id mid = m->id();

File src/message.cpp

 namespace tinymq {
 
 message::message(message_id id, std::size_t len, unsigned char* data,
-        void_allocator& void_alloc) :
-    m_id(id),
-    m_len(len),
-    m_data(void_alloc) {
-    m_data.reserve(len);
-    m_data.insert(m_data.begin(), data, data+len);
+        void_allocator *void_alloc) :
+    id_(id),
+    len_(len),
+    data_(*void_alloc) {
+    data_.reserve(len);
+    data_.insert(data_.begin(), data, data+len);
 }
 
 message_id message::id() const {
-    return m_id;
+    return id_;
 }
 
 std::size_t message::size() const {
-    return m_len;
+    return len_;
 }
 
-uchar_vector* message::data() {
-    return &m_data;
+uchar_vector *message::data() {
+    return &data_;
 }
 
 message_id_generator::message_id_generator() {

File src/message.hpp

 #define MESSAGE_HPP_
 
 #include <cstddef>
+#include <vector>
 #include <stdint.h>
 #include <stdexcept>
 #include <boost/interprocess/managed_shared_memory.hpp>
+#include <boost/interprocess/managed_mapped_file.hpp>
 #include <boost/interprocess/containers/vector.hpp>
 #include <boost/interprocess/allocators/allocator.hpp>
 #include <boost/interprocess/offset_ptr.hpp>
+#include <boost/interprocess/smart_ptr/shared_ptr.hpp>
+#include <boost/interprocess/smart_ptr/weak_ptr.hpp>
+
+namespace tinymq {
 
 namespace bip = boost::interprocess;
 
-namespace tinymq {
-
 typedef uint64_t message_id;
 
 typedef bip::managed_shared_memory::segment_manager      segment_manager_t;
 typedef bip::vector<unsigned char, uchar_allocator>      uchar_vector;
 typedef bip::allocator<uchar_vector, segment_manager_t>  uchar_vector_allocator;
 
+
 class message {
-    message_id m_id;
-    std::size_t m_len;
-    uchar_vector m_data;
+    message_id id_;
+    std::size_t len_;
+    uchar_vector data_;
 public:
     message(message_id id, std::size_t len, unsigned char* data,
-            void_allocator& void_alloc);
+            void_allocator *void_alloc);
     message_id id() const;
     std::size_t size() const;
     uchar_vector* data();
+
 };
 
 class message_id_generator {
     message_id next_id();
 };
 
+typedef bip::managed_shared_ptr<message, bip::managed_mapped_file>::type message_shared_ptr;
+typedef bip::managed_weak_ptr<message, bip::managed_mapped_file>::type message_weak_ptr;
+
 } // namespace tinymq
 
-#endif /* MESSAGE_HPP_ */
+#endif // include guard

File src/message_queue.cpp

     bool was_empty = (deque_->size() == 0);
     message_id id(idgen_.next_id());
     try {
-        message *m = file_.construct<message>(bip::anonymous_instance)(id, len, data, *valloc_);
+        message_shared_ptr m = 
+            bip::make_managed_shared_ptr(
+                file_.construct<message>(
+                bip::anonymous_instance)(id, len, data, valloc_.get()),
+                file_
+            );
         deque_->push_back(m);
         if (was_empty) notify_one();
         return id;
     }
     catch (const bip::bad_alloc&) {
         grow(maxsize_);
-        message *m = file_.construct<message>(bip::anonymous_instance)(id, len, data, *valloc_);
+        message_shared_ptr m = 
+            bip::make_managed_shared_ptr(
+                file_.construct<message>(
+                bip::anonymous_instance)(id, len, data, valloc_.get()),
+                file_
+            );
         deque_->push_back(m);
         if (was_empty) notify_one();
         return id;
     }
 }
-    
-void message_queue::enqueue_message(message *m) {
+
+void message_queue::enqueue_message(message_shared_ptr m) {
     bool was_empty = (deque_->size() == 0);
     try {
         deque_->push_back(m);
         if (was_empty) notify_one();
     }
 }
-    
-message* message_queue::dequeue_message() {
+
+message_shared_ptr message_queue::dequeue_message() {
     if (deque_->size()) {
-        message_optr m = deque_->front();
+        message_shared_ptr m(deque_->front());
         deque_->pop_front();
-        return m.get();
+        return m;
     }
-    return 0;
-}
+    else {
+        return message_shared_ptr();
+    }
 
-void message_queue::ack_message(message *m) {
-    if (m)
-        file_.destroy_ptr(m);
 }
 
 void message_queue::sync() {

File src/message_queue.hpp

 #include <boost/interprocess/managed_mapped_file.hpp>
 #include <boost/interprocess/mem_algo/rbtree_best_fit.hpp>
 #include <boost/interprocess/offset_ptr.hpp>
+#include <boost/interprocess/smart_ptr/shared_ptr.hpp>
+#include <boost/interprocess/smart_ptr/weak_ptr.hpp>
+#include <boost/interprocess/smart_ptr/deleter.hpp>
 #include <boost/interprocess/indexes/iset_index.hpp>
 #include <boost/interprocess/containers/deque.hpp>
 #include <boost/interprocess/allocators/allocator.hpp>
     bip::rbtree_best_fit<bip::mutex_family, bip::offset_ptr<void> >,
     bip::iset_index> tinymq_message_file;
 
-typedef bip::offset_ptr<message> message_optr;
 typedef bip::managed_mapped_file::segment_manager segment_manager_t;
-typedef bip::allocator<message_optr, segment_manager_t> message_allocator;
-typedef bip::deque<message_optr, message_allocator> message_deque;
+typedef bip::allocator<message_shared_ptr, segment_manager_t> message_allocator;
+typedef bip::deque<message_shared_ptr, message_allocator> message_deque;
 
 class message_queue : private boost::noncopyable {
     std::string file_name_;
     std::size_t free_memory() const { return file_.get_free_memory(); }
     void grow(size_t by);
     message_id enqueue_message(unsigned char *data, std::size_t len);
-    void enqueue_message(message *m);
-    message* dequeue_message();
-    void ack_message(message *m);
+    void enqueue_message(message_shared_ptr m);
+    message_shared_ptr dequeue_message();
     void sync();
     void print_stats();
 };