Commits

Anonymous committed 209c0d6

runtime: use lock-free ring for work queues
Use lock-free fixed-size ring for work queues
instead of an unbounded mutex-protected array.
The ring has single producer and multiple consumers.
If the ring overflows, work is put onto global queue.

benchmark old ns/op new ns/op delta
BenchmarkMatmult 7 5 -18.12%
BenchmarkMatmult-4 2 2 -18.98%
BenchmarkMatmult-16 1 0 -12.84%

BenchmarkCreateGoroutines 105 88 -16.10%
BenchmarkCreateGoroutines-4 376 219 -41.76%
BenchmarkCreateGoroutines-16 241 174 -27.80%
BenchmarkCreateGoroutinesParallel 103 87 -14.66%
BenchmarkCreateGoroutinesParallel-4 169 143 -15.38%
BenchmarkCreateGoroutinesParallel-16 158 151 -4.43%

R=golang-codereviews, rsc
CC=ddetlefs, devon.odell, golang-codereviews
https://codereview.appspot.com/46170044

Comments (0)

Files changed (2)

src/pkg/runtime/proc.c

 void runtime·mstart(void);
 static void runqput(P*, G*);
 static G* runqget(P*);
-static void runqgrow(P*);
+static bool runqputslow(P*, G*, uint32, uint32);
 static G* runqsteal(P*, P*);
 static void mput(M*);
 static M* mget(void);
 static G* gfget(P*);
 static void gfpurge(P*);
 static void globrunqput(G*);
+static void globrunqputbatch(G*, G*, int32);
 static G* globrunqget(P*, int32);
 static P* pidleget(void);
 static void pidleput(P*);
 			else
 				p->mcache = runtime·allocmcache();
 		}
-		if(p->runq == nil) {
-			p->runqsize = 128;
-			p->runq = (G**)runtime·mallocgc(p->runqsize*sizeof(G*), 0, FlagNoInvokeGC);
-		}
 	}
 
 	// redistribute runnable G's evenly
+	// collect all runnable goroutines in global queue
 	for(i = 0; i < old; i++) {
 		p = runtime·allp[i];
 		while(gp = runqget(p))
 			globrunqput(gp);
 	}
+	// fill local queues with at most nelem(p->runq)/2 goroutines
 	// start at 1 because current M already executes some G and will acquire allp[0] below,
 	// so if we have a spare G we want to put it into allp[1].
-	for(i = 1; runtime·sched.runqhead; i++) {
+	for(i = 1; i < new * nelem(p->runq)/2 && runtime·sched.runqsize > 0; i++) {
 		gp = runtime·sched.runqhead;
 		runtime·sched.runqhead = gp->schedlink;
+		if(runtime·sched.runqhead == nil)
+			runtime·sched.runqtail = nil;
+		runtime·sched.runqsize--;
 		runqput(runtime·allp[i%new], gp);
 	}
-	runtime·sched.runqtail = nil;
-	runtime·sched.runqsize = 0;
 
 	// free unused P's
 	for(i = new; i < old; i++) {
 	static int64 starttime;
 	int64 now;
 	int64 id1, id2, id3;
-	int32 i, q, t, h, s;
+	int32 i, t, h;
 	int8 *fmt;
 	M *mp, *lockedm;
 	G *gp, *lockedg;
 		if(p == nil)
 			continue;
 		mp = p->m;
-		t = p->runqtail;
-		h = p->runqhead;
-		s = p->runqsize;
-		q = t - h;
-		if(q < 0)
-			q += s;
+		h = runtime·atomicload(&p->runqhead);
+		t = runtime·atomicload(&p->runqtail);
 		if(detailed)
-			runtime·printf("  P%d: status=%d schedtick=%d syscalltick=%d m=%d runqsize=%d/%d gfreecnt=%d\n",
-				i, p->status, p->schedtick, p->syscalltick, mp ? mp->id : -1, q, s, p->gfreecnt);
+			runtime·printf("  P%d: status=%d schedtick=%d syscalltick=%d m=%d runqsize=%d gfreecnt=%d\n",
+				i, p->status, p->schedtick, p->syscalltick, mp ? mp->id : -1, t-h, p->gfreecnt);
 		else {
 			// In non-detailed mode format lengths of per-P run queues as:
 			// [len1 len2 len3 len4]
 				fmt = " [%d";
 			else if(i == runtime·gomaxprocs-1)
 				fmt = " %d]\n";
-			runtime·printf(fmt, q);
+			runtime·printf(fmt, t-h);
 		}
 	}
 	if(!detailed) {
 	runtime·sched.runqsize++;
 }
 
+// Put a batch of runnable goroutines on the global runnable queue.
+// Sched must be locked.
+static void
+globrunqputbatch(G *ghead, G *gtail, int32 n)
+{
+	gtail->schedlink = nil;
+	if(runtime·sched.runqtail)
+		runtime·sched.runqtail->schedlink = ghead;
+	else
+		runtime·sched.runqhead = ghead;
+	runtime·sched.runqtail = gtail;
+	runtime·sched.runqsize += n;
+}
+
 // Try get a batch of G's from the global runnable queue.
 // Sched must be locked.
 static G*
 		n = runtime·sched.runqsize;
 	if(max > 0 && n > max)
 		n = max;
+	if(n > nelem(p->runq)/2)
+		n = nelem(p->runq)/2;
 	runtime·sched.runqsize -= n;
 	if(runtime·sched.runqsize == 0)
 		runtime·sched.runqtail = nil;
 	return p;
 }
 
