Index: src/pkg/time/sleep.go |
=================================================================== |
--- a/src/pkg/time/sleep.go |
+++ b/src/pkg/time/sleep.go |
@@ -11,20 +11,40 @@ |
"container/heap" |
) |
-// The event type represents a single After or AfterFunc event. |
-type event struct { |
- t int64 // The absolute time that the event should fire. |
- f func(int64) // The function to call when the event fires. |
- sleeping bool // A sleeper is sleeping for this event. |
+// The Timer type represents a single event. |
+// When the Timer expires, the current time will be sent on C |
+// unless the Timer represents an AfterFunc event. |
+type Timer struct { |
+ C <-chan int64 |
+ t int64 // The absolute time that the event should fire. |
+ f func(int64) // The function to call when the event fires. |
+ i int // The event's index inside eventHeap. |
} |
-type eventHeap []*event |
+type timerHeap []*Timer |
-var events eventHeap |
-var eventMutex sync.Mutex |
+// forever is the absolute time (in ns) of an event that is forever away. |
+const forever = 1 << 62 |
+ |
+// maxSleepTime is the maximum length of time that a sleeper |
+// sleeps for before checking if it is defunct. |
+const maxSleepTime = 1e9 |
+ |
+var ( |
+ // timerMutex guards the variables inside this var group. |
+ timerMutex sync.Mutex |
+ |
+ // timers holds a binary heap of pending events, terminated with a sentinel. |
+ timers timerHeap |
+ |
+ // currentSleeper is an ever-incrementing counter which represents |
+ // the current sleeper. It allows older sleepers to detect that they are |
+ // defunct and exit. |
+ currentSleeper int64 |
+) |
func init() { |
- events.Push(&event{1 << 62, nil, true}) // sentinel |
+ timers.Push(&Timer{t: forever}) // sentinel |
} |
// Sleep pauses the current goroutine for at least ns nanoseconds. |
@@ -51,101 +71,133 @@ |
return t, nil |
} |
+// NewTimer creates a new Timer that will send |
+// the current time on its channel after at least ns nanoseconds. |
+func NewTimer(ns int64) *Timer { |
+ c := make(chan int64, 1) |
+ e := after(ns, func(t int64) { c <- t }) |
+ e.C = c |
+ return e |
+} |
+ |
// After waits at least ns nanoseconds before sending the current time |
// on the returned channel. |
+// It is equivalent to NewTimer(ns).C. |
func After(ns int64) <-chan int64 { |
- c := make(chan int64, 1) |
- after(ns, func(t int64) { c <- t }) |
- return c |
+ return NewTimer(ns).C |
} |
// AfterFunc waits at least ns nanoseconds before calling f |
-// in its own goroutine. |
-func AfterFunc(ns int64, f func()) { |
- after(ns, func(_ int64) { |
+// in its own goroutine. It returns a Timer that can |
+// be used to cancel the call using its Stop method. |
+func AfterFunc(ns int64, f func()) *Timer { |
+ return after(ns, func(_ int64) { |
go f() |
}) |
} |
+// Stop prevents the Timer from firing. |
+// It returns true if the call stops the timer, false if the timer has already |
+// expired or been stopped. |
+func (e *Timer) Stop() (ok bool) { |
+ timerMutex.Lock() |
+ // Avoid removing the first event in the queue so that |
+ // we don't start a new sleeper unnecessarily. |
+ if e.i > 0 { |
+ heap.Remove(timers, e.i) |
+ } |
+ ok = e.f != nil |
+ e.f = nil |
+ timerMutex.Unlock() |
+ return |
+} |
+ |
// after is the implementation of After and AfterFunc. |
// When the current time is after ns, it calls f with the current time. |
// It assumes that f will not block. |
-func after(ns int64, f func(int64)) { |
+func after(ns int64, f func(int64)) (e *Timer) { |
+ now := Nanoseconds() |
t := Nanoseconds() + ns |
- eventMutex.Lock() |
- t0 := events[0].t |
- heap.Push(events, &event{t, f, false}) |
- if t < t0 { |
- go sleeper() |
+ if ns > 0 && t < now { |
+ panic("time: time overflow") |
} |
- eventMutex.Unlock() |
+ timerMutex.Lock() |
+ t0 := timers[0].t |
+ e = &Timer{nil, t, f, -1} |
+ heap.Push(timers, e) |
+ // Start a new sleeper if the new event is before |
+ // the first event in the queue. If the length of time |
+ // until the new event is at least maxSleepTime, |
+ // then we're guaranteed that the sleeper will wake up |
+ // in time to service it, so no new sleeper is needed. |
+ if t0 > t && (t0 == forever || ns < maxSleepTime) { |
+ currentSleeper++ |
+ go sleeper(currentSleeper) |
+ } |
+ timerMutex.Unlock() |
+ return |
} |
-// sleeper continually looks at the earliest event in the queue, marks it |
-// as sleeping, waits until it happens, then removes any events |
-// in the queue that are due. It stops when it finds an event that is |
-// already marked as sleeping. When an event is inserted before the first item, |
-// a new sleeper is started. |
-// |
-// Scheduling vagaries mean that sleepers may not wake up in |
-// exactly the order of the events that they are waiting for, |
-// but this does not matter as long as there are at least as |
-// many sleepers as events marked sleeping (invariant). This ensures that |
-// there is always a sleeper to service the remaining events. |
-// |
-// A sleeper will remove at least the event it has been waiting for |
-// unless the event has already been removed by another sleeper. Both |
-// cases preserve the invariant described above. |
-func sleeper() { |
- eventMutex.Lock() |
- e := events[0] |
- for !e.sleeping { |
- t := Nanoseconds() |
+// sleeper continually looks at the earliest event in the queue, waits until it happens, |
+// then removes any events in the queue that are due. It stops when the queue |
+// is empty or when another sleeper has been started. |
+func sleeper(sleeperId int64) { |
+ timerMutex.Lock() |
+ e := timers[0] |
+ t := Nanoseconds() |
+ for e.t != forever { |
if dt := e.t - t; dt > 0 { |
- e.sleeping = true |
- eventMutex.Unlock() |
- if nt, err := sleep(t, dt); err != nil { |
- // If sleep has encountered an error, |
- // there's not much we can do. We pretend |
- // that time really has advanced by the required |
- // amount and lie to the rest of the system. |
- t = e.t |
- } else { |
- t = nt |
+ if dt > maxSleepTime { |
+ dt = maxSleepTime |
} |
- eventMutex.Lock() |
- e = events[0] |
+ timerMutex.Unlock() |
+ syscall.Sleep(dt) |
+ timerMutex.Lock() |
+ if currentSleeper != sleeperId { |
+ // Another sleeper has been started, making this one redundant. |
+ break |
+ } |
} |
+ e = timers[0] |
+ t = Nanoseconds() |
for t >= e.t { |
- e.f(t) |
- heap.Pop(events) |
- e = events[0] |
+ if e.f != nil { |
+ e.f(t) |
+ e.f = nil |
+ } |
+ heap.Pop(timers) |
+ e = timers[0] |
} |
} |
- eventMutex.Unlock() |
+ timerMutex.Unlock() |
} |
-func (eventHeap) Len() int { |
- return len(events) |
+func (timerHeap) Len() int { |
+ return len(timers) |
} |
-func (eventHeap) Less(i, j int) bool { |
- return events[i].t < events[j].t |
+func (timerHeap) Less(i, j int) bool { |
+ return timers[i].t < timers[j].t |
} |
-func (eventHeap) Swap(i, j int) { |
- events[i], events[j] = events[j], events[i] |
+func (timerHeap) Swap(i, j int) { |
+ timers[i], timers[j] = timers[j], timers[i] |
+ timers[i].i = i |
+ timers[j].i = j |
} |
-func (eventHeap) Push(x interface{}) { |
- events = append(events, x.(*event)) |
+func (timerHeap) Push(x interface{}) { |
+ e := x.(*Timer) |
+ e.i = len(timers) |
+ timers = append(timers, e) |
} |
-func (eventHeap) Pop() interface{} { |
+func (timerHeap) Pop() interface{} { |
// TODO: possibly shrink array. |
- n := len(events) - 1 |
- e := events[n] |
- events[n] = nil |
- events = events[0:n] |
+ n := len(timers) - 1 |
+ e := timers[n] |
+ timers[n] = nil |
+ timers = timers[0:n] |
+ e.i = -1 |
return e |
} |