Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(1)

Unified Diff: src/pkg/netchan/common.go

Issue 2137041: code review 2137041: netchan: use acknowledgements on export send. (Closed)
Patch Set: code review 2137041: netchan: use acknowledgements on export send. Created 14 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « no previous file | src/pkg/netchan/export.go » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: src/pkg/netchan/common.go
===================================================================
--- a/src/pkg/netchan/common.go
+++ b/src/pkg/netchan/common.go
@@ -10,6 +10,7 @@
"os"
"reflect"
"sync"
+ "time"
)
// The direction of a connection from the client's perspective.
@@ -25,6 +26,7 @@
payRequest = iota // request structure follows
payError // error structure follows
payData // user payload follows
+ payAck // acknowledgement; no payload
)
// A header is sent as a prefix to every transmission. It will be followed by
@@ -32,13 +34,14 @@
type header struct {
name string
payloadType int
+ seqNum int64
}
// Sent with a header once per channel from importer to exporter to report
// that it wants to bind to a channel with the specified direction for count
// messages. If count is zero, it means unlimited.
type request struct {
- count int
+ count int64
dir Dir
}
@@ -47,6 +50,27 @@
error string
}
+// Used to unify management of acknowledgements for import and export.
+type unackedCounter interface {
+ unackedCount() int64
+ ack() int64
+ seq() int64
+}
+
+// A channel and its direction.
+type chanDir struct {
+ ch *reflect.ChanValue
+ dir Dir
+}
+
+// clientSet contains the objects and methods needed for tracking
+// clients of an exporter and draining outstanding messages.
+type clientSet struct {
+ mu sync.Mutex // protects access to channel and client maps
+ chans map[string]*chanDir
+ clients map[unackedCounter]bool
+}
+
// Mutex-protected encoder and decoder pair.
type encDec struct {
decLock sync.Mutex
@@ -79,10 +103,78 @@
hdr.payloadType = payloadType
err := ed.enc.Encode(hdr)
if err == nil {
- err = ed.enc.Encode(payload)
- } else {
+ if payload != nil {
+ err = ed.enc.Encode(payload)
+ }
+ }
+ if err != nil {
// TODO: tear down connection if there is an error?
}
ed.encLock.Unlock()
return err
}
+
+// See the comment for Exporter.Drain.
+func (cs *clientSet) drain(timeout int64) os.Error {
+ startTime := time.Nanoseconds()
+ for {
+ pending := false
+ cs.mu.Lock()
+ // Any messages waiting for a client?
+ for _, chDir := range cs.chans {
+ if chDir.ch.Len() > 0 {
+ pending = true
+ }
+ }
+ // Any unacknowledged messages?
+ for client := range cs.clients {
+ n := client.unackedCount()
+ if n > 0 { // Check for > rather than != just to be safe.
+ pending = true
+ break
+ }
+ }
+ cs.mu.Unlock()
+ if !pending {
+ break
+ }
+ if timeout > 0 && time.Nanoseconds()-startTime >= timeout {
+ return os.ErrorString("timeout")
+ }
+ time.Sleep(100 * 1e6) // 100 milliseconds
+ }
+ return nil
+}
+
+// See the comment for Exporter.Sync.
+func (cs *clientSet) sync(timeout int64) os.Error {
+ startTime := time.Nanoseconds()
+ // seq remembers the clients and their seqNum at point of entry.
+ seq := make(map[unackedCounter]int64)
+ for client := range cs.clients {
+ seq[client] = client.seq()
+ }
+ for {
+ pending := false
+ cs.mu.Lock()
+ // Any unacknowledged messages? Look only at clients that existed
+ // when we started and are still in this client set.
+ for client := range seq {
+ if _, ok := cs.clients[client]; ok {
+ if client.ack() < seq[client] {
+ pending = true
+ break
+ }
+ }
+ }
+ cs.mu.Unlock()
+ if !pending {
+ break
+ }
+ if timeout > 0 && time.Nanoseconds()-startTime >= timeout {
+ return os.ErrorString("timeout")
+ }
+ time.Sleep(100 * 1e6) // 100 milliseconds
+ }
+ return nil
+}
« no previous file with comments | « no previous file | src/pkg/netchan/export.go » ('j') | no next file with comments »

Powered by Google App Engine
RSS Feeds Recent Issues | This issue
This is Rietveld f62528b