LEFT | RIGHT |
(no file at all) | |
1 // Copyright 2014 The Go Authors. All rights reserved. | 1 // Copyright 2014 The Go Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style | 2 // Use of this source code is governed by a BSD-style |
3 // license that can be found in the LICENSE file. | 3 // license that can be found in the LICENSE file. |
4 | 4 |
5 package runtime | 5 package runtime |
6 | 6 |
| 7 import "unsafe" |
| 8 |
7 // This file contains the implementation of Go channels | 9 // This file contains the implementation of Go channels |
8 // and select statements. | 10 // and select statements. |
9 | 11 |
10 import "unsafe" | 12 // Outline of the synchronization protocols. |
| 13 // |
| 14 // Single channel sync send/recv first check the opposite wait queue (WaitQ). |
| 15 // If the queue is not empty, it dequeues a waiter (SudoG), copies the data |
| 16 // directly to/from the waiter buffer and wakes up the waiter. |
| 17 // If the queue is empty, it queues itself into wait queue and parks. When it is |
| 18 // woken up, the operation is already executed, so it just returns. |
| 19 // SudoG.signal is set to nil before parking and is left intact during wake up. |
| 20 // Close sets SudoG.signal to CLOSED, in such case woken up goroutine acts accor
dingly. |
| 21 // |
| 22 // Single channel async send/recv first check whether the operation can be execu
ted |
| 23 // (buffer is not full/empty, respectively). If the operation can be executed, |
| 24 // it copies data to/from the buffer, adjusts indices and wakes up an opposite |
| 25 // waiter if there is any. If the operation cannot be executed at the moment, |
| 26 // it queues itself into wait queue and parks. When it is woken up, it retries |
| 27 // from the beginning. |
| 28 // SudoG.signal is set to nil before parking and is left intact during wake up. |
| 29 // Close sets SudoG.signal to CLOSED, in such case woken up goroutine |
| 30 // accomplishes the operation w/o retry. |
| 31 // |
| 32 // Select operations proceed in 3 passes. In the pass 1 channels are checked one
-by-one; |
| 33 // if some channel is ready, the operation is executed (as outlined above for |
| 34 // single channel operations) and select returns. In the pass 2 readiness of the |
| 35 // channels is re-checked and select queues itself as waiter on all channels. |
| 36 // If no channel turned out to be ready in pass 2, select blocks. In the pass 3 |
| 37 // (after wake up) select dequeues itself from unsuccessful channels. If it is |
| 38 // woken up by an operation on a sync channel or close, select returns. Otherwis
e |
| 39 // it retries from the beginning. |
| 40 // SudoG.signal for all channels is set up to point to a single signal variable, |
| 41 // signal variable is set to PREWAIT before enqueue. The signal variable is oper
ated |
| 42 // with CAS, which resolves competition between concurrent wakers and the |
| 43 // select goroutine. When SudoG is removed from a wait queue, SudoG.signal is se
t to nil; |
| 44 // this allows select to skip dequeueing it again. Below is the signal state mac
hine: |
| 45 // |
| 46 // PREWAIT -> WAIT: select commits to park |
| 47 // |
| 48 // PREWAIT -> sg|CLOSED: sg chan is closed, select is not woken up |
| 49 // PREWAIT -> READY: an async chan is ready or select discovers a ready chan dur
ing pass 2, select is not woken up |
| 50 // PREWAIT -> SYNC: a sync chan is ready but the operation is not yet completed,
select is not woken up |
| 51 // |
| 52 // WAIT -> sg|CLOSED: the same as above, but select is woken up |
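| 53 // WAIT -> READY: the same as PREWAIT -> READY, but select is woken up |
| 54 // WAIT -> SYNC: the same as PREWAIT -> SYNC, but select is woken up |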
| 55 // |
| 56 // SYNC -> sg: operation on the sync chan is completed now |
| 57 const ( |
| 58 » sgPrewait = iota |
| 59 » sgClosed |
| 60 » sgWait |
| 61 » sgReady |
| 62 » sgSync |
| 63 ) |
11 | 64 |
12 const ( | 65 const ( |
13 maxAlign = 8 | 66 maxAlign = 8 |
14 hchanSize = unsafe.Sizeof(hchan{}) | 67 hchanSize = unsafe.Sizeof(hchan{}) |
15 debugChan = false | 68 debugChan = false |
16 ) | 69 ) |
17 | 70 |
18 // TODO: make hchan.buf an unsafe.Pointer, not a *uint8 | 71 // TODO: make hchan.buf an unsafe.Pointer, not a *uint8 |
19 | 72 |
20 func makechan(t *chantype, size int64) *hchan { | 73 func makechan(t *chantype, size int64) *hchan { |
(...skipping 105 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
126 t0 = gocputicks() | 179 t0 = gocputicks() |
127 } | 180 } |
128 | 181 |
129 golock(&c.lock) | 182 golock(&c.lock) |
130 if c.closed != 0 { | 183 if c.closed != 0 { |
131 gounlock(&c.lock) | 184 gounlock(&c.lock) |
132 panic("send on closed channel") | 185 panic("send on closed channel") |
133 } | 186 } |
134 | 187 |
135 if c.dataqsiz == 0 { // synchronous channel | 188 if c.dataqsiz == 0 { // synchronous channel |
136 » » sg := c.recvq.dequeue() | 189 » » sg, wake := c.recvq.dequeue(sgSync) |
137 if sg != nil { // found a waiting receiver | 190 if sg != nil { // found a waiting receiver |
138 if raceenabled { | 191 if raceenabled { |
139 racesync(c, sg) | 192 racesync(c, sg) |
140 } | 193 } |
| 194 // Disable preemption. If we communicate with a select, |
| 195 // it can still be running. In such case completion of t
he operation |
| 196 // is the store to sg->signal below. We don't want this
goroutine |
| 197 // to be preempted until that store. Otherwise the selec
t would |
| 198 // need to spin in while(signal == SYNC) using a mix of
osyield and gosched. |
| 199 mp := acquirem() |
141 gounlock(&c.lock) | 200 gounlock(&c.lock) |
142 | 201 |
143 recvg := sg.g | |
144 recvg.param = unsafe.Pointer(sg) | |
145 if sg.elem != nil { | 202 if sg.elem != nil { |
146 memmove(unsafe.Pointer(sg.elem), ep, uintptr(c.e
lemsize)) | 203 memmove(unsafe.Pointer(sg.elem), ep, uintptr(c.e
lemsize)) |
147 } | 204 } |
148 » » » if sg.releasetime != 0 { | 205 » » » if signalp := sg.signal; signalp != nil { |
149 » » » » // Yes, this is ugly. On 64-bit sg.releasetime
has type | 206 » » » » sg.signal = nil |
150 » » » » // int. On 32-bit it has type int64. There's n
o easy way | 207 » » » » goatomicstorep(unsafe.Pointer(signalp), unsafe.P
ointer(sg)) |
151 » » » » // to assign to both types in Go. At some point
we'll | 208 » » » } |
152 » » » » // write the Go types directly instead of genera
ting them | 209 » » » if wake { |
153 » » » » // via the C types. At that point, this nastine
ss goes away. | 210 » » » » if sg.releasetime != 0 { |
154 » » » » *(*int64)(unsafe.Pointer(&sg.releasetime)) = goc
puticks() | 211 » » » » » // Yes, this is ugly. On 64-bit sg.rele
asetime has type |
155 » » » } | 212 » » » » » // int. On 32-bit it has type int64. T
here's no easy way |
156 » » » goready(recvg) | 213 » » » » » // to assign to both types in Go. At so
me point we'll |
| 214 » » » » » // write the Go types directly instead o
f generating them |
| 215 » » » » » // via the C types. At that point, this
nastiness goes away. |
| 216 » » » » » *(*int64)(unsafe.Pointer(&sg.releasetime
)) = gocputicks() |
| 217 » » » » } |
| 218 » » » » goready(sg.g) |
| 219 » » » } |
| 220 » » » releasem(mp) |
157 return true | 221 return true |
158 } | 222 } |
159 | 223 |
160 if !block { | 224 if !block { |
161 gounlock(&c.lock) | 225 gounlock(&c.lock) |
162 return false | 226 return false |
163 } | 227 } |
164 | 228 |
165 // no receiver available: block on this channel. | 229 // no receiver available: block on this channel. |
166 gp := getg() | 230 gp := getg() |
167 mysg := acquireSudog() | 231 mysg := acquireSudog() |
168 if t0 != 0 { | 232 if t0 != 0 { |
169 mysg.releasetime = -1 | 233 mysg.releasetime = -1 |
170 } | 234 } |
171 mysg.elem = (*uint8)(ep) | 235 mysg.elem = (*uint8)(ep) |
172 mysg.waitlink = nil | 236 mysg.waitlink = nil |
173 gp.waiting = mysg | 237 gp.waiting = mysg |
174 mysg.g = gp | 238 mysg.g = gp |
175 » » mysg.selectdone = nil | 239 » » mysg.signal = nil |
176 gp.param = nil | 240 gp.param = nil |
177 c.sendq.enqueue(mysg) | 241 c.sendq.enqueue(mysg) |
178 goparkunlock(&c.lock, "chan send") | 242 goparkunlock(&c.lock, "chan send") |
179 | 243 |
180 // someone woke us up. | 244 // someone woke us up. |
181 » » if gp.param == nil { | 245 » » if mysg.signal != nil { |
182 if c.closed == 0 { | 246 if c.closed == 0 { |
183 gothrow("chansend: spurious wakeup") | 247 gothrow("chansend: spurious wakeup") |
184 } | 248 } |
185 panic("send on closed channel") | 249 panic("send on closed channel") |
186 } | 250 } |
187 if mysg.releasetime > 0 { | 251 if mysg.releasetime > 0 { |
188 goblockevent(int64(mysg.releasetime)-t0, 3) | 252 goblockevent(int64(mysg.releasetime)-t0, 3) |
189 } | 253 } |
190 if mysg != gp.waiting { | 254 if mysg != gp.waiting { |
191 gothrow("G waiting list is corrupted!") | 255 gothrow("G waiting list is corrupted!") |
(...skipping 11 matching lines...) Expand all Loading... |
203 gounlock(&c.lock) | 267 gounlock(&c.lock) |
204 return false | 268 return false |
205 } | 269 } |
206 gp := getg() | 270 gp := getg() |
207 mysg := acquireSudog() | 271 mysg := acquireSudog() |
208 if t0 != 0 { | 272 if t0 != 0 { |
209 mysg.releasetime = -1 | 273 mysg.releasetime = -1 |
210 } | 274 } |
211 mysg.g = gp | 275 mysg.g = gp |
212 mysg.elem = nil | 276 mysg.elem = nil |
213 » » mysg.selectdone = nil | 277 » » mysg.signal = nil |
214 c.sendq.enqueue(mysg) | 278 c.sendq.enqueue(mysg) |
215 goparkunlock(&c.lock, "chan send") | 279 goparkunlock(&c.lock, "chan send") |
216 | 280 |
217 // someone woke us up - try again | 281 // someone woke us up - try again |
218 if mysg.releasetime != 0 { | 282 if mysg.releasetime != 0 { |
219 t1 = int64(mysg.releasetime) | 283 t1 = int64(mysg.releasetime) |
220 } | 284 } |
221 releaseSudog(mysg) | 285 releaseSudog(mysg) |
222 golock(&c.lock) | 286 golock(&c.lock) |
223 if c.closed != 0 { | 287 if c.closed != 0 { |
224 gounlock(&c.lock) | 288 gounlock(&c.lock) |
225 panic("send on closed channel") | 289 panic("send on closed channel") |
226 } | 290 } |
227 } | 291 } |
228 | 292 |
229 // write our data into the channel buffer | 293 // write our data into the channel buffer |
230 if raceenabled { | 294 if raceenabled { |
231 raceacquire(chanbuf(c, c.sendx)) | 295 raceacquire(chanbuf(c, c.sendx)) |
232 racerelease(chanbuf(c, c.sendx)) | 296 racerelease(chanbuf(c, c.sendx)) |
233 } | 297 } |
234 memmove(chanbuf(c, c.sendx), ep, uintptr(c.elemsize)) | 298 memmove(chanbuf(c, c.sendx), ep, uintptr(c.elemsize)) |
235 c.sendx++ | 299 c.sendx++ |
236 if c.sendx == c.dataqsiz { | 300 if c.sendx == c.dataqsiz { |
237 c.sendx = 0 | 301 c.sendx = 0 |
238 } | 302 } |
239 c.qcount++ | 303 c.qcount++ |
240 | 304 |
241 // wake up a waiting receiver | 305 // wake up a waiting receiver |
242 » sg := c.recvq.dequeue() | 306 » sg, wake := c.recvq.dequeue(sgReady) |
243 » if sg != nil { | 307 » gounlock(&c.lock) |
244 » » recvg := sg.g | 308 » if wake { |
245 » » gounlock(&c.lock) | |
246 if sg.releasetime != 0 { | 309 if sg.releasetime != 0 { |
247 *(*int64)(unsafe.Pointer(&sg.releasetime)) = gocputicks(
) | 310 *(*int64)(unsafe.Pointer(&sg.releasetime)) = gocputicks(
) |
248 } | 311 } |
249 » » goready(recvg) | 312 » » goready(sg.g) |
250 » } else { | |
251 » » gounlock(&c.lock) | |
252 } | 313 } |
253 if t1 > 0 { | 314 if t1 > 0 { |
254 goblockevent(t1-t0, 3) | 315 goblockevent(t1-t0, 3) |
255 } | 316 } |
256 return true | 317 return true |
257 } | 318 } |
258 | 319 |
259 func (q *waitq) enqueue(sgp *sudog) { | 320 func (q *waitq) enqueue(sgp *sudog) { |
260 sgp.link = nil | 321 sgp.link = nil |
261 if q.first == nil { | 322 if q.first == nil { |
262 q.first = sgp | 323 q.first = sgp |
263 q.last = sgp | 324 q.last = sgp |
264 return | 325 return |
265 } | 326 } |
266 q.last.link = sgp | 327 q.last.link = sgp |
267 q.last = sgp | 328 q.last = sgp |
268 } | 329 } |
269 | 330 |
270 func (q *waitq) dequeue() *sudog { | 331 func (q *waitq) dequeue(what int) (*sudog, bool) { |
| 332 loop: |
271 for { | 333 for { |
272 sgp := q.first | 334 sgp := q.first |
273 if sgp == nil { | 335 if sgp == nil { |
274 » » » return nil | 336 » » » return nil, false |
275 } | 337 } |
276 q.first = sgp.link | 338 q.first = sgp.link |
277 if q.last == sgp { | 339 if q.last == sgp { |
278 q.last = nil | 340 q.last = nil |
279 } | 341 } |
280 | 342 |
281 » » // if sgp participates in a select and is already signaled, igno
re it | 343 » » if sgp.signal == nil { |
282 » » if sgp.selectdone != nil { | 344 » » » // single chan op |
283 » » » // claim the right to signal | 345 » » » if what == sgClosed { |
284 » » » if *sgp.selectdone != 0 || !gocas(sgp.selectdone, 0, 1)
{ | 346 » » » » sgp.signal = (**sudog)(unsafe.Pointer(uintptr(sg
Closed))) |
285 » » » » continue | 347 » » » } |
286 » » » } | 348 » » » return sgp, true |
287 » » } | 349 » » } |
288 | 350 |
289 » » return sgp | 351 » » // select |
290 » } | 352 » » if sgp.g == getg() { |
| 353 » » » // self select, not interesting |
| 354 » » » sgp.signal = nil |
| 355 » » » continue |
| 356 » » } |
| 357 » » for { |
| 358 » » » old := *sgp.signal |
| 359 » » » if old != sgState(sgPrewait) && old != sgState(sgWait) { |
| 360 » » » » // already signaled on a different chan |
| 361 » » » » sgp.signal = nil |
| 362 » » » » continue loop |
| 363 » » » } |
| 364 » » » new := sgState(what) |
| 365 » » » if what == sgClosed { |
| 366 » » » » new = (*sudog)(unsafe.Pointer(uintptr(unsafe.Poi
nter(sgp)) | uintptr(sgClosed))) |
| 367 » » » } |
| 368 » » » // claim the right to signal this select |
| 369 » » » if gocasp(unsafe.Pointer(sgp.signal), unsafe.Pointer(old
), unsafe.Pointer(new)) { |
| 370 » » » » if what != sgSync { |
| 371 » » » » » sgp.signal = nil |
| 372 » » » » } |
| 373 » » » » return sgp, old == sgState(sgWait) // do not wak
e if it was not waiting |
| 374 » » » } |
| 375 » » } |
| 376 » } |
| 377 } |
| 378 |
| 379 func sgState(x int) *sudog { |
| 380 » return (*sudog)(unsafe.Pointer(uintptr(x))) |
291 } | 381 } |
292 | 382 |
293 func racesync(c *hchan, sg *sudog) { | 383 func racesync(c *hchan, sg *sudog) { |
294 racerelease(chanbuf(c, 0)) | 384 racerelease(chanbuf(c, 0)) |
295 raceacquireg(sg.g, chanbuf(c, 0)) | 385 raceacquireg(sg.g, chanbuf(c, 0)) |
296 racereleaseg(sg.g, chanbuf(c, 0)) | 386 racereleaseg(sg.g, chanbuf(c, 0)) |
297 raceacquire(chanbuf(c, 0)) | 387 raceacquire(chanbuf(c, 0)) |
298 } | 388 } |
LEFT | RIGHT |