hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From khorg...@apache.org
Subject svn commit: r1615730 - in /hive/trunk: ./ common/src/java/org/apache/hadoop/hive/conf/ hbase-handler/src/java/org/apache/hadoop/hive/hbase/ hbase-handler/src/test/queries/positive/ hbase-handler/src/test/results/positive/ hbase-handler/src/test/templat...
Date Mon, 04 Aug 2014 19:17:54 GMT
Author: khorgath
Date: Mon Aug  4 19:17:53 2014
New Revision: 1615730

URL: http://svn.apache.org/r1615730
Log:
HIVE-6584 : Add HiveHBaseTableSnapshotInputFormat (Nick Dimiduk, reviewed by Navis Ryu, Sushanth Sowmyan)

Added:
    hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseTableSnapshotInputFormatUtil.java
    hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseInputFormatUtil.java
    hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableSnapshotInputFormat.java
    hive/trunk/hbase-handler/src/test/queries/positive/hbase_handler_snapshot.q
    hive/trunk/hbase-handler/src/test/results/positive/hbase_handler_snapshot.q.out
Modified:
    hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSplit.java
    hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java
    hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
    hive/trunk/hbase-handler/src/test/results/positive/external_table_ppd.q.out
    hive/trunk/hbase-handler/src/test/results/positive/hbase_binary_storage_queries.q.out
    hive/trunk/hbase-handler/src/test/templates/TestHBaseCliDriver.vm
    hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/hbase/HBaseQTestUtil.java
    hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/hbase/HBaseTestSetup.java
    hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
    hive/trunk/pom.xml
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java

Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1615730&r1=1615729&r2=1615730&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Mon Aug  4 19:17:53 2014
@@ -1256,6 +1256,9 @@ public class HiveConf extends Configurat
         "Disabling this improves HBase write performance at the risk of lost writes in case of a crash."),
     HIVE_HBASE_GENERATE_HFILES("hive.hbase.generatehfiles", false,
         "True when HBaseStorageHandler should generate hfiles instead of operate against the online table."),
+    HIVE_HBASE_SNAPSHOT_NAME("hive.hbase.snapshot.name", null, "The HBase table snapshot name to use."),
+    HIVE_HBASE_SNAPSHOT_RESTORE_DIR("hive.hbase.snapshot.restoredir", "/tmp", "The directory in which to " +
+        "restore the HBase table snapshot."),
 
     // For har files
     HIVEARCHIVEENABLED("hive.archive.enabled", false, "Whether archiving operations are permitted"),

