crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From: gr...@apache.org
Subject: [2/2] git commit: CRUNCH-213 Add sharded join
Date: Sun, 09 Jun 2013 21:41:59 GMT
CRUNCH-213 Add sharded join

Also refactor the join code in general to introduce a JoinStrategy
interface.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/63016301
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/63016301
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/63016301

Branch: refs/heads/master
Commit: 630163014ccf91edd0dac79780607f73295a46a4
Parents: 1d844b3
Author: Gabriel Reid <greid@apache.org>
Authored: Fri Jun 7 22:49:08 2013 +0200
Committer: Gabriel Reid <greid@apache.org>
Committed: Sun Jun 9 23:37:05 2013 +0200

----------------------------------------------------------------------
 .../lib/join/AbstractFullOuterJoinIT.java       |  52 +++++
 .../crunch/lib/join/AbstractInnerJoinIT.java    |  51 +++++
 .../lib/join/AbstractLeftOuterJoinIT.java       |  50 +++++
 .../lib/join/AbstractRightOuterJoinIT.java      |  50 +++++
 .../crunch/lib/join/DefaultFullOuterJoinIT.java |  27 +++
 .../crunch/lib/join/DefaultInnerJoinIT.java     |  27 +++
 .../crunch/lib/join/DefaultLeftOuterJoinIT.java |  27 +++
 .../lib/join/DefaultRightOuterJoinIT.java       |  27 +++
 .../crunch/lib/join/FullOuterJoinFnTest.java    |  48 +++++
 .../apache/crunch/lib/join/FullOuterJoinIT.java |  51 -----
 .../org/apache/crunch/lib/join/InnerJoinIT.java |  51 -----
 .../org/apache/crunch/lib/join/JoinTester.java  |  31 ++-
 .../apache/crunch/lib/join/LeftOuterJoinIT.java |  51 -----
 .../apache/crunch/lib/join/MapsideJoinIT.java   | 158 --------------
 .../crunch/lib/join/MapsideJoinStrategyIT.java  | 204 +++++++++++++++++++
 .../crunch/lib/join/RightOuterJoinIT.java       |  51 -----
 .../crunch/lib/join/ShardedInnerJoinIT.java     |  27 +++
 .../lib/join/ShardedRightOuterJoinIT.java       |  30 +++
 crunch-core/src/it/resources/orders.txt         |   3 +-
 .../crunch/impl/mem/collect/MemCollection.java  |  17 +-
 .../main/java/org/apache/crunch/lib/Join.java   |  57 +-----
 .../crunch/lib/join/DefaultJoinStrategy.java    |  90 ++++++++
 .../apache/crunch/lib/join/JoinStrategy.java    |  40 ++++
 .../org/apache/crunch/lib/join/JoinType.java    |  62 ++++++
 .../org/apache/crunch/lib/join/MapsideJoin.java | 164 ---------------
 .../crunch/lib/join/MapsideJoinStrategy.java    | 177 ++++++++++++++++
 .../crunch/lib/join/ShardedJoinStrategy.java    | 174 ++++++++++++++++
 .../crunch/lib/join/FullOuterJoinFnTest.java    |  48 -----
 28 files changed, 1211 insertions(+), 634 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/63016301/crunch-core/src/it/java/org/apache/crunch/lib/join/AbstractFullOuterJoinIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/join/AbstractFullOuterJoinIT.java b/crunch-core/src/it/java/org/apache/crunch/lib/join/AbstractFullOuterJoinIT.java
