Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(317)

Unified Diff: src/pkg/time/sleep.go

Issue 4063043: code review 4063043: time: allow cancelling of After events. (Closed)
Patch Set: code review 4063043: time: allow cancelling of After events. Created 13 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « no previous file | src/pkg/time/sleep_test.go » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: src/pkg/time/sleep.go
===================================================================
--- a/src/pkg/time/sleep.go
+++ b/src/pkg/time/sleep.go
@@ -11,20 +11,40 @@
"container/heap"
)
-// The event type represents a single After or AfterFunc event.
-type event struct {
- t int64 // The absolute time that the event should fire.
- f func(int64) // The function to call when the event fires.
- sleeping bool // A sleeper is sleeping for this event.
+// The Timer type represents a single event.
+// When the Timer expires, the current time will be sent on C
+// unless the Timer represents an AfterFunc event.
+type Timer struct {
+ C <-chan int64
+ t int64 // The absolute time that the event should fire.
+ f func(int64) // The function to call when the event fires.
+ i int // The event's index inside eventHeap.
}
-type eventHeap []*event
+type timerHeap []*Timer
-var events eventHeap
-var eventMutex sync.Mutex
+// forever is the absolute time (in ns) of an event that is forever away.
+const forever = 1 << 62
+
+// maxSleepTime is the maximum length of time that a sleeper
+// sleeps for before checking if it is defunct.
+const maxSleepTime = 1e9
+
+var (
+ // timerMutex guards the variables inside this var group.
+ timerMutex sync.Mutex
+
+ // timers holds a binary heap of pending events, terminated with a sentinel.
+ timers timerHeap
+
+ // currentSleeper is an ever-incrementing counter which represents
+ // the current sleeper. It allows older sleepers to detect that they are
+ // defunct and exit.
+ currentSleeper int64
+)
func init() {
- events.Push(&event{1 << 62, nil, true}) // sentinel
+ timers.Push(&Timer{t: forever}) // sentinel
}
// Sleep pauses the current goroutine for at least ns nanoseconds.
@@ -51,101 +71,133 @@
return t, nil
}
+// NewTimer creates a new Timer that will send
+// the current time on its channel after at least ns nanoseconds.
+func NewTimer(ns int64) *Timer {
+ c := make(chan int64, 1)
+ e := after(ns, func(t int64) { c <- t })
+ e.C = c
+ return e
+}
+
// After waits at least ns nanoseconds before sending the current time
// on the returned channel.
+// It is equivalent to NewTimer(ns).C.
func After(ns int64) <-chan int64 {
- c := make(chan int64, 1)
- after(ns, func(t int64) { c <- t })
- return c
+ return NewTimer(ns).C
}
// AfterFunc waits at least ns nanoseconds before calling f
-// in its own goroutine.
-func AfterFunc(ns int64, f func()) {
- after(ns, func(_ int64) {
+// in its own goroutine. It returns a Timer that can
+// be used to cancel the call using its Stop method.
+func AfterFunc(ns int64, f func()) *Timer {
+ return after(ns, func(_ int64) {
go f()
})
}
+// Stop prevents the Timer from firing.
+// It returns true if the call stops the timer, false if the timer has already
+// expired or stopped.
+func (e *Timer) Stop() (ok bool) {
+ timerMutex.Lock()
+ // Avoid removing the first event in the queue so that
+ // we don't start a new sleeper unnecessarily.
+ if e.i > 0 {
+ heap.Remove(timers, e.i)
+ }
+ ok = e.f != nil
+ e.f = nil
+ timerMutex.Unlock()
+ return
+}
+
// after is the implementation of After and AfterFunc.
// When the current time is after ns, it calls f with the current time.
// It assumes that f will not block.
-func after(ns int64, f func(int64)) {
+func after(ns int64, f func(int64)) (e *Timer) {
+ now := Nanoseconds()
t := Nanoseconds() + ns
- eventMutex.Lock()
- t0 := events[0].t
- heap.Push(events, &event{t, f, false})
- if t < t0 {
- go sleeper()
+ if ns > 0 && t < now {
+ panic("time: time overflow")
}
- eventMutex.Unlock()
+ timerMutex.Lock()
+ t0 := timers[0].t
+ e = &Timer{nil, t, f, -1}
+ heap.Push(timers, e)
+ // Start a new sleeper if the new event is before
+ // the first event in the queue. If the length of time
+ // until the new event is at least maxSleepTime,
+ // then we're guaranteed that the sleeper will wake up
+ // in time to service it, so no new sleeper is needed.
+ if t0 > t && (t0 == forever || ns < maxSleepTime) {
+ currentSleeper++
+ go sleeper(currentSleeper)
+ }
+ timerMutex.Unlock()
+ return
}
-// sleeper continually looks at the earliest event in the queue, marks it
-// as sleeping, waits until it happens, then removes any events
-// in the queue that are due. It stops when it finds an event that is
-// already marked as sleeping. When an event is inserted before the first item,
-// a new sleeper is started.
-//
-// Scheduling vagaries mean that sleepers may not wake up in
-// exactly the order of the events that they are waiting for,
-// but this does not matter as long as there are at least as
-// many sleepers as events marked sleeping (invariant). This ensures that
-// there is always a sleeper to service the remaining events.
-//
-// A sleeper will remove at least the event it has been waiting for
-// unless the event has already been removed by another sleeper. Both
-// cases preserve the invariant described above.
-func sleeper() {
- eventMutex.Lock()
- e := events[0]
- for !e.sleeping {
- t := Nanoseconds()
+// sleeper continually looks at the earliest event in the queue, waits until it happens,
+// then removes any events in the queue that are due. It stops when the queue
+// is empty or when another sleeper has been started.
+func sleeper(sleeperId int64) {
+ timerMutex.Lock()
+ e := timers[0]
+ t := Nanoseconds()
+ for e.t != forever {
if dt := e.t - t; dt > 0 {
- e.sleeping = true
- eventMutex.Unlock()
- if nt, err := sleep(t, dt); err != nil {
- // If sleep has encountered an error,
- // there's not much we can do. We pretend
- // that time really has advanced by the required
- // amount and lie to the rest of the system.
- t = e.t
- } else {
- t = nt
+ if dt > maxSleepTime {
+ dt = maxSleepTime
}
- eventMutex.Lock()
- e = events[0]
+ timerMutex.Unlock()
+ syscall.Sleep(dt)
+ timerMutex.Lock()
+ if currentSleeper != sleeperId {
+ // Another sleeper has been started, making this one redundant.
+ break
+ }
}
+ e = timers[0]
+ t = Nanoseconds()
for t >= e.t {
- e.f(t)
- heap.Pop(events)
- e = events[0]
+ if e.f != nil {
+ e.f(t)
+ e.f = nil
+ }
+ heap.Pop(timers)
+ e = timers[0]
}
}
- eventMutex.Unlock()
+ timerMutex.Unlock()
}
-func (eventHeap) Len() int {
- return len(events)
+func (timerHeap) Len() int {
+ return len(timers)
}
-func (eventHeap) Less(i, j int) bool {
- return events[i].t < events[j].t
+func (timerHeap) Less(i, j int) bool {
+ return timers[i].t < timers[j].t
}
-func (eventHeap) Swap(i, j int) {
- events[i], events[j] = events[j], events[i]
+func (timerHeap) Swap(i, j int) {
+ timers[i], timers[j] = timers[j], timers[i]
+ timers[i].i = i
+ timers[j].i = j
}
-func (eventHeap) Push(x interface{}) {
- events = append(events, x.(*event))
+func (timerHeap) Push(x interface{}) {
+ e := x.(*Timer)
+ e.i = len(timers)
+ timers = append(timers, e)
}
-func (eventHeap) Pop() interface{} {
+func (timerHeap) Pop() interface{} {
// TODO: possibly shrink array.
- n := len(events) - 1
- e := events[n]
- events[n] = nil
- events = events[0:n]
+ n := len(timers) - 1
+ e := timers[n]
+ timers[n] = nil
+ timers = timers[0:n]
+ e.i = -1
return e
}
« no previous file with comments | « no previous file | src/pkg/time/sleep_test.go » ('j') | no next file with comments »

Powered by Google App Engine
RSS Feeds Recent Issues | This issue
This is Rietveld f62528b