hbase-commits mailing list archives

From te...@apache.org
Subject hbase git commit: HBASE-12798 Map Reduce jobs should not create Tables in setConf() (Solomon Duskis)
Date Sun, 11 Jan 2015 17:25:45 GMT
Repository: hbase
Updated Branches:
  refs/heads/branch-1.0 f52faa10e -> 60d6c7dcc


HBASE-12798 Map Reduce jobs should not create Tables in setConf() (Solomon Duskis)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/60d6c7dc
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/60d6c7dc
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/60d6c7dc

Branch: refs/heads/branch-1.0
Commit: 60d6c7dcca64be829594985a39ad2ec7b2020fce
Parents: f52faa1
Author: tedyu <yuzhihong@gmail.com>
Authored: Sun Jan 11 09:25:36 2015 -0800
Committer: tedyu <yuzhihong@gmail.com>
Committed: Sun Jan 11 09:25:36 2015 -0800

----------------------------------------------------------------------
 .../hbase/mapreduce/TableInputFormat.java       | 18 ++--
 .../hbase/mapreduce/TableInputFormatBase.java   | 94 ++++++++++++++++----
 .../hbase/mapreduce/TableOutputFormat.java      | 22 +++--
 3 files changed, 102 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/60d6c7dc/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
index 50da9bc..8896eb0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.util.StringUtils;
 public class TableInputFormat extends TableInputFormatBase
 implements Configurable {
 
+  @SuppressWarnings("hiding")
   private static final Log LOG = LogFactory.getLog(TableInputFormat.class);
 
   /** Job parameter that specifies the input table. */
@@ -112,13 +113,6 @@ implements Configurable {
   @Override
   public void setConf(Configuration configuration) {
     this.conf = configuration;
-    TableName tableName = TableName.valueOf(conf.get(INPUT_TABLE));
-    try {
-      // NOTE: This connection doesn't currently get closed explicitly.
-      initializeTable(ConnectionFactory.createConnection(new Configuration(conf)), tableName);
-    } catch (Exception e) {
-      LOG.error(StringUtils.stringifyException(e));
-    }
 
     Scan scan = null;
 
@@ -180,6 +174,16 @@ implements Configurable {
     setScan(scan);
   }
 
+  @Override
+  protected void initialize() {
+    TableName tableName = TableName.valueOf(conf.get(INPUT_TABLE));
+    try {
+      initializeTable(ConnectionFactory.createConnection(new Configuration(conf)), tableName);
+    } catch (Exception e) {
+      LOG.error(StringUtils.stringifyException(e));
+    }
+  }
+
   /**
    * Parses a combined family and qualifier and adds either both or just the
    * family in case there is no qualifier. This assumes the older colon

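For reference, with this patch TableInputFormat.setConf() only records the configuration and builds the Scan, while the Connection and Table are created lazily in the new initialize() override. A minimal driver-side sketch under that contract (the class name, job name, and "exampleTable" are placeholders, and mapper/output wiring is omitted):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
  import org.apache.hadoop.mapreduce.Job;

  public class ExampleScanDriver {
    public static void main(String[] args) throws Exception {
      Configuration conf = HBaseConfiguration.create();
      // Placeholder table name; INPUT_TABLE is the job parameter read in setConf().
      conf.set(TableInputFormat.INPUT_TABLE, "exampleTable");
      Job job = Job.getInstance(conf, "example-scan");
      job.setInputFormatClass(TableInputFormat.class);
      // No Connection is opened here or in setConf(); TableInputFormat.initialize()
      // creates it on the task side when splits or a record reader are requested.
    }
  }
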
http://git-wip-us.apache.org/repos/asf/hbase/blob/60d6c7dc/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
index 4123467..3365a3d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
@@ -66,12 +66,10 @@ import org.apache.hadoop.util.StringUtils;
  * <pre>
  *   class ExampleTIF extends TableInputFormatBase implements JobConfigurable {
  *
+ *     private JobConf job;
+ *
  *     public void configure(JobConf job) {
- *       Connection connection =
- *          ConnectionFactory.createConnection(HBaseConfiguration.create(job));
- *       TableName tableName = TableName.valueOf("exampleTable");
- *       // mandatory
- *       initializeTable(connection, tableName);
+ *       this.job = job;
  *       Text[] inputColumns = new byte [][] { Bytes.toBytes("cf1:columnA"),
  *         Bytes.toBytes("cf2") };
  *       // mandatory
@@ -80,6 +78,14 @@ import org.apache.hadoop.util.StringUtils;
  *       // optional
  *       setRowFilter(exampleFilter);
  *     }
+ *     
+ *     protected void initialize() {
+ *       Connection connection =
+ *          ConnectionFactory.createConnection(HBaseConfiguration.create(job));
+ *       TableName tableName = TableName.valueOf("exampleTable");
+ *       // mandatory
+ *       initializeTable(connection, tableName);
+ *    }
  *
  *     public void validateInput(JobConf job) throws IOException {
  *     }
@@ -105,13 +111,14 @@ extends InputFormat<ImmutableBytesWritable, Result> {
   private RegionLocator regionLocator;
   /** The reader scanning the table, can be a custom one. */
   private TableRecordReader tableRecordReader = null;
+  /** The underlying {@link Connection} of the table. */
+  private Connection connection;
+
   
   /** The reverse DNS lookup cache mapping: IPAddress => HostName */
   private HashMap<InetAddress, String> reverseDNSCacheMap =
     new HashMap<InetAddress, String>();
 
-  private Connection connection;
-
   /**
    * Builds a {@link TableRecordReader}. If no {@link TableRecordReader} was provided, uses
    * the default.
@@ -129,6 +136,10 @@ extends InputFormat<ImmutableBytesWritable, Result> {
       InputSplit split, TaskAttemptContext context)
   throws IOException {
     if (table == null) {
+      initialize();
+    }
+    if (getTable() == null) {
+      // initialize() must not have been implemented in the subclass.
       throw new IOException("Cannot create a record reader because of a" +
           " previous error. Please look at the previous logs lines from" +
           " the task's full log for more details.");
@@ -141,19 +152,13 @@ extends InputFormat<ImmutableBytesWritable, Result> {
     sc.setStartRow(tSplit.getStartRow());
     sc.setStopRow(tSplit.getEndRow());
     trr.setScan(sc);
-    trr.setTable(table);
+    trr.setTable(getTable());
     return new RecordReader<ImmutableBytesWritable, Result>() {
 
       @Override
       public void close() throws IOException {
         trr.close();
-        close(admin, table, regionLocator, connection);
-      }
-
-      private void close(Closeable... closables) throws IOException {
-        for (Closeable c : closables) {
-          if(c != null) { c.close(); }
-        }
+        closeTable();
       }
 
       @Override
@@ -185,7 +190,7 @@ extends InputFormat<ImmutableBytesWritable, Result> {
   }
 
   protected Pair<byte[][],byte[][]> getStartEndKeys() throws IOException {
-    return regionLocator.getStartEndKeys();
+    return getRegionLocator().getStartEndKeys();
   }
 
   /**
@@ -200,10 +205,18 @@ extends InputFormat<ImmutableBytesWritable, Result> {
    */
   @Override
   public List<InputSplit> getSplits(JobContext context) throws IOException {
+    boolean closeOnFinish = false;
+
     if (table == null) {
+      initialize();
+      closeOnFinish = true;
+    }
+    if (table == null) {
+      // initialize() wasn't implemented, so the table is null.
       throw new IOException("No table was provided.");
     }
 
+    try {
     RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(regionLocator, admin);
 
     Pair<byte[][], byte[][]> keys = getStartEndKeys();
@@ -267,6 +280,11 @@ extends InputFormat<ImmutableBytesWritable, Result> {
       }
     }
     return splits;
+    } finally {
+      if (closeOnFinish) {
+        closeTable();
+      }
+    }
   }
 
  public String reverseDNS(InetAddress ipAddress) throws NamingException, UnknownHostException {
@@ -321,13 +339,16 @@ extends InputFormat<ImmutableBytesWritable, Result> {
    */
   @Deprecated
   protected HTable getHTable() {
-    return (HTable) this.table;
+    return (HTable) this.getTable();
   }
 
   /**
    * Allows subclasses to get the {@link RegionLocator}.
    */
   protected RegionLocator getRegionLocator() {
+    if (regionLocator == null) {
+      initialize();
+    }
     return regionLocator;
   }
   
@@ -335,6 +356,9 @@ extends InputFormat<ImmutableBytesWritable, Result> {
    * Allows subclasses to get the {@link Table}.
    */
   protected Table getTable() {
+    if (table == null) {
+      initialize();
+    }
     return table;
   }
 
@@ -342,6 +366,9 @@ extends InputFormat<ImmutableBytesWritable, Result> {
    * Allows subclasses to get the {@link Admin}.
    */
   protected Admin getAdmin() {
+    if (admin == null) {
+      initialize();
+    }
     return admin;
   }
 
@@ -356,7 +383,8 @@ extends InputFormat<ImmutableBytesWritable, Result> {
   protected void setHTable(HTable table) throws IOException {
     this.table = table;
     this.regionLocator = table;
-    this.admin = table.getConnection().getAdmin();
+    this.connection = table.getConnection();
+    this.admin = this.connection.getAdmin();
   }
 
   /**
@@ -401,4 +429,34 @@ extends InputFormat<ImmutableBytesWritable, Result> {
   protected void setTableRecordReader(TableRecordReader tableRecordReader) {
     this.tableRecordReader = tableRecordReader;
   }
+  
+  /**
+   * This method will be called when any of the following are referenced, but not yet initialized:
+   * admin, regionLocator, table. Subclasses will have the opportunity to call
+   * {@link #initializeTable(Connection, TableName)}
+   */
+  protected void initialize() {
+   
+  }
+
+  /**
+   * Close the Table and related objects that were initialized via
+   * {@link #initializeTable(Connection, TableName)}.
+   *
+   * @throws IOException
+   */
+  protected void closeTable() throws IOException {
+    close(admin, table, regionLocator, connection);
+    admin = null;
+    table = null;
+    regionLocator = null;
+    connection = null;
+  }
+
+  private void close(Closeable... closables) throws IOException {
+    for (Closeable c : closables) {
+      if(c != null) { c.close(); }
+    }
+  }
+
 }

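For subclasses of TableInputFormatBase, the updated Javadoc above keeps the old-API (JobConfigurable/JobConf) shape; a comparable sketch against the new-API class in this package, with placeholder class, family, and table names, defers table setup to initialize() exactly as TableInputFormat now does:

  import java.io.IOException;

  import org.apache.hadoop.conf.Configurable;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.TableName;
  import org.apache.hadoop.hbase.client.ConnectionFactory;
  import org.apache.hadoop.hbase.client.Scan;
  import org.apache.hadoop.hbase.mapreduce.TableInputFormatBase;
  import org.apache.hadoop.hbase.util.Bytes;

  public class ExampleTIF extends TableInputFormatBase implements Configurable {

    private Configuration conf;

    @Override
    public void setConf(Configuration configuration) {
      // Only remember the configuration and describe the scan; open nothing yet.
      this.conf = HBaseConfiguration.create(configuration);
      Scan scan = new Scan();
      scan.addFamily(Bytes.toBytes("cf1"));   // placeholder column family
      setScan(scan);
    }

    @Override
    public Configuration getConf() {
      return conf;
    }

    @Override
    protected void initialize() {
      // Called by the base class from getSplits()/createRecordReader() when the
      // table has not yet been set up; closeTable() releases what is opened here.
      try {
        initializeTable(ConnectionFactory.createConnection(conf),
            TableName.valueOf("exampleTable"));   // placeholder table name
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    }
  }

Because getTable(), getAdmin(), and getRegionLocator() now also call initialize() when their field is still null, a subclass that only overrides initialize() gets the same lazy behaviour through those accessors.
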
http://git-wip-us.apache.org/repos/asf/hbase/blob/60d6c7dc/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
index cd69a5b..c46f41f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
@@ -79,15 +79,26 @@ implements Configurable {
   /** The configuration. */
   private Configuration conf = null;
 
-  private Table table;
-  private Connection connection;
-
   /**
    * Writes the reducer output to an HBase table.
    */
   protected class TableRecordWriter
   extends RecordWriter<KEY, Mutation> {
 
+    private Connection connection;
+    private Table table;
+
+    /**
+     * @throws IOException 
+     * 
+     */
+    public TableRecordWriter() throws IOException {
+      String tableName = conf.get(OUTPUT_TABLE);
+      this.connection = ConnectionFactory.createConnection(conf);
+      this.table = connection.getTable(TableName.valueOf(tableName));
+      this.table.setAutoFlushTo(false);
+      LOG.info("Created table instance for "  + tableName);
+    }
     /**
      * Closes the writer, in this case flush table commits.
      *
@@ -165,6 +176,7 @@ implements Configurable {
     return new TableOutputCommitter();
   }
 
+  @Override
   public Configuration getConf() {
     return conf;
   }
@@ -193,10 +205,6 @@ implements Configurable {
       if (zkClientPort != 0) {
         this.conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkClientPort);
       }
-      this.connection = ConnectionFactory.createConnection(this.conf);
-      this.table = connection.getTable(TableName.valueOf(tableName));
-      this.table.setAutoFlushTo(false);
-      LOG.info("Created table instance for "  + tableName);
     } catch(IOException e) {
       LOG.error(e);
       throw new RuntimeException(e);

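On the output side, setConf() no longer opens a Connection either; each TableRecordWriter now creates its own Connection and Table in its constructor and is responsible for releasing them. Driver configuration is unchanged; a minimal sketch with placeholder names (mapper/reducer wiring omitted):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.client.Mutation;
  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
  import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
  import org.apache.hadoop.mapreduce.Job;

  public class ExampleWriteDriver {
    public static void main(String[] args) throws Exception {
      Configuration conf = HBaseConfiguration.create();
      // Placeholder table name; OUTPUT_TABLE is read when the writer is constructed.
      conf.set(TableOutputFormat.OUTPUT_TABLE, "exampleTable");
      Job job = Job.getInstance(conf, "example-write");
      job.setOutputFormatClass(TableOutputFormat.class);
      job.setOutputKeyClass(ImmutableBytesWritable.class);
      job.setOutputValueClass(Mutation.class);
      // The Connection and Table now belong to each task's TableRecordWriter
      // rather than to the output format itself.
    }
  }
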
