incubator-crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gr...@apache.org
Subject git commit: Detach iterated join values
Date Fri, 06 Jul 2012 21:39:26 GMT
Updated Branches:
  refs/heads/master 0473d8e86 -> 3a3111546


Detach iterated join values

Values being joined are typically re-used objects from a reducer's
iterator, so storing references to them in a local collection leaves
every stored entry pointing at the same object, which is overwritten
as iteration proceeds. The left-side values that joins buffer are now
detached (i.e. deep copied) before being stored to get around this.


Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/3a311154
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/3a311154
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/3a311154

Branch: refs/heads/master
Commit: 3a3111546e788f4139fa291f018fe71bd3420d91
Parents: 0473d8e
Author: Gabriel Reid <gabriel.reid@gmail.com>
Authored: Fri Jul 6 23:35:43 2012 +0200
Committer: Gabriel Reid <gabriel.reid@gmail.com>
Committed: Fri Jul 6 23:35:43 2012 +0200

----------------------------------------------------------------------
 src/main/java/com/cloudera/crunch/lib/Join.java    |    8 +++---
 .../cloudera/crunch/lib/join/FullOuterJoinFn.java  |    7 +++++-
 .../com/cloudera/crunch/lib/join/InnerJoinFn.java  |   17 +++++++++-----
 .../java/com/cloudera/crunch/lib/join/JoinFn.java  |   14 ++++++++++++
 .../cloudera/crunch/lib/join/LeftOuterJoinFn.java  |    7 +++++-
 .../cloudera/crunch/lib/join/RightOuterJoinFn.java |    9 ++++++-
 .../crunch/lib/join/FullOuterJoinTest.java         |    5 ++-
 .../cloudera/crunch/lib/join/InnerJoinTest.java    |    5 ++-
 .../com/cloudera/crunch/lib/join/JoinTester.java   |    4 +-
 .../crunch/lib/join/LeftOuterJoinTest.java         |    5 ++-
 .../crunch/lib/join/RightOuterJoinTest.java        |    5 ++-
 11 files changed, 62 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3a311154/src/main/java/com/cloudera/crunch/lib/Join.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/lib/Join.java b/src/main/java/com/cloudera/crunch/lib/Join.java
