OLD | NEW |
1 // gozk - ZooKeeper support for the Go language | 1 // gozk - ZooKeeper support for the Go language |
2 // | 2 // |
3 // https://wiki.ubuntu.com/gozk | 3 // https://wiki.ubuntu.com/gozk |
4 // | 4 // |
5 // Copyright (c) 2010-2011 Canonical Ltd. | 5 // Copyright (c) 2010-2011 Canonical Ltd. |
6 // | 6 // |
7 // Written by Gustavo Niemeyer <gustavo.niemeyer@canonical.com> | 7 // Written by Gustavo Niemeyer <gustavo.niemeyer@canonical.com> |
8 // | 8 // |
9 package zookeeper | 9 package zookeeper |
10 | 10 |
(...skipping 80 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
91 // Note that closed channels will deliver zeroed Event, which means | 91 // Note that closed channels will deliver zeroed Event, which means |
92 // event.Type is set to EVENT_CLOSED and event.State is set to STATE_CLOSED, | 92 // event.Type is set to EVENT_CLOSED and event.State is set to STATE_CLOSED, |
93 // to facilitate handling. | 93 // to facilitate handling. |
94 type Event struct { | 94 type Event struct { |
95 Type int // One of the EVENT_* constants. | 95 Type int // One of the EVENT_* constants. |
96 Path string // For non-session events, the path of the watched node. | 96 Path string // For non-session events, the path of the watched node. |
97 State int // One of the STATE_* constants. | 97 State int // One of the STATE_* constants. |
98 } | 98 } |
99 | 99 |
100 // Error represents a ZooKeeper error. | 100 // Error represents a ZooKeeper error. |
101 type Error int | 101 type Error struct { |
| 102 » Op string |
| 103 » Code ErrorCode |
| 104 » // SystemError holds an error if Code is ZSYSTEMERROR. |
| 105 » SystemError error |
| 106 » Path string |
| 107 } |
| 108 |
| 109 func (e *Error) Error() string { |
| 110 » s := e.Code.String() |
| 111 » if e.Code == ZSYSTEMERROR && e.SystemError != nil { |
| 112 » » s = e.SystemError.Error() |
| 113 » } |
| 114 » if e.Path == "" { |
| 115 » » return fmt.Sprintf("zookeeper: %s: %v", e.Op, s) |
| 116 » } |
| 117 » return fmt.Sprintf("zookeeper: %s %q: %v", e.Op, e.Path, s) |
| 118 } |
| 119 |
| 120 // IsError returns whether the error is a *Error |
| 121 // with the given error code. |
| 122 func IsError(err error, code ErrorCode) bool { |
| 123 » if err, _ := err.(*Error); err != nil { |
| 124 » » return err.Code == code |
| 125 » } |
| 126 » return false |
| 127 } |
| 128 |
| 129 // ErrorCode represents a kind of ZooKeeper error. |
| 130 type ErrorCode int |
102 | 131 |
103 const ( | 132 const ( |
104 » ZOK Error = C.ZOK | 133 » ZOK ErrorCode = C.ZOK |
105 » ZSYSTEMERROR Error = C.ZSYSTEMERROR | 134 » ZSYSTEMERROR ErrorCode = C.ZSYSTEMERROR |
106 » ZRUNTIMEINCONSISTENCY Error = C.ZRUNTIMEINCONSISTENCY | 135 » ZRUNTIMEINCONSISTENCY ErrorCode = C.ZRUNTIMEINCONSISTENCY |
107 » ZDATAINCONSISTENCY Error = C.ZDATAINCONSISTENCY | 136 » ZDATAINCONSISTENCY ErrorCode = C.ZDATAINCONSISTENCY |
108 » ZCONNECTIONLOSS Error = C.ZCONNECTIONLOSS | 137 » ZCONNECTIONLOSS ErrorCode = C.ZCONNECTIONLOSS |
109 » ZMARSHALLINGERROR Error = C.ZMARSHALLINGERROR | 138 » ZMARSHALLINGERROR ErrorCode = C.ZMARSHALLINGERROR |
110 » ZUNIMPLEMENTED Error = C.ZUNIMPLEMENTED | 139 » ZUNIMPLEMENTED ErrorCode = C.ZUNIMPLEMENTED |
111 » ZOPERATIONTIMEOUT Error = C.ZOPERATIONTIMEOUT | 140 » ZOPERATIONTIMEOUT ErrorCode = C.ZOPERATIONTIMEOUT |
112 » ZBADARGUMENTS Error = C.ZBADARGUMENTS | 141 » ZBADARGUMENTS ErrorCode = C.ZBADARGUMENTS |
113 » ZINVALIDSTATE Error = C.ZINVALIDSTATE | 142 » ZINVALIDSTATE ErrorCode = C.ZINVALIDSTATE |
114 » ZAPIERROR Error = C.ZAPIERROR | 143 » ZAPIERROR ErrorCode = C.ZAPIERROR |
115 » ZNONODE Error = C.ZNONODE | 144 » ZNONODE ErrorCode = C.ZNONODE |
116 » ZNOAUTH Error = C.ZNOAUTH | 145 » ZNOAUTH ErrorCode = C.ZNOAUTH |
117 » ZBADVERSION Error = C.ZBADVERSION | 146 » ZBADVERSION ErrorCode = C.ZBADVERSION |
118 » ZNOCHILDRENFOREPHEMERALS Error = C.ZNOCHILDRENFOREPHEMERALS | 147 » ZNOCHILDRENFOREPHEMERALS ErrorCode = C.ZNOCHILDRENFOREPHEMERALS |
119 » ZNODEEXISTS Error = C.ZNODEEXISTS | 148 » ZNODEEXISTS ErrorCode = C.ZNODEEXISTS |
120 » ZNOTEMPTY Error = C.ZNOTEMPTY | 149 » ZNOTEMPTY ErrorCode = C.ZNOTEMPTY |
121 » ZSESSIONEXPIRED Error = C.ZSESSIONEXPIRED | 150 » ZSESSIONEXPIRED ErrorCode = C.ZSESSIONEXPIRED |
122 » ZINVALIDCALLBACK Error = C.ZINVALIDCALLBACK | 151 » ZINVALIDCALLBACK ErrorCode = C.ZINVALIDCALLBACK |
123 » ZINVALIDACL Error = C.ZINVALIDACL | 152 » ZINVALIDACL ErrorCode = C.ZINVALIDACL |
124 » ZAUTHFAILED Error = C.ZAUTHFAILED | 153 » ZAUTHFAILED ErrorCode = C.ZAUTHFAILED |
125 » ZCLOSING Error = C.ZCLOSING | 154 » ZCLOSING ErrorCode = C.ZCLOSING |
126 » ZNOTHING Error = C.ZNOTHING | 155 » ZNOTHING ErrorCode = C.ZNOTHING |
127 » ZSESSIONMOVED Error = C.ZSESSIONMOVED | 156 » ZSESSIONMOVED ErrorCode = C.ZSESSIONMOVED |
128 ) | 157 ) |
129 | 158 |
130 func (error Error) Error() string { | 159 func (code ErrorCode) String() string { |
131 » return C.GoString(C.zerror(C.int(error))) // Static, no need to free it. | 160 » return C.GoString(C.zerror(C.int(code))) // Static, no need to free it. |
132 } | 161 } |
133 | 162 |
134 // zkError creates an appropriate error return from | 163 // zkError creates an appropriate error return from |
135 // a ZooKeeper status and the errno return from a C API | 164 // a ZooKeeper status and the errno return from a C API |
136 // call. | 165 // call. |
137 func zkError(rc C.int, cerr error) error { | 166 func zkError(rc C.int, cerr error, op, path string) error { |
138 » code := Error(rc) | 167 » code := ErrorCode(rc) |
139 » switch code { | 168 » if code == ZOK { |
140 » case ZOK: | |
141 return nil | 169 return nil |
| 170 } |
| 171 err := &Error{ |
| 172 Op: op, |
| 173 Code: code, |
| 174 Path: path, |
| 175 } |
| 176 if code == ZSYSTEMERROR { |
| 177 err.SystemError = cerr |
| 178 } |
| 179 return err |
| 180 } |
142 | 181 |
143 » case ZSYSTEMERROR: | 182 func closingError(op, path string) error { |
144 » » // If a ZooKeeper call returns ZSYSTEMERROR, then | 183 » return zkError(C.int(ZCLOSING), nil, op, path) |
145 » » // errno becomes significant. If errno has not been | |
146 » » // set, then we will return ZSYSTEMERROR nonetheless. | |
147 » » if cerr != nil { | |
148 » » » return cerr | |
149 » » } | |
150 » } | |
151 » return code | |
152 } | 184 } |
153 | 185 |
154 // Constants for SetLogLevel. | 186 // Constants for SetLogLevel. |
155 const ( | 187 const ( |
156 LOG_ERROR = C.ZOO_LOG_LEVEL_ERROR | 188 LOG_ERROR = C.ZOO_LOG_LEVEL_ERROR |
157 LOG_WARN = C.ZOO_LOG_LEVEL_WARN | 189 LOG_WARN = C.ZOO_LOG_LEVEL_WARN |
158 LOG_INFO = C.ZOO_LOG_LEVEL_INFO | 190 LOG_INFO = C.ZOO_LOG_LEVEL_INFO |
159 LOG_DEBUG = C.ZOO_LOG_LEVEL_DEBUG | 191 LOG_DEBUG = C.ZOO_LOG_LEVEL_DEBUG |
160 ) | 192 ) |
161 | 193 |
(...skipping 250 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
412 } | 444 } |
413 | 445 |
414 watchId, watchChannel := conn.createWatch(true) | 446 watchId, watchChannel := conn.createWatch(true) |
415 conn.sessionWatchId = watchId | 447 conn.sessionWatchId = watchId |
416 | 448 |
417 cservers := C.CString(servers) | 449 cservers := C.CString(servers) |
418 handle, cerr := C.zookeeper_init(cservers, C.watch_handler, C.int(recvTi
meout/1e6), cId, unsafe.Pointer(watchId), 0) | 450 handle, cerr := C.zookeeper_init(cservers, C.watch_handler, C.int(recvTi
meout/1e6), cId, unsafe.Pointer(watchId), 0) |
419 C.free(unsafe.Pointer(cservers)) | 451 C.free(unsafe.Pointer(cservers)) |
420 if handle == nil { | 452 if handle == nil { |
421 conn.closeAllWatches() | 453 conn.closeAllWatches() |
422 » » return nil, nil, zkError(C.int(ZSYSTEMERROR), cerr) | 454 » » return nil, nil, zkError(C.int(ZSYSTEMERROR), cerr, "dial", "") |
423 } | 455 } |
424 conn.handle = handle | 456 conn.handle = handle |
425 runWatchLoop() | 457 runWatchLoop() |
426 return conn, watchChannel, nil | 458 return conn, watchChannel, nil |
427 } | 459 } |
428 | 460 |
429 // ClientId returns the client ID for the existing session with ZooKeeper. | 461 // ClientId returns the client ID for the existing session with ZooKeeper. |
430 // This is useful to reestablish an existing session via ReInit. | 462 // This is useful to reestablish an existing session via ReInit. |
431 func (conn *Conn) ClientId() *ClientId { | 463 func (conn *Conn) ClientId() *ClientId { |
432 conn.mutex.RLock() | 464 conn.mutex.RLock() |
433 defer conn.mutex.RUnlock() | 465 defer conn.mutex.RUnlock() |
434 return &ClientId{*C.zoo_client_id(conn.handle)} | 466 return &ClientId{*C.zoo_client_id(conn.handle)} |
435 } | 467 } |
436 | 468 |
437 // Close terminates the ZooKeeper interaction. | 469 // Close terminates the ZooKeeper interaction. |
438 func (conn *Conn) Close() error { | 470 func (conn *Conn) Close() error { |
439 | 471 |
440 // Protect from concurrency around conn.handle change. | 472 // Protect from concurrency around conn.handle change. |
441 conn.mutex.Lock() | 473 conn.mutex.Lock() |
442 defer conn.mutex.Unlock() | 474 defer conn.mutex.Unlock() |
443 | 475 |
444 if conn.handle == nil { | 476 if conn.handle == nil { |
445 // ZooKeeper may hang indefinitely if a handler is closed twice, | 477 // ZooKeeper may hang indefinitely if a handler is closed twice, |
446 // so we get in the way and prevent it from happening. | 478 // so we get in the way and prevent it from happening. |
447 » » return ZCLOSING | 479 » » return closingError("close", "") |
448 } | 480 } |
449 rc, cerr := C.zookeeper_close(conn.handle) | 481 rc, cerr := C.zookeeper_close(conn.handle) |
450 | 482 |
451 conn.closeAllWatches() | 483 conn.closeAllWatches() |
452 stopWatchLoop() | 484 stopWatchLoop() |
453 | 485 |
454 // At this point, nothing else should need conn.handle. | 486 // At this point, nothing else should need conn.handle. |
455 conn.handle = nil | 487 conn.handle = nil |
456 | 488 |
457 » return zkError(rc, cerr) | 489 » return zkError(rc, cerr, "close", "") |
458 } | 490 } |
459 | 491 |
460 // Get returns the data and status from an existing node. err will be nil, | 492 // Get returns the data and status from an existing node. err will be nil, |
461 // unless an error is found. Attempting to retrieve data from a non-existing | 493 // unless an error is found. Attempting to retrieve data from a non-existing |
462 // node is an error. | 494 // node is an error. |
463 func (conn *Conn) Get(path string) (data string, stat *Stat, err error) { | 495 func (conn *Conn) Get(path string) (data string, stat *Stat, err error) { |
464 conn.mutex.RLock() | 496 conn.mutex.RLock() |
465 defer conn.mutex.RUnlock() | 497 defer conn.mutex.RUnlock() |
466 if conn.handle == nil { | 498 if conn.handle == nil { |
467 » » return "", nil, ZCLOSING | 499 » » return "", nil, closingError("get", path) |
468 } | 500 } |
469 | 501 |
470 cpath := C.CString(path) | 502 cpath := C.CString(path) |
471 cbuffer := (*C.char)(C.malloc(bufferSize)) | 503 cbuffer := (*C.char)(C.malloc(bufferSize)) |
472 cbufferLen := C.int(bufferSize) | 504 cbufferLen := C.int(bufferSize) |
473 defer C.free(unsafe.Pointer(cpath)) | 505 defer C.free(unsafe.Pointer(cpath)) |
474 defer C.free(unsafe.Pointer(cbuffer)) | 506 defer C.free(unsafe.Pointer(cbuffer)) |
475 | 507 |
476 var cstat Stat | 508 var cstat Stat |
477 rc, cerr := C.zoo_wget(conn.handle, cpath, nil, nil, cbuffer, &cbufferLe
n, &cstat.c) | 509 rc, cerr := C.zoo_wget(conn.handle, cpath, nil, nil, cbuffer, &cbufferLe
n, &cstat.c) |
478 if rc != C.ZOK { | 510 if rc != C.ZOK { |
479 » » return "", nil, zkError(rc, cerr) | 511 » » return "", nil, zkError(rc, cerr, "get", path) |
480 } | 512 } |
481 | 513 |
482 result := C.GoStringN(cbuffer, cbufferLen) | 514 result := C.GoStringN(cbuffer, cbufferLen) |
483 return result, &cstat, nil | 515 return result, &cstat, nil |
484 } | 516 } |
485 | 517 |
486 // GetW works like Get but also returns a channel that will receive | 518 // GetW works like Get but also returns a channel that will receive |
487 // a single Event value when the data or existence of the given ZooKeeper | 519 // a single Event value when the data or existence of the given ZooKeeper |
488 // node changes or when critical session events happen. See the | 520 // node changes or when critical session events happen. See the |
489 // documentation of the Event type for more details. | 521 // documentation of the Event type for more details. |
490 func (conn *Conn) GetW(path string) (data string, stat *Stat, watch <-chan Event
, err error) { | 522 func (conn *Conn) GetW(path string) (data string, stat *Stat, watch <-chan Event
, err error) { |
491 conn.mutex.RLock() | 523 conn.mutex.RLock() |
492 defer conn.mutex.RUnlock() | 524 defer conn.mutex.RUnlock() |
493 if conn.handle == nil { | 525 if conn.handle == nil { |
494 » » return "", nil, nil, ZCLOSING | 526 » » return "", nil, nil, closingError("getw", path) |
495 } | 527 } |
496 | 528 |
497 cpath := C.CString(path) | 529 cpath := C.CString(path) |
498 cbuffer := (*C.char)(C.malloc(bufferSize)) | 530 cbuffer := (*C.char)(C.malloc(bufferSize)) |
499 cbufferLen := C.int(bufferSize) | 531 cbufferLen := C.int(bufferSize) |
500 defer C.free(unsafe.Pointer(cpath)) | 532 defer C.free(unsafe.Pointer(cpath)) |
501 defer C.free(unsafe.Pointer(cbuffer)) | 533 defer C.free(unsafe.Pointer(cbuffer)) |
502 | 534 |
503 watchId, watchChannel := conn.createWatch(true) | 535 watchId, watchChannel := conn.createWatch(true) |
504 | 536 |
505 var cstat Stat | 537 var cstat Stat |
506 rc, cerr := C.zoo_wget(conn.handle, cpath, C.watch_handler, unsafe.Point
er(watchId), cbuffer, &cbufferLen, &cstat.c) | 538 rc, cerr := C.zoo_wget(conn.handle, cpath, C.watch_handler, unsafe.Point
er(watchId), cbuffer, &cbufferLen, &cstat.c) |
507 if rc != C.ZOK { | 539 if rc != C.ZOK { |
508 conn.forgetWatch(watchId) | 540 conn.forgetWatch(watchId) |
509 » » return "", nil, nil, zkError(rc, cerr) | 541 » » return "", nil, nil, zkError(rc, cerr, "getw", path) |
510 } | 542 } |
511 | 543 |
512 result := C.GoStringN(cbuffer, cbufferLen) | 544 result := C.GoStringN(cbuffer, cbufferLen) |
513 return result, &cstat, watchChannel, nil | 545 return result, &cstat, watchChannel, nil |
514 } | 546 } |
515 | 547 |
516 // Children returns the children list and status from an existing node. | 548 // Children returns the children list and status from an existing node. |
517 // Attempting to retrieve the children list from a non-existent node is an error
. | 549 // Attempting to retrieve the children list from a non-existent node is an error
. |
518 func (conn *Conn) Children(path string) (children []string, stat *Stat, err erro
r) { | 550 func (conn *Conn) Children(path string) (children []string, stat *Stat, err erro
r) { |
519 conn.mutex.RLock() | 551 conn.mutex.RLock() |
520 defer conn.mutex.RUnlock() | 552 defer conn.mutex.RUnlock() |
521 if conn.handle == nil { | 553 if conn.handle == nil { |
522 » » return nil, nil, ZCLOSING | 554 » » return nil, nil, closingError("children", path) |
523 } | 555 } |
524 | 556 |
525 cpath := C.CString(path) | 557 cpath := C.CString(path) |
526 defer C.free(unsafe.Pointer(cpath)) | 558 defer C.free(unsafe.Pointer(cpath)) |
527 | 559 |
528 cvector := C.struct_String_vector{} | 560 cvector := C.struct_String_vector{} |
529 var cstat Stat | 561 var cstat Stat |
530 rc, cerr := C.zoo_wget_children2(conn.handle, cpath, nil, nil, &cvector,
&cstat.c) | 562 rc, cerr := C.zoo_wget_children2(conn.handle, cpath, nil, nil, &cvector,
&cstat.c) |
531 | 563 |
532 // Can't happen if rc != 0, but avoid potential memory leaks in the futu
re. | 564 // Can't happen if rc != 0, but avoid potential memory leaks in the futu
re. |
533 if cvector.count != 0 { | 565 if cvector.count != 0 { |
534 children = parseStringVector(&cvector) | 566 children = parseStringVector(&cvector) |
535 } | 567 } |
536 if rc == C.ZOK { | 568 if rc == C.ZOK { |
537 stat = &cstat | 569 stat = &cstat |
538 } else { | 570 } else { |
539 » » err = zkError(rc, cerr) | 571 » » err = zkError(rc, cerr, "children", path) |
540 } | 572 } |
541 return | 573 return |
542 } | 574 } |
543 | 575 |
544 // ChildrenW works like Children but also returns a channel that will | 576 // ChildrenW works like Children but also returns a channel that will |
545 // receive a single Event value when a node is added or removed under the | 577 // receive a single Event value when a node is added or removed under the |
546 // provided path or when critical session events happen. See the documentation | 578 // provided path or when critical session events happen. See the documentation |
547 // of the Event type for more details. | 579 // of the Event type for more details. |
548 func (conn *Conn) ChildrenW(path string) (children []string, stat *Stat, watch <
-chan Event, err error) { | 580 func (conn *Conn) ChildrenW(path string) (children []string, stat *Stat, watch <
-chan Event, err error) { |
549 conn.mutex.RLock() | 581 conn.mutex.RLock() |
550 defer conn.mutex.RUnlock() | 582 defer conn.mutex.RUnlock() |
551 if conn.handle == nil { | 583 if conn.handle == nil { |
552 » » return nil, nil, nil, ZCLOSING | 584 » » return nil, nil, nil, closingError("childrenw", path) |
553 } | 585 } |
554 | 586 |
555 cpath := C.CString(path) | 587 cpath := C.CString(path) |
556 defer C.free(unsafe.Pointer(cpath)) | 588 defer C.free(unsafe.Pointer(cpath)) |
557 | 589 |
558 watchId, watchChannel := conn.createWatch(true) | 590 watchId, watchChannel := conn.createWatch(true) |
559 | 591 |
560 cvector := C.struct_String_vector{} | 592 cvector := C.struct_String_vector{} |
561 var cstat Stat | 593 var cstat Stat |
562 rc, cerr := C.zoo_wget_children2(conn.handle, cpath, C.watch_handler, un
safe.Pointer(watchId), &cvector, &cstat.c) | 594 rc, cerr := C.zoo_wget_children2(conn.handle, cpath, C.watch_handler, un
safe.Pointer(watchId), &cvector, &cstat.c) |
563 | 595 |
564 // Can't happen if rc != 0, but avoid potential memory leaks in the futu
re. | 596 // Can't happen if rc != 0, but avoid potential memory leaks in the futu
re. |
565 if cvector.count != 0 { | 597 if cvector.count != 0 { |
566 children = parseStringVector(&cvector) | 598 children = parseStringVector(&cvector) |
567 } | 599 } |
568 if rc == C.ZOK { | 600 if rc == C.ZOK { |
569 stat = &cstat | 601 stat = &cstat |
570 watch = watchChannel | 602 watch = watchChannel |
571 } else { | 603 } else { |
572 conn.forgetWatch(watchId) | 604 conn.forgetWatch(watchId) |
573 » » err = zkError(rc, cerr) | 605 » » err = zkError(rc, cerr, "childrenw", path) |
574 } | 606 } |
575 return | 607 return |
576 } | 608 } |
577 | 609 |
578 func parseStringVector(cvector *C.struct_String_vector) []string { | 610 func parseStringVector(cvector *C.struct_String_vector) []string { |
579 vector := make([]string, cvector.count) | 611 vector := make([]string, cvector.count) |
580 dataStart := uintptr(unsafe.Pointer(cvector.data)) | 612 dataStart := uintptr(unsafe.Pointer(cvector.data)) |
581 uintptrSize := unsafe.Sizeof(dataStart) | 613 uintptrSize := unsafe.Sizeof(dataStart) |
582 for i := 0; i != len(vector); i++ { | 614 for i := 0; i != len(vector); i++ { |
583 cpathPos := dataStart + uintptr(i)*uintptrSize | 615 cpathPos := dataStart + uintptr(i)*uintptrSize |
584 cpath := *(**C.char)(unsafe.Pointer(cpathPos)) | 616 cpath := *(**C.char)(unsafe.Pointer(cpathPos)) |
585 vector[i] = C.GoString(cpath) | 617 vector[i] = C.GoString(cpath) |
586 } | 618 } |
587 C.deallocate_String_vector(cvector) | 619 C.deallocate_String_vector(cvector) |
588 return vector | 620 return vector |
589 } | 621 } |
590 | 622 |
591 // Exists checks if a node exists at the given path. If it does, | 623 // Exists checks if a node exists at the given path. If it does, |
592 // stat will contain meta information on the existing node, otherwise | 624 // stat will contain meta information on the existing node, otherwise |
593 // it will be nil. | 625 // it will be nil. |
594 func (conn *Conn) Exists(path string) (stat *Stat, err error) { | 626 func (conn *Conn) Exists(path string) (stat *Stat, err error) { |
595 conn.mutex.RLock() | 627 conn.mutex.RLock() |
596 defer conn.mutex.RUnlock() | 628 defer conn.mutex.RUnlock() |
597 if conn.handle == nil { | 629 if conn.handle == nil { |
598 » » return nil, ZCLOSING | 630 » » return nil, closingError("exists", path) |
599 } | 631 } |
600 | 632 |
601 cpath := C.CString(path) | 633 cpath := C.CString(path) |
602 defer C.free(unsafe.Pointer(cpath)) | 634 defer C.free(unsafe.Pointer(cpath)) |
603 | 635 |
604 var cstat Stat | 636 var cstat Stat |
605 rc, cerr := C.zoo_wexists(conn.handle, cpath, nil, nil, &cstat.c) | 637 rc, cerr := C.zoo_wexists(conn.handle, cpath, nil, nil, &cstat.c) |
606 | 638 |
607 // We diverge a bit from the usual here: a ZNONODE is not an error | 639 // We diverge a bit from the usual here: a ZNONODE is not an error |
608 // for an exists call, otherwise every Exists call would have to check | 640 // for an exists call, otherwise every Exists call would have to check |
609 // for err != nil and err.Code() != ZNONODE. | 641 // for err != nil and err.Code() != ZNONODE. |
610 if rc == C.ZOK { | 642 if rc == C.ZOK { |
611 stat = &cstat | 643 stat = &cstat |
612 } else if rc != C.ZNONODE { | 644 } else if rc != C.ZNONODE { |
613 » » err = zkError(rc, cerr) | 645 » » err = zkError(rc, cerr, "exists", path) |
614 } | 646 } |
615 return | 647 return |
616 } | 648 } |
617 | 649 |
618 // ExistsW works like Exists but also returns a channel that will | 650 // ExistsW works like Exists but also returns a channel that will |
619 // receive an Event value when a node is created in case the returned | 651 // receive an Event value when a node is created in case the returned |
620 // stat is nil and the node didn't exist, or when the existing node | 652 // stat is nil and the node didn't exist, or when the existing node |
621 // is removed. It will also receive critical session events. See the | 653 // is removed. It will also receive critical session events. See the |
622 // documentation of the Event type for more details. | 654 // documentation of the Event type for more details. |
623 func (conn *Conn) ExistsW(path string) (stat *Stat, watch <-chan Event, err erro
r) { | 655 func (conn *Conn) ExistsW(path string) (stat *Stat, watch <-chan Event, err erro
r) { |
624 conn.mutex.RLock() | 656 conn.mutex.RLock() |
625 defer conn.mutex.RUnlock() | 657 defer conn.mutex.RUnlock() |
626 if conn.handle == nil { | 658 if conn.handle == nil { |
627 » » return nil, nil, ZCLOSING | 659 » » return nil, nil, closingError("existsw", path) |
628 } | 660 } |
629 | 661 |
630 cpath := C.CString(path) | 662 cpath := C.CString(path) |
631 defer C.free(unsafe.Pointer(cpath)) | 663 defer C.free(unsafe.Pointer(cpath)) |
632 | 664 |
633 watchId, watchChannel := conn.createWatch(true) | 665 watchId, watchChannel := conn.createWatch(true) |
634 | 666 |
635 var cstat Stat | 667 var cstat Stat |
636 rc, cerr := C.zoo_wexists(conn.handle, cpath, C.watch_handler, unsafe.Po
inter(watchId), &cstat.c) | 668 rc, cerr := C.zoo_wexists(conn.handle, cpath, C.watch_handler, unsafe.Po
inter(watchId), &cstat.c) |
637 | 669 |
638 // We diverge a bit from the usual here: a ZNONODE is not an error | 670 // We diverge a bit from the usual here: a ZNONODE is not an error |
639 // for an exists call, otherwise every Exists call would have to check | 671 // for an exists call, otherwise every Exists call would have to check |
640 // for err != nil and err.Code() != ZNONODE. | 672 // for err != nil and err.Code() != ZNONODE. |
641 » switch Error(rc) { | 673 » switch ErrorCode(rc) { |
642 case ZOK: | 674 case ZOK: |
643 stat = &cstat | 675 stat = &cstat |
644 watch = watchChannel | 676 watch = watchChannel |
645 case ZNONODE: | 677 case ZNONODE: |
646 watch = watchChannel | 678 watch = watchChannel |
647 default: | 679 default: |
648 conn.forgetWatch(watchId) | 680 conn.forgetWatch(watchId) |
649 » » err = zkError(rc, cerr) | 681 » » err = zkError(rc, cerr, "existsw", path) |
650 } | 682 } |
651 return | 683 return |
652 } | 684 } |
653 | 685 |
654 // Create creates a node at the given path with the given data. The | 686 // Create creates a node at the given path with the given data. The |
655 // provided flags may determine features such as whether the node is | 687 // provided flags may determine features such as whether the node is |
656 // ephemeral or not, or whether it should have a sequence number | 688 // ephemeral or not, or whether it should have a sequence number |
657 // attached to it, and the provided ACLs will determine who can access | 689 // attached to it, and the provided ACLs will determine who can access |
658 // the node and under which circumstances. | 690 // the node and under which circumstances. |
659 // | 691 // |
660 // The returned path is useful in cases where the created path may differ | 692 // The returned path is useful in cases where the created path may differ |
661 // from the requested one, such as when a sequence number is appended | 693 // from the requested one, such as when a sequence number is appended |
662 // to it due to the use of the gozk.SEQUENCE flag. | 694 // to it due to the use of the gozk.SEQUENCE flag. |
663 func (conn *Conn) Create(path, value string, flags int, aclv []ACL) (pathCreated
string, err error) { | 695 func (conn *Conn) Create(path, value string, flags int, aclv []ACL) (pathCreated
string, err error) { |
664 conn.mutex.RLock() | 696 conn.mutex.RLock() |
665 defer conn.mutex.RUnlock() | 697 defer conn.mutex.RUnlock() |
666 if conn.handle == nil { | 698 if conn.handle == nil { |
667 » » return "", ZCLOSING | 699 » » return "", closingError("close", path) |
668 } | 700 } |
669 | 701 |
670 cpath := C.CString(path) | 702 cpath := C.CString(path) |
671 cvalue := C.CString(value) | 703 cvalue := C.CString(value) |
672 defer C.free(unsafe.Pointer(cpath)) | 704 defer C.free(unsafe.Pointer(cpath)) |
673 defer C.free(unsafe.Pointer(cvalue)) | 705 defer C.free(unsafe.Pointer(cvalue)) |
674 | 706 |
675 caclv := buildACLVector(aclv) | 707 caclv := buildACLVector(aclv) |
676 defer C.deallocate_ACL_vector(caclv) | 708 defer C.deallocate_ACL_vector(caclv) |
677 | 709 |
678 // Allocate additional space for the sequence (10 bytes should be enough
). | 710 // Allocate additional space for the sequence (10 bytes should be enough
). |
679 cpathLen := C.size_t(len(path) + 32) | 711 cpathLen := C.size_t(len(path) + 32) |
680 cpathCreated := (*C.char)(C.malloc(cpathLen)) | 712 cpathCreated := (*C.char)(C.malloc(cpathLen)) |
681 defer C.free(unsafe.Pointer(cpathCreated)) | 713 defer C.free(unsafe.Pointer(cpathCreated)) |
682 | 714 |
683 rc, cerr := C.zoo_create(conn.handle, cpath, cvalue, C.int(len(value)),
caclv, C.int(flags), cpathCreated, C.int(cpathLen)) | 715 rc, cerr := C.zoo_create(conn.handle, cpath, cvalue, C.int(len(value)),
caclv, C.int(flags), cpathCreated, C.int(cpathLen)) |
684 if rc == C.ZOK { | 716 if rc == C.ZOK { |
685 pathCreated = C.GoString(cpathCreated) | 717 pathCreated = C.GoString(cpathCreated) |
686 } else { | 718 } else { |
687 » » err = zkError(rc, cerr) | 719 » » err = zkError(rc, cerr, "create", path) |
688 } | 720 } |
689 return | 721 return |
690 } | 722 } |
691 | 723 |
692 // Set modifies the data for the existing node at the given path, replacing it | 724 // Set modifies the data for the existing node at the given path, replacing it |
693 // by the provided value. If version is not -1, the operation will only | 725 // by the provided value. If version is not -1, the operation will only |
694 // succeed if the node is still at the given version when the replacement | 726 // succeed if the node is still at the given version when the replacement |
695 // happens as an atomic operation. The returned Stat value will contain | 727 // happens as an atomic operation. The returned Stat value will contain |
696 // data for the resulting node, after the operation is performed. | 728 // data for the resulting node, after the operation is performed. |
697 // | 729 // |
698 // It is an error to attempt to set the data of a non-existing node with | 730 // It is an error to attempt to set the data of a non-existing node with |
699 // this function. In these cases, use Create instead. | 731 // this function. In these cases, use Create instead. |
700 func (conn *Conn) Set(path, value string, version int) (stat *Stat, err error) { | 732 func (conn *Conn) Set(path, value string, version int) (stat *Stat, err error) { |
701 conn.mutex.RLock() | 733 conn.mutex.RLock() |
702 defer conn.mutex.RUnlock() | 734 defer conn.mutex.RUnlock() |
703 if conn.handle == nil { | 735 if conn.handle == nil { |
704 » » return nil, ZCLOSING | 736 » » return nil, closingError("set", path) |
705 } | 737 } |
706 | 738 |
707 cpath := C.CString(path) | 739 cpath := C.CString(path) |
708 cvalue := C.CString(value) | 740 cvalue := C.CString(value) |
709 defer C.free(unsafe.Pointer(cpath)) | 741 defer C.free(unsafe.Pointer(cpath)) |
710 defer C.free(unsafe.Pointer(cvalue)) | 742 defer C.free(unsafe.Pointer(cvalue)) |
711 | 743 |
712 var cstat Stat | 744 var cstat Stat |
713 rc, cerr := C.zoo_set2(conn.handle, cpath, cvalue, C.int(len(value)), C.
int(version), &cstat.c) | 745 rc, cerr := C.zoo_set2(conn.handle, cpath, cvalue, C.int(len(value)), C.
int(version), &cstat.c) |
714 if rc == C.ZOK { | 746 if rc == C.ZOK { |
715 stat = &cstat | 747 stat = &cstat |
716 } else { | 748 } else { |
717 » » err = zkError(rc, cerr) | 749 » » err = zkError(rc, cerr, "set", path) |
718 } | 750 } |
719 return | 751 return |
720 } | 752 } |
721 | 753 |
722 // Delete removes the node at path. If version is not -1, the operation | 754 // Delete removes the node at path. If version is not -1, the operation |
723 // will only succeed if the node is still at this version when the | 755 // will only succeed if the node is still at this version when the |
724 // node is deleted as an atomic operation. | 756 // node is deleted as an atomic operation. |
725 func (conn *Conn) Delete(path string, version int) (err error) { | 757 func (conn *Conn) Delete(path string, version int) (err error) { |
726 conn.mutex.RLock() | 758 conn.mutex.RLock() |
727 defer conn.mutex.RUnlock() | 759 defer conn.mutex.RUnlock() |
728 if conn.handle == nil { | 760 if conn.handle == nil { |
729 » » return ZCLOSING | 761 » » return closingError("delete", path) |
730 } | 762 } |
731 | 763 |
732 cpath := C.CString(path) | 764 cpath := C.CString(path) |
733 defer C.free(unsafe.Pointer(cpath)) | 765 defer C.free(unsafe.Pointer(cpath)) |
734 rc, cerr := C.zoo_delete(conn.handle, cpath, C.int(version)) | 766 rc, cerr := C.zoo_delete(conn.handle, cpath, C.int(version)) |
735 » return zkError(rc, cerr) | 767 » return zkError(rc, cerr, "delete", path) |
736 } | 768 } |
737 | 769 |
738 // AddAuth adds a new authentication certificate to the ZooKeeper | 770 // AddAuth adds a new authentication certificate to the ZooKeeper |
739 // interaction. The scheme parameter will specify how to handle the | 771 // interaction. The scheme parameter will specify how to handle the |
740 // authentication information, while the cert parameter provides the | 772 // authentication information, while the cert parameter provides the |
741 // identity data itself. For instance, the "digest" scheme requires | 773 // identity data itself. For instance, the "digest" scheme requires |
742 // a pair like "username:password" to be provided as the certificate. | 774 // a pair like "username:password" to be provided as the certificate. |
743 func (conn *Conn) AddAuth(scheme, cert string) error { | 775 func (conn *Conn) AddAuth(scheme, cert string) error { |
744 conn.mutex.RLock() | 776 conn.mutex.RLock() |
745 defer conn.mutex.RUnlock() | 777 defer conn.mutex.RUnlock() |
746 if conn.handle == nil { | 778 if conn.handle == nil { |
747 » » return ZCLOSING | 779 » » return closingError("addauth", "") |
748 } | 780 } |
749 | 781 |
750 cscheme := C.CString(scheme) | 782 cscheme := C.CString(scheme) |
751 ccert := C.CString(cert) | 783 ccert := C.CString(cert) |
752 defer C.free(unsafe.Pointer(cscheme)) | 784 defer C.free(unsafe.Pointer(cscheme)) |
753 defer C.free(unsafe.Pointer(ccert)) | 785 defer C.free(unsafe.Pointer(ccert)) |
754 | 786 |
755 data := C.create_completion_data() | 787 data := C.create_completion_data() |
756 if data == nil { | 788 if data == nil { |
757 panic("Failed to create completion data") | 789 panic("Failed to create completion data") |
758 } | 790 } |
759 defer C.destroy_completion_data(data) | 791 defer C.destroy_completion_data(data) |
760 | 792 |
761 rc, cerr := C.zoo_add_auth(conn.handle, cscheme, ccert, C.int(len(cert))
, C.handle_void_completion, unsafe.Pointer(data)) | 793 rc, cerr := C.zoo_add_auth(conn.handle, cscheme, ccert, C.int(len(cert))
, C.handle_void_completion, unsafe.Pointer(data)) |
762 if rc != C.ZOK { | 794 if rc != C.ZOK { |
763 » » return zkError(rc, cerr) | 795 » » return zkError(rc, cerr, "addauth", "") |
764 } | 796 } |
765 | 797 |
766 C.wait_for_completion(data) | 798 C.wait_for_completion(data) |
767 | 799 |
768 rc = C.int(uintptr(data.data)) | 800 rc = C.int(uintptr(data.data)) |
769 » return zkError(rc, nil) | 801 » return zkError(rc, nil, "addauth", "") |
770 } | 802 } |
771 | 803 |
772 // ACL returns the access control list for path. | 804 // ACL returns the access control list for path. |
773 func (conn *Conn) ACL(path string) ([]ACL, *Stat, error) { | 805 func (conn *Conn) ACL(path string) ([]ACL, *Stat, error) { |
774 conn.mutex.RLock() | 806 conn.mutex.RLock() |
775 defer conn.mutex.RUnlock() | 807 defer conn.mutex.RUnlock() |
776 if conn.handle == nil { | 808 if conn.handle == nil { |
777 » » return nil, nil, ZCLOSING | 809 » » return nil, nil, closingError("acl", path) |
778 } | 810 } |
779 | 811 |
780 cpath := C.CString(path) | 812 cpath := C.CString(path) |
781 defer C.free(unsafe.Pointer(cpath)) | 813 defer C.free(unsafe.Pointer(cpath)) |
782 | 814 |
783 caclv := C.struct_ACL_vector{} | 815 caclv := C.struct_ACL_vector{} |
784 | 816 |
785 var cstat Stat | 817 var cstat Stat |
786 rc, cerr := C.zoo_get_acl(conn.handle, cpath, &caclv, &cstat.c) | 818 rc, cerr := C.zoo_get_acl(conn.handle, cpath, &caclv, &cstat.c) |
787 if rc != C.ZOK { | 819 if rc != C.ZOK { |
788 » » return nil, nil, zkError(rc, cerr) | 820 » » return nil, nil, zkError(rc, cerr, "acl", path) |
789 } | 821 } |
790 | 822 |
791 aclv := parseACLVector(&caclv) | 823 aclv := parseACLVector(&caclv) |
792 | 824 |
793 return aclv, &cstat, nil | 825 return aclv, &cstat, nil |
794 } | 826 } |
795 | 827 |
796 // SetACL changes the access control list for path. | 828 // SetACL changes the access control list for path. |
797 func (conn *Conn) SetACL(path string, aclv []ACL, version int) error { | 829 func (conn *Conn) SetACL(path string, aclv []ACL, version int) error { |
798 conn.mutex.RLock() | 830 conn.mutex.RLock() |
799 defer conn.mutex.RUnlock() | 831 defer conn.mutex.RUnlock() |
800 if conn.handle == nil { | 832 if conn.handle == nil { |
801 » » return ZCLOSING | 833 » » return closingError("setacl", path) |
802 } | 834 } |
803 | 835 |
804 cpath := C.CString(path) | 836 cpath := C.CString(path) |
805 defer C.free(unsafe.Pointer(cpath)) | 837 defer C.free(unsafe.Pointer(cpath)) |
806 | 838 |
807 caclv := buildACLVector(aclv) | 839 caclv := buildACLVector(aclv) |
808 defer C.deallocate_ACL_vector(caclv) | 840 defer C.deallocate_ACL_vector(caclv) |
809 | 841 |
810 rc, cerr := C.zoo_set_acl(conn.handle, cpath, C.int(version), caclv) | 842 rc, cerr := C.zoo_set_acl(conn.handle, cpath, C.int(version), caclv) |
811 » return zkError(rc, cerr) | 843 » return zkError(rc, cerr, "setacl", path) |
812 } | 844 } |
813 | 845 |
814 func parseACLVector(caclv *C.struct_ACL_vector) []ACL { | 846 func parseACLVector(caclv *C.struct_ACL_vector) []ACL { |
815 structACLSize := unsafe.Sizeof(C.struct_ACL{}) | 847 structACLSize := unsafe.Sizeof(C.struct_ACL{}) |
816 aclv := make([]ACL, caclv.count) | 848 aclv := make([]ACL, caclv.count) |
817 dataStart := uintptr(unsafe.Pointer(caclv.data)) | 849 dataStart := uintptr(unsafe.Pointer(caclv.data)) |
818 for i := 0; i != int(caclv.count); i++ { | 850 for i := 0; i != int(caclv.count); i++ { |
819 caclPos := dataStart + uintptr(i)*structACLSize | 851 caclPos := dataStart + uintptr(i)*structACLSize |
820 cacl := (*C.struct_ACL)(unsafe.Pointer(caclPos)) | 852 cacl := (*C.struct_ACL)(unsafe.Pointer(caclPos)) |
821 | 853 |
(...skipping 61 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
883 // 3. If the changeFunc returns no errors, use the string returned as | 915 // 3. If the changeFunc returns no errors, use the string returned as |
884 // the new candidate value for the node, and attempt to either create | 916 // the new candidate value for the node, and attempt to either create |
885 // the node, if it didn't exist, or to change its contents at the specified | 917 // the node, if it didn't exist, or to change its contents at the specified |
886 // version. If this procedure fails due to conflicts (concurrent changes | 918 // version. If this procedure fails due to conflicts (concurrent changes |
887 // in the same node), repeat from step 1. If this procedure fails with any | 919 // in the same node), repeat from step 1. If this procedure fails with any |
888 // other error, stop and return the error found. | 920 // other error, stop and return the error found. |
889 // | 921 // |
890 func (conn *Conn) RetryChange(path string, flags int, acl []ACL, changeFunc Chan
geFunc) error { | 922 func (conn *Conn) RetryChange(path string, flags int, acl []ACL, changeFunc Chan
geFunc) error { |
891 for { | 923 for { |
892 oldValue, oldStat, err := conn.Get(path) | 924 oldValue, oldStat, err := conn.Get(path) |
893 » » if err != nil && err != ZNONODE { | 925 » » if err != nil && !IsError(err, ZNONODE) { |
894 return err | 926 return err |
895 } | 927 } |
896 newValue, err := changeFunc(oldValue, oldStat) | 928 newValue, err := changeFunc(oldValue, oldStat) |
897 if err != nil { | 929 if err != nil { |
898 return err | 930 return err |
899 } | 931 } |
900 if oldStat == nil { | 932 if oldStat == nil { |
901 _, err := conn.Create(path, newValue, flags, acl) | 933 _, err := conn.Create(path, newValue, flags, acl) |
902 » » » if err == nil || err != ZNODEEXISTS { | 934 » » » if err == nil || !IsError(err, ZNODEEXISTS) { |
903 return err | 935 return err |
904 } | 936 } |
905 continue | 937 continue |
906 } | 938 } |
907 if newValue == oldValue { | 939 if newValue == oldValue { |
908 return nil // Nothing to do. | 940 return nil // Nothing to do. |
909 } | 941 } |
910 _, err = conn.Set(path, newValue, oldStat.Version()) | 942 _, err = conn.Set(path, newValue, oldStat.Version()) |
911 » » if err == nil || (err != ZBADVERSION && err != ZNONODE) { | 943 » » if err == nil || !IsError(err, ZBADVERSION) && !IsError(err, ZNO
NODE) { |
912 return err | 944 return err |
913 } | 945 } |
914 } | 946 } |
915 panic("not reached") | 947 panic("not reached") |
916 } | 948 } |
917 | 949 |
918 // ----------------------------------------------------------------------- | 950 // ----------------------------------------------------------------------- |
919 // Watching mechanism. | 951 // Watching mechanism. |
920 | 952 |
921 // The bridging of watches into Go is slightly tricky because Cgo doesn't | 953 // The bridging of watches into Go is slightly tricky because Cgo doesn't |
(...skipping 158 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1080 event := Event{ | 1112 event := Event{ |
1081 Type: int(data.event_type), | 1113 Type: int(data.event_type), |
1082 Path: C.GoString(data.event_path), | 1114 Path: C.GoString(data.event_path), |
1083 State: int(data.connection_state), | 1115 State: int(data.connection_state), |
1084 } | 1116 } |
1085 watchId := uintptr(data.watch_context) | 1117 watchId := uintptr(data.watch_context) |
1086 C.destroy_watch_data(data) | 1118 C.destroy_watch_data(data) |
1087 sendEvent(watchId, event) | 1119 sendEvent(watchId, event) |
1088 } | 1120 } |
1089 } | 1121 } |
OLD | NEW |