Index: src/pkg/netchan/common.go |
=================================================================== |
--- a/src/pkg/netchan/common.go |
+++ b/src/pkg/netchan/common.go |
@@ -10,6 +10,7 @@ |
"os" |
"reflect" |
"sync" |
+ "time" |
) |
// The direction of a connection from the client's perspective. |
@@ -25,6 +26,7 @@ |
payRequest = iota // request structure follows |
payError // error structure follows |
payData // user payload follows |
+ payAck // acknowledgement; no payload |
) |
// A header is sent as a prefix to every transmission. It will be followed by |
@@ -32,13 +34,14 @@ |
type header struct { |
name string |
payloadType int |
+ seqNum int64 |
} |
// Sent with a header once per channel from importer to exporter to report |
// that it wants to bind to a channel with the specified direction for count |
// messages. If count is zero, it means unlimited. |
type request struct { |
- count int |
+ count int64 |
dir Dir |
} |
@@ -47,6 +50,27 @@ |
error string |
} |
+// Used to unify management of acknowledgements for import and export. |
+type unackedCounter interface { |
+ unackedCount() int64 |
+ ack() int64 |
+ seq() int64 |
+} |
+ |
+// A channel and its direction. |
+type chanDir struct { |
+ ch *reflect.ChanValue |
+ dir Dir |
+} |
+ |
+// clientSet contains the objects and methods needed for tracking |
+// clients of an exporter and draining outstanding messages. |
+type clientSet struct { |
+ mu sync.Mutex // protects access to channel and client maps |
+ chans map[string]*chanDir |
+ clients map[unackedCounter]bool |
+} |
+ |
// Mutex-protected encoder and decoder pair. |
type encDec struct { |
decLock sync.Mutex |
@@ -79,10 +103,78 @@ |
hdr.payloadType = payloadType |
err := ed.enc.Encode(hdr) |
if err == nil { |
- err = ed.enc.Encode(payload) |
- } else { |
+ if payload != nil { |
+ err = ed.enc.Encode(payload) |
+ } |
+ } |
+ if err != nil { |
// TODO: tear down connection if there is an error? |
} |
ed.encLock.Unlock() |
return err |
} |
+ |
+// See the comment for Exporter.Drain. |
+func (cs *clientSet) drain(timeout int64) os.Error { |
+ startTime := time.Nanoseconds() |
+ for { |
+ pending := false |
+ cs.mu.Lock() |
+ // Any messages waiting for a client? |
+ for _, chDir := range cs.chans { |
+ if chDir.ch.Len() > 0 { |
+ pending = true |
+ } |
+ } |
+ // Any unacknowledged messages? |
+ for client := range cs.clients { |
+ n := client.unackedCount() |
+ if n > 0 { // Check for > rather than != just to be safe. |
+ pending = true |
+ break |
+ } |
+ } |
+ cs.mu.Unlock() |
+ if !pending { |
+ break |
+ } |
+ if timeout > 0 && time.Nanoseconds()-startTime >= timeout { |
+ return os.ErrorString("timeout") |
+ } |
+ time.Sleep(100 * 1e6) // 100 milliseconds |
+ } |
+ return nil |
+} |
+ |
+// See the comment for Exporter.Sync. |
+func (cs *clientSet) sync(timeout int64) os.Error { |
+ startTime := time.Nanoseconds() |
+ // seq remembers the clients and their seqNum at point of entry. |
+ seq := make(map[unackedCounter]int64) |
+ for client := range cs.clients { |
+ seq[client] = client.seq() |
+ } |
+ for { |
+ pending := false |
+ cs.mu.Lock() |
+ // Any unacknowledged messages? Look only at clients that existed |
+ // when we started and are still in this client set. |
+ for client := range seq { |
+ if _, ok := cs.clients[client]; ok { |
+ if client.ack() < seq[client] { |
+ pending = true |
+ break |
+ } |
+ } |
+ } |
+ cs.mu.Unlock() |
+ if !pending { |
+ break |
+ } |
+ if timeout > 0 && time.Nanoseconds()-startTime >= timeout { |
+ return os.ErrorString("timeout") |
+ } |
+ time.Sleep(100 * 1e6) // 100 milliseconds |
+ } |
+ return nil |
+} |