flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Hironori Ogibayashi <ogibaya...@gmail.com>
Subject Writing test for Flink streaming jobs
Date Fri, 27 May 2016 07:33:36 GMT
Hello,

I would like to write a test code for my Flink job.
Looking at flink-examples, I thought the way will be:
- Create test class which extends StreamingMultipleProgramsTestBase
- In each method, just write streaming job as usual, but use
collection data source and iterator sink
- Use TestBaseUtils.compareResultXX method to check the result.

Here is the actual code I wrote.

---
class SampleTestCase extends StreamingMultipleProgramsTestBase {

  @Test
  def testCase1(): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env.fromElements("aaa","bbb","aaa")
      .map{ x => (x,1)}
      .keyBy(0)
      .timeWindow(Time.seconds(1))
      .sum(1)

    env.execute()

    val result = DataStreamUtils.collect(stream.javaStream)
    TestBaseUtils.compareResultAsText(Lists.newArrayList(result),"(aaa,2)\n(bbb,1)")
  }
}
---

But when I ran the test. I got this error:

java.lang.AssertionError: Wrong number of elements result expected:<2>
but was:<0>

It looks like test finishes before the end of the timeWindow, but I do
not know how to fix it.
Any advise would be appreciated.

Thanks,
Hironori Ogibayashi

Mime
View raw message