1. Andy Gross
  2. tinymq

Commits

Andy Gross  committed 52f8213

re-enqueue unacknowledged messages on client disconnect

  • Participants
  • Parent commits 3d8a10d
  • Branches default

Comments (0)

Files changed (6)

File .hgignore

View file
  • Ignore whitespace
 syntax regex
-^Debug/.*
 .*~
 ^src/.*.d
 ^src/.*.o
-^tinymq
+^tinymq$
+^tinymq.data$

File src/client.cpp

View file
  • Ignore whitespace
     outstanding_(),
     queue_(mq) {}
 
-client::~client() {}
+client::~client() {
+    for (receipt_map::iterator it=outstanding_.begin();
+         it != outstanding_.end(); ++it) {
+        queue_->enqueue_message(it->second.second);
+    }
+    outstanding_.clear();
+}
 
 std::size_t client::qsize() const {
     return queue_->qsize();
 }
 
+message_queue *client::queue() {
+    return queue_;
+}
+
+
+client::receipt_map& client::outstanding() {
+    return outstanding_;
+}
+
 message_id client::enqueue_message(unsigned char *data, std::size_t len) {
     return queue_->enqueue_message(data, len);
 }

File src/client.hpp

View file
  • Ignore whitespace
 public:
     client(message_queue *mq);
     virtual ~client();
+    message_queue *queue();
     std::size_t qsize() const;
+    receipt_map& outstanding();
     message *dequeue_message();
     message_id enqueue_message(unsigned char *data, std::size_t len);
     void wait();

File src/message_queue.cpp

View file
  • Ignore whitespace
         return id;
     }
 }
-
+    
+void message_queue::enqueue_message(message *m) {
+    bool was_empty = (deque_->size() == 0);
+    try {
+        deque_->push_back(m);
+        if (was_empty) notify_one();
+    }
+    catch (const bip::bad_alloc&) {
+        grow(maxsize_);
+        deque_->push_back(m);
+        if (was_empty) notify_one();
+    }
+}
+    
 message* message_queue::dequeue_message() {
     if (deque_->size()) {
         message_optr m = deque_->front();

File src/message_queue.hpp

View file
  • Ignore whitespace
     std::size_t free_memory() const { return file_.get_free_memory(); }
     void grow(size_t by);
     message_id enqueue_message(unsigned char *data, std::size_t len);
+    void enqueue_message(message *m);
     message* dequeue_message();
     void ack_message(message *m);
     void sync();

File src/tcp_client.cpp

View file
  • Ignore whitespace
 }
 
 void tcp_client::notify() {
-    boost::asio::write(notifier_, boost::asio::buffer(0));
+    boost::asio::write(notifier_, boost::asio::buffer("0000"));
 }
 
 } // namespace tinymq