LEFT | RIGHT |
1 // Copyright 2013 Canonical Ltd. | 1 // Copyright 2013 Canonical Ltd. |
2 // Licensed under the AGPLv3, see LICENCE file for details. | 2 // Licensed under the AGPLv3, see LICENCE file for details. |
3 | 3 |
4 package apiserver | 4 package apiserver |
5 | 5 |
6 import ( | 6 import ( |
7 "crypto/tls" | 7 "crypto/tls" |
8 "net" | 8 "net" |
9 "net/http" | 9 "net/http" |
10 "sync" | 10 "sync" |
(...skipping 65 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
76 srv.tomb.Kill(nil) | 76 srv.tomb.Kill(nil) |
77 } | 77 } |
78 | 78 |
79 // Wait implements worker.Worker.Wait. | 79 // Wait implements worker.Worker.Wait. |
80 func (srv *Server) Wait() error { | 80 func (srv *Server) Wait() error { |
81 return srv.tomb.Wait() | 81 return srv.tomb.Wait() |
82 } | 82 } |
83 | 83 |
84 type requestNotifier struct { | 84 type requestNotifier struct { |
85 id int64 | 85 id int64 |
86 mu sync.Mutex | |
87 tag_ string | |
88 start time.Time | 86 start time.Time |
| 87 |
| 88 mu sync.Mutex |
| 89 tag_ string |
89 } | 90 } |
90 | 91 |
91 var globalCounter int64 | 92 var globalCounter int64 |
92 | 93 |
93 func newRequestNotifier() *requestNotifier { | 94 func newRequestNotifier() *requestNotifier { |
94 return &requestNotifier{ | 95 return &requestNotifier{ |
95 id: atomic.AddInt64(&globalCounter, 1), | 96 id: atomic.AddInt64(&globalCounter, 1), |
96 tag_: "<unknown>", | 97 tag_: "<unknown>", |
97 start: time.Now(), | 98 start: time.Now(), |
98 } | 99 } |
(...skipping 21 matching lines...) Expand all Loading... |
120 } | 121 } |
121 | 122 |
122 func (n *requestNotifier) ServerReply(req rpc.Request, hdr *rpc.Header, body int
erface{}, timeSpent time.Duration) { | 123 func (n *requestNotifier) ServerReply(req rpc.Request, hdr *rpc.Header, body int
erface{}, timeSpent time.Duration) { |
123 if req.Type == "Pinger" && req.Action == "Ping" { | 124 if req.Type == "Pinger" && req.Action == "Ping" { |
124 return | 125 return |
125 } | 126 } |
126 logger.Debugf("-> [%X] %s %s %s %s[%q].%s", n.id, n.tag(), timeSpent, js
oncodec.DumpRequest(hdr, body), req.Type, req.Id, req.Action) | 127 logger.Debugf("-> [%X] %s %s %s %s[%q].%s", n.id, n.tag(), timeSpent, js
oncodec.DumpRequest(hdr, body), req.Type, req.Id, req.Action) |
127 } | 128 } |
128 | 129 |
129 func (n *requestNotifier) join(req *http.Request) { | 130 func (n *requestNotifier) join(req *http.Request) { |
130 » logger.Debugf("[%X] API connection from %s", n.id, req.RemoteAddr) | 131 » logger.Infof("[%X] API connection from %s", n.id, req.RemoteAddr) |
131 } | 132 } |
132 | 133 |
133 func (n *requestNotifier) leave() { | 134 func (n *requestNotifier) leave() { |
134 » logger.Debugf("[%X] API connection terminated after %v", n.id, time.Sinc
e(n.start)) | 135 » logger.Infof("[%X] API connection terminated after %v", n.id, time.Since
(n.start)) |
135 } | 136 } |
136 | 137 |
137 func (n requestNotifier) ClientRequest(hdr *rpc.Header, body interface{}) { | 138 func (n requestNotifier) ClientRequest(hdr *rpc.Header, body interface{}) { |
138 } | 139 } |
139 | 140 |
140 func (n requestNotifier) ClientReply(req rpc.Request, hdr *rpc.Header, body inte
rface{}) { | 141 func (n requestNotifier) ClientReply(req rpc.Request, hdr *rpc.Header, body inte
rface{}) { |
141 } | 142 } |
142 | 143 |
143 func (srv *Server) run(lis net.Listener) { | 144 func (srv *Server) run(lis net.Listener) { |
144 defer srv.tomb.Done() | 145 defer srv.tomb.Done() |
145 defer srv.wg.Wait() // wait for any outstanding requests to complete. | 146 defer srv.wg.Wait() // wait for any outstanding requests to complete. |
146 srv.wg.Add(1) | 147 srv.wg.Add(1) |
147 go func() { | 148 go func() { |
148 <-srv.tomb.Dying() | 149 <-srv.tomb.Dying() |
149 lis.Close() | 150 lis.Close() |
150 srv.wg.Done() | 151 srv.wg.Done() |
151 }() | 152 }() |
152 mux := http.NewServeMux() | 153 mux := http.NewServeMux() |
153 mux.HandleFunc("/", srv.apiHandler) | 154 mux.HandleFunc("/", srv.apiHandler) |
154 mux.Handle("/charms", &charmsHandler{state: srv.state}) | 155 mux.Handle("/charms", &charmsHandler{state: srv.state}) |
155 // The error from http.Serve is not interesting. | 156 // The error from http.Serve is not interesting. |
156 http.Serve(lis, mux) | 157 http.Serve(lis, mux) |
157 } | 158 } |
158 | 159 |
159 func (srv *Server) apiHandler(w http.ResponseWriter, req *http.Request) { | 160 func (srv *Server) apiHandler(w http.ResponseWriter, req *http.Request) { |
160 » var reqNotifier *requestNotifier | 161 » reqNotifier := newRequestNotifier() |
161 » if logger.EffectiveLogLevel() <= loggo.DEBUG { | 162 » reqNotifier.join(req) |
162 » » reqNotifier = newRequestNotifier() | 163 » defer reqNotifier.leave() |
163 » » reqNotifier.join(req) | |
164 » » defer reqNotifier.leave() | |
165 » } | |
166 wsServer := websocket.Server{ | 164 wsServer := websocket.Server{ |
167 Handler: func(conn *websocket.Conn) { | 165 Handler: func(conn *websocket.Conn) { |
168 srv.wg.Add(1) | 166 srv.wg.Add(1) |
169 defer srv.wg.Done() | 167 defer srv.wg.Done() |
170 // If we've got to this stage and the tomb is still | 168 // If we've got to this stage and the tomb is still |
171 // alive, we know that any tomb.Kill must occur after we | 169 // alive, we know that any tomb.Kill must occur after we |
172 // have called wg.Add, so we avoid the possibility of a | 170 // have called wg.Add, so we avoid the possibility of a |
173 // handler goroutine running after Stop has returned. | 171 // handler goroutine running after Stop has returned. |
174 if srv.tomb.Err() != tomb.ErrStillAlive { | 172 if srv.tomb.Err() != tomb.ErrStillAlive { |
175 return | 173 return |
(...skipping 10 matching lines...) Expand all Loading... |
186 func (srv *Server) Addr() string { | 184 func (srv *Server) Addr() string { |
187 return srv.addr.String() | 185 return srv.addr.String() |
188 } | 186 } |
189 | 187 |
190 func (srv *Server) serveConn(wsConn *websocket.Conn, reqNotifier *requestNotifie
r) error { | 188 func (srv *Server) serveConn(wsConn *websocket.Conn, reqNotifier *requestNotifie
r) error { |
191 codec := jsoncodec.NewWebsocket(wsConn) | 189 codec := jsoncodec.NewWebsocket(wsConn) |
192 if loggo.GetLogger("juju.rpc.jsoncodec").EffectiveLogLevel() <= loggo.TR
ACE { | 190 if loggo.GetLogger("juju.rpc.jsoncodec").EffectiveLogLevel() <= loggo.TR
ACE { |
193 codec.SetLogging(true) | 191 codec.SetLogging(true) |
194 } | 192 } |
195 var notifier rpc.RequestNotifier | 193 var notifier rpc.RequestNotifier |
196 » if reqNotifier != nil { | 194 » if logger.EffectiveLogLevel() <= loggo.DEBUG { |
| 195 » » // Incur request monitoring overhead only if we |
| 196 » » // know we'll need it. |
197 notifier = reqNotifier | 197 notifier = reqNotifier |
198 } | 198 } |
199 conn := rpc.NewConn(codec, notifier) | 199 conn := rpc.NewConn(codec, notifier) |
200 conn.Serve(newStateServer(srv, conn, reqNotifier), serverError) | 200 conn.Serve(newStateServer(srv, conn, reqNotifier), serverError) |
201 conn.Start() | 201 conn.Start() |
202 select { | 202 select { |
203 case <-conn.Dead(): | 203 case <-conn.Dead(): |
204 case <-srv.tomb.Dying(): | 204 case <-srv.tomb.Dying(): |
205 } | 205 } |
206 return conn.Close() | 206 return conn.Close() |
207 } | 207 } |
208 | 208 |
209 func serverError(err error) error { | 209 func serverError(err error) error { |
210 if err := common.ServerError(err); err != nil { | 210 if err := common.ServerError(err); err != nil { |
211 return err | 211 return err |
212 } | 212 } |
213 return nil | 213 return nil |
214 } | 214 } |
215 | 215 |
216 var logRequests = true | 216 var logRequests = true |
LEFT | RIGHT |