hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbau...@apache.org
Subject svn commit: r1354478 - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java test/java/org/apache/hadoop/hbase/util/TestRegionSplitter.java
Date Wed, 27 Jun 2012 12:51:09 GMT
Author: mbautin
Date: Wed Jun 27 12:51:08 2012
New Revision: 1354478

URL: http://svn.apache.org/viewvc?rev=1354478&view=rev
Log:
[HBASE-6278] Allow table splits during a MapReduce job to be parameterized the same way as
we parameterize the splitAlgorithm during table creation.

Author: aaiyer

Summary:
This should allow us to get more even splits, because we use the
same split algorithm during the MR job, as was used during the table
creation.

Test Plan:
 mvn test.
 Also add a test to split a table into 12 regions.
       - For each implementation of RegionSplitter
           o create Table A with 12 regions
           o Create Table B with 4 regions
              - Split each region in Table B into 3 parts. Ensure that
                the splits correspond (approximately) to the ones from Table A.

Reviewers: kannan, kranganathan

Reviewed By: kranganathan

CC: hbase-eng@, kranganathan

Differential Revision: https://phabricator.fb.com/D502231

Task ID: 1137503

Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/TestRegionSplitter.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java?rev=1354478&r1=1354477&r2=1354478&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
(original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
Wed Jun 27 12:51:08 2012
@@ -29,6 +29,8 @@ import org.apache.hadoop.hbase.HBaseConf
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.RegionSplitter;
+import org.apache.hadoop.hbase.util.RegionSplitter.UniformSplit;
 import org.apache.hadoop.util.StringUtils;
 
 /**
@@ -63,6 +65,8 @@ implements Configurable {
   public static final String SCAN_CACHEDROWS = "hbase.mapreduce.scan.cachedrows";
   /** The number of mappers that should be assigned to each region. */
   public static final String MAPPERS_PER_REGION = "hbase.mapreduce.mappersperregion";
+  /** The algorithm used to split each region's keyspace. */
+  public static final String SPLIT_ALGO = "hbase.mapreduce.tableinputformat.split.algo";
 
   /** The configuration. */
   private Configuration conf = null;
@@ -145,6 +149,8 @@ implements Configurable {
       setNumMapperPerRegion(Integer.parseInt(conf.get(MAPPERS_PER_REGION)));
     }
 
+    setSplitAlgorithm(conf.get(SPLIT_ALGO, UniformSplit.class.getSimpleName()));
+
     setScan(scan);
   }
 

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java?rev=1354478&r1=1354477&r2=1354478&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
(original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
Wed Jun 27 12:51:08 2012
@@ -37,6 +37,8 @@ import org.apache.hadoop.hbase.client.Sc
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.RegionSplitter;
+import org.apache.hadoop.hbase.util.RegionSplitter.SplitAlgorithm;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -85,6 +87,8 @@ extends InputFormat<ImmutableBytesWritab
   private TableRecordReader tableRecordReader = null;
   /** The number of mappers to assign to each region. */
   private int numMappersPerRegion = 1;
+  /** Splitting algorithm to be used to split the keys */
+  private String splitAlgmName; // default to Uniform
 
   /** The reverse DNS lookup cache mapping: IPAddress => HostName */
   private HashMap<InetAddress, String> reverseDNSCacheMap =
@@ -160,44 +164,33 @@ extends InputFormat<ImmutableBytesWritab
       // Insert null keys at edges
       startKeys[0] = HConstants.EMPTY_START_ROW;
       stopKeys[numRegions * numMappersPerRegion - 1] = HConstants.EMPTY_END_ROW;
-      // Split the second region
-      byte[][] dividingKeys = Bytes.split(keys.getFirst()[1],
-          keys.getSecond()[1], numMappersPerRegion - 1);
-      int count = numMappersPerRegion - 1;
-      stopKeys[count] = keys.getSecond()[0];
-      // Use the interval between these splits to calculate the approximate
-        // dividing keys of the first region
-      for (byte[] approxKey : Bytes.arithmeticProgSeq(dividingKeys[1], dividingKeys[0],
-          numMappersPerRegion - 1)) {
-        startKeys[count--] = approxKey;
-        stopKeys[count] = approxKey;
-      }
-      // Add the second region dividing keys
-      for (int i = 0; i < numMappersPerRegion; i++) {
-        startKeys[numMappersPerRegion + i] = dividingKeys[i];
-        stopKeys[numMappersPerRegion + i] = dividingKeys[i + 1];
-      }
-      // Fill out all the split keys for center regions (3rd...(n-1)th)
-      for (int i = 2; i < numRegions - 1; i++) {
-        dividingKeys = Bytes.split(keys.getFirst()[i],
-            keys.getSecond()[i], numMappersPerRegion - 1);
-        for (int j = 0; j < numMappersPerRegion; j++) {
-          startKeys[i * numMappersPerRegion + j] = dividingKeys[j];
-          stopKeys[i * numMappersPerRegion + j] = dividingKeys[j + 1];
+
+      byte[][] originalStartKeys = keys.getFirst();
+      byte[][] originalStopKeys = keys.getSecond();
+      SplitAlgorithm algmImpl;
+
+      for (int i = 0; i < originalStartKeys.length; i++) {
+        // get a new instance each time
+        algmImpl = RegionSplitter.newSplitAlgoInstance(context.getConfiguration(),
+            this.splitAlgmName);
+        if (originalStartKeys[i].length != 0)
+          algmImpl.setFirstRow(algmImpl.rowToStr(originalStartKeys[i]));
+        if (originalStopKeys[i].length != 0)
+          algmImpl.setLastRow(algmImpl.rowToStr(originalStopKeys[i]));
+        byte[][] dividingKeys = algmImpl.split(numMappersPerRegion);
+
+        startKeys[i*numMappersPerRegion] = originalStartKeys[i];
+        for (int j = 0; j < numMappersPerRegion - 1; j++) {
+          stopKeys[i * numMappersPerRegion + j] = dividingKeys[j];
+          startKeys[i * numMappersPerRegion + j + 1] = dividingKeys[j];
         }
-      }
-      // Use the previous intervals to calc dividing keys of the last region
-      count = numMappersPerRegion * (numRegions - 1);
-      startKeys[count] = keys.getFirst()[numRegions - 1];
-      for (byte[] approxKey : Bytes.arithmeticProgSeq(dividingKeys[numMappersPerRegion - 1],
-          dividingKeys[numMappersPerRegion], numMappersPerRegion - 1)) {
-        stopKeys[count++] = approxKey;
-        startKeys[count] = approxKey;
+        stopKeys[(i+1)*numMappersPerRegion - 1] = originalStopKeys[i];
       }
       splitKeys = new Pair<byte[][], byte[][]>();
       splitKeys.setFirst(startKeys);
       splitKeys.setSecond(stopKeys);
     }
+
     List<InputSplit> splits =
         new ArrayList<InputSplit>(numRegions * numMappersPerRegion);
     byte[] startRow = scan.getStartRow();
@@ -334,8 +327,16 @@ extends InputFormat<ImmutableBytesWritab
   public void setNumMapperPerRegion(int num) throws IllegalArgumentException {
     if (num <= 0) {
       throw new IllegalArgumentException("Expecting at least 1 mapper " +
-		"per region; instead got: " + num);
+          "per region; instead got: " + num);
     }
     numMappersPerRegion = num;
   }
+
+  public void setSplitAlgorithm(String name) {
+    this.splitAlgmName = name;
+  }
+
+  public String getSplitAlgorithm() {
+    return splitAlgmName;
+  }
 }

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/TestRegionSplitter.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/TestRegionSplitter.java?rev=1354478&r1=1354477&r2=1354478&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/TestRegionSplitter.java
(original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/TestRegionSplitter.java
Wed Jun 27 12:51:08 2012
@@ -35,9 +35,13 @@ import org.apache.hadoop.hbase.HRegionIn
 import org.apache.hadoop.hbase.HServerAddress;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
+import org.apache.hadoop.hbase.mapreduce.TableSplit;
 import org.apache.hadoop.hbase.util.RegionSplitter.HexStringSplit;
 import org.apache.hadoop.hbase.util.RegionSplitter.SplitAlgorithm;
 import org.apache.hadoop.hbase.util.RegionSplitter.UniformSplit;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -89,6 +93,9 @@ public class TestRegionSplitter {
           // Do table creation/pre-splitting and verification of region boundaries
       preSplitTableAndVerify(expectedBounds,
           HexStringSplit.class.getSimpleName(), "NewHexPresplitTable");
+
+      preSplitTableAndVerifyMRSplit(expectedBounds,
+          4, HexStringSplit.class.getSimpleName(), "NewHexPresplitTableForMR");
   }
 
   /**
@@ -118,6 +125,9 @@ public class TestRegionSplitter {
       // Do table creation/pre-splitting and verification of region boundaries
       preSplitTableAndVerify(expectedBounds, UniformSplit.class.getSimpleName(),
         "NewUniformPresplitTable");
+
+      preSplitTableAndVerifyMRSplit(expectedBounds, 4, UniformSplit.class.getSimpleName(),
+        "NewUniformPresplitTableForMR");
   }
 
   /**
@@ -284,6 +294,47 @@ public class TestRegionSplitter {
     verifyBounds(expectedBounds, tableName);
   }
 
+  private void preSplitTableAndVerifyMRSplit(List<byte[]> expectedBounds, int mappersPerRegion,
+      String splitClass, String tableName) throws Exception {
+    final int numRegions = (expectedBounds.size() - 1) / mappersPerRegion;
+    final Configuration conf = UTIL.getConfiguration();
+    conf.setInt("split.count", numRegions);
+
+    SplitAlgorithm splitAlgo = RegionSplitter.newSplitAlgoInstance(conf,
+        splitClass);
+    RegionSplitter.createPresplitTable(tableName, splitAlgo,
+        new String[] { CF_NAME }, conf);
+
+    conf.set(TableInputFormat.SPLIT_ALGO, splitClass);
+    conf.setInt(TableInputFormat.MAPPERS_PER_REGION, mappersPerRegion);
+    conf.set(TableInputFormat.INPUT_TABLE, tableName);
+    verifyBoundsForMRSplits(expectedBounds, conf);
+  }
+
+  private void verifyBoundsForMRSplits(List<byte[]> expectedBounds,
+      Configuration conf)
+      throws Exception {
+    // Get split boundaries from the table and verify their endpoints
+    JobContext context = new JobContext(conf, null);
+    TableInputFormat inf = new TableInputFormat();
+    inf.setConf(conf);
+    List<InputSplit> splits = inf.getSplits(context);
+
+    for (InputSplit split: splits) {
+      byte[] regionStart = ((TableSplit)split).getStartRow();
+      byte[] regionEnd = ((TableSplit)split).getEndRow();
+
+      // This region's start key should be one of the region boundaries
+      int startBoundaryIndex = indexOfBytes(expectedBounds, regionStart);
+      assertNotSame(-1, startBoundaryIndex);
+
+      // This region's end key should be the region boundary that comes
+      // after the starting boundary.
+      byte[] expectedRegionEnd = expectedBounds.get(startBoundaryIndex + 1);
+      assertEquals(0, Bytes.compareTo(regionEnd, expectedRegionEnd));
+    }
+  }
+
   private void verifyBounds(List<byte[]> expectedBounds, String tableName)
       throws Exception {
     // Get region boundaries from the cluster and verify their endpoints



Mime
View raw message