|
|
Descriptionbufio: reuse Writer buffers after Flush
A bufio.Writer.Flush marks the usual end of a Writer's
life. Recycle its internal buffer on those explicit flushes,
but not on normal, as-needed internal flushes.
benchmark old ns/op new ns/op delta
BenchmarkWriterEmpty 1959 727 -62.89%
benchmark old allocs new allocs delta
BenchmarkWriterEmpty 2 1 -50.00%
benchmark old bytes new bytes delta
BenchmarkWriterEmpty 4215 83 -98.03%
Patch Set 1 #Patch Set 2 : diff -r 75602c1fb7cc https://go.googlecode.com/hg/ #Patch Set 3 : diff -r 75602c1fb7cc https://go.googlecode.com/hg/ #Patch Set 4 : diff -r 78e5ea623996 https://go.googlecode.com/hg/ #
MessagesTotal messages: 6
Hello golang-dev@googlegroups.com, I'd like you to review this change to https://go.googlecode.com/hg/
Sign in to reply to this message.
"A bufio.Writer.Flush marks the usual end of a Writer's life. " This is also often not the case. I have some programs that need to keep transaction logs. These programs have *bufio.Writers with large buffers, 64, 128 MB. To maintain consistency these programs will often flush their *bufio.Writer and sync the underlying file at set intervals, 5 seconds, 30 seconds, etc. I worry about this proposed change causing increased GC activity in these kind of programs (assuming this large buffers are not added to the buffer pool). I also have network communications that are buffered that often use Flush, only to immediately start writing data again. Would it be possible to note, with a bool field or something, if the Writer had to reallocate a buffer after its last Flush(), and if so disable the new behaviour? This would allow the new behaviour without adding knobs or forcing people to write their own buffers. Another idea would be to not recycle buffers that are larger than a given size, say 1 MB, perhaps this is already the case. On Monday, May 20, 2013 7:01:19 PM UTC-5, Brad Fitzpatrick wrote: > > Reviewers: golang-dev1, > > Message: > Hello golan...@googlegroups.com <javascript:>, > > I'd like you to review this change to > https://go.googlecode.com/hg/ > > > Description: > bufio: reuse Writer buffers after Flush > > A bufio.Writer.Flush marks the usual end of a Writer's > life. Recycle its internal buffer on those explicit flushes, > but not on normal, as-needed internal flushes. > > benchmark old ns/op new ns/op delta > BenchmarkWriterEmpty 1959 727 -62.89% > > benchmark old allocs new allocs delta > BenchmarkWriterEmpty 2 1 -50.00% > > benchmark old bytes new bytes delta > BenchmarkWriterEmpty 4215 83 -98.03% > > Please review this at https://codereview.appspot.com/9459044/ > > Affected files: > M src/pkg/bufio/bufio.go > M src/pkg/bufio/bufio_test.go > > > Index: src/pkg/bufio/bufio.go > =================================================================== > --- a/src/pkg/bufio/bufio.go > +++ b/src/pkg/bufio/bufio.go > @@ -29,7 +29,7 @@ > > // Reader implements buffering for an io.Reader object. > type Reader struct { > - buf []byte // either nil or []byte of size bufSize > + buf []byte // either nil or []byte of length bufSize > bufSize int > rd io.Reader > r, w int > @@ -314,7 +314,7 @@ > } > > // Buffer is full? > - if b.Buffered() >= len(b.buf) { > + if b.Buffered() >= b.bufSize { > b.r = b.w > return b.buf, ErrBufferFull > } > @@ -473,10 +473,11 @@ > // If an error occurs writing to a Writer, no more data will be > // accepted and all subsequent writes will return the error. > type Writer struct { > - err error > - buf []byte > - n int > - wr io.Writer > + err error > + buf []byte // either nil or []byte of length bufSize > + bufSize int > + n int > + wr io.Writer > } > > // NewWriterSize returns a new Writer whose buffer has at least the > specified > @@ -485,16 +486,20 @@ > func NewWriterSize(wr io.Writer, size int) *Writer { > // Is it already a Writer? > b, ok := wr.(*Writer) > - if ok && len(b.buf) >= size { > + if ok && b.bufSize >= size { > return b > } > if size <= 0 { > size = defaultBufSize > } > - b = new(Writer) > - // TODO(bradfitz): make Writer buffers lazy too, like Reader's > - b.buf = make([]byte, size) > - b.wr = wr > + b = &Writer{ > + wr: wr, > + bufSize: size, > + } > + if size > defaultBufSize { > + // TODO(bradfitz): make all buffer sizes recycle > + b.buf = make([]byte, b.bufSize) > + } > return b > } > > @@ -503,8 +508,38 @@ > return NewWriterSize(wr, defaultBufSize) > } > > +// allocBuf makes b.buf non-nil. > +func (b *Writer) allocBuf() { > + if b.buf != nil { > + return > + } > + select { > + case b.buf = <-bufCache: > + b.buf = b.buf[:b.bufSize] > + default: > + b.buf = make([]byte, b.bufSize, defaultBufSize) > + } > +} > + > +// putBuf returns b.buf if it's unused. > +func (b *Writer) putBuf() { > + if b.n == 0 && cap(b.buf) == defaultBufSize { > + select { > + case bufCache <- b.buf: > + b.buf = nil > + default: > + } > + } > +} > + > // Flush writes any buffered data to the underlying io.Writer. > func (b *Writer) Flush() error { > + err := b.flush() > + b.putBuf() > + return err > +} > + > +func (b *Writer) flush() error { > if b.err != nil { > return b.err > } > @@ -528,7 +563,7 @@ > } > > // Available returns how many bytes are unused in the buffer. > -func (b *Writer) Available() int { return len(b.buf) - b.n } > +func (b *Writer) Available() int { return b.bufSize - b.n } > > // Buffered returns the number of bytes that have been written into the > > current buffer. > func (b *Writer) Buffered() int { return b.n } > @@ -538,6 +573,7 @@ > // If nn < len(p), it also returns an error explaining > // why the write is short. > func (b *Writer) Write(p []byte) (nn int, err error) { > + b.allocBuf() > for len(p) > b.Available() && b.err == nil { > var n int > if b.Buffered() == 0 { > @@ -547,7 +583,7 @@ > } else { > n = copy(b.buf[b.n:], p) > b.n += n > - b.Flush() > + b.flush() > } > nn += n > p = p[n:] > @@ -566,9 +602,12 @@ > if b.err != nil { > return b.err > } > - if b.Available() <= 0 && b.Flush() != nil { > + if b.Available() <= 0 && b.flush() != nil { > return b.err > } > + if b.buf == nil { > + b.allocBuf() > + } > b.buf[b.n] = c > b.n++ > return nil > @@ -577,6 +616,9 @@ > // WriteRune writes a single Unicode code point, returning > // the number of bytes written and any error. > func (b *Writer) WriteRune(r rune) (size int, err error) { > + if b.buf == nil { > + b.allocBuf() > + } > if r < utf8.RuneSelf { > err = b.WriteByte(byte(r)) > if err != nil { > @@ -589,7 +631,7 @@ > } > n := b.Available() > if n < utf8.UTFMax { > - if b.Flush(); b.err != nil { > + if b.flush(); b.err != nil { > return 0, b.err > } > n = b.Available() > @@ -608,13 +650,14 @@ > // If the count is less than len(s), it also returns an error explaining > // why the write is short. > func (b *Writer) WriteString(s string) (int, error) { > + b.allocBuf() > nn := 0 > for len(s) > b.Available() && b.err == nil { > n := copy(b.buf[b.n:], s) > b.n += n > nn += n > s = s[n:] > - b.Flush() > + b.flush() > } > if b.err != nil { > return nn, b.err > @@ -627,6 +670,7 @@ > > // ReadFrom implements io.ReaderFrom. > func (b *Writer) ReadFrom(r io.Reader) (n int64, err error) { > + b.allocBuf() > if b.Buffered() == 0 { > if w, ok := b.wr.(io.ReaderFrom); ok { > return w.ReadFrom(r) > @@ -641,7 +685,7 @@ > b.n += m > n += int64(m) > if b.Available() == 0 { > - if err1 := b.Flush(); err1 != nil { > + if err1 := b.flush(); err1 != nil { > return n, err1 > } > } > Index: src/pkg/bufio/bufio_test.go > =================================================================== > --- a/src/pkg/bufio/bufio_test.go > +++ b/src/pkg/bufio/bufio_test.go > @@ -1098,3 +1098,21 @@ > } > } > } > + > +func BenchmarkWriterEmpty(b *testing.B) { > + b.ReportAllocs() > + str := strings.Repeat("x", 1<<10) > + bs := []byte(str) > + for i := 0; i < b.N; i++ { > + bw := NewWriter(ioutil.Discard) > + bw.Flush() > + bw.WriteByte('a') > + bw.Flush() > + bw.WriteRune('B') > + bw.Flush() > + bw.Write(bs) > + bw.Flush() > + bw.WriteString(str) > + bw.Flush() > + } > +} > > >
Sign in to reply to this message.
Tylor, That's true, but the additional cost (flushing to the fastest possible writer: ioutil.Discard) is 70ns. I'm not concerned. Anything you'd actually flush regularly to (e.g. the network), as opposed to a flush-at-end-only, should dwarf that. And that 70ns will drop once runtime.xchg is inlined by cmd/6c for the channel operations in the runtime and/or we have a sync.Cache type. So I think this CL is fine as-is. On Tue, May 21, 2013 at 8:22 AM, <voidlogic7@gmail.com> wrote: > "A bufio.Writer.Flush marks the usual end of a Writer's life. " > > This is also often not the case. I have some programs that need to keep > transaction logs. These programs have *bufio.Writers with large buffers, > 64, 128 MB. To maintain consistency these programs will often flush their > *bufio.Writer and sync the underlying file at set intervals, 5 seconds, 30 > seconds, etc. I worry about this proposed change causing increased GC > activity in these kind of programs (assuming this large buffers are not > added to the buffer pool). I also have network communications that are > buffered that often use Flush, only to immediately start writing data again. > > Would it be possible to note, with a bool field or something, if the > Writer had to reallocate a buffer after its last Flush(), and if so disable > the new behaviour? This would allow the new behaviour without adding knobs > or forcing people to write their own buffers. Another idea would be to not > recycle buffers that are larger than a given size, say 1 MB, perhaps this > is already the case. > > On Monday, May 20, 2013 7:01:19 PM UTC-5, Brad Fitzpatrick wrote: >> >> Reviewers: golang-dev1, >> >> Message: >> Hello golan...@googlegroups.com, >> >> I'd like you to review this change to >> https://go.googlecode.com/hg/ >> >> >> Description: >> bufio: reuse Writer buffers after Flush >> >> A bufio.Writer.Flush marks the usual end of a Writer's >> life. Recycle its internal buffer on those explicit flushes, >> but not on normal, as-needed internal flushes. >> >> benchmark old ns/op new ns/op delta >> BenchmarkWriterEmpty 1959 727 -62.89% >> >> benchmark old allocs new allocs delta >> BenchmarkWriterEmpty 2 1 -50.00% >> >> benchmark old bytes new bytes delta >> BenchmarkWriterEmpty 4215 83 -98.03% >> >> Please review this at https://codereview.appspot.**com/9459044/<https://codereview.appspot.com/9459... >> >> Affected files: >> M src/pkg/bufio/bufio.go >> M src/pkg/bufio/bufio_test.go >> >> >> Index: src/pkg/bufio/bufio.go >> ==============================**==============================**======= >> --- a/src/pkg/bufio/bufio.go >> +++ b/src/pkg/bufio/bufio.go >> @@ -29,7 +29,7 @@ >> >> // Reader implements buffering for an io.Reader object. >> type Reader struct { >> - buf []byte // either nil or []byte of size bufSize >> + buf []byte // either nil or []byte of length bufSize >> bufSize int >> rd io.Reader >> r, w int >> @@ -314,7 +314,7 @@ >> } >> >> // Buffer is full? >> - if b.Buffered() >= len(b.buf) { >> + if b.Buffered() >= b.bufSize { >> b.r = b.w >> return b.buf, ErrBufferFull >> } >> @@ -473,10 +473,11 @@ >> // If an error occurs writing to a Writer, no more data will be >> // accepted and all subsequent writes will return the error. >> type Writer struct { >> - err error >> - buf []byte >> - n int >> - wr io.Writer >> + err error >> + buf []byte // either nil or []byte of length bufSize >> + bufSize int >> + n int >> + wr io.Writer >> } >> >> // NewWriterSize returns a new Writer whose buffer has at least the >> specified >> @@ -485,16 +486,20 @@ >> func NewWriterSize(wr io.Writer, size int) *Writer { >> // Is it already a Writer? >> b, ok := wr.(*Writer) >> - if ok && len(b.buf) >= size { >> + if ok && b.bufSize >= size { >> return b >> } >> if size <= 0 { >> size = defaultBufSize >> } >> - b = new(Writer) >> - // TODO(bradfitz): make Writer buffers lazy too, like Reader's >> - b.buf = make([]byte, size) >> - b.wr = wr >> + b = &Writer{ >> + wr: wr, >> + bufSize: size, >> + } >> + if size > defaultBufSize { >> + // TODO(bradfitz): make all buffer sizes recycle >> + b.buf = make([]byte, b.bufSize) >> + } >> return b >> } >> >> @@ -503,8 +508,38 @@ >> return NewWriterSize(wr, defaultBufSize) >> } >> >> +// allocBuf makes b.buf non-nil. >> +func (b *Writer) allocBuf() { >> + if b.buf != nil { >> + return >> + } >> + select { >> + case b.buf = <-bufCache: >> + b.buf = b.buf[:b.bufSize] >> + default: >> + b.buf = make([]byte, b.bufSize, defaultBufSize) >> + } >> +} >> + >> +// putBuf returns b.buf if it's unused. >> +func (b *Writer) putBuf() { >> + if b.n == 0 && cap(b.buf) == defaultBufSize { >> + select { >> + case bufCache <- b.buf: >> + b.buf = nil >> + default: >> + } >> + } >> +} >> + >> // Flush writes any buffered data to the underlying io.Writer. >> func (b *Writer) Flush() error { >> + err := b.flush() >> + b.putBuf() >> + return err >> +} >> + >> +func (b *Writer) flush() error { >> if b.err != nil { >> return b.err >> } >> @@ -528,7 +563,7 @@ >> } >> >> // Available returns how many bytes are unused in the buffer. >> -func (b *Writer) Available() int { return len(b.buf) - b.n } >> +func (b *Writer) Available() int { return b.bufSize - b.n } >> >> // Buffered returns the number of bytes that have been written into the >> >> current buffer. >> func (b *Writer) Buffered() int { return b.n } >> @@ -538,6 +573,7 @@ >> // If nn < len(p), it also returns an error explaining >> // why the write is short. >> func (b *Writer) Write(p []byte) (nn int, err error) { >> + b.allocBuf() >> for len(p) > b.Available() && b.err == nil { >> var n int >> if b.Buffered() == 0 { >> @@ -547,7 +583,7 @@ >> } else { >> n = copy(b.buf[b.n:], p) >> b.n += n >> - b.**Flush() >> + b.**flush() >> } >> nn += n >> p = p[n:] >> @@ -566,9 +602,12 @@ >> if b.err != nil { >> return b.err >> } >> - if b.Available() <= 0 && b.Flush() != nil { >> + if b.Available() <= 0 && b.flush() != nil { >> return b.err >> } >> + if b.buf == nil { >> + b.allocBuf() >> + } >> b.buf[b.n] = c >> b.n++ >> return nil >> @@ -577,6 +616,9 @@ >> // WriteRune writes a single Unicode code point, returning >> // the number of bytes written and any error. >> func (b *Writer) WriteRune(r rune) (size int, err error) { >> + if b.buf == nil { >> + b.allocBuf() >> + } >> if r < utf8.RuneSelf { >> err = b.WriteByte(byte(r)) >> if err != nil { >> @@ -589,7 +631,7 @@ >> } >> n := b.Available() >> if n < utf8.UTFMax { >> - if b.Flush(); b.err != nil { >> + if b.flush(); b.err != nil { >> return 0, b.err >> } >> n = b.Available() >> @@ -608,13 +650,14 @@ >> // If the count is less than len(s), it also returns an error >> explaining >> // why the write is short. >> func (b *Writer) WriteString(s string) (int, error) { >> + b.allocBuf() >> nn := 0 >> for len(s) > b.Available() && b.err == nil { >> n := copy(b.buf[b.n:], s) >> b.n += n >> nn += n >> s = s[n:] >> - b.Flush() >> + b.flush() >> } >> if b.err != nil { >> return nn, b.err >> @@ -627,6 +670,7 @@ >> >> // ReadFrom implements io.ReaderFrom. >> func (b *Writer) ReadFrom(r io.Reader) (n int64, err error) { >> + b.allocBuf() >> if b.Buffered() == 0 { >> if w, ok := b.wr.(io.ReaderFrom); ok { >> return w.ReadFrom(r) >> @@ -641,7 +685,7 @@ >> b.n += m >> n += int64(m) >> if b.Available() == 0 { >> - if err1 := b.Flush(); err1 != nil { >> + if err1 := b.flush(); err1 != nil { >> ** return n, err1 >> } >> } >> Index: src/pkg/bufio/bufio_test.go >> ==============================**==============================**======= >> --- a/src/pkg/bufio/bufio_test.go >> +++ b/src/pkg/bufio/bufio_test.go >> @@ -1098,3 +1098,21 @@ >> } >> } >> } >> + >> +func BenchmarkWriterEmpty(b *testing.B) { >> + b.ReportAllocs() >> + str := strings.Repeat("x", 1<<10) >> + bs := []byte(str) >> + for i := 0; i < b.N; i++ { >> + bw := NewWriter(ioutil.Discard) >> + bw.Flush() >> + bw.WriteByte(**'a') >> + bw.Flush() >> + bw.WriteRune(**'B') >> + bw.Flush() >> + bw.Write(bs) >> + bw.Flush() >> + bw.**WriteString(str) >> + bw.Flush() >> + } >> +} >> >> >>
Sign in to reply to this message.
R=gri (assigned by bradfitz)
Sign in to reply to this message.
LGTM
Sign in to reply to this message.
*** Submitted as https://code.google.com/p/go/source/detail?r=8e41e8e4a2ab *** bufio: reuse Writer buffers after Flush A bufio.Writer.Flush marks the usual end of a Writer's life. Recycle its internal buffer on those explicit flushes, but not on normal, as-needed internal flushes. benchmark old ns/op new ns/op delta BenchmarkWriterEmpty 1959 727 -62.89% benchmark old allocs new allocs delta BenchmarkWriterEmpty 2 1 -50.00% benchmark old bytes new bytes delta BenchmarkWriterEmpty 4215 83 -98.03% R=gri, iant CC=gobot, golang-dev, voidlogic7 https://codereview.appspot.com/9459044
Sign in to reply to this message.
|