LEFT | RIGHT |
(no file at all) | |
1 // Copyright 2009 The Go Authors. All rights reserved. | 1 // Copyright 2009 The Go Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style | 2 // Use of this source code is governed by a BSD-style |
3 // license that can be found in the LICENSE file. | 3 // license that can be found in the LICENSE file. |
4 | 4 |
5 /* | 5 /* |
6 Package rpc provides access to the exported methods of an object across
a | 6 Package rpc provides access to the exported methods of an object across
a |
7 network or other I/O connection. A server registers an object, making i
t visible | 7 network or other I/O connection. A server registers an object, making i
t visible |
8 as a service with the name of the type of the object. After registratio
n, exported | 8 as a service with the name of the type of the object. After registratio
n, exported |
9 methods of the object will be accessible remotely. A server may registe
r multiple | 9 methods of the object will be accessible remotely. A server may registe
r multiple |
10 objects (services) of different types but it is an error to register mul
tiple | 10 objects (services) of different types but it is an error to register mul
tiple |
(...skipping 376 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
387 buf := bufio.NewWriter(conn) | 387 buf := bufio.NewWriter(conn) |
388 srv := &gobServerCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(buf),
buf} | 388 srv := &gobServerCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(buf),
buf} |
389 server.ServeCodec(srv) | 389 server.ServeCodec(srv) |
390 } | 390 } |
391 | 391 |
392 // ServeCodec is like ServeConn but uses the specified codec to | 392 // ServeCodec is like ServeConn but uses the specified codec to |
393 // decode requests and encode responses. | 393 // decode requests and encode responses. |
394 func (server *Server) ServeCodec(codec ServerCodec) { | 394 func (server *Server) ServeCodec(codec ServerCodec) { |
395 sending := new(sync.Mutex) | 395 sending := new(sync.Mutex) |
396 for { | 396 for { |
397 » » service, mtype, req, argv, replyv, err := server.readRequest(cod
ec) | 397 » » service, mtype, req, argv, replyv, keepReading, err := server.re
adRequest(codec) |
398 if err != nil { | 398 if err != nil { |
399 if err != os.EOF { | 399 if err != os.EOF { |
400 log.Println("rpc:", err) | 400 log.Println("rpc:", err) |
401 } | 401 } |
402 » » » if err == os.EOF || err == io.ErrUnexpectedEOF { | 402 » » » if !keepReading { |
403 break | 403 break |
404 } | 404 } |
405 // send a response if we actually managed to read a head
er. | 405 // send a response if we actually managed to read a head
er. |
406 if req != nil { | 406 if req != nil { |
407 server.sendResponse(sending, req, invalidRequest
, codec, err.String()) | 407 server.sendResponse(sending, req, invalidRequest
, codec, err.String()) |
408 server.freeRequest(req) | 408 server.freeRequest(req) |
409 } | 409 } |
410 continue | 410 continue |
411 } | 411 } |
412 go service.call(server, sending, mtype, req, argv, replyv, codec
) | 412 go service.call(server, sending, mtype, req, argv, replyv, codec
) |
413 } | 413 } |
414 codec.Close() | 414 codec.Close() |
415 } | 415 } |
416 | 416 |
417 // ServeRequest is like ServeCodec but synchronously serves a single request. | 417 // ServeRequest is like ServeCodec but synchronously serves a single request. |
418 // It does not close the codec upon completion. | 418 // It does not close the codec upon completion. |
419 func (server *Server) ServeRequest(codec ServerCodec) os.Error { | 419 func (server *Server) ServeRequest(codec ServerCodec) os.Error { |
420 sending := new(sync.Mutex) | 420 sending := new(sync.Mutex) |
421 » service, mtype, req, argv, replyv, err := server.readRequest(codec) | 421 » service, mtype, req, argv, replyv, keepReading, err := server.readReques
t(codec) |
422 if err != nil { | 422 if err != nil { |
423 » » if err == os.EOF || err == io.ErrUnexpectedEOF { | 423 » » if !keepReading { |
424 return err | 424 return err |
425 } | 425 } |
426 // send a response if we actually managed to read a header. | 426 // send a response if we actually managed to read a header. |
427 if req != nil { | 427 if req != nil { |
428 server.sendResponse(sending, req, invalidRequest, codec,
err.String()) | 428 server.sendResponse(sending, req, invalidRequest, codec,
err.String()) |
429 server.freeRequest(req) | 429 server.freeRequest(req) |
430 } | 430 } |
431 return err | 431 return err |
432 } | 432 } |
433 service.call(server, sending, mtype, req, argv, replyv, codec) | 433 service.call(server, sending, mtype, req, argv, replyv, codec) |
(...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
467 return resp | 467 return resp |
468 } | 468 } |
469 | 469 |
470 func (server *Server) freeResponse(resp *Response) { | 470 func (server *Server) freeResponse(resp *Response) { |
471 server.respLock.Lock() | 471 server.respLock.Lock() |
472 resp.next = server.freeResp | 472 resp.next = server.freeResp |
473 server.freeResp = resp | 473 server.freeResp = resp |
474 server.respLock.Unlock() | 474 server.respLock.Unlock() |
475 } | 475 } |
476 | 476 |
477 func (server *Server) readRequest(codec ServerCodec) (service *service, mtype *m
ethodType, req *Request, argv, replyv reflect.Value, err os.Error) { | 477 func (server *Server) readRequest(codec ServerCodec) (service *service, mtype *m
ethodType, req *Request, argv, replyv reflect.Value, keepReading bool, err os.Er
ror) { |
478 » service, mtype, req, err = server.readRequestHeader(codec) | 478 » service, mtype, req, keepReading, err = server.readRequestHeader(codec) |
479 if err != nil { | 479 if err != nil { |
480 » » if err == os.EOF || err == io.ErrUnexpectedEOF { | 480 » » if !keepReading { |
481 return | 481 return |
482 } | 482 } |
483 // discard body | 483 // discard body |
484 codec.ReadRequestBody(nil) | 484 codec.ReadRequestBody(nil) |
485 return | 485 return |
486 } | 486 } |
487 | 487 |
488 // Decode the argument value. | 488 // Decode the argument value. |
489 argIsValue := false // if true, need to indirect before calling. | 489 argIsValue := false // if true, need to indirect before calling. |
490 if mtype.ArgType.Kind() == reflect.Ptr { | 490 if mtype.ArgType.Kind() == reflect.Ptr { |
491 argv = reflect.New(mtype.ArgType.Elem()) | 491 argv = reflect.New(mtype.ArgType.Elem()) |
492 } else { | 492 } else { |
493 argv = reflect.New(mtype.ArgType) | 493 argv = reflect.New(mtype.ArgType) |
494 argIsValue = true | 494 argIsValue = true |
495 } | 495 } |
496 // argv guaranteed to be a pointer now. | 496 // argv guaranteed to be a pointer now. |
497 if err = codec.ReadRequestBody(argv.Interface()); err != nil { | 497 if err = codec.ReadRequestBody(argv.Interface()); err != nil { |
498 return | 498 return |
499 } | 499 } |
500 if argIsValue { | 500 if argIsValue { |
501 argv = argv.Elem() | 501 argv = argv.Elem() |
502 } | 502 } |
503 | 503 |
504 replyv = reflect.New(mtype.ReplyType.Elem()) | 504 replyv = reflect.New(mtype.ReplyType.Elem()) |
505 return | 505 return |
506 } | 506 } |
507 | 507 |
508 func (server *Server) readRequestHeader(codec ServerCodec) (service *service, mt
ype *methodType, req *Request, err os.Error) { | 508 func (server *Server) readRequestHeader(codec ServerCodec) (service *service, mt
ype *methodType, req *Request, keepReading bool, err os.Error) { |
509 // Grab the request header. | 509 // Grab the request header. |
510 req = server.getRequest() | 510 req = server.getRequest() |
511 err = codec.ReadRequestHeader(req) | 511 err = codec.ReadRequestHeader(req) |
512 if err != nil { | 512 if err != nil { |
513 req = nil | 513 req = nil |
514 if err == os.EOF || err == io.ErrUnexpectedEOF { | 514 if err == os.EOF || err == io.ErrUnexpectedEOF { |
515 return | 515 return |
516 } | 516 } |
517 err = os.NewError("rpc: server cannot decode request: " + err.St
ring()) | 517 err = os.NewError("rpc: server cannot decode request: " + err.St
ring()) |
518 return | 518 return |
519 } | 519 } |
| 520 |
| 521 // We read the header successfully. If we see an error now, |
| 522 // we can still recover and move on to the next request. |
| 523 keepReading = true |
520 | 524 |
521 serviceMethod := strings.Split(req.ServiceMethod, ".") | 525 serviceMethod := strings.Split(req.ServiceMethod, ".") |
522 if len(serviceMethod) != 2 { | 526 if len(serviceMethod) != 2 { |
523 err = os.NewError("rpc: service/method request ill-formed: " + r
eq.ServiceMethod) | 527 err = os.NewError("rpc: service/method request ill-formed: " + r
eq.ServiceMethod) |
524 return | 528 return |
525 } | 529 } |
526 // Look up the request. | 530 // Look up the request. |
527 server.mu.Lock() | 531 server.mu.Lock() |
528 service = server.serviceMap[serviceMethod[0]] | 532 service = server.serviceMap[serviceMethod[0]] |
529 server.mu.Unlock() | 533 server.mu.Unlock() |
(...skipping 98 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
628 http.Handle(rpcPath, server) | 632 http.Handle(rpcPath, server) |
629 http.Handle(debugPath, debugHTTP{server}) | 633 http.Handle(debugPath, debugHTTP{server}) |
630 } | 634 } |
631 | 635 |
632 // HandleHTTP registers an HTTP handler for RPC messages to DefaultServer | 636 // HandleHTTP registers an HTTP handler for RPC messages to DefaultServer |
633 // on DefaultRPCPath and a debugging handler on DefaultDebugPath. | 637 // on DefaultRPCPath and a debugging handler on DefaultDebugPath. |
634 // It is still necessary to invoke http.Serve(), typically in a go statement. | 638 // It is still necessary to invoke http.Serve(), typically in a go statement. |
635 func HandleHTTP() { | 639 func HandleHTTP() { |
636 DefaultServer.HandleHTTP(DefaultRPCPath, DefaultDebugPath) | 640 DefaultServer.HandleHTTP(DefaultRPCPath, DefaultDebugPath) |
637 } | 641 } |
LEFT | RIGHT |