OLD | NEW |
1 package rpc | 1 package rpc |
2 | 2 |
3 import ( | 3 import ( |
4 "fmt" | 4 "fmt" |
5 "io" | 5 "io" |
6 "launchpad.net/juju-core/log" | 6 "launchpad.net/juju-core/log" |
7 "reflect" | 7 "reflect" |
8 "sync" | 8 "sync" |
9 ) | 9 ) |
10 | 10 |
(...skipping 51 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
62 sending sync.Mutex | 62 sending sync.Mutex |
63 } | 63 } |
64 | 64 |
65 // ErrorCoder represents an any error that has an associated | 65 // ErrorCoder represents an any error that has an associated |
66 // error code. An error code is a short string that describes the | 66 // error code. An error code is a short string that describes the |
67 // class of error. | 67 // class of error. |
68 type ErrorCoder interface { | 68 type ErrorCoder interface { |
69 ErrorCode() string | 69 ErrorCode() string |
70 } | 70 } |
71 | 71 |
| 72 // Killer represents a type that can be asked to |
| 73 // abort any outstanding requests. The Kill |
| 74 // method should return immediately. |
| 75 type Killer interface { |
| 76 Kill() |
| 77 } |
| 78 |
72 // ServeCodec runs the server on a single connection. ServeCodec | 79 // ServeCodec runs the server on a single connection. ServeCodec |
73 // blocks, serving the connection until the client hangs up. The caller | 80 // blocks, serving the connection until the client hangs up. The caller |
74 // typically invokes ServeCodec in a go statement. The given | 81 // typically invokes ServeCodec in a go statement. The given |
75 // root value, which must be the same type as that passed to | 82 // root value, which must be the same type as that passed to |
76 // NewServer, is used to invoke the RPC requests. If rootValue | 83 // NewServer, is used to invoke the RPC requests. If rootValue |
77 // nil, the original root value passed to NewServer will | 84 // nil, the original root value passed to NewServer will |
78 // be used instead. | 85 // be used instead. |
79 // | 86 // |
80 // ServeCodec will only return when all its outstanding calls have | 87 // ServeCodec stops serving requests when it receives an error |
| 88 // reading a request. Before returning, if rootValue implements |
| 89 // the Killer interface, its Kill method will be called. |
| 90 // ServeCodec will then return only when all its outstanding calls have |
81 // completed. | 91 // completed. |
82 func (srv *Server) ServeCodec(codec ServerCodec, rootValue interface{}) error { | 92 func (srv *Server) ServeCodec(codec ServerCodec, root interface{}) error { |
83 » return srv.serve(reflect.ValueOf(rootValue), codec) | |
84 } | |
85 | |
86 func (srv *Server) serve(root reflect.Value, codec ServerCodec) error { | |
87 » // TODO(rog) allow concurrent requests. | |
88 » if root.Type() != srv.root.Type() { | |
89 » » panic(fmt.Errorf("rpc: unexpected type of root value; got %s, wa
nt %s", root.Type(), srv.root.Type())) | |
90 » } | |
91 csrv := &codecServer{ | 93 csrv := &codecServer{ |
92 Server: srv, | 94 Server: srv, |
93 codec: codec, | 95 codec: codec, |
94 » » root: root, | 96 » » root: reflect.ValueOf(root), |
| 97 » } |
| 98 » if csrv.root.Type() != srv.root.Type() { |
| 99 » » panic(fmt.Errorf("rpc: unexpected type of root value; got %s, wa
nt %s", csrv.root.Type(), srv.root.Type())) |
95 } | 100 } |
96 defer csrv.pending.Wait() | 101 defer csrv.pending.Wait() |
| 102 err := csrv.serve() |
| 103 if killer, ok := root.(Killer); ok { |
| 104 killer.Kill() |
| 105 } |
| 106 return err |
| 107 } |
| 108 |
| 109 func (csrv *codecServer) serve() error { |
97 var req Request | 110 var req Request |
98 for { | 111 for { |
99 req = Request{} | 112 req = Request{} |
100 » » err := codec.ReadRequestHeader(&req) | 113 » » err := csrv.codec.ReadRequestHeader(&req) |
101 if err != nil { | 114 if err != nil { |
102 if err == io.EOF { | 115 if err == io.EOF { |
103 return nil | 116 return nil |
104 } | 117 } |
105 return err | 118 return err |
106 } | 119 } |
107 o, a, err := csrv.findRequest(&req) | 120 o, a, err := csrv.findRequest(&req) |
108 if err != nil { | 121 if err != nil { |
109 » » » _ = codec.ReadRequestBody(&struct{}{}) | 122 » » » _ = csrv.codec.ReadRequestBody(&struct{}{}) |
110 resp := &Response{ | 123 resp := &Response{ |
111 RequestId: req.RequestId, | 124 RequestId: req.RequestId, |
112 } | 125 } |
113 csrv.setError(resp, err) | 126 csrv.setError(resp, err) |
114 » » » if err := codec.WriteResponse(resp, struct{}{}); err !=
nil { | 127 » » » if err := csrv.codec.WriteResponse(resp, struct{}{}); er
r != nil { |
115 return err | 128 return err |
116 } | 129 } |
117 continue | 130 continue |
118 } | 131 } |
119 var argp interface{} | 132 var argp interface{} |
120 var arg reflect.Value | 133 var arg reflect.Value |
121 if a.arg != nil { | 134 if a.arg != nil { |
122 v := reflect.New(a.arg) | 135 v := reflect.New(a.arg) |
123 arg = v.Elem() | 136 arg = v.Elem() |
124 argp = v.Interface() | 137 argp = v.Interface() |
(...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
179 } | 192 } |
180 } | 193 } |
181 | 194 |
182 func (csrv *codecServer) runRequest0(reqId uint64, objId string, o *obtainer, a
*action, arg reflect.Value) (reflect.Value, error) { | 195 func (csrv *codecServer) runRequest0(reqId uint64, objId string, o *obtainer, a
*action, arg reflect.Value) (reflect.Value, error) { |
183 obj, err := o.call(csrv.root, objId) | 196 obj, err := o.call(csrv.root, objId) |
184 if err != nil { | 197 if err != nil { |
185 return reflect.Value{}, err | 198 return reflect.Value{}, err |
186 } | 199 } |
187 return a.call(obj, arg) | 200 return a.call(obj, arg) |
188 } | 201 } |
OLD | NEW |