flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbala...@apache.org
Subject [1/2] git commit: [streaming] Enhanced BufferTimeout functionality for StreamRecordWriter
Date Tue, 04 Nov 2014 22:22:43 GMT
Repository: incubator-flink
Updated Branches:
  refs/heads/master 233161b25 -> 5e9a45491


[streaming] Enhanced BufferTimeout functionality for StreamRecordWriter


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/5e9a4549
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/5e9a4549
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/5e9a4549

Branch: refs/heads/master
Commit: 5e9a4549161c45c85434347d6a2f256953e9883b
Parents: 58f7d30
Author: mbalassi <balassi.marton@gmail.com>
Authored: Tue Nov 4 12:14:33 2014 +0100
Committer: mbalassi <balassi.marton@gmail.com>
Committed: Tue Nov 4 22:38:15 2014 +0100

----------------------------------------------------------------------
 .../environment/StreamExecutionEnvironment.java | 31 +++++++++++++-------
 .../api/streamvertex/OutputHandler.java         | 26 +++++++++++-----
 .../flink/streaming/io/StreamRecordWriter.java  | 14 +++++++--
 3 files changed, 50 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5e9a4549/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 4d34217..4b7afbb 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -59,7 +59,7 @@ public abstract class StreamExecutionEnvironment {
 
 	private int executionParallelism = -1;
 
-	private long buffertimeout = 0;;
+	private long bufferTimeout = 100;
 
 	protected JobGraphBuilder jobGraphBuilder;
 
@@ -112,23 +112,34 @@ public abstract class StreamExecutionEnvironment {
 	}
 
 	/**
-	 * Sets the maximum time frequency (ms) for the flushing of the output
-	 * buffers. By default the output buffers flush only when they are full.
+	 * Sets the maximum time frequency (milliseconds) for the flushing of the
+	 * output buffers. By default the output buffers flush frequently to provide
+	 * low latency and to aid smooth developer experience. Setting the parameter
+	 * can result in three logical modes:
+	 * 
+	 * <ul>
+	 * <li>
+	 * A positive integer triggers flushing periodically by that integer</li>
+	 * <li>
+	 * 0 triggers flushing after every record thus minimizing latency</li>
+	 * <li>
+	 * -1 triggers flushing only when the output buffer is full thus maximizing throughput</li>
+	 * </ul>
 	 * 
 	 * @param timeoutMillis
 	 *            The maximum time between two output flushes.
 	 */
 	public StreamExecutionEnvironment setBufferTimeout(long timeoutMillis) {
-		if (timeoutMillis < 0) {
-			throw new IllegalArgumentException("Timeout of buffer must be non-negative");
+		if (timeoutMillis < -1 ) {
+			throw new IllegalArgumentException("Timeout of buffer must be non-negative or -1");
 		}
 
-		this.buffertimeout = timeoutMillis;
+		this.bufferTimeout = timeoutMillis;
 		return this;
 	}
 
 	public long getBufferTimeout() {
-		return this.buffertimeout;
+		return this.bufferTimeout;
 	}
 
 	/**
@@ -465,10 +476,10 @@ public abstract class StreamExecutionEnvironment {
 	 * the program that have resulted in a "sink" operation. Sink operations are
 	 * for example printing results or forwarding them to a message queue.
 	 * <p>
-	 * The program execution will be logged and displayed with the provided
-	 * name
+	 * The program execution will be logged and displayed with the provided name
 	 * 
-	 * @param jobName Desired name of the job
+	 * @param jobName
+	 *            Desired name of the job
 	 * 
 	 * @throws Exception
 	 **/

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5e9a4549/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
index d3f75dd..8b72195 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
@@ -39,7 +39,7 @@ import org.slf4j.LoggerFactory;
 public class OutputHandler<OUT> {
 	private static final Logger LOG = LoggerFactory.getLogger(OutputHandler.class);
 
-	private StreamVertex<?,OUT> streamVertex;
+	private StreamVertex<?, OUT> streamVertex;
 	private StreamConfig configuration;
 
 	private List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputs;
@@ -50,7 +50,7 @@ public class OutputHandler<OUT> {
 	StreamRecordSerializer<OUT> outSerializer = null;
 	SerializationDelegate<StreamRecord<OUT>> outSerializationDelegate = null;
 
-	public OutputHandler(StreamVertex<?,OUT> streamComponent) {
+	public OutputHandler(StreamVertex<?, OUT> streamComponent) {
 		this.streamVertex = streamComponent;
 		this.outputs = new LinkedList<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>();
 		this.configuration = new StreamConfig(streamComponent.getTaskConfiguration());
@@ -119,12 +119,22 @@ public class OutputHandler<OUT> {
 
 		RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output;
 
-		if (bufferTimeout > 0) {
-			output = new StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>>(
-					streamVertex, outputPartitioner, bufferTimeout);
+		if (bufferTimeout >= 0) {
+			output = new StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>>(streamVertex,
+					outputPartitioner, bufferTimeout);
+
+			if (LOG.isTraceEnabled()) {
+				LOG.trace("StreamRecordWriter initiated with {} bufferTimeout for {}",
+						bufferTimeout, streamVertex.getClass().getSimpleName());
+			}
+
 		} else {
 			output = new RecordWriter<SerializationDelegate<StreamRecord<OUT>>>(streamVertex,
 					outputPartitioner);
+
+			if (LOG.isTraceEnabled()) {
+				LOG.trace("RecordWriter initiated for {}", streamVertex.getClass().getSimpleName());
+			}
 		}
 
 		outputs.add(output);
@@ -136,8 +146,8 @@ public class OutputHandler<OUT> {
 		}
 
 		if (LOG.isTraceEnabled()) {
-			LOG.trace("Partitioner set: {} with {} outputs", outputPartitioner.getClass()
-					.getSimpleName(), outputNumber);
+			LOG.trace("Partitioner set: {} with {} outputs for {}", outputPartitioner.getClass()
+					.getSimpleName(), outputNumber, streamVertex.getClass().getSimpleName());
 		}
 	}
 
@@ -155,7 +165,7 @@ public class OutputHandler<OUT> {
 
 	long startTime;
 
-	public void invokeUserFunction(String componentTypeName, StreamInvokable<?,OUT> userInvokable)
+	public void invokeUserFunction(String componentTypeName, StreamInvokable<?, OUT> userInvokable)
 			throws IOException, InterruptedException {
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("{} {} invoked with instance id {}", componentTypeName,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5e9a4549/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamRecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamRecordWriter.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamRecordWriter.java
index 87fd7cd..1237020 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamRecordWriter.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamRecordWriter.java
@@ -46,12 +46,12 @@ public class StreamRecordWriter<T extends IOReadableWritable> extends
 	// -----------------------------------------------------------------------------------------------------------------
 
 	public StreamRecordWriter(AbstractInvokable invokable) {
-		this(invokable, new RoundRobinChannelSelector<T>(), 1000);
+		this(invokable, new RoundRobinChannelSelector<T>(), 100);
 	}
 
 	public StreamRecordWriter(AbstractInvokable invokable,
 			ChannelSelector<T> channelSelector) {
-		this(invokable, channelSelector, 1000);
+		this(invokable, channelSelector, 100);
 	}
 
 	public StreamRecordWriter(AbstractInvokable invokable,
@@ -73,8 +73,12 @@ public class StreamRecordWriter<T extends IOReadableWritable> extends
 		this.serializers = new RecordSerializer[numChannels];
 		for (int i = 0; i < this.numChannels; i++) {
 			this.serializers[i] = new SpanningRecordSerializer<T>();
+		}
+		
+		//start a separate thread to handle positive flush intervals
+		if (timeout > 0) {
+			(new OutputFlusher()).start();
 		}
-		(new OutputFlusher()).start();
 	}
 
 	@Override
@@ -99,6 +103,10 @@ public class StreamRecordWriter<T extends IOReadableWritable> extends
 									.getBufferSize());
 					result = serializer.setNextBuffer(buffer);
 				}
+			}
+			
+			if (timeout == 0){
+				flush();
 			}
 		}
 	}


Mime
View raw message