Modified: hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSplit.java
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSplit.java?rev=1615730&r1=1615729&r2=1615730&view=diff
==============================================================================
--- hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSplit.java (original)
+++ hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSplit.java Mon Aug  4 19:17:53 2014
@@ -31,46 +31,86 @@ import org.apache.hadoop.mapred.InputSpl
  * HBaseSplit augments FileSplit with HBase column mapping.
  */
 public class HBaseSplit extends FileSplit implements InputSplit {
-  private final TableSplit split;
 
+  private final TableSplit tableSplit;
+  private final InputSplit snapshotSplit;
+  private boolean isTableSplit; // would be final, but readFields() must assign it (Writable)
+
+  /**
+   * For Writable
+   */
   public HBaseSplit() {
     super((Path) null, 0, 0, (String[]) null);
-    split = new TableSplit();
+    tableSplit = new TableSplit();
+    snapshotSplit = HBaseTableSnapshotInputFormatUtil.createTableSnapshotRegionSplit();
   }
 
-  public HBaseSplit(TableSplit split, Path dummyPath) {
+  public HBaseSplit(TableSplit tableSplit, Path dummyPath) {
     super(dummyPath, 0, 0, (String[]) null);
-    this.split = split;
+    this.tableSplit = tableSplit;
+    this.snapshotSplit = HBaseTableSnapshotInputFormatUtil.createTableSnapshotRegionSplit();
+    this.isTableSplit = true;
   }
 
-  public TableSplit getSplit() {
-    return this.split;
+  /**
+   * TODO: use TableSnapshotRegionSplit once HBASE-11555 is fixed.
+   */
+  public HBaseSplit(InputSplit snapshotSplit, Path dummyPath) {
+    super(dummyPath, 0, 0, (String[]) null);
+    this.tableSplit = new TableSplit();
+    this.snapshotSplit = snapshotSplit;
+    this.isTableSplit = false;
   }
 
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    super.readFields(in);
-    split.readFields(in);
+  public TableSplit getTableSplit() {
+    assert isTableSplit;
+    return this.tableSplit;
+  }
+
+  public InputSplit getSnapshotSplit() {
+    assert !isTableSplit;
+    return this.snapshotSplit;
   }
 
   @Override
   public String toString() {
-    return "TableSplit " + split;
+    return "" + (isTableSplit ? tableSplit : snapshotSplit);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    this.isTableSplit = in.readBoolean();
+    if (this.isTableSplit) {
+      tableSplit.readFields(in);
+    } else {
+      snapshotSplit.readFields(in);
+    }
   }
 
   @Override
   public void write(DataOutput out) throws IOException {
     super.write(out);
-    split.write(out);
+    out.writeBoolean(isTableSplit);
+    if (isTableSplit) {
+      tableSplit.write(out);
+    } else {
+      snapshotSplit.write(out);
+    }
   }
 
   @Override
   public long getLength() {
-    return split.getLength();
+    long val = 0;
+    try {
+      val = isTableSplit ? tableSplit.getLength() : snapshotSplit.getLength();
+    } finally {
+      return val;
+    }
   }
 
   @Override
   public String[] getLocations() throws IOException {
-    return split.getLocations();
+    return isTableSplit ? tableSplit.getLocations() : snapshotSplit.getLocations();
   }
 }

Modified: hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java?rev=1615730&r1=1615729&r2=1615730&view=diff
==============================================================================
--- hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java (original)
+++ hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java Mon Aug  4 19:17:53 2014
@@ -29,7 +29,10 @@ import java.util.Properties;
 import java.util.Set;
 
 import org.apache.commons.io.IOUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
@@ -69,6 +72,19 @@ import org.apache.hadoop.util.StringUtil
 public class HBaseStorageHandler extends DefaultStorageHandler
   implements HiveMetaHook, HiveStoragePredicateHandler {
 
+  private static final Log LOG = LogFactory.getLog(HBaseStorageHandler.class);
+
+  /** HBase-internal config by which input format receives snapshot name. */
+  private static final String HBASE_SNAPSHOT_NAME_KEY = "hbase.TableSnapshotInputFormat.snapshot.name";
+  /** HBase-internal config by which input format received restore dir before HBASE-11335. */
+  private static final String HBASE_SNAPSHOT_TABLE_DIR_KEY = "hbase.TableSnapshotInputFormat.table.dir";
+  /** HBase-internal config by which input format received restore dir after HBASE-11335. */
+  private static final String HBASE_SNAPSHOT_RESTORE_DIR_KEY = "hbase.TableSnapshotInputFormat.restore.dir";
+  /** HBase config by which a SlabCache is sized. */
+  private static final String HBASE_OFFHEAP_PCT_KEY = "hbase.offheapcache.percentage";
+  /** HBase config by which a BucketCache is sized. */
+  private static final String HBASE_BUCKETCACHE_SIZE_KEY = "hbase.bucketcache.size";
+
   final static public String DEFAULT_PREFIX = "default.";
 
   //Check if the configure job properties is called from input
@@ -258,6 +274,11 @@ public class HBaseStorageHandler extends
 
   @Override
   public Class<? extends InputFormat> getInputFormatClass() {
+    if (HiveConf.getVar(jobConf, HiveConf.ConfVars.HIVE_HBASE_SNAPSHOT_NAME) != null) {
+      LOG.debug("Using TableSnapshotInputFormat");
+      return HiveHBaseTableSnapshotInputFormat.class;
+    }
+    LOG.debug("Using HiveHBaseTableInputFormat");
     return HiveHBaseTableInputFormat.class;
   }
 
@@ -342,6 +363,37 @@ public class HBaseStorageHandler extends
     // do this for reconciling HBaseStorageHandler for use in HCatalog
     // check to see if this an input job or an outputjob
     if (this.configureInputJobProps) {
+      String snapshotName = HiveConf.getVar(jobConf, HiveConf.ConfVars.HIVE_HBASE_SNAPSHOT_NAME);
+      if (snapshotName != null) {
+        HBaseTableSnapshotInputFormatUtil.assertSupportsTableSnapshots();
+
+        try {
+          String restoreDir =
+            HiveConf.getVar(jobConf, HiveConf.ConfVars.HIVE_HBASE_SNAPSHOT_RESTORE_DIR);
+          if (restoreDir == null) {
+            throw new IllegalArgumentException(
+              "Cannot process HBase snapshot without specifying " + HiveConf.ConfVars
+                .HIVE_HBASE_SNAPSHOT_RESTORE_DIR);
+          }
+
+          HBaseTableSnapshotInputFormatUtil.configureJob(hbaseConf, snapshotName, new Path(restoreDir));
+          // copy over configs touched by above method
+          jobProperties.put(HBASE_SNAPSHOT_NAME_KEY, hbaseConf.get(HBASE_SNAPSHOT_NAME_KEY));
+          if (hbaseConf.get(HBASE_SNAPSHOT_TABLE_DIR_KEY, null) != null) {
+            jobProperties.put(HBASE_SNAPSHOT_TABLE_DIR_KEY, hbaseConf.get(HBASE_SNAPSHOT_TABLE_DIR_KEY));
+          } else {
+            jobProperties.put(HBASE_SNAPSHOT_RESTORE_DIR_KEY, hbaseConf.get(HBASE_SNAPSHOT_RESTORE_DIR_KEY));
+          }
+
+          TableMapReduceUtil.resetCacheConfig(hbaseConf);
+          // copy over configs touched by above method
+          jobProperties.put(HBASE_OFFHEAP_PCT_KEY, hbaseConf.get(HBASE_OFFHEAP_PCT_KEY));
+          jobProperties.put(HBASE_BUCKETCACHE_SIZE_KEY, hbaseConf.get(HBASE_BUCKETCACHE_SIZE_KEY));
+        } catch (IOException e) {
+          throw new IllegalArgumentException(e);
+        }
+      }
+
       for (String k : jobProperties.keySet()) {
         jobConf.set(k, jobProperties.get(k));
       }
@@ -415,7 +467,8 @@ public class HBaseStorageHandler extends
        * only need TableMapReduceUtil.addDependencyJars(jobConf) here.
        */
       TableMapReduceUtil.addDependencyJars(
-          jobConf, HBaseStorageHandler.class, TableInputFormatBase.class);
+          jobConf, HBaseStorageHandler.class, TableInputFormatBase.class,
+          org.cliffc.high_scale_lib.Counter.class); // this will be removed for HBase 1.0
       Set<String> merged = new LinkedHashSet<String>(jobConf.getStringCollection("tmpjars"));
 
       Job copy = new Job(jobConf);

Added: hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseTableSnapshotInputFormatUtil.java
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseTableSnapshotInputFormatUtil.java?rev=1615730&view=auto
==============================================================================
--- hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseTableSnapshotInputFormatUtil.java (added)
+++ hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseTableSnapshotInputFormatUtil.java Mon Aug  4 19:17:53 2014
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.hbase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl;
+import org.apache.hadoop.mapred.InputSplit;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+
+/**
+ * A helper class to isolate newer HBase features from users running against older versions of
+ * HBase that don't provide those features.
+ *
+ * TODO: remove this class when it's okay to drop support for earlier versions of HBase.
+ */
+public class HBaseTableSnapshotInputFormatUtil {
+
+  private static final Log LOG = LogFactory.getLog(HBaseTableSnapshotInputFormatUtil.class);
+
+  /** The class we look for to determine if hbase snapshots are supported. */
+  private static final String TABLESNAPSHOTINPUTFORMAT_CLASS
+    = "org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl";
+
+  private static final String TABLESNAPSHOTREGIONSPLIT_CLASS
+    = "org.apache.hadoop.hbase.mapred.TableSnapshotInputFormat$TableSnapshotRegionSplit";
+
+  /** True when {@link #TABLESNAPSHOTINPUTFORMAT_CLASS} is present. */
+  private static final boolean SUPPORTS_TABLE_SNAPSHOTS;
+
+  static {
+    boolean support = false;
+    try {
+      Class<?> clazz = Class.forName(TABLESNAPSHOTINPUTFORMAT_CLASS);
+      support = clazz != null;
+    } catch (ClassNotFoundException e) {
+      // pass
+    }
+    SUPPORTS_TABLE_SNAPSHOTS = support;
+  }
+
+  /** Throws RuntimeException unless HBase supports {@link HiveHBaseTableSnapshotInputFormat}. */
+  public static void assertSupportsTableSnapshots() {
+    if (!SUPPORTS_TABLE_SNAPSHOTS) {
+      throw new RuntimeException("This version of HBase does not support Hive over table " +
+        "snapshots. Please upgrade to at least HBase 0.98.3 or later. See HIVE-6584 for details.");
+    }
+  }
+
+  /**
+   * Configures {@code conf} for the snapshot job. Call only after
+   * {@link #assertSupportsTableSnapshots()} has returned without throwing.
+   */
+  public static void configureJob(Configuration conf, String snapshotName, Path restoreDir)
+      throws IOException {
+    TableSnapshotInputFormatImpl.setInput(conf, snapshotName, restoreDir);
+  }
+
+  /**
+   * Create a bare TableSnapshotRegionSplit. Needed because Writables require a
+   * default-constructed instance to hydrate from the DataInput.
+   *
+   * TODO: remove once HBASE-11555 is fixed.
+   */
+  public static InputSplit createTableSnapshotRegionSplit() {
+    try {
+      assertSupportsTableSnapshots();
+    } catch (RuntimeException e) {
+      LOG.debug("Probably don't support table snapshots. Returning null instance.", e);
+      return null;
+    }
+
+    try {
+      Class<? extends InputSplit> resultType =
+        (Class<? extends InputSplit>) Class.forName(TABLESNAPSHOTREGIONSPLIT_CLASS);
+      Constructor<? extends InputSplit> cxtor = resultType.getDeclaredConstructor(new Class[]{});
+      cxtor.setAccessible(true);
+      return cxtor.newInstance(new Object[]{});
+    } catch (ClassNotFoundException e) {
+      throw new UnsupportedOperationException(
+        "Unable to find " + TABLESNAPSHOTREGIONSPLIT_CLASS, e);
+    } catch (IllegalAccessException e) {
+      throw new UnsupportedOperationException(
+        "Unable to access specified class " + TABLESNAPSHOTREGIONSPLIT_CLASS, e);
+    } catch (InstantiationException e) {
+      throw new UnsupportedOperationException(
+        "Unable to instantiate specified class " + TABLESNAPSHOTREGIONSPLIT_CLASS, e);
+    } catch (InvocationTargetException e) {
+      throw new UnsupportedOperationException(
+        "Constructor threw an exception for " + TABLESNAPSHOTREGIONSPLIT_CLASS, e);
+    } catch (NoSuchMethodException e) {
+      throw new UnsupportedOperationException(
+        "Unable to find suitable constructor for class " + TABLESNAPSHOTREGIONSPLIT_CLASS, e);
+    }
+  }
+}

Added: hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseInputFormatUtil.java
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseInputFormatUtil.java?rev=1615730&view=auto
==============================================================================
--- hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseInputFormatUtil.java (added)
+++ hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseInputFormatUtil.java Mon Aug  4 19:17:53 2014
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.hbase;
+
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hive.hbase.ColumnMappings.ColumnMapping;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.mapred.JobConf;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Util code common between HiveHBaseTableInputFormat and HiveHBaseTableSnapshotInputFormat.
+ */
+class HiveHBaseInputFormatUtil {
+
+  /**
+   * Parse {@code jobConf} to create the target {@link HTable} instance.
+   */
+  public static HTable getTable(JobConf jobConf) throws IOException {
+    String hbaseTableName = jobConf.get(HBaseSerDe.HBASE_TABLE_NAME);
+    return new HTable(HBaseConfiguration.create(jobConf), Bytes.toBytes(hbaseTableName));
+  }
+
+  /**
+   * Parse {@code jobConf} to create a {@link Scan} instance.
+   */
+  public static Scan getScan(JobConf jobConf) throws IOException {
+    String hbaseColumnsMapping = jobConf.get(HBaseSerDe.HBASE_COLUMNS_MAPPING);
+    boolean doColumnRegexMatching = jobConf.getBoolean(HBaseSerDe.HBASE_COLUMNS_REGEX_MATCHING, true);
+    List<Integer> readColIDs = ColumnProjectionUtils.getReadColumnIDs(jobConf);
+    ColumnMappings columnMappings;
+
+    try {
+      columnMappings = HBaseSerDe.parseColumnsMapping(hbaseColumnsMapping, doColumnRegexMatching);
+    } catch (SerDeException e) {
+      throw new IOException(e);
+    }
+
+    if (columnMappings.size() < readColIDs.size()) {
+      throw new IOException("Cannot read more columns than the given table contains.");
+    }
+
+    boolean readAllColumns = ColumnProjectionUtils.isReadAllColumns(jobConf);
+    Scan scan = new Scan();
+    boolean empty = true;
+
+    // The list of families that have been added to the scan
+    List<String> addedFamilies = new ArrayList<String>();
+
+    if (!readAllColumns) {
+      ColumnMapping[] columnsMapping = columnMappings.getColumnsMapping();
+      for (int i : readColIDs) {
+        ColumnMapping colMap = columnsMapping[i];
+        if (colMap.hbaseRowKey) {
+          continue;
+        }
+
+        if (colMap.qualifierName == null) {
+          scan.addFamily(colMap.familyNameBytes);
+          addedFamilies.add(colMap.familyName);
+        } else {
+          if(!addedFamilies.contains(colMap.familyName)){
+            // add only if the corresponding family has not already been added
+            scan.addColumn(colMap.familyNameBytes, colMap.qualifierNameBytes);
+          }
+        }
+
+        empty = false;
+      }
+    }
+
+    // The HBase table's row key maps to a Hive table column. In the corner case when only the
+    // row key column is selected in Hive, the HBase Scan will be empty i.e. no column family/
+    // column qualifier will have been added to the scan. We arbitrarily add at least one column
+    // to the HBase scan so that we can retrieve all of the row keys and return them as the Hive
+    // tables column projection.
+    if (empty) {
+      for (ColumnMapping colMap: columnMappings) {
+        if (colMap.hbaseRowKey) {
+          continue;
+        }
+
+        if (colMap.qualifierName == null) {
+          scan.addFamily(colMap.familyNameBytes);
+        } else {
+          scan.addColumn(colMap.familyNameBytes, colMap.qualifierNameBytes);
+        }
+
+        if (!readAllColumns) {
+          break;
+        }
+      }
+    }
+
+    String scanCache = jobConf.get(HBaseSerDe.HBASE_SCAN_CACHE);
+    if (scanCache != null) {
+      scan.setCaching(Integer.valueOf(scanCache));
+    }
+    String scanCacheBlocks = jobConf.get(HBaseSerDe.HBASE_SCAN_CACHEBLOCKS);
+    if (scanCacheBlocks != null) {
+      scan.setCacheBlocks(Boolean.valueOf(scanCacheBlocks));
+    }
+    String scanBatch = jobConf.get(HBaseSerDe.HBASE_SCAN_BATCH);
+    if (scanBatch != null) {
+      scan.setBatch(Integer.valueOf(scanBatch));
+    }
+    return scan;
+  }
+
+  public static boolean getStorageFormatOfKey(String spec, String defaultFormat) throws IOException{
+
+    String[] mapInfo = spec.split("#");
+    boolean tblLevelDefault = "binary".equalsIgnoreCase(defaultFormat);
+
+    switch (mapInfo.length) {
+      case 1:
+        return tblLevelDefault;
+
+      case 2:
+        String storageType = mapInfo[1];
+        if(storageType.equals("-")) {
+          return tblLevelDefault;
+        } else if ("string".startsWith(storageType)){
+          return false;
+        } else if ("binary".startsWith(storageType)){
+          return true;
+        }
+
+      default:
+        throw new IOException("Malformed string: " + spec);
+    }
+  }
+}

Modified: hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java?rev=1615730&r1=1615729&r2=1615730&view=diff
==============================================================================
--- hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java (original)
+++ hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java Mon Aug  4 19:17:53 2014
@@ -46,7 +46,6 @@ import org.apache.hadoop.hive.ql.plan.Ex
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.ByteStream;
-import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.io.ByteWritable;
 import org.apache.hadoop.hive.serde2.io.DoubleWritable;
@@ -88,90 +87,11 @@ public class HiveHBaseTableInputFormat e
     final Reporter reporter) throws IOException {
 
     HBaseSplit hbaseSplit = (HBaseSplit) split;
-    TableSplit tableSplit = hbaseSplit.getSplit();
-    String hbaseTableName = jobConf.get(HBaseSerDe.HBASE_TABLE_NAME);
-    setHTable(new HTable(HBaseConfiguration.create(jobConf), Bytes.toBytes(hbaseTableName)));
-    String hbaseColumnsMapping = jobConf.get(HBaseSerDe.HBASE_COLUMNS_MAPPING);
-    boolean doColumnRegexMatching = jobConf.getBoolean(HBaseSerDe.HBASE_COLUMNS_REGEX_MATCHING, true);
-    List<Integer> readColIDs = ColumnProjectionUtils.getReadColumnIDs(jobConf);
-    ColumnMappings columnMappings;
-
-    try {
-      columnMappings = HBaseSerDe.parseColumnsMapping(hbaseColumnsMapping, doColumnRegexMatching);
-    } catch (SerDeException e) {
-      throw new IOException(e);
-    }
+    TableSplit tableSplit = hbaseSplit.getTableSplit();
 
-    if (columnMappings.size() < readColIDs.size()) {
-      throw new IOException("Cannot read more columns than the given table contains.");
-    }
+    setHTable(HiveHBaseInputFormatUtil.getTable(jobConf));
+    setScan(HiveHBaseInputFormatUtil.getScan(jobConf));
 
-    boolean readAllColumns = ColumnProjectionUtils.isReadAllColumns(jobConf);
-    Scan scan = new Scan();
-    boolean empty = true;
-
-    // The list of families that have been added to the scan
-    List<String> addedFamilies = new ArrayList<String>();
-
-    if (!readAllColumns) {
-      ColumnMapping[] columnsMapping = columnMappings.getColumnsMapping();
-      for (int i : readColIDs) {
-        ColumnMapping colMap = columnsMapping[i];
-        if (colMap.hbaseRowKey) {
-          continue;
-        }
-
-        if (colMap.qualifierName == null) {
-          scan.addFamily(colMap.familyNameBytes);
-          addedFamilies.add(colMap.familyName);
-        } else {
-          if(!addedFamilies.contains(colMap.familyName)){
-            // add only if the corresponding family has not already been added
-            scan.addColumn(colMap.familyNameBytes, colMap.qualifierNameBytes);
-          }
-        }
-
-        empty = false;
-      }
-    }
-
-    // The HBase table's row key maps to a Hive table column. In the corner case when only the
-    // row key column is selected in Hive, the HBase Scan will be empty i.e. no column family/
-    // column qualifier will have been added to the scan. We arbitrarily add at least one column
-    // to the HBase scan so that we can retrieve all of the row keys and return them as the Hive
-    // tables column projection.
-    if (empty) {
-      for (ColumnMapping colMap: columnMappings) {
-        if (colMap.hbaseRowKey) {
-          continue;
-        }
-
-        if (colMap.qualifierName == null) {
-          scan.addFamily(colMap.familyNameBytes);
-        } else {
-          scan.addColumn(colMap.familyNameBytes, colMap.qualifierNameBytes);
-        }
-
-        if (!readAllColumns) {
-          break;
-        }
-      }
-    }
-
-    String scanCache = jobConf.get(HBaseSerDe.HBASE_SCAN_CACHE);
-    if (scanCache != null) {
-      scan.setCaching(Integer.valueOf(scanCache));
-    }
-    String scanCacheBlocks = jobConf.get(HBaseSerDe.HBASE_SCAN_CACHEBLOCKS);
-    if (scanCacheBlocks != null) {
-      scan.setCacheBlocks(Boolean.valueOf(scanCacheBlocks));
-    }
-    String scanBatch = jobConf.get(HBaseSerDe.HBASE_SCAN_BATCH);
-    if (scanBatch != null) {
-      scan.setBatch(Integer.valueOf(scanBatch));
-    }
-
-    setScan(scan);
     Job job = new Job(jobConf);
     TaskAttemptContext tac = ShimLoader.getHadoopShims().newTaskAttemptContext(
         job.getConfiguration(), reporter);
@@ -443,12 +363,12 @@ public class HiveHBaseTableInputFormat e
     boolean doColumnRegexMatching = jobConf.getBoolean(HBaseSerDe.HBASE_COLUMNS_REGEX_MATCHING, true);
 
     if (hbaseColumnsMapping == null) {
-      throw new IOException("hbase.columns.mapping required for HBase Table.");
+      throw new IOException(HBaseSerDe.HBASE_COLUMNS_MAPPING + " required for HBase Table.");
     }
 
     ColumnMappings columnMappings = null;
     try {
-      columnMappings = HBaseSerDe.parseColumnsMapping(hbaseColumnsMapping,doColumnRegexMatching);
+      columnMappings = HBaseSerDe.parseColumnsMapping(hbaseColumnsMapping, doColumnRegexMatching);
     } catch (SerDeException e) {
       throw new IOException(e);
     }
@@ -463,10 +383,9 @@ public class HiveHBaseTableInputFormat e
     // definition into account and excludes regions which don't satisfy
     // the start/stop row conditions (HBASE-1829).
     Scan scan = createFilterScan(jobConf, iKey,
-        getStorageFormatOfKey(keyMapping.mappingSpec,
+        HiveHBaseInputFormatUtil.getStorageFormatOfKey(keyMapping.mappingSpec,
             jobConf.get(HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE, "string")));
 
-
     // The list of families that have been added to the scan
     List<String> addedFamilies = new ArrayList<String>();
 
@@ -503,28 +422,4 @@ public class HiveHBaseTableInputFormat e
 
     return results;
   }
-
-  private boolean getStorageFormatOfKey(String spec, String defaultFormat) throws IOException{
-
-    String[] mapInfo = spec.split("#");
-    boolean tblLevelDefault = "binary".equalsIgnoreCase(defaultFormat) ? true : false;
-
-    switch (mapInfo.length) {
-    case 1:
-      return tblLevelDefault;
-
-    case 2:
-      String storageType = mapInfo[1];
-      if(storageType.equals("-")) {
-        return tblLevelDefault;
-      } else if ("string".startsWith(storageType)){
-        return false;
-      } else if ("binary".startsWith(storageType)){
-        return true;
-      }
-
-    default:
-      throw new IOException("Malformed string: " + spec);
-    }
-  }
 }

Added: hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableSnapshotInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableSnapshotInputFormat.java?rev=1615730&view=auto
==============================================================================
--- hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableSnapshotInputFormat.java (added)
+++ hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableSnapshotInputFormat.java Mon Aug  4 19:17:53 2014
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.hbase;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapred.TableInputFormat;
+import org.apache.hadoop.hbase.mapred.TableSnapshotInputFormat;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+import java.io.IOException;
+import java.util.List;
+
+public class HiveHBaseTableSnapshotInputFormat
+    implements InputFormat<ImmutableBytesWritable, ResultWritable> {
+
+  TableSnapshotInputFormat delegate = new TableSnapshotInputFormat();
+
+  private static void setColumns(JobConf job) throws IOException {
+    // hbase mapred API doesn't support scan at the moment.
+    Scan scan = HiveHBaseInputFormatUtil.getScan(job);
+    byte[][] families = scan.getFamilies();
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0; i < families.length; i++) {
+      if (i > 0) sb.append(" ");
+      sb.append(Bytes.toString(families[i]));
+    }
+    job.set(TableInputFormat.COLUMN_LIST, sb.toString());
+  }
+
+  @Override
+  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+    setColumns(job);
+
+    // hive depends on FileSplits, so wrap in HBaseSplit
+    Path[] tablePaths = FileInputFormat.getInputPaths(job);
+
+    InputSplit [] results = delegate.getSplits(job, numSplits);
+    for (int i = 0; i < results.length; i++) {
+      results[i] = new HBaseSplit(results[i], tablePaths[0]);
+    }
+
+    return results;
+  }
+
+  @Override
+  public RecordReader<ImmutableBytesWritable, ResultWritable> getRecordReader(
+      InputSplit split, JobConf job, Reporter reporter) throws IOException {
+    setColumns(job);
+    final RecordReader<ImmutableBytesWritable, Result> rr =
+      delegate.getRecordReader(((HBaseSplit) split).getSnapshotSplit(), job, reporter);
+
+    return new RecordReader<ImmutableBytesWritable, ResultWritable>() {
+      @Override
+      public boolean next(ImmutableBytesWritable key, ResultWritable value) throws IOException {
+        return rr.next(key, value.getResult());
+      }
+
+      @Override
+      public ImmutableBytesWritable createKey() {
+        return rr.createKey();
+      }
+
+      @Override
+      public ResultWritable createValue() {
+        return new ResultWritable(rr.createValue());
+      }
+
+      @Override
+      public long getPos() throws IOException {
+        return rr.getPos();
+      }
+
+      @Override
+      public void close() throws IOException {
+        rr.close();
+      }
+
+      @Override
+      public float getProgress() throws IOException {
+        return rr.getProgress();
+      }
+    };
+  }
+}

