Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(325)

Delta Between Two Patch Sets: close_test.go

Issue 5699093: zookeeper: make Conn.Close safe to call concurrently with other operations.
Left Patch Set: zookeeper: make Conn.Close safe to call concurrently with other operations. Created 12 years, 1 month ago
Right Patch Set: zookeeper: make Conn.Close safe to call concurrently with other operations. Created 12 years ago
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
Left: Side by side diff | Download
Right: Side by side diff | Download
« no previous file with change/comment | « no previous file | retry_test.go » ('j') | no next file with change/comment »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
LEFTRIGHT
1 package zookeeper_test 1 package zookeeper_test
2 2
3 import ( 3 import (
4 "io" 4 "io"
5 . "launchpad.net/gocheck" 5 . "launchpad.net/gocheck"
6 zk "launchpad.net/gozk/zookeeper" 6 zk "launchpad.net/gozk/zookeeper"
7 "log" 7 "log"
8 "net" 8 "net"
9 "time" 9 "time"
10 ) 10 )
11 11
12 // requestFuncs holds all the requests that take a read lock 12 // requestFuncs holds all the requests that take a read lock
13 // on the zk connection except those that don't actually 13 // on the zk connection except those that don't actually
14 // make a round trip to the server. 14 // make a round trip to the server.
15 var requestFuncs = []func(conn *zk.Conn, path string) error { 15 var requestFuncs = []func(conn *zk.Conn, path string) error{
16 » func(conn *zk.Conn, path string) error { 16 » func(conn *zk.Conn, path string) error {
17 _, err := conn.Create(path, "", 0, nil) 17 _, err := conn.Create(path, "", 0, nil)
18 return err 18 return err
19 }, 19 },
20 func(conn *zk.Conn, path string) error { 20 func(conn *zk.Conn, path string) error {
21 _, err := conn.Exists(path) 21 _, err := conn.Exists(path)
22 return err 22 return err
23 }, 23 },
24 func(conn *zk.Conn, path string) error { 24 func(conn *zk.Conn, path string) error {
25 _, _, err := conn.ExistsW(path) 25 _, _, err := conn.ExistsW(path)
26 return err 26 return err
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after
59 func(conn *zk.Conn, path string) error { 59 func(conn *zk.Conn, path string) error {
60 return conn.Delete(path, 0) 60 return conn.Delete(path, 0)
61 }, 61 },
62 } 62 }
63 63
64 func (s *S) TestConcurrentClose(c *C) { 64 func (s *S) TestConcurrentClose(c *C) {
65 // make sure the server is ready to receive connections. 65 // make sure the server is ready to receive connections.
66 s.init(c) 66 s.init(c)
67 67
68 // Close should wait until all outstanding requests have 68 // Close should wait until all outstanding requests have
69 » // completed before returning. We check that by interposing a 69 » // completed before returning. The idea of this test is that
70 » // proxy between the client and the server, so that we can delay 70 » // any request that requests or changes a zookeeper node must
71 » // requests arbitrarily. We try each kind of request in turn with 71 » // make at least one round trip to the server, so we interpose a
72 » // a new connection, ignoring error returns, because we don't care 72 » // proxy between the client and the server which can stop all
73 » // what the operation does, just whether it actually makes some kind 73 » // incoming traffic on demand, thus blocking the request until
74 » // of transaction. 74 » // we want it to unblock.
75 » //
76 » // We assume that all requests take less than 0.1s to complete,
77 » // thus when we wait below, neither of the above goroutines
78 » // should complete within the allotted time (the request because
79 » // it's waiting for a reply from the server and the close
80 » // because it's waiting for the request to complete). If the
81 » // locking doesn't work, the Close will return early. If the
82 » // proxy blocking doesn't work, the request will return early.
83 » //
84 » // When we reenable incoming messages from the server, both
85 » // goroutines should complete. We can't tell which completes
86 » // first, but the fact that the close blocked is sufficient to
87 » // tell that the locking is working correctly.
75 for i, f := range requestFuncs { 88 for i, f := range requestFuncs {
76 c.Logf("iter %d", i) 89 c.Logf("iter %d", i)
77 p := newProxy(c, s.zkAddr) 90 p := newProxy(c, s.zkAddr)
78 conn, watch, err := zk.Dial(p.addr(), 5e9) 91 conn, watch, err := zk.Dial(p.addr(), 5e9)
79 c.Assert(err, IsNil) 92 c.Assert(err, IsNil)
80 c.Assert((<-watch).Ok(), Equals, true) 93 c.Assert((<-watch).Ok(), Equals, true)
81 94
82 // sanity check that the connection is actually 95 // sanity check that the connection is actually
83 // up and running. 96 // up and running.
84 _, err = conn.Exists("/nothing") 97 _, err = conn.Exists("/nothing")
85 c.Assert(err, IsNil) 98 c.Assert(err, IsNil)
86 99
87 p.stopIncoming() 100 p.stopIncoming()
88 reqDone := make(chan bool) 101 reqDone := make(chan bool)
89 closeDone := make(chan bool) 102 closeDone := make(chan bool)
90 go func() { 103 go func() {
91 f(conn, "/closetest") 104 f(conn, "/closetest")
92 reqDone <- true 105 reqDone <- true
93 }() 106 }()
94 go func() { 107 go func() {
95 // sleep for long enough for the request to be initiated and the read lock taken. 108 // sleep for long enough for the request to be initiated and the read lock taken.
96 time.Sleep(0.05e9) 109 time.Sleep(0.05e9)
niemeyer 2012/02/27 18:03:17 How can you tell this is really unblocking at the
rog 2012/03/08 17:26:38 the idea is that any request that requests or chan
97 conn.Close() 110 conn.Close()
98 closeDone <- true 111 closeDone <- true
99 }() 112 }()
100 select { 113 select {
101 case <-reqDone: 114 case <-reqDone:
102 c.Fatalf("request %d finished early", i) 115 c.Fatalf("request %d finished early", i)
103 case <-closeDone: 116 case <-closeDone:
104 c.Fatalf("request %d close finished early", i) 117 c.Fatalf("request %d close finished early", i)
105 case <-time.After(0.1e9): 118 case <-time.After(0.1e9):
106 } 119 }
(...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after
145 return 158 return
146 } 159 }
147 go func() { 160 go func() {
148 defer client.Close() 161 defer client.Close()
149 server, err := net.Dial("tcp", dstAddr) 162 server, err := net.Dial("tcp", dstAddr)
150 if err != nil { 163 if err != nil {
151 log.Printf("cannot dial %q: %v", dstAddr , err) 164 log.Printf("cannot dial %q: %v", dstAddr , err)
152 return 165 return
153 } 166 }
154 defer server.Close() 167 defer server.Close()
155 » » » » done := make(chan bool) 168 » » » » go io.Copy(&haltableWriter{
156 » » » » go func() { 169 » » » » » w: client,
157 » » » » » io.Copy(&haltableWriter{ 170 » » » » » stop: p.stop,
158 » » » » » » w: client, 171 » » » » » start: p.start},
159 » » » » » » stop: p.stop, 172 » » » » » server)
160 » » » » » » start: p.start}, 173 » » » » // When the client is closed, the deferred closes will
161 » » » » » » server) 174 » » » » // take down the other io.Copy too.
162 » » » » » done <- true 175 » » » » io.Copy(server, client)
163 » » » » }()
164 » » » » go func() {
165 » » » » » io.Copy(server, client)
166 » » » » » done <- true
167 » » » » }()
168 » » » » <-done
169 » » » » <-done
170 }() 176 }()
171 } 177 }
172 }() 178 }()
173 return p 179 return p
174 } 180 }
175 181
176 func (p *proxy) close() error { 182 func (p *proxy) close() error {
177 return p.listener.Close() 183 return p.listener.Close()
178 } 184 }
179 185
(...skipping 25 matching lines...) Expand all
205 func (w *haltableWriter) Write(buf []byte) (int, error) { 211 func (w *haltableWriter) Write(buf []byte) (int, error) {
206 select { 212 select {
207 case <-w.stop: 213 case <-w.stop:
208 w.stop <- true 214 w.stop <- true
209 <-w.start 215 <-w.start
210 w.start <- true 216 w.start <- true
211 default: 217 default:
212 } 218 }
213 return w.w.Write(buf) 219 return w.w.Write(buf)
214 } 220 }
LEFTRIGHT
« no previous file | retry_test.go » ('j') | Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Toggle Comments ('s')

Powered by Google App Engine
RSS Feeds Recent Issues | This issue
This is Rietveld f62528b