LEFT | RIGHT |
(no file at all) | |
1 // Copyright 2009 The Go Authors. All rights reserved. | 1 // Copyright 2009 The Go Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style | 2 // Use of this source code is governed by a BSD-style |
3 // license that can be found in the LICENSE file. | 3 // license that can be found in the LICENSE file. |
4 | 4 |
5 // +build darwin freebsd linux netbsd openbsd | 5 // +build darwin freebsd linux netbsd openbsd |
6 | 6 |
7 package net | 7 package net |
8 | 8 |
9 import ( | 9 import ( |
10 "io" | 10 "io" |
11 "os" | 11 "os" |
12 "runtime" | |
13 "sync" | 12 "sync" |
14 "syscall" | 13 "syscall" |
15 "time" | |
16 ) | 14 ) |
17 | 15 |
18 // Network file descriptor. | 16 // Network file descriptor. |
19 type netFD struct { | 17 type netFD struct { |
20 // locking/lifetime of sysfd | 18 // locking/lifetime of sysfd |
21 sysmu sync.Mutex | 19 sysmu sync.Mutex |
22 sysref int | 20 sysref int |
23 | 21 |
24 // must lock both sysmu and pollserver to write | 22 // must lock both sysmu and pollserver to write |
25 // can lock either to read | 23 // can lock either to read |
(...skipping 17 matching lines...) Expand all Loading... |
43 wdeadline int64 | 41 wdeadline int64 |
44 wio sync.Mutex | 42 wio sync.Mutex |
45 | 43 |
46 // owned by fd wait server | 44 // owned by fd wait server |
47 ncr, ncw int | 45 ncr, ncw int |
48 | 46 |
49 // wait server | 47 // wait server |
50 pollServer *pollServer | 48 pollServer *pollServer |
51 } | 49 } |
52 | 50 |
53 // A pollServer helps FDs determine when to retry a non-blocking | |
54 // read or write after they get EAGAIN. When an FD needs to wait, | |
55 // call s.WaitRead() or s.WaitWrite() to pass the request to the poll server. | |
56 // When the pollServer finds that i/o on FD should be possible | |
57 // again, it will send on fd.cr/fd.cw to wake any waiting goroutines. | |
58 // | |
59 // To avoid races in closing, all fd operations are locked and | |
60 // refcounted. when netFD.Close() is called, it calls syscall.Shutdown | |
61 // and sets a closing flag. Only when the last reference is removed | |
62 // will the fd be closed. | |
63 | |
64 type pollServer struct { | |
65 pr, pw *os.File | |
66 poll *pollster // low-level OS hooks | |
67 sync.Mutex // controls pending and deadline | |
68 pending map[int]*netFD | |
69 deadline int64 // next deadline (nsec since 1970) | |
70 } | |
71 | |
72 func (s *pollServer) AddFD(fd *netFD, mode int) error { | |
73 s.Lock() | |
74 intfd := fd.sysfd | |
75 if intfd < 0 || fd.closing { | |
76 // fd closed underfoot | |
77 s.Unlock() | |
78 return errClosing | |
79 } | |
80 | |
81 var t int64 | |
82 key := intfd << 1 | |
83 if mode == 'r' { | |
84 fd.ncr++ | |
85 t = fd.rdeadline | |
86 } else { | |
87 fd.ncw++ | |
88 key++ | |
89 t = fd.wdeadline | |
90 } | |
91 s.pending[key] = fd | |
92 doWakeup := false | |
93 if t > 0 && (s.deadline == 0 || t < s.deadline) { | |
94 s.deadline = t | |
95 doWakeup = true | |
96 } | |
97 | |
98 wake, err := s.poll.AddFD(intfd, mode, false) | |
99 s.Unlock() | |
100 if err != nil { | |
101 return &OpError{"addfd", fd.net, fd.laddr, err} | |
102 } | |
103 if wake || doWakeup { | |
104 s.Wakeup() | |
105 } | |
106 return nil | |
107 } | |
108 | |
109 // Evict evicts fd from the pending list, unblocking | |
110 // any I/O running on fd. The caller must have locked | |
111 // pollserver. | |
112 func (s *pollServer) Evict(fd *netFD) { | |
113 if s.pending[fd.sysfd<<1] == fd { | |
114 s.WakeFD(fd, 'r', errClosing) | |
115 s.poll.DelFD(fd.sysfd, 'r') | |
116 delete(s.pending, fd.sysfd<<1) | |
117 } | |
118 if s.pending[fd.sysfd<<1|1] == fd { | |
119 s.WakeFD(fd, 'w', errClosing) | |
120 s.poll.DelFD(fd.sysfd, 'w') | |
121 delete(s.pending, fd.sysfd<<1|1) | |
122 } | |
123 } | |
124 | |
125 var wakeupbuf [1]byte | |
126 | |
127 func (s *pollServer) Wakeup() { s.pw.Write(wakeupbuf[0:]) } | |
128 | |
129 func (s *pollServer) LookupFD(fd int, mode int) *netFD { | |
130 key := fd << 1 | |
131 if mode == 'w' { | |
132 key++ | |
133 } | |
134 netfd, ok := s.pending[key] | |
135 if !ok { | |
136 return nil | |
137 } | |
138 delete(s.pending, key) | |
139 return netfd | |
140 } | |
141 | |
142 func (s *pollServer) WakeFD(fd *netFD, mode int, err error) { | |
143 if mode == 'r' { | |
144 for fd.ncr > 0 { | |
145 fd.ncr-- | |
146 fd.cr <- err | |
147 } | |
148 } else { | |
149 for fd.ncw > 0 { | |
150 fd.ncw-- | |
151 fd.cw <- err | |
152 } | |
153 } | |
154 } | |
155 | |
156 func (s *pollServer) Now() int64 { | |
157 return time.Now().UnixNano() | |
158 } | |
159 | |
160 func (s *pollServer) CheckDeadlines() { | |
161 now := s.Now() | |
162 // TODO(rsc): This will need to be handled more efficiently, | |
163 // probably with a heap indexed by wakeup time. | |
164 | |
165 var nextDeadline int64 | |
166 for key, fd := range s.pending { | |
167 var t int64 | |
168 var mode int | |
169 if key&1 == 0 { | |
170 mode = 'r' | |
171 } else { | |
172 mode = 'w' | |
173 } | |
174 if mode == 'r' { | |
175 t = fd.rdeadline | |
176 } else { | |
177 t = fd.wdeadline | |
178 } | |
179 if t > 0 { | |
180 if t <= now { | |
181 delete(s.pending, key) | |
182 if mode == 'r' { | |
183 s.poll.DelFD(fd.sysfd, mode) | |
184 fd.rdeadline = -1 | |
185 } else { | |
186 s.poll.DelFD(fd.sysfd, mode) | |
187 fd.wdeadline = -1 | |
188 } | |
189 s.WakeFD(fd, mode, nil) | |
190 } else if nextDeadline == 0 || t < nextDeadline { | |
191 nextDeadline = t | |
192 } | |
193 } | |
194 } | |
195 s.deadline = nextDeadline | |
196 } | |
197 | |
198 func (s *pollServer) Run() { | |
199 var scratch [100]byte | |
200 s.Lock() | |
201 defer s.Unlock() | |
202 for { | |
203 var t = s.deadline | |
204 if t > 0 { | |
205 t = t - s.Now() | |
206 if t <= 0 { | |
207 s.CheckDeadlines() | |
208 continue | |
209 } | |
210 } | |
211 fd, mode, err := s.poll.WaitFD(s, t) | |
212 if err != nil { | |
213 print("pollServer WaitFD: ", err.Error(), "\n") | |
214 return | |
215 } | |
216 if fd < 0 { | |
217 // Timeout happened. | |
218 s.CheckDeadlines() | |
219 continue | |
220 } | |
221 if fd == int(s.pr.Fd()) { | |
222 // Drain our wakeup pipe (we could loop here, | |
223 // but it's unlikely that there are more than | |
224 // len(scratch) wakeup calls). | |
225 s.pr.Read(scratch[0:]) | |
226 s.CheckDeadlines() | |
227 } else { | |
228 netfd := s.LookupFD(fd, mode) | |
229 if netfd == nil { | |
230 // This can happen because the WaitFD runs witho
ut | |
231 // holding s's lock, so there might be a pending
wakeup | |
232 // for an fd that has been evicted. No harm don
e. | |
233 continue | |
234 } | |
235 s.WakeFD(netfd, mode, nil) | |
236 } | |
237 } | |
238 } | |
239 | |
240 func (s *pollServer) WaitRead(fd *netFD) error { | |
241 err := s.AddFD(fd, 'r') | |
242 if err == nil { | |
243 err = <-fd.cr | |
244 } | |
245 return err | |
246 } | |
247 | |
248 func (s *pollServer) WaitWrite(fd *netFD) error { | |
249 err := s.AddFD(fd, 'w') | |
250 if err == nil { | |
251 err = <-fd.cw | |
252 } | |
253 return err | |
254 } | |
255 | |
256 // Network FD methods. | |
257 // Spread network FDs over several pollServers. | |
258 | |
259 var pollMaxN int | |
260 var pollservers []*pollServer | |
261 var startServersOnce []func() | |
262 | |
263 var canCancelIO = true // used for testing current package | |
264 | |
265 func sysInit() { | |
266 pollMaxN = runtime.NumCPU() | |
267 if pollMaxN > 8 { | |
268 pollMaxN = 8 // No improvement then. | |
269 } | |
270 pollservers = make([]*pollServer, pollMaxN) | |
271 startServersOnce = make([]func(), pollMaxN) | |
272 for i := 0; i < pollMaxN; i++ { | |
273 k := i | |
274 once := new(sync.Once) | |
275 startServersOnce[i] = func() { once.Do(func() { startServer(k) }
) } | |
276 } | |
277 } | |
278 | |
279 func startServer(k int) { | |
280 p, err := newPollServer() | |
281 if err != nil { | |
282 panic(err) | |
283 } | |
284 pollservers[k] = p | |
285 } | |
286 | |
287 func server(fd int) *pollServer { | |
288 pollN := runtime.GOMAXPROCS(0) | |
289 if pollN > pollMaxN { | |
290 pollN = pollMaxN | |
291 } | |
292 k := fd % pollN | |
293 startServersOnce[k]() | |
294 return pollservers[k] | |
295 } | |
296 | |
297 func newFD(fd, family, sotype int, net string) (*netFD, error) { | 51 func newFD(fd, family, sotype int, net string) (*netFD, error) { |
298 if err := syscall.SetNonblock(fd, true); err != nil { | 52 if err := syscall.SetNonblock(fd, true); err != nil { |
299 return nil, err | 53 return nil, err |
300 } | 54 } |
| 55 poller, err := server(fd) |
| 56 if err != nil { |
| 57 return nil, err |
| 58 } |
301 netfd := &netFD{ | 59 netfd := &netFD{ |
302 » » sysfd: fd, | 60 » » sysfd: fd, |
303 » » family: family, | 61 » » family: family, |
304 » » sotype: sotype, | 62 » » sotype: sotype, |
305 » » net: net, | 63 » » net: net, |
306 » } | 64 » » pollServer: poller, |
307 » netfd.cr = make(chan error, 1) | 65 » } |
308 » netfd.cw = make(chan error, 1) | 66 » if pollerUsesChans { |
309 » netfd.pollServer = server(fd) | 67 » » netfd.cr = make(chan error, 1) |
| 68 » » netfd.cw = make(chan error, 1) |
| 69 » } |
310 return netfd, nil | 70 return netfd, nil |
311 } | 71 } |
312 | 72 |
313 func (fd *netFD) setAddr(laddr, raddr Addr) { | 73 func (fd *netFD) setAddr(laddr, raddr Addr) { |
314 fd.laddr = laddr | 74 fd.laddr = laddr |
315 fd.raddr = raddr | 75 fd.raddr = raddr |
316 fd.sysfile = os.NewFile(uintptr(fd.sysfd), fd.net) | 76 fd.sysfile = os.NewFile(uintptr(fd.sysfd), fd.net) |
317 } | 77 } |
318 | 78 |
319 func (fd *netFD) name() string { | 79 func (fd *netFD) name() string { |
320 var ls, rs string | 80 var ls, rs string |
321 if fd.laddr != nil { | 81 if fd.laddr != nil { |
322 ls = fd.laddr.String() | 82 ls = fd.laddr.String() |
323 } | 83 } |
324 if fd.raddr != nil { | 84 if fd.raddr != nil { |
325 rs = fd.raddr.String() | 85 rs = fd.raddr.String() |
326 } | 86 } |
327 return fd.net + ":" + ls + "->" + rs | 87 return fd.net + ":" + ls + "->" + rs |
328 } | 88 } |
329 | 89 |
| 90 /* |
330 func (fd *netFD) connect(ra syscall.Sockaddr) error { | 91 func (fd *netFD) connect(ra syscall.Sockaddr) error { |
| 92 fd.wio.Lock() |
| 93 defer fd.wio.Unlock() |
| 94 runtime_resetFD(fd.pollServer, 'w') |
331 err := syscall.Connect(fd.sysfd, ra) | 95 err := syscall.Connect(fd.sysfd, ra) |
332 hadTimeout := fd.wdeadline > 0 | 96 hadTimeout := fd.wdeadline > 0 |
333 if err == syscall.EINPROGRESS { | 97 if err == syscall.EINPROGRESS { |
334 if err = fd.pollServer.WaitWrite(fd); err != nil { | 98 if err = fd.pollServer.WaitWrite(fd); err != nil { |
335 return err | 99 return err |
336 } | 100 } |
337 if hadTimeout && fd.wdeadline < 0 { | 101 if hadTimeout && fd.wdeadline < 0 { |
338 return errTimeout | 102 return errTimeout |
339 } | 103 } |
340 var e int | 104 var e int |
341 e, err = syscall.GetsockoptInt(fd.sysfd, syscall.SOL_SOCKET, sys
call.SO_ERROR) | 105 e, err = syscall.GetsockoptInt(fd.sysfd, syscall.SOL_SOCKET, sys
call.SO_ERROR) |
342 if err != nil { | 106 if err != nil { |
343 return os.NewSyscallError("getsockopt", err) | 107 return os.NewSyscallError("getsockopt", err) |
344 } | 108 } |
345 if e != 0 { | 109 if e != 0 { |
346 err = syscall.Errno(e) | 110 err = syscall.Errno(e) |
347 } | 111 } |
348 } | 112 } |
349 return err | 113 return err |
| 114 } |
| 115 */ |
| 116 |
| 117 func (fd *netFD) connect(ra syscall.Sockaddr) error { |
| 118 for { |
| 119 err := syscall.Connect(fd.sysfd, ra) |
| 120 if err == nil { |
| 121 break |
| 122 } else if err == syscall.EINPROGRESS || err == syscall.EALREADY
{ |
| 123 fd.wio.Lock() |
| 124 err = fd.pollServer.WaitWrite(fd) |
| 125 fd.wio.Unlock() |
| 126 if err != nil { |
| 127 return err |
| 128 } |
| 129 } else { |
| 130 return err |
| 131 } |
| 132 } |
| 133 return nil |
350 } | 134 } |
351 | 135 |
352 // Add a reference to this fd. | 136 // Add a reference to this fd. |
353 // If closing==true, pollserver must be locked; mark the fd as closing. | 137 // If closing==true, pollserver must be locked; mark the fd as closing. |
354 // Returns an error if the fd cannot be used. | 138 // Returns an error if the fd cannot be used. |
355 func (fd *netFD) incref(closing bool) error { | 139 func (fd *netFD) incref(closing bool) error { |
356 fd.sysmu.Lock() | 140 fd.sysmu.Lock() |
357 if fd.closing { | 141 if fd.closing { |
358 fd.sysmu.Unlock() | 142 fd.sysmu.Unlock() |
359 return errClosing | 143 return errClosing |
360 } | 144 } |
361 fd.sysref++ | 145 fd.sysref++ |
362 if closing { | 146 if closing { |
363 fd.closing = true | 147 fd.closing = true |
364 } | 148 } |
365 fd.sysmu.Unlock() | 149 fd.sysmu.Unlock() |
366 return nil | 150 return nil |
367 } | 151 } |
368 | 152 |
369 // Remove a reference to this FD and close if we've been asked to do so (and | 153 // Remove a reference to this FD and close if we've been asked to do so (and |
370 // there are no references left. | 154 // there are no references left. |
371 func (fd *netFD) decref() { | 155 func (fd *netFD) decref() { |
372 fd.sysmu.Lock() | 156 fd.sysmu.Lock() |
373 fd.sysref-- | 157 fd.sysref-- |
374 if fd.closing && fd.sysref == 0 && fd.sysfile != nil { | 158 if fd.closing && fd.sysref == 0 && fd.sysfile != nil { |
375 fd.sysfile.Close() | 159 fd.sysfile.Close() |
376 fd.sysfile = nil | 160 fd.sysfile = nil |
377 fd.sysfd = -1 | 161 fd.sysfd = -1 |
| 162 fd.pollServer.Close(fd) |
| 163 fd.pollServer = nil |
378 } | 164 } |
379 fd.sysmu.Unlock() | 165 fd.sysmu.Unlock() |
380 } | 166 } |
381 | 167 |
382 func (fd *netFD) Close() error { | 168 func (fd *netFD) Close() error { |
383 fd.pollServer.Lock() // needed for both fd.incref(true) and pollserver.E
vict | 169 fd.pollServer.Lock() // needed for both fd.incref(true) and pollserver.E
vict |
384 defer fd.pollServer.Unlock() | 170 defer fd.pollServer.Unlock() |
385 if err := fd.incref(true); err != nil { | 171 if err := fd.incref(true); err != nil { |
386 return err | 172 return err |
387 } | 173 } |
(...skipping 27 matching lines...) Expand all Loading... |
415 return fd.shutdown(syscall.SHUT_WR) | 201 return fd.shutdown(syscall.SHUT_WR) |
416 } | 202 } |
417 | 203 |
418 func (fd *netFD) Read(p []byte) (n int, err error) { | 204 func (fd *netFD) Read(p []byte) (n int, err error) { |
419 fd.rio.Lock() | 205 fd.rio.Lock() |
420 defer fd.rio.Unlock() | 206 defer fd.rio.Unlock() |
421 if err := fd.incref(false); err != nil { | 207 if err := fd.incref(false); err != nil { |
422 return 0, err | 208 return 0, err |
423 } | 209 } |
424 defer fd.decref() | 210 defer fd.decref() |
| 211 runtime_resetFD(fd.pollServer, 'r') |
425 for { | 212 for { |
426 n, err = syscall.Read(int(fd.sysfd), p) | 213 n, err = syscall.Read(int(fd.sysfd), p) |
427 if err == syscall.EAGAIN { | 214 if err == syscall.EAGAIN { |
428 err = errTimeout | 215 err = errTimeout |
429 if fd.rdeadline >= 0 { | 216 if fd.rdeadline >= 0 { |
430 if err = fd.pollServer.WaitRead(fd); err == nil
{ | 217 if err = fd.pollServer.WaitRead(fd); err == nil
{ |
431 continue | 218 continue |
432 } | 219 } |
433 } | 220 } |
434 } | 221 } |
(...skipping 69 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
504 | 291 |
505 func (fd *netFD) Write(p []byte) (int, error) { | 292 func (fd *netFD) Write(p []byte) (int, error) { |
506 fd.wio.Lock() | 293 fd.wio.Lock() |
507 defer fd.wio.Unlock() | 294 defer fd.wio.Unlock() |
508 if err := fd.incref(false); err != nil { | 295 if err := fd.incref(false); err != nil { |
509 return 0, err | 296 return 0, err |
510 } | 297 } |
511 defer fd.decref() | 298 defer fd.decref() |
512 var err error | 299 var err error |
513 nn := 0 | 300 nn := 0 |
| 301 runtime_resetFD(fd.pollServer, 'w') |
514 for { | 302 for { |
515 var n int | 303 var n int |
516 n, err = syscall.Write(int(fd.sysfd), p[nn:]) | 304 n, err = syscall.Write(int(fd.sysfd), p[nn:]) |
517 if n > 0 { | 305 if n > 0 { |
518 nn += n | 306 nn += n |
519 } | 307 } |
520 if nn == len(p) { | 308 if nn == len(p) { |
521 break | 309 break |
522 } | 310 } |
523 if err == syscall.EAGAIN { | 311 if err == syscall.EAGAIN { |
(...skipping 68 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
592 if err == nil { | 380 if err == nil { |
593 n = len(p) | 381 n = len(p) |
594 oobn = len(oob) | 382 oobn = len(oob) |
595 } else { | 383 } else { |
596 err = &OpError{"write", fd.net, fd.raddr, err} | 384 err = &OpError{"write", fd.net, fd.raddr, err} |
597 } | 385 } |
598 return | 386 return |
599 } | 387 } |
600 | 388 |
601 func (fd *netFD) accept(toAddr func(syscall.Sockaddr) Addr) (netfd *netFD, err e
rror) { | 389 func (fd *netFD) accept(toAddr func(syscall.Sockaddr) Addr) (netfd *netFD, err e
rror) { |
| 390 fd.rio.Lock() |
| 391 defer fd.rio.Unlock() |
602 if err := fd.incref(false); err != nil { | 392 if err := fd.incref(false); err != nil { |
603 return nil, err | 393 return nil, err |
604 } | 394 } |
605 defer fd.decref() | 395 defer fd.decref() |
606 | 396 |
607 // See ../syscall/exec_unix.go for description of ForkLock. | 397 // See ../syscall/exec_unix.go for description of ForkLock. |
608 // It is okay to hold the lock across syscall.Accept | 398 // It is okay to hold the lock across syscall.Accept |
609 // because we have put fd.sysfd into non-blocking mode. | 399 // because we have put fd.sysfd into non-blocking mode. |
610 var s int | 400 var s int |
611 var rsa syscall.Sockaddr | 401 var rsa syscall.Sockaddr |
| 402 runtime_resetFD(fd.pollServer, 'r') |
612 for { | 403 for { |
613 syscall.ForkLock.RLock() | 404 syscall.ForkLock.RLock() |
614 s, rsa, err = syscall.Accept(fd.sysfd) | 405 s, rsa, err = syscall.Accept(fd.sysfd) |
615 if err != nil { | 406 if err != nil { |
616 syscall.ForkLock.RUnlock() | 407 syscall.ForkLock.RUnlock() |
617 if err == syscall.EAGAIN { | 408 if err == syscall.EAGAIN { |
618 err = errTimeout | 409 err = errTimeout |
619 if fd.rdeadline >= 0 { | 410 if fd.rdeadline >= 0 { |
620 if err = fd.pollServer.WaitRead(fd); err
== nil { | 411 if err = fd.pollServer.WaitRead(fd); err
== nil { |
621 continue | 412 continue |
(...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
656 if err = syscall.SetNonblock(ns, false); err != nil { | 447 if err = syscall.SetNonblock(ns, false); err != nil { |
657 return nil, &OpError{"setnonblock", fd.net, fd.laddr, err} | 448 return nil, &OpError{"setnonblock", fd.net, fd.laddr, err} |
658 } | 449 } |
659 | 450 |
660 return os.NewFile(uintptr(ns), fd.name()), nil | 451 return os.NewFile(uintptr(ns), fd.name()), nil |
661 } | 452 } |
662 | 453 |
663 func closesocket(s int) error { | 454 func closesocket(s int) error { |
664 return syscall.Close(s) | 455 return syscall.Close(s) |
665 } | 456 } |
| 457 |
| 458 var canCancelIO = true // used for testing current package |
LEFT | RIGHT |