LEFT | RIGHT |
1 // Copyright 2013 The Go Authors. All rights reserved. | 1 // Copyright 2013 The Go Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style | 2 // Use of this source code is governed by a BSD-style |
3 // license that can be found in the LICENSE file. | 3 // license that can be found in the LICENSE file. |
4 | 4 |
5 package net | 5 package net |
6 | 6 |
7 import "sync/atomic" | 7 import "sync/atomic" |
8 | 8 |
9 // fdMutex is a specialized synchronization primitive | 9 // fdMutex is a specialized synchronization primitive |
10 // that manages lifetime of fd and serializes access | 10 // that manages lifetime of an fd and serializes access |
11 // to Read and Write methods on netFD. | 11 // to Read and Write methods on netFD. |
12 type fdMutex struct { | 12 type fdMutex struct { |
13 state uint64 | 13 state uint64 |
14 rsema uint32 | 14 rsema uint32 |
15 wsema uint32 | 15 wsema uint32 |
16 } | 16 } |
17 | 17 |
18 // fdMutex.state is organized as follows: | 18 // fdMutex.state is organized as follows: |
19 // 1 bit - whether netFD is closed, if set all subsequent lock operations will fail. | 19 // 1 bit - whether netFD is closed, if set all subsequent lock operations will fail. |
20 // 1 bit - lock for read operations. | 20 // 1 bit - lock for read operations. |
21 // 1 bit - lock for write operations. | 21 // 1 bit - lock for write operations. |
22 // 20 bits - total number of references (read+write+misc). | 22 // 20 bits - total number of references (read+write+misc). |
23 // 20 bits - number of outstanding read waiters. | 23 // 20 bits - number of outstanding read waiters. |
24 // 20 bits - number of outstanding write waiters. | 24 // 20 bits - number of outstanding write waiters. |
25 const ( | 25 const ( |
26 mutexClosed = 1 << 0 | 26 mutexClosed = 1 << 0 |
27 mutexRLock = 1 << 1 | 27 mutexRLock = 1 << 1 |
28 mutexWLock = 1 << 2 | 28 mutexWLock = 1 << 2 |
29 mutexRef = 1 << 3 | 29 mutexRef = 1 << 3 |
30 mutexRefMask = (1<<20 - 1) << 3 | 30 mutexRefMask = (1<<20 - 1) << 3 |
31 mutexRWait = 1 << 23 | 31 mutexRWait = 1 << 23 |
32 mutexRMask = (1<<20 - 1) << 23 | 32 mutexRMask = (1<<20 - 1) << 23 |
33 mutexWWait = 1 << 43 | 33 mutexWWait = 1 << 43 |
34 mutexWMask = (1<<20 - 1) << 43 | 34 mutexWMask = (1<<20 - 1) << 43 |
35 ) | 35 ) |
36 | 36 |
37 // Read operations must do ReadLock/ReadUnlock. | 37 // Read operations must do RWLock(true)/RWUnlock(true). |
38 // Write operations must do WriteLock/WriteUnlock. | 38 // Write operations must do RWLock(false)/RWUnlock(false). |
39 // Misc operations (setsockopt) must do Lock/Unlock. | 39 // Misc operations must do Incref/Decref. Misc operations include functions like |
40 // Close operation must do LockAndClose/Unlock. | 40 // setsockopt and setDeadline. They need to use Incref/Decref to ensure that |
| 41 // they operate on the correct fd in presence of a concurrent Close call |
| 42 // (otherwise fd can be closed under their feet). |
| 43 // Close operation must do IncrefAndClose/Decref. |
41 | 44 |
42 // All lock operations return false if fd is closed. | 45 // RWLock/Incref return whether fd is open. |
43 // All unlock operations return true if fd is closed and it has dropped the last reference. | 46 // RWUnlock/Decref return whether fd is closed and there are no remaining references. |
44 | 47 |
45 func (mu *fdMutex) Lock() bool { | 48 func (mu *fdMutex) Incref() bool { |
46 for { | 49 for { |
47 old := atomic.LoadUint64(&mu.state) | 50 old := atomic.LoadUint64(&mu.state) |
48 if old&mutexClosed != 0 { | 51 if old&mutexClosed != 0 { |
49 return false | 52 return false |
50 } | 53 } |
51 new := old + mutexRef | 54 new := old + mutexRef |
52 if new&mutexRefMask == 0 { | 55 if new&mutexRefMask == 0 { |
53 panic("net: inconsistent fdMutex") | 56 panic("net: inconsistent fdMutex") |
54 } | 57 } |
55 if atomic.CompareAndSwapUint64(&mu.state, old, new) { | 58 if atomic.CompareAndSwapUint64(&mu.state, old, new) { |
56 return true | 59 return true |
57 } | 60 } |
58 } | 61 } |
59 } | 62 } |
60 | 63 |
61 func (mu *fdMutex) LockAndClose() bool { | 64 func (mu *fdMutex) IncrefAndClose() bool { |
62 for { | 65 for { |
63 old := atomic.LoadUint64(&mu.state) | 66 old := atomic.LoadUint64(&mu.state) |
64 if old&mutexClosed != 0 { | 67 if old&mutexClosed != 0 { |
65 return false | 68 return false |
66 } | 69 } |
67 // Mark as closed and acquire a reference. | 70 // Mark as closed and acquire a reference. |
68 » » new := old | mutexClosed + mutexRef | 71 » » new := (old | mutexClosed) + mutexRef |
69 if new&mutexRefMask == 0 { | 72 if new&mutexRefMask == 0 { |
70 panic("net: inconsistent fdMutex") | 73 panic("net: inconsistent fdMutex") |
71 } | 74 } |
72 // Remove all read and write waiters. | 75 // Remove all read and write waiters. |
73 » » new &= ^uint64(mutexRMask | mutexWMask) | 76 » » new &^= mutexRMask | mutexWMask |
74 if atomic.CompareAndSwapUint64(&mu.state, old, new) { | 77 if atomic.CompareAndSwapUint64(&mu.state, old, new) { |
75 // Wake all read and write waiters, | 78 // Wake all read and write waiters, |
76 // they will observe closed flag after wakeup. | 79 // they will observe closed flag after wakeup. |
77 for old&mutexRMask != 0 { | 80 for old&mutexRMask != 0 { |
78 old -= mutexRWait | 81 old -= mutexRWait |
79 runtime_Semrelease(&mu.rsema) | 82 runtime_Semrelease(&mu.rsema) |
80 } | 83 } |
81 for old&mutexWMask != 0 { | 84 for old&mutexWMask != 0 { |
82 old -= mutexWWait | 85 old -= mutexWWait |
83 runtime_Semrelease(&mu.wsema) | 86 runtime_Semrelease(&mu.wsema) |
84 } | 87 } |
85 return true | 88 return true |
86 } | 89 } |
87 } | 90 } |
88 } | 91 } |
89 | 92 |
90 func (mu *fdMutex) Unlock() bool { | 93 func (mu *fdMutex) Decref() bool { |
91 for { | 94 for { |
92 old := atomic.LoadUint64(&mu.state) | 95 old := atomic.LoadUint64(&mu.state) |
93 if old&mutexRefMask == 0 { | 96 if old&mutexRefMask == 0 { |
94 panic("net: inconsistent fdMutex") | 97 panic("net: inconsistent fdMutex") |
95 } | 98 } |
96 new := old - mutexRef | 99 new := old - mutexRef |
97 if atomic.CompareAndSwapUint64(&mu.state, old, new) { | 100 if atomic.CompareAndSwapUint64(&mu.state, old, new) { |
98 return new&(mutexClosed|mutexRef) == mutexClosed | 101 return new&(mutexClosed|mutexRef) == mutexClosed |
99 } | 102 } |
100 } | 103 } |
101 } | 104 } |
102 | 105 |
103 func (mu *fdMutex) ReadLock() bool { | 106 func (mu *fdMutex) RWLock(read bool) bool { |
| 107 » var mutexBit, mutexWait, mutexMask uint64 |
| 108 » var mutexSema *uint32 |
| 109 » if read { |
| 110 » » mutexBit = mutexRLock |
| 111 » » mutexWait = mutexRWait |
| 112 » » mutexMask = mutexRMask |
| 113 » » mutexSema = &mu.rsema |
| 114 » } else { |
| 115 » » mutexBit = mutexWLock |
| 116 » » mutexWait = mutexWWait |
| 117 » » mutexMask = mutexWMask |
| 118 » » mutexSema = &mu.wsema |
| 119 » } |
104 for { | 120 for { |
105 old := atomic.LoadUint64(&mu.state) | 121 old := atomic.LoadUint64(&mu.state) |
106 if old&mutexClosed != 0 { | 122 if old&mutexClosed != 0 { |
107 return false | 123 return false |
108 } | 124 } |
109 var new uint64 | 125 var new uint64 |
110 » » if old&mutexRLock == 0 { | 126 » » if old&mutexBit == 0 { |
111 » » » // Read lock is free, acquire it. | 127 » » » // Lock is free, acquire it. |
112 » » » new = old | mutexRLock + mutexRef | 128 » » » new = (old | mutexBit) + mutexRef |
113 if new&mutexRefMask == 0 { | 129 if new&mutexRefMask == 0 { |
114 panic("net: inconsistent fdMutex") | 130 panic("net: inconsistent fdMutex") |
115 } | 131 } |
116 } else { | 132 } else { |
117 » » » // Wait for read lock. | 133 » » » // Wait for lock. |
118 » » » new = old + mutexRWait | 134 » » » new = old + mutexWait |
119 » » » if new&mutexRMask == 0 { | 135 » » » if new&mutexMask == 0 { |
120 panic("net: inconsistent fdMutex") | 136 panic("net: inconsistent fdMutex") |
121 } | 137 } |
122 } | 138 } |
123 if atomic.CompareAndSwapUint64(&mu.state, old, new) { | 139 if atomic.CompareAndSwapUint64(&mu.state, old, new) { |
124 » » » if old&mutexRLock == 0 { | 140 » » » if old&mutexBit == 0 { |
125 return true | 141 return true |
126 } | 142 } |
127 » » » runtime_Semacquire(&mu.rsema) | 143 » » » runtime_Semacquire(mutexSema) |
128 » » » // The signaller has substracted mutexRWait. | 144 » » » // The signaller has subtracted mutexWait. |
129 } | 145 } |
130 } | 146 } |
131 } | 147 } |
132 | 148 |
133 func (mu *fdMutex) ReadUnlock() bool { | 149 func (mu *fdMutex) RWUnlock(read bool) bool { |
| 150 » var mutexBit, mutexWait, mutexMask uint64 |
| 151 » var mutexSema *uint32 |
| 152 » if read { |
| 153 » » mutexBit = mutexRLock |
| 154 » » mutexWait = mutexRWait |
| 155 » » mutexMask = mutexRMask |
| 156 » » mutexSema = &mu.rsema |
| 157 » } else { |
| 158 » » mutexBit = mutexWLock |
| 159 » » mutexWait = mutexWWait |
| 160 » » mutexMask = mutexWMask |
| 161 » » mutexSema = &mu.wsema |
| 162 » } |
134 for { | 163 for { |
135 old := atomic.LoadUint64(&mu.state) | 164 old := atomic.LoadUint64(&mu.state) |
136 » » if old&mutexRLock == 0 || old&mutexRefMask == 0 { | 165 » » if old&mutexBit == 0 || old&mutexRefMask == 0 { |
137 panic("net: inconsistent fdMutex") | 166 panic("net: inconsistent fdMutex") |
138 } | 167 } |
139 » » // Drop read lock, drop reference and wake read waiter if present. | 168 » » // Drop lock, drop reference and wake read waiter if present. |
140 » » new := old&^mutexRLock - mutexRef | 169 » » new := (old &^ mutexBit) - mutexRef |
141 » » if old&mutexRMask != 0 { | 170 » » if old&mutexMask != 0 { |
142 » » » new -= mutexRWait | 171 » » » new -= mutexWait |
143 } | 172 } |
144 if atomic.CompareAndSwapUint64(&mu.state, old, new) { | 173 if atomic.CompareAndSwapUint64(&mu.state, old, new) { |
145 » » » if old&mutexRMask != 0 { | 174 » » » if old&mutexMask != 0 { |
146 » » » » runtime_Semrelease(&mu.rsema) | 175 » » » » runtime_Semrelease(mutexSema) |
147 » » » } | |
148 » » » return new&(mutexClosed|mutexRef) == mutexClosed | |
149 » » } | |
150 » } | |
151 } | |
152 | |
153 func (mu *fdMutex) WriteLock() bool { | |
154 » for { | |
155 » » old := atomic.LoadUint64(&mu.state) | |
156 » » if old&mutexClosed != 0 { | |
157 » » » return false | |
158 » » } | |
159 » » var new uint64 | |
160 » » if old&mutexWLock == 0 { | |
161 » » » // Write lock is free, acquire it. | |
162 » » » new = old | mutexWLock + mutexRef | |
163 » » » if new&mutexRefMask == 0 { | |
164 » » » » panic("net: inconsistent fdMutex") | |
165 » » » } | |
166 » » } else { | |
167 » » » // Wait for write lock. | |
168 » » » new = old + mutexWWait | |
169 » » » if new&mutexWMask == 0 { | |
170 » » » » panic("net: inconsistent fdMutex") | |
171 » » » } | |
172 » » } | |
173 » » if atomic.CompareAndSwapUint64(&mu.state, old, new) { | |
174 » » » if old&mutexWLock == 0 { | |
175 » » » » return true | |
176 » » » } | |
177 » » » runtime_Semacquire(&mu.wsema) | |
178 » » » // The signaller has substracted mutexWWait. | |
179 » » } | |
180 » } | |
181 } | |
182 | |
183 func (mu *fdMutex) WriteUnlock() bool { | |
184 » for { | |
185 » » old := atomic.LoadUint64(&mu.state) | |
186 » » if old&mutexWLock == 0 || old&mutexRefMask == 0 { | |
187 » » » panic("net: inconsistent fdMutex") | |
188 » » } | |
189 » » // Drop write lock, decrement ref and wake write waiter if present. |
190 » » new := old&^mutexWLock - mutexRef | |
191 » » if old&mutexWMask != 0 { | |
192 » » » new -= mutexWWait | |
193 » » } | |
194 » » if atomic.CompareAndSwapUint64(&mu.state, old, new) { | |
195 » » » if old&mutexWMask != 0 { | |
196 » » » » runtime_Semrelease(&mu.wsema) | |
197 } | 176 } |
198 return new&(mutexClosed|mutexRef) == mutexClosed | 177 return new&(mutexClosed|mutexRef) == mutexClosed |
199 } | 178 } |
200 } | 179 } |
201 } | 180 } |
202 | 181 |
203 // Implemented in runtime package. | 182 // Implemented in runtime package. |
204 func runtime_Semacquire(sema *uint32) | 183 func runtime_Semacquire(sema *uint32) |
205 func runtime_Semrelease(sema *uint32) | 184 func runtime_Semrelease(sema *uint32) |
LEFT | RIGHT |