Left: | ||
Right: |
LEFT | RIGHT |
---|---|
1 package zookeeper_test | 1 package zookeeper_test |
2 | 2 |
3 import ( | 3 import ( |
4 "io" | 4 "io" |
5 . "launchpad.net/gocheck" | 5 . "launchpad.net/gocheck" |
6 zk "launchpad.net/gozk/zookeeper" | 6 zk "launchpad.net/gozk/zookeeper" |
7 "log" | 7 "log" |
8 "net" | 8 "net" |
9 "time" | 9 "time" |
10 ) | 10 ) |
11 | 11 |
12 // requestFuncs holds all the requests that take a read lock | 12 // requestFuncs holds all the requests that take a read lock |
13 // on the zk connection except those that don't actually | 13 // on the zk connection except those that don't actually |
14 // make a round trip to the server. | 14 // make a round trip to the server. |
15 var requestFuncs = []func(conn *zk.Conn, path string) error { | 15 var requestFuncs = []func(conn *zk.Conn, path string) error{ |
16 » func(conn *zk.Conn, path string) error { | 16 » func(conn *zk.Conn, path string) error { |
17 _, err := conn.Create(path, "", 0, nil) | 17 _, err := conn.Create(path, "", 0, nil) |
18 return err | 18 return err |
19 }, | 19 }, |
20 func(conn *zk.Conn, path string) error { | 20 func(conn *zk.Conn, path string) error { |
21 _, err := conn.Exists(path) | 21 _, err := conn.Exists(path) |
22 return err | 22 return err |
23 }, | 23 }, |
24 func(conn *zk.Conn, path string) error { | 24 func(conn *zk.Conn, path string) error { |
25 _, _, err := conn.ExistsW(path) | 25 _, _, err := conn.ExistsW(path) |
26 return err | 26 return err |
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
59 func(conn *zk.Conn, path string) error { | 59 func(conn *zk.Conn, path string) error { |
60 return conn.Delete(path, 0) | 60 return conn.Delete(path, 0) |
61 }, | 61 }, |
62 } | 62 } |
63 | 63 |
64 func (s *S) TestConcurrentClose(c *C) { | 64 func (s *S) TestConcurrentClose(c *C) { |
65 // make sure the server is ready to receive connections. | 65 // make sure the server is ready to receive connections. |
66 s.init(c) | 66 s.init(c) |
67 | 67 |
68 // Close should wait until all outstanding requests have | 68 // Close should wait until all outstanding requests have |
69 » // completed before returning. We check that by interposing a | 69 » // completed before returning. The idea of this test is that |
70 » // proxy between the client and the server, so that we can delay | 70 » // any request that requests or changes a zookeeper node must |
71 » // requests arbitrarily. We try each kind of request in turn with | 71 » // make at least one round trip to the server, so we interpose a |
72 » // a new connection, ignoring error returns, because we don't care | 72 » // proxy between the client and the server which can stop all |
73 » // what the operation does, just whether it actually makes some kind | 73 » // incoming traffic on demand, thus blocking the request until |
74 » // of transaction. | 74 » // we want it to unblock. |
75 » // | |
76 » // We assume that all requests take less than 0.1s to complete, | |
77 » // thus when we wait below, neither of the above goroutines | |
78 » // should complete within the allotted time (the request because | |
79 » // it's waiting for a reply from the server and the close | |
80 » // because it's waiting for the request to complete). If the | |
81 » // locking doesn't work, the Close will return early. If the | |
82 » // proxy blocking doesn't work, the request will return early. | |
83 » // | |
84 » // When we reenable incoming messages from the server, both | |
85 » // goroutines should complete. We can't tell which completes | |
86 » // first, but the fact that the close blocked is sufficient to | |
87 » // tell that the locking is working correctly. | |
75 for i, f := range requestFuncs { | 88 for i, f := range requestFuncs { |
76 c.Logf("iter %d", i) | 89 c.Logf("iter %d", i) |
77 p := newProxy(c, s.zkAddr) | 90 p := newProxy(c, s.zkAddr) |
78 conn, watch, err := zk.Dial(p.addr(), 5e9) | 91 conn, watch, err := zk.Dial(p.addr(), 5e9) |
79 c.Assert(err, IsNil) | 92 c.Assert(err, IsNil) |
80 c.Assert((<-watch).Ok(), Equals, true) | 93 c.Assert((<-watch).Ok(), Equals, true) |
81 | 94 |
82 // sanity check that the connection is actually | 95 // sanity check that the connection is actually |
83 // up and running. | 96 // up and running. |
84 _, err = conn.Exists("/nothing") | 97 _, err = conn.Exists("/nothing") |
85 c.Assert(err, IsNil) | 98 c.Assert(err, IsNil) |
86 | 99 |
87 p.stopIncoming() | 100 p.stopIncoming() |
88 reqDone := make(chan bool) | 101 reqDone := make(chan bool) |
89 closeDone := make(chan bool) | 102 closeDone := make(chan bool) |
90 go func() { | 103 go func() { |
91 f(conn, "/closetest") | 104 f(conn, "/closetest") |
92 reqDone <- true | 105 reqDone <- true |
93 }() | 106 }() |
94 go func() { | 107 go func() { |
95 // sleep for long enough for the request to be initiated and the read lock taken. | 108 // sleep for long enough for the request to be initiated and the read lock taken. |
96 time.Sleep(0.05e9) | 109 time.Sleep(0.05e9) |
niemeyer
2012/02/27 18:03:17
How can you tell this is really unblocking at the
rog
2012/03/08 17:26:38
the idea is that any request that requests or chan
| |
97 conn.Close() | 110 conn.Close() |
98 closeDone <- true | 111 closeDone <- true |
99 }() | 112 }() |
100 select { | 113 select { |
101 case <-reqDone: | 114 case <-reqDone: |
102 c.Fatalf("request %d finished early", i) | 115 c.Fatalf("request %d finished early", i) |
103 case <-closeDone: | 116 case <-closeDone: |
104 c.Fatalf("request %d close finished early", i) | 117 c.Fatalf("request %d close finished early", i) |
105 case <-time.After(0.1e9): | 118 case <-time.After(0.1e9): |
106 } | 119 } |
(...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
145 return | 158 return |
146 } | 159 } |
147 go func() { | 160 go func() { |
148 defer client.Close() | 161 defer client.Close() |
149 server, err := net.Dial("tcp", dstAddr) | 162 server, err := net.Dial("tcp", dstAddr) |
150 if err != nil { | 163 if err != nil { |
151 log.Printf("cannot dial %q: %v", dstAddr , err) | 164 log.Printf("cannot dial %q: %v", dstAddr , err) |
152 return | 165 return |
153 } | 166 } |
154 defer server.Close() | 167 defer server.Close() |
155 » » » » done := make(chan bool) | 168 » » » » go io.Copy(&haltableWriter{ |
156 » » » » go func() { | 169 » » » » » w: client, |
157 » » » » » io.Copy(&haltableWriter{ | 170 » » » » » stop: p.stop, |
158 » » » » » » w: client, | 171 » » » » » start: p.start}, |
159 » » » » » » stop: p.stop, | 172 » » » » » server) |
160 » » » » » » start: p.start}, | 173 » » » » // When the client is closed, the deferred closes will |
161 » » » » » » server) | 174 » » » » // take down the other io.Copy too. |
162 » » » » » done <- true | 175 » » » » io.Copy(server, client) |
163 » » » » }() | |
164 » » » » go func() { | |
165 » » » » » io.Copy(server, client) | |
166 » » » » » done <- true | |
167 » » » » }() | |
168 » » » » <-done | |
169 » » » » <-done | |
170 }() | 176 }() |
171 } | 177 } |
172 }() | 178 }() |
173 return p | 179 return p |
174 } | 180 } |
175 | 181 |
176 func (p *proxy) close() error { | 182 func (p *proxy) close() error { |
177 return p.listener.Close() | 183 return p.listener.Close() |
178 } | 184 } |
179 | 185 |
(...skipping 25 matching lines...) Expand all Loading... | |
205 func (w *haltableWriter) Write(buf []byte) (int, error) { | 211 func (w *haltableWriter) Write(buf []byte) (int, error) { |
206 select { | 212 select { |
207 case <-w.stop: | 213 case <-w.stop: |
208 w.stop <- true | 214 w.stop <- true |
209 <-w.start | 215 <-w.start |
210 w.start <- true | 216 w.start <- true |
211 default: | 217 default: |
212 } | 218 } |
213 return w.w.Write(buf) | 219 return w.w.Write(buf) |
214 } | 220 } |
LEFT | RIGHT |