incubator-crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gr...@apache.org
Subject git commit: Fix object-reuse bug in key of joins
Date Sat, 21 Jul 2012 10:53:05 GMT
Updated Branches:
  refs/heads/master 4097823bd -> af54c920e


Fix object-reuse bug in key of joins

Fix for issue which breaks all join functions when non-mapped
types are used as keys in a join. Also add unit testing for the
basic functionality of all join functions.


Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/af54c920
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/af54c920
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/af54c920

Branch: refs/heads/master
Commit: af54c920e59bdca7fbe8f7790e263e5c323ada87
Parents: 4097823
Author: Gabriel Reid <gabriel.reid@gmail.com>
Authored: Sat Jul 21 12:43:12 2012 +0200
Committer: Gabriel Reid <gabriel.reid@gmail.com>
Committed: Sat Jul 21 12:43:12 2012 +0200

----------------------------------------------------------------------
 .../apache/crunch/lib/join/FullOuterJoinIT.java    |    2 +-
 .../org/apache/crunch/lib/join/InnerJoinIT.java    |    2 +-
 .../org/apache/crunch/lib/join/JoinTester.java     |    2 +-
 .../apache/crunch/lib/join/LeftOuterJoinIT.java    |    2 +-
 .../apache/crunch/lib/join/RightOuterJoinIT.java   |    2 +-
 .../src/main/java/org/apache/crunch/lib/Join.java  |    8 +-
 .../apache/crunch/lib/join/FullOuterJoinFn.java    |    6 +-
 .../org/apache/crunch/lib/join/InnerJoinFn.java    |    6 +-
 .../java/org/apache/crunch/lib/join/JoinFn.java    |    6 +-
 .../apache/crunch/lib/join/LeftOuterJoinFn.java    |    6 +-
 .../apache/crunch/lib/join/RightOuterJoinFn.java   |    6 +-
 .../crunch/lib/join/FullOuterJoinFnTest.java       |   48 +++++++++
 .../apache/crunch/lib/join/InnerJoinFnTest.java    |   42 ++++++++
 .../org/apache/crunch/lib/join/JoinFnTestBase.java |   81 +++++++++++++++
 .../apache/crunch/lib/join/LeftOuterJoinTest.java  |   46 ++++++++
 .../crunch/lib/join/RightOuterJoinFnTest.java      |   46 ++++++++
 16 files changed, 289 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/af54c920/crunch/src/it/java/org/apache/crunch/lib/join/FullOuterJoinIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/lib/join/FullOuterJoinIT.java b/crunch/src/it/java/org/apache/crunch/lib/join/FullOuterJoinIT.java
