Commits

Jukka Jylänki  committed 8c92c96

Fix support for sending large individual messages without fragmenting the transfers when using TCPMessageConnection.

  • Participants
  • Parent commits 08a2689

Comments (0)

Files changed (7)

File include/kNet/DataSerializer.h

 	/// @return The number of bits filled so far total.
 	size_t BitsFilled() const { return elemOfs * 8 + bitOfs; }
 
-	/// @return The total capacity of the buffer we are filling into.
+	/// @return The total capacity of the buffer we are filling into, in bytes.
 	size_t Capacity() const { return maxBytes; }
 
 	/// Returns the current byte offset the DataSerializer is writing to.
 	/// Returns the current bit offset in the current byte this DataSerializer is writing to, [0, 7].
 	size_t BitOffset() const { return bitOfs; }
 
+    /// Returns the total number of bits that can still be serialized into this DataSerializer object before overflowing (which throws an exception).
+    size_t BitsLeft() const { return Capacity()*8 - BitsFilled(); }
+
+    /// Returns the total number of full bytes that can still be serialized into this DataSerializer object before overflowing (which throws an exception).
+    /// @return floor(BitsLeft()/8).
+    size_t BytesLeft() const { return BitsLeft() / 8; }
+
 	/// Returns the bit serialized at the given bit index of this buffer.
 	bool DebugReadBit(int bitIndex) const;
 

File include/kNet/Socket.h

 	/// specify the actual number of bytes filled to buffer.buf here.
 	int bytesContains;
 
+    /// Stores the total number of bytes allocated to the buffer in the overlapped structure.
+    int bytesAllocated;
+
 	sockaddr_in from;
 	socklen_t fromLen;
 };
 	/// Starts the sending of new data. After having filled the data to send to the OverlappedTransferBuffer that is
 	/// returned here, commit the send by calling EndSend. If you have called BeginSend, but decide not to send any data,
 	/// call AbortSend instead (otherwise memory will leak).
+    /// @param maxBytesToSend Specifies the size of the buffer that must be returned. Specify the size (or at least an 
+    ///         upper limit) of the message you are sending here. Specify the actual number of bytes filled in the resulting
+    ///         structure.
 	/// @return A transfer buffer where the data to send is to be filled in. If no new data can be sent at this time,
 	///         this function returns 0.
-	OverlappedTransferBuffer *BeginSend();
+	OverlappedTransferBuffer *BeginSend(int maxBytesToSend);
 	/// Finishes and queues up the given transfer that was created with a call to BeginSend.
 	/// @return True if send succeeded, false otherwise. In either case, the ownership of the passed buffer send
 	///         is taken by this Socket and may not be accessed anymore. Discard the pointer after calling this function.

