hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From li...@apache.org
Subject svn commit: r1405915 - in /hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase: mapred/ mapreduce/
Date Mon, 05 Nov 2012 18:43:47 GMT
Author: liyin
Date: Mon Nov  5 18:43:46 2012
New Revision: 1405915

URL: http://svn.apache.org/viewvc?rev=1405915&view=rev
Log:
[jira] [HBASE-7031] [89-fb] Add startRow/endRow to TableInputFormat

Author: pritam

Summary: We are still using
org.apache.hadoop.hbase.mapred.TableInputFormat (as opposed to
    org.apache.hadoop.hbase.mapreduce.TableInputFormat) for
Hadoop Streaming integration. We need to add startRow/endRow
support to TableIn

Test Plan:
- Run a streaming map-reduce job using TableInputFormat
with startRow and endRow specified.
- Verify that only the requested range of rows is
included in the output.

Reviewers: Kannan, Karthik, JIRA, mbautin,
  aaiyer, Liyin

Reviewed By: Liyin

CC: tjackson

Differential Revision: https://reviews.facebook.net/D6129

Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReader.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReaderImpl.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java?rev=1405915&r1=1405914&r2=1405915&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java
(original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java
Mon Nov  5 18:43:46 2012
@@ -27,11 +27,15 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.RegionSplitter.UniformSplit;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobConfigurable;
 import org.apache.hadoop.util.StringUtils;
 
+import static org.apache.hadoop.hbase.mapreduce.TableInputFormat.MAPPERS_PER_REGION;
+import static org.apache.hadoop.hbase.mapreduce.TableInputFormat.SPLIT_ALGO;
+
 /**
  * Convert HBase tabular data into a format that is consumable by Map/Reduce.
  */
@@ -59,6 +63,14 @@ public class TableInputFormat extends Ta
     } catch (Exception e) {
       LOG.error(StringUtils.stringifyException(e));
     }
