crunch-commits mailing list archives

From gr...@apache.org
Subject [1/2] crunch git commit: CRUNCH-545 Use a single job for writing to HFiles
Date Mon, 20 Jul 2015 13:26:24 GMT
Repository: crunch
Updated Branches:
  refs/heads/master b6accf4e3 -> de1553e73


CRUNCH-545 Use a single job for writing to HFiles

Filter Cells on column family after they have been sorted and
partitioned. This ensures that partitioning and writing to HFiles
all occur in a single job, instead of one job per column family
defined in the table.
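
In short: the old code ran the expensive sort-and-partition once per column
family, so each family cost a full MapReduce job; the new code partitions
once and fans out a cheap per-family filter inside that single job. A rough
before/after sketch (byFamily(...) and target(...) are shorthand for the
FilterByFamilyFn and HFileTarget construction in the diff below, not real
helpers):

    // Before: one sort/partition (one MapReduce job) per family.
    for (HColumnDescriptor f : families) {
      sortAndPartition(cells.filter(byFamily(f)), table).write(target(f));
    }

    // After: sort/partition once; the per-family filters share that job.
    PCollection<C> partitioned = sortAndPartition(cells, table);
    for (HColumnDescriptor f : families) {
      partitioned.filter(byFamily(f)).write(target(f));
    }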


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/b2e6d169
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/b2e6d169
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/b2e6d169

Branch: refs/heads/master
Commit: b2e6d169b3492d9a04c8a76ca771f586bbaa760d
Parents: b6accf4
Author: Gabriel Reid <greid@apache.org>
Authored: Sun Jul 19 16:12:52 2015 +0200
Committer: Gabriel Reid <greid@apache.org>
Committed: Mon Jul 20 15:12:11 2015 +0200

----------------------------------------------------------------------
 .../apache/crunch/io/hbase/HFileTargetIT.java   | 30 ++++++++++++++------
 .../org/apache/crunch/io/hbase/HFileUtils.java  | 25 ++++++++++------
 2 files changed, 39 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/b2e6d169/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java
index ddb1292..5bcc51c 100644
--- a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java
+++ b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java
@@ -137,11 +137,13 @@ public class HFileTargetIT implements Serializable {
     return createTable(splits, hcol);
   }
 
-  private static HTable createTable(int splits, HColumnDescriptor hcol) throws Exception {
+  private static HTable createTable(int splits, HColumnDescriptor... hcols) throws Exception {
     byte[] tableName = Bytes.toBytes("test_table_" + RANDOM.nextInt(1000000000));
     HBaseAdmin admin = HBASE_TEST_UTILITY.getHBaseAdmin();
     HTableDescriptor htable = new HTableDescriptor(tableName);
-    htable.addFamily(hcol);
+    for (HColumnDescriptor hcol : hcols) {
+      htable.addFamily(hcol);
+    }
     admin.createTable(htable, Bytes.split(Bytes.toBytes("a"), Bytes.toBytes("z"), splits));
     HBASE_TEST_UTILITY.waitTableAvailable(tableName, 30000);
     return new HTable(HBASE_TEST_UTILITY.getConfiguration(), tableName);
@@ -182,12 +184,13 @@ public class HFileTargetIT implements Serializable {
     Pipeline pipeline = new MRPipeline(HFileTargetIT.class, HBASE_TEST_UTILITY.getConfiguration());
     Path inputPath = copyResourceFileToHDFS("shakes.txt");
     Path outputPath = getTempPathOnHDFS("out");
-    HTable testTable = createTable(26);
-
+    byte[] columnFamilyA = Bytes.toBytes("colfamA");
+    byte[] columnFamilyB = Bytes.toBytes("colfamB");
+    HTable testTable = createTable(26, new HColumnDescriptor(columnFamilyA), new HColumnDescriptor(columnFamilyB));
     PCollection<String> shakespeare = pipeline.read(At.textFile(inputPath, Writables.strings()));
     PCollection<String> words = split(shakespeare, "\\s+");
     PTable<String,Long> wordCounts = words.count();
-    PCollection<Put> wordCountPuts = convertToPuts(wordCounts);
+    PCollection<Put> wordCountPuts = convertToPuts(wordCounts, columnFamilyA, columnFamilyB);
     HFileUtils.writePutsToHFilesForIncrementalLoad(
         wordCountPuts,
         testTable,
@@ -208,8 +211,8 @@ public class HFileTargetIT implements Serializable {
         .build();
 
     for (Map.Entry<String, Long> e : EXPECTED.entrySet()) {
-      long actual = getWordCountFromTable(testTable, e.getKey());
-      assertEquals((long) e.getValue(), actual);
+      assertEquals((long) e.getValue(), getWordCountFromTable(testTable, columnFamilyA, e.getKey()));
+      assertEquals((long) e.getValue(), getWordCountFromTable(testTable, columnFamilyB, e.getKey()));
     }
   }
 
@@ -296,6 +299,10 @@ public class HFileTargetIT implements Serializable {
   }
 
   private static PCollection<Put> convertToPuts(PTable<String, Long> in) {
+    return convertToPuts(in, TEST_FAMILY);
+  }
+
+  private static PCollection<Put> convertToPuts(PTable<String, Long> in, final byte[]...columnFamilies) {
     return in.parallelDo(new MapFn<Pair<String, Long>, Put>() {
       @Override
       public Put map(Pair<String, Long> input) {
@@ -305,7 +312,9 @@ public class HFileTargetIT implements Serializable {
         }
         long c = input.second();
         Put p = new Put(Bytes.toBytes(w));
-        p.add(TEST_FAMILY, TEST_QUALIFIER, Bytes.toBytes(c));
+        for (byte[] columnFamily : columnFamilies) {
+          p.add(columnFamily, TEST_QUALIFIER, Bytes.toBytes(c));
+        }
         return p;
       }
     }, HBaseTypes.puts());
@@ -394,7 +403,12 @@ public class HFileTargetIT implements Serializable {
   }
 
   private static long getWordCountFromTable(HTable table, String word) throws IOException {
+    return getWordCountFromTable(table, TEST_FAMILY, word);
+  }
+
+  private static long getWordCountFromTable(HTable table, byte[] columnFamily, String word) throws IOException {
     Get get = new Get(Bytes.toBytes(word));
+    get.addFamily(columnFamily);
     byte[] value = table.get(get).value();
     if (value == null) {
       fail("no such row: " +  word);

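Condensed, the write path this test now exercises (a sketch of the calls in
the hunks above; the pipeline run and subsequent bulk load are not shown in
the diff but are implied by the table assertions):

    PCollection<Put> puts = convertToPuts(wordCounts, columnFamilyA, columnFamilyB);
    HFileUtils.writePutsToHFilesForIncrementalLoad(puts, testTable, outputPath);
    pipeline.run();
    // outputPath now holds one subdirectory of HFiles per column family
    // (colfamA/, colfamB/), ready to be bulk loaded into testTable.
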
http://git-wip-us.apache.org/repos/asf/crunch/blob/b2e6d169/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java
index 34118ca..d18b65b 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java
@@ -128,6 +128,11 @@ public final class HFileUtils {
     public boolean accept(C input) {
       return Bytes.equals(CellUtil.cloneFamily(input), family);
     }
+
+    @Override
+    public boolean disableDeepCopy() {
+      return true;
+    }
   }
 
   private static class StartRowFilterFn<C extends Cell> extends FilterFn<C> {
@@ -367,10 +372,12 @@ public final class HFileUtils {
       LOG.warn("{} has no column families", table);
       return;
     }
+    PCollection<C> partitioned = sortAndPartition(cells, table);
     for (HColumnDescriptor f : families) {
       byte[] family = f.getName();
-      PCollection<C> sorted = sortAndPartition(cells.filter(new FilterByFamilyFn<C>(family)), table);
-      sorted.write(new HFileTarget(new Path(outputPath, Bytes.toString(family)), f));
+      partitioned
+          .filter(new FilterByFamilyFn<C>(family))
+          .write(new HFileTarget(new Path(outputPath, Bytes.toString(family)), f));
     }
   }
 
@@ -391,12 +398,14 @@ public final class HFileUtils {
 
   public static <C extends Cell> PCollection<C> sortAndPartition(PCollection<C> cells, HTable table) throws IOException {
     Configuration conf = cells.getPipeline().getConfiguration();
-    PTable<C, Void> t = cells.parallelDo(new MapFn<C, Pair<C, Void>>() {
-      @Override
-      public Pair<C, Void> map(C input) {
-        return Pair.of(input, (Void) null);
-      }
-    }, tableOf(cells.getPType(), nulls()));
+    PTable<C, Void> t = cells.parallelDo(
+        "Pre-partition",
+        new MapFn<C, Pair<C, Void>>() {
+          @Override
+          public Pair<C, Void> map(C input) {
+            return Pair.of(input, (Void) null);
+          }
+        }, tableOf(cells.getPType(), nulls()));
     List<KeyValue> splitPoints = getSplitPoints(table);
     Path partitionFile = new Path(((DistributedPipeline) cells.getPipeline()).createTempPath(), "partition");
     writePartitionInfo(conf, partitionFile, splitPoints);
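
A note on the disableDeepCopy() override added above: the single partitioned
PCollection now feeds one filter per column family, i.e. several functions
consume the same upstream collection, which is normally when Crunch makes a
defensive deep copy of each value it hands out. Returning true skips that
copy, which is safe here because the filter only reads the family bytes and
neither mutates nor retains a Cell. Any pass-through FilterFn can opt out
the same way (a generic sketch, not part of this patch):

    final byte[] family = Bytes.toBytes("colfamA");  // example family
    FilterFn<Cell> byFamily = new FilterFn<Cell>() {
      @Override
      public boolean accept(Cell input) {
        // Read-only check; the Cell is not retained after accept() returns.
        return Bytes.equals(CellUtil.cloneFamily(input), family);
      }
      @Override
      public boolean disableDeepCopy() {
        return true;  // skip the defensive copy for this consumer
      }
    };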

