flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [45/51] [abbrv] git commit: [streaming] Iterative DataStreams updated with buffer timeout and max wait time
Date Mon, 18 Aug 2014 17:26:22 GMT
[streaming] Iterative DataStreams updated with buffer timeout and max wait time


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/1b31f4d1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/1b31f4d1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/1b31f4d1

Branch: refs/heads/master
Commit: 1b31f4d19df1fabc7aaee5837f4d2c3439819495
Parents: 9be9814
Author: gyfora <gyula.fora@gmail.com>
Authored: Wed Aug 6 15:41:07 2014 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Mon Aug 18 16:23:43 2014 +0200

----------------------------------------------------------------------
 .../flink/streaming/api/JobGraphBuilder.java    |  2 +-
 .../flink/streaming/api/StreamConfig.java       | 10 ++++++
 .../streaming/api/datastream/DataStream.java    | 17 ++++++----
 .../api/datastream/DataStreamSink.java          |  4 +--
 .../api/datastream/IterativeDataStream.java     | 34 +++++++++++++++-----
 .../datastream/SingleOutputStreamOperator.java  |  1 +
 .../environment/StreamExecutionEnvironment.java | 20 ++++++++++--
 .../AbstractStreamComponent.java                |  6 ++--
 .../streamcomponent/StreamIterationSink.java    | 20 +++++++++---
 .../streamcomponent/StreamIterationSource.java  | 15 +++++++--
 .../apache/flink/streaming/api/IterateTest.java | 13 +++-----
 11 files changed, 104 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1b31f4d1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
