flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From KristoffSC <krzysiek.chmielew...@gmail.com>
Subject Re: Testing RichAsyncFunction with TestHarness
Date Sun, 29 Mar 2020 12:08:01 GMT
Hi, 
another update on this one. 
I managed to make the workaround a little bit cleaner. 

The test setup I have now is like this:

ByteArrayOutputStream streamEdgesBytes = new ByteArrayOutputStream();
    ObjectOutputStream oosStreamEdges = new
ObjectOutputStream(streamEdgesBytes);
    oosStreamEdges.writeObject(Collections.<StreamEdge>emptyList());

    KryoSerializer<MyMessage> kryoSerializer = new KryoSerializer<>(
        MyMessage.class, executionConfig);
    ByteArrayOutputStream kryoSerializerBytes = new ByteArrayOutputStream();
    ObjectOutputStream oosKryoSerializer = new
ObjectOutputStream(kryoSerializerBytes);
    oosKryoSerializer.writeObject(kryoSerializer);

Configuration configuration = new Configuration();
    configuration.setBytes("edgesInOrder", streamEdgesBytes.toByteArray());
    configuration.setBytes("typeSerializer_in_1",
kryoSerializerBytes.toByteArray());

    MockEnvironment environment = MockEnvironment.builder().build();
    ExecutionConfig executionConfig = environment.getExecutionConfig();
    environment.getTaskConfiguration().addAll(configuration);

this.testHarness = new OneInputStreamOperatorTestHarness<>(
        new AsyncWaitOperator<>(processFunction, 2000, 1,
OutputMode.UNORDERED), environment);

With this setup, this.testHarness.open(); works. 
However there is another problem, 
When calling:
testHarness.processElement(myMessage, 1L); 
it throws another exception:

java.lang.AssertionError
	at
org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.addAsyncBufferEntry(AsyncWaitOperator.java:400)
	at
org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.processElement(AsyncWaitOperator.java:228)
	at
org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness.processElement(OneInputStreamOperatorTestHarness.java:112)
	at
org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness.processElement(OneInputStreamOperatorTestHarness.java:107)




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Mime
View raw message