flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [2/5] git commit: JoinHints are passed to the optimizer
Date Tue, 14 Oct 2014 17:13:12 GMT
JoinHints are passed to the optimizer


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/025589f0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/025589f0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/025589f0

Branch: refs/heads/master
Commit: 025589f0b1dd64acba8d5e6de066b1555101bafd
Parents: 3f98994
Author: Stephan Ewen <sewen@apache.org>
Authored: Tue Oct 14 15:52:54 2014 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Tue Oct 14 15:52:54 2014 +0200

----------------------------------------------------------------------
 .../apache/flink/compiler/dag/MatchNode.java    | 49 +++++++++++---
 .../operators/AbstractJoinDescriptor.java       | 30 +++++++--
 .../operators/HashJoinBuildFirstProperties.java |  9 ++-
 .../HashJoinBuildSecondProperties.java          |  8 ++-
 .../operators/SortMergeJoinDescriptor.java      | 10 ++-
 .../common/operators/base/JoinOperatorBase.java | 69 +++++++++++++++++++-
 .../java/org/apache/flink/api/java/DataSet.java | 25 ++++++-
 .../flink/api/java/operators/JoinOperator.java  | 64 ++++--------------
 .../org/apache/flink/api/scala/DataSet.scala    |  9 ++-
 .../apache/flink/api/scala/joinDataSet.scala    |  3 +-
 10 files changed, 198 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/025589f0/flink-compiler/src/main/java/org/apache/flink/compiler/dag/MatchNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/MatchNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/MatchNode.java
index e13bc18..fca7a72 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/MatchNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/MatchNode.java
@@ -16,13 +16,13 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.compiler.dag;
 
 import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.flink.api.common.operators.base.JoinOperatorBase;
+import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
 import org.apache.flink.compiler.CompilerException;
 import org.apache.flink.compiler.DataStatistics;
 import org.apache.flink.compiler.PactCompiler;
