// Source
//
// tinymq / src / message_queue.hpp
//
// Full commit
/*
 Copyright (c) 2009 Andy Gross <andy@andygross.org>

 This file is provided to you under the Apache License,
 Version 2.0 (the "License"); you may not use this file
 except in compliance with the License.  You may obtain
 a copy of the License at

   http://www.apache.org/licenses/LICENSE-2.0

 Unless required by applicable law or agreed to in writing,
 software distributed under the License is distributed on an
 "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 KIND, either express or implied.  See the License for the
 specific language governing permissions and limitations
 under the License.
*/

#ifndef MESSAGE_QUEUE_HPP_
#define MESSAGE_QUEUE_HPP_

#include <cstddef>
#include <set>
#include <string>
#include <boost/noncopyable.hpp>
#include <boost/interprocess/shared_memory_object.hpp>
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/managed_mapped_file.hpp>
#include <boost/interprocess/mem_algo/rbtree_best_fit.hpp>
#include <boost/interprocess/offset_ptr.hpp>
#include <boost/interprocess/indexes/iset_index.hpp>
#include <boost/interprocess/containers/deque.hpp>
#include <boost/interprocess/allocators/allocator.hpp>
#include <boost/thread.hpp>
#include <boost/shared_ptr.hpp>
#include "message.hpp"
#include "client.hpp"

namespace bip = boost::interprocess;

namespace tinymq {

typedef bip::basic_managed_mapped_file<
    char,
    bip::rbtree_best_fit<bip::mutex_family, bip::offset_ptr<void> >,
    bip::iset_index> tinymq_message_file;

typedef bip::offset_ptr<message> message_optr;
typedef bip::managed_mapped_file::segment_manager segment_manager_t;
typedef bip::allocator<message_optr, segment_manager_t> message_allocator;
typedef bip::deque<message_optr, message_allocator> message_deque;

class message_queue : private boost::noncopyable {
    std::string file_name_;
    std::size_t maxsize_;
    tinymq_message_file file_;
    boost::shared_ptr<void_allocator> valloc_;
    message_id_generator idgen_;
    message_deque *deque_;
    std::set<client_wref> waiters_;
public:
    message_queue(const char *mfile_name, std::size_t maxsize);
    virtual ~message_queue();
    void add_waiter(client_ptr client);
    void notify_one();
    std::size_t qsize() const ;
    std::size_t free_memory() const { return file_.get_free_memory(); }
    void grow(size_t by);
    message_id enqueue_message(unsigned char *data, std::size_t len);
    void enqueue_message(message *m);
    message* dequeue_message();
    void ack_message(message *m);
    void sync();
    void print_stats();
};

} // namespace tinymq

#endif /* MESSAGE_QUEUE_HPP_ */