index 8016a24..63d594d 100644
--- a/crunch/src/it/java/org/apache/crunch/lib/join/FullOuterJoinIT.java
+++ b/crunch/src/it/java/org/apache/crunch/lib/join/FullOuterJoinIT.java
@@ -46,6 +46,6 @@ public class FullOuterJoinIT extends JoinTester {
 
   @Override
   protected JoinFn<String, Long, Long> getJoinFn(PTypeFamily typeFamily) {
-    return new FullOuterJoinFn<String, Long, Long>(typeFamily.longs());
+    return new FullOuterJoinFn<String, Long, Long>(typeFamily.strings(), typeFamily.longs());
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/af54c920/crunch/src/it/java/org/apache/crunch/lib/join/InnerJoinIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/lib/join/InnerJoinIT.java b/crunch/src/it/java/org/apache/crunch/lib/join/InnerJoinIT.java
index 97220ac..4759050 100644
--- a/crunch/src/it/java/org/apache/crunch/lib/join/InnerJoinIT.java
+++ b/crunch/src/it/java/org/apache/crunch/lib/join/InnerJoinIT.java
@@ -46,6 +46,6 @@ public class InnerJoinIT extends JoinTester {
 
   @Override
   protected JoinFn<String, Long, Long> getJoinFn(PTypeFamily typeFamily) {
-    return new InnerJoinFn<String, Long, Long>(typeFamily.longs());
+    return new InnerJoinFn<String, Long, Long>(typeFamily.strings(), typeFamily.longs());
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/af54c920/crunch/src/it/java/org/apache/crunch/lib/join/JoinTester.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/lib/join/JoinTester.java b/crunch/src/it/java/org/apache/crunch/lib/join/JoinTester.java
index 54c4945..5700740 100644
--- a/crunch/src/it/java/org/apache/crunch/lib/join/JoinTester.java
+++ b/crunch/src/it/java/org/apache/crunch/lib/join/JoinTester.java
@@ -1,5 +1,5 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
+R * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
  * regarding copyright ownership.  The ASF licenses this file

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/af54c920/crunch/src/it/java/org/apache/crunch/lib/join/LeftOuterJoinIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/lib/join/LeftOuterJoinIT.java b/crunch/src/it/java/org/apache/crunch/lib/join/LeftOuterJoinIT.java
index aafe1c9..4ad2a81 100644
--- a/crunch/src/it/java/org/apache/crunch/lib/join/LeftOuterJoinIT.java
+++ b/crunch/src/it/java/org/apache/crunch/lib/join/LeftOuterJoinIT.java
@@ -46,6 +46,6 @@ public class LeftOuterJoinIT extends JoinTester {
 
   @Override
   protected JoinFn<String, Long, Long> getJoinFn(PTypeFamily typeFamily) {
-    return new LeftOuterJoinFn<String, Long, Long>(typeFamily.longs());
+    return new LeftOuterJoinFn<String, Long, Long>(typeFamily.strings(), typeFamily.longs());
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/af54c920/crunch/src/it/java/org/apache/crunch/lib/join/RightOuterJoinIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/lib/join/RightOuterJoinIT.java b/crunch/src/it/java/org/apache/crunch/lib/join/RightOuterJoinIT.java
index a3bb122..d889b61 100644
--- a/crunch/src/it/java/org/apache/crunch/lib/join/RightOuterJoinIT.java
+++ b/crunch/src/it/java/org/apache/crunch/lib/join/RightOuterJoinIT.java
@@ -46,6 +46,6 @@ public class RightOuterJoinIT extends JoinTester {
 
   @Override
   protected JoinFn<String, Long, Long> getJoinFn(PTypeFamily typeFamily) {
-    return new RightOuterJoinFn<String, Long, Long>(typeFamily.longs());
+    return new RightOuterJoinFn<String, Long, Long>(typeFamily.strings(), typeFamily.longs());
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/af54c920/crunch/src/main/java/org/apache/crunch/lib/Join.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/Join.java b/crunch/src/main/java/org/apache/crunch/lib/Join.java
index b755789..c0c4a6b 100644
--- a/crunch/src/main/java/org/apache/crunch/lib/Join.java
+++ b/crunch/src/main/java/org/apache/crunch/lib/Join.java
@@ -75,7 +75,7 @@ public class Join {
    * @return The joined result.
    */
   public static <K, U, V> PTable<K, Pair<U, V>> innerJoin(PTable<K,
U> left, PTable<K, V> right) {
-    return join(left, right, new InnerJoinFn<K, U, V>(left.getValueType()));
+    return join(left, right, new InnerJoinFn<K, U, V>(left.getKeyType(), left.getValueType()));
   }
 
   /**
@@ -97,7 +97,7 @@ public class Join {
    * @return The joined result.
    */
   public static <K, U, V> PTable<K, Pair<U, V>> leftJoin(PTable<K, U>
left, PTable<K, V> right) {
-    return join(left, right, new LeftOuterJoinFn<K, U, V>(left.getValueType()));
+    return join(left, right, new LeftOuterJoinFn<K, U, V>(left.getKeyType(), left.getValueType()));
   }
 
   /**
@@ -120,7 +120,7 @@ public class Join {
    * @return The joined result.
    */
   public static <K, U, V> PTable<K, Pair<U, V>> rightJoin(PTable<K,
U> left, PTable<K, V> right) {
-    return join(left, right, new RightOuterJoinFn<K, U, V>(left.getValueType()));
+    return join(left, right, new RightOuterJoinFn<K, U, V>(left.getKeyType(), left.getValueType()));
   }
 
   /**
@@ -141,7 +141,7 @@ public class Join {
    * @return The joined result.
    */
   public static <K, U, V> PTable<K, Pair<U, V>> fullJoin(PTable<K, U>
left, PTable<K, V> right) {
-    return join(left, right, new FullOuterJoinFn<K, U, V>(left.getValueType()));
+    return join(left, right, new FullOuterJoinFn<K, U, V>(left.getKeyType(), left.getValueType()));
   }
 
   public static <K, U, V> PTable<K, Pair<U, V>> join(PTable<K, U>
left, PTable<K, V> right, JoinFn<K, U, V> joinFn) {

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/af54c920/crunch/src/main/java/org/apache/crunch/lib/join/FullOuterJoinFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/join/FullOuterJoinFn.java b/crunch/src/main/java/org/apache/crunch/lib/join/FullOuterJoinFn.java
index 8e8737c..3c63f07 100644
--- a/crunch/src/main/java/org/apache/crunch/lib/join/FullOuterJoinFn.java
+++ b/crunch/src/main/java/org/apache/crunch/lib/join/FullOuterJoinFn.java
@@ -41,8 +41,8 @@ public class FullOuterJoinFn<K, U, V> extends JoinFn<K, U, V>
{
   private transient K lastKey;
   private transient List<U> leftValues;
 
-  public FullOuterJoinFn(PType<U> leftValueType) {
-    super(leftValueType);
+  public FullOuterJoinFn(PType<K> keyType, PType<U> leftValueType) {
+    super(keyType, leftValueType);
   }
 
   /** {@inheritDoc} */
@@ -63,7 +63,7 @@ public class FullOuterJoinFn<K, U, V> extends JoinFn<K, U, V>
{
           emitter.emit(Pair.of(lastKey, Pair.of(u, (V) null)));
         }
       }
-      lastKey = key;
+      lastKey = keyType.getDetachedValue(key);
       leftValues.clear();
     }
     if (id == 0) {

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/af54c920/crunch/src/main/java/org/apache/crunch/lib/join/InnerJoinFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/join/InnerJoinFn.java b/crunch/src/main/java/org/apache/crunch/lib/join/InnerJoinFn.java
index 11ddeaf..bbcc35f 100644
--- a/crunch/src/main/java/org/apache/crunch/lib/join/InnerJoinFn.java
+++ b/crunch/src/main/java/org/apache/crunch/lib/join/InnerJoinFn.java
@@ -40,8 +40,8 @@ public class InnerJoinFn<K, U, V> extends JoinFn<K, U, V> {
   private transient K lastKey;
   private transient List<U> leftValues;
 
-  public InnerJoinFn(PType<U> leftValueType) {
-    super(leftValueType);
+  public InnerJoinFn(PType<K> keyType, PType<U> leftValueType) {
+    super(keyType, leftValueType);
   }
 
   /** {@inheritDoc} */
@@ -55,7 +55,7 @@ public class InnerJoinFn<K, U, V> extends JoinFn<K, U, V> {
   @Override
   public void join(K key, int id, Iterable<Pair<U, V>> pairs, Emitter<Pair<K,
Pair<U, V>>> emitter) {
     if (!key.equals(lastKey)) {
-      lastKey = key;
+      lastKey = keyType.getDetachedValue(key);
       leftValues.clear();
     }
     if (id == 0) { // from left

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/af54c920/crunch/src/main/java/org/apache/crunch/lib/join/JoinFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/join/JoinFn.java b/crunch/src/main/java/org/apache/crunch/lib/join/JoinFn.java
index 7ecaaf4..f45ce9c 100644
--- a/crunch/src/main/java/org/apache/crunch/lib/join/JoinFn.java
+++ b/crunch/src/main/java/org/apache/crunch/lib/join/JoinFn.java
@@ -34,16 +34,20 @@ import org.apache.crunch.types.PType;
  */
 public abstract class JoinFn<K, U, V> extends DoFn<Pair<Pair<K, Integer>,
Iterable<Pair<U, V>>>, Pair<K, Pair<U, V>>> {
 
+  protected PType<K> keyType;
   protected PType<U> leftValueType;
 
   /**
    * Instantiate with the PType of the value of the left side of the join (used
    * for creating deep copies of values).
    * 
+   * @param keyType
+   *          The PType of the value used as the key of the join
    * @param leftValueType
    *          The PType of the value type of the left side of the join
    */
-  public JoinFn(PType<U> leftValueType) {
+  public JoinFn(PType<K> keyType, PType<U> leftValueType) {
+    this.keyType = keyType;
     this.leftValueType = leftValueType;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/af54c920/crunch/src/main/java/org/apache/crunch/lib/join/LeftOuterJoinFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/join/LeftOuterJoinFn.java b/crunch/src/main/java/org/apache/crunch/lib/join/LeftOuterJoinFn.java
index 8da372b..272e081 100644
--- a/crunch/src/main/java/org/apache/crunch/lib/join/LeftOuterJoinFn.java
+++ b/crunch/src/main/java/org/apache/crunch/lib/join/LeftOuterJoinFn.java
@@ -41,8 +41,8 @@ public class LeftOuterJoinFn<K, U, V> extends JoinFn<K, U, V>
{
   private transient K lastKey;
   private transient List<U> leftValues;
 
-  public LeftOuterJoinFn(PType<U> leftValueType) {
-    super(leftValueType);
+  public LeftOuterJoinFn(PType<K> keyType, PType<U> leftValueType) {
+    super(keyType, leftValueType);
   }
 
   /** {@inheritDoc} */
@@ -63,7 +63,7 @@ public class LeftOuterJoinFn<K, U, V> extends JoinFn<K, U, V>
{
           emitter.emit(Pair.of(lastKey, Pair.of(u, (V) null)));
         }
       }
-      lastKey = key;
+      lastKey = keyType.getDetachedValue(key);
       leftValues.clear();
     }
     if (id == 0) {

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/af54c920/crunch/src/main/java/org/apache/crunch/lib/join/RightOuterJoinFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/join/RightOuterJoinFn.java b/crunch/src/main/java/org/apache/crunch/lib/join/RightOuterJoinFn.java
index 15a8930..2dbb2f9 100644
--- a/crunch/src/main/java/org/apache/crunch/lib/join/RightOuterJoinFn.java
+++ b/crunch/src/main/java/org/apache/crunch/lib/join/RightOuterJoinFn.java
@@ -40,8 +40,8 @@ public class RightOuterJoinFn<K, U, V> extends JoinFn<K, U, V>
{
   private transient K lastKey;
   private transient List<U> leftValues;
 
-  public RightOuterJoinFn(PType<U> leftValueType) {
-    super(leftValueType);
+  public RightOuterJoinFn(PType<K> keyType, PType<U> leftValueType) {
+    super(keyType, leftValueType);
   }
 
   /** {@inheritDoc} */
@@ -55,7 +55,7 @@ public class RightOuterJoinFn<K, U, V> extends JoinFn<K, U, V>
{
   @Override
   public void join(K key, int id, Iterable<Pair<U, V>> pairs, Emitter<Pair<K,
Pair<U, V>>> emitter) {
     if (!key.equals(lastKey)) {
-      lastKey = key;
+      lastKey = keyType.getDetachedValue(key);
       leftValues.clear();
     }
     if (id == 0) {

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/af54c920/crunch/src/test/java/org/apache/crunch/lib/join/FullOuterJoinFnTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/lib/join/FullOuterJoinFnTest.java b/crunch/src/test/java/org/apache/crunch/lib/join/FullOuterJoinFnTest.java
new file mode 100644
index 0000000..5cf4f51
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/lib/join/FullOuterJoinFnTest.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.join;
+
+import static org.apache.crunch.test.StringWrapper.wrap;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+
+import org.apache.crunch.Emitter;
+import org.apache.crunch.Pair;
+import org.apache.crunch.test.StringWrapper;
+import org.apache.crunch.types.avro.Avros;
+
+public class FullOuterJoinFnTest extends JoinFnTestBase {
+
+  @Override
+  protected void checkOutput(Emitter<Pair<StringWrapper, Pair<StringWrapper, String>>>
emitter) {
+    verify(emitter)
+        .emit(Pair.of(wrap("left-only"), Pair.of(wrap("left-only-left"), (String) null)));
+    verify(emitter).emit(Pair.of(wrap("both"), Pair.of(wrap("both-left"), "both-right")));
+    verify(emitter).emit(
+        Pair.of(wrap("right-only"), Pair.of((StringWrapper) null, "right-only-right")));
+    verifyNoMoreInteractions(emitter);
+  }
+
+  @Override
+  protected JoinFn<StringWrapper, StringWrapper, String> getJoinFn() {
+    return new FullOuterJoinFn<StringWrapper, StringWrapper, String>(
+        Avros.reflects(StringWrapper.class),
+        Avros.reflects(StringWrapper.class));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/af54c920/crunch/src/test/java/org/apache/crunch/lib/join/InnerJoinFnTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/lib/join/InnerJoinFnTest.java b/crunch/src/test/java/org/apache/crunch/lib/join/InnerJoinFnTest.java
new file mode 100644
index 0000000..d2347de
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/lib/join/InnerJoinFnTest.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.join;
+
+import static org.apache.crunch.test.StringWrapper.wrap;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+
+import org.apache.crunch.Emitter;
+import org.apache.crunch.Pair;
+import org.apache.crunch.test.StringWrapper;
+import org.apache.crunch.types.avro.Avros;
+
+public class InnerJoinFnTest extends JoinFnTestBase {
+
+  protected void checkOutput(Emitter<Pair<StringWrapper, Pair<StringWrapper, String>>>
joinEmitter) {
+    verify(joinEmitter).emit(Pair.of(wrap("both"), Pair.of(wrap("both-left"), "both-right")));
+    verifyNoMoreInteractions(joinEmitter);
+  }
+
+  @Override
+  protected JoinFn<StringWrapper, StringWrapper, String> getJoinFn() {
+    return new InnerJoinFn<StringWrapper, StringWrapper, String>(
+        Avros.reflects(StringWrapper.class),
+        Avros.reflects(StringWrapper.class));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/af54c920/crunch/src/test/java/org/apache/crunch/lib/join/JoinFnTestBase.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/lib/join/JoinFnTestBase.java b/crunch/src/test/java/org/apache/crunch/lib/join/JoinFnTestBase.java
new file mode 100644
index 0000000..270d2c7
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/lib/join/JoinFnTestBase.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.join;
+
+import static org.mockito.Mockito.mock;
+
+import java.util.List;
+
+import org.apache.crunch.Emitter;
+import org.apache.crunch.Pair;
+import org.apache.crunch.test.StringWrapper;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public abstract class JoinFnTestBase {
+
+  private JoinFn<StringWrapper, StringWrapper, String> joinFn;
+
+  private Emitter<Pair<StringWrapper, Pair<StringWrapper, String>>> emitter;
+
+  // Avoid warnings on generic Emitter mock
+  @SuppressWarnings("unchecked")
+  @Before
+  public void setUp() {
+    joinFn = getJoinFn();
+    joinFn.initialize();
+    emitter = mock(Emitter.class);
+  }
+
+  @Test
+  public void testJoin() {
+
+    StringWrapper key = new StringWrapper();
+    StringWrapper leftValue = new StringWrapper();
+    key.setValue("left-only");
+    leftValue.setValue("left-only-left");
+    joinFn.join(key, 0, createValuePairList(leftValue, null), emitter);
+
+    key.setValue("both");
+    leftValue.setValue("both-left");
+    joinFn.join(key, 0, createValuePairList(leftValue, null), emitter);
+    joinFn.join(key, 1, createValuePairList(null, "both-right"), emitter);
+
+    key.setValue("right-only");
+    joinFn.join(key, 1, createValuePairList(null, "right-only-right"), emitter);
+
+    checkOutput(emitter);
+
+  }
+
+  protected abstract void checkOutput(
+      Emitter<Pair<StringWrapper, Pair<StringWrapper, String>>> emitter);
+
+  protected abstract JoinFn<StringWrapper, StringWrapper, String> getJoinFn();
+
+  protected List<Pair<StringWrapper, String>> createValuePairList(StringWrapper
leftValue,
+      String rightValue) {
+    Pair<StringWrapper, String> valuePair = Pair.of(leftValue, rightValue);
+    List<Pair<StringWrapper, String>> valuePairList = Lists.newArrayList();
+    valuePairList.add(valuePair);
+    return valuePairList;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/af54c920/crunch/src/test/java/org/apache/crunch/lib/join/LeftOuterJoinTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/lib/join/LeftOuterJoinTest.java b/crunch/src/test/java/org/apache/crunch/lib/join/LeftOuterJoinTest.java
new file mode 100644
index 0000000..a90457e
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/lib/join/LeftOuterJoinTest.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.join;
+
+import static org.apache.crunch.test.StringWrapper.wrap;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+
+import org.apache.crunch.Emitter;
+import org.apache.crunch.Pair;
+import org.apache.crunch.test.StringWrapper;
+import org.apache.crunch.types.avro.Avros;
+
+public class LeftOuterJoinTest extends JoinFnTestBase {
+
+  @Override
+  protected void checkOutput(Emitter<Pair<StringWrapper, Pair<StringWrapper, String>>>
emitter) {
+    verify(emitter)
+        .emit(Pair.of(wrap("left-only"), Pair.of(wrap("left-only-left"), (String) null)));
+    verify(emitter).emit(Pair.of(wrap("both"), Pair.of(wrap("both-left"), "both-right")));
+    verifyNoMoreInteractions(emitter);
+  }
+
+  @Override
+  protected JoinFn<StringWrapper, StringWrapper, String> getJoinFn() {
+    return new LeftOuterJoinFn<StringWrapper, StringWrapper, String>(
+        Avros.reflects(StringWrapper.class),
+        Avros.reflects(StringWrapper.class));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/af54c920/crunch/src/test/java/org/apache/crunch/lib/join/RightOuterJoinFnTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/lib/join/RightOuterJoinFnTest.java b/crunch/src/test/java/org/apache/crunch/lib/join/RightOuterJoinFnTest.java
new file mode 100644
index 0000000..7e41284
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/lib/join/RightOuterJoinFnTest.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.join;
+
+import static org.apache.crunch.test.StringWrapper.wrap;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+
+import org.apache.crunch.Emitter;
+import org.apache.crunch.Pair;
+import org.apache.crunch.test.StringWrapper;
+import org.apache.crunch.types.avro.Avros;
+
+public class RightOuterJoinFnTest extends JoinFnTestBase {
+
+  @Override
+  protected void checkOutput(Emitter<Pair<StringWrapper, Pair<StringWrapper, String>>>
emitter) {
+    verify(emitter).emit(Pair.of(wrap("both"), Pair.of(wrap("both-left"), "both-right")));
+    verify(emitter).emit(
+        Pair.of(wrap("right-only"), Pair.of((StringWrapper) null, "right-only-right")));
+    verifyNoMoreInteractions(emitter);
+  }
+
+  @Override
+  protected JoinFn<StringWrapper, StringWrapper, String> getJoinFn() {
+    return new RightOuterJoinFn<StringWrapper, StringWrapper, String>(
+        Avros.reflects(StringWrapper.class),
+        Avros.reflects(StringWrapper.class));
+  }
+
+}


Mime
View raw message