flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject [2/2] incubator-flink git commit: [FLINK-1221] Use the source line as the default operator name
Date Thu, 13 Nov 2014 10:22:46 GMT
[FLINK-1221] Use the source line as the default operator name

This closes #197.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/818ebda0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/818ebda0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/818ebda0

Branch: refs/heads/master
Commit: 818ebda0f4d0070499446d2958532af5addc3a17
Parents: b253cb2
Author: Robert Metzger <rmetzger@apache.org>
Authored: Tue Nov 11 17:30:09 2014 +0100
Committer: uce <uce@apache.org>
Committed: Thu Nov 13 11:21:27 2014 +0100

----------------------------------------------------------------------
 .../flink/api/common/operators/Operator.java    |   4 +-
 .../flink/api/common/operators/Union.java       |   9 +-
 .../java/org/apache/flink/api/java/DataSet.java |  43 +++---
 .../flink/api/java/ExecutionEnvironment.java    |  33 ++--
 .../java/org/apache/flink/api/java/Utils.java   |  35 +++++
 .../org/apache/flink/api/java/io/CsvReader.java |  54 +++----
 .../api/java/operators/AggregateOperator.java   |  14 +-
 .../api/java/operators/CoGroupOperator.java     |  11 +-
 .../flink/api/java/operators/CrossOperator.java |  16 +-
 .../flink/api/java/operators/DataSource.java    |  15 +-
 .../api/java/operators/DistinctOperator.java    |   8 +-
 .../api/java/operators/FilterOperator.java      |  10 +-
 .../api/java/operators/FlatMapOperator.java     |   7 +-
 .../api/java/operators/GroupReduceOperator.java |  10 +-
 .../flink/api/java/operators/JoinOperator.java  |  24 +--
 .../flink/api/java/operators/MapOperator.java   |   8 +-
 .../java/operators/MapPartitionOperator.java    |   7 +-
 .../api/java/operators/PartitionOperator.java   |  10 +-
 .../api/java/operators/ReduceOperator.java      |  10 +-
 .../api/java/operators/SortedGrouping.java      |   3 +-
 .../flink/api/java/operators/UnionOperator.java |   8 +-
 .../api/java/operators/UnsortedGrouping.java    |  22 ++-
 .../flink/api/java/tuple/TupleGenerator.java    |  12 +-
 .../flink/api/java/operators/NamesTest.java     | 149 +++++++++++++++++++
 .../org/apache/flink/api/scala/DataSet.scala    |  90 ++++++++---
 .../flink/api/scala/ExecutionEnvironment.scala  |  23 ++-
 .../apache/flink/api/scala/GroupedDataSet.scala |  10 +-
 .../apache/flink/api/scala/coGroupDataSet.scala |  12 +-
 .../apache/flink/api/scala/crossDataSet.scala   |   9 +-
 .../apache/flink/api/scala/joinDataSet.scala    |  15 +-
 .../org/apache/flink/api/scala/package.scala    |   7 +
 .../api/scala/ScalaAPICompletenessTest.scala    |   6 +-
 32 files changed, 512 insertions(+), 182 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java
index 765aa73..85b352a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java
@@ -244,7 +244,7 @@ public abstract class Operator<OUT> implements Visitable<Operator<?>> {
 		}
 
 		// Otherwise construct union cascade
