Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(2194)

Side by Side Diff: state/apiserver/root.go

Issue 24040044: apiserver: analyzing ping timeouts
Patch Set: apiserver: analyzing ping timeouts Created 10 years, 4 months ago
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
View unified diff | Download patch
« no previous file with comments | « state/apiserver/export_test.go ('k') | state/apiserver/root_test.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2013 Canonical Ltd. 1 // Copyright 2013 Canonical Ltd.
2 // Licensed under the AGPLv3, see LICENCE file for details. 2 // Licensed under the AGPLv3, see LICENCE file for details.
3 3
4 package apiserver 4 package apiserver
5 5
6 import ( 6 import (
7 "errors"
8 "time"
9
10 "launchpad.net/tomb"
11
12 "launchpad.net/juju-core/rpc"
7 "launchpad.net/juju-core/state" 13 "launchpad.net/juju-core/state"
8 "launchpad.net/juju-core/state/apiserver/agent" 14 "launchpad.net/juju-core/state/apiserver/agent"
9 "launchpad.net/juju-core/state/apiserver/client" 15 "launchpad.net/juju-core/state/apiserver/client"
10 "launchpad.net/juju-core/state/apiserver/common" 16 "launchpad.net/juju-core/state/apiserver/common"
11 "launchpad.net/juju-core/state/apiserver/deployer" 17 "launchpad.net/juju-core/state/apiserver/deployer"
12 loggerapi "launchpad.net/juju-core/state/apiserver/logger" 18 loggerapi "launchpad.net/juju-core/state/apiserver/logger"
13 "launchpad.net/juju-core/state/apiserver/machine" 19 "launchpad.net/juju-core/state/apiserver/machine"
14 "launchpad.net/juju-core/state/apiserver/provisioner" 20 "launchpad.net/juju-core/state/apiserver/provisioner"
15 "launchpad.net/juju-core/state/apiserver/uniter" 21 "launchpad.net/juju-core/state/apiserver/uniter"
16 "launchpad.net/juju-core/state/apiserver/upgrader" 22 "launchpad.net/juju-core/state/apiserver/upgrader"
17 "launchpad.net/juju-core/state/multiwatcher" 23 "launchpad.net/juju-core/state/multiwatcher"
18 ) 24 )
19 25
20 type clientAPI struct{ *client.API } 26 type clientAPI struct{ *client.API }
21 27
22 type taggedAuthenticator interface { 28 type taggedAuthenticator interface {
23 state.Entity 29 state.Entity
24 state.Authenticator 30 state.Authenticator
25 } 31 }
26 32
33 // maxPingInterval defines the timeframe until the ping
34 // timeout closes the monitored connection.
35 // TODO(mue): Idea by Roger: Move to API (e.g. params) so
36 // that the pinging there may depend on the interval.
37 const maxPingInterval = 3 * time.Minute
38
27 // srvRoot represents a single client's connection to the state 39 // srvRoot represents a single client's connection to the state
28 // after it has logged in. 40 // after it has logged in.
29 type srvRoot struct { 41 type srvRoot struct {
30 clientAPI 42 clientAPI
31 » srv *Server 43 » srv *Server
32 » resources *common.Resources 44 » rpcConn *rpc.Conn
45 » resources *common.Resources
46 » pingTimeout *pingTimeout
33 47
34 entity taggedAuthenticator 48 entity taggedAuthenticator
35 } 49 }
36 50
37 func newSrvRoot(srv *Server, entity taggedAuthenticator) *srvRoot { 51 // newSrvRoot creates the client's connection representation
52 // and starts a ping timeout for the monitoring of this
53 // connection.
54 func newSrvRoot(root *initialRoot, entity taggedAuthenticator) *srvRoot {
38 r := &srvRoot{ 55 r := &srvRoot{
39 » » srv: srv, 56 » » srv: root.srv,
57 » » rpcConn: root.rpcConn,
40 resources: common.NewResources(), 58 resources: common.NewResources(),
41 entity: entity, 59 entity: entity,
42 } 60 }
43 » r.clientAPI.API = client.NewAPI(srv.state, r.resources, r) 61 » action := func() {
62 » » err := r.rpcConn.Close()
63 » » if err != nil {
64 » » » logger.Errorf("error closing the RPC connection: %v", er r)
65 » » }
66 » }
67 » r.clientAPI.API = client.NewAPI(r.srv.state, r.resources, r)
68 » r.pingTimeout = newPingTimeout(action, maxPingInterval)
44 return r 69 return r
45 } 70 }
46 71
47 // Kill implements rpc.Killer. It cleans up any resources that need 72 // Kill implements rpc.Killer. It cleans up any resources that need
48 // cleaning up to ensure that all outstanding requests return. 73 // cleaning up to ensure that all outstanding requests return.
49 func (r *srvRoot) Kill() { 74 func (r *srvRoot) Kill() {
50 r.resources.StopAll() 75 r.resources.StopAll()
76 r.pingTimeout.stop()
51 } 77 }
52 78
53 // requireAgent checks whether the current client is an agent and hence 79 // requireAgent checks whether the current client is an agent and hence
54 // may access the agent APIs. We filter out non-agents when calling one 80 // may access the agent APIs. We filter out non-agents when calling one
55 // of the accessor functions (Machine, Unit, etc) which avoids us making 81 // of the accessor functions (Machine, Unit, etc) which avoids us making
56 // the check in every single request method. 82 // the check in every single request method.
57 func (r *srvRoot) requireAgent() error { 83 func (r *srvRoot) requireAgent() error {
58 if !isAgent(r.entity) { 84 if !isAgent(r.entity) {
59 return common.ErrPerm 85 return common.ErrPerm
60 } 86 }
(...skipping 160 matching lines...) Expand 10 before | Expand all | Expand 10 after
221 if !ok { 247 if !ok {
222 return nil, common.ErrUnknownWatcher 248 return nil, common.ErrUnknownWatcher
223 } 249 }
224 return &srvClientAllWatcher{ 250 return &srvClientAllWatcher{
225 watcher: watcher, 251 watcher: watcher,
226 id: id, 252 id: id,
227 resources: r.resources, 253 resources: r.resources,
228 }, nil 254 }, nil
229 } 255 }
230 256
231 // Pinger returns object with a single "Ping" method that does nothing. 257 // Pinger returns an object that can be pinged
232 func (r *srvRoot) Pinger(id string) (srvPinger, error) { 258 // by calling its Ping method. If this method
233 » return srvPinger{}, nil 259 // is not called frequently enough, the connection
260 // will be dropped.
261 func (r *srvRoot) Pinger(id string) (pinger, error) {
262 » return r.pingTimeout, nil
234 } 263 }
235 264
236 type srvPinger struct{}
237
238 // Ping is a no-op used by client heartbeat monitor.
239 func (r srvPinger) Ping() {}
240
241 // AuthMachineAgent returns whether the current client is a machine agent. 265 // AuthMachineAgent returns whether the current client is a machine agent.
242 func (r *srvRoot) AuthMachineAgent() bool { 266 func (r *srvRoot) AuthMachineAgent() bool {
243 _, ok := r.entity.(*state.Machine) 267 _, ok := r.entity.(*state.Machine)
244 return ok 268 return ok
245 } 269 }
246 270
247 // AuthUnitAgent returns whether the current client is a unit agent. 271 // AuthUnitAgent returns whether the current client is a unit agent.
248 func (r *srvRoot) AuthUnitAgent() bool { 272 func (r *srvRoot) AuthUnitAgent() bool {
249 _, ok := r.entity.(*state.Unit) 273 _, ok := r.entity.(*state.Unit)
250 return ok 274 return ok
(...skipping 19 matching lines...) Expand all
270 294
271 // GetAuthTag returns the tag of the authenticated entity. 295 // GetAuthTag returns the tag of the authenticated entity.
272 func (r *srvRoot) GetAuthTag() string { 296 func (r *srvRoot) GetAuthTag() string {
273 return r.entity.Tag() 297 return r.entity.Tag()
274 } 298 }
275 299
276 // GetAuthEntity returns the authenticated entity. 300 // GetAuthEntity returns the authenticated entity.
277 func (r *srvRoot) GetAuthEntity() state.Entity { 301 func (r *srvRoot) GetAuthEntity() state.Entity {
278 return r.entity 302 return r.entity
279 } 303 }
304
305 // pinger describes a type that can be pinged.
306 type pinger interface {
307 Ping()
308 }
309
310 // pingTimeout listens for pings and will call the
311 // passed action in case of a timeout. This way broken
312 // or inactive connections can be closed.
313 type pingTimeout struct {
314 tomb tomb.Tomb
315 action func()
316 timeout time.Duration
317 reset chan struct{}
318 }
319
320 // newPingTimeout returns a new pingTimeout instance
321 // that invokes the given action asynchronously if there
322 // is more than the given timeout interval between calls
323 // to its Ping method.
324 func newPingTimeout(action func(), timeout time.Duration) *pingTimeout {
325 pt := &pingTimeout{
326 action: action,
327 timeout: timeout,
328 reset: make(chan struct{}),
329 }
330 go func() {
331 defer pt.tomb.Done()
332 pt.tomb.Kill(pt.loop())
333 }()
334 return pt
335 }
336
337 // Ping is used by the client heartbeat monitor and resets
338 // the killer.
339 func (pt *pingTimeout) Ping() {
340 select {
341 case <-pt.tomb.Dying():
342 case pt.reset <- struct{}{}:
343 }
344 }
345
346 // stop terminates the ping timeout.
347 func (pt *pingTimeout) stop() error {
348 pt.tomb.Kill(nil)
349 return pt.tomb.Wait()
350 }
351
352 // loop waits for a reset signal, otherwise it performs
353 // the initially passed action.
354 func (pt *pingTimeout) loop() error {
355 timer := time.NewTimer(pt.timeout)
356 defer timer.Stop()
357 for {
358 select {
359 case <-pt.tomb.Dying():
360 return nil
361 case <-timer.C:
362 go pt.action()
363 return errors.New("ping timeout")
364 case <-pt.reset:
365 timer.Reset(pt.timeout)
366 }
367 }
368 }
OLDNEW
« no previous file with comments | « state/apiserver/export_test.go ('k') | state/apiserver/root_test.go » ('j') | no next file with comments »

Powered by Google App Engine
RSS Feeds Recent Issues | This issue
This is Rietveld f62528b