Commits

Adam Lindsay  committed a46108c

Added service_type(s) method calls, refactored to generic_remote_call. Use of functools.partial creates dependency on Python 2.5

  • Participants
  • Parent commits f580943

Comments (0)

Files changed (6)

File gen-py/locator/Locator-remote

   print '   get_all()'
   print '  Location get_node(string key)'
   print '  void ping()'
+  print '  string service_type()'
+  print '   service_types()'
   print '  void debug()'
   print ''
   sys.exit(0)
     sys.exit(1)
   pp.pprint(client.ping())
 
+elif cmd == 'service_type':
+  if len(args) != 0:
+    print 'service_type requires 0 args'
+    sys.exit(1)
+  pp.pprint(client.service_type())
+
+elif cmd == 'service_types':
+  if len(args) != 0:
+    print 'service_types requires 0 args'
+    sys.exit(1)
+  pp.pprint(client.service_types())
+
 elif cmd == 'debug':
   if len(args) != 0:
     print 'debug requires 0 args'

File gen-py/locator/Locator.py

   def ping(self, ):
     pass
 
+  def service_type(self, ):
+    pass
+
+  def service_types(self, ):
+    pass
+
   def debug(self, ):
     pass
 
     self._iprot.readMessageEnd()
     return
 
+  def service_type(self, ):
+    self.send_service_type()
+    return self.recv_service_type()
+
+  def send_service_type(self, ):
+    self._oprot.writeMessageBegin('service_type', TMessageType.CALL, self._seqid)
+    args = service_type_args()
+    args.write(self._oprot)
+    self._oprot.writeMessageEnd()
+    self._oprot.trans.flush()
+
+  def recv_service_type(self, ):
+    (fname, mtype, rseqid) = self._iprot.readMessageBegin()
+    if mtype == TMessageType.EXCEPTION:
+      x = TApplicationException()
+      x.read(self._iprot)
+      self._iprot.readMessageEnd()
+      raise x
+    result = service_type_result()
+    result.read(self._iprot)
+    self._iprot.readMessageEnd()
+    if result.success != None:
+      return result.success
+    raise TApplicationException(TApplicationException.MISSING_RESULT, "service_type failed: unknown result");
+
+  def service_types(self, ):
+    self.send_service_types()
+    return self.recv_service_types()
+
+  def send_service_types(self, ):
+    self._oprot.writeMessageBegin('service_types', TMessageType.CALL, self._seqid)
+    args = service_types_args()
+    args.write(self._oprot)
+    self._oprot.writeMessageEnd()
+    self._oprot.trans.flush()
+
+  def recv_service_types(self, ):
+    (fname, mtype, rseqid) = self._iprot.readMessageBegin()
+    if mtype == TMessageType.EXCEPTION:
+      x = TApplicationException()
+      x.read(self._iprot)
+      self._iprot.readMessageEnd()
+      raise x
+    result = service_types_result()
+    result.read(self._iprot)
+    self._iprot.readMessageEnd()
+    if result.success != None:
+      return result.success
+    raise TApplicationException(TApplicationException.MISSING_RESULT, "service_types failed: unknown result");
+
   def debug(self, ):
     self.send_debug()
 
     self._processMap["get_all"] = Processor.process_get_all
     self._processMap["get_node"] = Processor.process_get_node
     self._processMap["ping"] = Processor.process_ping
+    self._processMap["service_type"] = Processor.process_service_type
+    self._processMap["service_types"] = Processor.process_service_types
     self._processMap["debug"] = Processor.process_debug
 
   def process(self, iprot, oprot):
     oprot.writeMessageEnd()
     oprot.trans.flush()
 
