Index: src/pkg/runtime/proc.c |
=================================================================== |
--- a/src/pkg/runtime/proc.c |
+++ b/src/pkg/runtime/proc.c |
@@ -79,7 +79,7 @@ |
void runtime·mstart(void); |
static void runqput(P*, G*); |
static G* runqget(P*); |
-static void runqgrow(P*); |
+static bool runqputslow(P*, G*, uint32, uint32); |
static G* runqsteal(P*, P*); |
static void mput(M*); |
static M* mget(void); |
@@ -106,6 +106,7 @@ |
static G* gfget(P*); |
static void gfpurge(P*); |
static void globrunqput(G*); |
+static void globrunqputbatch(G*, G*, int32); |
static G* globrunqget(P*, int32); |
static P* pidleget(void); |
static void pidleput(P*); |
@@ -2215,27 +2216,26 @@ |
else |
p->mcache = runtime·allocmcache(); |
} |
- if(p->runq == nil) { |
- p->runqsize = 128; |
- p->runq = (G**)runtime·mallocgc(p->runqsize*sizeof(G*), 0, FlagNoInvokeGC); |
- } |
} |
// redistribute runnable G's evenly |
+ // collect all runnable goroutines in global queue |
for(i = 0; i < old; i++) { |
p = runtime·allp[i]; |
while(gp = runqget(p)) |
globrunqput(gp); |
} |
+ // fill local queues with at most nelem(p->runq)/2 goroutines |
// start at 1 because current M already executes some G and will acquire allp[0] below, |
// so if we have a spare G we want to put it into allp[1]. |
- for(i = 1; runtime·sched.runqhead; i++) { |
+ for(i = 1; i < new * nelem(p->runq)/2 && runtime·sched.runqsize > 0; i++) { |
gp = runtime·sched.runqhead; |
runtime·sched.runqhead = gp->schedlink; |
+ if(runtime·sched.runqhead == nil) |
+ runtime·sched.runqtail = nil; |
+ runtime·sched.runqsize--; |
runqput(runtime·allp[i%new], gp); |
} |
- runtime·sched.runqtail = nil; |
- runtime·sched.runqsize = 0; |
// free unused P's |
for(i = new; i < old; i++) { |
@@ -2524,7 +2524,7 @@ |
static int64 starttime; |
int64 now; |
int64 id1, id2, id3; |
- int32 i, q, t, h, s; |
+ int32 i, t, h; |
int8 *fmt; |
M *mp, *lockedm; |
G *gp, *lockedg; |
@@ -2551,15 +2551,11 @@ |
if(p == nil) |
continue; |
mp = p->m; |
- t = p->runqtail; |
- h = p->runqhead; |
- s = p->runqsize; |
- q = t - h; |
- if(q < 0) |
- q += s; |
+ h = runtime·atomicload(&p->runqhead); |
+ t = runtime·atomicload(&p->runqtail); |
if(detailed) |
- runtime·printf(" P%d: status=%d schedtick=%d syscalltick=%d m=%d runqsize=%d/%d gfreecnt=%d\n", |
- i, p->status, p->schedtick, p->syscalltick, mp ? mp->id : -1, q, s, p->gfreecnt); |
+ runtime·printf(" P%d: status=%d schedtick=%d syscalltick=%d m=%d runqsize=%d gfreecnt=%d\n", |
+ i, p->status, p->schedtick, p->syscalltick, mp ? mp->id : -1, t-h, p->gfreecnt); |
else { |
// In non-detailed mode format lengths of per-P run queues as: |
// [len1 len2 len3 len4] |
@@ -2570,7 +2566,7 @@ |
fmt = " [%d"; |
else if(i == runtime·gomaxprocs-1) |
fmt = " %d]\n"; |
- runtime·printf(fmt, q); |
+ runtime·printf(fmt, t-h); |
} |
} |
if(!detailed) { |
@@ -2645,6 +2641,20 @@ |
runtime·sched.runqsize++; |
} |
+// Put a batch of runnable goroutines on the global runnable queue. |
+// Sched must be locked. |
+static void |
+globrunqputbatch(G *ghead, G *gtail, int32 n) |
+{ |
+ gtail->schedlink = nil; |
+ if(runtime·sched.runqtail) |
+ runtime·sched.runqtail->schedlink = ghead; |
+ else |
+ runtime·sched.runqhead = ghead; |
+ runtime·sched.runqtail = gtail; |
+ runtime·sched.runqsize += n; |
+} |
+ |
// Try get a batch of G's from the global runnable queue. |
// Sched must be locked. |
static G* |
@@ -2660,6 +2670,8 @@ |
n = runtime·sched.runqsize; |
if(max > 0 && n > max) |
n = max; |
+ if(n > nelem(p->runq)/2) |
+ n = nelem(p->runq)/2; |
runtime·sched.runqsize -= n; |
if(runtime·sched.runqsize == 0) |
runtime·sched.runqtail = nil; |
@@ -2699,78 +2711,98 @@ |
return p; |
} |
-// Put g on local runnable queue. |
-// TODO(dvyukov): consider using lock-free queue. |
+// Try to put g on local runnable queue. |
+// If it's full, put onto global queue. |
+// Executed only by the owner P. |
static void |
runqput(P *p, G *gp) |
{ |
- int32 h, t, s; |
+ uint32 h, t; |
- runtime·lock(p); |
retry: |
- h = p->runqhead; |
+ h = runtime·atomicload(&p->runqhead); // load-acquire, synchronize with consumers |
t = p->runqtail; |
- s = p->runqsize; |
- if(t == h-1 || (h == 0 && t == s-1)) { |
- runqgrow(p); |
- goto retry; |
+ if(t - h < nelem(p->runq)) { |
+ p->runq[t%nelem(p->runq)] = gp; |
+ runtime·atomicstore(&p->runqtail, t+1); // store-release, makes the item available for consumption |
+ return; |
} |
- p->runq[t++] = gp; |
- if(t == s) |
- t = 0; |
- p->runqtail = t; |
- runtime·unlock(p); |
+ if(runqputslow(p, gp, h, t)) |
+ return; |
+ // the queue is not full, now the put above must succeed |
+ goto retry; |
+} |
+ |
+// Put g and a batch of work from local runnable queue on global queue. |
+// Executed only by the owner P. |
+static bool |
+runqputslow(P *p, G *gp, uint32 h, uint32 t) |
+{ |
+ G *batch[nelem(p->runq)/2+1]; |
+ uint32 n, i; |
+ |
+ // First, grab a batch from local queue. |
+ n = t-h; |
+ n = n/2; |
+ if(n != nelem(p->runq)/2) |
+ runtime·throw("runqputslow: queue is not full"); |
+ for(i=0; i<n; i++) |
+ batch[i] = p->runq[(h+i)%nelem(p->runq)]; |
+ if(!runtime·cas(&p->runqhead, h, h+n)) // cas-release, commits consume |
+ return false; |
+ batch[n] = gp; |
+ // Link the goroutines. |
+ for(i=0; i<n; i++) |
+ batch[i]->schedlink = batch[i+1]; |
+ // Now put the batch on global queue. |
+ runtime·lock(&runtime·sched); |
+ globrunqputbatch(batch[0], batch[n], n+1); |
+ runtime·unlock(&runtime·sched); |
+ return true; |
} |
// Get g from local runnable queue. |
+// Executed only by the owner P. |
static G* |
runqget(P *p) |
{ |
G *gp; |
- int32 t, h, s; |
+ uint32 t, h; |
- if(p->runqhead == p->runqtail) |
- return nil; |
- runtime·lock(p); |
- h = p->runqhead; |
- t = p->runqtail; |
- s = p->runqsize; |
- if(t == h) { |
- runtime·unlock(p); |
- return nil; |
+ for(;;) { |
+ h = runtime·atomicload(&p->runqhead); // load-acquire, synchronize with other consumers |
+ t = p->runqtail; |
+ if(t == h) |
+ return nil; |
+ gp = p->runq[h%nelem(p->runq)]; |
+ if(runtime·cas(&p->runqhead, h, h+1)) // cas-release, commits consume |
+ return gp; |
} |
- gp = p->runq[h++]; |
- if(h == s) |
- h = 0; |
- p->runqhead = h; |
- runtime·unlock(p); |
- return gp; |
} |
-// Grow local runnable queue. |
-// TODO(dvyukov): consider using fixed-size array |
-// and transfer excess to the global list (local queue can grow way too big). |
-static void |
-runqgrow(P *p) |
+// Grabs a batch of goroutines from local runnable queue. |
+// batch array must be of size nelem(p->runq)/2. Returns number of grabbed goroutines. |
+// Can be executed by any P. |
+static uint32 |
+runqgrab(P *p, G **batch) |
{ |
- G **q; |
- int32 s, t, h, t2; |
+ uint32 t, h, n, i; |
- h = p->runqhead; |
- t = p->runqtail; |
- s = p->runqsize; |
- t2 = 0; |
- q = runtime·malloc(2*s*sizeof(*q)); |
- while(t != h) { |
- q[t2++] = p->runq[h++]; |
- if(h == s) |
- h = 0; |
+ for(;;) { |
+ h = runtime·atomicload(&p->runqhead); // load-acquire, synchronize with other consumers |
+ t = runtime·atomicload(&p->runqtail); // load-acquire, synchronize with the producer |
+ n = t-h; |
+ n = n - n/2; |
+ if(n == 0) |
+ break; |
+ if(n > nelem(p->runq)/2) // read inconsistent h and t |
+ continue; |
+ for(i=0; i<n; i++) |
+ batch[i] = p->runq[(h+i)%nelem(p->runq)]; |
+ if(runtime·cas(&p->runqhead, h, h+n)) // cas-release, commits consume |
+ break; |
} |
- runtime·free(p->runq); |
- p->runq = q; |
- p->runqhead = 0; |
- p->runqtail = t2; |
- p->runqsize = 2*s; |
+ return n; |
} |
// Steal half of elements from local runnable queue of p2 |
@@ -2779,57 +2811,24 @@ |
static G* |
runqsteal(P *p, P *p2) |
{ |
- G *gp, *gp1; |
- int32 t, h, s, t2, h2, s2, c, i; |
+ G *gp; |
+ G *batch[nelem(p->runq)/2]; |
+ uint32 t, h, n, i; |
- if(p2->runqhead == p2->runqtail) |
+ n = runqgrab(p2, batch); |
+ if(n == 0) |
return nil; |
- // sort locks to prevent deadlocks |
- if(p < p2) |
- runtime·lock(p); |
- runtime·lock(p2); |
- if(p2->runqhead == p2->runqtail) { |
- runtime·unlock(p2); |
- if(p < p2) |
- runtime·unlock(p); |
- return nil; |
- } |
- if(p >= p2) |
- runtime·lock(p); |
- // now we've locked both queues and know the victim is not empty |
- h = p->runqhead; |
+ n--; |
+ gp = batch[n]; |
+ if(n == 0) |
+ return gp; |
+ h = runtime·atomicload(&p->runqhead); // load-acquire, synchronize with consumers |
t = p->runqtail; |
- s = p->runqsize; |
- h2 = p2->runqhead; |
- t2 = p2->runqtail; |
- s2 = p2->runqsize; |
- gp = p2->runq[h2++]; // return value |
- if(h2 == s2) |
- h2 = 0; |
- // steal roughly half |
- if(t2 > h2) |
- c = (t2 - h2) / 2; |
- else |
- c = (s2 - h2 + t2) / 2; |
- // copy |
- for(i = 0; i != c; i++) { |
- // the target queue is full? |
- if(t == h-1 || (h == 0 && t == s-1)) |
- break; |
- // the victim queue is empty? |
- if(t2 == h2) |
- break; |
- gp1 = p2->runq[h2++]; |
- if(h2 == s2) |
- h2 = 0; |
- p->runq[t++] = gp1; |
- if(t == s) |
- t = 0; |
- } |
- p->runqtail = t; |
- p2->runqhead = h2; |
- runtime·unlock(p2); |
- runtime·unlock(p); |
+ if(t - h + n >= nelem(p->runq)) |
+ runtime·throw("runqsteal: runq overflow"); |
+ for(i=0; i<n; i++, t++) |
+ p->runq[t%nelem(p->runq)] = batch[i]; |
+ runtime·atomicstore(&p->runqtail, t); // store-release, makes the item available for consumption |
return gp; |
} |
@@ -2837,14 +2836,10 @@ |
runtime·testSchedLocalQueue(void) |
{ |
P p; |
- G gs[1000]; |
+ G gs[nelem(p.runq)]; |
int32 i, j; |
runtime·memclr((byte*)&p, sizeof(p)); |
- p.runqsize = 1; |
- p.runqhead = 0; |
- p.runqtail = 0; |
- p.runq = runtime·malloc(p.runqsize*sizeof(*p.runq)); |
for(i = 0; i < nelem(gs); i++) { |
if(runqget(&p) != nil) |
@@ -2866,20 +2861,11 @@ |
runtime·testSchedLocalQueueSteal(void) |
{ |
P p1, p2; |
- G gs[1000], *gp; |
+ G gs[nelem(p1.runq)], *gp; |
int32 i, j, s; |
runtime·memclr((byte*)&p1, sizeof(p1)); |
- p1.runqsize = 1; |
- p1.runqhead = 0; |
- p1.runqtail = 0; |
- p1.runq = runtime·malloc(p1.runqsize*sizeof(*p1.runq)); |
- |
runtime·memclr((byte*)&p2, sizeof(p2)); |
- p2.runqsize = nelem(gs); |
- p2.runqhead = 0; |
- p2.runqtail = 0; |
- p2.runq = runtime·malloc(p2.runqsize*sizeof(*p2.runq)); |
for(i = 0; i < nelem(gs); i++) { |
for(j = 0; j < i; j++) { |