Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(37)

Side by Side Diff: utils/tailer/tailer_test.go

Issue 36540043: utils: added Tailer for tailing of logs in API
Patch Set: utils: added Tailer for tailing of logs in API Created 11 years, 3 months ago
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
View unified diff | Download patch
« no previous file with comments | « utils/tailer/tailer.go ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2013 Canonical Ltd.
2 // Licensed under the AGPLv3, see LICENCE file for details.
3
4 package tailer_test
5
6 import (
7 "bufio"
8 "bytes"
9 "fmt"
10 "io"
11 "sync"
12 stdtesting "testing"
13 "time"
14
15 gc "launchpad.net/gocheck"
16
17 "launchpad.net/juju-core/testing"
18 "launchpad.net/juju-core/utils/tailer"
19 )
20
21 func Test(t *stdtesting.T) {
22 gc.TestingT(t)
23 }
24
25 type tailerSuite struct{}
26
27 var _ = gc.Suite(tailerSuite{})
28
29 var alphabetData = []string{
30 "alpha alpha\n",
31 "bravo bravo\n",
32 "charlie charlie\n",
33 "delta delta\n",
34 "echo echo\n",
35 "foxtrott foxtrott\n",
36 "golf golf\n",
37 "hotel hotel\n",
38 "india india\n",
39 "juliet juliet\n",
40 "kilo kilo\n",
41 "lima lima\n",
42 "mike mike\n",
43 "november november\n",
44 "oscar oscar\n",
45 "papa papa\n",
46 "quebec quebec\n",
47 "romeo romeo\n",
48 "sierra sierra\n",
49 "tango tango\n",
50 "uniform uniform\n",
51 "victor victor\n",
52 "whiskey whiskey\n",
53 "x-ray x-ray\n",
54 "yankee yankee\n",
55 "zulu zulu\n",
56 }
57
58 var tests = []struct {
59 description string
60 data []string
61 initialLinesWritten int
62 initialLinesRequested int
63 bufferSize int
64 filter tailer.TailerFilterFunc
65 injector func(*tailer.Tailer, *readSeeker) func([]string)
66 initialCollectedData []string
67 appendedCollectedData []string
68 err string
69 }{{
70 description: "lines are longer than buffer size",
71 data: []string{
72 "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz\n",
73 "0123456789012345678901234567890123456789012345678901\n",
74 },
75 initialLinesWritten: 1,
76 initialLinesRequested: 1,
77 bufferSize: 5,
78 initialCollectedData: []string{
79 "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz\n",
80 },
81 appendedCollectedData: []string{
82 "0123456789012345678901234567890123456789012345678901\n",
83 },
84 }, {
85 description: "lines are longer than buffer size, missing termination of last line",
86 data: []string{
87 "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz\n",
88 "0123456789012345678901234567890123456789012345678901\n",
89 "the quick brown fox ",
90 },
91 initialLinesWritten: 1,
92 initialLinesRequested: 1,
93 bufferSize: 5,
94 initialCollectedData: []string{
95 "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz\n",
96 },
97 appendedCollectedData: []string{
98 "0123456789012345678901234567890123456789012345678901\n",
99 },
100 }, {
101 description: "lines are longer than buffer size, last line is terminated later",
102 data: []string{
103 "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz\n",
104 "0123456789012345678901234567890123456789012345678901\n",
105 "the quick brown fox ",
106 "jumps over the lazy dog\n",
107 },
108 initialLinesWritten: 1,
109 initialLinesRequested: 1,
110 bufferSize: 5,
111 initialCollectedData: []string{
112 "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz\n",
113 },
114 appendedCollectedData: []string{
115 "0123456789012345678901234567890123456789012345678901\n",
116 "the quick brown fox jumps over the lazy dog\n",
117 },
118 }, {
119 description: "missing termination of last line",
120 data: []string{
121 "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz\n",
122 "0123456789012345678901234567890123456789012345678901\n",
123 "the quick brown fox ",
124 },
125 initialLinesWritten: 1,
126 initialLinesRequested: 1,
127 initialCollectedData: []string{
128 "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz\n",
129 },
130 appendedCollectedData: []string{
131 "0123456789012345678901234567890123456789012345678901\n",
132 },
133 }, {
134 description: "last line is terminated later",
135 data: []string{
136 "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz\n",
137 "0123456789012345678901234567890123456789012345678901\n",
138 "the quick brown fox ",
139 "jumps over the lazy dog\n",
140 },
141 initialLinesWritten: 1,
142 initialLinesRequested: 1,
143 initialCollectedData: []string{
144 "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz\n",
145 },
146 appendedCollectedData: []string{
147 "0123456789012345678901234567890123456789012345678901\n",
148 "the quick brown fox jumps over the lazy dog\n",
149 },
150 }, {
151 description: "more lines already written than initially reques ted",
152 data: alphabetData,
153 initialLinesWritten: 5,
154 initialLinesRequested: 3,
155 initialCollectedData: []string{
156 "charlie charlie\n",
157 "delta delta\n",
158 "echo echo\n",
159 },
160 appendedCollectedData: alphabetData[5:],
161 }, {
162 description: "less lines already written than initially reques ted",
163 data: alphabetData,
164 initialLinesWritten: 3,
165 initialLinesRequested: 5,
166 initialCollectedData: []string{
167 "alpha alpha\n",
168 "bravo bravo\n",
169 "charlie charlie\n",
170 },
171 appendedCollectedData: alphabetData[3:],
172 }, {
173 description: "lines are longer than buffer size, more lines al ready written than initially requested",
174 data: alphabetData,
175 initialLinesWritten: 5,
176 initialLinesRequested: 3,
177 bufferSize: 5,
178 initialCollectedData: []string{
179 "charlie charlie\n",
180 "delta delta\n",
181 "echo echo\n",
182 },
183 appendedCollectedData: alphabetData[5:],
184 }, {
185 description: "lines are longer than buffer size, less lines al ready written than initially requested",
186 data: alphabetData,
187 initialLinesWritten: 3,
188 initialLinesRequested: 5,
189 bufferSize: 5,
190 initialCollectedData: []string{
191 "alpha alpha\n",
192 "bravo bravo\n",
193 "charlie charlie\n",
194 },
195 appendedCollectedData: alphabetData[3:],
196 }, {
197 description: "filter lines which contain the char 'e'",
198 data: alphabetData,
199 initialLinesWritten: 10,
200 initialLinesRequested: 3,
201 filter: func(line []byte) bool {
202 return bytes.Contains(line, []byte{'e'})
203 },
204 initialCollectedData: []string{
205 "echo echo\n",
206 "hotel hotel\n",
207 "juliet juliet\n",
208 },
209 appendedCollectedData: []string{
210 "mike mike\n",
211 "november november\n",
212 "quebec quebec\n",
213 "romeo romeo\n",
214 "sierra sierra\n",
215 "whiskey whiskey\n",
216 "yankee yankee\n",
217 },
218 }, {
219 description: "stop tailing after 10 collected lines",
220 data: alphabetData,
221 initialLinesWritten: 5,
222 initialLinesRequested: 3,
223 injector: func(t *tailer.Tailer, rs *readSeeker) func([]string) {
224 return func(lines []string) {
225 if len(lines) == 10 {
226 t.Stop()
227 }
228 }
229 },
230 initialCollectedData: []string{
231 "charlie charlie\n",
232 "delta delta\n",
233 "echo echo\n",
234 },
235 appendedCollectedData: alphabetData[5:],
236 }, {
237 description: "generate an error after 10 collected lines",
238 data: alphabetData,
239 initialLinesWritten: 5,
240 initialLinesRequested: 3,
241 injector: func(t *tailer.Tailer, rs *readSeeker) func([]string) {
242 return func(lines []string) {
243 if len(lines) == 10 {
244 rs.setError(fmt.Errorf("ouch after 10 lines"))
245 }
246 }
247 },
248 initialCollectedData: []string{
249 "charlie charlie\n",
250 "delta delta\n",
251 "echo echo\n",
252 },
253 appendedCollectedData: alphabetData[5:],
254 err: "ouch after 10 lines",
255 }, {
256 description: "more lines already written than initially requested, some empty, unfiltered",
257 data: []string{
258 "one one\n",
259 "two two\n",
260 "\n",
261 "\n",
262 "three three\n",
263 "four four\n",
264 "\n",
265 "\n",
266 "five five\n",
267 "six six\n",
268 },
269 initialLinesWritten: 3,
270 initialLinesRequested: 2,
271 initialCollectedData: []string{
272 "two two\n",
273 "\n",
274 },
275 appendedCollectedData: []string{
276 "\n",
277 "three three\n",
278 "four four\n",
279 "\n",
280 "\n",
281 "five five\n",
282 "six six\n",
283 },
284 }, {
285 description: "more lines already written than initially requested, some empty, those filtered",
286 data: []string{
287 "one one\n",
288 "two two\n",
289 "\n",
290 "\n",
291 "three three\n",
292 "four four\n",
293 "\n",
294 "\n",
295 "five five\n",
296 "six six\n",
297 },
298 initialLinesWritten: 3,
299 initialLinesRequested: 2,
300 filter: func(line []byte) bool {
301 return len(bytes.TrimSpace(line)) > 0
302 },
303 initialCollectedData: []string{
304 "one one\n",
305 "two two\n",
306 },
307 appendedCollectedData: []string{
308 "three three\n",
309 "four four\n",
310 "five five\n",
311 "six six\n",
312 },
313 }}
314
315 func (tailerSuite) TestTailer(c *gc.C) {
316 for i, test := range tests {
317 c.Logf("Test #%d) %s", i, test.description)
318 bufferSize := test.bufferSize
319 if bufferSize == 0 {
320 // Default value.
321 bufferSize = 4096
322 }
323 reader, writer := io.Pipe()
324 sigc := make(chan struct{}, 1)
325 rs := startReadSeeker(c, test.data, test.initialLinesWritten, si gc)
326 tailer := tailer.NewTestTailer(rs, writer, test.initialLinesRequ ested, test.filter, bufferSize, 2*time.Millisecond)
327 linec := startReading(c, tailer, reader, writer)
328
329 // Collect initial data.
330 assertCollected(c, linec, test.initialCollectedData, nil)
331
332 sigc <- struct{}{}
333
334 // Collect remaining data, possibly with injection to stop
335 // earlier or generate an error.
336 var injection func([]string)
337 if test.injector != nil {
338 injection = test.injector(tailer, rs)
339 }
340
341 assertCollected(c, linec, test.appendedCollectedData, injection)
342
343 if test.err == "" {
344 c.Assert(tailer.Stop(), gc.IsNil)
345 } else {
346 c.Assert(tailer.Err(), gc.ErrorMatches, test.err)
347 }
348 }
349 }
350
351 // startReading starts a goroutine receiving the lines out of the reader
352 // in the background and passing them to a created string channel. This
353 // will used in the assertions.
354 func startReading(c *gc.C, tailer *tailer.Tailer, reader *io.PipeReader, writer *io.PipeWriter) chan string {
355 linec := make(chan string)
356 // Start goroutine for reading.
357 go func() {
358 defer close(linec)
359 reader := bufio.NewReader(reader)
360 for {
361 line, err := reader.ReadString('\n')
362 switch err {
363 case nil:
364 linec <- line
365 case io.EOF:
366 return
367 default:
368 c.Fail()
369 }
370 }
371 }()
372 // Close writer when tailer is stopped or has an error. Tailer using
373 // components can do it the same way.
374 go func() {
375 tailer.Wait()
376 writer.Close()
377 }()
378 return linec
379 }
380
381 // assertCollected reads lines from the string channel linec. It compares if
382 // those are the one passed with compare until a timeout. If the timeout is
383 // reached earlier than all lines are collected the assertion fails. The
384 // injection function allows to interrupt the processing with a function
385 // generating an error or a regular stopping during the tailing. In case the
386 // linec is closed due to stopping or an error only the values so far care
387 // compared. Checking the reason for termination is done in the test.
388 func assertCollected(c *gc.C, linec chan string, compare []string, injection fun c([]string)) {
389 timeout := time.After(testing.LongWait)
390 lines := []string{}
391 for {
392 select {
393 case line, ok := <-linec:
394 if ok {
395 lines = append(lines, line)
396 if injection != nil {
397 injection(lines)
398 }
399 if len(lines) == len(compare) {
400 // All data received.
401 c.Assert(lines, gc.DeepEquals, compare)
402 return
403 }
404 } else {
405 // linec closed after stopping or error.
406 c.Assert(lines, gc.DeepEquals, compare[:len(line s)])
407 return
408 }
409 case <-timeout:
410 if injection == nil {
411 c.Fatalf("timeout during tailer collection")
412 }
413 return
414 }
415 }
416 }
417
418 // startReadSeeker returns a ReadSeeker for the Tailer. It simulates
419 // reading and seeking inside a file and also simulating an error.
420 // The goroutine waits for a signal that it can start writing the
421 // appended lines.
422 func startReadSeeker(c *gc.C, data []string, initialLeg int, sigc chan struct{}) *readSeeker {
423 // Write initial lines into the buffer.
424 var rs readSeeker
425 var i int
426 for i = 0; i < initialLeg; i++ {
427 rs.write(data[i])
428 }
429
430 go func() {
431 <-sigc
432
433 for ; i < len(data); i++ {
434 time.Sleep(5 * time.Millisecond)
435 rs.write(data[i])
436 }
437 }()
438 return &rs
439 }
440
441 type readSeeker struct {
442 mux sync.Mutex
443 buffer []byte
444 pos int
445 err error
446 }
447
448 func (r *readSeeker) write(s string) {
449 r.mux.Lock()
450 defer r.mux.Unlock()
451 r.buffer = append(r.buffer, []byte(s)...)
452 }
453
454 func (r *readSeeker) setError(err error) {
455 r.mux.Lock()
456 defer r.mux.Unlock()
457 r.err = err
458 }
459
460 func (r *readSeeker) Read(p []byte) (n int, err error) {
461 r.mux.Lock()
462 defer r.mux.Unlock()
463 if r.err != nil {
464 return 0, r.err
465 }
466 if r.pos >= len(r.buffer) {
467 return 0, io.EOF
468 }
469 n = copy(p, r.buffer[r.pos:])
470 r.pos += n
471 return n, nil
472 }
473
474 func (r *readSeeker) Seek(offset int64, whence int) (ret int64, err error) {
475 r.mux.Lock()
476 defer r.mux.Unlock()
477 var newPos int64
478 switch whence {
479 case 0:
480 newPos = offset
481 case 1:
482 newPos = int64(r.pos) + offset
483 case 2:
484 newPos = int64(len(r.buffer)) + offset
485 default:
486 return 0, fmt.Errorf("invalid whence: %d", whence)
487 }
488 if newPos < 0 {
489 return 0, fmt.Errorf("negative position: %d", newPos)
490 }
491 if newPos >= 1<<31 {
492 return 0, fmt.Errorf("position out of range: %d", newPos)
493 }
494 r.pos = int(newPos)
495 return newPos, nil
496 }
OLDNEW
« no previous file with comments | « utils/tailer/tailer.go ('k') | no next file » | no next file with comments »

Powered by Google App Engine
RSS Feeds Recent Issues | This issue
This is Rietveld f62528b