OLD | NEW |
1 // Copyright 2009 The Go Authors. All rights reserved. | 1 // Copyright 2009 The Go Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style | 2 // Use of this source code is governed by a BSD-style |
3 // license that can be found in the LICENSE file. | 3 // license that can be found in the LICENSE file. |
4 | 4 |
5 package rpc | 5 package rpc |
6 | 6 |
7 import ( | 7 import ( |
8 "bufio" | 8 "bufio" |
9 "encoding/gob" | 9 "encoding/gob" |
10 "errors" | 10 "errors" |
(...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
42 mutex sync.Mutex // protects pending, seq, request | 42 mutex sync.Mutex // protects pending, seq, request |
43 sending sync.Mutex | 43 sending sync.Mutex |
44 request Request | 44 request Request |
45 seq uint64 | 45 seq uint64 |
46 codec ClientCodec | 46 codec ClientCodec |
47 pending map[uint64]*Call | 47 pending map[uint64]*Call |
48 closing bool | 48 closing bool |
49 shutdown bool | 49 shutdown bool |
50 } | 50 } |
51 | 51 |
| 52 // ClientCodecFactory returns a ClientCodec factory. |
| 53 type ClientCodecFactory interface { |
| 54 NewClientCodec(conn io.ReadWriteCloser) ClientCodec |
| 55 } |
| 56 |
52 // A ClientCodec implements writing of RPC requests and | 57 // A ClientCodec implements writing of RPC requests and |
53 // reading of RPC responses for the client side of an RPC session. | 58 // reading of RPC responses for the client side of an RPC session. |
54 // The client calls WriteRequest to write a request to the connection | 59 // The client calls WriteRequest to write a request to the connection |
55 // and calls ReadResponseHeader and ReadResponseBody in pairs | 60 // and calls ReadResponseHeader and ReadResponseBody in pairs |
56 // to read responses. The client calls Close when finished with the | 61 // to read responses. The client calls Close when finished with the |
57 // connection. ReadResponseBody may be called with a nil | 62 // connection. ReadResponseBody may be called with a nil |
58 // argument to force the body of the response to be read and then | 63 // argument to force the body of the response to be read and then |
59 // discarded. | 64 // discarded. |
60 type ClientCodec interface { | 65 type ClientCodec interface { |
61 WriteRequest(*Request, interface{}) error | 66 WriteRequest(*Request, interface{}) error |
(...skipping 157 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
219 } | 224 } |
220 | 225 |
221 func (c *gobClientCodec) ReadResponseBody(body interface{}) error { | 226 func (c *gobClientCodec) ReadResponseBody(body interface{}) error { |
222 return c.dec.Decode(body) | 227 return c.dec.Decode(body) |
223 } | 228 } |
224 | 229 |
225 func (c *gobClientCodec) Close() error { | 230 func (c *gobClientCodec) Close() error { |
226 return c.rwc.Close() | 231 return c.rwc.Close() |
227 } | 232 } |
228 | 233 |
| 234 type gobClientCodecFactory int |
| 235 |
| 236 func (_ *gobClientCodecFactory) NewClientCodec(conn io.ReadWriteCloser) ClientCo
dec { |
| 237 encBuf := bufio.NewWriter(conn) |
| 238 client := &gobClientCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(enc
Buf), encBuf} |
| 239 return client |
| 240 } |
| 241 |
229 // DialHTTP connects to an HTTP RPC server at the specified network address | 242 // DialHTTP connects to an HTTP RPC server at the specified network address |
230 // listening on the default HTTP RPC path. | 243 // listening on the default HTTP RPC path. |
231 func DialHTTP(network, address string) (*Client, error) { | 244 func DialHTTP(network, address string) (*Client, error) { |
232 » return DialHTTPPath(network, address, DefaultRPCPath) | 245 » return DialHTTPPathWithCodecFactory(network, address, DefaultRPCPath, ne
w(gobClientCodecFactory)) |
| 246 } |
| 247 |
| 248 // DialHTTPWithCodecFactory connects to an HTTP RPC server at the specified netw
ork address |
| 249 // listening on the default HTTP RPC path with ClientCodecFactory. |
| 250 func DialHTTPWithCodecFactory(network, address string, factory ClientCodecFactor
y) (*Client, error) { |
| 251 » return DialHTTPPathWithCodecFactory(network, address, DefaultRPCPath, fa
ctory) |
233 } | 252 } |
234 | 253 |
235 // DialHTTPPath connects to an HTTP RPC server | 254 // DialHTTPPath connects to an HTTP RPC server |
236 // at the specified network address and path. | 255 // at the specified network address and path. |
237 func DialHTTPPath(network, address, path string) (*Client, error) { | 256 func DialHTTPPath(network, address, path string) (*Client, error) { |
| 257 return DialHTTPPathWithCodecFactory(network, address, path, new(gobClien
tCodecFactory)) |
| 258 } |
| 259 |
| 260 // DialHTTPPathWithCodecFactory connects to an HTTP RPC server |
| 261 // at the specified network address and path with ClientCodecFactory. |
| 262 func DialHTTPPathWithCodecFactory(network, address, path string, factory ClientC
odecFactory) (*Client, error) { |
238 var err error | 263 var err error |
239 conn, err := net.Dial(network, address) | 264 conn, err := net.Dial(network, address) |
240 if err != nil { | 265 if err != nil { |
241 return nil, err | 266 return nil, err |
242 } | 267 } |
243 io.WriteString(conn, "CONNECT "+path+" HTTP/1.0\n\n") | 268 io.WriteString(conn, "CONNECT "+path+" HTTP/1.0\n\n") |
244 | 269 |
245 // Require successful HTTP response | 270 // Require successful HTTP response |
246 // before switching to RPC protocol. | 271 // before switching to RPC protocol. |
247 resp, err := http.ReadResponse(bufio.NewReader(conn), &http.Request{Meth
od: "CONNECT"}) | 272 resp, err := http.ReadResponse(bufio.NewReader(conn), &http.Request{Meth
od: "CONNECT"}) |
248 if err == nil && resp.Status == connected { | 273 if err == nil && resp.Status == connected { |
249 » » return NewClient(conn), nil | 274 » » codec := factory.NewClientCodec(conn) |
| 275 » » client := NewClientWithCodec(codec) |
| 276 » » return client, nil |
250 } | 277 } |
251 if err == nil { | 278 if err == nil { |
252 err = errors.New("unexpected HTTP response: " + resp.Status) | 279 err = errors.New("unexpected HTTP response: " + resp.Status) |
253 } | 280 } |
254 conn.Close() | 281 conn.Close() |
255 return nil, &net.OpError{ | 282 return nil, &net.OpError{ |
256 Op: "dial-http", | 283 Op: "dial-http", |
257 Net: network + " " + address, | 284 Net: network + " " + address, |
258 Addr: nil, | 285 Addr: nil, |
259 Err: err, | 286 Err: err, |
260 } | 287 } |
261 } | 288 } |
262 | 289 |
263 // Dial connects to an RPC server at the specified network address. | 290 // Dial connects to an RPC server at the specified network address. |
264 func Dial(network, address string) (*Client, error) { | 291 func Dial(network, address string) (*Client, error) { |
| 292 return DialWithCodecFactory(network, address, new(gobClientCodecFactory)
) |
| 293 } |
| 294 |
| 295 // DialWithCodecFactory connects to an RPC server at the specified network addre
ss with ClientCodecFactory. |
| 296 func DialWithCodecFactory(network, address string, factory ClientCodecFactory) (
*Client, error) { |
265 conn, err := net.Dial(network, address) | 297 conn, err := net.Dial(network, address) |
266 if err != nil { | 298 if err != nil { |
267 return nil, err | 299 return nil, err |
268 } | 300 } |
269 » return NewClient(conn), nil | 301 » codec := factory.NewClientCodec(conn) |
| 302 » return NewClientWithCodec(codec), nil |
270 } | 303 } |
271 | 304 |
272 func (client *Client) Close() error { | 305 func (client *Client) Close() error { |
273 client.mutex.Lock() | 306 client.mutex.Lock() |
274 if client.shutdown || client.closing { | 307 if client.shutdown || client.closing { |
275 client.mutex.Unlock() | 308 client.mutex.Unlock() |
276 return ErrShutdown | 309 return ErrShutdown |
277 } | 310 } |
278 client.closing = true | 311 client.closing = true |
279 client.mutex.Unlock() | 312 client.mutex.Unlock() |
(...skipping 23 matching lines...) Expand all Loading... |
303 call.Done = done | 336 call.Done = done |
304 client.send(call) | 337 client.send(call) |
305 return call | 338 return call |
306 } | 339 } |
307 | 340 |
308 // Call invokes the named function, waits for it to complete, and returns its er
ror status. | 341 // Call invokes the named function, waits for it to complete, and returns its er
ror status. |
309 func (client *Client) Call(serviceMethod string, args interface{}, reply interfa
ce{}) error { | 342 func (client *Client) Call(serviceMethod string, args interface{}, reply interfa
ce{}) error { |
310 call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Don
e | 343 call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Don
e |
311 return call.Error | 344 return call.Error |
312 } | 345 } |
OLD | NEW |