Commits

spencercw  committed c56058b

#33 Implement message sending.

  • Participants
  • Parent commits 10a15da
  • Branches serial

Comments (0)

Files changed (14)

File gb_net/Doxyfile

 # Note: This option applies only to the class list, not to the 
 # alphabetical list.
 
-SORT_BY_SCOPE_NAME     = NO
+SORT_BY_SCOPE_NAME     = YES
 
 # If the STRICT_PROTO_MATCHING option is enabled and doxygen fails to 
 # do proper type resolution of all parameters of a function it will reject a 

File gb_net/include/gb_net/networking.h

 #include <gb_net/defs.h>
 #include <gb_net/types.h>
 
-namespace boost {
-	namespace asio {
-		class io_service;
-	}
-}
-
 namespace gbnet {
 
 namespace internal {
 class GBNET_API Networking
 {
 public:
-	//////////
-	// Signals
+	//! Posts the given message to any connected remote hosts.
+	/**
+	 * \param identifier The message type identifier. It is up to the application to decide on a
+	 * value to use here.
+	 * \param message The raw message data to send.
+	 */
+	void postMessage(uint16_t identifier, const std::basic_string<uint8_t> &message);
 
 	//! Register to receive notifications when the server has started listening.
 	/**
 	// Implementation
 	internal::NetworkingImpl *impl_;
 
-	Networking(boost::asio::io_service &ioService);
+	Networking(internal::NetworkingImpl *impl);
 	virtual ~Networking();
 
 	/** \endcond */

File gb_net/include/gb_net/tcp_client.h

 	~TcpClient();
 
 private:
-	// Implementation
-	internal::TcpClientImpl *impl_;
-
 	// Disabled operations
 	TcpClient(const TcpClient &);
 	TcpClient & operator=(const TcpClient &);

File gb_net/include/gb_net/tcp_server.h

 	~TcpServer();
 
 private:
-	// Implementation
-	internal::TcpServerImpl *impl_;
-
 	// Disabled operations
 	TcpServer(const TcpServer &);
 	TcpServer & operator=(const TcpServer &);

File gb_net/src/internal/networking.h

 namespace gbnet {
 namespace internal {
 
+class Session;
+
 //! Implementation class for Networking.
 class NetworkingImpl
 {
+	friend class Session;
+
 public:
-	//! The I/O service to use for communications.
-	boost::asio::io_service &ioService_;
-
 	//! Constructor.
 	/**
 	 * \param ioService The I/O service to use for communications.
 	NetworkingImpl(boost::asio::io_service &ioService);
 
 	//! Destructor.
-	~NetworkingImpl();
+	virtual ~NetworkingImpl();
+
+	//! Posts the given message to any connected remote hosts.
+	/**
+	 * \param identifier The message type identifier.
+	 * \param message The raw message data to send.
+	 */
+	void postMessage(uint16_t identifier, const std::basic_string<uint8_t> &message);
 
 	//! Server listening signal.
 	/** See Networking::onListening(). */
 	/** See Networking::onNetworkError(). */
 	boost::signal<void (ErrorType type, const boost::system::error_code &ec)> onNetworkError;
 
+protected:
+	//! The I/O service to use for communications.
+	boost::asio::io_service &ioService_;
+
+	//! Posts the given message to any connected remote hosts.
+	/**
+	 * This is initiated by postMessage() and called from the thread in which the I/O service is
+	 * running.
+	 * \param packet The raw data to send.
+	 */
+	virtual void doPostPacket(std::basic_string<uint8_t> packet) = 0;
+
 private:
 	// Disabled operations
 	NetworkingImpl(const NetworkingImpl &);

File gb_net/src/internal/session.h

 
 #include <stdint.h>
 
+#include <queue>
 #include <string>
 
 #include <boost/asio/ip/tcp.hpp>
 	//! Ends the session.
 	void stop();
 
+	//! Posts the given packet to any connected remote hosts.
+	/**
+	 * \param packet The packet to send It is up to the caller to ensure this contains a properly
+	 * constructed MessageHeader.
+	 */
+	void postPacket(const std::basic_string<uint8_t> &packet);
+
 private:
 	NetworkingImpl &net_;
 	boost::asio::ip::tcp::socket socket_;
 
 	MessageHeader rxHeader_;
 	std::basic_string<uint8_t> rxBody_;
+	std::queue<std::basic_string<uint8_t> > txQueue_;
 
 	void readHeader();
+	void processQueue();
 	
 	void handleReadHeader(const boost::system::error_code &ec);
 	void handleReadBody(const boost::system::error_code &ec);
+	void handleWrite(const boost::system::error_code &ec);
 	
 	// Disabled operations
 	Session(const Session &);

File gb_net/src/internal/tcp_client.h

 
 #include <gb_net/tcp_client.h>
 
+#include "networking.h"
 #include "session.h"
 
 namespace gbnet {
 namespace internal {
 
 //! Implementation class for TcpClient.
-class TcpClientImpl
+class TcpClientImpl: public NetworkingImpl
 {
 public:
 	//! Constructor; connects to the given remote server.
 	/**
-	 * \param net The associated NetworkingImpl instance.
+	 * \param ioService The I/O service to use for communications.
 	 * \param host The hostname or IP address (may be IPv4 or IPv6 where the operating system
 	 * supports it) of the server to connect to.
 	 * \param port The TCP port the server is listening on.
 	 */
-	TcpClientImpl(NetworkingImpl &net, const std::string &host, uint16_t port);
+	TcpClientImpl(boost::asio::io_service &ioService, const std::string &host, uint16_t port);
 	
 	//! Destructor.
 	~TcpClientImpl();
 
 private:
-	NetworkingImpl &net_;
 	Session session_;
 	boost::asio::ip::tcp::resolver resolver_;
+
+	void doPostPacket(std::basic_string<uint8_t> packet);
 	
 	void handleResolve(boost::asio::ip::tcp::resolver::iterator endpointIt,
 		const boost::system::error_code &ec);

File gb_net/src/internal/tcp_server.h

 
 #include <gb_net/tcp_server.h>
 
+#include "networking.h"
+
 namespace gbnet {
 namespace internal {
 
 class SingleTcpServer;
 
 //! Implementation class for TcpServer.
-class TcpServerImpl
+class TcpServerImpl: public NetworkingImpl
 {
 	friend class ServerSession;
 	friend class SingleTcpServer;
 	//! Constructor; starts listening on the given port.
 	/**
 	 * This may create multiple SingleTcpServer instances based on the system configuration.
-	 * \param net The associated NetworkingImpl instance.
+	 * \param ioService The I/O service to use for communications.
 	 * \param port The TCP port number to listen on.
 	 */
-	TcpServerImpl(NetworkingImpl &net, uint16_t port);
+	TcpServerImpl(boost::asio::io_service &ioService, uint16_t port);
 	
 	//! Destructor.
 	~TcpServerImpl();
 
 private:
-	NetworkingImpl &net_;
 	boost::asio::ip::tcp::resolver resolver_;
 
 	std::vector<boost::shared_ptr<SingleTcpServer> > servers_;
 	std::set<boost::shared_ptr<ServerSession> > sessions_;
 
+	void doPostPacket(std::basic_string<uint8_t> packet);
+
 	void handleResolve(boost::asio::ip::tcp::resolver::iterator endpoint,
 		const boost::system::error_code &ec);
 

File gb_net/src/networking.cpp

 #include <gb_net/networking.h>
 #include "internal/networking.h"
 
+#include <boost/bind.hpp>
+
+#include "internal/message_header.h"
+
 namespace bs = boost::system;
 namespace io = boost::asio;
 namespace sig = boost::signals;
 
 namespace gbnet {
 
-Networking::Networking(io::io_service &ioService):
-impl_(new internal::NetworkingImpl(ioService))
+Networking::Networking(internal::NetworkingImpl *impl):
+impl_(impl)
 {
 }
 
 	delete impl_;
 }
 
+void Networking::postMessage(uint16_t identifier, const basic_string<uint8_t> &message)
+{
+	impl_->postMessage(identifier, message);
+}
+
 sig::connection Networking::onListening(
 	boost::function<void (const tcp::endpoint &endpoint)> callback)
 {
 {
 }
 
+void NetworkingImpl::postMessage(uint16_t identifier, const basic_string<uint8_t> &message)
+{
+	// Build the packet
+	MessageHeader header;
+	header.magic = htons(MessageHeader::MAGIC_VALUE);
+	header.length = htons(static_cast<uint16_t>(message.length()));
+	header.identifier = htons(identifier);
+
+	basic_string<uint8_t> packet(sizeof(header) + message.length(), 0);
+	memcpy(&*packet.begin(), &header, sizeof(header));
+	memcpy(&packet[sizeof(header)], message.data(), message.length());
+
+	// Post the request to the I/O service thread
+	ioService_.post(boost::bind(&NetworkingImpl::doPostPacket, this, packet));
 }
+
 }
+}

File gb_net/src/server_session.cpp

 namespace internal {
 
 ServerSession::ServerSession(TcpServerImpl &manager):
-Session(manager.net_),
+Session(manager),
 manager_(manager)
 {
 }

File gb_net/src/session.cpp

 
 #include <boost/asio/placeholders.hpp>
 #include <boost/asio/read.hpp>
+#include <boost/asio/write.hpp>
 #include <boost/bind.hpp>
 
 #include "internal/networking.h"
 namespace bs = boost::system;
 namespace io = boost::asio;
 using boost::asio::ip::tcp;
+using std::basic_string;
 
 namespace gbnet {
 namespace internal {
 
 	// Read some data
 	readHeader();
+
+	// Post any queued messages
+	processQueue();
 }
 
 void Session::stop()
 	socket_.close();
 }
 
+void Session::postPacket(const basic_string<uint8_t> &packet)
+{
+	if (txQueue_.empty())
+	{
+		// Begin sending messages
+		txQueue_.push(packet);
+		processQueue();
+	}
+	else
+	{
+		txQueue_.push(packet);
+	}
+}
+
 void Session::readHeader()
 {
 	io::async_read(socket_, io::buffer(&rxHeader_, sizeof(MessageHeader)),
 		boost::bind(&Session::handleReadHeader, this, io::placeholders::error));
 }
 
+void Session::processQueue()
+{
+	if (!txQueue_.empty())
+	{
+		const basic_string<uint8_t> &packet = txQueue_.front();
+		io::async_write(socket_, io::buffer(&*packet.begin(), packet.length()),
+			boost::bind(&Session::handleWrite, this, io::placeholders::error));
+	}
+}
+
 void Session::handleReadHeader(const bs::error_code &ec)
 {
 	if (ec)
 	readHeader();
 }
 
+void Session::handleWrite(const bs::error_code &ec)
+{
+	if (ec)
+	{
+		net_.onNetworkError(GBNET_ERR_TX, ec);
+		stop();
+		return;
+	}
+
+	// Send the next message
+	txQueue_.pop();
+	processQueue();
 }
+
 }
+}

File gb_net/src/single_tcp_server.cpp

 
 SingleTcpServer::SingleTcpServer(TcpServerImpl &manager, const tcp::endpoint &endpoint):
 manager_(manager),
-acceptor_(manager.net_.ioService_, endpoint)
+acceptor_(manager.ioService_, endpoint)
 {
-	manager_.net_.onListening(endpoint);
+	manager_.onListening(endpoint);
 
 	// Accept a new connection
 	shared_ptr<ServerSession> session(new ServerSession(manager_));
 {
 	if (ec)
 	{
-		manager_.net_.onNetworkError(GBNET_ERR_ACCEPT, ec);
+		manager_.onNetworkError(GBNET_ERR_ACCEPT, ec);
 		return;
 	}
 
-	manager_.net_.onConnected(session->socket().remote_endpoint());
+	manager_.onConnected(session->socket().remote_endpoint());
 
 	// Start the session and accept the next connection
 	session->start();

File gb_net/src/tcp_client.cpp

     You should have received a copy of the GNU General Public License
     along with this program.  If not, see <http://www.gnu.org/licenses/>.  */
 
+#ifdef _MSC_VER
+#pragma warning(disable: 4355) // 'this': used in base member initializer list
+#endif
+
 #include <gb_net/tcp_client.h>
 #include "internal/tcp_client.h"
 
 namespace bs = boost::system;
 namespace io = boost::asio;
 using boost::asio::ip::tcp;
+using std::basic_string;
 using std::ostringstream;
 using std::string;
 
 namespace gbnet {
 
 TcpClient::TcpClient(io::io_service &ioService, const string &host, uint16_t port):
-Networking(ioService),
-impl_(new internal::TcpClientImpl(*Networking::impl_, host, port))
+Networking(new internal::TcpClientImpl(ioService, host, port))
 {
 }
 
 TcpClient::~TcpClient()
 {
-	delete impl_;
 }
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////
 
 namespace internal {
 
-TcpClientImpl::TcpClientImpl(NetworkingImpl &net, const string &host, uint16_t port):
-net_(net),
-session_(net_),
-resolver_(net_.ioService_)
+TcpClientImpl::TcpClientImpl(io::io_service &ioService, const string &host, uint16_t port):
+NetworkingImpl(ioService),
+session_(*this),
+resolver_(ioService_)
 {
 	// Resolve the host:port pair to an endpoint
 	ostringstream oss;
 {
 }
 
+void TcpClientImpl::doPostPacket(basic_string<uint8_t> packet)
+{
+	session_.postPacket(packet);
+}
+
 void TcpClientImpl::handleResolve(tcp::resolver::iterator endpointIt, const bs::error_code &ec)
 {
 	if (ec)
 	{
-		net_.onNetworkError(GBNET_ERR_RESOLVE, ec);
+		onNetworkError(GBNET_ERR_RESOLVE, ec);
 		return;
 	}
 
 		if (endpointIt == tcp::resolver::iterator())
 		{
 			// No more endpoints to try; connection failed
-			net_.onNetworkError(GBNET_ERR_CONNECT, ec);
+			onNetworkError(GBNET_ERR_CONNECT, ec);
 			return;
 		}
 
 			++endpointIt, io::placeholders::error));
 	}
 
-	net_.onConnected(session_.socket().remote_endpoint());
+	onConnected(session_.socket().remote_endpoint());
 
 	// Read some data
 	session_.start();

File gb_net/src/tcp_server.cpp

 #include <boost/bind.hpp>
 
 #include "internal/networking.h"
+#include "internal/server_session.h"
 #include "internal/single_tcp_server.h"
 
 namespace bs = boost::system;
 namespace io = boost::asio;
 using boost::asio::ip::tcp;
 using boost::shared_ptr;
+using std::basic_string;
 using std::ostringstream;
+using std::set;
 
 namespace gbnet {
 
 TcpServer::TcpServer(io::io_service &ioService, uint16_t port):
-Networking(ioService),
-impl_(new internal::TcpServerImpl(*Networking::impl_, port))
+Networking(new internal::TcpServerImpl(ioService, port))
 {
 }
 
 TcpServer::~TcpServer()
 {
-	delete impl_;
 }
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////
 
 namespace internal {
 
-TcpServerImpl::TcpServerImpl(NetworkingImpl &net, uint16_t port):
-net_(net),
-resolver_(net_.ioService_)
+TcpServerImpl::TcpServerImpl(io::io_service &ioService, uint16_t port):
+NetworkingImpl(ioService),
+resolver_(ioService_)
 {
 	// Resolve the port to an endpoint
 	ostringstream oss;
 {
 }
 
+void TcpServerImpl::doPostPacket(basic_string<uint8_t> packet)
+{
+	// Send the message to all active sessions
+	for (set<shared_ptr<ServerSession> >::const_iterator session = sessions_.begin(),
+		end = sessions_.end(); session != end; ++session)
+	{
+		(*session)->postPacket(packet);
+	}
+}
+
 void TcpServerImpl::handleResolve(tcp::resolver::iterator endpoint, const bs::error_code &ec)
 {
 	if (ec)
 	{
-		net_.onNetworkError(GBNET_ERR_RESOLVE, ec);
+		onNetworkError(GBNET_ERR_RESOLVE, ec);
 		return;
 	}