OLD | NEW |
1 // The presence package is intended as a replacement for zookeeper ephemeral | 1 // The presence package is intended as a replacement for zookeeper ephemeral |
2 // nodes; the primary difference is that node timeout is unrelated to session | 2 // nodes; the primary difference is that node timeout is unrelated to session |
3 // timeout, and this allows us to restart a presence-enabled process "silently" | 3 // timeout, and this allows us to restart a presence-enabled process "silently" |
4 // (from the perspective of the rest of the system) without dealing with the | 4 // (from the perspective of the rest of the system) without dealing with the |
5 // complication of session re-establishment. | 5 // complication of session re-establishment. |
6 | 6 |
7 package presence | 7 package presence |
8 | 8 |
9 import ( | 9 import ( |
10 "fmt" | 10 "fmt" |
(...skipping 118 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
129 // ensure watchers detect the pinger's death as late as possible
. | 129 // ensure watchers detect the pinger's death as late as possible
. |
130 if p.tomb.Err() == nil { | 130 if p.tomb.Err() == nil { |
131 _, err := p.target.change() | 131 _, err := p.target.change() |
132 p.tomb.Kill(err) | 132 p.tomb.Kill(err) |
133 } | 133 } |
134 p.tomb.Done() | 134 p.tomb.Done() |
135 }() | 135 }() |
136 for { | 136 for { |
137 select { | 137 select { |
138 case <-p.tomb.Dying(): | 138 case <-p.tomb.Dying(): |
139 » » » log.Debugf("presence: %s pinger died after %s wait", p.t
arget.path, time.Now().Sub(p.lastPing)) | 139 » » » debugf("presence: %s pinger died after %s wait", p.targe
t.path, time.Now().Sub(p.lastPing)) |
140 return | 140 return |
141 case now := <-p.nextPing(): | 141 case now := <-p.nextPing(): |
142 » » » log.Debugf("presence: %s pinger awakened after %s wait",
p.target.path, time.Now().Sub(p.lastPing)) | 142 » » » debugf("presence: %s pinger awakened after %s wait", p.t
arget.path, time.Now().Sub(p.lastPing)) |
143 p.lastPing = now | 143 p.lastPing = now |
144 mtime, err := p.target.change() | 144 mtime, err := p.target.change() |
145 if err != nil { | 145 if err != nil { |
146 p.tomb.Kill(err) | 146 p.tomb.Kill(err) |
147 return | 147 return |
148 } | 148 } |
149 » » » log.Debugf("presence: wrote to %s at (zk) %s", p.target.
path, mtime) | 149 » » » debugf("presence: wrote to %s at (zk) %s", p.target.path
, mtime) |
150 } | 150 } |
151 } | 151 } |
152 } | 152 } |
153 | 153 |
154 // nextPing returns a channel that will receive an event when the pinger is next | 154 // nextPing returns a channel that will receive an event when the pinger is next |
155 // due to fire. | 155 // due to fire. |
156 func (p *Pinger) nextPing() <-chan time.Time { | 156 func (p *Pinger) nextPing() <-chan time.Time { |
157 next := p.lastPing.Add(p.period) | 157 next := p.lastPing.Add(p.period) |
158 wait := next.Sub(time.Now()) | 158 wait := next.Sub(time.Now()) |
159 if wait <= 0 { | 159 if wait <= 0 { |
160 wait = 0 | 160 wait = 0 |
161 } | 161 } |
162 » log.Debugf("presence: anticipating ping of %s in %s", p.target.path, wai
t) | 162 » debugf("presence: anticipating ping of %s in %s", p.target.path, wait) |
163 return time.After(wait) | 163 return time.After(wait) |
164 } | 164 } |
165 | 165 |
166 // node represents the state of a presence node from a watcher's perspective. | 166 // node represents the state of a presence node from a watcher's perspective. |
167 type node struct { | 167 type node struct { |
168 conn *zk.Conn | 168 conn *zk.Conn |
169 path string | 169 path string |
170 alive bool | 170 alive bool |
171 timeout time.Duration | 171 timeout time.Duration |
172 } | 172 } |
(...skipping 17 matching lines...) Expand all Loading... |
190 n.timeout = period * 3 | 190 n.timeout = period * 3 |
191 if firstTime { | 191 if firstTime { |
192 clock := changeNode{n.conn, "/clock", ""} | 192 clock := changeNode{n.conn, "/clock", ""} |
193 now, err := clock.change() | 193 now, err := clock.change() |
194 if err != nil { | 194 if err != nil { |
195 return err | 195 return err |
196 } | 196 } |
197 mtime := stat.MTime() | 197 mtime := stat.MTime() |
198 delay := now.Sub(mtime) | 198 delay := now.Sub(mtime) |
199 n.alive = delay < n.timeout | 199 n.alive = delay < n.timeout |
200 » » log.Debugf(` | 200 » » debugf(` |
201 presence: initial diagnosis of %s | 201 presence: initial diagnosis of %s |
202 now (zk) %s | 202 now (zk) %s |
203 last write (zk) %s | 203 last write (zk) %s |
204 apparent delay %s | 204 apparent delay %s |
205 timeout %s | 205 timeout %s |
206 alive %t | 206 alive %t |
207 `[1:], n.path, now, mtime, delay, n.timeout, n.alive) | 207 `[1:], n.path, now, mtime, delay, n.timeout, n.alive) |
208 } else { | 208 } else { |
209 // If this method is not being run for the first time, we know t
hat | 209 // If this method is not being run for the first time, we know t
hat |
210 // the node has just changed, so we know that it's alive and the
re's | 210 // the node has just changed, so we know that it's alive and the
re's |
(...skipping 323 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
534 continue | 534 continue |
535 } | 535 } |
536 select { | 536 select { |
537 case <-stop: | 537 case <-stop: |
538 return | 538 return |
539 case w.updates <- aliveChange{key, aliveNow}: | 539 case w.updates <- aliveChange{key, aliveNow}: |
540 } | 540 } |
541 } | 541 } |
542 } | 542 } |
543 } | 543 } |
| 544 |
| 545 // Debug, when true, causes detailed presence logs to be generated. |
| 546 var Debug = false |
| 547 |
| 548 // debugf passes its args on to log.Debugf if Debug is true. |
| 549 func debugf(format string, args ...interface{}) { |
| 550 if Debug { |
| 551 log.Debugf(format, args...) |
| 552 } |
| 553 } |
OLD | NEW |