Added: hive/trunk/hbase-handler/src/test/queries/positive/hbase_handler_snapshot.q
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/test/queries/positive/hbase_handler_snapshot.q?rev=1615730&view=auto
==============================================================================
--- hive/trunk/hbase-handler/src/test/queries/positive/hbase_handler_snapshot.q (added)
+++ hive/trunk/hbase-handler/src/test/queries/positive/hbase_handler_snapshot.q Mon Aug  4 19:17:53 2014
@@ -0,0 +1,4 @@
+SET hive.hbase.snapshot.name=src_hbase_snapshot;
+SET hive.hbase.snapshot.restoredir=/tmp;
+
+SELECT * FROM src_hbase LIMIT 5;

Modified: hive/trunk/hbase-handler/src/test/results/positive/external_table_ppd.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/test/results/positive/external_table_ppd.q.out?rev=1615730&r1=1615729&r2=1615730&view=diff
==============================================================================
--- hive/trunk/hbase-handler/src/test/results/positive/external_table_ppd.q.out (original)
+++ hive/trunk/hbase-handler/src/test/results/positive/external_table_ppd.q.out Mon Aug  4 19:17:53 2014
@@ -63,8 +63,8 @@ Table Parameters:	 	 
 	 	 
 # Storage Information	 	 
 SerDe Library:      	org.apache.hadoop.hive.hbase.HBaseSerDe	 