+  def process_service_type(self, seqid, iprot, oprot):
+    args = service_type_args()
+    args.read(iprot)
+    iprot.readMessageEnd()
+    result = service_type_result()
+    result.success = self._handler.service_type()
+    oprot.writeMessageBegin("service_type", TMessageType.REPLY, seqid)
+    result.write(oprot)
+    oprot.writeMessageEnd()
+    oprot.trans.flush()
+
+  def process_service_types(self, seqid, iprot, oprot):
+    args = service_types_args()
+    args.read(iprot)
+    iprot.readMessageEnd()
+    result = service_types_result()
+    result.success = self._handler.service_types()
+    oprot.writeMessageBegin("service_types", TMessageType.REPLY, seqid)
+    result.write(oprot)
+    oprot.writeMessageEnd()
+    oprot.trans.flush()
+
   def process_debug(self, seqid, iprot, oprot):
     args = debug_args()
     args.read(iprot)
   def __ne__(self, other):
     return not (self == other)
 
+class service_type_args(object):
+
+  thrift_spec = (
+  )
+
+  def read(self, iprot):
+    if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+      fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+      return
+    iprot.readStructBegin()
+    while True:
+      (fname, ftype, fid) = iprot.readFieldBegin()
+      if ftype == TType.STOP:
+        break
+      else:
+        iprot.skip(ftype)
+      iprot.readFieldEnd()
+    iprot.readStructEnd()
+
+  def write(self, oprot):
+    if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+      oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+      return
+    oprot.writeStructBegin('service_type_args')
+    oprot.writeFieldStop()
+    oprot.writeStructEnd()
+
+  def __repr__(self):
+    L = ['%s=%r' % (key, value)
+      for key, value in self.__dict__.iteritems()]
+    return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+  def __eq__(self, other):
+    return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+  def __ne__(self, other):
+    return not (self == other)
+
+class service_type_result(object):
+  """
+  Attributes:
+   - success
+  """
+
+  thrift_spec = (
+    (0, TType.STRING, 'success', None, None, ), # 0
+  )
+
+  def __init__(self, success=None,):
+    self.success = success
+
+  def read(self, iprot):
+    if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+      fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+      return
+    iprot.readStructBegin()
+    while True:
+      (fname, ftype, fid) = iprot.readFieldBegin()
+      if ftype == TType.STOP:
+        break
+      if fid == 0:
+        if ftype == TType.STRING:
+          self.success = iprot.readString();
+        else:
+          iprot.skip(ftype)
+      else:
+        iprot.skip(ftype)
+      iprot.readFieldEnd()
+    iprot.readStructEnd()
+
+  def write(self, oprot):
+    if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+      oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+      return
+    oprot.writeStructBegin('service_type_result')
+    if self.success != None:
+      oprot.writeFieldBegin('success', TType.STRING, 0)
+      oprot.writeString(self.success)
+      oprot.writeFieldEnd()
+    oprot.writeFieldStop()
+    oprot.writeStructEnd()
+
+  def __repr__(self):
+    L = ['%s=%r' % (key, value)
+      for key, value in self.__dict__.iteritems()]
+    return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+  def __eq__(self, other):
+    return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+  def __ne__(self, other):
+    return not (self == other)
+
+class service_types_args(object):
+
+  thrift_spec = (
+  )
+
+  def read(self, iprot):
+    if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+      fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+      return
+    iprot.readStructBegin()
+    while True:
+      (fname, ftype, fid) = iprot.readFieldBegin()
+      if ftype == TType.STOP:
+        break
+      else:
+        iprot.skip(ftype)
+      iprot.readFieldEnd()
+    iprot.readStructEnd()
+
+  def write(self, oprot):
+    if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+      oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+      return
+    oprot.writeStructBegin('service_types_args')
+    oprot.writeFieldStop()
+    oprot.writeStructEnd()
+
+  def __repr__(self):
+    L = ['%s=%r' % (key, value)
+      for key, value in self.__dict__.iteritems()]
+    return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+  def __eq__(self, other):
+    return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+  def __ne__(self, other):
+    return not (self == other)
+
+class service_types_result(object):
+  """
+  Attributes:
+   - success
+  """
+
+  thrift_spec = (
+    (0, TType.LIST, 'success', (TType.STRING,None), None, ), # 0
+  )
+
+  def __init__(self, success=None,):
+    self.success = success
+
+  def read(self, iprot):
+    if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+      fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+      return
+    iprot.readStructBegin()
+    while True:
+      (fname, ftype, fid) = iprot.readFieldBegin()
+      if ftype == TType.STOP:
+        break
+      if fid == 0:
+        if ftype == TType.LIST:
+          self.success = []
+          (_etype24, _size21) = iprot.readListBegin()
+          for _i25 in xrange(_size21):
+            _elem26 = iprot.readString();
+            self.success.append(_elem26)
+          iprot.readListEnd()
+        else:
+          iprot.skip(ftype)
+      else:
+        iprot.skip(ftype)
+      iprot.readFieldEnd()
+    iprot.readStructEnd()
+
+  def write(self, oprot):
+    if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+      oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+      return
+    oprot.writeStructBegin('service_types_result')
+    if self.success != None:
+      oprot.writeFieldBegin('success', TType.LIST, 0)
+      oprot.writeListBegin(TType.STRING, len(self.success))
+      for iter27 in self.success:
+        oprot.writeString(iter27)
+      oprot.writeListEnd()
+      oprot.writeFieldEnd()
+    oprot.writeFieldStop()
+    oprot.writeStructEnd()
+
+  def __repr__(self):
+    L = ['%s=%r' % (key, value)
+      for key, value in self.__dict__.iteritems()]
+    return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+  def __eq__(self, other):
+    return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+  def __ne__(self, other):
+    return not (self == other)
+
 class debug_args(object):
 
   thrift_spec = (
 from math import sqrt
 from time import sleep
 from optparse import OptionParser, make_option
+from functools import partial
+
 
 from thrift import Thrift
 from thrift.transport import TSocket
     comp = location.rsplit(':', 1)
     return Location(comp[0], int(comp[1]))
 
-def remote_call(destination, method, *args):
+def generic_remote_call(clientclass, destination, method, *args):
     transport = TSocket.TSocket(destination.address, destination.port)
     transport = TTransport.TBufferedTransport(transport)
     protocol = TBinaryProtocol.TBinaryProtocol(transport)
-    client = Locator.Client(protocol)
+    client = clientclass(protocol)
     try:
         transport.open()
     except Thrift.TException, tx:
     transport.close()
     return out
 
+remote_call = partial(generic_remote_call, Locator.Client)
+
 def select_peers(in_set):
     lst = sorted(in_set)
     return lst
         "Give the canonical Location"
         return Location(address=self.address, port=self.port)
     
+    @classmethod
+    def service_type(cls):
+        return "Locator"
+    
+    @classmethod
+    def service_types(cls):
+        services = list()
+        print cls.__bases__
+        for base in cls.__bases__:
+            print base.__name__
+            try:
+                services.extend(base.service_types())
+            except:
+                pass
+        services.append(cls.service_type())
+        return services
+    
     def join(self, location):
         """
         Parameters:

File locator.thrift

  list<Location> get_all ()
  Location       get_node(1:string key)
  void           ping    ()
+ string         service_type ()
+ list<string>   service_types()
  oneway void    debug   ()
 }
         except NodeNotFound:
             print 'No peer autodiscovered.'
             sys.exit()
-    value = remote_call(loc, 'get', key)
-    print value
+    print remote_call(loc, 'get', key)

File storeserver.py

 sys.path.append('gen-py')
 from collections import defaultdict
 from time import sleep
+from functools import partial
 
 from thrift import Thrift
 from thrift.transport import TSocket
 parser = location.parser
 parser.set_usage(usage)
 
-def remote_call(destination, method, *args):
-    transport = TSocket.TSocket(destination.address, destination.port)
-    transport = TTransport.TBufferedTransport(transport)
-    protocol = TBinaryProtocol.TBinaryProtocol(transport)
-    client = Store.Client(protocol)
-    try:
-        transport.open()
-    except Thrift.TException, tx:
-        raise location.NodeNotFound(destination)
-    out = getattr(client, method)(*args)
-    transport.close()
-    return out
+remote_call = partial(location.generic_remote_call, Store.Client)
 
 class StoreHandler(location.LocatorHandler):
     def __init__(self, peer=None, port=9900):
         location.LocatorHandler.__init__(self, peer, port)
         self.store = defaultdict(str)
     
+    @classmethod
+    def service_type(cls):
+        return "Diststore"
+    
     def get(self, key):
         """
         Parameters: