crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gr...@apache.org
Subject git commit: CRUNCH-338 Correct Cogroup TupleN output PType
Date Fri, 07 Feb 2014 07:59:42 GMT
Updated Branches:
  refs/heads/apache-crunch-0.8 96e392b44 -> 1a34cd097


CRUNCH-338 Correct Cogroup TupleN output PType

Fix the PType generated for TupleNs so that it is effectively
a TupleN of Collections.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/1a34cd09
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/1a34cd09
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/1a34cd09

Branch: refs/heads/apache-crunch-0.8
Commit: 1a34cd09758ea30d60e1d4f38cff3e60c1c2c12e
Parents: 96e392b
Author: Gabriel Reid <greid@apache.org>
Authored: Fri Feb 7 00:14:55 2014 +0100
Committer: Gabriel Reid <greid@apache.org>
Committed: Fri Feb 7 08:48:30 2014 +0100

----------------------------------------------------------------------
 .../java/org/apache/crunch/lib/CogroupIT.java   | 74 +++++++++++++++-----
 .../java/org/apache/crunch/lib/Cogroup.java     |  7 +-
 2 files changed, 59 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/1a34cd09/crunch-core/src/it/java/org/apache/crunch/lib/CogroupIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/CogroupIT.java b/crunch-core/src/it/java/org/apache/crunch/lib/CogroupIT.java
index 1a0bfe9..191c737 100644
--- a/crunch-core/src/it/java/org/apache/crunch/lib/CogroupIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/lib/CogroupIT.java
@@ -17,25 +17,17 @@
  */
 package org.apache.crunch.lib;
 
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertThat;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Map;
-
-import org.apache.crunch.DoFn;
-import org.apache.crunch.Emitter;
-import org.apache.crunch.PCollection;
-import org.apache.crunch.PTable;
-import org.apache.crunch.Pair;
-import org.apache.crunch.Tuple3;
-import org.apache.crunch.Tuple4;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+import org.apache.crunch.*;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.test.TemporaryPath;
 import org.apache.crunch.test.TemporaryPaths;
 import org.apache.crunch.test.Tests;
 import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
 import org.apache.crunch.types.PTypeFamily;
 import org.apache.crunch.types.avro.AvroTypeFamily;
 import org.apache.crunch.types.writable.WritableTypeFamily;
@@ -44,9 +36,13 @@ import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
 
 
 public class CogroupIT {
@@ -101,7 +97,17 @@ public class CogroupIT {
   public void testCogroup4Avro() {
     runCogroup4(AvroTypeFamily.getInstance());
   }
-  
+
+  @Test
+  public void testCogroupNWritables() {
+    runCogroupN(WritableTypeFamily.getInstance());
+  }
+
+  @Test
+  public void testCogroupNAvro() {
+    runCogroupN(AvroTypeFamily.getInstance());
+  }
+
   public void runCogroup(PTypeFamily ptf) {
     PTableType<String, String> tt = ptf.tableOf(ptf.strings(), ptf.strings());
 
@@ -182,6 +188,38 @@ public class CogroupIT {
 
     assertThat(actual, is(expected));
   }
+
+  public void runCogroupN(PTypeFamily ptf) {
+    PTableType<String, String> tt = ptf.tableOf(ptf.strings(), ptf.strings());
+
+    PTable<String, String> kv1 = lines1.parallelDo("kv1", new KeyValueSplit(), tt);
+    PTable<String, String> kv2 = lines2.parallelDo("kv2", new KeyValueSplit(), tt);
+
+    PTable<String, TupleN> cg = Cogroup.cogroup(kv1, new PTable[]{kv2});
+
+    Map<String, TupleN> result = cg.materializeToMap();
+    Map<String, TupleN> actual = Maps.newHashMap();
+    for (Map.Entry<String, TupleN> e : result.entrySet()) {
+      Collection<String> one = ImmutableSet.copyOf((Collection<? extends String>) e.getValue().get(0));
+      Collection<String> two = ImmutableSet.copyOf((Collection<? extends String>)e.getValue().get(1));
+      actual.put(e.getKey(), TupleN.of(one, two));
+    }
+    Map<String, TupleN> expected = ImmutableMap.of(
+        "a", TupleN.of(coll("1-1", "1-4"), coll()),
+        "b", TupleN.of(coll("1-2"), coll("2-1")),
+        "c", TupleN.of(coll("1-3"), coll("2-2", "2-3")),
+        "d", TupleN.of(coll(), coll("2-4"))
+    );
+
+    assertThat(actual, is(expected));
+
+    PType<TupleN> tupleValueType = cg.getValueType();
+    List<PType> expectedSubtypes = ImmutableList.<PType>of(
+        ptf.collections(ptf.strings()),
+        ptf.collections(ptf.strings()));
+
+    assertThat(tupleValueType.getSubTypes(), is(expectedSubtypes));
+  }
   
   private static class KeyValueSplit extends DoFn<String, Pair<String, String>> {
     @Override

http://git-wip-us.apache.org/repos/asf/crunch/blob/1a34cd09/crunch-core/src/main/java/org/apache/crunch/lib/Cogroup.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/Cogroup.java b/crunch-core/src/main/java/org/apache/crunch/lib/Cogroup.java
index 8743a29..63d6f62 100644
--- a/crunch-core/src/main/java/org/apache/crunch/lib/Cogroup.java
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/Cogroup.java
@@ -17,8 +17,7 @@
  */
 package org.apache.crunch.lib;
 
-import java.util.Collection;
-
+import com.google.common.collect.Lists;
 import org.apache.crunch.MapFn;
 import org.apache.crunch.PGroupedTable;
 import org.apache.crunch.PTable;
@@ -32,7 +31,7 @@ import org.apache.crunch.types.PType;
 import org.apache.crunch.types.PTypeFamily;
 import org.apache.crunch.types.TupleFactory;
 
-import com.google.common.collect.Lists;
+import java.util.Collection;
 
 public class Cogroup {
 
@@ -192,7 +191,7 @@ public class Cogroup {
     PType[] components = new PType[1 + rest.length];
     components[0] = tf.collections(first.getValueType());
     for (int i = 0; i < rest.length; i++) {
-      components[i + 1] = rest[i].getValueType();
+      components[i + 1] = tf.collections(rest[i].getValueType());
     }
     return cogroup(
         tf.tuples(components),


Mime
View raw message