Using DataStreamUtils.collect() in a test is difficult due to synchronization problems, as you discovered yourself.

What I propose is to write a custom sink that collects data and verifies the results. Verification should happen both in the invoke() method and in close(). Set the parallelism of the sink to 1 to ensure that all data goes to a single sink instance.
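
A rough sketch of what I mean, untested and assuming Flink 1.x with the Scala API on the classpath. VerifyingSink, VerifyingSinkExample and the expected values are made-up names for illustration, not something that ships with Flink:

```scala
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.scala._
import scala.collection.mutable

// Hypothetical sink: collects (word, count) pairs and verifies them.
class VerifyingSink(expected: Seq[(String, Int)])
    extends RichSinkFunction[(String, Int)] {

  // With parallelism 1 there is exactly one instance holding all records.
  private val seen = mutable.ArrayBuffer.empty[(String, Int)]

  override def invoke(value: (String, Int)): Unit = {
    // Per-element check: fail fast on a record that is not expected at all.
    assert(expected.contains(value), s"Unexpected element: $value")
    seen += value
  }

  override def close(): Unit = {
    // Final check: compare sorted copies, since arrival order is not guaranteed.
    assert(seen.toList.sorted == expected.toList.sorted,
      s"Expected $expected but got $seen")
  }
}

object VerifyingSinkExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.fromElements("aaa", "bbb", "aaa")
      .map(x => (x, 1))
      .addSink(new VerifyingSink(Seq(("aaa", 1), ("aaa", 1), ("bbb", 1))))
      .setParallelism(1) // route every record to the single sink instance
    env.execute()
  }
}
```

A failed assertion in the sink fails the job, so env.execute() throws and the test fails with it. Because the check in close() only runs once the job has processed all of its input, the test no longer depends on when the results are read.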

Another option is to use https://github.com/ottogroup/flink-spector, which provides good ways of specifying expected outputs. Maybe Alex has something else to say about it; I'm looping him in.


On Fri, 27 May 2016 at 09:33 Hironori Ogibayashi <ogibayashi@gmail.com> wrote:

I would like to write test code for my Flink job.
Looking at flink-examples, I thought the approach would be:
- Create a test class that extends StreamingMultipleProgramsTestBase
- In each method, write the streaming job as usual, but use a
collection data source and an iterator sink
- Use a TestBaseUtils.compareResultXX method to check the result.

Here is the actual code I wrote.

class SampleTestCase extends StreamingMultipleProgramsTestBase {

  def testCase1(): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env.fromElements("aaa","bbb","aaa")
      .map{ x => (x,1)}


    val result = DataStreamUtils.collect(stream.javaStream)

But when I ran the test, I got this error:

java.lang.AssertionError: Wrong number of elements result expected:<2>
but was:<0>

It looks like the test finishes before the end of the timeWindow, but I do
not know how to fix it.
Any advice would be appreciated.

Hironori Ogibayashi