flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbala...@apache.org
Subject [18/20] flink git commit: [FLINK-1539] [streaming] Remove calls to uninitalized runtimecontexts
Date Mon, 16 Feb 2015 14:25:44 GMT
[FLINK-1539] [streaming] Remove calls to uninitalized runtimecontexts


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/44702075
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/44702075
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/44702075

Branch: refs/heads/master
Commit: 4470207501ff318bd4d59bad58a5f3fc4ccbfa6f
Parents: bb5dc7e
Author: Gyula Fora <gyfora@apache.org>
Authored: Fri Feb 13 19:57:55 2015 +0100
Committer: mbalassi <mbalassi@apache.org>
Committed: Mon Feb 16 13:06:08 2015 +0100

----------------------------------------------------------------------
 .../streaming/api/datastream/DataStream.java    | 11 ++++----
 .../api/datastream/WindowedDataStream.java      | 12 ++++-----
 .../aggregation/ComparableAggregator.java       | 14 +++++-----
 .../api/function/aggregation/SumAggregator.java | 10 +++++---
 .../api/function/source/FileSourceFunction.java | 15 +++--------
 .../api/invokable/StreamInvokable.java          |  5 ++--
 .../invokable/operator/ProjectInvokable.java    |  4 ++-
 .../api/invokable/operator/co/CoInvokable.java  |  3 +--
 .../api/streamvertex/CoStreamVertex.java        |  2 +-
 .../api/streamvertex/StreamTaskContext.java     |  5 +++-
 .../api/streamvertex/StreamVertex.java          |  2 +-
 .../windowing/windowbuffer/WindowBuffer.java    |  4 +--
 .../flink/streaming/util/MockCoContext.java     |  7 ++++-
 .../flink/streaming/util/MockContext.java       |  7 ++++-
 .../src/test/resources/log4j-test.properties    | 10 +++++++-
 .../src/test/resources/log4j.properties         | 27 ++++++++++++++++++++
 16 files changed, 92 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/44702075/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index e766626..71a97f8 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -636,7 +636,8 @@ public class DataStream<OUT> {
 	 * @return The transformed DataStream.
 	 */
 	public SingleOutputStreamOperator<OUT, ?> sum(String field) {
-		return aggregate((AggregationFunction<OUT>) SumAggregator.getSumFunction(field, getType()));
+		return aggregate((AggregationFunction<OUT>) SumAggregator.getSumFunction(field, getType(),
+				getExecutionConfig()));
 	}
 
 	/**
@@ -667,7 +668,7 @@ public class DataStream<OUT> {
 	 */
 	public SingleOutputStreamOperator<OUT, ?> min(String field) {
 		return aggregate(ComparableAggregator.getAggregator(field, getType(), AggregationType.MIN,
-				false));
+				false, getExecutionConfig()));
 	}
 
 	/**
@@ -698,7 +699,7 @@ public class DataStream<OUT> {
 	 */
 	public SingleOutputStreamOperator<OUT, ?> max(String field) {
 		return aggregate(ComparableAggregator.getAggregator(field, getType(), AggregationType.MAX,
-				false));
+				false, getExecutionConfig()));
 	}
 
 	/**
@@ -718,7 +719,7 @@ public class DataStream<OUT> {
 	 */
 	public SingleOutputStreamOperator<OUT, ?> minBy(String field, boolean first) {
 		return aggregate(ComparableAggregator.getAggregator(field, getType(),
-				AggregationType.MINBY, first));
+				AggregationType.MINBY, first, getExecutionConfig()));
 	}
 
 	/**
@@ -738,7 +739,7 @@ public class DataStream<OUT> {
 	 */
 	public SingleOutputStreamOperator<OUT, ?> maxBy(String field, boolean first) {
 		return aggregate(ComparableAggregator.getAggregator(field, getType(),
-				AggregationType.MAXBY, first));
+				AggregationType.MAXBY, first, getExecutionConfig()));
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/44702075/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
index 3ff5859..4da12ac 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
@@ -381,7 +381,8 @@ public class WindowedDataStream<OUT> {
 	 * @return The transformed DataStream.
 	 */
 	public WindowedDataStream<OUT> sum(String field) {
-		return aggregate((AggregationFunction<OUT>) SumAggregator.getSumFunction(field, getType()));
+		return aggregate((AggregationFunction<OUT>) SumAggregator.getSumFunction(field, getType(),
+				getExecutionConfig()));
 	}
 
 	/**
@@ -411,7 +412,7 @@ public class WindowedDataStream<OUT> {
 	 */
 	public WindowedDataStream<OUT> min(String field) {
 		return aggregate(ComparableAggregator.getAggregator(field, getType(), AggregationType.MIN,
-				false));
+				false, getExecutionConfig()));
 	}
 
 	/**
@@ -475,7 +476,7 @@ public class WindowedDataStream<OUT> {
 	 */
 	public WindowedDataStream<OUT> minBy(String field, boolean first) {
 		return aggregate(ComparableAggregator.getAggregator(field, getType(),
-				AggregationType.MINBY, first));
+				AggregationType.MINBY, first, getExecutionConfig()));
 	}
 
 	/**
@@ -505,7 +506,7 @@ public class WindowedDataStream<OUT> {
 	 */
 	public WindowedDataStream<OUT> max(String field) {
 		return aggregate(ComparableAggregator.getAggregator(field, getType(), AggregationType.MAX,
-				false));
+				false, getExecutionConfig()));
 	}
 
 	/**
@@ -569,11 +570,10 @@ public class WindowedDataStream<OUT> {
 	 */
 	public WindowedDataStream<OUT> maxBy(String field, boolean first) {
 		return aggregate(ComparableAggregator.getAggregator(field, getType(),
-				AggregationType.MAXBY, first));
+				AggregationType.MAXBY, first, getExecutionConfig()));
 	}
 
 	private WindowedDataStream<OUT> aggregate(AggregationFunction<OUT> aggregator)
{
-
 		return reduceWindow(aggregator);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/44702075/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/ComparableAggregator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/ComparableAggregator.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/ComparableAggregator.java
index 7f7cf0b..66da931 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/ComparableAggregator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/ComparableAggregator.java
@@ -21,6 +21,7 @@ import java.lang.reflect.Array;
 import java.lang.reflect.Field;
 import java.util.List;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -65,9 +66,10 @@ public abstract class ComparableAggregator<T> extends AggregationFunction<T>
{
 	}
 
 	public static <R> AggregationFunction<R> getAggregator(String field,
-			TypeInformation<R> typeInfo, AggregationType aggregationType, boolean first) {
+			TypeInformation<R> typeInfo, AggregationType aggregationType, boolean first,
+			ExecutionConfig config) {
 
-		return new PojoComparableAggregator<R>(field, typeInfo, aggregationType, first);
+		return new PojoComparableAggregator<R>(field, typeInfo, aggregationType, first, config);
 	}
 
 	private static class TupleComparableAggregator<T> extends ComparableAggregator<T>
{
@@ -177,7 +179,7 @@ public abstract class ComparableAggregator<T> extends AggregationFunction<T>
{
 		PojoComparator<T> pojoComparator;
 
 		public PojoComparableAggregator(String field, TypeInformation<?> typeInfo,
-				AggregationType aggregationType, boolean first) {
+				AggregationType aggregationType, boolean first, ExecutionConfig config) {
 			super(0, aggregationType, first);
 			if (!(typeInfo instanceof CompositeType<?>)) {
 				throw new IllegalArgumentException(
@@ -193,7 +195,7 @@ public abstract class ComparableAggregator<T> extends AggregationFunction<T>
{
 
 			if (cType instanceof PojoTypeInfo) {
 				pojoComparator = (PojoComparator<T>) cType.createComparator(
-						new int[] { logicalKeyPosition }, new boolean[] { false }, 0, getRuntimeContext().getExecutionConfig());
+						new int[] { logicalKeyPosition }, new boolean[] { false }, 0, config);
 			} else {
 				throw new IllegalArgumentException(
 						"Key expressions are only supported on POJO types. "
@@ -225,8 +227,8 @@ public abstract class ComparableAggregator<T> extends AggregationFunction<T>
{
 			} else {
 				if (c == 1) {
 					keyFields[0].set(value2, field1);
-				} 
-				
+				}
+
 				return value2;
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/44702075/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumAggregator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumAggregator.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumAggregator.java
index 74e4597..20d4450 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumAggregator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumAggregator.java
@@ -21,6 +21,7 @@ import java.lang.reflect.Array;
 import java.lang.reflect.Field;
 import java.util.List;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
@@ -47,9 +48,10 @@ public abstract class SumAggregator {
 
 	}
 
-	public static <T> ReduceFunction<T> getSumFunction(String field, TypeInformation<T>
typeInfo) {
+	public static <T> ReduceFunction<T> getSumFunction(String field, TypeInformation<T>
typeInfo,
+			ExecutionConfig config) {
 
-		return new PojoSumAggregator<T>(field, typeInfo);
+		return new PojoSumAggregator<T>(field, typeInfo, config);
 	}
 
 	private static class TupleSumAggregator<T> extends AggregationFunction<T> {
@@ -126,7 +128,7 @@ public abstract class SumAggregator {
 		SumFunction adder;
 		PojoComparator<T> comparator;
 
-		public PojoSumAggregator(String field, TypeInformation<?> type) {
+		public PojoSumAggregator(String field, TypeInformation<?> type, ExecutionConfig config)
{
 			super(0);
 			if (!(type instanceof CompositeType<?>)) {
 				throw new IllegalArgumentException(
@@ -146,7 +148,7 @@ public abstract class SumAggregator {
 
 			if (cType instanceof PojoTypeInfo) {
 				comparator = (PojoComparator<T>) cType.createComparator(
-						new int[] { logicalKeyPosition }, new boolean[] { false }, 0, getRuntimeContext().getExecutionConfig());
+						new int[] { logicalKeyPosition }, new boolean[] { false }, 0, config);
 			} else {
 				throw new IllegalArgumentException(
 						"Key expressions are only supported on POJO types. "

http://git-wip-us.apache.org/repos/asf/flink/blob/44702075/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
index 6d1441a..20f5f56 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
@@ -23,8 +23,6 @@ import java.util.NoSuchElementException;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
@@ -38,17 +36,11 @@ public class FileSourceFunction extends RichSourceFunction<String>
{
 
 	private InputFormat<String, ?> inputFormat;
 
-	private TypeSerializerFactory<String> serializerFactory;
+	private TypeInformation<String> typeInfo;
 
 	public FileSourceFunction(InputFormat<String, ?> format, TypeInformation<String>
typeInfo) {
 		this.inputFormat = format;
-		this.serializerFactory = createSerializer(typeInfo);
-	}
-
-	private TypeSerializerFactory<String> createSerializer(TypeInformation<String>
typeInfo) {
-		TypeSerializer<String> serializer = typeInfo.createSerializer(getRuntimeContext().getExecutionConfig());
-
-		return new RuntimeSerializerFactory<String>(serializer, typeInfo.getTypeClass());
+		this.typeInfo = typeInfo;
 	}
 
 	@Override
@@ -60,7 +52,8 @@ public class FileSourceFunction extends RichSourceFunction<String>
{
 
 	@Override
 	public void invoke(Collector<String> collector) throws Exception {
-		final TypeSerializer<String> serializer = serializerFactory.getSerializer();
+		final TypeSerializer<String> serializer = typeInfo.createSerializer(getRuntimeContext()
+				.getExecutionConfig());
 		final Iterator<InputSplit> splitIterator = getInputSplits();
 		@SuppressWarnings("unchecked")
 		final InputFormat<String, InputSplit> format = (InputFormat<String, InputSplit>)
this.inputFormat;

http://git-wip-us.apache.org/repos/asf/flink/blob/44702075/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
index 7feeac8..733edc7 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
@@ -73,9 +73,8 @@ public abstract class StreamInvokable<IN, OUT> implements Serializable
{
 	 * 
 	 * @param taskContext
 	 *            StreamTaskContext representing the vertex
-	 * @param executionConfig
 	 */
-	public void setup(StreamTaskContext<OUT> taskContext, ExecutionConfig executionConfig)
{
+	public void setup(StreamTaskContext<OUT> taskContext) {
 		this.collector = taskContext.getOutputCollector();
 		this.recordIterator = taskContext.getIndexedInput(0);
 		this.inSerializer = taskContext.getInputSerializer(0);
@@ -84,7 +83,7 @@ public abstract class StreamInvokable<IN, OUT> implements Serializable
{
 			this.objectSerializer = inSerializer.getObjectSerializer();
 		}
 		this.taskContext = taskContext;
-		this.executionConfig = executionConfig;
+		this.executionConfig = taskContext.getExecutionConfig();
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/44702075/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
index 69c7cee..31689c7 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
@@ -28,6 +28,7 @@ public class ProjectInvokable<IN, OUT extends Tuple> extends StreamInvokable<IN,
 
 	transient OUT outTuple;
 	TypeSerializer<OUT> outTypeSerializer;
+	TypeInformation<OUT> outTypeInformation;
 	int[] fields;
 	int numFields;
 
@@ -35,7 +36,7 @@ public class ProjectInvokable<IN, OUT extends Tuple> extends StreamInvokable<IN,
 		super(null);
 		this.fields = fields;
 		this.numFields = this.fields.length;
-		this.outTypeSerializer = outTypeInformation.createSerializer(executionConfig);
+		this.outTypeInformation = outTypeInformation;
 	}
 
 	@Override
@@ -56,6 +57,7 @@ public class ProjectInvokable<IN, OUT extends Tuple> extends StreamInvokable<IN,
 	@Override
 	public void open(Configuration config) throws Exception {
 		super.open(config);
+		this.outTypeSerializer = outTypeInformation.createSerializer(executionConfig);
 		outTuple = outTypeSerializer.createInstance();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/44702075/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
index 9f98db3..604873e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
@@ -17,7 +17,6 @@
 
 package org.apache.flink.streaming.api.invokable.operator.co;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.streaming.api.invokable.StreamInvokable;
@@ -47,7 +46,7 @@ public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<IN1,
OU
 	protected TypeSerializer<IN2> serializer2;
 
 	@Override
-	public void setup(StreamTaskContext<OUT> taskContext, ExecutionConfig executionConfig)
{
+	public void setup(StreamTaskContext<OUT> taskContext) {
 		this.collector = taskContext.getOutputCollector();
 
 		this.recordIterator = taskContext.getCoReader();

http://git-wip-us.apache.org/repos/asf/flink/blob/44702075/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
index 7b6e75e..de4660a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
@@ -69,7 +69,7 @@ public class CoStreamVertex<IN1, IN2, OUT> extends StreamVertex<IN1,
OUT> {
 	@Override
 	protected void setInvokable() {
 		userInvokable = configuration.getUserInvokable(userClassLoader);
-		userInvokable.setup(this, getExecutionConfig());
+		userInvokable.setup(this);
 	}
 
 	protected void setConfigInputs() throws StreamVertexException {

http://git-wip-us.apache.org/repos/asf/flink/blob/44702075/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamTaskContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamTaskContext.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamTaskContext.java
index 665decd..1c904ca 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamTaskContext.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamTaskContext.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.api.streamvertex;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.streaming.api.StreamConfig;
 import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
 import org.apache.flink.streaming.io.CoReaderIterator;
@@ -32,7 +33,7 @@ public interface StreamTaskContext<OUT> {
 	ClassLoader getUserCodeClassLoader();
 
 	<X> MutableObjectIterator<X> getInput(int index);
-	
+
 	<X> IndexedReaderIterator<X> getIndexedInput(int index);
 
 	<X> StreamRecordSerializer<X> getInputSerializer(int index);
@@ -40,4 +41,6 @@ public interface StreamTaskContext<OUT> {
 	Collector<OUT> getOutputCollector();
 
 	<X, Y> CoReaderIterator<X, Y> getCoReader();
+
+	ExecutionConfig getExecutionConfig();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/44702075/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
index 91ffec1..5033357 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
@@ -98,7 +98,7 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements
StreamTa
 
 	protected void setInvokable() {
 		userInvokable = configuration.getUserInvokable(userClassLoader);
-		userInvokable.setup(this, getExecutionConfig());
+		userInvokable.setup(this);
 	}
 
 	public String getName() {

http://git-wip-us.apache.org/repos/asf/flink/blob/44702075/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/WindowBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/WindowBuffer.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/WindowBuffer.java
index 59abcd6..1a45194 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/WindowBuffer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/WindowBuffer.java
@@ -22,7 +22,7 @@ import java.io.Serializable;
 import org.apache.flink.streaming.api.windowing.StreamWindow;
 import org.apache.flink.util.Collector;
 
-public interface WindowBuffer<T> extends Serializable {
+public interface WindowBuffer<T> extends Serializable, Cloneable {
 
 	public void store(T element) throws Exception;
 
@@ -31,7 +31,7 @@ public interface WindowBuffer<T> extends Serializable {
 	public boolean emitWindow(Collector<StreamWindow<T>> collector);
 
 	public int size();
-	
+
 	public WindowBuffer<T> clone();
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/44702075/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java
index a0d08f1..98a6a8d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java
@@ -156,7 +156,7 @@ public class MockCoContext<IN1, IN2, OUT> implements StreamTaskContext<OUT>
{
 	public static <IN1, IN2, OUT> List<OUT> createAndExecute(CoInvokable<IN1,
IN2, OUT> invokable,
 			List<IN1> input1, List<IN2> input2) {
 		MockCoContext<IN1, IN2, OUT> mockContext = new MockCoContext<IN1, IN2, OUT>(input1,
input2);
-		invokable.setup(mockContext, new ExecutionConfig());
+		invokable.setup(mockContext);
 
 		try {
 			invokable.open(null);
@@ -222,4 +222,9 @@ public class MockCoContext<IN1, IN2, OUT> implements StreamTaskContext<OUT>
{
 				"Indexed iterator is currently unsupported for connected streams.");
 	}
 
+	@Override
+	public ExecutionConfig getExecutionConfig() {
+		return new ExecutionConfig();
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/44702075/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
index f3d977f..03038b3 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
@@ -112,7 +112,7 @@ public class MockContext<IN, OUT> implements StreamTaskContext<OUT>
{
 	public static <IN, OUT> List<OUT> createAndExecute(StreamInvokable<IN, OUT>
invokable,
 			List<IN> inputs) {
 		MockContext<IN, OUT> mockContext = new MockContext<IN, OUT>(inputs);
-		invokable.setup(mockContext, new ExecutionConfig());
+		invokable.setup(mockContext);
 		try {
 			invokable.open(null);
 			invokable.invoke();
@@ -170,4 +170,9 @@ public class MockContext<IN, OUT> implements StreamTaskContext<OUT>
{
 		return (IndexedReaderIterator<X>) iterator;
 	}
 
+	@Override
+	public ExecutionConfig getExecutionConfig() {
+		return new ExecutionConfig();
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/44702075/flink-staging/flink-streaming/flink-streaming-core/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/resources/log4j-test.properties
b/flink-staging/flink-streaming/flink-streaming-core/src/test/resources/log4j-test.properties
index 2fb9345..0b686e5 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/resources/log4j-test.properties
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/resources/log4j-test.properties
@@ -16,4 +16,12 @@
 # limitations under the License.
 ################################################################################
 
-log4j.rootLogger=OFF
\ No newline at end of file
+# Set root logger level to DEBUG and its only appender to A1.
+log4j.rootLogger=OFF, A1
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/44702075/flink-staging/flink-streaming/flink-streaming-core/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/resources/log4j.properties
b/flink-staging/flink-streaming/flink-streaming-core/src/test/resources/log4j.properties
new file mode 100644
index 0000000..ed2bbcb
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/resources/log4j.properties
@@ -0,0 +1,27 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# This file ensures that tests executed from the IDE show log output
+
+log4j.rootLogger=OFF, console
+
+# Log all infos in the given file
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target = System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
\ No newline at end of file


Mime
View raw message