-InputFormat:        	org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat	 
-OutputFormat:       	org.apache.hadoop.hive.ql.io.HivePassThroughOutputFormat	 
+InputFormat:        	null                	 
+OutputFormat:       	null                	 
 Compressed:         	No                  	 
 Num Buckets:        	-1                  	 
 Bucket Columns:     	[]                  	 

Modified: hive/trunk/hbase-handler/src/test/results/positive/hbase_binary_storage_queries.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/test/results/positive/hbase_binary_storage_queries.q.out?rev=1615730&r1=1615729&r2=1615730&view=diff
==============================================================================
--- hive/trunk/hbase-handler/src/test/results/positive/hbase_binary_storage_queries.q.out (original)
+++ hive/trunk/hbase-handler/src/test/results/positive/hbase_binary_storage_queries.q.out Mon Aug  4 19:17:53 2014
@@ -63,8 +63,8 @@ Table Parameters:	 	 
 	 	 
 # Storage Information	 	 
 SerDe Library:      	org.apache.hadoop.hive.hbase.HBaseSerDe	 
-InputFormat:        	org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat	 
-OutputFormat:       	org.apache.hadoop.hive.ql.io.HivePassThroughOutputFormat	 
+InputFormat:        	null                	 
+OutputFormat:       	null                	 
 Compressed:         	No                  	 
 Num Buckets:        	-1                  	 
 Bucket Columns:     	[]                  	 
