Commits

Jakub Gustak committed 3844f08

intra-node retransmission

Comments (0)

Files changed (3)

 
     def replicate(self):
         in_nodes = self.nodes
-        assert in_nodes
+        if not in_nodes:
+            # no valid nodes!
+            return
+
         out_nodes = [node for node in NODES if node not in in_nodes]
         assert out_nodes
 
         self.chunks = chunks
         self.parts = [None for i in xrange(chunks)]
 
-    def get_parts(self):
-        parts = []
-        for p in self.parts:
-            assert p.nodes
-            parts.append( (p, p.rand_node()) )
-
-        return parts
-
     def __repr__(self):
         return self.parts.__repr__()
 
                 f.parts[bitem.chunk].del_node(self.node)
 
             except KeyError:
-                #wtf
+                # no such file in cache
+                # new file incoming from client?
+                # XXX
                 pass
 
-            self.sendCmd(RmCmd(bitem.id, bitem.filename,
-                bitem.chunk, bitem.chunks))
+            except ValueError:
+                print "copy failed:", bitem.filename, bitem.chunk
+                # node not cached
+                # send to node failed?
+                f.parts[bitem.chunk].check_redundancy()
+
+            else:
+                self.sendCmd(RmCmd(bitem.id, bitem.filename,
+                    bitem.chunk, bitem.chunks))
 
         elif cmd.valid(PutCmd):
             if len(NODES) < MINNODES:
         ItemCmd, BItemCmd, EhloCmd, PdoneCmd, ByeCmd, ExpectCmd, FailedCmd
 
 import os, re, time, sys, hashlib
-from ctrl import RS, DATA_LEN, CODE_LEN, CHUNK_SIZE, ChecksumError
+from ctrl import RS, DATA_LEN, CODE_LEN, CHUNK_SIZE
 
 
 try:
     print "warning> no RS correcion"
     RS = False
 
+if not RS:
+    UncorrectableError = Exception
+
 
 DIR = "./data"
 PORT = 8000
     def connectionMade(self):
         self.steps = 0
         self.chunkid = None
-        self.file = None
         self.state = self.INIT
         self._buff = ''
 
                 if self.factory.aquireChunk(put.id, self):
                     self.sendCmd(ReadyCmd(put.id))
                     self.chunkid = put.id
-                    fpath = os.path.join(self.factory.dir, put.chunkname())
-                    self.file = open(fpath, "w")
                     self.state = self.RECV
                     self.cmd = put 
                 else:
                     #not expecting such chunk
                     self.sendCmd(FailedCmd())
-                    #self.transport.loseConnection()
                     return
 
             elif cmd.valid(GetCmd):
                 #sending to client
                 get = cmd.parse(GetCmd)
                 fpath = os.path.join(self.factory.dir, get.chunkname())
-                self.file = open(fpath, "r")
+                file = open(fpath, "r")
                 self.state = self.SEND
                 if RS:
-                    enc, raw = cc.encode_file(self.file)
+                    enc, raw = cc.encode_file(file)
                 else:
-                    enc = raw = self.file.read() # XXX in parts
+                    enc = raw = file.read() # XXX in parts
 
-                self.file.close()
+                file.close()
 
                 if get.id != hashlib.md5(raw).hexdigest():
-                    self.factory.ctrl.sendCmd(
-                            BItemCmd(get.id, get.filename, get.chunk, get.chunks))
+                    self.factory.ctrl.sendCmd(BItemCmd(get.id,
+                        get.filename, get.chunk, get.chunks))
                     self.transport.loseConnection()
+                    return
 
                 self.cmd = get
 
                 if md5 != hashlib.md5(decoded).hexdigest():
                     raise UncorrectableError
 
-                self.file.write(decoded)
+            except UncorrectableError:
+                print "|||", self.cmd.filename, self.cmd.chunk
+                self.factory.ctrl.sendCmd(BItemCmd(self.cmd.id,
+                    self.cmd.filename, self.cmd.chunk, self.cmd.chunks))
 
-            except UncorrectableError:
-                #XXX: retransmit
-                return 
+            else:
+                fpath = os.path.join(self.factory.dir, self.cmd.chunkname())
+                file = open(fpath, "w")
+                file.write(decoded)
+                file.close()
 
