incubator-hcatalog-commits mailing list archives

From: tof...@apache.org
Subject: svn commit: r1291925 - in /incubator/hcatalog/trunk: ./ storage-drivers/ storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/ storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ storage-drivers/hbase/src/test/org/apache/hcatalog/hba...
Date: Tue, 21 Feb 2012 17:12:41 GMT
Author: toffer
Date: Tue Feb 21 17:12:41 2012
New Revision: 1291925

URL: http://svn.apache.org/viewvc?rev=1291925&view=rev
Log:
HCAT-253 HBase revision manager should configure/drop znodes in the create/drop table meta hooks in the storage handler. (avandana via toffer)
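
For readers who want the shape of the change before the full diff: the storage handler now implements HiveMetaHook, and its create/drop table hooks register and remove the table's znodes with the revision manager. The condensed sketch below is illustrative only — the helper names onTableCreated/onTableDropped do not exist in the patch (the real code is inlined in HBaseHCatStorageHandler, shown further down), while RevisionManager, ZKBasedRevisionManager, setUpZNodes and deleteZNodes are the actual classes and methods this commit touches.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hcatalog.hbase.snapshot.RevisionManager;
import org.apache.hcatalog.hbase.snapshot.ZKBasedRevisionManager;

class ZNodeLifecycleSketch {

    // Called from the create-table meta hook once the HBase table is online.
    static void onTableCreated(RevisionManager rm, String tableName,
            List<String> columnFamilies) throws IOException {
        // Only the ZooKeeper-backed revision manager keeps per-table znodes,
        // hence the instanceof guard used in the patch.
        if (rm instanceof ZKBasedRevisionManager) {
            ((ZKBasedRevisionManager) rm).setUpZNodes(tableName,
                    new ArrayList<String>(columnFamilies));
        }
    }

    // Called from the drop-table meta hook after the HBase table is deleted.
    static void onTableDropped(RevisionManager rm, String tableName)
            throws IOException {
        if (rm instanceof ZKBasedRevisionManager) {
            ((ZKBasedRevisionManager) rm).deleteZNodes(tableName);
        }
    }
}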

Added:
    incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputStorageDriver.java.broken
    incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputStorageDriver.java.broken
    incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputStorageDriver.java.broken
    incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestSnapshots.java.broken
    incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestZNodeSetUp.java
Removed:
    incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputStorageDriver.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputStorageDriver.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputStorageDriver.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestSnapshots.java
Modified:
    incubator/hcatalog/trunk/CHANGES.txt
    incubator/hcatalog/trunk/storage-drivers/build.xml
    incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKBasedRevisionManager.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKUtil.java

Modified: incubator/hcatalog/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/CHANGES.txt?rev=1291925&r1=1291924&r2=1291925&view=diff
==============================================================================
--- incubator/hcatalog/trunk/CHANGES.txt (original)
+++ incubator/hcatalog/trunk/CHANGES.txt Tue Feb 21 17:12:41 2012
@@ -33,6 +33,8 @@ Trunk (unreleased changes)
   HCAT-2 Support nested schema conversion between Hive an Pig (julienledem via hashutosh)
 
   IMPROVEMENTS
+  HCAT-253 HBase revision manager should configure/drop znodes in the create/drop table meta hooks in the storage handler. (avandana via toffer)
+
   HCAT-259 Make readFields() and write() in LazyHCatRecord work (gates via khorgath)
 
   HCAT-194. Better error messages for HCatalog access control errors (julienledem via hashutosh)  

Modified: incubator/hcatalog/trunk/storage-drivers/build.xml
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/build.xml?rev=1291925&r1=1291924&r2=1291925&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/build.xml (original)
+++ incubator/hcatalog/trunk/storage-drivers/build.xml Tue Feb 21 17:12:41 2012
@@ -24,10 +24,8 @@
             <isset property="dist.drivers.dir"/>
         </condition>
         <!-- add storage drivers here -->
-<!--  TODO migrate hbase storageHandler
         <echo>Executing storage-driver &quot;${target}&quot; for hbase</echo>
         <ant target="${target}" dir="hbase" inheritAll="false" useNativeBasedir="true"/>
--->
     </target>
 
 

Modified: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java?rev=1291925&r1=1291924&r2=1291925&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java (original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java Tue Feb 21 17:12:41 2012
@@ -40,7 +40,6 @@ import org.apache.hadoop.hbase.util.Byte
 import org.apache.hadoop.hive.hbase.HBaseSerDe;
 import org.apache.hadoop.hive.metastore.HiveMetaHook;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
-import org.apache.hadoop.hive.metastore.api.Constants;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -49,6 +48,9 @@ import org.apache.hadoop.hive.ql.securit
 import org.apache.hadoop.hive.serde2.SerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hcatalog.common.HCatConstants;
 import org.apache.hcatalog.common.HCatUtil;
@@ -57,13 +59,11 @@ import org.apache.hcatalog.hbase.snapsho
 import org.apache.hcatalog.hbase.snapshot.TableSnapshot;
 import org.apache.hcatalog.hbase.snapshot.Transaction;
 import org.apache.hcatalog.hbase.snapshot.ZKBasedRevisionManager;
-import org.apache.hcatalog.mapreduce.HCatInputStorageDriver;
 import org.apache.hcatalog.mapreduce.HCatOutputFormat;
-import org.apache.hcatalog.mapreduce.HCatOutputStorageDriver;
 import org.apache.hcatalog.mapreduce.HCatTableInfo;
 import org.apache.hcatalog.mapreduce.InputJobInfo;
 import org.apache.hcatalog.mapreduce.OutputJobInfo;
-import org.apache.hcatalog.storagehandler.HCatStorageHandler;
+import org.apache.hcatalog.mapreduce.HCatStorageHandler;
 import org.apache.thrift.TBase;
 import org.apache.zookeeper.ZooKeeper;
 
@@ -74,7 +74,7 @@ import com.facebook.fb303.FacebookBase;
  * tables through HCatalog. The implementation is very similar to the
  * HiveHBaseStorageHandler, with more details to suit HCatalog.
  */
-public class HBaseHCatStorageHandler extends HCatStorageHandler {
+public class HBaseHCatStorageHandler extends HCatStorageHandler implements HiveMetaHook {
 
     final static public String DEFAULT_PREFIX = "default.";
 
@@ -82,36 +82,24 @@ public class HBaseHCatStorageHandler ext
 
     private HBaseAdmin         admin;
 
-    /*
-     * @return subclass of HCatInputStorageDriver
-     *
-     * @see org.apache.hcatalog.storagehandler.HCatStorageHandler
-     * #getInputStorageDriver()
-     */
     @Override
-    public Class<? extends HCatInputStorageDriver> getInputStorageDriver() {
-        return HBaseInputStorageDriver.class;
+    public void configureInputJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) {
+        //TODO complete rework and fill this in
     }
 
-    /*
-     * @return subclass of HCatOutputStorageDriver
-     *
-     * @see org.apache.hcatalog.storagehandler.HCatStorageHandler
-     * #getOutputStorageDriver()
-     */
     @Override
-    public Class<? extends HCatOutputStorageDriver> getOutputStorageDriver() {
-        return HBaseOutputStorageDriver.class;
+    public void configureOutputJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) {
+        //TODO complete rework and fill this in
     }
 
     /*
-     * @return instance of HiveAuthorizationProvider
-     *
-     * @throws HiveException
-     *
-     * @see org.apache.hcatalog.storagehandler.HCatStorageHandler#
-     * getAuthorizationProvider()
-     */
+    * @return instance of HiveAuthorizationProvider
+    *
+    * @throws HiveException
+    *
+    * @see org.apache.hcatalog.storagehandler.HCatStorageHandler#
+    * getAuthorizationProvider()
+    */
     @Override
     public HiveAuthorizationProvider getAuthorizationProvider()
             throws HiveException {
@@ -191,14 +179,13 @@ public class HBaseHCatStorageHandler ext
                     hbaseColumnQualifiers, hbaseColumnQualifiersBytes);
 
             HTableDescriptor tableDesc;
-
+            Set<String> uniqueColumnFamilies = new HashSet<String>();
             if (!getHBaseAdmin().tableExists(tableName)) {
                 // if it is not an external table then create one
                 if (!isExternal) {
                     // Create the column descriptors
                     tableDesc = new HTableDescriptor(tableName);
-                    Set<String> uniqueColumnFamilies = new HashSet<String>(
-                            hbaseColumnFamilies);
+                    uniqueColumnFamilies.addAll(hbaseColumnFamilies);
                     uniqueColumnFamilies.remove(hbaseColumnFamilies.get(iKey));
 
                     for (String columnFamily : uniqueColumnFamilies) {
@@ -242,6 +229,15 @@ public class HBaseHCatStorageHandler ext
 
             // ensure the table is online
             new HTable(hbaseConf, tableDesc.getName());
+
+            //Set up znodes in revision manager.
+            RevisionManager rm = getOpenedRevisionManager(hbaseConf);
+            if (rm instanceof ZKBasedRevisionManager) {
+                ZKBasedRevisionManager zkRM = (ZKBasedRevisionManager) rm;
+                zkRM.setUpZNodes(tableName, new ArrayList<String>(
+                        uniqueColumnFamilies));
+            }
+
         } catch (MasterNotRunningException mnre) {
             throw new MetaException(StringUtils.stringifyException(mnre));
         } catch (IOException ie) {
@@ -299,34 +295,35 @@ public class HBaseHCatStorageHandler ext
         return this;
     }
 
-    /*
-     * @param tableDesc
-     *
-     * @param jobProperties
-     *
-     * @see org.apache.hcatalog.storagehandler.HCatStorageHandler
-     * #configureTableJobProperties(org.apache.hadoop.hive.ql.plan.TableDesc,
-     * java.util.Map)
-     */
-    @Override
-    public void configureTableJobProperties(TableDesc tableDesc,
-            Map<String, String> jobProperties) {
-        Properties tableProperties = tableDesc.getProperties();
-
-        jobProperties.put(HBaseSerDe.HBASE_COLUMNS_MAPPING,
-                tableProperties.getProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING));
-
-        String tableName = tableProperties
-                .getProperty(HBaseSerDe.HBASE_TABLE_NAME);
-        if (tableName == null) {
-            tableName = tableProperties.getProperty(Constants.META_TABLE_NAME);
-            if (tableName.startsWith(DEFAULT_PREFIX)) {
-                tableName = tableName.substring(DEFAULT_PREFIX.length());
-            }
-        }
-        jobProperties.put(HBaseSerDe.HBASE_TABLE_NAME, tableName);
-
-    }
+//TODO finish rework remove this
+//    /*
+//     * @param tableDesc
+//     *
+//     * @param jobProperties
+//     *
+//     * @see org.apache.hcatalog.storagehandler.HCatStorageHandler
+//     * #configureTableJobProperties(org.apache.hadoop.hive.ql.plan.TableDesc,
+//     * java.util.Map)
+//     */
+//    @Override
+//    public void configureTableJobProperties(TableDesc tableDesc,
+//            Map<String, String> jobProperties) {
+//        Properties tableProperties = tableDesc.getProperties();
+//
+//        jobProperties.put(HBaseSerDe.HBASE_COLUMNS_MAPPING,
+//                tableProperties.getProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING));
+//
+//        String tableName = tableProperties
+//                .getProperty(HBaseSerDe.HBASE_TABLE_NAME);
+//        if (tableName == null) {
+//            tableName = tableProperties.getProperty(Constants.META_TABLE_NAME);
+//            if (tableName.startsWith(DEFAULT_PREFIX)) {
+//                tableName = tableName.substring(DEFAULT_PREFIX.length());
+//            }
+//        }
+//        jobProperties.put(HBaseSerDe.HBASE_TABLE_NAME, tableName);
+//
+//    }
 
     private HBaseAdmin getHBaseAdmin() throws MetaException {
         try {
@@ -357,14 +354,26 @@ public class HBaseHCatStorageHandler ext
         return tableName;
     }
 
+    @Override
+    public Class<? extends InputFormat> getInputFormatClass() {
+        //TODO replace this with rework
+        return InputFormat.class;
+    }
+
+    @Override
+    public Class<? extends OutputFormat> getOutputFormatClass() {
+        //TODO replace this with rework
+        return SequenceFileOutputFormat.class;
+    }
+
     /*
-     * @return subclass of SerDe
-     *
-     * @throws UnsupportedOperationException
-     *
-     * @see
-     * org.apache.hcatalog.storagehandler.HCatStorageHandler#getSerDeClass()
-     */
+    * @return subclass of SerDe
+    *
+    * @throws UnsupportedOperationException
+    *
+    * @see
+    * org.apache.hcatalog.storagehandler.HCatStorageHandler#getSerDeClass()
+    */
     @Override
     public Class<? extends SerDe> getSerDeClass()
             throws UnsupportedOperationException {
@@ -395,6 +404,13 @@ public class HBaseHCatStorageHandler ext
                     getHBaseAdmin().disableTable(tableName);
                 }
                 getHBaseAdmin().deleteTable(tableName);
+
+                //Delete znodes from revision manager.
+                RevisionManager rm = getOpenedRevisionManager(hbaseConf);
+                if (rm instanceof ZKBasedRevisionManager) {
+                    ZKBasedRevisionManager zkRM = (ZKBasedRevisionManager) rm;
+                    zkRM.deleteZNodes(tableName);
+                }
             }
         } catch (IOException ie) {
             throw new MetaException(StringUtils.stringifyException(ie));

Modified: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKBasedRevisionManager.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKBasedRevisionManager.java?rev=1291925&r1=1291924&r2=1291925&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKBasedRevisionManager.java (original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKBasedRevisionManager.java Tue Feb 21 17:12:41 2012
@@ -419,6 +419,28 @@ public class ZKBasedRevisionManager impl
          return lockPath;
      }
 
+     /**
+      * Sets up the table and column family znodes in ZooKeeper.
+      *
+      * @param tableName the hbase table name
+      * @param columnFamilies the column families in hbase
+      * @throws IOException Signals that an I/O exception has occurred.
+      */
+     public void setUpZNodes(String tableName, List<String> columnFamilies) throws IOException{
+         zkUtil.createRootZNodes();
+         zkUtil.setUpZnodesForTable(tableName, columnFamilies);
+     }
+
+     /**
+      * Delete the table znodes from zookeeper.
+      *
+      * @param tableName the table name
+      * @throws IOException Signals that an I/O exception has occurred.
+      */
+     public void deleteZNodes(String tableName) throws IOException {
+         zkUtil.deleteZNodes(tableName);
+     }
+
 
     /*
      * This class is a listener class for the locks used in revision management.

Modified: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKUtil.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKUtil.java?rev=1291925&r1=1291924&r2=1291925&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKUtil.java (original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKUtil.java Tue Feb 21 17:12:41 2012
@@ -429,6 +429,36 @@ class ZKUtil {
     }
 
     /**
+     * Delete table znodes.
+     *
+     * @param tableName the hbase table name
+     * @throws IOException Signals that an I/O exception has occurred.
+     */
+    void deleteZNodes(String tableName) throws IOException {
+        String transactionDataTablePath = PathUtil.getTxnDataPath(baseDir,
+                tableName);
+        deleteRecursively(transactionDataTablePath);
+    }
+
+    void deleteRecursively(String path) throws IOException {
+        try {
+            List<String> children = getSession().getChildren(path, false);
+            if (children.size() != 0) {
+                for (String child : children) {
+                    deleteRecursively(path + "/" + child);
+                }
+            }
+            getSession().delete(path, -1);
+        } catch (KeeperException e) {
+            throw new IOException(
+                    "Exception while deleting path " + path + ".", e);
+        } catch (InterruptedException e) {
+            throw new IOException(
+                    "Exception while deleting path " + path + ".", e);
+        }
+    }
+
+    /**
      * This method serializes a given instance of TBase object.
      *
      * @param obj An instance of TBase
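
A note on the ZKUtil change above: ZooKeeper's delete() removes exactly one znode and fails with KeeperException.NotEmptyException when the node still has children, which is why the new deleteRecursively walks the subtree depth-first before deleting each parent (version -1 means "match any node version"). A minimal standalone sketch of the same pattern against the raw ZooKeeper client, assuming an already-connected handle, for readers who want it outside the diff:

import java.util.List;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;

class ZNodeSubtreeDeleteSketch {
    // Deletes path and everything below it, children first.
    static void deleteSubtree(ZooKeeper zk, String path)
            throws KeeperException, InterruptedException {
        List<String> children = zk.getChildren(path, false); // no watch
        for (String child : children) {
            deleteSubtree(zk, path + "/" + child);
        }
        zk.delete(path, -1); // -1 = any version
    }
}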

Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputStorageDriver.java.broken
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputStorageDriver.java.broken?rev=1291925&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputStorageDriver.java.broken (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputStorageDriver.java.broken Tue Feb 21 17:12:41 2012
@@ -0,0 +1,458 @@
+package org.apache.hcatalog.hbase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hcatalog.cli.HCatDriver;
+import org.apache.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.DefaultHCatRecord;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.hbase.snapshot.RevisionManager;
+import org.apache.hcatalog.hbase.snapshot.TableSnapshot;
+import org.apache.hcatalog.hbase.snapshot.Transaction;
+import org.apache.hcatalog.mapreduce.HCatOutputFormat;
+import org.apache.hcatalog.mapreduce.OutputJobInfo;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests components of HBaseBulkOutputStorageDriver using ManyMiniCluster,
+ * including ImportSequenceFile, the HBase output storage drivers and HBaseBulkOutputFormat.
+ */
+public class TestHBaseBulkOutputStorageDriver extends SkeletonHBaseTest {
+    private final static Log LOG = LogFactory.getLog(TestHBaseBulkOutputStorageDriver.class);
+
+    private final HiveConf allConf;
+    private final HCatDriver hcatDriver;
+
+    public TestHBaseBulkOutputStorageDriver() {
+        allConf = getHiveConf();
+        allConf.set(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK.varname,
+                HCatSemanticAnalyzer.class.getName());
+        allConf.set(HiveConf.ConfVars.HADOOPFS.varname, getFileSystem().getUri().toString());
+        allConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, new Path(getTestDir(),"warehouse").toString());
+
+        //Add hbase properties
+        for (Map.Entry<String, String> el : getHbaseConf())
+            allConf.set(el.getKey(), el.getValue());
+        for (Map.Entry<String, String> el : getJobConf())
+            allConf.set(el.getKey(), el.getValue());
+
+        SessionState.start(new CliSessionState(allConf));
+        hcatDriver = new HCatDriver();
+    }
+
+    public static class MapWrite extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
+
+        @Override
+        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+            String vals[] = value.toString().split(",");
+            Put put = new Put(Bytes.toBytes(vals[0]));
+            for(int i=1;i<vals.length;i++) {
+                String pair[] = vals[i].split(":");
+                put.add(Bytes.toBytes("my_family"),
+                        Bytes.toBytes(pair[0]),
+                        Bytes.toBytes(pair[1]));
+            }
+            context.write(new ImmutableBytesWritable(Bytes.toBytes(vals[0])),put);
+        }
+    }
+
+    public static class MapHCatWrite extends Mapper<LongWritable, Text, BytesWritable, HCatRecord> {
+        @Override
+        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+            OutputJobInfo jobInfo = (OutputJobInfo)HCatUtil.deserialize(context.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO));
+            HCatRecord record = new DefaultHCatRecord(3);
+            HCatSchema schema = jobInfo.getOutputSchema();
+            String vals[] = value.toString().split(",");
+            record.setInteger("key",schema,Integer.parseInt(vals[0]));
+            for(int i=1;i<vals.length;i++) {
+                String pair[] = vals[i].split(":");
+                record.set(pair[0],schema,pair[1]);
+            }
+            context.write(null,record);
+        }
+    }
+
+    @Test
+    public void hbaseBulkOutputFormatTest() throws IOException, ClassNotFoundException, InterruptedException {
+        String testName = "hbaseBulkOutputFormatTest";
+        Path methodTestDir = new Path(getTestDir(),testName);
+        LOG.info("starting: "+testName);
+
+        String tableName = newTableName(testName).toLowerCase();
+        byte[] tableNameBytes = Bytes.toBytes(tableName);
+        String familyName = "my_family";
+        byte[] familyNameBytes = Bytes.toBytes(familyName);
+
+        //include hbase config in conf file
+        Configuration conf = new Configuration(allConf);
+
+        //create table
+        conf.set(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY, tableName);
+        createTable(tableName, new String[]{familyName});
+
+        String data[] = {"1,english:one,spanish:uno",
+                               "2,english:two,spanish:dos",
+                               "3,english:three,spanish:tres"};
+
+
+
+        // input/output settings
+        Path inputPath = new Path(methodTestDir,"mr_input");
+        FSDataOutputStream os = getFileSystem().create(new Path(inputPath,"inputFile.txt"));
+        for(String line: data)
+            os.write(Bytes.toBytes(line + "\n"));
+        os.close();
+        Path interPath = new Path(methodTestDir,"inter");
+        //create job
+        Job job = new Job(conf, testName);
+        HBaseHCatStorageHandler.addDependencyJars(job.getConfiguration());
+        job.setWorkingDirectory(new Path(methodTestDir,"mr_work"));
+        job.setJarByClass(this.getClass());
+        job.setMapperClass(MapWrite.class);
+
+        job.setInputFormatClass(TextInputFormat.class);
+        TextInputFormat.setInputPaths(job, inputPath);
+
+        job.setOutputFormatClass(HBaseBulkOutputFormat.class);
+        SequenceFileOutputFormat.setOutputPath(job,interPath);
+
+        //manually create transaction
+        RevisionManager rm = HBaseHCatStorageHandler.getOpenedRevisionManager(conf);
+        try {
+            OutputJobInfo outputJobInfo = OutputJobInfo.create("default", tableName, null, null, null);
+            Transaction txn = rm.beginWriteTransaction(tableName, Arrays.asList(familyName));
+            outputJobInfo.getProperties().setProperty(HBaseConstants.PROPERTY_WRITE_TXN_KEY,
+                                                      HCatUtil.serialize(txn));
+            job.getConfiguration().set(HCatConstants.HCAT_KEY_OUTPUT_INFO,
+                                       HCatUtil.serialize(outputJobInfo));
+        } finally {
+            rm.close();
+        }
+
+        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+        job.setMapOutputValueClass(HCatRecord.class);
+
+        job.setOutputKeyClass(ImmutableBytesWritable.class);
+        job.setOutputValueClass(HCatRecord.class);
+
+        job.setNumReduceTasks(0);
+
+        assertTrue(job.waitForCompletion(true));
+
+        //verify
+        HTable table = new HTable(conf, tableName);
+        Scan scan = new Scan();
+        scan.addFamily(familyNameBytes);
+        ResultScanner scanner = table.getScanner(scan);
+        int index=0;
+        for(Result result: scanner) {
+            String vals[] = data[index].toString().split(",");
+            for(int i=1;i<vals.length;i++) {
+                String pair[] = vals[i].split(":");
+                assertTrue(result.containsColumn(familyNameBytes,Bytes.toBytes(pair[0])));
+                assertEquals(pair[1],Bytes.toString(result.getValue(familyNameBytes,Bytes.toBytes(pair[0]))));
+            }
+            index++;
+        }
+        //test if load count is the same
+        assertEquals(data.length,index);
+        //test if scratch directory was erased
+        assertFalse(FileSystem.get(job.getConfiguration()).exists(interPath));
+    }
+
+    @Test
+    public void importSequenceFileTest() throws IOException, ClassNotFoundException, InterruptedException {
+        String testName = "importSequenceFileTest";
+        Path methodTestDir = new Path(getTestDir(),testName);
+        LOG.info("starting: "+testName);
+
+        String tableName = newTableName(testName).toLowerCase();
+        byte[] tableNameBytes = Bytes.toBytes(tableName);
+        String familyName = "my_family";
+        byte[] familyNameBytes = Bytes.toBytes(familyName);
+
+        //include hbase config in conf file
+        Configuration conf = new Configuration(allConf);
+
+        //create table
+        createTable(tableName,new String[]{familyName});
+
+        String data[] = {"1,english:one,spanish:uno",
+                               "2,english:two,spanish:dos",
+                               "3,english:three,spanish:tres"};
+
+
+
+        // input/output settings
+        Path inputPath = new Path(methodTestDir,"mr_input");
+        getFileSystem().mkdirs(inputPath);
+        FSDataOutputStream os = getFileSystem().create(new Path(inputPath,"inputFile.txt"));
+        for(String line: data)
+            os.write(Bytes.toBytes(line + "\n"));
+        os.close();
+        Path interPath = new Path(methodTestDir,"inter");
+        Path scratchPath = new Path(methodTestDir,"scratch");
+
+
+        //create job
+        Job job = new Job(conf, testName);
+        HBaseHCatStorageHandler.addDependencyJars(job.getConfiguration());
+        job.setWorkingDirectory(new Path(methodTestDir,"mr_work"));
+        job.setJarByClass(this.getClass());
+        job.setMapperClass(MapWrite.class);
+
+        job.setInputFormatClass(TextInputFormat.class);
+        TextInputFormat.setInputPaths(job, inputPath);
+
+        job.setOutputFormatClass(SequenceFileOutputFormat.class);
+        SequenceFileOutputFormat.setOutputPath(job,interPath);
+
+        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+        job.setMapOutputValueClass(Put.class);
+
+        job.setOutputKeyClass(ImmutableBytesWritable.class);
+        job.setOutputValueClass(Put.class);
+
+        job.setNumReduceTasks(0);
+
+        assertTrue(job.waitForCompletion(true));
+
+        job = new Job(new Configuration(allConf),testName+"_importer");
+        HBaseHCatStorageHandler.addDependencyJars(job.getConfiguration());
+        assertTrue(ImportSequenceFile.runJob(job, tableName, interPath, scratchPath));
+
+        //verify
+        HTable table = new HTable(conf, tableName);
+        Scan scan = new Scan();
+        scan.addFamily(familyNameBytes);
+        ResultScanner scanner = table.getScanner(scan);
+        int index=0;
+        for(Result result: scanner) {
+            String vals[] = data[index].toString().split(",");
+            for(int i=1;i<vals.length;i++) {
+                String pair[] = vals[i].split(":");
+                assertTrue(result.containsColumn(familyNameBytes,Bytes.toBytes(pair[0])));
+                assertEquals(pair[1],Bytes.toString(result.getValue(familyNameBytes,Bytes.toBytes(pair[0]))));
+            }
+            index++;
+        }
+        //test if load count is the same
+        assertEquals(data.length,index);
+        //test if scratch directory was erased
+        assertFalse(FileSystem.get(job.getConfiguration()).exists(scratchPath));
+    }
+
+    @Test
+    public void hbaseBulkOutputStorageDriverTest() throws Exception {
+        String testName = "hbaseBulkOutputStorageDriverTest";
+        Path methodTestDir = new Path(getTestDir(),testName);
+        LOG.info("starting: "+testName);
+
+        String databaseName = testName.toLowerCase();
+        String dbDir = new Path(methodTestDir,"DB_"+testName).toString();
+        String tableName = newTableName(testName).toLowerCase();
+        byte[] tableNameBytes = Bytes.toBytes(tableName);
+        String familyName = "my_family";
+        byte[] familyNameBytes = Bytes.toBytes(familyName);
+
+
+        //include hbase config in conf file
+        Configuration conf = new Configuration(allConf);
+        conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, HCatUtil.serialize(allConf.getAllProperties()));
+
+
+        String dbquery = "CREATE DATABASE IF NOT EXISTS " + databaseName + " LOCATION '" + dbDir + "'";
+        String tableQuery = "CREATE TABLE " + databaseName + "." + tableName +
+                              "(key int, english string, spanish string) STORED BY " +
+                              "'org.apache.hcatalog.hbase.HBaseHCatStorageHandler'" +
+                              "TBLPROPERTIES ('hbase.columns.mapping'=':key,"+familyName+":english,"+familyName+":spanish')" ;
+
+        assertEquals(0, hcatDriver.run(dbquery).getResponseCode());
+        assertEquals(0, hcatDriver.run(tableQuery).getResponseCode());
+
+        String data[] = {"1,english:ONE,spanish:UNO",
+                               "2,english:ONE,spanish:DOS",
+                               "3,english:ONE,spanish:TRES"};
+
+        // input/output settings
+        Path inputPath = new Path(methodTestDir,"mr_input");
+        getFileSystem().mkdirs(inputPath);
+        //create multiple files so we can test with multiple mappers
+        for(int i=0;i<data.length;i++) {
+            FSDataOutputStream os = getFileSystem().create(new Path(inputPath,"inputFile"+i+".txt"));
+            os.write(Bytes.toBytes(data[i] + "\n"));
+            os.close();
+        }
+
+        //create job
+        Job job = new Job(conf,testName);
+        HBaseHCatStorageHandler.addDependencyJars(job.getConfiguration());
+        job.setWorkingDirectory(new Path(methodTestDir,"mr_work"));
+        job.setJarByClass(this.getClass());
+        job.setMapperClass(MapHCatWrite.class);
+
+        job.setInputFormatClass(TextInputFormat.class);
+        TextInputFormat.setInputPaths(job, inputPath);
+
+
+        job.setOutputFormatClass(HCatOutputFormat.class);
+        OutputJobInfo outputJobInfo = OutputJobInfo.create(databaseName,tableName,null,null,null);
+        HCatOutputFormat.setOutput(job,outputJobInfo);
+
+        job.setMapOutputKeyClass(BytesWritable.class);
+        job.setMapOutputValueClass(HCatRecord.class);
+
+        job.setOutputKeyClass(BytesWritable.class);
+        job.setOutputValueClass(HCatRecord.class);
+
+        job.setNumReduceTasks(0);
+
+        assertTrue(job.waitForCompletion(true));
+        RevisionManager rm = HBaseHCatStorageHandler.getOpenedRevisionManager(conf);
+        try {
+            TableSnapshot snapshot = rm.createSnapshot(databaseName+"."+tableName);
+            for(String el: snapshot.getColumnFamilies()) {
+                assertEquals(1,snapshot.getRevision(el));
+            }
+        } finally {
+            rm.close();
+        }
+
+        //verify
+        HTable table = new HTable(conf, databaseName+"."+tableName);
+        Scan scan = new Scan();
+        scan.addFamily(familyNameBytes);
+        ResultScanner scanner = table.getScanner(scan);
+        int index=0;
+        for(Result result: scanner) {
+            String vals[] = data[index].toString().split(",");
+            for(int i=1;i<vals.length;i++) {
+                String pair[] = vals[i].split(":");
+                assertTrue(result.containsColumn(familyNameBytes,Bytes.toBytes(pair[0])));
+                assertEquals(pair[1],Bytes.toString(result.getValue(familyNameBytes,Bytes.toBytes(pair[0]))));
+                assertEquals(1l,result.getColumn(familyNameBytes,Bytes.toBytes(pair[0])).get(0).getTimestamp());
+            }
+            index++;
+        }
+        //test if load count is the same
+        assertEquals(data.length,index);
+    }
+
+    @Test
+    public void hbaseBulkOutputStorageDriverTestWithDefaultDB() throws Exception {
+        String testName = "hbaseBulkOutputStorageDriverTestWithDefaultDB";
+        Path methodTestDir = new Path(getTestDir(),testName);
+
+        String databaseName = "default";
+        String dbDir = new Path(methodTestDir,"DB_"+testName).toString();
+        String tableName = newTableName(testName).toLowerCase();
+        byte[] tableNameBytes = Bytes.toBytes(tableName);
+        String familyName = "my_family";
+        byte[] familyNameBytes = Bytes.toBytes(familyName);
+
+
+        //include hbase config in conf file
+        Configuration conf = new Configuration(allConf);
+        conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, HCatUtil.serialize(allConf.getAllProperties()));
+
+
+        String dbquery = "CREATE DATABASE IF NOT EXISTS " + databaseName + " LOCATION '" + dbDir + "'";
+        String tableQuery = "CREATE TABLE " + databaseName + "." + tableName +
+                              "(key int, english string, spanish string) STORED BY " +
+                              "'org.apache.hcatalog.hbase.HBaseHCatStorageHandler'" +
+                              "TBLPROPERTIES (" +
+                              "'hbase.columns.mapping'=':key,"+familyName+":english,"+familyName+":spanish')" ;
+
+        assertEquals(0, hcatDriver.run(dbquery).getResponseCode());
+        assertEquals(0, hcatDriver.run(tableQuery).getResponseCode());
+
+        String data[] = {"1,english:ONE,spanish:UNO",
+                               "2,english:ONE,spanish:DOS",
+                               "3,english:ONE,spanish:TRES"};
+
+        // input/output settings
+        Path inputPath = new Path(methodTestDir,"mr_input");
+        getFileSystem().mkdirs(inputPath);
+        FSDataOutputStream os = getFileSystem().create(new Path(inputPath,"inputFile.txt"));
+        for(String line: data)
+            os.write(Bytes.toBytes(line + "\n"));
+        os.close();
+
+        //create job
+        Job job = new Job(conf,testName);
+        job.setWorkingDirectory(new Path(methodTestDir,"mr_work"));
+        job.setJarByClass(this.getClass());
+        job.setMapperClass(MapHCatWrite.class);
+
+        job.setInputFormatClass(TextInputFormat.class);
+        TextInputFormat.setInputPaths(job, inputPath);
+
+
+        job.setOutputFormatClass(HCatOutputFormat.class);
+        OutputJobInfo outputJobInfo = OutputJobInfo.create(databaseName,tableName,null,null,null);
+        HCatOutputFormat.setOutput(job,outputJobInfo);
+
+        job.setMapOutputKeyClass(BytesWritable.class);
+        job.setMapOutputValueClass(HCatRecord.class);
+
+        job.setOutputKeyClass(BytesWritable.class);
+        job.setOutputValueClass(HCatRecord.class);
+
+        job.setNumReduceTasks(0);
+
+        assertTrue(job.waitForCompletion(true));
+
+        //verify
+        HTable table = new HTable(conf, tableName);
+        Scan scan = new Scan();
+        scan.addFamily(familyNameBytes);
+        ResultScanner scanner = table.getScanner(scan);
+        int index=0;
+        for(Result result: scanner) {
+            String vals[] = data[index].toString().split(",");
+            for(int i=1;i<vals.length;i++) {
+                String pair[] = vals[i].split(":");
+                assertTrue(result.containsColumn(familyNameBytes,Bytes.toBytes(pair[0])));
+                assertEquals(pair[1],Bytes.toString(result.getValue(familyNameBytes,Bytes.toBytes(pair[0]))));
+            }
+            index++;
+        }
+        //test if load count is the same
+        assertEquals(data.length,index);
+    }
+
+}
+

Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputStorageDriver.java.broken
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputStorageDriver.java.broken?rev=1291925&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputStorageDriver.java.broken (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputStorageDriver.java.broken Tue Feb 21 17:12:41 2012
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.hbase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hcatalog.cli.HCatDriver;
+import org.apache.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.DefaultHCatRecord;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.hbase.snapshot.RevisionManager;
+import org.apache.hcatalog.hbase.snapshot.TableSnapshot;
+import org.apache.hcatalog.hbase.snapshot.Transaction;
+import org.apache.hcatalog.mapreduce.HCatOutputFormat;
+import org.apache.hcatalog.mapreduce.OutputJobInfo;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests HBaseDirectOutputStorageDriver and HBaseDirectOutputFormat using a MiniCluster
+ */
+public class TestHBaseDirectOutputStorageDriver extends SkeletonHBaseTest {
+
+    private final HiveConf allConf;
+    private final HCatDriver hcatDriver;
+
+    public TestHBaseDirectOutputStorageDriver() {
+        allConf = getHiveConf();
+        allConf.set(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK.varname,
+                HCatSemanticAnalyzer.class.getName());
+        allConf.set(HiveConf.ConfVars.HADOOPFS.varname, getFileSystem().getUri().toString());
+        allConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, new Path(getTestDir(),"warehouse").toString());
+
+        //Add hbase properties
+        for (Map.Entry<String, String> el : getHbaseConf())
+            allConf.set(el.getKey(), el.getValue());
+        for (Map.Entry<String, String> el : getJobConf())
+            allConf.set(el.getKey(), el.getValue());
+
+        SessionState.start(new CliSessionState(allConf));
+        hcatDriver = new HCatDriver();
+    }
+
+    @Test
+    public void directOutputFormatTest() throws IOException, ClassNotFoundException, InterruptedException {
+        String testName = "directOutputFormatTest";
+        Path methodTestDir = new Path(getTestDir(),testName);
+
+        String tableName = newTableName(testName).toLowerCase();
+        byte[] tableNameBytes = Bytes.toBytes(tableName);
+        String familyName = "my_family";
+        byte[] familyNameBytes = Bytes.toBytes(familyName);
+
+        //include hbase config in conf file
+        Configuration conf = new Configuration(allConf);
+        conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, HCatUtil.serialize(allConf.getAllProperties()));
+
+        //create table
+        createTable(tableName,new String[]{familyName});
+
+        String data[] = {"1,english:ONE,spanish:UNO",
+                "2,english:ONE,spanish:DOS",
+                "3,english:ONE,spanish:TRES"};
+
+
+
+        // input/output settings
+        Path inputPath = new Path(methodTestDir,"mr_input");
+        getFileSystem().mkdirs(inputPath);
+        FSDataOutputStream os = getFileSystem().create(new Path(inputPath,"inputFile.txt"));
+        for(String line: data)
+            os.write(Bytes.toBytes(line + "\n"));
+        os.close();
+
+        //create job
+        Job job = new Job(conf, testName);
+        job.setWorkingDirectory(new Path(methodTestDir,"mr_work"));
+        job.setJarByClass(this.getClass());
+        job.setMapperClass(MapWrite.class);
+
+        job.setInputFormatClass(TextInputFormat.class);
+        TextInputFormat.setInputPaths(job, inputPath);
+
+        job.setOutputFormatClass(HBaseDirectOutputFormat.class);
+        job.getConfiguration().set(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY, tableName);
+
+        //manually create transaction
+        RevisionManager rm = HBaseHCatStorageHandler.getOpenedRevisionManager(conf);
+        try {
+            OutputJobInfo outputJobInfo = OutputJobInfo.create("default", tableName, null, null, null);
+            Transaction txn = rm.beginWriteTransaction(tableName, Arrays.asList(familyName));
+            outputJobInfo.getProperties().setProperty(HBaseConstants.PROPERTY_WRITE_TXN_KEY,
+                                                      HCatUtil.serialize(txn));
+            job.getConfiguration().set(HCatConstants.HCAT_KEY_OUTPUT_INFO,
+                                       HCatUtil.serialize(outputJobInfo));
+        } finally {
+            rm.close();
+        }
+
+        job.setMapOutputKeyClass(BytesWritable.class);
+        job.setMapOutputValueClass(HCatRecord.class);
+
+        job.setOutputKeyClass(BytesWritable.class);
+        job.setOutputValueClass(HCatRecord.class);
+
+        job.setNumReduceTasks(0);
+        assertTrue(job.waitForCompletion(true));
+
+        //verify
+        HTable table = new HTable(conf, tableName);
+        Scan scan = new Scan();
+        scan.addFamily(familyNameBytes);
+        ResultScanner scanner = table.getScanner(scan);
+        int index=0;
+        for(Result result: scanner) {
+            String vals[] = data[index].toString().split(",");
+            for(int i=1;i<vals.length;i++) {
+                String pair[] = vals[i].split(":");
+                assertTrue(result.containsColumn(familyNameBytes,Bytes.toBytes(pair[0])));
+                assertEquals(pair[1],Bytes.toString(result.getValue(familyNameBytes,Bytes.toBytes(pair[0]))));
+            }
+            index++;
+        }
+        assertEquals(data.length,index);
+    }
+
+    @Test
+    public void directOutputStorageDriverTest() throws Exception {
+        String testName = "directOutputStorageDriverTest";
+        Path methodTestDir = new Path(getTestDir(),testName);
+
+        String databaseName = testName.toLowerCase();
+        String dbDir = new Path(methodTestDir,"DB_"+testName).toString();
+        String tableName = newTableName(testName).toLowerCase();
+        byte[] tableNameBytes = Bytes.toBytes(tableName);
+        String familyName = "my_family";
+        byte[] familyNameBytes = Bytes.toBytes(familyName);
+
+
+        //include hbase config in conf file
+        Configuration conf = new Configuration(allConf);
+        conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, HCatUtil.serialize(allConf.getAllProperties()));
+
+
+        String dbquery = "CREATE DATABASE IF NOT EXISTS " + databaseName + " LOCATION '" + dbDir + "'";
+        String tableQuery = "CREATE TABLE " + databaseName + "." + tableName +
+                              "(key int, english string, spanish string) STORED BY " +
+                              "'org.apache.hcatalog.hbase.HBaseHCatStorageHandler'" +
+                              "TBLPROPERTIES ('"+HBaseConstants.PROPERTY_OSD_BULK_MODE_KEY+"'='false',"+
+                              "'hbase.columns.mapping'=':key,"+familyName+":english,"+familyName+":spanish')" ;
+
+        assertEquals(0, hcatDriver.run(dbquery).getResponseCode());
+        assertEquals(0, hcatDriver.run(tableQuery).getResponseCode());
+
+        String data[] = {"1,english:ONE,spanish:UNO",
+                               "2,english:ONE,spanish:DOS",
+                               "3,english:ONE,spanish:TRES"};
+
+        // input/output settings
+        Path inputPath = new Path(methodTestDir,"mr_input");
+        getFileSystem().mkdirs(inputPath);
+        //create multiple files so we can test with multiple mappers
+        for(int i=0;i<data.length;i++) {
+            FSDataOutputStream os = getFileSystem().create(new Path(inputPath,"inputFile"+i+".txt"));
+            os.write(Bytes.toBytes(data[i] + "\n"));
+            os.close();
+        }
+
+        //create job
+        Job job = new Job(conf, testName);
+        job.setWorkingDirectory(new Path(methodTestDir,"mr_work"));
+        job.setJarByClass(this.getClass());
+        job.setMapperClass(MapHCatWrite.class);
+
+        job.setInputFormatClass(TextInputFormat.class);
+        TextInputFormat.setInputPaths(job, inputPath);
+
+
+        job.setOutputFormatClass(HCatOutputFormat.class);
+        OutputJobInfo outputJobInfo = OutputJobInfo.create(databaseName,tableName,null,null,null);
+        HCatOutputFormat.setOutput(job,outputJobInfo);
+
+        job.setMapOutputKeyClass(BytesWritable.class);
+        job.setMapOutputValueClass(HCatRecord.class);
+
+        job.setOutputKeyClass(BytesWritable.class);
+        job.setOutputValueClass(HCatRecord.class);
+
+        job.setNumReduceTasks(0);
+        assertTrue(job.waitForCompletion(true));
+
+        RevisionManager rm = HBaseHCatStorageHandler.getOpenedRevisionManager(conf);
+        try {
+            TableSnapshot snapshot = rm.createSnapshot(databaseName+"."+tableName);
+            for(String el: snapshot.getColumnFamilies()) {
+                assertEquals(1,snapshot.getRevision(el));
+            }
+        } finally {
+            rm.close();
+        }
+
+        //verify
+        HTable table = new HTable(conf, databaseName+"."+tableName);
+        Scan scan = new Scan();
+        scan.addFamily(familyNameBytes);
+        ResultScanner scanner = table.getScanner(scan);
+        int index=0;
+        for(Result result: scanner) {
+            String vals[] = data[index].toString().split(",");
+            for(int i=1;i<vals.length;i++) {
+                String pair[] = vals[i].split(":");
+                assertTrue(result.containsColumn(familyNameBytes,Bytes.toBytes(pair[0])));
+                assertEquals(pair[1],Bytes.toString(result.getValue(familyNameBytes,Bytes.toBytes(pair[0]))));
+                assertEquals(1l,result.getColumn(familyNameBytes,Bytes.toBytes(pair[0])).get(0).getTimestamp());
+            }
+            index++;
+        }
+        assertEquals(data.length,index);
+    }
+
+    public static class MapHCatWrite extends Mapper<LongWritable, Text, BytesWritable, HCatRecord> {
+
+        @Override
+        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+            OutputJobInfo jobInfo = (OutputJobInfo)HCatUtil.deserialize(context.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO));
+            HCatRecord record = new DefaultHCatRecord(3);
+            HCatSchema schema = jobInfo.getOutputSchema();
+            String vals[] = value.toString().split(",");
+            record.setInteger("key",schema,Integer.parseInt(vals[0]));
+            for(int i=1;i<vals.length;i++) {
+                String pair[] = vals[i].split(":");
+                record.set(pair[0],schema,pair[1]);
+            }
+            context.write(null,record);
+        }
+    }
+
+    public static class MapWrite extends Mapper<LongWritable, Text, BytesWritable, Put> {
+
+        @Override
+        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+            String vals[] = value.toString().split(",");
+            Put put = new Put(Bytes.toBytes(vals[0]));
+            for(int i=1;i<vals.length;i++) {
+                String pair[] = vals[i].split(":");
+                put.add(Bytes.toBytes("my_family"),
+                        Bytes.toBytes(pair[0]),
+                        Bytes.toBytes(pair[1]));
+            }
+            context.write(new BytesWritable(Bytes.toBytes(vals[0])),put);
+        }
+    }
+}

Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputStorageDriver.java.broken
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputStorageDriver.java.broken?rev=1291925&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputStorageDriver.java.broken (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputStorageDriver.java.broken Tue Feb 21 17:12:41 2012
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.hbase;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hcatalog.cli.HCatDriver;
+import org.apache.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.hbase.snapshot.RevisionManager;
+import org.apache.hcatalog.hbase.snapshot.Transaction;
+import org.apache.hcatalog.mapreduce.HCatInputFormat;
+import org.apache.hcatalog.mapreduce.InputJobInfo;
+import org.junit.Test;
+
+public class TestHBaseInputStorageDriver extends SkeletonHBaseTest {
+
+    private static HiveConf   hcatConf;
+    private static HCatDriver hcatDriver;
+    private final byte[] FAMILY     = Bytes.toBytes("testFamily");
+    private final byte[] QUALIFIER1 = Bytes.toBytes("testQualifier1");
+    private final byte[] QUALIFIER2 = Bytes.toBytes("testQualifier2");
+
+    private List<Put> generatePuts(int num, String tableName) throws IOException {
+
+        List<String> columnFamilies = Arrays.asList("testFamily");
+        RevisionManager rm = null;
+        List<Put> myPuts;
+        try {
+            rm = HBaseHCatStorageHandler
+                    .getOpenedRevisionManager(getHbaseConf());
+            rm.open();
+            myPuts = new ArrayList<Put>();
+            for (int i = 1; i <= num; i++) {
+                Put put = new Put(Bytes.toBytes("testRow"));
+                put.add(FAMILY, QUALIFIER1, i, Bytes.toBytes("textValue-" + i));
+                put.add(FAMILY, QUALIFIER2, i, Bytes.toBytes("textValue-" + i));
+                myPuts.add(put);
+                Transaction tsx = rm.beginWriteTransaction(tableName,
+                        columnFamilies);
+                rm.commitWriteTransaction(tsx);
+            }
+        } finally {
+            if (rm != null)
+                rm.close();
+        }
+
+        return myPuts;
+    }
+
+   private void Initialize() throws Exception {
+        hcatConf = getHiveConf();
+        hcatConf.set(ConfVars.SEMANTIC_ANALYZER_HOOK.varname,
+                HCatSemanticAnalyzer.class.getName());
+        URI fsuri = getFileSystem().getUri();
+        Path whPath = new Path(fsuri.getScheme(), fsuri.getAuthority(),
+                getTestDir());
+        hcatConf.set(HiveConf.ConfVars.HADOOPFS.varname, fsuri.toString());
+        hcatConf.set(ConfVars.METASTOREWAREHOUSE.varname, whPath.toString());
+
+        //Add hbase properties
+
+        for (Map.Entry<String, String> el : getHbaseConf()) {
+            if (el.getKey().startsWith("hbase.")) {
+                hcatConf.set(el.getKey(), el.getValue());
+            }
+        }
+
+        SessionState.start(new CliSessionState(hcatConf));
+        hcatDriver = new HCatDriver();
+
+    }
+
+    private void populateHBaseTable(String tName, int revisions) throws IOException {
+        List<Put> myPuts = generatePuts(revisions, tName);
+        HTable table = new HTable(getHbaseConf(), Bytes.toBytes(tName));
+        table.put(myPuts);
+    }
+
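+    // End-to-end read test: creates a database and an HBase-backed table through HCatalog
+    // DDL, checks that the storage handler created the HBase table, writes five revisions
+    // of a row, reads the table back with HCatInputFormat in a map-only job, and verifies
+    // that dropping the table through HCatalog also drops the underlying HBase table.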
+    @Test
+    public void TestHBaseTableReadMR() throws Exception {
+        Initialize();
+        String tableName = newTableName("mytable");
+        String databaseName = newTableName("mydatabase");
+        String db_dir = getTestDir() + "/hbasedb";
+
+        String dbquery = "CREATE DATABASE IF NOT EXISTS " + databaseName + " LOCATION '"
+                            + db_dir + "'";
+        String tableQuery = "CREATE TABLE " + databaseName + "." + tableName
+                              + "(key string, testqualifier1 string, testqualifier2 string) STORED BY " +
+                              "'org.apache.hcatalog.hbase.HBaseHCatStorageHandler'"
+                              + "TBLPROPERTIES ('hbase.columns.mapping'=':key,testFamily:testQualifier1,testFamily:testQualifier2')" ;
+
+        CommandProcessorResponse responseOne = hcatDriver.run(dbquery);
+        assertEquals(0, responseOne.getResponseCode());
+        CommandProcessorResponse responseTwo = hcatDriver.run(tableQuery);
+        assertEquals(0, responseTwo.getResponseCode());
+
+        HBaseAdmin hAdmin = new HBaseAdmin(getHbaseConf());
+        String hbaseTableName = databaseName + "." + tableName;
+        boolean doesTableExist = hAdmin.tableExists(hbaseTableName);
+        assertTrue(doesTableExist);
+
+        populateHBaseTable(hbaseTableName, 5);
+        Configuration conf = new Configuration(hcatConf);
+        conf.set(HCatConstants.HCAT_KEY_HIVE_CONF,
+                HCatUtil.serialize(getHiveConf().getAllProperties()));
+
+        // output settings
+        Path outputDir = new Path(getTestDir(), "mapred/testHbaseTableMRRead");
+        FileSystem fs = getFileSystem();
+        if (fs.exists(outputDir)) {
+            fs.delete(outputDir, true);
+        }
+        // create job
+        Job job = new Job(conf, "hbase-mr-read-test");
+        job.setJarByClass(this.getClass());
+        job.setMapperClass(MapReadHTable.class);
+
+        job.setInputFormatClass(HCatInputFormat.class);
+        InputJobInfo inputJobInfo = InputJobInfo.create(databaseName, tableName,
+                null, null, null);
+        HCatInputFormat.setInput(job, inputJobInfo);
+        job.setOutputFormatClass(TextOutputFormat.class);
+        TextOutputFormat.setOutputPath(job, outputDir);
+        job.setMapOutputKeyClass(BytesWritable.class);
+        job.setMapOutputValueClass(Text.class);
+        job.setOutputKeyClass(BytesWritable.class);
+        job.setOutputValueClass(Text.class);
+        job.setNumReduceTasks(0);
+        assertTrue(job.waitForCompletion(true));
+        assertFalse(MapReadHTable.error);
+        assertEquals(1, MapReadHTable.count);
+
+        String dropTableQuery = "DROP TABLE " + hbaseTableName ;
+        CommandProcessorResponse responseThree = hcatDriver.run(dropTableQuery);
+        assertEquals(0, responseThree.getResponseCode());
+
+        boolean isHbaseTableThere = hAdmin.tableExists(hbaseTableName);
+        assertFalse(isHbaseTableThere);
+
+        String dropDB = "DROP DATABASE " + databaseName;
+        CommandProcessorResponse responseFour = hcatDriver.run(dropDB);
+        assertEquals(0, responseFour.getResponseCode());
+    }
+
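+    // Same read path as above, but the output schema is restricted to the key and the
+    // first qualifier via HCatInputFormat.setOutputSchema, so the mapper (MapReadProjHTable)
+    // should see only two columns per record.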
+    @Test
+    public void TestHBaseTableProjectionReadMR() throws Exception {
+
+        Initialize();
+        String tableName = newTableName("mytable");
+        String tableQuery = "CREATE TABLE " + tableName
+                              + "(key string, testqualifier1 string, testqualifier2 string) STORED BY " +
+                              "'org.apache.hcatalog.hbase.HBaseHCatStorageHandler'"
+                              + "TBLPROPERTIES ('hbase.columns.mapping'=':key," +
+                              		"testFamily:testQualifier1,testFamily:testQualifier2')" ;
+
+        CommandProcessorResponse responseTwo = hcatDriver.run(tableQuery);
+        assertEquals(0, responseTwo.getResponseCode());
+
+        HBaseAdmin hAdmin = new HBaseAdmin(getHbaseConf());
+        boolean doesTableExist = hAdmin.tableExists(tableName);
+        assertTrue(doesTableExist);
+
+        populateHBaseTable(tableName, 5);
+
+        Configuration conf = new Configuration(hcatConf);
+        conf.set(HCatConstants.HCAT_KEY_HIVE_CONF,
+                HCatUtil.serialize(getHiveConf().getAllProperties()));
+
+        // output settings
+        Path outputDir = new Path(getTestDir(), "mapred/testHBaseTableProjectionReadMR");
+        FileSystem fs = getFileSystem();
+        if (fs.exists(outputDir)) {
+            fs.delete(outputDir, true);
+        }
+        // create job
+        Job job = new Job(conf, "hbase-column-projection");
+        job.setJarByClass(this.getClass());
+        job.setMapperClass(MapReadProjHTable.class);
+        job.setInputFormatClass(HCatInputFormat.class);
+        InputJobInfo inputJobInfo = InputJobInfo.create(
+                MetaStoreUtils.DEFAULT_DATABASE_NAME, tableName, null, null,
+                null);
+        HCatInputFormat.setOutputSchema(job, getProjectionSchema());
+        HCatInputFormat.setInput(job, inputJobInfo);
+        job.setOutputFormatClass(TextOutputFormat.class);
+        TextOutputFormat.setOutputPath(job, outputDir);
+        job.setMapOutputKeyClass(BytesWritable.class);
+        job.setMapOutputValueClass(Text.class);
+        job.setOutputKeyClass(BytesWritable.class);
+        job.setOutputValueClass(Text.class);
+        job.setNumReduceTasks(0);
+        assertTrue(job.waitForCompletion(true));
+        assertFalse(MapReadProjHTable.error);
+        assertEquals(1, MapReadProjHTable.count);
+
+        String dropTableQuery = "DROP TABLE " + tableName ;
+        CommandProcessorResponse responseThree = hcatDriver.run(dropTableQuery);
+        assertEquals(0, responseThree.getResponseCode());
+
+        boolean isHbaseTableThere = hAdmin.tableExists(tableName);
+        assertFalse(isHbaseTableThere);
+    }
+
+
+    static class MapReadHTable
+            extends
+            Mapper<ImmutableBytesWritable, HCatRecord, WritableComparable, Text> {
+
+        static boolean error = false;
+        static int count = 0;
+        @Override
+        public void map(ImmutableBytesWritable key, HCatRecord value,
+                Context context) throws IOException, InterruptedException {
+            System.out.println("HCat record value" + value.toString());
+            boolean correctValues = (value.size() == 3)
+                    && (value.get(0).toString()).equalsIgnoreCase("testRow")
+                    && (value.get(1).toString()).equalsIgnoreCase("textValue-5")
+                    && (value.get(2).toString()).equalsIgnoreCase("textValue-5");
+
+            if (correctValues == false) {
+                error = true;
+            }
+            count++;
+        }
+    }
+
+    static class MapReadProjHTable
+            extends
+            Mapper<ImmutableBytesWritable, HCatRecord, WritableComparable, Text> {
+
+        static boolean error = false;
+        static int count = 0;
+        @Override
+        public void map(ImmutableBytesWritable key, HCatRecord value,
+                Context context) throws IOException, InterruptedException {
+            System.out.println("HCat record value" + value.toString());
+            boolean correctValues = (value.size() == 2)
+                    && (value.get(0).toString()).equalsIgnoreCase("testRow")
+                    && (value.get(1).toString()).equalsIgnoreCase("textValue-5");
+
+            if (correctValues == false) {
+                error = true;
+            }
+            count++;
+        }
+    }
+
+    private HCatSchema getProjectionSchema() throws HCatException {
+
+        HCatSchema schema = new HCatSchema(new ArrayList<HCatFieldSchema>());
+        schema.append(new HCatFieldSchema("key", HCatFieldSchema.Type.STRING,
+                ""));
+        schema.append(new HCatFieldSchema("testqualifier1",
+                HCatFieldSchema.Type.STRING, ""));
+        return schema;
+    }
+
+
+}

Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestSnapshots.java.broken
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestSnapshots.java.broken?rev=1291925&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestSnapshots.java.broken (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestSnapshots.java.broken Tue Feb 21 17:12:41 2012
@@ -0,0 +1,122 @@
+package org.apache.hcatalog.hbase;
+
+import static org.junit.Assert.assertEquals;
+
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hcatalog.cli.HCatDriver;
+import org.apache.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.hbase.snapshot.TableSnapshot;
+import org.apache.hcatalog.mapreduce.InitializeInput;
+import org.apache.hcatalog.mapreduce.InputJobInfo;
+import org.junit.Test;
+
+public class TestSnapshots extends SkeletonHBaseTest {
+    private static HiveConf   hcatConf;
+    private static HCatDriver hcatDriver;
+
+    public void Initialize() throws Exception {
+        hcatConf = getHiveConf();
+        hcatConf.set(ConfVars.SEMANTIC_ANALYZER_HOOK.varname,
+                HCatSemanticAnalyzer.class.getName());
+        URI fsuri = getFileSystem().getUri();
+        Path whPath = new Path(fsuri.getScheme(), fsuri.getAuthority(),
+                getTestDir());
+        hcatConf.set(HiveConf.ConfVars.HADOOPFS.varname, fsuri.toString());
+        hcatConf.set(ConfVars.METASTOREWAREHOUSE.varname, whPath.toString());
+
+        //Add hbase properties
+
+        for (Map.Entry<String, String> el : getHbaseConf()) {
+            if (el.getKey().startsWith("hbase.")) {
+                hcatConf.set(el.getKey(), el.getValue());
+            }
+        }
+
+        SessionState.start(new CliSessionState(hcatConf));
+        hcatDriver = new HCatDriver();
+
+    }
+
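+    // Verifies that an HBase TableSnapshot keyed by column family ("cf1", "cf2") is
+    // converted into an HCatTableSnapshot keyed by HCatalog column names ("value1",
+    // "value2") using the table's column mapping.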
+    @Test
+    public void TestSnapshotConversion() throws Exception{
+        Initialize();
+        String tableName = newTableName("mytableOne");
+        String databaseName = newTableName("mydatabase");
+        String fullyQualTableName = databaseName + "." + tableName;
+        String db_dir = getTestDir() + "/hbasedb";
+        String dbquery = "CREATE DATABASE IF NOT EXISTS " + databaseName + " LOCATION '"
+                            + db_dir + "'";
+        String tableQuery = "CREATE TABLE " + fullyQualTableName
+                              + "(key string, value1 string, value2 string) STORED BY " +
+                              "'org.apache.hcatalog.hbase.HBaseHCatStorageHandler'"
+                              + "TBLPROPERTIES ('hbase.columns.mapping'=':key,cf1:q1,cf2:q2')" ;
+
+        CommandProcessorResponse cmdResponse = hcatDriver.run(dbquery);
+        assertEquals(0, cmdResponse.getResponseCode());
+        cmdResponse = hcatDriver.run(tableQuery);
+        assertEquals(0, cmdResponse.getResponseCode());
+
+        InputJobInfo inputInfo = InputJobInfo.create(databaseName, tableName, null, null, null);
+        Configuration conf = new Configuration(hcatConf);
+        conf.set(HCatConstants.HCAT_KEY_HIVE_CONF,
+                HCatUtil.serialize(getHiveConf().getAllProperties()));
+        Job job = new Job(conf);
+        InitializeInput.setInput(job, inputInfo);
+        String modifiedInputInfo = job.getConfiguration().get(HCatConstants.HCAT_KEY_JOB_INFO);
+        inputInfo = (InputJobInfo) HCatUtil.deserialize(modifiedInputInfo);
+
+        Map<String, Long> revMap = new HashMap<String, Long>();
+        revMap.put("cf1", 3L);
+        revMap.put("cf2", 5L);
+        TableSnapshot hbaseSnapshot = new TableSnapshot(fullyQualTableName, revMap,-1);
+        HCatTableSnapshot hcatSnapshot = HBaseInputStorageDriver.convertSnapshot(hbaseSnapshot, inputInfo.getTableInfo());
+
+        assertEquals(3, hcatSnapshot.getRevision("value1"));
+        assertEquals(5, hcatSnapshot.getRevision("value2"));
+
+        String dropTable = "DROP TABLE " + fullyQualTableName;
+        cmdResponse = hcatDriver.run(dropTable);
+        assertEquals(0, cmdResponse.getResponseCode());
+
+        tableName = newTableName("mytableTwo");
+        fullyQualTableName = databaseName + "." + tableName;
+        tableQuery = "CREATE TABLE " + fullyQualTableName
+        + "(key string, value1 string, value2 string) STORED BY " +
+        "'org.apache.hcatalog.hbase.HBaseHCatStorageHandler'"
+        + "TBLPROPERTIES ('hbase.columns.mapping'=':key,cf1:q1,cf1:q2')" ;
+        cmdResponse = hcatDriver.run(tableQuery);
+        assertEquals(0, cmdResponse.getResponseCode());
+        revMap.clear();
+        revMap.put("cf1", 3L);
+        hbaseSnapshot = new TableSnapshot(fullyQualTableName, revMap, -1);
+        inputInfo = InputJobInfo.create(databaseName, tableName, null, null, null);
+        InitializeInput.setInput(job, inputInfo);
+        modifiedInputInfo = job.getConfiguration().get(HCatConstants.HCAT_KEY_JOB_INFO);
+        inputInfo = (InputJobInfo) HCatUtil.deserialize(modifiedInputInfo);
+        hcatSnapshot = HBaseInputStorageDriver.convertSnapshot(hbaseSnapshot, inputInfo.getTableInfo());
+        assertEquals(3, hcatSnapshot.getRevision("value1"));
+        assertEquals(3, hcatSnapshot.getRevision("value2"));
+
+        dropTable = "DROP TABLE " + fullyQualTableName;
+        cmdResponse = hcatDriver.run(dropTable);
+        assertEquals(0, cmdResponse.getResponseCode());
+
+        String dropDatabase = "DROP DATABASE IF EXISTS " + databaseName + "CASCADE";
+        cmdResponse = hcatDriver.run(dropDatabase);
+        assertEquals(0, cmdResponse.getResponseCode());
+    }
+
+}

Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestZNodeSetUp.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestZNodeSetUp.java?rev=1291925&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestZNodeSetUp.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestZNodeSetUp.java Tue Feb 21 17:12:41 2012
@@ -0,0 +1,100 @@
+package org.apache.hcatalog.hbase.snapshot;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Map;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hcatalog.cli.HCatDriver;
+import org.apache.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer;
+import org.apache.hcatalog.hbase.SkeletonHBaseTest;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+import org.junit.Test;
+
+
+public class TestZNodeSetUp extends SkeletonHBaseTest{
+
+    private static HiveConf   hcatConf;
+    private static HCatDriver hcatDriver;
+
+    public void Initialize() throws Exception {
+
+        hcatConf = getHiveConf();
+        hcatConf.set(ConfVars.SEMANTIC_ANALYZER_HOOK.varname,
+                HCatSemanticAnalyzer.class.getName());
+        URI fsuri = getFileSystem().getUri();
+        Path whPath = new Path(fsuri.getScheme(), fsuri.getAuthority(),
+                getTestDir());
+        hcatConf.set(HiveConf.ConfVars.HADOOPFS.varname, fsuri.toString());
+        hcatConf.set(ConfVars.METASTOREWAREHOUSE.varname, whPath.toString());
+
+        //Add hbase properties
+
+        for (Map.Entry<String, String> el : getHbaseConf()) {
+            if (el.getKey().startsWith("hbase.")) {
+                hcatConf.set(el.getKey(), el.getValue());
+            }
+        }
+        hcatConf.set("revision.manager.zk.DataDir", "/rm_base");
+        SessionState.start(new CliSessionState(hcatConf));
+        hcatDriver = new HCatDriver();
+
+    }
+
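+    // HCAT-253: creating a table through HBaseHCatStorageHandler should set up the
+    // revision manager znodes for the table and its column family under the configured
+    // data dir ("/rm_base"), and dropping the table should remove them again.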
+    @Test
+    public void testBasicZNodeCreation() throws Exception{
+
+        Initialize();
+        int port = getHbaseConf().getInt("hbase.zookeeper.property.clientPort", 2181);
+        String servers = getHbaseConf().get("hbase.zookeeper.quorum");
+        String[] splits = servers.split(",");
+        // Build the ZooKeeper connect string as host:port[,host:port...].
+        StringBuilder sb = new StringBuilder();
+        for (String split : splits) {
+            if (sb.length() > 0) {
+                sb.append(',');
+            }
+            sb.append(split).append(':').append(port);
+        }
+
+        hcatDriver.run("drop table test_table");
+        CommandProcessorResponse response = hcatDriver
+                .run("create table test_table(key int, value string) STORED BY " +
+                     "'org.apache.hcatalog.hbase.HBaseHCatStorageHandler'"
+                    + "TBLPROPERTIES ('hbase.columns.mapping'=':key,cf1:val')");
+
+        assertEquals(0, response.getResponseCode());
+
+        HBaseAdmin hAdmin = new HBaseAdmin(getHbaseConf());
+        boolean doesTableExist = hAdmin.tableExists("test_table");
+        assertTrue(doesTableExist);
+
+
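+        // The create-table meta hook should have created transaction-data znodes for the
+        // table and for column family cf1 under /rm_base.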
+        ZKUtil zkutil = new ZKUtil(sb.toString(), "/rm_base");
+        ZooKeeper zk = zkutil.getSession();
+        String tablePath = PathUtil.getTxnDataPath("/rm_base", "test_table");
+        Stat tempTwo = zk.exists(tablePath, false);
+        assertTrue(tempTwo != null);
+
+        String cfPath = PathUtil.getTxnDataPath("/rm_base", "test_table") + "/cf1";
+        Stat tempThree = zk.exists(cfPath, false);
+        assertTrue(tempThree != null);
+
+        hcatDriver.run("drop table test_table");
+
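+        // The drop-table meta hook should have removed the table's znodes.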
+        System.out.println("Table path : " + tablePath);
+        Stat tempFour = zk.exists(tablePath, false);
+        assertTrue(tempFour == null);
+
+    }
+
+}


