OLD | NEW |
(Empty) | |
| 1 // Copyright 2010 The Go Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style |
| 3 // license that can be found in the LICENSE file. |
| 4 |
| 5 package main |
| 6 |
| 7 import ( |
| 8 "container/heap" |
| 9 "fmt" |
| 10 "rand" |
| 11 "time" |
| 12 ) |
| 13 |
| 14 const nRequester = 100 |
| 15 const nWorker = 10 |
| 16 |
| 17 // Simulation of some work: just sleep for a while and report how long. |
| 18 func op() int { |
| 19 n := rand.Int63n(1e9) |
| 20 time.Sleep(nWorker * n) |
| 21 return int(n) |
| 22 } |
| 23 |
| 24 type Request struct { |
| 25 fn func() int |
| 26 c chan int |
| 27 } |
| 28 |
| 29 func requester(work chan Request) { |
| 30 c := make(chan int) |
| 31 for { |
| 32 time.Sleep(rand.Int63n(nWorker * 2e9)) |
| 33 work <- Request{op, c} |
| 34 <-c |
| 35 } |
| 36 } |
| 37 |
| 38 type Worker struct { |
| 39 i int |
| 40 requests chan Request |
| 41 pending int |
| 42 } |
| 43 |
| 44 func (w *Worker) work(done chan *Worker) { |
| 45 for { |
| 46 req := <-w.requests |
| 47 req.c <- req.fn() |
| 48 done <- w |
| 49 } |
| 50 } |
| 51 |
| 52 type Pool []*Worker |
| 53 |
| 54 func (p Pool) Len() int { return len(p) } |
| 55 |
| 56 func (p Pool) Less(i, j int) bool { |
| 57 return p[i].pending < p[j].pending |
| 58 } |
| 59 |
| 60 func (p *Pool) Swap(i, j int) { |
| 61 a := *p |
| 62 a[i], a[j] = a[j], a[i] |
| 63 a[i].i = i |
| 64 a[j].i = j |
| 65 } |
| 66 |
| 67 func (p *Pool) Push(x interface{}) { |
| 68 a := *p |
| 69 n := len(a) |
| 70 a = a[0 : n+1] |
| 71 w := x.(*Worker) |
| 72 a[n] = w |
| 73 w.i = n |
| 74 *p = a |
| 75 } |
| 76 |
| 77 func (p *Pool) Pop() interface{} { |
| 78 a := *p |
| 79 *p = a[0 : len(a)-1] |
| 80 w := a[len(a)-1] |
| 81 w.i = -1 // for safety |
| 82 return w |
| 83 } |
| 84 |
| 85 type Balancer struct { |
| 86 pool Pool |
| 87 done chan *Worker |
| 88 i int |
| 89 } |
| 90 |
| 91 func NewBalancer() *Balancer { |
| 92 done := make(chan *Worker, nWorker) |
| 93 b := &Balancer{make(Pool, 0, nWorker), done, 0} |
| 94 for i := 0; i < nWorker; i++ { |
| 95 w := &Worker{requests: make(chan Request, nRequester)} |
| 96 heap.Push(&b.pool, w) |
| 97 go w.work(b.done) |
| 98 } |
| 99 return b |
| 100 } |
| 101 |
| 102 func (b *Balancer) balance(work chan Request) { |
| 103 for { |
| 104 select { |
| 105 case req := <-work: |
| 106 b.dispatch(req) |
| 107 case w := <-b.done: |
| 108 b.completed(w) |
| 109 } |
| 110 b.print() |
| 111 } |
| 112 } |
| 113 |
| 114 func (b *Balancer) print() { |
| 115 sum := 0 |
| 116 sumsq := 0 |
| 117 for _, w := range b.pool { |
| 118 fmt.Printf("%d ", w.pending) |
| 119 sum += w.pending |
| 120 sumsq += w.pending * w.pending |
| 121 } |
| 122 avg := float64(sum) / float64(len(b.pool)) |
| 123 variance := float64(sumsq)/float64(len(b.pool)) - avg*avg |
| 124 fmt.Printf(" %.2f %.2f\n", avg, variance) |
| 125 } |
| 126 |
| 127 func (b *Balancer) dispatch(req Request) { |
| 128 if false { |
| 129 w := b.pool[b.i] |
| 130 w.requests <- req |
| 131 w.pending++ |
| 132 b.i++ |
| 133 if b.i >= len(b.pool) { |
| 134 b.i = 0 |
| 135 } |
| 136 return |
| 137 } |
| 138 |
| 139 w := heap.Pop(&b.pool).(*Worker) |
| 140 w.requests <- req |
| 141 w.pending++ |
| 142 // fmt.Printf("started %p; now %d\n", w, w.pending) |
| 143 heap.Push(&b.pool, w) |
| 144 } |
| 145 |
| 146 func (b *Balancer) completed(w *Worker) { |
| 147 if false { |
| 148 w.pending-- |
| 149 return |
| 150 } |
| 151 |
| 152 w.pending-- |
| 153 // fmt.Printf("finished %p; now %d\n", w, w.pending) |
| 154 heap.Remove(&b.pool, w.i) |
| 155 heap.Push(&b.pool, w) |
| 156 } |
| 157 |
| 158 func main() { |
| 159 work := make(chan Request) |
| 160 for i := 0; i < nRequester; i++ { |
| 161 go requester(work) |
| 162 } |
| 163 NewBalancer().balance(work) |
| 164 } |
OLD | NEW |