crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gr...@apache.org
Subject git commit: CRUNCH-216 Transpose args in MapsideJoinStrategy
Date Tue, 18 Feb 2014 22:35:49 GMT
Repository: crunch
Updated Branches:
  refs/heads/master 296abb86c -> 799b1232f


CRUNCH-216 Transpose args in MapsideJoinStrategy

Initial move in transposing the parameters for the
MapsideJoinStrategy to bring it in line with other join
strategies (i.e. the left-side table should be the smaller of
the two tables being joined).

Introduce static factory methods for creating MapsideJoinStrategy
instances that load the left-side table into memory, and deprecate
the existing public constructor to warn users of the future change.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/799b1232
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/799b1232
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/799b1232

Branch: refs/heads/master
Commit: 799b1232f1aa6179759506722709f26a858adedf
Parents: 296abb8
Author: Gabriel Reid <greid@apache.org>
Authored: Sat Feb 15 23:24:48 2014 +0100
Committer: Gabriel Reid <greid@apache.org>
Committed: Tue Feb 18 23:30:17 2014 +0100

----------------------------------------------------------------------
 .../crunch/lib/join/MapsideJoinStrategyIT.java  | 186 ++++++++++++++++---
 .../crunch/lib/join/MapsideJoinStrategy.java    |  92 ++++++++-
 2 files changed, 247 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/799b1232/crunch-core/src/it/java/org/apache/crunch/lib/join/MapsideJoinStrategyIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/join/MapsideJoinStrategyIT.java b/crunch-core/src/it/java/org/apache/crunch/lib/join/MapsideJoinStrategyIT.java
index 9972549..9add60a 100644
--- a/crunch-core/src/it/java/org/apache/crunch/lib/join/MapsideJoinStrategyIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/lib/join/MapsideJoinStrategyIT.java
@@ -42,6 +42,7 @@ import org.junit.Rule;
 import org.junit.Test;
 
 import com.google.common.collect.Lists;