+
+    // Reuse scan configuration option support from the new TableInputFormat implementation.
+    setScan(org.apache.hadoop.hbase.mapreduce.TableInputFormat.createScan(job));
+    if (job.get(MAPPERS_PER_REGION) != null) {
+      setNumMapperPerRegion(Integer.parseInt(job.get(MAPPERS_PER_REGION)));
+    }
+
+    setSplitAlgorithm(job.get(SPLIT_ALGO, UniformSplit.class.getSimpleName()));
   }
 
   public void validateInput(JobConf job) throws IOException {

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java?rev=1405915&r1=1405914&r2=1405915&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java
(original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java
Mon Nov  5 18:43:46 2012
@@ -20,12 +20,13 @@
 package org.apache.hadoop.hbase.mapred;
 
 import java.io.IOException;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.regionserver.HRegion;
@@ -73,6 +74,15 @@ implements InputFormat<ImmutableBytesWri
   private TableRecordReader tableRecordReader;
   private Filter rowFilter;
 
+  /** Holds the details for the internal scanner. */
+  private Scan scan;
+
+  /** The number of mappers to assign to each region. */
+  private int numMappersPerRegion = 1;
+
+  /** Splitting algorithm to be used to split the keys */
+  private String splitAlgmName; // default to Uniform
+
   /**
    * Builds a TableRecordReader. If no TableRecordReader was provided, uses
    * the default.
@@ -92,6 +102,7 @@ implements InputFormat<ImmutableBytesWri
     trr.setStartRow(tSplit.getStartRow());
     trr.setEndRow(tSplit.getEndRow());
     trr.setHTable(this.table);
+    trr.setScan(scan);
     trr.setInputColumns(this.inputColumns);
     trr.setRowFilter(this.rowFilter);
     trr.init();
@@ -116,33 +127,20 @@ implements InputFormat<ImmutableBytesWri
    * @see org.apache.hadoop.mapred.InputFormat#getSplits(org.apache.hadoop.mapred.JobConf,
int)
    */
   public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
-    if (this.table == null) {
-      throw new IOException("No table was provided");
-    }
-    byte [][] startKeys = this.table.getStartKeys();
-    if (startKeys == null || startKeys.length == 0) {
-      throw new IOException("Expecting at least one region");
-    }
-    if (this.inputColumns == null || this.inputColumns.length == 0) {
-      throw new IOException("Expecting at least one column");
+    List<org.apache.hadoop.mapreduce.InputSplit> newStyleSplits =
+        org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getSplitsInternal(
+            table, job, scan, numMappersPerRegion, splitAlgmName, null);
+    int n = newStyleSplits.size();
+    InputSplit[] result = new InputSplit[n];
+    for (int i = 0; i < n; ++i) {
+      org.apache.hadoop.hbase.mapreduce.TableSplit newTableSplit =
+          (org.apache.hadoop.hbase.mapreduce.TableSplit) newStyleSplits.get(i);
+      result[i] = new TableSplit(table.getTableName(),
+          newTableSplit.getStartRow(),
+          newTableSplit.getEndRow(),
+          newTableSplit.getRegionLocation());
     }
-    int realNumSplits = numSplits > startKeys.length? startKeys.length:
-      numSplits;
-    InputSplit[] splits = new InputSplit[realNumSplits];
-    int middle = startKeys.length / realNumSplits;
-    int startPos = 0;
-    for (int i = 0; i < realNumSplits; i++) {
-      int lastPos = startPos + middle;
-      lastPos = startKeys.length % realNumSplits > i ? lastPos + 1 : lastPos;
-      String regionLocation = table.getRegionLocation(startKeys[startPos]).
-        getServerAddress().getHostname();
-      splits[i] = new TableSplit(this.table.getTableName(),
-        startKeys[startPos], ((i + 1) < realNumSplits) ? startKeys[lastPos]:
-          HConstants.EMPTY_START_ROW, regionLocation);
-      LOG.info("split: " + i + "->" + splits[i]);
-      startPos = lastPos;
-    }
-    return splits;
+    return result;
   }
 
   /**
@@ -186,4 +184,31 @@ implements InputFormat<ImmutableBytesWri
   protected void setRowFilter(Filter rowFilter) {
     this.rowFilter = rowFilter;
   }
-}
\ No newline at end of file
+
+  /**
+   * Sets the scan defining the actual details like columns etc.
+   *
+   * @param scan  The scan to set.
+   */
+  public void setScan(Scan scan) {
+    this.scan = scan;
+  }
+
+  /**
+   * Sets the number of mappers assigned to each region.
+   *
+   * @param num
+   * @throws IllegalArgumentException When <code>num</code> <= 0.
+   */
+  public void setNumMapperPerRegion(int num) throws IllegalArgumentException {
+    if (num <= 0) {
+      throw new IllegalArgumentException("Expecting at least 1 mapper " +
+          "per region; instead got: " + num);
+    }
+    numMappersPerRegion = num;
+  }
+
+  public void setSplitAlgorithm(String name) {
+    this.splitAlgmName = name;
+  }
+}

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReader.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReader.java?rev=1405915&r1=1405914&r2=1405915&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReader.java
(original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReader.java
Mon Nov  5 18:43:46 2012
@@ -23,6 +23,7 @@ import java.io.IOException;
 
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.mapred.RecordReader;
@@ -135,4 +136,8 @@ implements RecordReader<ImmutableBytesWr
   throws IOException {
     return this.recordReaderImpl.next(key, value);
   }
+
+  public void setScan(Scan scan) {
+    recordReaderImpl.setScan(scan);
+  }
 }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReaderImpl.java?rev=1405915&r1=1405914&r2=1405915&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReaderImpl.java
(original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReaderImpl.java
Mon Nov  5 18:43:46 2012
@@ -41,6 +41,7 @@ import org.apache.hadoop.util.StringUtil
 public class TableRecordReaderImpl {
   static final Log LOG = LogFactory.getLog(TableRecordReaderImpl.class);
 
+  private Scan scan = null;
   private byte [] startRow;
   private byte [] endRow;
   private byte [] lastRow = null;
@@ -56,9 +57,11 @@ public class TableRecordReaderImpl {
    * @throws IOException
    */
   public void restart(byte[] firstRow) throws IOException {
+    Scan scan = new Scan(this.scan);
+    scan.setStartRow(firstRow);
     if ((endRow != null) && (endRow.length > 0)) {
+      scan.setStopRow(endRow);
       if (trrRowFilter != null) {
-        Scan scan = new Scan(firstRow, endRow);
         scan.addColumns(trrInputColumns);
         scan.setFilter(trrRowFilter);
         scan.setCacheBlocks(false);
@@ -67,7 +70,6 @@ public class TableRecordReaderImpl {
         LOG.debug("TIFB.restart, firstRow: " +
             Bytes.toStringBinary(firstRow) + ", endRow: " +
             Bytes.toStringBinary(endRow));
-        Scan scan = new Scan(firstRow, endRow);
         scan.addColumns(trrInputColumns);
         this.scanner = this.htable.getScanner(scan);
       }
@@ -75,7 +77,6 @@ public class TableRecordReaderImpl {
       LOG.debug("TIFB.restart, firstRow: " +
           Bytes.toStringBinary(firstRow) + ", no endRow");
 
-      Scan scan = new Scan(firstRow);
       scan.addColumns(trrInputColumns);
 //      scan.setFilter(trrRowFilter);
       this.scanner = this.htable.getScanner(scan);
@@ -193,4 +194,8 @@ public class TableRecordReaderImpl {
     }
     return false;
   }
+
+  public void setScan(Scan scan) {
+    this.scan = scan;
+  }
 }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java?rev=1405915&r1=1405914&r2=1405915&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
(original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
Mon Nov  5 18:43:46 2012
@@ -29,7 +29,6 @@ import org.apache.hadoop.hbase.HBaseConf
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.RegionSplitter;
 import org.apache.hadoop.hbase.util.RegionSplitter.UniformSplit;
 import org.apache.hadoop.util.StringUtils;
 
@@ -39,7 +38,7 @@ import org.apache.hadoop.util.StringUtil
 public class TableInputFormat extends TableInputFormatBase
 implements Configurable {
 
-  private final Log LOG = LogFactory.getLog(TableInputFormat.class);
+  private static final Log LOG = LogFactory.getLog(TableInputFormat.class);
 
   /** Job parameter that specifies the input table. */
   public static final String INPUT_TABLE = "hbase.mapreduce.inputtable";
@@ -47,25 +46,41 @@ implements Configurable {
    * See {@link TableMapReduceUtil#convertScanToString(Scan)} for more details.
    */
   public static final String SCAN = "hbase.mapreduce.scan";
+
   /** Column Family to Scan */
   public static final String SCAN_COLUMN_FAMILY = "hbase.mapreduce.scan.column.family";
+
   /** Space delimited list of columns to scan. */
   public static final String SCAN_COLUMNS = "hbase.mapreduce.scan.columns";
+
   /** The timestamp used to filter columns with a specific timestamp. */
   public static final String SCAN_TIMESTAMP = "hbase.mapreduce.scan.timestamp";
+
   /** The starting timestamp used to filter columns with a specific range of versions. */
   public static final String SCAN_TIMERANGE_START = "hbase.mapreduce.scan.timerange.start";
+
   /** The ending timestamp used to filter columns with a specific range of versions. */
   public static final String SCAN_TIMERANGE_END = "hbase.mapreduce.scan.timerange.end";
+
   /** The maximum number of version to return. */
   public static final String SCAN_MAXVERSIONS = "hbase.mapreduce.scan.maxversions";
+
   /** Set to false to disable server-side caching of blocks for this scan. */
   public static final String SCAN_CACHEBLOCKS = "hbase.mapreduce.scan.cacheblocks";
+
   /** The number of rows for caching that will be passed to scanners. */
   public static final String SCAN_CACHEDROWS = "hbase.mapreduce.scan.cachedrows";
+
+  /** Start row of the scan */
+  public static final String SCAN_START_ROW = "hbase.mapreduce.scan.startrow";
+
+  /** End row of the scan */
+  public static final String SCAN_END_ROW = "hbase.mapreduce.scan.endrow";
+
   /** The number of mappers that should be assigned to each region. */
   public static final String MAPPERS_PER_REGION = "hbase.mapreduce.mappersperregion";
-  /** The Algorithm used to splie each region's keyspace. */
+
+  /** The Algorithm used to split each region's keyspace. */
   public static final String SPLIT_ALGO = "hbase.mapreduce.tableinputformat.split.algo";
 
   /** The configuration. */
@@ -100,13 +115,23 @@ implements Configurable {
       LOG.error(StringUtils.stringifyException(e));
     }
 
-    Scan scan = null;
+    setScan(createScan(conf));
+    if (conf.get(MAPPERS_PER_REGION) != null) {
+      setNumMapperPerRegion(Integer.parseInt(conf.get(MAPPERS_PER_REGION)));
+    }
+
+    setSplitAlgorithm(conf.get(SPLIT_ALGO, UniformSplit.class.getSimpleName()));
+  }
+
+  public static Scan createScan(Configuration conf) {
+    Scan scan;
 
     if (conf.get(SCAN) != null) {
       try {
         scan = TableMapReduceUtil.convertStringToScan(conf.get(SCAN));
       } catch (IOException e) {
         LOG.error("An error occurred.", e);
+        throw new RuntimeException(e);
       }
     } else {
       try {
@@ -140,18 +165,24 @@ implements Configurable {
 
         // false by default, full table scans generate too much BC churn
         scan.setCacheBlocks((conf.getBoolean(SCAN_CACHEBLOCKS, false)));
+
+        String startRow = conf.get(SCAN_START_ROW);
+        if (startRow != null) {
+          LOG.info("Setting start row to: " + startRow);
+          scan.setStartRow(Bytes.toBytes(startRow));
+        }
+
+        String endRow = conf.get(SCAN_END_ROW);
+        if (conf.get(SCAN_END_ROW) != null) {
+          LOG.info("Setting end row to: " + endRow);
+          scan.setStopRow(Bytes.toBytes(endRow));
+        }
       } catch (Exception e) {
-          LOG.error(StringUtils.stringifyException(e));
+        LOG.error(StringUtils.stringifyException(e));
+        throw new RuntimeException(e);
       }
     }
-
-    if (conf.get(MAPPERS_PER_REGION) != null) {
-      setNumMapperPerRegion(Integer.parseInt(conf.get(MAPPERS_PER_REGION)));
-    }
-
-    setSplitAlgorithm(conf.get(SPLIT_ALGO, UniformSplit.class.getSimpleName()));
-
-    setScan(scan);
+    return scan;
   }
 
 }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java?rev=1405915&r1=1405914&r2=1405915&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
(original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
Mon Nov  5 18:43:46 2012
@@ -22,13 +22,15 @@ package org.apache.hadoop.hbase.mapreduc
 import java.io.IOException;
 import java.net.InetAddress;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 import javax.naming.NamingException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HServerAddress;
 import org.apache.hadoop.hbase.client.HTable;
@@ -50,34 +52,11 @@ import org.apache.hadoop.net.DNS;
  * A base for {@link TableInputFormat}s. Receives a {@link HTable}, an
  * {@link Scan} instance that defines the input columns etc. Subclasses may use
  * other TableRecordReader implementations.
- * <p>
- * An example of a subclass:
- * <pre>
- *   class ExampleTIF extends TableInputFormatBase implements JobConfigurable {
- *
- *     public void configure(JobConf job) {
- *       HTable exampleTable = new HTable(new HBaseConfiguration(job),
- *         Bytes.toBytes("exampleTable"));
- *       // mandatory
- *       setHTable(exampleTable);
- *       Text[] inputColumns = new byte [][] { Bytes.toBytes("columnA"),
- *         Bytes.toBytes("columnB") };
- *       // mandatory
- *       setInputColumns(inputColumns);
- *       RowFilterInterface exampleFilter = new RegExpRowFilter("keyPrefix.*");
- *       // optional
- *       setRowFilter(exampleFilter);
- *     }
- *
- *     public void validateInput(JobConf job) throws IOException {
- *     }
- *  }
- * </pre>
  */
 public abstract class TableInputFormatBase
 extends InputFormat<ImmutableBytesWritable, Result> {
 
-  final Log LOG = LogFactory.getLog(TableInputFormatBase.class);
+  private static final Log LOG = LogFactory.getLog(TableInputFormatBase.class);
 
   /** Holds the details for the internal scanner. */
   private Scan scan = null;
@@ -91,11 +70,11 @@ extends InputFormat<ImmutableBytesWritab
   private String splitAlgmName; // default to Uniform
 
   /** The reverse DNS lookup cache mapping: IPAddress => HostName */
-  private HashMap<InetAddress, String> reverseDNSCacheMap =
-    new HashMap<InetAddress, String>();
+  private static ConcurrentHashMap<InetAddress, String> reverseDNSCacheMap =
+      new ConcurrentHashMap<InetAddress, String>();
   
   /** The NameServer address */
-  private String nameServer = null;
+  private static String nameServer = null;
   
   /**
    * Builds a TableRecordReader. If no TableRecordReader was provided, uses
@@ -140,18 +119,26 @@ extends InputFormat<ImmutableBytesWritab
    */
   @Override
   public List<InputSplit> getSplits(JobContext context) throws IOException {
-    // Get the name server address and the default value is null.
-    this.nameServer =
-      context.getConfiguration().get("hbase.nameserver.address", null);
-    
+    return getSplitsInternal(table, context.getConfiguration(), scan, numMappersPerRegion,
+        splitAlgmName, this);
+  }
+
+  public static List<InputSplit> getSplitsInternal(HTable table,
+      Configuration conf,
+      Scan scan,
+      int numMappersPerRegion,
+      String splitAlgmName,
+      TableInputFormatBase tifb) throws IOException {
+    if (table == null) {
+      throw new IOException("No table was provided.");
+    }
+    determineNameServer(conf);
+
     Pair<byte[][], byte[][]> keys = table.getStartEndKeys();
     if (keys == null || keys.getFirst() == null ||
         keys.getFirst().length == 0) {
       throw new IOException("Expecting at least one region.");
     }
-    if (table == null) {
-      throw new IOException("No table was provided.");
-    }
     Pair<byte[][], byte[][]> splitKeys = null;
     int numRegions = keys.getFirst().length;
     //TODO: Can anything else be done when there are less than 3 regions?
@@ -171,8 +158,8 @@ extends InputFormat<ImmutableBytesWritab
 
       for (int i = 0; i < originalStartKeys.length; i++) {
         // get a new instance each time
-        algmImpl = RegionSplitter.newSplitAlgoInstance(context.getConfiguration(),
-            this.splitAlgmName);
+        algmImpl = RegionSplitter.newSplitAlgoInstance(conf,
+            splitAlgmName);
         if (originalStartKeys[i].length != 0)
           algmImpl.setFirstRow(algmImpl.rowToStr(originalStartKeys[i]));
         if (originalStopKeys[i].length != 0)
@@ -196,7 +183,7 @@ extends InputFormat<ImmutableBytesWritab
     byte[] startRow = scan.getStartRow();
     byte[] stopRow = scan.getStopRow();
     for (int i = 0; i < numRegions * numMappersPerRegion; i++) {
-      if (!includeRegionInSplit(keys.getFirst()[i / numMappersPerRegion],
+      if (tifb != null && !tifb.includeRegionInSplit(keys.getFirst()[i / numMappersPerRegion],
           keys.getSecond()[i / numMappersPerRegion])) {
         continue;
       }
@@ -234,13 +221,20 @@ extends InputFormat<ImmutableBytesWritab
     }
     return splits;
   }
-  
-  private String reverseDNS(InetAddress ipAddress)
+
+  private static synchronized void determineNameServer(Configuration conf) {
+    // Get the name server address and the default value is null.
+    if (nameServer == null) {
+      nameServer = conf.get("hbase.nameserver.address", null);
+    }
+  }
+
+  private static String reverseDNS(InetAddress ipAddress)
   throws NamingException {
-    String hostName = this.reverseDNSCacheMap.get(ipAddress);
+    String hostName = reverseDNSCacheMap.get(ipAddress);
     if (hostName == null) {
-      hostName = DNS.reverseDns(ipAddress, this.nameServer);
-      this.reverseDNSCacheMap.put(ipAddress, hostName);
+      hostName = DNS.reverseDns(ipAddress, nameServer);
+      reverseDNSCacheMap.put(ipAddress, hostName);
     }
     return hostName;
   }
@@ -335,8 +329,4 @@ extends InputFormat<ImmutableBytesWritab
   public void setSplitAlgorithm(String name) {
     this.splitAlgmName = name;
   }
-
-  public String getSplitAlgorithm() {
-    return splitAlgmName;
-  }
 }



Mime
View raw message