Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(659)

Side by Side Diff: state/api/apiclient.go

Issue 82900045: state/api: fixes to parallel api.Open
Patch Set: Created 11 years ago
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2012, 2013 Canonical Ltd. 1 // Copyright 2012, 2013 Canonical Ltd.
2 // Licensed under the AGPLv3, see LICENCE file for details. 2 // Licensed under the AGPLv3, see LICENCE file for details.
3 3
4 package api 4 package api
5 5
6 import ( 6 import (
7 "crypto/tls" 7 "crypto/tls"
8 "crypto/x509" 8 "crypto/x509"
9 "fmt"
9 "io" 10 "io"
10 "time" 11 "time"
11 12
12 "code.google.com/p/go.net/websocket" 13 "code.google.com/p/go.net/websocket"
13 "github.com/juju/loggo" 14 "github.com/juju/loggo"
14 15
15 "launchpad.net/juju-core/cert" 16 "launchpad.net/juju-core/cert"
16 "launchpad.net/juju-core/instance" 17 "launchpad.net/juju-core/instance"
17 "launchpad.net/juju-core/rpc" 18 "launchpad.net/juju-core/rpc"
18 "launchpad.net/juju-core/rpc/jsoncodec" 19 "launchpad.net/juju-core/rpc/jsoncodec"
(...skipping 58 matching lines...) Expand 10 before | Expand all | Expand 10 after
77 Password string 78 Password string
78 79
79 // Nonce holds the nonce used when provisioning the machine. Used 80 // Nonce holds the nonce used when provisioning the machine. Used
80 // only by the machine agent. 81 // only by the machine agent.
81 Nonce string `yaml:",omitempty"` 82 Nonce string `yaml:",omitempty"`
82 } 83 }
83 84
84 // DialOpts holds configuration parameters that control the 85 // DialOpts holds configuration parameters that control the
85 // Dialing behavior when connecting to a state server. 86 // Dialing behavior when connecting to a state server.
86 type DialOpts struct { 87 type DialOpts struct {
88 // DialAddressInterval is the amount of time to wait
89 // before starting to dial another address.
90 DialAddressInterval time.Duration
91
87 // Timeout is the amount of time to wait contacting 92 // Timeout is the amount of time to wait contacting
88 // a state server. 93 // a state server.
89 Timeout time.Duration 94 Timeout time.Duration
90 95
91 // RetryDelay is the amount of time to wait between 96 // RetryDelay is the amount of time to wait between
92 // unsucssful connection attempts. 97 // unsucssful connection attempts.
93 RetryDelay time.Duration 98 RetryDelay time.Duration
94 } 99 }
95 100
96 // DefaultDialOpts returns a DialOpts representing the default 101 // DefaultDialOpts returns a DialOpts representing the default
97 // parameters for contacting a state server. 102 // parameters for contacting a state server.
98 func DefaultDialOpts() DialOpts { 103 func DefaultDialOpts() DialOpts {
99 return DialOpts{ 104 return DialOpts{
100 » » Timeout: 10 * time.Minute, 105 » » DialAddressInterval: 50 * time.Millisecond,
101 » » RetryDelay: 2 * time.Second, 106 » » Timeout: 10 * time.Minute,
107 » » RetryDelay: 2 * time.Second,
102 } 108 }
103 } 109 }
104 110
105 func Open(info *Info, opts DialOpts) (*State, error) { 111 func Open(info *Info, opts DialOpts) (*State, error) {
106 pool := x509.NewCertPool() 112 pool := x509.NewCertPool()
107 xcert, err := cert.ParseCert(info.CACert) 113 xcert, err := cert.ParseCert(info.CACert)
108 if err != nil { 114 if err != nil {
109 return nil, err 115 return nil, err
110 } 116 }
111 pool.AddCert(xcert) 117 pool.AddCert(xcert)
112 118
113 // Dial all addresses, with up to maxParallelDial in parallel. 119 // Dial all addresses, with up to maxParallelDial in parallel.
114 try := parallel.NewTry(maxParallelDial, nil) 120 try := parallel.NewTry(maxParallelDial, nil)
115 defer try.Kill() 121 defer try.Kill()
116 for _, addr := range info.Addrs { 122 for _, addr := range info.Addrs {
117 » » if err := dialWebsocket(addr, opts, pool, try); err != nil { 123 » » err := dialWebsocket(addr, opts, pool, try)
124 » » if err == parallel.ErrStopped {
125 » » » break
126 » » } else if err != nil {
rog 2014/04/01 07:42:07 s/else//
axw 2014/04/01 07:54:08 Done.
118 return nil, err 127 return nil, err
119 } 128 }
129 select {
130 case <-time.After(opts.DialAddressInterval):
131 case <-try.Dead():
132 }
120 } 133 }
121 try.Close() 134 try.Close()
122 result, err := try.Result() 135 result, err := try.Result()
123 if err != nil { 136 if err != nil {
124 return nil, err 137 return nil, err
125 } 138 }
126 conn := result.(*websocket.Conn) 139 conn := result.(*websocket.Conn)
127 logger.Infof("connection established to %q", conn.RemoteAddr()) 140 logger.Infof("connection established to %q", conn.RemoteAddr())
128 141
129 client := rpc.NewConn(jsoncodec.NewWebsocket(conn), nil) 142 client := rpc.NewConn(jsoncodec.NewWebsocket(conn), nil)
(...skipping 23 matching lines...) Expand all
153 // inconsequential to us. 166 // inconsequential to us.
154 const origin = "http://localhost/" 167 const origin = "http://localhost/"
155 cfg, err := websocket.NewConfig("wss://"+addr+"/", origin) 168 cfg, err := websocket.NewConfig("wss://"+addr+"/", origin)
156 if err != nil { 169 if err != nil {
157 return err 170 return err
158 } 171 }
159 cfg.TlsConfig = &tls.Config{ 172 cfg.TlsConfig = &tls.Config{
160 RootCAs: rootCAs, 173 RootCAs: rootCAs,
161 ServerName: "anything", 174 ServerName: "anything",
162 } 175 }
176 return try.Start(newWebsocketDialer(cfg, opts))
177 }
178
179 // new WebsocketDialler returns a function that
180 // can be passed to utils/parallel.Try.Start.
181 func newWebsocketDialer(cfg *websocket.Config, opts DialOpts) func(<-chan struct {}) (io.Closer, error) {
163 openAttempt := utils.AttemptStrategy{ 182 openAttempt := utils.AttemptStrategy{
164 Total: opts.Timeout, 183 Total: opts.Timeout,
165 Delay: opts.RetryDelay, 184 Delay: opts.RetryDelay,
166 } 185 }
167 » return try.Start(func(stop <-chan struct{}) (io.Closer, error) { 186 » return func(stop <-chan struct{}) (io.Closer, error) {
168 » » err := parallel.ErrStopped
169 for a := openAttempt.Start(); a.Next(); { 187 for a := openAttempt.Start(); a.Next(); {
170 select { 188 select {
171 case <-stop: 189 case <-stop:
172 » » » » break 190 » » » » return nil, parallel.ErrStopped
173 default: 191 default:
174 } 192 }
175 logger.Infof("dialing %q", cfg.Location) 193 logger.Infof("dialing %q", cfg.Location)
176 » » » var conn *websocket.Conn 194 » » » conn, err := websocket.DialConfig(cfg)
177 » » » conn, err = websocket.DialConfig(cfg)
178 if err == nil { 195 if err == nil {
179 return conn, nil 196 return conn, nil
180 } 197 }
181 » » » logger.Debugf("error dialing API server, will retry: %v" , err) 198 » » » if a.HasNext() {
199 » » » » logger.Debugf("error dialing API server, will re try: %v", err)
rog 2014/04/01 07:42:07 perhaps include the cfg.Location here, otherwise w
axw 2014/04/01 07:54:08 Done.
200 » » » } else {
201 » » » » return nil, fmt.Errorf("timed out connecting to %q", cfg.Location)
202 » » » }
182 } 203 }
183 » » return nil, err 204 » » panic("unreachable")
184 » }) 205 » }
185 } 206 }
186 207
187 func (s *State) heartbeatMonitor() { 208 func (s *State) heartbeatMonitor() {
188 for { 209 for {
189 if err := s.Ping(); err != nil { 210 if err := s.Ping(); err != nil {
190 close(s.broken) 211 close(s.broken)
191 return 212 return
192 } 213 }
193 time.Sleep(PingPeriod) 214 time.Sleep(PingPeriod)
194 } 215 }
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after
242 // Juju CLI, all addresses must be attempted, as the CLI may 263 // Juju CLI, all addresses must be attempted, as the CLI may
243 // be invoked both within and outside the environment (think 264 // be invoked both within and outside the environment (think
244 // private clouds). 265 // private clouds).
245 func (s *State) APIHostPorts() [][]instance.HostPort { 266 func (s *State) APIHostPorts() [][]instance.HostPort {
246 hostPorts := make([][]instance.HostPort, len(s.hostPorts)) 267 hostPorts := make([][]instance.HostPort, len(s.hostPorts))
247 for i, server := range s.hostPorts { 268 for i, server := range s.hostPorts {
248 hostPorts[i] = append([]instance.HostPort{}, server...) 269 hostPorts[i] = append([]instance.HostPort{}, server...)
249 } 270 }
250 return hostPorts 271 return hostPorts
251 } 272 }
OLDNEW
« no previous file with comments | « [revision details] ('k') | state/api/apiclient_test.go » ('j') | state/api/apiclient_test.go » ('J')

Powered by Google App Engine
RSS Feeds Recent Issues | This issue
This is Rietveld f62528b