new file mode 100644
index 0000000..24e67b5
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/lib/join/AbstractFullOuterJoinIT.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.join;
+
+import static org.junit.Assert.assertTrue;
+
+import org.apache.crunch.Pair;
+
+public abstract class AbstractFullOuterJoinIT extends JoinTester {
+  @Override
+  public void assertPassed(Iterable<Pair<String, Long>> lines) {
+    boolean passed1 = false;
+    boolean passed2 = false;
+    boolean passed3 = false;
+    for (Pair<String, Long> line : lines) {
+      if ("wretched".equals(line.first()) && 24 == line.second()) {
+        passed1 = true;
+      }
+      if ("againe".equals(line.first()) && 10 == line.second()) {
+        passed2 = true;
+      }
+      if ("Montparnasse.".equals(line.first()) && 2 == line.second()) {
+        passed3 = true;
+      }
+    }
+    assertTrue(passed1);
+    assertTrue(passed2);
+    assertTrue(passed3);
+  }
+  
+  @Override
+  protected JoinType getJoinType() {
+    return JoinType.FULL_OUTER_JOIN;
+  }
+
+  
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/63016301/crunch-core/src/it/java/org/apache/crunch/lib/join/AbstractInnerJoinIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/join/AbstractInnerJoinIT.java b/crunch-core/src/it/java/org/apache/crunch/lib/join/AbstractInnerJoinIT.java
new file mode 100644
index 0000000..8ceaa03
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/lib/join/AbstractInnerJoinIT.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.join;
+
+import static org.junit.Assert.assertTrue;
+
+import org.apache.crunch.Pair;
+
+public abstract class AbstractInnerJoinIT extends JoinTester {
+  @Override
+  public void assertPassed(Iterable<Pair<String, Long>> lines) {
+    boolean passed1 = false;
+    boolean passed2 = true;
+    boolean passed3 = true;
+    for (Pair<String, Long> line : lines) {
+      if ("wretched".equals(line.first()) && 24 == line.second()) {
+        passed1 = true;
+      }
+      if ("againe".equals(line.first())) {
+        passed2 = false;
+      }
+      if ("Montparnasse.".equals(line.first())) {
+        passed3 = false;
+      }
+    }
+    assertTrue(passed1);
+    assertTrue(passed2);
+    assertTrue(passed3);
+  }
+
+  @Override
+  protected JoinType getJoinType() {
+    return JoinType.INNER_JOIN;
+  }
+ 
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/63016301/crunch-core/src/it/java/org/apache/crunch/lib/join/AbstractLeftOuterJoinIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/join/AbstractLeftOuterJoinIT.java b/crunch-core/src/it/java/org/apache/crunch/lib/join/AbstractLeftOuterJoinIT.java
new file mode 100644
index 0000000..241f5ad
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/lib/join/AbstractLeftOuterJoinIT.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.join;
+
+import static org.junit.Assert.assertTrue;
+
+import org.apache.crunch.Pair;
+
+public abstract class AbstractLeftOuterJoinIT extends JoinTester {
+  @Override
+  public void assertPassed(Iterable<Pair<String, Long>> lines) {
+    boolean passed1 = false;
+    boolean passed2 = false;
+    boolean passed3 = true;
+    for (Pair<String, Long> line : lines) {
+      if ("wretched".equals(line.first()) && 24 == line.second()) {
+        passed1 = true;
+      }
+      if ("againe".equals(line.first()) && 10 == line.second()) {
+        passed2 = true;
+      }
+      if ("Montparnasse.".equals(line.first())) {
+        passed3 = false;
+      }
+    }
+    assertTrue(passed1);
+    assertTrue(passed2);
+    assertTrue(passed3);
+  }
+
+  @Override
+  protected JoinType getJoinType() {
+    return JoinType.LEFT_OUTER_JOIN;
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/63016301/crunch-core/src/it/java/org/apache/crunch/lib/join/AbstractRightOuterJoinIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/join/AbstractRightOuterJoinIT.java b/crunch-core/src/it/java/org/apache/crunch/lib/join/AbstractRightOuterJoinIT.java
new file mode 100644
index 0000000..43e0479
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/lib/join/AbstractRightOuterJoinIT.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.join;
+
+import static org.junit.Assert.assertTrue;
+
+import org.apache.crunch.Pair;
+
+public abstract class AbstractRightOuterJoinIT extends JoinTester {
+  @Override
+  public void assertPassed(Iterable<Pair<String, Long>> lines) {
+    boolean passed1 = false;
+    boolean passed2 = true;
+    boolean passed3 = false;
+    for (Pair<String, Long> line : lines) {
+      if ("wretched".equals(line.first()) && 24 == line.second()) {
+        passed1 = true;
+      }
+      if ("againe".equals(line.first())) {
+        passed2 = false;
+      }
+      if ("Montparnasse.".equals(line.first()) && 2 == line.second()) {
+        passed3 = true;
+      }
+    }
+    assertTrue(passed1);
+    assertTrue(passed2);
+    assertTrue(passed3);
+  }
+
+  @Override
+  protected JoinType getJoinType() {
+    return JoinType.RIGHT_OUTER_JOIN;
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/63016301/crunch-core/src/it/java/org/apache/crunch/lib/join/DefaultFullOuterJoinIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/join/DefaultFullOuterJoinIT.java b/crunch-core/src/it/java/org/apache/crunch/lib/join/DefaultFullOuterJoinIT.java
new file mode 100644
index 0000000..a7d7e07
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/lib/join/DefaultFullOuterJoinIT.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.join;
+
+public class DefaultFullOuterJoinIT extends AbstractFullOuterJoinIT {
+
+  @Override
+  protected <K, U, V> JoinStrategy<K, U, V> getJoinStrategy() {
+    return new DefaultJoinStrategy<K, U, V>();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/63016301/crunch-core/src/it/java/org/apache/crunch/lib/join/DefaultInnerJoinIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/join/DefaultInnerJoinIT.java b/crunch-core/src/it/java/org/apache/crunch/lib/join/DefaultInnerJoinIT.java
new file mode 100644
index 0000000..02a92ec
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/lib/join/DefaultInnerJoinIT.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.join;
+
+public class DefaultInnerJoinIT extends AbstractInnerJoinIT {
+
+  @Override
+  protected <K, U, V> JoinStrategy<K, U, V> getJoinStrategy() {
+    return new DefaultJoinStrategy<K, U, V>();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/63016301/crunch-core/src/it/java/org/apache/crunch/lib/join/DefaultLeftOuterJoinIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/join/DefaultLeftOuterJoinIT.java b/crunch-core/src/it/java/org/apache/crunch/lib/join/DefaultLeftOuterJoinIT.java
new file mode 100644
index 0000000..3bbf5b9
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/lib/join/DefaultLeftOuterJoinIT.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.join;
+
+public class DefaultLeftOuterJoinIT extends AbstractLeftOuterJoinIT {
+
+  @Override
+  protected <K, U, V> JoinStrategy<K, U, V> getJoinStrategy() {
+    return new DefaultJoinStrategy<K, U, V>();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/63016301/crunch-core/src/it/java/org/apache/crunch/lib/join/DefaultRightOuterJoinIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/join/DefaultRightOuterJoinIT.java b/crunch-core/src/it/java/org/apache/crunch/lib/join/DefaultRightOuterJoinIT.java
new file mode 100644
index 0000000..21983db
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/lib/join/DefaultRightOuterJoinIT.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.join;
+
+public class DefaultRightOuterJoinIT extends AbstractRightOuterJoinIT {
+
+  @Override
+  protected <K, U, V> JoinStrategy<K, U, V> getJoinStrategy() {
+    return new DefaultJoinStrategy<K, U, V>();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/63016301/crunch-core/src/it/java/org/apache/crunch/lib/join/FullOuterJoinFnTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/join/FullOuterJoinFnTest.java b/crunch-core/src/it/java/org/apache/crunch/lib/join/FullOuterJoinFnTest.java
new file mode 100644
index 0000000..5cf4f51
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/lib/join/FullOuterJoinFnTest.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.join;
+
+import static org.apache.crunch.test.StringWrapper.wrap;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+
+import org.apache.crunch.Emitter;
+import org.apache.crunch.Pair;
+import org.apache.crunch.test.StringWrapper;
+import org.apache.crunch.types.avro.Avros;
+
+public class FullOuterJoinFnTest extends JoinFnTestBase {
+
+  @Override
+  protected void checkOutput(Emitter<Pair<StringWrapper, Pair<StringWrapper, String>>> emitter) {
+    verify(emitter)
+        .emit(Pair.of(wrap("left-only"), Pair.of(wrap("left-only-left"), (String) null)));
+    verify(emitter).emit(Pair.of(wrap("both"), Pair.of(wrap("both-left"), "both-right")));
+    verify(emitter).emit(
+        Pair.of(wrap("right-only"), Pair.of((StringWrapper) null, "right-only-right")));
+    verifyNoMoreInteractions(emitter);
+  }
+
+  @Override
+  protected JoinFn<StringWrapper, StringWrapper, String> getJoinFn() {
+    return new FullOuterJoinFn<StringWrapper, StringWrapper, String>(
+        Avros.reflects(StringWrapper.class),
+        Avros.reflects(StringWrapper.class));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/63016301/crunch-core/src/it/java/org/apache/crunch/lib/join/FullOuterJoinIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/join/FullOuterJoinIT.java b/crunch-core/src/it/java/org/apache/crunch/lib/join/FullOuterJoinIT.java
deleted file mode 100644
index 63d594d..0000000
--- a/crunch-core/src/it/java/org/apache/crunch/lib/join/FullOuterJoinIT.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.lib.join;
-
-import static org.junit.Assert.assertTrue;
-
-import org.apache.crunch.Pair;
-import org.apache.crunch.types.PTypeFamily;
-
-public class FullOuterJoinIT extends JoinTester {
-  @Override
-  public void assertPassed(Iterable<Pair<String, Long>> lines) {
-    boolean passed1 = false;
-    boolean passed2 = false;
-    boolean passed3 = false;
-    for (Pair<String, Long> line : lines) {
-      if ("wretched".equals(line.first()) && 24 == line.second()) {
-        passed1 = true;
-      }
-      if ("againe".equals(line.first()) && 10 == line.second()) {
-        passed2 = true;
-      }
-      if ("Montparnasse.".equals(line.first()) && 2 == line.second()) {
-        passed3 = true;
-      }
-    }
-    assertTrue(passed1);
-    assertTrue(passed2);
-    assertTrue(passed3);
-  }
-
-  @Override
-  protected JoinFn<String, Long, Long> getJoinFn(PTypeFamily typeFamily) {
-    return new FullOuterJoinFn<String, Long, Long>(typeFamily.strings(), typeFamily.longs());
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/63016301/crunch-core/src/it/java/org/apache/crunch/lib/join/InnerJoinIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/join/InnerJoinIT.java b/crunch-core/src/it/java/org/apache/crunch/lib/join/InnerJoinIT.java
deleted file mode 100644
index 4759050..0000000
--- a/crunch-core/src/it/java/org/apache/crunch/lib/join/InnerJoinIT.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.lib.join;
-
-import static org.junit.Assert.assertTrue;
-
-import org.apache.crunch.Pair;
-import org.apache.crunch.types.PTypeFamily;
-
-public class InnerJoinIT extends JoinTester {
-  @Override
-  public void assertPassed(Iterable<Pair<String, Long>> lines) {
-    boolean passed1 = false;
-    boolean passed2 = true;
-    boolean passed3 = true;
-    for (Pair<String, Long> line : lines) {
-      if ("wretched".equals(line.first()) && 24 == line.second()) {
-        passed1 = true;
-      }
-      if ("againe".equals(line.first())) {
-        passed2 = false;
-      }
-      if ("Montparnasse.".equals(line.first())) {
-        passed3 = false;
-      }
-    }
-    assertTrue(passed1);
-    assertTrue(passed2);
-    assertTrue(passed3);
-  }
-
-  @Override
-  protected JoinFn<String, Long, Long> getJoinFn(PTypeFamily typeFamily) {
-    return new InnerJoinFn<String, Long, Long>(typeFamily.strings(), typeFamily.longs());
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/63016301/crunch-core/src/it/java/org/apache/crunch/lib/join/JoinTester.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/join/JoinTester.java b/crunch-core/src/it/java/org/apache/crunch/lib/join/JoinTester.java
index 3e8ffda..0e782d1 100644
--- a/crunch-core/src/it/java/org/apache/crunch/lib/join/JoinTester.java
+++ b/crunch-core/src/it/java/org/apache/crunch/lib/join/JoinTester.java
@@ -26,9 +26,9 @@ import org.apache.crunch.PCollection;
 import org.apache.crunch.PTable;
 import org.apache.crunch.Pair;
 import org.apache.crunch.Pipeline;
+import org.apache.crunch.impl.mem.MemPipeline;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.lib.Aggregate;
-import org.apache.crunch.lib.Join;
 import org.apache.crunch.test.TemporaryPath;
 import org.apache.crunch.test.TemporaryPaths;
 import org.apache.crunch.types.PTableType;
@@ -39,6 +39,7 @@ import org.junit.Rule;
 import org.junit.Test;
 
 public abstract class JoinTester implements Serializable {
+  
   private static class WordSplit extends DoFn<String, String> {
     @Override
     public void process(String input, Emitter<String> emitter) {
@@ -52,8 +53,10 @@ public abstract class JoinTester implements Serializable {
     PTableType<String, Long> ntt = ptf.tableOf(ptf.strings(), ptf.longs());
     PTable<String, Long> ws1 = Aggregate.count(w1.parallelDo("ws1", new WordSplit(), ptf.strings()));
     PTable<String, Long> ws2 = Aggregate.count(w2.parallelDo("ws2", new WordSplit(), ptf.strings()));
+    
+    JoinStrategy<String,Long,Long> joinStrategy = getJoinStrategy();
 
-    PTable<String, Pair<Long, Long>> join = Join.join(ws1, ws2, getJoinFn(ptf));
+    PTable<String, Pair<Long, Long>> join = joinStrategy.join(ws1, ws2, getJoinType());
 
     PTable<String, Long> sums = join.parallelDo("cnt", new DoFn<Pair<String, Pair<Long, Long>>, Pair<String, Long>>() {
       @Override
@@ -85,13 +88,29 @@ public abstract class JoinTester implements Serializable {
 
   @Test
   public void testWritableJoin() throws Exception {
-    run(new MRPipeline(InnerJoinIT.class, tmpDir.getDefaultConfiguration()), WritableTypeFamily.getInstance());
+    run(new MRPipeline(AbstractInnerJoinIT.class, tmpDir.getDefaultConfiguration()), WritableTypeFamily.getInstance());
   }
 
   @Test
   public void testAvroJoin() throws Exception {
-    run(new MRPipeline(InnerJoinIT.class, tmpDir.getDefaultConfiguration()), AvroTypeFamily.getInstance());
+    run(new MRPipeline(AbstractInnerJoinIT.class, tmpDir.getDefaultConfiguration()), AvroTypeFamily.getInstance());
   }
+  
+  @Test
+  public void testAvroJoin_MemPipeline() throws Exception {
+    run(MemPipeline.getInstance(), AvroTypeFamily.getInstance());
+  }
+  
+  @Test
+  public void testWritableJoin_MemPipeline() throws Exception {
+    run(MemPipeline.getInstance(), WritableTypeFamily.getInstance());
+  }
+  
+  /** 
+   * Return the JoinStrategy to be tested. 
+   */
+  protected abstract <K, U, V> JoinStrategy<K, U, V> getJoinStrategy();
+  
 
   /**
    * Used to check that the result of the join makes sense.
@@ -102,7 +121,7 @@ public abstract class JoinTester implements Serializable {
   public abstract void assertPassed(Iterable<Pair<String, Long>> lines);
 
   /**
-   * @return The JoinFn to use.
+   * @return The JoinType to be used in the test.
    */
-  protected abstract JoinFn<String, Long, Long> getJoinFn(PTypeFamily typeFamily);
+  protected abstract JoinType getJoinType();
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/63016301/crunch-core/src/it/java/org/apache/crunch/lib/join/LeftOuterJoinIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/join/LeftOuterJoinIT.java b/crunch-core/src/it/java/org/apache/crunch/lib/join/LeftOuterJoinIT.java
deleted file mode 100644
index 4ad2a81..0000000
--- a/crunch-core/src/it/java/org/apache/crunch/lib/join/LeftOuterJoinIT.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.lib.join;
-
-import static org.junit.Assert.assertTrue;
-
-import org.apache.crunch.Pair;
-import org.apache.crunch.types.PTypeFamily;
-
-public class LeftOuterJoinIT extends JoinTester {
-  @Override
-  public void assertPassed(Iterable<Pair<String, Long>> lines) {
-    boolean passed1 = false;
-    boolean passed2 = false;
-    boolean passed3 = true;
-    for (Pair<String, Long> line : lines) {
-      if ("wretched".equals(line.first()) && 24 == line.second()) {
-        passed1 = true;
-      }
-      if ("againe".equals(line.first()) && 10 == line.second()) {
-        passed2 = true;
-      }
-      if ("Montparnasse.".equals(line.first())) {
-        passed3 = false;
-      }
-    }
-    assertTrue(passed1);
-    assertTrue(passed2);
-    assertTrue(passed3);
-  }
-
-  @Override
-  protected JoinFn<String, Long, Long> getJoinFn(PTypeFamily typeFamily) {
-    return new LeftOuterJoinFn<String, Long, Long>(typeFamily.strings(), typeFamily.longs());
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/63016301/crunch-core/src/it/java/org/apache/crunch/lib/join/MapsideJoinIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/join/MapsideJoinIT.java b/crunch-core/src/it/java/org/apache/crunch/lib/join/MapsideJoinIT.java
deleted file mode 100644
index 8bb5586..0000000
--- a/crunch-core/src/it/java/org/apache/crunch/lib/join/MapsideJoinIT.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.lib.join;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.crunch.MapFn;
-import org.apache.crunch.PTable;
-import org.apache.crunch.Pair;
-import org.apache.crunch.Pipeline;
-import org.apache.crunch.PipelineResult;
-import org.apache.crunch.fn.FilterFns;
-import org.apache.crunch.fn.MapValuesFn;
-import org.apache.crunch.impl.mem.MemPipeline;
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.apache.crunch.types.writable.Writables;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-
-import com.google.common.collect.Lists;
-
-public class MapsideJoinIT {
-  
-  private static String saveTempDir;
-  
-  @BeforeClass
-  public static void setUpClass(){
-    
-    // Ensure a consistent temporary directory for use of the DistributedCache.
-    
-    // The DistributedCache technically isn't supported when running in local mode, and the default
-    // temporary directiory "/tmp" is used as its location. This typically only causes an issue when 
-    // running integration tests on Mac OS X, as OS X doesn't use "/tmp" as it's default temporary
-    // directory. The following call ensures that "/tmp" is used as the temporary directory on all platforms.
-    saveTempDir = System.setProperty("java.io.tmpdir", "/tmp");
-  }
-  
-  @AfterClass
-  public static void tearDownClass(){
-    System.setProperty("java.io.tmpdir", saveTempDir);
-  }
-
-  private static class LineSplitter extends MapFn<String, Pair<Integer, String>> {
-    @Override
-    public Pair<Integer, String> map(String input) {
-      String[] fields = input.split("\\|");
-      return Pair.of(Integer.parseInt(fields[0]), fields[1]);
-    }
-  }
-
-  private static class CapOrdersFn extends MapValuesFn<Integer, String, String> {
-    @Override
-    public String map(String v) {
-      return v.toUpperCase();
-    }
-  }
-  
-  private static class ConcatValuesFn extends MapValuesFn<Integer, Pair<String, String>, String> {
-    @Override
-    public String map(Pair<String, String> v) {
-      return v.toString();
-    }
-  }
-  
-  @Rule
-  public TemporaryPath tmpDir = TemporaryPaths.create();
-
-  @Test
-  public void testMapSideJoin_MemPipeline() {
-    runMapsideJoin(MemPipeline.getInstance(), true);
-  }
-
-  @Test
-  public void testMapsideJoin_RightSideIsEmpty() throws IOException {
-    MRPipeline pipeline = new MRPipeline(MapsideJoinIT.class, tmpDir.getDefaultConfiguration());
-    PTable<Integer, String> customerTable = readTable(pipeline, "customers.txt");
-    PTable<Integer, String> orderTable = readTable(pipeline, "orders.txt");
-
-    PTable<Integer, String> filteredOrderTable = orderTable
-        .parallelDo(FilterFns.<Pair<Integer, String>>REJECT_ALL(), orderTable.getPTableType());
-
-    PTable<Integer, Pair<String, String>> joined = MapsideJoin.join(customerTable, filteredOrderTable);
-
-    List<Pair<Integer, Pair<String, String>>> materializedJoin = Lists.newArrayList(joined.materialize());
-
-    assertTrue(materializedJoin.isEmpty());
-  }
-
-  @Test
-  public void testMapsideJoin() throws IOException {
-    runMapsideJoin(new MRPipeline(MapsideJoinIT.class, tmpDir.getDefaultConfiguration()), false);
-  }
-
-  private void runMapsideJoin(Pipeline pipeline, boolean inMemory) {
-    PTable<Integer, String> customerTable = readTable(pipeline, "customers.txt");
-    PTable<Integer, String> orderTable = readTable(pipeline, "orders.txt");
-    
-    PTable<Integer, String> custOrders = MapsideJoin.join(customerTable, orderTable)
-        .parallelDo("concat", new ConcatValuesFn(), Writables.tableOf(Writables.ints(), Writables.strings()));
-
-    PTable<Integer, String> ORDER_TABLE = orderTable.parallelDo(new CapOrdersFn(), orderTable.getPTableType());
-    
-    PTable<Integer, Pair<String, String>> joined = MapsideJoin.join(custOrders, ORDER_TABLE);
-
-    List<Pair<Integer, Pair<String, String>>> expectedJoinResult = Lists.newArrayList();
-    expectedJoinResult.add(Pair.of(111, Pair.of("[John Doe,Corn flakes]", "CORN FLAKES")));
-    expectedJoinResult.add(Pair.of(222, Pair.of("[Jane Doe,Toilet paper]", "TOILET PAPER")));
-    expectedJoinResult.add(Pair.of(222, Pair.of("[Jane Doe,Toilet paper]", "TOILET PLUNGER")));
-    expectedJoinResult.add(Pair.of(222, Pair.of("[Jane Doe,Toilet plunger]", "TOILET PAPER")));
-    expectedJoinResult.add(Pair.of(222, Pair.of("[Jane Doe,Toilet plunger]", "TOILET PLUNGER")));
-    expectedJoinResult.add(Pair.of(333, Pair.of("[Someone Else,Toilet brush]", "TOILET BRUSH")));
-    Iterable<Pair<Integer, Pair<String, String>>> iter = joined.materialize();
-    
-    PipelineResult res = pipeline.run();
-    if (!inMemory) {
-      assertEquals(2, res.getStageResults().size());
-    }
-     
-    List<Pair<Integer, Pair<String, String>>> joinedResultList = Lists.newArrayList(iter);
-    Collections.sort(joinedResultList);
-
-    assertEquals(expectedJoinResult, joinedResultList);
-  }
-
-  private PTable<Integer, String> readTable(Pipeline pipeline, String filename) {
-    try {
-      return pipeline.readTextFile(tmpDir.copyResourceFileName(filename)).parallelDo("asTable", new LineSplitter(),
-          Writables.tableOf(Writables.ints(), Writables.strings()));
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/63016301/crunch-core/src/it/java/org/apache/crunch/lib/join/MapsideJoinStrategyIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/join/MapsideJoinStrategyIT.java b/crunch-core/src/it/java/org/apache/crunch/lib/join/MapsideJoinStrategyIT.java
new file mode 100644
index 0000000..c459ad8
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/lib/join/MapsideJoinStrategyIT.java
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.join;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.PipelineResult;
+import org.apache.crunch.fn.FilterFns;
+import org.apache.crunch.fn.MapValuesFn;
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.writable.Writables;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Integration tests for {@link MapsideJoinStrategy}, covering inner and left outer joins on both
+ * {@link MemPipeline} and {@link MRPipeline}, plus an inner join against an empty right side.
+ */
+public class MapsideJoinStrategyIT {
+
+  private static String saveTempDir;
+
+  @BeforeClass
+  public static void setUpClass() {
+
+    // Ensure a consistent temporary directory for use of the DistributedCache.
+
+    // The DistributedCache technically isn't supported when running in local mode, and the default
+    // temporary directory "/tmp" is used as its location. This typically only causes an issue when
+    // running integration tests on Mac OS X, as OS X doesn't use "/tmp" as its default temporary
+    // directory. The following call ensures that "/tmp" is used as the temporary directory on all platforms.
+    saveTempDir = System.setProperty("java.io.tmpdir", "/tmp");
+  }
+
+  @AfterClass
+  public static void tearDownClass() {
+    System.setProperty("java.io.tmpdir", saveTempDir);
+  }
+
+  private static class LineSplitter extends MapFn<String, Pair<Integer, String>> {
+    @Override
+    public Pair<Integer, String> map(String input) {
+      String[] fields = input.split("\\|");
+      return Pair.of(Integer.parseInt(fields[0]), fields[1]);
+    }
+  }
+
+  private static class CapOrdersFn extends MapValuesFn<Integer, String, String> {
+    @Override
+    public String map(String v) {
+      return v.toUpperCase();
+    }
+  }
+
+  private static class ConcatValuesFn extends MapValuesFn<Integer, Pair<String, String>, String> {
+    @Override
+    public String map(Pair<String, String> v) {
+      return v.toString();
+    }
+  }
+
+  @Rule
+  public TemporaryPath tmpDir = TemporaryPaths.create();
+
+  @Test
+  public void testMapSideJoin_MemPipeline() {
+    runMapsideJoin(MemPipeline.getInstance(), true);
+  }
+
+  @Test
+  public void testMapSideJoinLeftOuterJoin_MemPipeline() {
+    runMapsideLeftOuterJoin(MemPipeline.getInstance(), true);
+  }
+
+  @Test
+  public void testMapsideJoin_RightSideIsEmpty() throws IOException {
+    MRPipeline pipeline = new MRPipeline(MapsideJoinStrategyIT.class, tmpDir.getDefaultConfiguration());
+    PTable<Integer, String> customerTable = readTable(pipeline, "customers.txt");
+    PTable<Integer, String> orderTable = readTable(pipeline, "orders.txt");
+
+    PTable<Integer, String> filteredOrderTable = orderTable
+        .parallelDo(FilterFns.<Pair<Integer, String>>REJECT_ALL(), orderTable.getPTableType());
+
+    // Every order is rejected above, so the inner join must produce no rows.
+    MapsideJoinStrategy<Integer, String, String> mapsideJoin = new MapsideJoinStrategy<Integer, String, String>();
+    PTable<Integer, Pair<String, String>> joined = mapsideJoin.join(customerTable, filteredOrderTable, JoinType.INNER_JOIN);
+
+    List<Pair<Integer, Pair<String, String>>> materializedJoin = Lists.newArrayList(joined.materialize());
+
+    assertTrue(materializedJoin.isEmpty());
+  }
+
+  @Test
+  public void testMapsideJoin() throws IOException {
+    runMapsideJoin(new MRPipeline(MapsideJoinStrategyIT.class, tmpDir.getDefaultConfiguration()), false);
+  }
+
+  @Test
+  public void testMapsideJoin_LeftOuterJoin() throws IOException {
+    runMapsideLeftOuterJoin(new MRPipeline(MapsideJoinStrategyIT.class, tmpDir.getDefaultConfiguration()), false);
+  }
+
+  /**
+   * Inner-joins customers to orders, inner-joins that result to the upper-cased orders, and checks the
+   * sorted output. When {@code inMemory} is false the run is also expected to consist of exactly two stages.
+   */
+  private void runMapsideJoin(Pipeline pipeline, boolean inMemory) {
+    PTable<Integer, String> customerTable = readTable(pipeline, "customers.txt");
+    PTable<Integer, String> orderTable = readTable(pipeline, "orders.txt");
+
+    MapsideJoinStrategy<Integer, String, String> mapsideJoin = new MapsideJoinStrategy<Integer, String, String>();
+    PTable<Integer, String> custOrders = mapsideJoin.join(customerTable, orderTable, JoinType.INNER_JOIN)
+        .parallelDo("concat", new ConcatValuesFn(), Writables.tableOf(Writables.ints(), Writables.strings()));
+
+    PTable<Integer, String> upperCaseOrderTable = orderTable.parallelDo(new CapOrdersFn(), orderTable.getPTableType());
+
+    PTable<Integer, Pair<String, String>> joined = mapsideJoin.join(custOrders, upperCaseOrderTable, JoinType.INNER_JOIN);
+
+    List<Pair<Integer, Pair<String, String>>> expectedJoinResult = Lists.newArrayList();
+    expectedJoinResult.add(Pair.of(111, Pair.of("[John Doe,Corn flakes]", "CORN FLAKES")));
+    expectedJoinResult.add(Pair.of(222, Pair.of("[Jane Doe,Toilet paper]", "TOILET PAPER")));
+    expectedJoinResult.add(Pair.of(222, Pair.of("[Jane Doe,Toilet paper]", "TOILET PLUNGER")));
+    expectedJoinResult.add(Pair.of(222, Pair.of("[Jane Doe,Toilet plunger]", "TOILET PAPER")));
+    expectedJoinResult.add(Pair.of(222, Pair.of("[Jane Doe,Toilet plunger]", "TOILET PLUNGER")));
+    expectedJoinResult.add(Pair.of(333, Pair.of("[Someone Else,Toilet brush]", "TOILET BRUSH")));
+    Iterable<Pair<Integer, Pair<String, String>>> iter = joined.materialize();
+
+    PipelineResult res = pipeline.run();
+    if (!inMemory) {
+      assertEquals(2, res.getStageResults().size());
+    }
+
+    List<Pair<Integer, Pair<String, String>>> joinedResultList = Lists.newArrayList(iter);
+    Collections.sort(joinedResultList);
+
+    assertEquals(expectedJoinResult, joinedResultList);
+  }
+
+  /**
+   * Same flow as {@code runMapsideJoin}, but with left outer joins, so customer 444 (who has no orders)
+   * is kept with a null order value. The stage count is checked the same way when {@code inMemory} is false.
+   */
+  private void runMapsideLeftOuterJoin(Pipeline pipeline, boolean inMemory) {
+    PTable<Integer, String> customerTable = readTable(pipeline, "customers.txt");
+    PTable<Integer, String> orderTable = readTable(pipeline, "orders.txt");
+
+    MapsideJoinStrategy<Integer, String, String> mapsideJoin = new MapsideJoinStrategy<Integer, String, String>();
+    PTable<Integer, String> custOrders = mapsideJoin.join(customerTable, orderTable, JoinType.LEFT_OUTER_JOIN)
+        .parallelDo("concat", new ConcatValuesFn(), Writables.tableOf(Writables.ints(), Writables.strings()));
+
+    PTable<Integer, String> upperCaseOrderTable = orderTable.parallelDo(new CapOrdersFn(), orderTable.getPTableType());
+
+    PTable<Integer, Pair<String, String>> joined = mapsideJoin.join(custOrders, upperCaseOrderTable, JoinType.LEFT_OUTER_JOIN);
+
+    List<Pair<Integer, Pair<String, String>>> expectedJoinResult = Lists.newArrayList();
+    expectedJoinResult.add(Pair.of(111, Pair.of("[John Doe,Corn flakes]", "CORN FLAKES")));
+    expectedJoinResult.add(Pair.of(222, Pair.of("[Jane Doe,Toilet paper]", "TOILET PAPER")));
+    expectedJoinResult.add(Pair.of(222, Pair.of("[Jane Doe,Toilet paper]", "TOILET PLUNGER")));
+    expectedJoinResult.add(Pair.of(222, Pair.of("[Jane Doe,Toilet plunger]", "TOILET PAPER")));
+    expectedJoinResult.add(Pair.of(222, Pair.of("[Jane Doe,Toilet plunger]", "TOILET PLUNGER")));
+    expectedJoinResult.add(Pair.of(333, Pair.of("[Someone Else,Toilet brush]", "TOILET BRUSH")));
+    expectedJoinResult.add(Pair.of(444, Pair.<String,String>of("[Has No Orders,null]", null)));
+    Iterable<Pair<Integer, Pair<String, String>>> iter = joined.materialize();
+
+    PipelineResult res = pipeline.run();
+    if (!inMemory) {
+      assertEquals(2, res.getStageResults().size());
+    }
+
+    List<Pair<Integer, Pair<String, String>>> joinedResultList = Lists.newArrayList(iter);
+    Collections.sort(joinedResultList);
+
+    assertEquals(expectedJoinResult, joinedResultList);
+  }
+
+  private PTable<Integer, String> readTable(Pipeline pipeline, String filename) {
+    try {
+      return pipeline.readTextFile(tmpDir.copyResourceFileName(filename)).parallelDo("asTable", new LineSplitter(),
+          Writables.tableOf(Writables.ints(), Writables.strings()));
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/63016301/crunch-core/src/it/java/org/apache/crunch/lib/join/RightOuterJoinIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/join/RightOuterJoinIT.java b/crunch-core/src/it/java/org/apache/crunch/lib/join/RightOuterJoinIT.java
deleted file mode 100644
index d889b61..0000000
--- a/crunch-core/src/it/java/org/apache/crunch/lib/join/RightOuterJoinIT.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.lib.join;
-
-import static org.junit.Assert.assertTrue;
-
-import org.apache.crunch.Pair;
-import org.apache.crunch.types.PTypeFamily;
-
-public class RightOuterJoinIT extends JoinTester {
-  @Override
-  public void assertPassed(Iterable<Pair<String, Long>> lines) {
-    boolean passed1 = false;
-    boolean passed2 = true;
-    boolean passed3 = false;
-    for (Pair<String, Long> line : lines) {
-      if ("wretched".equals(line.first()) && 24 == line.second()) {
-        passed1 = true;
-      }
-      if ("againe".equals(line.first())) {
-        passed2 = false;
-      }
-      if ("Montparnasse.".equals(line.first()) && 2 == line.second()) {
-        passed3 = true;
-      }
-    }
-    assertTrue(passed1);
-    assertTrue(passed2);
-    assertTrue(passed3);
-  }
-
-  @Override
-  protected JoinFn<String, Long, Long> getJoinFn(PTypeFamily typeFamily) {
-    return new RightOuterJoinFn<String, Long, Long>(typeFamily.strings(), typeFamily.longs());
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/63016301/crunch-core/src/it/java/org/apache/crunch/lib/join/ShardedInnerJoinIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/join/ShardedInnerJoinIT.java b/crunch-core/src/it/java/org/apache/crunch/lib/join/ShardedInnerJoinIT.java
new file mode 100644
index 0000000..d7941f4
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/lib/join/ShardedInnerJoinIT.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.join;
+
+/**
+ * Runs the {@link AbstractInnerJoinIT} tests using a {@link ShardedJoinStrategy}.
+ */
+public class ShardedInnerJoinIT extends AbstractInnerJoinIT {
+
+  @Override
+  protected <K, U, V> JoinStrategy<K, U, V> getJoinStrategy() {
+    return new ShardedJoinStrategy<K, U, V>(3);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/63016301/crunch-core/src/it/java/org/apache/crunch/lib/join/ShardedRightOuterJoinIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/join/ShardedRightOuterJoinIT.java b/crunch-core/src/it/java/org/apache/crunch/lib/join/ShardedRightOuterJoinIT.java
new file mode 100644
index 0000000..c636b8b
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/lib/join/ShardedRightOuterJoinIT.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.join;
+
+/**
+ * Runs the {@link AbstractRightOuterJoinIT} tests using a {@link ShardedJoinStrategy}.
+ */
+public class ShardedRightOuterJoinIT extends AbstractRightOuterJoinIT {
+
+  @Override
+  protected <K, U, V> JoinStrategy<K, U, V> getJoinStrategy() {
+    return new ShardedJoinStrategy<K, U, V>(3);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/63016301/crunch-core/src/it/resources/orders.txt
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/resources/orders.txt b/crunch-core/src/it/resources/orders.txt
index 2f1383f..d60df82 100644
--- a/crunch-core/src/it/resources/orders.txt
+++ b/crunch-core/src/it/resources/orders.txt
@@ -1,4 +1,5 @@
 222|Toilet plunger
 333|Toilet brush
 222|Toilet paper
-111|Corn flakes
\ No newline at end of file
+111|Corn flakes
+555|Not ordered
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/crunch/blob/63016301/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
index c97fac6..d0df916 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
@@ -19,6 +19,7 @@ package org.apache.crunch.impl.mem.collect;
 
 import java.lang.reflect.Method;
 import java.util.Collection;
+import java.util.Set;
 
 import javassist.util.proxy.MethodFilter;
 import javassist.util.proxy.MethodHandler;
@@ -50,6 +51,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskInputOutputContext;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 
 public class MemCollection<S> implements PCollection<S> {
@@ -251,19 +253,22 @@ public class MemCollection<S> implements PCollection<S> {
     Class<TaskInputOutputContext> superType = TaskInputOutputContext.class;
     Class[] types = new Class[0];
     Object[] args = new Object[0];
+    final TaskAttemptID taskAttemptId = new TaskAttemptID();
     if (superType.isInterface()) {
       factory.setInterfaces(new Class[] { superType });
     } else {
       types = new Class[] { Configuration.class, TaskAttemptID.class, RecordWriter.class, OutputCommitter.class,
           StatusReporter.class };
-      args = new Object[] { conf, new TaskAttemptID(), null, null, null };
+      args = new Object[] { conf, taskAttemptId, null, null, null };
       factory.setSuperclass(superType);
     }
+
+    final Set<String> handledMethods = ImmutableSet.of("getConfiguration", "getCounter", 
+                                                  "progress", "getTaskAttemptID");
     factory.setFilter(new MethodFilter() {
       @Override
       public boolean isHandled(Method m) {
-        String name = m.getName();
-        return "getConfiguration".equals(name) || "getCounter".equals(name) || "progress".equals(name);
+        return handledMethods.contains(m.getName());
       }
     });
     MethodHandler handler = new MethodHandler() {
@@ -275,12 +280,16 @@ public class MemCollection<S> implements PCollection<S> {
         } else if ("progress".equals(name)) {
           // no-op
           return null;
-        } else { // getCounter
+        } else if ("getTaskAttemptID".equals(name)) {
+          return taskAttemptId;
+        } else if ("getCounter".equals(name)){ // getCounter
           if (args.length == 1) {
             return MemPipeline.getCounters().findCounter((Enum<?>) args[0]);
           } else {
             return MemPipeline.getCounters().findCounter((String) args[0], (String) args[1]);
           }
+        } else {
+          throw new IllegalStateException("Unhandled method " + name);
         }
       }
     };

http://git-wip-us.apache.org/repos/asf/crunch/blob/63016301/crunch-core/src/main/java/org/apache/crunch/lib/Join.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/Join.java b/crunch-core/src/main/java/org/apache/crunch/lib/Join.java
index c0c4a6b..2452279 100644
--- a/crunch-core/src/main/java/org/apache/crunch/lib/Join.java
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/Join.java
@@ -17,25 +17,17 @@
  */
 package org.apache.crunch.lib;
 
-import org.apache.crunch.GroupingOptions;
-import org.apache.crunch.MapFn;
-import org.apache.crunch.PGroupedTable;
 import org.apache.crunch.PTable;
 import org.apache.crunch.Pair;
-import org.apache.crunch.lib.join.FullOuterJoinFn;
-import org.apache.crunch.lib.join.InnerJoinFn;
-import org.apache.crunch.lib.join.JoinFn;
-import org.apache.crunch.lib.join.JoinUtils;
-import org.apache.crunch.lib.join.LeftOuterJoinFn;
-import org.apache.crunch.lib.join.RightOuterJoinFn;
-import org.apache.crunch.types.PTableType;
-import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.lib.join.DefaultJoinStrategy;
+import org.apache.crunch.lib.join.JoinType;
 
 /**
  * Utilities for joining multiple {@code PTable} instances based on a common
  * lastKey.
  */
 public class Join {
+  
   /**
    * Performs an inner join on the specified {@link PTable}s.
    * 
@@ -75,7 +67,7 @@ public class Join {
    * @return The joined result.
    */
   public static <K, U, V> PTable<K, Pair<U, V>> innerJoin(PTable<K, U> left, PTable<K, V> right) {
-    return join(left, right, new InnerJoinFn<K, U, V>(left.getKeyType(), left.getValueType()));
+    return new DefaultJoinStrategy<K, U, V>().join(left, right, JoinType.INNER_JOIN);
   }
 
   /**
@@ -97,7 +89,7 @@ public class Join {
    * @return The joined result.
    */
   public static <K, U, V> PTable<K, Pair<U, V>> leftJoin(PTable<K, U> left, PTable<K, V> right) {
-    return join(left, right, new LeftOuterJoinFn<K, U, V>(left.getKeyType(), left.getValueType()));
+    return new DefaultJoinStrategy<K, U, V>().join(left, right, JoinType.LEFT_OUTER_JOIN);
   }
 
   /**
@@ -120,7 +112,7 @@ public class Join {
    * @return The joined result.
    */
   public static <K, U, V> PTable<K, Pair<U, V>> rightJoin(PTable<K, U> left, PTable<K, V> right) {
-    return join(left, right, new RightOuterJoinFn<K, U, V>(left.getKeyType(), left.getValueType()));
+    return new DefaultJoinStrategy<K, U, V>().join(left, right, JoinType.RIGHT_OUTER_JOIN);
   }
 
   /**
@@ -141,41 +133,8 @@ public class Join {
    * @return The joined result.
    */
   public static <K, U, V> PTable<K, Pair<U, V>> fullJoin(PTable<K, U> left, PTable<K, V> right) {
-    return join(left, right, new FullOuterJoinFn<K, U, V>(left.getKeyType(), left.getValueType()));
+    return new DefaultJoinStrategy<K, U, V>().join(left, right, JoinType.FULL_OUTER_JOIN);
   }
 
-  public static <K, U, V> PTable<K, Pair<U, V>> join(PTable<K, U> left, PTable<K, V> right, JoinFn<K, U, V> joinFn) {
-    PTypeFamily ptf = left.getTypeFamily();
-    PGroupedTable<Pair<K, Integer>, Pair<U, V>> grouped = preJoin(left, right);
-    PTableType<K, Pair<U, V>> ret = ptf
-        .tableOf(left.getKeyType(), ptf.pairs(left.getValueType(), right.getValueType()));
-
-    return grouped.parallelDo(joinFn.getJoinType() + grouped.getName(), joinFn, ret);
-  }
-
-  private static <K, U, V> PGroupedTable<Pair<K, Integer>, Pair<U, V>> preJoin(PTable<K, U> left, PTable<K, V> right) {
-    PTypeFamily ptf = left.getTypeFamily();
-    PTableType<Pair<K, Integer>, Pair<U, V>> ptt = ptf.tableOf(ptf.pairs(left.getKeyType(), ptf.ints()),
-        ptf.pairs(left.getValueType(), right.getValueType()));
-
-    PTable<Pair<K, Integer>, Pair<U, V>> tag1 = left.parallelDo("joinTagLeft",
-        new MapFn<Pair<K, U>, Pair<Pair<K, Integer>, Pair<U, V>>>() {
-          @Override
-          public Pair<Pair<K, Integer>, Pair<U, V>> map(Pair<K, U> input) {
-            return Pair.of(Pair.of(input.first(), 0), Pair.of(input.second(), (V) null));
-          }
-        }, ptt);
-    PTable<Pair<K, Integer>, Pair<U, V>> tag2 = right.parallelDo("joinTagRight",
-        new MapFn<Pair<K, V>, Pair<Pair<K, Integer>, Pair<U, V>>>() {
-          @Override
-          public Pair<Pair<K, Integer>, Pair<U, V>> map(Pair<K, V> input) {
-            return Pair.of(Pair.of(input.first(), 1), Pair.of((U) null, input.second()));
-          }
-        }, ptt);
-
-    GroupingOptions.Builder optionsBuilder = GroupingOptions.builder();
-    optionsBuilder.partitionerClass(JoinUtils.getPartitionerClass(ptf));
-
-    return (tag1.union(tag2)).groupByKey(optionsBuilder.build());
-  }
+  
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/63016301/crunch-core/src/main/java/org/apache/crunch/lib/join/DefaultJoinStrategy.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/join/DefaultJoinStrategy.java b/crunch-core/src/main/java/org/apache/crunch/lib/join/DefaultJoinStrategy.java
new file mode 100644
index 0000000..87b9495
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/join/DefaultJoinStrategy.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.join;
+
+import org.apache.crunch.GroupingOptions;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PGroupedTable;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PTypeFamily;
+
+/**
+ * Default join strategy that simply sends all data through the map, shuffle, and reduce phase.
+ * <p>
+ * This join strategy is full-featured (i.e. all methods are available), but is not highly
+ * efficient due to its passing all data through the shuffle phase.
+ */
+public class DefaultJoinStrategy<K, U, V> implements JoinStrategy<K, U, V> {
+  
+  @Override
+  public PTable<K, Pair<U, V>> join(PTable<K, U> left, PTable<K, V> right, JoinType joinType) {
+    switch (joinType) {
+    case INNER_JOIN:
+      return join(left, right, new InnerJoinFn<K, U, V>(left.getKeyType(), left.getValueType()));
+    case LEFT_OUTER_JOIN:
+      return join(left, right, new LeftOuterJoinFn<K, U, V>(left.getKeyType(), left.getValueType()));
+    case RIGHT_OUTER_JOIN:
+      return join(left, right,
+        new RightOuterJoinFn<K, U, V>(left.getKeyType(), left.getValueType()));
+    case FULL_OUTER_JOIN:
+      return join(left, right, new FullOuterJoinFn<K, U, V>(left.getKeyType(), left.getValueType()));
+    default:
+      throw new UnsupportedOperationException("Join type " + joinType + " is not supported");
+    }
+  }
+
+  PTable<K, Pair<U, V>> join(PTable<K, U> left, PTable<K, V> right, JoinFn<K, U, V> joinFn) {
+    PTypeFamily ptf = left.getTypeFamily();
+    PGroupedTable<Pair<K, Integer>, Pair<U, V>> grouped = preJoin(left, right);
+    PTableType<K, Pair<U, V>> ret = ptf
+        .tableOf(left.getKeyType(), ptf.pairs(left.getValueType(), right.getValueType()));
+
+    return grouped.parallelDo(joinFn.getJoinType() + grouped.getName(), joinFn, ret);
+  }
+
+  private PGroupedTable<Pair<K, Integer>, Pair<U, V>> preJoin(PTable<K, U> left, PTable<K, V> right) {
+    PTypeFamily ptf = left.getTypeFamily();
+    PTableType<Pair<K, Integer>, Pair<U, V>> ptt = ptf.tableOf(ptf.pairs(left.getKeyType(), ptf.ints()),
+        ptf.pairs(left.getValueType(), right.getValueType()));
+
+    PTable<Pair<K, Integer>, Pair<U, V>> tag1 = left.parallelDo("joinTagLeft",
+        new MapFn<Pair<K, U>, Pair<Pair<K, Integer>, Pair<U, V>>>() {
+          @Override
+          public Pair<Pair<K, Integer>, Pair<U, V>> map(Pair<K, U> input) {
+            return Pair.of(Pair.of(input.first(), 0), Pair.of(input.second(), (V) null));
+          }
+        }, ptt);
+    PTable<Pair<K, Integer>, Pair<U, V>> tag2 = right.parallelDo("joinTagRight",
+        new MapFn<Pair<K, V>, Pair<Pair<K, Integer>, Pair<U, V>>>() {
+          @Override
+          public Pair<Pair<K, Integer>, Pair<U, V>> map(Pair<K, V> input) {
+            return Pair.of(Pair.of(input.first(), 1), Pair.of((U) null, input.second()));
+          }
+        }, ptt);
+
+    GroupingOptions.Builder optionsBuilder = GroupingOptions.builder();
+    optionsBuilder.partitionerClass(JoinUtils.getPartitionerClass(ptf));
+
+    return (tag1.union(tag2)).groupByKey(optionsBuilder.build());
+  }
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/63016301/crunch-core/src/main/java/org/apache/crunch/lib/join/JoinStrategy.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/join/JoinStrategy.java b/crunch-core/src/main/java/org/apache/crunch/lib/join/JoinStrategy.java
new file mode 100644
index 0000000..134b067
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/join/JoinStrategy.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.join;
+
+import java.io.Serializable;
+
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+
+/**
+ * Defines a strategy for joining two {@link PTable}s together on a common key.
+ */
+public interface JoinStrategy<K, U, V> extends Serializable {
+  
+  /**
+   * Join two tables with the given join type; not every strategy supports every {@link JoinType}.
+   * 
+   * @param left left table to be joined
+   * @param right right table to be joined
+   * @param joinType type of join to perform
+   * @return the joined table, keyed on the join key, with a (left value, right value) pair per row
+   * @throws UnsupportedOperationException if this strategy does not support {@code joinType}
+   */
+  PTable<K, Pair<U,V>> join(PTable<K, U> left, PTable<K, V> right, JoinType joinType);
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/63016301/crunch-core/src/main/java/org/apache/crunch/lib/join/JoinType.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/join/JoinType.java b/crunch-core/src/main/java/org/apache/crunch/lib/join/JoinType.java
new file mode 100644
index 0000000..9aceb88
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/join/JoinType.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.join;
+
+/**
+ * Specifies how a join treats keys that are not present on both sides of the join, i.e. which
+ * unmatched entries (if any) are retained in the joined output.
+ */
+public enum JoinType {
+  /**
+   * Join two tables on a common key. Every value in the left-side table under a given key will be
+   * paired with every value from the right-side table under the same key; unmatched keys are dropped.
+   * 
+   * @see <a href="http://en.wikipedia.org/wiki/Join_(SQL)#Inner_join">Inner Join</a>
+   */
+  INNER_JOIN,
+  
+  /**
+   * Join two tables on a common key, including entries from the left-side table that have
+   * no matching key in the right-side table.
+   * <p>
+   * Support for this join type is optional for {@link JoinStrategy} implementations.
+   * 
+   * @see <a href="http://en.wikipedia.org/wiki/Join_(SQL)#Left_outer_join">Left Outer Join</a>
+   */
+  LEFT_OUTER_JOIN,
+  
+  /**
+   * Join two tables on a common key, including entries from the right-side table that have
+   * no matching key in the left-side table.
+   * <p>
+   * Support for this join type is optional for {@link JoinStrategy} implementations.
+   * 
+   * @see <a href="http://en.wikipedia.org/wiki/Join_(SQL)#Right_outer_join">Right Outer Join</a>
+   */
+  RIGHT_OUTER_JOIN,
+  
+  /**
+   * Join two tables on a common key, also including entries from both tables that have no
+   * matching key in the other table.
+   * <p>
+   * Support for this join type is optional for {@link JoinStrategy} implementations.
+   * 
+   * @see <a href="http://en.wikipedia.org/wiki/Join_(SQL)#Full_outer_join">Full Outer Join</a>
+   */
+  FULL_OUTER_JOIN
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/63016301/crunch-core/src/main/java/org/apache/crunch/lib/join/MapsideJoin.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/join/MapsideJoin.java b/crunch-core/src/main/java/org/apache/crunch/lib/join/MapsideJoin.java
deleted file mode 100644
index 56476c1..0000000
--- a/crunch-core/src/main/java/org/apache/crunch/lib/join/MapsideJoin.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.lib.join;
-
-import java.io.IOException;
-
-import org.apache.crunch.CrunchRuntimeException;
-import org.apache.crunch.DoFn;
-import org.apache.crunch.Emitter;
-import org.apache.crunch.PTable;
-import org.apache.crunch.Pair;
-import org.apache.crunch.ParallelDoOptions;
-import org.apache.crunch.SourceTarget;
-import org.apache.crunch.io.ReadableSourceTarget;
-import org.apache.crunch.materialize.MaterializableIterable;
-import org.apache.crunch.types.PType;
-import org.apache.crunch.types.PTypeFamily;
-import org.apache.crunch.util.DistCache;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
-
-/**
- * Utility for doing map side joins on a common key between two {@link PTable}s.
- * <p>
- * A map side join is an optimized join which doesn't use a reducer; instead,
- * the right side of the join is loaded into memory and the join is performed in
- * a mapper. This style of join has the important implication that the output of
- * the join is not sorted, which is the case with a conventional (reducer-based)
- * join.
- * <p>
- * <b>Note:</b>This utility is only supported when running with a
- * {@link MRPipeline} as the pipeline.
- */
-public class MapsideJoin {
-
-  /**
-   * Join two tables using a map side join. The right-side table will be loaded
-   * fully in memory, so this method should only be used if the right side
-   * table's contents can fit in the memory allocated to mappers. The join
-   * performed by this method is an inner join.
-   * 
-   * @param left
-   *          The left-side table of the join
-   * @param right
-   *          The right-side table of the join, whose contents will be fully
-   *          read into memory
-   * @return A table keyed on the join key, containing pairs of joined values
-   */
-  public static <K, U, V> PTable<K, Pair<U, V>> join(PTable<K, U> left, PTable<K, V> right) {
-    PTypeFamily tf = left.getTypeFamily();
-    Iterable<Pair<K, V>> iterable = right.materialize();
-
-    if (iterable instanceof MaterializableIterable) {
-      MaterializableIterable<Pair<K, V>> mi = (MaterializableIterable<Pair<K, V>>) iterable;
-      MapsideJoinDoFn<K, U, V> mapJoinDoFn = new MapsideJoinDoFn<K, U, V>(mi.getPath().toString(),
-          right.getPType());
-      ParallelDoOptions.Builder optionsBuilder = ParallelDoOptions.builder();
-      if (mi.isSourceTarget()) {
-        optionsBuilder.sourceTargets((SourceTarget) mi.getSource());
-      }
-      return left.parallelDo("mapjoin", mapJoinDoFn,
-          tf.tableOf(left.getKeyType(), tf.pairs(left.getValueType(), right.getValueType())),
-          optionsBuilder.build());
-    } else { // in-memory pipeline
-      return left.parallelDo(new InMemoryJoinFn<K, U, V>(iterable),
-          tf.tableOf(left.getKeyType(), tf.pairs(left.getValueType(), right.getValueType())));
-    }
-  }
-
-  static class InMemoryJoinFn<K, U, V> extends DoFn<Pair<K, U>, Pair<K, Pair<U, V>>> {
-
-    private Multimap<K, V> joinMap;
-    
-    public InMemoryJoinFn(Iterable<Pair<K, V>> iterable) {
-      joinMap = HashMultimap.create();
-      for (Pair<K, V> joinPair : iterable) {
-        joinMap.put(joinPair.first(), joinPair.second());
-      }
-    }
-    
-    @Override
-    public void process(Pair<K, U> input, Emitter<Pair<K, Pair<U, V>>> emitter) {
-      K key = input.first();
-      U value = input.second();
-      for (V joinValue : joinMap.get(key)) {
-        Pair<U, V> valuePair = Pair.of(value, joinValue);
-        emitter.emit(Pair.of(key, valuePair));
-      }
-    }
-  }
-  
-  static class MapsideJoinDoFn<K, U, V> extends DoFn<Pair<K, U>, Pair<K, Pair<U, V>>> {
-
-    private String inputPath;
-    private PType<Pair<K, V>> ptype;
-    private Multimap<K, V> joinMap;
-
-    public MapsideJoinDoFn(String inputPath, PType<Pair<K, V>> ptype) {
-      this.inputPath = inputPath;
-      this.ptype = ptype;
-    }
-
-    private Path getCacheFilePath() {
-      Path local = DistCache.getPathToCacheFile(new Path(inputPath), getConfiguration());
-      if (local == null) {
-        throw new CrunchRuntimeException("Can't find local cache file for '" + inputPath + "'");
-      }
-      return local;
-    }
-
-    @Override
-    public void configure(Configuration conf) {
-      DistCache.addCacheFile(new Path(inputPath), conf);
-    }
-    
-    @Override
-    public void initialize() {
-      super.initialize();
-
-      ReadableSourceTarget<Pair<K, V>> sourceTarget = ptype.getDefaultFileSource(
-          getCacheFilePath());
-      Iterable<Pair<K, V>> iterable = null;
-      try {
-        iterable = sourceTarget.read(getConfiguration());
-      } catch (IOException e) {
-        throw new CrunchRuntimeException("Error reading right-side of map side join: ", e);
-      }
-
-      joinMap = ArrayListMultimap.create();
-      for (Pair<K, V> joinPair : iterable) {
-        joinMap.put(joinPair.first(), joinPair.second());
-      }
-    }
-
-    @Override
-    public void process(Pair<K, U> input, Emitter<Pair<K, Pair<U, V>>> emitter) {
-      K key = input.first();
-      U value = input.second();
-      for (V joinValue : joinMap.get(key)) {
-        Pair<U, V> valuePair = Pair.of(value, joinValue);
-        emitter.emit(Pair.of(key, valuePair));
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/63016301/crunch-core/src/main/java/org/apache/crunch/lib/join/MapsideJoinStrategy.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/join/MapsideJoinStrategy.java b/crunch-core/src/main/java/org/apache/crunch/lib/join/MapsideJoinStrategy.java
new file mode 100644
index 0000000..1710f30
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/join/MapsideJoinStrategy.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.join;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.crunch.CrunchRuntimeException;
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.ParallelDoOptions;
+import org.apache.crunch.SourceTarget;
+import org.apache.crunch.io.ReadableSourceTarget;
+import org.apache.crunch.materialize.MaterializableIterable;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.util.DistCache;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+
+/**
+ * Utility for doing map side joins on a common key between two {@link PTable}s.
+ * <p>
+ * A map side join is an optimized join which doesn't use a reducer; instead,
+ * the right side of the join is loaded into memory and the join is performed in
+ * a mapper. This style of join has the important implication that the output of
+ * the join is not sorted, which is the case with a conventional (reducer-based)
+ * join.
+ */
+public class MapsideJoinStrategy<K, U, V> implements JoinStrategy<K, U, V> {
+
+  @Override
+  public PTable<K, Pair<U, V>> join(PTable<K, U> left, PTable<K, V> right, JoinType joinType) {
+    switch (joinType) {
+    case INNER_JOIN:
+      return joinInternal(left, right, false);
+    case LEFT_OUTER_JOIN:
+      return joinInternal(left, right, true);
+    default:
+      throw new UnsupportedOperationException("Join type " + joinType
+          + " not supported by MapsideJoinStrategy");
+    }
+  }
+  
+
+  private PTable<K, Pair<U,V>> joinInternal(PTable<K, U> left, PTable<K, V> right, boolean includeUnmatchedLeftValues) {
+    PTypeFamily tf = left.getTypeFamily();
+    Iterable<Pair<K, V>> iterable = right.materialize();
+
+    if (iterable instanceof MaterializableIterable) {
+      MaterializableIterable<Pair<K, V>> mi = (MaterializableIterable<Pair<K, V>>) iterable;
+      MapsideJoinDoFn<K, U, V> mapJoinDoFn = new MapsideJoinDoFn<K, U, V>(mi.getPath().toString(),
+          includeUnmatchedLeftValues, right.getPType());
+      ParallelDoOptions.Builder optionsBuilder = ParallelDoOptions.builder();
+      if (mi.isSourceTarget()) {
+        optionsBuilder.sourceTargets((SourceTarget) mi.getSource());
+      }
+      return left.parallelDo("mapjoin", mapJoinDoFn,
+          tf.tableOf(left.getKeyType(), tf.pairs(left.getValueType(), right.getValueType())),
+          optionsBuilder.build());
+    } else { // in-memory pipeline
+      return left.parallelDo(new InMemoryJoinFn<K, U, V>(iterable, includeUnmatchedLeftValues),
+          tf.tableOf(left.getKeyType(), tf.pairs(left.getValueType(), right.getValueType())));
+    }
+  }
+
+  static class InMemoryJoinFn<K, U, V> extends DoFn<Pair<K, U>, Pair<K, Pair<U, V>>> {
+
+    private Multimap<K, V> joinMap;
+    private boolean includeUnmatched;
+    
+    public InMemoryJoinFn(Iterable<Pair<K, V>> iterable, boolean includeUnmatched) {
+      joinMap = HashMultimap.create();
+      for (Pair<K, V> joinPair : iterable) {
+        joinMap.put(joinPair.first(), joinPair.second());
+      }
+      this.includeUnmatched = includeUnmatched;
+    }
+    
+    @Override
+    public void process(Pair<K, U> input, Emitter<Pair<K, Pair<U, V>>> emitter) {
+      K key = input.first();
+      U value = input.second();
+      Collection<V> joinValues = joinMap.get(key);
+      if (includeUnmatched && joinValues.isEmpty()) {
+        emitter.emit(Pair.of(key, Pair.of(value, (V)null)));
+      } else {
+        for (V joinValue : joinValues) {
+          Pair<U, V> valuePair = Pair.of(value, joinValue);
+          emitter.emit(Pair.of(key, valuePair));
+        }
+      }
+    }
+  }
+  
+  static class MapsideJoinDoFn<K, U, V> extends DoFn<Pair<K, U>, Pair<K, Pair<U, V>>> {
+
+    private String inputPath;
+    private final boolean includeUnmatched;
+    private PType<Pair<K, V>> ptype;
+    private Multimap<K, V> joinMap;
+
+    public MapsideJoinDoFn(String inputPath, boolean includeUnmatched, PType<Pair<K, V>> ptype) {
+      this.inputPath = inputPath;
+      this.includeUnmatched = includeUnmatched;
+      this.ptype = ptype;
+    }
+
+    private Path getCacheFilePath() {
+      Path local = DistCache.getPathToCacheFile(new Path(inputPath), getConfiguration());
+      if (local == null) {
+        throw new CrunchRuntimeException("Can't find local cache file for '" + inputPath + "'");
+      }
+      return local;
+    }
+
+    @Override
+    public void configure(Configuration conf) {
+      DistCache.addCacheFile(new Path(inputPath), conf);
+    }
+    
+    @Override
+    public void initialize() {
+      super.initialize();
+
+      ReadableSourceTarget<Pair<K, V>> sourceTarget = ptype.getDefaultFileSource(
+          getCacheFilePath());
+      Iterable<Pair<K, V>> iterable = null;
+      try {
+        iterable = sourceTarget.read(getConfiguration());
+      } catch (IOException e) {
+        throw new CrunchRuntimeException("Error reading right-side of map side join: ", e);
+      }
+
+      joinMap = ArrayListMultimap.create();
+      for (Pair<K, V> joinPair : iterable) {
+        joinMap.put(joinPair.first(), joinPair.second());
+      }
+    }
+
+    @Override
+    public void process(Pair<K, U> input, Emitter<Pair<K, Pair<U, V>>> emitter) {
+      K key = input.first();
+      U value = input.second();
+      Collection<V> joinValues = joinMap.get(key);
+      if (includeUnmatched && joinValues.isEmpty()) {
+        emitter.emit(Pair.of(key, Pair.<U,V>of(value, null)));
+      } else {
+        for (V joinValue : joinValues) {
+          Pair<U, V> valuePair = Pair.of(value, joinValue);
+          emitter.emit(Pair.of(key, valuePair));
+        }
+      }
+    }
+  }
+}


Mime
View raw message