incubator-hcatalog-commits mailing list archives

From tof...@apache.org
Subject svn commit: r1232664 - in /incubator/hcatalog/branches/branch-0.3: ./ storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/ storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/
Date Wed, 18 Jan 2012 00:18:03 GMT
Author: toffer
Date: Wed Jan 18 00:18:03 2012
New Revision: 1232664

URL: http://svn.apache.org/viewvc?rev=1232664&view=rev
Log:
HCATALOG-192 HBase output storage driver integration with zookeeper based revision manager (avandana via toffer)
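
For orientation, a minimal sketch of the read path this change enables. The database/table names and the job boilerplate around them are illustrative assumptions, not part of this commit:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hcatalog.mapreduce.HCatInputFormat;
    import org.apache.hcatalog.mapreduce.InputJobInfo;

    public class SnapshotReadExample {
        public static void main(String[] args) throws Exception {
            // Hypothetical job setup. Once HCatInputFormat.setInput() has
            // stored the InputJobInfo in the job configuration, the new
            // HBaseInputStorageDriver.initialize() creates a table snapshot
            // automatically (if none was supplied), so every mapper reads
            // the same revision of each column family.
            Job job = new Job(new Configuration());
            InputJobInfo inputInfo = InputJobInfo.create("mydb", "mytable",
                    null, null, null);
            HCatInputFormat.setInput(job, inputInfo);
            // ... configure mapper/output as usual, then job.waitForCompletion(true)
        }
    }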

Added:
    incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HbaseSnapshotRecordReader.java
    incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestSnapshots.java
Modified:
    incubator/hcatalog/branches/branch-0.3/CHANGES.txt
    incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseConstants.java
    incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java
    incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java
    incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputStorageDriver.java
    incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKBasedRevisionManager.java
    incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputStorageDriver.java

Modified: incubator/hcatalog/branches/branch-0.3/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.3/CHANGES.txt?rev=1232664&r1=1232663&r2=1232664&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.3/CHANGES.txt (original)
+++ incubator/hcatalog/branches/branch-0.3/CHANGES.txt Wed Jan 18 00:18:03 2012
@@ -23,6 +23,8 @@ Release 0.3.0 (unreleased changes)
   INCOMPATIBLE CHANGES
 
   NEW FEATURES
+  HCAT-192. HBase output storage driver integration with zookeeper based revision manager (avandana via toffer)
+
   HCAT-193. Snapshot class for HCatalog tables. (avandana via toffer) 
 
   HCAT-87. Newly added partition should inherit table properties. (hashutosh at HIVE-2589 via khorgath)

Modified: incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseConstants.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseConstants.java?rev=1232664&r1=1232663&r2=1232664&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseConstants.java (original)
+++ incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseConstants.java Wed Jan 18 00:18:03 2012
@@ -22,7 +22,7 @@ import org.apache.hadoop.hive.hbase.HBas
 import org.apache.hcatalog.common.HCatConstants;
 
 /**
- * Constants class for constants used in Ht
+ * Constants class for constants used in HBase storage driver.
  */
 class HBaseConstants {
 
@@ -38,4 +38,7 @@ class HBaseConstants {
     /** key used to define whether the bulk storage driver will be used or not  */
     public static final String PROPERTY_OSD_BULK_MODE_KEY = HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX+".hbase.output.bulkMode";
 
+    /** key used to define the HBase table snapshot. */
+    public static final String PROPERTY_TABLE_SNAPSHOT_KEY = HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX + ".hbase.table.snapshot";
+
 }

Modified: incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java?rev=1232664&r1=1232663&r2=1232664&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java (original)
+++ incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java Wed Jan 18 00:18:03 2012
@@ -26,10 +26,10 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 
-import com.facebook.fb303.FacebookBase;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.MasterNotRunningException;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
@@ -50,14 +50,23 @@ import org.apache.hadoop.hive.serde2.Ser
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.hbase.snapshot.RevisionManager;
+import org.apache.hcatalog.hbase.snapshot.RevisionManagerFactory;
+import org.apache.hcatalog.hbase.snapshot.TableSnapshot;
+import org.apache.hcatalog.hbase.snapshot.ZKBasedRevisionManager;
 import org.apache.hcatalog.mapreduce.HCatInputStorageDriver;
 import org.apache.hcatalog.mapreduce.HCatOutputFormat;
 import org.apache.hcatalog.mapreduce.HCatOutputStorageDriver;
 import org.apache.hcatalog.mapreduce.HCatTableInfo;
+import org.apache.hcatalog.mapreduce.InputJobInfo;
 import org.apache.hcatalog.storagehandler.HCatStorageHandler;
 import org.apache.thrift.TBase;
 import org.apache.zookeeper.ZooKeeper;
 
+import com.facebook.fb303.FacebookBase;
+
 /**
  * This class HBaseHCatStorageHandler provides functionality to create HBase
  * tables through HCatalog. The implementation is very similar to the
@@ -191,8 +200,10 @@ public class HBaseHCatStorageHandler ext
                     uniqueColumnFamilies.remove(hbaseColumnFamilies.get(iKey));
 
                     for (String columnFamily : uniqueColumnFamilies) {
-                        tableDesc.addFamily(new HColumnDescriptor(Bytes
-                                .toBytes(columnFamily)));
+                        HColumnDescriptor familyDesc = new HColumnDescriptor(Bytes
+                                .toBytes(columnFamily));
+                        familyDesc.setMaxVersions(Integer.MAX_VALUE);
+                        tableDesc.addFamily(familyDesc);
                     }
 
                     getHBaseAdmin().createTable(tableDesc);
@@ -435,4 +446,131 @@ public class HBaseHCatStorageHandler ext
                 FacebookBase.class);
     }
 
+
+    /**
+     * Creates the latest snapshot of the table.
+     *
+     * @param jobConf The job configuration.
+     * @param hbaseTableName The fully qualified name of the HBase table.
+     * @return An instance of HCatTableSnapshot
+     * @throws IOException Signals that an I/O exception has occurred.
+     */
+    public static HCatTableSnapshot createSnapshot(Configuration jobConf,
+            String hbaseTableName ) throws IOException {
+
+        RevisionManager rm = null;
+        TableSnapshot snpt;
+        try {
+            rm = getOpenedRevisionManager(jobConf);
+            snpt = rm.createSnapshot(hbaseTableName);
+        } finally {
+            if (rm != null)
+                rm.close();
+        }
+
+        String inputJobString = jobConf.get(HCatConstants.HCAT_KEY_JOB_INFO);
+        if(inputJobString == null){
+            throw new IOException(
+                    "InputJobInfo information not found in JobContext. "
+                            + "HCatInputFormat.setInput() not called?");
+        }
+        InputJobInfo inputInfo = (InputJobInfo) HCatUtil.deserialize(inputJobString);
+        HCatTableSnapshot hcatSnapshot = HBaseInputStorageDriver
+                .convertSnapshot(snpt, inputInfo.getTableInfo());
+
+        return hcatSnapshot;
+    }
+
+    /**
+     * Creates the snapshot using the revision specified by the user.
+     *
+     * @param jobConf The job configuration.
+     * @param tableName The fully qualified name of the table whose snapshot is being taken.
+     * @param revision The revision number to use for the snapshot.
+     * @return An instance of HCatTableSnapshot.
+     * @throws IOException Signals that an I/O exception has occurred.
+     */
+    public static HCatTableSnapshot createSnapshot(Configuration jobConf,
+            String tableName, long revision)
+            throws IOException {
+
+        TableSnapshot snpt;
+        RevisionManager rm = null;
+        try {
+            rm = getOpenedRevisionManager(jobConf);
+            snpt = rm.createSnapshot(tableName, revision);
+        } finally {
+            if (rm != null)
+                rm.close();
+        }
+
+        String inputJobString = jobConf.get(HCatConstants.HCAT_KEY_JOB_INFO);
+        if(inputJobString == null){
+            throw new IOException(
+                    "InputJobInfo information not found in JobContext. "
+                            + "HCatInputFormat.setInput() not called?");
+        }
+        InputJobInfo inputInfo = (InputJobInfo) HCatUtil.deserialize(inputJobString);
+        HCatTableSnapshot hcatSnapshot = HBaseInputStorageDriver
+                .convertSnapshot(snpt, inputInfo.getTableInfo());
+
+        return hcatSnapshot;
+    }
+
+    /**
+     * Gets an opened instance of the revision manager.
+     *
+     * @param jobConf The job configuration.
+     * @return RevisionManager An instance of revision manager.
+     * @throws IOException
+     */
+    static RevisionManager getOpenedRevisionManager(Configuration jobConf) throws IOException {
+
+        Properties properties = new Properties();
+        String zkHostList = jobConf.get(HConstants.ZOOKEEPER_QUORUM);
+        int port = jobConf.getInt("hbase.zookeeper.property.clientPort",
+                HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT);
+
+        if (zkHostList != null) {
+            String[] splits = zkHostList.split(",");
+            StringBuffer sb = new StringBuffer();
+            for (String split : splits) {
+                sb.append(split);
+                sb.append(':');
+                sb.append(port);
+                sb.append(',');
+            }
+
+            sb.deleteCharAt(sb.length() - 1);
+            properties.put(ZKBasedRevisionManager.HOSTLIST, sb.toString());
+        }
+        String dataDir = jobConf.get(ZKBasedRevisionManager.DATADIR);
+        if (dataDir != null) {
+            properties.put(ZKBasedRevisionManager.DATADIR, dataDir);
+        }
+        String rmClassName = jobConf.get(
+                RevisionManager.REVISION_MGR_IMPL_CLASS,
+                ZKBasedRevisionManager.class.getName());
+        properties.put(RevisionManager.REVISION_MGR_IMPL_CLASS, rmClassName);
+        RevisionManager revisionManager = RevisionManagerFactory
+                .getRevisionManager(properties);
+        revisionManager.open();
+        return revisionManager;
+    }
+
+    /**
+     * Set snapshot as a property.
+     *
+     * @param snapshot The HCatTableSnapshot to be passed to the job.
+     * @param inpJobInfo The InputJobInfo for the job.
+     * @throws IOException
+     */
+    public void setSnapshot(HCatTableSnapshot snapshot, InputJobInfo inpJobInfo)
+            throws IOException {
+        String serializedSnp = HCatUtil.serialize(snapshot);
+        inpJobInfo.getProperties().setProperty(
+                HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY, serializedSnp);
+    }
+
+
 }
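
The two createSnapshot() overloads above cover both the "latest" case and a user-chosen revision. A minimal sketch of the explicit-revision path, assuming HCatInputFormat.setInput() has already populated HCAT_KEY_JOB_INFO on the configuration; the package declaration, table name, and revision number are placeholders:

    package org.apache.hcatalog.hbase;   // assumed, to match the handler's package

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hcatalog.mapreduce.InputJobInfo;

    public class PinnedRevisionExample {
        // Hypothetical helper: snapshot the HBase table at revision 42 and
        // attach it to the InputJobInfo, so HBaseInputStorageDriver reuses it
        // instead of creating its own "latest" snapshot.
        static void pinRevision(Configuration jobConf, InputJobInfo jobInfo)
                throws IOException {
            HCatTableSnapshot snapshot = HBaseHCatStorageHandler
                    .createSnapshot(jobConf, "mydb.mytable", 42L);
            new HBaseHCatStorageHandler().setSnapshot(snapshot, jobInfo);
        }
    }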

Modified: incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java?rev=1232664&r1=1232663&r2=1232664&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java (original)
+++ incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java Wed Jan 18 00:18:03 2012
@@ -21,39 +21,46 @@ package org.apache.hcatalog.hbase;
 import java.io.IOException;
 import java.util.List;
 
+import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
+import org.apache.hadoop.hbase.mapreduce.TableSplit;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hcatalog.mapreduce.InputJobInfo;
 
 /**
  * This class HBaseInputFormat is a wrapper class of TableInputFormat in HBase.
  */
-class HBaseInputFormat extends InputFormat<ImmutableBytesWritable, Result> {
-    
+class HBaseInputFormat extends InputFormat<ImmutableBytesWritable, Result> implements Configurable {
+
     private final TableInputFormat inputFormat;
-    
-    public HBaseInputFormat() {
+    private final InputJobInfo jobInfo;
+    private Configuration conf;
+
+    public HBaseInputFormat(InputJobInfo jobInfo) {
         inputFormat = new TableInputFormat();
+        this.jobInfo = jobInfo;
     }
-    
+
     /*
      * @param instance of InputSplit
-     * 
+     *
      * @param instance of TaskAttemptContext
-     * 
+     *
      * @return RecordReader
-     * 
+     *
      * @throws IOException
-     * 
+     *
      * @throws InterruptedException
-     * 
+     *
      * @see
      * org.apache.hadoop.mapreduce.InputFormat#createRecordReader(org.apache
      * .hadoop.mapreduce.InputSplit,
@@ -63,18 +70,28 @@ class HBaseInputFormat extends InputForm
     public RecordReader<ImmutableBytesWritable, Result> createRecordReader(
             InputSplit split, TaskAttemptContext tac) throws IOException,
             InterruptedException {
-        return inputFormat.createRecordReader(split, tac);
+
+        String tableName = inputFormat.getConf().get(TableInputFormat.INPUT_TABLE);
+        TableSplit tSplit = (TableSplit) split;
+        HbaseSnapshotRecordReader recordReader = new HbaseSnapshotRecordReader(jobInfo);
+        Scan sc = new Scan(inputFormat.getScan());
+        sc.setStartRow(tSplit.getStartRow());
+        sc.setStopRow(tSplit.getEndRow());
+        recordReader.setScan(sc);
+        recordReader.setHTable(new HTable(this.conf, tableName));
+        recordReader.init();
+        return recordReader;
     }
-    
+
     /*
      * @param jobContext
-     * 
+     *
      * @return List of InputSplit
-     * 
+     *
      * @throws IOException
-     * 
+     *
      * @throws InterruptedException
-     * 
+     *
      * @see
      * org.apache.hadoop.mapreduce.InputFormat#getSplits(org.apache.hadoop.mapreduce
      * .JobContext)
@@ -82,19 +99,33 @@ class HBaseInputFormat extends InputForm
     @Override
     public List<InputSplit> getSplits(JobContext jobContext)
             throws IOException, InterruptedException {
+
+        String tableName = this.conf.get(TableInputFormat.INPUT_TABLE);
+        if (tableName == null) {
+            throw new IOException("The input table is not set. The input splits cannot be created.");
+        }
         return inputFormat.getSplits(jobContext);
     }
-    
+
     public void setConf(Configuration conf) {
+        this.conf = conf;
         inputFormat.setConf(conf);
     }
-    
+
     public Scan getScan() {
         return inputFormat.getScan();
     }
-    
+
     public void setScan(Scan scan) {
         inputFormat.setScan(scan);
     }
-    
+
+    /* @return The configuration used by this input format.
+     * @see org.apache.hadoop.conf.Configurable#getConf()
+     */
+    @Override
+    public Configuration getConf() {
+        return this.conf;
+    }
+
 }

Modified: incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputStorageDriver.java?rev=1232664&r1=1232663&r2=1232664&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputStorageDriver.java (original)
+++ incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputStorageDriver.java Wed Jan 18 00:18:03 2012
@@ -19,6 +19,8 @@
 package org.apache.hcatalog.hbase;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -27,9 +29,11 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
+import org.apache.hadoop.hive.hbase.HBaseSerDe;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputFormat;
@@ -37,22 +41,28 @@ import org.apache.hadoop.mapreduce.JobCo
 import org.apache.hcatalog.common.HCatConstants;
 import org.apache.hcatalog.common.HCatUtil;
 import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
 import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.hbase.snapshot.TableSnapshot;
 import org.apache.hcatalog.mapreduce.HCatInputStorageDriver;
 import org.apache.hcatalog.mapreduce.HCatTableInfo;
 import org.apache.hcatalog.mapreduce.InputJobInfo;
+import org.apache.hcatalog.mapreduce.StorerInfo;
+
 
 /**
  * The Class HBaseInputStorageDriver enables reading of HBase tables through
  * HCatalog.
  */
 public class HBaseInputStorageDriver extends HCatInputStorageDriver {
-    private HCatTableInfo   tableInfo;
+
+    private InputJobInfo inpJobInfo;
     private ResultConverter converter;
-    private HCatSchema      outputColSchema;
-    private HCatSchema      dataSchema;
-    private Configuration   jobConf;
-    private String          scanColumns;
+    private HCatSchema outputColSchema;
+    private HCatSchema dataSchema;
+    private Configuration jobConf;
+    private String scanColumns;
+    private HCatTableSnapshot snapshot;
 
     /*
      * @param JobContext
@@ -64,6 +74,7 @@ public class HBaseInputStorageDriver ext
      */
     @Override
     public void initialize(JobContext context, Properties hcatProperties) throws IOException {
+
         jobConf = context.getConfiguration();
         String jobString = jobConf.get(HCatConstants.HCAT_KEY_JOB_INFO);
         if (jobString == null) {
@@ -71,9 +82,8 @@ public class HBaseInputStorageDriver ext
                     "InputJobInfo information not found in JobContext. "
                             + "HCatInputFormat.setInput() not called?");
         }
-        InputJobInfo jobInfo = (InputJobInfo) HCatUtil.deserialize(jobString);
-        tableInfo = jobInfo.getTableInfo();
-        dataSchema = tableInfo.getDataColumns();
+        inpJobInfo = (InputJobInfo) HCatUtil.deserialize(jobString);
+        dataSchema = inpJobInfo.getTableInfo().getDataColumns();
         List<FieldSchema> fields = HCatUtil.getFieldSchemaList(dataSchema
                 .getFields());
         hcatProperties.setProperty(Constants.LIST_COLUMNS,
@@ -83,6 +93,19 @@ public class HBaseInputStorageDriver ext
         converter = new HBaseSerDeResultConverter(dataSchema, outputColSchema,
                 hcatProperties);
         scanColumns = converter.getHBaseScanColumns();
+        String hbaseTableName = HBaseHCatStorageHandler
+                .getFullyQualifiedName(inpJobInfo.getTableInfo());
+        String serSnapshot = (String) inpJobInfo.getProperties().get(
+                HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY);
+        if (serSnapshot == null) {
+            snapshot = HBaseHCatStorageHandler.createSnapshot(jobConf,
+                    hbaseTableName);
+            inpJobInfo.getProperties().setProperty(
+                    HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY,
+                    HCatUtil.serialize(snapshot));
+        }
+
+        context.getConfiguration().set(HCatConstants.HCAT_KEY_JOB_INFO, HCatUtil.serialize(inpJobInfo));
 
     }
 
@@ -97,10 +120,13 @@ public class HBaseInputStorageDriver ext
     @Override
     public InputFormat<ImmutableBytesWritable, Result> getInputFormat(
             Properties hcatProperties) {
-        HBaseInputFormat tableInputFormat = new HBaseInputFormat();
-        String hbaseTableName = HBaseHCatStorageHandler.getFullyQualifiedName(tableInfo);
+
+        String hbaseTableName = HBaseHCatStorageHandler
+                .getFullyQualifiedName(inpJobInfo.getTableInfo());
+        HBaseInputFormat tableInputFormat = new HBaseInputFormat(inpJobInfo);
         jobConf.set(TableInputFormat.INPUT_TABLE, hbaseTableName);
         jobConf.set(TableInputFormat.SCAN_COLUMNS, scanColumns);
+        jobConf.setInt(TableInputFormat.SCAN_MAXVERSIONS, 1);
         tableInputFormat.setConf(jobConf);
         // TODO: Make the caching configurable by the user
         tableInputFormat.getScan().setCaching(200);
@@ -109,11 +135,11 @@ public class HBaseInputStorageDriver ext
     }
 
     /*
-     * @param baseKey
+     * @param baseKey The key produced by the MR job.
      *
-     * @param baseValue
+     * @param baseValue The value produced by the MR job.
      *
-     * @return HCatRecord
+     * @return HCatRecord An instance of HCatRecord produced from the key and value.
      *
      * @throws IOException
      *
@@ -128,9 +154,9 @@ public class HBaseInputStorageDriver ext
     }
 
     /*
-     * @param jobContext
+     * @param jobContext The JobContext of the MR job.
      *
-     * @param howlSchema
+     * @param howlSchema The output schema of the hcat record.
      *
      * @throws IOException
      *
@@ -161,9 +187,9 @@ public class HBaseInputStorageDriver ext
     }
 
     /*
-     * @param jobContext
+     * @param jobContext The JobContext of the MR job.
      *
-     * @param hcatSchema
+     * @param hcatSchema The schema of the hcat record.
      *
      * @throws IOException
      *
@@ -176,4 +202,74 @@ public class HBaseInputStorageDriver ext
             throws IOException {
         this.dataSchema = hcatSchema;
     }
+
+    static HCatTableSnapshot convertSnapshot(TableSnapshot hbaseSnapshot,
+            HCatTableInfo hcatTableInfo) throws IOException {
+
+        HCatSchema hcatTableSchema = hcatTableInfo.getDataColumns();
+        Map<String, String> hcatHbaseColMap = getHCatHBaseColumnMapping(hcatTableInfo);
+        HashMap<String, Long> revisionMap = new HashMap<String, Long>();
+
+        for (HCatFieldSchema fSchema : hcatTableSchema.getFields()) {
+            if(hcatHbaseColMap.containsKey(fSchema.getName())){
+                String colFamily = hcatHbaseColMap.get(fSchema.getName());
+                long revisionID = hbaseSnapshot.getRevision(colFamily);
+                revisionMap.put(fSchema.getName(), revisionID);
+            }
+        }
+
+        HCatTableSnapshot hcatSnapshot = new HCatTableSnapshot(
+                 hcatTableInfo.getDatabaseName(), hcatTableInfo.getTableName(),revisionMap);
+        return hcatSnapshot;
+    }
+
+    static TableSnapshot convertSnapshot(HCatTableSnapshot hcatSnapshot,
+            HCatTableInfo hcatTableInfo) throws IOException {
+
+        HCatSchema hcatTableSchema = hcatTableInfo.getDataColumns();
+        Map<String, Long> revisionMap = new HashMap<String, Long>();
+        Map<String, String> hcatHbaseColMap = getHCatHBaseColumnMapping(hcatTableInfo);
+        for (HCatFieldSchema fSchema : hcatTableSchema.getFields()) {
+            String colFamily = hcatHbaseColMap.get(fSchema.getName());
+            if (hcatSnapshot.containsColumn(fSchema.getName())) {
+                long revision = hcatSnapshot.getRevision(fSchema.getName());
+                revisionMap.put(colFamily, revision);
+            }
+        }
+
+        String fullyQualifiedName = hcatSnapshot.getDatabaseName() + "."
+                + hcatSnapshot.getTableName();
+        return new TableSnapshot(fullyQualifiedName, revisionMap);
+
+    }
+
+    private static Map<String, String> getHCatHBaseColumnMapping( HCatTableInfo hcatTableInfo)
+            throws IOException {
+
+        HCatSchema hcatTableSchema = hcatTableInfo.getDataColumns();
+        StorerInfo storeInfo = hcatTableInfo.getStorerInfo();
+        String hbaseColumnMapping = storeInfo.getProperties().getProperty(
+                HBaseConstants.PROPERTY_COLUMN_MAPPING_KEY);
+
+        Map<String, String> hcatHbaseColMap = new HashMap<String, String>();
+        List<String> columnFamilies = new ArrayList<String>();
+        List<String> columnQualifiers = new ArrayList<String>();
+        try {
+            HBaseSerDe.parseColumnMapping(hbaseColumnMapping, columnFamilies,
+                    null, columnQualifiers, null);
+        } catch (SerDeException e) {
+            throw new IOException("Exception while converting snapshots.", e);
+        }
+
+        for (HCatFieldSchema column : hcatTableSchema.getFields()) {
+            int fieldPos = hcatTableSchema.getPosition(column.getName());
+            String colFamily = columnFamilies.get(fieldPos);
+            if (colFamily.equals(HBaseSerDe.HBASE_KEY_COL) == false) {
+                hcatHbaseColMap.put(column.getName(), colFamily);
+            }
+        }
+
+        return hcatHbaseColMap;
+    }
+
 }
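
To make the two convertSnapshot() directions concrete: under the column mapping ':key,cf1:q1,cf2:q2' (value1 backed by cf1, value2 by cf2), family revisions translate to column revisions and back. A sketch mirroring the new TestSnapshots test; the package declaration and table name are assumptions, and the HCatTableInfo is assumed to come from a deserialized InputJobInfo:

    package org.apache.hcatalog.hbase;   // assumed, for package-private access

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hcatalog.hbase.snapshot.TableSnapshot;
    import org.apache.hcatalog.mapreduce.HCatTableInfo;

    public class ConvertSnapshotSketch {
        // Family revisions {cf1=3, cf2=5} become column revisions
        // {value1=3, value2=5} under the mapping ':key,cf1:q1,cf2:q2'.
        static HCatTableSnapshot toHCat(HCatTableInfo tableInfo)
                throws IOException {
            Map<String, Long> revMap = new HashMap<String, Long>();
            revMap.put("cf1", 3L);
            revMap.put("cf2", 5L);
            TableSnapshot hbaseSnapshot =
                    new TableSnapshot("mydb.mytable", revMap);
            return HBaseInputStorageDriver.convertSnapshot(hbaseSnapshot,
                    tableInfo);
        }
    }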

Added: incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HbaseSnapshotRecordReader.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HbaseSnapshotRecordReader.java?rev=1232664&view=auto
==============================================================================
--- incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HbaseSnapshotRecordReader.java (added)
+++ incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HbaseSnapshotRecordReader.java Wed Jan 18 00:18:03 2012
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.hbase;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableRecordReader;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.hbase.snapshot.TableSnapshot;
+import org.apache.hcatalog.mapreduce.InputJobInfo;
+
+/**
+ * The Class HbaseSnapshotRecordReader implements logic for filtering records
+ * based on the table snapshot.
+ */
+class HbaseSnapshotRecordReader extends TableRecordReader {
+
+    static final Log LOG = LogFactory.getLog(HbaseSnapshotRecordReader.class);
+    private ResultScanner scanner;
+    private Scan  scan;
+    private HTable  htable;
+    private ImmutableBytesWritable key;
+    private Result value;
+    private InputJobInfo inpJobInfo;
+    private TableSnapshot snapshot;
+    private int maxRevisions;
+    private Iterator<Result> resultItr;
+
+
+    HbaseSnapshotRecordReader(InputJobInfo inputJobInfo) throws IOException {
+        this.inpJobInfo = inputJobInfo;
+        String snapshotString = inpJobInfo.getProperties().getProperty(
+                HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY);
+        HCatTableSnapshot hcatSnapshot = (HCatTableSnapshot) HCatUtil
+                .deserialize(snapshotString);
+        this.snapshot = HBaseInputStorageDriver.convertSnapshot(hcatSnapshot,
+                inpJobInfo.getTableInfo());
+        this.maxRevisions = 1;
+    }
+
+    /* @param firstRow The first row of the split.
+     * @throws IOException
+     * @see org.apache.hadoop.hbase.mapreduce.TableRecordReader#restart(byte[])
+     */
+    @Override
+    public void restart(byte[] firstRow) throws IOException {
+        Scan newScan = new Scan(scan);
+        newScan.setStartRow(firstRow);
+        this.scanner = this.htable.getScanner(newScan);
+        resultItr = this.scanner.iterator();
+    }
+
+    /* @throws IOException
+     * @see org.apache.hadoop.hbase.mapreduce.TableRecordReader#init()
+     */
+    @Override
+    public void init() throws IOException {
+        restart(scan.getStartRow());
+    }
+
+    /*
+     * @param htable The HBase HTable to use for the record reader.
+     *
+     * @see
+     * org.apache.hadoop.hbase.mapreduce.TableRecordReader#setHTable(org.apache
+     * .hadoop.hbase.client.HTable)
+     */
+    @Override
+    public void setHTable(HTable htable) {
+        this.htable = htable;
+    }
+
+    /*
+     * @param scan The scan to be used for reading records.
+     *
+     * @see
+     * org.apache.hadoop.hbase.mapreduce.TableRecordReader#setScan(org.apache
+     * .hadoop.hbase.client.Scan)
+     */
+    @Override
+    public void setScan(Scan scan) {
+        this.scan = scan;
+    }
+
+    /*
+     * @see org.apache.hadoop.hbase.mapreduce.TableRecordReader#close()
+     */
+    @Override
+    public void close() {
+        this.resultItr = null;
+        this.scanner.close();
+    }
+
+    /* @return The row key of the current HBase record.
+     * @throws IOException
+     * @throws InterruptedException
+     * @see org.apache.hadoop.hbase.mapreduce.TableRecordReader#getCurrentKey()
+     */
+    @Override
+    public ImmutableBytesWritable getCurrentKey() throws IOException,
+            InterruptedException {
+        return key;
+    }
+
+    /* @return A single row Result from the HBase table scan.
+     * @throws IOException
+     * @throws InterruptedException
+     * @see org.apache.hadoop.hbase.mapreduce.TableRecordReader#getCurrentValue()
+     */
+    @Override
+    public Result getCurrentValue() throws IOException, InterruptedException {
+        return value;
+    }
+
+    /* @return Whether another key-value is available for reading.
+     * @see org.apache.hadoop.hbase.mapreduce.TableRecordReader#nextKeyValue()
+     */
+    @Override
+    public boolean nextKeyValue() {
+
+        if (this.resultItr == null) {
+            LOG.warn("The HBase result iterator is null. It is possible"
+                    + " that the record reader has already been closed.");
+        } else {
+
+            if (key == null)
+                key = new ImmutableBytesWritable();
+            while (resultItr.hasNext()) {
+                Result temp = resultItr.next();
+                Result hbaseRow = prepareResult(temp.list());
+                if (hbaseRow != null) {
+                    key.set(hbaseRow.getRow());
+                    value = hbaseRow;
+                    return true;
+                }
+
+            }
+        }
+        return false;
+    }
+
+    private Result prepareResult(List<KeyValue> keyvalues) {
+
+        List<KeyValue> finalKeyVals = new ArrayList<KeyValue>();
+        Map<String, List<KeyValue>> qualValMap = new HashMap<String, List<KeyValue>>();
+        for (KeyValue kv : keyvalues) {
+            byte[] cf = kv.getFamily();
+            byte[] qualifier = kv.getQualifier();
+            String key = Bytes.toString(cf) + ":" + Bytes.toString(qualifier);
+            List<KeyValue> kvs;
+            if (qualValMap.containsKey(key)) {
+                kvs = qualValMap.get(key);
+            } else {
+                kvs = new ArrayList<KeyValue>();
+            }
+
+            String family = Bytes.toString(kv.getFamily());
+            long desiredTS = snapshot.getRevision(family);
+            if (kv.getTimestamp() <= desiredTS) {
+                kvs.add(kv);
+            }
+            qualValMap.put(key, kvs);
+        }
+
+        Set<String> keys = qualValMap.keySet();
+        for (String cf : keys) {
+            List<KeyValue> kvs = qualValMap.get(cf);
+            if (maxRevisions <= kvs.size()) {
+                for (int i = 0; i < maxRevisions; i++) {
+                    finalKeyVals.add(kvs.get(i));
+                }
+            } else {
+                finalKeyVals.addAll(kvs);
+            }
+        }
+
+        if (finalKeyVals.size() == 0) {
+            return null;
+        } else {
+            KeyValue[] kvArray = new KeyValue[finalKeyVals.size()];
+            finalKeyVals.toArray(kvArray);
+            return new Result(kvArray);
+        }
+    }
+
+    /* @return The progress of the record reader.
+     * @see org.apache.hadoop.hbase.mapreduce.TableRecordReader#getProgress()
+     */
+    @Override
+    public float getProgress() {
+        // Depends on the total number of tuples
+        return 0;
+    }
+
+}
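
The heart of the reader is prepareResult(): a cell survives only if its timestamp does not exceed its family's snapshot revision, and at most maxRevisions cells are kept per column. A standalone sketch of that filter on hand-built KeyValues; the row, family, and timestamps are illustrative, not taken from the commit:

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.util.Bytes;

    public class RevisionFilterSketch {
        public static void main(String[] args) {
            long familyRevision = 5L;   // stand-in for snapshot.getRevision("cf1")
            List<KeyValue> cells = new ArrayList<KeyValue>();
            // Three versions of cf1:q1 at timestamps 3, 5 and 9.
            for (long ts : new long[] { 3L, 5L, 9L }) {
                cells.add(new KeyValue(Bytes.toBytes("testRow"),
                        Bytes.toBytes("cf1"), Bytes.toBytes("q1"), ts,
                        Bytes.toBytes("textValue-" + ts)));
            }
            // Same rule as prepareResult(): keep only cells whose timestamp
            // is <= the family's snapshot revision (drops the ts=9 cell).
            for (KeyValue kv : cells) {
                if (kv.getTimestamp() <= familyRevision) {
                    System.out.println(kv);
                }
            }
        }
    }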

Modified: incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKBasedRevisionManager.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKBasedRevisionManager.java?rev=1232664&r1=1232663&r2=1232664&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKBasedRevisionManager.java (original)
+++ incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKBasedRevisionManager.java Wed Jan 18 00:18:03 2012
@@ -38,6 +38,8 @@ public class ZKBasedRevisionManager impl
 
     public static final String HOSTLIST = "revision.manager.zk.HostList";
     public static final String DATADIR = "revision.manager.zk.DataDir";
+    public static final String DEFAULT_DATADIR = "/revision-management";
+    public static final String DEFAULT_HOSTLIST = "localhost:2181";
     private static  int DEFAULT_WRITE_TRANSACTION_TIMEOUT = 14400000;
     private static final Log LOG = LogFactory.getLog(ZKBasedRevisionManager.class);
     private String zkHostList;
@@ -50,8 +52,11 @@ public class ZKBasedRevisionManager impl
      */
     @Override
     public void initialize(Properties properties) {
-        this.zkHostList = properties.getProperty(ZKBasedRevisionManager.HOSTLIST, "localhost:2181");
-        this.baseDir = properties.getProperty(ZKBasedRevisionManager.DATADIR,"/revision-management");
+        this.zkHostList = properties.getProperty(
+                ZKBasedRevisionManager.HOSTLIST,
+                ZKBasedRevisionManager.DEFAULT_HOSTLIST);
+        this.baseDir = properties.getProperty(ZKBasedRevisionManager.DATADIR,
+                ZKBasedRevisionManager.DEFAULT_DATADIR);
     }
 
     /**
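
With the defaults promoted to named constants, configuring the manager amounts to overriding whichever properties differ from DEFAULT_HOSTLIST and DEFAULT_DATADIR. A sketch using the factory path that getOpenedRevisionManager() takes; the quorum string is a made-up example:

    import java.util.Properties;

    import org.apache.hcatalog.hbase.snapshot.RevisionManager;
    import org.apache.hcatalog.hbase.snapshot.RevisionManagerFactory;
    import org.apache.hcatalog.hbase.snapshot.ZKBasedRevisionManager;

    public class RevisionManagerConfigSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // Hypothetical three-node quorum; omitting either property falls
            // back to DEFAULT_HOSTLIST ("localhost:2181") or
            // DEFAULT_DATADIR ("/revision-management").
            props.put(ZKBasedRevisionManager.HOSTLIST,
                    "zk1:2181,zk2:2181,zk3:2181");
            props.put(RevisionManager.REVISION_MGR_IMPL_CLASS,
                    ZKBasedRevisionManager.class.getName());

            RevisionManager rm = RevisionManagerFactory.getRevisionManager(props);
            rm.open();
            try {
                // ... createSnapshot / begin and commit write transactions ...
            } finally {
                rm.close();
            }
        }
    }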

Modified: incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputStorageDriver.java?rev=1232664&r1=1232663&r2=1232664&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputStorageDriver.java (original)
+++ incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputStorageDriver.java Wed Jan 18 00:18:03 2012
@@ -18,11 +18,13 @@
 package org.apache.hcatalog.hbase;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
@@ -54,6 +56,8 @@ import org.apache.hcatalog.common.HCatUt
 import org.apache.hcatalog.data.HCatRecord;
 import org.apache.hcatalog.data.schema.HCatFieldSchema;
 import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.hbase.snapshot.RevisionManager;
+import org.apache.hcatalog.hbase.snapshot.Transaction;
 import org.apache.hcatalog.mapreduce.HCatInputFormat;
 import org.apache.hcatalog.mapreduce.InputJobInfo;
 import org.junit.Test;
@@ -66,20 +70,34 @@ public class TestHBaseInputStorageDriver
     private final byte[] QUALIFIER1 = Bytes.toBytes("testQualifier1");
     private final byte[] QUALIFIER2 = Bytes.toBytes("testQualifier2");
 
-    List<Put> generatePuts(int num) {
-        List<Put> myPuts = new ArrayList<Put>();
-        for (int i = 0; i < num; i++) {
-            Put put = new Put(Bytes.toBytes("testRow" + i));
-            put.add(FAMILY, QUALIFIER1, 0,
-                    Bytes.toBytes("testQualifier1-" + "textValue-" + i));
-            put.add(FAMILY, QUALIFIER2, 0,
-                    Bytes.toBytes("testQualifier2-" + "textValue-" + i));
-            myPuts.add(put);
+    private List<Put> generatePuts(int num, String tableName) throws IOException {
+
+        List<String> columnFamilies = Arrays.asList("testFamily");
+        RevisionManager rm = null;
+        List<Put> myPuts;
+        try {
+            rm = HBaseHCatStorageHandler
+                    .getOpenedRevisionManager(getHbaseConf());
+            rm.open();
+            myPuts = new ArrayList<Put>();
+            for (int i = 1; i <= num; i++) {
+                Put put = new Put(Bytes.toBytes("testRow"));
+                put.add(FAMILY, QUALIFIER1, i, Bytes.toBytes("textValue-" + i));
+                put.add(FAMILY, QUALIFIER2, i, Bytes.toBytes("textValue-" + i));
+                myPuts.add(put);
+                Transaction tsx = rm.beginWriteTransaction(tableName,
+                        columnFamilies);
+                rm.commitWriteTransaction(tsx);
+            }
+        } finally {
+            if (rm != null)
+                rm.close();
         }
+
         return myPuts;
     }
 
-    public void Initialize() throws Exception {
+    private void Initialize() throws Exception {
         hcatConf = getHiveConf();
         hcatConf.set(ConfVars.SEMANTIC_ANALYZER_HOOK.varname,
                 HCatSemanticAnalyzer.class.getName());
@@ -102,8 +120,8 @@ public class TestHBaseInputStorageDriver
 
     }
 
-    public void populateHBaseTable(String tName) throws IOException {
-        List<Put> myPuts = generatePuts(10);
+    private void populateHBaseTable(String tName, int revisions) throws IOException {
+        List<Put> myPuts = generatePuts(revisions, tName);
         HTable table = new HTable(getHbaseConf(), Bytes.toBytes(tName));
         table.put(myPuts);
     }
@@ -132,7 +150,7 @@ public class TestHBaseInputStorageDriver
         boolean doesTableExist = hAdmin.tableExists(hbaseTableName);
         assertTrue(doesTableExist);
 
-        populateHBaseTable(hbaseTableName);
+        populateHBaseTable(hbaseTableName, 5);
         Configuration conf = new Configuration(hcatConf);
         conf.set(HCatConstants.HCAT_KEY_HIVE_CONF,
                 HCatUtil.serialize(getHiveConf().getAllProperties()));
@@ -160,14 +178,15 @@ public class TestHBaseInputStorageDriver
         job.setOutputValueClass(Text.class);
         job.setNumReduceTasks(0);
         assertTrue(job.waitForCompletion(true));
-        assertTrue(MapReadHTable.error == false);
+        assertFalse(MapReadHTable.error);
+        assertEquals(MapReadHTable.count, 1);
 
         String dropTableQuery = "DROP TABLE " + hbaseTableName ;
         CommandProcessorResponse responseThree = hcatDriver.run(dropTableQuery);
         assertEquals(0, responseThree.getResponseCode());
 
         boolean isHbaseTableThere = hAdmin.tableExists(hbaseTableName);
-        assertTrue(isHbaseTableThere == false);
+        assertFalse(isHbaseTableThere);
 
         String dropDB = "DROP DATABASE " + databaseName;
         CommandProcessorResponse responseFour = hcatDriver.run(dropDB);
@@ -192,7 +211,7 @@ public class TestHBaseInputStorageDriver
         boolean doesTableExist = hAdmin.tableExists(tableName);
         assertTrue(doesTableExist);
 
-        populateHBaseTable(tableName);
+        populateHBaseTable(tableName, 5);
 
         Configuration conf = new Configuration(hcatConf);
         conf.set(HCatConstants.HCAT_KEY_HIVE_CONF,
@@ -222,14 +241,15 @@ public class TestHBaseInputStorageDriver
         job.setOutputValueClass(Text.class);
         job.setNumReduceTasks(0);
         assertTrue(job.waitForCompletion(true));
-        assertTrue(MapReadHTable.error == false);
+        assertFalse(MapReadProjHTable.error);
+        assertEquals(MapReadProjHTable.count, 1);
 
         String dropTableQuery = "DROP TABLE " + tableName ;
         CommandProcessorResponse responseThree = hcatDriver.run(dropTableQuery);
         assertEquals(0, responseThree.getResponseCode());
 
         boolean isHbaseTableThere = hAdmin.tableExists(tableName);
-        assertTrue(isHbaseTableThere == false);
+        assertFalse(isHbaseTableThere);
     }
 
 
@@ -238,18 +258,20 @@ public class TestHBaseInputStorageDriver
             Mapper<ImmutableBytesWritable, HCatRecord, WritableComparable, Text> {
 
         static boolean error = false;
-
+        static int count = 0;
         @Override
         public void map(ImmutableBytesWritable key, HCatRecord value,
                 Context context) throws IOException, InterruptedException {
+            System.out.println("HCat record value: " + value.toString());
             boolean correctValues = (value.size() == 3)
-                    && (value.get(0).toString()).startsWith("testRow")
-                    && (value.get(1).toString()).startsWith("testQualifier1")
-                    && (value.get(2).toString()).startsWith("testQualifier2");
+                    && (value.get(0).toString()).equalsIgnoreCase("testRow")
+                    && (value.get(1).toString()).equalsIgnoreCase("textValue-5")
+                    && (value.get(2).toString()).equalsIgnoreCase("textValue-5");
 
             if (correctValues == false) {
                 error = true;
             }
+            count++;
         }
     }
 
@@ -258,17 +280,19 @@ public class TestHBaseInputStorageDriver
             Mapper<ImmutableBytesWritable, HCatRecord, WritableComparable, Text> {
 
         static boolean error = false;
-
+        static int count = 0;
         @Override
         public void map(ImmutableBytesWritable key, HCatRecord value,
                 Context context) throws IOException, InterruptedException {
+            System.out.println("HCat record value: " + value.toString());
             boolean correctValues = (value.size() == 2)
-                    && (value.get(0).toString()).startsWith("testRow")
-                    && (value.get(1).toString()).startsWith("testQualifier1");
+                    && (value.get(0).toString()).equalsIgnoreCase("testRow")
+                    && (value.get(1).toString()).equalsIgnoreCase("textValue-5");
 
             if (correctValues == false) {
                 error = true;
             }
+            count++;
         }
     }
 

Added: incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestSnapshots.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestSnapshots.java?rev=1232664&view=auto
==============================================================================
--- incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestSnapshots.java (added)
+++ incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestSnapshots.java Wed Jan 18 00:18:03 2012
@@ -0,0 +1,122 @@
+package org.apache.hcatalog.hbase;
+
+import static org.junit.Assert.assertEquals;
+
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hcatalog.cli.HCatDriver;
+import org.apache.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.hbase.snapshot.TableSnapshot;
+import org.apache.hcatalog.mapreduce.InitializeInput;
+import org.apache.hcatalog.mapreduce.InputJobInfo;
+import org.junit.Test;
+
+public class TestSnapshots extends SkeletonHBaseTest {
+    private static HiveConf   hcatConf;
+    private static HCatDriver hcatDriver;
+
+    public void Initialize() throws Exception {
+        hcatConf = getHiveConf();
+        hcatConf.set(ConfVars.SEMANTIC_ANALYZER_HOOK.varname,
+                HCatSemanticAnalyzer.class.getName());
+        URI fsuri = getFileSystem().getUri();
+        Path whPath = new Path(fsuri.getScheme(), fsuri.getAuthority(),
+                getTestDir());
+        hcatConf.set(HiveConf.ConfVars.HADOOPFS.varname, fsuri.toString());
+        hcatConf.set(ConfVars.METASTOREWAREHOUSE.varname, whPath.toString());
+
+        //Add hbase properties
+
+        for (Map.Entry<String, String> el : getHbaseConf()) {
+            if (el.getKey().startsWith("hbase.")) {
+                hcatConf.set(el.getKey(), el.getValue());
+            }
+        }
+
+        SessionState.start(new CliSessionState(hcatConf));
+        hcatDriver = new HCatDriver();
+
+    }
+
+    @Test
+    public void TestSnapshotConversion() throws Exception {
+        Initialize();
+        String tableName = newTableName("mytableOne");
+        String databaseName = newTableName("mydatabase");
+        String fullyQualTableName = databaseName + "." + tableName;
+        String db_dir = getTestDir() + "/hbasedb";
+        String dbquery = "CREATE DATABASE IF NOT EXISTS " + databaseName + " LOCATION '"
+                            + db_dir + "'";
+        String tableQuery = "CREATE TABLE " + fullyQualTableName
+                              + "(key string, value1 string, value2 string) STORED BY " +
+                              "'org.apache.hcatalog.hbase.HBaseHCatStorageHandler'"
+                              + " TBLPROPERTIES ('hbase.columns.mapping'=':key,cf1:q1,cf2:q2')";
+
+        CommandProcessorResponse cmdResponse = hcatDriver.run(dbquery);
+        assertEquals(0, cmdResponse.getResponseCode());
+        cmdResponse = hcatDriver.run(tableQuery);
+        assertEquals(0, cmdResponse.getResponseCode());
+
+        InputJobInfo inputInfo = InputJobInfo.create(databaseName, tableName, null, null, null);
+        Configuration conf = new Configuration(hcatConf);
+        conf.set(HCatConstants.HCAT_KEY_HIVE_CONF,
+                HCatUtil.serialize(getHiveConf().getAllProperties()));
+        Job job = new Job(conf);
+        InitializeInput.setInput(job, inputInfo);
+        String modifiedInputInfo = job.getConfiguration().get(HCatConstants.HCAT_KEY_JOB_INFO);
+        inputInfo = (InputJobInfo) HCatUtil.deserialize(modifiedInputInfo);
+
+        Map<String, Long> revMap = new HashMap<String, Long>();
+        revMap.put("cf1", 3L);
+        revMap.put("cf2", 5L);
+        TableSnapshot hbaseSnapshot = new TableSnapshot(fullyQualTableName, revMap);
+        HCatTableSnapshot hcatSnapshot = HBaseInputStorageDriver.convertSnapshot(hbaseSnapshot, inputInfo.getTableInfo());
+
+        assertEquals(hcatSnapshot.getRevision("value1"), 3);
+        assertEquals(hcatSnapshot.getRevision("value2"), 5);
+
+        String dropTable = "DROP TABLE " + fullyQualTableName;
+        cmdResponse = hcatDriver.run(dropTable);
+        assertEquals(0, cmdResponse.getResponseCode());
+
+        tableName = newTableName("mytableTwo");
+        fullyQualTableName = databaseName + "." + tableName;
+        tableQuery = "CREATE TABLE " + fullyQualTableName
+                + "(key string, value1 string, value2 string) STORED BY " +
+                "'org.apache.hcatalog.hbase.HBaseHCatStorageHandler'"
+                + " TBLPROPERTIES ('hbase.columns.mapping'=':key,cf1:q1,cf1:q2')";
+        cmdResponse = hcatDriver.run(tableQuery);
+        assertEquals(0, cmdResponse.getResponseCode());
+        revMap.clear();
+        revMap.put("cf1", 3L);
+        hbaseSnapshot = new TableSnapshot(fullyQualTableName, revMap);
+        inputInfo = InputJobInfo.create(databaseName, tableName, null, null, null);
+        InitializeInput.setInput(job, inputInfo);
+        modifiedInputInfo = job.getConfiguration().get(HCatConstants.HCAT_KEY_JOB_INFO);
+        inputInfo = (InputJobInfo) HCatUtil.deserialize(modifiedInputInfo);
+        hcatSnapshot = HBaseInputStorageDriver.convertSnapshot(hbaseSnapshot, inputInfo.getTableInfo());
+        assertEquals(hcatSnapshot.getRevision("value1"), 3);
+        assertEquals(hcatSnapshot.getRevision("value2"), 3);
+
+        dropTable = "DROP TABLE " + fullyQualTableName;
+        cmdResponse = hcatDriver.run(dropTable);
+        assertEquals(0, cmdResponse.getResponseCode());
+
+        String dropDatabase = "DROP DATABASE IF EXISTS " + databaseName + " CASCADE";
+        cmdResponse = hcatDriver.run(dropDatabase);
+        assertEquals(0, cmdResponse.getResponseCode());
+    }
+
+}