+import sun.print.resources.serviceui;
 
 public class MapsideJoinStrategyIT {
   
@@ -91,22 +92,44 @@ public class MapsideJoinStrategyIT {
 
   @Test
   public void testMapSideJoin_MemPipeline() {
-    runMapsideJoin(MemPipeline.getInstance(), true, false);
+    runMapsideJoin(MemPipeline.getInstance(), true, false, MapsideJoinStrategy.<Integer,String,String>create(false));
+  }
+
+  @Test
+  public void testLegacyMapSideJoin_MemPipeline() {
+    runLegacyMapsideJoin(MemPipeline.getInstance(), true, false, new MapsideJoinStrategy<Integer,
String, String>(false));
   }
 
   @Test
   public void testMapSideJoin_MemPipeline_Materialized() {
-    runMapsideJoin(MemPipeline.getInstance(), true, true);
+    runMapsideJoin(MemPipeline.getInstance(), true, true, MapsideJoinStrategy.<Integer,String,String>create(true));
+  }
+
+  @Test
+  public void testLegacyMapSideJoin_MemPipeline_Materialized() {
+    runLegacyMapsideJoin(MemPipeline.getInstance(), true, true, new MapsideJoinStrategy<Integer,
String, String>(true));
   }
   
   @Test
-  public void testMapSideJoinLeftOuterJoin_MemPipeline() {
-    runMapsideLeftOuterJoin(MemPipeline.getInstance(), true, false);
+  public void testMapSideJoinRightOuterJoin_MemPipeline() {
+    runMapsideRightOuterJoin(MemPipeline.getInstance(), true, false,
+                             MapsideJoinStrategy.<Integer, String, String>create(false));
+  }
+
+  @Test
+  public void testLegacyMapSideJoinLeftOuterJoin_MemPipeline() {
+    runLegacyMapsideLeftOuterJoin(MemPipeline.getInstance(), true, false, new MapsideJoinStrategy<Integer,
String, String>(false));
+  }
+
+  @Test
+  public void testMapSideJoinRightOuterJoin_MemPipeline_Materialized() {
+    runMapsideRightOuterJoin(MemPipeline.getInstance(), true, true,
+                             MapsideJoinStrategy.<Integer, String, String>create(true));
   }
 
   @Test
-  public void testMapSideJoinLeftOuterJoin_MemPipeline_Materialized() {
-    runMapsideLeftOuterJoin(MemPipeline.getInstance(), true, true);
+  public void testLegacyMapSideJoinLeftOuterJoin_MemPipeline_Materialized() {
+    runLegacyMapsideLeftOuterJoin(MemPipeline.getInstance(), true, true, new MapsideJoinStrategy<Integer,
String, String>(true));
   }
 
   @Test
@@ -128,35 +151,115 @@ public class MapsideJoinStrategyIT {
   }
 
   @Test
+  public void testLegacyMapsideJoin_LeftSideIsEmpty() throws IOException {
+    MRPipeline pipeline = new MRPipeline(MapsideJoinStrategyIT.class, tmpDir.getDefaultConfiguration());
+    PTable<Integer, String> customerTable = readTable(pipeline, "customers.txt");
+    PTable<Integer, String> orderTable = readTable(pipeline, "orders.txt");
+
+    PTable<Integer, String> filteredCustomerTable = customerTable
+        .parallelDo(FilterFns.<Pair<Integer, String>>REJECT_ALL(), customerTable.getPTableType());
+
+
+    JoinStrategy<Integer, String, String> mapsideJoin = new MapsideJoinStrategy<Integer,
String, String>();
+    PTable<Integer, Pair<String, String>> joined = mapsideJoin.join(customerTable,
filteredCustomerTable,
+                                                                    JoinType.INNER_JOIN);
+
+    List<Pair<Integer, Pair<String, String>>> materializedJoin = Lists.newArrayList(joined.materialize());
+
+    assertTrue(materializedJoin.isEmpty());
+  }
+
+  @Test
   public void testMapsideJoin() throws IOException {
-    runMapsideJoin(new MRPipeline(MapsideJoinStrategyIT.class, tmpDir.getDefaultConfiguration()),
false, false);
+    runMapsideJoin(new MRPipeline(MapsideJoinStrategyIT.class, tmpDir.getDefaultConfiguration()),
+                   false, false, MapsideJoinStrategy.<Integer, String, String>create(false));
+  }
+
+  @Test
+  public void testLegacyMapsideJoin() throws IOException {
+    runLegacyMapsideJoin(new MRPipeline(MapsideJoinStrategyIT.class, tmpDir.getDefaultConfiguration()),
+                   false, false, new MapsideJoinStrategy<Integer, String, String>(false));
   }
 
   @Test
   public void testMapsideJoin_Materialized() throws IOException {
-    runMapsideJoin(new MRPipeline(MapsideJoinStrategyIT.class, tmpDir.getDefaultConfiguration()),
false, true);
+    runMapsideJoin(new MRPipeline(MapsideJoinStrategyIT.class, tmpDir.getDefaultConfiguration()),
+                   false, true, MapsideJoinStrategy.<Integer, String, String>create(true));
+  }
+
+  @Test
+  public void testLegacyMapsideJoin_Materialized() throws IOException {
+    runLegacyMapsideJoin(new MRPipeline(MapsideJoinStrategyIT.class, tmpDir.getDefaultConfiguration()),
+                   false, true, new MapsideJoinStrategy<Integer, String, String>(true));
+  }
+
+  @Test
+  public void testMapsideJoin_RightOuterJoin() throws IOException {
+    runMapsideRightOuterJoin(new MRPipeline(MapsideJoinStrategyIT.class, tmpDir.getDefaultConfiguration()),
+                             false, false, MapsideJoinStrategy.<Integer, String, String>create(false));
+  }
+
+  @Test
+  public void testLegacyMapsideJoin_LeftOuterJoin() throws IOException {
+    runLegacyMapsideLeftOuterJoin(new MRPipeline(MapsideJoinStrategyIT.class, tmpDir.getDefaultConfiguration()),
+                                  false, false,
+                                  new MapsideJoinStrategy<Integer, String, String>(false));
   }
 
   @Test
-  public void testMapsideJoin_LeftOuterJoin() throws IOException {
-    runMapsideLeftOuterJoin(new MRPipeline(MapsideJoinStrategyIT.class, tmpDir.getDefaultConfiguration()),
false, false);
+  public void testMapsideJoin_RightOuterJoin_Materialized() throws IOException {
+    runMapsideRightOuterJoin(new MRPipeline(MapsideJoinStrategyIT.class, tmpDir.getDefaultConfiguration()),
+                             false, true, MapsideJoinStrategy.<Integer, String, String>create(true));
   }
 
   @Test
-  public void testMapsideJoin_LeftOuterJoin_Materialized() throws IOException {
-    runMapsideLeftOuterJoin(new MRPipeline(MapsideJoinStrategyIT.class, tmpDir.getDefaultConfiguration()),
false, true);
+  public void testLegacyMapsideJoin_LeftOuterJoin_Materialized() throws IOException {
+    runLegacyMapsideLeftOuterJoin(new MRPipeline(MapsideJoinStrategyIT.class, tmpDir.getDefaultConfiguration()),
+                                  false, true,
+                                  new MapsideJoinStrategy<Integer, String, String>(true));
   }
 
-  private void runMapsideJoin(Pipeline pipeline, boolean inMemory, boolean materialize) {
+  private void runMapsideJoin(Pipeline pipeline, boolean inMemory, boolean materialize,
+                              MapsideJoinStrategy<Integer,String, String> joinStrategy)
{
     PTable<Integer, String> customerTable = readTable(pipeline, "customers.txt");
     PTable<Integer, String> orderTable = readTable(pipeline, "orders.txt");
     
-    JoinStrategy<Integer, String, String> mapsideJoin = new MapsideJoinStrategy<Integer,
String, String>(materialize);
-    PTable<Integer, String> custOrders = mapsideJoin.join(customerTable, orderTable,
JoinType.INNER_JOIN)
+    PTable<Integer, String> custOrders = joinStrategy.join(orderTable, customerTable,
JoinType.INNER_JOIN)
+        .mapValues("concat", new ConcatValuesFn(), Writables.strings());
+
+    PTable<Integer, String> ORDER_TABLE = orderTable.mapValues(new CapOrdersFn(), orderTable.getValueType());
+    PTable<Integer, Pair<String, String>> joined = joinStrategy.join(ORDER_TABLE,
custOrders, JoinType.INNER_JOIN);
+
+    List<Pair<Integer, Pair<String, String>>> expectedJoinResult = Lists.newArrayList();
+    expectedJoinResult.add(Pair.of(111, Pair.of("CORN FLAKES", "[Corn flakes,John Doe]")));
+    expectedJoinResult.add(Pair.of(222, Pair.of("TOILET PAPER", "[Toilet paper,Jane Doe]")));
+    expectedJoinResult.add(Pair.of(222, Pair.of("TOILET PAPER", "[Toilet plunger,Jane Doe]")));
+    expectedJoinResult.add(Pair.of(222, Pair.of("TOILET PLUNGER", "[Toilet paper,Jane Doe]")));
+    expectedJoinResult.add(Pair.of(222, Pair.of("TOILET PLUNGER", "[Toilet plunger,Jane Doe]")));
+    expectedJoinResult.add(Pair.of(333, Pair.of("TOILET BRUSH", "[Toilet brush,Someone Else]")));
+    Iterable<Pair<Integer, Pair<String, String>>> iter = joined.materialize();
+    
+    PipelineResult res = pipeline.run();
+    if (!inMemory) {
+      assertEquals(materialize ? 2 : 1, res.getStageResults().size());
+    }
+     
+    List<Pair<Integer, Pair<String, String>>> joinedResultList = Lists.newArrayList(iter);
+    Collections.sort(joinedResultList);
+
+    assertEquals(expectedJoinResult, joinedResultList);
+  }
+
+  private void runLegacyMapsideJoin(Pipeline pipeline, boolean inMemory, boolean materialize,
+                                    MapsideJoinStrategy<Integer, String, String> mapsideJoinStrategy)
{
+    PTable<Integer, String> customerTable = readTable(pipeline, "customers.txt");
+    PTable<Integer, String> orderTable = readTable(pipeline, "orders.txt");
+
+    PTable<Integer, String> custOrders = mapsideJoinStrategy.join(customerTable, orderTable,
JoinType.INNER_JOIN)
         .mapValues("concat", new ConcatValuesFn(), Writables.strings());
 
     PTable<Integer, String> ORDER_TABLE = orderTable.mapValues(new CapOrdersFn(), orderTable.getValueType());
-    PTable<Integer, Pair<String, String>> joined = mapsideJoin.join(custOrders,
ORDER_TABLE, JoinType.INNER_JOIN);
+    PTable<Integer, Pair<String, String>> joined = mapsideJoinStrategy.join(custOrders,
ORDER_TABLE, JoinType.INNER_JOIN);
 
     List<Pair<Integer, Pair<String, String>>> expectedJoinResult = Lists.newArrayList();
     expectedJoinResult.add(Pair.of(111, Pair.of("[John Doe,Corn flakes]", "CORN FLAKES")));
@@ -166,28 +269,63 @@ public class MapsideJoinStrategyIT {
     expectedJoinResult.add(Pair.of(222, Pair.of("[Jane Doe,Toilet plunger]", "TOILET PLUNGER")));
     expectedJoinResult.add(Pair.of(333, Pair.of("[Someone Else,Toilet brush]", "TOILET BRUSH")));
     Iterable<Pair<Integer, Pair<String, String>>> iter = joined.materialize();
-    
+
     PipelineResult res = pipeline.run();
     if (!inMemory) {
       assertEquals(materialize ? 2 : 1, res.getStageResults().size());
     }
-     
+
     List<Pair<Integer, Pair<String, String>>> joinedResultList = Lists.newArrayList(iter);
     Collections.sort(joinedResultList);
 
     assertEquals(expectedJoinResult, joinedResultList);
   }
   
-  private void runMapsideLeftOuterJoin(Pipeline pipeline, boolean inMemory, boolean materialize)
{
+  private void runMapsideRightOuterJoin(Pipeline pipeline, boolean inMemory, boolean materialize,
+                                        MapsideJoinStrategy<Integer, String, String>
mapsideJoinStrategy) {
     PTable<Integer, String> customerTable = readTable(pipeline, "customers.txt");
     PTable<Integer, String> orderTable = readTable(pipeline, "orders.txt");
     
-    JoinStrategy<Integer, String, String> mapsideJoin = new MapsideJoinStrategy<Integer,
String, String>(materialize);
-    PTable<Integer, String> custOrders = mapsideJoin.join(customerTable, orderTable,
JoinType.LEFT_OUTER_JOIN)
+    PTable<Integer, String> custOrders = mapsideJoinStrategy.join(orderTable, customerTable,
JoinType.RIGHT_OUTER_JOIN)
+        .mapValues("concat", new ConcatValuesFn(), Writables.strings());
+
+    PTable<Integer, String> ORDER_TABLE = orderTable.mapValues(new CapOrdersFn(), orderTable.getValueType());
+    PTable<Integer, Pair<String, String>> joined = mapsideJoinStrategy.join(ORDER_TABLE,
custOrders,
+                                                                     JoinType.RIGHT_OUTER_JOIN);
+
+    List<Pair<Integer, Pair<String, String>>> expectedJoinResult = Lists.newArrayList();
+    expectedJoinResult.add(Pair.of(111, Pair.of("CORN FLAKES", "[Corn flakes,John Doe]")));
+    expectedJoinResult.add(Pair.of(222, Pair.of("TOILET PAPER", "[Toilet paper,Jane Doe]")));
+    expectedJoinResult.add(Pair.of(222, Pair.of("TOILET PAPER", "[Toilet plunger,Jane Doe]")));
+    expectedJoinResult.add(Pair.of(222, Pair.of("TOILET PLUNGER", "[Toilet paper,Jane Doe]")));
+    expectedJoinResult.add(Pair.of(222, Pair.of("TOILET PLUNGER", "[Toilet plunger,Jane Doe]")));
+    expectedJoinResult.add(Pair.of(333, Pair.of("TOILET BRUSH", "[Toilet brush,Someone Else]")));
+    expectedJoinResult.add(Pair.of(444, Pair.<String,String>of(null, "[null,Has No
Orders]")));
+    Iterable<Pair<Integer, Pair<String, String>>> iter = joined.materialize();
+    
+    PipelineResult res = pipeline.run();
+    if (!inMemory) {
+      assertEquals(materialize ? 2 : 1, res.getStageResults().size());
+    }
+     
+    List<Pair<Integer, Pair<String, String>>> joinedResultList = Lists.newArrayList(iter);
+    Collections.sort(joinedResultList);
+
+    assertEquals(expectedJoinResult, joinedResultList);
+  }
+
+  private void runLegacyMapsideLeftOuterJoin(Pipeline pipeline, boolean inMemory, boolean
materialize,
+                                             MapsideJoinStrategy<Integer, String, String>
legacyMapsideJoinStrategy) {
+    PTable<Integer, String> customerTable = readTable(pipeline, "customers.txt");
+    PTable<Integer, String> orderTable = readTable(pipeline, "orders.txt");
+
+    PTable<Integer, String> custOrders = legacyMapsideJoinStrategy.join(customerTable,
orderTable,
+                                                                        JoinType.LEFT_OUTER_JOIN)
         .mapValues("concat", new ConcatValuesFn(), Writables.strings());
 
     PTable<Integer, String> ORDER_TABLE = orderTable.mapValues(new CapOrdersFn(), orderTable.getValueType());
-    PTable<Integer, Pair<String, String>> joined = mapsideJoin.join(custOrders,
ORDER_TABLE, JoinType.LEFT_OUTER_JOIN);
+    PTable<Integer, Pair<String, String>> joined =
+        legacyMapsideJoinStrategy.join(custOrders, ORDER_TABLE, JoinType.LEFT_OUTER_JOIN);
 
     List<Pair<Integer, Pair<String, String>>> expectedJoinResult = Lists.newArrayList();
     expectedJoinResult.add(Pair.of(111, Pair.of("[John Doe,Corn flakes]", "CORN FLAKES")));
@@ -198,12 +336,12 @@ public class MapsideJoinStrategyIT {
     expectedJoinResult.add(Pair.of(333, Pair.of("[Someone Else,Toilet brush]", "TOILET BRUSH")));
     expectedJoinResult.add(Pair.of(444, Pair.<String,String>of("[Has No Orders,null]",
null)));
     Iterable<Pair<Integer, Pair<String, String>>> iter = joined.materialize();
-    
+
     PipelineResult res = pipeline.run();
     if (!inMemory) {
       assertEquals(materialize ? 2 : 1, res.getStageResults().size());
     }
-     
+
     List<Pair<Integer, Pair<String, String>>> joinedResultList = Lists.newArrayList(iter);
     Collections.sort(joinedResultList);
 

http://git-wip-us.apache.org/repos/asf/crunch/blob/799b1232/crunch-core/src/main/java/org/apache/crunch/lib/join/MapsideJoinStrategy.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/join/MapsideJoinStrategy.java b/crunch-core/src/main/java/org/apache/crunch/lib/join/MapsideJoinStrategy.java
index 680bb2e..cafb4f9 100644
--- a/crunch-core/src/main/java/org/apache/crunch/lib/join/MapsideJoinStrategy.java
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/join/MapsideJoinStrategy.java
@@ -17,12 +17,12 @@
  */
 package org.apache.crunch.lib.join;
 
-import java.io.IOException;
-import java.util.Collection;
-
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
 import org.apache.crunch.CrunchRuntimeException;
 import org.apache.crunch.DoFn;
 import org.apache.crunch.Emitter;
+import org.apache.crunch.MapFn;
 import org.apache.crunch.PTable;
 import org.apache.crunch.Pair;
 import org.apache.crunch.ParallelDoOptions;
@@ -30,17 +30,22 @@ import org.apache.crunch.ReadableData;
 import org.apache.crunch.types.PTypeFamily;
 import org.apache.hadoop.conf.Configuration;
 
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Multimap;
+import java.io.IOException;
+import java.util.Collection;
 
 /**
  * Utility for doing map side joins on a common key between two {@link PTable}s.
  * <p>
  * A map side join is an optimized join which doesn't use a reducer; instead,
- * the right side of the join is loaded into memory and the join is performed in
+ * one side of the join is loaded into memory and the join is performed in
  * a mapper. This style of join has the important implication that the output of
  * the join is not sorted, which is the case with a conventional (reducer-based)
  * join.
+ * <p/>
+ * Instances of this class should be instantiated via the {@link #create()} or {@link #create(boolean)} factory
+ * methods, or optionally via the deprecated public constructor for backwards compatibility with
+ * older versions of Crunch where the right-side table was loaded into memory. The public constructor will be removed
+ * in a future release.
  */
 public class MapsideJoinStrategy<K, U, V> implements JoinStrategy<K, U, V> {
 
@@ -49,24 +54,54 @@ public class MapsideJoinStrategy<K, U, V> implements JoinStrategy<K, U, V> {
   /**
    * Constructs a new instance of the {@code MapsideJoinStratey}, materializing the right-side
    * join table to disk before the join is performed.
+   *
+   * @deprecated Use the {@link #create()} factory method instead
    */
+  @Deprecated
   public MapsideJoinStrategy() {
     this(true);
   }
 
   /**
-   * Constructs a new instance of the {@code MapsideJoinStrategy}. If the {@code }materialize}
+   * Constructs a new instance of the {@code MapsideJoinStrategy}. If the {@code materialize}
    * argument is true, then the right-side join {@code PTable} will be materialized to disk
    * before the in-memory join is performed. If it is false, then Crunch can optionally read
    * and process the data from the right-side table without having to run a job to materialize
    * the data to disk first.
    *
    * @param materialize Whether or not to materialize the right-side table before the join
+   *
+   * @deprecated Use the {@link #create(boolean)} factory method instead
    */
+  @Deprecated
   public MapsideJoinStrategy(boolean materialize) {
     this.materialize = materialize;
   }
 
+  /**
+   * Create a new {@code MapsideJoinStrategy} instance that will load its left-side table into memory,
+   * and will materialize the contents of the left-side table to disk before running the in-memory join.
+   * <p/>
+   * The smaller of the two tables to be joined should be provided as the left-side table of the created join
+   * strategy instance.
+   */
+  public static <K, U, V> MapsideJoinStrategy<K, U, V> create() {
+    return create(true);
+  }
+
+  /**
+   * Create a new {@code MapsideJoinStrategy} instance that will load its left-side table into memory.
+   * <p/>
+   * If the {@code materialize} parameter is true, then the left-side {@code PTable} will be materialized to disk
+   * before the in-memory join is performed. If it is false, then Crunch can optionally read and process the data
+   * from the left-side table without having to run a job to materialize the data to disk first.
+   *
+   * @param materialize Whether or not to materialize the left-side table before the join
+   */
+  public static <K, U, V> MapsideJoinStrategy<K, U, V> create(boolean materialize)
{
+    return new LoadLeftSideMapsideJoinStrategy(materialize);
+  }
+
   @Override
   public PTable<K, Pair<U, V>> join(PTable<K, U> left, PTable<K, V>
right, JoinType joinType) {
     switch (joinType) {
@@ -138,4 +173,47 @@ public class MapsideJoinStrategy<K, U, V> implements JoinStrategy<K, U, V> {
       }
     }
   }
+
+  /**
+   * Loads the left-most table (instead of the right-most) in memory while performing the join.
+   */
+  private static class LoadLeftSideMapsideJoinStrategy<K, U, V> extends MapsideJoinStrategy<K,
U, V> {
+
+    private MapsideJoinStrategy<K, V, U> mapsideJoinStrategy;
+
+    public LoadLeftSideMapsideJoinStrategy(boolean materialize) {
+      mapsideJoinStrategy = new MapsideJoinStrategy<K, V, U>(materialize);
+    }
+
+    @Override
+    public PTable<K, Pair<U, V>> join(PTable<K, U> left, PTable<K, V>
right, JoinType joinType) {
+
+      JoinType reversedJoinType;
+      switch (joinType) {
+        case INNER_JOIN:
+          reversedJoinType = JoinType.INNER_JOIN;
+          break;
+        case RIGHT_OUTER_JOIN:
+          reversedJoinType = JoinType.LEFT_OUTER_JOIN;
+          break;
+        default:
+          throw new UnsupportedOperationException("Join type " + joinType + " is not supported");
+      }
+
+
+      return mapsideJoinStrategy.join(right, left, reversedJoinType)
+          .mapValues("Reverse order out output table values",
+                     new ReversePairOrderFn<V, U>(),
+                     left.getTypeFamily().pairs(left.getValueType(), right.getValueType()));
+    }
+  }
+
+  private static class ReversePairOrderFn<V, U> extends MapFn<Pair<V, U>,
Pair<U, V>> {
+
+    @Override
+    public Pair<U, V> map(Pair<V, U> input) {
+      return Pair.of(input.second(), input.first());
+    }
+
+  }
 }


Mime
View raw message