OLD | NEW |
1 // Copyright 2013 Canonical Ltd. | 1 // Copyright 2013 Canonical Ltd. |
2 // Licensed under the AGPLv3, see LICENCE file for details. | 2 // Licensed under the AGPLv3, see LICENCE file for details. |
3 | 3 |
4 package apiserver | 4 package apiserver |
5 | 5 |
6 import ( | 6 import ( |
| 7 "errors" |
| 8 "time" |
| 9 |
| 10 "launchpad.net/tomb" |
| 11 |
| 12 "launchpad.net/juju-core/rpc" |
7 "launchpad.net/juju-core/state" | 13 "launchpad.net/juju-core/state" |
8 "launchpad.net/juju-core/state/apiserver/agent" | 14 "launchpad.net/juju-core/state/apiserver/agent" |
9 "launchpad.net/juju-core/state/apiserver/client" | 15 "launchpad.net/juju-core/state/apiserver/client" |
10 "launchpad.net/juju-core/state/apiserver/common" | 16 "launchpad.net/juju-core/state/apiserver/common" |
11 "launchpad.net/juju-core/state/apiserver/deployer" | 17 "launchpad.net/juju-core/state/apiserver/deployer" |
12 loggerapi "launchpad.net/juju-core/state/apiserver/logger" | 18 loggerapi "launchpad.net/juju-core/state/apiserver/logger" |
13 "launchpad.net/juju-core/state/apiserver/machine" | 19 "launchpad.net/juju-core/state/apiserver/machine" |
14 "launchpad.net/juju-core/state/apiserver/provisioner" | 20 "launchpad.net/juju-core/state/apiserver/provisioner" |
15 "launchpad.net/juju-core/state/apiserver/uniter" | 21 "launchpad.net/juju-core/state/apiserver/uniter" |
16 "launchpad.net/juju-core/state/apiserver/upgrader" | 22 "launchpad.net/juju-core/state/apiserver/upgrader" |
17 "launchpad.net/juju-core/state/multiwatcher" | 23 "launchpad.net/juju-core/state/multiwatcher" |
18 ) | 24 ) |
19 | 25 |
20 type clientAPI struct{ *client.API } | 26 type clientAPI struct{ *client.API } |
21 | 27 |
22 type taggedAuthenticator interface { | 28 type taggedAuthenticator interface { |
23 state.Entity | 29 state.Entity |
24 state.Authenticator | 30 state.Authenticator |
25 } | 31 } |
26 | 32 |
| 33 // maxPingInterval defines the timeframe until the ping |
| 34 // timeout closes the monitored connection. |
| 35 // TODO(mue): Idea by Roger: Move to API (e.g. params) so |
| 36 // that the pinging there may depend on the interval. |
| 37 const maxPingInterval = 3 * time.Minute |
| 38 |
27 // srvRoot represents a single client's connection to the state | 39 // srvRoot represents a single client's connection to the state |
28 // after it has logged in. | 40 // after it has logged in. |
29 type srvRoot struct { | 41 type srvRoot struct { |
30 clientAPI | 42 clientAPI |
31 » srv *Server | 43 » srv *Server |
32 » resources *common.Resources | 44 » rpcConn *rpc.Conn |
| 45 » resources *common.Resources |
| 46 » pingTimeout *pingTimeout |
33 | 47 |
34 entity taggedAuthenticator | 48 entity taggedAuthenticator |
35 } | 49 } |
36 | 50 |
37 func newSrvRoot(srv *Server, entity taggedAuthenticator) *srvRoot { | 51 // newSrvRoot creates the client's connection representation |
| 52 // and starts a ping timeout for the monitoring of this |
| 53 // connection. |
| 54 func newSrvRoot(root *initialRoot, entity taggedAuthenticator) *srvRoot { |
38 r := &srvRoot{ | 55 r := &srvRoot{ |
39 » » srv: srv, | 56 » » srv: root.srv, |
| 57 » » rpcConn: root.rpcConn, |
40 resources: common.NewResources(), | 58 resources: common.NewResources(), |
41 entity: entity, | 59 entity: entity, |
42 } | 60 } |
43 » r.clientAPI.API = client.NewAPI(srv.state, r.resources, r) | 61 » action := func() { |
| 62 » » err := r.rpcConn.Close() |
| 63 » » if err != nil { |
| 64 » » » logger.Errorf("error closing the RPC connection: %v", er
r) |
| 65 » » } |
| 66 » } |
| 67 » r.clientAPI.API = client.NewAPI(r.srv.state, r.resources, r) |
| 68 » r.pingTimeout = newPingTimeout(action, maxPingInterval) |
44 return r | 69 return r |
45 } | 70 } |
46 | 71 |
47 // Kill implements rpc.Killer. It cleans up any resources that need | 72 // Kill implements rpc.Killer. It cleans up any resources that need |
48 // cleaning up to ensure that all outstanding requests return. | 73 // cleaning up to ensure that all outstanding requests return. |
49 func (r *srvRoot) Kill() { | 74 func (r *srvRoot) Kill() { |
50 r.resources.StopAll() | 75 r.resources.StopAll() |
| 76 r.pingTimeout.stop() |
51 } | 77 } |
52 | 78 |
53 // requireAgent checks whether the current client is an agent and hence | 79 // requireAgent checks whether the current client is an agent and hence |
54 // may access the agent APIs. We filter out non-agents when calling one | 80 // may access the agent APIs. We filter out non-agents when calling one |
55 // of the accessor functions (Machine, Unit, etc) which avoids us making | 81 // of the accessor functions (Machine, Unit, etc) which avoids us making |
56 // the check in every single request method. | 82 // the check in every single request method. |
57 func (r *srvRoot) requireAgent() error { | 83 func (r *srvRoot) requireAgent() error { |
58 if !isAgent(r.entity) { | 84 if !isAgent(r.entity) { |
59 return common.ErrPerm | 85 return common.ErrPerm |
60 } | 86 } |
(...skipping 160 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
221 if !ok { | 247 if !ok { |
222 return nil, common.ErrUnknownWatcher | 248 return nil, common.ErrUnknownWatcher |
223 } | 249 } |
224 return &srvClientAllWatcher{ | 250 return &srvClientAllWatcher{ |
225 watcher: watcher, | 251 watcher: watcher, |
226 id: id, | 252 id: id, |
227 resources: r.resources, | 253 resources: r.resources, |
228 }, nil | 254 }, nil |
229 } | 255 } |
230 | 256 |
231 // Pinger returns object with a single "Ping" method that does nothing. | 257 // Pinger returns an object that can be pinged |
232 func (r *srvRoot) Pinger(id string) (srvPinger, error) { | 258 // by calling its Ping method. If this method |
233 » return srvPinger{}, nil | 259 // is not called frequently enough, the connection |
| 260 // will be dropped. |
| 261 func (r *srvRoot) Pinger(id string) (pinger, error) { |
| 262 » return r.pingTimeout, nil |
234 } | 263 } |
235 | 264 |
236 type srvPinger struct{} | |
237 | |
238 // Ping is a no-op used by client heartbeat monitor. | |
239 func (r srvPinger) Ping() {} | |
240 | |
241 // AuthMachineAgent returns whether the current client is a machine agent. | 265 // AuthMachineAgent returns whether the current client is a machine agent. |
242 func (r *srvRoot) AuthMachineAgent() bool { | 266 func (r *srvRoot) AuthMachineAgent() bool { |
243 _, ok := r.entity.(*state.Machine) | 267 _, ok := r.entity.(*state.Machine) |
244 return ok | 268 return ok |
245 } | 269 } |
246 | 270 |
247 // AuthUnitAgent returns whether the current client is a unit agent. | 271 // AuthUnitAgent returns whether the current client is a unit agent. |
248 func (r *srvRoot) AuthUnitAgent() bool { | 272 func (r *srvRoot) AuthUnitAgent() bool { |
249 _, ok := r.entity.(*state.Unit) | 273 _, ok := r.entity.(*state.Unit) |
250 return ok | 274 return ok |
(...skipping 19 matching lines...) Expand all Loading... |
270 | 294 |
271 // GetAuthTag returns the tag of the authenticated entity. | 295 // GetAuthTag returns the tag of the authenticated entity. |
272 func (r *srvRoot) GetAuthTag() string { | 296 func (r *srvRoot) GetAuthTag() string { |
273 return r.entity.Tag() | 297 return r.entity.Tag() |
274 } | 298 } |
275 | 299 |
276 // GetAuthEntity returns the authenticated entity. | 300 // GetAuthEntity returns the authenticated entity. |
277 func (r *srvRoot) GetAuthEntity() state.Entity { | 301 func (r *srvRoot) GetAuthEntity() state.Entity { |
278 return r.entity | 302 return r.entity |
279 } | 303 } |
| 304 |
| 305 // pinger describes a type that can be pinged. |
| 306 type pinger interface { |
| 307 Ping() |
| 308 } |
| 309 |
| 310 // pingTimeout listens for pings and will call the |
| 311 // passed action in case of a timeout. This way broken |
| 312 // or inactive connections can be closed. |
| 313 type pingTimeout struct { |
| 314 tomb tomb.Tomb |
| 315 action func() |
| 316 timeout time.Duration |
| 317 reset chan struct{} |
| 318 } |
| 319 |
| 320 // newPingTimeout returns a new pingTimeout instance |
| 321 // that invokes the given action asynchronously if there |
| 322 // is more than the given timeout interval between calls |
| 323 // to its Ping method. |
| 324 func newPingTimeout(action func(), timeout time.Duration) *pingTimeout { |
| 325 pt := &pingTimeout{ |
| 326 action: action, |
| 327 timeout: timeout, |
| 328 reset: make(chan struct{}), |
| 329 } |
| 330 go func() { |
| 331 defer pt.tomb.Done() |
| 332 pt.tomb.Kill(pt.loop()) |
| 333 }() |
| 334 return pt |
| 335 } |
| 336 |
| 337 // Ping is used by the client heartbeat monitor and resets |
| 338 // the killer. |
| 339 func (pt *pingTimeout) Ping() { |
| 340 select { |
| 341 case <-pt.tomb.Dying(): |
| 342 case pt.reset <- struct{}{}: |
| 343 } |
| 344 } |
| 345 |
| 346 // stop terminates the ping timeout. |
| 347 func (pt *pingTimeout) stop() error { |
| 348 pt.tomb.Kill(nil) |
| 349 return pt.tomb.Wait() |
| 350 } |
| 351 |
| 352 // loop waits for a reset signal, otherwise it performs |
| 353 // the initially passed action. |
| 354 func (pt *pingTimeout) loop() error { |
| 355 timer := time.NewTimer(pt.timeout) |
| 356 defer timer.Stop() |
| 357 for { |
| 358 select { |
| 359 case <-pt.tomb.Dying(): |
| 360 return nil |
| 361 case <-timer.C: |
| 362 go pt.action() |
| 363 return errors.New("ping timeout") |
| 364 case <-pt.reset: |
| 365 timer.Reset(pt.timeout) |
| 366 } |
| 367 } |
| 368 } |
OLD | NEW |