@@ -33,17 +33,20 @@ import org.apache.flink.compiler.operators.SortMergeJoinDescriptor;
 import org.apache.flink.configuration.Configuration;
 
 /**
- * The Optimizer representation of a <i>Match</i> contract node.
+ * The Optimizer representation of a join operator.
  */
 public class MatchNode extends TwoInputNode {
 	
+	private final JoinHint joinHint;
+	
 	/**
-	 * Creates a new MatchNode for the given contract.
+	 * Creates a new MatchNode for the given join operator.
 	 * 
-	 * @param pactContract The match contract object.
+	 * @param joinOperatorBase The join operator object.
 	 */
-	public MatchNode(JoinOperatorBase<?, ?, ?, ?> pactContract) {
-		super(pactContract);
+	public MatchNode(JoinOperatorBase<?, ?, ?, ?> joinOperatorBase) {
+		super(joinOperatorBase);
+		this.joinHint = joinOperatorBase.getJoinHint();
 	}
 
 	// ------------------------------------------------------------------------
@@ -87,11 +90,37 @@ public class MatchNode extends TwoInputNode {
 			ArrayList<OperatorDescriptorDual> list = new ArrayList<OperatorDescriptorDual>();
 			list.add(fixedDriverStrat);
 			return list;
-		} else {
+		}
+		else {
 			ArrayList<OperatorDescriptorDual> list = new ArrayList<OperatorDescriptorDual>();
-			list.add(new SortMergeJoinDescriptor(this.keys1, this.keys2));
-			list.add(new HashJoinBuildFirstProperties(this.keys1, this.keys2));
-			list.add(new HashJoinBuildSecondProperties(this.keys1, this.keys2));
+			
+			JoinHint hint = this.joinHint == null ? JoinHint.OPTIMIZER_CHOOSES : this.joinHint;
+			
+			switch (hint) {
+				case BROADCAST_HASH_FIRST:
+					list.add(new HashJoinBuildFirstProperties(this.keys1, this.keys2, true, false, false));
+					break;
+				case BROADCAST_HASH_SECOND:
+					list.add(new HashJoinBuildSecondProperties(this.keys1, this.keys2, false, true, false));
+					break;
+				case REPARTITION_HASH_FIRST:
+					list.add(new HashJoinBuildFirstProperties(this.keys1, this.keys2, false, false, true));
+					break;
+				case REPARTITION_HASH_SECOND:
+					list.add(new HashJoinBuildSecondProperties(this.keys1, this.keys2, false, false, true));
+					break;
+				case REPARTITION_SORT_MERGE:
+					list.add(new SortMergeJoinDescriptor(this.keys1, this.keys2, false, false, true));
+					break;
+				case OPTIMIZER_CHOOSES:
+					list.add(new SortMergeJoinDescriptor(this.keys1, this.keys2));
+					list.add(new HashJoinBuildFirstProperties(this.keys1, this.keys2));
+					list.add(new HashJoinBuildSecondProperties(this.keys1, this.keys2));
+					break;
+				default:
+					throw new CompilerException("Unrecognized join hint: " + joinHint);
+			}
+			
 			return list;
 		}
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/025589f0/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AbstractJoinDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AbstractJoinDescriptor.java
b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AbstractJoinDescriptor.java
index 12a4e99..47069e6 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AbstractJoinDescriptor.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AbstractJoinDescriptor.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.compiler.operators;
 
 import java.util.ArrayList;
@@ -27,31 +26,52 @@ import org.apache.flink.compiler.dataproperties.GlobalProperties;
 import org.apache.flink.compiler.dataproperties.PartitioningProperty;
 import org.apache.flink.compiler.dataproperties.RequestedGlobalProperties;
 
-
+/**
+ * Defines the possible global properties for a join.
+ */
 public abstract class AbstractJoinDescriptor extends OperatorDescriptorDual {
 	
+	private final boolean broadcastFirstAllowed;
+	private final boolean broadcastSecondAllowed;
+	private final boolean repartitionAllowed;
+	
 	protected AbstractJoinDescriptor(FieldList keys1, FieldList keys2) {
+		this(keys1, keys2, true, true, true);
+	}
+	
+	protected AbstractJoinDescriptor(FieldList keys1, FieldList keys2,
+			boolean broadcastFirstAllowed, boolean broadcastSecondAllowed, boolean repartitionAllowed)
+	{
 		super(keys1, keys2);
+		
+		this.broadcastFirstAllowed = broadcastFirstAllowed;
+		this.broadcastSecondAllowed = broadcastSecondAllowed;
+		this.repartitionAllowed = repartitionAllowed;
 	}
 	
 	@Override
 	protected List<GlobalPropertiesPair> createPossibleGlobalProperties() {
 		ArrayList<GlobalPropertiesPair> pairs = new ArrayList<GlobalPropertiesPair>();
 		
-		{ // partition both (hash)
+		if (repartitionAllowed) {
+			// partition both (hash)
 			RequestedGlobalProperties partitioned1 = new RequestedGlobalProperties();
 			partitioned1.setHashPartitioned(this.keys1);
 			RequestedGlobalProperties partitioned2 = new RequestedGlobalProperties();
 			partitioned2.setHashPartitioned(this.keys2);
 			pairs.add(new GlobalPropertiesPair(partitioned1, partitioned2));
 		}
-		{ // replicate second
+		
+		if (broadcastSecondAllowed) {
+			// replicate second
 			RequestedGlobalProperties any1 = new RequestedGlobalProperties();
 			RequestedGlobalProperties replicated2 = new RequestedGlobalProperties();
 			replicated2.setFullyReplicated();
 			pairs.add(new GlobalPropertiesPair(any1, replicated2));
 		}
-		{ // replicate first
+		
+		if (broadcastFirstAllowed) {
+			// replicate first
 			RequestedGlobalProperties replicated1 = new RequestedGlobalProperties();
 			replicated1.setFullyReplicated();
 			RequestedGlobalProperties any2 = new RequestedGlobalProperties();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/025589f0/flink-compiler/src/main/java/org/apache/flink/compiler/operators/HashJoinBuildFirstProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/HashJoinBuildFirstProperties.java
b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/HashJoinBuildFirstProperties.java
index 76d7993..212e3ef 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/HashJoinBuildFirstProperties.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/HashJoinBuildFirstProperties.java
@@ -39,6 +39,12 @@ public class HashJoinBuildFirstProperties extends AbstractJoinDescriptor
{
 	public HashJoinBuildFirstProperties(FieldList keys1, FieldList keys2) {
 		super(keys1, keys2);
 	}
+	
+	public HashJoinBuildFirstProperties(FieldList keys1, FieldList keys2,
+			boolean broadcastFirstAllowed, boolean broadcastSecondAllowed, boolean repartitionAllowed)
+	{
+		super(keys1, keys2, broadcastFirstAllowed, broadcastSecondAllowed, repartitionAllowed);
+	}
 
 	@Override
 	public DriverStrategy getStrategy() {
@@ -48,8 +54,7 @@ public class HashJoinBuildFirstProperties extends AbstractJoinDescriptor
{
 	@Override
 	protected List<LocalPropertiesPair> createPossibleLocalProperties() {
 		// all properties are possible
-		return Collections.singletonList(new LocalPropertiesPair(
-			new RequestedLocalProperties(), new RequestedLocalProperties()));
+		return Collections.singletonList(new LocalPropertiesPair(new RequestedLocalProperties(),
new RequestedLocalProperties()));
 	}
 	
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/025589f0/flink-compiler/src/main/java/org/apache/flink/compiler/operators/HashJoinBuildSecondProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/HashJoinBuildSecondProperties.java
b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/HashJoinBuildSecondProperties.java
index b821218..4d69bc2 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/HashJoinBuildSecondProperties.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/HashJoinBuildSecondProperties.java
@@ -36,6 +36,12 @@ public final class HashJoinBuildSecondProperties extends AbstractJoinDescriptor
 	public HashJoinBuildSecondProperties(FieldList keys1, FieldList keys2) {
 		super(keys1, keys2);
 	}
+	
+	public HashJoinBuildSecondProperties(FieldList keys1, FieldList keys2,
+			boolean broadcastFirstAllowed, boolean broadcastSecondAllowed, boolean repartitionAllowed)
+	{
+		super(keys1, keys2, broadcastFirstAllowed, broadcastSecondAllowed, repartitionAllowed);
+	}
 
 	@Override
 	public DriverStrategy getStrategy() {
@@ -72,7 +78,7 @@ public final class HashJoinBuildSecondProperties extends AbstractJoinDescriptor
 		else {
 			strategy = DriverStrategy.HYBRIDHASH_BUILD_SECOND;
 		}
-		return new DualInputPlanNode(node, "Join("+node.getPactContract().getName()+")", in1, in2,
strategy, this.keys1, this.keys2);
+		return new DualInputPlanNode(node, "Join ("+node.getPactContract().getName()+")", in1,
in2, strategy, this.keys1, this.keys2);
 	}
 	
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/025589f0/flink-compiler/src/main/java/org/apache/flink/compiler/operators/SortMergeJoinDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/SortMergeJoinDescriptor.java
b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/SortMergeJoinDescriptor.java
index 287df47..5c6de30 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/SortMergeJoinDescriptor.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/SortMergeJoinDescriptor.java
@@ -36,11 +36,17 @@ import org.apache.flink.runtime.operators.DriverStrategy;
 /**
  * 
  */
-public class SortMergeJoinDescriptor extends AbstractJoinDescriptor
-{
+public class SortMergeJoinDescriptor extends AbstractJoinDescriptor {
+	
 	public SortMergeJoinDescriptor(FieldList keys1, FieldList keys2) {
 		super(keys1, keys2);
 	}
+	
+	public SortMergeJoinDescriptor(FieldList keys1, FieldList keys2,
+			boolean broadcastFirstAllowed, boolean broadcastSecondAllowed, boolean repartitionAllowed)
+	{
+		super(keys1, keys2, broadcastFirstAllowed, broadcastSecondAllowed, repartitionAllowed);
+	}
 
 	@Override
 	public DriverStrategy getStrategy() {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/025589f0/flink-core/src/main/java/org/apache/flink/api/common/operators/base/JoinOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/JoinOperatorBase.java
b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/JoinOperatorBase.java
index 7bfe39f..ba71b01 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/JoinOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/JoinOperatorBase.java
@@ -46,8 +46,59 @@ import java.util.Map;
 /**
  * @see org.apache.flink.api.common.functions.FlatJoinFunction
  */
-public class JoinOperatorBase<IN1, IN2, OUT, FT extends FlatJoinFunction<IN1, IN2,
OUT>> extends DualInputOperator<IN1, IN2, OUT, FT>
-{
+public class JoinOperatorBase<IN1, IN2, OUT, FT extends FlatJoinFunction<IN1, IN2,
OUT>> extends DualInputOperator<IN1, IN2, OUT, FT> {
+	
+	/**
+	 * An enumeration of hints, optionally usable to tell the system how exactly to execute the join.
+	 */
+	public static enum JoinHint {
+		/**
+		 * Leave the choice of how to do the join to the optimizer. If in doubt, the
+		 * optimizer will choose a repartitioning join.
+		 */
+		OPTIMIZER_CHOOSES,
+		
+		/**
+		 * Hint that the first join input is much smaller than the second. This results in
+		 * broadcasting and hashing the first input, unless the optimizer infers that
+		 * prior existing partitioning is available that is even cheaper to exploit.
+		 */
+		BROADCAST_HASH_FIRST,
+		
+		/**
+		 * Hint that the second join input is much smaller than the first. This results in
+		 * broadcasting and hashing the second input, unless the optimizer infers that
+		 * prior existing partitioning is available that is even cheaper to exploit.
+		 */
+		BROADCAST_HASH_SECOND,
+		
+		/**
+		 * Hint that the first join input is a bit smaller than the second. This results in
+		 * repartitioning both inputs and hashing the first input, unless the optimizer infers that
+		 * prior existing partitioning and orders are available that are even cheaper to exploit.
+		 */
+		REPARTITION_HASH_FIRST,
+		
+		/**
+		 * Hint that the second join input is a bit smaller than the first. This results in
+		 * repartitioning both inputs and hashing the second input, unless the optimizer infers that
+		 * prior existing partitioning and orders are available that are even cheaper to exploit.
+		 */
+		REPARTITION_HASH_SECOND,
+		
+		/**
+		 * Hint that the join should repartition both inputs and use sorting and merging
+		 * as the join strategy.
+		 */
+		REPARTITION_SORT_MERGE,
+	};
+	
+	// --------------------------------------------------------------------------------------------
+	
+	
+	private JoinHint joinHint = JoinHint.OPTIMIZER_CHOOSES;
+	
+	
 	public JoinOperatorBase(UserCodeWrapper<FT> udf, BinaryOperatorInformation<IN1,
IN2, OUT> operatorInfo, int[] keyPositions1, int[] keyPositions2, String name) {
 		super(udf, operatorInfo, keyPositions1, keyPositions2, name);
 	}
@@ -59,6 +110,20 @@ public class JoinOperatorBase<IN1, IN2, OUT, FT extends FlatJoinFunction<IN1,
IN
 	public JoinOperatorBase(Class<? extends FT> udf, BinaryOperatorInformation<IN1,
IN2, OUT> operatorInfo, int[] keyPositions1, int[] keyPositions2, String name) {
 		super(new UserCodeClassWrapper<FT>(udf), operatorInfo, keyPositions1, keyPositions2,
name);
 	}
+	
+	
+	public void setJoinHint(JoinHint joinHint) {
+		if (joinHint == null) {
+			throw new IllegalArgumentException("Join Hint must not be null.");
+		}
+		this.joinHint = joinHint;
+	}
+	
+	public JoinHint getJoinHint() {
+		return joinHint;
+	}
+	
+	// --------------------------------------------------------------------------------------------
 
 	@SuppressWarnings("unchecked")
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/025589f0/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index bfaf612..7b0752c 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -29,6 +29,7 @@ import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.io.FileOutputFormat;
 import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
 import org.apache.flink.api.common.operators.base.PartitionOperatorBase.PartitionMethod;
 import org.apache.flink.api.java.aggregation.Aggregations;
 import org.apache.flink.api.java.functions.FormattingMapper;
@@ -52,7 +53,6 @@ import org.apache.flink.api.java.functions.FirstReducer;
 import org.apache.flink.api.java.operators.FlatMapOperator;
 import org.apache.flink.api.java.operators.GroupReduceOperator;
 import org.apache.flink.api.java.operators.IterativeDataSet;
-import org.apache.flink.api.java.operators.JoinOperator.JoinHint;
 import org.apache.flink.api.java.operators.JoinOperator.JoinOperatorSets;
 import org.apache.flink.api.java.operators.Keys;
 import org.apache.flink.api.java.operators.MapOperator;
@@ -566,6 +566,27 @@ public abstract class DataSet<T> {
 	public <R> JoinOperatorSets<T, R> join(DataSet<R> other) {
 		return new JoinOperatorSets<T, R>(this, other);
 	}
+	
+	/**
+	 * Initiates a Join transformation. <br/>
+	 * A Join transformation joins the elements of two 
+	 *   {@link DataSet DataSets} on key equality and provides multiple ways to combine 
+	 *   joining elements into one DataSet.</br>
+	 * 
+	 * This method returns a {@link JoinOperatorSets} on which one of the {@code where} methods
+	 * can be called to define the join key of the first joining (i.e., this) DataSet.
+	 *  
+	 * @param other The other DataSet with which this DataSet is joined.
+	 * @param strategy The strategy that should be used to execute the join. If {@code null} is given, then the
+	 *                 optimizer will pick the join strategy.
+	 * @return A JoinOperatorSets to continue the definition of the Join transformation.
+	 * 
+	 * @see JoinOperatorSets
+	 * @see DataSet
+	 */
+	public <R> JoinOperatorSets<T, R> join(DataSet<R> other, JoinHint strategy)
{
+		return new JoinOperatorSets<T, R>(this, other, strategy);
+	}
 
 	/**
 	 * Initiates a Join transformation. <br/>
@@ -621,7 +642,7 @@ public abstract class DataSet<T> {
 	 *   is called with an empty group for the non-existing group.</br>
 	 * The CoGroupFunction can iterate over the elements of both groups and return any number

 	 *   of elements including none.</br>
-	 * This method returns a {@link JoinOperatorSets} on which one of the {@code where} methods
+	 * This method returns a {@link CoGroupOperatorSets} on which one of the {@code where} methods
 	 * can be called to define the join key of the first joining (i.e., this) DataSet.
 	 * 
 	 * @param other The other DataSet of the CoGroup transformation.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/025589f0/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
index caa27dc..9be6656 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
@@ -31,6 +31,7 @@ import org.apache.flink.api.common.operators.DualInputSemanticProperties;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.JoinOperatorBase;
+import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
 import org.apache.flink.api.common.operators.base.MapOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
@@ -62,55 +63,10 @@ import org.apache.flink.util.Collector;
  */
 public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
I2, OUT, JoinOperator<I1, I2, OUT>> {
 	
-	/**
-	 * An enumeration of hints, optionally usable to tell the system how exactly execute the
join.
-	 */
-	public static enum JoinHint {
-		/**
-		 * leave the choice how to do the join to the optimizer. If in doubt, the
-		 * optimizer will choose a repartitioning join.
-		 */
-		OPTIMIZER_CHOOSES,
-		
-		/**
-		 * Hint that the first join input is much smaller than the second. This results in
-		 * broadcasting and hashing the first input, unless the optimizer infers that
-		 * prior existing partitioning is available that is even cheaper to exploit.
-		 */
-		BROADCAST_HASH_FIRST,
-		
-		/**
-		 * Hint that the second join input is much smaller than the second. This results in
-		 * broadcasting and hashing the second input, unless the optimizer infers that
-		 * prior existing partitioning is available that is even cheaper to exploit.
-		 */
-		BROADCAST_HASH_SECOND,
-		
-		/**
-		 * Hint that the first join input is a bit smaller than the second. This results in
-		 * repartitioning both inputs and hashing the first input, unless the optimizer infers
that
-		 * prior existing partitioning and orders are available that are even cheaper to exploit.
-		 */
-		REPARTITION_HASH_FIRST,
-		
-		/**
-		 * Hint that the second join input is a bit smaller than the second. This results in
-		 * repartitioning both inputs and hashing the second input, unless the optimizer infers
that
-		 * prior existing partitioning and orders are available that are even cheaper to exploit.
-		 */
-		REPARTITION_HASH_SECOND,
-		
-		/**
-		 * Hint that the join should repartitioning both inputs and use sorting and merging
-		 * as the join strategy.
-		 */
-		REPARTITION_SORT_MERGE,
-	};
-	
 	protected final Keys<I1> keys1;
 	protected final Keys<I2> keys2;
 	
-	private JoinHint joinHint;
+	private final JoinHint joinHint;
 	
 	protected JoinOperator(DataSet<I1> input1, DataSet<I2> input2, 
 			Keys<I1> keys1, Keys<I2> keys2,
@@ -142,7 +98,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 
 		this.keys1 = keys1;
 		this.keys2 = keys2;
-		this.joinHint = hint;
+		this.joinHint = hint == null ? JoinHint.OPTIMIZER_CHOOSES : hint;
 	}
 	
 	protected Keys<I1> getKeys1() {
@@ -255,6 +211,8 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 				throw new InvalidProgramException("The types of the key fields do not match.", ike);
 			}
 
+			final JoinOperatorBase<?, ?, OUT, ?> translated;
+			
 			if (keys1 instanceof Keys.SelectorFunctionKeys
 					&& keys2 instanceof Keys.SelectorFunctionKeys) {
 				// Both join sides have a key selector function, so we need to do the
@@ -274,8 +232,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 				// set dop
 				po.setDegreeOfParallelism(this.getParallelism());
 				
-				return po;
-				
+				translated = po;
 			}
 			else if (keys2 instanceof Keys.SelectorFunctionKeys) {
 				// The right side of the join needs the tuple wrapping/unwrapping
@@ -294,7 +251,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 				// set dop
 				po.setDegreeOfParallelism(this.getParallelism());
 
-				return po;
+				translated = po;
 			}
 			else if (keys1 instanceof Keys.SelectorFunctionKeys) {
 				// The left side of the join needs the tuple wrapping/unwrapping
@@ -313,7 +270,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 				// set dop
 				po.setDegreeOfParallelism(this.getParallelism());
 
-				return po;
+				translated = po;
 			}
 			else if (super.keys1 instanceof Keys.ExpressionKeys && super.keys2 instanceof
Keys.ExpressionKeys)
 			{
@@ -334,12 +291,15 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 				// set dop
 				po.setDegreeOfParallelism(this.getParallelism());
 				
-				return po;
+				translated = po;
 			}
 			else {
 				throw new UnsupportedOperationException("Unrecognized or incompatible key types.");
 			}
 			
+			translated.setJoinHint(getJoinHint());
+			
+			return translated;
 		}
 		
 		private static <I1, I2, K, OUT> PlanBothUnwrappingJoinOperator<I1, I2, OUT, K>
translateSelectorFunctionJoin(

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/025589f0/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
index 0f7f723..6114b32 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
@@ -26,7 +26,7 @@ import org.apache.flink.api.common.operators.base.PartitionOperatorBase.Partitio
 import org.apache.flink.api.java.aggregation.Aggregations
 import org.apache.flink.api.java.functions.{FirstReducer, KeySelector}
 import org.apache.flink.api.java.io.{PrintingOutputFormat, TextOutputFormat}
-import org.apache.flink.api.java.operators.JoinOperator.JoinHint
+import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
 import org.apache.flink.api.java.operators.Keys.ExpressionKeys
 import org.apache.flink.api.java.operators._
 import org.apache.flink.api.java.{DataSet => JavaDataSet}
@@ -677,6 +677,13 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
     new UnfinishedJoinOperation(this, other, JoinHint.OPTIMIZER_CHOOSES)
 
   /**
+   * Special [[join]] operation for explicitly telling the system what join strategy to use. If
+   * null is given as the join strategy, then the optimizer will pick the strategy.
+   */
+  def join[O](other: DataSet[O], strategy: JoinHint): UnfinishedJoinOperation[T, O] =
+    new UnfinishedJoinOperation(this, other, strategy)
+  
+  /**
    * Special [[join]] operation for explicitly telling the system that the right side is
assumed
    * to be a lot smaller than the left side of the join.
    */

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/025589f0/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala
index d333a66..36e4d36 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala
@@ -21,8 +21,9 @@ import org.apache.commons.lang3.Validate
 import org.apache.flink.api.common.InvalidProgramException
 import org.apache.flink.api.common.functions.{JoinFunction, RichFlatJoinFunction, FlatJoinFunction}
 import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
 import org.apache.flink.api.java.operators.JoinOperator.DefaultJoin.WrappingFlatJoinFunction
-import org.apache.flink.api.java.operators.JoinOperator.{EquiJoin, JoinHint}
+import org.apache.flink.api.java.operators.JoinOperator.EquiJoin;
 import org.apache.flink.api.java.operators._
 import org.apache.flink.api.scala.typeutils.{CaseClassSerializer, CaseClassTypeInfo}
 import org.apache.flink.api.common.typeinfo.TypeInformation


Mime
View raw message