Left: | ||
Right: |
OLD | NEW |
---|---|
1 package rpc | 1 package rpc |
2 | 2 |
3 import ( | 3 import ( |
4 "fmt" | 4 "fmt" |
5 "io" | 5 "io" |
6 "launchpad.net/juju-core/log" | 6 "launchpad.net/juju-core/log" |
7 "reflect" | 7 "reflect" |
8 "sync" | 8 "sync" |
9 ) | 9 ) |
10 | 10 |
(...skipping 51 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
62 sending sync.Mutex | 62 sending sync.Mutex |
63 } | 63 } |
64 | 64 |
65 // ErrorCoder represents an any error that has an associated | 65 // ErrorCoder represents an any error that has an associated |
66 // error code. An error code is a short string that describes the | 66 // error code. An error code is a short string that describes the |
67 // class of error. | 67 // class of error. |
68 type ErrorCoder interface { | 68 type ErrorCoder interface { |
69 ErrorCode() string | 69 ErrorCode() string |
70 } | 70 } |
71 | 71 |
72 // Killer represents a type that can be asked to | |
73 // abort any outstanding requests. The Kill | |
74 // method should return immediately. | |
75 type Killer interface { | |
TheMue
2013/02/26 12:01:56
Hehe, funny name.
dave_cheney.net
2013/02/27 05:24:14
Sometimes the language hand you these gems.
| |
76 Kill() | |
77 } | |
78 | |
72 // ServeCodec runs the server on a single connection. ServeCodec | 79 // ServeCodec runs the server on a single connection. ServeCodec |
73 // blocks, serving the connection until the client hangs up. The caller | 80 // blocks, serving the connection until the client hangs up. The caller |
74 // typically invokes ServeCodec in a go statement. The given | 81 // typically invokes ServeCodec in a go statement. The given |
75 // root value, which must be the same type as that passed to | 82 // root value, which must be the same type as that passed to |
76 // NewServer, is used to invoke the RPC requests. If rootValue | 83 // NewServer, is used to invoke the RPC requests. If rootValue |
77 // nil, the original root value passed to NewServer will | 84 // nil, the original root value passed to NewServer will |
78 // be used instead. | 85 // be used instead. |
79 // | 86 // |
80 // ServeCodec will only return when all its outstanding calls have | 87 // ServeCodec stops serving requests when it receives an error |
88 // reading a request. Before returning, if rootValue implements | |
89 // the Killer interface, its Kill method will be called. | |
90 // ServeCodec will then return only when all its outstanding calls have | |
81 // completed. | 91 // completed. |
82 func (srv *Server) ServeCodec(codec ServerCodec, rootValue interface{}) error { | 92 func (srv *Server) ServeCodec(codec ServerCodec, root interface{}) error { |
83 » return srv.serve(reflect.ValueOf(rootValue), codec) | |
84 } | |
85 | |
86 func (srv *Server) serve(root reflect.Value, codec ServerCodec) error { | |
87 » // TODO(rog) allow concurrent requests. | |
88 » if root.Type() != srv.root.Type() { | |
89 » » panic(fmt.Errorf("rpc: unexpected type of root value; got %s, wa nt %s", root.Type(), srv.root.Type())) | |
90 » } | |
91 csrv := &codecServer{ | 93 csrv := &codecServer{ |
92 Server: srv, | 94 Server: srv, |
93 codec: codec, | 95 codec: codec, |
94 » » root: root, | 96 » » root: reflect.ValueOf(root), |
97 » } | |
98 » if csrv.root.Type() != srv.root.Type() { | |
99 » » panic(fmt.Errorf("rpc: unexpected type of root value; got %s, wa nt %s", csrv.root.Type(), srv.root.Type())) | |
95 } | 100 } |
96 defer csrv.pending.Wait() | 101 defer csrv.pending.Wait() |
102 err := csrv.serve() | |
103 if killer, ok := root.(Killer); ok { | |
104 killer.Kill() | |
105 } | |
106 return err | |
107 } | |
108 | |
109 func (csrv *codecServer) serve() error { | |
97 var req Request | 110 var req Request |
98 for { | 111 for { |
99 req = Request{} | 112 req = Request{} |
100 » » err := codec.ReadRequestHeader(&req) | 113 » » err := csrv.codec.ReadRequestHeader(&req) |
101 if err != nil { | 114 if err != nil { |
102 if err == io.EOF { | 115 if err == io.EOF { |
103 return nil | 116 return nil |
104 } | 117 } |
105 return err | 118 return err |
106 } | 119 } |
107 o, a, err := csrv.findRequest(&req) | 120 o, a, err := csrv.findRequest(&req) |
108 if err != nil { | 121 if err != nil { |
109 » » » _ = codec.ReadRequestBody(&struct{}{}) | 122 » » » _ = csrv.codec.ReadRequestBody(&struct{}{}) |
110 resp := &Response{ | 123 resp := &Response{ |
111 RequestId: req.RequestId, | 124 RequestId: req.RequestId, |
112 } | 125 } |
113 csrv.setError(resp, err) | 126 csrv.setError(resp, err) |
114 » » » if err := codec.WriteResponse(resp, struct{}{}); err != nil { | 127 » » » if err := csrv.codec.WriteResponse(resp, struct{}{}); er r != nil { |
115 return err | 128 return err |
116 } | 129 } |
117 continue | 130 continue |
118 } | 131 } |
119 var argp interface{} | 132 var argp interface{} |
120 var arg reflect.Value | 133 var arg reflect.Value |
121 if a.arg != nil { | 134 if a.arg != nil { |
122 v := reflect.New(a.arg) | 135 v := reflect.New(a.arg) |
123 arg = v.Elem() | 136 arg = v.Elem() |
124 argp = v.Interface() | 137 argp = v.Interface() |
(...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
179 } | 192 } |
180 } | 193 } |
181 | 194 |
182 func (csrv *codecServer) runRequest0(reqId uint64, objId string, o *obtainer, a *action, arg reflect.Value) (reflect.Value, error) { | 195 func (csrv *codecServer) runRequest0(reqId uint64, objId string, o *obtainer, a *action, arg reflect.Value) (reflect.Value, error) { |
183 obj, err := o.call(csrv.root, objId) | 196 obj, err := o.call(csrv.root, objId) |
184 if err != nil { | 197 if err != nil { |
185 return reflect.Value{}, err | 198 return reflect.Value{}, err |
186 } | 199 } |
187 return a.call(obj, arg) | 200 return a.call(obj, arg) |
188 } | 201 } |
OLD | NEW |