Left: | ||
Right: |
LEFT | RIGHT |
---|---|
1 // Copyright 2009 The Go Authors. All rights reserved. | 1 // Copyright 2009 The Go Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style | 2 // Use of this source code is governed by a BSD-style |
3 // license that can be found in the LICENSE file. | 3 // license that can be found in the LICENSE file. |
4 | 4 |
5 // TODO(rsc): All the prints in this file should go to standard error. | 5 // TODO(rsc): All the prints in this file should go to standard error. |
6 | 6 |
7 package net | 7 package net |
8 | 8 |
9 import ( | 9 import ( |
10 "once"; | 10 "once"; |
11 "os"; | 11 "os"; |
12 "sync"; | 12 "sync"; |
13 "syscall"; | 13 "syscall"; |
14 ) | 14 ) |
15 | |
16 import "fmt" // XXX | |
17 | 15 |
18 // Network file descriptor. | 16 // Network file descriptor. |
19 type netFD struct { | 17 type netFD struct { |
20 // locking/lifetime of sysfd | 18 // locking/lifetime of sysfd |
21 sysmu sync.Mutex; | 19 sysmu sync.Mutex; |
22 sysref int; | 20 sysref int; |
23 closing bool; | 21 closing bool; |
24 | 22 |
25 // immutable until Close | 23 // immutable until Close |
26 sysfd int; | 24 sysfd int; |
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
67 // If the operations were reversed, there would be a race: the poll | 65 // If the operations were reversed, there would be a race: the poll |
68 // server might wake up and look at the request channel, see that it | 66 // server might wake up and look at the request channel, see that it |
69 // was empty, and go back to sleep, all before the requester managed | 67 // was empty, and go back to sleep, all before the requester managed |
70 // to send the request. Because the send must complete before the wakeup, | 68 // to send the request. Because the send must complete before the wakeup, |
71 // the request channel must be buffered. A buffer of size 1 is sufficient | 69 // the request channel must be buffered. A buffer of size 1 is sufficient |
72 // for any request load. If many processes are trying to submit requests, | 70 // for any request load. If many processes are trying to submit requests, |
73 // one will succeed, the pollServer will read the request, and then the | 71 // one will succeed, the pollServer will read the request, and then the |
74 // channel will be empty for the next process's request. A larger buffer | 72 // channel will be empty for the next process's request. A larger buffer |
75 // might help batch requests. | 73 // might help batch requests. |
76 // | 74 // |
77 // In order to prevent race conditions, pollServer has an additional cc channel | 75 // To avoid races in closing, all fd operations are locked and |
78 // that receives fds to be closed. pollServer doesn't make the close system | 76 // refcounted. When netFD.Close() is called, it calls syscall.Shutdown |
79 // call, it just sets fd.sysfile = nil and fd.fd = -1. Because of this, | 77 // and sets a closing flag. Only when the last reference is removed |
80 // pollServer is always in sync with the kernel's view of a given descriptor. | 78 // will the fd be closed. |
81 | 79 |
82 type pollServer struct { | 80 type pollServer struct { |
83 cr, cw chan *netFD; // buffered >= 1 | 81 cr, cw chan *netFD; // buffered >= 1 |
84 pr, pw *os.File; | 82 pr, pw *os.File; |
85 pending map[int]*netFD; | 83 pending map[int]*netFD; |
86 poll *pollster; // low-level OS hooks | 84 poll *pollster; // low-level OS hooks |
87 deadline int64; // next deadline (nsec since 1970) | 85 deadline int64; // next deadline (nsec since 1970) |
88 } | 86 } |
89 | 87 |
90 func newPollServer() (s *pollServer, err os.Error) { | 88 func newPollServer() (s *pollServer, err os.Error) { |
(...skipping 21 matching lines...) Expand all Loading... | |
112 if err = s.poll.AddFD(s.pr.Fd(), 'r', true); err != nil { | 110 if err = s.poll.AddFD(s.pr.Fd(), 'r', true); err != nil { |
113 s.poll.Close(); | 111 s.poll.Close(); |
114 goto Error; | 112 goto Error; |
115 } | 113 } |
116 s.pending = make(map[int]*netFD); | 114 s.pending = make(map[int]*netFD); |
117 go s.Run(); | 115 go s.Run(); |
118 return s, nil; | 116 return s, nil; |
119 } | 117 } |
120 | 118 |
121 func (s *pollServer) AddFD(fd *netFD, mode int) { | 119 func (s *pollServer) AddFD(fd *netFD, mode int) { |
122 // This check verifies that the underlying file descriptor hasn't been | |
123 // closed in the mean time. Any time a netFD is closed, the closing | |
124 // goroutine makes a round trip to the pollServer which sets file = nil | |
125 // and fd = -1. The goroutine then closes the actual file descriptor. | |
126 // Thus fd.fd mirrors the kernel's view of the file descriptor. | |
127 | |
128 // TODO(rsc,agl): There is still a race in Read and Write, | |
129 // because they optimistically try to use the fd and don't | |
130 // call into the PollServer unless they get EAGAIN. | |
131 intfd := fd.sysfd; | 120 intfd := fd.sysfd; |
132 if intfd < 0 { | 121 if intfd < 0 { |
133 // fd closed underfoot | 122 // fd closed underfoot |
134 if mode == 'r' { | 123 if mode == 'r' { |
135 fd.cr <- fd | 124 fd.cr <- fd |
136 } else { | 125 } else { |
137 fd.cw <- fd | 126 fd.cw <- fd |
138 } | 127 } |
139 return; | 128 return; |
140 } | 129 } |
(...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
195 return nsec; | 184 return nsec; |
196 } | 185 } |
197 | 186 |
198 func (s *pollServer) CheckDeadlines() { | 187 func (s *pollServer) CheckDeadlines() { |
199 now := s.Now(); | 188 now := s.Now(); |
200 // TODO(rsc): This will need to be handled more efficiently, | 189 // TODO(rsc): This will need to be handled more efficiently, |
201 // probably with a heap indexed by wakeup time. | 190 // probably with a heap indexed by wakeup time. |
202 | 191 |
203 var next_deadline int64; | 192 var next_deadline int64; |
204 for key, fd := range s.pending { | 193 for key, fd := range s.pending { |
205 fmt.Printf("XXX in poll server\n"); | |
206 var t int64; | 194 var t int64; |
207 var mode int; | 195 var mode int; |
208 if key&1 == 0 { | 196 if key&1 == 0 { |
209 mode = 'r' | 197 mode = 'r' |
210 } else { | 198 } else { |
211 mode = 'w' | 199 mode = 'w' |
212 } | 200 } |
213 if mode == 'r' { | 201 if mode == 'r' { |
214 t = fd.rdeadline | 202 t = fd.rdeadline |
215 } else { | 203 } else { |
216 t = fd.wdeadline | 204 t = fd.wdeadline |
217 } | 205 } |
218 if t > 0 { | 206 if t > 0 { |
219 if t <= now { | 207 if t <= now { |
208 fd.incref(); | |
rsc
2009/12/01 19:58:56
I don't think this needs incref/decref.
If the fd
| |
220 s.pending[key] = nil, false; | 209 s.pending[key] = nil, false; |
221 if mode == 'r' { | 210 if mode == 'r' { |
222 s.poll.DelFD(fd.sysfd, mode); | 211 s.poll.DelFD(fd.sysfd, mode); |
223 fd.rdeadline = -1; | 212 fd.rdeadline = -1; |
224 } else { | 213 } else { |
225 s.poll.DelFD(fd.sysfd, mode); | 214 s.poll.DelFD(fd.sysfd, mode); |
226 fd.wdeadline = -1; | 215 fd.wdeadline = -1; |
227 } | 216 } |
228 s.WakeFD(fd, mode); | 217 s.WakeFD(fd, mode); |
218 fd.decref(); | |
229 } else if next_deadline == 0 || t < next_deadline { | 219 } else if next_deadline == 0 || t < next_deadline { |
230 next_deadline = t | 220 next_deadline = t |
231 } | 221 } |
232 } | 222 } |
233 } | 223 } |
234 s.deadline = next_deadline; | 224 s.deadline = next_deadline; |
235 } | 225 } |
236 | 226 |
237 func (s *pollServer) Run() { | 227 func (s *pollServer) Run() { |
238 var scratch [100]byte; | 228 var scratch [100]byte; |
(...skipping 14 matching lines...) Expand all Loading... | |
253 if fd < 0 { | 243 if fd < 0 { |
254 // Timeout happened. | 244 // Timeout happened. |
255 s.CheckDeadlines(); | 245 s.CheckDeadlines(); |
256 continue; | 246 continue; |
257 } | 247 } |
258 if fd == s.pr.Fd() { | 248 if fd == s.pr.Fd() { |
259 // Drain our wakeup pipe. | 249 // Drain our wakeup pipe. |
260 for nn, _ := s.pr.Read(&scratch); nn > 0; { | 250 for nn, _ := s.pr.Read(&scratch); nn > 0; { |
261 nn, _ = s.pr.Read(&scratch) | 251 nn, _ = s.pr.Read(&scratch) |
262 } | 252 } |
263 | |
264 // Read from channels | 253 // Read from channels |
265 for fd, ok := <-s.cr; ok; fd, ok = <-s.cr { | 254 for fd, ok := <-s.cr; ok; fd, ok = <-s.cr { |
266 s.AddFD(fd, 'r') | 255 s.AddFD(fd, 'r') |
267 } | 256 } |
268 for fd, ok := <-s.cw; ok; fd, ok = <-s.cw { | 257 for fd, ok := <-s.cw; ok; fd, ok = <-s.cw { |
269 s.AddFD(fd, 'w') | 258 s.AddFD(fd, 'w') |
270 } | 259 } |
271 } else { | 260 } else { |
272 netfd := s.LookupFD(fd, mode); | 261 netfd := s.LookupFD(fd, mode); |
262 | |
273 if netfd == nil { | 263 if netfd == nil { |
274 print("pollServer: unexpected wakeup for fd=", netfd, " mode=", string(mode), "\n"); | 264 print("pollServer: unexpected wakeup for fd=", netfd, " mode=", string(mode), "\n"); |
275 continue; | 265 continue; |
276 } | 266 } |
277 s.WakeFD(netfd, mode); | 267 s.WakeFD(netfd, mode); |
278 } | 268 } |
279 } | 269 } |
280 } | 270 } |
281 | 271 |
282 var wakeupbuf [1]byte | 272 var wakeupbuf [1]byte |
(...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
327 } | 317 } |
328 if raddr != nil { | 318 if raddr != nil { |
329 rs = raddr.String() | 319 rs = raddr.String() |
330 } | 320 } |
331 f.sysfile = os.NewFile(fd, net+":"+ls+"->"+rs); | 321 f.sysfile = os.NewFile(fd, net+":"+ls+"->"+rs); |
332 f.cr = make(chan *netFD, 1); | 322 f.cr = make(chan *netFD, 1); |
333 f.cw = make(chan *netFD, 1); | 323 f.cw = make(chan *netFD, 1); |
334 return f, nil; | 324 return f, nil; |
335 } | 325 } |
336 | 326 |
337 // locksysfd allows the caller to use sysfd. When finished, the | 327 // Add a reference to this fd. |
338 // caller must call unlocksysfd. | 328 func (fd *netFD) incref() { |
339 func (fd *netFD) locksysfd() { | |
340 fd.sysmu.Lock(); | 329 fd.sysmu.Lock(); |
341 fd.sysref++; | 330 fd.sysref++; |
342 fd.sysmu.Unlock(); | 331 fd.sysmu.Unlock(); |
343 } | 332 } |
344 | 333 |
345 // unlocksysfd releases sysfd. After calling this, the caller should | 334 // Remove a reference to this FD and close if we've been asked to do so (and |
346 // not refer to fd.sysfd again without calling locksysfd. | 335 // there are no references left). |
347 func (fd *netFD) unlocksysfd() { | 336 func (fd *netFD) decref() { |
348 fd.sysmu.Lock(); | 337 fd.sysmu.Lock(); |
349 » if fd.sysref--; fd.sysref == 0 && fd.closing && fd.sysfd >= 0 { | 338 » fd.sysref--; |
350 | 339 » if fd.closing && fd.sysref == 0 && fd.sysfd >= 0 { |
351 // In case the user has set linger, switch to blocking mode so | 340 // In case the user has set linger, switch to blocking mode so |
352 // the close blocks. As long as this doesn't happen often, we | 341 // the close blocks. As long as this doesn't happen often, we |
353 // can handle the extra OS processes. Otherwise we'll need to | 342 // can handle the extra OS processes. Otherwise we'll need to |
354 // use the pollserver for Close too. Sigh. | 343 // use the pollserver for Close too. Sigh. |
355 syscall.SetNonblock(fd.sysfd, false); | 344 syscall.SetNonblock(fd.sysfd, false); |
356 fd.sysfile.Close(); | 345 fd.sysfile.Close(); |
357 » » fd.sysfile = nil;» // XXX needed? | 346 » » fd.sysfile = nil; |
358 fd.sysfd = -1; | 347 fd.sysfd = -1; |
359 } | 348 } |
360 fd.sysmu.Unlock(); | 349 fd.sysmu.Unlock(); |
361 } | 350 } |
362 | 351 |
363 func isEAGAIN(e os.Error) bool { | 352 func isEAGAIN(e os.Error) bool { |
364 if e1, ok := e.(*os.PathError); ok { | 353 if e1, ok := e.(*os.PathError); ok { |
365 return e1.Error == os.EAGAIN | 354 return e1.Error == os.EAGAIN |
366 } | 355 } |
367 return e == os.EAGAIN; | 356 return e == os.EAGAIN; |
368 } | 357 } |
369 | 358 |
370 func (fd *netFD) Close() os.Error { | 359 func (fd *netFD) Close() os.Error { |
371 » fd.locksysfd(); | 360 » if fd == nil || fd.sysfile == nil { |
361 » » return os.EINVAL | |
362 » } | |
363 | |
364 » fd.incref(); | |
365 » syscall.Shutdown(fd.sysfd, syscall.SHUT_RDWR); | |
372 fd.closing = true; | 366 fd.closing = true; |
373 » fd.unlocksysfd(); | 367 » fd.decref(); |
374 return nil; | 368 return nil; |
375 } | 369 } |
376 | 370 |
377 func (fd *netFD) Read(p []byte) (n int, err os.Error) { | 371 func (fd *netFD) Read(p []byte) (n int, err os.Error) { |
378 if fd == nil || fd.sysfile == nil { | 372 if fd == nil || fd.sysfile == nil { |
379 return 0, os.EINVAL | 373 return 0, os.EINVAL |
380 } | 374 } |
381 fd.rio.Lock(); | 375 fd.rio.Lock(); |
382 defer fd.rio.Unlock(); | 376 defer fd.rio.Unlock(); |
383 » fd.locksysfd(); | 377 » fd.incref(); |
384 » defer fd.unlocksysfd(); | 378 » defer fd.decref(); |
385 if fd.rdeadline_delta > 0 { | 379 if fd.rdeadline_delta > 0 { |
386 fd.rdeadline = pollserver.Now() + fd.rdeadline_delta | 380 fd.rdeadline = pollserver.Now() + fd.rdeadline_delta |
387 } else { | 381 } else { |
388 fd.rdeadline = 0 | 382 fd.rdeadline = 0 |
389 } | 383 } |
390 for { | 384 for { |
391 » » n, err = fd.sysfile.Read(p);» // fd.sysfile | 385 » » n, err = fd.sysfile.Read(p); |
392 if isEAGAIN(err) && fd.rdeadline >= 0 { | 386 if isEAGAIN(err) && fd.rdeadline >= 0 { |
393 pollserver.WaitRead(fd); | 387 pollserver.WaitRead(fd); |
394 continue; | 388 continue; |
395 } | 389 } |
396 break; | 390 break; |
397 } | 391 } |
398 return; | 392 return; |
399 } | 393 } |
400 | 394 |
401 func (fd *netFD) ReadFrom(p []byte) (n int, sa syscall.Sockaddr, err os.Error) { | 395 func (fd *netFD) ReadFrom(p []byte) (n int, sa syscall.Sockaddr, err os.Error) { |
402 if fd == nil || fd.sysfile == nil { | 396 if fd == nil || fd.sysfile == nil { |
403 return 0, nil, os.EINVAL | 397 return 0, nil, os.EINVAL |
404 } | 398 } |
405 fd.rio.Lock(); | 399 fd.rio.Lock(); |
406 defer fd.rio.Unlock(); | 400 defer fd.rio.Unlock(); |
407 » fd.locksysfd(); | 401 » fd.incref(); |
408 » defer fd.unlocksysfd(); | 402 » defer fd.decref(); |
409 if fd.rdeadline_delta > 0 { | 403 if fd.rdeadline_delta > 0 { |
410 fd.rdeadline = pollserver.Now() + fd.rdeadline_delta | 404 fd.rdeadline = pollserver.Now() + fd.rdeadline_delta |
411 } else { | 405 } else { |
412 fd.rdeadline = 0 | 406 fd.rdeadline = 0 |
413 } | 407 } |
414 for { | 408 for { |
415 var errno int; | 409 var errno int; |
416 n, sa, errno = syscall.Recvfrom(fd.sysfd, p, 0); | 410 n, sa, errno = syscall.Recvfrom(fd.sysfd, p, 0); |
417 if errno == syscall.EAGAIN && fd.rdeadline >= 0 { | 411 if errno == syscall.EAGAIN && fd.rdeadline >= 0 { |
418 pollserver.WaitRead(fd); | 412 pollserver.WaitRead(fd); |
419 continue; | 413 continue; |
420 } | 414 } |
421 if errno != 0 { | 415 if errno != 0 { |
422 n = 0; | 416 n = 0; |
423 err = &os.PathError{"recvfrom", fd.sysfile.Name(), os.Errno(errno)}; | 417 err = &os.PathError{"recvfrom", fd.sysfile.Name(), os.Errno(errno)}; |
424 } | 418 } |
425 break; | 419 break; |
426 } | 420 } |
427 return; | 421 return; |
428 } | 422 } |
429 | 423 |
430 func (fd *netFD) Write(p []byte) (n int, err os.Error) { | 424 func (fd *netFD) Write(p []byte) (n int, err os.Error) { |
431 if fd == nil || fd.sysfile == nil { | 425 if fd == nil || fd.sysfile == nil { |
432 return 0, os.EINVAL | 426 return 0, os.EINVAL |
433 } | 427 } |
434 fd.wio.Lock(); | 428 fd.wio.Lock(); |
435 defer fd.wio.Unlock(); | 429 defer fd.wio.Unlock(); |
436 » fd.locksysfd(); | 430 » fd.incref(); |
437 » defer fd.unlocksysfd(); | 431 » defer fd.decref(); |
438 if fd.wdeadline_delta > 0 { | 432 if fd.wdeadline_delta > 0 { |
439 fd.wdeadline = pollserver.Now() + fd.wdeadline_delta | 433 fd.wdeadline = pollserver.Now() + fd.wdeadline_delta |
440 } else { | 434 } else { |
441 fd.wdeadline = 0 | 435 fd.wdeadline = 0 |
442 } | 436 } |
443 err = nil; | 437 err = nil; |
444 nn := 0; | 438 nn := 0; |
445 for nn < len(p) { | 439 for nn < len(p) { |
446 n, err = fd.sysfile.Write(p[nn:]); | 440 n, err = fd.sysfile.Write(p[nn:]); |
447 if n > 0 { | 441 if n > 0 { |
(...skipping 12 matching lines...) Expand all Loading... | |
460 } | 454 } |
461 return nn, err; | 455 return nn, err; |
462 } | 456 } |
463 | 457 |
464 func (fd *netFD) WriteTo(p []byte, sa syscall.Sockaddr) (n int, err os.Error) { | 458 func (fd *netFD) WriteTo(p []byte, sa syscall.Sockaddr) (n int, err os.Error) { |
465 if fd == nil || fd.sysfile == nil { | 459 if fd == nil || fd.sysfile == nil { |
466 return 0, os.EINVAL | 460 return 0, os.EINVAL |
467 } | 461 } |
468 fd.wio.Lock(); | 462 fd.wio.Lock(); |
469 defer fd.wio.Unlock(); | 463 defer fd.wio.Unlock(); |
470 » fd.locksysfd(); | 464 » fd.incref(); |
471 » defer fd.unlocksysfd(); | 465 » defer fd.decref(); |
472 if fd.wdeadline_delta > 0 { | 466 if fd.wdeadline_delta > 0 { |
473 fd.wdeadline = pollserver.Now() + fd.wdeadline_delta | 467 fd.wdeadline = pollserver.Now() + fd.wdeadline_delta |
474 } else { | 468 } else { |
475 fd.wdeadline = 0 | 469 fd.wdeadline = 0 |
476 } | 470 } |
477 err = nil; | 471 err = nil; |
478 for { | 472 for { |
479 errno := syscall.Sendto(fd.sysfd, p, 0, sa); | 473 errno := syscall.Sendto(fd.sysfd, p, 0, sa); |
480 if errno == syscall.EAGAIN && fd.wdeadline >= 0 { | 474 if errno == syscall.EAGAIN && fd.wdeadline >= 0 { |
481 pollserver.WaitWrite(fd); | 475 pollserver.WaitWrite(fd); |
(...skipping 14 matching lines...) Expand all Loading... | |
496 if fd == nil || fd.sysfile == nil { | 490 if fd == nil || fd.sysfile == nil { |
497 return nil, os.EINVAL | 491 return nil, os.EINVAL |
498 } | 492 } |
499 | 493 |
500 // See ../syscall/exec.go for description of ForkLock. | 494 // See ../syscall/exec.go for description of ForkLock. |
501 // It is okay to hold the lock across syscall.Accept | 495 // It is okay to hold the lock across syscall.Accept |
502 // because we have put fd.sysfd into non-blocking mode. | 496 // because we have put fd.sysfd into non-blocking mode. |
503 syscall.ForkLock.RLock(); | 497 syscall.ForkLock.RLock(); |
504 var s, e int; | 498 var s, e int; |
505 var sa syscall.Sockaddr; | 499 var sa syscall.Sockaddr; |
500 fd.incref(); | |
rsc
2009/12/01 19:58:56
Please move these two lines up above the comment
| |
501 defer fd.decref(); | |
506 for { | 502 for { |
507 fd.locksysfd(); | |
508 s, sa, e = syscall.Accept(fd.sysfd); | 503 s, sa, e = syscall.Accept(fd.sysfd); |
509 fd.unlocksysfd(); | |
510 if e != syscall.EAGAIN { | 504 if e != syscall.EAGAIN { |
511 break | 505 break |
512 } | 506 } |
513 syscall.ForkLock.RUnlock(); | 507 syscall.ForkLock.RUnlock(); |
514 pollserver.WaitRead(fd); | 508 pollserver.WaitRead(fd); |
515 syscall.ForkLock.RLock(); | 509 syscall.ForkLock.RLock(); |
516 } | 510 } |
517 if e != 0 { | 511 if e != 0 { |
518 syscall.ForkLock.RUnlock(); | 512 syscall.ForkLock.RUnlock(); |
519 return nil, &OpError{"accept", fd.net, fd.laddr, os.Errno(e)}; | 513 return nil, &OpError{"accept", fd.net, fd.laddr, os.Errno(e)}; |
520 } | 514 } |
521 syscall.CloseOnExec(s); | 515 syscall.CloseOnExec(s); |
522 syscall.ForkLock.RUnlock(); | 516 syscall.ForkLock.RUnlock(); |
523 | 517 |
524 if nfd, err = newFD(s, fd.family, fd.proto, fd.net, fd.laddr, toAddr(sa)); err != nil { | 518 if nfd, err = newFD(s, fd.family, fd.proto, fd.net, fd.laddr, toAddr(sa)); err != nil { |
525 syscall.Close(s); | 519 syscall.Close(s); |
526 return nil, err; | 520 return nil, err; |
527 } | 521 } |
528 return nfd, nil; | 522 return nfd, nil; |
529 } | 523 } |
LEFT | RIGHT |