@@ -238,8 +238,8 @@ Table Parameters:	 	 
 	 	 
 # Storage Information	 	 
 SerDe Library:      	org.apache.hadoop.hive.hbase.HBaseSerDe	 
-InputFormat:        	org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat	 
-OutputFormat:       	org.apache.hadoop.hive.ql.io.HivePassThroughOutputFormat	 
+InputFormat:        	null                	 
+OutputFormat:       	null                	 
 Compressed:         	No                  	 
 Num Buckets:        	-1                  	 
 Bucket Columns:     	[]                  	 

Added: hive/trunk/hbase-handler/src/test/results/positive/hbase_handler_snapshot.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/test/results/positive/hbase_handler_snapshot.q.out?rev=1615730&view=auto
==============================================================================
--- hive/trunk/hbase-handler/src/test/results/positive/hbase_handler_snapshot.q.out (added)
+++ hive/trunk/hbase-handler/src/test/results/positive/hbase_handler_snapshot.q.out Mon Aug  4 19:17:53 2014
@@ -0,0 +1,13 @@
+PREHOOK: query: SELECT * FROM src_hbase LIMIT 5
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src_hbase
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT * FROM src_hbase LIMIT 5
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src_hbase
+#### A masked pattern was here ####
+0	val_0
+10	val_10
+100	val_100
+103	val_103
+104	val_104

