OLD | NEW |
(Empty) | |
| 1 // Copyright 2013 Canonical Ltd. |
| 2 // Licensed under the AGPLv3, see LICENCE file for details. |
| 3 |
| 4 package tailer_test |
| 5 |
| 6 import ( |
| 7 "bufio" |
| 8 "bytes" |
| 9 "fmt" |
| 10 "io" |
| 11 "sync" |
| 12 stdtesting "testing" |
| 13 "time" |
| 14 |
| 15 gc "launchpad.net/gocheck" |
| 16 |
| 17 "launchpad.net/juju-core/testing" |
| 18 "launchpad.net/juju-core/utils/tailer" |
| 19 ) |
| 20 |
| 21 func Test(t *stdtesting.T) { |
| 22 gc.TestingT(t) |
| 23 } |
| 24 |
| 25 type tailerSuite struct{} |
| 26 |
| 27 var _ = gc.Suite(tailerSuite{}) |
| 28 |
| 29 var alphabetData = []string{ |
| 30 "alpha alpha\n", |
| 31 "bravo bravo\n", |
| 32 "charlie charlie\n", |
| 33 "delta delta\n", |
| 34 "echo echo\n", |
| 35 "foxtrott foxtrott\n", |
| 36 "golf golf\n", |
| 37 "hotel hotel\n", |
| 38 "india india\n", |
| 39 "juliet juliet\n", |
| 40 "kilo kilo\n", |
| 41 "lima lima\n", |
| 42 "mike mike\n", |
| 43 "november november\n", |
| 44 "oscar oscar\n", |
| 45 "papa papa\n", |
| 46 "quebec quebec\n", |
| 47 "romeo romeo\n", |
| 48 "sierra sierra\n", |
| 49 "tango tango\n", |
| 50 "uniform uniform\n", |
| 51 "victor victor\n", |
| 52 "whiskey whiskey\n", |
| 53 "x-ray x-ray\n", |
| 54 "yankee yankee\n", |
| 55 "zulu zulu\n", |
| 56 } |
| 57 |
| 58 var tests = []struct { |
| 59 description string |
| 60 data []string |
| 61 initialLinesWritten int |
| 62 initialLinesRequested int |
| 63 bufferSize int |
| 64 filter tailer.TailerFilterFunc |
| 65 injector func(*tailer.Tailer, *readSeeker) func([]string) |
| 66 initialCollectedData []string |
| 67 appendedCollectedData []string |
| 68 err string |
| 69 }{{ |
| 70 description: "lines are longer than buffer size", |
| 71 data: []string{ |
| 72 "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz\n", |
| 73 "0123456789012345678901234567890123456789012345678901\n", |
| 74 }, |
| 75 initialLinesWritten: 1, |
| 76 initialLinesRequested: 1, |
| 77 bufferSize: 5, |
| 78 initialCollectedData: []string{ |
| 79 "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz\n", |
| 80 }, |
| 81 appendedCollectedData: []string{ |
| 82 "0123456789012345678901234567890123456789012345678901\n", |
| 83 }, |
| 84 }, { |
| 85 description: "lines are longer than buffer size, missing termination of
last line", |
| 86 data: []string{ |
| 87 "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz\n", |
| 88 "0123456789012345678901234567890123456789012345678901\n", |
| 89 "the quick brown fox ", |
| 90 }, |
| 91 initialLinesWritten: 1, |
| 92 initialLinesRequested: 1, |
| 93 bufferSize: 5, |
| 94 initialCollectedData: []string{ |
| 95 "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz\n", |
| 96 }, |
| 97 appendedCollectedData: []string{ |
| 98 "0123456789012345678901234567890123456789012345678901\n", |
| 99 }, |
| 100 }, { |
| 101 description: "lines are longer than buffer size, last line is terminated
later", |
| 102 data: []string{ |
| 103 "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz\n", |
| 104 "0123456789012345678901234567890123456789012345678901\n", |
| 105 "the quick brown fox ", |
| 106 "jumps over the lazy dog\n", |
| 107 }, |
| 108 initialLinesWritten: 1, |
| 109 initialLinesRequested: 1, |
| 110 bufferSize: 5, |
| 111 initialCollectedData: []string{ |
| 112 "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz\n", |
| 113 }, |
| 114 appendedCollectedData: []string{ |
| 115 "0123456789012345678901234567890123456789012345678901\n", |
| 116 "the quick brown fox jumps over the lazy dog\n", |
| 117 }, |
| 118 }, { |
| 119 description: "missing termination of last line", |
| 120 data: []string{ |
| 121 "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz\n", |
| 122 "0123456789012345678901234567890123456789012345678901\n", |
| 123 "the quick brown fox ", |
| 124 }, |
| 125 initialLinesWritten: 1, |
| 126 initialLinesRequested: 1, |
| 127 initialCollectedData: []string{ |
| 128 "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz\n", |
| 129 }, |
| 130 appendedCollectedData: []string{ |
| 131 "0123456789012345678901234567890123456789012345678901\n", |
| 132 }, |
| 133 }, { |
| 134 description: "last line is terminated later", |
| 135 data: []string{ |
| 136 "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz\n", |
| 137 "0123456789012345678901234567890123456789012345678901\n", |
| 138 "the quick brown fox ", |
| 139 "jumps over the lazy dog\n", |
| 140 }, |
| 141 initialLinesWritten: 1, |
| 142 initialLinesRequested: 1, |
| 143 initialCollectedData: []string{ |
| 144 "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz\n", |
| 145 }, |
| 146 appendedCollectedData: []string{ |
| 147 "0123456789012345678901234567890123456789012345678901\n", |
| 148 "the quick brown fox jumps over the lazy dog\n", |
| 149 }, |
| 150 }, { |
| 151 description: "more lines already written than initially reques
ted", |
| 152 data: alphabetData, |
| 153 initialLinesWritten: 5, |
| 154 initialLinesRequested: 3, |
| 155 initialCollectedData: []string{ |
| 156 "charlie charlie\n", |
| 157 "delta delta\n", |
| 158 "echo echo\n", |
| 159 }, |
| 160 appendedCollectedData: alphabetData[5:], |
| 161 }, { |
| 162 description: "less lines already written than initially reques
ted", |
| 163 data: alphabetData, |
| 164 initialLinesWritten: 3, |
| 165 initialLinesRequested: 5, |
| 166 initialCollectedData: []string{ |
| 167 "alpha alpha\n", |
| 168 "bravo bravo\n", |
| 169 "charlie charlie\n", |
| 170 }, |
| 171 appendedCollectedData: alphabetData[3:], |
| 172 }, { |
| 173 description: "lines are longer than buffer size, more lines al
ready written than initially requested", |
| 174 data: alphabetData, |
| 175 initialLinesWritten: 5, |
| 176 initialLinesRequested: 3, |
| 177 bufferSize: 5, |
| 178 initialCollectedData: []string{ |
| 179 "charlie charlie\n", |
| 180 "delta delta\n", |
| 181 "echo echo\n", |
| 182 }, |
| 183 appendedCollectedData: alphabetData[5:], |
| 184 }, { |
| 185 description: "lines are longer than buffer size, less lines al
ready written than initially requested", |
| 186 data: alphabetData, |
| 187 initialLinesWritten: 3, |
| 188 initialLinesRequested: 5, |
| 189 bufferSize: 5, |
| 190 initialCollectedData: []string{ |
| 191 "alpha alpha\n", |
| 192 "bravo bravo\n", |
| 193 "charlie charlie\n", |
| 194 }, |
| 195 appendedCollectedData: alphabetData[3:], |
| 196 }, { |
| 197 description: "filter lines which contain the char 'e'", |
| 198 data: alphabetData, |
| 199 initialLinesWritten: 10, |
| 200 initialLinesRequested: 3, |
| 201 filter: func(line []byte) bool { |
| 202 return bytes.Contains(line, []byte{'e'}) |
| 203 }, |
| 204 initialCollectedData: []string{ |
| 205 "echo echo\n", |
| 206 "hotel hotel\n", |
| 207 "juliet juliet\n", |
| 208 }, |
| 209 appendedCollectedData: []string{ |
| 210 "mike mike\n", |
| 211 "november november\n", |
| 212 "quebec quebec\n", |
| 213 "romeo romeo\n", |
| 214 "sierra sierra\n", |
| 215 "whiskey whiskey\n", |
| 216 "yankee yankee\n", |
| 217 }, |
| 218 }, { |
| 219 description: "stop tailing after 10 collected lines", |
| 220 data: alphabetData, |
| 221 initialLinesWritten: 5, |
| 222 initialLinesRequested: 3, |
| 223 injector: func(t *tailer.Tailer, rs *readSeeker) func([]string) { |
| 224 return func(lines []string) { |
| 225 if len(lines) == 10 { |
| 226 t.Stop() |
| 227 } |
| 228 } |
| 229 }, |
| 230 initialCollectedData: []string{ |
| 231 "charlie charlie\n", |
| 232 "delta delta\n", |
| 233 "echo echo\n", |
| 234 }, |
| 235 appendedCollectedData: alphabetData[5:], |
| 236 }, { |
| 237 description: "generate an error after 10 collected lines", |
| 238 data: alphabetData, |
| 239 initialLinesWritten: 5, |
| 240 initialLinesRequested: 3, |
| 241 injector: func(t *tailer.Tailer, rs *readSeeker) func([]string) { |
| 242 return func(lines []string) { |
| 243 if len(lines) == 10 { |
| 244 rs.setError(fmt.Errorf("ouch after 10 lines")) |
| 245 } |
| 246 } |
| 247 }, |
| 248 initialCollectedData: []string{ |
| 249 "charlie charlie\n", |
| 250 "delta delta\n", |
| 251 "echo echo\n", |
| 252 }, |
| 253 appendedCollectedData: alphabetData[5:], |
| 254 err: "ouch after 10 lines", |
| 255 }, { |
| 256 description: "more lines already written than initially requested, some
empty, unfiltered", |
| 257 data: []string{ |
| 258 "one one\n", |
| 259 "two two\n", |
| 260 "\n", |
| 261 "\n", |
| 262 "three three\n", |
| 263 "four four\n", |
| 264 "\n", |
| 265 "\n", |
| 266 "five five\n", |
| 267 "six six\n", |
| 268 }, |
| 269 initialLinesWritten: 3, |
| 270 initialLinesRequested: 2, |
| 271 initialCollectedData: []string{ |
| 272 "two two\n", |
| 273 "\n", |
| 274 }, |
| 275 appendedCollectedData: []string{ |
| 276 "\n", |
| 277 "three three\n", |
| 278 "four four\n", |
| 279 "\n", |
| 280 "\n", |
| 281 "five five\n", |
| 282 "six six\n", |
| 283 }, |
| 284 }, { |
| 285 description: "more lines already written than initially requested, some
empty, those filtered", |
| 286 data: []string{ |
| 287 "one one\n", |
| 288 "two two\n", |
| 289 "\n", |
| 290 "\n", |
| 291 "three three\n", |
| 292 "four four\n", |
| 293 "\n", |
| 294 "\n", |
| 295 "five five\n", |
| 296 "six six\n", |
| 297 }, |
| 298 initialLinesWritten: 3, |
| 299 initialLinesRequested: 2, |
| 300 filter: func(line []byte) bool { |
| 301 return len(bytes.TrimSpace(line)) > 0 |
| 302 }, |
| 303 initialCollectedData: []string{ |
| 304 "one one\n", |
| 305 "two two\n", |
| 306 }, |
| 307 appendedCollectedData: []string{ |
| 308 "three three\n", |
| 309 "four four\n", |
| 310 "five five\n", |
| 311 "six six\n", |
| 312 }, |
| 313 }} |
| 314 |
| 315 func (tailerSuite) TestTailer(c *gc.C) { |
| 316 for i, test := range tests { |
| 317 c.Logf("Test #%d) %s", i, test.description) |
| 318 bufferSize := test.bufferSize |
| 319 if bufferSize == 0 { |
| 320 // Default value. |
| 321 bufferSize = 4096 |
| 322 } |
| 323 reader, writer := io.Pipe() |
| 324 sigc := make(chan struct{}, 1) |
| 325 rs := startReadSeeker(c, test.data, test.initialLinesWritten, si
gc) |
| 326 tailer := tailer.NewTestTailer(rs, writer, test.initialLinesRequ
ested, test.filter, bufferSize, 2*time.Millisecond) |
| 327 linec := startReading(c, tailer, reader, writer) |
| 328 |
| 329 // Collect initial data. |
| 330 assertCollected(c, linec, test.initialCollectedData, nil) |
| 331 |
| 332 sigc <- struct{}{} |
| 333 |
| 334 // Collect remaining data, possibly with injection to stop |
| 335 // earlier or generate an error. |
| 336 var injection func([]string) |
| 337 if test.injector != nil { |
| 338 injection = test.injector(tailer, rs) |
| 339 } |
| 340 |
| 341 assertCollected(c, linec, test.appendedCollectedData, injection) |
| 342 |
| 343 if test.err == "" { |
| 344 c.Assert(tailer.Stop(), gc.IsNil) |
| 345 } else { |
| 346 c.Assert(tailer.Err(), gc.ErrorMatches, test.err) |
| 347 } |
| 348 } |
| 349 } |
| 350 |
| 351 // startReading starts a goroutine receiving the lines out of the reader |
| 352 // in the background and passing them to a created string channel. This |
| 353 // will used in the assertions. |
| 354 func startReading(c *gc.C, tailer *tailer.Tailer, reader *io.PipeReader, writer
*io.PipeWriter) chan string { |
| 355 linec := make(chan string) |
| 356 // Start goroutine for reading. |
| 357 go func() { |
| 358 defer close(linec) |
| 359 reader := bufio.NewReader(reader) |
| 360 for { |
| 361 line, err := reader.ReadString('\n') |
| 362 switch err { |
| 363 case nil: |
| 364 linec <- line |
| 365 case io.EOF: |
| 366 return |
| 367 default: |
| 368 c.Fail() |
| 369 } |
| 370 } |
| 371 }() |
| 372 // Close writer when tailer is stopped or has an error. Tailer using |
| 373 // components can do it the same way. |
| 374 go func() { |
| 375 tailer.Wait() |
| 376 writer.Close() |
| 377 }() |
| 378 return linec |
| 379 } |
| 380 |
| 381 // assertCollected reads lines from the string channel linec. It compares if |
| 382 // those are the one passed with compare until a timeout. If the timeout is |
| 383 // reached earlier than all lines are collected the assertion fails. The |
| 384 // injection function allows to interrupt the processing with a function |
| 385 // generating an error or a regular stopping during the tailing. In case the |
| 386 // linec is closed due to stopping or an error only the values so far care |
| 387 // compared. Checking the reason for termination is done in the test. |
| 388 func assertCollected(c *gc.C, linec chan string, compare []string, injection fun
c([]string)) { |
| 389 timeout := time.After(testing.LongWait) |
| 390 lines := []string{} |
| 391 for { |
| 392 select { |
| 393 case line, ok := <-linec: |
| 394 if ok { |
| 395 lines = append(lines, line) |
| 396 if injection != nil { |
| 397 injection(lines) |
| 398 } |
| 399 if len(lines) == len(compare) { |
| 400 // All data received. |
| 401 c.Assert(lines, gc.DeepEquals, compare) |
| 402 return |
| 403 } |
| 404 } else { |
| 405 // linec closed after stopping or error. |
| 406 c.Assert(lines, gc.DeepEquals, compare[:len(line
s)]) |
| 407 return |
| 408 } |
| 409 case <-timeout: |
| 410 if injection == nil { |
| 411 c.Fatalf("timeout during tailer collection") |
| 412 } |
| 413 return |
| 414 } |
| 415 } |
| 416 } |
| 417 |
| 418 // startReadSeeker returns a ReadSeeker for the Tailer. It simulates |
| 419 // reading and seeking inside a file and also simulating an error. |
| 420 // The goroutine waits for a signal that it can start writing the |
| 421 // appended lines. |
| 422 func startReadSeeker(c *gc.C, data []string, initialLeg int, sigc chan struct{})
*readSeeker { |
| 423 // Write initial lines into the buffer. |
| 424 var rs readSeeker |
| 425 var i int |
| 426 for i = 0; i < initialLeg; i++ { |
| 427 rs.write(data[i]) |
| 428 } |
| 429 |
| 430 go func() { |
| 431 <-sigc |
| 432 |
| 433 for ; i < len(data); i++ { |
| 434 time.Sleep(5 * time.Millisecond) |
| 435 rs.write(data[i]) |
| 436 } |
| 437 }() |
| 438 return &rs |
| 439 } |
| 440 |
| 441 type readSeeker struct { |
| 442 mux sync.Mutex |
| 443 buffer []byte |
| 444 pos int |
| 445 err error |
| 446 } |
| 447 |
| 448 func (r *readSeeker) write(s string) { |
| 449 r.mux.Lock() |
| 450 defer r.mux.Unlock() |
| 451 r.buffer = append(r.buffer, []byte(s)...) |
| 452 } |
| 453 |
| 454 func (r *readSeeker) setError(err error) { |
| 455 r.mux.Lock() |
| 456 defer r.mux.Unlock() |
| 457 r.err = err |
| 458 } |
| 459 |
| 460 func (r *readSeeker) Read(p []byte) (n int, err error) { |
| 461 r.mux.Lock() |
| 462 defer r.mux.Unlock() |
| 463 if r.err != nil { |
| 464 return 0, r.err |
| 465 } |
| 466 if r.pos >= len(r.buffer) { |
| 467 return 0, io.EOF |
| 468 } |
| 469 n = copy(p, r.buffer[r.pos:]) |
| 470 r.pos += n |
| 471 return n, nil |
| 472 } |
| 473 |
| 474 func (r *readSeeker) Seek(offset int64, whence int) (ret int64, err error) { |
| 475 r.mux.Lock() |
| 476 defer r.mux.Unlock() |
| 477 var newPos int64 |
| 478 switch whence { |
| 479 case 0: |
| 480 newPos = offset |
| 481 case 1: |
| 482 newPos = int64(r.pos) + offset |
| 483 case 2: |
| 484 newPos = int64(len(r.buffer)) + offset |
| 485 default: |
| 486 return 0, fmt.Errorf("invalid whence: %d", whence) |
| 487 } |
| 488 if newPos < 0 { |
| 489 return 0, fmt.Errorf("negative position: %d", newPos) |
| 490 } |
| 491 if newPos >= 1<<31 { |
| 492 return 0, fmt.Errorf("position out of range: %d", newPos) |
| 493 } |
| 494 r.pos = int(newPos) |
| 495 return newPos, nil |
| 496 } |
OLD | NEW |