Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(525)

Delta Between Two Patch Sets: src/pkg/net/fd.go

Issue 164057: code review 164057: 'close' race fixed/suggestions from rcs (Closed)
Left Patch Set: code review 164057: 'close' race fixed/suggestions from rcs Created 15 years, 4 months ago
Right Patch Set: code review 164057: 'close' race fixed/suggestions from rcs Created 15 years, 4 months ago
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
Left: Side by side diff | Download
Right: Side by side diff | Download
« no previous file with change/comment | « no previous file | src/pkg/net/sock.go » ('j') | src/pkg/net/tcpsock.go » ('J')
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
LEFTRIGHT
1 // Copyright 2009 The Go Authors. All rights reserved. 1 // Copyright 2009 The Go Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style 2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file. 3 // license that can be found in the LICENSE file.
4 4
5 // TODO(rsc): All the prints in this file should go to standard error. 5 // TODO(rsc): All the prints in this file should go to standard error.
6 6
7 package net 7 package net
8 8
9 import ( 9 import (
10 "once"; 10 "once";
11 "os"; 11 "os";
12 "sync"; 12 "sync";
13 "syscall"; 13 "syscall";
14 ) 14 )
15
16 import "fmt" // XXX
17 15
18 // Network file descriptor. 16 // Network file descriptor.
19 type netFD struct { 17 type netFD struct {
20 // locking/lifetime of sysfd 18 // locking/lifetime of sysfd
21 sysmu sync.Mutex; 19 sysmu sync.Mutex;
22 sysref int; 20 sysref int;
23 closing bool; 21 closing bool;
24 22
25 // immutable until Close 23 // immutable until Close
26 sysfd int; 24 sysfd int;
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after
67 // If the operations were reversed, there would be a race: the poll 65 // If the operations were reversed, there would be a race: the poll
68 // server might wake up and look at the request channel, see that it 66 // server might wake up and look at the request channel, see that it
69 // was empty, and go back to sleep, all before the requester managed 67 // was empty, and go back to sleep, all before the requester managed
70 // to send the request. Because the send must complete before the wakeup, 68 // to send the request. Because the send must complete before the wakeup,
71 // the request channel must be buffered. A buffer of size 1 is sufficient 69 // the request channel must be buffered. A buffer of size 1 is sufficient
72 // for any request load. If many processes are trying to submit requests, 70 // for any request load. If many processes are trying to submit requests,
73 // one will succeed, the pollServer will read the request, and then the 71 // one will succeed, the pollServer will read the request, and then the
74 // channel will be empty for the next process's request. A larger buffer 72 // channel will be empty for the next process's request. A larger buffer
75 // might help batch requests. 73 // might help batch requests.
76 // 74 //
77 // In order to prevent race conditions, pollServer has an additional cc channel 75 // To avoid races in closing, all fd operations are locked and
78 // that receives fds to be closed. pollServer doesn't make the close system 76 // refcounted. when netFD.Close() is called, it calls syscall.Shutdown
79 // call, it just sets fd.sysfile = nil and fd.fd = -1. Because of this, 77 // and sets a closing flag. Only when the last reference is removed
80 // pollServer is always in sync with the kernel's view of a given descriptor. 78 // will the fd be closed.
81 79
82 type pollServer struct { 80 type pollServer struct {
83 cr, cw chan *netFD; // buffered >= 1 81 cr, cw chan *netFD; // buffered >= 1
84 pr, pw *os.File; 82 pr, pw *os.File;
85 pending map[int]*netFD; 83 pending map[int]*netFD;
86 poll *pollster; // low-level OS hooks 84 poll *pollster; // low-level OS hooks
87 deadline int64; // next deadline (nsec since 1970) 85 deadline int64; // next deadline (nsec since 1970)
88 } 86 }
89 87
90 func newPollServer() (s *pollServer, err os.Error) { 88 func newPollServer() (s *pollServer, err os.Error) {
(...skipping 21 matching lines...) Expand all
112 if err = s.poll.AddFD(s.pr.Fd(), 'r', true); err != nil { 110 if err = s.poll.AddFD(s.pr.Fd(), 'r', true); err != nil {
113 s.poll.Close(); 111 s.poll.Close();
114 goto Error; 112 goto Error;
115 } 113 }
116 s.pending = make(map[int]*netFD); 114 s.pending = make(map[int]*netFD);
117 go s.Run(); 115 go s.Run();
118 return s, nil; 116 return s, nil;
119 } 117 }
120 118
121 func (s *pollServer) AddFD(fd *netFD, mode int) { 119 func (s *pollServer) AddFD(fd *netFD, mode int) {
122 // This check verifies that the underlying file descriptor hasn't been
123 // closed in the mean time. Any time a netFD is closed, the closing
124 // goroutine makes a round trip to the pollServer which sets file = nil
125 // and fd = -1. The goroutine then closes the actual file descriptor.
126 // Thus fd.fd mirrors the kernel's view of the file descriptor.
127
128 // TODO(rsc,agl): There is still a race in Read and Write,
129 // because they optimistically try to use the fd and don't
130 // call into the PollServer unless they get EAGAIN.
131 intfd := fd.sysfd; 120 intfd := fd.sysfd;
132 if intfd < 0 { 121 if intfd < 0 {
133 // fd closed underfoot 122 // fd closed underfoot
134 if mode == 'r' { 123 if mode == 'r' {
135 fd.cr <- fd 124 fd.cr <- fd
136 } else { 125 } else {
137 fd.cw <- fd 126 fd.cw <- fd
138 } 127 }
139 return; 128 return;
140 } 129 }
(...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after
195 return nsec; 184 return nsec;
196 } 185 }
197 186
198 func (s *pollServer) CheckDeadlines() { 187 func (s *pollServer) CheckDeadlines() {
199 now := s.Now(); 188 now := s.Now();
200 // TODO(rsc): This will need to be handled more efficiently, 189 // TODO(rsc): This will need to be handled more efficiently,
201 // probably with a heap indexed by wakeup time. 190 // probably with a heap indexed by wakeup time.
202 191
203 var next_deadline int64; 192 var next_deadline int64;
204 for key, fd := range s.pending { 193 for key, fd := range s.pending {
205 fmt.Printf("XXX in poll server\n");
206 var t int64; 194 var t int64;
207 var mode int; 195 var mode int;
208 if key&1 == 0 { 196 if key&1 == 0 {
209 mode = 'r' 197 mode = 'r'
210 } else { 198 } else {
211 mode = 'w' 199 mode = 'w'
212 } 200 }
213 if mode == 'r' { 201 if mode == 'r' {
214 t = fd.rdeadline 202 t = fd.rdeadline
215 } else { 203 } else {
216 t = fd.wdeadline 204 t = fd.wdeadline
217 } 205 }
218 if t > 0 { 206 if t > 0 {
219 if t <= now { 207 if t <= now {
208 fd.incref();
rsc 2009/12/01 19:58:56 I don't think this needs incref/decref. If the fd
220 s.pending[key] = nil, false; 209 s.pending[key] = nil, false;
221 if mode == 'r' { 210 if mode == 'r' {
222 s.poll.DelFD(fd.sysfd, mode); 211 s.poll.DelFD(fd.sysfd, mode);
223 fd.rdeadline = -1; 212 fd.rdeadline = -1;
224 } else { 213 } else {
225 s.poll.DelFD(fd.sysfd, mode); 214 s.poll.DelFD(fd.sysfd, mode);
226 fd.wdeadline = -1; 215 fd.wdeadline = -1;
227 } 216 }
228 s.WakeFD(fd, mode); 217 s.WakeFD(fd, mode);
218 fd.decref();
229 } else if next_deadline == 0 || t < next_deadline { 219 } else if next_deadline == 0 || t < next_deadline {
230 next_deadline = t 220 next_deadline = t
231 } 221 }
232 } 222 }
233 } 223 }
234 s.deadline = next_deadline; 224 s.deadline = next_deadline;
235 } 225 }
236 226
237 func (s *pollServer) Run() { 227 func (s *pollServer) Run() {
238 var scratch [100]byte; 228 var scratch [100]byte;
(...skipping 14 matching lines...) Expand all
253 if fd < 0 { 243 if fd < 0 {
254 // Timeout happened. 244 // Timeout happened.
255 s.CheckDeadlines(); 245 s.CheckDeadlines();
256 continue; 246 continue;
257 } 247 }
258 if fd == s.pr.Fd() { 248 if fd == s.pr.Fd() {
259 // Drain our wakeup pipe. 249 // Drain our wakeup pipe.
260 for nn, _ := s.pr.Read(&scratch); nn > 0; { 250 for nn, _ := s.pr.Read(&scratch); nn > 0; {
261 nn, _ = s.pr.Read(&scratch) 251 nn, _ = s.pr.Read(&scratch)
262 } 252 }
263
264 // Read from channels 253 // Read from channels
265 for fd, ok := <-s.cr; ok; fd, ok = <-s.cr { 254 for fd, ok := <-s.cr; ok; fd, ok = <-s.cr {
266 s.AddFD(fd, 'r') 255 s.AddFD(fd, 'r')
267 } 256 }
268 for fd, ok := <-s.cw; ok; fd, ok = <-s.cw { 257 for fd, ok := <-s.cw; ok; fd, ok = <-s.cw {
269 s.AddFD(fd, 'w') 258 s.AddFD(fd, 'w')
270 } 259 }
271 } else { 260 } else {
272 netfd := s.LookupFD(fd, mode); 261 netfd := s.LookupFD(fd, mode);
262
273 if netfd == nil { 263 if netfd == nil {
274 print("pollServer: unexpected wakeup for fd=", n etfd, " mode=", string(mode), "\n"); 264 print("pollServer: unexpected wakeup for fd=", n etfd, " mode=", string(mode), "\n");
275 continue; 265 continue;
276 } 266 }
277 s.WakeFD(netfd, mode); 267 s.WakeFD(netfd, mode);
278 } 268 }
279 } 269 }
280 } 270 }
281 271
282 var wakeupbuf [1]byte 272 var wakeupbuf [1]byte
(...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after
327 } 317 }
328 if raddr != nil { 318 if raddr != nil {
329 rs = raddr.String() 319 rs = raddr.String()
330 } 320 }
331 f.sysfile = os.NewFile(fd, net+":"+ls+"->"+rs); 321 f.sysfile = os.NewFile(fd, net+":"+ls+"->"+rs);
332 f.cr = make(chan *netFD, 1); 322 f.cr = make(chan *netFD, 1);
333 f.cw = make(chan *netFD, 1); 323 f.cw = make(chan *netFD, 1);
334 return f, nil; 324 return f, nil;
335 } 325 }
336 326
337 // locksysfd allows the caller to use sysfd. When finished, the 327 // Add a reference to this fd.
338 // caller must call unlocksysfd. 328 func (fd *netFD) incref() {
339 func (fd *netFD) locksysfd() {
340 fd.sysmu.Lock(); 329 fd.sysmu.Lock();
341 fd.sysref++; 330 fd.sysref++;
342 fd.sysmu.Unlock(); 331 fd.sysmu.Unlock();
343 } 332 }
344 333
345 // unlocksysfd releases sysfd. After calling this, the caller should 334 // Remove a reference to this FD and close if we've been asked to do so (and
346 // not refer to fd.sysfd again without calling locksysfd. 335 // there are no references left.
347 func (fd *netFD) unlocksysfd() { 336 func (fd *netFD) decref() {
348 fd.sysmu.Lock(); 337 fd.sysmu.Lock();
349 » if fd.sysref--; fd.sysref == 0 && fd.closing && fd.sysfd >= 0 { 338 » fd.sysref--;
350 339 » if fd.closing && fd.sysref == 0 && fd.sysfd >= 0 {
351 // In case the user has set linger, switch to blocking mode so 340 // In case the user has set linger, switch to blocking mode so
352 // the close blocks. As long as this doesn't happen often, we 341 // the close blocks. As long as this doesn't happen often, we
353 // can handle the extra OS processes. Otherwise we'll need to 342 // can handle the extra OS processes. Otherwise we'll need to
354 // use the pollserver for Close too. Sigh. 343 // use the pollserver for Close too. Sigh.
355 syscall.SetNonblock(fd.sysfd, false); 344 syscall.SetNonblock(fd.sysfd, false);
356 fd.sysfile.Close(); 345 fd.sysfile.Close();
357 » » fd.sysfile = nil;» // XXX needed? 346 » » fd.sysfile = nil;
358 fd.sysfd = -1; 347 fd.sysfd = -1;
359 } 348 }
360 fd.sysmu.Unlock(); 349 fd.sysmu.Unlock();
361 } 350 }
362 351
363 func isEAGAIN(e os.Error) bool { 352 func isEAGAIN(e os.Error) bool {
364 if e1, ok := e.(*os.PathError); ok { 353 if e1, ok := e.(*os.PathError); ok {
365 return e1.Error == os.EAGAIN 354 return e1.Error == os.EAGAIN
366 } 355 }
367 return e == os.EAGAIN; 356 return e == os.EAGAIN;
368 } 357 }
369 358
370 func (fd *netFD) Close() os.Error { 359 func (fd *netFD) Close() os.Error {
371 » fd.locksysfd(); 360 » if fd == nil || fd.sysfile == nil {
361 » » return os.EINVAL
362 » }
363
364 » fd.incref();
365 » syscall.Shutdown(fd.sysfd, syscall.SHUT_RDWR);
372 fd.closing = true; 366 fd.closing = true;
373 » fd.unlocksysfd(); 367 » fd.decref();
374 return nil; 368 return nil;
375 } 369 }
376 370
377 func (fd *netFD) Read(p []byte) (n int, err os.Error) { 371 func (fd *netFD) Read(p []byte) (n int, err os.Error) {
378 if fd == nil || fd.sysfile == nil { 372 if fd == nil || fd.sysfile == nil {
379 return 0, os.EINVAL 373 return 0, os.EINVAL
380 } 374 }
381 fd.rio.Lock(); 375 fd.rio.Lock();
382 defer fd.rio.Unlock(); 376 defer fd.rio.Unlock();
383 » fd.locksysfd(); 377 » fd.incref();
384 » defer fd.unlocksysfd(); 378 » defer fd.decref();
385 if fd.rdeadline_delta > 0 { 379 if fd.rdeadline_delta > 0 {
386 fd.rdeadline = pollserver.Now() + fd.rdeadline_delta 380 fd.rdeadline = pollserver.Now() + fd.rdeadline_delta
387 } else { 381 } else {
388 fd.rdeadline = 0 382 fd.rdeadline = 0
389 } 383 }
390 for { 384 for {
391 » » n, err = fd.sysfile.Read(p);» // fd.sysfile 385 » » n, err = fd.sysfile.Read(p);
392 if isEAGAIN(err) && fd.rdeadline >= 0 { 386 if isEAGAIN(err) && fd.rdeadline >= 0 {
393 pollserver.WaitRead(fd); 387 pollserver.WaitRead(fd);
394 continue; 388 continue;
395 } 389 }
396 break; 390 break;
397 } 391 }
398 return; 392 return;
399 } 393 }
400 394
401 func (fd *netFD) ReadFrom(p []byte) (n int, sa syscall.Sockaddr, err os.Error) { 395 func (fd *netFD) ReadFrom(p []byte) (n int, sa syscall.Sockaddr, err os.Error) {
402 if fd == nil || fd.sysfile == nil { 396 if fd == nil || fd.sysfile == nil {
403 return 0, nil, os.EINVAL 397 return 0, nil, os.EINVAL
404 } 398 }
405 fd.rio.Lock(); 399 fd.rio.Lock();
406 defer fd.rio.Unlock(); 400 defer fd.rio.Unlock();
407 » fd.locksysfd(); 401 » fd.incref();
408 » defer fd.unlocksysfd(); 402 » defer fd.decref();
409 if fd.rdeadline_delta > 0 { 403 if fd.rdeadline_delta > 0 {
410 fd.rdeadline = pollserver.Now() + fd.rdeadline_delta 404 fd.rdeadline = pollserver.Now() + fd.rdeadline_delta
411 } else { 405 } else {
412 fd.rdeadline = 0 406 fd.rdeadline = 0
413 } 407 }
414 for { 408 for {
415 var errno int; 409 var errno int;
416 n, sa, errno = syscall.Recvfrom(fd.sysfd, p, 0); 410 n, sa, errno = syscall.Recvfrom(fd.sysfd, p, 0);
417 if errno == syscall.EAGAIN && fd.rdeadline >= 0 { 411 if errno == syscall.EAGAIN && fd.rdeadline >= 0 {
418 pollserver.WaitRead(fd); 412 pollserver.WaitRead(fd);
419 continue; 413 continue;
420 } 414 }
421 if errno != 0 { 415 if errno != 0 {
422 n = 0; 416 n = 0;
423 err = &os.PathError{"recvfrom", fd.sysfile.Name(), os.Er rno(errno)}; 417 err = &os.PathError{"recvfrom", fd.sysfile.Name(), os.Er rno(errno)};
424 } 418 }
425 break; 419 break;
426 } 420 }
427 return; 421 return;
428 } 422 }
429 423
430 func (fd *netFD) Write(p []byte) (n int, err os.Error) { 424 func (fd *netFD) Write(p []byte) (n int, err os.Error) {
431 if fd == nil || fd.sysfile == nil { 425 if fd == nil || fd.sysfile == nil {
432 return 0, os.EINVAL 426 return 0, os.EINVAL
433 } 427 }
434 fd.wio.Lock(); 428 fd.wio.Lock();
435 defer fd.wio.Unlock(); 429 defer fd.wio.Unlock();
436 » fd.locksysfd(); 430 » fd.incref();
437 » defer fd.unlocksysfd(); 431 » defer fd.decref();
438 if fd.wdeadline_delta > 0 { 432 if fd.wdeadline_delta > 0 {
439 fd.wdeadline = pollserver.Now() + fd.wdeadline_delta 433 fd.wdeadline = pollserver.Now() + fd.wdeadline_delta
440 } else { 434 } else {
441 fd.wdeadline = 0 435 fd.wdeadline = 0
442 } 436 }
443 err = nil; 437 err = nil;
444 nn := 0; 438 nn := 0;
445 for nn < len(p) { 439 for nn < len(p) {
446 n, err = fd.sysfile.Write(p[nn:]); 440 n, err = fd.sysfile.Write(p[nn:]);
447 if n > 0 { 441 if n > 0 {
(...skipping 12 matching lines...) Expand all
460 } 454 }
461 return nn, err; 455 return nn, err;
462 } 456 }
463 457
464 func (fd *netFD) WriteTo(p []byte, sa syscall.Sockaddr) (n int, err os.Error) { 458 func (fd *netFD) WriteTo(p []byte, sa syscall.Sockaddr) (n int, err os.Error) {
465 if fd == nil || fd.sysfile == nil { 459 if fd == nil || fd.sysfile == nil {
466 return 0, os.EINVAL 460 return 0, os.EINVAL
467 } 461 }
468 fd.wio.Lock(); 462 fd.wio.Lock();
469 defer fd.wio.Unlock(); 463 defer fd.wio.Unlock();
470 » fd.locksysfd(); 464 » fd.incref();
471 » defer fd.unlocksysfd(); 465 » defer fd.decref();
472 if fd.wdeadline_delta > 0 { 466 if fd.wdeadline_delta > 0 {
473 fd.wdeadline = pollserver.Now() + fd.wdeadline_delta 467 fd.wdeadline = pollserver.Now() + fd.wdeadline_delta
474 } else { 468 } else {
475 fd.wdeadline = 0 469 fd.wdeadline = 0
476 } 470 }
477 err = nil; 471 err = nil;
478 for { 472 for {
479 errno := syscall.Sendto(fd.sysfd, p, 0, sa); 473 errno := syscall.Sendto(fd.sysfd, p, 0, sa);
480 if errno == syscall.EAGAIN && fd.wdeadline >= 0 { 474 if errno == syscall.EAGAIN && fd.wdeadline >= 0 {
481 pollserver.WaitWrite(fd); 475 pollserver.WaitWrite(fd);
(...skipping 14 matching lines...) Expand all
496 if fd == nil || fd.sysfile == nil { 490 if fd == nil || fd.sysfile == nil {
497 return nil, os.EINVAL 491 return nil, os.EINVAL
498 } 492 }
499 493
500 // See ../syscall/exec.go for description of ForkLock. 494 // See ../syscall/exec.go for description of ForkLock.
501 // It is okay to hold the lock across syscall.Accept 495 // It is okay to hold the lock across syscall.Accept
502 // because we have put fd.sysfd into non-blocking mode. 496 // because we have put fd.sysfd into non-blocking mode.
503 syscall.ForkLock.RLock(); 497 syscall.ForkLock.RLock();
504 var s, e int; 498 var s, e int;
505 var sa syscall.Sockaddr; 499 var sa syscall.Sockaddr;
500 fd.incref();
rsc 2009/12/01 19:58:56 Please move these two lines up above the comment
501 defer fd.decref();
506 for { 502 for {
507 fd.locksysfd();
508 s, sa, e = syscall.Accept(fd.sysfd); 503 s, sa, e = syscall.Accept(fd.sysfd);
509 fd.unlocksysfd();
510 if e != syscall.EAGAIN { 504 if e != syscall.EAGAIN {
511 break 505 break
512 } 506 }
513 syscall.ForkLock.RUnlock(); 507 syscall.ForkLock.RUnlock();
514 pollserver.WaitRead(fd); 508 pollserver.WaitRead(fd);
515 syscall.ForkLock.RLock(); 509 syscall.ForkLock.RLock();
516 } 510 }
517 if e != 0 { 511 if e != 0 {
518 syscall.ForkLock.RUnlock(); 512 syscall.ForkLock.RUnlock();
519 return nil, &OpError{"accept", fd.net, fd.laddr, os.Errno(e)}; 513 return nil, &OpError{"accept", fd.net, fd.laddr, os.Errno(e)};
520 } 514 }
521 syscall.CloseOnExec(s); 515 syscall.CloseOnExec(s);
522 syscall.ForkLock.RUnlock(); 516 syscall.ForkLock.RUnlock();
523 517
524 if nfd, err = newFD(s, fd.family, fd.proto, fd.net, fd.laddr, toAddr(sa) ); err != nil { 518 if nfd, err = newFD(s, fd.family, fd.proto, fd.net, fd.laddr, toAddr(sa) ); err != nil {
525 syscall.Close(s); 519 syscall.Close(s);
526 return nil, err; 520 return nil, err;
527 } 521 }
528 return nfd, nil; 522 return nfd, nil;
529 } 523 }
LEFTRIGHT

Powered by Google App Engine
RSS Feeds Recent Issues | This issue
This is Rietveld f62528b