Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(161)

Delta Between Two Patch Sets: src/pkg/runtime/chan.goc

Issue 112990043: code review 112990043: runtime: fine-grained locking in select
Left Patch Set: diff -r 67f9ef140028 https://dvyukov%40google.com@code.google.com/p/go/ Created 9 years, 8 months ago
Right Patch Set: diff -r 03b003455359b09fff0f1662255dc5fe10b93290 https://dvyukov%40google.com@code.google.com/p/go/ Created 9 years, 7 months ago
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
Left: Side by side diff | Download
Right: Side by side diff | Download
« no previous file with change/comment | « src/pkg/runtime/chan.go ('k') | src/pkg/runtime/runtime.h » ('j') | no next file with change/comment »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
LEFTRIGHT
1 // Copyright 2009 The Go Authors. All rights reserved. 1 // Copyright 2009 The Go Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style 2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file. 3 // license that can be found in the LICENSE file.
4 4
5 package runtime 5 package runtime
6 #include "runtime.h" 6 #include "runtime.h"
7 #include "arch_GOARCH.h" 7 #include "arch_GOARCH.h"
8 #include "type.h" 8 #include "type.h"
9 #include "race.h" 9 #include "race.h"
10 #include "malloc.h" 10 #include "malloc.h"
11 #include "chan.h" 11 #include "chan.h"
12 #include "mgc0.h"
13 #include "typekind.h"
12 #include "../../cmd/ld/textflag.h" 14 #include "../../cmd/ld/textflag.h"
13
14 uint32 runtime·Hchansize = sizeof(Hchan);
15
16 // Outline of the synchronization protocols.
17 //
18 // Single channel sync send/recv first check the opposite wait queue (WaitQ).
19 // If the queue is not empty, it dequeues a waiter (SudoG), copies the data
20 // directly to/from the waiter buffer and wakes up the waiter.
21 // If the queue is empty, it queues itself into wait queue and parks. When it is
22 // woken up, the operation is already executed, so it just returns.
23 // SudoG.signal is set to nil before parking and is left intact during wake up.
24 // Close sets SudoG.signal to CLOSED, in such case woken up goroutine acts accor dingly.
25 //
26 // Signle channel async send/recv first check whether the operation can be execu ted
27 // (buffer is not full/empty, respectively). If the operation can be executed,
28 // it copies data to/from the buffer, adjusts indices and wakes up an opposite
29 // waiter if there is any. If the operation cannot be executed at the moment,
30 // it queues itself into wait queue and parks. When it is woken up, it retries
31 // from the beginning.
32 // SudoG.signal is set to nil before parking and is left intact during wake up.
33 // Close sets SudoG.signal to CLOSED, in such case woken up goroutine
34 // accomplishes the operation w/o retry.
35 //
36 // Select operations proceed in 3 passes. In the pass 1 channels are checked one -by-one;
37 // if some channel is ready, the operation is executed (as outlined above for
38 // single channel operations) and select returns. In the pass 2 rediness of the
39 // channels is re-checked and select queues itself as waiter on all channels.
40 // If no channel turned out to be ready in pass 2, select blocks. In the pass 3
41 // (after wake up) select dequeues itself from unsuccessful channels. If it is
42 // woken up by an operation on a sync channel or close, select returns. Otherwis e
43 // it retries from the beginning.
44 // SudoG.signal for all channels is setup to point to a single signal variable,
45 // signal variable is set to PREWAIT before enqueue. The signal variable is oper ated
46 // with CAS and allows to resolve competition between concurrent wakers and the
47 // select goroutine. When SudoG is removed from a wait queue, SudoG.signal is se t to nil;
48 // this allows select to skip dequeueing it again. Below is the signal state mac hine:
49 //
50 // PREWAIT -> WAIT: select commits to park
51 //
52 // PREWAIT -> sg|CLOSED: sg chan is closed, select is not woken up
53 // PREWAIT -> READY: an async chan is ready or select discovers a ready chan dur ing pass 2, select is not woken up
54 // PREWAIT -> SYNC: a sync chan is ready but the operation is not yet completed, select is not woken up
55 //
56 // WAIT -> sg|CLOSED: the same as above, but select is woken up
57 // WAIT -> READY:
58 // WAIT -> SYNC:
59 //
60 // SYNC -> sg: operation on the sync chan is completed now
61 15
62 #define PREWAIT ((SudoG*)0) 16 #define PREWAIT ((SudoG*)0)
63 #define CLOSED ((SudoG*)1) 17 #define CLOSED ((SudoG*)1)
64 #define WAIT ((SudoG*)2) 18 #define WAIT ((SudoG*)2)
65 #define READY ((SudoG*)3) 19 #define READY ((SudoG*)3)
66 #define SYNC ((SudoG*)4) 20 #define SYNC ((SudoG*)4)
67 21
68 static void dequeueg(WaitQ*); 22 static void dequeueg(WaitQ*);
69 static SudoG* dequeue(WaitQ*, SudoG*, bool*); 23 static SudoG* dequeue(WaitQ*, SudoG*, bool*);
70 static bool candequeue(WaitQ*); 24 static bool candequeue(WaitQ*);
71 static void dequeuesel(Select*, int32); 25 static void dequeuesel(Select*, int32);
72 static void enqueue(WaitQ*, SudoG*); 26 static void enqueue(WaitQ*, SudoG*);
73 static void destroychan(Hchan*);
74 static void racesync(Hchan*, SudoG*); 27 static void racesync(Hchan*, SudoG*);
75
76 static Hchan*
77 makechan(ChanType *t, int64 hint)
78 {
79 Hchan *c;
80 Type *elem;
81
82 elem = t->elem;
83
84 // compiler checks this but be safe.
85 if(elem->size >= (1<<16))
86 runtime·throw("makechan: invalid channel element type");
87 if((sizeof(*c)%MAXALIGN) != 0 || elem->align > MAXALIGN)
88 runtime·throw("makechan: bad alignment");
89
90 if(hint < 0 || (intgo)hint != hint || (elem->size > 0 && hint > (MaxMem - sizeof(*c)) / elem->size))
91 runtime·panicstring("makechan: size out of range");
92
93 // allocate memory in one call
94 c = (Hchan*)runtime·mallocgc(sizeof(*c) + hint*elem->size, (uintptr)t | TypeInfo_Chan, 0);
95 c->elemsize = elem->size;
96 c->elemtype = elem;
97 c->dataqsiz = hint;
98
99 if(debug)
100 runtime·printf("makechan: chan=%p; elemsize=%D; elemalg=%p; data qsiz=%D\n",
101 c, (int64)elem->size, elem->alg, (int64)c->dataqsiz);
102
103 return c;
104 }
105
106 func reflect·makechan(t *ChanType, size uint64) (c *Hchan) {
107 c = makechan(t, size);
108 }
109
110 func makechan(t *ChanType, size int64) (c *Hchan) {
111 c = makechan(t, size);
112 }
113 28
114 // generic single channel send 29 // generic single channel send
115 static bool 30 static bool
116 chansend(ChanType *t, Hchan *c, byte *ep, bool block, void *pc) 31 chansend(ChanType *t, Hchan *c, byte *ep, bool block, void *pc)
117 { 32 {
118 SudoG *sg, **signalp; 33 SudoG *sg, **signalp;
119 SudoG mysg; 34 SudoG mysg;
120 int64 t0; 35 int64 t0;
121 bool wake; 36 bool wake;
122 37
123 if(raceenabled) 38 if(raceenabled)
124 runtime·racereadobjectpc(ep, t->elem, runtime·getcallerpc(&t), c hansend); 39 runtime·racereadobjectpc(ep, t->elem, runtime·getcallerpc(&t), c hansend);
125 40
126 if(c == nil) { 41 if(c == nil) {
127 USED(t); 42 USED(t);
128 if(!block) 43 if(!block)
129 return false; 44 return false;
130 » » runtime·park(nil, nil, "chan send (nil chan)"); 45 » » runtime·park(nil, nil, runtime·gostringnocopy((byte*)"chan send (nil chan)"));
131 return false; // not reached 46 return false; // not reached
132 } 47 }
133 48
134 if(debug) { 49 if(debug) {
135 runtime·printf("chansend: chan=%p; elem=", c); 50 runtime·printf("chansend: chan=%p; elem=", c);
136 c->elemtype->alg->print(c->elemsize, ep); 51 c->elemtype->alg->print(c->elemsize, ep);
137 runtime·prints("\n"); 52 runtime·prints("\n");
138 } 53 }
139 54
140 if(raceenabled) 55 if(raceenabled)
141 runtime·racereadpc(c, pc, chansend); 56 runtime·racereadpc(c, pc, chansend);
142 57
143 » // Fast path: if it's a non-blocking operation that can't proceed, 58 » // Fast path: check for failed non-blocking operation without acquiring the lock.
144 » // then we can return w/o acquiring the lock. 59 » //
60 » // After observing that the channel is not closed, we observe that the c hannel is
61 » // not ready for sending. Each of these observations is a single word-si zed read
62 » // (first c.closed and second c.recvq.first or c.qcount depending on kin d of channel).
63 » // Because a closed channel cannot transition from 'ready for sending' t o
64 » // 'not ready for sending', even if the channel is closed between the tw o observations,
65 » // they imply a moment between the two when the channel was both not yet closed
66 » // and not ready for sending. We behave as if we observed the channel at that moment,
67 » // and report that the send cannot proceed.
68 » //
69 » // It is okay if the reads are reordered here: if we observe that the ch annel is not
70 » // ready for sending and then observe that it is not closed, that implie s that the
71 » // channel wasn't closed during the first observation.
145 if(!block && !c->closed && ((c->dataqsiz == 0 && c->recvq.first == nil) || 72 if(!block && !c->closed && ((c->dataqsiz == 0 && c->recvq.first == nil) ||
146 (c->dataqsiz > 0 && c->qcount == c->dataqsiz))) 73 (c->dataqsiz > 0 && c->qcount == c->dataqsiz)))
147 return false; 74 return false;
148 75
149 t0 = 0; 76 t0 = 0;
150 mysg.releasetime = 0; 77 mysg.releasetime = 0;
151 if(runtime·blockprofilerate > 0) { 78 if(runtime·blockprofilerate > 0) {
152 t0 = runtime·cputicks(); 79 t0 = runtime·cputicks();
153 mysg.releasetime = -1; 80 mysg.releasetime = -1;
154 } 81 }
155 82
156 » runtime·lock(c); 83 » runtime·lock(&c->lock);
157 if(c->closed) 84 if(c->closed)
158 goto closed; 85 goto closed;
159 86
160 if(c->dataqsiz > 0) 87 if(c->dataqsiz > 0)
161 goto asynch; 88 goto asynch;
162 89
163 sg = dequeue(&c->recvq, SYNC, &wake); 90 sg = dequeue(&c->recvq, SYNC, &wake);
164 if(sg != nil) { 91 if(sg != nil) {
165 if(raceenabled) 92 if(raceenabled)
166 racesync(c, sg); 93 racesync(c, sg);
167 // Disable preemption. If we communicate with a select, 94 // Disable preemption. If we communicate with a select,
168 // it can be still running. It such case completion of the opera tion 95 // it can be still running. It such case completion of the opera tion
169 // is the store to sg->signal below. We don't want this goroutin e 96 // is the store to sg->signal below. We don't want this goroutin e
170 // to be preempted until that store. Otherwise the select would 97 // to be preempted until that store. Otherwise the select would
171 // need to spin in while(signal == SYNC) using a mix of osyield 98 // need to spin in while(signal == SYNC) using a mix of osyield
172 // and gosched. 99 // and gosched.
173 g->m->locks++; 100 g->m->locks++;
174 » » runtime·unlock(c); 101 » » runtime·unlock(&c->lock);
175 102
176 if(sg->elem != nil) 103 if(sg->elem != nil)
177 c->elemtype->alg->copy(c->elemsize, sg->elem, ep); 104 c->elemtype->alg->copy(c->elemsize, sg->elem, ep);
178 if((signalp = sg->signal) != nil) { 105 if((signalp = sg->signal) != nil) {
179 sg->signal = nil; 106 sg->signal = nil;
180 runtime·atomicstorep(signalp, sg); 107 runtime·atomicstorep(signalp, sg);
181 } 108 }
182 if(wake) { 109 if(wake) {
183 if(sg->releasetime) 110 if(sg->releasetime)
184 sg->releasetime = runtime·cputicks(); 111 sg->releasetime = runtime·cputicks();
185 runtime·ready(sg->g); 112 runtime·ready(sg->g);
186 } 113 }
187 g->m->locks--; 114 g->m->locks--;
188 return true; 115 return true;
189 } 116 }
190 117
191 if(!block) { 118 if(!block) {
192 » » runtime·unlock(c); 119 » » runtime·unlock(&c->lock);
193 return false; 120 return false;
194 } 121 }
195 122
196 mysg.elem = ep; 123 mysg.elem = ep;
197 mysg.g = g; 124 mysg.g = g;
198 mysg.signal = nil; 125 mysg.signal = nil;
199 enqueue(&c->sendq, &mysg); 126 enqueue(&c->sendq, &mysg);
200 » runtime·parkunlock(c, "chan send"); 127 » runtime·parkunlock(&c->lock, runtime·gostringnocopy((byte*)"chan send")) ;
201 128
202 if(mysg.signal != nil) { 129 if(mysg.signal != nil) {
203 » » runtime·lock(c); 130 » » runtime·lock(&c->lock);
204 if(!c->closed) 131 if(!c->closed)
205 runtime·throw("chansend: spurious wakeup"); 132 runtime·throw("chansend: spurious wakeup");
206 goto closed; 133 goto closed;
207 } 134 }
208 135
209 if(mysg.releasetime > 0) 136 if(mysg.releasetime > 0)
210 runtime·blockevent(mysg.releasetime - t0, 2); 137 runtime·blockevent(mysg.releasetime - t0, 2);
211 138
212 return true; 139 return true;
213 140
214 asynch: 141 asynch:
215 if(c->closed) 142 if(c->closed)
216 goto closed; 143 goto closed;
217 144
218 if(c->qcount >= c->dataqsiz) { 145 if(c->qcount >= c->dataqsiz) {
219 if(!block) { 146 if(!block) {
220 » » » runtime·unlock(c); 147 » » » runtime·unlock(&c->lock);
221 return false; 148 return false;
222 } 149 }
223 mysg.g = g; 150 mysg.g = g;
224 mysg.elem = nil; 151 mysg.elem = nil;
225 mysg.signal = nil; 152 mysg.signal = nil;
226 enqueue(&c->sendq, &mysg); 153 enqueue(&c->sendq, &mysg);
227 » » runtime·parkunlock(c, "chan send"); 154 » » runtime·parkunlock(&c->lock, runtime·gostringnocopy((byte*)"chan send"));
228 155
229 » » runtime·lock(c); 156 » » runtime·lock(&c->lock);
230 goto asynch; 157 goto asynch;
231 } 158 }
232 159
233 if(raceenabled) { 160 if(raceenabled) {
234 runtime·raceacquire(chanbuf(c, c->sendx)); 161 runtime·raceacquire(chanbuf(c, c->sendx));
235 runtime·racerelease(chanbuf(c, c->sendx)); 162 runtime·racerelease(chanbuf(c, c->sendx));
236 } 163 }
237 164
238 c->elemtype->alg->copy(c->elemsize, chanbuf(c, c->sendx), ep); 165 c->elemtype->alg->copy(c->elemsize, chanbuf(c, c->sendx), ep);
239 if(++c->sendx == c->dataqsiz) 166 if(++c->sendx == c->dataqsiz)
240 c->sendx = 0; 167 c->sendx = 0;
241 c->qcount++; 168 c->qcount++;
242 169
243 sg = dequeue(&c->recvq, READY, &wake); 170 sg = dequeue(&c->recvq, READY, &wake);
244 » runtime·unlock(c); 171 » runtime·unlock(&c->lock);
245 if(wake) { 172 if(wake) {
246 if(sg->releasetime) 173 if(sg->releasetime)
247 sg->releasetime = runtime·cputicks(); 174 sg->releasetime = runtime·cputicks();
248 runtime·ready(sg->g); 175 runtime·ready(sg->g);
249 } 176 }
250 if(mysg.releasetime > 0) 177 if(mysg.releasetime > 0)
251 runtime·blockevent(mysg.releasetime - t0, 2); 178 runtime·blockevent(mysg.releasetime - t0, 2);
252 return true; 179 return true;
253 180
254 closed: 181 closed:
255 » runtime·unlock(c); 182 » runtime·unlock(&c->lock);
256 runtime·panicstring("send on closed channel"); 183 runtime·panicstring("send on closed channel");
257 return false; // not reached 184 return false; // not reached
258 } 185 }
259 186
260 // generic single channel recv 187 // generic single channel recv
261 static bool 188 static bool
262 chanrecv(ChanType *t, Hchan* c, byte *ep, bool block, bool *received) 189 chanrecv(ChanType *t, Hchan* c, byte *ep, bool block, bool *received)
263 { 190 {
264 SudoG *sg, **signalp; 191 SudoG *sg, **signalp;
265 SudoG mysg; 192 SudoG mysg;
266 int64 t0; 193 int64 t0;
267 bool wake; 194 bool wake;
268 195
269 // raceenabled: don't need to check ep, as it is always on the stack. 196 // raceenabled: don't need to check ep, as it is always on the stack.
270 197
271 if(debug) 198 if(debug)
272 runtime·printf("chanrecv: chan=%p\n", c); 199 runtime·printf("chanrecv: chan=%p\n", c);
273 200
274 if(c == nil) { 201 if(c == nil) {
275 USED(t); 202 USED(t);
276 if(!block) 203 if(!block)
277 return false; 204 return false;
278 » » runtime·park(nil, nil, "chan receive (nil chan)"); 205 » » runtime·park(nil, nil, runtime·gostringnocopy((byte*)"chan recei ve (nil chan)"));
279 return false; // not reached 206 return false; // not reached
280 } 207 }
281 208
282 » // Fast path: if it's a non-blocking operation that can't proceed, 209 » // Fast path: check for failed non-blocking operation without acquiring the lock.
283 » // then we can return w/o acquiring the lock. 210 » //
284 » if(!block && !c->closed && ((c->dataqsiz == 0 && c->sendq.first == nil) || 211 » // After observing that the channel is not ready for receiving, we obser ve that the
285 » » (c->dataqsiz > 0 && c->qcount == 0))) 212 » // channel is not closed. Each of these observations is a single word-si zed read
213 » // (first c.sendq.first or c.qcount, and second c.closed).
214 » // Because a channel cannot be reopened, the later observation of the ch annel
215 » // being not closed implies that it was also not closed at the moment of the
216 » // first observation. We behave as if we observed the channel at that mo ment
217 » // and report that the receive cannot proceed.
218 » //
219 » // The order of operations is important here: reversing the operations c an lead to
220 » // incorrect behavior when racing with a close.
221 » if(!block && ((c->dataqsiz == 0 && c->sendq.first == nil) ||
222 » » (c->dataqsiz > 0 && runtime·atomicloadp((void**)&c->qcount) == 0 )) &&
223 » » !runtime·atomicload(&c->closed))
286 return false; 224 return false;
287 225
288 t0 = 0; 226 t0 = 0;
289 mysg.releasetime = 0; 227 mysg.releasetime = 0;
290 if(runtime·blockprofilerate > 0) { 228 if(runtime·blockprofilerate > 0) {
291 t0 = runtime·cputicks(); 229 t0 = runtime·cputicks();
292 mysg.releasetime = -1; 230 mysg.releasetime = -1;
293 } 231 }
294 232
295 » runtime·lock(c); 233 » runtime·lock(&c->lock);
296 if(c->dataqsiz > 0) 234 if(c->dataqsiz > 0)
297 goto asynch; 235 goto asynch;
298 236
299 if(c->closed) { 237 if(c->closed) {
300 » » runtime·unlock(c); 238 » » runtime·unlock(&c->lock);
301 goto closed; 239 goto closed;
302 } 240 }
303 241
304 sg = dequeue(&c->sendq, SYNC, &wake); 242 sg = dequeue(&c->sendq, SYNC, &wake);
305 if(sg != nil) { 243 if(sg != nil) {
306 if(raceenabled) 244 if(raceenabled)
307 racesync(c, sg); 245 racesync(c, sg);
308 g->m->locks++; 246 g->m->locks++;
309 » » runtime·unlock(c); 247 » » runtime·unlock(&c->lock);
310 248
311 if(ep != nil) 249 if(ep != nil)
312 c->elemtype->alg->copy(c->elemsize, ep, sg->elem); 250 c->elemtype->alg->copy(c->elemsize, ep, sg->elem);
313 if((signalp = sg->signal) != nil) { 251 if((signalp = sg->signal) != nil) {
314 sg->signal = nil; 252 sg->signal = nil;
315 runtime·atomicstorep(signalp, sg); 253 runtime·atomicstorep(signalp, sg);
316 } 254 }
317 if(wake) { 255 if(wake) {
318 if(sg->releasetime) 256 if(sg->releasetime)
319 sg->releasetime = runtime·cputicks(); 257 sg->releasetime = runtime·cputicks();
320 runtime·ready(sg->g); 258 runtime·ready(sg->g);
321 } 259 }
322 260
323 if(received != nil) 261 if(received != nil)
324 *received = true; 262 *received = true;
325 g->m->locks--; 263 g->m->locks--;
326 return true; 264 return true;
327 } 265 }
328 266
329 if(!block) { 267 if(!block) {
330 » » runtime·unlock(c); 268 » » runtime·unlock(&c->lock);
331 return false; 269 return false;
332 } 270 }
333 271
334 mysg.elem = ep; 272 mysg.elem = ep;
335 mysg.g = g; 273 mysg.g = g;
336 mysg.signal = nil; 274 mysg.signal = nil;
337 enqueue(&c->recvq, &mysg); 275 enqueue(&c->recvq, &mysg);
338 » runtime·parkunlock(c, "chan receive"); 276 » runtime·parkunlock(&c->lock, runtime·gostringnocopy((byte*)"chan receive "));
339 277
340 if(mysg.signal != nil) { 278 if(mysg.signal != nil) {
341 if(!c->closed) 279 if(!c->closed)
342 runtime·throw("chanrecv: spurious wakeup"); 280 runtime·throw("chanrecv: spurious wakeup");
343 goto closed; 281 goto closed;
344 } 282 }
345 283
346 if(received != nil) 284 if(received != nil)
347 *received = true; 285 *received = true;
348 if(mysg.releasetime > 0) 286 if(mysg.releasetime > 0)
349 runtime·blockevent(mysg.releasetime - t0, 2); 287 runtime·blockevent(mysg.releasetime - t0, 2);
350 return true; 288 return true;
351 289
352 asynch: 290 asynch:
353 if(c->qcount <= 0) { 291 if(c->qcount <= 0) {
354 if(c->closed) { 292 if(c->closed) {
355 » » » runtime·unlock(c); 293 » » » runtime·unlock(&c->lock);
356 goto closed; 294 goto closed;
357 } 295 }
358 296
359 if(!block) { 297 if(!block) {
360 » » » runtime·unlock(c); 298 » » » runtime·unlock(&c->lock);
361 if(received != nil) 299 if(received != nil)
362 *received = false; 300 *received = false;
363 return false; 301 return false;
364 } 302 }
365 mysg.g = g; 303 mysg.g = g;
366 mysg.elem = nil; 304 mysg.elem = nil;
367 mysg.signal = nil; 305 mysg.signal = nil;
368 enqueue(&c->recvq, &mysg); 306 enqueue(&c->recvq, &mysg);
369 » » runtime·parkunlock(c, "chan receive"); 307 » » runtime·parkunlock(&c->lock, runtime·gostringnocopy((byte*)"chan receive"));
370 308
371 if(mysg.signal != nil) { 309 if(mysg.signal != nil) {
372 if(!c->closed) 310 if(!c->closed)
373 runtime·throw("chanrecv: spurious wakeup"); 311 runtime·throw("chanrecv: spurious wakeup");
374 goto closed; 312 goto closed;
375 } 313 }
376 » » runtime·lock(c); 314 » » runtime·lock(&c->lock);
377 goto asynch; 315 goto asynch;
378 } 316 }
379 317
380 if(raceenabled) { 318 if(raceenabled) {
381 runtime·raceacquire(chanbuf(c, c->recvx)); 319 runtime·raceacquire(chanbuf(c, c->recvx));
382 runtime·racerelease(chanbuf(c, c->recvx)); 320 runtime·racerelease(chanbuf(c, c->recvx));
383 } 321 }
384 322
385 if(ep != nil) 323 if(ep != nil)
386 c->elemtype->alg->copy(c->elemsize, ep, chanbuf(c, c->recvx)); 324 c->elemtype->alg->copy(c->elemsize, ep, chanbuf(c, c->recvx));
387 c->elemtype->alg->copy(c->elemsize, chanbuf(c, c->recvx), nil); 325 c->elemtype->alg->copy(c->elemsize, chanbuf(c, c->recvx), nil);
388 if(++c->recvx == c->dataqsiz) 326 if(++c->recvx == c->dataqsiz)
389 c->recvx = 0; 327 c->recvx = 0;
390 c->qcount--; 328 c->qcount--;
391 329
392 sg = dequeue(&c->sendq, READY, &wake); 330 sg = dequeue(&c->sendq, READY, &wake);
393 » runtime·unlock(c); 331 » runtime·unlock(&c->lock);
394 if(wake) { 332 if(wake) {
395 if(sg->releasetime) 333 if(sg->releasetime)
396 sg->releasetime = runtime·cputicks(); 334 sg->releasetime = runtime·cputicks();
397 runtime·ready(sg->g); 335 runtime·ready(sg->g);
398 } 336 }
399 337
400 if(received != nil) 338 if(received != nil)
401 *received = true; 339 *received = true;
402 if(mysg.releasetime > 0) 340 if(mysg.releasetime > 0)
403 runtime·blockevent(mysg.releasetime - t0, 2); 341 runtime·blockevent(mysg.releasetime - t0, 2);
404 return true; 342 return true;
405 343
406 closed: 344 closed:
407 if(ep != nil) 345 if(ep != nil)
408 c->elemtype->alg->copy(c->elemsize, ep, nil); 346 c->elemtype->alg->copy(c->elemsize, ep, nil);
409 if(received != nil) 347 if(received != nil)
410 *received = false; 348 *received = false;
411 if(raceenabled) 349 if(raceenabled)
412 runtime·raceacquire(c); 350 runtime·raceacquire(c);
413 if(mysg.releasetime > 0) 351 if(mysg.releasetime > 0)
414 runtime·blockevent(mysg.releasetime - t0, 2); 352 runtime·blockevent(mysg.releasetime - t0, 2);
415 return true; 353 return true;
416 }
417
418 #pragma textflag NOSPLIT
419 func chansend1(t *ChanType, c *Hchan, elem *byte) {
420 chansend(t, c, elem, true, runtime·getcallerpc(&t));
421 } 354 }
422 355
423 #pragma textflag NOSPLIT 356 #pragma textflag NOSPLIT
424 func chanrecv1(t *ChanType, c *Hchan, elem *byte) { 357 func chanrecv1(t *ChanType, c *Hchan, elem *byte) {
425 chanrecv(t, c, elem, true, nil); 358 chanrecv(t, c, elem, true, nil);
426 } 359 }
427 360
428 // chanrecv2(hchan *chan any, elem *any) (received bool); 361 // chanrecv2(hchan *chan any, elem *any) (received bool);
429 #pragma textflag NOSPLIT 362 #pragma textflag NOSPLIT
430 func chanrecv2(t *ChanType, c *Hchan, elem *byte) (received bool) { 363 func chanrecv2(t *ChanType, c *Hchan, elem *byte) (received bool) {
(...skipping 69 matching lines...) Expand 10 before | Expand all | Expand 10 after
500 #pragma textflag NOSPLIT 433 #pragma textflag NOSPLIT
501 func reflect·chansend(t *ChanType, c *Hchan, elem *byte, nb bool) (selected bool ) { 434 func reflect·chansend(t *ChanType, c *Hchan, elem *byte, nb bool) (selected bool ) {
502 selected = chansend(t, c, elem, !nb, runtime·getcallerpc(&t)); 435 selected = chansend(t, c, elem, !nb, runtime·getcallerpc(&t));
503 } 436 }
504 437
505 func reflect·chanrecv(t *ChanType, c *Hchan, nb bool, elem *byte) (selected bool , received bool) { 438 func reflect·chanrecv(t *ChanType, c *Hchan, nb bool, elem *byte) (selected bool , received bool) {
506 received = false; 439 received = false;
507 selected = chanrecv(t, c, elem, !nb, &received); 440 selected = chanrecv(t, c, elem, !nb, &received);
508 } 441 }
509 442
510 static Select* newselect(int32); 443 static int64
511 444 selectsize(int32 size)
512 #pragma textflag NOSPLIT 445 {
513 func newselect(size int32) (sel *byte) {
514 » sel = (byte*)newselect(size);
515 }
516
517 static Select*
518 newselect(int32 size)
519 {
520 » int32 n;
521 Select *sel; 446 Select *sel;
522 447 » int64 selsize;
523 » n = 0; 448
524 » if(size > 1) 449 » selsize = sizeof(*sel) +
525 » » n = size-1; 450 » » (size-1)*sizeof(sel->scase[0]) +
526 451 » » size*sizeof(sel->pollorder[0]);
527 » // allocate all the memory we need in a single allocation 452 » return ROUND(selsize, Int64Align);
528 » // start with Select with size cases 453 }
529 » // then lockorder with size entries 454
530 » // then pollorder with size entries 455 #pragma textflag NOSPLIT
531 » sel = runtime·mal(sizeof(*sel) + 456 func newselect(sel *Select, selsize int64, size int32) {
532 » » n*sizeof(sel->scase[0]) + 457 » if(selsize != selectsize(size)) {
533 » » size*sizeof(sel->lockorder[0]) + 458 » » runtime·printf("runtime: bad select size %D, want %D\n", selsize , selectsize(size));
534 » » size*sizeof(sel->pollorder[0])); 459 » » runtime·throw("bad select size");
535 460 » }
536 sel->tcase = size; 461 sel->tcase = size;
537 sel->ncase = 0; 462 sel->ncase = 0;
538 sel->pollorder = (void*)(sel->scase + size); 463 sel->pollorder = (void*)(sel->scase + size);
539 464
540 if(debug) 465 if(debug)
541 runtime·printf("newselect s=%p size=%d\n", sel, size); 466 runtime·printf("newselect s=%p size=%d\n", sel, size);
542 return sel;
543 } 467 }
544 468
545 // cut in half to give stack a chance to split 469 // cut in half to give stack a chance to split
546 static void selectsend(Select *sel, Hchan *c, void *pc, void *elem, int32 so); 470 static void selectsend(Select *sel, Hchan *c, void *pc, void *elem, int32 so);
547 471
548 #pragma textflag NOSPLIT 472 #pragma textflag NOSPLIT
549 func selectsend(sel *Select, c *Hchan, elem *byte) (selected bool) { 473 func selectsend(sel *Select, c *Hchan, elem *byte) (selected bool) {
550 selected = false; 474 selected = false;
551 475
552 // nil cases do not compete 476 // nil cases do not compete
(...skipping 101 matching lines...) Expand 10 before | Expand all | Expand 10 after
654 } 578 }
655 579
656 static bool 580 static bool
657 selparkcommit(G *gp, void *signal) 581 selparkcommit(G *gp, void *signal)
658 { 582 {
659 USED(gp); 583 USED(gp);
660 return runtime·casp(signal, PREWAIT, WAIT); 584 return runtime·casp(signal, PREWAIT, WAIT);
661 } 585 }
662 586
663 func block() { 587 func block() {
664 » runtime·park(nil, nil, "select (no cases)");» // forever 588 » runtime·park(nil, nil, runtime·gostringnocopy((byte*)"select (no cases)" ));» // forever
665 } 589 }
666 590
667 static void* selectgo(Select**); 591 static void* selectgo(Select**);
668 592
669 // selectgo(sel *byte); 593 // selectgo(sel *byte);
670 // 594 //
671 // overwrites return pc on stack to signal which case of the select 595 // overwrites return pc on stack to signal which case of the select
672 // to run, so cannot appear at the top of a split stack. 596 // to run, so cannot appear at the top of a split stack.
673 #pragma textflag NOSPLIT 597 #pragma textflag NOSPLIT
674 func selectgo(sel *Select) { 598 func selectgo(sel *Select) {
(...skipping 52 matching lines...) Expand 10 before | Expand all | Expand 10 after
727 // the race will be resolved in pass 2. 651 // the race will be resolved in pass 2.
728 dfl = nil; 652 dfl = nil;
729 dequeuepos = 0; 653 dequeuepos = 0;
730 for(i=0; i<sel->ncase; i++) { 654 for(i=0; i<sel->ncase; i++) {
731 o = sel->pollorder[i]; 655 o = sel->pollorder[i];
732 cas = &sel->scase[o]; 656 cas = &sel->scase[o];
733 c = cas->chan; 657 c = cas->chan;
734 658
735 switch(cas->kind) { 659 switch(cas->kind) {
736 case CaseRecv: 660 case CaseRecv:
737 » » » // fast path: check ready predicate w/o lock 661 » » » // Fast path: check ready predicate w/o lock.
738 » » » if((c->dataqsiz > 0 && c->qcount > 0) || (c->dataqsiz == 0 && c->sendq.first != nil) || c->closed) { 662 » » » // Ordering of loads is important here. See the "Fast pa th" comment in chanrecv.
739 » » » » runtime·lock(c); 663 » » » if((c->dataqsiz > 0 && (uintgo)runtime·atomicloadp((void **)&c->qcount) > 0) ||
664 » » » » » (c->dataqsiz == 0 && c->sendq.first != n il) ||
665 » » » » » runtime·atomicload(&c->closed) != 0) {
666 » » » » runtime·lock(&c->lock);
740 if(c->dataqsiz > 0 && c->qcount > 0) 667 if(c->dataqsiz > 0 && c->qcount > 0)
741 goto asyncrecv; 668 goto asyncrecv;
742 if(c->dataqsiz == 0 && (sg = dequeue(&c->sendq, SYNC, &wake)) != nil) 669 if(c->dataqsiz == 0 && (sg = dequeue(&c->sendq, SYNC, &wake)) != nil)
743 goto syncrecv; 670 goto syncrecv;
744 if(c->closed) { 671 if(c->closed) {
745 » » » » » runtime·unlock(c); 672 » » » » » runtime·unlock(&c->lock);
746 goto rclose; 673 goto rclose;
747 } 674 }
748 » » » » runtime·unlock(c); 675 » » » » runtime·unlock(&c->lock);
749 } 676 }
750 break; 677 break;
751 678
752 case CaseSend: 679 case CaseSend:
753 if(raceenabled) 680 if(raceenabled)
754 runtime·racereadpc(c, cas->pc, chansend); 681 runtime·racereadpc(c, cas->pc, chansend);
755 » » » if(c->closed || (c->dataqsiz > 0 && c->qcount < c->dataq siz) || (c->dataqsiz == 0 && c->recvq.first != nil)) { 682 » » » if(c->closed != 0 ||
756 » » » » runtime·lock(c); 683 » » » » » (c->dataqsiz > 0 && c->qcount < c->dataq siz) ||
684 » » » » » (c->dataqsiz == 0 && c->recvq.first != n il)) {
685 » » » » runtime·lock(&c->lock);
757 if(c->closed) { 686 if(c->closed) {
758 » » » » » runtime·unlock(c); 687 » » » » » runtime·unlock(&c->lock);
759 goto sclose; 688 goto sclose;
760 } 689 }
761 if(c->dataqsiz > 0 && c->qcount < c->dataqsiz) 690 if(c->dataqsiz > 0 && c->qcount < c->dataqsiz)
762 goto asyncsend; 691 goto asyncsend;
763 if(c->dataqsiz == 0 && (sg = dequeue(&c->recvq, SYNC, &wake)) != nil) 692 if(c->dataqsiz == 0 && (sg = dequeue(&c->recvq, SYNC, &wake)) != nil)
764 goto syncsend; 693 goto syncsend;
765 » » » » runtime·unlock(c); 694 » » » » runtime·unlock(&c->lock);
766 } 695 }
767 break; 696 break;
768 697
769 case CaseDefault: 698 case CaseDefault:
770 dfl = cas; 699 dfl = cas;
771 break; 700 break;
772 } 701 }
773 } 702 }
774 703
775 if(dfl != nil) { 704 if(dfl != nil) {
(...skipping 10 matching lines...) Expand all
786 goto ready; 715 goto ready;
787 o = sel->pollorder[dequeuepos]; 716 o = sel->pollorder[dequeuepos];
788 cas = &sel->scase[o]; 717 cas = &sel->scase[o];
789 c = cas->chan; 718 c = cas->chan;
790 sg = &cas->sg; 719 sg = &cas->sg;
791 sg->g = g; 720 sg->g = g;
792 sg->signal = &signal; 721 sg->signal = &signal;
793 722
794 switch(cas->kind) { 723 switch(cas->kind) {
795 case CaseRecv: 724 case CaseRecv:
796 » » » runtime·lock(c); 725 » » » runtime·lock(&c->lock);
797 if(c->closed || (c->dataqsiz > 0 && c->qcount > 0) || 726 if(c->closed || (c->dataqsiz > 0 && c->qcount > 0) ||
798 (c->dataqsiz == 0 && c->sendq.first != nil && ca ndequeue(&c->sendq))) { 727 (c->dataqsiz == 0 && c->sendq.first != nil && ca ndequeue(&c->sendq))) {
799 // Before executing send/recv on this chan we mu st disarm the select. 728 // Before executing send/recv on this chan we mu st disarm the select.
800 // Otherwise it can happen so that we execute se nd/recv on this chan, 729 // Otherwise it can happen so that we execute se nd/recv on this chan,
801 // and another goroutine accomplishes send/recv on a sync chan 730 // and another goroutine accomplishes send/recv on a sync chan
802 // with this select as well. The result would be completion of two 731 // with this select as well. The result would be completion of two
803 // communications in the single select. 732 // communications in the single select.
804 if(!runtime·casp(&signal, nil, READY)) { 733 if(!runtime·casp(&signal, nil, READY)) {
805 // Already signaled on another chan. 734 // Already signaled on another chan.
806 » » » » » runtime·unlock(c); 735 » » » » » runtime·unlock(&c->lock);
807 goto ready; 736 goto ready;
808 } 737 }
809 if(c->dataqsiz > 0 && c->qcount > 0) 738 if(c->dataqsiz > 0 && c->qcount > 0)
810 goto asyncrecv; 739 goto asyncrecv;
811 if(c->dataqsiz == 0 && (sg = dequeue(&c->sendq, SYNC, &wake)) != nil) 740 if(c->dataqsiz == 0 && (sg = dequeue(&c->sendq, SYNC, &wake)) != nil)
812 goto syncrecv; 741 goto syncrecv;
813 if(c->closed) { 742 if(c->closed) {
814 » » » » » runtime·unlock(c); 743 » » » » » runtime·unlock(&c->lock);
815 goto rclose; 744 goto rclose;
816 } 745 }
817 // We can reach this point iff candequeue above returned true, 746 // We can reach this point iff candequeue above returned true,
818 // but meanwhile a select sudog in the wait queu e has been signaled 747 // but meanwhile a select sudog in the wait queu e has been signaled
819 // on a different chan. In such unlikely case, 748 // on a different chan. In such unlikely case,
820 // we just retry the whole algorithm. 749 // we just retry the whole algorithm.
821 » » » » runtime·unlock(c); 750 » » » » runtime·unlock(&c->lock);
822 goto ready; 751 goto ready;
823 } 752 }
824 753
825 // OK, it's not ready so enqueue. 754 // OK, it's not ready so enqueue.
826 enqueue(&c->recvq, &cas->sg); 755 enqueue(&c->recvq, &cas->sg);
827 » » » runtime·unlock(c); 756 » » » runtime·unlock(&c->lock);
828 break; 757 break;
829 758
830 case CaseSend: 759 case CaseSend:
831 » » » runtime·lock(c); 760 » » » runtime·lock(&c->lock);
832 if(c->closed || (c->dataqsiz > 0 && c->qcount < c->dataq siz) || 761 if(c->closed || (c->dataqsiz > 0 && c->qcount < c->dataq siz) ||
833 (c->dataqsiz == 0 && c->recvq.first != nil && ca ndequeue(&c->recvq))) { 762 (c->dataqsiz == 0 && c->recvq.first != nil && ca ndequeue(&c->recvq))) {
834 if(!runtime·casp(&signal, nil, READY)) { 763 if(!runtime·casp(&signal, nil, READY)) {
835 » » » » » runtime·unlock(c); 764 » » » » » runtime·unlock(&c->lock);
836 goto ready; 765 goto ready;
837 } 766 }
838 if(c->closed) { 767 if(c->closed) {
839 » » » » » runtime·unlock(c); 768 » » » » » runtime·unlock(&c->lock);
840 goto sclose; 769 goto sclose;
841 } 770 }
842 if(c->dataqsiz > 0 && c->qcount < c->dataqsiz) 771 if(c->dataqsiz > 0 && c->qcount < c->dataqsiz)
843 goto asyncsend; 772 goto asyncsend;
844 if(c->dataqsiz == 0 && (sg = dequeue(&c->recvq, SYNC, &wake)) != nil) 773 if(c->dataqsiz == 0 && (sg = dequeue(&c->recvq, SYNC, &wake)) != nil)
845 goto syncsend; 774 goto syncsend;
846 » » » » runtime·unlock(c); 775 » » » » runtime·unlock(&c->lock);
847 goto ready; 776 goto ready;
848 } 777 }
849 enqueue(&c->sendq, &cas->sg); 778 enqueue(&c->sendq, &cas->sg);
850 » » » runtime·unlock(c); 779 » » » runtime·unlock(&c->lock);
851 break; 780 break;
852 } 781 }
853 } 782 }
854 783
855 if(signal == nil) 784 if(signal == nil)
856 » » runtime·park(selparkcommit, &signal, "select"); 785 » » runtime·park(selparkcommit, &signal, runtime·gostringnocopy((byt e*)"select"));
857 786
858 ready: 787 ready:
859 // Pass 3 - dequeue from unsuccessful chans. 788 // Pass 3 - dequeue from unsuccessful chans.
860 if(signal == PREWAIT || signal == WAIT) 789 if(signal == PREWAIT || signal == WAIT)
861 runtime·throw("select: still waiting"); 790 runtime·throw("select: still waiting");
862 dequeuesel(sel, dequeuepos); 791 dequeuesel(sel, dequeuepos);
863 while(signal == SYNC) { 792 while(signal == SYNC) {
864 if(runtime·gomaxprocs == 1) 793 if(runtime·gomaxprocs == 1)
865 runtime·throw("select: spinning with gomaxprocs=1"); 794 runtime·throw("select: spinning with gomaxprocs=1");
866 runtime·osyield(); 795 runtime·osyield();
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after
907 runtime·raceacquire(chanbuf(c, c->recvx)); 836 runtime·raceacquire(chanbuf(c, c->recvx));
908 runtime·racerelease(chanbuf(c, c->recvx)); 837 runtime·racerelease(chanbuf(c, c->recvx));
909 } 838 }
910 if(cas->sg.elem != nil) 839 if(cas->sg.elem != nil)
911 c->elemtype->alg->copy(c->elemsize, cas->sg.elem, chanbuf(c, c-> recvx)); 840 c->elemtype->alg->copy(c->elemsize, cas->sg.elem, chanbuf(c, c-> recvx));
912 c->elemtype->alg->copy(c->elemsize, chanbuf(c, c->recvx), nil); 841 c->elemtype->alg->copy(c->elemsize, chanbuf(c, c->recvx), nil);
913 if(++c->recvx == c->dataqsiz) 842 if(++c->recvx == c->dataqsiz)
914 c->recvx = 0; 843 c->recvx = 0;
915 c->qcount--; 844 c->qcount--;
916 sg = dequeue(&c->sendq, READY, &wake); 845 sg = dequeue(&c->sendq, READY, &wake);
917 » runtime·unlock(c); 846 » runtime·unlock(&c->lock);
918 if(wake) { 847 if(wake) {
919 if(sg->releasetime) 848 if(sg->releasetime)
920 sg->releasetime = runtime·cputicks(); 849 sg->releasetime = runtime·cputicks();
921 runtime·ready(sg->g); 850 runtime·ready(sg->g);
922 } 851 }
923 if(cas->receivedp != nil) 852 if(cas->receivedp != nil)
924 *cas->receivedp = true; 853 *cas->receivedp = true;
925 goto retc; 854 goto retc;
926 855
927 asyncsend: 856 asyncsend:
928 // can send to buffer 857 // can send to buffer
929 // caller set c and cas; sg is not set 858 // caller set c and cas; sg is not set
930 // c is locked 859 // c is locked
931 if(raceenabled) { 860 if(raceenabled) {
932 runtime·raceacquire(chanbuf(c, c->sendx)); 861 runtime·raceacquire(chanbuf(c, c->sendx));
933 runtime·racerelease(chanbuf(c, c->sendx)); 862 runtime·racerelease(chanbuf(c, c->sendx));
934 runtime·racereadobjectpc(cas->sg.elem, c->elemtype, cas->pc, cha nsend); 863 runtime·racereadobjectpc(cas->sg.elem, c->elemtype, cas->pc, cha nsend);
935 } 864 }
936 c->elemtype->alg->copy(c->elemsize, chanbuf(c, c->sendx), cas->sg.elem); 865 c->elemtype->alg->copy(c->elemsize, chanbuf(c, c->sendx), cas->sg.elem);
937 if(++c->sendx == c->dataqsiz) 866 if(++c->sendx == c->dataqsiz)
938 c->sendx = 0; 867 c->sendx = 0;
939 c->qcount++; 868 c->qcount++;
940 sg = dequeue(&c->recvq, READY, &wake); 869 sg = dequeue(&c->recvq, READY, &wake);
941 » runtime·unlock(c); 870 » runtime·unlock(&c->lock);
942 if(wake) { 871 if(wake) {
943 if(sg->releasetime) 872 if(sg->releasetime)
944 sg->releasetime = runtime·cputicks(); 873 sg->releasetime = runtime·cputicks();
945 runtime·ready(sg->g); 874 runtime·ready(sg->g);
946 } 875 }
947 goto retc; 876 goto retc;
948 877
949 syncrecv: 878 syncrecv:
950 // can receive from sleeping sender (sg) 879 // can receive from sleeping sender (sg)
951 // caller set c, cas, sg and wake 880 // caller set c, cas, sg and wake
952 // c is locked 881 // c is locked
953 if(raceenabled) { 882 if(raceenabled) {
954 if(cas->sg.elem != nil) 883 if(cas->sg.elem != nil)
955 runtime·racewriteobjectpc(cas->sg.elem, c->elemtype, cas ->pc, chanrecv); 884 runtime·racewriteobjectpc(cas->sg.elem, c->elemtype, cas ->pc, chanrecv);
956 racesync(c, sg); 885 racesync(c, sg);
957 } 886 }
958 g->m->locks++; 887 g->m->locks++;
959 » runtime·unlock(c); 888 » runtime·unlock(&c->lock);
960 if(debug) 889 if(debug)
961 runtime·printf("syncrecv: sel=%p c=%p o=%d\n", sel, c, o); 890 runtime·printf("syncrecv: sel=%p c=%p o=%d\n", sel, c, o);
962 if(cas->sg.elem != nil) 891 if(cas->sg.elem != nil)
963 c->elemtype->alg->copy(c->elemsize, cas->sg.elem, sg->elem); 892 c->elemtype->alg->copy(c->elemsize, cas->sg.elem, sg->elem);
964 if((signalp = sg->signal) != nil) { 893 if((signalp = sg->signal) != nil) {
965 sg->signal = nil; 894 sg->signal = nil;
966 runtime·atomicstorep(signalp, sg); 895 runtime·atomicstorep(signalp, sg);
967 } 896 }
968 if(wake) { 897 if(wake) {
969 if(sg->releasetime) 898 if(sg->releasetime)
(...skipping 19 matching lines...) Expand all
989 918
990 syncsend: 919 syncsend:
991 // can send to sleeping receiver (sg) 920 // can send to sleeping receiver (sg)
992 // caller set c, cas, sg and wake 921 // caller set c, cas, sg and wake
993 // c is locked 922 // c is locked
994 if(raceenabled) { 923 if(raceenabled) {
995 runtime·racereadobjectpc(cas->sg.elem, c->elemtype, cas->pc, cha nsend); 924 runtime·racereadobjectpc(cas->sg.elem, c->elemtype, cas->pc, cha nsend);
996 racesync(c, sg); 925 racesync(c, sg);
997 } 926 }
998 g->m->locks++; 927 g->m->locks++;
999 » runtime·unlock(c); 928 » runtime·unlock(&c->lock);
1000 if(debug) 929 if(debug)
1001 runtime·printf("syncsend: sel=%p c=%p o=%d\n", sel, c, o); 930 runtime·printf("syncsend: sel=%p c=%p o=%d\n", sel, c, o);
1002 if(sg->elem != nil) 931 if(sg->elem != nil)
1003 c->elemtype->alg->copy(c->elemsize, sg->elem, cas->sg.elem); 932 c->elemtype->alg->copy(c->elemsize, sg->elem, cas->sg.elem);
1004 if((signalp = sg->signal) != nil) { 933 if((signalp = sg->signal) != nil) {
1005 sg->signal = nil; 934 sg->signal = nil;
1006 runtime·atomicstorep(signalp, sg); 935 runtime·atomicstorep(signalp, sg);
1007 } 936 }
1008 if(wake) { 937 if(wake) {
1009 if(sg->releasetime) 938 if(sg->releasetime)
(...skipping 10 matching lines...) Expand all
1020 // don't need to update the boolean. 949 // don't need to update the boolean.
1021 pc = cas->pc; 950 pc = cas->pc;
1022 if(cas->so > 0) { 951 if(cas->so > 0) {
1023 as = (byte*)selp + cas->so; 952 as = (byte*)selp + cas->so;
1024 *as = true; 953 *as = true;
1025 } 954 }
1026 if(cas->sg.releasetime > 0) 955 if(cas->sg.releasetime > 0)
1027 runtime·blockevent(cas->sg.releasetime - t0, 2); 956 runtime·blockevent(cas->sg.releasetime - t0, 2);
1028 if(dequeuepos > 0) 957 if(dequeuepos > 0)
1029 dequeuesel(sel, dequeuepos); 958 dequeuesel(sel, dequeuepos);
1030 runtime·free(sel);
1031 return pc; 959 return pc;
1032 960
1033 sclose: 961 sclose:
1034 // send on closed channel 962 // send on closed channel
1035 // c is unlocked 963 // c is unlocked
1036 if(dequeuepos > 0) 964 if(dequeuepos > 0)
1037 dequeuesel(sel, dequeuepos); 965 dequeuesel(sel, dequeuepos);
1038 runtime·panicstring("send on closed channel"); 966 runtime·panicstring("send on closed channel");
1039 return nil; // not reached 967 return nil; // not reached
1040 } 968 }
1041 969
1042 static void 970 static void
1043 dequeuesel(Select *sel, int32 pos) 971 dequeuesel(Select *sel, int32 pos)
1044 { 972 {
1045 int32 i, o; 973 int32 i, o;
1046 Scase *cas; 974 Scase *cas;
1047 Hchan *c; 975 Hchan *c;
1048 976
1049 for(i=0; i<pos; i++) { 977 for(i=0; i<pos; i++) {
1050 o = sel->pollorder[i]; 978 o = sel->pollorder[i];
1051 cas = &sel->scase[o]; 979 cas = &sel->scase[o];
1052 if(cas->sg.signal == nil) 980 if(cas->sg.signal == nil)
1053 continue; // sg is already dequeued by somebody else 981 continue; // sg is already dequeued by somebody else
1054 c = cas->chan; 982 c = cas->chan;
1055 » » runtime·lock(c); 983 » » runtime·lock(&c->lock);
1056 if(cas->kind == CaseSend) 984 if(cas->kind == CaseSend)
1057 dequeueg(&c->sendq); 985 dequeueg(&c->sendq);
1058 else 986 else
1059 dequeueg(&c->recvq); 987 dequeueg(&c->recvq);
1060 » » runtime·unlock(c); 988 » » runtime·unlock(&c->lock);
1061 } 989 }
1062 990
1063 } 991 }
1064 992
1065 // This struct must match ../reflect/value.go:/runtimeSelect. 993 // This struct must match ../reflect/value.go:/runtimeSelect.
1066 typedef struct runtimeSelect runtimeSelect; 994 typedef struct runtimeSelect runtimeSelect;
1067 struct runtimeSelect 995 struct runtimeSelect
1068 { 996 {
1069 uintptr dir; 997 uintptr dir;
1070 ChanType *typ; 998 ChanType *typ;
(...skipping 11 matching lines...) Expand all
1082 func reflect·rselect(cases Slice) (chosen int, recvOK bool) { 1010 func reflect·rselect(cases Slice) (chosen int, recvOK bool) {
1083 int32 i; 1011 int32 i;
1084 Select *sel; 1012 Select *sel;
1085 runtimeSelect* rcase, *rc; 1013 runtimeSelect* rcase, *rc;
1086 1014
1087 chosen = -1; 1015 chosen = -1;
1088 recvOK = false; 1016 recvOK = false;
1089 1017
1090 rcase = (runtimeSelect*)cases.array; 1018 rcase = (runtimeSelect*)cases.array;
1091 1019
1092 » sel = newselect(cases.len); 1020 » // FlagNoScan is safe here, because all objects are also referenced from cases.
1021 » sel = runtime·mallocgc(selectsize(cases.len), 0, FlagNoScan);
1022 » runtime·newselect(sel, selectsize(cases.len), cases.len);
1093 for(i=0; i<cases.len; i++) { 1023 for(i=0; i<cases.len; i++) {
1094 rc = &rcase[i]; 1024 rc = &rcase[i];
1095 switch(rc->dir) { 1025 switch(rc->dir) {
1096 case SelectDefault: 1026 case SelectDefault:
1097 selectdefault(sel, (void*)i, 0); 1027 selectdefault(sel, (void*)i, 0);
1098 break; 1028 break;
1099 case SelectSend: 1029 case SelectSend:
1100 if(rc->ch == nil) 1030 if(rc->ch == nil)
1101 break; 1031 break;
1102 selectsend(sel, rc->ch, (void*)i, rc->val, 0); 1032 selectsend(sel, rc->ch, (void*)i, rc->val, 0);
(...skipping 23 matching lines...) Expand all
1126 1056
1127 static void 1057 static void
1128 closechan(Hchan *c, void *pc) 1058 closechan(Hchan *c, void *pc)
1129 { 1059 {
1130 SudoG *sg; 1060 SudoG *sg;
1131 bool wake; 1061 bool wake;
1132 1062
1133 if(c == nil) 1063 if(c == nil)
1134 runtime·panicstring("close of nil channel"); 1064 runtime·panicstring("close of nil channel");
1135 1065
1136 » runtime·lock(c); 1066 » runtime·lock(&c->lock);
1137 if(c->closed) { 1067 if(c->closed) {
1138 » » runtime·unlock(c); 1068 » » runtime·unlock(&c->lock);
1139 runtime·panicstring("close of closed channel"); 1069 runtime·panicstring("close of closed channel");
1140 } 1070 }
1141 1071
1142 if(raceenabled) { 1072 if(raceenabled) {
1143 runtime·racewritepc(c, pc, runtime·closechan); 1073 runtime·racewritepc(c, pc, runtime·closechan);
1144 runtime·racerelease(c); 1074 runtime·racerelease(c);
1145 } 1075 }
1146 1076
1147 c->closed = true; 1077 c->closed = true;
1148 1078
(...skipping 14 matching lines...) Expand all
1163 sg = dequeue(&c->sendq, CLOSED, &wake); 1093 sg = dequeue(&c->sendq, CLOSED, &wake);
1164 if(sg == nil) 1094 if(sg == nil)
1165 break; 1095 break;
1166 if(wake) { 1096 if(wake) {
1167 if(sg->releasetime) 1097 if(sg->releasetime)
1168 sg->releasetime = runtime·cputicks(); 1098 sg->releasetime = runtime·cputicks();
1169 runtime·ready(sg->g); 1099 runtime·ready(sg->g);
1170 } 1100 }
1171 } 1101 }
1172 1102
1173 » runtime·unlock(c); 1103 » runtime·unlock(&c->lock);
1174 } 1104 }
1175 1105
1176 func reflect·chanlen(c *Hchan) (len int) { 1106 func reflect·chanlen(c *Hchan) (len int) {
1177 if(c == nil) 1107 if(c == nil)
1178 len = 0; 1108 len = 0;
1179 else 1109 else
1180 len = c->qcount; 1110 len = c->qcount;
1181 } 1111 }
1182 1112
1183 func reflect·chancap(c *Hchan) (cap int) { 1113 func reflect·chancap(c *Hchan) (cap int) {
(...skipping 99 matching lines...) Expand 10 before | Expand all | Expand 10 after
1283 } 1213 }
1284 1214
1285 static void 1215 static void
1286 racesync(Hchan *c, SudoG *sg) 1216 racesync(Hchan *c, SudoG *sg)
1287 { 1217 {
1288 runtime·racerelease(chanbuf(c, 0)); 1218 runtime·racerelease(chanbuf(c, 0));
1289 runtime·raceacquireg(sg->g, chanbuf(c, 0)); 1219 runtime·raceacquireg(sg->g, chanbuf(c, 0));
1290 runtime·racereleaseg(sg->g, chanbuf(c, 0)); 1220 runtime·racereleaseg(sg->g, chanbuf(c, 0));
1291 runtime·raceacquire(chanbuf(c, 0)); 1221 runtime·raceacquire(chanbuf(c, 0));
1292 } 1222 }
LEFTRIGHT

Powered by Google App Engine
RSS Feeds Recent Issues | This issue
This is Rietveld f62528b