OLD | NEW |
1 // Copyright 2010 The Go Authors. All rights reserved. | 1 // Copyright 2010 The Go Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style | 2 // Use of this source code is governed by a BSD-style |
3 // license that can be found in the LICENSE file. | 3 // license that can be found in the LICENSE file. |
4 | 4 |
5 package netchan | 5 package netchan |
6 | 6 |
7 import ( | 7 import ( |
8 "encoding/gob" | 8 "encoding/gob" |
9 "errors" | 9 "errors" |
10 "io" | 10 "io" |
(...skipping 147 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
158 time.Sleep(100 * time.Millisecond) | 158 time.Sleep(100 * time.Millisecond) |
159 } | 159 } |
160 return nil | 160 return nil |
161 } | 161 } |
162 | 162 |
163 // See the comment for Exporter.Sync. | 163 // See the comment for Exporter.Sync. |
164 func (cs *clientSet) sync(timeout time.Duration) error { | 164 func (cs *clientSet) sync(timeout time.Duration) error { |
165 deadline := time.Now().Add(timeout) | 165 deadline := time.Now().Add(timeout) |
166 // seq remembers the clients and their seqNum at point of entry. | 166 // seq remembers the clients and their seqNum at point of entry. |
167 seq := make(map[unackedCounter]int64) | 167 seq := make(map[unackedCounter]int64) |
| 168 cs.mu.Lock() |
168 for client := range cs.clients { | 169 for client := range cs.clients { |
169 seq[client] = client.seq() | 170 seq[client] = client.seq() |
170 } | 171 } |
| 172 cs.mu.Unlock() |
171 for { | 173 for { |
172 pending := false | 174 pending := false |
173 cs.mu.Lock() | 175 cs.mu.Lock() |
174 // Any unacknowledged messages? Look only at clients that exist
ed | 176 // Any unacknowledged messages? Look only at clients that exist
ed |
175 // when we started and are still in this client set. | 177 // when we started and are still in this client set. |
176 for client := range seq { | 178 for client := range seq { |
177 if _, ok := cs.clients[client]; ok { | 179 if _, ok := cs.clients[client]; ok { |
178 if client.ack() < seq[client] { | 180 if client.ack() < seq[client] { |
179 pending = true | 181 pending = true |
180 break | 182 break |
(...skipping 146 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
327 panic("recv on wrong direction of channel") | 329 panic("recv on wrong direction of channel") |
328 } | 330 } |
329 select { | 331 select { |
330 case nch.ackCh <- true: | 332 case nch.ackCh <- true: |
331 // ok | 333 // ok |
332 default: | 334 default: |
333 // TODO: should this be more resilient? | 335 // TODO: should this be more resilient? |
334 panic("netchan: remote receiver sent too many acks") | 336 panic("netchan: remote receiver sent too many acks") |
335 } | 337 } |
336 } | 338 } |
OLD | NEW |