Commits

Anonymous committed f09382c

asio buffers dont own their contents - make sure they dont get invalidated

  • Participants
  • Parent commits c08a259

Comments (0)

Files changed (5)

src/connection.cpp

   : io_service_(io_service),
     socket_(io_service),
     server_(server),
-    client_(new tcp_client(&server->queue(), io_service)){}
+    client_(new tcp_client(&server->queue(), io_service)),
+    curmsg_() {}
 
 connection::~connection() {}
 
 }
 
 void connection::cmd_get() {
-    message_shared_ptr m = client_->dequeue_message();
-    if (!m) wait();
+    curmsg_ = client_->dequeue_message();
+    if (!curmsg_) wait();
     else {
-        message_id mid = m->id();
-        std::size_t msize = m->data()->size();
-        std::vector<char> data(m->data()->begin(), m->data()->end());
+        const message_header& hdr(curmsg_->header());
+        //curid_ = curmsg_->id();
+        //cursize_ = curmsg_->data()->size();
         boost::array<boost::asio::const_buffer, 3> bufs = {
-                asio::buffer((void*)&mid, sizeof(message_id)),
-            asio::buffer((void*)&msize, sizeof(uint32_t)),
-            asio::buffer(data)
+            asio::buffer((void*)&hdr.id, sizeof(message_id)),
+            asio::buffer((void*)&hdr.len, sizeof(message_len_t)),
+            asio::buffer((void*)&curmsg_->data()->front(), curmsg_->size())
         };
         asio::async_write(socket_, bufs,
                 boost::bind(&connection::handle_write, shared_from_this(),
                    asio::placeholders::bytes_transferred));
         }
         else if (*command_type == put_msg) {
-            asio::async_read(socket_, asio::buffer(buffer_, sizeof(uint32_t)),
+            asio::async_read(socket_, asio::buffer(buffer_, sizeof(message_len_t)),
                  boost::bind(&connection::handle_read_length, shared_from_this(),
                    asio::placeholders::error,
                    asio::placeholders::bytes_transferred));

src/connection.hpp

     std::vector<char> incoming_;
     server *server_;
     tcp_client_ptr client_;
+    message_shared_ptr curmsg_;
 public:
-    enum command {
+    enum command { // command ids
         get_msg = 0,
         ack_msg = 1,
         put_msg = 2
     };
     enum { command_length = 4 };
+public: // construction
     explicit connection(boost::asio::io_service& io_service, server *server);
     virtual ~connection();
+public: // api
     boost::asio::ip::tcp::socket& socket();
     void start();
 private:
 
 namespace tinymq {
 
-message::message(message_id id, std::size_t len, unsigned char* data,
+message::message(message_id id, message_len_t len, unsigned char* data,
         void_allocator *void_alloc) :
-    id_(id),
-    len_(len),
+    header_(id, len),
     data_(*void_alloc) {
     data_.reserve(len);
     data_.insert(data_.begin(), data, data+len);
 }
 
-message_id message::id() const {
-    return id_;
+const message_id message::id() const {
+    return header_.id;
 }
 
-std::size_t message::size() const {
-    return len_;
+const message_len_t message::size() const {
+    return header_.len;
 }
 
 uchar_vector *message::data() {
 namespace bip = boost::interprocess;
 
 typedef uint64_t message_id;
+typedef uint32_t message_len_t;
 
 typedef bip::managed_shared_memory::segment_manager      segment_manager_t;
 typedef bip::allocator<void, segment_manager_t>          void_allocator;
 typedef bip::allocator<uchar_vector, segment_manager_t>  uchar_vector_allocator;
 
 
+struct message_header {
+    message_id id;
+    message_len_t len;
+    message_header(message_id id, message_len_t len) :
+        id(id),
+        len(len) {}
+};
+
 class message {
-    message_id id_;
-    std::size_t len_;
+    message_header header_;
     uchar_vector data_;
 public:
-    message(message_id id, std::size_t len, unsigned char* data,
+    message(message_id id, message_len_t len, unsigned char* data,
             void_allocator *void_alloc);
-    message_id id() const;
-    std::size_t size() const;
+    const message_header header() const { return header_; }
+    const message_id id() const;
+    const message_len_t size() const;
     uchar_vector* data();
 
 };

src/message_queue.cpp

 
 message_queue::~message_queue() {
     sync();
+    bip::shared_memory_object::remove(file_name_.c_str());
 }
 
  void message_queue::add_waiter(client_ptr client) {