Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(1268)

Side by Side Diff: src/pkg/rpc/server.go

Issue 4889043: code review 4889043: rpc: implement ServeRequest to synchronously serve a si... (Closed)
Patch Set: diff -r de51e6f7dd3e https://go.googlecode.com/hg/ Created 13 years, 7 months ago
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | src/pkg/rpc/server_test.go » ('j') | src/pkg/rpc/server_test.go » ('J')
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2009 The Go Authors. All rights reserved. 1 // Copyright 2009 The Go Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style 2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file. 3 // license that can be found in the LICENSE file.
4 4
5 /* 5 /*
6 Package rpc provides access to the exported methods of an object across a 6 Package rpc provides access to the exported methods of an object across a
7 network or other I/O connection. A server registers an object, making i t visible 7 network or other I/O connection. A server registers an object, making i t visible
8 as a service with the name of the type of the object. After registratio n, exported 8 as a service with the name of the type of the object. After registratio n, exported
9 methods of the object will be accessible remotely. A server may registe r multiple 9 methods of the object will be accessible remotely. A server may registe r multiple
10 objects (services) of different types but it is an error to register mul tiple 10 objects (services) of different types but it is an error to register mul tiple
(...skipping 376 matching lines...) Expand 10 before | Expand all | Expand 10 after
387 buf := bufio.NewWriter(conn) 387 buf := bufio.NewWriter(conn)
388 srv := &gobServerCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(buf), buf} 388 srv := &gobServerCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(buf), buf}
389 server.ServeCodec(srv) 389 server.ServeCodec(srv)
390 } 390 }
391 391
392 // ServeCodec is like ServeConn but uses the specified codec to 392 // ServeCodec is like ServeConn but uses the specified codec to
393 // decode requests and encode responses. 393 // decode requests and encode responses.
394 func (server *Server) ServeCodec(codec ServerCodec) { 394 func (server *Server) ServeCodec(codec ServerCodec) {
395 sending := new(sync.Mutex) 395 sending := new(sync.Mutex)
396 for { 396 for {
397 » » req, service, mtype, err := server.readRequest(codec) 397 » » service, mtype, req, argv, replyv, err := server.readRequest(cod ec)
398 if err != nil { 398 if err != nil {
399 if err != os.EOF { 399 if err != os.EOF {
400 log.Println("rpc:", err) 400 log.Println("rpc:", err)
401 } 401 }
402 if err == os.EOF || err == io.ErrUnexpectedEOF { 402 if err == os.EOF || err == io.ErrUnexpectedEOF {
403 break 403 break
404 } 404 }
405 // discard body
406 codec.ReadRequestBody(nil)
407
408 // send a response if we actually managed to read a head er. 405 // send a response if we actually managed to read a head er.
409 if req != nil { 406 if req != nil {
410 server.sendResponse(sending, req, invalidRequest , codec, err.String()) 407 server.sendResponse(sending, req, invalidRequest , codec, err.String())
411 server.freeRequest(req) 408 server.freeRequest(req)
412 } 409 }
413 continue 410 continue
414 } 411 }
415
416 // Decode the argument value.
417 var argv reflect.Value
418 argIsValue := false // if true, need to indirect before calling.
419 if mtype.ArgType.Kind() == reflect.Ptr {
420 argv = reflect.New(mtype.ArgType.Elem())
421 } else {
422 argv = reflect.New(mtype.ArgType)
423 argIsValue = true
424 }
425 // argv guaranteed to be a pointer now.
426 replyv := reflect.New(mtype.ReplyType.Elem())
427 err = codec.ReadRequestBody(argv.Interface())
428 if err != nil {
429 if err == os.EOF || err == io.ErrUnexpectedEOF {
430 if err == io.ErrUnexpectedEOF {
431 log.Println("rpc:", err)
432 }
433 break
434 }
435 server.sendResponse(sending, req, replyv.Interface(), co dec, err.String())
436 continue
437 }
438 if argIsValue {
439 argv = argv.Elem()
440 }
441 go service.call(server, sending, mtype, req, argv, replyv, codec ) 412 go service.call(server, sending, mtype, req, argv, replyv, codec )
442 } 413 }
443 codec.Close() 414 codec.Close()
444 } 415 }
445 416
417 // ServeSingle is like ServeCodec but synchronously serves a single request
rsc 2011/08/15 16:11:09 suggest ServeRequest instead of ServeSingle. // S
sougou 2011/08/15 20:17:18 Done.
418 // without launching a goroutine. It does not close the codec upon completion.
419 func (server *Server) ServeSingle(codec ServerCodec) os.Error {
420 sending := new(sync.Mutex)
421 service, mtype, req, argv, replyv, err := server.readRequest(codec)
422 if err != nil {
423 if err == os.EOF || err == io.ErrUnexpectedEOF {
424 return err
425 }
426 // send a response if we actually managed to read a header.
427 if req != nil {
428 server.sendResponse(sending, req, invalidRequest, codec, err.String())
429 server.freeRequest(req)
430 }
431 return err
432 }
433 service.call(server, sending, mtype, req, argv, replyv, codec)
434 return nil
435 }
436
446 func (server *Server) getRequest() *Request { 437 func (server *Server) getRequest() *Request {
447 server.reqLock.Lock() 438 server.reqLock.Lock()
448 req := server.freeReq 439 req := server.freeReq
449 if req == nil { 440 if req == nil {
450 req = new(Request) 441 req = new(Request)
451 } else { 442 } else {
452 server.freeReq = req.next 443 server.freeReq = req.next
453 *req = Request{} 444 *req = Request{}
454 } 445 }
455 server.reqLock.Unlock() 446 server.reqLock.Unlock()
(...skipping 20 matching lines...) Expand all
476 return resp 467 return resp
477 } 468 }
478 469
479 func (server *Server) freeResponse(resp *Response) { 470 func (server *Server) freeResponse(resp *Response) {
480 server.respLock.Lock() 471 server.respLock.Lock()
481 resp.next = server.freeResp 472 resp.next = server.freeResp
482 server.freeResp = resp 473 server.freeResp = resp
483 server.respLock.Unlock() 474 server.respLock.Unlock()
484 } 475 }
485 476
486 func (server *Server) readRequest(codec ServerCodec) (req *Request, service *ser vice, mtype *methodType, err os.Error) { 477 func (server *Server) readRequest(codec ServerCodec) (service *service, mtype *m ethodType, req *Request, argv, replyv reflect.Value, err os.Error) {
478 » service, mtype, req, err = server.readRequestHeader(codec)
479 » if err != nil {
480 » » if err == os.EOF || err == io.ErrUnexpectedEOF {
481 » » » return
482 » » }
483 » » // discard body
484 » » codec.ReadRequestBody(nil)
485 » » return
486 » }
487
488 » // Decode the argument value.
489 » argIsValue := false // if true, need to indirect before calling.
490 » if mtype.ArgType.Kind() == reflect.Ptr {
491 » » argv = reflect.New(mtype.ArgType.Elem())
492 » } else {
493 » » argv = reflect.New(mtype.ArgType)
494 » » argIsValue = true
495 » }
496 » // argv guaranteed to be a pointer now.
497 » if err = codec.ReadRequestBody(argv.Interface()); err != nil {
498 » » return
499 » }
500 » if argIsValue {
501 » » argv = argv.Elem()
502 » }
503
504 » replyv = reflect.New(mtype.ReplyType.Elem())
505 » return
506 }
507
508 func (server *Server) readRequestHeader(codec ServerCodec) (service *service, mt ype *methodType, req *Request, err os.Error) {
487 // Grab the request header. 509 // Grab the request header.
488 req = server.getRequest() 510 req = server.getRequest()
489 err = codec.ReadRequestHeader(req) 511 err = codec.ReadRequestHeader(req)
490 if err != nil { 512 if err != nil {
491 req = nil 513 req = nil
492 if err == os.EOF || err == io.ErrUnexpectedEOF { 514 if err == os.EOF || err == io.ErrUnexpectedEOF {
493 return 515 return
494 } 516 }
495 err = os.NewError("rpc: server cannot decode request: " + err.St ring()) 517 err = os.NewError("rpc: server cannot decode request: " + err.St ring())
496 return 518 return
(...skipping 64 matching lines...) Expand 10 before | Expand all | Expand 10 after
561 func ServeConn(conn io.ReadWriteCloser) { 583 func ServeConn(conn io.ReadWriteCloser) {
562 DefaultServer.ServeConn(conn) 584 DefaultServer.ServeConn(conn)
563 } 585 }
564 586
565 // ServeCodec is like ServeConn but uses the specified codec to 587 // ServeCodec is like ServeConn but uses the specified codec to
566 // decode requests and encode responses. 588 // decode requests and encode responses.
567 func ServeCodec(codec ServerCodec) { 589 func ServeCodec(codec ServerCodec) {
568 DefaultServer.ServeCodec(codec) 590 DefaultServer.ServeCodec(codec)
569 } 591 }
570 592
593 // ServeSingle is like ServeCodec but synchronously serves a single request
rsc 2011/08/15 16:11:09 same
sougou 2011/08/15 20:17:18 Done.
594 // without launching a goroutine. It does not close the codec upon completion.
595 func ServeSingle(codec ServerCodec) os.Error {
596 return DefaultServer.ServeSingle(codec)
597 }
598
571 // Accept accepts connections on the listener and serves requests 599 // Accept accepts connections on the listener and serves requests
572 // to DefaultServer for each incoming connection.·· 600 // to DefaultServer for each incoming connection.··
573 // Accept blocks; the caller typically invokes it in a go statement. 601 // Accept blocks; the caller typically invokes it in a go statement.
574 func Accept(lis net.Listener) { DefaultServer.Accept(lis) } 602 func Accept(lis net.Listener) { DefaultServer.Accept(lis) }
575 603
576 // Can connect to RPC service using HTTP CONNECT to rpcPath. 604 // Can connect to RPC service using HTTP CONNECT to rpcPath.
577 var connected = "200 Connected to Go RPC" 605 var connected = "200 Connected to Go RPC"
578 606
579 // ServeHTTP implements an http.Handler that answers RPC requests. 607 // ServeHTTP implements an http.Handler that answers RPC requests.
580 func (server *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) { 608 func (server *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
(...skipping 19 matching lines...) Expand all
600 http.Handle(rpcPath, server) 628 http.Handle(rpcPath, server)
601 http.Handle(debugPath, debugHTTP{server}) 629 http.Handle(debugPath, debugHTTP{server})
602 } 630 }
603 631
604 // HandleHTTP registers an HTTP handler for RPC messages to DefaultServer 632 // HandleHTTP registers an HTTP handler for RPC messages to DefaultServer
605 // on DefaultRPCPath and a debugging handler on DefaultDebugPath. 633 // on DefaultRPCPath and a debugging handler on DefaultDebugPath.
606 // It is still necessary to invoke http.Serve(), typically in a go statement. 634 // It is still necessary to invoke http.Serve(), typically in a go statement.
607 func HandleHTTP() { 635 func HandleHTTP() {
608 DefaultServer.HandleHTTP(DefaultRPCPath, DefaultDebugPath) 636 DefaultServer.HandleHTTP(DefaultRPCPath, DefaultDebugPath)
609 } 637 }
OLDNEW
« no previous file with comments | « no previous file | src/pkg/rpc/server_test.go » ('j') | src/pkg/rpc/server_test.go » ('J')

Powered by Google App Engine
RSS Feeds Recent Issues | This issue
This is Rietveld f62528b