Commits

Pavel Kozlov committed 28dd517

buffer messages to fix issue with frames fragmentation

Comments (0)

Files changed (6)

qcp-p2p-sim/src/main/scala/qcp/p2p/impl/Client.scala

     case Connected(remote, local) =>
       log.info("Connected, remote={}, local={}", remote, local)
       val connection = sender()
-      val handler = context.actorOf(Sender.props(message, connection))
+      val handler = context.actorOf(ClientHandler.props(message, connection))
       connection ! Register(handler)
 
     case c: ConnectionClosed => context become disconnected

qcp-p2p-sim/src/main/scala/qcp/p2p/impl/ClientHandler.scala

+package qcp.p2p.impl
+
+import java.nio._
+
+import akka.actor._
+import akka.io.Tcp._
+import akka.util._
+
+class ClientHandler(message: String, connection: ActorRef) extends Actor with ActorLogging {
+
+  val messageSender = context.actorOf(Sender.props(message))
+
+  override def receive = {
+    case Received(data) =>
+      val (part1, part2) = data.splitAt(4)
+      val size = part1.toByteBuffer.getInt(0)
+      log.info(s"Received size $size = $part1")
+      if (part2.size >= size) messageSender ! part2
+      else context.become(buffering(part2, size))
+
+    case Write(data, ack) =>
+      val size = data.size
+      val bs = ByteString(ByteBuffer.allocate(4).putInt(size).array())
+      log.info(s"Writing size $size = $bs")
+      connection ! Write(bs ++ data, ack)
+
+    case c: CloseCommand =>
+      log.info(s"Closing connection $c")
+      connection ! c
+
+    case c: ConnectionClosed =>
+      log.info(s"Closed $c")
+      context.parent ! c
+      context stop self
+  }
+
+  def buffering(buffer: ByteString, size: Int): Receive = {
+    case Received(data) =>
+      val bs = buffer ++ data
+      if (bs.size >= size) {
+        messageSender ! bs
+        context.unbecome()
+      } else context.become(buffering(bs, size))
+  }
+
+}
+
+object ClientHandler {
+  def props(message: String, connection: ActorRef) = Props(new ClientHandler(message, connection))
+}

qcp-p2p-sim/src/main/scala/qcp/p2p/impl/Receiver.scala

 import akka.actor._
 import akka.serialization._
 import akka.io.Tcp._
+import akka.util._
 
 class Receiver(address: InetSocketAddress, publisher: Publisher) extends Actor with ActorLogging {
 
   override def receive = step0
 
   val step0: Receive = {
-    case Received(data) =>
+    case data: ByteString =>
       val lists = deserialize(data, classOf[Array[(Boolean, Boolean)]])
       log.info(s"Received list, size ${lists.size}")
       val opSeq = rand.booleans(lists.size)
         (op, b) <- opSeq zip lists
       } yield if (op) b._2 else b._1
       log.info(s"Read bits, size ${bits.size}")
-      sender ! Write(serialize(opSeq.toArray))
+      context.parent ! Write(serialize(opSeq.toArray))
       context.become(step1(bits))
   }
 
   def step1(bits: IndexedSeq[Boolean]): Receive = {
-    case Received(data) =>
+    case data: ByteString =>
       val indices = deserialize(data, classOf[Array[Int]])
       log.info(s"Received indices ${indices.size}")
       val matched = subListByIndices(bits.toList, indices)
       val (part1, part2) = matched.splitAt(matched.size / 2)
       log.info("Sending control bits")
-      sender ! Write(serialize(part1.toArray))
+      context.parent ! Write(serialize(part1.toArray))
       val key = part2.grouped(8).toList.map(_.toInt)
       log.info(s"key=$key")
       context.become(step2(key))
   }
 
   def step2(key: List[Int]): Receive = {
-
-    case Received(data) =>
+    case data: ByteString =>
       val cypher = deserialize(data, classOf[Array[Char]])
       log.info("Received encrypted message, decrypting")
       val message = (for ((b, k) <- cypher zip key) yield (b ^ k).toChar).mkString
       publisher.publish(MessageEvent(address, message))
       log.info(s"Decrypted message '$message'")
-
-    case c: ConnectionClosed =>
-      log.info(s"Connection closed $c")
-      context stop self
   }
 
 }

qcp-p2p-sim/src/main/scala/qcp/p2p/impl/Sender.scala

 import akka.util._
 import akka.serialization._
 
