LEFT | RIGHT |
1 // Copyright 2009 The Go Authors. All rights reserved. | 1 // Copyright 2009 The Go Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style | 2 // Use of this source code is governed by a BSD-style |
3 // license that can be found in the LICENSE file. | 3 // license that can be found in the LICENSE file. |
4 | 4 |
5 package runtime | 5 package runtime |
6 #include "runtime.h" | 6 #include "runtime.h" |
7 #include "arch_GOARCH.h" | 7 #include "arch_GOARCH.h" |
8 #include "type.h" | 8 #include "type.h" |
9 #include "race.h" | 9 #include "race.h" |
10 #include "malloc.h" | 10 #include "malloc.h" |
11 #include "chan.h" | 11 #include "chan.h" |
| 12 #include "mgc0.h" |
| 13 #include "typekind.h" |
12 #include "../../cmd/ld/textflag.h" | 14 #include "../../cmd/ld/textflag.h" |
13 | |
14 uint32 runtime·Hchansize = sizeof(Hchan); | |
15 | |
16 // Outline of the synchronization protocols. | |
17 // | |
18 // Single channel sync send/recv first check the opposite wait queue (WaitQ). | |
19 // If the queue is not empty, it dequeues a waiter (SudoG), copies the data | |
20 // directly to/from the waiter buffer and wakes up the waiter. | |
21 // If the queue is empty, it queues itself into wait queue and parks. When it is | |
22 // woken up, the operation is already executed, so it just returns. | |
23 // SudoG.signal is set to nil before parking and is left intact during wake up. | |
24 // Close sets SudoG.signal to CLOSED, in such case woken up goroutine acts accor
dingly. | |
25 // | |
26 // Signle channel async send/recv first check whether the operation can be execu
ted | |
27 // (buffer is not full/empty, respectively). If the operation can be executed, | |
28 // it copies data to/from the buffer, adjusts indices and wakes up an opposite | |
29 // waiter if there is any. If the operation cannot be executed at the moment, | |
30 // it queues itself into wait queue and parks. When it is woken up, it retries | |
31 // from the beginning. | |
32 // SudoG.signal is set to nil before parking and is left intact during wake up. | |
33 // Close sets SudoG.signal to CLOSED, in such case woken up goroutine | |
34 // accomplishes the operation w/o retry. | |
35 // | |
36 // Select operations proceed in 3 passes. In the pass 1 channels are checked one
-by-one; | |
37 // if some channel is ready, the operation is executed (as outlined above for | |
38 // single channel operations) and select returns. In the pass 2 rediness of the | |
39 // channels is re-checked and select queues itself as waiter on all channels. | |
40 // If no channel turned out to be ready in pass 2, select blocks. In the pass 3 | |
41 // (after wake up) select dequeues itself from unsuccessful channels. If it is | |
42 // woken up by an operation on a sync channel or close, select returns. Otherwis
e | |
43 // it retries from the beginning. | |
44 // SudoG.signal for all channels is setup to point to a single signal variable, | |
45 // signal variable is set to PREWAIT before enqueue. The signal variable is oper
ated | |
46 // with CAS and allows to resolve competition between concurrent wakers and the | |
47 // select goroutine. When SudoG is removed from a wait queue, SudoG.signal is se
t to nil; | |
48 // this allows select to skip dequeueing it again. Below is the signal state mac
hine: | |
49 // | |
50 // PREWAIT -> WAIT: select commits to park | |
51 // | |
52 // PREWAIT -> sg|CLOSED: sg chan is closed, select is not woken up | |
53 // PREWAIT -> READY: an async chan is ready or select discovers a ready chan dur
ing pass 2, select is not woken up | |
54 // PREWAIT -> SYNC: a sync chan is ready but the operation is not yet completed,
select is not woken up | |
55 // | |
56 // WAIT -> sg|CLOSED: the same as above, but select is woken up | |
57 // WAIT -> READY: | |
58 // WAIT -> SYNC: | |
59 // | |
60 // SYNC -> sg: operation on the sync chan is completed now | |
61 | 15 |
62 #define PREWAIT ((SudoG*)0) | 16 #define PREWAIT ((SudoG*)0) |
63 #define CLOSED ((SudoG*)1) | 17 #define CLOSED ((SudoG*)1) |
64 #define WAIT ((SudoG*)2) | 18 #define WAIT ((SudoG*)2) |
65 #define READY ((SudoG*)3) | 19 #define READY ((SudoG*)3) |
66 #define SYNC ((SudoG*)4) | 20 #define SYNC ((SudoG*)4) |
67 | 21 |
68 static void dequeueg(WaitQ*); | 22 static void dequeueg(WaitQ*); |
69 static SudoG* dequeue(WaitQ*, SudoG*, bool*); | 23 static SudoG* dequeue(WaitQ*, SudoG*, bool*); |
70 static bool candequeue(WaitQ*); | 24 static bool candequeue(WaitQ*); |
71 static void dequeuesel(Select*, int32); | 25 static void dequeuesel(Select*, int32); |
72 static void enqueue(WaitQ*, SudoG*); | 26 static void enqueue(WaitQ*, SudoG*); |
73 static void destroychan(Hchan*); | |
74 static void racesync(Hchan*, SudoG*); | 27 static void racesync(Hchan*, SudoG*); |
75 | |
76 static Hchan* | |
77 makechan(ChanType *t, int64 hint) | |
78 { | |
79 Hchan *c; | |
80 Type *elem; | |
81 | |
82 elem = t->elem; | |
83 | |
84 // compiler checks this but be safe. | |
85 if(elem->size >= (1<<16)) | |
86 runtime·throw("makechan: invalid channel element type"); | |
87 if((sizeof(*c)%MAXALIGN) != 0 || elem->align > MAXALIGN) | |
88 runtime·throw("makechan: bad alignment"); | |
89 | |
90 if(hint < 0 || (intgo)hint != hint || (elem->size > 0 && hint > (MaxMem
- sizeof(*c)) / elem->size)) | |
91 runtime·panicstring("makechan: size out of range"); | |
92 | |
93 // allocate memory in one call | |
94 c = (Hchan*)runtime·mallocgc(sizeof(*c) + hint*elem->size, (uintptr)t |
TypeInfo_Chan, 0); | |
95 c->elemsize = elem->size; | |
96 c->elemtype = elem; | |
97 c->dataqsiz = hint; | |
98 | |
99 if(debug) | |
100 runtime·printf("makechan: chan=%p; elemsize=%D; elemalg=%p; data
qsiz=%D\n", | |
101 c, (int64)elem->size, elem->alg, (int64)c->dataqsiz); | |
102 | |
103 return c; | |
104 } | |
105 | |
106 func reflect·makechan(t *ChanType, size uint64) (c *Hchan) { | |
107 c = makechan(t, size); | |
108 } | |
109 | |
110 func makechan(t *ChanType, size int64) (c *Hchan) { | |
111 c = makechan(t, size); | |
112 } | |
113 | 28 |
114 // generic single channel send | 29 // generic single channel send |
115 static bool | 30 static bool |
116 chansend(ChanType *t, Hchan *c, byte *ep, bool block, void *pc) | 31 chansend(ChanType *t, Hchan *c, byte *ep, bool block, void *pc) |
117 { | 32 { |
118 SudoG *sg, **signalp; | 33 SudoG *sg, **signalp; |
119 SudoG mysg; | 34 SudoG mysg; |
120 int64 t0; | 35 int64 t0; |
121 bool wake; | 36 bool wake; |
122 | 37 |
123 if(raceenabled) | 38 if(raceenabled) |
124 runtime·racereadobjectpc(ep, t->elem, runtime·getcallerpc(&t), c
hansend); | 39 runtime·racereadobjectpc(ep, t->elem, runtime·getcallerpc(&t), c
hansend); |
125 | 40 |
126 if(c == nil) { | 41 if(c == nil) { |
127 USED(t); | 42 USED(t); |
128 if(!block) | 43 if(!block) |
129 return false; | 44 return false; |
130 » » runtime·park(nil, nil, "chan send (nil chan)"); | 45 » » runtime·park(nil, nil, runtime·gostringnocopy((byte*)"chan send
(nil chan)")); |
131 return false; // not reached | 46 return false; // not reached |
132 } | 47 } |
133 | 48 |
134 if(debug) { | 49 if(debug) { |
135 runtime·printf("chansend: chan=%p; elem=", c); | 50 runtime·printf("chansend: chan=%p; elem=", c); |
136 c->elemtype->alg->print(c->elemsize, ep); | 51 c->elemtype->alg->print(c->elemsize, ep); |
137 runtime·prints("\n"); | 52 runtime·prints("\n"); |
138 } | 53 } |
139 | 54 |
140 if(raceenabled) | 55 if(raceenabled) |
141 runtime·racereadpc(c, pc, chansend); | 56 runtime·racereadpc(c, pc, chansend); |
142 | 57 |
143 » // Fast path: if it's a non-blocking operation that can't proceed, | 58 » // Fast path: check for failed non-blocking operation without acquiring
the lock. |
144 » // then we can return w/o acquiring the lock. | 59 » // |
| 60 » // After observing that the channel is not closed, we observe that the c
hannel is |
| 61 » // not ready for sending. Each of these observations is a single word-si
zed read |
| 62 » // (first c.closed and second c.recvq.first or c.qcount depending on kin
d of channel). |
| 63 » // Because a closed channel cannot transition from 'ready for sending' t
o |
| 64 » // 'not ready for sending', even if the channel is closed between the tw
o observations, |
| 65 » // they imply a moment between the two when the channel was both not yet
closed |
| 66 » // and not ready for sending. We behave as if we observed the channel at
that moment, |
| 67 » // and report that the send cannot proceed. |
| 68 » // |
| 69 » // It is okay if the reads are reordered here: if we observe that the ch
annel is not |
| 70 » // ready for sending and then observe that it is not closed, that implie
s that the |
| 71 » // channel wasn't closed during the first observation. |
145 if(!block && !c->closed && ((c->dataqsiz == 0 && c->recvq.first == nil)
|| | 72 if(!block && !c->closed && ((c->dataqsiz == 0 && c->recvq.first == nil)
|| |
146 (c->dataqsiz > 0 && c->qcount == c->dataqsiz))) | 73 (c->dataqsiz > 0 && c->qcount == c->dataqsiz))) |
147 return false; | 74 return false; |
148 | 75 |
149 t0 = 0; | 76 t0 = 0; |
150 mysg.releasetime = 0; | 77 mysg.releasetime = 0; |
151 if(runtime·blockprofilerate > 0) { | 78 if(runtime·blockprofilerate > 0) { |
152 t0 = runtime·cputicks(); | 79 t0 = runtime·cputicks(); |
153 mysg.releasetime = -1; | 80 mysg.releasetime = -1; |
154 } | 81 } |
155 | 82 |
156 » runtime·lock(c); | 83 » runtime·lock(&c->lock); |
157 if(c->closed) | 84 if(c->closed) |
158 goto closed; | 85 goto closed; |
159 | 86 |
160 if(c->dataqsiz > 0) | 87 if(c->dataqsiz > 0) |
161 goto asynch; | 88 goto asynch; |
162 | 89 |
163 sg = dequeue(&c->recvq, SYNC, &wake); | 90 sg = dequeue(&c->recvq, SYNC, &wake); |
164 if(sg != nil) { | 91 if(sg != nil) { |
165 if(raceenabled) | 92 if(raceenabled) |
166 racesync(c, sg); | 93 racesync(c, sg); |
167 // Disable preemption. If we communicate with a select, | 94 // Disable preemption. If we communicate with a select, |
168 // it can be still running. It such case completion of the opera
tion | 95 // it can be still running. It such case completion of the opera
tion |
169 // is the store to sg->signal below. We don't want this goroutin
e | 96 // is the store to sg->signal below. We don't want this goroutin
e |
170 // to be preempted until that store. Otherwise the select would | 97 // to be preempted until that store. Otherwise the select would |
171 // need to spin in while(signal == SYNC) using a mix of osyield | 98 // need to spin in while(signal == SYNC) using a mix of osyield |
172 // and gosched. | 99 // and gosched. |
173 g->m->locks++; | 100 g->m->locks++; |
174 » » runtime·unlock(c); | 101 » » runtime·unlock(&c->lock); |
175 | 102 |
176 if(sg->elem != nil) | 103 if(sg->elem != nil) |
177 c->elemtype->alg->copy(c->elemsize, sg->elem, ep); | 104 c->elemtype->alg->copy(c->elemsize, sg->elem, ep); |
178 if((signalp = sg->signal) != nil) { | 105 if((signalp = sg->signal) != nil) { |
179 sg->signal = nil; | 106 sg->signal = nil; |
180 runtime·atomicstorep(signalp, sg); | 107 runtime·atomicstorep(signalp, sg); |
181 } | 108 } |
182 if(wake) { | 109 if(wake) { |
183 if(sg->releasetime) | 110 if(sg->releasetime) |
184 sg->releasetime = runtime·cputicks(); | 111 sg->releasetime = runtime·cputicks(); |
185 runtime·ready(sg->g); | 112 runtime·ready(sg->g); |
186 } | 113 } |
187 g->m->locks--; | 114 g->m->locks--; |
188 return true; | 115 return true; |
189 } | 116 } |
190 | 117 |
191 if(!block) { | 118 if(!block) { |
192 » » runtime·unlock(c); | 119 » » runtime·unlock(&c->lock); |
193 return false; | 120 return false; |
194 } | 121 } |
195 | 122 |
196 mysg.elem = ep; | 123 mysg.elem = ep; |
197 mysg.g = g; | 124 mysg.g = g; |
198 mysg.signal = nil; | 125 mysg.signal = nil; |
199 enqueue(&c->sendq, &mysg); | 126 enqueue(&c->sendq, &mysg); |
200 » runtime·parkunlock(c, "chan send"); | 127 » runtime·parkunlock(&c->lock, runtime·gostringnocopy((byte*)"chan send"))
; |
201 | 128 |
202 if(mysg.signal != nil) { | 129 if(mysg.signal != nil) { |
203 » » runtime·lock(c); | 130 » » runtime·lock(&c->lock); |
204 if(!c->closed) | 131 if(!c->closed) |
205 runtime·throw("chansend: spurious wakeup"); | 132 runtime·throw("chansend: spurious wakeup"); |
206 goto closed; | 133 goto closed; |
207 } | 134 } |
208 | 135 |
209 if(mysg.releasetime > 0) | 136 if(mysg.releasetime > 0) |
210 runtime·blockevent(mysg.releasetime - t0, 2); | 137 runtime·blockevent(mysg.releasetime - t0, 2); |
211 | 138 |
212 return true; | 139 return true; |
213 | 140 |
214 asynch: | 141 asynch: |
215 if(c->closed) | 142 if(c->closed) |
216 goto closed; | 143 goto closed; |
217 | 144 |
218 if(c->qcount >= c->dataqsiz) { | 145 if(c->qcount >= c->dataqsiz) { |
219 if(!block) { | 146 if(!block) { |
220 » » » runtime·unlock(c); | 147 » » » runtime·unlock(&c->lock); |
221 return false; | 148 return false; |
222 } | 149 } |
223 mysg.g = g; | 150 mysg.g = g; |
224 mysg.elem = nil; | 151 mysg.elem = nil; |
225 mysg.signal = nil; | 152 mysg.signal = nil; |
226 enqueue(&c->sendq, &mysg); | 153 enqueue(&c->sendq, &mysg); |
227 » » runtime·parkunlock(c, "chan send"); | 154 » » runtime·parkunlock(&c->lock, runtime·gostringnocopy((byte*)"chan
send")); |
228 | 155 |
229 » » runtime·lock(c); | 156 » » runtime·lock(&c->lock); |
230 goto asynch; | 157 goto asynch; |
231 } | 158 } |
232 | 159 |
233 if(raceenabled) { | 160 if(raceenabled) { |
234 runtime·raceacquire(chanbuf(c, c->sendx)); | 161 runtime·raceacquire(chanbuf(c, c->sendx)); |
235 runtime·racerelease(chanbuf(c, c->sendx)); | 162 runtime·racerelease(chanbuf(c, c->sendx)); |
236 } | 163 } |
237 | 164 |
238 c->elemtype->alg->copy(c->elemsize, chanbuf(c, c->sendx), ep); | 165 c->elemtype->alg->copy(c->elemsize, chanbuf(c, c->sendx), ep); |
239 if(++c->sendx == c->dataqsiz) | 166 if(++c->sendx == c->dataqsiz) |
240 c->sendx = 0; | 167 c->sendx = 0; |
241 c->qcount++; | 168 c->qcount++; |
242 | 169 |
243 sg = dequeue(&c->recvq, READY, &wake); | 170 sg = dequeue(&c->recvq, READY, &wake); |
244 » runtime·unlock(c); | 171 » runtime·unlock(&c->lock); |
245 if(wake) { | 172 if(wake) { |
246 if(sg->releasetime) | 173 if(sg->releasetime) |
247 sg->releasetime = runtime·cputicks(); | 174 sg->releasetime = runtime·cputicks(); |
248 runtime·ready(sg->g); | 175 runtime·ready(sg->g); |
249 } | 176 } |
250 if(mysg.releasetime > 0) | 177 if(mysg.releasetime > 0) |
251 runtime·blockevent(mysg.releasetime - t0, 2); | 178 runtime·blockevent(mysg.releasetime - t0, 2); |
252 return true; | 179 return true; |
253 | 180 |
254 closed: | 181 closed: |
255 » runtime·unlock(c); | 182 » runtime·unlock(&c->lock); |
256 runtime·panicstring("send on closed channel"); | 183 runtime·panicstring("send on closed channel"); |
257 return false; // not reached | 184 return false; // not reached |
258 } | 185 } |
259 | 186 |
260 // generic single channel recv | 187 // generic single channel recv |
261 static bool | 188 static bool |
262 chanrecv(ChanType *t, Hchan* c, byte *ep, bool block, bool *received) | 189 chanrecv(ChanType *t, Hchan* c, byte *ep, bool block, bool *received) |
263 { | 190 { |
264 SudoG *sg, **signalp; | 191 SudoG *sg, **signalp; |
265 SudoG mysg; | 192 SudoG mysg; |
266 int64 t0; | 193 int64 t0; |
267 bool wake; | 194 bool wake; |
268 | 195 |
269 // raceenabled: don't need to check ep, as it is always on the stack. | 196 // raceenabled: don't need to check ep, as it is always on the stack. |
270 | 197 |
271 if(debug) | 198 if(debug) |
272 runtime·printf("chanrecv: chan=%p\n", c); | 199 runtime·printf("chanrecv: chan=%p\n", c); |
273 | 200 |
274 if(c == nil) { | 201 if(c == nil) { |
275 USED(t); | 202 USED(t); |
276 if(!block) | 203 if(!block) |
277 return false; | 204 return false; |
278 » » runtime·park(nil, nil, "chan receive (nil chan)"); | 205 » » runtime·park(nil, nil, runtime·gostringnocopy((byte*)"chan recei
ve (nil chan)")); |
279 return false; // not reached | 206 return false; // not reached |
280 } | 207 } |
281 | 208 |
282 » // Fast path: if it's a non-blocking operation that can't proceed, | 209 » // Fast path: check for failed non-blocking operation without acquiring
the lock. |
283 » // then we can return w/o acquiring the lock. | 210 » // |
284 » if(!block && !c->closed && ((c->dataqsiz == 0 && c->sendq.first == nil)
|| | 211 » // After observing that the channel is not ready for receiving, we obser
ve that the |
285 » » (c->dataqsiz > 0 && c->qcount == 0))) | 212 » // channel is not closed. Each of these observations is a single word-si
zed read |
| 213 » // (first c.sendq.first or c.qcount, and second c.closed). |
| 214 » // Because a channel cannot be reopened, the later observation of the ch
annel |
| 215 » // being not closed implies that it was also not closed at the moment of
the |
| 216 » // first observation. We behave as if we observed the channel at that mo
ment |
| 217 » // and report that the receive cannot proceed. |
| 218 » // |
| 219 » // The order of operations is important here: reversing the operations c
an lead to |
| 220 » // incorrect behavior when racing with a close. |
| 221 » if(!block && ((c->dataqsiz == 0 && c->sendq.first == nil) || |
| 222 » » (c->dataqsiz > 0 && runtime·atomicloadp((void**)&c->qcount) == 0
)) && |
| 223 » » !runtime·atomicload(&c->closed)) |
286 return false; | 224 return false; |
287 | 225 |
288 t0 = 0; | 226 t0 = 0; |
289 mysg.releasetime = 0; | 227 mysg.releasetime = 0; |
290 if(runtime·blockprofilerate > 0) { | 228 if(runtime·blockprofilerate > 0) { |
291 t0 = runtime·cputicks(); | 229 t0 = runtime·cputicks(); |
292 mysg.releasetime = -1; | 230 mysg.releasetime = -1; |
293 } | 231 } |
294 | 232 |
295 » runtime·lock(c); | 233 » runtime·lock(&c->lock); |
296 if(c->dataqsiz > 0) | 234 if(c->dataqsiz > 0) |
297 goto asynch; | 235 goto asynch; |
298 | 236 |
299 if(c->closed) { | 237 if(c->closed) { |
300 » » runtime·unlock(c); | 238 » » runtime·unlock(&c->lock); |
301 goto closed; | 239 goto closed; |
302 } | 240 } |
303 | 241 |
304 sg = dequeue(&c->sendq, SYNC, &wake); | 242 sg = dequeue(&c->sendq, SYNC, &wake); |
305 if(sg != nil) { | 243 if(sg != nil) { |
306 if(raceenabled) | 244 if(raceenabled) |
307 racesync(c, sg); | 245 racesync(c, sg); |
308 g->m->locks++; | 246 g->m->locks++; |
309 » » runtime·unlock(c); | 247 » » runtime·unlock(&c->lock); |
310 | 248 |
311 if(ep != nil) | 249 if(ep != nil) |
312 c->elemtype->alg->copy(c->elemsize, ep, sg->elem); | 250 c->elemtype->alg->copy(c->elemsize, ep, sg->elem); |
313 if((signalp = sg->signal) != nil) { | 251 if((signalp = sg->signal) != nil) { |
314 sg->signal = nil; | 252 sg->signal = nil; |
315 runtime·atomicstorep(signalp, sg); | 253 runtime·atomicstorep(signalp, sg); |
316 } | 254 } |
317 if(wake) { | 255 if(wake) { |
318 if(sg->releasetime) | 256 if(sg->releasetime) |
319 sg->releasetime = runtime·cputicks(); | 257 sg->releasetime = runtime·cputicks(); |
320 runtime·ready(sg->g); | 258 runtime·ready(sg->g); |
321 } | 259 } |
322 | 260 |
323 if(received != nil) | 261 if(received != nil) |
324 *received = true; | 262 *received = true; |
325 g->m->locks--; | 263 g->m->locks--; |
326 return true; | 264 return true; |
327 } | 265 } |
328 | 266 |
329 if(!block) { | 267 if(!block) { |
330 » » runtime·unlock(c); | 268 » » runtime·unlock(&c->lock); |
331 return false; | 269 return false; |
332 } | 270 } |
333 | 271 |
334 mysg.elem = ep; | 272 mysg.elem = ep; |
335 mysg.g = g; | 273 mysg.g = g; |
336 mysg.signal = nil; | 274 mysg.signal = nil; |
337 enqueue(&c->recvq, &mysg); | 275 enqueue(&c->recvq, &mysg); |
338 » runtime·parkunlock(c, "chan receive"); | 276 » runtime·parkunlock(&c->lock, runtime·gostringnocopy((byte*)"chan receive
")); |
339 | 277 |
340 if(mysg.signal != nil) { | 278 if(mysg.signal != nil) { |
341 if(!c->closed) | 279 if(!c->closed) |
342 runtime·throw("chanrecv: spurious wakeup"); | 280 runtime·throw("chanrecv: spurious wakeup"); |
343 goto closed; | 281 goto closed; |
344 } | 282 } |
345 | 283 |
346 if(received != nil) | 284 if(received != nil) |
347 *received = true; | 285 *received = true; |
348 if(mysg.releasetime > 0) | 286 if(mysg.releasetime > 0) |
349 runtime·blockevent(mysg.releasetime - t0, 2); | 287 runtime·blockevent(mysg.releasetime - t0, 2); |
350 return true; | 288 return true; |
351 | 289 |
352 asynch: | 290 asynch: |
353 if(c->qcount <= 0) { | 291 if(c->qcount <= 0) { |
354 if(c->closed) { | 292 if(c->closed) { |
355 » » » runtime·unlock(c); | 293 » » » runtime·unlock(&c->lock); |
356 goto closed; | 294 goto closed; |
357 } | 295 } |
358 | 296 |
359 if(!block) { | 297 if(!block) { |
360 » » » runtime·unlock(c); | 298 » » » runtime·unlock(&c->lock); |
361 if(received != nil) | 299 if(received != nil) |
362 *received = false; | 300 *received = false; |
363 return false; | 301 return false; |
364 } | 302 } |
365 mysg.g = g; | 303 mysg.g = g; |
366 mysg.elem = nil; | 304 mysg.elem = nil; |
367 mysg.signal = nil; | 305 mysg.signal = nil; |
368 enqueue(&c->recvq, &mysg); | 306 enqueue(&c->recvq, &mysg); |
369 » » runtime·parkunlock(c, "chan receive"); | 307 » » runtime·parkunlock(&c->lock, runtime·gostringnocopy((byte*)"chan
receive")); |
370 | 308 |
371 if(mysg.signal != nil) { | 309 if(mysg.signal != nil) { |
372 if(!c->closed) | 310 if(!c->closed) |
373 runtime·throw("chanrecv: spurious wakeup"); | 311 runtime·throw("chanrecv: spurious wakeup"); |
374 goto closed; | 312 goto closed; |
375 } | 313 } |
376 » » runtime·lock(c); | 314 » » runtime·lock(&c->lock); |
377 goto asynch; | 315 goto asynch; |
378 } | 316 } |
379 | 317 |
380 if(raceenabled) { | 318 if(raceenabled) { |
381 runtime·raceacquire(chanbuf(c, c->recvx)); | 319 runtime·raceacquire(chanbuf(c, c->recvx)); |
382 runtime·racerelease(chanbuf(c, c->recvx)); | 320 runtime·racerelease(chanbuf(c, c->recvx)); |
383 } | 321 } |
384 | 322 |
385 if(ep != nil) | 323 if(ep != nil) |
386 c->elemtype->alg->copy(c->elemsize, ep, chanbuf(c, c->recvx)); | 324 c->elemtype->alg->copy(c->elemsize, ep, chanbuf(c, c->recvx)); |
387 c->elemtype->alg->copy(c->elemsize, chanbuf(c, c->recvx), nil); | 325 c->elemtype->alg->copy(c->elemsize, chanbuf(c, c->recvx), nil); |
388 if(++c->recvx == c->dataqsiz) | 326 if(++c->recvx == c->dataqsiz) |
389 c->recvx = 0; | 327 c->recvx = 0; |
390 c->qcount--; | 328 c->qcount--; |
391 | 329 |
392 sg = dequeue(&c->sendq, READY, &wake); | 330 sg = dequeue(&c->sendq, READY, &wake); |
393 » runtime·unlock(c); | 331 » runtime·unlock(&c->lock); |
394 if(wake) { | 332 if(wake) { |
395 if(sg->releasetime) | 333 if(sg->releasetime) |
396 sg->releasetime = runtime·cputicks(); | 334 sg->releasetime = runtime·cputicks(); |
397 runtime·ready(sg->g); | 335 runtime·ready(sg->g); |
398 } | 336 } |
399 | 337 |
400 if(received != nil) | 338 if(received != nil) |
401 *received = true; | 339 *received = true; |
402 if(mysg.releasetime > 0) | 340 if(mysg.releasetime > 0) |
403 runtime·blockevent(mysg.releasetime - t0, 2); | 341 runtime·blockevent(mysg.releasetime - t0, 2); |
404 return true; | 342 return true; |
405 | 343 |
406 closed: | 344 closed: |
407 if(ep != nil) | 345 if(ep != nil) |
408 c->elemtype->alg->copy(c->elemsize, ep, nil); | 346 c->elemtype->alg->copy(c->elemsize, ep, nil); |
409 if(received != nil) | 347 if(received != nil) |
410 *received = false; | 348 *received = false; |
411 if(raceenabled) | 349 if(raceenabled) |
412 runtime·raceacquire(c); | 350 runtime·raceacquire(c); |
413 if(mysg.releasetime > 0) | 351 if(mysg.releasetime > 0) |
414 runtime·blockevent(mysg.releasetime - t0, 2); | 352 runtime·blockevent(mysg.releasetime - t0, 2); |
415 return true; | 353 return true; |
416 } | |
417 | |
418 #pragma textflag NOSPLIT | |
419 func chansend1(t *ChanType, c *Hchan, elem *byte) { | |
420 chansend(t, c, elem, true, runtime·getcallerpc(&t)); | |
421 } | 354 } |
422 | 355 |
423 #pragma textflag NOSPLIT | 356 #pragma textflag NOSPLIT |
424 func chanrecv1(t *ChanType, c *Hchan, elem *byte) { | 357 func chanrecv1(t *ChanType, c *Hchan, elem *byte) { |
425 chanrecv(t, c, elem, true, nil); | 358 chanrecv(t, c, elem, true, nil); |
426 } | 359 } |
427 | 360 |
428 // chanrecv2(hchan *chan any, elem *any) (received bool); | 361 // chanrecv2(hchan *chan any, elem *any) (received bool); |
429 #pragma textflag NOSPLIT | 362 #pragma textflag NOSPLIT |
430 func chanrecv2(t *ChanType, c *Hchan, elem *byte) (received bool) { | 363 func chanrecv2(t *ChanType, c *Hchan, elem *byte) (received bool) { |
(...skipping 69 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
500 #pragma textflag NOSPLIT | 433 #pragma textflag NOSPLIT |
501 func reflect·chansend(t *ChanType, c *Hchan, elem *byte, nb bool) (selected bool
) { | 434 func reflect·chansend(t *ChanType, c *Hchan, elem *byte, nb bool) (selected bool
) { |
502 selected = chansend(t, c, elem, !nb, runtime·getcallerpc(&t)); | 435 selected = chansend(t, c, elem, !nb, runtime·getcallerpc(&t)); |
503 } | 436 } |
504 | 437 |
505 func reflect·chanrecv(t *ChanType, c *Hchan, nb bool, elem *byte) (selected bool
, received bool) { | 438 func reflect·chanrecv(t *ChanType, c *Hchan, nb bool, elem *byte) (selected bool
, received bool) { |
506 received = false; | 439 received = false; |
507 selected = chanrecv(t, c, elem, !nb, &received); | 440 selected = chanrecv(t, c, elem, !nb, &received); |
508 } | 441 } |
509 | 442 |
510 static Select* newselect(int32); | 443 static int64 |
511 | 444 selectsize(int32 size) |
512 #pragma textflag NOSPLIT | 445 { |
513 func newselect(size int32) (sel *byte) { | |
514 » sel = (byte*)newselect(size); | |
515 } | |
516 | |
517 static Select* | |
518 newselect(int32 size) | |
519 { | |
520 » int32 n; | |
521 Select *sel; | 446 Select *sel; |
522 | 447 » int64 selsize; |
523 » n = 0; | 448 |
524 » if(size > 1) | 449 » selsize = sizeof(*sel) + |
525 » » n = size-1; | 450 » » (size-1)*sizeof(sel->scase[0]) + |
526 | 451 » » size*sizeof(sel->pollorder[0]); |
527 » // allocate all the memory we need in a single allocation | 452 » return ROUND(selsize, Int64Align); |
528 » // start with Select with size cases | 453 } |
529 » // then lockorder with size entries | 454 |
530 » // then pollorder with size entries | 455 #pragma textflag NOSPLIT |
531 » sel = runtime·mal(sizeof(*sel) + | 456 func newselect(sel *Select, selsize int64, size int32) { |
532 » » n*sizeof(sel->scase[0]) + | 457 » if(selsize != selectsize(size)) { |
533 » » size*sizeof(sel->lockorder[0]) + | 458 » » runtime·printf("runtime: bad select size %D, want %D\n", selsize
, selectsize(size)); |
534 » » size*sizeof(sel->pollorder[0])); | 459 » » runtime·throw("bad select size"); |
535 | 460 » } |
536 sel->tcase = size; | 461 sel->tcase = size; |
537 sel->ncase = 0; | 462 sel->ncase = 0; |
538 sel->pollorder = (void*)(sel->scase + size); | 463 sel->pollorder = (void*)(sel->scase + size); |
539 | 464 |
540 if(debug) | 465 if(debug) |
541 runtime·printf("newselect s=%p size=%d\n", sel, size); | 466 runtime·printf("newselect s=%p size=%d\n", sel, size); |
542 return sel; | |
543 } | 467 } |
544 | 468 |
545 // cut in half to give stack a chance to split | 469 // cut in half to give stack a chance to split |
546 static void selectsend(Select *sel, Hchan *c, void *pc, void *elem, int32 so); | 470 static void selectsend(Select *sel, Hchan *c, void *pc, void *elem, int32 so); |
547 | 471 |
548 #pragma textflag NOSPLIT | 472 #pragma textflag NOSPLIT |
549 func selectsend(sel *Select, c *Hchan, elem *byte) (selected bool) { | 473 func selectsend(sel *Select, c *Hchan, elem *byte) (selected bool) { |
550 selected = false; | 474 selected = false; |
551 | 475 |
552 // nil cases do not compete | 476 // nil cases do not compete |
(...skipping 101 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
654 } | 578 } |
655 | 579 |
656 static bool | 580 static bool |
657 selparkcommit(G *gp, void *signal) | 581 selparkcommit(G *gp, void *signal) |
658 { | 582 { |
659 USED(gp); | 583 USED(gp); |
660 return runtime·casp(signal, PREWAIT, WAIT); | 584 return runtime·casp(signal, PREWAIT, WAIT); |
661 } | 585 } |
662 | 586 |
663 func block() { | 587 func block() { |
664 » runtime·park(nil, nil, "select (no cases)");» // forever | 588 » runtime·park(nil, nil, runtime·gostringnocopy((byte*)"select (no cases)"
));» // forever |
665 } | 589 } |
666 | 590 |
667 static void* selectgo(Select**); | 591 static void* selectgo(Select**); |
668 | 592 |
669 // selectgo(sel *byte); | 593 // selectgo(sel *byte); |
670 // | 594 // |
671 // overwrites return pc on stack to signal which case of the select | 595 // overwrites return pc on stack to signal which case of the select |
672 // to run, so cannot appear at the top of a split stack. | 596 // to run, so cannot appear at the top of a split stack. |
673 #pragma textflag NOSPLIT | 597 #pragma textflag NOSPLIT |
674 func selectgo(sel *Select) { | 598 func selectgo(sel *Select) { |
(...skipping 52 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
727 // the race will be resolved in pass 2. | 651 // the race will be resolved in pass 2. |
728 dfl = nil; | 652 dfl = nil; |
729 dequeuepos = 0; | 653 dequeuepos = 0; |
730 for(i=0; i<sel->ncase; i++) { | 654 for(i=0; i<sel->ncase; i++) { |
731 o = sel->pollorder[i]; | 655 o = sel->pollorder[i]; |
732 cas = &sel->scase[o]; | 656 cas = &sel->scase[o]; |
733 c = cas->chan; | 657 c = cas->chan; |
734 | 658 |
735 switch(cas->kind) { | 659 switch(cas->kind) { |
736 case CaseRecv: | 660 case CaseRecv: |
737 » » » // fast path: check ready predicate w/o lock | 661 » » » // Fast path: check ready predicate w/o lock. |
738 » » » if((c->dataqsiz > 0 && c->qcount > 0) || (c->dataqsiz ==
0 && c->sendq.first != nil) || c->closed) { | 662 » » » // Ordering of loads is important here. See the "Fast pa
th" comment in chanrecv. |
739 » » » » runtime·lock(c); | 663 » » » if((c->dataqsiz > 0 && (uintgo)runtime·atomicloadp((void
**)&c->qcount) > 0) || |
| 664 » » » » » (c->dataqsiz == 0 && c->sendq.first != n
il) || |
| 665 » » » » » runtime·atomicload(&c->closed) != 0) { |
| 666 » » » » runtime·lock(&c->lock); |
740 if(c->dataqsiz > 0 && c->qcount > 0) | 667 if(c->dataqsiz > 0 && c->qcount > 0) |
741 goto asyncrecv; | 668 goto asyncrecv; |
742 if(c->dataqsiz == 0 && (sg = dequeue(&c->sendq,
SYNC, &wake)) != nil) | 669 if(c->dataqsiz == 0 && (sg = dequeue(&c->sendq,
SYNC, &wake)) != nil) |
743 goto syncrecv; | 670 goto syncrecv; |
744 if(c->closed) { | 671 if(c->closed) { |
745 » » » » » runtime·unlock(c); | 672 » » » » » runtime·unlock(&c->lock); |
746 goto rclose; | 673 goto rclose; |
747 } | 674 } |
748 » » » » runtime·unlock(c); | 675 » » » » runtime·unlock(&c->lock); |
749 } | 676 } |
750 break; | 677 break; |
751 | 678 |
752 case CaseSend: | 679 case CaseSend: |
753 if(raceenabled) | 680 if(raceenabled) |
754 runtime·racereadpc(c, cas->pc, chansend); | 681 runtime·racereadpc(c, cas->pc, chansend); |
755 » » » if(c->closed || (c->dataqsiz > 0 && c->qcount < c->dataq
siz) || (c->dataqsiz == 0 && c->recvq.first != nil)) { | 682 » » » if(c->closed != 0 || |
756 » » » » runtime·lock(c); | 683 » » » » » (c->dataqsiz > 0 && c->qcount < c->dataq
siz) || |
| 684 » » » » » (c->dataqsiz == 0 && c->recvq.first != n
il)) { |
| 685 » » » » runtime·lock(&c->lock); |
757 if(c->closed) { | 686 if(c->closed) { |
758 » » » » » runtime·unlock(c); | 687 » » » » » runtime·unlock(&c->lock); |
759 goto sclose; | 688 goto sclose; |
760 } | 689 } |
761 if(c->dataqsiz > 0 && c->qcount < c->dataqsiz) | 690 if(c->dataqsiz > 0 && c->qcount < c->dataqsiz) |
762 goto asyncsend; | 691 goto asyncsend; |
763 if(c->dataqsiz == 0 && (sg = dequeue(&c->recvq,
SYNC, &wake)) != nil) | 692 if(c->dataqsiz == 0 && (sg = dequeue(&c->recvq,
SYNC, &wake)) != nil) |
764 goto syncsend; | 693 goto syncsend; |
765 » » » » runtime·unlock(c); | 694 » » » » runtime·unlock(&c->lock); |
766 } | 695 } |
767 break; | 696 break; |
768 | 697 |
769 case CaseDefault: | 698 case CaseDefault: |
770 dfl = cas; | 699 dfl = cas; |
771 break; | 700 break; |
772 } | 701 } |
773 } | 702 } |
774 | 703 |
775 if(dfl != nil) { | 704 if(dfl != nil) { |
(...skipping 10 matching lines...) Expand all Loading... |
786 goto ready; | 715 goto ready; |
787 o = sel->pollorder[dequeuepos]; | 716 o = sel->pollorder[dequeuepos]; |
788 cas = &sel->scase[o]; | 717 cas = &sel->scase[o]; |
789 c = cas->chan; | 718 c = cas->chan; |
790 sg = &cas->sg; | 719 sg = &cas->sg; |
791 sg->g = g; | 720 sg->g = g; |
792 sg->signal = &signal; | 721 sg->signal = &signal; |
793 | 722 |
794 switch(cas->kind) { | 723 switch(cas->kind) { |
795 case CaseRecv: | 724 case CaseRecv: |
796 » » » runtime·lock(c); | 725 » » » runtime·lock(&c->lock); |
797 if(c->closed || (c->dataqsiz > 0 && c->qcount > 0) || | 726 if(c->closed || (c->dataqsiz > 0 && c->qcount > 0) || |
798 (c->dataqsiz == 0 && c->sendq.first != nil && ca
ndequeue(&c->sendq))) { | 727 (c->dataqsiz == 0 && c->sendq.first != nil && ca
ndequeue(&c->sendq))) { |
799 // Before executing send/recv on this chan we mu
st disarm the select. | 728 // Before executing send/recv on this chan we mu
st disarm the select. |
800 // Otherwise it can happen so that we execute se
nd/recv on this chan, | 729 // Otherwise it can happen so that we execute se
nd/recv on this chan, |
801 // and another goroutine accomplishes send/recv
on a sync chan | 730 // and another goroutine accomplishes send/recv
on a sync chan |
802 // with this select as well. The result would be
completion of two | 731 // with this select as well. The result would be
completion of two |
803 // communications in the single select. | 732 // communications in the single select. |
804 if(!runtime·casp(&signal, nil, READY)) { | 733 if(!runtime·casp(&signal, nil, READY)) { |
805 // Already signaled on another chan. | 734 // Already signaled on another chan. |
806 » » » » » runtime·unlock(c); | 735 » » » » » runtime·unlock(&c->lock); |
807 goto ready; | 736 goto ready; |
808 } | 737 } |
809 if(c->dataqsiz > 0 && c->qcount > 0) | 738 if(c->dataqsiz > 0 && c->qcount > 0) |
810 goto asyncrecv; | 739 goto asyncrecv; |
811 if(c->dataqsiz == 0 && (sg = dequeue(&c->sendq,
SYNC, &wake)) != nil) | 740 if(c->dataqsiz == 0 && (sg = dequeue(&c->sendq,
SYNC, &wake)) != nil) |
812 goto syncrecv; | 741 goto syncrecv; |
813 if(c->closed) { | 742 if(c->closed) { |
814 » » » » » runtime·unlock(c); | 743 » » » » » runtime·unlock(&c->lock); |
815 goto rclose; | 744 goto rclose; |
816 } | 745 } |
817 // We can reach this point iff candequeue above
returned true, | 746 // We can reach this point iff candequeue above
returned true, |
818 // but meanwhile a select sudog in the wait queu
e has been signaled | 747 // but meanwhile a select sudog in the wait queu
e has been signaled |
819 // on a different chan. In such unlikely case, | 748 // on a different chan. In such unlikely case, |
820 // we just retry the whole algorithm. | 749 // we just retry the whole algorithm. |
821 » » » » runtime·unlock(c); | 750 » » » » runtime·unlock(&c->lock); |
822 goto ready; | 751 goto ready; |
823 } | 752 } |
824 | 753 |
825 // OK, it's not ready so enqueue. | 754 // OK, it's not ready so enqueue. |
826 enqueue(&c->recvq, &cas->sg); | 755 enqueue(&c->recvq, &cas->sg); |
827 » » » runtime·unlock(c); | 756 » » » runtime·unlock(&c->lock); |
828 break; | 757 break; |
829 | 758 |
830 case CaseSend: | 759 case CaseSend: |
831 » » » runtime·lock(c); | 760 » » » runtime·lock(&c->lock); |
832 if(c->closed || (c->dataqsiz > 0 && c->qcount < c->dataq
siz) || | 761 if(c->closed || (c->dataqsiz > 0 && c->qcount < c->dataq
siz) || |
833 (c->dataqsiz == 0 && c->recvq.first != nil && ca
ndequeue(&c->recvq))) { | 762 (c->dataqsiz == 0 && c->recvq.first != nil && ca
ndequeue(&c->recvq))) { |
834 if(!runtime·casp(&signal, nil, READY)) { | 763 if(!runtime·casp(&signal, nil, READY)) { |
835 » » » » » runtime·unlock(c); | 764 » » » » » runtime·unlock(&c->lock); |
836 goto ready; | 765 goto ready; |
837 } | 766 } |
838 if(c->closed) { | 767 if(c->closed) { |
839 » » » » » runtime·unlock(c); | 768 » » » » » runtime·unlock(&c->lock); |
840 goto sclose; | 769 goto sclose; |
841 } | 770 } |
842 if(c->dataqsiz > 0 && c->qcount < c->dataqsiz) | 771 if(c->dataqsiz > 0 && c->qcount < c->dataqsiz) |
843 goto asyncsend; | 772 goto asyncsend; |
844 if(c->dataqsiz == 0 && (sg = dequeue(&c->recvq,
SYNC, &wake)) != nil) | 773 if(c->dataqsiz == 0 && (sg = dequeue(&c->recvq,
SYNC, &wake)) != nil) |
845 goto syncsend; | 774 goto syncsend; |
846 » » » » runtime·unlock(c); | 775 » » » » runtime·unlock(&c->lock); |
847 goto ready; | 776 goto ready; |
848 } | 777 } |
849 enqueue(&c->sendq, &cas->sg); | 778 enqueue(&c->sendq, &cas->sg); |
850 » » » runtime·unlock(c); | 779 » » » runtime·unlock(&c->lock); |
851 break; | 780 break; |
852 } | 781 } |
853 } | 782 } |
854 | 783 |
855 if(signal == nil) | 784 if(signal == nil) |
856 » » runtime·park(selparkcommit, &signal, "select"); | 785 » » runtime·park(selparkcommit, &signal, runtime·gostringnocopy((byt
e*)"select")); |
857 | 786 |
858 ready: | 787 ready: |
859 // Pass 3 - dequeue from unsuccessful chans. | 788 // Pass 3 - dequeue from unsuccessful chans. |
860 if(signal == PREWAIT || signal == WAIT) | 789 if(signal == PREWAIT || signal == WAIT) |
861 runtime·throw("select: still waiting"); | 790 runtime·throw("select: still waiting"); |
862 dequeuesel(sel, dequeuepos); | 791 dequeuesel(sel, dequeuepos); |
863 while(signal == SYNC) { | 792 while(signal == SYNC) { |
864 if(runtime·gomaxprocs == 1) | 793 if(runtime·gomaxprocs == 1) |
865 runtime·throw("select: spinning with gomaxprocs=1"); | 794 runtime·throw("select: spinning with gomaxprocs=1"); |
866 runtime·osyield(); | 795 runtime·osyield(); |
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
907 runtime·raceacquire(chanbuf(c, c->recvx)); | 836 runtime·raceacquire(chanbuf(c, c->recvx)); |
908 runtime·racerelease(chanbuf(c, c->recvx)); | 837 runtime·racerelease(chanbuf(c, c->recvx)); |
909 } | 838 } |
910 if(cas->sg.elem != nil) | 839 if(cas->sg.elem != nil) |
911 c->elemtype->alg->copy(c->elemsize, cas->sg.elem, chanbuf(c, c->
recvx)); | 840 c->elemtype->alg->copy(c->elemsize, cas->sg.elem, chanbuf(c, c->
recvx)); |
912 c->elemtype->alg->copy(c->elemsize, chanbuf(c, c->recvx), nil); | 841 c->elemtype->alg->copy(c->elemsize, chanbuf(c, c->recvx), nil); |
913 if(++c->recvx == c->dataqsiz) | 842 if(++c->recvx == c->dataqsiz) |
914 c->recvx = 0; | 843 c->recvx = 0; |
915 c->qcount--; | 844 c->qcount--; |
916 sg = dequeue(&c->sendq, READY, &wake); | 845 sg = dequeue(&c->sendq, READY, &wake); |
917 » runtime·unlock(c); | 846 » runtime·unlock(&c->lock); |
918 if(wake) { | 847 if(wake) { |
919 if(sg->releasetime) | 848 if(sg->releasetime) |
920 sg->releasetime = runtime·cputicks(); | 849 sg->releasetime = runtime·cputicks(); |
921 runtime·ready(sg->g); | 850 runtime·ready(sg->g); |
922 } | 851 } |
923 if(cas->receivedp != nil) | 852 if(cas->receivedp != nil) |
924 *cas->receivedp = true; | 853 *cas->receivedp = true; |
925 goto retc; | 854 goto retc; |
926 | 855 |
927 asyncsend: | 856 asyncsend: |
928 // can send to buffer | 857 // can send to buffer |
929 // caller set c and cas; sg is not set | 858 // caller set c and cas; sg is not set |
930 // c is locked | 859 // c is locked |
931 if(raceenabled) { | 860 if(raceenabled) { |
932 runtime·raceacquire(chanbuf(c, c->sendx)); | 861 runtime·raceacquire(chanbuf(c, c->sendx)); |
933 runtime·racerelease(chanbuf(c, c->sendx)); | 862 runtime·racerelease(chanbuf(c, c->sendx)); |
934 runtime·racereadobjectpc(cas->sg.elem, c->elemtype, cas->pc, cha
nsend); | 863 runtime·racereadobjectpc(cas->sg.elem, c->elemtype, cas->pc, cha
nsend); |
935 } | 864 } |
936 c->elemtype->alg->copy(c->elemsize, chanbuf(c, c->sendx), cas->sg.elem); | 865 c->elemtype->alg->copy(c->elemsize, chanbuf(c, c->sendx), cas->sg.elem); |
937 if(++c->sendx == c->dataqsiz) | 866 if(++c->sendx == c->dataqsiz) |
938 c->sendx = 0; | 867 c->sendx = 0; |
939 c->qcount++; | 868 c->qcount++; |
940 sg = dequeue(&c->recvq, READY, &wake); | 869 sg = dequeue(&c->recvq, READY, &wake); |
941 » runtime·unlock(c); | 870 » runtime·unlock(&c->lock); |
942 if(wake) { | 871 if(wake) { |
943 if(sg->releasetime) | 872 if(sg->releasetime) |
944 sg->releasetime = runtime·cputicks(); | 873 sg->releasetime = runtime·cputicks(); |
945 runtime·ready(sg->g); | 874 runtime·ready(sg->g); |
946 } | 875 } |
947 goto retc; | 876 goto retc; |
948 | 877 |
949 syncrecv: | 878 syncrecv: |
950 // can receive from sleeping sender (sg) | 879 // can receive from sleeping sender (sg) |
951 // caller set c, cas, sg and wake | 880 // caller set c, cas, sg and wake |
952 // c is locked | 881 // c is locked |
953 if(raceenabled) { | 882 if(raceenabled) { |
954 if(cas->sg.elem != nil) | 883 if(cas->sg.elem != nil) |
955 runtime·racewriteobjectpc(cas->sg.elem, c->elemtype, cas
->pc, chanrecv); | 884 runtime·racewriteobjectpc(cas->sg.elem, c->elemtype, cas
->pc, chanrecv); |
956 racesync(c, sg); | 885 racesync(c, sg); |
957 } | 886 } |
958 g->m->locks++; | 887 g->m->locks++; |
959 » runtime·unlock(c); | 888 » runtime·unlock(&c->lock); |
960 if(debug) | 889 if(debug) |
961 runtime·printf("syncrecv: sel=%p c=%p o=%d\n", sel, c, o); | 890 runtime·printf("syncrecv: sel=%p c=%p o=%d\n", sel, c, o); |
962 if(cas->sg.elem != nil) | 891 if(cas->sg.elem != nil) |
963 c->elemtype->alg->copy(c->elemsize, cas->sg.elem, sg->elem); | 892 c->elemtype->alg->copy(c->elemsize, cas->sg.elem, sg->elem); |
964 if((signalp = sg->signal) != nil) { | 893 if((signalp = sg->signal) != nil) { |
965 sg->signal = nil; | 894 sg->signal = nil; |
966 runtime·atomicstorep(signalp, sg); | 895 runtime·atomicstorep(signalp, sg); |
967 } | 896 } |
968 if(wake) { | 897 if(wake) { |
969 if(sg->releasetime) | 898 if(sg->releasetime) |
(...skipping 19 matching lines...) Expand all Loading... |
989 | 918 |
990 syncsend: | 919 syncsend: |
991 // can send to sleeping receiver (sg) | 920 // can send to sleeping receiver (sg) |
992 // caller set c, cas, sg and wake | 921 // caller set c, cas, sg and wake |
993 // c is locked | 922 // c is locked |
994 if(raceenabled) { | 923 if(raceenabled) { |
995 runtime·racereadobjectpc(cas->sg.elem, c->elemtype, cas->pc, cha
nsend); | 924 runtime·racereadobjectpc(cas->sg.elem, c->elemtype, cas->pc, cha
nsend); |
996 racesync(c, sg); | 925 racesync(c, sg); |
997 } | 926 } |
998 g->m->locks++; | 927 g->m->locks++; |
999 » runtime·unlock(c); | 928 » runtime·unlock(&c->lock); |
1000 if(debug) | 929 if(debug) |
1001 runtime·printf("syncsend: sel=%p c=%p o=%d\n", sel, c, o); | 930 runtime·printf("syncsend: sel=%p c=%p o=%d\n", sel, c, o); |
1002 if(sg->elem != nil) | 931 if(sg->elem != nil) |
1003 c->elemtype->alg->copy(c->elemsize, sg->elem, cas->sg.elem); | 932 c->elemtype->alg->copy(c->elemsize, sg->elem, cas->sg.elem); |
1004 if((signalp = sg->signal) != nil) { | 933 if((signalp = sg->signal) != nil) { |
1005 sg->signal = nil; | 934 sg->signal = nil; |
1006 runtime·atomicstorep(signalp, sg); | 935 runtime·atomicstorep(signalp, sg); |
1007 } | 936 } |
1008 if(wake) { | 937 if(wake) { |
1009 if(sg->releasetime) | 938 if(sg->releasetime) |
(...skipping 10 matching lines...) Expand all Loading... |
1020 // don't need to update the boolean. | 949 // don't need to update the boolean. |
1021 pc = cas->pc; | 950 pc = cas->pc; |
1022 if(cas->so > 0) { | 951 if(cas->so > 0) { |
1023 as = (byte*)selp + cas->so; | 952 as = (byte*)selp + cas->so; |
1024 *as = true; | 953 *as = true; |
1025 } | 954 } |
1026 if(cas->sg.releasetime > 0) | 955 if(cas->sg.releasetime > 0) |
1027 runtime·blockevent(cas->sg.releasetime - t0, 2); | 956 runtime·blockevent(cas->sg.releasetime - t0, 2); |
1028 if(dequeuepos > 0) | 957 if(dequeuepos > 0) |
1029 dequeuesel(sel, dequeuepos); | 958 dequeuesel(sel, dequeuepos); |
1030 runtime·free(sel); | |
1031 return pc; | 959 return pc; |
1032 | 960 |
1033 sclose: | 961 sclose: |
1034 // send on closed channel | 962 // send on closed channel |
1035 // c is unlocked | 963 // c is unlocked |
1036 if(dequeuepos > 0) | 964 if(dequeuepos > 0) |
1037 dequeuesel(sel, dequeuepos); | 965 dequeuesel(sel, dequeuepos); |
1038 runtime·panicstring("send on closed channel"); | 966 runtime·panicstring("send on closed channel"); |
1039 return nil; // not reached | 967 return nil; // not reached |
1040 } | 968 } |
1041 | 969 |
1042 static void | 970 static void |
1043 dequeuesel(Select *sel, int32 pos) | 971 dequeuesel(Select *sel, int32 pos) |
1044 { | 972 { |
1045 int32 i, o; | 973 int32 i, o; |
1046 Scase *cas; | 974 Scase *cas; |
1047 Hchan *c; | 975 Hchan *c; |
1048 | 976 |
1049 for(i=0; i<pos; i++) { | 977 for(i=0; i<pos; i++) { |
1050 o = sel->pollorder[i]; | 978 o = sel->pollorder[i]; |
1051 cas = &sel->scase[o]; | 979 cas = &sel->scase[o]; |
1052 if(cas->sg.signal == nil) | 980 if(cas->sg.signal == nil) |
1053 continue; // sg is already dequeued by somebody else | 981 continue; // sg is already dequeued by somebody else |
1054 c = cas->chan; | 982 c = cas->chan; |
1055 » » runtime·lock(c); | 983 » » runtime·lock(&c->lock); |
1056 if(cas->kind == CaseSend) | 984 if(cas->kind == CaseSend) |
1057 dequeueg(&c->sendq); | 985 dequeueg(&c->sendq); |
1058 else | 986 else |
1059 dequeueg(&c->recvq); | 987 dequeueg(&c->recvq); |
1060 » » runtime·unlock(c); | 988 » » runtime·unlock(&c->lock); |
1061 } | 989 } |
1062 | 990 |
1063 } | 991 } |
1064 | 992 |
1065 // This struct must match ../reflect/value.go:/runtimeSelect. | 993 // This struct must match ../reflect/value.go:/runtimeSelect. |
1066 typedef struct runtimeSelect runtimeSelect; | 994 typedef struct runtimeSelect runtimeSelect; |
1067 struct runtimeSelect | 995 struct runtimeSelect |
1068 { | 996 { |
1069 uintptr dir; | 997 uintptr dir; |
1070 ChanType *typ; | 998 ChanType *typ; |
(...skipping 11 matching lines...) Expand all Loading... |
1082 func reflect·rselect(cases Slice) (chosen int, recvOK bool) { | 1010 func reflect·rselect(cases Slice) (chosen int, recvOK bool) { |
1083 int32 i; | 1011 int32 i; |
1084 Select *sel; | 1012 Select *sel; |
1085 runtimeSelect* rcase, *rc; | 1013 runtimeSelect* rcase, *rc; |
1086 | 1014 |
1087 chosen = -1; | 1015 chosen = -1; |
1088 recvOK = false; | 1016 recvOK = false; |
1089 | 1017 |
1090 rcase = (runtimeSelect*)cases.array; | 1018 rcase = (runtimeSelect*)cases.array; |
1091 | 1019 |
1092 » sel = newselect(cases.len); | 1020 » // FlagNoScan is safe here, because all objects are also referenced from
cases. |
| 1021 » sel = runtime·mallocgc(selectsize(cases.len), 0, FlagNoScan); |
| 1022 » runtime·newselect(sel, selectsize(cases.len), cases.len); |
1093 for(i=0; i<cases.len; i++) { | 1023 for(i=0; i<cases.len; i++) { |
1094 rc = &rcase[i]; | 1024 rc = &rcase[i]; |
1095 switch(rc->dir) { | 1025 switch(rc->dir) { |
1096 case SelectDefault: | 1026 case SelectDefault: |
1097 selectdefault(sel, (void*)i, 0); | 1027 selectdefault(sel, (void*)i, 0); |
1098 break; | 1028 break; |
1099 case SelectSend: | 1029 case SelectSend: |
1100 if(rc->ch == nil) | 1030 if(rc->ch == nil) |
1101 break; | 1031 break; |
1102 selectsend(sel, rc->ch, (void*)i, rc->val, 0); | 1032 selectsend(sel, rc->ch, (void*)i, rc->val, 0); |
(...skipping 23 matching lines...) Expand all Loading... |
1126 | 1056 |
1127 static void | 1057 static void |
1128 closechan(Hchan *c, void *pc) | 1058 closechan(Hchan *c, void *pc) |
1129 { | 1059 { |
1130 SudoG *sg; | 1060 SudoG *sg; |
1131 bool wake; | 1061 bool wake; |
1132 | 1062 |
1133 if(c == nil) | 1063 if(c == nil) |
1134 runtime·panicstring("close of nil channel"); | 1064 runtime·panicstring("close of nil channel"); |
1135 | 1065 |
1136 » runtime·lock(c); | 1066 » runtime·lock(&c->lock); |
1137 if(c->closed) { | 1067 if(c->closed) { |
1138 » » runtime·unlock(c); | 1068 » » runtime·unlock(&c->lock); |
1139 runtime·panicstring("close of closed channel"); | 1069 runtime·panicstring("close of closed channel"); |
1140 } | 1070 } |
1141 | 1071 |
1142 if(raceenabled) { | 1072 if(raceenabled) { |
1143 runtime·racewritepc(c, pc, runtime·closechan); | 1073 runtime·racewritepc(c, pc, runtime·closechan); |
1144 runtime·racerelease(c); | 1074 runtime·racerelease(c); |
1145 } | 1075 } |
1146 | 1076 |
1147 c->closed = true; | 1077 c->closed = true; |
1148 | 1078 |
(...skipping 14 matching lines...) Expand all Loading... |
1163 sg = dequeue(&c->sendq, CLOSED, &wake); | 1093 sg = dequeue(&c->sendq, CLOSED, &wake); |
1164 if(sg == nil) | 1094 if(sg == nil) |
1165 break; | 1095 break; |
1166 if(wake) { | 1096 if(wake) { |
1167 if(sg->releasetime) | 1097 if(sg->releasetime) |
1168 sg->releasetime = runtime·cputicks(); | 1098 sg->releasetime = runtime·cputicks(); |
1169 runtime·ready(sg->g); | 1099 runtime·ready(sg->g); |
1170 } | 1100 } |
1171 } | 1101 } |
1172 | 1102 |
1173 » runtime·unlock(c); | 1103 » runtime·unlock(&c->lock); |
1174 } | 1104 } |
1175 | 1105 |
1176 func reflect·chanlen(c *Hchan) (len int) { | 1106 func reflect·chanlen(c *Hchan) (len int) { |
1177 if(c == nil) | 1107 if(c == nil) |
1178 len = 0; | 1108 len = 0; |
1179 else | 1109 else |
1180 len = c->qcount; | 1110 len = c->qcount; |
1181 } | 1111 } |
1182 | 1112 |
1183 func reflect·chancap(c *Hchan) (cap int) { | 1113 func reflect·chancap(c *Hchan) (cap int) { |
(...skipping 99 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1283 } | 1213 } |
1284 | 1214 |
1285 static void | 1215 static void |
1286 racesync(Hchan *c, SudoG *sg) | 1216 racesync(Hchan *c, SudoG *sg) |
1287 { | 1217 { |
1288 runtime·racerelease(chanbuf(c, 0)); | 1218 runtime·racerelease(chanbuf(c, 0)); |
1289 runtime·raceacquireg(sg->g, chanbuf(c, 0)); | 1219 runtime·raceacquireg(sg->g, chanbuf(c, 0)); |
1290 runtime·racereleaseg(sg->g, chanbuf(c, 0)); | 1220 runtime·racereleaseg(sg->g, chanbuf(c, 0)); |
1291 runtime·raceacquire(chanbuf(c, 0)); | 1221 runtime·raceacquire(chanbuf(c, 0)); |
1292 } | 1222 } |
LEFT | RIGHT |