-		Union<T> lastUnion = new Union<T>(new BinaryOperatorInformation<T, T, T>(type, type, type));
+		Union<T> lastUnion = new Union<T>(new BinaryOperatorInformation<T, T, T>(type, type, type), "<unknown>");
 
 		int i;
 		if (input2[0] == null) {
@@ -263,7 +263,7 @@ public abstract class Operator<OUT> implements Visitable<Operator<?>> {
 			i = 2;
 		}
 		for (; i < input2.length; i++) {
-			Union<T> tmpUnion = new Union<T>(new BinaryOperatorInformation<T, T, T>(type, type, type));
+			Union<T> tmpUnion = new Union<T>(new BinaryOperatorInformation<T, T, T>(type, type, type), "<unknown>");
 			tmpUnion.setSecondInput(lastUnion);
 			if (input2[i] == null) {
 				throw new IllegalArgumentException("The input may not contain null elements.");

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-core/src/main/java/org/apache/flink/api/common/operators/Union.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/Union.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/Union.java
index fb8626d..d7d0e20 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/Union.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/Union.java
@@ -30,19 +30,18 @@ import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
  */
 public class Union<T> extends DualInputOperator<T, T, T, AbstractRichFunction> {
 	
-	private final static String NAME = "Union";
 	
 	/** 
 	 * Creates a new Union operator.
 	 */
-	public Union(BinaryOperatorInformation<T, T, T> operatorInfo) {
+	public Union(BinaryOperatorInformation<T, T, T> operatorInfo, String unionLocationName) {
 		// we pass it an AbstractFunction, because currently all operators expect some form of UDF
-		super(new UserCodeClassWrapper<AbstractRichFunction>(AbstractRichFunction.class), operatorInfo, NAME);
+		super(new UserCodeClassWrapper<AbstractRichFunction>(AbstractRichFunction.class), operatorInfo, "Union at "+unionLocationName);
 	}
 	
-	public Union(Operator<T> input1, Operator<T> input2) {
+	public Union(Operator<T> input1, Operator<T> input2, String unionLocationName) {
 		this(new BinaryOperatorInformation<T, T, T>(input1.getOperatorInfo().getOutputType(),
-				input1.getOperatorInfo().getOutputType(), input1.getOperatorInfo().getOutputType()));
+				input1.getOperatorInfo().getOutputType(), input1.getOperatorInfo().getOutputType()), unionLocationName);
 		setFirstInput(input1);
 		setSecondInput(input2);
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index 7b0752c..c78cc7a 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -148,7 +148,7 @@ public abstract class DataSet<T> {
 
 		TypeInformation<R> resultType = TypeExtractor.getMapReturnTypes(mapper, this.getType());
 
-		return new MapOperator<T, R>(this, resultType, mapper);
+		return new MapOperator<T, R>(this, resultType, mapper, Utils.getCallLocationName());
 	}
 
 
@@ -176,7 +176,7 @@ public abstract class DataSet<T> {
 			throw new NullPointerException("MapPartition function must not be null.");
 		}
 		TypeInformation<R> resultType = TypeExtractor.getMapPartitionReturnTypes(mapPartition, this.getType());
-		return new MapPartitionOperator<T, R>(this, resultType, mapPartition);
+		return new MapPartitionOperator<T, R>(this, resultType, mapPartition, Utils.getCallLocationName());
 	}
 	
 	/**
@@ -197,7 +197,7 @@ public abstract class DataSet<T> {
 		}
 
 		TypeInformation<R> resultType = TypeExtractor.getFlatMapReturnTypes(flatMapper, this.getType());
-		return new FlatMapOperator<T, R>(this, resultType, flatMapper);
+		return new FlatMapOperator<T, R>(this, resultType, flatMapper, Utils.getCallLocationName());
 	}
 	
 	/**
@@ -217,7 +217,7 @@ public abstract class DataSet<T> {
 		if (filter == null) {
 			throw new NullPointerException("Filter function must not be null.");
 		}
-		return new FilterOperator<T>(this, filter);
+		return new FilterOperator<T>(this, filter, Utils.getCallLocationName());
 	}
 
 	
@@ -267,7 +267,7 @@ public abstract class DataSet<T> {
 	 * @see DataSet
 	 */
 	public AggregateOperator<T> aggregate(Aggregations agg, int field) {
-		return new AggregateOperator<T>(this, agg, field);
+		return new AggregateOperator<T>(this, agg, field, Utils.getCallLocationName());
 	}
 
 	/**
@@ -320,7 +320,7 @@ public abstract class DataSet<T> {
 		if (reducer == null) {
 			throw new NullPointerException("Reduce function must not be null.");
 		}
-		return new ReduceOperator<T>(this, reducer);
+		return new ReduceOperator<T>(this, reducer, Utils.getCallLocationName());
 	}
 	
 	/**
@@ -341,7 +341,7 @@ public abstract class DataSet<T> {
 			throw new NullPointerException("GroupReduce function must not be null.");
 		}
 		TypeInformation<R> resultType = TypeExtractor.getGroupReduceReturnTypes(reducer, this.getType());
-		return new GroupReduceOperator<T, R>(this, resultType, reducer);
+		return new GroupReduceOperator<T, R>(this, resultType, reducer, Utils.getCallLocationName());
 	}
 
 /**
@@ -362,7 +362,7 @@ public abstract class DataSet<T> {
 		}
 			
 		return new ReduceOperator<T>(this, new SelectByMinFunction(
-				(TupleTypeInfo) this.type, fields));
+				(TupleTypeInfo) this.type, fields), Utils.getCallLocationName());
 	}
 	
 	/**
@@ -383,7 +383,7 @@ public abstract class DataSet<T> {
 		}
 			
 		return new ReduceOperator<T>(this, new SelectByMaxFunction(
-				(TupleTypeInfo) this.type, fields));
+				(TupleTypeInfo) this.type, fields), Utils.getCallLocationName());
 	}
 
 	/**
@@ -415,7 +415,7 @@ public abstract class DataSet<T> {
 	 */
 	public <K> DistinctOperator<T> distinct(KeySelector<T, K> keyExtractor) {
 		TypeInformation<K> keyType = TypeExtractor.getKeySelectorTypes(keyExtractor, type);
-		return new DistinctOperator<T>(this, new Keys.SelectorFunctionKeys<T, K>(keyExtractor, getType(), keyType));
+		return new DistinctOperator<T>(this, new Keys.SelectorFunctionKeys<T, K>(keyExtractor, getType(), keyType), Utils.getCallLocationName());
 	}
 	
 	/**
@@ -430,7 +430,7 @@ public abstract class DataSet<T> {
 	 * @return A DistinctOperator that represents the distinct DataSet.
 	 */
 	public DistinctOperator<T> distinct(int... fields) {
-		return new DistinctOperator<T>(this, new Keys.ExpressionKeys<T>(fields, getType(), true));
+		return new DistinctOperator<T>(this, new Keys.ExpressionKeys<T>(fields, getType(), true), Utils.getCallLocationName());
 	}
 	
 	/**
@@ -444,7 +444,7 @@ public abstract class DataSet<T> {
 	 * @return A DistinctOperator that represents the distinct DataSet.
 	 */
 	public DistinctOperator<T> distinct(String... fields) {
-		return new DistinctOperator<T>(this, new Keys.ExpressionKeys<T>(fields, getType()));
+		return new DistinctOperator<T>(this, new Keys.ExpressionKeys<T>(fields, getType()), Utils.getCallLocationName());
 	}
 	
 	/**
@@ -455,7 +455,7 @@ public abstract class DataSet<T> {
 	 * @return A DistinctOperator that represents the distinct DataSet.
 	 */
 	public DistinctOperator<T> distinct() {
-		return new DistinctOperator<T>(this, null);
+		return new DistinctOperator<T>(this, null, Utils.getCallLocationName());
 	}
 	
 	// --------------------------------------------------------------------------------------------
@@ -698,7 +698,7 @@ public abstract class DataSet<T> {
 	 * @see Tuple2
 	 */
 	public <R> CrossOperator.DefaultCross<T, R> cross(DataSet<R> other) {
-		return new CrossOperator.DefaultCross<T, R>(this, other);
+		return new CrossOperator.DefaultCross<T, R>(this, other, Utils.getCallLocationName());
 	}
 	
 	/**
@@ -728,7 +728,7 @@ public abstract class DataSet<T> {
 	 * @see Tuple2
 	 */
 	public <R> CrossOperator.DefaultCross<T, R> crossWithTiny(DataSet<R> other) {
-		return new CrossOperator.DefaultCross<T, R>(this, other);
+		return new CrossOperator.DefaultCross<T, R>(this, other, Utils.getCallLocationName());
 	}
 	
 	/**
@@ -758,7 +758,7 @@ public abstract class DataSet<T> {
 	 * @see Tuple2
 	 */
 	public <R> CrossOperator.DefaultCross<T, R> crossWithHuge(DataSet<R> other) {
-		return new CrossOperator.DefaultCross<T, R>(this, other);
+		return new CrossOperator.DefaultCross<T, R>(this, other, Utils.getCallLocationName());
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -879,7 +879,7 @@ public abstract class DataSet<T> {
 	 * @return The resulting DataSet.
 	 */
 	public UnionOperator<T> union(DataSet<T> other){
-		return new UnionOperator<T>(this, other);
+		return new UnionOperator<T>(this, other, Utils.getCallLocationName());
 	}
 	
 	// --------------------------------------------------------------------------------------------
@@ -895,7 +895,7 @@ public abstract class DataSet<T> {
 	 * @return The partitioned DataSet.
 	 */
 	public PartitionOperator<T> partitionByHash(int... fields) {
-		return new PartitionOperator<T>(this, PartitionMethod.HASH, new Keys.ExpressionKeys<T>(fields, getType(), false));
+		return new PartitionOperator<T>(this, PartitionMethod.HASH, new Keys.ExpressionKeys<T>(fields, getType(), false), Utils.getCallLocationName());
 	}
 	
 	/**
@@ -907,7 +907,7 @@ public abstract class DataSet<T> {
 	 * @return The partitioned DataSet.
 	 */
 	public PartitionOperator<T> partitionByHash(String... fields) {
-		return new PartitionOperator<T>(this, PartitionMethod.HASH, new Keys.ExpressionKeys<T>(fields, getType()));
+		return new PartitionOperator<T>(this, PartitionMethod.HASH, new Keys.ExpressionKeys<T>(fields, getType()), Utils.getCallLocationName());
 	}
 	
 	/**
@@ -922,7 +922,7 @@ public abstract class DataSet<T> {
 	 */
 	public <K extends Comparable<K>> PartitionOperator<T> partitionByHash(KeySelector<T, K> keyExtractor) {
 		final TypeInformation<K> keyType = TypeExtractor.getKeySelectorTypes(keyExtractor, type);
-		return new PartitionOperator<T>(this, PartitionMethod.HASH, new Keys.SelectorFunctionKeys<T, K>(keyExtractor, this.getType(), keyType));
+		return new PartitionOperator<T>(this, PartitionMethod.HASH, new Keys.SelectorFunctionKeys<T, K>(keyExtractor, this.getType(), keyType), Utils.getCallLocationName());
 	}
 	
 	/**
@@ -934,7 +934,7 @@ public abstract class DataSet<T> {
 	 * @return The rebalanced DataSet.
 	 */
 	public PartitionOperator<T> rebalance() {
-		return new PartitionOperator<T>(this, PartitionMethod.REBALANCE);
+		return new PartitionOperator<T>(this, PartitionMethod.REBALANCE, Utils.getCallLocationName());
 	}
 		
 	// --------------------------------------------------------------------------------------------
@@ -1174,4 +1174,5 @@ public abstract class DataSet<T> {
 			throw new IllegalArgumentException("The two inputs have different execution contexts.");
 		}
 	}
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 6b95ad8..541a89b 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -207,7 +207,7 @@ public abstract class ExecutionEnvironment {
 	public DataSource<String> readTextFile(String filePath) {
 		Validate.notNull(filePath, "The file path may not be null.");
 		
-		return new DataSource<String>(this, new TextInputFormat(new Path(filePath)), BasicTypeInfo.STRING_TYPE_INFO );
+		return new DataSource<String>(this, new TextInputFormat(new Path(filePath)), BasicTypeInfo.STRING_TYPE_INFO, Utils.getCallLocationName());
 	}
 	
 	/**
@@ -223,7 +223,7 @@ public abstract class ExecutionEnvironment {
 
 		TextInputFormat format = new TextInputFormat(new Path(filePath));
 		format.setCharsetName(charsetName);
-		return new DataSource<String>(this, format, BasicTypeInfo.STRING_TYPE_INFO );
+		return new DataSource<String>(this, format, BasicTypeInfo.STRING_TYPE_INFO, Utils.getCallLocationName());
 	}
 	
 	// -------------------------- Text Input Format With String Value------------------------------
@@ -242,7 +242,7 @@ public abstract class ExecutionEnvironment {
 	public DataSource<StringValue> readTextFileWithValue(String filePath) {
 		Validate.notNull(filePath, "The file path may not be null.");
 		
-		return new DataSource<StringValue>(this, new TextValueInputFormat(new Path(filePath)), new ValueTypeInfo<StringValue>(StringValue.class) );
+		return new DataSource<StringValue>(this, new TextValueInputFormat(new Path(filePath)), new ValueTypeInfo<StringValue>(StringValue.class), Utils.getCallLocationName());
 	}
 	
 	/**
@@ -265,7 +265,7 @@ public abstract class ExecutionEnvironment {
 		TextValueInputFormat format = new TextValueInputFormat(new Path(filePath));
 		format.setCharsetName(charsetName);
 		format.setSkipInvalidLines(skipInvalidLines);
-		return new DataSource<StringValue>(this, format, new ValueTypeInfo<StringValue>(StringValue.class) );
+		return new DataSource<StringValue>(this, format, new ValueTypeInfo<StringValue>(StringValue.class), Utils.getCallLocationName());
 	}
 	
 	// ----------------------------------- CSV Input Format ---------------------------------------
@@ -357,7 +357,7 @@ public abstract class ExecutionEnvironment {
 			throw new IllegalArgumentException("Produced type information must not be null.");
 		}
 		
-		return new DataSource<X>(this, inputFormat, producedType);
+		return new DataSource<X>(this, inputFormat, producedType, Utils.getCallLocationName());
 	}
 	
 	// ----------------------------------- Collection ---------------------------------------
@@ -390,7 +390,9 @@ public abstract class ExecutionEnvironment {
 		
 		X firstValue = data.iterator().next();
 		
-		return fromCollection(data, TypeExtractor.getForObject(firstValue));
+		TypeInformation<X> type = TypeExtractor.getForObject(firstValue);
+		CollectionInputFormat.checkCollection(data, type.getTypeClass());
+		return new DataSource<X>(this, new CollectionInputFormat<X>(data, type.createSerializer()), type, Utils.getCallLocationName(4));
 	}
 	
 	/**
@@ -411,9 +413,13 @@ public abstract class ExecutionEnvironment {
 	 * @see #fromCollection(Collection)
 	 */
 	public <X> DataSource<X> fromCollection(Collection<X> data, TypeInformation<X> type) {
+		return fromCollection(data, type, Utils.getCallLocationName());
+	}
+	
+	private <X> DataSource<X> fromCollection(Collection<X> data, TypeInformation<X> type, String callLocationName) {
 		CollectionInputFormat.checkCollection(data, type.getTypeClass());
 		
-		return new DataSource<X>(this, new CollectionInputFormat<X>(data, type.createSerializer()), type);
+		return new DataSource<X>(this, new CollectionInputFormat<X>(data, type.createSerializer()), type, callLocationName);
 	}
 	
 	/**
@@ -462,7 +468,7 @@ public abstract class ExecutionEnvironment {
 			throw new IllegalArgumentException("The iterator must be serializable.");
 		}
 		
-		return new DataSource<X>(this, new IteratorInputFormat<X>(data), type);
+		return new DataSource<X>(this, new IteratorInputFormat<X>(data), type, Utils.getCallLocationName());
 	}
 	
 	
@@ -490,7 +496,7 @@ public abstract class ExecutionEnvironment {
 			throw new IllegalArgumentException("The number of elements must not be zero.");
 		}
 		
-		return fromCollection(Arrays.asList(data), TypeExtractor.getForObject(data[0]));
+		return fromCollection(Arrays.asList(data), TypeExtractor.getForObject(data[0]), Utils.getCallLocationName());
 	}
 	
 	
@@ -532,7 +538,12 @@ public abstract class ExecutionEnvironment {
 	 * @see #fromParallelCollection(SplittableIterator, Class)
 	 */
 	public <X> DataSource<X> fromParallelCollection(SplittableIterator<X> iterator, TypeInformation<X> type) {
-		return new DataSource<X>(this, new ParallelIteratorInputFormat<X>(iterator), type);
+		return fromParallelCollection(iterator, type, Utils.getCallLocationName(4));
+	}
+	
+	// private helper for passing different call location names
+	private <X> DataSource<X> fromParallelCollection(SplittableIterator<X> iterator, TypeInformation<X> type, String callLocationName) {
+		return new DataSource<X>(this, new ParallelIteratorInputFormat<X>(iterator), type, callLocationName);
 	}
 	
 	/**
@@ -544,7 +555,7 @@ public abstract class ExecutionEnvironment {
 	 * @return A DataSet, containing all number in the {@code [from, to]} interval.
 	 */
 	public DataSource<Long> generateSequence(long from, long to) {
-		return fromParallelCollection(new NumberSequenceIterator(from, to), BasicTypeInfo.LONG_TYPE_INFO);
+		return fromParallelCollection(new NumberSequenceIterator(from, to), BasicTypeInfo.LONG_TYPE_INFO, Utils.getCallLocationName(3));
 	}	
 	
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/Utils.java b/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
new file mode 100644
index 0000000..462cf2c
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java;
+
+
+/**
+ * Helpers for deriving default operator names from the call site of an API method.
+ */
+public class Utils {
+
+	/**
+	 * Returns the stack frame at depth 4, as seen from inside the depth-taking overload.
+	 *
+	 * <p>NOTE(review): on typical JVMs frames 0-3 are {@code Thread.getStackTrace}, the two
+	 * methods of this class, and the API method calling this one, so frame 4 is presumably
+	 * the user code that invoked the API method; confirm against the target JVM.
+	 *
+	 * @return the string form of that frame, or {@code "<unknown>"} if the stack is too shallow
+	 */
+	public static String getCallLocationName() {
+		return getCallLocationName(4);
+	}
+
+	/**
+	 * Returns the string form of one frame of the current thread's stack trace.
+	 *
+	 * @param depth index into the stack trace array; 0 is the most recent frame
+	 * @return {@code st[depth].toString()}, or {@code "<unknown>"} if {@code depth} is not a
+	 *         valid index
+	 */
+	public static String getCallLocationName(int depth) {
+		StackTraceElement[] st = Thread.currentThread().getStackTrace();
+		// Valid indices are 0..st.length-1. The bound must be inclusive: with "st.length < depth",
+		// depth == st.length slipped through and st[depth] threw ArrayIndexOutOfBoundsException.
+		if (depth < 0 || depth >= st.length) {
+			return "<unknown>";
+		}
+		return st[depth].toString();
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
index 9465ccc..5cd92e2 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
@@ -22,8 +22,8 @@ import java.util.ArrayList;
 import java.util.Arrays;
 
 import org.apache.commons.lang3.Validate;
-
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.Utils;
 import org.apache.flink.api.java.operators.DataSource;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -236,7 +236,7 @@ public class CsvReader {
 		}
 		
 		configureInputFormat(inputFormat, classes);
-		return new DataSource<T>(executionContext, inputFormat, typeInfo);
+		return new DataSource<T>(executionContext, inputFormat, typeInfo, Utils.getCallLocationName());
 	}
 	
 	// --------------------------------------------------------------------------------------------
@@ -273,7 +273,7 @@ public class CsvReader {
 		TupleTypeInfo<Tuple1<T0>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0);
 		CsvInputFormat<Tuple1<T0>> inputFormat = new CsvInputFormat<Tuple1<T0>>(path);
 		configureInputFormat(inputFormat, type0);
-		return new DataSource<Tuple1<T0>>(executionContext, inputFormat, types);
+		return new DataSource<Tuple1<T0>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
 
 	/**
@@ -290,7 +290,7 @@ public class CsvReader {
 		TupleTypeInfo<Tuple2<T0, T1>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1);
 		CsvInputFormat<Tuple2<T0, T1>> inputFormat = new CsvInputFormat<Tuple2<T0, T1>>(path);
 		configureInputFormat(inputFormat, type0, type1);
-		return new DataSource<Tuple2<T0, T1>>(executionContext, inputFormat, types);
+		return new DataSource<Tuple2<T0, T1>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
 
 	/**
@@ -308,7 +308,7 @@ public class CsvReader {
 		TupleTypeInfo<Tuple3<T0, T1, T2>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2);
 		CsvInputFormat<Tuple3<T0, T1, T2>> inputFormat = new CsvInputFormat<Tuple3<T0, T1, T2>>(path);
 		configureInputFormat(inputFormat, type0, type1, type2);
-		return new DataSource<Tuple3<T0, T1, T2>>(executionContext, inputFormat, types);
+		return new DataSource<Tuple3<T0, T1, T2>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
 
 	/**
@@ -327,7 +327,7 @@ public class CsvReader {
 		TupleTypeInfo<Tuple4<T0, T1, T2, T3>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3);
 		CsvInputFormat<Tuple4<T0, T1, T2, T3>> inputFormat = new CsvInputFormat<Tuple4<T0, T1, T2, T3>>(path);
 		configureInputFormat(inputFormat, type0, type1, type2, type3);
-		return new DataSource<Tuple4<T0, T1, T2, T3>>(executionContext, inputFormat, types);
+		return new DataSource<Tuple4<T0, T1, T2, T3>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
 
 	/**
@@ -347,7 +347,7 @@ public class CsvReader {
 		TupleTypeInfo<Tuple5<T0, T1, T2, T3, T4>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4);
 		CsvInputFormat<Tuple5<T0, T1, T2, T3, T4>> inputFormat = new CsvInputFormat<Tuple5<T0, T1, T2, T3, T4>>(path);
 		configureInputFormat(inputFormat, type0, type1, type2, type3, type4);
-		return new DataSource<Tuple5<T0, T1, T2, T3, T4>>(executionContext, inputFormat, types);
+		return new DataSource<Tuple5<T0, T1, T2, T3, T4>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
 
 	/**
@@ -368,7 +368,7 @@ public class CsvReader {
 		TupleTypeInfo<Tuple6<T0, T1, T2, T3, T4, T5>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5);
 		CsvInputFormat<Tuple6<T0, T1, T2, T3, T4, T5>> inputFormat = new CsvInputFormat<Tuple6<T0, T1, T2, T3, T4, T5>>(path);
 		configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5);
-		return new DataSource<Tuple6<T0, T1, T2, T3, T4, T5>>(executionContext, inputFormat, types);
+		return new DataSource<Tuple6<T0, T1, T2, T3, T4, T5>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
 
 	/**
@@ -390,7 +390,7 @@ public class CsvReader {
 		TupleTypeInfo<Tuple7<T0, T1, T2, T3, T4, T5, T6>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6);
 		CsvInputFormat<Tuple7<T0, T1, T2, T3, T4, T5, T6>> inputFormat = new CsvInputFormat<Tuple7<T0, T1, T2, T3, T4, T5, T6>>(path);
 		configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6);
-		return new DataSource<Tuple7<T0, T1, T2, T3, T4, T5, T6>>(executionContext, inputFormat, types);
+		return new DataSource<Tuple7<T0, T1, T2, T3, T4, T5, T6>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
 
 	/**
@@ -413,7 +413,7 @@ public class CsvReader {
 		TupleTypeInfo<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7);
 		CsvInputFormat<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>> inputFormat = new CsvInputFormat<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>(path);
 		configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7);
-		return new DataSource<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>(executionContext, inputFormat, types);
+		return new DataSource<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
 
 	/**
@@ -437,7 +437,7 @@ public class CsvReader {
 		TupleTypeInfo<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8);
 		CsvInputFormat<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>> inputFormat = new CsvInputFormat<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>>(path);
 		configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8);
-		return new DataSource<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>>(executionContext, inputFormat, types);
+		return new DataSource<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
 
 	/**
@@ -462,7 +462,7 @@ public class CsvReader {
 		TupleTypeInfo<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9);
 		CsvInputFormat<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>> inputFormat = new CsvInputFormat<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>>(path);
 		configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9);
-		return new DataSource<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>>(executionContext, inputFormat, types);
+		return new DataSource<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
 
 	/**
@@ -488,7 +488,7 @@ public class CsvReader {
 		TupleTypeInfo<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10);
 		CsvInputFormat<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>> inputFormat = new CsvInputFormat<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>>(path);
 		configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10);
-		return new DataSource<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>>(executionContext, inputFormat, types);
+		return new DataSource<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
 
 	/**
@@ -515,7 +515,7 @@ public class CsvReader {
 		TupleTypeInfo<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11);
 		CsvInputFormat<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>> inputFormat = new CsvInputFormat<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>>(path);
 		configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11);
-		return new DataSource<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>>(executionContext, inputFormat, types);
+		return new DataSource<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
 
 	/**
@@ -543,7 +543,7 @@ public class CsvReader {
 		TupleTypeInfo<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12);
 		CsvInputFormat<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>> inputFormat = new CsvInputFormat<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>>(path);
 		configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12);
-		return new DataSource<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>>(executionContext, inputFormat, types);
+		return new DataSource<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
 
 	/**
@@ -572,7 +572,7 @@ public class CsvReader {
 		TupleTypeInfo<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13);
 		CsvInputFormat<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>> inputFormat = new CsvInputFormat<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>>(path);
 		configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13);
-		return new DataSource<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>>(executionContext, inputFormat, types);
+		return new DataSource<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
 
 	/**
@@ -602,7 +602,7 @@ public class CsvReader {
 		TupleTypeInfo<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14);
 		CsvInputFormat<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>> inputFormat = new CsvInputFormat<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>>(path);
 		configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14);
-		return new DataSource<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>>(executionContext, inputFormat, types);
+		return new DataSource<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
 
 	/**
@@ -633,7 +633,7 @@ public class CsvReader {
 		TupleTypeInfo<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15);
 		CsvInputFormat<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>> inputFormat = new CsvInputFormat<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>>(path);
 		configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15);
-		return new DataSource<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>>(executionContext, inputFormat, types);
+		return new DataSource<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
 
 	/**
@@ -665,7 +665,7 @@ public class CsvReader {
 		TupleTypeInfo<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16);
 		CsvInputFormat<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>> inputFormat = new CsvInputFormat<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>>(path);
 		configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16);
-		return new DataSource<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>>(executionContext, inputFormat, types);
+		return new DataSource<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
 
 	/**
@@ -698,7 +698,7 @@ public class CsvReader {
 		TupleTypeInfo<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17);
 		CsvInputFormat<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>> inputFormat = new CsvInputFormat<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>>(path);
 		configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17);
-		return new DataSource<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>>(executionContext, inputFormat, types);
+		return new DataSource<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
 
 	/**
@@ -732,7 +732,7 @@ public class CsvReader {
 		TupleTypeInfo<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18);
 		CsvInputFormat<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>> inputFormat = new CsvInputFormat<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>>(path);
 		configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18);
-		return new DataSource<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>>(executionContext, inputFormat, types);
+		return new DataSource<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
 
 	/**
@@ -767,7 +767,7 @@ public class CsvReader {
 		TupleTypeInfo<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19);
 		CsvInputFormat<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>> inputFormat = new CsvInputFormat<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>>(path);
 		configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19);
-		return new DataSource<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>>(executionContext, inputFormat, types);
+		return new DataSource<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
 
 	/**
@@ -803,7 +803,7 @@ public class CsvReader {
 		TupleTypeInfo<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20);
 		CsvInputFormat<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>> inputFormat = new CsvInputFormat<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>>(path);
 		configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20);
-		return new DataSource<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>>(executionContext, inputFormat, types);
+		return new DataSource<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
 
 	/**
@@ -840,7 +840,7 @@ public class CsvReader {
 		TupleTypeInfo<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21);
 		CsvInputFormat<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>> inputFormat = new CsvInputFormat<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>>(path);
 		configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21);
-		return new DataSource<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>>(executionContext, inputFormat, types);
+		return new DataSource<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
 
 	/**
@@ -878,7 +878,7 @@ public class CsvReader {
 		TupleTypeInfo<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21, type22);
 		CsvInputFormat<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>> inputFormat = new CsvInputFormat<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>>(path);
 		configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21, type22);
-		return new DataSource<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>>(executionContext, inputFormat, types);
+		return new DataSource<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
 
 	/**
@@ -917,7 +917,7 @@ public class CsvReader {
 		TupleTypeInfo<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21, type22, type23);
 		CsvInputFormat<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>> inputFormat = new CsvInputFormat<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>>(path);
 		configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21, type22, type23);
-		return new DataSource<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>>(executionContext, inputFormat, types);
+		return new DataSource<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
 
 	/**
@@ -957,7 +957,7 @@ public class CsvReader {
 		TupleTypeInfo<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21, type22, type23, type24);
 		CsvInputFormat<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>> inputFormat = new CsvInputFormat<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>>(path);
 		configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21, type22, type23, type24);
-		return new DataSource<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>>(executionContext, inputFormat, types);
+		return new DataSource<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>>(executionContext, inputFormat, types, Utils.getCallLocationName());
 	}
 
 	// END_OF_TUPLE_DEPENDENT_CODE

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java
index 041dc75..e906232 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java
@@ -54,15 +54,18 @@ public class AggregateOperator<IN> extends SingleInputOperator<IN, IN, Aggregate
 	
 	private final Grouping<IN> grouping;
 	
+	private final String aggregateLocationName;
+	
 	/**
 	 * <p>
 	 * Non grouped aggregation
 	 */
-	public AggregateOperator(DataSet<IN> input, Aggregations function, int field) {
+	public AggregateOperator(DataSet<IN> input, Aggregations function, int field, String aggregateLocationName) {
 		super(Validate.notNull(input), input.getType());
-		
 		Validate.notNull(function);
 		
+		this.aggregateLocationName = aggregateLocationName;
+		
 		if (!input.getType().isTupleType()) {
 			throw new InvalidProgramException("Aggregating on field positions is only possible on tuple data types.");
 		}
@@ -90,11 +93,12 @@ public class AggregateOperator<IN> extends SingleInputOperator<IN, IN, Aggregate
 	 * @param function
 	 * @param field
 	 */
-	public AggregateOperator(Grouping<IN> input, Aggregations function, int field) {
+	public AggregateOperator(Grouping<IN> input, Aggregations function, int field, String aggregateLocationName) {
 		super(Validate.notNull(input).getDataSet(), input.getDataSet().getType());
-		
 		Validate.notNull(function);
 		
+		this.aggregateLocationName = aggregateLocationName;
+		
 		if (!input.getDataSet().getType().isTupleType()) {
 			throw new InvalidProgramException("Aggregating on field positions is only possible on tuple data types.");
 		}
@@ -157,7 +161,6 @@ public class AggregateOperator<IN> extends SingleInputOperator<IN, IN, Aggregate
 			throw new IllegalStateException();
 		}
 		
-		
 		// construct the aggregation function
 		AggregationFunction<Object>[] aggFunctions = new AggregationFunction[this.aggregationFunctions.size()];
 		int[] fields = new int[this.fields.size()];
@@ -169,6 +172,7 @@ public class AggregateOperator<IN> extends SingleInputOperator<IN, IN, Aggregate
 			
 			genName.append(aggFunctions[i].toString()).append('(').append(fields[i]).append(')').append(',');
 		}
+		genName.append(" at ").append(aggregateLocationName);
 		genName.setLength(genName.length()-1);
 		
 		

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
index 56f90f4..1034e86 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
@@ -30,6 +30,7 @@ import org.apache.flink.api.common.operators.base.CoGroupOperatorBase;
 import org.apache.flink.api.common.operators.base.MapOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.Utils;
 import org.apache.flink.api.java.operators.DeltaIteration.SolutionSetPlaceHolder;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.operators.Keys.ExpressionKeys;
@@ -59,16 +60,20 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
 
 	private final Keys<I1> keys1;
 	private final Keys<I2> keys2;
+	
+	private final String defaultName;
 
 
 	public CoGroupOperator(DataSet<I1> input1, DataSet<I2> input2,
 							Keys<I1> keys1, Keys<I2> keys2,
 							CoGroupFunction<I1, I2, OUT> function,
-							TypeInformation<OUT> returnType)
+							TypeInformation<OUT> returnType,
+							String defaultName)
 	{
 		super(input1, input2, returnType);
 
 		this.function = function;
+		this.defaultName = defaultName;
 
 		if (keys1 == null || keys2 == null) {
 			throw new NullPointerException();
@@ -109,7 +114,7 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
 	@Override
 	protected org.apache.flink.api.common.operators.base.CoGroupOperatorBase<?, ?, OUT, ?> translateToDataFlow(Operator<I1> input1, Operator<I2> input2) {
 		
-		String name = getName() != null ? getName() : function.getClass().getName();
+		String name = getName() != null ? getName() : "CoGroup at "+defaultName;
 		try {
 			keys1.areCompatible(keys2);
 		} catch (IncompatibleKeysException e) {
@@ -519,7 +524,7 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
 						throw new NullPointerException("CoGroup function must not be null.");
 					}
 					TypeInformation<R> returnType = TypeExtractor.getCoGroupReturnTypes(function, input1.getType(), input2.getType());
-					return new CoGroupOperator<I1, I2, R>(input1, input2, keys1, keys2, function, returnType);
+					return new CoGroupOperator<I1, I2, R>(input1, input2, keys1, keys2, function, returnType, Utils.getCallLocationName());
 				}
 			}
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java
index d0b5054..9aa1287 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.base.CrossOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.Utils;
 import org.apache.flink.api.java.functions.SemanticPropUtil;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -48,14 +49,17 @@ import org.apache.flink.api.java.tuple.*;
 public class CrossOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OUT, CrossOperator<I1, I2, OUT>> {
 
 	private final CrossFunction<I1, I2, OUT> function;
+	private final String defaultName;
 
 	public CrossOperator(DataSet<I1> input1, DataSet<I2> input2,
 							CrossFunction<I1, I2, OUT> function,
-							TypeInformation<OUT> returnType)
+							TypeInformation<OUT> returnType,
+							String defaultName)
 	{
 		super(input1, input2, returnType);
 
 		this.function = function;
+		this.defaultName = defaultName;
 
 		if (!(function instanceof ProjectCrossFunction)) {
 			extractSemanticAnnotationsFromUdf(function.getClass());
@@ -72,7 +76,7 @@ public class CrossOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OUT,
 	@Override
 	protected org.apache.flink.api.common.operators.base.CrossOperatorBase<I1, I2, OUT, CrossFunction<I1,I2,OUT>> translateToDataFlow(Operator<I1> input1, Operator<I2> input2) {
 		
-		String name = getName() != null ? getName() : function.getClass().getName();
+		String name = getName() != null ? getName() : "Cross at "+defaultName;
 		// create operator
 		CrossOperatorBase<I1, I2, OUT, CrossFunction<I1, I2, OUT>> po =
 				new CrossOperatorBase<I1, I2, OUT, CrossFunction<I1, I2, OUT>>(function, new BinaryOperatorInformation<I1, I2, OUT>(getInput1Type(), getInput2Type(), getResultType()), name);
@@ -106,9 +110,9 @@ public class CrossOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OUT,
 		private final DataSet<I1> input1;
 		private final DataSet<I2> input2;
 
-		public DefaultCross(DataSet<I1> input1, DataSet<I2> input2) {
+		public DefaultCross(DataSet<I1> input1, DataSet<I2> input2, String defaultName) {
 			super(input1, input2, (CrossFunction<I1, I2, Tuple2<I1, I2>>) new DefaultCrossFunction<I1, I2>(),
-					new TupleTypeInfo<Tuple2<I1, I2>>(input1.getType(), input2.getType()));
+					new TupleTypeInfo<Tuple2<I1, I2>>(input1.getType(), input2.getType()), defaultName);
 
 			if (input1 == null || input2 == null) {
 				throw new NullPointerException();
@@ -133,7 +137,7 @@ public class CrossOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OUT,
 				throw new NullPointerException("Cross function must not be null.");
 			}
 			TypeInformation<R> returnType = TypeExtractor.getCrossReturnTypes(function, input1.getType(), input2.getType());
-			return new CrossOperator<I1, I2, R>(input1, input2, function, returnType);
+			return new CrossOperator<I1, I2, R>(input1, input2, function, returnType, Utils.getCallLocationName());
 		}
 
 
@@ -207,7 +211,7 @@ public class CrossOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OUT,
 
 		protected ProjectCross(DataSet<I1> input1, DataSet<I2> input2, int[] fields, boolean[] isFromFirst, TupleTypeInfo<OUT> returnType) {
 			super(input1, input2,
-				new ProjectCrossFunction<I1, I2, OUT>(fields, isFromFirst, returnType.createSerializer().createInstance()), returnType);
+				new ProjectCrossFunction<I1, I2, OUT>(fields, isFromFirst, returnType.createSerializer().createInstance()), returnType, "<unknown>");
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSource.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSource.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSource.java
index 764803f..2352269 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSource.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSource.java
@@ -34,8 +34,11 @@ import org.apache.flink.configuration.Configuration;
  * @param <OUT> The type of the elements produced by this data source.
  */
 public class DataSource<OUT> extends Operator<OUT, DataSource<OUT>> {
-	
+
 	private final InputFormat<OUT, ?> inputFormat;
+
+	private final String dataSourceLocationName;
+
 	private Configuration parameters;
 
 	// --------------------------------------------------------------------------------------------
@@ -47,9 +50,11 @@ public class DataSource<OUT> extends Operator<OUT, DataSource<OUT>> {
 	 * @param inputFormat The input format that the data source executes.
 	 * @param type The type of the elements produced by this input format.
 	 */
-	public DataSource(ExecutionEnvironment context, InputFormat<OUT, ?> inputFormat, TypeInformation<OUT> type) {
+	public DataSource(ExecutionEnvironment context, InputFormat<OUT, ?> inputFormat, TypeInformation<OUT> type, String dataSourceLocationName) {
 		super(context, type);
 		
+		this.dataSourceLocationName = dataSourceLocationName;
+		
 		if (inputFormat == null) {
 			throw new IllegalArgumentException("The input format may not be null.");
 		}
@@ -89,9 +94,9 @@ public class DataSource<OUT> extends Operator<OUT, DataSource<OUT>> {
 	// --------------------------------------------------------------------------------------------
 	
 	protected GenericDataSourceBase<OUT, ?> translateToDataFlow() {
-		String name = this.name != null ? this.name : this.inputFormat.toString();
-		if (name.length() > 100) {
-			name = name.substring(0, 100);
+		String name = this.name != null ? this.name : "at "+dataSourceLocationName+" ("+inputFormat.getClass().getName()+")";
+		if (name.length() > 150) {
+			name = name.substring(0, 150);
 		}
 		
 		@SuppressWarnings({ "unchecked", "rawtypes" })

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
index 18fd756..126949c 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
@@ -47,9 +47,13 @@ public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOpera
 	
 	private final Keys<T> keys;
 	
-	public DistinctOperator(DataSet<T> input, Keys<T> keys) {
+	private final String distinctLocationName;
+	
+	public DistinctOperator(DataSet<T> input, Keys<T> keys, String distinctLocationName) {
 		super(input, input.getType());
 		
+		this.distinctLocationName = distinctLocationName;
+		
 		// if keys is null distinction is done on all tuple fields
 		if (keys == null) {
 			if (input.getType().isTupleType()) {
@@ -80,7 +84,7 @@ public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOpera
 		
 		final RichGroupReduceFunction<T, T> function = new DistinctFunction<T>();
 
-		String name = function.getClass().getName();
+		String name = "Distinct at "+distinctLocationName;
 		
 		if (keys instanceof Keys.ExpressionKeys) {
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-java/src/main/java/org/apache/flink/api/java/operators/FilterOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/FilterOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/FilterOperator.java
index ab8a6c5..1d93b0a 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/FilterOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/FilterOperator.java
@@ -33,18 +33,22 @@ import org.apache.flink.api.java.DataSet;
 public class FilterOperator<T> extends SingleInputUdfOperator<T, T, FilterOperator<T>> {
 	
 	protected final FilterFunction<T> function;
+	
+	protected final String defaultName;
 
-	public FilterOperator(DataSet<T> input, FilterFunction<T> function) {
+	public FilterOperator(DataSet<T> input, FilterFunction<T> function, String defaultName) {
 		super(input, input.getType());
 		
 		this.function = function;
+		this.defaultName = defaultName;
 		extractSemanticAnnotationsFromUdf(function.getClass());
 	}
 	
 	@Override
 	protected org.apache.flink.api.common.operators.base.FilterOperatorBase<T, FlatMapFunction<T,T>> translateToDataFlow(Operator<T> input) {
-
-		String name = getName() != null ? getName() : function.getClass().getName();
+		
+		String name = getName() != null ? getName() : "Filter at "+defaultName;
+		
 		// create operator
 		PlanFilterOperator<T> po = new PlanFilterOperator<T>(function, name, getInputType());
 		// set input

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java
index b7e336f..9fa7cf1 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java
@@ -36,17 +36,20 @@ public class FlatMapOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, Fl
 	
 	protected final FlatMapFunction<IN, OUT> function;
 	
-	public FlatMapOperator(DataSet<IN> input, TypeInformation<OUT> resultType, FlatMapFunction<IN, OUT> function) {
+	protected final String defaultName;
+	
+	public FlatMapOperator(DataSet<IN> input, TypeInformation<OUT> resultType, FlatMapFunction<IN, OUT> function, String defaultName) {
 		super(input, resultType);
 		
 		this.function = function;
+		this.defaultName = defaultName;
 		extractSemanticAnnotationsFromUdf(function.getClass());
 	}
 	
 	@Override
 	protected org.apache.flink.api.common.operators.base.FlatMapOperatorBase<IN, OUT, FlatMapFunction<IN,OUT>> translateToDataFlow(Operator<IN> input) {
 		
-		String name = getName() != null ? getName() : function.getClass().getName();
+		String name = getName() != null ? getName() : "FlatMap at "+defaultName;
 		// create operator
 		FlatMapOperatorBase<IN, OUT, FlatMapFunction<IN, OUT>> po = new FlatMapOperatorBase<IN, OUT, FlatMapFunction<IN, OUT>>(function, new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType()), name);
 		// set input

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
index 1cd85c5..a040b14 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
@@ -47,6 +47,8 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
 	private final GroupReduceFunction<IN, OUT> function;
 
 	private final Grouping<IN> grouper;
+	
+	private final String defaultName;
 
 	private boolean combinable;
 
@@ -56,11 +58,12 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
 	 * @param input The input data set to the groupReduce function.
 	 * @param function The user-defined GroupReduce function.
 	 */
-	public GroupReduceOperator(DataSet<IN> input, TypeInformation<OUT> resultType, GroupReduceFunction<IN, OUT> function) {
+	public GroupReduceOperator(DataSet<IN> input, TypeInformation<OUT> resultType, GroupReduceFunction<IN, OUT> function, String defaultName) {
 		super(input, resultType);
 		
 		this.function = function;
 		this.grouper = null;
+		this.defaultName = defaultName;
 
 		checkCombinability();
 	}
@@ -71,11 +74,12 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
 	 * @param input The grouped input to be processed group-wise by the groupReduce function.
 	 * @param function The user-defined GroupReduce function.
 	 */
-	public GroupReduceOperator(Grouping<IN> input, TypeInformation<OUT> resultType, GroupReduceFunction<IN, OUT> function) {
+	public GroupReduceOperator(Grouping<IN> input, TypeInformation<OUT> resultType, GroupReduceFunction<IN, OUT> function, String defaultName) {
 		super(input != null ? input.getDataSet() : null, resultType);
 		
 		this.function = function;
 		this.grouper = input;
+		this.defaultName = defaultName;
 
 		checkCombinability();
 
@@ -110,7 +114,7 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
 	@Override
 	protected org.apache.flink.api.common.operators.base.GroupReduceOperatorBase<?, OUT, ?> translateToDataFlow(Operator<IN> input) {
 
-		String name = getName() != null ? getName() : function.getClass().getName();
+		String name = getName() != null ? getName() : "GroupReduce at "+defaultName;
 		
 		// distinguish between grouped reduce and non-grouped reduce
 		if (grouper == null) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
index 9be6656..93e0371 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
@@ -35,6 +35,7 @@ import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
 import org.apache.flink.api.common.operators.base.MapOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.Utils;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.functions.SemanticPropUtil;
 import org.apache.flink.api.java.operators.DeltaIteration.SolutionSetPlaceHolder;
@@ -137,9 +138,11 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 		@SuppressWarnings("unused")
 		private boolean preserve2;
 		
+		private final String joinLocationName;
+		
 		public EquiJoin(DataSet<I1> input1, DataSet<I2> input2,
 				Keys<I1> keys1, Keys<I2> keys2, FlatJoinFunction<I1, I2, OUT> function,
-				TypeInformation<OUT> returnType, JoinHint hint)
+				TypeInformation<OUT> returnType, JoinHint hint, String joinLocationName)
 		{
 			super(input1, input2, keys1, keys2, returnType, hint);
 			
@@ -148,6 +151,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 			}
 			
 			this.function = function;
+			this.joinLocationName = joinLocationName;
 
 			if (!(function instanceof ProjectFlatJoinFunction)) {
 				extractSemanticAnnotationsFromUdf(function.getClass());
@@ -158,9 +162,11 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 
 		public EquiJoin(DataSet<I1> input1, DataSet<I2> input2,
 				Keys<I1> keys1, Keys<I2> keys2, FlatJoinFunction<I1, I2, OUT> generatedFunction, JoinFunction<I1, I2, OUT> function,
-				TypeInformation<OUT> returnType, JoinHint hint)
+				TypeInformation<OUT> returnType, JoinHint hint, String joinLocationName)
 		{
 			super(input1, input2, keys1, keys2, returnType, hint);
+			
+			this.joinLocationName = joinLocationName;
 
 			if (function == null) {
 				throw new NullPointerException();
@@ -204,7 +210,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 				Operator<I1> input1,
 				Operator<I2> input2) {
 
-			String name = getName() != null ? getName() : function.getClass().getName();
+			String name = getName() != null ? getName() : "Join at "+joinLocationName;
 			try {
 				keys1.areCompatible(super.keys2);
 			} catch(IncompatibleKeysException ike) {
@@ -452,11 +458,11 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 	public static final class DefaultJoin<I1, I2> extends EquiJoin<I1, I2, Tuple2<I1, I2>> {
 
 		protected DefaultJoin(DataSet<I1> input1, DataSet<I2> input2, 
-				Keys<I1> keys1, Keys<I2> keys2, JoinHint hint)
+				Keys<I1> keys1, Keys<I2> keys2, JoinHint hint, String joinLocationName)
 		{
 			super(input1, input2, keys1, keys2, 
 				(RichFlatJoinFunction<I1, I2, Tuple2<I1, I2>>) new DefaultFlatJoinFunction<I1, I2>(),
-				new TupleTypeInfo<Tuple2<I1, I2>>(input1.getType(), input2.getType()), hint);
+				new TupleTypeInfo<Tuple2<I1, I2>>(input1.getType(), input2.getType()), hint, joinLocationName);
 		}
 		
 		/**
@@ -475,7 +481,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 				throw new NullPointerException("Join function must not be null.");
 			}
 			TypeInformation<R> returnType = TypeExtractor.getFlatJoinReturnTypes(function, getInput1Type(), getInput2Type());
-			return new EquiJoin<I1, I2, R>(getInput1(), getInput2(), getKeys1(), getKeys2(), function, returnType, getJoinHint());
+			return new EquiJoin<I1, I2, R>(getInput1(), getInput2(), getKeys1(), getKeys2(), function, returnType, getJoinHint(), Utils.getCallLocationName());
 		}
 
 		public <R> EquiJoin<I1, I2, R> with (JoinFunction<I1, I2, R> function) {
@@ -484,7 +490,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 			}
 			FlatJoinFunction<I1, I2, R> generatedFunction = new WrappingFlatJoinFunction<I1, I2, R>(function);
 			TypeInformation<R> returnType = TypeExtractor.getJoinReturnTypes(function, getInput1Type(), getInput2Type());
-			return new EquiJoin<I1, I2, R>(getInput1(), getInput2(), getKeys1(), getKeys2(), generatedFunction, function, returnType, getJoinHint());
+			return new EquiJoin<I1, I2, R>(getInput1(), getInput2(), getKeys1(), getKeys2(), generatedFunction, function, returnType, getJoinHint(), Utils.getCallLocationName());
 		}
 
 		public static class WrappingFlatJoinFunction<IN1, IN2, OUT> extends WrappingFunction<JoinFunction<IN1,IN2,OUT>> implements FlatJoinFunction<IN1, IN2, OUT> {
@@ -601,7 +607,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 		protected ProjectJoin(DataSet<I1> input1, DataSet<I2> input2, Keys<I1> keys1, Keys<I2> keys2, JoinHint hint, int[] fields, boolean[] isFromFirst, TupleTypeInfo<OUT> returnType) {
 			super(input1, input2, keys1, keys2, 
 					new ProjectFlatJoinFunction<I1, I2, OUT>(fields, isFromFirst, returnType.createSerializer().createInstance()),
-					returnType, hint);
+					returnType, hint, Utils.getCallLocationName(4)); // We need to use the 4th element in the stack because the call comes through .types().
 		}
 
 		@Override
@@ -850,7 +856,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 					throw new InvalidProgramException("The pair of join keys are not compatible with each other.",e);
 				}
 
-				return new DefaultJoin<I1, I2>(input1, input2, keys1, keys2, joinHint);
+				return new DefaultJoin<I1, I2>(input1, input2, keys1, keys2, joinHint, Utils.getCallLocationName(4));
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java
index f1ece2c..b4433dc 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java
@@ -37,11 +37,13 @@ import org.apache.flink.api.java.DataSet;
 public class MapOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, MapOperator<IN, OUT>> {
 	
 	protected final MapFunction<IN, OUT> function;
+	
+	protected final String defaultName;
 
-	public MapOperator(DataSet<IN> input, TypeInformation<OUT> resultType, MapFunction<IN, OUT> function) {
-
+	public MapOperator(DataSet<IN> input, TypeInformation<OUT> resultType, MapFunction<IN, OUT> function, String defaultName) {
 		super(input, resultType);
 		
+		this.defaultName = defaultName;
 		this.function = function;
 		extractSemanticAnnotationsFromUdf(function.getClass());
 	}
@@ -49,7 +51,7 @@ public class MapOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, MapOpe
 	@Override
 	protected org.apache.flink.api.common.operators.base.MapOperatorBase<IN, OUT, MapFunction<IN, OUT>> translateToDataFlow(Operator<IN> input) {
 		
-		String name = getName() != null ? getName() : function.getClass().getName();
+		String name = getName() != null ? getName() : "Map at "+defaultName;
 		// create operator
 		MapOperatorBase<IN, OUT, MapFunction<IN, OUT>> po = new MapOperatorBase<IN, OUT, MapFunction<IN, OUT>>(function, new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType()), name);
 		// set input

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-java/src/main/java/org/apache/flink/api/java/operators/MapPartitionOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/MapPartitionOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/MapPartitionOperator.java
index 839298b..067f7af 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/MapPartitionOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/MapPartitionOperator.java
@@ -38,17 +38,20 @@ public class MapPartitionOperator<IN, OUT> extends SingleInputUdfOperator<IN, OU
 	
 	protected final MapPartitionFunction<IN, OUT> function;
 	
-	public MapPartitionOperator(DataSet<IN> input, TypeInformation<OUT> resultType, MapPartitionFunction<IN, OUT> function) {
+	protected final String defaultName;
+	
+	public MapPartitionOperator(DataSet<IN> input, TypeInformation<OUT> resultType, MapPartitionFunction<IN, OUT> function, String defaultName) {
 		super(input, resultType);
 		
 		this.function = function;
+		this.defaultName = defaultName;
 		extractSemanticAnnotationsFromUdf(function.getClass());
 	}
 	
 	@Override
 	protected MapPartitionOperatorBase<IN, OUT, MapPartitionFunction<IN, OUT>> translateToDataFlow(Operator<IN> input) {
 		
-		String name = getName() != null ? getName() : function.getClass().getName();
+		String name = getName() != null ? getName() : "MapPartition at "+defaultName;
 		// create operator
 		MapPartitionOperatorBase<IN, OUT, MapPartitionFunction<IN, OUT>> po = new MapPartitionOperatorBase<IN, OUT, MapPartitionFunction<IN, OUT>>(function, new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType()), name);
 		// set input

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
index c4548fb..77d5681 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
@@ -41,9 +41,11 @@ public class PartitionOperator<T> extends SingleInputUdfOperator<T, T, Partition
 	
 	private final Keys<T> pKeys;
 	private final PartitionMethod pMethod;
+	private final String partitionLocationName;
 	
-	public PartitionOperator(DataSet<T> input, PartitionMethod pMethod, Keys<T> pKeys) {
+	public PartitionOperator(DataSet<T> input, PartitionMethod pMethod, Keys<T> pKeys, String partitionLocationName) {
 		super(input, input.getType());
+		this.partitionLocationName = partitionLocationName;
 
 		if(pMethod == PartitionMethod.HASH && pKeys == null) {
 			throw new IllegalArgumentException("Hash Partitioning requires keys");
@@ -59,8 +61,8 @@ public class PartitionOperator<T> extends SingleInputUdfOperator<T, T, Partition
 		this.pKeys = pKeys;
 	}
 	
-	public PartitionOperator(DataSet<T> input, PartitionMethod pMethod) {
-		this(input, pMethod, null);
+	public PartitionOperator(DataSet<T> input, PartitionMethod pMethod, String partitionLocationName) {
+		this(input, pMethod, null, partitionLocationName);
 	}
 	
 	/*
@@ -68,7 +70,7 @@ public class PartitionOperator<T> extends SingleInputUdfOperator<T, T, Partition
 	 */
 	protected org.apache.flink.api.common.operators.SingleInputOperator<?, T, ?> translateToDataFlow(Operator<T> input) {
 	
-		String name = "Partition";
+		String name = "Partition at "+partitionLocationName;
 		
 		// distinguish between partition types
 		if (pMethod == PartitionMethod.REBALANCE) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
index 8cb64ba..7089cf6 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
@@ -46,6 +46,8 @@ public class ReduceOperator<IN> extends SingleInputUdfOperator<IN, IN, ReduceOpe
 	
 	private final Grouping<IN> grouper;
 	
+	private final String defaultName;
+	
 	/**
 	 * 
 	 * This is the case for a reduce-all case (in contrast to the reduce-per-group case).
@@ -53,21 +55,23 @@ public class ReduceOperator<IN> extends SingleInputUdfOperator<IN, IN, ReduceOpe
 	 * @param input
 	 * @param function
 	 */
-	public ReduceOperator(DataSet<IN> input, ReduceFunction<IN> function) {
+	public ReduceOperator(DataSet<IN> input, ReduceFunction<IN> function, String defaultName) {
 		super(input, input.getType());
 		
 		this.function = function;
 		this.grouper = null;
+		this.defaultName = defaultName;
 		
 		extractSemanticAnnotationsFromUdf(function.getClass());
 	}
 	
 	
-	public ReduceOperator(Grouping<IN> input, ReduceFunction<IN> function) {
+	public ReduceOperator(Grouping<IN> input, ReduceFunction<IN> function, String defaultName) {
 		super(input.getDataSet(), input.getDataSet().getType());
 		
 		this.function = function;
 		this.grouper = input;
+		this.defaultName = defaultName;
 		
 		extractSemanticAnnotationsFromUdf(function.getClass());
 	}
@@ -75,7 +79,7 @@ public class ReduceOperator<IN> extends SingleInputUdfOperator<IN, IN, ReduceOpe
 	@Override
 	protected org.apache.flink.api.common.operators.SingleInputOperator<?, IN, ?> translateToDataFlow(Operator<IN> input) {
 		
-		String name = getName() != null ? getName() : function.getClass().getName();
+		String name = getName() != null ? getName() : "Reduce at "+defaultName;
 		
 		// distinguish between grouped reduce and non-grouped reduce
 		if (grouper == null) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
index c9700ce..36d14ee 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.java.operators;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.Utils;
 import org.apache.flink.api.java.functions.FirstReducer;
 
 import java.util.Arrays;
@@ -110,7 +111,7 @@ public class SortedGrouping<T> extends Grouping<T> {
 		}
 		TypeInformation<R> resultType = TypeExtractor.getGroupReduceReturnTypes(reducer,
 				this.getDataSet().getType());
-		return new GroupReduceOperator<T, R>(this, resultType, reducer);
+		return new GroupReduceOperator<T, R>(this, resultType, reducer, Utils.getCallLocationName() );
 	}
 
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-java/src/main/java/org/apache/flink/api/java/operators/UnionOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnionOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnionOperator.java
index efc1ebc..c6f72f2 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnionOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnionOperator.java
@@ -21,7 +21,6 @@ package org.apache.flink.api.java.operators;
 
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.Union;
-
 import org.apache.flink.api.java.DataSet;
 
 /**
@@ -31,14 +30,17 @@ import org.apache.flink.api.java.DataSet;
  */
 public class UnionOperator<T> extends TwoInputOperator<T, T, T, UnionOperator<T>> {
 
+	private final String unionLocationName;
 	/**
 	 * Create an operator that produces the union of the two given data sets.
 	 * 
 	 * @param input1 The first data set to be unioned.
 	 * @param input2 The second data set to be unioned.
 	 */
-	public UnionOperator(DataSet<T> input1, DataSet<T> input2) {
+	public UnionOperator(DataSet<T> input1, DataSet<T> input2, String unionLocationName) {
 		super(input1, input2, input1.getType());
+		
+		this.unionLocationName = unionLocationName;
 	}
 	
 	/**
@@ -50,6 +52,6 @@ public class UnionOperator<T> extends TwoInputOperator<T, T, T, UnionOperator<T>
 	 */
 	@Override
 	protected Union<T> translateToDataFlow(Operator<T> input1, Operator<T> input2) {
-		return new Union<T>(input1, input2);
+		return new Union<T>(input1, input2, unionLocationName);
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
index 910846d..b504e37 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.Utils;
 import org.apache.flink.api.java.aggregation.Aggregations;
 import org.apache.flink.api.java.functions.FirstReducer;
 import org.apache.flink.api.java.functions.SelectByMaxFunction;
@@ -58,7 +59,12 @@ public class UnsortedGrouping<T> extends Grouping<T> {
 	 * @see DataSet
 	 */
 	public AggregateOperator<T> aggregate(Aggregations agg, int field) {
-		return new AggregateOperator<T>(this, agg, field);
+		return aggregate(agg, field, Utils.getCallLocationName());
+	}
+	
+	// private helper that allows setting a different call location name
+	private AggregateOperator<T> aggregate(Aggregations agg, int field, String callLocationName) {
+		return new AggregateOperator<T>(this, agg, field, callLocationName);
 	}
 
 	/**
@@ -69,7 +75,7 @@ public class UnsortedGrouping<T> extends Grouping<T> {
 	 * @see org.apache.flink.api.java.operators.AggregateOperator
 	 */
 	public AggregateOperator<T> sum (int field) {
-		return this.aggregate (Aggregations.SUM, field);
+		return this.aggregate (Aggregations.SUM, field, Utils.getCallLocationName());
 	}
 
 	/**
@@ -80,7 +86,7 @@ public class UnsortedGrouping<T> extends Grouping<T> {
 	 * @see org.apache.flink.api.java.operators.AggregateOperator
 	 */
 	public AggregateOperator<T> max (int field) {
-		return this.aggregate (Aggregations.MAX, field);
+		return this.aggregate (Aggregations.MAX, field, Utils.getCallLocationName());
 	}
 
 	/**
@@ -91,7 +97,7 @@ public class UnsortedGrouping<T> extends Grouping<T> {
 	 * @see org.apache.flink.api.java.operators.AggregateOperator
 	 */
 	public AggregateOperator<T> min (int field) {
-		return this.aggregate (Aggregations.MIN, field);
+		return this.aggregate (Aggregations.MIN, field, Utils.getCallLocationName());
 	}
 	
 	/**
@@ -111,7 +117,7 @@ public class UnsortedGrouping<T> extends Grouping<T> {
 		if (reducer == null) {
 			throw new NullPointerException("Reduce function must not be null.");
 		}
-		return new ReduceOperator<T>(this, reducer);
+		return new ReduceOperator<T>(this, reducer, Utils.getCallLocationName());
 	}
 	
 	/**
@@ -133,7 +139,7 @@ public class UnsortedGrouping<T> extends Grouping<T> {
 		}
 		TypeInformation<R> resultType = TypeExtractor.getGroupReduceReturnTypes(reducer, this.getDataSet().getType());
 
-		return new GroupReduceOperator<T, R>(this, resultType, reducer);
+		return new GroupReduceOperator<T, R>(this, resultType, reducer, Utils.getCallLocationName());
 	}
 	
 	/**
@@ -167,7 +173,7 @@ public class UnsortedGrouping<T> extends Grouping<T> {
 		}
 			
 		return new ReduceOperator<T>(this, new SelectByMinFunction(
-				(TupleTypeInfo) this.dataSet.getType(), fields));
+				(TupleTypeInfo) this.dataSet.getType(), fields), Utils.getCallLocationName());
 	}
 	
 	/**
@@ -188,7 +194,7 @@ public class UnsortedGrouping<T> extends Grouping<T> {
 		}
 			
 		return new ReduceOperator<T>(this, new SelectByMaxFunction(
-				(TupleTypeInfo) this.dataSet.getType(), fields));
+				(TupleTypeInfo) this.dataSet.getType(), fields), Utils.getCallLocationName());
 	}
 	// --------------------------------------------------------------------------------------------
 	//  Group Operations

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/818ebda0/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java
index d439e79..de620f9 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java
@@ -78,7 +78,13 @@ class TupleGenerator {
 	private static final int LAST = 25;
 
 	public static void main(String[] args) throws Exception {
-		File root = new File(ROOT_DIRECTORY);
+		System.err.println("Current directory "+System.getProperty("user.dir"));
+		String rootDir = ROOT_DIRECTORY;
+		if(args.length > 0) {
+			rootDir = args[0] + "/" + ROOT_DIRECTORY;
+		}
+		System.err.println("Using root directory: "+rootDir);
+		File root = new File(rootDir);
 
 		createTupleClasses(root);
 
@@ -478,7 +484,7 @@ class TupleGenerator {
 			// return
 			sb.append("\t\treturn new DataSource<Tuple" + numFields + "<");
 			appendTupleTypeGenerics(sb, numFields);
-			sb.append(">>(executionContext, inputFormat, types);\n");
+			sb.append(">>(executionContext, inputFormat, types, DataSet.getCallLocationName());\n");
 
 			// end of method
 			sb.append("\t}\n");
@@ -834,7 +840,7 @@ class TupleGenerator {
 	}
 
 	private static String HEADER =
-		"/**\n"
+		"/*\n"
 		+ " * Licensed to the Apache Software Foundation (ASF) under one\n"
 		+ " * or more contributor license agreements.  See the NOTICE file\n"
 		+ " * distributed with this work for additional information\n"


Mime
View raw message