crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m...@apache.org
Subject git commit: CRUNCH-189: Clean up Union integration test cases.
Date Mon, 01 Apr 2013 18:21:53 GMT
Updated Branches:
  refs/heads/master 4683a8d49 -> 154c8fbd3


CRUNCH-189: Clean up Union integration test cases.

Unify PTableUnionIT and UnionGbkIT into UnionIT.
Rewrite tests for readability and to verify results.
Remove dependencies on Gutenberg-licensed files.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/154c8fbd
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/154c8fbd
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/154c8fbd

Branch: refs/heads/master
Commit: 154c8fbd3c345e06fdb392a0a5b83f1fba5c7b4f
Parents: 4683a8d
Author: Matthias Friedrich <matt@mafr.de>
Authored: Fri Mar 29 21:03:56 2013 +0100
Committer: Matthias Friedrich <matt@mafr.de>
Committed: Mon Apr 1 20:17:54 2013 +0200

----------------------------------------------------------------------
 .../java/org/apache/crunch/test/TemporaryPath.java |    3 +-
 .../it/java/org/apache/crunch/PTableUnionIT.java   |   99 -----------
 .../src/it/java/org/apache/crunch/UnionGbkIT.java  |  117 -------------
 crunch/src/it/java/org/apache/crunch/UnionIT.java  |  136 +++++++++++++++
 .../src/it/java/org/apache/crunch/test/Tests.java  |   20 ++-
 .../org/apache/crunch/UnionITData/src1.txt         |    5 +
 .../org/apache/crunch/UnionITData/src2.txt         |    3 +
 7 files changed, 164 insertions(+), 219 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/154c8fbd/crunch-test/src/main/java/org/apache/crunch/test/TemporaryPath.java
