1. seewind
  2. grpc

Commits

seewind  committed 3f83d7b

fix DictItemProxy.get_owner;

  • Participants
  • Parent commits 329ba0a
  • Branches default

Comments (0)

Files changed (8)

File golang/export.go

View file
  • Ignore whitespace
+/**
+ * Created with IntelliJ IDEA.
+ * User: remote
+ * Date: 12-12-18
+ * Time: 上午10:42
+ * To change this template use File | Settings | File Templates.
+ */
+package grpc
+
+
+type Exporter interface {
+	Name() string
+}
+
+type exportHandler struct {
+	exports map[string]Exporter
+}
+
+func (hd *exportHandler) init() {
+	hd.exports = make(map[string]Exporter)
+}
+
+func (hd *exportHandler) Register(export Exporter) (name string ) {
+	name = export.Name()
+	hd.exports[name] = export
+	return name
+}
+
+func (hd *exportHandler) UnRegister(export Exporter) {
+	name := export.Name()
+	delete(hd.exports, name)
+}
+
+func (hd *exportHandler) GetExport(name string) (export Exporter) {
+	return hd.exports[name]
+}
+
+

File golang/grpc.go

View file
  • Ignore whitespace
-/**
- * Created with IntelliJ IDEA.
- * User: remote
- * Date: 12-12-11
- * Time: 上午11:06
- * To change this template use File | Settings | File Templates.
- */
-package grpc
-
-import (
-	"fmt"
-	"net"
-)
-
-const (
-	//rpc数据类型
-	RT_REQUEST = 1 << iota
-	RT_RESPONSE
-	RT_HEARTBEAT
-	//rpc处理类型
-	ST_NO_RESULT = 1 << 5
-	ST_NO_MSG    = 1 << 6
-	//rpc参数类型
-	DT_PICKLE = 1 << 7 //默认用msgpack
-	DT_ZIP    = 1 << 8
-	DT_PROXY  = 1 << 9 //标示传递的第1个参数是obj, 需要转换成proxy
-	//rpc数据类型 mark
-	RT_MARK = ST_NO_RESULT - 1
-
-	RECONNECT_TIMEOUT = 3  //wait reconnect time, zero will disable reconnect wait
-	HEARTBEAT_TIME    = 30 //heartbeat, if disconnect time > (HEARTBEAT_TIME + RECONNECT_TIMEOUT), connect lost
-	CALL_TIMEORUT     = 120
-	ZIP_LENGTH        = 1024 * 2 //if argkw > Nk, use zlib to compress
-	ZIP_LEVEL         = 3
-)
-
-func printf(msg string) {
-	fmt.Printf(msg)
-}
-
-type rpcBase struct {
-	Stoped     bool
-	Addr net.Addr
-	Host string
-	Port       int
-	sock       *net.Conn
-	listener   *net.Listener
-	exports    map[string]interface{}
-}
-
-func resolveAddr(addr string) (rs net.Addr, err error ) {
-	rs, err = net.ResolveTCPAddr("tcp", addr)
-	if err != nil {
-		rs, err = net.ResolveUnixAddr("unix", addr)
-	}
-	return
-}
-
-func (rpc *rpcBase) Bind(addr string) {
-	if rpc.listener != nil {
-		rpc.listener.Close()
-		rpc.listener = nil
-	}
-	tcp_addr, err := net.ResolveTCPAddr("tcp", addr)
-	if err == nil {
-		rpc.listener, err = net.ListenTCP("tcp", tcp_addr)
-	} else {
-		unix_addr, _ := net.ResolveUnixAddr("unix", addr)
-		rpc.listener, err = net.ListenUnix("unix", unix_addr)
-	}
-}
-
-func (rpc *rpcBase) Connect(addr string) {
-	rpc.
-}
-
-//
-//
+/**
+ * Created with IntelliJ IDEA.
+ * User: remote
+ * Date: 12-12-11
+ * Time: 上午11:06
+ * To change this template use File | Settings | File Templates.
+ */
+package grpc
+
+import (
+	"fmt"
+	"net"
+)
+
+const (
+	//rpc数据类型
+	RT_REQUEST = 1 << iota
+	RT_RESPONSE
+	RT_HEARTBEAT
+	//rpc处理类型
+	ST_NO_RESULT = 1 << 5
+	ST_NO_MSG    = 1 << 6
+	//rpc参数类型
+	DT_PICKLE = 1 << 7 //默认用msgpack
+	DT_ZIP    = 1 << 8
+	DT_PROXY  = 1 << 9 //标示传递的第1个参数是obj, 需要转换成proxy
+	//rpc数据类型 mark
+	RT_MARK = ST_NO_RESULT - 1
+
+	RECONNECT_TIMEOUT = 3  //wait reconnect time, zero will disable reconnect wait
+	HEARTBEAT_TIME    = 30 //heartbeat, if disconnect time > (HEARTBEAT_TIME + RECONNECT_TIMEOUT), connect lost
+	CALL_TIMEORUT     = 120
+	ZIP_LENGTH        = 1024 * 2 //if argkw > Nk, use zlib to compress
+	ZIP_LEVEL         = 3
+)
+
+func printf(msg string) {
+	fmt.Printf(msg)
+}
+
+type rpcBase struct {
+	Started  bool
+	Addr     net.Addr
+	SAddr    string
+	Host     string
+	Port     int
+	sock     net.Conn
+	listener net.Listener
+	exports  map[string]interface{}
+}
+
+type RpcClient struct {
+	rpcBase
+	exportHandler
+	Service
+}
+
+func resolveAddr(addr string) (rs net.Addr, err error) {
+	rs, err = net.ResolveTCPAddr("tcp", addr)
+	if err != nil {
+		rs, err = net.ResolveUnixAddr("unix", addr)
+	}
+	return
+}
+
+func (rpc *rpcBase) resolveAddr(addr string) bool {
+	var err error
+	rpc.Addr, err = resolveAddr(addr)
+	if err != nil {
+		return false
+	}
+	rpc.SAddr = addr
+	return true
+}
+
+//close rpc
+func (rpc *rpcBase) Close() {
+	if rpc.listener != nil {
+		rpc.listener.Close()
+		rpc.listener = nil
+	}
+	if rpc.sock != nil {
+		rpc.sock.Close()
+		rpc.sock = nil
+	}
+}
+
+//bind addr
+func (rpc *rpcBase) Bind(addr string) error {
+	var err error
+	rpc.Close()
+	ok := rpc.resolveAddr(addr)
+	if !ok {
+		return err
+	}
+	tcp_addr, ok := rpc.Addr.(*net.TCPAddr)
+	if ok {
+		rpc.listener, err = net.ListenTCP("tcp", tcp_addr)
+		if err != nil {
+			return err
+		}
+	} else {
+		unix_addr, _ := rpc.Addr.(*net.UnixAddr)
+		rpc.listener, err = net.ListenUnix("unix", unix_addr)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+//connect to addr
+func (rpc *rpcBase) Connect(addr string) bool {
+	rpc.Close()
+	ok := rpc.resolveAddr(addr)
+	if !ok {
+		return ok
+	}
+	var err error
+	tcp_addr, ok := rpc.Addr.(*net.TCPAddr)
+	if ok {
+		rpc.sock, err = net.DialTCP("tcp", nil, tcp_addr)
+		if err != nil {
+			return false
+		}
+	} else {
+		unix_addr, _ := rpc.Addr.(*net.UnixAddr)
+		rpc.sock, err = net.DialUnix("unix", nil, unix_addr)
+		if err != nil {
+			return false
+		}
+	}
+	return true
+}
+
+func (rpc *rpcBase) Start() {
+	if rpc.Started {
+		return
+	}
+	rpc.Started = true
+}
+
+func (rpc *rpcBase) Stop() {
+	if !rpc.Started {
+		return
+	}
+	rpc.Started = false
+}
+
+
+//
+//

File golang/pack.go

View file
  • Ignore whitespace
+/**
+ * Created with IntelliJ IDEA.
+ * User: remote
+ * Date: 12-12-18
+ * Time: 上午11:43
+ * To change this template use File | Settings | File Templates.
+ */
+package grpc
+
+import (
+	"bytes"
+	"encoding/binary"
+	msgpack "github.com/msgpack/msgpack-go"
+)
+
+type Packer interface {
+	Pack(value interface{}) (data []byte, err error )
+	Unpack(data []byte) (values []interface{})
+}
+
+type MsgPack struct {
+	ubuf
+}
+
+func (mp *MsgPack) Pack(value interface {}) (data []byte, err error ) {
+	dbuf := bytes.NewBuffer(nil)
+	_, err = msgpack.Pack(dbuf, value)
+	if err != nil {
+		return nil, err
+	}
+	rs := bytes.NewBuffer(nil)
+	binary.Write(rs, binary.LittleEndian, dbuf.Len())
+	rs.Write(dbuf.Bytes())
+	return rs.Bytes(), err
+}
+
+func (mp *MsgPack) Unpack(data []byte) (values []interface {}) {
+
+}
+

File golang/server.go

  • Ignore whitespace
-package main
-
-import (
-	"bytes"
-	"encoding/binary"
-	"fmt"
-	msgpack "github.com/msgpack/msgpack-go"
-	"net"
-	"reflect"
-)
-
-const (
-	BUFF_SIZE = 1024 * 32
-	//rpc数据类型
-	RT_REQUEST   = 1 << 0
-	RT_RESPONSE  = 1 << 1
-	RT_HEARTBEAT = 1 << 2
-	RT_EXCEPTION = 1 << 3
-	//rpc处理类型
-	ST_NO_RESULT = 1 << 5
-	ST_NO_MSG    = 1 << 6
-	//rpc参数类型
-	DT_PICKLE = 1 << 7 //默认用msgpack
-	DT_ZIP    = 1 << 8
-	DT_PROXY  = 1 << 9 //标示传递的第1个参数是obj, 需要转换成proxy
-)
-
-var (
-	Debug = true
-)
-
-func Log(msgs ...interface{}) {
-	if Debug {
-		fmt.Println(msgs...)
-	}
-}
-
-type Request struct {
-	DType    int8
-	ObjId    string
-	Index    int64
-	FuncName string
-	Params   []interface{}
-}
-
-type RpcConn struct {
-	svr  *RpcServer
-	conn net.Conn
-	uid  string
-	send chan []interface{}
-}
-
-func NewRpcConn(svr *RpcServer, c net.Conn) (conn *RpcConn) {
-	conn = &RpcConn{svr, c, "", make(chan []interface{})}
-	return conn
-}
-
-func (c *RpcConn) _handle(req Request) {
-	defer func() {
-		err := recover()
-		if err != nil {
-			return
-		}
-	}()
-	Log("begin _handle")
-	value := reflect.ValueOf(c.svr.Exporter)
-	func_value := value.MethodByName(req.FuncName)
-	in := make([]reflect.Value, len(req.Params))
-	for i, param := range req.Params {
-		in[i] = reflect.ValueOf(param)
-	}
-	out := func_value.Call(in)
-	rs := make([]interface{}, len(out))
-	for k, v := range out {
-		rs[k] = v.Interface()
-	}
-
-}
-
-func (c *RpcConn) _sender() {
-	for values := range c.send {
-		Log("send:", values)
-		data := bytes.NewBuffer(nil)
-		_, err := msgpack.Pack(data, values)
-		if err != nil {
-			continue
-		}
-
-		buffer := bytes.NewBuffer(nil)
-		binary.Write(buffer, binary.LittleEndian, data.Len())
-		c.conn.Write(buffer.Bytes())
-		c.conn.Write(data.Bytes())
-	}
-}
-
-func (c *RpcConn) Handle() {
-	defer c.conn.Close()
-	defer close(c.send)
-	go c._sender()
-	var buf [BUFF_SIZE]byte
-
-	Log("begin Handle")
-	n, _ := c.conn.Read(buf[0:32])
-	if n != 32 {
-		return
-	}
-	c.uid = string(buf[0:32])
-	for {
-		_, err := c.conn.Read(buf[0:4])
-		if err != nil {
-			c.svr.Log("read len error:", err)
-			break
-		}
-		//len
-		var l uint32
-		buffer := bytes.NewBuffer(buf[0:4])
-		binary.Read(buffer, binary.LittleEndian, &l)
-		//data
-		n, err = c.conn.Read(buf[0:l])
-		if err != nil {
-			c.svr.Log("read data error:", err)
-			break
-		}
-		buffer = bytes.NewBuffer(buf[0:l])
-		data, _, err := msgpack.Unpack(buffer)
-		if err != nil {
-			c.svr.Log("unpack data error:", err)
-			continue
-		}
-		//dtype, obj_id, index, func_name, argskw
-		values := data.Interface().([]interface{})
-		if len(values) < 5 {
-			c.svr.Log("data len error:", len(values))
-			continue
-		}
-		dtype := values[0].(int8)
-		obj_id := string(values[1].([]byte))
-		index := reflect.ValueOf(values[2]).Int()
-		func_name := string(values[3].([]byte))
-		argskw := values[4].([]byte)
-		buffer = bytes.NewBuffer(argskw)
-		var params reflect.Value
-		params, _, err = msgpack.Unpack(buffer)
-		if err != nil {
-			c.svr.Log("unpack argskw error:", err)
-			continue
-		}
-		req := Request{dtype, obj_id, index, func_name, params.Interface().([]interface{})}
-		go c._handle(req)
-	}
-
-}
-
-type RpcServer struct {
-	listener net.Listener
-	Exporter interface{}
-}
-
-func (s *RpcServer) Log(msgs ...interface{}) {
-	if Debug {
-		fmt.Println(msgs...)
-	}
-}
-
-func (s *RpcServer) Listen(snet, addr string) {
-	if s.listener != nil {
-		s.listener.Close()
-		s.listener = nil
-	}
-	listener, err := net.Listen(snet, addr)
-	if err != nil {
-		panic(err)
-	}
-	s.listener = listener
-}
-
-func (s *RpcServer) Start() {
-	for {
-		c, err := s.listener.Accept()
-		s.Log("accept", c.RemoteAddr())
-		if err != nil {
-			s.Log(err)
-			continue
-		}
-		conn := NewRpcConn(s, c)
-		go conn.Handle()
-	}
-}
-
-func main() {
-	svr := new(RpcServer)
-	svr.Listen("tcp", "127.0.0.1:12388")
-	svr.Start()
-}

File golang/service.go

View file
  • Ignore whitespace
+/**
+ * Created with IntelliJ IDEA.
+ * User: remote
+ * Date: 12-12-18
+ * Time: 上午11:16
+ * To change this template use File | Settings | File Templates.
+ */
+package grpc
+
+import (
+	"net"
+)
+
+type Service struct {
+	Uid string
+	Stoped bool
+	ChSend chan string
+	ChRecv chan string
+	Sock net.Conn
+}
+
+func (svc *Service) Start() {
+	svc.Stoped = false
+	go svc.goRecv()
+}
+
+func (svc *Service) Stop() {
+	svc.Stoped = true
+}
+
+func (svc *Service) goRecv() {
+	for !svc.Stoped {
+//		svc.Sock.Read()
+	}
+}
+
+

File golang/test_grpc.go

  • Ignore whitespace
-/**
- * Created with IntelliJ IDEA.
- * User: remote
- * Date: 12-12-11
- * Time: 上午11:06
- * To change this template use File | Settings | File Templates.
- */
-package main
-
-import (
-	"fmt"
-	"net"
-	"reflect"
-)
-
-const (
-	//rpc数据类型
-	RT_REQUEST = 1 << iota
-	RT_RESPONSE
-	RT_HEARTBEAT
-	//rpc处理类型
-	ST_NO_RESULT = 1 << 5
-	ST_NO_MSG    = 1 << 6
-	//rpc参数类型
-	DT_PICKLE = 1 << 7 //默认用msgpack
-	DT_ZIP    = 1 << 8
-	DT_PROXY  = 1 << 9 //标示传递的第1个参数是obj, 需要转换成proxy
-	//rpc数据类型 mark
-	RT_MARK = ST_NO_RESULT - 1
-
-	RECONNECT_TIMEOUT = 3  //wait reconnect time, zero will disable reconnect wait
-	HEARTBEAT_TIME    = 30 //heartbeat, if disconnect time > (HEARTBEAT_TIME + RECONNECT_TIMEOUT), connect lost
-	CALL_TIMEORUT     = 120
-	ZIP_LENGTH        = 1024 * 2 //if argkw > Nk, use zlib to compress
-	ZIP_LEVEL         = 3
-)
-
-func printf(msg string) {
-	fmt.Printf(msg)
-}
-
-type rpcBase struct {
-	Stoped   bool
-	Addr     net.Addr
-	Host     string
-	Port     int
-	sock     *net.Conn
-	listener *net.Listener
-	exports  map[string]interface{}
-}
-
-func resolveAddr(addr string) (rs net.Addr, err error) {
-	rs, err = net.ResolveTCPAddr("tcp", addr)
-	if err != nil {
-		rs, err = net.ResolveUnixAddr("unix", addr)
-	}
-	return
-}
-
-func main() {
-	saddr := "172.16.8.10:8008"
-	var i1 interface{}
-	addr, err := resolveAddr(saddr)
-	i1 = addr
-	if err != nil {
-		fmt.Printf(err.Error())
-		return
-	}
-	switch reflect. {
-	case net.TCPAddr:
-		fmt.Println("abc", v)
-	case net.Addr:
-		fmt.Println("addr")
-	default:
-		fmt.Println("default")
-	}
-	fmt.Println(reflect.TypeOf(addr))
-}
-
-//
-//

File grpc/__init__.py

View file
  • Ignore whitespace
 from .rpc_shell import *
 
 
-VERSION = (0, 3, 5)
+VERSION = (0, 3, 6)
 __version__ = VERSION

File grpc/rpc.py

View file
  • Ignore whitespace
     def get_owner(self):
         if self.is_local:
             return self.export
-        return RpcProxy(self._id, addr=self._addr)
+        return RpcProxy(self._id, addr=self._addr, svc=self._svc)
 
 
 def map_items(proxys, attr, *args, **kw):