Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(1245)

Delta Between Two Patch Sets: src/pkg/runtime/chan.go

Issue 112990043: code review 112990043: runtime: fine-grained locking in select
Left Patch Set: diff -r c0a68bcf19ae https://dvyukov%40google.com@code.google.com/p/go/ Created 9 years, 8 months ago
Right Patch Set: diff -r 03b003455359b09fff0f1662255dc5fe10b93290 https://dvyukov%40google.com@code.google.com/p/go/ Created 9 years, 7 months ago
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
Right: Side by side diff | Download
« no previous file with change/comment | « src/pkg/runtime/chan.h ('k') | src/pkg/runtime/chan.goc » ('j') | no next file with change/comment »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
LEFTRIGHT
(no file at all)
1 // Copyright 2014 The Go Authors. All rights reserved. 1 // Copyright 2014 The Go Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style 2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file. 3 // license that can be found in the LICENSE file.
4 4
5 package runtime 5 package runtime
6 6
7 import "unsafe"
8
7 // This file contains the implementation of Go channels 9 // This file contains the implementation of Go channels
8 // and select statements. 10 // and select statements.
9 11
10 import "unsafe" 12 // Outline of the synchronization protocols.
13 //
14 // Single channel sync send/recv first check the opposite wait queue (WaitQ).
15 // If the queue is not empty, it dequeues a waiter (SudoG), copies the data
16 // directly to/from the waiter buffer and wakes up the waiter.
17 // If the queue is empty, it queues itself into wait queue and parks. When it is
18 // woken up, the operation is already executed, so it just returns.
19 // SudoG.signal is set to nil before parking and is left intact during wake up.
20 // Close sets SudoG.signal to CLOSED, in such case woken up goroutine acts accor dingly.
21 //
22 // Single channel async send/recv first check whether the operation can be execu ted
23 // (buffer is not full/empty, respectively). If the operation can be executed,
24 // it copies data to/from the buffer, adjusts indices and wakes up an opposite
25 // waiter if there is any. If the operation cannot be executed at the moment,
26 // it queues itself into wait queue and parks. When it is woken up, it retries
27 // from the beginning.
28 // SudoG.signal is set to nil before parking and is left intact during wake up.
29 // Close sets SudoG.signal to CLOSED, in such case woken up goroutine
30 // accomplishes the operation w/o retry.
31 //
32 // Select operations proceed in 3 passes. In the pass 1 channels are checked one -by-one;
33 // if some channel is ready, the operation is executed (as outlined above for
34 // single channel operations) and select returns. In the pass 2 rediness of the
35 // channels is re-checked and select queues itself as waiter on all channels.
36 // If no channel turned out to be ready in pass 2, select blocks. In the pass 3
37 // (after wake up) select dequeues itself from unsuccessful channels. If it is
38 // woken up by an operation on a sync channel or close, select returns. Otherwis e
39 // it retries from the beginning.
40 // SudoG.signal for all channels is setup to point to a single signal variable,
41 // signal variable is set to PREWAIT before enqueue. The signal variable is oper ated
42 // with CAS and allows to resolve competition between concurrent wakers and the
43 // select goroutine. When SudoG is removed from a wait queue, SudoG.signal is se t to nil;
44 // this allows select to skip dequeueing it again. Below is the signal state mac hine:
45 //
46 // PREWAIT -> WAIT: select commits to park
47 //
48 // PREWAIT -> sg|CLOSED: sg chan is closed, select is not woken up
49 // PREWAIT -> READY: an async chan is ready or select discovers a ready chan dur ing pass 2, select is not woken up
50 // PREWAIT -> SYNC: a sync chan is ready but the operation is not yet completed, select is not woken up
51 //
52 // WAIT -> sg|CLOSED: the same as above, but select is woken up
53 // WAIT -> READY:
54 // WAIT -> SYNC:
55 //
56 // SYNC -> sg: operation on the sync chan is completed now
57 const (
58 » sgPrewait = iota
59 » sgClosed
60 » sgWait
61 » sgReady
62 » sgSync
63 )
11 64
12 const ( 65 const (
13 maxAlign = 8 66 maxAlign = 8
14 hchanSize = unsafe.Sizeof(hchan{}) 67 hchanSize = unsafe.Sizeof(hchan{})
15 debugChan = false 68 debugChan = false
16 ) 69 )
17 70
18 // TODO: make hchan.buf an unsafe.Pointer, not a *uint8 71 // TODO: make hchan.buf an unsafe.Pointer, not a *uint8
19 72
20 func makechan(t *chantype, size int64) *hchan { 73 func makechan(t *chantype, size int64) *hchan {
(...skipping 105 matching lines...) Expand 10 before | Expand all | Expand 10 after
126 t0 = gocputicks() 179 t0 = gocputicks()
127 } 180 }
128 181
129 golock(&c.lock) 182 golock(&c.lock)
130 if c.closed != 0 { 183 if c.closed != 0 {
131 gounlock(&c.lock) 184 gounlock(&c.lock)
132 panic("send on closed channel") 185 panic("send on closed channel")
133 } 186 }
134 187
135 if c.dataqsiz == 0 { // synchronous channel 188 if c.dataqsiz == 0 { // synchronous channel
136 » » sg := c.recvq.dequeue() 189 » » sg, wake := c.recvq.dequeue(sgSync)
137 if sg != nil { // found a waiting receiver 190 if sg != nil { // found a waiting receiver
138 if raceenabled { 191 if raceenabled {
139 racesync(c, sg) 192 racesync(c, sg)
140 } 193 }
194 // Disable preemption. If we communicate with a select,
195 // it can be still running. It such case completion of t he operation
196 // is the store to sg->signal below. We don't want this goroutine
197 // to be preempted until that store. Otherwise the selec t would
198 // need to spin in while(signal == SYNC) using a mix of osyield and gosched.
199 mp := acquirem()
141 gounlock(&c.lock) 200 gounlock(&c.lock)
142 201
143 recvg := sg.g
144 recvg.param = unsafe.Pointer(sg)
145 if sg.elem != nil { 202 if sg.elem != nil {
146 memmove(unsafe.Pointer(sg.elem), ep, uintptr(c.e lemsize)) 203 memmove(unsafe.Pointer(sg.elem), ep, uintptr(c.e lemsize))
147 } 204 }
148 » » » if sg.releasetime != 0 { 205 » » » if signalp := sg.signal; signalp != nil {
149 » » » » // Yes, this is ugly. On 64-bit sg.releasetime has type 206 » » » » sg.signal = nil
150 » » » » // int. On 32-bit it has type int64. There's n o easy way 207 » » » » goatomicstorep(unsafe.Pointer(signalp), unsafe.P ointer(sg))
151 » » » » // to assign to both types in Go. At some point we'll 208 » » » }
152 » » » » // write the Go types directly instead of genera ting them 209 » » » if wake {
153 » » » » // via the C types. At that point, this nastine ss goes away. 210 » » » » if sg.releasetime != 0 {
154 » » » » *(*int64)(unsafe.Pointer(&sg.releasetime)) = goc puticks() 211 » » » » » // Yes, this is ugly. On 64-bit sg.rele asetime has type
155 » » » } 212 » » » » » // int. On 32-bit it has type int64. T here's no easy way
156 » » » goready(recvg) 213 » » » » » // to assign to both types in Go. At so me point we'll
214 » » » » » // write the Go types directly instead o f generating them
215 » » » » » // via the C types. At that point, this nastiness goes away.
216 » » » » » *(*int64)(unsafe.Pointer(&sg.releasetime )) = gocputicks()
217 » » » » }
218 » » » » goready(sg.g)
219 » » » }
220 » » » releasem(mp)
157 return true 221 return true
158 } 222 }
159 223
160 if !block { 224 if !block {
161 gounlock(&c.lock) 225 gounlock(&c.lock)
162 return false 226 return false
163 } 227 }
164 228
165 // no receiver available: block on this channel. 229 // no receiver available: block on this channel.
166 gp := getg() 230 gp := getg()
167 mysg := acquireSudog() 231 mysg := acquireSudog()
168 if t0 != 0 { 232 if t0 != 0 {
169 mysg.releasetime = -1 233 mysg.releasetime = -1
170 } 234 }
171 mysg.elem = (*uint8)(ep) 235 mysg.elem = (*uint8)(ep)
172 mysg.waitlink = nil 236 mysg.waitlink = nil
173 gp.waiting = mysg 237 gp.waiting = mysg
174 mysg.g = gp 238 mysg.g = gp
175 » » mysg.selectdone = nil 239 » » mysg.signal = nil
176 gp.param = nil 240 gp.param = nil
177 c.sendq.enqueue(mysg) 241 c.sendq.enqueue(mysg)
178 goparkunlock(&c.lock, "chan send") 242 goparkunlock(&c.lock, "chan send")
179 243
180 // someone woke us up. 244 // someone woke us up.
181 » » if gp.param == nil { 245 » » if mysg.signal != nil {
182 if c.closed == 0 { 246 if c.closed == 0 {
183 gothrow("chansend: spurious wakeup") 247 gothrow("chansend: spurious wakeup")
184 } 248 }
185 panic("send on closed channel") 249 panic("send on closed channel")
186 } 250 }
187 if mysg.releasetime > 0 { 251 if mysg.releasetime > 0 {
188 goblockevent(int64(mysg.releasetime)-t0, 3) 252 goblockevent(int64(mysg.releasetime)-t0, 3)
189 } 253 }
190 if mysg != gp.waiting { 254 if mysg != gp.waiting {
191 gothrow("G waiting list is corrupted!") 255 gothrow("G waiting list is corrupted!")
(...skipping 11 matching lines...) Expand all
203 gounlock(&c.lock) 267 gounlock(&c.lock)
204 return false 268 return false
205 } 269 }
206 gp := getg() 270 gp := getg()
207 mysg := acquireSudog() 271 mysg := acquireSudog()
208 if t0 != 0 { 272 if t0 != 0 {
209 mysg.releasetime = -1 273 mysg.releasetime = -1
210 } 274 }
211 mysg.g = gp 275 mysg.g = gp
212 mysg.elem = nil 276 mysg.elem = nil
213 » » mysg.selectdone = nil 277 » » mysg.signal = nil
214 c.sendq.enqueue(mysg) 278 c.sendq.enqueue(mysg)
215 goparkunlock(&c.lock, "chan send") 279 goparkunlock(&c.lock, "chan send")
216 280
217 // someone woke us up - try again 281 // someone woke us up - try again
218 if mysg.releasetime != 0 { 282 if mysg.releasetime != 0 {
219 t1 = int64(mysg.releasetime) 283 t1 = int64(mysg.releasetime)
220 } 284 }
221 releaseSudog(mysg) 285 releaseSudog(mysg)
222 golock(&c.lock) 286 golock(&c.lock)
223 if c.closed != 0 { 287 if c.closed != 0 {
224 gounlock(&c.lock) 288 gounlock(&c.lock)
225 panic("send on closed channel") 289 panic("send on closed channel")
226 } 290 }
227 } 291 }
228 292
229 // write our data into the channel buffer 293 // write our data into the channel buffer
230 if raceenabled { 294 if raceenabled {
231 raceacquire(chanbuf(c, c.sendx)) 295 raceacquire(chanbuf(c, c.sendx))
232 racerelease(chanbuf(c, c.sendx)) 296 racerelease(chanbuf(c, c.sendx))
233 } 297 }
234 memmove(chanbuf(c, c.sendx), ep, uintptr(c.elemsize)) 298 memmove(chanbuf(c, c.sendx), ep, uintptr(c.elemsize))
235 c.sendx++ 299 c.sendx++
236 if c.sendx == c.dataqsiz { 300 if c.sendx == c.dataqsiz {
237 c.sendx = 0 301 c.sendx = 0
238 } 302 }
239 c.qcount++ 303 c.qcount++
240 304
241 // wake up a waiting receiver 305 // wake up a waiting receiver
242 » sg := c.recvq.dequeue() 306 » sg, wake := c.recvq.dequeue(sgReady)
243 » if sg != nil { 307 » gounlock(&c.lock)
244 » » recvg := sg.g 308 » if wake {
245 » » gounlock(&c.lock)
246 if sg.releasetime != 0 { 309 if sg.releasetime != 0 {
247 *(*int64)(unsafe.Pointer(&sg.releasetime)) = gocputicks( ) 310 *(*int64)(unsafe.Pointer(&sg.releasetime)) = gocputicks( )
248 } 311 }
249 » » goready(recvg) 312 » » goready(sg.g)
250 » } else {
251 » » gounlock(&c.lock)
252 } 313 }
253 if t1 > 0 { 314 if t1 > 0 {
254 goblockevent(t1-t0, 3) 315 goblockevent(t1-t0, 3)
255 } 316 }
256 return true 317 return true
257 } 318 }
258 319
259 func (q *waitq) enqueue(sgp *sudog) { 320 func (q *waitq) enqueue(sgp *sudog) {
260 sgp.link = nil 321 sgp.link = nil
261 if q.first == nil { 322 if q.first == nil {
262 q.first = sgp 323 q.first = sgp
263 q.last = sgp 324 q.last = sgp
264 return 325 return
265 } 326 }
266 q.last.link = sgp 327 q.last.link = sgp
267 q.last = sgp 328 q.last = sgp
268 } 329 }
269 330
270 func (q *waitq) dequeue() *sudog { 331 func (q *waitq) dequeue(what int) (*sudog, bool) {
332 loop:
271 for { 333 for {
272 sgp := q.first 334 sgp := q.first
273 if sgp == nil { 335 if sgp == nil {
274 » » » return nil 336 » » » return nil, false
275 } 337 }
276 q.first = sgp.link 338 q.first = sgp.link
277 if q.last == sgp { 339 if q.last == sgp {
278 q.last = nil 340 q.last = nil
279 } 341 }
280 342
281 » » // if sgp participates in a select and is already signaled, igno re it 343 » » if sgp.signal == nil {
282 » » if sgp.selectdone != nil { 344 » » » // single chan op
283 » » » // claim the right to signal 345 » » » if what == sgClosed {
284 » » » if *sgp.selectdone != 0 || !gocas(sgp.selectdone, 0, 1) { 346 » » » » sgp.signal = (**sudog)(unsafe.Pointer(uintptr(sg Closed)))
285 » » » » continue 347 » » » }
286 » » » } 348 » » » return sgp, true
287 » » } 349 » » }
288 350
289 » » return sgp 351 » » // select
290 » } 352 » » if sgp.g == getg() {
353 » » » // self select, not interesting
354 » » » sgp.signal = nil
355 » » » continue
356 » » }
357 » » for {
358 » » » old := *sgp.signal
359 » » » if old != sgState(sgPrewait) && old != sgState(sgWait) {
360 » » » » // already signaled on a different chan
361 » » » » sgp.signal = nil
362 » » » » continue loop
363 » » » }
364 » » » new := sgState(what)
365 » » » if what == sgClosed {
366 » » » » new = (*sudog)(unsafe.Pointer(uintptr(unsafe.Poi nter(sgp)) | uintptr(sgClosed)))
367 » » » }
368 » » » // claim the right to signal this select
369 » » » if gocasp(unsafe.Pointer(sgp.signal), unsafe.Pointer(old ), unsafe.Pointer(new)) {
370 » » » » if what != sgSync {
371 » » » » » sgp.signal = nil
372 » » » » }
373 » » » » return sgp, old == sgState(sgWait) // do not wak e if it was not waiting
374 » » » }
375 » » }
376 » }
377 }
378
379 func sgState(x int) *sudog {
380 » return (*sudog)(unsafe.Pointer(uintptr(x)))
291 } 381 }
292 382
293 func racesync(c *hchan, sg *sudog) { 383 func racesync(c *hchan, sg *sudog) {
294 racerelease(chanbuf(c, 0)) 384 racerelease(chanbuf(c, 0))
295 raceacquireg(sg.g, chanbuf(c, 0)) 385 raceacquireg(sg.g, chanbuf(c, 0))
296 racereleaseg(sg.g, chanbuf(c, 0)) 386 racereleaseg(sg.g, chanbuf(c, 0))
297 raceacquire(chanbuf(c, 0)) 387 raceacquire(chanbuf(c, 0))
298 } 388 }
LEFTRIGHT

Powered by Google App Engine
RSS Feeds Recent Issues | This issue
This is Rietveld f62528b