incubator-crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject git commit: CRUNCH-154: Fix NPE on materialized union of two PTables
Date Tue, 29 Jan 2013 23:13:02 GMT
Updated Branches:
  refs/heads/master 035b1b91d -> d743ce7c8


CRUNCH-154: Fix NPE on materialized union of two PTables


Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/d743ce7c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/d743ce7c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/d743ce7c

Branch: refs/heads/master
Commit: d743ce7c8aa9107c3c73bac51bdb5ec3c761f094
Parents: 035b1b9
Author: Josh Wills <jwills@apache.org>
Authored: Tue Jan 29 14:11:41 2013 -0800
Committer: Josh Wills <jwills@apache.org>
Committed: Tue Jan 29 14:11:41 2013 -0800

----------------------------------------------------------------------
 .../it/java/org/apache/crunch/PTableUnionTest.java |  101 +++++++++++++++
 .../java/org/apache/crunch/impl/mr/MRPipeline.java |    2 +-
 2 files changed, 102 insertions(+), 1 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/d743ce7c/crunch/src/it/java/org/apache/crunch/PTableUnionTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/PTableUnionTest.java b/crunch/src/it/java/org/apache/crunch/PTableUnionTest.java
new file mode 100644
index 0000000..1d31096
--- /dev/null
+++ b/crunch/src/it/java/org/apache/crunch/PTableUnionTest.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import static org.junit.Assert.assertNotNull;
+
+import java.io.IOException;
+
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.fn.IdentityFn;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.avro.Avros;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+
+
+public class PTableUnionTest {
+
+  public static class FirstLetterKeyFn extends DoFn<String, Pair<String, String>> {
+
+    private static final long serialVersionUID = 5517897875971194220L;
+
+    @Override
+    public void process(String input, Emitter<Pair<String, String>> emitter) {
+      if (input.length() > 0) {
+        emitter.emit(Pair.of(input.substring(0, 1), input));
+      }
+    }
+  }
+  
+  @Rule
+  public TemporaryPath tmpDir = TemporaryPaths.create();
+  
+  protected MRPipeline pipeline;
+
+  @Before
+  public void setUp() {
+    pipeline = new MRPipeline(this.getClass(), tmpDir.getDefaultConfiguration());
+  }
+
+  @After
+  public void tearDown() {
+    pipeline.done();
+  }
+
+  @Test
+  public void tableUnionMaterializeNPE() throws Exception {
+    PCollection<String> words = pipeline.readTextFile(tmpDir.copyResourceFileName("shakes.txt"));
+    PCollection<String> lorum = pipeline.readTextFile(tmpDir.copyResourceFileName("maugham.txt"));
+    lorum.materialize();
+
+    PTable<String, String> wordsByFirstLetter =
+        words.parallelDo("byFirstLetter", new FirstLetterKeyFn(), Avros.tableOf(Avros.strings(), Avros.strings()));
+    PTable<String, String> lorumByFirstLetter =
+        lorum.parallelDo("byFirstLetter", new FirstLetterKeyFn(), Avros.tableOf(Avros.strings(), Avros.strings()));
+
+    @SuppressWarnings("unchecked")
+    PTable<String, String> union = wordsByFirstLetter.union(lorumByFirstLetter);
+
+    assertNotNull(union.materialize().iterator().next());
+  }
+
+  @Test
+  public void collectionUnionMaterializeNPE() throws Exception {
+    PCollection<String> words = pipeline.readTextFile(tmpDir.copyResourceFileName("shakes.txt"));
+    PCollection<String> lorum = pipeline.readTextFile(tmpDir.copyResourceFileName("maugham.txt"));
+    lorum.materialize();
+
+    IdentityFn<String> identity = IdentityFn.getInstance();
+    words = words.parallelDo(identity, Avros.strings());
+    lorum = lorum.parallelDo(identity, Avros.strings());
+
+    @SuppressWarnings("unchecked")
+    PCollection<String> union = words.union(lorum);
+
+    union.materialize().iterator();
+    
+    assertNotNull(union.materialize().iterator().next());
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/d743ce7c/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java b/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
index c71ef23..6ef7491 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
@@ -285,7 +285,7 @@ public class MRPipeline implements Pipeline {
    */
  private <T> PCollectionImpl<T> toPcollectionImpl(PCollection<T> pcollection) {
     PCollectionImpl<T> pcollectionImpl = null;
-    if (pcollection instanceof UnionCollection) {
+    if (pcollection instanceof UnionCollection || pcollection instanceof UnionTable) {
       pcollectionImpl = (PCollectionImpl<T>) pcollection.parallelDo("UnionCollectionWrapper",
           (MapFn) IdentityFn.<Object> getInstance(), pcollection.getPType());
     } else {


Mime
View raw message