flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [3/6] flink git commit: [FLINK-2576] [javaAPI] [scalaAPI] [optimizer] Add outerJoin to DataSet API (Java, Scala) and optimizer.
Date Fri, 09 Oct 2015 14:20:52 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-java/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java
deleted file mode 100644
index 7390af2..0000000
--- a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.common.operators.base;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.common.functions.FlatJoinFunction;
-import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
-import org.apache.flink.api.common.operators.BinaryOperatorInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.util.Collector;
-import org.junit.Test;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.Future;
-
-@SuppressWarnings({ "unchecked", "serial" })
-public class JoinOperatorBaseTest implements Serializable {
-
-	
-	@Test
-	public void testTupleBaseJoiner(){
-		final FlatJoinFunction<Tuple3<String, Double, Integer>, Tuple2<Integer, String>, Tuple2<Double, String>> joiner =
-					new FlatJoinFunction<Tuple3<String, Double, Integer>, Tuple2<Integer, String>, Tuple2<Double, String>>()
-		{
-			@Override
-			public void join(Tuple3<String, Double, Integer> first, Tuple2<Integer, String> second, Collector<Tuple2<Double, String>> out) {
-				Tuple3<String, Double, Integer> fst = (Tuple3<String, Double, Integer>)first;
-				Tuple2<Integer, String> snd = (Tuple2<Integer, String>)second;
-
-				assertEquals(fst.f0, snd.f1);
-				assertEquals(fst.f2, snd.f0);
-
-				out.collect(new Tuple2<Double, String>(fst.f1, snd.f0.toString()));
-			}
-		};
-
-		final TupleTypeInfo<Tuple3<String, Double, Integer>> leftTypeInfo = TupleTypeInfo.getBasicTupleTypeInfo
-				(String.class, Double.class, Integer.class);
-		final TupleTypeInfo<Tuple2<Integer, String>> rightTypeInfo = TupleTypeInfo.getBasicTupleTypeInfo(Integer.class,
-				String.class);
-		final TupleTypeInfo<Tuple2<Double, String>> outTypeInfo = TupleTypeInfo.getBasicTupleTypeInfo(Double.class,
-				String.class);
-
-		final int[] leftKeys = new int[]{0,2};
-		final int[] rightKeys = new int[]{1,0};
-
-		final String taskName = "Collection based tuple joiner";
-
-		final BinaryOperatorInformation<Tuple3<String, Double, Integer>, Tuple2<Integer, String>, Tuple2<Double,
-				String>> binaryOpInfo = new BinaryOperatorInformation<Tuple3<String, Double, Integer>, Tuple2<Integer,
-				String>, Tuple2<Double, String>>(leftTypeInfo, rightTypeInfo, outTypeInfo);
-
-		final JoinOperatorBase<Tuple3<String, Double, Integer>, Tuple2<Integer,
-				String>, Tuple2<Double, String>, FlatJoinFunction<Tuple3<String, Double, Integer>, Tuple2<Integer,
-				String>, Tuple2<Double, String>>> base = new JoinOperatorBase<Tuple3<String, Double, Integer>,
-				Tuple2<Integer, String>, Tuple2<Double, String>, FlatJoinFunction<Tuple3<String, Double, Integer>,
-				Tuple2<Integer, String>, Tuple2<Double, String>>>(joiner, binaryOpInfo, leftKeys, rightKeys, taskName);
-
-		final List<Tuple3<String, Double, Integer> > inputData1 = new ArrayList<Tuple3<String, Double,
-				Integer>>(Arrays.asList(
-				new Tuple3<String, Double, Integer>("foo", 42.0, 1),
-				new Tuple3<String,Double, Integer>("bar", 1.0, 2),
-				new Tuple3<String, Double, Integer>("bar", 2.0, 3),
-				new Tuple3<String, Double, Integer>("foobar", 3.0, 4),
-				new Tuple3<String, Double, Integer>("bar", 3.0, 3)
-		));
-
-		final List<Tuple2<Integer, String>> inputData2 = new ArrayList<Tuple2<Integer, String>>(Arrays.asList(
-				new Tuple2<Integer, String>(3, "bar"),
-				new Tuple2<Integer, String>(4, "foobar"),
-				new Tuple2<Integer, String>(2, "foo")
-		));
-		final Set<Tuple2<Double, String>> expected = new HashSet<Tuple2<Double, String>>(Arrays.asList(
-				new Tuple2<Double, String>(2.0, "3"),
-				new Tuple2<Double, String>(3.0, "3"),
-				new Tuple2<Double, String>(3.0, "4")
-		));
-
-		try {
-			ExecutionConfig executionConfig = new ExecutionConfig();
-			executionConfig.disableObjectReuse();
-			List<Tuple2<Double, String>> resultSafe = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext("op", 1, 0, null, executionConfig, new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>()), executionConfig);
-			executionConfig.enableObjectReuse();
-			List<Tuple2<Double, String>> resultRegular = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext("op", 1, 0, null, executionConfig, new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>()), executionConfig);
-
-			assertEquals(expected, new HashSet<Tuple2<Double, String>>(resultSafe));
-			assertEquals(expected, new HashSet<Tuple2<Double, String>>(resultRegular));
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesProjectionTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesProjectionTest.java b/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesProjectionTest.java
index e890b4e..916086b 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesProjectionTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesProjectionTest.java
@@ -26,7 +26,7 @@ import org.apache.flink.api.common.operators.DualInputSemanticProperties;
 import org.apache.flink.api.common.operators.GenericDataSinkBase;
 import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
 import org.apache.flink.api.common.operators.base.CrossOperatorBase;
-import org.apache.flink.api.common.operators.base.JoinOperatorBase;
+import org.apache.flink.api.common.operators.base.InnerJoinOperatorBase;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.operators.translation.PlanProjectOperator;
@@ -138,7 +138,7 @@ public class SemanticPropertiesProjectionTest {
 		Plan plan = env.createProgramPlan();
 
 		GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
-		JoinOperatorBase<?, ?, ?, ?> projectJoinOperator = ((JoinOperatorBase<?, ?, ?, ?>) sink.getInput());
+		InnerJoinOperatorBase<?, ?, ?, ?> projectJoinOperator = ((InnerJoinOperatorBase<?, ?, ?, ?>) sink.getInput());
 
 		DualInputSemanticProperties props = projectJoinOperator.getSemanticProperties();
 
@@ -166,7 +166,7 @@ public class SemanticPropertiesProjectionTest {
 		Plan plan = env.createProgramPlan();
 
 		GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
-		JoinOperatorBase<?, ?, ?, ?> projectJoinOperator = ((JoinOperatorBase<?, ?, ?, ?>) sink.getInput());
+		InnerJoinOperatorBase<?, ?, ?, ?> projectJoinOperator = ((InnerJoinOperatorBase<?, ?, ?, ?>) sink.getInput());
 
 		DualInputSemanticProperties props = projectJoinOperator.getSemanticProperties();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesTranslationTest.java b/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesTranslationTest.java
index 33b3958..d01ca32 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesTranslationTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesTranslationTest.java
@@ -28,7 +28,7 @@ import org.apache.flink.api.common.operators.DualInputSemanticProperties;
 import org.apache.flink.api.common.operators.GenericDataSinkBase;
 import org.apache.flink.api.common.operators.SemanticProperties;
 import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
-import org.apache.flink.api.common.operators.base.JoinOperatorBase;
+import org.apache.flink.api.common.operators.base.InnerJoinOperatorBase;
 import org.apache.flink.api.common.operators.base.MapOperatorBase;
 import org.apache.flink.api.common.operators.util.FieldSet;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
@@ -265,7 +265,7 @@ public class SemanticPropertiesTranslationTest {
 		Plan plan = env.createProgramPlan();
 
 		GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
-		JoinOperatorBase<?, ?, ?, ?> join = (JoinOperatorBase<?, ?, ?, ?>) sink.getInput();
+		InnerJoinOperatorBase<?, ?, ?, ?> join = (InnerJoinOperatorBase<?, ?, ?, ?>) sink.getInput();
 
 		DualInputSemanticProperties semantics = join.getSemanticProperties();
 		assertNotNull(semantics.getForwardingTargetFields(0, 0));
@@ -292,7 +292,7 @@ public class SemanticPropertiesTranslationTest {
 		Plan plan = env.createProgramPlan();
 
 		GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
-		JoinOperatorBase<?, ?, ?, ?> join = (JoinOperatorBase<?, ?, ?, ?>) sink.getInput();
+		InnerJoinOperatorBase<?, ?, ?, ?> join = (InnerJoinOperatorBase<?, ?, ?, ?>) sink.getInput();
 
 		DualInputSemanticProperties semantics = join.getSemanticProperties();
 		assertNotNull(semantics.getForwardingTargetFields(1, 0));
@@ -319,7 +319,7 @@ public class SemanticPropertiesTranslationTest {
 		Plan plan = env.createProgramPlan();
 
 		GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
-		JoinOperatorBase<?, ?, ?, ?> join = (JoinOperatorBase<?, ?, ?, ?>) sink.getInput();
+		InnerJoinOperatorBase<?, ?, ?, ?> join = (InnerJoinOperatorBase<?, ?, ?, ?>) sink.getInput();
 
 		DualInputSemanticProperties semantics = join.getSemanticProperties();
 		assertNotNull(semantics.getForwardingTargetFields(1, 0));
@@ -352,7 +352,7 @@ public class SemanticPropertiesTranslationTest {
 		Plan plan = env.createProgramPlan();
 
 		GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
-		JoinOperatorBase<?, ?, ?, ?> join = (JoinOperatorBase<?, ?, ?, ?>) sink.getInput();
+		InnerJoinOperatorBase<?, ?, ?, ?> join = (InnerJoinOperatorBase<?, ?, ?, ?>) sink.getInput();
 
 		DualInputSemanticProperties semantics = join.getSemanticProperties();
 		assertNotNull(semantics.getForwardingTargetFields(0, 1));
@@ -382,7 +382,7 @@ public class SemanticPropertiesTranslationTest {
 		Plan plan = env.createProgramPlan();
 
 		GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
-		JoinOperatorBase<?, ?, ?, ?> join = (JoinOperatorBase<?, ?, ?, ?>) sink.getInput();
+		InnerJoinOperatorBase<?, ?, ?, ?> join = (InnerJoinOperatorBase<?, ?, ?, ?>) sink.getInput();
 
 		DualInputSemanticProperties semantics = join.getSemanticProperties();
 		assertNotNull(semantics.getForwardingTargetFields(0, 1));
@@ -410,7 +410,7 @@ public class SemanticPropertiesTranslationTest {
 		Plan plan = env.createProgramPlan();
 
 		GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
-		JoinOperatorBase<?, ?, ?, ?> join = (JoinOperatorBase<?, ?, ?, ?>) sink.getInput();
+		InnerJoinOperatorBase<?, ?, ?, ?> join = (InnerJoinOperatorBase<?, ?, ?, ?>) sink.getInput();
 
 		DualInputSemanticProperties semantics = join.getSemanticProperties();
 		assertNotNull(semantics.getForwardingTargetFields(0, 0));
@@ -440,7 +440,7 @@ public class SemanticPropertiesTranslationTest {
 		Plan plan = env.createProgramPlan();
 
 		GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
-		JoinOperatorBase<?, ?, ?, ?> join = (JoinOperatorBase<?, ?, ?, ?>) sink.getInput();
+		InnerJoinOperatorBase<?, ?, ?, ?> join = (InnerJoinOperatorBase<?, ?, ?, ?>) sink.getInput();
 
 		DualInputSemanticProperties semantics = join.getSemanticProperties();
 		assertNotNull(semantics.getReadFields(0));

http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-java/src/test/java/org/apache/flink/api/java/operators/NamesTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/NamesTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/NamesTest.java
index b8663ce..c14f175 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operators/NamesTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/NamesTest.java
@@ -26,7 +26,7 @@ import java.util.List;
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.operators.Operator;
-import org.apache.flink.api.common.operators.base.JoinOperatorBase;
+import org.apache.flink.api.common.operators.base.InnerJoinOperatorBase;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
@@ -102,7 +102,7 @@ public class NamesTest implements Serializable {
 		plan.accept(new Visitor<Operator<?>>() {
 			@Override
 			public boolean preVisit(Operator<?> visitable) {
-				if(visitable instanceof JoinOperatorBase) {
+				if(visitable instanceof InnerJoinOperatorBase) {
 					Assert.assertEquals("Join at testJoinWith(NamesTest.java:93)", visitable.getName());
 				}
 				return true;

http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java
index f9ce82f..fd60bc6 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java
@@ -30,7 +30,7 @@ import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.aggregators.LongSumAggregator;
 import org.apache.flink.api.common.operators.GenericDataSinkBase;
 import org.apache.flink.api.common.operators.base.DeltaIterationBase;
-import org.apache.flink.api.common.operators.base.JoinOperatorBase;
+import org.apache.flink.api.common.operators.base.InnerJoinOperatorBase;
 import org.apache.flink.api.common.operators.base.MapOperatorBase;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
@@ -123,8 +123,8 @@ public class DeltaIterationTranslationTest implements java.io.Serializable {
 			assertEquals(ITERATION_NAME, iteration.getName());
 			
 			MapOperatorBase<?, ?, ?> nextWorksetMapper = (MapOperatorBase<?, ?, ?>) iteration.getNextWorkset();
-			JoinOperatorBase<?, ?, ?, ?> solutionSetJoin = (JoinOperatorBase<?, ?, ?, ?>) iteration.getSolutionSetDelta();
-			JoinOperatorBase<?, ?, ?, ?> worksetSelfJoin = (JoinOperatorBase<?, ?, ?, ?>) solutionSetJoin.getFirstInput();
+			InnerJoinOperatorBase<?, ?, ?, ?> solutionSetJoin = (InnerJoinOperatorBase<?, ?, ?, ?>) iteration.getSolutionSetDelta();
+			InnerJoinOperatorBase<?, ?, ?, ?> worksetSelfJoin = (InnerJoinOperatorBase<?, ?, ?, ?>) solutionSetJoin.getFirstInput();
 			MapOperatorBase<?, ?, ?> worksetMapper = (MapOperatorBase<?, ?, ?>) worksetSelfJoin.getFirstInput();
 			
 			assertEquals(IdentityMapper.class, worksetMapper.getUserCodeWrapper().getUserCodeClass());

http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-optimizer/src/main/java/org/apache/flink/optimizer/costs/CostEstimator.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/costs/CostEstimator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/costs/CostEstimator.java
index 6a3ff09..553c127 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/costs/CostEstimator.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/costs/CostEstimator.java
@@ -196,6 +196,9 @@ public abstract class CostEstimator {
 			
 			break;
 		case INNER_MERGE:
+		case FULL_OUTER_MERGE:
+		case LEFT_OUTER_MERGE:
+		case RIGHT_OUTER_MERGE:
 			addLocalMergeCost(firstInput, secondInput, driverCosts, costWeight);
 			break;
 		case HYBRIDHASH_BUILD_FIRST:

http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/JoinNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/JoinNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/JoinNode.java
index 02c9b5b..383bbe1 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/JoinNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/JoinNode.java
@@ -23,7 +23,7 @@ import java.util.Collections;
 import java.util.List;
 
 import org.apache.flink.api.common.functions.Partitioner;
-import org.apache.flink.api.common.operators.base.JoinOperatorBase;
+import org.apache.flink.api.common.operators.base.InnerJoinOperatorBase;
 import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
 import org.apache.flink.optimizer.CompilerException;
 import org.apache.flink.optimizer.DataStatistics;
@@ -32,7 +32,7 @@ import org.apache.flink.optimizer.operators.AbstractJoinDescriptor;
 import org.apache.flink.optimizer.operators.HashJoinBuildFirstProperties;
 import org.apache.flink.optimizer.operators.HashJoinBuildSecondProperties;
 import org.apache.flink.optimizer.operators.OperatorDescriptorDual;
-import org.apache.flink.optimizer.operators.SortMergeJoinDescriptor;
+import org.apache.flink.optimizer.operators.SortMergeInnerJoinDescriptor;
 import org.apache.flink.configuration.Configuration;
 
 /**
@@ -47,7 +47,7 @@ public class JoinNode extends TwoInputNode {
 	 * 
 	 * @param joinOperatorBase The join operator object.
 	 */
-	public JoinNode(JoinOperatorBase<?, ?, ?, ?> joinOperatorBase) {
+	public JoinNode(InnerJoinOperatorBase<?, ?, ?, ?> joinOperatorBase) {
 		super(joinOperatorBase);
 		
 		this.dataProperties = getDataProperties(joinOperatorBase,
@@ -62,8 +62,8 @@ public class JoinNode extends TwoInputNode {
 	 * @return The contract.
 	 */
 	@Override
-	public JoinOperatorBase<?, ?, ?, ?> getOperator() {
-		return (JoinOperatorBase<?, ?, ?, ?>) super.getOperator();
+	public InnerJoinOperatorBase<?, ?, ?, ?> getOperator() {
+		return (InnerJoinOperatorBase<?, ?, ?, ?>) super.getOperator();
 	}
 
 	@Override
@@ -111,7 +111,7 @@ public class JoinNode extends TwoInputNode {
 		}
 	}
 	
-	private List<OperatorDescriptorDual> getDataProperties(JoinOperatorBase<?, ?, ?, ?> joinOperatorBase, JoinHint joinHint,
+	private List<OperatorDescriptorDual> getDataProperties(InnerJoinOperatorBase<?, ?, ?, ?> joinOperatorBase, JoinHint joinHint,
 			Partitioner<?> customPartitioner)
 	{
 		// see if an internal hint dictates the strategy to use
@@ -125,7 +125,7 @@ public class JoinNode extends TwoInputNode {
 				Optimizer.HINT_LOCAL_STRATEGY_SORT_SECOND_MERGE.equals(localStrategy) ||
 				Optimizer.HINT_LOCAL_STRATEGY_MERGE.equals(localStrategy) )
 			{
-				fixedDriverStrat = new SortMergeJoinDescriptor(this.keys1, this.keys2);
+				fixedDriverStrat = new SortMergeInnerJoinDescriptor(this.keys1, this.keys2);
 			}
 			else if (Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_FIRST.equals(localStrategy)) {
 				fixedDriverStrat = new HashJoinBuildFirstProperties(this.keys1, this.keys2);
@@ -164,10 +164,10 @@ public class JoinNode extends TwoInputNode {
 					list.add(new HashJoinBuildSecondProperties(this.keys1, this.keys2, false, false, true));
 					break;
 				case REPARTITION_SORT_MERGE:
-					list.add(new SortMergeJoinDescriptor(this.keys1, this.keys2, false, false, true));
+					list.add(new SortMergeInnerJoinDescriptor(this.keys1, this.keys2, false, false, true));
 					break;
 				case OPTIMIZER_CHOOSES:
-					list.add(new SortMergeJoinDescriptor(this.keys1, this.keys2));
+					list.add(new SortMergeInnerJoinDescriptor(this.keys1, this.keys2));
 					list.add(new HashJoinBuildFirstProperties(this.keys1, this.keys2));
 					list.add(new HashJoinBuildSecondProperties(this.keys1, this.keys2));
 					break;

http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/MatchNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/MatchNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/MatchNode.java
deleted file mode 100644
index ee8ab05..0000000
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/MatchNode.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.dag;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.api.common.operators.base.JoinOperatorBase;
-import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.Optimizer;
-import org.apache.flink.optimizer.operators.HashJoinBuildFirstProperties;
-import org.apache.flink.optimizer.operators.HashJoinBuildSecondProperties;
-import org.apache.flink.optimizer.operators.OperatorDescriptorDual;
-import org.apache.flink.optimizer.operators.SortMergeJoinDescriptor;
-import org.apache.flink.configuration.Configuration;
-
-/**
- * The Optimizer representation of a join operator.
- */
-public class MatchNode extends TwoInputNode {
-	
-	private List<OperatorDescriptorDual> dataProperties;
-	
-	/**
-	 * Creates a new MatchNode for the given join operator.
-	 * 
-	 * @param joinOperatorBase The join operator object.
-	 */
-	public MatchNode(JoinOperatorBase<?, ?, ?, ?> joinOperatorBase) {
-		super(joinOperatorBase);
-		this.dataProperties = getDataProperties(joinOperatorBase, joinOperatorBase.getJoinHint());
-	}
-
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Gets the contract object for this match node.
-	 * 
-	 * @return The contract.
-	 */
-	@Override
-	public JoinOperatorBase<?, ?, ?, ?> getOperator() {
-		return (JoinOperatorBase<?, ?, ?, ?>) super.getOperator();
-	}
-
-	@Override
-	public String getOperatorName() {
-		return "Join";
-	}
-
-	@Override
-	protected List<OperatorDescriptorDual> getPossibleProperties() {
-		return this.dataProperties;
-	}
-	
-	public void makeJoinWithSolutionSet(int solutionsetInputIndex) {
-		OperatorDescriptorDual op;
-		if (solutionsetInputIndex == 0) {
-			op = new HashJoinBuildFirstProperties(this.keys1, this.keys2);
-		} else if (solutionsetInputIndex == 1) {
-			op = new HashJoinBuildSecondProperties(this.keys1, this.keys2);
-		} else {
-			throw new IllegalArgumentException();
-		}
-		
-		this.dataProperties = Collections.singletonList(op);
-	}
-	
-	/**
-	 * The default estimates build on the principle of inclusion: The smaller input key domain is included in the larger
-	 * input key domain. We also assume that every key from the larger input has one join partner in the smaller input.
-	 * The result cardinality is hence the larger one.
-	 */
-	@Override
-	protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
-		long card1 = getFirstPredecessorNode().getEstimatedNumRecords();
-		long card2 = getSecondPredecessorNode().getEstimatedNumRecords();
-		this.estimatedNumRecords = (card1 < 0 || card2 < 0) ? -1 : Math.max(card1, card2);
-		
-		if (this.estimatedNumRecords >= 0) {
-			float width1 = getFirstPredecessorNode().getEstimatedAvgWidthPerOutputRecord();
-			float width2 = getSecondPredecessorNode().getEstimatedAvgWidthPerOutputRecord();
-			float width = (width1 <= 0 || width2 <= 0) ? -1 : width1 + width2;
-			
-			if (width > 0) {
-				this.estimatedOutputSize = (long) (width * this.estimatedNumRecords);
-			}
-		}
-	}
-	
-	private List<OperatorDescriptorDual> getDataProperties(JoinOperatorBase<?, ?, ?, ?> joinOperatorBase, JoinHint joinHint) {
-		// see if an internal hint dictates the strategy to use
-		Configuration conf = joinOperatorBase.getParameters();
-		String localStrategy = conf.getString(Optimizer.HINT_LOCAL_STRATEGY, null);
-
-		if (localStrategy != null) {
-			final OperatorDescriptorDual fixedDriverStrat;
-			if (Optimizer.HINT_LOCAL_STRATEGY_SORT_BOTH_MERGE.equals(localStrategy) ||
-				Optimizer.HINT_LOCAL_STRATEGY_SORT_FIRST_MERGE.equals(localStrategy) ||
-				Optimizer.HINT_LOCAL_STRATEGY_SORT_SECOND_MERGE.equals(localStrategy) ||
-				Optimizer.HINT_LOCAL_STRATEGY_MERGE.equals(localStrategy) )
-			{
-				fixedDriverStrat = new SortMergeJoinDescriptor(this.keys1, this.keys2);
-			} else if (Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_FIRST.equals(localStrategy)) {
-				fixedDriverStrat = new HashJoinBuildFirstProperties(this.keys1, this.keys2);
-			} else if (Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND.equals(localStrategy)) {
-				fixedDriverStrat = new HashJoinBuildSecondProperties(this.keys1, this.keys2);
-			} else {
-				throw new CompilerException("Invalid local strategy hint for match contract: " + localStrategy);
-			}
-			ArrayList<OperatorDescriptorDual> list = new ArrayList<OperatorDescriptorDual>();
-			list.add(fixedDriverStrat);
-			return list;
-		}
-		else {
-			ArrayList<OperatorDescriptorDual> list = new ArrayList<OperatorDescriptorDual>();
-			
-			joinHint = joinHint == null ? JoinHint.OPTIMIZER_CHOOSES : joinHint;
-			
-			switch (joinHint) {
-				case BROADCAST_HASH_FIRST:
-					list.add(new HashJoinBuildFirstProperties(this.keys1, this.keys2, true, false, false));
-					break;
-				case BROADCAST_HASH_SECOND:
-					list.add(new HashJoinBuildSecondProperties(this.keys1, this.keys2, false, true, false));
-					break;
-				case REPARTITION_HASH_FIRST:
-					list.add(new HashJoinBuildFirstProperties(this.keys1, this.keys2, false, false, true));
-					break;
-				case REPARTITION_HASH_SECOND:
-					list.add(new HashJoinBuildSecondProperties(this.keys1, this.keys2, false, false, true));
-					break;
-				case REPARTITION_SORT_MERGE:
-					list.add(new SortMergeJoinDescriptor(this.keys1, this.keys2, false, false, true));
-					break;
-				case OPTIMIZER_CHOOSES:
-					list.add(new SortMergeJoinDescriptor(this.keys1, this.keys2));
-					list.add(new HashJoinBuildFirstProperties(this.keys1, this.keys2));
-					list.add(new HashJoinBuildSecondProperties(this.keys1, this.keys2));
-					break;
-				default:
-					throw new CompilerException("Unrecognized join hint: " + joinHint);
-			}
-			
-			return list;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/OuterJoinNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/OuterJoinNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/OuterJoinNode.java
new file mode 100644
index 0000000..ebdfcc8
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/OuterJoinNode.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dag;
+
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
+import org.apache.flink.api.common.operators.base.OuterJoinOperatorBase;
+import org.apache.flink.api.common.operators.base.OuterJoinOperatorBase.OuterJoinType;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.operators.AbstractJoinDescriptor;
+import org.apache.flink.optimizer.operators.OperatorDescriptorDual;
+import org.apache.flink.optimizer.operators.SortMergeFullOuterJoinDescriptor;
+import org.apache.flink.optimizer.operators.SortMergeLeftOuterJoinDescriptor;
+import org.apache.flink.optimizer.operators.SortMergeRightOuterJoinDescriptor;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class OuterJoinNode extends TwoInputNode {
+
+	private List<OperatorDescriptorDual> dataProperties;
+
+	/**
+	 * Creates a new two input node for the optimizer plan, representing the given operator.
+	 *
+	 * @param operator The operator that the optimizer DAG node should represent.
+	 */
+	public OuterJoinNode(OuterJoinOperatorBase<?, ?, ?, ?> operator) {
+		super(operator);
+
+		this.dataProperties = getDataProperties();
+	}
+
+	private List<OperatorDescriptorDual> getDataProperties() {
+		OuterJoinOperatorBase<?, ?, ?, ?> operator = getOperator();
+
+		OuterJoinType type = operator.getOuterJoinType();
+
+		JoinHint joinHint = operator.getJoinHint();
+		joinHint = joinHint == null ? JoinHint.OPTIMIZER_CHOOSES : joinHint;
+
+		List<OperatorDescriptorDual> list = new ArrayList<>();
+		switch (joinHint) {
+			case OPTIMIZER_CHOOSES:
+				list.add(getSortMergeDescriptor(type, true));
+				break;
+			case REPARTITION_SORT_MERGE:
+				list.add(getSortMergeDescriptor(type, false));
+				break;
+			case REPARTITION_HASH_FIRST:
+			case REPARTITION_HASH_SECOND:
+			case BROADCAST_HASH_FIRST:
+			case BROADCAST_HASH_SECOND:
+			default:
+				throw new CompilerException("Invalid join hint: " + joinHint + " for outer join type: " + type);
+		}
+
+		Partitioner<?> customPartitioner = operator.getCustomPartitioner();
+		if (customPartitioner != null) {
+			for (OperatorDescriptorDual desc : list) {
+				((AbstractJoinDescriptor) desc).setCustomPartitioner(customPartitioner);
+			}
+		}
+		return list;
+	}
+
+	private OperatorDescriptorDual getSortMergeDescriptor(OuterJoinType type, boolean broadcastAllowed) {
+		if (type == OuterJoinType.FULL) {
+			return new SortMergeFullOuterJoinDescriptor(this.keys1, this.keys2);
+		} else if (type == OuterJoinType.LEFT) {
+			return new SortMergeLeftOuterJoinDescriptor(this.keys1, this.keys2, broadcastAllowed);
+		} else {
+			return new SortMergeRightOuterJoinDescriptor(this.keys1, this.keys2, broadcastAllowed);
+		}
+	}
+
+	@Override
+	public OuterJoinOperatorBase<?, ?, ?, ?> getOperator() {
+		return (OuterJoinOperatorBase<?, ?, ?, ?>) super.getOperator();
+	}
+
+	@Override
+	protected List<OperatorDescriptorDual> getPossibleProperties() {
+		return dataProperties;
+	}
+
+	@Override
+	public String getOperatorName() {
+		return "Outer Join";
+	}
+
+	@Override
+	protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
+		long card1 = getFirstPredecessorNode().getEstimatedNumRecords();
+		long card2 = getSecondPredecessorNode().getEstimatedNumRecords();
+
+		if (card1 < 0 || card2 < 0) {
+			this.estimatedNumRecords = -1;
+		} else {
+			this.estimatedNumRecords = Math.max(card1, card2);
+		}
+
+		if (this.estimatedNumRecords >= 0) {
+			float width1 = getFirstPredecessorNode().getEstimatedAvgWidthPerOutputRecord();
+			float width2 = getSecondPredecessorNode().getEstimatedAvgWidthPerOutputRecord();
+			float width = (width1 <= 0 || width2 <= 0) ? -1 : width1 + width2;
+
+			if (width > 0) {
+				this.estimatedOutputSize = (long) (width * this.estimatedNumRecords);
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/AbstractSortMergeJoinDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/AbstractSortMergeJoinDescriptor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/AbstractSortMergeJoinDescriptor.java
new file mode 100644
index 0000000..d54b5cf
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/AbstractSortMergeJoinDescriptor.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.operators;
+
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.dag.TwoInputNode;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.DualInputPlanNode;
+import org.apache.flink.optimizer.util.Utils;
+
+import java.util.Collections;
+import java.util.List;
+
+public abstract class AbstractSortMergeJoinDescriptor extends AbstractJoinDescriptor {
+
+	public AbstractSortMergeJoinDescriptor(FieldList keys1, FieldList keys2) {
+		super(keys1, keys2);
+	}
+
+	public AbstractSortMergeJoinDescriptor(FieldList keys1, FieldList keys2,
+			boolean broadcastFirstAllowed, boolean broadcastSecondAllowed, boolean repartitionAllowed) {
+		super(keys1, keys2, broadcastFirstAllowed, broadcastSecondAllowed, repartitionAllowed);
+	}
+
+	@Override
+	protected List<LocalPropertiesPair> createPossibleLocalProperties() {
+		RequestedLocalProperties sort1 = new RequestedLocalProperties(Utils.createOrdering(this.keys1));
+		RequestedLocalProperties sort2 = new RequestedLocalProperties(Utils.createOrdering(this.keys2));
+		return Collections.singletonList(new LocalPropertiesPair(sort1, sort2));
+	}
+
+	@Override
+	public boolean areCoFulfilled(RequestedLocalProperties requested1, RequestedLocalProperties requested2,
+			LocalProperties produced1, LocalProperties produced2) {
+		int numRelevantFields = this.keys1.size();
+		return checkSameOrdering(produced1, produced2, numRelevantFields);
+	}
+
+	@Override
+	public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) {
+		boolean[] inputOrders = in1.getLocalProperties().getOrdering().getFieldSortDirections();
+
+		if (inputOrders == null || inputOrders.length < this.keys1.size()) {
+			throw new CompilerException("BUG: The input strategy does not sufficiently describe the sort orders for a merge operator.");
+		} else if (inputOrders.length > this.keys1.size()) {
+			boolean[] tmp = new boolean[this.keys1.size()];
+			System.arraycopy(inputOrders, 0, tmp, 0, tmp.length);
+			inputOrders = tmp;
+		}
+
+		String nodeName = String.format("%s(%s)", getNodeName(), node.getOperator().getName());
+		return new DualInputPlanNode(node, nodeName, in1, in2, getStrategy(), this.keys1, this.keys2, inputOrders);
+	}
+
+	@Override
+	public LocalProperties computeLocalProperties(LocalProperties in1, LocalProperties in2) {
+		LocalProperties comb = LocalProperties.combine(in1, in2);
+		return comb.clearUniqueFieldSets();
+	}
+
+	protected abstract String getNodeName();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/CoGroupDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/CoGroupDescriptor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/CoGroupDescriptor.java
index 368944e..571d6e6 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/CoGroupDescriptor.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/CoGroupDescriptor.java
@@ -177,30 +177,9 @@ public class CoGroupDescriptor extends OperatorDescriptorDual {
 	
 	@Override
 	public boolean areCoFulfilled(RequestedLocalProperties requested1, RequestedLocalProperties requested2,
-			LocalProperties produced1, LocalProperties produced2)
-	{
+			LocalProperties produced1, LocalProperties produced2) {
 		int numRelevantFields = this.keys1.size();
-		
-		Ordering prod1 = produced1.getOrdering();
-		Ordering prod2 = produced2.getOrdering();
-		
-		if (prod1 == null || prod2 == null) {
-			throw new CompilerException("The given properties do not meet this operators requirements.");
-		}
-
-		// check that order of fields is equivalent
-		if (!checkEquivalentFieldPositionsInKeyFields(
-				prod1.getInvolvedIndexes(), prod2.getInvolvedIndexes(), numRelevantFields)) {
-			return false;
-		}
-
-		// check that order directions are equivalent
-		for (int i = 0; i < numRelevantFields; i++) {
-			if (prod1.getOrder(i) != prod2.getOrder(i)) {
-				return false;
-			}
-		}
-		return true;
+		return checkSameOrdering(produced1, produced2, numRelevantFields);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/OperatorDescriptorDual.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/OperatorDescriptorDual.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/OperatorDescriptorDual.java
index c21593e..17ea8a5 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/OperatorDescriptorDual.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/OperatorDescriptorDual.java
@@ -21,6 +21,7 @@ package org.apache.flink.optimizer.operators;
 
 import java.util.List;
 
+import org.apache.flink.api.common.operators.Ordering;
 import org.apache.flink.api.common.operators.util.FieldList;
 import org.apache.flink.optimizer.CompilerException;
 import org.apache.flink.optimizer.dag.TwoInputNode;
@@ -124,6 +125,29 @@ public abstract class OperatorDescriptorDual implements AbstractOperatorDescript
 		return true;
 	}
 
+	protected boolean checkSameOrdering(LocalProperties produced1, LocalProperties produced2, int numRelevantFields) {
+		Ordering prod1 = produced1.getOrdering();
+		Ordering prod2 = produced2.getOrdering();
+
+		if (prod1 == null || prod2 == null) {
+			throw new CompilerException("The given properties do not meet this operators requirements.");
+		}
+
+		// check that order of fields is equivalent
+		if (!checkEquivalentFieldPositionsInKeyFields(
+				prod1.getInvolvedIndexes(), prod2.getInvolvedIndexes(), numRelevantFields)) {
+			return false;
+		}
+
+		// check that both inputs have the same directions of order
+		for (int i = 0; i < numRelevantFields; i++) {
+			if (prod1.getOrder(i) != prod2.getOrder(i)) {
+				return false;
+			}
+		}
+		return true;
+	}
+
 	// --------------------------------------------------------------------------------------------
 	
 	public static final class GlobalPropertiesPair {

http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/SortMergeFullOuterJoinDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/SortMergeFullOuterJoinDescriptor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/SortMergeFullOuterJoinDescriptor.java
new file mode 100644
index 0000000..4e05067
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/SortMergeFullOuterJoinDescriptor.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.operators;
+
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.runtime.operators.DriverStrategy;
+
+public class SortMergeFullOuterJoinDescriptor extends AbstractSortMergeJoinDescriptor {
+
+	public SortMergeFullOuterJoinDescriptor(FieldList keys1, FieldList keys2) {
+		super(keys1, keys2, false, false, true);
+	}
+
+	@Override
+	public DriverStrategy getStrategy() {
+		return DriverStrategy.FULL_OUTER_MERGE;
+	}
+
+	@Override
+	protected String getNodeName() {
+		return "FullOuterJoin";
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/SortMergeInnerJoinDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/SortMergeInnerJoinDescriptor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/SortMergeInnerJoinDescriptor.java
new file mode 100644
index 0000000..1c3ea19
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/SortMergeInnerJoinDescriptor.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.operators;
+
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.runtime.operators.DriverStrategy;
+
+public class SortMergeInnerJoinDescriptor extends AbstractSortMergeJoinDescriptor {
+
+	public SortMergeInnerJoinDescriptor(FieldList keys1, FieldList keys2) {
+		super(keys1, keys2);
+	}
+
+	public SortMergeInnerJoinDescriptor(FieldList keys1, FieldList keys2,
+			boolean broadcastFirstAllowed, boolean broadcastSecondAllowed, boolean repartitionAllowed) {
+		super(keys1, keys2, broadcastFirstAllowed, broadcastSecondAllowed, repartitionAllowed);
+	}
+
+	@Override
+	public DriverStrategy getStrategy() {
+		return DriverStrategy.INNER_MERGE;
+	}
+
+	@Override
+	protected String getNodeName() {
+		return "Join";
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/SortMergeJoinDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/SortMergeJoinDescriptor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/SortMergeJoinDescriptor.java
deleted file mode 100644
index 3ab0aa7..0000000
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/SortMergeJoinDescriptor.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.operators;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.api.common.operators.Ordering;
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.dag.TwoInputNode;
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.DualInputPlanNode;
-import org.apache.flink.optimizer.util.Utils;
-import org.apache.flink.runtime.operators.DriverStrategy;
-
-/**
- * 
- */
-public class SortMergeJoinDescriptor extends AbstractJoinDescriptor {
-	
-	public SortMergeJoinDescriptor(FieldList keys1, FieldList keys2) {
-		super(keys1, keys2);
-	}
-	
-	public SortMergeJoinDescriptor(FieldList keys1, FieldList keys2,
-			boolean broadcastFirstAllowed, boolean broadcastSecondAllowed, boolean repartitionAllowed)
-	{
-		super(keys1, keys2, broadcastFirstAllowed, broadcastSecondAllowed, repartitionAllowed);
-	}
-
-	@Override
-	public DriverStrategy getStrategy() {
-		return DriverStrategy.INNER_MERGE;
-	}
-
-	@Override
-	protected List<LocalPropertiesPair> createPossibleLocalProperties() {
-		RequestedLocalProperties sort1 = new RequestedLocalProperties(Utils.createOrdering(this.keys1));
-		RequestedLocalProperties sort2 = new RequestedLocalProperties(Utils.createOrdering(this.keys2));
-		return Collections.singletonList(new LocalPropertiesPair(sort1, sort2));
-	}
-
-	@Override
-	public boolean areCoFulfilled(RequestedLocalProperties requested1, RequestedLocalProperties requested2,
-			LocalProperties produced1, LocalProperties produced2)
-	{
-		int numRelevantFields = this.keys1.size();
-		
-		Ordering prod1 = produced1.getOrdering();
-		Ordering prod2 = produced2.getOrdering();
-		
-		if (prod1 == null || prod2 == null) {
-			throw new CompilerException("The given properties do not meet this operators requirements.");
-		}
-
-		// check that order of fields is equivalent
-		if (!checkEquivalentFieldPositionsInKeyFields(
-				prod1.getInvolvedIndexes(), prod2.getInvolvedIndexes(), numRelevantFields)) {
-			return false;
-		}
-
-		// check that both inputs have the same directions of order
-		for (int i = 0; i < numRelevantFields; i++) {
-			if (prod1.getOrder(i) != prod2.getOrder(i)) {
-				return false;
-			}
-		}
-		return true;
-	}
-	
-	@Override
-	public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) {
-		boolean[] inputOrders = in1.getLocalProperties().getOrdering().getFieldSortDirections();
-		
-		if (inputOrders == null || inputOrders.length < this.keys1.size()) {
-			throw new CompilerException("BUG: The input strategy does not sufficiently describe the sort orders for a merge operator.");
-		} else if (inputOrders.length > this.keys1.size()) {
-			boolean[] tmp = new boolean[this.keys1.size()];
-			System.arraycopy(inputOrders, 0, tmp, 0, tmp.length);
-			inputOrders = tmp;
-		}
-		
-		return new DualInputPlanNode(node, "Join(" + node.getOperator().getName() + ")", in1, in2, DriverStrategy.INNER_MERGE, this.keys1, this.keys2, inputOrders);
-	}
-
-	@Override
-	public LocalProperties computeLocalProperties(LocalProperties in1, LocalProperties in2) {
-		LocalProperties comb = LocalProperties.combine(in1, in2);
-		return comb.clearUniqueFieldSets();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/SortMergeLeftOuterJoinDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/SortMergeLeftOuterJoinDescriptor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/SortMergeLeftOuterJoinDescriptor.java
new file mode 100644
index 0000000..8193960
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/SortMergeLeftOuterJoinDescriptor.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.operators;
+
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.runtime.operators.DriverStrategy;
+
+public class SortMergeLeftOuterJoinDescriptor extends AbstractSortMergeJoinDescriptor {
+
+	public SortMergeLeftOuterJoinDescriptor(FieldList keys1, FieldList keys2) {
+		super(keys1, keys2, false, true, true);
+	}
+
+	public SortMergeLeftOuterJoinDescriptor(FieldList keys1, FieldList keys2, boolean broadcastAllowed) {
+		super(keys1, keys2, false, broadcastAllowed, true);
+	}
+
+	@Override
+	public DriverStrategy getStrategy() {
+		return DriverStrategy.LEFT_OUTER_MERGE;
+	}
+
+	@Override
+	protected String getNodeName() {
+		return "LeftOuterJoin";
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/SortMergeRightOuterJoinDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/SortMergeRightOuterJoinDescriptor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/SortMergeRightOuterJoinDescriptor.java
new file mode 100644
index 0000000..3719d05
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/SortMergeRightOuterJoinDescriptor.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.operators;
+
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.runtime.operators.DriverStrategy;
+
+public class SortMergeRightOuterJoinDescriptor extends AbstractSortMergeJoinDescriptor {
+
+	public SortMergeRightOuterJoinDescriptor(FieldList keys1, FieldList keys2) {
+		super(keys1, keys2, true, false, true);
+	}
+
+	public SortMergeRightOuterJoinDescriptor(FieldList keys1, FieldList keys2, boolean broadcastAllowed) {
+		super(keys1, keys2, broadcastAllowed, false, true);
+	}
+
+	@Override
+	public DriverStrategy getStrategy() {
+		return DriverStrategy.RIGHT_OUTER_MERGE;
+	}
+
+	@Override
+	protected String getNodeName() {
+		return "RightOuterJoin";
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/GraphCreatingVisitor.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/GraphCreatingVisitor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/GraphCreatingVisitor.java
index 7fbdf81..bcdee14 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/GraphCreatingVisitor.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/GraphCreatingVisitor.java
@@ -26,15 +26,17 @@ import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.Union;
 import org.apache.flink.api.common.operators.base.BulkIterationBase;
 import org.apache.flink.api.common.operators.base.CoGroupOperatorBase;
+import org.apache.flink.api.common.operators.base.CoGroupRawOperatorBase;
 import org.apache.flink.api.common.operators.base.CrossOperatorBase;
 import org.apache.flink.api.common.operators.base.DeltaIterationBase;
 import org.apache.flink.api.common.operators.base.FilterOperatorBase;
 import org.apache.flink.api.common.operators.base.FlatMapOperatorBase;
 import org.apache.flink.api.common.operators.base.GroupCombineOperatorBase;
 import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
-import org.apache.flink.api.common.operators.base.JoinOperatorBase;
+import org.apache.flink.api.common.operators.base.InnerJoinOperatorBase;
 import org.apache.flink.api.common.operators.base.MapOperatorBase;
 import org.apache.flink.api.common.operators.base.MapPartitionOperatorBase;
+import org.apache.flink.api.common.operators.base.OuterJoinOperatorBase;
 import org.apache.flink.api.common.operators.base.PartitionOperatorBase;
 import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
 import org.apache.flink.api.common.operators.base.SortPartitionOperatorBase;
@@ -44,6 +46,7 @@ import org.apache.flink.optimizer.dag.BinaryUnionNode;
 import org.apache.flink.optimizer.dag.BulkIterationNode;
 import org.apache.flink.optimizer.dag.BulkPartialSolutionNode;
 import org.apache.flink.optimizer.dag.CoGroupNode;
+import org.apache.flink.optimizer.dag.CoGroupRawNode;
 import org.apache.flink.optimizer.dag.CollectorMapNode;
 import org.apache.flink.optimizer.dag.CrossNode;
 import org.apache.flink.optimizer.dag.DagConnection;
@@ -57,6 +60,7 @@ import org.apache.flink.optimizer.dag.JoinNode;
 import org.apache.flink.optimizer.dag.MapNode;
 import org.apache.flink.optimizer.dag.MapPartitionNode;
 import org.apache.flink.optimizer.dag.OptimizerNode;
+import org.apache.flink.optimizer.dag.OuterJoinNode;
 import org.apache.flink.optimizer.dag.PartitionNode;
 import org.apache.flink.optimizer.dag.ReduceNode;
 import org.apache.flink.optimizer.dag.SolutionSetNode;
@@ -69,8 +73,6 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import org.apache.flink.api.common.operators.base.CoGroupRawOperatorBase;
-import org.apache.flink.optimizer.dag.CoGroupRawNode;
 
 /**
  * This traversal creates the optimizer DAG from a program.
@@ -160,8 +162,11 @@ public class GraphCreatingVisitor implements Visitor<Operator<?>> {
 		else if (c instanceof GroupReduceOperatorBase) {
 			n = new GroupReduceNode((GroupReduceOperatorBase<?, ?, ?>) c);
 		}
-		else if (c instanceof JoinOperatorBase) {
-			n = new JoinNode((JoinOperatorBase<?, ?, ?, ?>) c);
+		else if (c instanceof InnerJoinOperatorBase) {
+			n = new JoinNode((InnerJoinOperatorBase<?, ?, ?, ?>) c);
+		}
+		else if (c instanceof OuterJoinOperatorBase) {
+			n = new OuterJoinNode((OuterJoinOperatorBase<?, ?, ?, ?>) c);
 		}
 		else if (c instanceof CoGroupOperatorBase) {
 			n = new CoGroupNode((CoGroupOperatorBase<?, ?, ?, ?>) c);

http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-optimizer/src/test/java/org/apache/flink/optimizer/FeedbackPropertiesMatchTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/FeedbackPropertiesMatchTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/FeedbackPropertiesMatchTest.java
index 8c19462..17f0241 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/FeedbackPropertiesMatchTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/FeedbackPropertiesMatchTest.java
@@ -30,7 +30,7 @@ import org.apache.flink.api.common.operators.OperatorInformation;
 import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.common.operators.Ordering;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
-import org.apache.flink.api.common.operators.base.JoinOperatorBase;
+import org.apache.flink.api.common.operators.base.InnerJoinOperatorBase;
 import org.apache.flink.api.common.operators.base.MapOperatorBase;
 import org.apache.flink.api.common.operators.util.FieldList;
 import org.apache.flink.api.common.operators.util.FieldSet;
@@ -1431,6 +1431,6 @@ public class FeedbackPropertiesMatchTest {
 	}
 	
 	private static JoinNode getJoinNode() {
-		return new JoinNode(new JoinOperatorBase<String, String, String, FlatJoinFunction<String, String, String>>(new DummyFlatJoinFunction<String>(), new BinaryOperatorInformation<String, String, String>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO), new int[] {1}, new int[] {2}, "join op"));
+		return new JoinNode(new InnerJoinOperatorBase<String, String, String, FlatJoinFunction<String, String, String>>(new DummyFlatJoinFunction<String>(), new BinaryOperatorInformation<String, String, String>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO), new int[] {1}, new int[] {2}, "join op"));
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-optimizer/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java
index 00ada2a..321e5ca 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java
@@ -21,7 +21,7 @@ package org.apache.flink.optimizer;
 import org.apache.flink.api.common.functions.JoinFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.operators.base.JoinOperatorBase;
+import org.apache.flink.api.common.operators.base.InnerJoinOperatorBase;
 import org.apache.flink.api.common.operators.base.MapOperatorBase;
 import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
 import org.apache.flink.api.common.operators.util.FieldSet;
@@ -126,7 +126,7 @@ public class SemanticPropertiesAPIToPlanTest extends CompilerTestBase {
 		oPlan.accept(new Visitor<PlanNode>() {
 			@Override
 			public boolean preVisit(PlanNode visitable) {
-				if (visitable instanceof DualInputPlanNode && visitable.getProgramOperator() instanceof JoinOperatorBase) {
+				if (visitable instanceof DualInputPlanNode && visitable.getProgramOperator() instanceof InnerJoinOperatorBase) {
 					DualInputPlanNode node = ((DualInputPlanNode) visitable);
 
 					final Channel inConn1 = node.getInput1();

http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinGlobalPropertiesCompatibilityTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinGlobalPropertiesCompatibilityTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinGlobalPropertiesCompatibilityTest.java
index 839f0a1..1d559c2 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinGlobalPropertiesCompatibilityTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinGlobalPropertiesCompatibilityTest.java
@@ -34,8 +34,8 @@ public class JoinGlobalPropertiesCompatibilityTest {
 		try {
 			final FieldList keysLeft = new FieldList(1, 4);
 			final FieldList keysRight = new FieldList(3, 1);
-			
-			SortMergeJoinDescriptor descr = new SortMergeJoinDescriptor(keysLeft, keysRight);
+
+			SortMergeInnerJoinDescriptor descr = new SortMergeInnerJoinDescriptor(keysLeft, keysRight);
 			
 			// test compatible hash partitioning
 			{
@@ -121,7 +121,7 @@ public class JoinGlobalPropertiesCompatibilityTest {
 				}
 			};
 			
-			SortMergeJoinDescriptor descr = new SortMergeJoinDescriptor(keysLeft, keysRight);
+			SortMergeInnerJoinDescriptor descr = new SortMergeInnerJoinDescriptor(keysLeft, keysRight);
 			
 			// test incompatible hash with custom partitioning
 			{

http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
index 207bc5d..306a15b 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
@@ -23,8 +23,8 @@ import org.apache.flink.api.common.aggregators.Aggregator
 import org.apache.flink.api.common.functions._
 import org.apache.flink.api.common.io.{FileOutputFormat, OutputFormat}
 import org.apache.flink.api.common.operators.Order
-import org.apache.flink.api.common.operators.base.CrossOperatorBase.CrossHint
 import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
+import org.apache.flink.api.common.operators.base.CrossOperatorBase.CrossHint
 import org.apache.flink.api.common.operators.base.PartitionOperatorBase.PartitionMethod
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.Utils.CountHelper
@@ -33,6 +33,7 @@ import org.apache.flink.api.java.functions.{FirstReducer, KeySelector}
 import org.apache.flink.api.java.io.{DiscardingOutputFormat, PrintingOutputFormat, TextOutputFormat}
 import org.apache.flink.api.java.operators.Keys.ExpressionKeys
 import org.apache.flink.api.java.operators._
+import org.apache.flink.api.java.operators.join.JoinType
 import org.apache.flink.api.java.{DataSet => JavaDataSet, Utils}
 import org.apache.flink.api.scala.operators.{ScalaAggregateOperator, ScalaCsvOutputFormat}
 import org.apache.flink.configuration.Configuration
@@ -840,11 +841,11 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
 
   /**
    * Creates a new DataSet by joining `this` DataSet with the `other` DataSet. To specify the join
-   * keys the `where` and `isEqualTo` methods must be used. For example:
+   * keys the `where` and `equalTo` methods must be used. For example:
    * {{{
    *   val left: DataSet[(String, Int, Int)] = ...
    *   val right: DataSet[(Int, String, Int)] = ...
-   *   val joined = left.join(right).where(0).isEqualTo(1)
+   *   val joined = left.join(right).where(0).equalTo(1)
    * }}}
    *
    * The default join result is a DataSet with 2-Tuples of the joined values. In the above example
@@ -854,7 +855,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
    * {{{
    *   val left: DataSet[(String, Int, Int)] = ...
    *   val right: DataSet[(Int, String, Int)] = ...
-   *   val joined = left.join(right).where(0).isEqualTo(1) { (l, r) =>
+   *   val joined = left.join(right).where(0).equalTo(1) { (l, r) =>
    *     (l._1, r._2)
    *   }
    * }}}
@@ -864,7 +865,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
    * {{{
    *   val left: DataSet[(String, Int, Int)] = ...
    *   val right: DataSet[(Int, String, Int)] = ...
-   *   val joined = left.join(right).where(0).isEqualTo(1) {
+   *   val joined = left.join(right).where(0).equalTo(1) {
    *     (l, r, out: Collector[(String, Int)]) =>
    *       if (l._2 > 4) {
    *         out.collect((l._1, r._3))
@@ -875,29 +876,119 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
    *     }
    * }}}
    */
-  def join[O](other: DataSet[O]): UnfinishedJoinOperation[T, O] =
-    new UnfinishedJoinOperation(this, other, JoinHint.OPTIMIZER_CHOOSES)
+  def join[O](other: DataSet[O]): UnfinishedInnerJoinOperation[T, O] =
+    new UnfinishedInnerJoinOperation(this, other, JoinHint.OPTIMIZER_CHOOSES)
 
   /**
    * Special [[join]] operation for explicitly telling the system what join strategy to use. If
    * null is given as the join strategy, then the optimizer will pick the strategy.
    */
-  def join[O](other: DataSet[O], strategy: JoinHint): UnfinishedJoinOperation[T, O] =
-    new UnfinishedJoinOperation(this, other, strategy)
+  def join[O](other: DataSet[O], strategy: JoinHint): UnfinishedInnerJoinOperation[T, O] =
+    new UnfinishedInnerJoinOperation(this, other, strategy)
   
   /**
    * Special [[join]] operation for explicitly telling the system that the right side is assumed
    * to be a lot smaller than the left side of the join.
    */
-  def joinWithTiny[O](other: DataSet[O]): UnfinishedJoinOperation[T, O] =
-    new UnfinishedJoinOperation(this, other, JoinHint.BROADCAST_HASH_SECOND)
+  def joinWithTiny[O](other: DataSet[O]): UnfinishedInnerJoinOperation[T, O] =
+    new UnfinishedInnerJoinOperation(this, other, JoinHint.BROADCAST_HASH_SECOND)
 
   /**
    * Special [[join]] operation for explicitly telling the system that the left side is assumed
    * to be a lot smaller than the right side of the join.
    */
-  def joinWithHuge[O](other: DataSet[O]): UnfinishedJoinOperation[T, O] =
-    new UnfinishedJoinOperation(this, other, JoinHint.BROADCAST_HASH_FIRST)
+  def joinWithHuge[O](other: DataSet[O]): UnfinishedInnerJoinOperation[T, O] =
+    new UnfinishedInnerJoinOperation(this, other, JoinHint.BROADCAST_HASH_FIRST)
+
+  /**
+   * Creates a new DataSet by performing a full outer join of `this` DataSet
+   * with the `other` DataSet, by combining two elements of two DataSets on
+   * key equality.
+   * Elements of both DataSets that do not have a matching element on the
+   * opposing side are joined with `null` and emitted to the resulting DataSet.
+   *
+   * To specify the join keys the `where` and `equalTo` methods must be used. For example:
+   * {{{
+   *   val left: DataSet[(String, Int, Int)] = ...
+   *   val right: DataSet[(Int, String, Int)] = ...
+   *   val joined = left.fullOuterJoin(right).where(0).equalTo(1)
+   * }}}
+   *
+   * When using an outer join you are required to specify a join function. For example:
+   * {{{
+   *   val joined = left.fullOuterJoin(right).where(0).equalTo(1) {
+   *     (left, right) =>
+   *       val a = if (left == null) null else left._1
+   *       val b = if (right == null) null else right._3
+   *       (a, b)
+   *   }
+   * }}}
+   */
+  def fullOuterJoin[O](other: DataSet[O]): UnfinishedOuterJoinOperation[T, O] =
+    new UnfinishedOuterJoinOperation(this, other, JoinHint.OPTIMIZER_CHOOSES, JoinType.FULL_OUTER)
+
+  /**
+   * Special [[fullOuterJoin]] operation for explicitly telling the system what join strategy to
+   * use. If null is given as the join strategy, then the optimizer will pick the strategy.
+   */
+  def fullOuterJoin[O](other: DataSet[O], strategy: JoinHint): UnfinishedOuterJoinOperation[T, O] =
+    new UnfinishedOuterJoinOperation(this, other, strategy, JoinType.FULL_OUTER)
+
+  /**
+   * An outer join on the left side.
+   *
+   * Elements of the left side (i.e. `this`) that do not have a matching element on the other
+   * side are joined with `null` and emitted to the resulting DataSet.
+   *
+   * @param other The other DataSet with which this DataSet is joined.
+   * @return An UnfinishedJoinOperation to continue with the definition of the join transformation
+   * @see #fullOuterJoin
+   */
+  def leftOuterJoin[O](other: DataSet[O]): UnfinishedOuterJoinOperation[T, O] =
+    new UnfinishedOuterJoinOperation(this, other, JoinHint.OPTIMIZER_CHOOSES, JoinType.LEFT_OUTER)
+
+  /**
+   * An outer join on the left side.
+   *
+   * Elements of the left side (i.e. `this`) that do not have a matching element on the other
+   * side are joined with `null` and emitted to the resulting DataSet.
+   *
+   * @param other The other DataSet with which this DataSet is joined.
+   * @param strategy The strategy that should be used to execute the join. If `null` is given,
+   *                 then the optimizer will pick the join strategy.
+   * @return An UnfinishedJoinOperation to continue with the definition of the join transformation
+   * @see #fullOuterJoin
+   */
+  def leftOuterJoin[O](other: DataSet[O], strategy: JoinHint): UnfinishedOuterJoinOperation[T, O] =
+    new UnfinishedOuterJoinOperation(this, other, strategy, JoinType.LEFT_OUTER)
+
+  /**
+   * An outer join on the right side.
+   *
+   * Elements of the right side (i.e. `other`) that do not have a matching element on `this`
+   * side are joined with `null` and emitted to the resulting DataSet.
+   *
+   * @param other The other DataSet with which this DataSet is joined.
+   * @return An UnfinishedJoinOperation to continue with the definition of the join transformation
+   * @see #fullOuterJoin
+   */
+  def rightOuterJoin[O](other: DataSet[O]): UnfinishedOuterJoinOperation[T, O] =
+    new UnfinishedOuterJoinOperation(this, other, JoinHint.OPTIMIZER_CHOOSES, JoinType.RIGHT_OUTER)
+
+  /**
+   * An outer join on the right side.
+   *
+   * Elements of the right side (i.e. `other`) that do not have a matching element on `this`
+   * side are joined with `null` and emitted to the resulting DataSet.
+   *
+   * @param other The other DataSet with which this DataSet is joined.
+   * @param strategy The strategy that should be used to execute the join. If `null` is given,
+   *                 then the optimizer will pick the join strategy.
+   * @return An UnfinishedJoinOperation to continue with the definition of the join transformation
+   * @see #fullOuterJoin
+   */
+  def rightOuterJoin[O](other: DataSet[O], strategy: JoinHint): UnfinishedOuterJoinOperation[T, O] =
+    new UnfinishedOuterJoinOperation(this, other, strategy, JoinType.RIGHT_OUTER)
 
   // --------------------------------------------------------------------------------------------
   //  Co-Group

http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala
index ecc1aab..f57fc25 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala
@@ -17,12 +17,15 @@
  */
 package org.apache.flink.api.scala
 
+import org.apache.flink.api.common.operators.Operator
+import org.apache.flink.api.common.operators.base.JoinOperatorBase
 import org.apache.flink.api.common.functions.{FlatJoinFunction, JoinFunction, Partitioner, RichFlatJoinFunction}
 import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.operators.JoinOperator.DefaultJoin.WrappingFlatJoinFunction
 import org.apache.flink.api.java.operators.JoinOperator.EquiJoin
 import org.apache.flink.api.java.operators._
+import org.apache.flink.api.java.operators.join.JoinType
 import org.apache.flink.util.Collector
 
 import scala.reflect.ClassTag
@@ -60,7 +63,7 @@ class JoinDataSet[L, R](
     rightInput: DataSet[R],
     leftKeys: Keys[L],
     rightKeys: Keys[R])
-  extends DataSet(defaultJoin) {
+  extends DataSet(defaultJoin) with JoinFunctionAssigner[L, R] {
 
   private var customPartitioner : Partitioner[_] = _
   
@@ -84,7 +87,8 @@ class JoinDataSet[L, R](
       joiner,
       implicitly[TypeInformation[O]],
       defaultJoin.getJoinHint,
-      getCallLocationName())
+      getCallLocationName(),
+      defaultJoin.getJoinType)
     
     if (customPartitioner != null) {
       wrap(joinOperator.withPartitioner(customPartitioner))
@@ -114,7 +118,8 @@ class JoinDataSet[L, R](
       joiner,
       implicitly[TypeInformation[O]],
       defaultJoin.getJoinHint,
-      getCallLocationName())
+      getCallLocationName(),
+      defaultJoin.getJoinType)
 
     if (customPartitioner != null) {
       wrap(joinOperator.withPartitioner(customPartitioner))
@@ -142,7 +147,8 @@ class JoinDataSet[L, R](
       joiner,
       implicitly[TypeInformation[O]],
       defaultJoin.getJoinHint,
-      getCallLocationName())
+      getCallLocationName(),
+      defaultJoin.getJoinType)
 
     if (customPartitioner != null) {
       wrap(joinOperator.withPartitioner(customPartitioner))
@@ -171,7 +177,8 @@ class JoinDataSet[L, R](
       generatedFunction, fun,
       implicitly[TypeInformation[O]],
       defaultJoin.getJoinHint,
-      getCallLocationName())
+      getCallLocationName(),
+      defaultJoin.getJoinType)
 
     if (customPartitioner != null) {
       wrap(joinOperator.withPartitioner(customPartitioner))
@@ -205,9 +212,46 @@ class JoinDataSet[L, R](
   }
 }
 
+private[flink] abstract class UnfinishedJoinOperation[L, R, O <: JoinFunctionAssigner[L, R]](
+    leftSet: DataSet[L],
+    rightSet: DataSet[R],
+    val joinHint: JoinHint,
+    val joinType: JoinType)
+  extends UnfinishedKeyPairOperation[L, R, O](leftSet, rightSet) {
+
+  def createJoinFunctionAssigner(leftKey: Keys[L], rightKey: Keys[R]): O
+
+  private[flink] def createDefaultJoin(leftKey: Keys[L], rightKey: Keys[R]) = {
+    val joiner = new FlatJoinFunction[L, R, (L, R)] {
+      def join(left: L, right: R, out: Collector[(L, R)]) = {
+        out.collect((left, right))
+      }
+    }
+    val returnType = createTuple2TypeInformation[L, R](leftInput.getType(), rightInput.getType())
+    val joinOperator = new EquiJoin[L, R, (L, R)](
+      leftSet.javaSet,
+      rightSet.javaSet,
+      leftKey,
+      rightKey,
+      joiner,
+      returnType,
+      joinHint,
+      getCallLocationName(),
+      joinType)
+
+    new JoinDataSet(joinOperator, leftSet, rightSet, leftKey, rightKey)
+  }
+
+  private[flink] def finish(leftKey: Keys[L], rightKey: Keys[R]) = {
+    createJoinFunctionAssigner(leftKey, rightKey)
+  }
+}
+
 /**
- * An unfinished join operation that results from [[DataSet.join()]] The keys for the left and right
- * side must be specified using first `where` and then `equalTo`. For example:
+ * An unfinished inner join operation that results from calling [[DataSet.join()]].
+ * The keys for the left and right side must be specified using first `where` and then `equalTo`.
+ *
+ * For example:
  *
  * {{{
  *   val left = ...
@@ -217,24 +261,77 @@ class JoinDataSet[L, R](
  * @tparam L The type of the left input of the join.
  * @tparam R The type of the right input of the join.
  */
-class UnfinishedJoinOperation[L, R](
+class UnfinishedInnerJoinOperation[L, R](
     leftSet: DataSet[L],
     rightSet: DataSet[R],
-    val joinHint: JoinHint)
-  extends UnfinishedKeyPairOperation[L, R, JoinDataSet[L, R]](leftSet, rightSet) {
+    joinHint: JoinHint)
+  extends UnfinishedJoinOperation[L, R, JoinDataSet[L, R]](
+    leftSet, rightSet, joinHint, JoinType.INNER) {
 
-  private[flink] def finish(leftKey: Keys[L], rightKey: Keys[R]) = {
-    val joiner = new FlatJoinFunction[L, R, (L, R)] {
-      def join(left: L, right: R, out: Collector[(L, R)]) = {
-        out.collect((left, right))
-      }
-    }
-    val returnType = createTuple2TypeInformation[L, R](leftInput.getType(), rightInput.getType())
-    val joinOperator = new EquiJoin[L, R, (L, R)](
-      leftSet.javaSet, rightSet.javaSet, leftKey, rightKey, joiner, returnType, joinHint,
-        getCallLocationName())
+  override def createJoinFunctionAssigner(leftKey: Keys[L], rightKey: Keys[R]) = {
+    createDefaultJoin(leftKey, rightKey)
+  }
+}
 
-    new JoinDataSet(joinOperator, leftSet, rightSet, leftKey, rightKey)
+/**
+ * An unfinished outer join operation that results from calling, e.g. [[DataSet.fullOuterJoin()]].
+ * The keys for the left and right side must be specified using first `where` and then `equalTo`.
+ *
+ * Note that a join function must always be specified explicitly when constructing an outer join
+ * operator.
+ *
+ * For example:
+ *
+ * {{{
+ *   val left = ...
+ *   val right = ...
+ *   val joinResult = left.fullOuterJoin(right).where(...).equalTo(...) {
+ *     (first, second) => ...
+ *   }
+ * }}}
+ * @tparam L The type of the left input of the join.
+ * @tparam R The type of the right input of the join.
+ */
+class UnfinishedOuterJoinOperation[L, R](
+    leftSet: DataSet[L],
+    rightSet: DataSet[R],
+    joinHint: JoinHint,
+    joinType: JoinType)
+  extends UnfinishedJoinOperation[L, R, JoinFunctionAssigner[L, R]](
+    leftSet, rightSet, joinHint, joinType) {
+
+  override def createJoinFunctionAssigner(leftKey: Keys[L], rightKey: Keys[R]):
+      JoinFunctionAssigner[L, R] = {
+    new DefaultJoinFunctionAssigner(createDefaultJoin(leftKey, rightKey))
+  }
+
+  private class DefaultJoinFunctionAssigner(val defaultJoin: JoinDataSet[L, R])
+    extends JoinFunctionAssigner[L, R] {
+
+    override def withPartitioner[K: TypeInformation](part: Partitioner[K]) =
+      defaultJoin.withPartitioner(part)
+
+    override def apply[O: TypeInformation : ClassTag](fun: (L, R) => O) =
+      defaultJoin.apply(fun)
+
+    override def apply[O: TypeInformation : ClassTag](fun: (L, R, Collector[O]) => Unit) =
+      defaultJoin.apply(fun)
+
+    override def apply[O: TypeInformation : ClassTag](fun: FlatJoinFunction[L, R, O]) =
+      defaultJoin.apply(fun)
+
+    override def apply[O: TypeInformation : ClassTag](fun: JoinFunction[L, R, O]) =
+      defaultJoin.apply(fun)
   }
 
 }
+
+trait JoinFunctionAssigner[L, R] {
+
+  def withPartitioner[K : TypeInformation](part : Partitioner[K]) : JoinFunctionAssigner[L, R]
+  def apply[O: TypeInformation: ClassTag](fun: (L, R) => O): DataSet[O]
+  def apply[O: TypeInformation: ClassTag](fun: (L, R, Collector[O]) => Unit): DataSet[O]
+  def apply[O: TypeInformation: ClassTag](fun: FlatJoinFunction[L, R, O]): DataSet[O]
+  def apply[O: TypeInformation: ClassTag](fun: JoinFunction[L, R, O]): DataSet[O]
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
index 566573e..7605b3a 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
@@ -29,7 +29,6 @@ import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.junit.Assert;
 
 public class TestEnvironment extends ExecutionEnvironment {
 
@@ -60,21 +59,13 @@ public class TestEnvironment extends ExecutionEnvironment {
 
 	@Override
 	public JobExecutionResult execute(String jobName) throws Exception {
-		try {
-			OptimizedPlan op = compileProgram(jobName);
+		OptimizedPlan op = compileProgram(jobName);
 
-			JobGraphGenerator jgg = new JobGraphGenerator();
-			JobGraph jobGraph = jgg.compileJobGraph(op);
-			
-			this.lastJobExecutionResult = executor.submitJobAndWait(jobGraph, false);
-			return this.lastJobExecutionResult;
-		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			Assert.fail("Job execution failed!");
-			return null;
-		}
+		JobGraphGenerator jgg = new JobGraphGenerator();
+		JobGraph jobGraph = jgg.compileJobGraph(op);
+
+		this.lastJobExecutionResult = executor.submitJobAndWait(jobGraph, false);
+		return this.lastJobExecutionResult;
 	}
 
 


Mime
View raw message