Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(44)

Side by Side Diff: rpc/server.go

Issue 7398050: rpc: allow served object to know about shutdown.
Patch Set: rpc: allow served object to know about shutdown. Created 12 years, 1 month ago
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
View unified diff | Download patch
« rpc/rpc_test.go ('K') | « rpc/rpc_test.go ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 package rpc 1 package rpc
2 2
3 import ( 3 import (
4 "fmt" 4 "fmt"
5 "io" 5 "io"
6 "launchpad.net/juju-core/log" 6 "launchpad.net/juju-core/log"
7 "reflect" 7 "reflect"
8 "sync" 8 "sync"
9 ) 9 )
10 10
(...skipping 51 matching lines...) Expand 10 before | Expand all | Expand 10 after
62 sending sync.Mutex 62 sending sync.Mutex
63 } 63 }
64 64
65 // ErrorCoder represents an any error that has an associated 65 // ErrorCoder represents an any error that has an associated
66 // error code. An error code is a short string that describes the 66 // error code. An error code is a short string that describes the
67 // class of error. 67 // class of error.
68 type ErrorCoder interface { 68 type ErrorCoder interface {
69 ErrorCode() string 69 ErrorCode() string
70 } 70 }
71 71
72 // Killer represents a type that can be asked to
73 // abort any outstanding requests. The Kill
74 // method should return immediately.
75 type Killer interface {
TheMue 2013/02/26 12:01:56 Hehe, funny name.
dave_cheney.net 2013/02/27 05:24:14 Sometimes the language hand you these gems.
76 Kill()
77 }
78
72 // ServeCodec runs the server on a single connection. ServeCodec 79 // ServeCodec runs the server on a single connection. ServeCodec
73 // blocks, serving the connection until the client hangs up. The caller 80 // blocks, serving the connection until the client hangs up. The caller
74 // typically invokes ServeCodec in a go statement. The given 81 // typically invokes ServeCodec in a go statement. The given
75 // root value, which must be the same type as that passed to 82 // root value, which must be the same type as that passed to
76 // NewServer, is used to invoke the RPC requests. If rootValue 83 // NewServer, is used to invoke the RPC requests. If rootValue
77 // nil, the original root value passed to NewServer will 84 // nil, the original root value passed to NewServer will
78 // be used instead. 85 // be used instead.
79 // 86 //
80 // ServeCodec will only return when all its outstanding calls have 87 // ServeCodec stops serving requests when it receives an error
88 // reading a request. Before returning, if rootValue implements
89 // the Killer interface, its Kill method will be called.
90 // ServeCodec will then return only when all its outstanding calls have
81 // completed. 91 // completed.
82 func (srv *Server) ServeCodec(codec ServerCodec, rootValue interface{}) error { 92 func (srv *Server) ServeCodec(codec ServerCodec, root interface{}) error {
83 » return srv.serve(reflect.ValueOf(rootValue), codec)
84 }
85
86 func (srv *Server) serve(root reflect.Value, codec ServerCodec) error {
87 » // TODO(rog) allow concurrent requests.
88 » if root.Type() != srv.root.Type() {
89 » » panic(fmt.Errorf("rpc: unexpected type of root value; got %s, wa nt %s", root.Type(), srv.root.Type()))
90 » }
91 csrv := &codecServer{ 93 csrv := &codecServer{
92 Server: srv, 94 Server: srv,
93 codec: codec, 95 codec: codec,
94 » » root: root, 96 » » root: reflect.ValueOf(root),
97 » }
98 » if csrv.root.Type() != srv.root.Type() {
99 » » panic(fmt.Errorf("rpc: unexpected type of root value; got %s, wa nt %s", csrv.root.Type(), srv.root.Type()))
95 } 100 }
96 defer csrv.pending.Wait() 101 defer csrv.pending.Wait()
102 err := csrv.serve()
103 if killer, ok := root.(Killer); ok {
104 killer.Kill()
105 }
106 return err
107 }
108
109 func (csrv *codecServer) serve() error {
97 var req Request 110 var req Request
98 for { 111 for {
99 req = Request{} 112 req = Request{}
100 » » err := codec.ReadRequestHeader(&req) 113 » » err := csrv.codec.ReadRequestHeader(&req)
101 if err != nil { 114 if err != nil {
102 if err == io.EOF { 115 if err == io.EOF {
103 return nil 116 return nil
104 } 117 }
105 return err 118 return err
106 } 119 }
107 o, a, err := csrv.findRequest(&req) 120 o, a, err := csrv.findRequest(&req)
108 if err != nil { 121 if err != nil {
109 » » » _ = codec.ReadRequestBody(&struct{}{}) 122 » » » _ = csrv.codec.ReadRequestBody(&struct{}{})
110 resp := &Response{ 123 resp := &Response{
111 RequestId: req.RequestId, 124 RequestId: req.RequestId,
112 } 125 }
113 csrv.setError(resp, err) 126 csrv.setError(resp, err)
114 » » » if err := codec.WriteResponse(resp, struct{}{}); err != nil { 127 » » » if err := csrv.codec.WriteResponse(resp, struct{}{}); er r != nil {
115 return err 128 return err
116 } 129 }
117 continue 130 continue
118 } 131 }
119 var argp interface{} 132 var argp interface{}
120 var arg reflect.Value 133 var arg reflect.Value
121 if a.arg != nil { 134 if a.arg != nil {
122 v := reflect.New(a.arg) 135 v := reflect.New(a.arg)
123 arg = v.Elem() 136 arg = v.Elem()
124 argp = v.Interface() 137 argp = v.Interface()
(...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after
179 } 192 }
180 } 193 }
181 194
182 func (csrv *codecServer) runRequest0(reqId uint64, objId string, o *obtainer, a *action, arg reflect.Value) (reflect.Value, error) { 195 func (csrv *codecServer) runRequest0(reqId uint64, objId string, o *obtainer, a *action, arg reflect.Value) (reflect.Value, error) {
183 obj, err := o.call(csrv.root, objId) 196 obj, err := o.call(csrv.root, objId)
184 if err != nil { 197 if err != nil {
185 return reflect.Value{}, err 198 return reflect.Value{}, err
186 } 199 }
187 return a.call(obj, arg) 200 return a.call(obj, arg)
188 } 201 }
OLDNEW
« rpc/rpc_test.go ('K') | « rpc/rpc_test.go ('k') | no next file » | no next file with comments »

Powered by Google App Engine
RSS Feeds Recent Issues | This issue
This is Rietveld f62528b