Left: | ||
Right: |
OLD | NEW |
---|---|
1 // Copyright 2012, 2013 Canonical Ltd. | 1 // Copyright 2012, 2013 Canonical Ltd. |
2 // Licensed under the AGPLv3, see LICENCE file for details. | 2 // Licensed under the AGPLv3, see LICENCE file for details. |
3 | 3 |
4 package api | 4 package api |
5 | 5 |
6 import ( | 6 import ( |
7 "crypto/tls" | 7 "crypto/tls" |
8 "crypto/x509" | 8 "crypto/x509" |
9 "fmt" | |
9 "io" | 10 "io" |
10 "time" | 11 "time" |
11 | 12 |
12 "code.google.com/p/go.net/websocket" | 13 "code.google.com/p/go.net/websocket" |
13 "github.com/juju/loggo" | 14 "github.com/juju/loggo" |
14 | 15 |
15 "launchpad.net/juju-core/cert" | 16 "launchpad.net/juju-core/cert" |
16 "launchpad.net/juju-core/instance" | 17 "launchpad.net/juju-core/instance" |
17 "launchpad.net/juju-core/rpc" | 18 "launchpad.net/juju-core/rpc" |
18 "launchpad.net/juju-core/rpc/jsoncodec" | 19 "launchpad.net/juju-core/rpc/jsoncodec" |
(...skipping 58 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
77 Password string | 78 Password string |
78 | 79 |
79 // Nonce holds the nonce used when provisioning the machine. Used | 80 // Nonce holds the nonce used when provisioning the machine. Used |
80 // only by the machine agent. | 81 // only by the machine agent. |
81 Nonce string `yaml:",omitempty"` | 82 Nonce string `yaml:",omitempty"` |
82 } | 83 } |
83 | 84 |
84 // DialOpts holds configuration parameters that control the | 85 // DialOpts holds configuration parameters that control the |
85 // Dialing behavior when connecting to a state server. | 86 // Dialing behavior when connecting to a state server. |
86 type DialOpts struct { | 87 type DialOpts struct { |
88 // DialAddressInterval is the amount of time to wait | |
89 // before starting to dial another address. | |
90 DialAddressInterval time.Duration | |
91 | |
87 // Timeout is the amount of time to wait contacting | 92 // Timeout is the amount of time to wait contacting |
88 // a state server. | 93 // a state server. |
89 Timeout time.Duration | 94 Timeout time.Duration |
90 | 95 |
91 // RetryDelay is the amount of time to wait between | 96 // RetryDelay is the amount of time to wait between |
92 // unsucssful connection attempts. | 97 // unsucssful connection attempts. |
93 RetryDelay time.Duration | 98 RetryDelay time.Duration |
94 } | 99 } |
95 | 100 |
96 // DefaultDialOpts returns a DialOpts representing the default | 101 // DefaultDialOpts returns a DialOpts representing the default |
97 // parameters for contacting a state server. | 102 // parameters for contacting a state server. |
98 func DefaultDialOpts() DialOpts { | 103 func DefaultDialOpts() DialOpts { |
99 return DialOpts{ | 104 return DialOpts{ |
100 » » Timeout: 10 * time.Minute, | 105 » » DialAddressInterval: 50 * time.Millisecond, |
101 » » RetryDelay: 2 * time.Second, | 106 » » Timeout: 10 * time.Minute, |
107 » » RetryDelay: 2 * time.Second, | |
102 } | 108 } |
103 } | 109 } |
104 | 110 |
105 func Open(info *Info, opts DialOpts) (*State, error) { | 111 func Open(info *Info, opts DialOpts) (*State, error) { |
106 pool := x509.NewCertPool() | 112 pool := x509.NewCertPool() |
107 xcert, err := cert.ParseCert(info.CACert) | 113 xcert, err := cert.ParseCert(info.CACert) |
108 if err != nil { | 114 if err != nil { |
109 return nil, err | 115 return nil, err |
110 } | 116 } |
111 pool.AddCert(xcert) | 117 pool.AddCert(xcert) |
112 | 118 |
113 // Dial all addresses, with up to maxParallelDial in parallel. | 119 // Dial all addresses, with up to maxParallelDial in parallel. |
114 try := parallel.NewTry(maxParallelDial, nil) | 120 try := parallel.NewTry(maxParallelDial, nil) |
115 defer try.Kill() | 121 defer try.Kill() |
116 for _, addr := range info.Addrs { | 122 for _, addr := range info.Addrs { |
117 » » if err := dialWebsocket(addr, opts, pool, try); err != nil { | 123 » » err := dialWebsocket(addr, opts, pool, try) |
124 » » if err == parallel.ErrStopped { | |
125 » » » break | |
126 » » } else if err != nil { | |
rog
2014/04/01 07:42:07
s/else//
axw
2014/04/01 07:54:08
Done.
| |
118 return nil, err | 127 return nil, err |
119 } | 128 } |
129 select { | |
130 case <-time.After(opts.DialAddressInterval): | |
131 case <-try.Dead(): | |
132 } | |
120 } | 133 } |
121 try.Close() | 134 try.Close() |
122 result, err := try.Result() | 135 result, err := try.Result() |
123 if err != nil { | 136 if err != nil { |
124 return nil, err | 137 return nil, err |
125 } | 138 } |
126 conn := result.(*websocket.Conn) | 139 conn := result.(*websocket.Conn) |
127 logger.Infof("connection established to %q", conn.RemoteAddr()) | 140 logger.Infof("connection established to %q", conn.RemoteAddr()) |
128 | 141 |
129 client := rpc.NewConn(jsoncodec.NewWebsocket(conn), nil) | 142 client := rpc.NewConn(jsoncodec.NewWebsocket(conn), nil) |
(...skipping 23 matching lines...) Expand all Loading... | |
153 // inconsequential to us. | 166 // inconsequential to us. |
154 const origin = "http://localhost/" | 167 const origin = "http://localhost/" |
155 cfg, err := websocket.NewConfig("wss://"+addr+"/", origin) | 168 cfg, err := websocket.NewConfig("wss://"+addr+"/", origin) |
156 if err != nil { | 169 if err != nil { |
157 return err | 170 return err |
158 } | 171 } |
159 cfg.TlsConfig = &tls.Config{ | 172 cfg.TlsConfig = &tls.Config{ |
160 RootCAs: rootCAs, | 173 RootCAs: rootCAs, |
161 ServerName: "anything", | 174 ServerName: "anything", |
162 } | 175 } |
176 return try.Start(newWebsocketDialer(cfg, opts)) | |
177 } | |
178 | |
179 // new WebsocketDialler returns a function that | |
180 // can be passed to utils/parallel.Try.Start. | |
181 func newWebsocketDialer(cfg *websocket.Config, opts DialOpts) func(<-chan struct {}) (io.Closer, error) { | |
163 openAttempt := utils.AttemptStrategy{ | 182 openAttempt := utils.AttemptStrategy{ |
164 Total: opts.Timeout, | 183 Total: opts.Timeout, |
165 Delay: opts.RetryDelay, | 184 Delay: opts.RetryDelay, |
166 } | 185 } |
167 » return try.Start(func(stop <-chan struct{}) (io.Closer, error) { | 186 » return func(stop <-chan struct{}) (io.Closer, error) { |
168 » » err := parallel.ErrStopped | |
169 for a := openAttempt.Start(); a.Next(); { | 187 for a := openAttempt.Start(); a.Next(); { |
170 select { | 188 select { |
171 case <-stop: | 189 case <-stop: |
172 » » » » break | 190 » » » » return nil, parallel.ErrStopped |
173 default: | 191 default: |
174 } | 192 } |
175 logger.Infof("dialing %q", cfg.Location) | 193 logger.Infof("dialing %q", cfg.Location) |
176 » » » var conn *websocket.Conn | 194 » » » conn, err := websocket.DialConfig(cfg) |
177 » » » conn, err = websocket.DialConfig(cfg) | |
178 if err == nil { | 195 if err == nil { |
179 return conn, nil | 196 return conn, nil |
180 } | 197 } |
181 » » » logger.Debugf("error dialing API server, will retry: %v" , err) | 198 » » » if a.HasNext() { |
199 » » » » logger.Debugf("error dialing API server, will re try: %v", err) | |
rog
2014/04/01 07:42:07
perhaps include the cfg.Location here, otherwise w
axw
2014/04/01 07:54:08
Done.
| |
200 » » » } else { | |
201 » » » » return nil, fmt.Errorf("timed out connecting to %q", cfg.Location) | |
202 » » » } | |
182 } | 203 } |
183 » » return nil, err | 204 » » panic("unreachable") |
184 » }) | 205 » } |
185 } | 206 } |
186 | 207 |
187 func (s *State) heartbeatMonitor() { | 208 func (s *State) heartbeatMonitor() { |
188 for { | 209 for { |
189 if err := s.Ping(); err != nil { | 210 if err := s.Ping(); err != nil { |
190 close(s.broken) | 211 close(s.broken) |
191 return | 212 return |
192 } | 213 } |
193 time.Sleep(PingPeriod) | 214 time.Sleep(PingPeriod) |
194 } | 215 } |
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
242 // Juju CLI, all addresses must be attempted, as the CLI may | 263 // Juju CLI, all addresses must be attempted, as the CLI may |
243 // be invoked both within and outside the environment (think | 264 // be invoked both within and outside the environment (think |
244 // private clouds). | 265 // private clouds). |
245 func (s *State) APIHostPorts() [][]instance.HostPort { | 266 func (s *State) APIHostPorts() [][]instance.HostPort { |
246 hostPorts := make([][]instance.HostPort, len(s.hostPorts)) | 267 hostPorts := make([][]instance.HostPort, len(s.hostPorts)) |
247 for i, server := range s.hostPorts { | 268 for i, server := range s.hostPorts { |
248 hostPorts[i] = append([]instance.HostPort{}, server...) | 269 hostPorts[i] = append([]instance.HostPort{}, server...) |
249 } | 270 } |
250 return hostPorts | 271 return hostPorts |
251 } | 272 } |
OLD | NEW |