Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(266)

Side by Side Diff: zk.go

Issue 5835045: zookeeper: make error messages more informative.
Patch Set: zookeeper: make error messages more informative. Created 12 years ago
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
View unified diff | Download patch
« no previous file with comments | « retry_test.go ('k') | zk_test.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // gozk - ZooKeeper support for the Go language 1 // gozk - ZooKeeper support for the Go language
2 // 2 //
3 // https://wiki.ubuntu.com/gozk 3 // https://wiki.ubuntu.com/gozk
4 // 4 //
5 // Copyright (c) 2010-2011 Canonical Ltd. 5 // Copyright (c) 2010-2011 Canonical Ltd.
6 // 6 //
7 // Written by Gustavo Niemeyer <gustavo.niemeyer@canonical.com> 7 // Written by Gustavo Niemeyer <gustavo.niemeyer@canonical.com>
8 // 8 //
9 package zookeeper 9 package zookeeper
10 10
(...skipping 80 matching lines...) Expand 10 before | Expand all | Expand 10 after
91 // Note that closed channels will deliver zeroed Event, which means 91 // Note that closed channels will deliver zeroed Event, which means
92 // event.Type is set to EVENT_CLOSED and event.State is set to STATE_CLOSED, 92 // event.Type is set to EVENT_CLOSED and event.State is set to STATE_CLOSED,
93 // to facilitate handling. 93 // to facilitate handling.
94 type Event struct { 94 type Event struct {
95 Type int // One of the EVENT_* constants. 95 Type int // One of the EVENT_* constants.
96 Path string // For non-session events, the path of the watched node. 96 Path string // For non-session events, the path of the watched node.
97 State int // One of the STATE_* constants. 97 State int // One of the STATE_* constants.
98 } 98 }
99 99
100 // Error represents a ZooKeeper error. 100 // Error represents a ZooKeeper error.
101 type Error int 101 type Error struct {
102 » Op string
103 » Code ErrorCode
104 » // SystemError holds an error if Code is ZSYSTEMERROR.
105 » SystemError error
106 » Path string
107 }
108
109 func (e *Error) Error() string {
110 » s := e.Code.String()
111 » if e.Code == ZSYSTEMERROR && e.SystemError != nil {
112 » » s = e.SystemError.Error()
113 » }
114 » if e.Path == "" {
115 » » return fmt.Sprintf("zookeeper: %s: %v", e.Op, s)
116 » }
117 » return fmt.Sprintf("zookeeper: %s %q: %v", e.Op, e.Path, s)
118 }
119
120 // IsError returns whether the error is a *Error
121 // with the given error code.
122 func IsError(err error, code ErrorCode) bool {
123 » if err, _ := err.(*Error); err != nil {
124 » » return err.Code == code
125 » }
126 » return false
127 }
128
129 // ErrorCode represents a kind of ZooKeeper error.
130 type ErrorCode int
102 131
103 const ( 132 const (
104 » ZOK Error = C.ZOK 133 » ZOK ErrorCode = C.ZOK
105 » ZSYSTEMERROR Error = C.ZSYSTEMERROR 134 » ZSYSTEMERROR ErrorCode = C.ZSYSTEMERROR
106 » ZRUNTIMEINCONSISTENCY Error = C.ZRUNTIMEINCONSISTENCY 135 » ZRUNTIMEINCONSISTENCY ErrorCode = C.ZRUNTIMEINCONSISTENCY
107 » ZDATAINCONSISTENCY Error = C.ZDATAINCONSISTENCY 136 » ZDATAINCONSISTENCY ErrorCode = C.ZDATAINCONSISTENCY
108 » ZCONNECTIONLOSS Error = C.ZCONNECTIONLOSS 137 » ZCONNECTIONLOSS ErrorCode = C.ZCONNECTIONLOSS
109 » ZMARSHALLINGERROR Error = C.ZMARSHALLINGERROR 138 » ZMARSHALLINGERROR ErrorCode = C.ZMARSHALLINGERROR
110 » ZUNIMPLEMENTED Error = C.ZUNIMPLEMENTED 139 » ZUNIMPLEMENTED ErrorCode = C.ZUNIMPLEMENTED
111 » ZOPERATIONTIMEOUT Error = C.ZOPERATIONTIMEOUT 140 » ZOPERATIONTIMEOUT ErrorCode = C.ZOPERATIONTIMEOUT
112 » ZBADARGUMENTS Error = C.ZBADARGUMENTS 141 » ZBADARGUMENTS ErrorCode = C.ZBADARGUMENTS
113 » ZINVALIDSTATE Error = C.ZINVALIDSTATE 142 » ZINVALIDSTATE ErrorCode = C.ZINVALIDSTATE
114 » ZAPIERROR Error = C.ZAPIERROR 143 » ZAPIERROR ErrorCode = C.ZAPIERROR
115 » ZNONODE Error = C.ZNONODE 144 » ZNONODE ErrorCode = C.ZNONODE
116 » ZNOAUTH Error = C.ZNOAUTH 145 » ZNOAUTH ErrorCode = C.ZNOAUTH
117 » ZBADVERSION Error = C.ZBADVERSION 146 » ZBADVERSION ErrorCode = C.ZBADVERSION
118 » ZNOCHILDRENFOREPHEMERALS Error = C.ZNOCHILDRENFOREPHEMERALS 147 » ZNOCHILDRENFOREPHEMERALS ErrorCode = C.ZNOCHILDRENFOREPHEMERALS
119 » ZNODEEXISTS Error = C.ZNODEEXISTS 148 » ZNODEEXISTS ErrorCode = C.ZNODEEXISTS
120 » ZNOTEMPTY Error = C.ZNOTEMPTY 149 » ZNOTEMPTY ErrorCode = C.ZNOTEMPTY
121 » ZSESSIONEXPIRED Error = C.ZSESSIONEXPIRED 150 » ZSESSIONEXPIRED ErrorCode = C.ZSESSIONEXPIRED
122 » ZINVALIDCALLBACK Error = C.ZINVALIDCALLBACK 151 » ZINVALIDCALLBACK ErrorCode = C.ZINVALIDCALLBACK
123 » ZINVALIDACL Error = C.ZINVALIDACL 152 » ZINVALIDACL ErrorCode = C.ZINVALIDACL
124 » ZAUTHFAILED Error = C.ZAUTHFAILED 153 » ZAUTHFAILED ErrorCode = C.ZAUTHFAILED
125 » ZCLOSING Error = C.ZCLOSING 154 » ZCLOSING ErrorCode = C.ZCLOSING
126 » ZNOTHING Error = C.ZNOTHING 155 » ZNOTHING ErrorCode = C.ZNOTHING
127 » ZSESSIONMOVED Error = C.ZSESSIONMOVED 156 » ZSESSIONMOVED ErrorCode = C.ZSESSIONMOVED
128 ) 157 )
129 158
130 func (error Error) Error() string { 159 func (code ErrorCode) String() string {
131 » return C.GoString(C.zerror(C.int(error))) // Static, no need to free it. 160 » return C.GoString(C.zerror(C.int(code))) // Static, no need to free it.
132 } 161 }
133 162
134 // zkError creates an appropriate error return from 163 // zkError creates an appropriate error return from
135 // a ZooKeeper status and the errno return from a C API 164 // a ZooKeeper status and the errno return from a C API
136 // call. 165 // call.
137 func zkError(rc C.int, cerr error) error { 166 func zkError(rc C.int, cerr error, op, path string) error {
138 » code := Error(rc) 167 » code := ErrorCode(rc)
139 » switch code { 168 » if code == ZOK {
140 » case ZOK:
141 return nil 169 return nil
170 }
171 err := &Error{
172 Op: op,
173 Code: code,
174 Path: path,
175 }
176 if code == ZSYSTEMERROR {
177 err.SystemError = cerr
178 }
179 return err
180 }
142 181
143 » case ZSYSTEMERROR: 182 func closingError(op, path string) error {
144 » » // If a ZooKeeper call returns ZSYSTEMERROR, then 183 » return zkError(C.int(ZCLOSING), nil, op, path)
145 » » // errno becomes significant. If errno has not been
146 » » // set, then we will return ZSYSTEMERROR nonetheless.
147 » » if cerr != nil {
148 » » » return cerr
149 » » }
150 » }
151 » return code
152 } 184 }
153 185
154 // Constants for SetLogLevel. 186 // Constants for SetLogLevel.
155 const ( 187 const (
156 LOG_ERROR = C.ZOO_LOG_LEVEL_ERROR 188 LOG_ERROR = C.ZOO_LOG_LEVEL_ERROR
157 LOG_WARN = C.ZOO_LOG_LEVEL_WARN 189 LOG_WARN = C.ZOO_LOG_LEVEL_WARN
158 LOG_INFO = C.ZOO_LOG_LEVEL_INFO 190 LOG_INFO = C.ZOO_LOG_LEVEL_INFO
159 LOG_DEBUG = C.ZOO_LOG_LEVEL_DEBUG 191 LOG_DEBUG = C.ZOO_LOG_LEVEL_DEBUG
160 ) 192 )
161 193
(...skipping 250 matching lines...) Expand 10 before | Expand all | Expand 10 after
412 } 444 }
413 445
414 watchId, watchChannel := conn.createWatch(true) 446 watchId, watchChannel := conn.createWatch(true)
415 conn.sessionWatchId = watchId 447 conn.sessionWatchId = watchId
416 448
417 cservers := C.CString(servers) 449 cservers := C.CString(servers)
418 handle, cerr := C.zookeeper_init(cservers, C.watch_handler, C.int(recvTi meout/1e6), cId, unsafe.Pointer(watchId), 0) 450 handle, cerr := C.zookeeper_init(cservers, C.watch_handler, C.int(recvTi meout/1e6), cId, unsafe.Pointer(watchId), 0)
419 C.free(unsafe.Pointer(cservers)) 451 C.free(unsafe.Pointer(cservers))
420 if handle == nil { 452 if handle == nil {
421 conn.closeAllWatches() 453 conn.closeAllWatches()
422 » » return nil, nil, zkError(C.int(ZSYSTEMERROR), cerr) 454 » » return nil, nil, zkError(C.int(ZSYSTEMERROR), cerr, "dial", "")
423 } 455 }
424 conn.handle = handle 456 conn.handle = handle
425 runWatchLoop() 457 runWatchLoop()
426 return conn, watchChannel, nil 458 return conn, watchChannel, nil
427 } 459 }
428 460
429 // ClientId returns the client ID for the existing session with ZooKeeper. 461 // ClientId returns the client ID for the existing session with ZooKeeper.
430 // This is useful to reestablish an existing session via ReInit. 462 // This is useful to reestablish an existing session via ReInit.
431 func (conn *Conn) ClientId() *ClientId { 463 func (conn *Conn) ClientId() *ClientId {
432 conn.mutex.RLock() 464 conn.mutex.RLock()
433 defer conn.mutex.RUnlock() 465 defer conn.mutex.RUnlock()
434 return &ClientId{*C.zoo_client_id(conn.handle)} 466 return &ClientId{*C.zoo_client_id(conn.handle)}
435 } 467 }
436 468
437 // Close terminates the ZooKeeper interaction. 469 // Close terminates the ZooKeeper interaction.
438 func (conn *Conn) Close() error { 470 func (conn *Conn) Close() error {
439 471
440 // Protect from concurrency around conn.handle change. 472 // Protect from concurrency around conn.handle change.
441 conn.mutex.Lock() 473 conn.mutex.Lock()
442 defer conn.mutex.Unlock() 474 defer conn.mutex.Unlock()
443 475
444 if conn.handle == nil { 476 if conn.handle == nil {
445 // ZooKeeper may hang indefinitely if a handler is closed twice, 477 // ZooKeeper may hang indefinitely if a handler is closed twice,
446 // so we get in the way and prevent it from happening. 478 // so we get in the way and prevent it from happening.
447 » » return ZCLOSING 479 » » return closingError("close", "")
448 } 480 }
449 rc, cerr := C.zookeeper_close(conn.handle) 481 rc, cerr := C.zookeeper_close(conn.handle)
450 482
451 conn.closeAllWatches() 483 conn.closeAllWatches()
452 stopWatchLoop() 484 stopWatchLoop()
453 485
454 // At this point, nothing else should need conn.handle. 486 // At this point, nothing else should need conn.handle.
455 conn.handle = nil 487 conn.handle = nil
456 488
457 » return zkError(rc, cerr) 489 » return zkError(rc, cerr, "close", "")
458 } 490 }
459 491
460 // Get returns the data and status from an existing node. err will be nil, 492 // Get returns the data and status from an existing node. err will be nil,
461 // unless an error is found. Attempting to retrieve data from a non-existing 493 // unless an error is found. Attempting to retrieve data from a non-existing
462 // node is an error. 494 // node is an error.
463 func (conn *Conn) Get(path string) (data string, stat *Stat, err error) { 495 func (conn *Conn) Get(path string) (data string, stat *Stat, err error) {
464 conn.mutex.RLock() 496 conn.mutex.RLock()
465 defer conn.mutex.RUnlock() 497 defer conn.mutex.RUnlock()
466 if conn.handle == nil { 498 if conn.handle == nil {
467 » » return "", nil, ZCLOSING 499 » » return "", nil, closingError("get", path)
468 } 500 }
469 501
470 cpath := C.CString(path) 502 cpath := C.CString(path)
471 cbuffer := (*C.char)(C.malloc(bufferSize)) 503 cbuffer := (*C.char)(C.malloc(bufferSize))
472 cbufferLen := C.int(bufferSize) 504 cbufferLen := C.int(bufferSize)
473 defer C.free(unsafe.Pointer(cpath)) 505 defer C.free(unsafe.Pointer(cpath))
474 defer C.free(unsafe.Pointer(cbuffer)) 506 defer C.free(unsafe.Pointer(cbuffer))
475 507
476 var cstat Stat 508 var cstat Stat
477 rc, cerr := C.zoo_wget(conn.handle, cpath, nil, nil, cbuffer, &cbufferLe n, &cstat.c) 509 rc, cerr := C.zoo_wget(conn.handle, cpath, nil, nil, cbuffer, &cbufferLe n, &cstat.c)
478 if rc != C.ZOK { 510 if rc != C.ZOK {
479 » » return "", nil, zkError(rc, cerr) 511 » » return "", nil, zkError(rc, cerr, "get", path)
480 } 512 }
481 513
482 result := C.GoStringN(cbuffer, cbufferLen) 514 result := C.GoStringN(cbuffer, cbufferLen)
483 return result, &cstat, nil 515 return result, &cstat, nil
484 } 516 }
485 517
486 // GetW works like Get but also returns a channel that will receive 518 // GetW works like Get but also returns a channel that will receive
487 // a single Event value when the data or existence of the given ZooKeeper 519 // a single Event value when the data or existence of the given ZooKeeper
488 // node changes or when critical session events happen. See the 520 // node changes or when critical session events happen. See the
489 // documentation of the Event type for more details. 521 // documentation of the Event type for more details.
490 func (conn *Conn) GetW(path string) (data string, stat *Stat, watch <-chan Event , err error) { 522 func (conn *Conn) GetW(path string) (data string, stat *Stat, watch <-chan Event , err error) {
491 conn.mutex.RLock() 523 conn.mutex.RLock()
492 defer conn.mutex.RUnlock() 524 defer conn.mutex.RUnlock()
493 if conn.handle == nil { 525 if conn.handle == nil {
494 » » return "", nil, nil, ZCLOSING 526 » » return "", nil, nil, closingError("getw", path)
495 } 527 }
496 528
497 cpath := C.CString(path) 529 cpath := C.CString(path)
498 cbuffer := (*C.char)(C.malloc(bufferSize)) 530 cbuffer := (*C.char)(C.malloc(bufferSize))
499 cbufferLen := C.int(bufferSize) 531 cbufferLen := C.int(bufferSize)
500 defer C.free(unsafe.Pointer(cpath)) 532 defer C.free(unsafe.Pointer(cpath))
501 defer C.free(unsafe.Pointer(cbuffer)) 533 defer C.free(unsafe.Pointer(cbuffer))
502 534
503 watchId, watchChannel := conn.createWatch(true) 535 watchId, watchChannel := conn.createWatch(true)
504 536
505 var cstat Stat 537 var cstat Stat
506 rc, cerr := C.zoo_wget(conn.handle, cpath, C.watch_handler, unsafe.Point er(watchId), cbuffer, &cbufferLen, &cstat.c) 538 rc, cerr := C.zoo_wget(conn.handle, cpath, C.watch_handler, unsafe.Point er(watchId), cbuffer, &cbufferLen, &cstat.c)
507 if rc != C.ZOK { 539 if rc != C.ZOK {
508 conn.forgetWatch(watchId) 540 conn.forgetWatch(watchId)
509 » » return "", nil, nil, zkError(rc, cerr) 541 » » return "", nil, nil, zkError(rc, cerr, "getw", path)
510 } 542 }
511 543
512 result := C.GoStringN(cbuffer, cbufferLen) 544 result := C.GoStringN(cbuffer, cbufferLen)
513 return result, &cstat, watchChannel, nil 545 return result, &cstat, watchChannel, nil
514 } 546 }
515 547
516 // Children returns the children list and status from an existing node. 548 // Children returns the children list and status from an existing node.
517 // Attempting to retrieve the children list from a non-existent node is an error . 549 // Attempting to retrieve the children list from a non-existent node is an error .
518 func (conn *Conn) Children(path string) (children []string, stat *Stat, err erro r) { 550 func (conn *Conn) Children(path string) (children []string, stat *Stat, err erro r) {
519 conn.mutex.RLock() 551 conn.mutex.RLock()
520 defer conn.mutex.RUnlock() 552 defer conn.mutex.RUnlock()
521 if conn.handle == nil { 553 if conn.handle == nil {
522 » » return nil, nil, ZCLOSING 554 » » return nil, nil, closingError("children", path)
523 } 555 }
524 556
525 cpath := C.CString(path) 557 cpath := C.CString(path)
526 defer C.free(unsafe.Pointer(cpath)) 558 defer C.free(unsafe.Pointer(cpath))
527 559
528 cvector := C.struct_String_vector{} 560 cvector := C.struct_String_vector{}
529 var cstat Stat 561 var cstat Stat
530 rc, cerr := C.zoo_wget_children2(conn.handle, cpath, nil, nil, &cvector, &cstat.c) 562 rc, cerr := C.zoo_wget_children2(conn.handle, cpath, nil, nil, &cvector, &cstat.c)
531 563
532 // Can't happen if rc != 0, but avoid potential memory leaks in the futu re. 564 // Can't happen if rc != 0, but avoid potential memory leaks in the futu re.
533 if cvector.count != 0 { 565 if cvector.count != 0 {
534 children = parseStringVector(&cvector) 566 children = parseStringVector(&cvector)
535 } 567 }
536 if rc == C.ZOK { 568 if rc == C.ZOK {
537 stat = &cstat 569 stat = &cstat
538 } else { 570 } else {
539 » » err = zkError(rc, cerr) 571 » » err = zkError(rc, cerr, "children", path)
540 } 572 }
541 return 573 return
542 } 574 }
543 575
544 // ChildrenW works like Children but also returns a channel that will 576 // ChildrenW works like Children but also returns a channel that will
545 // receive a single Event value when a node is added or removed under the 577 // receive a single Event value when a node is added or removed under the
546 // provided path or when critical session events happen. See the documentation 578 // provided path or when critical session events happen. See the documentation
547 // of the Event type for more details. 579 // of the Event type for more details.
548 func (conn *Conn) ChildrenW(path string) (children []string, stat *Stat, watch < -chan Event, err error) { 580 func (conn *Conn) ChildrenW(path string) (children []string, stat *Stat, watch < -chan Event, err error) {
549 conn.mutex.RLock() 581 conn.mutex.RLock()
550 defer conn.mutex.RUnlock() 582 defer conn.mutex.RUnlock()
551 if conn.handle == nil { 583 if conn.handle == nil {
552 » » return nil, nil, nil, ZCLOSING 584 » » return nil, nil, nil, closingError("childrenw", path)
553 } 585 }
554 586
555 cpath := C.CString(path) 587 cpath := C.CString(path)
556 defer C.free(unsafe.Pointer(cpath)) 588 defer C.free(unsafe.Pointer(cpath))
557 589
558 watchId, watchChannel := conn.createWatch(true) 590 watchId, watchChannel := conn.createWatch(true)
559 591
560 cvector := C.struct_String_vector{} 592 cvector := C.struct_String_vector{}
561 var cstat Stat 593 var cstat Stat
562 rc, cerr := C.zoo_wget_children2(conn.handle, cpath, C.watch_handler, un safe.Pointer(watchId), &cvector, &cstat.c) 594 rc, cerr := C.zoo_wget_children2(conn.handle, cpath, C.watch_handler, un safe.Pointer(watchId), &cvector, &cstat.c)
563 595
564 // Can't happen if rc != 0, but avoid potential memory leaks in the futu re. 596 // Can't happen if rc != 0, but avoid potential memory leaks in the futu re.
565 if cvector.count != 0 { 597 if cvector.count != 0 {
566 children = parseStringVector(&cvector) 598 children = parseStringVector(&cvector)
567 } 599 }
568 if rc == C.ZOK { 600 if rc == C.ZOK {
569 stat = &cstat 601 stat = &cstat
570 watch = watchChannel 602 watch = watchChannel
571 } else { 603 } else {
572 conn.forgetWatch(watchId) 604 conn.forgetWatch(watchId)
573 » » err = zkError(rc, cerr) 605 » » err = zkError(rc, cerr, "childrenw", path)
574 } 606 }
575 return 607 return
576 } 608 }
577 609
578 func parseStringVector(cvector *C.struct_String_vector) []string { 610 func parseStringVector(cvector *C.struct_String_vector) []string {
579 vector := make([]string, cvector.count) 611 vector := make([]string, cvector.count)
580 dataStart := uintptr(unsafe.Pointer(cvector.data)) 612 dataStart := uintptr(unsafe.Pointer(cvector.data))
581 uintptrSize := unsafe.Sizeof(dataStart) 613 uintptrSize := unsafe.Sizeof(dataStart)
582 for i := 0; i != len(vector); i++ { 614 for i := 0; i != len(vector); i++ {
583 cpathPos := dataStart + uintptr(i)*uintptrSize 615 cpathPos := dataStart + uintptr(i)*uintptrSize
584 cpath := *(**C.char)(unsafe.Pointer(cpathPos)) 616 cpath := *(**C.char)(unsafe.Pointer(cpathPos))
585 vector[i] = C.GoString(cpath) 617 vector[i] = C.GoString(cpath)
586 } 618 }
587 C.deallocate_String_vector(cvector) 619 C.deallocate_String_vector(cvector)
588 return vector 620 return vector
589 } 621 }
590 622
591 // Exists checks if a node exists at the given path. If it does, 623 // Exists checks if a node exists at the given path. If it does,
592 // stat will contain meta information on the existing node, otherwise 624 // stat will contain meta information on the existing node, otherwise
593 // it will be nil. 625 // it will be nil.
594 func (conn *Conn) Exists(path string) (stat *Stat, err error) { 626 func (conn *Conn) Exists(path string) (stat *Stat, err error) {
595 conn.mutex.RLock() 627 conn.mutex.RLock()
596 defer conn.mutex.RUnlock() 628 defer conn.mutex.RUnlock()
597 if conn.handle == nil { 629 if conn.handle == nil {
598 » » return nil, ZCLOSING 630 » » return nil, closingError("exists", path)
599 } 631 }
600 632
601 cpath := C.CString(path) 633 cpath := C.CString(path)
602 defer C.free(unsafe.Pointer(cpath)) 634 defer C.free(unsafe.Pointer(cpath))
603 635
604 var cstat Stat 636 var cstat Stat
605 rc, cerr := C.zoo_wexists(conn.handle, cpath, nil, nil, &cstat.c) 637 rc, cerr := C.zoo_wexists(conn.handle, cpath, nil, nil, &cstat.c)
606 638
607 // We diverge a bit from the usual here: a ZNONODE is not an error 639 // We diverge a bit from the usual here: a ZNONODE is not an error
608 // for an exists call, otherwise every Exists call would have to check 640 // for an exists call, otherwise every Exists call would have to check
609 // for err != nil and err.Code() != ZNONODE. 641 // for err != nil and err.Code() != ZNONODE.
610 if rc == C.ZOK { 642 if rc == C.ZOK {
611 stat = &cstat 643 stat = &cstat
612 } else if rc != C.ZNONODE { 644 } else if rc != C.ZNONODE {
613 » » err = zkError(rc, cerr) 645 » » err = zkError(rc, cerr, "exists", path)
614 } 646 }
615 return 647 return
616 } 648 }
617 649
618 // ExistsW works like Exists but also returns a channel that will 650 // ExistsW works like Exists but also returns a channel that will
619 // receive an Event value when a node is created in case the returned 651 // receive an Event value when a node is created in case the returned
620 // stat is nil and the node didn't exist, or when the existing node 652 // stat is nil and the node didn't exist, or when the existing node
621 // is removed. It will also receive critical session events. See the 653 // is removed. It will also receive critical session events. See the
622 // documentation of the Event type for more details. 654 // documentation of the Event type for more details.
623 func (conn *Conn) ExistsW(path string) (stat *Stat, watch <-chan Event, err erro r) { 655 func (conn *Conn) ExistsW(path string) (stat *Stat, watch <-chan Event, err erro r) {
624 conn.mutex.RLock() 656 conn.mutex.RLock()
625 defer conn.mutex.RUnlock() 657 defer conn.mutex.RUnlock()
626 if conn.handle == nil { 658 if conn.handle == nil {
627 » » return nil, nil, ZCLOSING 659 » » return nil, nil, closingError("existsw", path)
628 } 660 }
629 661
630 cpath := C.CString(path) 662 cpath := C.CString(path)
631 defer C.free(unsafe.Pointer(cpath)) 663 defer C.free(unsafe.Pointer(cpath))
632 664
633 watchId, watchChannel := conn.createWatch(true) 665 watchId, watchChannel := conn.createWatch(true)
634 666
635 var cstat Stat 667 var cstat Stat
636 rc, cerr := C.zoo_wexists(conn.handle, cpath, C.watch_handler, unsafe.Po inter(watchId), &cstat.c) 668 rc, cerr := C.zoo_wexists(conn.handle, cpath, C.watch_handler, unsafe.Po inter(watchId), &cstat.c)
637 669
638 // We diverge a bit from the usual here: a ZNONODE is not an error 670 // We diverge a bit from the usual here: a ZNONODE is not an error
639 // for an exists call, otherwise every Exists call would have to check 671 // for an exists call, otherwise every Exists call would have to check
640 // for err != nil and err.Code() != ZNONODE. 672 // for err != nil and err.Code() != ZNONODE.
641 » switch Error(rc) { 673 » switch ErrorCode(rc) {
642 case ZOK: 674 case ZOK:
643 stat = &cstat 675 stat = &cstat
644 watch = watchChannel 676 watch = watchChannel
645 case ZNONODE: 677 case ZNONODE:
646 watch = watchChannel 678 watch = watchChannel
647 default: 679 default:
648 conn.forgetWatch(watchId) 680 conn.forgetWatch(watchId)
649 » » err = zkError(rc, cerr) 681 » » err = zkError(rc, cerr, "existsw", path)
650 } 682 }
651 return 683 return
652 } 684 }
653 685
654 // Create creates a node at the given path with the given data. The 686 // Create creates a node at the given path with the given data. The
655 // provided flags may determine features such as whether the node is 687 // provided flags may determine features such as whether the node is
656 // ephemeral or not, or whether it should have a sequence number 688 // ephemeral or not, or whether it should have a sequence number
657 // attached to it, and the provided ACLs will determine who can access 689 // attached to it, and the provided ACLs will determine who can access
658 // the node and under which circumstances. 690 // the node and under which circumstances.
659 // 691 //
660 // The returned path is useful in cases where the created path may differ 692 // The returned path is useful in cases where the created path may differ
661 // from the requested one, such as when a sequence number is appended 693 // from the requested one, such as when a sequence number is appended
662 // to it due to the use of the gozk.SEQUENCE flag. 694 // to it due to the use of the gozk.SEQUENCE flag.
663 func (conn *Conn) Create(path, value string, flags int, aclv []ACL) (pathCreated string, err error) { 695 func (conn *Conn) Create(path, value string, flags int, aclv []ACL) (pathCreated string, err error) {
664 conn.mutex.RLock() 696 conn.mutex.RLock()
665 defer conn.mutex.RUnlock() 697 defer conn.mutex.RUnlock()
666 if conn.handle == nil { 698 if conn.handle == nil {
667 » » return "", ZCLOSING 699 » » return "", closingError("close", path)
668 } 700 }
669 701
670 cpath := C.CString(path) 702 cpath := C.CString(path)
671 cvalue := C.CString(value) 703 cvalue := C.CString(value)
672 defer C.free(unsafe.Pointer(cpath)) 704 defer C.free(unsafe.Pointer(cpath))
673 defer C.free(unsafe.Pointer(cvalue)) 705 defer C.free(unsafe.Pointer(cvalue))
674 706
675 caclv := buildACLVector(aclv) 707 caclv := buildACLVector(aclv)
676 defer C.deallocate_ACL_vector(caclv) 708 defer C.deallocate_ACL_vector(caclv)
677 709
678 // Allocate additional space for the sequence (10 bytes should be enough ). 710 // Allocate additional space for the sequence (10 bytes should be enough ).
679 cpathLen := C.size_t(len(path) + 32) 711 cpathLen := C.size_t(len(path) + 32)
680 cpathCreated := (*C.char)(C.malloc(cpathLen)) 712 cpathCreated := (*C.char)(C.malloc(cpathLen))
681 defer C.free(unsafe.Pointer(cpathCreated)) 713 defer C.free(unsafe.Pointer(cpathCreated))
682 714
683 rc, cerr := C.zoo_create(conn.handle, cpath, cvalue, C.int(len(value)), caclv, C.int(flags), cpathCreated, C.int(cpathLen)) 715 rc, cerr := C.zoo_create(conn.handle, cpath, cvalue, C.int(len(value)), caclv, C.int(flags), cpathCreated, C.int(cpathLen))
684 if rc == C.ZOK { 716 if rc == C.ZOK {
685 pathCreated = C.GoString(cpathCreated) 717 pathCreated = C.GoString(cpathCreated)
686 } else { 718 } else {
687 » » err = zkError(rc, cerr) 719 » » err = zkError(rc, cerr, "create", path)
688 } 720 }
689 return 721 return
690 } 722 }
691 723
692 // Set modifies the data for the existing node at the given path, replacing it 724 // Set modifies the data for the existing node at the given path, replacing it
693 // by the provided value. If version is not -1, the operation will only 725 // by the provided value. If version is not -1, the operation will only
694 // succeed if the node is still at the given version when the replacement 726 // succeed if the node is still at the given version when the replacement
695 // happens as an atomic operation. The returned Stat value will contain 727 // happens as an atomic operation. The returned Stat value will contain
696 // data for the resulting node, after the operation is performed. 728 // data for the resulting node, after the operation is performed.
697 // 729 //
698 // It is an error to attempt to set the data of a non-existing node with 730 // It is an error to attempt to set the data of a non-existing node with
699 // this function. In these cases, use Create instead. 731 // this function. In these cases, use Create instead.
700 func (conn *Conn) Set(path, value string, version int) (stat *Stat, err error) { 732 func (conn *Conn) Set(path, value string, version int) (stat *Stat, err error) {
701 conn.mutex.RLock() 733 conn.mutex.RLock()
702 defer conn.mutex.RUnlock() 734 defer conn.mutex.RUnlock()
703 if conn.handle == nil { 735 if conn.handle == nil {
704 » » return nil, ZCLOSING 736 » » return nil, closingError("set", path)
705 } 737 }
706 738
707 cpath := C.CString(path) 739 cpath := C.CString(path)
708 cvalue := C.CString(value) 740 cvalue := C.CString(value)
709 defer C.free(unsafe.Pointer(cpath)) 741 defer C.free(unsafe.Pointer(cpath))
710 defer C.free(unsafe.Pointer(cvalue)) 742 defer C.free(unsafe.Pointer(cvalue))
711 743
712 var cstat Stat 744 var cstat Stat
713 rc, cerr := C.zoo_set2(conn.handle, cpath, cvalue, C.int(len(value)), C. int(version), &cstat.c) 745 rc, cerr := C.zoo_set2(conn.handle, cpath, cvalue, C.int(len(value)), C. int(version), &cstat.c)
714 if rc == C.ZOK { 746 if rc == C.ZOK {
715 stat = &cstat 747 stat = &cstat
716 } else { 748 } else {
717 » » err = zkError(rc, cerr) 749 » » err = zkError(rc, cerr, "set", path)
718 } 750 }
719 return 751 return
720 } 752 }
721 753
722 // Delete removes the node at path. If version is not -1, the operation 754 // Delete removes the node at path. If version is not -1, the operation
723 // will only succeed if the node is still at this version when the 755 // will only succeed if the node is still at this version when the
724 // node is deleted as an atomic operation. 756 // node is deleted as an atomic operation.
725 func (conn *Conn) Delete(path string, version int) (err error) { 757 func (conn *Conn) Delete(path string, version int) (err error) {
726 conn.mutex.RLock() 758 conn.mutex.RLock()
727 defer conn.mutex.RUnlock() 759 defer conn.mutex.RUnlock()
728 if conn.handle == nil { 760 if conn.handle == nil {
729 » » return ZCLOSING 761 » » return closingError("delete", path)
730 } 762 }
731 763
732 cpath := C.CString(path) 764 cpath := C.CString(path)
733 defer C.free(unsafe.Pointer(cpath)) 765 defer C.free(unsafe.Pointer(cpath))
734 rc, cerr := C.zoo_delete(conn.handle, cpath, C.int(version)) 766 rc, cerr := C.zoo_delete(conn.handle, cpath, C.int(version))
735 » return zkError(rc, cerr) 767 » return zkError(rc, cerr, "delete", path)
736 } 768 }
737 769
738 // AddAuth adds a new authentication certificate to the ZooKeeper 770 // AddAuth adds a new authentication certificate to the ZooKeeper
739 // interaction. The scheme parameter will specify how to handle the 771 // interaction. The scheme parameter will specify how to handle the
740 // authentication information, while the cert parameter provides the 772 // authentication information, while the cert parameter provides the
741 // identity data itself. For instance, the "digest" scheme requires 773 // identity data itself. For instance, the "digest" scheme requires
742 // a pair like "username:password" to be provided as the certificate. 774 // a pair like "username:password" to be provided as the certificate.
743 func (conn *Conn) AddAuth(scheme, cert string) error { 775 func (conn *Conn) AddAuth(scheme, cert string) error {
744 conn.mutex.RLock() 776 conn.mutex.RLock()
745 defer conn.mutex.RUnlock() 777 defer conn.mutex.RUnlock()
746 if conn.handle == nil { 778 if conn.handle == nil {
747 » » return ZCLOSING 779 » » return closingError("addauth", "")
748 } 780 }
749 781
750 cscheme := C.CString(scheme) 782 cscheme := C.CString(scheme)
751 ccert := C.CString(cert) 783 ccert := C.CString(cert)
752 defer C.free(unsafe.Pointer(cscheme)) 784 defer C.free(unsafe.Pointer(cscheme))
753 defer C.free(unsafe.Pointer(ccert)) 785 defer C.free(unsafe.Pointer(ccert))
754 786
755 data := C.create_completion_data() 787 data := C.create_completion_data()
756 if data == nil { 788 if data == nil {
757 panic("Failed to create completion data") 789 panic("Failed to create completion data")
758 } 790 }
759 defer C.destroy_completion_data(data) 791 defer C.destroy_completion_data(data)
760 792
761 rc, cerr := C.zoo_add_auth(conn.handle, cscheme, ccert, C.int(len(cert)) , C.handle_void_completion, unsafe.Pointer(data)) 793 rc, cerr := C.zoo_add_auth(conn.handle, cscheme, ccert, C.int(len(cert)) , C.handle_void_completion, unsafe.Pointer(data))
762 if rc != C.ZOK { 794 if rc != C.ZOK {
763 » » return zkError(rc, cerr) 795 » » return zkError(rc, cerr, "addauth", "")
764 } 796 }
765 797
766 C.wait_for_completion(data) 798 C.wait_for_completion(data)
767 799
768 rc = C.int(uintptr(data.data)) 800 rc = C.int(uintptr(data.data))
769 » return zkError(rc, nil) 801 » return zkError(rc, nil, "addauth", "")
770 } 802 }
771 803
772 // ACL returns the access control list for path. 804 // ACL returns the access control list for path.
773 func (conn *Conn) ACL(path string) ([]ACL, *Stat, error) { 805 func (conn *Conn) ACL(path string) ([]ACL, *Stat, error) {
774 conn.mutex.RLock() 806 conn.mutex.RLock()
775 defer conn.mutex.RUnlock() 807 defer conn.mutex.RUnlock()
776 if conn.handle == nil { 808 if conn.handle == nil {
777 » » return nil, nil, ZCLOSING 809 » » return nil, nil, closingError("acl", path)
778 } 810 }
779 811
780 cpath := C.CString(path) 812 cpath := C.CString(path)
781 defer C.free(unsafe.Pointer(cpath)) 813 defer C.free(unsafe.Pointer(cpath))
782 814
783 caclv := C.struct_ACL_vector{} 815 caclv := C.struct_ACL_vector{}
784 816
785 var cstat Stat 817 var cstat Stat
786 rc, cerr := C.zoo_get_acl(conn.handle, cpath, &caclv, &cstat.c) 818 rc, cerr := C.zoo_get_acl(conn.handle, cpath, &caclv, &cstat.c)
787 if rc != C.ZOK { 819 if rc != C.ZOK {
788 » » return nil, nil, zkError(rc, cerr) 820 » » return nil, nil, zkError(rc, cerr, "acl", path)
789 } 821 }
790 822
791 aclv := parseACLVector(&caclv) 823 aclv := parseACLVector(&caclv)
792 824
793 return aclv, &cstat, nil 825 return aclv, &cstat, nil
794 } 826 }
795 827
796 // SetACL changes the access control list for path. 828 // SetACL changes the access control list for path.
797 func (conn *Conn) SetACL(path string, aclv []ACL, version int) error { 829 func (conn *Conn) SetACL(path string, aclv []ACL, version int) error {
798 conn.mutex.RLock() 830 conn.mutex.RLock()
799 defer conn.mutex.RUnlock() 831 defer conn.mutex.RUnlock()
800 if conn.handle == nil { 832 if conn.handle == nil {
801 » » return ZCLOSING 833 » » return closingError("setacl", path)
802 } 834 }
803 835
804 cpath := C.CString(path) 836 cpath := C.CString(path)
805 defer C.free(unsafe.Pointer(cpath)) 837 defer C.free(unsafe.Pointer(cpath))
806 838
807 caclv := buildACLVector(aclv) 839 caclv := buildACLVector(aclv)
808 defer C.deallocate_ACL_vector(caclv) 840 defer C.deallocate_ACL_vector(caclv)
809 841
810 rc, cerr := C.zoo_set_acl(conn.handle, cpath, C.int(version), caclv) 842 rc, cerr := C.zoo_set_acl(conn.handle, cpath, C.int(version), caclv)
811 » return zkError(rc, cerr) 843 » return zkError(rc, cerr, "setacl", path)
812 } 844 }
813 845
814 func parseACLVector(caclv *C.struct_ACL_vector) []ACL { 846 func parseACLVector(caclv *C.struct_ACL_vector) []ACL {
815 structACLSize := unsafe.Sizeof(C.struct_ACL{}) 847 structACLSize := unsafe.Sizeof(C.struct_ACL{})
816 aclv := make([]ACL, caclv.count) 848 aclv := make([]ACL, caclv.count)
817 dataStart := uintptr(unsafe.Pointer(caclv.data)) 849 dataStart := uintptr(unsafe.Pointer(caclv.data))
818 for i := 0; i != int(caclv.count); i++ { 850 for i := 0; i != int(caclv.count); i++ {
819 caclPos := dataStart + uintptr(i)*structACLSize 851 caclPos := dataStart + uintptr(i)*structACLSize
820 cacl := (*C.struct_ACL)(unsafe.Pointer(caclPos)) 852 cacl := (*C.struct_ACL)(unsafe.Pointer(caclPos))
821 853
(...skipping 61 matching lines...) Expand 10 before | Expand all | Expand 10 after
883 // 3. If the changeFunc returns no errors, use the string returned as 915 // 3. If the changeFunc returns no errors, use the string returned as
884 // the new candidate value for the node, and attempt to either create 916 // the new candidate value for the node, and attempt to either create
885 // the node, if it didn't exist, or to change its contents at the specified 917 // the node, if it didn't exist, or to change its contents at the specified
886 // version. If this procedure fails due to conflicts (concurrent changes 918 // version. If this procedure fails due to conflicts (concurrent changes
887 // in the same node), repeat from step 1. If this procedure fails with any 919 // in the same node), repeat from step 1. If this procedure fails with any
888 // other error, stop and return the error found. 920 // other error, stop and return the error found.
889 // 921 //
890 func (conn *Conn) RetryChange(path string, flags int, acl []ACL, changeFunc Chan geFunc) error { 922 func (conn *Conn) RetryChange(path string, flags int, acl []ACL, changeFunc Chan geFunc) error {
891 for { 923 for {
892 oldValue, oldStat, err := conn.Get(path) 924 oldValue, oldStat, err := conn.Get(path)
893 » » if err != nil && err != ZNONODE { 925 » » if err != nil && !IsError(err, ZNONODE) {
894 return err 926 return err
895 } 927 }
896 newValue, err := changeFunc(oldValue, oldStat) 928 newValue, err := changeFunc(oldValue, oldStat)
897 if err != nil { 929 if err != nil {
898 return err 930 return err
899 } 931 }
900 if oldStat == nil { 932 if oldStat == nil {
901 _, err := conn.Create(path, newValue, flags, acl) 933 _, err := conn.Create(path, newValue, flags, acl)
902 » » » if err == nil || err != ZNODEEXISTS { 934 » » » if err == nil || !IsError(err, ZNODEEXISTS) {
903 return err 935 return err
904 } 936 }
905 continue 937 continue
906 } 938 }
907 if newValue == oldValue { 939 if newValue == oldValue {
908 return nil // Nothing to do. 940 return nil // Nothing to do.
909 } 941 }
910 _, err = conn.Set(path, newValue, oldStat.Version()) 942 _, err = conn.Set(path, newValue, oldStat.Version())
911 » » if err == nil || (err != ZBADVERSION && err != ZNONODE) { 943 » » if err == nil || !IsError(err, ZBADVERSION) && !IsError(err, ZNO NODE) {
912 return err 944 return err
913 } 945 }
914 } 946 }
915 panic("not reached") 947 panic("not reached")
916 } 948 }
917 949
918 // ----------------------------------------------------------------------- 950 // -----------------------------------------------------------------------
919 // Watching mechanism. 951 // Watching mechanism.
920 952
921 // The bridging of watches into Go is slightly tricky because Cgo doesn't 953 // The bridging of watches into Go is slightly tricky because Cgo doesn't
(...skipping 158 matching lines...) Expand 10 before | Expand all | Expand 10 after
1080 event := Event{ 1112 event := Event{
1081 Type: int(data.event_type), 1113 Type: int(data.event_type),
1082 Path: C.GoString(data.event_path), 1114 Path: C.GoString(data.event_path),
1083 State: int(data.connection_state), 1115 State: int(data.connection_state),
1084 } 1116 }
1085 watchId := uintptr(data.watch_context) 1117 watchId := uintptr(data.watch_context)
1086 C.destroy_watch_data(data) 1118 C.destroy_watch_data(data)
1087 sendEvent(watchId, event) 1119 sendEvent(watchId, event)
1088 } 1120 }
1089 } 1121 }
OLDNEW
« no previous file with comments | « retry_test.go ('k') | zk_test.go » ('j') | no next file with comments »

Powered by Google App Engine
RSS Feeds Recent Issues | This issue
This is Rietveld f62528b