OLD | NEW |
1 // Copyright 2009 The Go Authors. All rights reserved. | 1 // Copyright 2009 The Go Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style | 2 // Use of this source code is governed by a BSD-style |
3 // license that can be found in the LICENSE file. | 3 // license that can be found in the LICENSE file. |
4 | 4 |
5 package rpc | 5 package rpc |
6 | 6 |
7 import ( | 7 import ( |
8 "bufio" | 8 "bufio" |
9 "encoding/gob" | 9 "encoding/gob" |
10 "errors" | 10 "errors" |
(...skipping 23 matching lines...) Expand all Loading... |
34 Done chan *Call // Strobes when call is complete. | 34 Done chan *Call // Strobes when call is complete. |
35 } | 35 } |
36 | 36 |
37 // Client represents an RPC Client. | 37 // Client represents an RPC Client. |
38 // There may be multiple outstanding Calls associated | 38 // There may be multiple outstanding Calls associated |
39 // with a single Client, and a Client may be used by | 39 // with a single Client, and a Client may be used by |
40 // multiple goroutines simultaneously. | 40 // multiple goroutines simultaneously. |
41 type Client struct { | 41 type Client struct { |
42 codec ClientCodec | 42 codec ClientCodec |
43 | 43 |
44 » sending sync.Mutex | 44 » reqMutex sync.Mutex // protects following |
| 45 » request Request |
45 | 46 |
46 mutex sync.Mutex // protects following | 47 mutex sync.Mutex // protects following |
47 request Request | |
48 seq uint64 | 48 seq uint64 |
49 pending map[uint64]*Call | 49 pending map[uint64]*Call |
50 closing bool // user has called Close | 50 closing bool // user has called Close |
51 shutdown bool // server has told us to stop | 51 shutdown bool // server has told us to stop |
52 } | 52 } |
53 | 53 |
54 // A ClientCodec implements writing of RPC requests and | 54 // A ClientCodec implements writing of RPC requests and |
55 // reading of RPC responses for the client side of an RPC session. | 55 // reading of RPC responses for the client side of an RPC session. |
56 // The client calls WriteRequest to write a request to the connection | 56 // The client calls WriteRequest to write a request to the connection |
57 // and calls ReadResponseHeader and ReadResponseBody in pairs | 57 // and calls ReadResponseHeader and ReadResponseBody in pairs |
58 // to read responses. The client calls Close when finished with the | 58 // to read responses. The client calls Close when finished with the |
59 // connection. ReadResponseBody may be called with a nil | 59 // connection. ReadResponseBody may be called with a nil |
60 // argument to force the body of the response to be read and then | 60 // argument to force the body of the response to be read and then |
61 // discarded. | 61 // discarded. |
62 type ClientCodec interface { | 62 type ClientCodec interface { |
63 // WriteRequest must be safe for concurrent use by multiple goroutines. | 63 // WriteRequest must be safe for concurrent use by multiple goroutines. |
64 WriteRequest(*Request, interface{}) error | 64 WriteRequest(*Request, interface{}) error |
65 ReadResponseHeader(*Response) error | 65 ReadResponseHeader(*Response) error |
66 ReadResponseBody(interface{}) error | 66 ReadResponseBody(interface{}) error |
67 | 67 |
68 Close() error | 68 Close() error |
69 } | 69 } |
70 | 70 |
71 func (client *Client) send(call *Call) { | 71 func (client *Client) send(call *Call) { |
72 » client.sending.Lock() | 72 » client.reqMutex.Lock() |
73 » defer client.sending.Unlock() | 73 » defer client.reqMutex.Unlock() |
74 | 74 |
75 // Register this call. | 75 // Register this call. |
76 client.mutex.Lock() | 76 client.mutex.Lock() |
77 if client.shutdown || client.closing { | 77 if client.shutdown || client.closing { |
78 call.Error = ErrShutdown | 78 call.Error = ErrShutdown |
79 client.mutex.Unlock() | 79 client.mutex.Unlock() |
80 call.done() | 80 call.done() |
81 return | 81 return |
82 } | 82 } |
83 seq := client.seq | 83 seq := client.seq |
(...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
139 call.done() | 139 call.done() |
140 default: | 140 default: |
141 err = client.codec.ReadResponseBody(call.Reply) | 141 err = client.codec.ReadResponseBody(call.Reply) |
142 if err != nil { | 142 if err != nil { |
143 call.Error = errors.New("reading body " + err.Er
ror()) | 143 call.Error = errors.New("reading body " + err.Er
ror()) |
144 } | 144 } |
145 call.done() | 145 call.done() |
146 } | 146 } |
147 } | 147 } |
148 // Terminate pending calls. | 148 // Terminate pending calls. |
149 » client.sending.Lock() | 149 » client.reqMutex.Lock() |
150 client.mutex.Lock() | 150 client.mutex.Lock() |
151 client.shutdown = true | 151 client.shutdown = true |
152 closing := client.closing | 152 closing := client.closing |
153 if err == io.EOF { | 153 if err == io.EOF { |
154 if closing { | 154 if closing { |
155 err = ErrShutdown | 155 err = ErrShutdown |
156 } else { | 156 } else { |
157 err = io.ErrUnexpectedEOF | 157 err = io.ErrUnexpectedEOF |
158 } | 158 } |
159 } | 159 } |
160 for _, call := range client.pending { | 160 for _, call := range client.pending { |
161 call.Error = err | 161 call.Error = err |
162 call.done() | 162 call.done() |
163 } | 163 } |
164 client.mutex.Unlock() | 164 client.mutex.Unlock() |
165 » client.sending.Unlock() | 165 » client.reqMutex.Unlock() |
166 if debugLog && err != io.EOF && !closing { | 166 if debugLog && err != io.EOF && !closing { |
167 log.Println("rpc: client protocol error:", err) | 167 log.Println("rpc: client protocol error:", err) |
168 } | 168 } |
169 } | 169 } |
170 | 170 |
171 func (call *Call) done() { | 171 func (call *Call) done() { |
172 select { | 172 select { |
173 case call.Done <- call: | 173 case call.Done <- call: |
174 // ok | 174 // ok |
175 default: | 175 default: |
(...skipping 132 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
308 call.Done = done | 308 call.Done = done |
309 client.send(call) | 309 client.send(call) |
310 return call | 310 return call |
311 } | 311 } |
312 | 312 |
313 // Call invokes the named function, waits for it to complete, and returns its er
ror status. | 313 // Call invokes the named function, waits for it to complete, and returns its er
ror status. |
314 func (client *Client) Call(serviceMethod string, args interface{}, reply interfa
ce{}) error { | 314 func (client *Client) Call(serviceMethod string, args interface{}, reply interfa
ce{}) error { |
315 call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Don
e | 315 call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Don
e |
316 return call.Error | 316 return call.Error |
317 } | 317 } |
OLD | NEW |