index 7a10246..9280661 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
@@ -643,4 +643,4 @@ public class JobGraphBuilder {
 		return jobGraph;
 	}
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1b31f4d1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
index 6fe9878..da1189e 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
@@ -60,6 +60,7 @@ public class StreamConfig {
 	// STRINGS
 
 	private static final String MUTABILITY = "isMutable";
+	private static final String ITERATON_WAIT = "iterationWait";
 
 	private Configuration config;
 
@@ -196,6 +197,14 @@ public class StreamConfig {
 		return config.getString(ITERATION_ID, "iteration-0");
 	}
 
+	public void setIterationWaitTime(long time) {
+		config.setLong(ITERATON_WAIT, time);
+	}
+
+	public long getIterationWaitTime() {
+		return config.getLong(ITERATON_WAIT, 0);
+	}
+
 	public void setNumberOfOutputChannels(int outputIndex, Integer numberOfOutputChannels) {
 		config.setInteger(NUMBER_OF_OUTPUT_CHANNELS + outputIndex, numberOfOutputChannels);
 	}
@@ -267,4 +276,5 @@ public class StreamConfig {
 			ClassNotFoundException {
 		return (T) SerializationUtils.deserialize(serializedObject);
 	}
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1b31f4d1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index d15eaa5..7cab2df 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -169,7 +169,7 @@ public abstract class DataStream<OUT> {
 	}
 
 	/**
-	 * Creates a new {@link CoDataStream} bye connecting {@link DataStream}
+	 * Creates a new {@link CoDataStream} by connecting {@link DataStream}
 	 * outputs of different type with each other. The DataStreams connected
 	 * using this operators can be used with CoFunctions.
 	 * 
@@ -676,9 +676,13 @@ public abstract class DataStream<OUT> {
 	 * To direct tuples to the iteration head or the output specifically one can
 	 * use the {@code split(OutputSelector)} on the iteration tail while
 	 * referencing the iteration head as 'iterate'.
-	 * 
+	 * <p>
 	 * The iteration edge will be partitioned the same way as the first input of
 	 * the iteration head.
+	 * <p>
+	 * By default a DataStream with iteration will never terminate, but the user
+	 * can use the {@link IterativeDataStream#setMaxWaitTime} call to set a max
+	 * waiting time for the iteration.
 	 * 
 	 * @return The iterative data stream created.
 	 */
@@ -686,12 +690,12 @@ public abstract class DataStream<OUT> {
 		return new IterativeDataStream<OUT>(this);
 	}
 
-	protected <R> DataStream<OUT> addIterationSource(String iterationID) {
+	protected <R> DataStream<OUT> addIterationSource(String iterationID, long waitTime) {
 
 		DataStream<R> returnStream = new DataStreamSource<R>(environment, "iterationSource");
 
 		jobGraphBuilder.addIterationSource(returnStream.getId(), this.getId(), iterationID,
-				degreeOfParallelism);
+				degreeOfParallelism, waitTime);
 
 		return this.copy();
 	}
@@ -730,8 +734,9 @@ public abstract class DataStream<OUT> {
 		connectGraph(inputStream, returnStream.getId(), 0);
 
 		if (inputStream instanceof IterativeDataStream) {
-			returnStream.addIterationSource(((IterativeDataStream<OUT>) inputStream).iterationID
-					.toString());
+			IterativeDataStream<OUT> iterativeStream = (IterativeDataStream<OUT>) inputStream;
+			returnStream.addIterationSource(iterativeStream.iterationID.toString(),
+					iterativeStream.waitTime);
 		}
 
 		return returnStream;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1b31f4d1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
index ee6502f..1cc6ff2 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
@@ -27,7 +27,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  * @param <IN>
  *            The type of the DataStream closed by the sink.
  */
-public class DataStreamSink<IN> extends DataStream<IN> {
+public class DataStreamSink<IN> extends SingleOutputStreamOperator<IN, DataStreamSink<IN>> {
 
 	protected DataStreamSink(StreamExecutionEnvironment environment, String operatorType) {
 		super(environment, operatorType);
@@ -38,7 +38,7 @@ public class DataStreamSink<IN> extends DataStream<IN> {
 	}
 
 	@Override
-	protected DataStream<IN> copy() {
+	protected DataStreamSink<IN> copy() {
 		throw new RuntimeException("Data stream sinks cannot be copied");
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1b31f4d1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
index bdadee4..f1cb13c 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
@@ -31,20 +31,25 @@ import org.apache.flink.streaming.partitioner.ForwardPartitioner;
  * @param <IN>
  *            Type of the DataStream
  */
-public class IterativeDataStream<IN> extends SingleOutputStreamOperator<IN, IterativeDataStream<IN>> {
+public class IterativeDataStream<IN> extends
+		SingleOutputStreamOperator<IN, IterativeDataStream<IN>> {
 
 	static Integer iterationCount = 0;
 	protected Integer iterationID;
+	protected long waitTime;
 
 	protected IterativeDataStream(DataStream<IN> dataStream) {
 		super(dataStream);
+		setBufferTimeout(dataStream.environment.getBufferTimeout());
 		iterationID = iterationCount;
 		iterationCount++;
+		waitTime = 0;
 	}
 
-	protected IterativeDataStream(DataStream<IN> dataStream, Integer iterationID) {
+	protected IterativeDataStream(DataStream<IN> dataStream, Integer iterationID, long waitTime) {
 		super(dataStream);
 		this.iterationID = iterationID;
+		this.waitTime = waitTime;
 	}
 
 	/**
@@ -80,18 +85,17 @@ public class IterativeDataStream<IN> extends SingleOutputStreamOperator<IN, Iter
 		DataStream<R> returnStream = new DataStreamSink<R>(environment, "iterationSink");
 
 		jobGraphBuilder.addIterationSink(returnStream.getId(), iterationTail.getId(),
-				iterationID.toString(), iterationTail.getParallelism());
+				iterationID.toString(), iterationTail.getParallelism(), waitTime);
 
-		jobGraphBuilder.setIterationSourceParallelism(iterationID.toString(),
-				iterationTail.getParallelism());
+		jobGraphBuilder.setIterationSourceSettings(iterationID.toString(), iterationTail.getId());
 
 		List<String> name = Arrays.asList(new String[] { iterationName });
 
 		if (iterationTail instanceof ConnectedDataStream) {
 			for (DataStream<IN> stream : ((ConnectedDataStream<IN>) iterationTail).connectedStreams) {
 				String inputID = stream.getId();
-				jobGraphBuilder.setEdge(inputID, returnStream.getId(), new ForwardPartitioner<IN>(),
-						0, name);
+				jobGraphBuilder.setEdge(inputID, returnStream.getId(),
+						new ForwardPartitioner<IN>(), 0, name);
 			}
 		} else {
 
@@ -102,8 +106,22 @@ public class IterativeDataStream<IN> extends SingleOutputStreamOperator<IN, Iter
 		return iterationTail;
 	}
 
+	/**
+	 * Sets the max waiting time for the next record before shutting down the
+	 * stream. If not set, then the user needs to manually kill the process to
+	 * stop.
+	 * 
+	 * @param waitTimeMillis
+	 *            Max waiting time in milliseconds
+	 * @return The modified DataStream.
+	 */
+	public IterativeDataStream<IN> setMaxWaitTime(long waitTimeMillis) {
+		this.waitTime = waitTimeMillis;
+		return this;
+	}
+
 	@Override
 	protected IterativeDataStream<IN> copy() {
-		return new IterativeDataStream<IN>(this, iterationID);
+		return new IterativeDataStream<IN>(this, iterationID, waitTime);
 	}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1b31f4d1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index f798563..6d660b1 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -38,6 +38,7 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato
 
 	protected SingleOutputStreamOperator(StreamExecutionEnvironment environment, String operatorType) {
 		super(environment, operatorType);
+		setBufferTimeout(environment.getBufferTimeout());
 	}
 
 	protected SingleOutputStreamOperator(DataStream<OUT> dataStream) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1b31f4d1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 3773d8e..c357424 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -60,6 +60,8 @@ public abstract class StreamExecutionEnvironment {
 
 	private int executionParallelism = -1;
 
+	private long buffertimeout = 0;;
+
 	protected JobGraphBuilder jobGraphBuilder;
 
 	// --------------------------------------------------------------------------------------------
@@ -110,6 +112,21 @@ public abstract class StreamExecutionEnvironment {
 	}
 
 	/**
+	 * Sets the maximum time frequency (ms) for the flushing of the output
+	 * buffers. By default the output buffers flush only when they are full.
+	 * 
+	 * @param timeoutMillis
+	 *            The maximum time between two output flushes.
+	 */
+	public void setBufferTimeout(long timeoutMillis) {
+		this.buffertimeout = timeoutMillis;
+	}
+
+	public long getBufferTimeout() {
+		return this.buffertimeout;
+	}
+
+	/**
 	 * Sets the number of hardware contexts (CPU cores / threads) used when
 	 * executed in {@link LocalStreamEnvironment}.
 	 * 
@@ -178,8 +195,7 @@ public abstract class StreamExecutionEnvironment {
 	 * @return The DataStream representing the elements.
 	 */
 	public <OUT extends Serializable> DataStreamSource<OUT> fromElements(OUT... data) {
-		DataStreamSource<OUT> returnStream = new DataStreamSource<OUT>(
-				this, "elements");
+		DataStreamSource<OUT> returnStream = new DataStreamSource<OUT>(this, "elements");
 
 		try {
 			SourceFunction<OUT> function = new FromElementsFunction<OUT>(data);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1b31f4d1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
index 76dee5d..e2e8816 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
@@ -64,6 +64,7 @@ public abstract class AbstractStreamComponent<OUT> extends AbstractInvokable {
 	protected boolean isMutable;
 	protected Object function;
 	protected String functionName;
+	protected long bufferTimeout;
 
 	protected static int newComponent() {
 		numComponents++;
@@ -115,7 +116,8 @@ public abstract class AbstractStreamComponent<OUT> extends AbstractInvokable {
 		setCollector();
 
 		int numberOfOutputs = configuration.getNumberOfOutputs();
-
+		bufferTimeout= configuration.getBufferTimeout();
+		
 		for (int i = 0; i < numberOfOutputs; i++) {
 			setPartitioner(i, outputs);
 		}
@@ -130,8 +132,6 @@ public abstract class AbstractStreamComponent<OUT> extends AbstractInvokable {
 
 			RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output;
 
-			long bufferTimeout = configuration.getBufferTimeout();
-
 			if (bufferTimeout > 0) {
 				output = new StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>>(this,
 						outputPartitioner, bufferTimeout);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1b31f4d1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java
index 436ebbf..5586887 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java
@@ -38,6 +38,8 @@ public class StreamIterationSink<IN extends Tuple> extends
 	private String iterationId;
 	@SuppressWarnings("rawtypes")
 	private BlockingQueue<StreamRecord> dataChannel;
+	private long iterationWaitTime;
+	private boolean shouldWait;
 
 	public StreamIterationSink() {
 	}
@@ -51,6 +53,8 @@ public class StreamIterationSink<IN extends Tuple> extends
 			inputIter = createInputIterator(inputs, inputSerializer);
 
 			iterationId = configuration.getIterationId();
+			iterationWaitTime = configuration.getIterationWaitTime();
+			shouldWait = iterationWaitTime > 0;
 			dataChannel = BlockingQueueBroker.instance().get(iterationId);
 
 		} catch (Exception e) {
@@ -73,22 +77,30 @@ public class StreamIterationSink<IN extends Tuple> extends
 	}
 
 	protected void forwardRecords() throws Exception {
-		StreamRecord<IN> reuse = inputSerializer.createInstance().setId(0);
+		StreamRecord<IN> reuse = inputSerializer.createInstance();
 		while ((reuse = inputIter.next(reuse)) != null) {
-			pushToQueue(reuse);
+			if (!pushToQueue(reuse)) {
+				break;
+			}
 			// TODO: Fix object reuse for iteration
 			reuse = inputSerializer.createInstance();
 		}
 	}
 
-	private void pushToQueue(StreamRecord<IN> record) {
+	private boolean pushToQueue(StreamRecord<IN> record) {
 		try {
-			dataChannel.offer(record, 5, TimeUnit.MILLISECONDS);
+			if (shouldWait) {
+				return dataChannel.offer(record, iterationWaitTime, TimeUnit.MILLISECONDS);
+			} else {
+				dataChannel.put(record);
+				return true;
+			}
 		} catch (InterruptedException e) {
 			if (LOG.isErrorEnabled()) {
 				LOG.error(String.format("Pushing back record at iteration %s failed due to: %s",
 						iterationId, StringUtils.stringifyException(e)));
 			}
+			return false;
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1b31f4d1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
index 67d5066..2514eb0 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
@@ -42,6 +42,8 @@ public class StreamIterationSource<OUT extends Tuple> extends
 	private String iterationId;
 	@SuppressWarnings("rawtypes")
 	private BlockingQueue<StreamRecord> dataChannel;
+	private long iterationWaitTime;
+	private boolean shouldWait;
 
 	@SuppressWarnings("rawtypes")
 	public StreamIterationSource() {
@@ -56,13 +58,15 @@ public class StreamIterationSource<OUT extends Tuple> extends
 	public void setInputsOutputs() {
 		try {
 			setConfigOutputs(outputs);
-			setSinkSerializer();
 		} catch (StreamComponentException e) {
 			e.printStackTrace();
 			throw new StreamComponentException("Cannot register outputs", e);
 		}
 
 		iterationId = configuration.getIterationId();
+		iterationWaitTime = configuration.getIterationWaitTime();
+		shouldWait = iterationWaitTime > 0;
+
 		try {
 			BlockingQueueBroker.instance().handIn(iterationId, dataChannel);
 		} catch (Exception e) {
@@ -71,6 +75,7 @@ public class StreamIterationSource<OUT extends Tuple> extends
 
 	}
 
+	@SuppressWarnings("unchecked")
 	@Override
 	public void invoke() throws Exception {
 		if (LOG.isDebugEnabled()) {
@@ -80,10 +85,14 @@ public class StreamIterationSource<OUT extends Tuple> extends
 		for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : outputs) {
 			output.initializeSerializers();
 		}
+		StreamRecord<OUT> nextRecord;
 
 		while (true) {
-			@SuppressWarnings("unchecked")
-			StreamRecord<OUT> nextRecord = dataChannel.poll(3, TimeUnit.SECONDS);
+			if (shouldWait) {
+				nextRecord = dataChannel.poll(iterationWaitTime, TimeUnit.MILLISECONDS);
+			} else {
+				nextRecord = dataChannel.take();
+			}
 			if (nextRecord == null) {
 				break;
 			}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1b31f4d1/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
index 5872da9..9498b8e 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
@@ -21,9 +21,6 @@ package org.apache.flink.streaming.api;
 
 import static org.junit.Assert.assertTrue;
 
-import java.util.ArrayList;
-import java.util.List;
-
 import org.apache.flink.api.java.functions.RichFlatMapFunction;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.IterativeDataStream;
@@ -84,13 +81,11 @@ public class IterateTest {
 
 		LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
 
-		List<Boolean> bl = new ArrayList<Boolean>();
-		for (int i = 0; i < 100000; i++) {
-			bl.add(false);
-		}
-		DataStream<Boolean> source = env.fromCollection(bl);
+		env.setBufferTimeout(10);
+
+		DataStream<Boolean> source = env.fromElements(false, false, false);
 
-		IterativeDataStream<Boolean> iteration = source.iterate();
+		IterativeDataStream<Boolean> iteration = source.iterate().setMaxWaitTime(3000);
 
 		DataStream<Boolean> increment = iteration.flatMap(new IterationHead()).flatMap(
 				new IterationTail());


Mime
View raw message