Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(169)

Side by Side Diff: doc/talks/io2010/balance.go

Issue 1614041: code review 1614041: doc: add Google I/O talk and programs (Closed)
Patch Set: code review 1614041: doc: add Google I/O talk and programs Created 14 years, 9 months ago
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | doc/talks/io2010/decrypt.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2010 The Go Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file.
4
5 package main
6
7 import (
8 "container/heap"
9 "fmt"
10 "rand"
11 "time"
12 )
13
14 const nRequester = 100
15 const nWorker = 10
16
17 // Simulation of some work: just sleep for a while and report how long.
18 func op() int {
19 n := rand.Int63n(1e9)
20 time.Sleep(nWorker * n)
21 return int(n)
22 }
23
24 type Request struct {
25 fn func() int
26 c chan int
27 }
28
29 func requester(work chan Request) {
30 c := make(chan int)
31 for {
32 time.Sleep(rand.Int63n(nWorker * 2e9))
33 work <- Request{op, c}
34 <-c
35 }
36 }
37
38 type Worker struct {
39 i int
40 requests chan Request
41 pending int
42 }
43
44 func (w *Worker) work(done chan *Worker) {
45 for {
46 req := <-w.requests
47 req.c <- req.fn()
48 done <- w
49 }
50 }
51
52 type Pool []*Worker
53
54 func (p Pool) Len() int { return len(p) }
55
56 func (p Pool) Less(i, j int) bool {
57 return p[i].pending < p[j].pending
58 }
59
60 func (p *Pool) Swap(i, j int) {
61 a := *p
62 a[i], a[j] = a[j], a[i]
63 a[i].i = i
64 a[j].i = j
65 }
66
67 func (p *Pool) Push(x interface{}) {
68 a := *p
69 n := len(a)
70 a = a[0 : n+1]
71 w := x.(*Worker)
72 a[n] = w
73 w.i = n
74 *p = a
75 }
76
77 func (p *Pool) Pop() interface{} {
78 a := *p
79 *p = a[0 : len(a)-1]
80 w := a[len(a)-1]
81 w.i = -1 // for safety
82 return w
83 }
84
85 type Balancer struct {
86 pool Pool
87 done chan *Worker
88 i int
89 }
90
91 func NewBalancer() *Balancer {
92 done := make(chan *Worker, nWorker)
93 b := &Balancer{make(Pool, 0, nWorker), done, 0}
94 for i := 0; i < nWorker; i++ {
95 w := &Worker{requests: make(chan Request, nRequester)}
96 heap.Push(&b.pool, w)
97 go w.work(b.done)
98 }
99 return b
100 }
101
102 func (b *Balancer) balance(work chan Request) {
103 for {
104 select {
105 case req := <-work:
106 b.dispatch(req)
107 case w := <-b.done:
108 b.completed(w)
109 }
110 b.print()
111 }
112 }
113
114 func (b *Balancer) print() {
115 sum := 0
116 sumsq := 0
117 for _, w := range b.pool {
118 fmt.Printf("%d ", w.pending)
119 sum += w.pending
120 sumsq += w.pending * w.pending
121 }
122 avg := float64(sum) / float64(len(b.pool))
123 variance := float64(sumsq)/float64(len(b.pool)) - avg*avg
124 fmt.Printf(" %.2f %.2f\n", avg, variance)
125 }
126
127 func (b *Balancer) dispatch(req Request) {
128 if false {
129 w := b.pool[b.i]
130 w.requests <- req
131 w.pending++
132 b.i++
133 if b.i >= len(b.pool) {
134 b.i = 0
135 }
136 return
137 }
138
139 w := heap.Pop(&b.pool).(*Worker)
140 w.requests <- req
141 w.pending++
142 // fmt.Printf("started %p; now %d\n", w, w.pending)
143 heap.Push(&b.pool, w)
144 }
145
146 func (b *Balancer) completed(w *Worker) {
147 if false {
148 w.pending--
149 return
150 }
151
152 w.pending--
153 // fmt.Printf("finished %p; now %d\n", w, w.pending)
154 heap.Remove(&b.pool, w.i)
155 heap.Push(&b.pool, w)
156 }
157
158 func main() {
159 work := make(chan Request)
160 for i := 0; i < nRequester; i++ {
161 go requester(work)
162 }
163 NewBalancer().balance(work)
164 }
OLDNEW
« no previous file with comments | « no previous file | doc/talks/io2010/decrypt.go » ('j') | no next file with comments »

Powered by Google App Engine
RSS Feeds Recent Issues | This issue
This is Rietveld f62528b