-class Sender(message: String, connection: ActorRef) extends Actor with ActorLogging {
+class Sender(message: String) extends Actor with ActorLogging {
 
   val rand = new Random
   implicit val serializer = SerializationExtension(context.system)
     (op, bit) <- opSeq zip bitSeq
   } yield if (op) (rand.nextBoolean(), bit) else (bit, rand.nextBoolean())
 
-  connection ! Write(ByteString.fromArray(serializer.serialize(lists.toArray).get))
+  context.parent ! Write(ByteString.fromArray(serializer.serialize(lists.toArray).get))
 
   override def receive = {
 
-    case Received(data) =>
+    case data: ByteString =>
       val booleans = deserialize(data, classOf[Array[Boolean]])
       val indices = for {
         (i, op1, op2) <- (0 until size, opSeq, booleans).zipped.toArray
       } yield i
       log.info(s"Sending indices, size ${indices.size}")
       val matched = subListByIndices(bitSeq.toList, indices)
-      sender ! Write(ByteString.fromArray(serializer.serialize(indices).get))
+      context.parent ! Write(ByteString.fromArray(serializer.serialize(indices).get))
       context.become(step1(matched))
   }
 
   def step1(matched: List[Boolean]): Receive = {
 
-    case Received(data) =>
+    case data: ByteString =>
       val control = deserialize(data, classOf[Array[Boolean]])
       log.info(s"Received control bits ${control.size}")
       val (part1, part2) = matched.splitAt(matched.size / 2)
       if (part1 == control.toList) {
         log.info("Encrypting and sending message")
         val cypher = for ((b, k) <- message zip key) yield (b ^ k).toChar
-        sender ! Write(ByteString.fromArray(serializer.serialize(cypher.toArray).get))
-        sender ! ConfirmedClose
+        context.parent ! Write(ByteString.fromArray(serializer.serialize(cypher.toArray).get))
+        context.parent ! ConfirmedClose
       } else {
         log.error("Key exchange error!")
         context stop self
       }
-
-    case c: ConnectionClosed =>
-      log.info(s"Closed $c")
-      context.parent ! c
-      context stop self
-
   }
 
 }
 
 object Sender {
 
-  def props(message: String, connection: ActorRef) = Props(new Sender(message, connection))
+  def props(message: String) = Props(new Sender(message))
 
 }

qcp-p2p-sim/src/main/scala/qcp/p2p/impl/Server.scala

 
     case c@Connected(remote, local) =>
       log.info(s"Connected remote=$remote, local=$local")
-      val receiver = context.actorOf(Receiver.props(remote, publisher))
-      sender ! Register(receiver)
+      val handler = context.actorOf(ServerHandler.props(sender(), remote, publisher))
+      sender ! Register(handler)
 
     case c: ConnectionClosed => log.info(s"Connection closed $c")
 

qcp-p2p-sim/src/main/scala/qcp/p2p/impl/ServerHandler.scala

+package qcp.p2p.impl
+
+import java.net._
+import java.nio._
+
+import scala.swing._
+
+import akka.actor._
+import akka.io.Tcp._
+import akka.util._
+
+class ServerHandler(connection: ActorRef, address: InetSocketAddress, publisher: Publisher) extends Actor with ActorLogging {
+
+  val receiver = context.actorOf(Receiver.props(address, publisher))
+
+  override def receive = {
+    case Received(data) =>
+      val (part1, part2) = data.splitAt(4)
+      val size = part1.toByteBuffer.getInt(0)
+      log.info(s"Received size $size = $part1")
+      if (part2.size >= size) receiver ! part2
+      else context.become(buffering(part2, size))
+
+    case Write(data, ack) =>
+      val size = data.size
+      val bs = ByteString(ByteBuffer.allocate(4).putInt(size).array())
+      log.info(s"Writing size $size = $bs")
+      connection ! Write(bs ++ data, ack)
+
+    case c: ConnectionClosed =>
+      log.info(s"Connection closed $c")
+      context stop self
+  }
+
+  def buffering(buffer: ByteString, size: Int): Receive = {
+    case Received(data) =>
+      val bs = buffer ++ data
+      if (bs.size >= size) {
+        receiver ! bs
+        context.unbecome()
+      } else context.become(buffering(bs, size))
+  }
+
+
+}
+
+object ServerHandler {
+  def props(connection: ActorRef, address: InetSocketAddress, publisher: Publisher) = Props(new ServerHandler(connection, address, publisher))
+}