Modified: hive/trunk/hbase-handler/src/test/templates/TestHBaseCliDriver.vm
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/test/templates/TestHBaseCliDriver.vm?rev=1615730&r1=1615729&r2=1615730&view=diff
==============================================================================
--- hive/trunk/hbase-handler/src/test/templates/TestHBaseCliDriver.vm (original)
+++ hive/trunk/hbase-handler/src/test/templates/TestHBaseCliDriver.vm Mon Aug  4 19:17:53 2014
@@ -27,7 +27,6 @@ import java.util.*;
 import org.apache.hadoop.hive.ql.QTestUtil.MiniClusterType;
 import org.apache.hadoop.hive.hbase.HBaseQTestUtil;
 import org.apache.hadoop.hive.hbase.HBaseTestSetup;
-import org.apache.hadoop.hive.ql.session.SessionState;
 
 public class $className extends TestCase {
 

Modified: hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/hbase/HBaseQTestUtil.java
URL: http://svn.apache.org/viewvc/hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/hbase/HBaseQTestUtil.java?rev=1615730&r1=1615729&r2=1615730&view=diff
==============================================================================
--- hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/hbase/HBaseQTestUtil.java (original)
+++ hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/hbase/HBaseQTestUtil.java Mon Aug  4 19:17:53 2014
@@ -17,24 +17,98 @@
  */
 package org.apache.hadoop.hive.hbase;
 
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.ql.QTestUtil;
-import org.apache.hadoop.hive.ql.QTestUtil.MiniClusterType;
+
+import java.util.List;
 
 /**
  * HBaseQTestUtil initializes HBase-specific test fixtures.
  */
 public class HBaseQTestUtil extends QTestUtil {
+
+  /** Name of the HBase table, in both Hive and HBase. */
+  public static String HBASE_SRC_NAME = "src_hbase";
+
+  /** Name of the table snapshot. */
+  public static String HBASE_SRC_SNAPSHOT_NAME = "src_hbase_snapshot";
+
+  /** A handle to this harness's cluster */
+  private final HConnection conn;
+
   public HBaseQTestUtil(
     String outDir, String logDir, MiniClusterType miniMr, HBaseTestSetup setup)
     throws Exception {
 
     super(outDir, logDir, miniMr, null);
     setup.preTest(conf);
+    this.conn = setup.getConnection();
     super.init();
   }
 
+  /** return true when HBase table snapshot exists, false otherwise. */
+  private static boolean hbaseTableSnapshotExists(HBaseAdmin admin, String snapshotName) throws
+      Exception {
+    List<HBaseProtos.SnapshotDescription> snapshots =
+      admin.listSnapshots(".*" + snapshotName + ".*");
+    for (HBaseProtos.SnapshotDescription sn : snapshots) {
+      if (sn.getName().equals(HBASE_SRC_SNAPSHOT_NAME)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
   @Override
   public void init() throws Exception {
     // defer
   }
+
+  @Override
+  public void createSources() throws Exception {
+    super.createSources();
+
+    conf.setBoolean("hive.test.init.phase", true);
+
+    // create and load the input data into the hbase table
+    runCreateTableCmd(
+      "CREATE TABLE " + HBASE_SRC_NAME + "(key INT, value STRING)"
+        + "  STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'"
+        + "  WITH SERDEPROPERTIES ('hbase.columns.mapping' = ':key,cf:val')"
+        + "  TBLPROPERTIES ('hbase.table.name' = '" + HBASE_SRC_NAME + "')"
+    );
+    runCmd("INSERT OVERWRITE TABLE " + HBASE_SRC_NAME + " SELECT * FROM src");
+
+    // create a snapshot
+    HBaseAdmin admin = null;
+    try {
+      admin = new HBaseAdmin(conn.getConfiguration());
+      admin.snapshot(HBASE_SRC_SNAPSHOT_NAME, HBASE_SRC_NAME);
+    } finally {
+      if (admin != null) admin.close();
+    }
+
+    conf.setBoolean("hive.test.init.phase", false);
+  }
+
+  @Override
+  public void cleanUp() throws Exception {
+    super.cleanUp();
+
+    // drop in case leftover from unsuccessful run
+    db.dropTable(MetaStoreUtils.DEFAULT_DATABASE_NAME, HBASE_SRC_NAME);
+
+    HBaseAdmin admin = null;
+    try {
+      admin = new HBaseAdmin(conn.getConfiguration());
+      if (hbaseTableSnapshotExists(admin, HBASE_SRC_SNAPSHOT_NAME)) {
+        admin.deleteSnapshot(HBASE_SRC_SNAPSHOT_NAME);
+      }
+    } finally {
+      if (admin != null) admin.close();
+    }
+  }
 }

Modified: hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/hbase/HBaseTestSetup.java
URL: http://svn.apache.org/viewvc/hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/hbase/HBaseTestSetup.java?rev=1615730&r1=1615729&r2=1615730&view=diff
==============================================================================
--- hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/hbase/HBaseTestSetup.java (original)
+++ hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/hbase/HBaseTestSetup.java Mon Aug  4 19:17:53 2014
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.hive.hbase;
 
-import java.io.File;
 import java.io.IOException;
 import java.net.ServerSocket;
 import java.util.Arrays;
@@ -29,12 +28,13 @@ import junit.framework.Test;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HConnectionManager;
-import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -50,6 +50,7 @@ public class HBaseTestSetup extends Test
   private MiniHBaseCluster hbaseCluster;
   private int zooKeeperPort;
   private String hbaseRoot;
+  private HConnection hbaseConn;
 
   private static final int NUM_REGIONSERVERS = 1;
 
@@ -57,6 +58,10 @@ public class HBaseTestSetup extends Test
     super(test);
   }
 
+  public HConnection getConnection() {
+    return this.hbaseConn;
+  }
+
   void preTest(HiveConf conf) throws Exception {
 
     setUpFixtures(conf);
@@ -97,27 +102,23 @@ public class HBaseTestSetup extends Test
     hbaseConf.setInt("hbase.regionserver.info.port", -1);
     hbaseCluster = new MiniHBaseCluster(hbaseConf, NUM_REGIONSERVERS);
     conf.set("hbase.master", hbaseCluster.getMaster().getServerName().getHostAndPort());
+    hbaseConn = HConnectionManager.createConnection(hbaseConf);
+
     // opening the META table ensures that cluster is running
-    new HTable(hbaseConf, HConstants.META_TABLE_NAME);
-    createHBaseTable(hbaseConf);
+    HTableInterface meta = null;
+    try {
+      meta = hbaseConn.getTable(TableName.META_TABLE_NAME);
+    } finally {
+      if (meta != null) meta.close();
+    }
+    createHBaseTable();
   }
 
-  private void createHBaseTable(Configuration hbaseConf) throws IOException {
+  private void createHBaseTable() throws IOException {
     final String HBASE_TABLE_NAME = "HiveExternalTable";
     HTableDescriptor htableDesc = new HTableDescriptor(HBASE_TABLE_NAME.getBytes());
     HColumnDescriptor hcolDesc = new HColumnDescriptor("cf".getBytes());
     htableDesc.addFamily(hcolDesc);
-    HBaseAdmin hbaseAdmin = new HBaseAdmin(hbaseConf);
-    if(Arrays.asList(hbaseAdmin.listTables()).contains(htableDesc)){
-      // if table is already in there, don't recreate.
-      return;
-    }
-    hbaseAdmin.createTable(htableDesc);
-    HTable htable = new HTable(hbaseConf, HBASE_TABLE_NAME);
-
-    // data
-    Put [] puts = new Put [] {
-        new Put("key-1".getBytes()), new Put("key-2".getBytes()), new Put("key-3".getBytes()) };
 
     boolean [] booleans = new boolean [] { true, false, true };
     byte [] bytes = new byte [] { Byte.MIN_VALUE, -1, Byte.MAX_VALUE };
@@ -128,18 +129,37 @@ public class HBaseTestSetup extends Test
     float [] floats = new float [] { Float.MIN_VALUE, -1.0F, Float.MAX_VALUE };
     double [] doubles = new double [] { Double.MIN_VALUE, -1.0, Double.MAX_VALUE };
 
-    // store data
-    for (int i = 0; i < puts.length; i++) {
-      puts[i].add("cf".getBytes(), "cq-boolean".getBytes(), Bytes.toBytes(booleans[i]));
-      puts[i].add("cf".getBytes(), "cq-byte".getBytes(), new byte [] { bytes[i] });
-      puts[i].add("cf".getBytes(), "cq-short".getBytes(), Bytes.toBytes(shorts[i]));
-      puts[i].add("cf".getBytes(), "cq-int".getBytes(), Bytes.toBytes(ints[i]));
-      puts[i].add("cf".getBytes(), "cq-long".getBytes(), Bytes.toBytes(longs[i]));
-      puts[i].add("cf".getBytes(), "cq-string".getBytes(), Bytes.toBytes(strings[i]));
-      puts[i].add("cf".getBytes(), "cq-float".getBytes(), Bytes.toBytes(floats[i]));
-      puts[i].add("cf".getBytes(), "cq-double".getBytes(), Bytes.toBytes(doubles[i]));
-
-      htable.put(puts[i]);
+    HBaseAdmin hbaseAdmin = null;
+    HTableInterface htable = null;
+    try {
+      hbaseAdmin = new HBaseAdmin(hbaseConn.getConfiguration());
+      if (Arrays.asList(hbaseAdmin.listTables()).contains(htableDesc)) {
+        // if table is already in there, don't recreate.
+        return;
+      }
+      hbaseAdmin.createTable(htableDesc);
+      htable = hbaseConn.getTable(HBASE_TABLE_NAME);
+
+      // data
+      Put[] puts = new Put[]{
+        new Put("key-1".getBytes()), new Put("key-2".getBytes()), new Put("key-3".getBytes())};
+
+      // store data
+      for (int i = 0; i < puts.length; i++) {
+        puts[i].add("cf".getBytes(), "cq-boolean".getBytes(), Bytes.toBytes(booleans[i]));
+        puts[i].add("cf".getBytes(), "cq-byte".getBytes(), new byte[]{bytes[i]});
+        puts[i].add("cf".getBytes(), "cq-short".getBytes(), Bytes.toBytes(shorts[i]));
+        puts[i].add("cf".getBytes(), "cq-int".getBytes(), Bytes.toBytes(ints[i]));
+        puts[i].add("cf".getBytes(), "cq-long".getBytes(), Bytes.toBytes(longs[i]));
+        puts[i].add("cf".getBytes(), "cq-string".getBytes(), Bytes.toBytes(strings[i]));
+        puts[i].add("cf".getBytes(), "cq-float".getBytes(), Bytes.toBytes(floats[i]));
+        puts[i].add("cf".getBytes(), "cq-double".getBytes(), Bytes.toBytes(doubles[i]));
+
+        htable.put(puts[i]);
+      }
+    } finally {
+      if (htable != null) htable.close();
+      if (hbaseAdmin != null) hbaseAdmin.close();
     }
   }
 
@@ -152,6 +172,10 @@ public class HBaseTestSetup extends Test
 
   @Override
   protected void tearDown() throws Exception {
+    if (hbaseConn != null) {
+      hbaseConn.close();
+      hbaseConn = null;
+    }
     if (hbaseCluster != null) {
       HConnectionManager.deleteAllConnections(true);
       hbaseCluster.shutdown();

Modified: hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
URL: http://svn.apache.org/viewvc/hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java?rev=1615730&r1=1615729&r2=1615730&view=diff
==============================================================================
--- hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java (original)
+++ hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java Mon Aug  4 19:17:53 2014
@@ -130,7 +130,7 @@ public class QTestUtil {
   public static final HashSet<String> srcTables = new HashSet<String>();
   private static MiniClusterType clusterType = MiniClusterType.none;
   private ParseDriver pd;
-  private Hive db;
+  protected Hive db;
   protected HiveConf conf;
   private Driver drv;
   private BaseSemanticAnalyzer sem;
@@ -630,7 +630,7 @@ public class QTestUtil {
     return;
   }
 
-  private void runCreateTableCmd(String createTableCmd) throws Exception {
+  protected void runCreateTableCmd(String createTableCmd) throws Exception {
     int ecode = 0;
     ecode = drv.run(createTableCmd).getResponseCode();
     if (ecode != 0) {
@@ -641,7 +641,7 @@ public class QTestUtil {
     return;
   }
 
-  private void runCmd(String cmd) throws Exception {
+  protected void runCmd(String cmd) throws Exception {
     int ecode = 0;
     ecode = drv.run(cmd).getResponseCode();
     drv.close();

Modified: hive/trunk/pom.xml
URL: http://svn.apache.org/viewvc/hive/trunk/pom.xml?rev=1615730&r1=1615729&r2=1615730&view=diff
==============================================================================
--- hive/trunk/pom.xml (original)
+++ hive/trunk/pom.xml Mon Aug  4 19:17:53 2014
@@ -113,8 +113,8 @@
     <hadoop-20S.version>1.2.1</hadoop-20S.version>
     <hadoop-23.version>2.4.0</hadoop-23.version>
     <hadoop.bin.path>${basedir}/${hive.path.to.root}/testutils/hadoop</hadoop.bin.path>
-    <hbase.hadoop1.version>0.96.0-hadoop1</hbase.hadoop1.version>
-    <hbase.hadoop2.version>0.96.0-hadoop2</hbase.hadoop2.version>
+    <hbase.hadoop1.version>0.98.3-hadoop1</hbase.hadoop1.version>
+    <hbase.hadoop2.version>0.98.3-hadoop2</hbase.hadoop2.version>
     <!-- httpcomponents are not always in version sync -->
     <httpcomponents.client.version>4.2.5</httpcomponents.client.version>
     <httpcomponents.core.version>4.2.5</httpcomponents.core.version>
@@ -774,7 +774,7 @@
             <test.warehouse.dir>${test.warehouse.scheme}${test.warehouse.dir}</test.warehouse.dir>
             <java.net.preferIPv4Stack>true</java.net.preferIPv4Stack>
             <!-- EnforceReadOnlyTables hook and QTestUtil -->
-            <test.src.tables>src,src1,srcbucket,srcbucket2,src_json,src_thrift,src_sequencefile,srcpart,alltypesorc</test.src.tables>
+            <test.src.tables>src,src1,srcbucket,srcbucket2,src_json,src_thrift,src_sequencefile,srcpart,alltypesorc,src_hbase</test.src.tables>
             <java.security.krb5.conf>${test.tmp.dir}/conf/krb5.conf</java.security.krb5.conf>
           </systemPropertyVariables>
         </configuration>

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java?rev=1615730&r1=1615729&r2=1615730&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java Mon Aug  4 19:17:53 2014
@@ -3924,12 +3924,16 @@ public class DDLTask extends Task<DDLWor
     tbl.setInputFormatClass(crtTbl.getInputFormat());
     tbl.setOutputFormatClass(crtTbl.getOutputFormat());
 
-    tbl.getTTable().getSd().setInputFormat(
-        tbl.getInputFormatClass().getName());
-    tbl.getTTable().getSd().setOutputFormat(
-        tbl.getOutputFormatClass().getName());
+    // only persist input/output format to metadata when it is explicitly specified.
+    // Otherwise, load lazily via StorageHandler at query time.
+    if (crtTbl.getInputFormat() != null && !crtTbl.getInputFormat().isEmpty()) {
+      tbl.getTTable().getSd().setInputFormat(tbl.getInputFormatClass().getName());
+    }
+    if (crtTbl.getOutputFormat() != null && !crtTbl.getOutputFormat().isEmpty()) {
+      tbl.getTTable().getSd().setOutputFormat(tbl.getOutputFormatClass().getName());
+    }
 
-    if (!Utilities.isDefaultNameNode(conf)) {
+    if (!Utilities.isDefaultNameNode(conf) && tbl.getTTable().getSd().isSetLocation()) {
       // If location is specified - ensure that it is a full qualified name
       makeLocationQualified(tbl.getDbName(), tbl.getTTable().getSd(), tbl.getTableName());
     }



Mime
View raw message