-            finally:
-                self.file.close()
+                #print "chunk: %s saved(%d)" % (self.chunkid, self.steps)
+                self.factory.releaseChunk(self.chunkid, self)
+                self.factory.chunks.append(self.cmd.filename, self.cmd.chunk,
+                        self.cmd.chunks, self.cmd.id)
+                print "<<<", self.cmd.filename, self.cmd.chunk
+                self.factory.ctrl.sendCmd(ItemCmd(self.cmd.id,
+                    self.cmd.filename, self.cmd.chunk, self.cmd.chunks))
 
-            #print "chunk: %s saved(%d)" % (self.chunkid, self.steps)
-            self.factory.releaseChunk(self.chunkid, self)
-            self.factory.chunks.append(self.cmd.filename, self.cmd.chunk,
-                    self.cmd.chunks, self.cmd.id)
-            print "<<<", self.cmd.filename, self.cmd.chunk
-            self.factory.ctrl.sendCmd(ItemCmd(self.cmd.id,
-                self.cmd.filename, self.cmd.chunk, self.cmd.chunks))
 
     def sendCmd(self, cmd):
         return self.transport.write(str(cmd) + "\r\n")
 class nodeCTL(basic.LineReceiver):
 
     def connectionMade(self):
-        print "Controller connected."
+        print "connected (ctrl)."
         self.sendCmd(EhloCmd(self.factory.port))
         self.factory.dtp.ctrl = self
 
     def connectionLost(self, reason):
         try:
-            print "Controller disconnected. Reconnecting"
+            print "disconnected (ctrl)."
             self.factory.srv.nodeCTLConnect(self.factory.port,
                                             self.factory.dtp)
 
             f.protocol = clientDTP
             f.send = send
             f.dir = self.factory.dir
+            f.ctrl = self
             reactor.connectTCP(send.host, send.port, f)
 
         elif cmd.valid(LsCmd):
 
         elif cmd.valid(RmCmd):
             rm = cmd.parse(RmCmd)
-            self.factory.chunks.rm(rm.filename, rm.chunk)
             fpath = os.path.join(self.factory.dir, rm.chunkname())
-            os.unlink(fpath)
+
+            try:
+                self.factory.chunks.rm(rm.filename, rm.chunk)
+                os.unlink(fpath)
+            except Exception, e:
+                print e
 
         elif cmd.valid(PdoneCmd):
             pdone = cmd.parse(PdoneCmd)
 
     def connectionMade(self):
         self.retries = 3
-        send = self.factory.send
+        self.cmd = send = self.factory.send
         self.sendCmd(PutCmd(id=send.id, filename=send.filename,
             chunk=send.chunk, chunks=send.chunks))
 
             else:
                 enc = raw = self.file.read()
 
+            send = self.cmd
+            if send.id != hashlib.md5(raw).hexdigest():
+                self.factory.ctrl.sendCmd(BItemCmd(send.id,
+                    send.filename, send.chunk, send.chunks))
+                self.transport.loseConnection()
+                return
+
+            # XXX simluate transmission error
+            #import random
+            #if random.coice((True, False)):
+            #    enc[0] = 'a'
+
             self.transport.write(enc)
             self.transport.loseConnection()
             self.file.close()
         return f
 
     def nodeCTLConnectionFailed(self, conn, reason):
-        print "Connection failed. Reconnecting."
+        print "connection failed (ctrl)."
         reactor.callLater(RECONNECT_TIME, self.nodeCTLConnect,
                 conn.factory.port, conn.factory.dtp)
 
 from twisted.application import service
 
 import sys, hashlib, os, math
-from ctrl import RS, DATA_LEN, CODE_LEN, CHUNK_SIZE, ChecksumError
+from ctrl import RS, DATA_LEN, CODE_LEN, CHUNK_SIZE
 
 
 try:
     print "warning> no RS correcion"
     RS = False
 
+if not RS:
+    UncorrectableError = Exception
+
 
 CTRL = ('localhost', 2000)
 
     def connectionLost(self, reason):
         try:
             self.ctrl.f.append_chunk(self.item.chunk, self._buff, self.id)
-        except ChecksumError:
+        except UncorrectableError:
             print "chunk failed:", self.item.filename, self.id
             self.ctrl.sendCmd(GetCmd(id=self.item.id, filename=self.item.filename,
                 chunk=self.item.chunk, chunks=self.item.chunks))
         else:
             decoded = raw
 
-        #XXX: check this id
         if id != hashlib.md5(decoded).hexdigest():
-            raise ChecksumError
+            raise UncorrectableError
 
         self.chunks[chunkid] = decoded
         self.size += len(decoded)