flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [1/7] flink git commit: [FLINK-4391] Add asynchronous I/O operations
Date Tue, 20 Dec 2016 05:05:37 GMT
Repository: flink
Updated Branches:
  refs/heads/master 4a27d2105 -> bfdaa3821


http://git-wip-us.apache.org/repos/asf/flink/blob/f5283076/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java
new file mode 100644
index 0000000..b8788c6
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.async;
+
+import org.apache.flink.api.common.accumulators.IntCounter;
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link RichAsyncFunction}, checking how the wrapped runtime context behaves:
+ * counters are delegated to the underlying context, while state access and iteration
+ * aggregators are rejected with a descriptive exception.
+ */
+public class RichAsyncFunctionTest {
+
+	@Test
+	public void testIterationRuntimeContext() throws Exception {
+		// test runtime context is not set
+		RichAsyncFunction<String, String> function = new RichAsyncFunction<String, String>() {
+			@Override
+			public void asyncInvoke(String input, AsyncCollector<String> collector) throws Exception {
+				getIterationRuntimeContext().getIterationAggregator("test");
+			}
+		};
+
+		try {
+			function.asyncInvoke("test", mock(AsyncCollector.class));
+			// without the fail the test would pass vacuously if no exception were thrown
+			Assert.fail("Expected an exception because the runtime context has not been set.");
+		}
+		catch (Exception e) {
+			Assert.assertEquals("The runtime context has not been initialized.", e.getMessage());
+		}
+
+		// test get agg from iteration runtime context
+		function.setRuntimeContext(mock(IterationRuntimeContext.class));
+
+		try {
+			function.asyncInvoke("test", mock(AsyncCollector.class));
+			Assert.fail("Expected an exception because iteration aggregators are not supported.");
+		}
+		catch (Exception e) {
+			Assert.assertEquals("Get iteration aggregator is not supported in rich async function", e.getMessage());
+		}
+
+		// get state from iteration runtime context
+		function = new RichAsyncFunction<String, String>() {
+			@Override
+			public void asyncInvoke(String input, AsyncCollector<String> collector) throws Exception {
+				getIterationRuntimeContext().getState(mock(ValueStateDescriptor.class));
+			}
+		};
+
+		function.setRuntimeContext(mock(RuntimeContext.class));
+
+		try {
+			function.asyncInvoke("test", mock(AsyncCollector.class));
+			Assert.fail("Expected an exception because state access is not supported.");
+		}
+		catch (Exception e) {
+			Assert.assertEquals("State is not supported in rich async function", e.getMessage());
+		}
+
+		// test getting a counter from iteration runtime context
+		function = new RichAsyncFunction<String, String>() {
+			@Override
+			public void asyncInvoke(String input, AsyncCollector<String> collector) throws Exception {
+				getIterationRuntimeContext().getIntCounter("test").add(6);
+			}
+		};
+
+		IterationRuntimeContext context = mock(IterationRuntimeContext.class);
+		IntCounter counter = new IntCounter(0);
+		when(context.getIntCounter(anyString())).thenReturn(counter);
+
+		function.setRuntimeContext(context);
+
+		function.asyncInvoke("test", mock(AsyncCollector.class));
+
+		Assert.assertEquals(6, counter.getLocalValue().intValue());
+	}
+
+	@Test
+	public void testRuntimeContext() throws Exception {
+		// test runtime context is not set
+		RichAsyncFunction<String, String> function = new RichAsyncFunction<String, String>() {
+			@Override
+			public void asyncInvoke(String input, AsyncCollector<String> collector) throws Exception {
+				getRuntimeContext().getState(mock(ValueStateDescriptor.class));
+			}
+		};
+
+		try {
+			function.asyncInvoke("test", mock(AsyncCollector.class));
+			Assert.fail("Expected an exception because the runtime context has not been set.");
+		}
+		catch (Exception e) {
+			Assert.assertEquals("The runtime context has not been initialized.", e.getMessage());
+		}
+
+		// test get state
+		function = new RichAsyncFunction<String, String>() {
+			@Override
+			public void asyncInvoke(String input, AsyncCollector<String> collector) throws Exception {
+				getRuntimeContext().getState(mock(ValueStateDescriptor.class));
+			}
+		};
+
+		function.setRuntimeContext(mock(RuntimeContext.class));
+
+		try {
+			function.asyncInvoke("test", mock(AsyncCollector.class));
+			Assert.fail("Expected an exception because state access is not supported.");
+		}
+		catch (Exception e) {
+			Assert.assertEquals("State is not supported in rich async function", e.getMessage());
+		}
+
+		// test getting a counter from runtime context
+		// (uses getRuntimeContext() here, unlike testIterationRuntimeContext which goes
+		// through getIterationRuntimeContext(); the original called the iteration variant,
+		// which duplicated the other test instead of covering the plain runtime context)
+		function = new RichAsyncFunction<String, String>() {
+			@Override
+			public void asyncInvoke(String input, AsyncCollector<String> collector) throws Exception {
+				getRuntimeContext().getIntCounter("test").add(6);
+			}
+		};
+
+		IterationRuntimeContext context = mock(IterationRuntimeContext.class);
+		IntCounter counter = new IntCounter(0);
+		when(context.getIntCounter(anyString())).thenReturn(counter);
+
+		function.setRuntimeContext(context);
+
+		function.asyncInvoke("test", mock(AsyncCollector.class));
+
+		Assert.assertEquals(6, counter.getLocalValue().intValue());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f5283076/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBufferTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBufferTest.java
new file mode 100644
index 0000000..d118d80
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBufferTest.java
@@ -0,0 +1,656 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async;
+
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.functions.async.buffer.StreamElementEntry;
+import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
+import org.apache.flink.streaming.api.functions.async.buffer.AsyncCollectorBuffer;
+import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link AsyncCollectorBuffer}. These test that:
+ *
+ * <ul>
+ *     <li>Add a new item into the buffer</li>
+ *     <li>Ordered mode processing</li>
+ *     <li>Unordered mode processing</li>
+ *     <li>Error handling</li>
+ * </ul>
+ */
+public class AsyncCollectorBufferTest {
+	private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(10);
+
+	private final Random random = new Random();
+
+	private AsyncFunction<Integer, Integer> function;
+
+	private AsyncWaitOperator<Integer, Integer> operator;
+
+	private AsyncCollectorBuffer<Integer, Integer> buffer;
+
+	private Output<StreamRecord<Integer>> output;
+
+	// checkpoint lock shared with the mocked StreamTask; all buffer mutations happen under it
+	private final Object lock = new Object();
+
+	/**
+	 * Creates a fresh {@link AsyncCollectorBuffer} of the given size and output mode, backed
+	 * by a no-op {@link AsyncFunction}, a mocked {@link StreamTask} and a {@link FakedOutput}
+	 * that records everything emitted by the buffer's emitter thread.
+	 */
+	public AsyncCollectorBuffer<Integer, Integer> getBuffer(int bufferSize, AsyncDataStream.OutputMode mode) throws Exception {
+		function = new AsyncFunction<Integer, Integer>() {
+			@Override
+			public void asyncInvoke(Integer input, AsyncCollector<Integer> collector) throws Exception {
+
+			}
+		};
+
+		operator = new AsyncWaitOperator<>(function, bufferSize, mode);
+
+		StreamConfig cfg = new StreamConfig(new Configuration());
+		cfg.setTypeSerializerIn1(IntSerializer.INSTANCE);
+
+		StreamTask<?, ?> mockTask = mock(StreamTask.class);
+
+		when(mockTask.getCheckpointLock()).thenReturn(lock);
+
+		Environment env = new DummyEnvironment("DUMMY;-D", 1, 0);
+		when(mockTask.getEnvironment()).thenReturn(env);
+
+		output = new FakedOutput();
+
+		operator.setup(mockTask, cfg, output);
+
+		buffer = operator.getBuffer();
+
+		return buffer;
+	}
+
+	@Test
+	public void testAdd() throws Exception {
+		buffer = getBuffer(3, AsyncDataStream.OutputMode.ORDERED);
+
+		synchronized (lock) {
+			buffer.addWatermark(new Watermark(0L));
+			buffer.addLatencyMarker(new LatencyMarker(111L, 1, 1));
+		}
+
+		Assert.assertEquals(2, buffer.getQueue().size());
+
+		Iterator<StreamElementEntry<Integer>> iterator = buffer.getQueue().iterator();
+		Watermark watermark = iterator.next().getStreamElement().asWatermark();
+		Assert.assertEquals(0L, watermark.getTimestamp());
+
+		LatencyMarker latencyMarker = iterator.next().getStreamElement().asLatencyMarker();
+		Assert.assertEquals(111L, latencyMarker.getMarkedTime());
+
+		buffer.setExtraStreamElement(new Watermark(222L));
+
+		// the extra element must appear after the queued elements, in insertion order
+		Iterator<StreamElement> elementIterator = buffer.getStreamElementsInBuffer();
+		Assert.assertEquals(0L, elementIterator.next().asWatermark().getTimestamp());
+		Assert.assertEquals(111L, elementIterator.next().asLatencyMarker().getMarkedTime());
+		Assert.assertEquals(222L, elementIterator.next().asWatermark().getTimestamp());
+		Assert.assertFalse(elementIterator.hasNext());
+	}
+
+	/**
+	 * Feeds seven elements into the buffer: stream records at positions 1, 2, 3, 5 and 6
+	 * (with timestamp {@code idx * idx}), a watermark at position 4 and a latency marker at
+	 * position 7. Each record's collector is completed asynchronously after a random delay;
+	 * if {@code throwExcept} is set, the collectors are completed exceptionally instead.
+	 */
+	private void work(final boolean throwExcept) throws Exception {
+		final int ASYNC_COLLECTOR_NUM = 7;
+
+		Iterator<StreamElement> iterator = new Iterator<StreamElement>() {
+			private int idx = 0;
+
+			@Override
+			public boolean hasNext() {
+				return idx < ASYNC_COLLECTOR_NUM;
+			}
+
+			@Override
+			public StreamElement next() {
+				++idx;
+
+				if (idx == 4) {
+					return new Watermark(333L);
+				}
+				else if (idx == 7) {
+					return new LatencyMarker(111L, 0, 0);
+				}
+				else {
+					StreamRecord<Integer> ret = new StreamRecord<>(idx);
+					ret.setTimestamp(idx * idx);
+
+					return ret;
+				}
+			}
+
+			@Override
+			public void remove() {
+				// do nothing
+			}
+		};
+
+		while (iterator.hasNext()) {
+			final StreamElement record = iterator.next();
+
+			if (record.isRecord()) {
+				AsyncCollector tmp;
+
+				synchronized (lock) {
+					tmp = buffer.addStreamRecord(record.<Integer>asRecord());
+				}
+
+				final AsyncCollector collector = tmp;
+
+				EXECUTOR_SERVICE.submit(new Runnable() {
+					@Override
+					public void run() {
+						try {
+							Thread.sleep(random.nextInt(100));
+
+							if (throwExcept) {
+								collector.collect(new Exception("wahahahaha..."));
+							}
+							else {
+								collector.collect(Collections.singletonList(record.asRecord().getValue()));
+							}
+						} catch (InterruptedException e) {
+							// restore the interrupt flag so the pool thread can terminate cleanly
+							Thread.currentThread().interrupt();
+						}
+					}
+				});
+			}
+			else if (record.isWatermark()) {
+				synchronized (lock) {
+					buffer.addWatermark(record.asWatermark());
+				}
+			}
+			else {
+				synchronized (lock) {
+					buffer.addLatencyMarker(record.asLatencyMarker());
+				}
+			}
+		}
+	}
+
+	@Test
+	public void testOrderedBuffer() throws Exception {
+		buffer = getBuffer(3, AsyncDataStream.OutputMode.ORDERED);
+
+		buffer.startEmitterThread();
+
+		work(false);
+
+		synchronized (lock) {
+			buffer.waitEmpty();
+		}
+
+		buffer.stopEmitterThread();
+
+		// in ORDERED mode both values and timestamps come out exactly in input order
+		Assert.assertEquals("1,2,3,5,6,", ((FakedOutput)output).getValue());
+		Assert.assertEquals("1,4,9,333,25,36,111,", ((FakedOutput)output).getTimestamp());
+	}
+
+	@Test
+	public void testUnorderedBuffer() throws Exception {
+		buffer = getBuffer(3, AsyncDataStream.OutputMode.UNORDERED);
+
+		buffer.startEmitterThread();
+
+		work(false);
+
+		synchronized (lock) {
+			buffer.waitEmpty();
+		}
+
+		buffer.stopEmitterThread();
+
+		// the watermark still acts as a barrier: it is the fourth emitted timestamp
+		Assert.assertEquals(333L, ((FakedOutput)output).getRawTimestamp().toArray()[3]);
+
+		// record completion order is nondeterministic, so compare sorted contents only
+		List<Long> result = ((FakedOutput)output).getRawValue();
+		Collections.sort(result);
+		Assert.assertEquals("[1, 2, 3, 5, 6]", result.toString());
+
+		result = ((FakedOutput)output).getRawTimestamp();
+		Collections.sort(result);
+		Assert.assertEquals("[1, 4, 9, 25, 36, 111, 333]", result.toString());
+	}
+
+	@Test
+	public void testOrderedBufferWithManualTriggering() throws Exception {
+		// test AsyncCollectorBuffer with different combinations of StreamElements in the buffer.
+		// by triggering completion of each AsyncCollector one by one manually, we can verify
+		// the output one by one accurately.
+
+		FakedOutput fakedOutput;
+		AsyncCollector<Integer> collector1, collector2;
+
+		// 1. head element is a Watermark or LatencyMarker
+		buffer = getBuffer(3, AsyncDataStream.OutputMode.ORDERED);
+		fakedOutput = (FakedOutput)output;
+
+		fakedOutput.expect(1);
+
+		buffer.startEmitterThread();
+
+		synchronized (lock) {
+			buffer.addWatermark(new Watermark(1L));
+		}
+
+		fakedOutput.waitToFinish();
+
+		Assert.assertEquals("", fakedOutput.getValue());
+		Assert.assertEquals("1,", fakedOutput.getTimestamp());
+
+
+		fakedOutput.expect(1);
+
+		synchronized (lock) {
+			buffer.addLatencyMarker(new LatencyMarker(2L, 0, 0));
+		}
+
+		fakedOutput.waitToFinish();
+
+		Assert.assertEquals("", fakedOutput.getValue());
+		Assert.assertEquals("1,2,", fakedOutput.getTimestamp());
+
+		synchronized (lock) {
+			buffer.waitEmpty();
+			buffer.stopEmitterThread();
+		}
+
+
+		// 2. buffer layout: WM -> SR1 -> LM -> SR2, where SR2 finishes first, then SR1.
+		buffer = getBuffer(5, AsyncDataStream.OutputMode.ORDERED);
+		fakedOutput = (FakedOutput)output;
+
+		synchronized (lock) {
+			buffer.addWatermark(new Watermark(1L));
+			collector1 = buffer.addStreamRecord(new StreamRecord<>(111, 2L));
+			buffer.addLatencyMarker(new LatencyMarker(3L, 0, 0));
+			collector2 = buffer.addStreamRecord(new StreamRecord<>(222, 4L));
+		}
+
+		fakedOutput.expect(1);
+
+		buffer.startEmitterThread();
+
+		fakedOutput.waitToFinish();
+
+		// in ORDERED mode, the result of completed SR2 will not be emitted right now.
+		collector2.collect(Collections.singletonList(222));
+
+		Thread.sleep(1000);
+
+		Assert.assertEquals("", fakedOutput.getValue());
+		Assert.assertEquals("1,", fakedOutput.getTimestamp());
+
+		fakedOutput.expect(3);
+
+		collector1.collect(Collections.singletonList(111));
+
+		fakedOutput.waitToFinish();
+
+		Assert.assertEquals("111,222,", fakedOutput.getValue());
+		Assert.assertEquals("1,2,3,4,", fakedOutput.getTimestamp());
+
+		synchronized (lock) {
+			buffer.waitEmpty();
+			buffer.stopEmitterThread();
+		}
+
+		// 3. buffer layout: WM -> SR1 -> LM -> SR2, where SR1 completes first, then SR2.
+		buffer = getBuffer(5, AsyncDataStream.OutputMode.ORDERED);
+		fakedOutput = (FakedOutput)output;
+
+		synchronized (lock) {
+			buffer.addWatermark(new Watermark(1L));
+			collector1 = buffer.addStreamRecord(new StreamRecord<>(111, 2L));
+			buffer.addLatencyMarker(new LatencyMarker(3L, 0, 0));
+			collector2 = buffer.addStreamRecord(new StreamRecord<>(222, 4L));
+		}
+
+		fakedOutput.expect(1);
+
+		buffer.startEmitterThread();
+
+		fakedOutput.waitToFinish();
+
+		fakedOutput.expect(2);
+
+		// in ORDERED mode, the result of completed SR1 will be emitted asap.
+		collector1.collect(Collections.singletonList(111));
+
+		fakedOutput.waitToFinish();
+
+		Assert.assertEquals("111,", fakedOutput.getValue());
+		Assert.assertEquals("1,2,3,", fakedOutput.getTimestamp());
+
+		fakedOutput.expect(1);
+
+		collector2.collect(Collections.singletonList(222));
+
+		fakedOutput.waitToFinish();
+
+		Assert.assertEquals("111,222,", fakedOutput.getValue());
+		Assert.assertEquals("1,2,3,4,", fakedOutput.getTimestamp());
+
+		synchronized (lock) {
+			buffer.waitEmpty();
+			buffer.stopEmitterThread();
+		}
+
+		// 4. buffer layout: SR1 -> SR2 -> WM -> LM, where SR2 finishes first.
+		buffer = getBuffer(5, AsyncDataStream.OutputMode.ORDERED);
+		fakedOutput = (FakedOutput)output;
+
+		synchronized (lock) {
+			collector1 = buffer.addStreamRecord(new StreamRecord<>(111, 1L));
+			collector2 = buffer.addStreamRecord(new StreamRecord<>(222, 2L));
+			buffer.addWatermark(new Watermark(3L));
+			buffer.addLatencyMarker(new LatencyMarker(4L, 0, 0));
+		}
+
+		buffer.startEmitterThread();
+
+		// in ORDERED mode, the result of completed SR2 will not be emitted right now.
+		collector2.collect(Collections.singletonList(222));
+
+		Thread.sleep(1000);
+
+		Assert.assertEquals("", fakedOutput.getValue());
+		Assert.assertEquals("", fakedOutput.getTimestamp());
+
+		fakedOutput.expect(4);
+
+		collector1.collect(Collections.singletonList(111));
+
+		fakedOutput.waitToFinish();
+
+		Assert.assertEquals("111,222,", fakedOutput.getValue());
+		Assert.assertEquals("1,2,3,4,", fakedOutput.getTimestamp());
+
+		synchronized (lock) {
+			buffer.waitEmpty();
+			buffer.stopEmitterThread();
+		}
+	}
+
+	@Test
+	public void testUnorderedWithManualTriggering() throws Exception {
+		// verify the output in UNORDERED mode by manual triggering.
+
+		FakedOutput fakedOutput;
+		AsyncCollector<Integer> collector1, collector2, collector3;
+
+		// 1. head element is a Watermark or LatencyMarker
+		buffer = getBuffer(5, AsyncDataStream.OutputMode.UNORDERED);
+		fakedOutput = (FakedOutput)output;
+
+		fakedOutput.expect(1);
+
+		buffer.startEmitterThread();
+
+		synchronized (lock) {
+			buffer.addWatermark(new Watermark(1L));
+		}
+
+		fakedOutput.waitToFinish();
+
+		Assert.assertEquals("", fakedOutput.getValue());
+		Assert.assertEquals("1,", fakedOutput.getTimestamp());
+
+
+		fakedOutput.expect(1);
+
+		synchronized (lock) {
+			buffer.addLatencyMarker(new LatencyMarker(2L, 0, 0));
+		}
+
+		fakedOutput.waitToFinish();
+
+		Assert.assertEquals("", fakedOutput.getValue());
+		Assert.assertEquals("1,2,", fakedOutput.getTimestamp());
+
+		synchronized (lock) {
+			buffer.waitEmpty();
+			buffer.stopEmitterThread();
+		}
+
+
+		// 2. buffer layout: LM -> SR1 -> SR2 -> WM1 -> SR3 -> WM2, where the order of completion is SR3, SR2, SR1
+		buffer = getBuffer(6, AsyncDataStream.OutputMode.UNORDERED);
+		fakedOutput = (FakedOutput)output;
+
+		synchronized (lock) {
+			buffer.addLatencyMarker(new LatencyMarker(1L, 0, 0));
+			collector1 = buffer.addStreamRecord(new StreamRecord<>(111, 2L));
+			collector2 = buffer.addStreamRecord(new StreamRecord<>(222, 3L));
+			buffer.addWatermark(new Watermark(4L));
+			collector3 = buffer.addStreamRecord(new StreamRecord<>(333, 5L));
+			buffer.addWatermark(new Watermark(6L));
+		}
+
+		fakedOutput.expect(1);
+
+		buffer.startEmitterThread();
+
+		fakedOutput.waitToFinish();
+
+		// in UNORDERED mode, the result of completed SR3 will not be emitted right now
+		// because it sits behind the un-emitted watermark WM1.
+		collector3.collect(Collections.singletonList(333));
+
+		Thread.sleep(1000);
+
+		Assert.assertEquals("", fakedOutput.getValue());
+		Assert.assertEquals("1,", fakedOutput.getTimestamp());
+
+		fakedOutput.expect(1);
+
+		// SR2 will be emitted
+		collector2.collect(Collections.singletonList(222));
+
+		fakedOutput.waitToFinish();
+
+		Assert.assertEquals("222,", fakedOutput.getValue());
+		Assert.assertEquals("1,3,", fakedOutput.getTimestamp());
+
+		// SR1 will be emitted first, then WM, and then SR3 and WM2
+		fakedOutput.expect(4);
+		collector1.collect(Collections.singletonList(111));
+
+		fakedOutput.waitToFinish();
+
+		Assert.assertEquals("222,111,333,", fakedOutput.getValue());
+		Assert.assertEquals("1,3,2,4,5,6,", fakedOutput.getTimestamp());
+
+		synchronized (lock) {
+			buffer.waitEmpty();
+			buffer.stopEmitterThread();
+		}
+
+		// 3. buffer layout: WM1 -> SR1 -> SR2 -> LM -> SR3 -> WM2, where the order of completion is SR2, SR1, SR3
+		buffer = getBuffer(6, AsyncDataStream.OutputMode.UNORDERED);
+		fakedOutput = (FakedOutput)output;
+
+		synchronized (lock) {
+			buffer.addWatermark(new Watermark(1L));
+			collector1 = buffer.addStreamRecord(new StreamRecord<>(111, 2L));
+			collector2 = buffer.addStreamRecord(new StreamRecord<>(222, 3L));
+			buffer.addLatencyMarker(new LatencyMarker(4L, 0, 0));
+			collector3 = buffer.addStreamRecord(new StreamRecord<>(333, 5L));
+			buffer.addWatermark(new Watermark(6L));
+		}
+
+		// the result of SR2 will be emitted following WM1
+		collector2.collect(Collections.singletonList(222));
+
+		fakedOutput.expect(2);
+
+		buffer.startEmitterThread();
+
+		fakedOutput.waitToFinish();
+
+		Assert.assertEquals("222,", fakedOutput.getValue());
+		Assert.assertEquals("1,3,", fakedOutput.getTimestamp());
+
+		// SR1 and LM will be emitted
+		fakedOutput.expect(2);
+		collector1.collect(Collections.singletonList(111));
+
+		fakedOutput.waitToFinish();
+
+		Assert.assertEquals("222,111,", fakedOutput.getValue());
+		Assert.assertEquals("1,3,2,4,", fakedOutput.getTimestamp());
+
+		// SR3 and WM2 will be emitted
+		fakedOutput.expect(2);
+		collector3.collect(Collections.singletonList(333));
+
+		fakedOutput.waitToFinish();
+
+		Assert.assertEquals("222,111,333,", fakedOutput.getValue());
+		Assert.assertEquals("1,3,2,4,5,6,", fakedOutput.getTimestamp());
+
+		synchronized (lock) {
+			buffer.waitEmpty();
+			buffer.stopEmitterThread();
+		}
+
+	}
+
+	@Test
+	public void testBufferWithException() throws Exception {
+		buffer = getBuffer(3, AsyncDataStream.OutputMode.UNORDERED);
+
+		buffer.startEmitterThread();
+
+		IOException expected = null;
+		try {
+			work(true);
+		}
+		catch (IOException e) {
+			expected = e;
+		}
+
+		Assert.assertNotNull(expected);
+		// assertEquals takes (expected, actual); the original had the arguments swapped
+		Assert.assertEquals("wahahahaha...", expected.getMessage());
+
+		synchronized (lock) {
+			buffer.waitEmpty();
+		}
+
+		buffer.stopEmitterThread();
+	}
+
+	/**
+	 * {@link Output} stub that records emitted record values and the timestamps of all
+	 * records, watermarks and latency markers. {@link #expect(int)} arms a latch counting
+	 * the next {@code count} emissions, and {@link #waitToFinish()} blocks until they arrive.
+	 */
+	public class FakedOutput implements Output<StreamRecord<Integer>> {
+		private List<Long> outputs;
+		private List<Long> timestamps;
+
+		private CountDownLatch latch;
+
+		public FakedOutput() {
+			this.outputs = new ArrayList<>();
+			this.timestamps = new ArrayList<>();
+		}
+
+		@Override
+		public void collect(StreamRecord<Integer> record) {
+			outputs.add(record.getValue().longValue());
+			if (record.hasTimestamp()) {
+				timestamps.add(record.getTimestamp());
+			}
+
+			if (latch != null) {
+				latch.countDown();
+			}
+		}
+
+		@Override
+		public void emitWatermark(Watermark mark) {
+			timestamps.add(mark.getTimestamp());
+
+			if (latch != null) {
+				latch.countDown();
+			}
+		}
+
+		@Override
+		public void emitLatencyMarker(LatencyMarker latencyMarker) {
+			timestamps.add(latencyMarker.getMarkedTime());
+
+			if (latch != null) {
+				latch.countDown();
+			}
+		}
+
+		@Override
+		public void close() {
+		}
+
+		public String getValue() {
+			StringBuilder sb = new StringBuilder();
+			for (Long i : outputs) {
+				sb.append(i).append(",");
+			}
+			return sb.toString();
+		}
+
+		public String getTimestamp() {
+			StringBuilder sb = new StringBuilder();
+			for (Long i : timestamps) {
+				sb.append(i).append(",");
+			}
+			return sb.toString();
+		}
+
+		public List<Long> getRawValue() {
+			return outputs;
+		}
+
+		public List<Long> getRawTimestamp() {
+			return timestamps;
+		}
+
+		public void expect(int count) {
+			latch = new CountDownLatch(count);
+		}
+
+		public void waitToFinish() throws InterruptedException {
+			latch.await();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f5283076/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
new file mode 100644
index 0000000..560ee5a
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
@@ -0,0 +1,629 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.SubtaskState;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.state.TaskStateHandles;
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
+import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
+import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
+import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+import java.util.Random;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link AsyncWaitOperator}. These test that:
+ *
+ * <ul>
+ *     <li>Process StreamRecords and Watermarks in ORDERED mode</li>
+ *     <li>Process StreamRecords and Watermarks in UNORDERED mode</li>
+ *     <li>AsyncWaitOperator in operator chain</li>
+ *     <li>Snapshot state and restore state</li>
+ * </ul>
+ */
+public class AsyncWaitOperatorTest {
+
+	// Collects elements received by the chained sink in the operator-chain tests.
+	// Static because the sink is serialized into the job graph; each test resets it.
+	private static Queue<Object> sinkResult;
+
+	/**
+	 * An async function that doubles each input on a shared thread pool after a
+	 * random delay. The pool is created by the first open() and shut down by the
+	 * last close(), reference-counted under the class lock.
+	 */
+	private static class MyAsyncFunction extends RichAsyncFunction<Integer, Integer> {
+		// Upper bound (ms) for the simulated async latency; also scales the shutdown timeout.
+		static final int SLEEP_FACTOR = 100;
+		static final int THREAD_POOL_SIZE = 10;
+
+		// Shared across all parallel instances; lifecycle guarded by MyAsyncFunction.class.
+		// ('transient' removed: it has no effect on static fields, which are never serialized.)
+		static ExecutorService executorService;
+		static int counter = 0;
+
+		static Random random = new Random();
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			super.open(parameters);
+
+			synchronized (MyAsyncFunction.class) {
+				// First opener creates the shared pool.
+				if (counter == 0) {
+					executorService = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
+				}
+
+				++counter;
+			}
+		}
+
+		@Override
+		public void close() throws Exception {
+			super.close();
+
+			synchronized (MyAsyncFunction.class) {
+				--counter;
+
+				// Last closer tears the pool down, waiting long enough for any
+				// in-flight simulated sleep to finish.
+				if (counter == 0) {
+					executorService.shutdown();
+					executorService.awaitTermination(SLEEP_FACTOR * THREAD_POOL_SIZE, TimeUnit.MILLISECONDS);
+				}
+			}
+		}
+
+		@Override
+		public void asyncInvoke(final Integer input, final AsyncCollector<Integer> collector) throws Exception {
+			executorService.submit(new Runnable() {
+				@Override
+				public void run() {
+					// Sleep for a random while to simulate the async operation.
+					int sleep = (int) (random.nextFloat() * SLEEP_FACTOR);
+
+					try {
+						Thread.sleep(sleep);
+						collector.collect(Collections.singletonList(input * 2));
+					}
+					catch (InterruptedException e) {
+						// Restore the interrupt flag; the collector is deliberately
+						// not completed for an interrupted (cancelled) request.
+						Thread.currentThread().interrupt();
+					}
+				}
+			});
+		}
+	}
+
+	/**
+	 * A special {@link org.apache.flink.streaming.api.functions.async.AsyncFunction} that defers
+	 * {@link AsyncCollector#collect} until {@link #countDown()} releases a latch.
+	 * Used in testStateSnapshotAndRestore to keep
+	 * {@link org.apache.flink.streaming.api.functions.async.buffer.StreamElementEntry} instances
+	 * inside the {@link org.apache.flink.streaming.api.functions.async.buffer.AsyncCollectorBuffer}
+	 * while a checkpoint is taken. Unlike the parent, it emits the input unchanged.
+	 */
+	private static class LazyAsyncFunction extends MyAsyncFunction {
+		// Static so countDown() works without an instance; re-armed by each construction,
+		// so at most one LazyAsyncFunction may be live per test.
+		private static CountDownLatch latch;
+
+		public LazyAsyncFunction() {
+			latch = new CountDownLatch(1);
+		}
+
+		@Override
+		public void asyncInvoke(final Integer input, final AsyncCollector<Integer> collector) throws Exception {
+			executorService.submit(new Runnable() {
+				@Override
+				public void run() {
+					try {
+						latch.await();
+					}
+					catch (InterruptedException e) {
+						// Restore the interrupt flag; the element is still emitted below
+						// so the test can complete.
+						Thread.currentThread().interrupt();
+					}
+
+					collector.collect(Collections.singletonList(input));
+				}
+			});
+		}
+
+		public static void countDown() {
+			latch.countDown();
+		}
+	}
+
+	/**
+	 * A {@link Comparator} used to sort test output deterministically: watermarks
+	 * compare equal to everything (their position is asserted separately), and
+	 * records are ordered by timestamp, then by value.
+	 */
+	private static class StreamRecordComparator implements Comparator<Object> {
+		@Override
+		@SuppressWarnings("unchecked")
+		public int compare(Object o1, Object o2) {
+			if (o1 instanceof Watermark || o2 instanceof Watermark) {
+				return 0;
+			} else {
+				StreamRecord<Integer> sr0 = (StreamRecord<Integer>) o1;
+				StreamRecord<Integer> sr1 = (StreamRecord<Integer>) o2;
+
+				// Long.compare avoids the overflow of casting a long difference to int.
+				int byTimestamp = Long.compare(sr0.getTimestamp(), sr1.getTimestamp());
+				if (byTimestamp != 0) {
+					return byTimestamp;
+				}
+
+				// Tie-break by value. (The original fell through to an identical
+				// subtraction that could only return 0 here — dead code removed.)
+				return sr0.getValue().compareTo(sr1.getValue());
+			}
+		}
+	}
+
+	/** Watermark propagation with ORDERED output mode. */
+	@Test
+	public void testWaterMarkOrdered() throws Exception {
+		testWithWatermark(AsyncDataStream.OutputMode.ORDERED);
+	}
+
+	/** Watermark propagation with UNORDERED output mode. */
+	@Test
+	public void testWaterMarkUnordered() throws Exception {
+		testWithWatermark(AsyncDataStream.OutputMode.UNORDERED);
+	}
+
+	/**
+	 * Feeds two records, a watermark, and a third record through an
+	 * {@link AsyncWaitOperator} (buffer size 2) and checks that every value is
+	 * doubled, timestamps are preserved, and the watermark keeps its position
+	 * relative to the records.
+	 */
+	private void testWithWatermark(AsyncDataStream.OutputMode mode) throws Exception {
+		final AsyncWaitOperator<Integer, Integer> operator = new AsyncWaitOperator<>(new MyAsyncFunction(), 2, mode);
+
+		final OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
+				new OneInputStreamOperatorTestHarness<>(operator, IntSerializer.INSTANCE);
+
+		final long initialTime = 0L;
+		final ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		testHarness.open();
+
+		// Elements must be fed under the checkpoint lock, as the task would do.
+		synchronized (testHarness.getCheckpointLock()) {
+			testHarness.processElement(new StreamRecord<>(1, initialTime + 1));
+			testHarness.processElement(new StreamRecord<>(2, initialTime + 2));
+			testHarness.processWatermark(new Watermark(initialTime + 2));
+			testHarness.processElement(new StreamRecord<>(3, initialTime + 3));
+		}
+
+		// wait until all async collectors in the buffer have been emitted out.
+		synchronized (testHarness.getCheckpointLock()) {
+			testHarness.close();
+		}
+
+		// MyAsyncFunction doubles each input.
+		expectedOutput.add(new StreamRecord<>(2, initialTime + 1));
+		expectedOutput.add(new StreamRecord<>(4, initialTime + 2));
+		expectedOutput.add(new Watermark(initialTime + 2));
+		expectedOutput.add(new StreamRecord<>(6, initialTime + 3));
+
+		if (AsyncDataStream.OutputMode.ORDERED == mode) {
+			TestHarnessUtil.assertOutputEquals("Output with watermark was not correct.", expectedOutput, testHarness.getOutput());
+		}
+		else {
+			// In UNORDERED mode records on the same side of the watermark may be
+			// reordered among themselves, but the watermark must stay at index 2
+			// and the record after it must stay last.
+			Object[] jobOutputQueue = testHarness.getOutput().toArray();
+
+			Assert.assertEquals("Watermark should be at index 2", new Watermark(initialTime + 2), jobOutputQueue[2]);
+			Assert.assertEquals("StreamRecord 3 should be at the end", new StreamRecord<>(6, initialTime + 3), jobOutputQueue[3]);
+
+			TestHarnessUtil.assertOutputEqualsSorted(
+					"Output for StreamRecords does not match",
+					expectedOutput,
+					testHarness.getOutput(),
+					new StreamRecordComparator());
+		}
+	}
+
+	/** End-to-end task run with ORDERED output mode. */
+	@Test
+	public void testOrdered() throws Exception {
+		testRun(AsyncDataStream.OutputMode.ORDERED);
+	}
+
+	/** End-to-end task run with UNORDERED output mode. */
+	@Test
+	public void testUnordered() throws Exception {
+		testRun(AsyncDataStream.OutputMode.UNORDERED);
+	}
+
+	/**
+	 * Runs the elements 1..8 through an {@link AsyncWaitOperator} (buffer size 6)
+	 * inside a full {@link OneInputStreamTask} and checks that every element comes
+	 * out doubled, exactly in order for ORDERED mode and as a set for UNORDERED.
+	 */
+	private void testRun(AsyncDataStream.OutputMode mode) throws Exception {
+		final OneInputStreamTask<Integer, Integer> task = new OneInputStreamTask<>();
+		final OneInputStreamTaskTestHarness<Integer, Integer> testHarness =
+				new OneInputStreamTaskTestHarness<>(task, 1, 1, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);
+
+		final AsyncWaitOperator<Integer, Integer> operator = new AsyncWaitOperator<>(new MyAsyncFunction(), 6, mode);
+
+		testHarness.getStreamConfig().setStreamOperator(operator);
+
+		testHarness.invoke();
+		testHarness.waitForTaskRunning();
+
+		final long initialTime = 0L;
+		final ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		// Feed 1..8 with timestamps 1..8; each element is expected back doubled
+		// with its timestamp preserved.
+		for (int element = 1; element <= 8; element++) {
+			testHarness.processElement(new StreamRecord<>(element, initialTime + element));
+			expectedOutput.add(new StreamRecord<>(element * 2, initialTime + element));
+		}
+
+		testHarness.waitForInputProcessing();
+
+		testHarness.endInput();
+
+		testHarness.waitForTaskCompletion();
+
+		if (AsyncDataStream.OutputMode.ORDERED == mode) {
+			TestHarnessUtil.assertOutputEquals("ORDERED Output was not correct.", expectedOutput, testHarness.getOutput());
+		}
+		else {
+			TestHarnessUtil.assertOutputEqualsSorted(
+					"UNORDERED Output was not correct.",
+					expectedOutput,
+					testHarness.getOutput(),
+					new StreamRecordComparator());
+		}
+	}
+
+	/**
+	 * Builds a job whose second vertex chains async -> map(+1) -> async -> sink,
+	 * and returns that chained vertex.
+	 *
+	 * @param withLazyFunction whether the first async operator should hold back
+	 *                         its results until {@link LazyAsyncFunction#countDown()}
+	 */
+	private JobVertex createChainedVertex(boolean withLazyFunction) {
+		StreamExecutionEnvironment chainEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		// The input is only used to construct a chained operator; it is not used in the real tests.
+		DataStream<Integer> input = chainEnv.fromElements(1, 2, 3);
+
+		if (withLazyFunction) {
+			input = AsyncDataStream.orderedWait(input, new LazyAsyncFunction(), 6);
+		}
+		else {
+			input = AsyncDataStream.orderedWait(input, new MyAsyncFunction(), 6);
+		}
+
+		// The map function is designed to chain after the async function. Its field is
+		// initialized in open(), which verifies that the operators in the chain are
+		// opened from tail to head: only then can the AsyncWaitOperator's result pass
+		// down successfully and correctly. If not, the test cannot pass.
+		input = input.map(new RichMapFunction<Integer, Integer>() {
+			private static final long serialVersionUID = 1L;
+
+			private Integer initialValue = null;
+
+			@Override
+			public void open(Configuration parameters) throws Exception {
+				initialValue = 1;
+			}
+
+			@Override
+			public Integer map(Integer value) throws Exception {
+				return initialValue + value;
+			}
+		});
+
+		input = AsyncDataStream.unorderedWait(input, new MyAsyncFunction(), 3);
+
+		input.addSink(new SinkFunction<Integer>() {
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public void invoke(Integer value) throws Exception {
+				sinkResult.add(value);
+			}
+		});
+
+		// We build our own operator chain: a source vertex plus the chained vertex.
+		final JobGraph jobGraph = chainEnv.getStreamGraph().getJobGraph();
+
+		List<JobVertex> vertices = jobGraph.getVerticesSortedTopologicallyFromSources();
+		Assert.assertEquals(2, vertices.size());
+
+		return vertices.get(1);
+	}
+
+	/**
+	 * Produces a {@link SubtaskState} for the chained vertex by running it with a
+	 * {@link LazyAsyncFunction}, which keeps the four input elements buffered in the
+	 * {@code AsyncCollectorBuffer} until after the checkpoint has been acknowledged.
+	 *
+	 * @return a {@link SubtaskState} containing the buffered inputs
+	 * @throws Exception if the task or the checkpoint fails
+	 */
+	private SubtaskState createTaskState() throws Exception {
+		sinkResult = new ConcurrentLinkedDeque<>();
+
+		final OneInputStreamTask<Integer, Integer> task = new OneInputStreamTask<>();
+		final OneInputStreamTaskTestHarness<Integer, Integer> testHarness =
+				new OneInputStreamTaskTestHarness<>(task, 1, 1, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);
+
+		JobVertex chainedVertex = createChainedVertex(true);
+
+		testHarness.taskConfig = chainedVertex.getConfiguration();
+
+		final AcknowledgeStreamMockEnvironment env = new AcknowledgeStreamMockEnvironment(
+				testHarness.jobConfig,
+				testHarness.taskConfig,
+				testHarness.getExecutionConfig(),
+				testHarness.memorySize,
+				new MockInputSplitProvider(),
+				testHarness.bufferSize);
+
+		// Install the chained vertex's head operator as the harness task's operator.
+		final StreamConfig streamConfig = testHarness.getStreamConfig();
+		final StreamConfig operatorChainStreamConfig = new StreamConfig(chainedVertex.getConfiguration());
+		final AsyncWaitOperator<Integer, Integer> headOperator =
+				operatorChainStreamConfig.getStreamOperator(Thread.currentThread().getContextClassLoader());
+		streamConfig.setStreamOperator(headOperator);
+
+		testHarness.invoke(env);
+		testHarness.waitForTaskRunning();
+
+		testHarness.processElement(new StreamRecord<>(1));
+		testHarness.processElement(new StreamRecord<>(2));
+		testHarness.processElement(new StreamRecord<>(3));
+		testHarness.processElement(new StreamRecord<>(4));
+
+		testHarness.waitForInputProcessing();
+
+		// Trigger a checkpoint while all four elements are still held back by the lazy function.
+		final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(1L, 1L);
+
+		task.triggerCheckpoint(checkpointMetaData);
+
+		env.getCheckpointLatch().await();
+
+		assertEquals(1L, env.getCheckpointId());
+
+		// Release the latch so the buffered elements can drain and the task can finish.
+		LazyAsyncFunction.countDown();
+
+		testHarness.endInput();
+		testHarness.waitForTaskCompletion();
+
+		return env.getCheckpointStateHandles();
+	}
+
+	/**
+	 * Restores the state snapshotted by {@link #createTaskState()} (inputs 1..4) into
+	 * a fresh chained task, feeds 5..9, and checks the sink saw all nine results.
+	 * Each element x passes through async(2x) -> map(+1) -> async(*2) = 4x + 2.
+	 */
+	@Test
+	public void testOperatorChain() throws Exception {
+
+		JobVertex chainedVertex = createChainedVertex(false);
+
+		final OneInputStreamTask<Integer, Integer> task = new OneInputStreamTask<>();
+		final OneInputStreamTaskTestHarness<Integer, Integer> testHarness =
+				new OneInputStreamTaskTestHarness<>(task, 1, 1, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);
+
+		task.setInitialState(new TaskStateHandles(createTaskState()));
+
+		sinkResult = new ConcurrentLinkedDeque<>();
+
+		testHarness.taskConfig = chainedVertex.getConfiguration();
+
+		final AcknowledgeStreamMockEnvironment env = new AcknowledgeStreamMockEnvironment(
+				testHarness.jobConfig,
+				testHarness.taskConfig,
+				testHarness.getExecutionConfig(),
+				testHarness.memorySize,
+				new MockInputSplitProvider(),
+				testHarness.bufferSize);
+
+		final StreamConfig streamConfig = testHarness.getStreamConfig();
+		final StreamConfig operatorChainStreamConfig = new StreamConfig(chainedVertex.getConfiguration());
+		final AsyncWaitOperator<Integer, Integer> headOperator =
+				operatorChainStreamConfig.getStreamOperator(Thread.currentThread().getContextClassLoader());
+		streamConfig.setStreamOperator(headOperator);
+
+		testHarness.invoke(env);
+		testHarness.waitForTaskRunning();
+
+		testHarness.processElement(new StreamRecord<>(5));
+		testHarness.processElement(new StreamRecord<>(6));
+		testHarness.processElement(new StreamRecord<>(7));
+		testHarness.processElement(new StreamRecord<>(8));
+		testHarness.processElement(new StreamRecord<>(9));
+
+		testHarness.endInput();
+		testHarness.waitForTaskCompletion();
+
+		// 4x + 2 for the restored inputs 1..4 and the fresh inputs 5..9.
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+		expectedOutput.add(6);
+		expectedOutput.add(10);
+		expectedOutput.add(14);
+		expectedOutput.add(18);
+		expectedOutput.add(22);
+		expectedOutput.add(26);
+		expectedOutput.add(30);
+		expectedOutput.add(34);
+		expectedOutput.add(38);
+
+		TestHarnessUtil.assertOutputEqualsSorted(
+				"Test for chained operator with AsyncWaitOperator failed",
+				expectedOutput,
+				sinkResult,
+				new Comparator<Object>() {
+					@Override
+					public int compare(Object o1, Object o2) {
+						// Integer.compare avoids the overflow-prone subtraction idiom.
+						return Integer.compare((Integer) o1, (Integer) o2);
+					}
+				});
+	}
+
+	/**
+	 * Snapshots an {@link AsyncWaitOperator} whose {@link LazyAsyncFunction} holds
+	 * four elements in the buffer, restores that state into a new task running a
+	 * normal {@link MyAsyncFunction}, feeds four more elements, and checks that all
+	 * eight come out doubled and in order.
+	 */
+	@Test
+	public void testStateSnapshotAndRestore() throws Exception {
+		final OneInputStreamTask<Integer, Integer> task = new OneInputStreamTask<>();
+		final OneInputStreamTaskTestHarness<Integer, Integer> testHarness =
+				new OneInputStreamTaskTestHarness<>(task, 1, 1, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);
+
+		AsyncWaitOperator<Integer, Integer> operator =
+				new AsyncWaitOperator<>(new LazyAsyncFunction(), 6, AsyncDataStream.OutputMode.ORDERED);
+
+		final StreamConfig streamConfig = testHarness.getStreamConfig();
+		streamConfig.setStreamOperator(operator);
+
+		final AcknowledgeStreamMockEnvironment env = new AcknowledgeStreamMockEnvironment(
+				testHarness.jobConfig,
+				testHarness.taskConfig,
+				testHarness.getExecutionConfig(),
+				testHarness.memorySize,
+				new MockInputSplitProvider(),
+				testHarness.bufferSize);
+
+		testHarness.invoke(env);
+		testHarness.waitForTaskRunning();
+
+		final long initialTime = 0L;
+
+		testHarness.processElement(new StreamRecord<>(1, initialTime + 1));
+		testHarness.processElement(new StreamRecord<>(2, initialTime + 2));
+		testHarness.processElement(new StreamRecord<>(3, initialTime + 3));
+		testHarness.processElement(new StreamRecord<>(4, initialTime + 4));
+
+		testHarness.waitForInputProcessing();
+
+		// Checkpoint while the lazy function still holds elements 1..4 in the buffer.
+		final long checkpointId = 1L;
+		final long checkpointTimestamp = 1L;
+
+		final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, checkpointTimestamp);
+
+		task.triggerCheckpoint(checkpointMetaData);
+
+		env.getCheckpointLatch().await();
+
+		assertEquals(checkpointId, env.getCheckpointId());
+
+		// Release the buffered elements so the first task can finish.
+		LazyAsyncFunction.countDown();
+
+		testHarness.endInput();
+		testHarness.waitForTaskCompletion();
+
+		// set the operator state from previous attempt into the restored one
+		final OneInputStreamTask<Integer, Integer> restoredTask = new OneInputStreamTask<>();
+		restoredTask.setInitialState(new TaskStateHandles(env.getCheckpointStateHandles()));
+
+		final OneInputStreamTaskTestHarness<Integer, Integer> restoredTaskHarness =
+				new OneInputStreamTaskTestHarness<>(restoredTask, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);
+
+		// The restored task uses the non-lazy function, so restored elements drain immediately.
+		AsyncWaitOperator<Integer, Integer> restoredOperator =
+				new AsyncWaitOperator<>(new MyAsyncFunction(), 6, AsyncDataStream.OutputMode.ORDERED);
+
+		restoredTaskHarness.getStreamConfig().setStreamOperator(restoredOperator);
+
+		restoredTaskHarness.invoke();
+		restoredTaskHarness.waitForTaskRunning();
+
+		restoredTaskHarness.processElement(new StreamRecord<>(5, initialTime + 5));
+		restoredTaskHarness.processElement(new StreamRecord<>(6, initialTime + 6));
+		restoredTaskHarness.processElement(new StreamRecord<>(7, initialTime + 7));
+
+		// trigger the checkpoint while processing stream elements
+		restoredTask.triggerCheckpoint(new CheckpointMetaData(checkpointId, checkpointTimestamp));
+
+		restoredTaskHarness.processElement(new StreamRecord<>(8, initialTime + 8));
+
+		restoredTaskHarness.endInput();
+		restoredTaskHarness.waitForTaskCompletion();
+
+		// All eight inputs (four restored, four fresh) appear doubled and in order.
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+		expectedOutput.add(new StreamRecord<>(2, initialTime + 1));
+		expectedOutput.add(new StreamRecord<>(4, initialTime + 2));
+		expectedOutput.add(new StreamRecord<>(6, initialTime + 3));
+		expectedOutput.add(new StreamRecord<>(8, initialTime + 4));
+		expectedOutput.add(new StreamRecord<>(10, initialTime + 5));
+		expectedOutput.add(new StreamRecord<>(12, initialTime + 6));
+		expectedOutput.add(new StreamRecord<>(14, initialTime + 7));
+		expectedOutput.add(new StreamRecord<>(16, initialTime + 8));
+
+		// remove CheckpointBarrier which is not expected
+		Iterator<Object> iterator = restoredTaskHarness.getOutput().iterator();
+		while (iterator.hasNext()) {
+			if (iterator.next() instanceof CheckpointBarrier) {
+				iterator.remove();
+			}
+		}
+
+		TestHarnessUtil.assertOutputEquals(
+				"StateAndRestored Test Output was not correct.",
+				expectedOutput,
+				restoredTaskHarness.getOutput());
+	}
+
+	/**
+	 * A {@link StreamMockEnvironment} that captures the checkpoint id and state
+	 * handles passed to {@code acknowledgeCheckpoint} and triggers a latch once an
+	 * acknowledgement has arrived.
+	 */
+	private static class AcknowledgeStreamMockEnvironment extends StreamMockEnvironment {
+
+		private final OneShotLatch checkpointLatch = new OneShotLatch();
+
+		private volatile long checkpointId;
+		private volatile SubtaskState checkpointStateHandles;
+
+		AcknowledgeStreamMockEnvironment(
+				Configuration jobConfig, Configuration taskConfig,
+				ExecutionConfig executionConfig, long memorySize,
+				MockInputSplitProvider inputSplitProvider, int bufferSize) {
+			super(jobConfig, taskConfig, executionConfig, memorySize, inputSplitProvider, bufferSize);
+		}
+
+		@Override
+		public void acknowledgeCheckpoint(
+				CheckpointMetaData checkpointMetaData,
+				SubtaskState checkpointStateHandles) {
+			this.checkpointId = checkpointMetaData.getCheckpointId();
+			this.checkpointStateHandles = checkpointStateHandles;
+			checkpointLatch.trigger();
+		}
+
+		public long getCheckpointId() {
+			return checkpointId;
+		}
+
+		public OneShotLatch getCheckpointLatch() {
+			return checkpointLatch;
+		}
+
+		public SubtaskState getCheckpointStateHandles() {
+			return checkpointStateHandles;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f5283076/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java
index c95a85e..4e405fd 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java
@@ -142,6 +142,8 @@ public class StreamOperatorChainingTest {
 
 		StreamConfig streamConfig = new StreamConfig(configuration);
 
+		// FIXME(review): leftover debug output — remove before merging.
+		System.out.println(streamConfig);
+
 		StreamMap<Integer, Integer> headOperator =
 				streamConfig.getStreamOperator(Thread.currentThread().getContextClassLoader());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f5283076/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index b20b3a3..8dc6afa 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -264,6 +264,10 @@ public class StreamTaskTestHarness<OUT> {
 		return streamConfig;
 	}
 
+	/** Returns the {@link ExecutionConfig} used by this test harness. */
+	public ExecutionConfig getExecutionConfig() {
+		return executionConfig;
+	}
+
 	private void shutdownIOManager() throws Exception {
 		this.mockEnv.getIOManager().shutdown();
 		Assert.assertTrue("IO Manager has not properly shut down.", this.mockEnv.getIOManager().isProperlyShutDown());

http://git-wip-us.apache.org/repos/asf/flink/blob/f5283076/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index 7468d9a..86fbaa0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -17,9 +17,11 @@
  */
 package org.apache.flink.streaming.util;
 
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Preconditions;
 
 import java.util.Collection;
 
@@ -35,6 +37,14 @@ public class OneInputStreamOperatorTestHarness<IN, OUT>
 
 	private final OneInputStreamOperator<IN, OUT> oneInputOperator;
 
+	/**
+	 * Creates a harness for the given operator and registers the input-side type
+	 * serializer on the operator's config.
+	 *
+	 * @param operator the one-input operator under test
+	 * @param typeSerializerIn serializer for input elements; must not be null
+	 */
+	public OneInputStreamOperatorTestHarness(
+			OneInputStreamOperator<IN, OUT> operator,
+			TypeSerializer<IN> typeSerializerIn) throws Exception {
+		this(operator, 1, 1, 0);
+
+		config.setTypeSerializerIn1(Preconditions.checkNotNull(typeSerializerIn));
+	}
+
 	public OneInputStreamOperatorTestHarness(OneInputStreamOperator<IN, OUT> operator) throws Exception {
 		this(operator, 1, 1, 0);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/f5283076/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java
index 3b98d33..ea99fe3 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java
@@ -22,47 +22,27 @@ import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.util.MathUtils;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.collector.selector.OutputSelector;
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SplitStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.util.MathUtils;
+import org.junit.*;
 
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.util.ArrayList;
-import java.util.List;
+import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 public class StreamingOperatorsITCase extends StreamingMultipleProgramsTestBase {
 
-	private String resultPath1;
-	private String resultPath2;
-	private String expected1;
-	private String expected2;
-
-	@Rule
-	public TemporaryFolder tempFolder = new TemporaryFolder();
-
-	@Before
-	public void before() throws Exception {
-		resultPath1 = tempFolder.newFile().toURI().toString();
-		resultPath2 = tempFolder.newFile().toURI().toString();
-		expected1 = "";
-		expected2 = "";
-	}
-
-	@After
-	public void after() throws Exception {
-		compareResultsByLinesInMemory(expected1, resultPath1);
-		compareResultsByLinesInMemory(expected2, resultPath2);
-	}
 
 	/**
 	 * Tests the proper functioning of the streaming fold operator. For this purpose, a stream
@@ -112,6 +92,8 @@ public class StreamingOperatorsITCase extends StreamingMultipleProgramsTestBase
 				}
 			});
 
+		final MemorySinkFunction sinkFunction1 = new MemorySinkFunction(0);
+
 		splittedResult.select("0").map(new MapFunction<Tuple2<Integer,Integer>, Integer>() {
 			private static final long serialVersionUID = 2114608668010092995L;
 
@@ -119,7 +101,10 @@ public class StreamingOperatorsITCase extends StreamingMultipleProgramsTestBase
 			public Integer map(Tuple2<Integer, Integer> value) throws Exception {
 				return value.f1;
 			}
-		}).writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE);
+		}).addSink(sinkFunction1);
+
+
+		final MemorySinkFunction sinkFunction2 = new MemorySinkFunction(1);
 
 		splittedResult.select("1").map(new MapFunction<Tuple2<Integer, Integer>, Integer>() {
 			private static final long serialVersionUID = 5631104389744681308L;
@@ -128,27 +113,34 @@ public class StreamingOperatorsITCase extends StreamingMultipleProgramsTestBase
 			public Integer map(Tuple2<Integer, Integer> value) throws Exception {
 				return value.f1;
 			}
-		}).writeAsText(resultPath2, FileSystem.WriteMode.OVERWRITE);
+		}).addSink(sinkFunction2);
 
-		StringBuilder builder1 = new StringBuilder();
-		StringBuilder builder2 = new StringBuilder();
+		Collection<Integer> expected1 = new ArrayList<>(10);
+		Collection<Integer> expected2 = new ArrayList<>(10);
 		int counter1 = 0;
 		int counter2 = 0;
 
 		for (int i = 0; i < numElements; i++) {
 			if (MathUtils.murmurHash(i) % numKeys == 0) {
 				counter1 += i;
-				builder1.append(counter1 + "\n");
+				expected1.add(counter1);
 			} else {
 				counter2 += i;
-				builder2.append(counter2 + "\n");
+				expected2.add(counter2);
 			}
 		}
 
-		expected1 = builder1.toString();
-		expected2 = builder2.toString();
-
 		env.execute();
+
+		Collection<Integer> result1 = sinkFunction1.getResult();
+		Collections.sort((ArrayList)result1);
+		Collection<Integer> result2 = sinkFunction2.getResult();
+		Collections.sort((ArrayList)result2);
+
+		Assert.assertArrayEquals(result1.toArray(), expected1.toArray());
+		Assert.assertArrayEquals(result2.toArray(), expected2.toArray());
+
+		MemorySinkFunction.clear();
 	}
 
 	/**
@@ -162,6 +154,8 @@ public class StreamingOperatorsITCase extends StreamingMultipleProgramsTestBase
 
 		DataStream<Tuple2<Integer, NonSerializable>> input = env.addSource(new NonSerializableTupleSource(numElements));
 
+		final MemorySinkFunction sinkFunction = new MemorySinkFunction(0);
+
 		input
 			.keyBy(0)
 			.fold(
@@ -182,17 +176,100 @@ public class StreamingOperatorsITCase extends StreamingMultipleProgramsTestBase
 					return value.value;
 				}
 			})
-			.writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE);
+			.addSink(sinkFunction);
 
-		StringBuilder builder = new StringBuilder();
+		Collection<Integer> expected = new ArrayList<>(10);
 
 		for (int i = 0; i < numElements; i++) {
-			builder.append(42 + i + "\n");
+			expected.add(42 + i );
 		}
 
-		expected1 = builder.toString();
+		env.execute();
+
+		Collection<Integer> result = sinkFunction.getResult();
+		Collections.sort((ArrayList)result);
+
+		Assert.assertArrayEquals(result.toArray(), expected.toArray());
+
+		MemorySinkFunction.clear();
+	}
+
+	@Test
+	public void testAsyncWaitOperator() throws Exception {
+		final int numElements = 10;
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStream<Tuple2<Integer, NonSerializable>> input = env.addSource(new NonSerializableTupleSource(numElements));
+
+		AsyncFunction<Tuple2<Integer, NonSerializable>, Integer> function = new RichAsyncFunction<Tuple2<Integer, NonSerializable>, Integer>() {
+			transient ExecutorService executorService;
+
+			@Override
+			public void open(Configuration parameters) throws Exception {
+				super.open(parameters);
+				executorService = Executors.newFixedThreadPool(numElements);
+			}
+
+			@Override
+			public void close() throws Exception {
+				super.close();
+				executorService.shutdown();
+			}
+
+			@Override
+			public void asyncInvoke(final Tuple2<Integer, NonSerializable> input,
+									final AsyncCollector<Integer> collector) throws Exception {
+				this.executorService.submit(new Runnable() {
+					@Override
+					public void run() {
+						// wait for while to simulate async operation here
+						int sleep = (int) (new Random().nextFloat() * 10);
+						try {
+							Thread.sleep(sleep);
+							List<Integer> ret = new ArrayList<>();
+							ret.add(input.f0+input.f0);
+							collector.collect(ret);
+						}
+						catch (InterruptedException e) {
+							collector.collect(new ArrayList<Integer>(0));
+						}
+					}
+				});
+			}
+		};
+
+		DataStream<Integer> orderedResult = AsyncDataStream.orderedWait(input, function, 2).setParallelism(1);
+
+		// save result from ordered process
+		final MemorySinkFunction sinkFunction1 = new MemorySinkFunction(0);
+
+		orderedResult.addSink(sinkFunction1).setParallelism(1);
+
+
+		DataStream<Integer> unorderedResult = AsyncDataStream.unorderedWait(input, function, 2);
+
+		// save result from unordered process
+		final MemorySinkFunction sinkFunction2 = new MemorySinkFunction(1);
+
+		unorderedResult.addSink(sinkFunction2);
+
+
+		Collection<Integer> expected = new ArrayList<>(10);
+
+		for (int i = 0; i < numElements; i++) {
+			expected.add(i+i);
+		}
 
 		env.execute();
+
+		Assert.assertArrayEquals(expected.toArray(), sinkFunction1.getResult().toArray());
+
+		Collection<Integer> result = sinkFunction2.getResult();
+		Collections.sort((ArrayList)result);
+		Assert.assertArrayEquals(expected.toArray(), result.toArray());
+
+		MemorySinkFunction.clear();
 	}
 
 	private static class NonSerializable {
@@ -247,7 +324,50 @@ public class StreamingOperatorsITCase extends StreamingMultipleProgramsTestBase
 		}
 
 		@Override
+
+
 		public void cancel() {
 		}
 	}
+
+	private static class MemorySinkFunction implements SinkFunction<Integer> {
+		private final static Collection<Integer> collection1 = new ArrayList<>(10);
+
+		private final static Collection<Integer> collection2 = new ArrayList<>(10);
+
+		private  final long serialVersionUID = -8815570195074103860L;
+
+		private final int idx;
+
+		public MemorySinkFunction(int idx) {
+			this.idx = idx;
+		}
+
+		@Override
+		public void invoke(Integer value) throws Exception {
+			if (idx == 0) {
+				synchronized (collection1) {
+					collection1.add(value);
+				}
+			}
+			else {
+				synchronized (collection2) {
+					collection2.add(value);
+				}
+			}
+		}
+
+		public Collection<Integer> getResult() {
+			if (idx == 0) {
+				return collection1;
+			}
+
+			return collection2;
+		}
+
+		public static void clear() {
+			collection1.clear();
+			collection2.clear();
+		}
+	}
 }


Mime
View raw message