-// Put g on local runnable queue.
-// TODO(dvyukov): consider using lock-free queue.
+// Try to put g on local runnable queue.
+// If it's full, put onto global queue.
+// Executed only by the owner P.
 static void
 runqput(P *p, G *gp)
 {
-	int32 h, t, s;
+	uint32 h, t;
 
-	runtime·lock(p);
 retry:
-	h = p->runqhead;
+	h = runtime·atomicload(&p->runqhead);  // load-acquire, synchronize with consumers
 	t = p->runqtail;
-	s = p->runqsize;
-	if(t == h-1 || (h == 0 && t == s-1)) {
-		runqgrow(p);
-		goto retry;
+	if(t - h < nelem(p->runq)) {
+		p->runq[t%nelem(p->runq)] = gp;
+		runtime·atomicstore(&p->runqtail, t+1);  // store-release, makes the item available for consumption
+		return;
 	}
-	p->runq[t++] = gp;
-	if(t == s)
-		t = 0;
-	p->runqtail = t;
-	runtime·unlock(p);
+	if(runqputslow(p, gp, h, t))
+		return;
+	// the queue is not full, now the put above must suceed
+	goto retry;
+}
+
+// Put g and a batch of work from local runnable queue on global queue.
+// Executed only by the owner P.
+static bool
+runqputslow(P *p, G *gp, uint32 h, uint32 t)
+{
+	G *batch[nelem(p->runq)/2+1];
+	uint32 n, i;
+
+	// First, grab a batch from local queue.
+	n = t-h;
+	n = n/2;
+	if(n != nelem(p->runq)/2)
+		runtime·throw("runqputslow: queue is not full");
+	for(i=0; i<n; i++)
+		batch[i] = p->runq[(h+i)%nelem(p->runq)];
+	if(!runtime·cas(&p->runqhead, h, h+n))  // cas-release, commits consume
+		return false;
+	batch[n] = gp;
+	// Link the goroutines.
+	for(i=0; i<n; i++)
+		batch[i]->schedlink = batch[i+1];
+	// Now put the batch on global queue.
+	runtime·lock(&runtime·sched);
+	globrunqputbatch(batch[0], batch[n], n+1);
+	runtime·unlock(&runtime·sched);
+	return true;
 }
 
 // Get g from local runnable queue.
+// Executed only by the owner P.
 static G*
 runqget(P *p)
 {
 	G *gp;
-	int32 t, h, s;
+	uint32 t, h;
 
-	if(p->runqhead == p->runqtail)
-		return nil;
-	runtime·lock(p);
-	h = p->runqhead;
-	t = p->runqtail;
-	s = p->runqsize;
-	if(t == h) {
-		runtime·unlock(p);
-		return nil;
+	for(;;) {
+		h = runtime·atomicload(&p->runqhead);  // load-acquire, synchronize with other consumers
+		t = p->runqtail;
+		if(t == h)
+			return nil;
+		gp = p->runq[h%nelem(p->runq)];
+		if(runtime·cas(&p->runqhead, h, h+1))  // cas-release, commits consume
+			return gp;
 	}
-	gp = p->runq[h++];
-	if(h == s)
-		h = 0;
-	p->runqhead = h;
-	runtime·unlock(p);
-	return gp;
 }
 
-// Grow local runnable queue.
-// TODO(dvyukov): consider using fixed-size array
-// and transfer excess to the global list (local queue can grow way too big).
-static void
-runqgrow(P *p)
+// Grabs a batch of goroutines from local runnable queue.
+// batch array must be of size nelem(p->runq)/2. Returns number of grabbed goroutines.
+// Can be executed by any P.
+static uint32
+runqgrab(P *p, G **batch)
 {
-	G **q;
-	int32 s, t, h, t2;
+	uint32 t, h, n, i;
 
-	h = p->runqhead;
-	t = p->runqtail;
-	s = p->runqsize;
-	t2 = 0;
-	q = runtime·malloc(2*s*sizeof(*q));
-	while(t != h) {
-		q[t2++] = p->runq[h++];
-		if(h == s)
-			h = 0;
+	for(;;) {
+		h = runtime·atomicload(&p->runqhead);  // load-acquire, synchronize with other consumers
+		t = runtime·atomicload(&p->runqtail);  // load-acquire, synchronize with the producer
+		n = t-h;
+		n = n - n/2;
+		if(n == 0)
+			break;
+		if(n > nelem(p->runq)/2)  // read inconsistent h and t
+			continue;
+		for(i=0; i<n; i++)
+			batch[i] = p->runq[(h+i)%nelem(p->runq)];
+		if(runtime·cas(&p->runqhead, h, h+n))  // cas-release, commits consume
+			break;
 	}
-	runtime·free(p->runq);
-	p->runq = q;
-	p->runqhead = 0;
-	p->runqtail = t2;
-	p->runqsize = 2*s;
+	return n;
 }
 
 // Steal half of elements from local runnable queue of p2
 static G*
 runqsteal(P *p, P *p2)
 {
-	G *gp, *gp1;
-	int32 t, h, s, t2, h2, s2, c, i;
+	G *gp;
+	G *batch[nelem(p->runq)/2];
+	uint32 t, h, n, i;
 
-	if(p2->runqhead == p2->runqtail)
+	n = runqgrab(p2, batch);
+	if(n == 0)
 		return nil;
-	// sort locks to prevent deadlocks
-	if(p < p2)
-		runtime·lock(p);
-	runtime·lock(p2);
-	if(p2->runqhead == p2->runqtail) {
-		runtime·unlock(p2);
-		if(p < p2)
-			runtime·unlock(p);
-		return nil;
-	}
-	if(p >= p2)
-		runtime·lock(p);
-	// now we've locked both queues and know the victim is not empty
-	h = p->runqhead;
+	n--;
+	gp = batch[n];
+	if(n == 0)
+		return gp;
+	h = runtime·atomicload(&p->runqhead);  // load-acquire, synchronize with consumers
 	t = p->runqtail;
-	s = p->runqsize;
-	h2 = p2->runqhead;
-	t2 = p2->runqtail;
-	s2 = p2->runqsize;
-	gp = p2->runq[h2++];  // return value
-	if(h2 == s2)
-		h2 = 0;
-	// steal roughly half
-	if(t2 > h2)
-		c = (t2 - h2) / 2;
-	else
-		c = (s2 - h2 + t2) / 2;
-	// copy
-	for(i = 0; i != c; i++) {
-		// the target queue is full?
-		if(t == h-1 || (h == 0 && t == s-1))
-			break;
-		// the victim queue is empty?
-		if(t2 == h2)
-			break;
-		gp1 = p2->runq[h2++];
-		if(h2 == s2)
-			h2 = 0;
-		p->runq[t++] = gp1;
-		if(t == s)
-			t = 0;
-	}
-	p->runqtail = t;
-	p2->runqhead = h2;
-	runtime·unlock(p2);
-	runtime·unlock(p);
+	if(t - h + n >= nelem(p->runq))
+		runtime·throw("runqsteal: runq overflow");
+	for(i=0; i<n; i++, t++)
+		p->runq[t%nelem(p->runq)] = batch[i];
+	runtime·atomicstore(&p->runqtail, t);  // store-release, makes the item available for consumption
 	return gp;
 }
 
 runtime·testSchedLocalQueue(void)
 {
 	P p;
-	G gs[1000];
+	G gs[nelem(p.runq)];
 	int32 i, j;
 
 	runtime·memclr((byte*)&p, sizeof(p));
-	p.runqsize = 1;
-	p.runqhead = 0;
-	p.runqtail = 0;
-	p.runq = runtime·malloc(p.runqsize*sizeof(*p.runq));
 
 	for(i = 0; i < nelem(gs); i++) {
 		if(runqget(&p) != nil)
 runtime·testSchedLocalQueueSteal(void)
 {
 	P p1, p2;
-	G gs[1000], *gp;
+	G gs[nelem(p1.runq)], *gp;
 	int32 i, j, s;
 
 	runtime·memclr((byte*)&p1, sizeof(p1));
-	p1.runqsize = 1;
-	p1.runqhead = 0;
-	p1.runqtail = 0;
-	p1.runq = runtime·malloc(p1.runqsize*sizeof(*p1.runq));
-
 	runtime·memclr((byte*)&p2, sizeof(p2));
-	p2.runqsize = nelem(gs);
-	p2.runqhead = 0;
-	p2.runqtail = 0;
-	p2.runq = runtime·malloc(p2.runqsize*sizeof(*p2.runq));
 
 	for(i = 0; i < nelem(gs); i++) {
 		for(j = 0; j < i; j++) {

src/pkg/runtime/runtime.h

 	MCache*	mcache;
 
 	// Queue of runnable goroutines.
-	G**	runq;
-	int32	runqhead;
-	int32	runqtail;
-	int32	runqsize;
+	uint32	runqhead;
+	uint32	runqtail;
+	G*	runq[256];
 
 	// Available G's (status == Gdead)
 	G*	gfree;
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.