Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(1521)

Delta Between Two Patch Sets: src/pkg/net/fd_unix.go

Issue 6460108: net: epoll dispatcher 2
Left Patch Set: diff -r ac1b735e8753 https://go.googlecode.com/hg/ Created 11 years, 7 months ago
Right Patch Set: diff -r f2755950769b https://dvyukov%40google.com@code.google.com/p/go/ Created 11 years, 4 months ago
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
Right: Side by side diff | Download
« no previous file with change/comment | « src/pkg/net/fd_linux.go ('k') | src/pkg/net/fd_unix_test.go » ('j') | no next file with change/comment »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
LEFTRIGHT
(no file at all)
1 // Copyright 2009 The Go Authors. All rights reserved. 1 // Copyright 2009 The Go Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style 2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file. 3 // license that can be found in the LICENSE file.
4 4
5 // +build darwin freebsd linux netbsd openbsd 5 // +build darwin freebsd linux netbsd openbsd
6 6
7 package net 7 package net
8 8
9 import ( 9 import (
10 "io" 10 "io"
11 "os" 11 "os"
12 "runtime"
13 "sync" 12 "sync"
14 "syscall" 13 "syscall"
15 "time"
16 ) 14 )
17 15
18 // Network file descriptor. 16 // Network file descriptor.
19 type netFD struct { 17 type netFD struct {
20 // locking/lifetime of sysfd 18 // locking/lifetime of sysfd
21 sysmu sync.Mutex 19 sysmu sync.Mutex
22 sysref int 20 sysref int
23 21
24 // must lock both sysmu and pollserver to write 22 // must lock both sysmu and pollserver to write
25 // can lock either to read 23 // can lock either to read
(...skipping 17 matching lines...) Expand all
43 wdeadline int64 41 wdeadline int64
44 wio sync.Mutex 42 wio sync.Mutex
45 43
46 // owned by fd wait server 44 // owned by fd wait server
47 ncr, ncw int 45 ncr, ncw int
48 46
49 // wait server 47 // wait server
50 pollServer *pollServer 48 pollServer *pollServer
51 } 49 }
52 50
53 // A pollServer helps FDs determine when to retry a non-blocking
54 // read or write after they get EAGAIN. When an FD needs to wait,
55 // call s.WaitRead() or s.WaitWrite() to pass the request to the poll server.
56 // When the pollServer finds that i/o on FD should be possible
57 // again, it will send on fd.cr/fd.cw to wake any waiting goroutines.
58 //
59 // To avoid races in closing, all fd operations are locked and
60 // refcounted. when netFD.Close() is called, it calls syscall.Shutdown
61 // and sets a closing flag. Only when the last reference is removed
62 // will the fd be closed.
63
64 type pollServer struct {
65 pr, pw *os.File
66 poll *pollster // low-level OS hooks
67 sync.Mutex // controls pending and deadline
68 pending map[int]*netFD
69 deadline int64 // next deadline (nsec since 1970)
70 }
71
72 func (s *pollServer) AddFD(fd *netFD, mode int) error {
73 s.Lock()
74 intfd := fd.sysfd
75 if intfd < 0 || fd.closing {
76 // fd closed underfoot
77 s.Unlock()
78 return errClosing
79 }
80
81 var t int64
82 key := intfd << 1
83 if mode == 'r' {
84 fd.ncr++
85 t = fd.rdeadline
86 } else {
87 fd.ncw++
88 key++
89 t = fd.wdeadline
90 }
91 s.pending[key] = fd
92 doWakeup := false
93 if t > 0 && (s.deadline == 0 || t < s.deadline) {
94 s.deadline = t
95 doWakeup = true
96 }
97
98 wake, err := s.poll.AddFD(intfd, mode, false)
99 s.Unlock()
100 if err != nil {
101 return &OpError{"addfd", fd.net, fd.laddr, err}
102 }
103 if wake || doWakeup {
104 s.Wakeup()
105 }
106 return nil
107 }
108
109 // Evict evicts fd from the pending list, unblocking
110 // any I/O running on fd. The caller must have locked
111 // pollserver.
112 func (s *pollServer) Evict(fd *netFD) {
113 if s.pending[fd.sysfd<<1] == fd {
114 s.WakeFD(fd, 'r', errClosing)
115 s.poll.DelFD(fd.sysfd, 'r')
116 delete(s.pending, fd.sysfd<<1)
117 }
118 if s.pending[fd.sysfd<<1|1] == fd {
119 s.WakeFD(fd, 'w', errClosing)
120 s.poll.DelFD(fd.sysfd, 'w')
121 delete(s.pending, fd.sysfd<<1|1)
122 }
123 }
124
125 var wakeupbuf [1]byte
126
127 func (s *pollServer) Wakeup() { s.pw.Write(wakeupbuf[0:]) }
128
129 func (s *pollServer) LookupFD(fd int, mode int) *netFD {
130 key := fd << 1
131 if mode == 'w' {
132 key++
133 }
134 netfd, ok := s.pending[key]
135 if !ok {
136 return nil
137 }
138 delete(s.pending, key)
139 return netfd
140 }
141
142 func (s *pollServer) WakeFD(fd *netFD, mode int, err error) {
143 if mode == 'r' {
144 for fd.ncr > 0 {
145 fd.ncr--
146 fd.cr <- err
147 }
148 } else {
149 for fd.ncw > 0 {
150 fd.ncw--
151 fd.cw <- err
152 }
153 }
154 }
155
156 func (s *pollServer) Now() int64 {
157 return time.Now().UnixNano()
158 }
159
160 func (s *pollServer) CheckDeadlines() {
161 now := s.Now()
162 // TODO(rsc): This will need to be handled more efficiently,
163 // probably with a heap indexed by wakeup time.
164
165 var nextDeadline int64
166 for key, fd := range s.pending {
167 var t int64
168 var mode int
169 if key&1 == 0 {
170 mode = 'r'
171 } else {
172 mode = 'w'
173 }
174 if mode == 'r' {
175 t = fd.rdeadline
176 } else {
177 t = fd.wdeadline
178 }
179 if t > 0 {
180 if t <= now {
181 delete(s.pending, key)
182 if mode == 'r' {
183 s.poll.DelFD(fd.sysfd, mode)
184 fd.rdeadline = -1
185 } else {
186 s.poll.DelFD(fd.sysfd, mode)
187 fd.wdeadline = -1
188 }
189 s.WakeFD(fd, mode, nil)
190 } else if nextDeadline == 0 || t < nextDeadline {
191 nextDeadline = t
192 }
193 }
194 }
195 s.deadline = nextDeadline
196 }
197
198 func (s *pollServer) Run() {
199 var scratch [100]byte
200 s.Lock()
201 defer s.Unlock()
202 for {
203 var t = s.deadline
204 if t > 0 {
205 t = t - s.Now()
206 if t <= 0 {
207 s.CheckDeadlines()
208 continue
209 }
210 }
211 fd, mode, err := s.poll.WaitFD(s, t)
212 if err != nil {
213 print("pollServer WaitFD: ", err.Error(), "\n")
214 return
215 }
216 if fd < 0 {
217 // Timeout happened.
218 s.CheckDeadlines()
219 continue
220 }
221 if fd == int(s.pr.Fd()) {
222 // Drain our wakeup pipe (we could loop here,
223 // but it's unlikely that there are more than
224 // len(scratch) wakeup calls).
225 s.pr.Read(scratch[0:])
226 s.CheckDeadlines()
227 } else {
228 netfd := s.LookupFD(fd, mode)
229 if netfd == nil {
230 // This can happen because the WaitFD runs witho ut
231 // holding s's lock, so there might be a pending wakeup
232 // for an fd that has been evicted. No harm don e.
233 continue
234 }
235 s.WakeFD(netfd, mode, nil)
236 }
237 }
238 }
239
240 func (s *pollServer) WaitRead(fd *netFD) error {
241 err := s.AddFD(fd, 'r')
242 if err == nil {
243 err = <-fd.cr
244 }
245 return err
246 }
247
248 func (s *pollServer) WaitWrite(fd *netFD) error {
249 err := s.AddFD(fd, 'w')
250 if err == nil {
251 err = <-fd.cw
252 }
253 return err
254 }
255
256 // Network FD methods.
257 // Spread network FDs over several pollServers.
258
259 var pollMaxN int
260 var pollservers []*pollServer
261 var startServersOnce []func()
262
263 var canCancelIO = true // used for testing current package
264
265 func sysInit() {
266 pollMaxN = runtime.NumCPU()
267 if pollMaxN > 8 {
268 pollMaxN = 8 // No improvement then.
269 }
270 pollservers = make([]*pollServer, pollMaxN)
271 startServersOnce = make([]func(), pollMaxN)
272 for i := 0; i < pollMaxN; i++ {
273 k := i
274 once := new(sync.Once)
275 startServersOnce[i] = func() { once.Do(func() { startServer(k) } ) }
276 }
277 }
278
279 func startServer(k int) {
280 p, err := newPollServer()
281 if err != nil {
282 panic(err)
283 }
284 pollservers[k] = p
285 }
286
287 func server(fd int) *pollServer {
288 pollN := runtime.GOMAXPROCS(0)
289 if pollN > pollMaxN {
290 pollN = pollMaxN
291 }
292 k := fd % pollN
293 startServersOnce[k]()
294 return pollservers[k]
295 }
296
297 func newFD(fd, family, sotype int, net string) (*netFD, error) { 51 func newFD(fd, family, sotype int, net string) (*netFD, error) {
298 if err := syscall.SetNonblock(fd, true); err != nil { 52 if err := syscall.SetNonblock(fd, true); err != nil {
299 return nil, err 53 return nil, err
300 } 54 }
55 poller, err := server(fd)
56 if err != nil {
57 return nil, err
58 }
301 netfd := &netFD{ 59 netfd := &netFD{
302 » » sysfd: fd, 60 » » sysfd: fd,
303 » » family: family, 61 » » family: family,
304 » » sotype: sotype, 62 » » sotype: sotype,
305 » » net: net, 63 » » net: net,
306 » } 64 » » pollServer: poller,
307 » netfd.cr = make(chan error, 1) 65 » }
308 » netfd.cw = make(chan error, 1) 66 » if pollerUsesChans {
309 » netfd.pollServer = server(fd) 67 » » netfd.cr = make(chan error, 1)
68 » » netfd.cw = make(chan error, 1)
69 » }
310 return netfd, nil 70 return netfd, nil
311 } 71 }
312 72
313 func (fd *netFD) setAddr(laddr, raddr Addr) { 73 func (fd *netFD) setAddr(laddr, raddr Addr) {
314 fd.laddr = laddr 74 fd.laddr = laddr
315 fd.raddr = raddr 75 fd.raddr = raddr
316 fd.sysfile = os.NewFile(uintptr(fd.sysfd), fd.net) 76 fd.sysfile = os.NewFile(uintptr(fd.sysfd), fd.net)
317 } 77 }
318 78
319 func (fd *netFD) name() string { 79 func (fd *netFD) name() string {
320 var ls, rs string 80 var ls, rs string
321 if fd.laddr != nil { 81 if fd.laddr != nil {
322 ls = fd.laddr.String() 82 ls = fd.laddr.String()
323 } 83 }
324 if fd.raddr != nil { 84 if fd.raddr != nil {
325 rs = fd.raddr.String() 85 rs = fd.raddr.String()
326 } 86 }
327 return fd.net + ":" + ls + "->" + rs 87 return fd.net + ":" + ls + "->" + rs
328 } 88 }
329 89
90 /*
330 func (fd *netFD) connect(ra syscall.Sockaddr) error { 91 func (fd *netFD) connect(ra syscall.Sockaddr) error {
92 fd.wio.Lock()
93 defer fd.wio.Unlock()
94 runtime_resetFD(fd.pollServer, 'w')
331 err := syscall.Connect(fd.sysfd, ra) 95 err := syscall.Connect(fd.sysfd, ra)
332 hadTimeout := fd.wdeadline > 0 96 hadTimeout := fd.wdeadline > 0
333 if err == syscall.EINPROGRESS { 97 if err == syscall.EINPROGRESS {
334 if err = fd.pollServer.WaitWrite(fd); err != nil { 98 if err = fd.pollServer.WaitWrite(fd); err != nil {
335 return err 99 return err
336 } 100 }
337 if hadTimeout && fd.wdeadline < 0 { 101 if hadTimeout && fd.wdeadline < 0 {
338 return errTimeout 102 return errTimeout
339 } 103 }
340 var e int 104 var e int
341 e, err = syscall.GetsockoptInt(fd.sysfd, syscall.SOL_SOCKET, sys call.SO_ERROR) 105 e, err = syscall.GetsockoptInt(fd.sysfd, syscall.SOL_SOCKET, sys call.SO_ERROR)
342 if err != nil { 106 if err != nil {
343 return os.NewSyscallError("getsockopt", err) 107 return os.NewSyscallError("getsockopt", err)
344 } 108 }
345 if e != 0 { 109 if e != 0 {
346 err = syscall.Errno(e) 110 err = syscall.Errno(e)
347 } 111 }
348 } 112 }
349 return err 113 return err
114 }
115 */
116
117 func (fd *netFD) connect(ra syscall.Sockaddr) error {
118 for {
119 err := syscall.Connect(fd.sysfd, ra)
120 if err == nil {
121 break
122 } else if err == syscall.EINPROGRESS || err == syscall.EALREADY {
123 fd.wio.Lock()
124 err = fd.pollServer.WaitWrite(fd)
125 fd.wio.Unlock()
126 if err != nil {
127 return err
128 }
129 } else {
130 return err
131 }
132 }
133 return nil
350 } 134 }
351 135
352 // Add a reference to this fd. 136 // Add a reference to this fd.
353 // If closing==true, pollserver must be locked; mark the fd as closing. 137 // If closing==true, pollserver must be locked; mark the fd as closing.
354 // Returns an error if the fd cannot be used. 138 // Returns an error if the fd cannot be used.
355 func (fd *netFD) incref(closing bool) error { 139 func (fd *netFD) incref(closing bool) error {
356 fd.sysmu.Lock() 140 fd.sysmu.Lock()
357 if fd.closing { 141 if fd.closing {
358 fd.sysmu.Unlock() 142 fd.sysmu.Unlock()
359 return errClosing 143 return errClosing
360 } 144 }
361 fd.sysref++ 145 fd.sysref++
362 if closing { 146 if closing {
363 fd.closing = true 147 fd.closing = true
364 } 148 }
365 fd.sysmu.Unlock() 149 fd.sysmu.Unlock()
366 return nil 150 return nil
367 } 151 }
368 152
369 // Remove a reference to this FD and close if we've been asked to do so (and 153 // Remove a reference to this FD and close if we've been asked to do so (and
370 // there are no references left. 154 // there are no references left.
371 func (fd *netFD) decref() { 155 func (fd *netFD) decref() {
372 fd.sysmu.Lock() 156 fd.sysmu.Lock()
373 fd.sysref-- 157 fd.sysref--
374 if fd.closing && fd.sysref == 0 && fd.sysfile != nil { 158 if fd.closing && fd.sysref == 0 && fd.sysfile != nil {
375 fd.sysfile.Close() 159 fd.sysfile.Close()
376 fd.sysfile = nil 160 fd.sysfile = nil
377 fd.sysfd = -1 161 fd.sysfd = -1
162 fd.pollServer.Close(fd)
163 fd.pollServer = nil
378 } 164 }
379 fd.sysmu.Unlock() 165 fd.sysmu.Unlock()
380 } 166 }
381 167
382 func (fd *netFD) Close() error { 168 func (fd *netFD) Close() error {
383 fd.pollServer.Lock() // needed for both fd.incref(true) and pollserver.E vict 169 fd.pollServer.Lock() // needed for both fd.incref(true) and pollserver.E vict
384 defer fd.pollServer.Unlock() 170 defer fd.pollServer.Unlock()
385 if err := fd.incref(true); err != nil { 171 if err := fd.incref(true); err != nil {
386 return err 172 return err
387 } 173 }
(...skipping 27 matching lines...) Expand all
415 return fd.shutdown(syscall.SHUT_WR) 201 return fd.shutdown(syscall.SHUT_WR)
416 } 202 }
417 203
418 func (fd *netFD) Read(p []byte) (n int, err error) { 204 func (fd *netFD) Read(p []byte) (n int, err error) {
419 fd.rio.Lock() 205 fd.rio.Lock()
420 defer fd.rio.Unlock() 206 defer fd.rio.Unlock()
421 if err := fd.incref(false); err != nil { 207 if err := fd.incref(false); err != nil {
422 return 0, err 208 return 0, err
423 } 209 }
424 defer fd.decref() 210 defer fd.decref()
211 runtime_resetFD(fd.pollServer, 'r')
425 for { 212 for {
426 n, err = syscall.Read(int(fd.sysfd), p) 213 n, err = syscall.Read(int(fd.sysfd), p)
427 if err == syscall.EAGAIN { 214 if err == syscall.EAGAIN {
428 err = errTimeout 215 err = errTimeout
429 if fd.rdeadline >= 0 { 216 if fd.rdeadline >= 0 {
430 if err = fd.pollServer.WaitRead(fd); err == nil { 217 if err = fd.pollServer.WaitRead(fd); err == nil {
431 continue 218 continue
432 } 219 }
433 } 220 }
434 } 221 }
(...skipping 69 matching lines...) Expand 10 before | Expand all | Expand 10 after
504 291
505 func (fd *netFD) Write(p []byte) (int, error) { 292 func (fd *netFD) Write(p []byte) (int, error) {
506 fd.wio.Lock() 293 fd.wio.Lock()
507 defer fd.wio.Unlock() 294 defer fd.wio.Unlock()
508 if err := fd.incref(false); err != nil { 295 if err := fd.incref(false); err != nil {
509 return 0, err 296 return 0, err
510 } 297 }
511 defer fd.decref() 298 defer fd.decref()
512 var err error 299 var err error
513 nn := 0 300 nn := 0
301 runtime_resetFD(fd.pollServer, 'w')
514 for { 302 for {
515 var n int 303 var n int
516 n, err = syscall.Write(int(fd.sysfd), p[nn:]) 304 n, err = syscall.Write(int(fd.sysfd), p[nn:])
517 if n > 0 { 305 if n > 0 {
518 nn += n 306 nn += n
519 } 307 }
520 if nn == len(p) { 308 if nn == len(p) {
521 break 309 break
522 } 310 }
523 if err == syscall.EAGAIN { 311 if err == syscall.EAGAIN {
(...skipping 68 matching lines...) Expand 10 before | Expand all | Expand 10 after
592 if err == nil { 380 if err == nil {
593 n = len(p) 381 n = len(p)
594 oobn = len(oob) 382 oobn = len(oob)
595 } else { 383 } else {
596 err = &OpError{"write", fd.net, fd.raddr, err} 384 err = &OpError{"write", fd.net, fd.raddr, err}
597 } 385 }
598 return 386 return
599 } 387 }
600 388
601 func (fd *netFD) accept(toAddr func(syscall.Sockaddr) Addr) (netfd *netFD, err e rror) { 389 func (fd *netFD) accept(toAddr func(syscall.Sockaddr) Addr) (netfd *netFD, err e rror) {
390 fd.rio.Lock()
391 defer fd.rio.Unlock()
602 if err := fd.incref(false); err != nil { 392 if err := fd.incref(false); err != nil {
603 return nil, err 393 return nil, err
604 } 394 }
605 defer fd.decref() 395 defer fd.decref()
606 396
607 // See ../syscall/exec_unix.go for description of ForkLock. 397 // See ../syscall/exec_unix.go for description of ForkLock.
608 // It is okay to hold the lock across syscall.Accept 398 // It is okay to hold the lock across syscall.Accept
609 // because we have put fd.sysfd into non-blocking mode. 399 // because we have put fd.sysfd into non-blocking mode.
610 var s int 400 var s int
611 var rsa syscall.Sockaddr 401 var rsa syscall.Sockaddr
402 runtime_resetFD(fd.pollServer, 'r')
612 for { 403 for {
613 syscall.ForkLock.RLock() 404 syscall.ForkLock.RLock()
614 s, rsa, err = syscall.Accept(fd.sysfd) 405 s, rsa, err = syscall.Accept(fd.sysfd)
615 if err != nil { 406 if err != nil {
616 syscall.ForkLock.RUnlock() 407 syscall.ForkLock.RUnlock()
617 if err == syscall.EAGAIN { 408 if err == syscall.EAGAIN {
618 err = errTimeout 409 err = errTimeout
619 if fd.rdeadline >= 0 { 410 if fd.rdeadline >= 0 {
620 if err = fd.pollServer.WaitRead(fd); err == nil { 411 if err = fd.pollServer.WaitRead(fd); err == nil {
621 continue 412 continue
(...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after
656 if err = syscall.SetNonblock(ns, false); err != nil { 447 if err = syscall.SetNonblock(ns, false); err != nil {
657 return nil, &OpError{"setnonblock", fd.net, fd.laddr, err} 448 return nil, &OpError{"setnonblock", fd.net, fd.laddr, err}
658 } 449 }
659 450
660 return os.NewFile(uintptr(ns), fd.name()), nil 451 return os.NewFile(uintptr(ns), fd.name()), nil
661 } 452 }
662 453
663 func closesocket(s int) error { 454 func closesocket(s int) error {
664 return syscall.Close(s) 455 return syscall.Close(s)
665 } 456 }
457
458 var canCancelIO = true // used for testing current package
LEFTRIGHT

Powered by Google App Engine
RSS Feeds Recent Issues | This issue
This is Rietveld f62528b