----------------------------------------------------------------------
diff --git a/crunch-test/src/main/java/org/apache/crunch/test/TemporaryPath.java b/crunch-test/src/main/java/org/apache/crunch/test/TemporaryPath.java
index a4b07b9..1721eca 100644
--- a/crunch-test/src/main/java/org/apache/crunch/test/TemporaryPath.java
+++ b/crunch-test/src/main/java/org/apache/crunch/test/TemporaryPath.java
@@ -110,7 +110,8 @@ public final class TemporaryPath extends ExternalResource {
    * Copy a classpath resource to {@link File}.
    */
   public File copyResourceFile(String resourceName) throws IOException {
-    File dest = new File(tmp.getRoot(), resourceName);
+    String baseName = new File(resourceName).getName();
+    File dest = new File(tmp.getRoot(), baseName);
     copy(resourceName, dest);
     return dest;
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/154c8fbd/crunch/src/it/java/org/apache/crunch/PTableUnionIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/PTableUnionIT.java b/crunch/src/it/java/org/apache/crunch/PTableUnionIT.java
deleted file mode 100644
index 94c548f..0000000
--- a/crunch/src/it/java/org/apache/crunch/PTableUnionIT.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import static org.junit.Assert.assertNotNull;
-
-import org.apache.crunch.PCollection;
-import org.apache.crunch.PTable;
-import org.apache.crunch.fn.IdentityFn;
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.apache.crunch.types.avro.Avros;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-
-
-
-public class PTableUnionIT {
-
-  public static class FirstLetterKeyFn extends DoFn<String, Pair<String, String>> {
-
-    private static final long serialVersionUID = 5517897875971194220L;
-
-    @Override
-    public void process(String input, Emitter<Pair<String, String>> emitter) {
-      if (input.length() > 0) {
-        emitter.emit(Pair.of(input.substring(0, 1), input));
-      }
-    }
-  }
-  
-  @Rule
-  public TemporaryPath tmpDir = TemporaryPaths.create();
-  
-  protected MRPipeline pipeline;
-
-  @Before
-  public void setUp() {
-    pipeline = new MRPipeline(this.getClass(), tmpDir.getDefaultConfiguration());
-  }
-
-  @After
-  public void tearDown() {
-    pipeline.done();
-  }
-
-  @Test
-  public void tableUnionMaterializeNPE() throws Exception {
-    PCollection<String> words = pipeline.readTextFile(tmpDir.copyResourceFileName("shakes.txt"));
-    PCollection<String> lorum = pipeline.readTextFile(tmpDir.copyResourceFileName("maugham.txt"));
-    lorum.materialize();
-
-    PTable<String, String> wordsByFirstLetter =
-        words.parallelDo("byFirstLetter", new FirstLetterKeyFn(), Avros.tableOf(Avros.strings(), Avros.strings()));
-    PTable<String, String> lorumByFirstLetter =
-        lorum.parallelDo("byFirstLetter", new FirstLetterKeyFn(), Avros.tableOf(Avros.strings(), Avros.strings()));
-
-    @SuppressWarnings("unchecked")
-    PTable<String, String> union = wordsByFirstLetter.union(lorumByFirstLetter);
-
-    assertNotNull(union.materialize().iterator().next());
-  }
-
-  @Test
-  public void collectionUnionMaterializeNPE() throws Exception {
-    PCollection<String> words = pipeline.readTextFile(tmpDir.copyResourceFileName("shakes.txt"));
-    PCollection<String> lorum = pipeline.readTextFile(tmpDir.copyResourceFileName("maugham.txt"));
-    lorum.materialize();
-
-    IdentityFn<String> identity = IdentityFn.getInstance();
-    words = words.parallelDo(identity, Avros.strings());
-    lorum = lorum.parallelDo(identity, Avros.strings());
-
-    @SuppressWarnings("unchecked")
-    PCollection<String> union = words.union(lorum);
-
-    union.materialize().iterator();
-    
-    assertNotNull(union.materialize().iterator().next());
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/154c8fbd/crunch/src/it/java/org/apache/crunch/UnionGbkIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/UnionGbkIT.java b/crunch/src/it/java/org/apache/crunch/UnionGbkIT.java
deleted file mode 100644
index 3937fe8..0000000
--- a/crunch/src/it/java/org/apache/crunch/UnionGbkIT.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import static org.junit.Assert.assertNotNull;
-
-import org.apache.crunch.PCollection;
-import org.apache.crunch.PGroupedTable;
-import org.apache.crunch.PTable;
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.apache.crunch.types.avro.Avros;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-
-public class UnionGbkIT {
-
-  @Rule
-  public TemporaryPath tmpDir = TemporaryPaths.create();
-  
-  MRPipeline pipeline;
-
-  public static class FirstLetterKeyFn extends DoFn<String, Pair<String, String>> {
-    @Override
-    public void process(String input, Emitter<Pair<String, String>> emitter) {
-      if (input.length() > 0) {
-        emitter.emit(Pair.of(input.substring(0, 1), input));
-      }
-    }
-  }
-
-  public static class ConcatGroupFn extends DoFn<Pair<String, Iterable<String>>, String> {
-    @Override
-    public void process(Pair<String, Iterable<String>> input, Emitter<String> emitter) {
-      StringBuilder sb = new StringBuilder();
-      for (String str : input.second()) {
-        sb.append(str);
-      }
-      emitter.emit(sb.toString());
-    }
-  }
-  
-  @Before
-  public void setUp() {
-    pipeline = new MRPipeline(UnionGbkIT.class, tmpDir.getDefaultConfiguration());
-  }
-
-  @After
-  public void tearDown() {
-    pipeline.done();
-  }
-
-  @Test
-  public void tableOfUnionGbk() throws Exception {
-    PCollection<String> words = pipeline.readTextFile(
-        tmpDir.copyResourceFileName("shakes.txt"));
-    PCollection<String> lorum = pipeline.readTextFile(
-        tmpDir.copyResourceFileName("maugham.txt"));
-    lorum.materialize();
-
-    @SuppressWarnings("unchecked")
-    PCollection<String> union = words.union(lorum);
-
-    PGroupedTable<String, String> groupedByFirstLetter =
-        union.parallelDo("byFirstLetter", new FirstLetterKeyFn(),
-            Avros.tableOf(Avros.strings(), Avros.strings()))
-        .groupByKey();
-    PCollection<String> concatted = groupedByFirstLetter
-        .parallelDo("concat", new ConcatGroupFn(), Avros.strings());
-
-    assertNotNull(concatted.materialize().iterator());
-  }
-
-  @Test
-  public void unionOfTablesGbk() throws Exception {
-    PCollection<String> words = pipeline.readTextFile(
-        tmpDir.copyResourceFileName("shakes.txt"));
-    PCollection<String> lorum = pipeline.readTextFile(
-        tmpDir.copyResourceFileName("maugham.txt"));
-    lorum.materialize();
-
-    PTable<String, String> wordsByFirstLetter =
-        words.parallelDo("byFirstLetter", new FirstLetterKeyFn(),
-            Avros.tableOf(Avros.strings(), Avros.strings()));
-    PTable<String, String> lorumByFirstLetter =
-        lorum.parallelDo("byFirstLetter", new FirstLetterKeyFn(),
-            Avros.tableOf(Avros.strings(), Avros.strings()));
-
-    @SuppressWarnings("unchecked")
-    PTable<String, String> union = wordsByFirstLetter.union(lorumByFirstLetter);
-
-    PGroupedTable<String, String> groupedByFirstLetter = union.groupByKey();
-
-    PCollection<String> concatted = groupedByFirstLetter.parallelDo("concat",
-        new ConcatGroupFn(), Avros.strings());
-
-    assertNotNull(concatted.materialize().iterator());
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/154c8fbd/crunch/src/it/java/org/apache/crunch/UnionIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/UnionIT.java b/crunch/src/it/java/org/apache/crunch/UnionIT.java
new file mode 100644
index 0000000..1c60a1b
--- /dev/null
+++ b/crunch/src/it/java/org/apache/crunch/UnionIT.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.crunch.fn.Aggregators;
+import org.apache.crunch.fn.IdentityFn;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.test.Tests;
+import org.apache.crunch.types.avro.Avros;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMultiset;
+
+
+public class UnionIT {
+
+  @Rule
+  public TemporaryPath tmpDir = TemporaryPaths.create();
+  private MRPipeline pipeline;
+  private PCollection<String> words1;
+  private PCollection<String> words2;
+
+  @Before
+  public void setUp() throws IOException {
+    pipeline = new MRPipeline(UnionIT.class, tmpDir.getDefaultConfiguration());
+    words1 = pipeline.readTextFile(tmpDir.copyResourceFileName(Tests.resource(this, "src1.txt")));
+    words2 = pipeline.readTextFile(tmpDir.copyResourceFileName(Tests.resource(this, "src2.txt")));
+  }
+
+  @After
+  public void tearDown() {
+    pipeline.done();
+  }
+
+  @Test
+  public void testUnion() throws Exception {
+    IdentityFn<String> identity = IdentityFn.getInstance();
+    words1 = words1.parallelDo(identity, Avros.strings());
+    words2 = words2.parallelDo(identity, Avros.strings());
+
+    PCollection<String> union = words1.union(words2);
+
+    ImmutableMultiset<String> actual = ImmutableMultiset.copyOf(union.materialize());
+    assertThat(actual.elementSet().size(), is(3));
+    assertThat(actual.count("a1"), is(4));
+    assertThat(actual.count("b2"), is(2));
+    assertThat(actual.count("c3"), is(2));
+  }
+
+  @Test
+  public void testTableUnion() throws IOException {
+    PTable<String, String> words1ByFirstLetter = byFirstLetter(words1);
+    PTable<String, String> words2ByFirstLetter = byFirstLetter(words2);
+
+    PTable<String, String> union = words1ByFirstLetter.union(words2ByFirstLetter);
+
+    ImmutableMultiset<Pair<String, String>> actual = ImmutableMultiset.copyOf(union.materialize());
+
+    assertThat(actual.elementSet().size(), is(3));
+    assertThat(actual.count(Pair.of("a", "1")), is(4));
+    assertThat(actual.count(Pair.of("b", "2")), is(2));
+    assertThat(actual.count(Pair.of("c", "3")), is(2));
+  }
+
+  @Test
+  public void testUnionThenGroupByKey() throws IOException {
+    PCollection<String> union = words1.union(words2);
+
+    PGroupedTable<String, String> grouped = byFirstLetter(union).groupByKey();
+
+    Map<String, String> actual = grouped.combineValues(Aggregators.STRING_CONCAT("", true))
+        .materializeToMap();
+
+    Map<String, String> expected = ImmutableMap.of("a", "1111", "b", "22", "c", "33");
+    assertThat(actual, is(expected));
+  }
+
+  @Test
+  public void testTableUnionThenGroupByKey() throws IOException {
+    PTable<String, String> words1ByFirstLetter = byFirstLetter(words1);
+    PTable<String, String> words2ByFirstLetter = byFirstLetter(words2);
+
+    PTable<String, String> union = words1ByFirstLetter.union(words2ByFirstLetter);
+
+    PGroupedTable<String, String> grouped = union.groupByKey();
+
+    Map<String, String> actual = grouped.combineValues(Aggregators.STRING_CONCAT("", true))
+        .materializeToMap();
+
+    Map<String, String> expected = ImmutableMap.of("a", "1111", "b", "22", "c", "33");
+    assertThat(actual, is(expected));
+  }
+
+
+  private static PTable<String, String> byFirstLetter(PCollection<String> values) {
+    return values.parallelDo("byFirstLetter", new FirstLetterKeyFn(),
+        Avros.tableOf(Avros.strings(), Avros.strings()));
+  }
+
+  private static class FirstLetterKeyFn extends DoFn<String, Pair<String, String>> {
+    @Override
+    public void process(String input, Emitter<Pair<String, String>> emitter) {
+      if (input.length() > 1) {
+        emitter.emit(Pair.of(input.substring(0, 1), input.substring(1)));
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/154c8fbd/crunch/src/it/java/org/apache/crunch/test/Tests.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/test/Tests.java b/crunch/src/it/java/org/apache/crunch/test/Tests.java
index f2c7a86..4c979af 100644
--- a/crunch/src/it/java/org/apache/crunch/test/Tests.java
+++ b/crunch/src/it/java/org/apache/crunch/test/Tests.java
@@ -17,6 +17,8 @@
  */
 package org.apache.crunch.test;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
 import java.util.Collection;
 
 import org.apache.crunch.Pipeline;
@@ -46,10 +48,24 @@ public final class Tests {
    * @throws IllegalArgumentException Thrown if the resource doesn't exist
    */
   public static String pathTo(Object testCase, String resourceName) {
+    String qualifiedName = resource(testCase, resourceName);
+    return Resources.getResource(qualifiedName).getFile();
+  }
+
+  /**
+   * This doesn't check whether the resource exists!
+   *
+   * @param testCase
+   * @param resourceName
+   * @return The path to the resource (never null)
+   */
+  public static String resource(Object testCase, String resourceName) {
+    checkNotNull(testCase);
+    checkNotNull(resourceName);
+
     // Note: We append "Data" because otherwise Eclipse would complain about the
     //       the case's class name clashing with the resource directory's name.
-    String path = testCase.getClass().getName().replaceAll("\\.", "/") + "Data/" + resourceName;
-    return Resources.getResource(path).getFile();
+    return testCase.getClass().getName().replaceAll("\\.", "/") + "Data/" + resourceName;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/crunch/blob/154c8fbd/crunch/src/it/resources/org/apache/crunch/UnionITData/src1.txt
----------------------------------------------------------------------
diff --git a/crunch/src/it/resources/org/apache/crunch/UnionITData/src1.txt b/crunch/src/it/resources/org/apache/crunch/UnionITData/src1.txt
new file mode 100644
index 0000000..a92974b
--- /dev/null
+++ b/crunch/src/it/resources/org/apache/crunch/UnionITData/src1.txt
@@ -0,0 +1,5 @@
+a1
+b2
+a1
+a1
+b2

http://git-wip-us.apache.org/repos/asf/crunch/blob/154c8fbd/crunch/src/it/resources/org/apache/crunch/UnionITData/src2.txt
----------------------------------------------------------------------
diff --git a/crunch/src/it/resources/org/apache/crunch/UnionITData/src2.txt b/crunch/src/it/resources/org/apache/crunch/UnionITData/src2.txt
new file mode 100644
index 0000000..9363398
--- /dev/null
+++ b/crunch/src/it/resources/org/apache/crunch/UnionITData/src2.txt
@@ -0,0 +1,3 @@
+c3
+a1
+c3


Mime
View raw message