hbase-commits mailing list archives

From: st...@apache.org
Subject: git commit: HBASE-11562 CopyTable should provide an option to shuffle the mapper tasks (Jean-Marc Spaggiari)
Date: Tue, 28 Oct 2014 18:29:30 GMT
Repository: hbase
Updated Branches:
  refs/heads/branch-1 050b8afcd -> a26bcdd4d


HBASE-11562 CopyTable should provide an option to shuffle the mapper tasks (Jean-Marc Spaggiari)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/a26bcdd4
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/a26bcdd4
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/a26bcdd4

Branch: refs/heads/branch-1
Commit: a26bcdd4d9b04e10dd1a448e49f2df085c6f1be9
Parents: 050b8af
Author: stack <stack@apache.org>
Authored: Tue Oct 28 11:28:54 2014 -0700
Committer: stack <stack@apache.org>
Committed: Tue Oct 28 11:29:22 2014 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/mapreduce/CopyTable.java       | 47 ++++++++++++--------
 .../hbase/mapreduce/TableInputFormat.java       | 31 +++++++++++--
 2 files changed, 56 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
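The new --shuffle flag randomizes the order in which CopyTable's map tasks are
assigned their per-region splits, spreading concurrently running mappers across
the table's regions instead of walking them in row-key order. A minimal driver
sketch follows; the table names are illustrative, --new.name is CopyTable's
existing rename option, and the CopyTable(Configuration) constructor is an
assumption based on this branch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mapreduce.CopyTable;
    import org.apache.hadoop.util.ToolRunner;

    public class CopyTableShuffleExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // --shuffle is the option added by this commit; it makes
        // createSubmittableJob set TableInputFormat.SHUFFLE_MAPS (see diff below).
        int exitCode = ToolRunner.run(conf, new CopyTable(conf),
            new String[] { "--shuffle", "--new.name=destTable", "sourceTable" });
        System.exit(exitCode);
      }
    }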


http://git-wip-us.apache.org/repos/asf/hbase/blob/a26bcdd4/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java
index 5f2d16f..8d930d1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java
@@ -66,7 +66,8 @@ public class CopyTable extends Configured implements Tool {
   String peerAddress = null;
   String families = null;
   boolean allCells = false;
-  
+  static boolean shuffle = false;
+
   boolean bulkload = false;
   Path bulkloadDir = null;
 
@@ -87,7 +88,7 @@ public class CopyTable extends Configured implements Tool {
     if (!doCommandLine(args)) {
       return null;
     }
-    
+
     Job job = Job.getInstance(getConf(), getConf().get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
 
     job.setJarByClass(CopyTable.class);
@@ -100,24 +101,27 @@ public class CopyTable extends Configured implements Tool {
     if (allCells) {
       scan.setRaw(true);
     }
+    if (shuffle) {
+      job.getConfiguration().set(TableInputFormat.SHUFFLE_MAPS, "true");
+    }
     if (versions >= 0) {
       scan.setMaxVersions(versions);
     }
-    
+
     if (startRow != null) {
       scan.setStartRow(Bytes.toBytes(startRow));
     }
-    
+
     if (stopRow != null) {
       scan.setStopRow(Bytes.toBytes(stopRow));
     }
-    
+
     if(families != null) {
       String[] fams = families.split(",");
       Map<String,String> cfRenameMap = new HashMap<String,String>();
       for(String fam : fams) {
         String sourceCf;
-        if(fam.contains(":")) { 
+        if(fam.contains(":")) {
             // fam looks like "sourceCfName:destCfName"
             String[] srcAndDest = fam.split(":", 2);
             sourceCf = srcAndDest[0];
@@ -125,21 +129,21 @@ public class CopyTable extends Configured implements Tool {
             cfRenameMap.put(sourceCf, destCf);
         } else {
             // fam is just "sourceCf"
-            sourceCf = fam; 
+            sourceCf = fam;
         }
         scan.addFamily(Bytes.toBytes(sourceCf));
       }
       Import.configureCfRenaming(job.getConfiguration(), cfRenameMap);
     }
     job.setNumReduceTasks(0);
-    
+
     if (bulkload) {
       TableMapReduceUtil.initTableMapperJob(tableName, scan, Import.KeyValueImporter.class, null,
         null, job);
-      
+
       // We need to split the inputs by destination tables so that output of Map can be bulk-loaded.
       TableInputFormat.configureSplitTable(job, TableName.valueOf(dstTableName));
-      
+
       FileSystem fs = FileSystem.get(getConf());
       Random rand = new Random();
       Path root = new Path(fs.getWorkingDirectory(), "copytable");
@@ -150,7 +154,7 @@ public class CopyTable extends Configured implements Tool {
           break;
         }
       }
-      
+
       System.out.println("HFiles will be stored at " + this.bulkloadDir);
       HFileOutputFormat2.setOutputPath(job, bulkloadDir);
       try (Connection conn = ConnectionFactory.createConnection(getConf());
@@ -160,11 +164,11 @@ public class CopyTable extends Configured implements Tool {
     } else {
       TableMapReduceUtil.initTableMapperJob(tableName, scan,
         Import.Importer.class, null, null, job);
-      
+
       TableMapReduceUtil.initTableReducerJob(dstTableName, null, job, null, peerAddress, null,
         null);
     }
-    
+
     return job;
   }
 
@@ -229,19 +233,19 @@ public class CopyTable extends Configured implements Tool {
           printUsage(null);
           return false;
         }
-        
+
         final String startRowArgKey = "--startrow=";
         if (cmd.startsWith(startRowArgKey)) {
           startRow = cmd.substring(startRowArgKey.length());
           continue;
         }
-        
+
         final String stopRowArgKey = "--stoprow=";
         if (cmd.startsWith(stopRowArgKey)) {
           stopRow = cmd.substring(stopRowArgKey.length());
           continue;
         }
-        
+
         final String startTimeArgKey = "--starttime=";
         if (cmd.startsWith(startTimeArgKey)) {
           startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
@@ -282,12 +286,17 @@ public class CopyTable extends Configured implements Tool {
           allCells = true;
           continue;
         }
-        
+
         if (cmd.startsWith("--bulkload")) {
           bulkload = true;
           continue;
         }
 
+        if (cmd.startsWith("--shuffle")) {
+          shuffle = true;
+          continue;
+        }
+
         if (i == args.length-1) {
           tableName = cmd;
         } else {
@@ -304,12 +313,12 @@ public class CopyTable extends Configured implements Tool {
         printUsage("Invalid time range filter: starttime=" + startTime + " >  endtime="
+ endTime);
         return false;
       }
-      
+
       if (bulkload && peerAddress != null) {
         printUsage("Remote bulkload is not supported!");
         return false;
       }
-      
+
       // set dstTableName if necessary
       if (dstTableName == null) {
         dstTableName = tableName;

http://git-wip-us.apache.org/repos/asf/hbase/blob/a26bcdd4/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
index 56ae349..dccaa25 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
@@ -19,6 +19,8 @@
 package org.apache.hadoop.hbase.mapreduce;
 
 import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -33,6 +35,8 @@ import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.RegionLocator;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.util.StringUtils;
@@ -80,6 +84,8 @@ implements Configurable {
   public static final String SCAN_CACHEDROWS = "hbase.mapreduce.scan.cachedrows";
   /** Set the maximum number of values to return for each call to next(). */
   public static final String SCAN_BATCHSIZE = "hbase.mapreduce.scan.batchsize";
+  /** Specify if we have to shuffle the map tasks. */
+  public static final String SHUFFLE_MAPS = "hbase.mapreduce.inputtable.shufflemaps";
 
   /** The configuration. */
   private Configuration conf = null;
@@ -113,7 +119,7 @@ implements Configurable {
     } catch (Exception e) {
       LOG.error(StringUtils.stringifyException(e));
     }
-    
+
     Scan scan = null;
 
     if (conf.get(SCAN) != null) {
@@ -173,7 +179,7 @@ implements Configurable {
 
     setScan(scan);
   }
-  
+
   /**
    * Parses a combined family and qualifier and adds either both or just the
    * family in case there is no qualifier. This assumes the older colon
@@ -211,6 +217,25 @@ implements Configurable {
   }
 
   /**
+   * Calculates the splits that will serve as input for the map tasks. The
+   * number of splits matches the number of regions in a table. Splits are shuffled if
+   * required.
+   * @param context  The current job context.
+   * @return The list of input splits.
+   * @throws IOException When creating the list of splits fails.
+   * @see org.apache.hadoop.mapreduce.InputFormat#getSplits(
+   *   org.apache.hadoop.mapreduce.JobContext)
+   */
+  @Override
+  public List<InputSplit> getSplits(JobContext context) throws IOException {
+    List<InputSplit> splits = super.getSplits(context);
+    if ((conf.get(SHUFFLE_MAPS) != null) && "true".equals(conf.get(SHUFFLE_MAPS).toLowerCase())) {
+      Collections.shuffle(splits);
+    }
+    return splits;
+  }
+
+  /**
    * Convenience method to parse a string representation of an array of column specifiers.
    *
    * @param scan The Scan to update.
@@ -235,7 +260,7 @@ implements Configurable {
 
     return super.getStartEndKeys();
   }
-  
+
   /**
    * Sets split table in map-reduce job.
    */
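
Outside of CopyTable, the same behavior can be enabled on any job that uses
TableInputFormat by setting the new hbase.mapreduce.inputtable.shufflemaps key
(the SHUFFLE_MAPS constant above) to "true", which is exactly what
createSubmittableJob now does. A minimal sketch, where MyTableMapper is a
hypothetical placeholder for your own TableMapper subclass:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.mapreduce.Job;

    public class ShuffledScanJob {
      public static Job createJob(Configuration conf, String table) throws Exception {
        Job job = Job.getInstance(conf, "shuffled-scan");
        // getSplits() checks this key and, when it is "true", runs
        // Collections.shuffle() over the per-region splits.
        job.getConfiguration().set(TableInputFormat.SHUFFLE_MAPS, "true");
        // MyTableMapper is hypothetical; initTableMapperJob wires up
        // TableInputFormat as the job's input format.
        TableMapReduceUtil.initTableMapperJob(table, new Scan(),
            MyTableMapper.class, null, null, job);
        return job;
      }
    }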