File src/Network.cpp

 
 void Network::SendUDPConnectDatagram(Socket &socket, Datagram *connectMessage)
 {
-	OverlappedTransferBuffer *sendData = socket.BeginSend();
+    const int connectMessageSize = connectMessage ? connectMessage->size : 8;
+	OverlappedTransferBuffer *sendData = socket.BeginSend(connectMessageSize);
 	if (!sendData)
 	{
 		LOG(LogError, "Network::SendUDPConnectDatagram: socket.BeginSend failed! Cannot send UDP connection datagram!");
 		return;
 	}
+	sendData->bytesContains = connectMessageSize;
 	if (connectMessage)
 	{
 		///\todo Craft the proper connection attempt datagram.
-		sendData->bytesContains = std::min<int>(connectMessage->size, sendData->buffer.len);
 		memcpy(sendData->buffer.buf, connectMessage->data, sendData->buffer.len);
 		LOG(LogVerbose, "Network::SendUDPConnectDatagram: Sending UDP connect message of size %d.", (int)sendData->buffer.len);
 	}
 	else
 	{
 		///\todo Craft the proper connection attempt datagram.
-		sendData->bytesContains = std::min<int>(8, sendData->buffer.len);
 		memset(sendData->buffer.buf, 0, sendData->buffer.len);
 		LOG(LogVerbose, "Network::SendUDPConnectDatagram: Sending null UDP connect message of size %d.", (int)sendData->buffer.len);
 	}

File src/NetworkSimulator.cpp

 	{
 		QueuedBuffer b;
 		assert(socket);
-		b.buffer = socket->BeginSend();
+        b.buffer = socket->BeginSend(buffer->bytesContains);
 		if (b.buffer)
 		{
 			assert(b.buffer->buffer.len >= buffer->bytesContains);

File src/Socket.cpp

 	buffer->buffer.buf = new char[bytes];
 	buffer->buffer.len = bytes;
 	buffer->bytesContains = 0;
+    buffer->bytesAllocated = bytes;
 #ifdef WIN32
 	buffer->overlapped.hEvent = WSACreateEvent();
 	if (buffer->overlapped.hEvent == WSA_INVALID_EVENT)
 #endif
 }
 
-OverlappedTransferBuffer *Socket::BeginSend()
+OverlappedTransferBuffer *Socket::BeginSend(int maxBytesToSend)
 {
 	if (!writeOpen)
 		return 0;
 		if (ret == TRUE)
 		{
 			queuedSendBuffers.PopFront();
-			sentData->buffer.len = maxSendSize; // This is the number of bytes that the client is allowed to fill.
-			sentData->bytesContains = 0; // No bytes currently in use.
-			return sentData;
+
+            // If the buffer we pulled off was too small, free it and allocate a new one which is of the desired size.
+            if (sentData->bytesAllocated < maxBytesToSend)
+            {
+                DeleteOverlappedTransferBuffer(sentData);
+	            return AllocateOverlappedTransferBuffer(maxBytesToSend); ///\todo In debug mode - track this pointer.
+            }
+            else
+            {
+                // The existing transfer buffer is large enough. Prepare it for reuse and pass back to caller.
+			    sentData->buffer.len = sentData->bytesAllocated; // This is the number of bytes that the client is allowed to fill.
+			    sentData->bytesContains = 0; // No bytes currently in use.
+
+			    return sentData;
+            }
 		}
 		if (ret == FALSE && error != WSA_IO_INCOMPLETE)
 		{
 #endif
 
 	// No previous send buffer has finished from use (or not using overlapped transfers) - allocate a new buffer.
-	OverlappedTransferBuffer *transfer = AllocateOverlappedTransferBuffer(maxSendSize);
-	return transfer; ///\todo In debug mode - track this pointer.
+	return AllocateOverlappedTransferBuffer(maxBytesToSend);
 }
 
 bool Socket::EndSend(OverlappedTransferBuffer *sendBuffer)

File src/TCPMessageConnection.cpp

 		return PacketSendSocketClosed;
 	}
 
+    // 'serializedMessages' is a temporary data structure used only by this member function.
+    // It caches a list of all the messages we are pushing out during this call.
+	serializedMessages.clear();
+
 	// In the following, we start coalescing multiple messages into a single socket send() calls.
 	// Get the maximum number of bytes we can coalesce for the send() call. This is only a soft limit
 	// in the sense that if we encounter a single message that is larger than this limit, then we try
 	// to send that through in one send() call.
-	const size_t maxSendSize = socket->MaxSendSize();
+//	const size_t maxSendSize = socket->MaxSendSize();
 
 	// Push out all the pending data to the socket.
-//	assert(ContainerUniqueAndNoNullElements(serializedMessages));
-//	assert(ContainerUniqueAndNoNullElements(outboundQueue));
-	serializedMessages.clear(); // 'serializedMessages' is a temporary data structure used only by this member function.
-	OverlappedTransferBuffer *overlappedTransfer = socket->BeginSend();
-	if (!overlappedTransfer)
-	{
-		LOG(LogError, "TCPMessageConnection::SendOutPacket: Starting an overlapped send failed!");
-		return PacketSendSocketClosed;
-	}
+	OverlappedTransferBuffer *overlappedTransfer = 0;
 
 	int numMessagesPacked = 0;
-	DataSerializer writer(overlappedTransfer->buffer.buf, overlappedTransfer->buffer.len);
+	DataSerializer writer;
+//	assert(ContainerUniqueAndNoNullElements(outboundQueue)); // This precondition should always hold (but very heavy to test, uncomment to debug)
 	while(outboundQueue.Size() > 0)
 	{
 #ifdef KNET_NO_MAXHEAP
 			outboundQueue.PopFront();
 			continue;
 		}
+
 		const int encodedMsgIdLength = VLE8_16_32::GetEncodedBitLength(msg->id) / 8;
 		const size_t messageContentSize = msg->dataSize + encodedMsgIdLength; // 1 byte: Message ID. X bytes: Content.
 		const int encodedMsgSizeLength = VLE8_16_32::GetEncodedBitLength(messageContentSize) / 8;
 		const size_t totalMessageSize = messageContentSize + encodedMsgSizeLength; // 2 bytes: Content length. X bytes: Content.
-		// If this message won't fit into the buffer, send out all previously gathered messages (except if there were none, then try to get the big message through).
-		if (writer.BytesFilled() + totalMessageSize >= maxSendSize && numMessagesPacked > 0)
+
+        if (!overlappedTransfer)
+        {
+            overlappedTransfer = socket->BeginSend(std::max<size_t>(socket->MaxSendSize(), totalMessageSize));
+	        if (!overlappedTransfer)
+	        {
+		        LOG(LogError, "TCPMessageConnection::SendOutPacket: Starting an overlapped send failed!");
+                assert(serializedMessages.size() == 0);
+		        return PacketSendSocketClosed;
+	        }
+            writer = DataSerializer(overlappedTransfer->buffer.buf, overlappedTransfer->buffer.len);
+        }
+
+		// If this message won't fit into the buffer, send out all previously gathered messages.
+        if (writer.BytesLeft() < totalMessageSize)
 			break;
 
 		writer.AddVLE<VLE8_16_32>(messageContentSize);
 #endif
 		outboundQueue.PopFront();
 	}
-//	assert(ContainerUniqueAndNoNullElements(serializedMessages));
+//	assert(ContainerUniqueAndNoNullElements(serializedMessages)); // This precondition should always hold (but very heavy to test, uncomment to debug)
 
 	if (writer.BytesFilled() == 0 && outboundQueue.Size() > 0)
 		LOG(LogError, "Failed to send any messages to socket %s! (Probably next message was too big to fit in the buffer).", socket->ToString().c_str());

File src/UDPMessageConnection.cpp

 	}
 
 	// Do a fixed flow control for testing.
-	datagramSendRate = 100; ///\todo Remove.
+	datagramSendRate = 1000; ///\todo Remove.
 }
 
 void UDPMessageConnection::SendOutPackets()
 	if (!CanSendOutNewDatagram())
 		return PacketSendThrottled;
 
-	OverlappedTransferBuffer *data = socket->BeginSend();
+    OverlappedTransferBuffer *data = socket->BeginSend(socket->MaxSendSize());
 	if (!data)
 		return PacketSendThrottled;