crunch-commits mailing list archives

From mkw...@apache.org
Subject crunch git commit: changes to support only affected regions during hfile write
Date Sat, 13 Feb 2016 00:13:58 GMT
Repository: crunch
Updated Branches:
  refs/heads/master 894446d66 -> 2292c7491


changes to support only affected regions during hfile write

Signed-off-by: Micah Whitacre <mkwhit@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/2292c749
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/2292c749
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/2292c749

Branch: refs/heads/master
Commit: 2292c7491f14a634716126dc161184607eb6b704
Parents: 894446d
Author: Stephen Durfey <sjdurfey@gmail.com>
Authored: Fri Feb 12 17:14:01 2016 -0600
Committer: Micah Whitacre <mkwhit@gmail.com>
Committed: Fri Feb 12 17:35:41 2016 -0600

----------------------------------------------------------------------
 .../crunch/lib/sort/TotalOrderPartitioner.java  |   8 +-
 .../apache/crunch/io/hbase/HFileTargetIT.java   | 135 +++++++++++++++++-
 .../org/apache/crunch/io/hbase/HFileUtils.java  | 140 ++++++++++++++++++-
 3 files changed, 271 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
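
For context: the new behavior is opt-in. The existing three-argument overloads of writeToHFilesForIncrementalLoad and writePutsToHFilesForIncrementalLoad delegate to the new four-argument overloads with the flag set to false. A minimal caller-side sketch of the new flag follows; the helper class and its arguments are illustrative only, not part of this commit.

import org.apache.crunch.PCollection;
import org.apache.crunch.io.hbase.HFileUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;

import java.io.IOException;

public class AffectedRegionsWriteExample {
  /**
   * Hypothetical helper: writes the puts as HFiles while sizing the reduce
   * phase to only the regions the puts actually fall into.
   */
  public static void writeLimitedToAffectedRegions(PCollection<Put> puts,
                                                   HTable table,
                                                   Path outputPath) throws IOException {
    // true = limitToAffectedRegions: identify the touched regions up front so
    // the number of reducers matches the affected regions rather than every
    // region in the table.
    HFileUtils.writePutsToHFilesForIncrementalLoad(puts, table, outputPath, true);
  }
}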


http://git-wip-us.apache.org/repos/asf/crunch/blob/2292c749/crunch-core/src/main/java/org/apache/crunch/lib/sort/TotalOrderPartitioner.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/sort/TotalOrderPartitioner.java
b/crunch-core/src/main/java/org/apache/crunch/lib/sort/TotalOrderPartitioner.java
index 94fbdbe..653c294 100644
--- a/crunch-core/src/main/java/org/apache/crunch/lib/sort/TotalOrderPartitioner.java
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/sort/TotalOrderPartitioner.java
@@ -122,18 +122,18 @@ public class TotalOrderPartitioner<K, V> extends Partitioner<K, V> implements Co
   /**
    * Interface to the partitioner to locate a key in the partition keyset.
    */
-  interface Node<T> {
+  public interface Node<T> {
     /**
      * Locate partition in keyset K, st [Ki..Ki+1) defines a partition,
      * with implicit K0 = -inf, Kn = +inf, and |K| = #partitions - 1.
      */
     int findPartition(T key);
   }
-  
-  class BinarySearchNode implements Node<K> {
+
+  public static class BinarySearchNode<K> implements Node<K> {
     private final K[] splitPoints;
     private final RawComparator<K> comparator;
-    BinarySearchNode(K[] splitPoints, RawComparator<K> comparator) {
+    public BinarySearchNode(K[] splitPoints, RawComparator<K> comparator) {
       this.splitPoints = splitPoints;
       this.comparator = comparator;
     }
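
Making Node public and BinarySearchNode a public static class lets crunch-hbase reuse the partitioner's binary search over region start keys (see DetermineAffectedRegionsFn further down). A rough illustration of the lookup semantics, using HBase's KeyValue/KVComparator as the new code does; the row keys are invented for the example, and the printed value is what the [Ki..Ki+1) contract implies.

import org.apache.crunch.lib.sort.TotalOrderPartitioner;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.util.Bytes;

public class BinarySearchNodeExample {
  public static void main(String[] args) {
    // Two region start keys, "g" and "p", split the key space into three ranges:
    // (-inf, "g"), ["g", "p") and ["p", +inf).
    Cell[] splitPoints = new Cell[] {
        KeyValueUtil.createFirstOnRow(Bytes.toBytes("g")),
        KeyValueUtil.createFirstOnRow(Bytes.toBytes("p"))
    };

    TotalOrderPartitioner.Node<Cell> node =
        new TotalOrderPartitioner.BinarySearchNode<>(splitPoints, new KeyValue.KVComparator());

    // Row "m" sorts between "g" and "p", so this is expected to print 1,
    // i.e. the index of the ["g", "p") range.
    System.out.println(node.findPartition(KeyValueUtil.createFirstOnRow(Bytes.toBytes("m"))));
  }
}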

http://git-wip-us.apache.org/repos/asf/crunch/blob/2292c749/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java
index 5bcc51c..c78ae75 100644
--- a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java
+++ b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java
@@ -19,6 +19,7 @@ package org.apache.crunch.io.hbase;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.io.Resources;
 import org.apache.commons.io.IOUtils;
@@ -33,15 +34,21 @@ import org.apache.crunch.Pair;
 import org.apache.crunch.Pipeline;
 import org.apache.crunch.PipelineResult;
 import org.apache.crunch.fn.FilterFns;
+import org.apache.crunch.impl.dist.DistributedPipeline;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.io.At;
+import org.apache.crunch.io.CompositePathIterable;
+import org.apache.crunch.io.seq.SeqFileReaderFactory;
 import org.apache.crunch.test.TemporaryPath;
 import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.writable.WritableDeepCopier;
 import org.apache.crunch.types.writable.Writables;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -49,6 +56,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
@@ -63,6 +71,9 @@ import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -76,6 +87,8 @@ import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.io.Serializable;
 import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
@@ -226,6 +239,7 @@ public class HFileTargetIT implements Serializable {
     HTable table1 = createTable(26);
     HTable table2 = createTable(26);
     LoadIncrementalHFiles loader = new LoadIncrementalHFiles(HBASE_TEST_UTILITY.getConfiguration());
+    boolean onlyAffectedRegions = true;
 
     PCollection<String> shakespeare = pipeline.read(At.textFile(inputPath, Writables.strings()));
     PCollection<String> words = split(shakespeare, "\\s+");
@@ -240,7 +254,8 @@ public class HFileTargetIT implements Serializable {
     HFileUtils.writePutsToHFilesForIncrementalLoad(
         convertToPuts(longWordCounts),
         table2,
-        outputPath2);
+        outputPath2,
+        onlyAffectedRegions);
 
     PipelineResult result = pipeline.run();
     assertTrue(result.succeeded());
@@ -298,6 +313,124 @@ public class HFileTargetIT implements Serializable {
     assertTrue(hfilesCount > 0);
   }
 
+  /**
+   * @see <a href='https://issues.apache.org/jira/browse/CRUNCH-588'>CRUNCH-588</a>
+     */
+  @Test
+  public void testOnlyAffectedRegionsWhenWritingHFiles() throws Exception {
+    Pipeline pipeline = new MRPipeline(HFileTargetIT.class, HBASE_TEST_UTILITY.getConfiguration());
+    Path inputPath = copyResourceFileToHDFS("shakes.txt");
+    Path outputPath1 = getTempPathOnHDFS("out1");
+    HTable table1 = createTable(26);
+
+    PCollection<String> shakespeare = pipeline.read(At.textFile(inputPath, Writables.strings()));
+    PCollection<String> words = split(shakespeare, "\\s+");
+    // take the top 5 here to reduce the number of affected regions in the table
+    PTable<String, Long> count = words.filter(SHORT_WORD_FILTER).count().top(5);
+    boolean onlyAffectedRegions = true;
+
+    PCollection<Put> wordPuts = convertToPuts(count);
+    HFileUtils.writePutsToHFilesForIncrementalLoad(
+            wordPuts,
+            table1,
+            outputPath1,
+            onlyAffectedRegions);
+
+    // locate partition file directory and read it in to verify
+    // the number of regions to be written to are less than the
+    // number of regions in the table
+    String tempPath = ((DistributedPipeline) pipeline).createTempPath().toString();
+    Path temp = new Path(tempPath.substring(0, tempPath.lastIndexOf("/")));
+    FileSystem fs = FileSystem.get(pipeline.getConfiguration());
+    Path partitionPath = null;
+
+    for (final FileStatus fileStatus : fs.listStatus(temp)) {
+      RemoteIterator<LocatedFileStatus> remoteFIles = fs.listFiles(fileStatus.getPath(), true);
+
+      while(remoteFIles.hasNext()) {
+        LocatedFileStatus file = remoteFIles.next();
+        if(file.getPath().toString().contains("partition")) {
+          partitionPath = file.getPath();
+          System.out.println("found written partitions in path: " + partitionPath.toString());
+          break;
+        }
+      }
+
+      if(partitionPath != null) {
+        break;
+      }
+    }
+
+    if(partitionPath == null) {
+      throw new AssertionError("Partition path was not found");
+    }
+
+    Class<BytesWritable> keyClass = BytesWritable.class;
+    List<BytesWritable> writtenPartitions = new ArrayList<>();
+    WritableDeepCopier wdc = new WritableDeepCopier(keyClass);
+    SeqFileReaderFactory<BytesWritable> s = new SeqFileReaderFactory<>(keyClass);
+
+    // read back in the partition file
+    Iterator<BytesWritable> iter = CompositePathIterable.create(fs, partitionPath, s).iterator();
+    while (iter.hasNext()) {
+      BytesWritable next = iter.next();
+      writtenPartitions.add((BytesWritable) wdc.deepCopy(next));
+    }
+
+    ImmutableList<byte[]> startKeys = ImmutableList.copyOf(table1.getStartKeys());
+    // assert that only affected regions were loaded into
+    assertTrue(startKeys.size() > writtenPartitions.size());
+
+    // write out and read back in the start keys for each region.
+    // do this to get proper byte alignment
+    Path regionStartKeys = tmpDir.getPath("regionStartKeys");
+    List<KeyValue> startKeysToWrite = Lists.newArrayList();
+    for (final byte[] startKey : startKeys.subList(1, startKeys.size())) {
+      startKeysToWrite.add(KeyValueUtil.createFirstOnRow(startKey));
+    }
+    writeToSeqFile(pipeline.getConfiguration(), regionStartKeys, startKeysToWrite);
+
+    List<BytesWritable> writtenStartKeys = new ArrayList<>();
+    iter = CompositePathIterable.create(fs, partitionPath, s).iterator();
+    while (iter.hasNext()) {
+      BytesWritable next = iter.next();
+      writtenStartKeys.add((BytesWritable) wdc.deepCopy(next));
+    }
+
+    // assert the keys read back in match start keys for a region on the table
+    for (final BytesWritable writtenPartition : writtenPartitions) {
+      boolean foundMatchingKv = false;
+      for (final BytesWritable writtenStartKey : writtenStartKeys) {
+        if (writtenStartKey.equals(writtenPartition)) {
+          foundMatchingKv = true;
+          break;
+        }
+      }
+
+      if(!foundMatchingKv) {
+        throw new AssertionError("Written KeyValue: " + writtenPartition + " did not match any known start keys of the table");
+      }
+    }
+
+    pipeline.done();
+  }
+
+  private static void writeToSeqFile(
+          Configuration conf,
+          Path path,
+          List<KeyValue> splitPoints) throws IOException {
+    SequenceFile.Writer writer = SequenceFile.createWriter(
+            path.getFileSystem(conf),
+            conf,
+            path,
+            NullWritable.class,
+            BytesWritable.class);
+    for (KeyValue key : splitPoints) {
+      writer.append(NullWritable.get(), HBaseTypes.keyValueToBytes(key));
+    }
+    writer.close();
+  }
+
   private static PCollection<Put> convertToPuts(PTable<String, Long> in) {
     return convertToPuts(in, TEST_FAMILY);
   }
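
The partition file the new test locates appears to share the <NullWritable, BytesWritable> layout used by the test's writeToSeqFile helper above. Assuming that layout, a rough standalone sketch for dumping the written split points could look like this (class and method names are placeholders, not code from this commit):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class PartitionFileDump {
  /** Reads the serialized split points out of a partition sequence file. */
  public static List<BytesWritable> readSplitPoints(Configuration conf, Path partitionPath)
      throws IOException {
    List<BytesWritable> splits = new ArrayList<>();
    SequenceFile.Reader reader =
        new SequenceFile.Reader(conf, SequenceFile.Reader.file(partitionPath));
    try {
      NullWritable key = NullWritable.get();
      BytesWritable value = new BytesWritable();
      while (reader.next(key, value)) {
        // Copy the value out, since the reader reuses the same instance.
        splits.add(new BytesWritable(value.copyBytes()));
      }
    } finally {
      reader.close();
    }
    return splits;
  }
}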

http://git-wip-us.apache.org/repos/asf/crunch/blob/2292c749/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java
index 31684d2..57fdffb 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java
@@ -26,6 +26,7 @@ import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
@@ -36,6 +37,7 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.primitives.Longs;
+import org.apache.crunch.CrunchRuntimeException;
 import org.apache.crunch.DoFn;
 import org.apache.crunch.Emitter;
 import org.apache.crunch.FilterFn;
@@ -47,6 +49,7 @@ import org.apache.crunch.Pair;
 import org.apache.crunch.Pipeline;
 import org.apache.crunch.impl.dist.DistributedPipeline;
 import org.apache.crunch.lib.sort.TotalOrderPartitioner;
+import org.apache.crunch.types.writable.Writables;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
@@ -54,6 +57,7 @@ import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
@@ -370,15 +374,32 @@ public final class HFileUtils {
   }
 
   public static <C extends Cell> void writeToHFilesForIncrementalLoad(
+          PCollection<C> cells,
+          HTable table,
+          Path outputPath) throws IOException {
+    writeToHFilesForIncrementalLoad(cells, table, outputPath, false);
+  }
+
+  /**
+   * Writes out HFiles from the provided <code>cells</code> and <code>table</code>. <code>limitToAffectedRegions</code>
+   * is used to indicate that the regions the <code>cells</code> will be loaded into should be identified prior to writing
+   * HFiles. Identifying the regions ahead of time will reduce the number of reducers needed when writing. This is
+   * beneficial if the data to be loaded only touches a small enough subset of the total regions in the table. If set to
+   * false, the number of reducers will equal the number of regions in the table.
+   *
+   * @see <a href='https://issues.apache.org/jira/browse/CRUNCH-588'>CRUNCH-588</a>
+   */
+  public static <C extends Cell> void writeToHFilesForIncrementalLoad(
       PCollection<C> cells,
       HTable table,
-      Path outputPath) throws IOException {
+      Path outputPath,
+      boolean limitToAffectedRegions) throws IOException {
     HColumnDescriptor[] families = table.getTableDescriptor().getColumnFamilies();
     if (families.length == 0) {
       LOG.warn("{} has no column families", table);
       return;
     }
-    PCollection<C> partitioned = sortAndPartition(cells, table);
+    PCollection<C> partitioned = sortAndPartition(cells, table, limitToAffectedRegions);
     for (HColumnDescriptor f : families) {
       byte[] family = f.getName();
       partitioned
@@ -388,9 +409,26 @@ public final class HFileUtils {
   }
 
   public static void writePutsToHFilesForIncrementalLoad(
+          PCollection<Put> puts,
+          HTable table,
+          Path outputPath) throws IOException {
+    writePutsToHFilesForIncrementalLoad(puts, table, outputPath, false);
+  }
+
+  /**
+   * Writes out HFiles from the provided <code>puts</code> and <code>table</code>. <code>limitToAffectedRegions</code>
+   * is used to indicate that the regions the <code>puts</code> will be loaded into should be identified prior to writing
+   * HFiles. Identifying the regions ahead of time will reduce the number of reducers needed when writing. This is
+   * beneficial if the data to be loaded only touches a small enough subset of the total regions in the table. If set to
+   * false, the number of reducers will equal the number of regions in the table.
+   *
+   * @see <a href='https://issues.apache.org/jira/browse/CRUNCH-588'>CRUNCH-588</a>
+   */
+  public static void writePutsToHFilesForIncrementalLoad(
       PCollection<Put> puts,
       HTable table,
-      Path outputPath) throws IOException {
+      Path outputPath,
+      boolean limitToAffectedRegions) throws IOException {
     PCollection<Cell> cells = puts.parallelDo("ConvertPutToCells", new DoFn<Put, Cell>() {
       @Override
       public void process(Put input, Emitter<Cell> emitter) {
@@ -399,10 +437,21 @@ public final class HFileUtils {
         }
       }
     }, HBaseTypes.cells());
-    writeToHFilesForIncrementalLoad(cells, table, outputPath);
+    writeToHFilesForIncrementalLoad(cells, table, outputPath, limitToAffectedRegions);
   }
 
   public static <C extends Cell> PCollection<C> sortAndPartition(PCollection<C> cells, HTable table) throws IOException {
+    return sortAndPartition(cells, table, false);
+  }
+
+  /**
+   * Sorts and partitions the provided <code>cells</code> for the given <code>table</code> to ensure all elements that belong
+   * in the same region end up in the same reducer. The flag <code>limitToAffectedRegions</code>, when set to true, will identify
+   * the regions the data in <code>cells</code> belongs to and will set the number of reducers equal to the number of identified
+   * affected regions. If set to false, then all regions will be used, and the number of reducers will be set to the number
+   * of regions in the table.
+   */
+  public static <C extends Cell> PCollection<C> sortAndPartition(PCollection<C> cells, HTable table, boolean limitToAffectedRegions) throws IOException {
     Configuration conf = cells.getPipeline().getConfiguration();
     PTable<C, Void> t = cells.parallelDo(
         "Pre-partition",
@@ -412,7 +461,13 @@ public final class HFileUtils {
             return Pair.of(input, (Void) null);
           }
         }, tableOf(cells.getPType(), nulls()));
-    List<KeyValue> splitPoints = getSplitPoints(table);
+
+    List<KeyValue> splitPoints;
+    if(limitToAffectedRegions) {
+      splitPoints = getSplitPoints(table, t);
+    } else {
+      splitPoints = getSplitPoints(table);
+    }
     Path partitionFile = new Path(((DistributedPipeline) cells.getPipeline()).createTempPath(), "partition");
     writePartitionInfo(conf, partitionFile, splitPoints);
     GroupingOptions options = GroupingOptions.builder()
@@ -431,13 +486,84 @@ public final class HFileUtils {
     }
     List<KeyValue> splitPoints = Lists.newArrayList();
     for (byte[] startKey : startKeys.subList(1, startKeys.size())) {
-      KeyValue kv = KeyValue.createFirstOnRow(startKey);
-      LOG.debug("split row: " + Bytes.toString(kv.getRow()));
+      KeyValue kv = KeyValueUtil.createFirstOnRow(startKey);
+      LOG.debug("split row: " + Bytes.toString(CellUtil.cloneRow(kv)));
       splitPoints.add(kv);
     }
     return splitPoints;
   }
 
+  private static <C> List<KeyValue> getSplitPoints(HTable table, PTable<C, Void> affectedRows) throws IOException {
+    List<byte[]> startKeys;
+    try {
+      startKeys = Lists.newArrayList(table.getStartKeys());
+      if (startKeys.isEmpty()) {
+        throw new AssertionError(table + " has no regions!");
+      }
+    } catch (IOException e) {
+      throw new CrunchRuntimeException(e);
+    }
+
+    Collections.sort(startKeys, Bytes.BYTES_COMPARATOR);
+
+    Iterable<ByteBuffer> bufferedStartKeys = affectedRows
+            .parallelDo(new DetermineAffectedRegionsFn(startKeys), Writables.bytes()).materialize();
+
+    // set to get rid of the potential duplicate start keys emitted
+    ImmutableSet.Builder<KeyValue> startKeyBldr = ImmutableSet.builder();
+    for (final ByteBuffer bufferedStartKey : bufferedStartKeys) {
+      startKeyBldr.add(KeyValueUtil.createFirstOnRow(bufferedStartKey.array()));
+    }
+
+    return ImmutableList.copyOf(startKeyBldr.build());
+  }
+
+  /**
+   * Spins through the {@link Cell}s and determines which regions the data
+   * will be loaded into. Searching the regions is done via a binary search. The
+   * region start key should be provided by the caller to cut down on calls to
+   * HMaster to get those start keys.
+   */
+  public static class DetermineAffectedRegionsFn<C extends Cell> extends DoFn<Pair<C, Void>, ByteBuffer> {
+
+    private final Set<Cell> startKeysToEmit = new HashSet<>();
+    List<byte[]> startKeys;
+    TotalOrderPartitioner.Node partitions;
+    List<Cell> regionStartKeys = Lists.newArrayList();
+
+    public DetermineAffectedRegionsFn(List<byte[]> startKeys) {
+      this.startKeys = startKeys;
+    }
+
+    @Override
+    public void initialize() {
+      for (byte[] startKey : startKeys.subList(1, startKeys.size())) {
+        Cell cell = KeyValueUtil.createFirstOnRow(startKey);
+        regionStartKeys.add(cell);
+      }
+
+      partitions = new TotalOrderPartitioner.BinarySearchNode<>(regionStartKeys.toArray(new Cell[regionStartKeys.size()]),
+              new KeyValue.KVComparator());
+    }
+
+    @Override
+    public void process(Pair<C, Void> input, Emitter<ByteBuffer> emitter) {
+      int position = partitions.findPartition(new KeyValue(input.first().getFamilyArray()));
+      // if the position is after the last key, use the last start key
+      // as the split for this key, since it should fall into that region
+      if (position >= regionStartKeys.size() && regionStartKeys.size() > 1) {
+        position = regionStartKeys.size() - 1;
+      }
+
+      Cell foundCell = regionStartKeys.get(position);
+
+      if (!startKeysToEmit.contains(foundCell)) {
+        startKeysToEmit.add(foundCell);
+        emitter.emit(ByteBuffer.wrap(CellUtil.cloneRow(foundCell)));
+      }
+    }
+  }
+
   private static void writePartitionInfo(
       Configuration conf,
       Path path,


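The same affected-regions logic is also reachable directly through the new sortAndPartition overload shown above. A minimal caller-side sketch, with illustrative class and method names that are not part of this commit:

import org.apache.crunch.PCollection;
import org.apache.crunch.io.hbase.HFileUtils;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.HTable;

import java.io.IOException;

public class SortAndPartitionExample {
  /**
   * Hypothetical caller: shuffles the cells so that each reducer receives the
   * cells of exactly one affected region, instead of one reducer per region
   * in the table.
   */
  public static PCollection<Cell> partitionByAffectedRegions(PCollection<Cell> cells, HTable table)
      throws IOException {
    return HFileUtils.sortAndPartition(cells, table, true);
  }
}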