Left: | ||
Right: |
OLD | NEW |
---|---|
(Empty) | |
1 // Copyright 2012, 2013 Canonical Ltd. | |
2 // Licensed under the AGPLv3, see LICENCE file for details. | |
3 | |
4 // Package voyeur implements a concurrency-safe value that can be watched for | |
5 // changes. | |
6 package voyeur | |
7 | |
8 import ( | |
9 "sync" | |
10 ) | |
11 | |
12 // Value represents a shared value that can be watched for changes. Methods on | |
13 // a Value may be called concurrently. | |
14 type Value struct { | |
15 val interface{} | |
16 version int | |
17 mu sync.RWMutex | |
18 wait sync.Cond | |
19 closed bool | |
20 } | |
21 | |
22 // NewValue creates a new Value holding the given initial value. If initial is | |
23 // nil, any watchers will wait until a value is set. | |
24 func NewValue(initial interface{}) *Value { | |
25 v := new(Value) | |
26 v.wait.L = v.mu.RLocker() | |
27 if initial != nil { | |
28 v.val = initial | |
29 v.version++ | |
30 } | |
31 return v | |
32 } | |
33 | |
34 // Set sets the shared value to val. | |
35 func (v *Value) Set(val interface{}) { | |
36 v.mu.Lock() | |
37 v.val = val | |
38 v.version++ | |
39 v.wait.Broadcast() | |
40 v.mu.Unlock() | |
41 } | |
42 | |
43 // Close closes the Value, unblocking any outstanding watchers. Close always | |
44 // returns nil. | |
45 func (v *Value) Close() error { | |
46 v.mu.Lock() | |
47 v.closed = true | |
48 v.mu.Unlock() | |
49 v.wait.Broadcast() | |
50 return nil | |
51 } | |
52 | |
53 // Get returns the current value. If the Value has been closed, ok will be | |
54 // false. | |
55 func (v *Value) Get() (val interface{}, ok bool) { | |
56 v.mu.RLock() | |
57 defer v.mu.RUnlock() | |
58 if v.closed { | |
59 return nil, false | |
rog
2014/02/07 15:15:17
i wonder if this should return v.val anyway.
i ca
natefinch
2014/02/07 20:05:20
Probably no big deal to return val regardless, fix
| |
60 } | |
61 return v.val, true | |
62 } | |
63 | |
64 // Watch returns a Watcher that can be used to watch for changes to the value. | |
65 func (v *Value) Watch() *Watcher { | |
66 return &Watcher{value: v} | |
67 } | |
68 | |
69 // Watcher represents a single watcher of a shared value. | |
70 type Watcher struct { | |
71 value *Value | |
72 version int | |
73 current interface{} | |
74 closed bool | |
75 } | |
76 | |
77 // Next blocks until there is a new value to be retrieved from the value that is | |
78 // being watched. It also unblocks when the value or the Watcher itself is close d. | |
79 // Next reports whether the value or the Watcher itself has been closed. | |
rog
2014/02/07 15:15:17
The final sentence isn't quite accurate (it would
natefinch
2014/02/07 20:05:20
Good point, you're right about it implying it retu
| |
80 func (w *Watcher) Next() bool { | |
81 w.value.mu.RLock() | |
82 defer w.value.mu.RUnlock() | |
83 | |
84 // We should never go around this loop more than twice. | |
85 for { | |
86 if w.version != w.value.version { | |
87 w.version = w.value.version | |
88 w.current = w.value.val | |
89 return true | |
90 } | |
91 if w.value.closed || w.closed { | |
92 return false | |
93 } | |
94 | |
95 // wait is magic sauce that releases the lock until triggered | |
96 // and then reacquires the lock, thus avoiding a deadlock. | |
rog
2014/02/07 15:15:17
// Note that Wait releases the lock, waits for a
/
natefinch
2014/02/07 20:05:20
Well, yes, perhaps magic sauce is not a good comme
| |
97 w.value.wait.Wait() | |
98 } | |
99 } | |
100 | |
101 // Close closes the Watcher without closing the underlying | |
102 // value. It may be called concurrently with Next. | |
103 func (w *Watcher) Close() { | |
104 w.value.mu.Lock() | |
105 w.closed = true | |
106 w.value.mu.Unlock() | |
107 w.value.wait.Broadcast() | |
108 } | |
109 | |
110 // Value returns the last value that was retrieved from the watched Value. | |
rog
2014/02/07 15:15:17
s/./by Next./
?
natefinch
2014/02/07 20:05:20
Done.
| |
111 func (w *Watcher) Value() interface{} { | |
112 return w.current | |
113 } | |
OLD | NEW |