crunch-commits mailing list archives

From chao...@apache.org
Subject git commit: CRUNCH-251: Fix HFileUtils#sortAndPartition so that it works when two instances exist in the same pipeline
Date Wed, 21 Aug 2013 03:31:49 GMT
Updated Branches:
  refs/heads/master c10996a4d -> 891e28d0e


CRUNCH-251: Fix HFileUtils#sortAndPartition so that it works when two instances exist in the same pipeline
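
Before this change, each sortAndPartition call set the TotalOrderPartitioner partition file on the pipeline-wide Configuration, so when two instances ran in the same pipeline the later call could overwrite the earlier one's setting and both group-bys would end up reading the same partition file. A minimal sketch of the two-target usage this fixes, mirroring the new testMultipleHFileTargets integration test below (the tables, word-count PTables, and output paths are taken from that test and are illustrative only):

    // Two HFile targets in one MRPipeline; each writeToHFilesForIncrementalLoad call
    // runs its own sortAndPartition, which previously stored its partition file path
    // in the shared Configuration, letting the second call clobber the first.
    HFileUtils.writeToHFilesForIncrementalLoad(convertToKeyValues(shortWordCounts), table1, outputPath1);
    HFileUtils.writeToHFilesForIncrementalLoad(convertToKeyValues(longWordCounts), table2, outputPath2);
    PipelineResult result = pipeline.run();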


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/891e28d0
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/891e28d0
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/891e28d0

Branch: refs/heads/master
Commit: 891e28d0e0e9cd8da00bed28b33d4e448697b171
Parents: c10996a
Author: Chao Shi <chaoshi@apache.org>
Authored: Wed Aug 21 10:45:57 2013 +0800
Committer: Chao Shi <chaoshi@apache.org>
Committed: Wed Aug 21 10:45:57 2013 +0800

----------------------------------------------------------------------
 .../apache/crunch/io/hbase/HFileTargetIT.java   | 85 ++++++++++++++++----
 .../org/apache/crunch/io/hbase/HFileUtils.java  |  2 +-
 2 files changed, 71 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/891e28d0/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java
index 8c1b3f4..667b5ad 100644
--- a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java
+++ b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java
@@ -23,12 +23,14 @@ import com.google.common.io.Resources;
 import org.apache.commons.io.IOUtils;
 import org.apache.crunch.DoFn;
 import org.apache.crunch.Emitter;
+import org.apache.crunch.FilterFn;
 import org.apache.crunch.MapFn;
 import org.apache.crunch.PCollection;
 import org.apache.crunch.PTable;
 import org.apache.crunch.Pair;
 import org.apache.crunch.Pipeline;
 import org.apache.crunch.PipelineResult;
+import org.apache.crunch.fn.FilterFns;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.io.At;
 import org.apache.crunch.test.TemporaryPath;
@@ -76,10 +78,17 @@ import static org.junit.Assert.fail;
 public class HFileTargetIT implements Serializable {
 
   private static final HBaseTestingUtility HBASE_TEST_UTILITY = new HBaseTestingUtility();
-  private static final byte[] TEST_TABLE = Bytes.toBytes("test_table");
   private static final byte[] TEST_FAMILY = Bytes.toBytes("test_family");
   private static final byte[] TEST_QUALIFIER = Bytes.toBytes("count");
   private static final Path TEMP_DIR = new Path("/tmp");
+  private static int tableCounter = 0;
+
+  private static FilterFn<String> SHORT_WORD_FILTER = new FilterFn<String>() {
+    @Override
+    public boolean accept(String input) {
+      return input.length() <= 2;
+    }
+  };
 
   @Rule
   public transient TemporaryPath tmpDir = TemporaryPaths.create();
@@ -90,16 +99,6 @@ public class HFileTargetIT implements Serializable {
     // (we will need it to test bulk load against multiple regions).
     HBASE_TEST_UTILITY.startMiniCluster();
     HBASE_TEST_UTILITY.startMiniMapReduceCluster();
-    HBaseAdmin admin = HBASE_TEST_UTILITY.getHBaseAdmin();
-    HColumnDescriptor hcol = new HColumnDescriptor(TEST_FAMILY);
-    HTableDescriptor htable = new HTableDescriptor(TEST_TABLE);
-    htable.addFamily(hcol);
-    byte[][] splits = new byte[26][];
-    for (int i = 0; i < 26; i++) {
-      byte b = (byte)('a' + i);
-      splits[i] = new byte[] { b };
-    }
-    admin.createTable(htable, splits);
 
     // Set classpath for yarn, otherwise it won't be able to find MRAppMaster
     // (see CRUNCH-249 and HBASE-8528).
@@ -107,6 +106,17 @@ public class HFileTargetIT implements Serializable {
     dirtyFixForJobHistoryServerAddress();
   }
 
+  private static HTable createTable(int splits) throws IOException {
+    byte[] tableName = Bytes.toBytes("test_table_" + tableCounter);
+    HBaseAdmin admin = HBASE_TEST_UTILITY.getHBaseAdmin();
+    HColumnDescriptor hcol = new HColumnDescriptor(TEST_FAMILY);
+    HTableDescriptor htable = new HTableDescriptor(tableName);
+    htable.addFamily(hcol);
+    admin.createTable(htable, Bytes.split(Bytes.toBytes("a"), Bytes.toBytes("z"), splits));
+    tableCounter++;
+    return new HTable(HBASE_TEST_UTILITY.getConfiguration(), tableName);
+  }
+
   /**
    * We need to set the address of JobHistory server, as it randomly picks an unused port
    * to listen. Unfortunately, HBaseTestingUtility neither does that nor provides a way
@@ -144,7 +154,6 @@ public class HFileTargetIT implements Serializable {
   public void setUp() throws IOException {
     FileSystem fs = FileSystem.get(HBASE_TEST_UTILITY.getConfiguration());
     fs.delete(TEMP_DIR, true);
-    HBASE_TEST_UTILITY.truncateTable(TEST_TABLE);
   }
 
   @Test
@@ -174,12 +183,12 @@ public class HFileTargetIT implements Serializable {
     Pipeline pipeline = new MRPipeline(HFileTargetIT.class, conf);
     Path inputPath = copyResourceFileToHDFS("shakes.txt");
     Path outputPath = getTempPathOnHDFS("out");
+    HTable testTable = createTable(26);
 
     PCollection<String> shakespeare = pipeline.read(At.textFile(inputPath, Writables.strings()));
     PCollection<String> words = split(shakespeare, "\\s+");
     PTable<String,Long> wordCounts = words.count();
     PCollection<KeyValue> wordCountKvs = convertToKeyValues(wordCounts);
-    HTable testTable = new HTable(HBASE_TEST_UTILITY.getConfiguration(), TEST_TABLE);
     HFileUtils.writeToHFilesForIncrementalLoad(
         wordCountKvs,
         testTable,
@@ -199,11 +208,48 @@ public class HFileTargetIT implements Serializable {
         .put("to", 367L)
         .build();
     for (Map.Entry<String, Long> e : EXPECTED.entrySet()) {
-      assertEquals((long) e.getValue(), Bytes.toLong(
-          testTable.get(new Get(Bytes.toBytes(e.getKey()))).getColumnLatest(TEST_FAMILY, TEST_QUALIFIER).getValue()));
+      long actual = getWordCountFromTable(testTable, e.getKey());
+      assertEquals((long) e.getValue(), actual);
     }
   }
 
+  /** @see CRUNCH-251 */
+  @Test
+  public void testMultipleHFileTargets() throws Exception {
+    Configuration conf = HBASE_TEST_UTILITY.getConfiguration();
+    Pipeline pipeline = new MRPipeline(HFileTargetIT.class, conf);
+    Path inputPath = copyResourceFileToHDFS("shakes.txt");
+    Path outputPath1 = getTempPathOnHDFS("out1");
+    Path outputPath2 = getTempPathOnHDFS("out2");
+    HTable table1 = createTable(10);
+    HTable table2 = createTable(20);
+    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(HBASE_TEST_UTILITY.getConfiguration());
+
+    PCollection<String> shakespeare = pipeline.read(At.textFile(inputPath, Writables.strings()));
+    PCollection<String> words = split(shakespeare, "\\s+");
+    PCollection<String> shortWords = words.filter(SHORT_WORD_FILTER);
+    PCollection<String> longWords = words.filter(FilterFns.not(SHORT_WORD_FILTER));
+    PTable<String, Long> shortWordCounts = shortWords.count();
+    PTable<String, Long> longWordCounts = longWords.count();
+    HFileUtils.writeToHFilesForIncrementalLoad(
+        convertToKeyValues(shortWordCounts),
+        table1,
+        outputPath1);
+    HFileUtils.writeToHFilesForIncrementalLoad(
+        convertToKeyValues(longWordCounts),
+        table2,
+        outputPath2);
+
+    PipelineResult result = pipeline.run();
+    assertTrue(result.succeeded());
+    loader.doBulkLoad(outputPath1, table1);
+    loader.doBulkLoad(outputPath2, table2);
+
+    FileSystem fs = FileSystem.get(conf);
+    assertEquals(396L, getWordCountFromTable(table1, "of"));
+    assertEquals(427L, getWordCountFromTable(table2, "and"));
+  }
+
   private PCollection<KeyValue> convertToKeyValues(PTable<String, Long> in) {
     return in.parallelDo(new MapFn<Pair<String, Long>, KeyValue>() {
       @Override
@@ -281,4 +327,13 @@ public class HFileTargetIT implements Serializable {
     Path result = new Path(TEMP_DIR, fileName);
     return result.makeQualified(fs);
   }
+
+  private long getWordCountFromTable(HTable table, String word) throws IOException {
+    Get get = new Get(Bytes.toBytes(word));
+    KeyValue keyValue = table.get(get).getColumnLatest(TEST_FAMILY, TEST_QUALIFIER);
+    if (keyValue == null) {
+      fail("no such row: " +  word);
+    }
+    return Bytes.toLong(keyValue.getValue());
+  }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/891e28d0/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java
index 2235538..5e07a67 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java
@@ -119,9 +119,9 @@ public final class HFileUtils {
     List <KeyValue> splitPoints = getSplitPoints(table);
     Path partitionFile = new Path(((MRPipeline) kvs.getPipeline()).createTempPath(), "partition");
     writePartitionInfo(conf, partitionFile, splitPoints);
-    TotalOrderPartitioner.setPartitionFile(conf, partitionFile);
     GroupingOptions options = GroupingOptions.builder()
         .partitionerClass(TotalOrderPartitioner.class)
+        .conf(TotalOrderPartitioner.PARTITIONER_PATH, partitionFile.toString())
         .numReducers(splitPoints.size() + 1)
         .sortComparatorClass(KeyValueComparator.class)
         .build();
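
The change above moves that setting out of the shared job Configuration and into the GroupingOptions of this specific group-by, so each sortAndPartition instance now carries its own TotalOrderPartitioner.PARTITIONER_PATH and multiple instances in one pipeline no longer overwrite each other's partition file.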