index ca68aec..0871dc1 100644
--- a/src/main/java/com/cloudera/crunch/lib/Join.java
+++ b/src/main/java/com/cloudera/crunch/lib/Join.java
@@ -59,7 +59,7 @@ public class Join {
    * @return The joined result.
    */
   public static <K, U, V> PTable<K, Pair<U, V>> innerJoin(PTable<K,
U> left, PTable<K, V> right) {
-    return join(left, right, new InnerJoinFn<K, U, V>());
+    return join(left, right, new InnerJoinFn<K, U, V>(left.getValueType()));
   }
 
   /**
@@ -75,7 +75,7 @@ public class Join {
    * @return The joined result.
    */
   public static <K, U, V> PTable<K, Pair<U, V>> leftJoin(PTable<K, U>
left, PTable<K, V> right) {
-    return join(left, right, new LeftOuterJoinFn<K, U, V>());
+    return join(left, right, new LeftOuterJoinFn<K, U, V>(left.getValueType()));
   }
 
   /**
@@ -91,7 +91,7 @@ public class Join {
    * @return The joined result.
    */
   public static <K, U, V> PTable<K, Pair<U, V>> rightJoin(PTable<K,
U> left, PTable<K, V> right) {
-    return join(left, right, new RightOuterJoinFn<K, U, V>());
+    return join(left, right, new RightOuterJoinFn<K, U, V>(left.getValueType()));
   }
 
   /**
@@ -106,7 +106,7 @@ public class Join {
    * @return The joined result.
    */
   public static <K, U, V> PTable<K, Pair<U, V>> fullJoin(PTable<K, U>
left, PTable<K, V> right) {
-    return join(left, right, new FullOuterJoinFn<K, U, V>());
+    return join(left, right, new FullOuterJoinFn<K, U, V>(left.getValueType()));
   }
 
   public static <K, U, V> PTable<K, Pair<U, V>> join(PTable<K, U>
left, PTable<K, V> right,

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3a311154/src/main/java/com/cloudera/crunch/lib/join/FullOuterJoinFn.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/lib/join/FullOuterJoinFn.java b/src/main/java/com/cloudera/crunch/lib/join/FullOuterJoinFn.java
index 80728a7..3d8888e 100644
--- a/src/main/java/com/cloudera/crunch/lib/join/FullOuterJoinFn.java
+++ b/src/main/java/com/cloudera/crunch/lib/join/FullOuterJoinFn.java
@@ -18,6 +18,7 @@ import java.util.List;
 
 import com.cloudera.crunch.Emitter;
 import com.cloudera.crunch.Pair;
+import com.cloudera.crunch.types.PType;
 import com.google.common.collect.Lists;
 
 /**
@@ -33,6 +34,10 @@ public class FullOuterJoinFn<K, U, V> extends JoinFn<K, U, V>
{
   private transient K lastKey;
   private transient List<U> leftValues;
 
+  public FullOuterJoinFn(PType<U> leftValueType) {
+    super(leftValueType);
+  }
+
   /** {@inheritDoc} */
   @Override
   public void initialize() {
@@ -58,7 +63,7 @@ public class FullOuterJoinFn<K, U, V> extends JoinFn<K, U, V>
{
     if (id == 0) {
       for (Pair<U, V> pair : pairs) {
         if (pair.first() != null)
-          leftValues.add(pair.first());
+          leftValues.add(leftValueType.getDetachedValue(pair.first()));
       }
     } else {
       for (Pair<U, V> pair : pairs) {

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3a311154/src/main/java/com/cloudera/crunch/lib/join/InnerJoinFn.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/lib/join/InnerJoinFn.java b/src/main/java/com/cloudera/crunch/lib/join/InnerJoinFn.java
index e5013e5..0d7230a 100644
--- a/src/main/java/com/cloudera/crunch/lib/join/InnerJoinFn.java
+++ b/src/main/java/com/cloudera/crunch/lib/join/InnerJoinFn.java
@@ -18,6 +18,7 @@ import java.util.List;
 
 import com.cloudera.crunch.Emitter;
 import com.cloudera.crunch.Pair;
+import com.cloudera.crunch.types.PType;
 import com.google.common.collect.Lists;
 
 /**
@@ -28,15 +29,19 @@ import com.google.common.collect.Lists;
  * @param <V> Type of the second {@link com.cloudera.crunch.PTable}'s values
  */
 public class InnerJoinFn<K, U, V> extends JoinFn<K, U, V> {
-  
+
   private transient K lastKey;
-  private transient List<U> LeftValues;
+  private transient List<U> leftValues;
+  
+  public InnerJoinFn(PType<U> leftValueType) {
+    super(leftValueType);
+  }
 
   /** {@inheritDoc} */
   @Override
   public void initialize() {
     lastKey = null;
-    this.LeftValues = Lists.newArrayList();
+    this.leftValues = Lists.newArrayList();
   }
 
   /** {@inheritDoc} */
@@ -45,16 +50,16 @@ public class InnerJoinFn<K, U, V> extends JoinFn<K, U, V>
{
       Emitter<Pair<K, Pair<U, V>>> emitter) {
     if (!key.equals(lastKey)) {
       lastKey = key;
-      LeftValues.clear();
+      leftValues.clear();
     }
     if (id == 0) { // from left
       for (Pair<U, V> pair : pairs) {
         if (pair.first() != null)
-          LeftValues.add(pair.first());
+          leftValues.add(leftValueType.getDetachedValue(pair.first()));
       }
     } else { // from right
       for (Pair<U, V> pair : pairs) {
-        for (U u : LeftValues) {
+        for (U u : leftValues) {
           emitter.emit(Pair.of(lastKey, Pair.of(u, pair.second())));
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3a311154/src/main/java/com/cloudera/crunch/lib/join/JoinFn.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/lib/join/JoinFn.java b/src/main/java/com/cloudera/crunch/lib/join/JoinFn.java
index ee3d293..d1305d4 100644
--- a/src/main/java/com/cloudera/crunch/lib/join/JoinFn.java
+++ b/src/main/java/com/cloudera/crunch/lib/join/JoinFn.java
@@ -17,6 +17,7 @@ package com.cloudera.crunch.lib.join;
 import com.cloudera.crunch.DoFn;
 import com.cloudera.crunch.Emitter;
 import com.cloudera.crunch.Pair;
+import com.cloudera.crunch.types.PType;
 
 /**
  * Represents a {@link com.cloudera.crunch.DoFn} for performing joins.
@@ -28,6 +29,19 @@ import com.cloudera.crunch.Pair;
 public abstract class JoinFn<K, U, V>
     extends DoFn<Pair<Pair<K, Integer>, Iterable<Pair<U, V>>>,
Pair<K, Pair<U, V>>> {
   
+  protected PType<U> leftValueType;
+
+  /**
+   * Instantiate with the PType of the value of the left side of the join (used
+   * for creating deep copies of values).
+   * 
+   * @param leftValueType
+   *          The PType of the value type of the left side of the join
+   */
+  public JoinFn(PType<U> leftValueType) {
+    this.leftValueType = leftValueType;
+  }
+
   /** @return The name of this join type (e.g. innerJoin, leftOuterJoin). */
   public abstract String getJoinType();
 

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3a311154/src/main/java/com/cloudera/crunch/lib/join/LeftOuterJoinFn.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/lib/join/LeftOuterJoinFn.java b/src/main/java/com/cloudera/crunch/lib/join/LeftOuterJoinFn.java
index 9927964..c121889 100644
--- a/src/main/java/com/cloudera/crunch/lib/join/LeftOuterJoinFn.java
+++ b/src/main/java/com/cloudera/crunch/lib/join/LeftOuterJoinFn.java
@@ -18,6 +18,7 @@ import java.util.List;
 
 import com.cloudera.crunch.Emitter;
 import com.cloudera.crunch.Pair;
+import com.cloudera.crunch.types.PType;
 import com.google.common.collect.Lists;
 
 /**
@@ -33,6 +34,10 @@ public class LeftOuterJoinFn<K, U, V> extends JoinFn<K, U, V>
{
   private transient K lastKey;
   private transient List<U> leftValues;
 
+  public LeftOuterJoinFn(PType<U> leftValueType) {
+    super(leftValueType);
+  }
+
   /** {@inheritDoc} */
   @Override
   public void initialize() {
@@ -58,7 +63,7 @@ public class LeftOuterJoinFn<K, U, V> extends JoinFn<K, U, V>
{
     if (id == 0) {
       for (Pair<U, V> pair : pairs) {
         if (pair.first() != null)
-          leftValues.add(pair.first());
+          leftValues.add(leftValueType.getDetachedValue(pair.first()));
       }
     } else {
       for (Pair<U, V> pair : pairs) {

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3a311154/src/main/java/com/cloudera/crunch/lib/join/RightOuterJoinFn.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/lib/join/RightOuterJoinFn.java b/src/main/java/com/cloudera/crunch/lib/join/RightOuterJoinFn.java
index 99c046f..1824f77 100644
--- a/src/main/java/com/cloudera/crunch/lib/join/RightOuterJoinFn.java
+++ b/src/main/java/com/cloudera/crunch/lib/join/RightOuterJoinFn.java
@@ -18,6 +18,7 @@ import java.util.List;
 
 import com.cloudera.crunch.Emitter;
 import com.cloudera.crunch.Pair;
+import com.cloudera.crunch.types.PType;
 import com.google.common.collect.Lists;
 
 /**
@@ -28,10 +29,14 @@ import com.google.common.collect.Lists;
  * @param <V> Type of the second {@link com.cloudera.crunch.PTable}'s values
  */
 public class RightOuterJoinFn<K, U, V> extends JoinFn<K, U, V> {
-  
+
   private transient K lastKey;
   private transient List<U> leftValues;
 
+  public RightOuterJoinFn(PType<U> leftValueType) {
+    super(leftValueType);
+  }
+
   /** {@inheritDoc} */
   @Override
   public void initialize() {
@@ -50,7 +55,7 @@ public class RightOuterJoinFn<K, U, V> extends JoinFn<K, U, V>
{
     if (id == 0) {
       for (Pair<U, V> pair : pairs) {
         if (pair.first() != null)
-          leftValues.add(pair.first());
+          leftValues.add(leftValueType.getDetachedValue(pair.first()));
       }
     } else {
       for (Pair<U, V> pair : pairs) {

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3a311154/src/test/java/com/cloudera/crunch/lib/join/FullOuterJoinTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/lib/join/FullOuterJoinTest.java b/src/test/java/com/cloudera/crunch/lib/join/FullOuterJoinTest.java
index ef80cd4..88b8225 100644
--- a/src/test/java/com/cloudera/crunch/lib/join/FullOuterJoinTest.java
+++ b/src/test/java/com/cloudera/crunch/lib/join/FullOuterJoinTest.java
@@ -17,6 +17,7 @@ package com.cloudera.crunch.lib.join;
 import static org.junit.Assert.assertTrue;
 
 import com.cloudera.crunch.Pair;
+import com.cloudera.crunch.types.PTypeFamily;
 
 public class FullOuterJoinTest extends JoinTester {
   @Override
@@ -41,7 +42,7 @@ public class FullOuterJoinTest extends JoinTester {
   }
 
   @Override
-  protected JoinFn<String, Long, Long> getJoinFn() {
-    return new FullOuterJoinFn<String, Long, Long>();
+  protected JoinFn<String, Long, Long> getJoinFn(PTypeFamily typeFamily) {
+    return new FullOuterJoinFn<String, Long, Long>(typeFamily.longs());
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3a311154/src/test/java/com/cloudera/crunch/lib/join/InnerJoinTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/lib/join/InnerJoinTest.java b/src/test/java/com/cloudera/crunch/lib/join/InnerJoinTest.java
index 4bac0c9..b42e51e 100644
--- a/src/test/java/com/cloudera/crunch/lib/join/InnerJoinTest.java
+++ b/src/test/java/com/cloudera/crunch/lib/join/InnerJoinTest.java
@@ -17,6 +17,7 @@ package com.cloudera.crunch.lib.join;
 import static org.junit.Assert.assertTrue;
 
 import com.cloudera.crunch.Pair;
+import com.cloudera.crunch.types.PTypeFamily;
 
 public class InnerJoinTest extends JoinTester {
   @Override
@@ -41,7 +42,7 @@ public class InnerJoinTest extends JoinTester {
   }
 
   @Override
-  protected JoinFn<String, Long, Long> getJoinFn() {
-    return new InnerJoinFn<String, Long, Long>();
+  protected JoinFn<String, Long, Long> getJoinFn(PTypeFamily typeFamily) {
+    return new InnerJoinFn<String, Long, Long>(typeFamily.longs());
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3a311154/src/test/java/com/cloudera/crunch/lib/join/JoinTester.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/lib/join/JoinTester.java b/src/test/java/com/cloudera/crunch/lib/join/JoinTester.java
index 651747d..6208cb4 100644
--- a/src/test/java/com/cloudera/crunch/lib/join/JoinTester.java
+++ b/src/test/java/com/cloudera/crunch/lib/join/JoinTester.java
@@ -50,7 +50,7 @@ public abstract class JoinTester implements Serializable {
     PTable<String, Long> ws1 = Aggregate.count(w1.parallelDo("ws1", new WordSplit(),
ptf.strings()));
     PTable<String, Long> ws2 = Aggregate.count(w2.parallelDo("ws2", new WordSplit(),
ptf.strings()));
 
-    PTable<String, Pair<Long, Long>> join = Join.join(ws1, ws2, getJoinFn());
+    PTable<String, Pair<Long, Long>> join = Join.join(ws1, ws2, getJoinFn(ptf));
 
     PTable<String, Long> sums = join.parallelDo("cnt",
         new DoFn<Pair<String, Pair<Long, Long>>, Pair<String, Long>>()
{
@@ -100,5 +100,5 @@ public abstract class JoinTester implements Serializable {
   /**
    * @return The JoinFn to use.
    */
-  protected abstract JoinFn<String, Long, Long> getJoinFn();
+  protected abstract JoinFn<String, Long, Long> getJoinFn(PTypeFamily typeFamily);
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3a311154/src/test/java/com/cloudera/crunch/lib/join/LeftOuterJoinTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/lib/join/LeftOuterJoinTest.java b/src/test/java/com/cloudera/crunch/lib/join/LeftOuterJoinTest.java
index 9517f70..0ad4490 100644
--- a/src/test/java/com/cloudera/crunch/lib/join/LeftOuterJoinTest.java
+++ b/src/test/java/com/cloudera/crunch/lib/join/LeftOuterJoinTest.java
@@ -17,6 +17,7 @@ package com.cloudera.crunch.lib.join;
 import static org.junit.Assert.assertTrue;
 
 import com.cloudera.crunch.Pair;
+import com.cloudera.crunch.types.PTypeFamily;
 
 public class LeftOuterJoinTest extends JoinTester {
   @Override
@@ -41,7 +42,7 @@ public class LeftOuterJoinTest extends JoinTester {
   }
 
   @Override
-  protected JoinFn<String, Long, Long> getJoinFn() {
-    return new LeftOuterJoinFn<String, Long, Long>();
+  protected JoinFn<String, Long, Long> getJoinFn(PTypeFamily typeFamily) {
+    return new LeftOuterJoinFn<String, Long, Long>(typeFamily.longs());
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3a311154/src/test/java/com/cloudera/crunch/lib/join/RightOuterJoinTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/lib/join/RightOuterJoinTest.java b/src/test/java/com/cloudera/crunch/lib/join/RightOuterJoinTest.java
index 54467e8..1daed57 100644
--- a/src/test/java/com/cloudera/crunch/lib/join/RightOuterJoinTest.java
+++ b/src/test/java/com/cloudera/crunch/lib/join/RightOuterJoinTest.java
@@ -17,6 +17,7 @@ package com.cloudera.crunch.lib.join;
 import static org.junit.Assert.assertTrue;
 
 import com.cloudera.crunch.Pair;
+import com.cloudera.crunch.types.PTypeFamily;
 
 public class RightOuterJoinTest extends JoinTester {
   @Override
@@ -41,7 +42,7 @@ public class RightOuterJoinTest extends JoinTester {
   }
 
   @Override
-  protected JoinFn<String, Long, Long> getJoinFn() {
-    return new RightOuterJoinFn<String, Long, Long>();
+  protected JoinFn<String, Long, Long> getJoinFn(PTypeFamily typeFamily) {
+    return new RightOuterJoinFn<String, Long, Long>(typeFamily.longs());
   }
 }


Mime
View raw message