Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(76)

Delta Between Two Patch Sets: src/pkg/rpc/server.go

Issue 5305084: code review 5305084: rpc: avoid infinite loop on input error (Closed)
Left Patch Set: Created 13 years, 5 months ago
Right Patch Set: diff -r 39729c81e276 https://go.googlecode.com/hg/ Created 13 years, 5 months ago
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
Right: Side by side diff | Download
« no previous file with change/comment | « src/pkg/rpc/jsonrpc/all_test.go ('k') | src/pkg/rpc/server_test.go » ('j') | no next file with change/comment »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
LEFTRIGHT
(no file at all)
1 // Copyright 2009 The Go Authors. All rights reserved. 1 // Copyright 2009 The Go Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style 2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file. 3 // license that can be found in the LICENSE file.
4 4
5 /* 5 /*
6 Package rpc provides access to the exported methods of an object across a 6 Package rpc provides access to the exported methods of an object across a
7 network or other I/O connection. A server registers an object, making i t visible 7 network or other I/O connection. A server registers an object, making i t visible
8 as a service with the name of the type of the object. After registratio n, exported 8 as a service with the name of the type of the object. After registratio n, exported
9 methods of the object will be accessible remotely. A server may registe r multiple 9 methods of the object will be accessible remotely. A server may registe r multiple
10 objects (services) of different types but it is an error to register mul tiple 10 objects (services) of different types but it is an error to register mul tiple
(...skipping 376 matching lines...) Expand 10 before | Expand all | Expand 10 after
387 buf := bufio.NewWriter(conn) 387 buf := bufio.NewWriter(conn)
388 srv := &gobServerCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(buf), buf} 388 srv := &gobServerCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(buf), buf}
389 server.ServeCodec(srv) 389 server.ServeCodec(srv)
390 } 390 }
391 391
392 // ServeCodec is like ServeConn but uses the specified codec to 392 // ServeCodec is like ServeConn but uses the specified codec to
393 // decode requests and encode responses. 393 // decode requests and encode responses.
394 func (server *Server) ServeCodec(codec ServerCodec) { 394 func (server *Server) ServeCodec(codec ServerCodec) {
395 sending := new(sync.Mutex) 395 sending := new(sync.Mutex)
396 for { 396 for {
397 » » service, mtype, req, argv, replyv, err := server.readRequest(cod ec) 397 » » service, mtype, req, argv, replyv, keepReading, err := server.re adRequest(codec)
398 if err != nil { 398 if err != nil {
399 if err != os.EOF { 399 if err != os.EOF {
400 log.Println("rpc:", err) 400 log.Println("rpc:", err)
401 } 401 }
402 » » » if err == os.EOF || err == io.ErrUnexpectedEOF { 402 » » » if !keepReading {
403 break 403 break
404 } 404 }
405 // send a response if we actually managed to read a head er. 405 // send a response if we actually managed to read a head er.
406 if req != nil { 406 if req != nil {
407 server.sendResponse(sending, req, invalidRequest , codec, err.String()) 407 server.sendResponse(sending, req, invalidRequest , codec, err.String())
408 server.freeRequest(req) 408 server.freeRequest(req)
409 } 409 }
410 continue 410 continue
411 } 411 }
412 go service.call(server, sending, mtype, req, argv, replyv, codec ) 412 go service.call(server, sending, mtype, req, argv, replyv, codec )
413 } 413 }
414 codec.Close() 414 codec.Close()
415 } 415 }
416 416
417 // ServeRequest is like ServeCodec but synchronously serves a single request. 417 // ServeRequest is like ServeCodec but synchronously serves a single request.
418 // It does not close the codec upon completion. 418 // It does not close the codec upon completion.
419 func (server *Server) ServeRequest(codec ServerCodec) os.Error { 419 func (server *Server) ServeRequest(codec ServerCodec) os.Error {
420 sending := new(sync.Mutex) 420 sending := new(sync.Mutex)
421 » service, mtype, req, argv, replyv, err := server.readRequest(codec) 421 » service, mtype, req, argv, replyv, keepReading, err := server.readReques t(codec)
rog 2011/11/02 21:28:31 does that set a record for most number of returned
422 if err != nil { 422 if err != nil {
423 » » if err == os.EOF || err == io.ErrUnexpectedEOF { 423 » » if !keepReading {
424 return err 424 return err
425 } 425 }
426 // send a response if we actually managed to read a header. 426 // send a response if we actually managed to read a header.
427 if req != nil { 427 if req != nil {
428 server.sendResponse(sending, req, invalidRequest, codec, err.String()) 428 server.sendResponse(sending, req, invalidRequest, codec, err.String())
429 server.freeRequest(req) 429 server.freeRequest(req)
430 } 430 }
431 return err 431 return err
432 } 432 }
433 service.call(server, sending, mtype, req, argv, replyv, codec) 433 service.call(server, sending, mtype, req, argv, replyv, codec)
(...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after
467 return resp 467 return resp
468 } 468 }
469 469
470 func (server *Server) freeResponse(resp *Response) { 470 func (server *Server) freeResponse(resp *Response) {
471 server.respLock.Lock() 471 server.respLock.Lock()
472 resp.next = server.freeResp 472 resp.next = server.freeResp
473 server.freeResp = resp 473 server.freeResp = resp
474 server.respLock.Unlock() 474 server.respLock.Unlock()
475 } 475 }
476 476
477 func (server *Server) readRequest(codec ServerCodec) (service *service, mtype *m ethodType, req *Request, argv, replyv reflect.Value, err os.Error) { 477 func (server *Server) readRequest(codec ServerCodec) (service *service, mtype *m ethodType, req *Request, argv, replyv reflect.Value, keepReading bool, err os.Er ror) {
478 » service, mtype, req, err = server.readRequestHeader(codec) 478 » service, mtype, req, keepReading, err = server.readRequestHeader(codec)
479 if err != nil { 479 if err != nil {
480 » » if err == os.EOF || err == io.ErrUnexpectedEOF { 480 » » if !keepReading {
481 return 481 return
482 } 482 }
483 // discard body 483 // discard body
484 codec.ReadRequestBody(nil) 484 codec.ReadRequestBody(nil)
485 return 485 return
486 } 486 }
487 487
488 // Decode the argument value. 488 // Decode the argument value.
489 argIsValue := false // if true, need to indirect before calling. 489 argIsValue := false // if true, need to indirect before calling.
490 if mtype.ArgType.Kind() == reflect.Ptr { 490 if mtype.ArgType.Kind() == reflect.Ptr {
491 argv = reflect.New(mtype.ArgType.Elem()) 491 argv = reflect.New(mtype.ArgType.Elem())
492 } else { 492 } else {
493 argv = reflect.New(mtype.ArgType) 493 argv = reflect.New(mtype.ArgType)
494 argIsValue = true 494 argIsValue = true
495 } 495 }
496 // argv guaranteed to be a pointer now. 496 // argv guaranteed to be a pointer now.
497 if err = codec.ReadRequestBody(argv.Interface()); err != nil { 497 if err = codec.ReadRequestBody(argv.Interface()); err != nil {
498 return 498 return
499 } 499 }
500 if argIsValue { 500 if argIsValue {
501 argv = argv.Elem() 501 argv = argv.Elem()
502 } 502 }
503 503
504 replyv = reflect.New(mtype.ReplyType.Elem()) 504 replyv = reflect.New(mtype.ReplyType.Elem())
505 return 505 return
506 } 506 }
507 507
508 func (server *Server) readRequestHeader(codec ServerCodec) (service *service, mt ype *methodType, req *Request, err os.Error) { 508 func (server *Server) readRequestHeader(codec ServerCodec) (service *service, mt ype *methodType, req *Request, keepReading bool, err os.Error) {
509 // Grab the request header. 509 // Grab the request header.
510 req = server.getRequest() 510 req = server.getRequest()
511 err = codec.ReadRequestHeader(req) 511 err = codec.ReadRequestHeader(req)
512 if err != nil { 512 if err != nil {
513 req = nil 513 req = nil
514 if err == os.EOF || err == io.ErrUnexpectedEOF { 514 if err == os.EOF || err == io.ErrUnexpectedEOF {
515 return 515 return
516 } 516 }
517 err = os.NewError("rpc: server cannot decode request: " + err.St ring()) 517 err = os.NewError("rpc: server cannot decode request: " + err.St ring())
518 return 518 return
519 } 519 }
520
521 // We read the header successfully. If we see an error now,
522 // we can still recover and move on to the next request.
523 keepReading = true
520 524
521 serviceMethod := strings.Split(req.ServiceMethod, ".") 525 serviceMethod := strings.Split(req.ServiceMethod, ".")
522 if len(serviceMethod) != 2 { 526 if len(serviceMethod) != 2 {
523 err = os.NewError("rpc: service/method request ill-formed: " + r eq.ServiceMethod) 527 err = os.NewError("rpc: service/method request ill-formed: " + r eq.ServiceMethod)
524 return 528 return
525 } 529 }
526 // Look up the request. 530 // Look up the request.
527 server.mu.Lock() 531 server.mu.Lock()
528 service = server.serviceMap[serviceMethod[0]] 532 service = server.serviceMap[serviceMethod[0]]
529 server.mu.Unlock() 533 server.mu.Unlock()
(...skipping 98 matching lines...) Expand 10 before | Expand all | Expand 10 after
628 http.Handle(rpcPath, server) 632 http.Handle(rpcPath, server)
629 http.Handle(debugPath, debugHTTP{server}) 633 http.Handle(debugPath, debugHTTP{server})
630 } 634 }
631 635
632 // HandleHTTP registers an HTTP handler for RPC messages to DefaultServer 636 // HandleHTTP registers an HTTP handler for RPC messages to DefaultServer
633 // on DefaultRPCPath and a debugging handler on DefaultDebugPath. 637 // on DefaultRPCPath and a debugging handler on DefaultDebugPath.
634 // It is still necessary to invoke http.Serve(), typically in a go statement. 638 // It is still necessary to invoke http.Serve(), typically in a go statement.
635 func HandleHTTP() { 639 func HandleHTTP() {
636 DefaultServer.HandleHTTP(DefaultRPCPath, DefaultDebugPath) 640 DefaultServer.HandleHTTP(DefaultRPCPath, DefaultDebugPath)
637 } 641 }
LEFTRIGHT

Powered by Google App Engine
RSS Feeds Recent Issues | This issue
This is Rietveld f62528b