Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(370)

Side by Side Diff: src/pkg/net/fd_linux.go

Issue 6460108: net: epoll dispatcher 2
Patch Set: diff -r f2755950769b https://dvyukov%40google.com@code.google.com/p/go/ Created 11 years, 4 months ago
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | src/pkg/net/fd_unix.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2009 The Go Authors. All rights reserved. 1 // Copyright 2009 The Go Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style 2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file. 3 // license that can be found in the LICENSE file.
4 4
5 // Waiting for FDs via epoll(7). 5 // +build none
6 6
7 package net 7 package net
8 8
9 import ( 9 import (
10 //"fmt"
11 "errors"
12 "io"
10 "os" 13 "os"
14 "sync"
11 "syscall" 15 "syscall"
12 ) 16 )
13 17
14 const ( 18 // Network file descriptor.
15 » readFlags = syscall.EPOLLIN | syscall.EPOLLRDHUP 19 type netFD struct {
16 » writeFlags = syscall.EPOLLOUT 20 » // locking/lifetime of sysfd
17 ) 21 » sysmu sync.Mutex
18 22 » sysref int
19 type pollster struct { 23 » closing bool
20 » epfd int 24 » pdesc uintptr
21 25
22 » // Events we're already waiting for 26 » // immutable until Close
23 » // Must hold pollServer lock 27 » sysfd int
24 » events map[int]uint32 28 » family int
25 29 » sotype int
26 » // An event buffer for EpollWait. 30 » isConnected bool
27 » // Used without a lock, may only be used by WaitFD. 31 » sysfile *os.File
28 » waitEventBuf [10]syscall.EpollEvent 32 » net string
29 » waitEvents []syscall.EpollEvent 33 » laddr Addr
30 34 » raddr Addr
31 » // An event buffer for EpollCtl, to avoid a malloc. 35
32 » // Must hold pollServer lock. 36 » rio sync.Mutex
33 » ctlEvent syscall.EpollEvent 37 » wio sync.Mutex
34 } 38 » rblock bool
35 39 » wblock bool
36 func newpollster() (p *pollster, err error) { 40 }
37 » p = new(pollster) 41
38 » if p.epfd, err = syscall.EpollCreate1(syscall.EPOLL_CLOEXEC); err != nil { 42 func runtime_initPollServer()
39 » » if err != syscall.ENOSYS { 43 func runtime_initFD(fd int) (uintptr, error)
40 » » » return nil, os.NewSyscallError("epoll_create1", err) 44 func runtime_closeFD(desc uintptr)
41 » » } 45 func runtime_waitFD(desc uintptr, mode int) int32
42 » » // The arg to epoll_create is a hint to the kernel 46 func runtime_resetFD(desc uintptr, mode int)
43 » » // about the number of FDs we will care about. 47 func runtime_setDeadlineFD(desc uintptr, d int64, mode int)
44 » » // We don't know, and since 2.6.8 the kernel ignores it anyhow. 48 func runtime_unblockFD(desc uintptr)
45 » » if p.epfd, err = syscall.EpollCreate(16); err != nil { 49
46 » » » return nil, os.NewSyscallError("epoll_create", err) 50 func waitFD(fd *netFD, mode int) error {
47 » » } 51 » interr := runtime_waitFD(fd.pdesc, mode)
48 » » syscall.CloseOnExec(p.epfd) 52 » if interr == 0 {
49 » } 53 » » return nil
50 » p.events = make(map[int]uint32) 54 » }
51 » return p, nil 55 » if interr == 1 {
52 } 56 » » return errClosing
53 57 » }
54 func (p *pollster) AddFD(fd int, mode int, repeat bool) (bool, error) { 58 » if interr == 2 {
55 » // pollServer is locked. 59 » » return errTimeout
56 60 » }
57 » var already bool 61 » panic("net: unknown error code from waitFD()")
58 » p.ctlEvent.Fd = int32(fd) 62 }
59 » p.ctlEvent.Events, already = p.events[fd] 63
60 » if !repeat { 64 func waitRead(fd *netFD) error {
61 » » p.ctlEvent.Events |= syscall.EPOLLONESHOT 65 » return waitFD(fd, 'r')
62 » } 66 }
63 » if mode == 'r' { 67
64 » » p.ctlEvent.Events |= readFlags 68 func waitWrite(fd *netFD) error {
69 » return waitFD(fd, 'w')
70 }
71
72 var onceStartServer sync.Once
73
74 func newFD(fd, family, sotype int, net string) (*netFD, error) {
75 » onceStartServer.Do(runtime_initPollServer)
76 » if err := syscall.SetNonblock(fd, true); err != nil {
77 » » return nil, err
78 » }
79 » netfd := &netFD{
80 » » sysfd: fd,
81 » » family: family,
82 » » sotype: sotype,
83 » » net: net,
84 » }
85 » desc, err := runtime_initFD(fd)
86 » if err != nil {
87 » » return nil, err
88 » }
89 » netfd.pdesc = desc
90 » return netfd, nil
91 }
92
93 func (fd *netFD) setAddr(laddr, raddr Addr) {
94 » fd.laddr = laddr
95 » fd.raddr = raddr
96 » var ls, rs string
97 » if laddr != nil {
98 » » ls = laddr.String()
99 » }
100 » if raddr != nil {
101 » » rs = raddr.String()
102 » }
103 » fd.sysfile = os.NewFile(uintptr(fd.sysfd), fd.net+":"+ls+"->"+rs)
104 }
105
106 func (fd *netFD) connect(ra syscall.Sockaddr) error {
107 » for {
108 » » err := syscall.Connect(fd.sysfd, ra)
109 » » if err == nil {
110 » » » break
111 » » } else if err == syscall.EINPROGRESS || err == syscall.EALREADY {
112 » » » fd.wio.Lock()
113 » » » err = waitWrite(fd)
114 » » » fd.wio.Unlock()
115 » » » if err != nil {
116 » » » » return err
117 » » » }
118 » » } else {
119 » » » return err
120 » » }
121 » }
122 » return nil
123 }
124
125 var errClosing = errors.New("use of closed network connection")
126
127 // Add a reference to this fd.
128 // If closing==true, pollserver must be locked; mark the fd as closing.
129 // Returns an error if the fd cannot be used.
130 func (fd *netFD) incref(closing bool) error {
131 » if fd == nil {
132 » » return errClosing
133 » }
134 » fd.sysmu.Lock()
135 » if fd.closing {
136 » » fd.sysmu.Unlock()
137 » » return errClosing
138 » }
139 » fd.sysref++
140 » if closing {
141 » » fd.closing = true
142 » }
143 » fd.sysmu.Unlock()
144 » return nil
145 }
146
147 // Remove a reference to this FD and close if we've been asked to do so (and
148 // there are no references left.
149 func (fd *netFD) decref() {
150 » if fd == nil {
151 » » return
152 » }
153 » fd.sysmu.Lock()
154 » fd.sysref--
155 » if fd.closing && fd.sysref == 0 && fd.sysfile != nil {
156 » » fd.sysfile.Close()
157 » » fd.sysfile = nil
158 » » fd.sysfd = -1
159 » » runtime_closeFD(fd.pdesc)
160 » }
161 » fd.sysmu.Unlock()
162 }
163
164 func (fd *netFD) Close() error {
165 » if err := fd.incref(true); err != nil {
166 » » return err
167 » }
168 » // Unblock any I/O. Once it all unblocks and returns,
169 » // so that it cannot be referring to fd.sysfd anymore,
170 » // the final decref will close fd.sysfd. This should happen
171 » // fairly quickly, since all the I/O is non-blocking, and any
172 » // attempts to block in the pollserver will return errClosing.
173 » runtime_unblockFD(fd.pdesc)
174 » fd.decref()
175 » return nil
176 }
177
178 func (fd *netFD) shutdown(how int) error {
179 » if err := fd.incref(false); err != nil {
180 » » return err
181 » }
182 » defer fd.decref()
183 » err := syscall.Shutdown(fd.sysfd, how)
184 » if err != nil {
185 » » return &OpError{"shutdown", fd.net, fd.laddr, err}
186 » }
187 » return nil
188 }
189
190 func (fd *netFD) CloseRead() error {
191 » return fd.shutdown(syscall.SHUT_RD)
192 }
193
194 func (fd *netFD) CloseWrite() error {
195 » return fd.shutdown(syscall.SHUT_WR)
196 }
197
198 func (fd *netFD) Read(p []byte) (n int, err error) {
199 » fd.rio.Lock()
200 » //defer fd.rio.Unlock()
201 » if err := fd.incref(false); err != nil {
202 » » fd.rio.Unlock()
203 » » return 0, err //!!! does not wrap into OpError
204 » }
205 » //defer fd.decref()
206 » if fd.rblock {
207 » » //println(fd, "reset rblock and wait")
208 » » fd.rblock = false
209 » » err = waitRead(fd)
210 » » //println(fd, "ready")
211 » » if err != nil {
212 » » » fd.decref()
213 » » » fd.rio.Unlock()
214 » » » return 0, &OpError{"read", fd.net, fd.raddr, err}
215 » » }
65 } else { 216 } else {
66 » » p.ctlEvent.Events |= writeFlags 217 » » //println(fd, "reset fd")
67 » } 218 » » runtime_resetFD(fd.pdesc, 'r')
68 219 » }
69 » var op int 220 » for {
70 » if already { 221 » » n, err = syscall.Read(int(fd.sysfd), p)
71 » » op = syscall.EPOLL_CTL_MOD 222 » » /*
223 » » » if n > 0 && err == nil {
224 » » » fmt.Printf("READ %p %v:\n%v\n", fd, n, string(p[:n]))
225 » » » for i := 0; i < n; i++ {
226 » » » if i == 0 {
227 » » » fmt.Printf("\t\"")
228 » » » }
229 » » » c1 := p[i] >> 4
230 » » » r1 := c1 + '0'
231 » » » if c1 >=10 {
232 » » » r1 = c1 - 10 + 'A'
233 » » » }
234 » » » c2 := p[i] % 16
235 » » » r2 := c2 + '0'
236 » » » if c2 >=10 {
237 » » » r2 = c2 - 10 + 'A'
238 » » » }
239 » » » s := [2]byte{byte(r1), byte(r2)}
240 » » » fmt.Printf("\\x%v", string(s[:]))
241 » » » if (i+1) % 20 == 0 {
242 » » » fmt.Printf("\"\n\t\"")
243 » » » }
244 » » » }
245 » » » fmt.Printf("\n")
246 » » » }
247 » » */
248 » » if err == syscall.EAGAIN {
249 » » » //println(fd, "wait")
250 » » » if err = waitRead(fd); err == nil {
251 » » » » //println(fd, "ready")
252 » » » » continue
253 » » » }
254 » » }
255 » » if err != nil {
256 » » » n = 0
257 » » } else if n == 0 && err == nil && fd.sotype != syscall.SOCK_DGRA M {
258 » » » err = io.EOF
259 » » }
260 » » if err == nil && fd.sotype == syscall.SOCK_STREAM && n > 0 && n < len(p) {
261 » » » //println(fd, "set rblock")
262 » » » //fd.rblock = true
263 » » }
264 » » break
265 » }
266 » if err != nil && err != io.EOF {
267 » » err = &OpError{"read", fd.net, fd.raddr, err}
268 » }
269 » //println(fd, "result", n, err)
270 » fd.decref()
271 » fd.rio.Unlock()
272 » return
273 }
274
275 func (fd *netFD) ReadFrom(p []byte) (n int, sa syscall.Sockaddr, err error) {
276 » fd.rio.Lock()
277 » defer fd.rio.Unlock()
278 » if err := fd.incref(false); err != nil {
279 » » return 0, nil, err
280 » }
281 » defer fd.decref()
282 » for {
283 » » n, sa, err = syscall.Recvfrom(fd.sysfd, p, 0)
284 » » if err == syscall.EAGAIN {
285 » » » if err = waitRead(fd); err == nil {
286 » » » » continue
287 » » » }
288 » » }
289 » » if err != nil {
290 » » » n = 0
291 » » }
292 » » break
293 » }
294 » if err != nil && err != io.EOF {
295 » » err = &OpError{"read", fd.net, fd.laddr, err}
296 » }
297 » return
298 }
299
300 func (fd *netFD) ReadMsg(p []byte, oob []byte) (n, oobn, flags int, sa syscall.S ockaddr, err error) {
301 » fd.rio.Lock()
302 » defer fd.rio.Unlock()
303 » if err := fd.incref(false); err != nil {
304 » » return 0, 0, 0, nil, err
305 » }
306 » defer fd.decref()
307 » for {
308 » » n, oobn, flags, sa, err = syscall.Recvmsg(fd.sysfd, p, oob, 0)
309 » » if err == syscall.EAGAIN {
310 » » » if err = waitRead(fd); err == nil {
311 » » » » continue
312 » » » }
313 » » }
314 » » if err == nil && n == 0 {
315 » » » err = io.EOF
316 » » }
317 » » break
318 » }
319 » if err != nil && err != io.EOF {
320 » » err = &OpError{"read", fd.net, fd.laddr, err}
321 » » return
322 » }
323 » return
324 }
325
326 func (fd *netFD) Write(p []byte) (int, error) {
327 » fd.wio.Lock()
328 » //defer fd.wio.Unlock()
329 » if err := fd.incref(false); err != nil {
330 » » fd.wio.Unlock()
331 » » return 0, err
332 » }
333 » //defer fd.decref()
334 » if fd.sysfile == nil {
335 » » fd.decref()
336 » » fd.wio.Unlock()
337 » » return 0, syscall.EINVAL
338 » }
339
340 » var err error
341 » nn := 0
342 » runtime_resetFD(fd.pdesc, 'w')
343 » for {
344 » » var n int
345 » » n, err = syscall.Write(int(fd.sysfd), p[nn:])
346 » » /*
347 » » » if n > 0 && err == nil {
348 » » » fmt.Printf("WRITE %p %v:\n%v\n", fd, n, string(p[nn:nn+n ]))
349 » » » for i := 0; i < n; i++ {
350 » » » if i == 0 {
351 » » » fmt.Printf("\t\"")
352 » » » }
353 » » » c1 := p[nn+i] >> 4
354 » » » r1 := c1 + '0'
355 » » » if c1 >=10 {
356 » » » r1 = c1 - 10 + 'A'
357 » » » }
358 » » » c2 := p[nn+i] % 16
359 » » » r2 := c2 + '0'
360 » » » if c2 >=10 {
361 » » » r2 = c2 - 10 + 'A'
362 » » » }
363 » » » s := [2]byte{byte(r1), byte(r2)}
364 » » » fmt.Printf("\\x%v", string(s[:]))
365 » » » if (i+1) % 20 == 0 {
366 » » » fmt.Printf("\"\n\t\"")
367 » » » }
368 » » » }
369 » » » fmt.Printf("\n")
370 » » » }
371 » » */
372 » » if n > 0 {
373 » » » nn += n
374 » » }
375 » » if nn == len(p) {
376 » » » break
377 » » }
378 » » if err == syscall.EAGAIN {
379 » » » if err = waitWrite(fd); err == nil {
380 » » » » continue
381 » » » }
382 » » }
383 » » if err != nil {
384 » » » n = 0
385 » » » break
386 » » }
387 » » if n == 0 {
388 » » » err = io.ErrUnexpectedEOF
389 » » » break
390 » » }
391 » }
392 » if err != nil {
393 » » err = &OpError{"write", fd.net, fd.raddr, err}
394 » }
395 » fd.decref()
396 » fd.wio.Unlock()
397 » return nn, err
398 }
399
400 func (fd *netFD) WriteTo(p []byte, sa syscall.Sockaddr) (n int, err error) {
401 » fd.wio.Lock()
402 » defer fd.wio.Unlock()
403 » if err := fd.incref(false); err != nil {
404 » » return 0, err
405 » }
406 » defer fd.decref()
407 » for {
408 » » err = syscall.Sendto(fd.sysfd, p, 0, sa)
409 » » if err == syscall.EAGAIN {
410 » » » if err = waitWrite(fd); err == nil {
411 » » » » continue
412 » » » }
413 » » }
414 » » break
415 » }
416 » if err == nil {
417 » » n = len(p)
72 } else { 418 } else {
73 » » op = syscall.EPOLL_CTL_ADD 419 » » err = &OpError{"write", fd.net, fd.raddr, err}
74 » } 420 » }
75 » if err := syscall.EpollCtl(p.epfd, op, fd, &p.ctlEvent); err != nil { 421 » return
76 » » return false, os.NewSyscallError("epoll_ctl", err) 422 }
77 » } 423
78 » p.events[fd] = p.ctlEvent.Events 424 func (fd *netFD) WriteMsg(p []byte, oob []byte, sa syscall.Sockaddr) (n int, oob n int, err error) {
79 » return false, nil 425 » fd.wio.Lock()
80 } 426 » defer fd.wio.Unlock()
81 427 » if err := fd.incref(false); err != nil {
82 func (p *pollster) StopWaiting(fd int, bits uint) { 428 » » return 0, 0, err
83 » // pollServer is locked. 429 » }
84 430 » defer fd.decref()
85 » events, already := p.events[fd] 431 » for {
86 » if !already { 432 » » err = syscall.Sendmsg(fd.sysfd, p, oob, sa, 0)
87 » » // The fd returned by the kernel may have been 433 » » if err == syscall.EAGAIN {
88 » » // cancelled already; return silently. 434 » » » if err = waitWrite(fd); err == nil {
89 » » return 435 » » » » continue
90 » } 436 » » » }
91 437 » » }
92 » // If syscall.EPOLLONESHOT is not set, the wait 438 » » break
93 » // is a repeating wait, so don't change it. 439 » }
94 » if events&syscall.EPOLLONESHOT == 0 { 440 » if err == nil {
95 » » return 441 » » n = len(p)
96 » } 442 » » oobn = len(oob)
97
98 » // Disable the given bits.
99 » // If we're still waiting for other events, modify the fd
100 » // event in the kernel. Otherwise, delete it.
101 » events &= ^uint32(bits)
102 » if int32(events)&^syscall.EPOLLONESHOT != 0 {
103 » » p.ctlEvent.Fd = int32(fd)
104 » » p.ctlEvent.Events = events
105 » » if err := syscall.EpollCtl(p.epfd, syscall.EPOLL_CTL_MOD, fd, &p .ctlEvent); err != nil {
106 » » » print("Epoll modify fd=", fd, ": ", err.Error(), "\n")
107 » » }
108 » » p.events[fd] = events
109 } else { 443 } else {
110 » » if err := syscall.EpollCtl(p.epfd, syscall.EPOLL_CTL_DEL, fd, ni l); err != nil { 444 » » err = &OpError{"write", fd.net, fd.raddr, err}
111 » » » print("Epoll delete fd=", fd, ": ", err.Error(), "\n") 445 » }
112 » » } 446 » return
113 » » delete(p.events, fd) 447 }
114 » } 448
115 } 449 func (fd *netFD) accept(toAddr func(syscall.Sockaddr) Addr) (netfd *netFD, err e rror) {
116 450 » if err := fd.incref(false); err != nil {
117 func (p *pollster) DelFD(fd int, mode int) { 451 » » return nil, err
118 » // pollServer is locked. 452 » }
119 453 » defer fd.decref()
120 » if mode == 'r' { 454
121 » » p.StopWaiting(fd, readFlags) 455 » // See ../syscall/exec.go for description of ForkLock.
122 » } else { 456 » // It is okay to hold the lock across syscall.Accept
123 » » p.StopWaiting(fd, writeFlags) 457 » // because we have put fd.sysfd into non-blocking mode.
124 » } 458 » var s int
125 459 » var rsa syscall.Sockaddr
126 » // Discard any queued up events. 460 » for {
127 » i := 0 461 » » syscall.ForkLock.RLock()
128 » for i < len(p.waitEvents) { 462 » » s, rsa, err = syscall.Accept(fd.sysfd)
129 » » if fd == int(p.waitEvents[i].Fd) {
130 » » » copy(p.waitEvents[i:], p.waitEvents[i+1:])
131 » » » p.waitEvents = p.waitEvents[:len(p.waitEvents)-1]
132 » » } else {
133 » » » i++
134 » » }
135 » }
136 }
137
138 func (p *pollster) WaitFD(s *pollServer, nsec int64) (fd int, mode int, err erro r) {
139 » for len(p.waitEvents) == 0 {
140 » » var msec int = -1
141 » » if nsec > 0 {
142 » » » msec = int((nsec + 1e6 - 1) / 1e6)
143 » » }
144
145 » » s.Unlock()
146 » » n, err := syscall.EpollWait(p.epfd, p.waitEventBuf[0:], msec)
147 » » s.Lock()
148
149 if err != nil { 463 if err != nil {
150 » » » if err == syscall.EAGAIN || err == syscall.EINTR { 464 » » » syscall.ForkLock.RUnlock()
151 » » » » continue 465 » » » if err == syscall.EAGAIN {
152 » » » } 466 » » » » fd.rio.Lock()
153 » » » return -1, 0, os.NewSyscallError("epoll_wait", err) 467 » » » » err = waitRead(fd)
154 » » } 468 » » » » fd.rio.Unlock()
155 » » if n == 0 { 469 » » » » if err == nil {
156 » » » return -1, 0, nil 470 » » » » » continue
157 » » } 471 » » » » }
158 » » p.waitEvents = p.waitEventBuf[0:n] 472 » » » } else if err == syscall.ECONNABORTED {
159 » } 473 » » » » // This means that a socket on the listen queue was closed
160 474 » » » » // before we Accept()ed it; it's a silly error, so try again.
161 » ev := &p.waitEvents[0] 475 » » » » continue
162 » p.waitEvents = p.waitEvents[1:] 476 » » » }
163 477 » » » return nil, &OpError{"accept", fd.net, fd.laddr, err}
164 » fd = int(ev.Fd) 478 » » }
165 479 » » break
166 » if ev.Events&writeFlags != 0 { 480 » }
167 » » p.StopWaiting(fd, writeFlags) 481 » syscall.CloseOnExec(s)
168 » » return fd, 'w', nil 482 » syscall.ForkLock.RUnlock()
169 » } 483
170 » if ev.Events&readFlags != 0 { 484 » if netfd, err = newFD(s, fd.family, fd.sotype, fd.net); err != nil {
171 » » p.StopWaiting(fd, readFlags) 485 » » syscall.Close(s)
172 » » return fd, 'r', nil 486 » » return nil, err
173 » } 487 » }
174 488 » lsa, _ := syscall.Getsockname(netfd.sysfd)
175 » // Other events are error conditions - wake whoever is waiting. 489 » netfd.setAddr(toAddr(lsa), toAddr(rsa))
176 » events, _ := p.events[fd] 490 » return netfd, nil
177 » if events&writeFlags != 0 { 491 }
178 » » p.StopWaiting(fd, writeFlags) 492
179 » » return fd, 'w', nil 493 func (fd *netFD) dup() (f *os.File, err error) {
180 » } 494 » syscall.ForkLock.RLock()
181 » p.StopWaiting(fd, readFlags) 495 » ns, err := syscall.Dup(fd.sysfd)
182 » return fd, 'r', nil 496 » if err != nil {
183 } 497 » » syscall.ForkLock.RUnlock()
184 498 » » return nil, &OpError{"dup", fd.net, fd.laddr, err}
185 func (p *pollster) Close() error { 499 » }
186 » return os.NewSyscallError("close", syscall.Close(p.epfd)) 500 » syscall.CloseOnExec(ns)
187 } 501 » syscall.ForkLock.RUnlock()
502
503 » // We want blocking mode for the new fd, hence the double negative.
504 » if err = syscall.SetNonblock(ns, false); err != nil {
505 » » return nil, &OpError{"setnonblock", fd.net, fd.laddr, err}
506 » }
507
508 » return os.NewFile(uintptr(ns), fd.sysfile.Name()), nil
509 }
510
511 func closesocket(s int) error {
512 » return syscall.Close(s)
513 }
OLDNEW
« no previous file with comments | « no previous file | src/pkg/net/fd_unix.go » ('j') | no next file with comments »

Powered by Google App Engine
RSS Feeds Recent Issues | This issue
This is Rietveld f62528b