LEFT | RIGHT |
(no file at all) | |
1 // Copyright 2010 The Go Authors. All rights reserved. | 1 // Copyright 2010 The Go Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style | 2 // Use of this source code is governed by a BSD-style |
3 // license that can be found in the LICENSE file. | 3 // license that can be found in the LICENSE file. |
4 | 4 |
5 package netchan | 5 package netchan |
6 | 6 |
7 import ( | 7 import ( |
8 "io" | 8 "io" |
9 "log" | 9 "log" |
10 "net" | 10 "net" |
(...skipping 84 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
95 case payData: | 95 case payData: |
96 // done lower in loop | 96 // done lower in loop |
97 case payError: | 97 case payError: |
98 if e := imp.decode(errValue); e != nil { | 98 if e := imp.decode(errValue); e != nil { |
99 impLog("error:", e) | 99 impLog("error:", e) |
100 return | 100 return |
101 } | 101 } |
102 if err.Error != "" { | 102 if err.Error != "" { |
103 impLog("response error:", err.Error) | 103 impLog("response error:", err.Error) |
104 select { | 104 select { |
105 » » » » case imp.errors <- os.ErrorString(err.Error): | 105 » » » » case imp.errors <- os.NewError(err.Error): |
106 continue // errors are not acknowledged | 106 continue // errors are not acknowledged |
107 default: | 107 default: |
108 imp.shutdown() | 108 imp.shutdown() |
109 return | 109 return |
110 } | 110 } |
111 } | 111 } |
112 case payClosed: | 112 case payClosed: |
113 nch := imp.getChan(hdr.Id, false) | 113 nch := imp.getChan(hdr.Id, false) |
114 if nch != nil { | 114 if nch != nil { |
115 nch.close() | 115 nch.close() |
(...skipping 80 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
196 // fmt.Printf("%+v\n", <-ch) | 196 // fmt.Printf("%+v\n", <-ch) |
197 func (imp *Importer) ImportNValues(name string, chT interface{}, dir Dir, size,
n int) os.Error { | 197 func (imp *Importer) ImportNValues(name string, chT interface{}, dir Dir, size,
n int) os.Error { |
198 ch, err := checkChan(chT, dir) | 198 ch, err := checkChan(chT, dir) |
199 if err != nil { | 199 if err != nil { |
200 return err | 200 return err |
201 } | 201 } |
202 imp.chanLock.Lock() | 202 imp.chanLock.Lock() |
203 defer imp.chanLock.Unlock() | 203 defer imp.chanLock.Unlock() |
204 _, present := imp.names[name] | 204 _, present := imp.names[name] |
205 if present { | 205 if present { |
206 » » return os.ErrorString("channel name already being imported:" + n
ame) | 206 » » return os.NewError("channel name already being imported:" + name
) |
207 } | 207 } |
208 if size < 1 { | 208 if size < 1 { |
209 size = 1 | 209 size = 1 |
210 } | 210 } |
211 id := imp.maxId | 211 id := imp.maxId |
212 imp.maxId++ | 212 imp.maxId++ |
213 nch := newNetChan(name, id, &chanDir{ch, dir}, imp.encDec, size, int64(n
)) | 213 nch := newNetChan(name, id, &chanDir{ch, dir}, imp.encDec, size, int64(n
)) |
214 imp.names[name] = nch | 214 imp.names[name] = nch |
215 imp.chans[id] = nch | 215 imp.chans[id] = nch |
216 // Tell the other side about this channel. | 216 // Tell the other side about this channel. |
(...skipping 30 matching lines...) Expand all Loading... |
247 return nil | 247 return nil |
248 } | 248 } |
249 | 249 |
250 // Hangup disassociates the named channel from the Importer and closes | 250 // Hangup disassociates the named channel from the Importer and closes |
251 // the channel. Messages in flight for the channel may be dropped. | 251 // the channel. Messages in flight for the channel may be dropped. |
252 func (imp *Importer) Hangup(name string) os.Error { | 252 func (imp *Importer) Hangup(name string) os.Error { |
253 imp.chanLock.Lock() | 253 imp.chanLock.Lock() |
254 defer imp.chanLock.Unlock() | 254 defer imp.chanLock.Unlock() |
255 nc := imp.names[name] | 255 nc := imp.names[name] |
256 if nc == nil { | 256 if nc == nil { |
257 » » return os.ErrorString("netchan import: hangup: no such channel:
" + name) | 257 » » return os.NewError("netchan import: hangup: no such channel: " +
name) |
258 } | 258 } |
259 imp.names[name] = nil, false | 259 imp.names[name] = nil, false |
260 imp.chans[nc.id] = nil, false | 260 imp.chans[nc.id] = nil, false |
261 nc.close() | 261 nc.close() |
262 return nil | 262 return nil |
263 } | 263 } |
264 | 264 |
265 func (imp *Importer) unackedCount() int64 { | 265 func (imp *Importer) unackedCount() int64 { |
266 imp.mu.Lock() | 266 imp.mu.Lock() |
267 n := imp.unacked | 267 n := imp.unacked |
268 imp.mu.Unlock() | 268 imp.mu.Unlock() |
269 return n | 269 return n |
270 } | 270 } |
271 | 271 |
272 // Drain waits until all messages sent from this exporter/importer, including | 272 // Drain waits until all messages sent from this exporter/importer, including |
273 // those not yet sent to any server and possibly including those sent while | 273 // those not yet sent to any server and possibly including those sent while |
274 // Drain was executing, have been received by the exporter. In short, it | 274 // Drain was executing, have been received by the exporter. In short, it |
275 // waits until all the importer's messages have been received. | 275 // waits until all the importer's messages have been received. |
276 // If the timeout (measured in nanoseconds) is positive and Drain takes | 276 // If the timeout (measured in nanoseconds) is positive and Drain takes |
277 // longer than that to complete, an error is returned. | 277 // longer than that to complete, an error is returned. |
278 func (imp *Importer) Drain(timeout int64) os.Error { | 278 func (imp *Importer) Drain(timeout int64) os.Error { |
279 startTime := time.Nanoseconds() | 279 startTime := time.Nanoseconds() |
280 for imp.unackedCount() > 0 { | 280 for imp.unackedCount() > 0 { |
281 if timeout > 0 && time.Nanoseconds()-startTime >= timeout { | 281 if timeout > 0 && time.Nanoseconds()-startTime >= timeout { |
282 » » » return os.ErrorString("timeout") | 282 » » » return os.NewError("timeout") |
283 } | 283 } |
284 time.Sleep(100 * 1e6) | 284 time.Sleep(100 * 1e6) |
285 } | 285 } |
286 return nil | 286 return nil |
287 } | 287 } |
LEFT | RIGHT |