| Left: | ||
| Right: |
| OLD | NEW |
|---|---|
| 1 package api | 1 package api |
| 2 | 2 |
| 3 import ( | 3 import ( |
| 4 "code.google.com/p/go.net/websocket" | 4 "code.google.com/p/go.net/websocket" |
| 5 "crypto/tls" | 5 "crypto/tls" |
| 6 "launchpad.net/juju-core/log" | 6 "launchpad.net/juju-core/log" |
| 7 "launchpad.net/juju-core/state" | 7 "launchpad.net/juju-core/state" |
| 8 "launchpad.net/tomb" | |
| 8 "net" | 9 "net" |
| 9 "net/http" | 10 "net/http" |
| 11 "sync" | |
| 10 ) | 12 ) |
| 11 | 13 |
| 12 // srvState represents a single client's connection to the state. | 14 // srvState represents a single client's connection to the state. |
| 13 type srvState struct { | 15 type srvState struct { |
| 14 » state *state.State | 16 » srv *Server |
| 15 » conn *websocket.Conn | 17 » conn *websocket.Conn |
| 16 } | 18 } |
| 17 | 19 |
| 18 type Server struct { | 20 type Server struct { |
| 21 tomb tomb.Tomb | |
| 22 wg sync.WaitGroup | |
| 19 state *state.State | 23 state *state.State |
| 24 addr net.Addr | |
| 20 } | 25 } |
| 21 | 26 |
| 22 // Serve serves the given state by accepting requests | 27 // Serve serves the given state by accepting requests |
| 23 // on the given listener, using the given certificate | 28 // on the given listener, using the given certificate |
| 24 // and key (in PEM format) for authentication.· | 29 // and key (in PEM format) for authentication.· |
| 25 func Serve(s *state.State, lis net.Listener, cert, key []byte) error { | 30 func NewServer(s *state.State, addr string, cert, key []byte) (*Server, error) { |
| 31 » lis, err := net.Listen("tcp", addr) | |
| 32 » if err != nil { | |
| 33 » » return nil, err | |
| 34 » } | |
| 26 tlsCert, err := tls.X509KeyPair(cert, key) | 35 tlsCert, err := tls.X509KeyPair(cert, key) |
| 27 if err != nil { | 36 if err != nil { |
| 28 » » return err | 37 » » return nil, err |
| 38 » } | |
| 39 » srv := &Server{ | |
| 40 » » state: s, | |
| 41 » » addr: lis.Addr(), | |
| 29 } | 42 } |
| 30 lis = tls.NewListener(lis, &tls.Config{ | 43 lis = tls.NewListener(lis, &tls.Config{ |
| 31 Certificates: []tls.Certificate{tlsCert}, | 44 Certificates: []tls.Certificate{tlsCert}, |
| 32 }) | 45 }) |
| 33 » return http.Serve(lis, newHandler(s)) | 46 » go func() { |
| 47 » » defer srv.tomb.Done() | |
| 48 » » srv.tomb.Kill(srv.run(lis)) | |
|
dfc
2012/12/10 21:53:06
nice
| |
| 49 » }() | |
| 50 » return srv, nil | |
| 34 } | 51 } |
| 35 | 52 |
| 36 // newHandler returns an http handler that serves the API | 53 // Stop stops the server and returns when all requests that |
|
dfc
2012/12/10 21:53:06
s/when../once all outstanding requests have been c
fwereade
2012/12/11 09:16:33
+1
| |
| 37 // interface to the given state as a websocket. | 54 // it is running have completed. |
| 38 func newHandler(s *state.State) http.Handler { | 55 func (srv *Server) Stop() error { |
| 39 » return websocket.Handler(func(conn *websocket.Conn) { | 56 » srv.tomb.Kill(nil) |
| 40 » » srv := &srvState{ | 57 » return srv.tomb.Wait() |
| 41 » » » state: s, | 58 } |
| 42 » » » conn: conn, | 59 |
| 60 func (srv *Server) run(lis net.Listener) error { | |
| 61 » srv.wg.Add(1) | |
| 62 » go func() { | |
| 63 » » <-srv.tomb.Dying() | |
| 64 » » lis.Close() | |
| 65 » » srv.wg.Done() | |
|
dfc
2012/12/10 21:53:06
defer this line ? lis.Close() may panic, although
fwereade
2012/12/11 09:16:33
+1 also
rog
2012/12/13 17:16:22
lis.Close cannot panic (we've checked the listener
| |
| 66 » }() | |
| 67 » handler := websocket.Handler(func(conn *websocket.Conn) { | |
| 68 » » srv.wg.Add(1) | |
| 69 » » defer srv.wg.Done() | |
|
niemeyer
2012/12/13 16:05:55
Why do we have two Adds and two Dones for a single
| |
| 70 » » // If we've got to this stage and the tomb is still | |
| 71 » » // alive, we know that any tomb.Kill must occur after we | |
| 72 » » // have called wg.Add, so we avoid the possibility of a | |
| 73 » » // handler goroutine running after Stop has returned. | |
| 74 » » if srv.tomb.Err() != tomb.ErrStillAlive { | |
| 75 » » » return | |
| 43 } | 76 } |
| 44 » » srv.run() | 77 » » st := &srvState{ |
| 78 » » » srv: srv, | |
| 79 » » » conn: conn, | |
| 80 » » } | |
| 81 » » srv.wg.Add(1) | |
| 82 » » go func() { | |
| 83 » » » st.run() | |
| 84 » » » srv.wg.Done() | |
| 85 » » }() | |
| 86 » » <-srv.tomb.Dying() | |
|
niemeyer
2012/12/13 16:05:55
Won't this block every single goroutine per connec
| |
| 87 » » conn.Close() | |
| 45 }) | 88 }) |
| 89 // The error from http.Serve is not interesting. | |
| 90 http.Serve(lis, handler) | |
| 91 return nil | |
| 92 } | |
| 93 | |
| 94 // Addr returns the address that the server is listening on. | |
| 95 func (srv *Server) Addr() string { | |
| 96 return srv.addr.String() | |
| 46 } | 97 } |
| 47 | 98 |
| 48 type rpcRequest struct { | 99 type rpcRequest struct { |
| 49 Request string // placeholder only | 100 Request string // placeholder only |
| 50 } | 101 } |
| 51 | 102 |
| 52 type rpcResponse struct { | 103 type rpcResponse struct { |
| 53 Response string // placeholder only | 104 Response string // placeholder only |
| 54 Error string | 105 Error string |
| 55 } | 106 } |
| 56 | 107 |
| 57 func (srv *srvState) run() { | 108 func (st *srvState) run() { |
| 58 for { | 109 for { |
| 59 var req rpcRequest | 110 var req rpcRequest |
| 60 » » err := websocket.JSON.Receive(srv.conn, &req) | 111 » » err := websocket.JSON.Receive(st.conn, &req) |
| 61 if err != nil { | 112 if err != nil { |
| 62 log.Printf("api: error receiving request: %v", err) | 113 log.Printf("api: error receiving request: %v", err) |
| 63 return | 114 return |
| 64 } | 115 } |
| 65 var resp rpcResponse | 116 var resp rpcResponse |
| 66 // placeholder for executing some arbitrary operation | 117 // placeholder for executing some arbitrary operation |
| 67 // on state. | 118 // on state. |
| 68 » » m, err := srv.state.Machine(req.Request) | 119 » » m, err := st.srv.state.Machine(req.Request) |
| 69 if err != nil { | 120 if err != nil { |
| 70 resp.Error = err.Error() | 121 resp.Error = err.Error() |
| 71 } else { | 122 } else { |
| 72 instId, err := m.InstanceId() | 123 instId, err := m.InstanceId() |
| 73 if err != nil { | 124 if err != nil { |
| 74 resp.Error = err.Error() | 125 resp.Error = err.Error() |
| 75 } else { | 126 } else { |
| 76 resp.Response = string(instId) | 127 resp.Response = string(instId) |
| 77 } | 128 } |
| 78 } | 129 } |
| 79 » » err = websocket.JSON.Send(srv.conn, &resp) | 130 » » err = websocket.JSON.Send(st.conn, &resp) |
| 80 if err != nil { | 131 if err != nil { |
| 81 log.Printf("api: error sending reply: %v", err) | 132 log.Printf("api: error sending reply: %v", err) |
| 82 return | 133 return |
| 83 } | 134 } |
| 84 } | 135 } |
| 85 } | 136 } |
| OLD | NEW |