From toffer@apache.org Wed Jan 18 00:18:27 2012
Subject: svn commit: r1232664 - in /incubator/hcatalog/branches/branch-0.3: ./ storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/ storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/
Date: Wed, 18 Jan 2012 00:18:03 -0000
To: hcatalog-commits@incubator.apache.org
Reply-To: hcatalog-dev@incubator.apache.org
From: toffer@apache.org
Message-Id: <20120118001804.2F7F523888E7@eris.apache.org>

Author: toffer
Date: Wed Jan 18 00:18:03 2012
New Revision: 1232664

URL: http://svn.apache.org/viewvc?rev=1232664&view=rev
Log:
HCATALOG-192 HBase output storage driver integration with zookeeper based revision manager (avandana via toffer)

Added:
    incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HbaseSnapshotRecordReader.java
    incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestSnapshots.java
Modified:
    incubator/hcatalog/branches/branch-0.3/CHANGES.txt
    incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseConstants.java
    incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java
    incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java
    incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputStorageDriver.java
    incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKBasedRevisionManager.java
    incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputStorageDriver.java
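For orientation before the per-file diffs: the core of this change is that the HBase storage drivers now coordinate reads through a revision manager backed by ZooKeeper. The revision manager is configured from a Properties object and produces TableSnapshot instances recording the highest committed revision per column family. A minimal sketch of driving it directly, assuming a reachable ZooKeeper quorum (the host names and table name below are illustrative, not taken from the patch):

    Properties props = new Properties();
    // ZooKeeper quorum for the revision manager ("revision.manager.zk.HostList")
    props.put(ZKBasedRevisionManager.HOSTLIST, "zkhost1:2181,zkhost2:2181");
    // znode under which revision data is kept ("revision.manager.zk.DataDir")
    props.put(ZKBasedRevisionManager.DATADIR, "/revision-management");
    props.put(RevisionManager.REVISION_MGR_IMPL_CLASS,
            ZKBasedRevisionManager.class.getName());

    RevisionManager rm = RevisionManagerFactory.getRevisionManager(props);
    rm.open();
    try {
        // snapshot of the latest committed revisions of "mydb.mytable"
        TableSnapshot snapshot = rm.createSnapshot("mydb.mytable");
    } finally {
        rm.close();
    }

This is essentially what the new HBaseHCatStorageHandler.getOpenedRevisionManager() helper below assembles from the job configuration.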
Modified: incubator/hcatalog/branches/branch-0.3/CHANGES.txt URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.3/CHANGES.txt?rev=1232664&r1=1232663&r2=1232664&view=diff ============================================================================== --- incubator/hcatalog/branches/branch-0.3/CHANGES.txt (original) +++ incubator/hcatalog/branches/branch-0.3/CHANGES.txt Wed Jan 18 00:18:03 2012 @@ -23,6 +23,8 @@ Release 0.3.0 (unreleased changes) INCOMPATIBLE CHANGES NEW FEATURES + HCAT-192. HBase output storage driver integration with zookeeper based revision manager (avandana via toffer) + + HCAT-193. Snapshot class for HCatalog tables. (avandana via toffer) HCAT-87. Newly added partition should inherit table properties. (hashutosh at HIVE-2589 via khorgath) Modified: incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseConstants.java URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseConstants.java?rev=1232664&r1=1232663&r2=1232664&view=diff ============================================================================== --- incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseConstants.java (original) +++ incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseConstants.java Wed Jan 18 00:18:03 2012 @@ -22,7 +22,7 @@ import org.apache.hadoop.hive.hbase.HBas import org.apache.hcatalog.common.HCatConstants; /** - * Constants class for constants used in Ht + * Constants class for constants used in HBase storage driver. */ class HBaseConstants { @@ -38,4 +38,7 @@ class HBaseConstants { /** key used to define whether bulk storage driver will be used or not */ public static final String PROPERTY_OSD_BULK_MODE_KEY = HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX+".hbase.output.bulkMode"; + /** key used to define the hbase table snapshot.
*/ + public static final String PROPERTY_TABLE_SNAPSHOT_KEY = HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX + ".hbase.table.snapshot"; + } Modified: incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java?rev=1232664&r1=1232663&r2=1232664&view=diff ============================================================================== --- incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java (original) +++ incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java Wed Jan 18 00:18:03 2012 @@ -26,10 +26,10 @@ import java.util.Map; import java.util.Properties; import java.util.Set; -import com.facebook.fb303.FacebookBase; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.MasterNotRunningException; import org.apache.hadoop.hbase.ZooKeeperConnectionException; @@ -50,14 +50,23 @@ import org.apache.hadoop.hive.serde2.Ser import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.io.Writable; import org.apache.hadoop.util.StringUtils; +import org.apache.hcatalog.common.HCatConstants; +import org.apache.hcatalog.common.HCatUtil; +import org.apache.hcatalog.hbase.snapshot.RevisionManager; +import org.apache.hcatalog.hbase.snapshot.RevisionManagerFactory; +import org.apache.hcatalog.hbase.snapshot.TableSnapshot; +import org.apache.hcatalog.hbase.snapshot.ZKBasedRevisionManager; import org.apache.hcatalog.mapreduce.HCatInputStorageDriver; import org.apache.hcatalog.mapreduce.HCatOutputFormat; import org.apache.hcatalog.mapreduce.HCatOutputStorageDriver; import org.apache.hcatalog.mapreduce.HCatTableInfo; +import org.apache.hcatalog.mapreduce.InputJobInfo; import org.apache.hcatalog.storagehandler.HCatStorageHandler; import org.apache.thrift.TBase; import org.apache.zookeeper.ZooKeeper; +import com.facebook.fb303.FacebookBase; + /** * This class HBaseHCatStorageHandler provides functionality to create HBase * tables through HCatalog. The implementation is very similar to the @@ -191,8 +200,10 @@ public class HBaseHCatStorageHandler ext uniqueColumnFamilies.remove(hbaseColumnFamilies.get(iKey)); for (String columnFamily : uniqueColumnFamilies) { - tableDesc.addFamily(new HColumnDescriptor(Bytes .toBytes(columnFamily))); + HColumnDescriptor familyDesc = new HColumnDescriptor(Bytes .toBytes(columnFamily)); + familyDesc.setMaxVersions(Integer.MAX_VALUE); + tableDesc.addFamily(familyDesc); } getHBaseAdmin().createTable(tableDesc); @@ -435,4 +446,131 @@ public class HBaseHCatStorageHandler ext FacebookBase.class); } +
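A note on the setMaxVersions(Integer.MAX_VALUE) change above: revisions are stored as HBase cell timestamps, so a column family must retain all versions for older snapshots to stay readable. On the read side, restricting a scan to a snapshot then amounts to ignoring any cell whose timestamp is newer than the snapshot's revision for that family. A rough sketch of that idea (snapshotRevision is an illustrative variable, assumed to come from a TableSnapshot; the HbaseSnapshotRecordReader added later in this commit filters cells client-side instead):

    Scan scan = new Scan();
    scan.setMaxVersions();                        // surface every stored revision
    scan.setTimeRange(0, snapshotRevision + 1);   // keep cells at or below the snapshot revision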
+ /** + * Creates the latest snapshot of the table. + * + * @param jobConf The job configuration. + * @param hbaseTableName The fully qualified name of the HBase table. + * @return An instance of HCatTableSnapshot. + * @throws IOException Signals that an I/O exception has occurred. + */ + public static HCatTableSnapshot createSnapshot(Configuration jobConf, + String hbaseTableName ) throws IOException { + + RevisionManager rm = null; + TableSnapshot snpt; + try { + rm = getOpenedRevisionManager(jobConf); + snpt = rm.createSnapshot(hbaseTableName); + } finally { + if (rm != null) + rm.close(); + } + + String inputJobString = jobConf.get(HCatConstants.HCAT_KEY_JOB_INFO); + if(inputJobString == null){ + throw new IOException( + "InputJobInfo information not found in JobContext. " + + "HCatInputFormat.setInput() not called?"); + } + InputJobInfo inputInfo = (InputJobInfo) HCatUtil.deserialize(inputJobString); + HCatTableSnapshot hcatSnapshot = HBaseInputStorageDriver + .convertSnapshot(snpt, inputInfo.getTableInfo()); + + return hcatSnapshot; + } + + /** + * Creates the snapshot using the revision specified by the user. + * + * @param jobConf The job configuration. + * @param tableName The fully qualified name of the table whose snapshot is being taken. + * @param revision The revision number to use for the snapshot. + * @return An instance of HCatTableSnapshot. + * @throws IOException Signals that an I/O exception has occurred. + */ + public static HCatTableSnapshot createSnapshot(Configuration jobConf, + String tableName, long revision) + throws IOException { + + TableSnapshot snpt; + RevisionManager rm = null; + try { + rm = getOpenedRevisionManager(jobConf); + snpt = rm.createSnapshot(tableName, revision); + } finally { + if (rm != null) + rm.close(); + } + + String inputJobString = jobConf.get(HCatConstants.HCAT_KEY_JOB_INFO); + if(inputJobString == null){ + throw new IOException( + "InputJobInfo information not found in JobContext. " + + "HCatInputFormat.setInput() not called?"); + } + InputJobInfo inputInfo = (InputJobInfo) HCatUtil.deserialize(inputJobString); + HCatTableSnapshot hcatSnapshot = HBaseInputStorageDriver + .convertSnapshot(snpt, inputInfo.getTableInfo()); + + return hcatSnapshot; + } + + /** + * Gets an opened instance of the revision manager. + * + * @param jobConf The job configuration. + * @return RevisionManager An instance of revision manager. + * @throws IOException + */ + static RevisionManager getOpenedRevisionManager(Configuration jobConf) throws IOException { + + Properties properties = new Properties(); + String zkHostList = jobConf.get(HConstants.ZOOKEEPER_QUORUM); + int port = jobConf.getInt("hbase.zookeeper.property.clientPort", + HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT); + + if (zkHostList != null) { + String[] splits = zkHostList.split(","); + StringBuffer sb = new StringBuffer(); + for (String split : splits) { + sb.append(split); + sb.append(':'); + sb.append(port); + sb.append(','); + } + + sb.deleteCharAt(sb.length() - 1); + properties.put(ZKBasedRevisionManager.HOSTLIST, sb.toString()); + } + String dataDir = jobConf.get(ZKBasedRevisionManager.DATADIR); + if (dataDir != null) { + properties.put(ZKBasedRevisionManager.DATADIR, dataDir); + } + String rmClassName = jobConf.get( + RevisionManager.REVISION_MGR_IMPL_CLASS, + ZKBasedRevisionManager.class.getName()); + properties.put(RevisionManager.REVISION_MGR_IMPL_CLASS, rmClassName); + RevisionManager revisionManager = RevisionManagerFactory + .getRevisionManager(properties); + revisionManager.open(); + return revisionManager; + } +
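Taken together, createSnapshot() and the setSnapshot() helper defined just below let a client pin a read job to one consistent view before submitting it. A hedged usage sketch (it assumes HCatInputFormat.setInput() has already serialized the InputJobInfo into the job configuration, which the IOException above enforces, and that the handler has a default constructor; the table name is illustrative):

    // snapshot the table at its latest committed revisions
    HCatTableSnapshot snapshot = HBaseHCatStorageHandler.createSnapshot(
            job.getConfiguration(), "mydb.mytable");
    // hand the snapshot to the input side through the InputJobInfo properties
    new HBaseHCatStorageHandler().setSnapshot(snapshot, inputJobInfo);

If no snapshot is set this way, HBaseInputStorageDriver.initialize() (below) falls back to creating one automatically, so all map tasks of a job still read the same revisions.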
+ /** + * Set snapshot as a property. + * + * @param snapshot The HCatTableSnapshot to be passed to the job. + * @param inpJobInfo The InputJobInfo for the job. + * @throws IOException + */ + public void setSnapshot(HCatTableSnapshot snapshot, InputJobInfo inpJobInfo) + throws IOException { + String serializedSnp = HCatUtil.serialize(snapshot); + inpJobInfo.getProperties().setProperty( + HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY, serializedSnp); + } + + } Modified: incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java?rev=1232664&r1=1232663&r2=1232664&view=diff ============================================================================== --- incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java (original) +++ incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java Wed Jan 18 00:18:03 2012 @@ -21,39 +21,46 @@ package org.apache.hcatalog.hbase; import java.io.IOException; import java.util.List; +import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableInputFormat; +import org.apache.hadoop.hbase.mapreduce.TableSplit; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hcatalog.mapreduce.InputJobInfo; /** * This class HBaseInputFormat is a wrapper class of TableInputFormat in HBase.
*/ -class HBaseInputFormat extends InputFormat { - +class HBaseInputFormat extends InputFormat implements Configurable{ + private final TableInputFormat inputFormat; - - public HBaseInputFormat() { + private final InputJobInfo jobInfo; + private Configuration conf; + + public HBaseInputFormat(InputJobInfo jobInfo) { inputFormat = new TableInputFormat(); + this.jobInfo = jobInfo; } - + /* * @param instance of InputSplit - * + * * @param instance of TaskAttemptContext - * + * * @return RecordReader - * + * * @throws IOException - * + * * @throws InterruptedException - * + * * @see * org.apache.hadoop.mapreduce.InputFormat#createRecordReader(org.apache * .hadoop.mapreduce.InputSplit, @@ -63,18 +70,28 @@ class HBaseInputFormat extends InputForm public RecordReader createRecordReader( InputSplit split, TaskAttemptContext tac) throws IOException, InterruptedException { - return inputFormat.createRecordReader(split, tac); + + String tableName = inputFormat.getConf().get(TableInputFormat.INPUT_TABLE); + TableSplit tSplit = (TableSplit) split; + HbaseSnapshotRecordReader recordReader = new HbaseSnapshotRecordReader(jobInfo); + Scan sc = new Scan(inputFormat.getScan()); + sc.setStartRow(tSplit.getStartRow()); + sc.setStopRow(tSplit.getEndRow()); + recordReader.setScan(sc); + recordReader.setHTable(new HTable(this.conf, tableName)); + recordReader.init(); + return recordReader; } - + /* * @param jobContext - * + * * @return List of InputSplit - * + * * @throws IOException - * + * * @throws InterruptedException - * + * * @see * org.apache.hadoop.mapreduce.InputFormat#getSplits(org.apache.hadoop.mapreduce * .JobContext) @@ -82,19 +99,33 @@ class HBaseInputFormat extends InputForm @Override public List getSplits(JobContext jobContext) throws IOException, InterruptedException { + + String tableName = this.conf.get(TableInputFormat.INPUT_TABLE); + if (tableName == null) { + throw new IOException("The input table is not set. 
The input splits cannot be created."); + } return inputFormat.getSplits(jobContext); } - + public void setConf(Configuration conf) { + this.conf = conf; inputFormat.setConf(conf); } - + public Scan getScan() { return inputFormat.getScan(); } - + public void setScan(Scan scan) { inputFormat.setScan(scan); } - + + /* @return + * @see org.apache.hadoop.conf.Configurable#getConf() + */ + @Override + public Configuration getConf() { + return this.conf; + } + } Modified: incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputStorageDriver.java URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputStorageDriver.java?rev=1232664&r1=1232663&r2=1232664&view=diff ============================================================================== --- incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputStorageDriver.java (original) +++ incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputStorageDriver.java Wed Jan 18 00:18:03 2012 @@ -19,6 +19,8 @@ package org.apache.hcatalog.hbase; import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; @@ -27,9 +29,11 @@ import org.apache.hadoop.conf.Configurat import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableInputFormat; +import org.apache.hadoop.hive.hbase.HBaseSerDe; import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.serde.Constants; +import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.InputFormat; @@ -37,22 +41,28 @@ import org.apache.hadoop.mapreduce.JobCo import org.apache.hcatalog.common.HCatConstants; import org.apache.hcatalog.common.HCatUtil; import org.apache.hcatalog.data.HCatRecord; +import org.apache.hcatalog.data.schema.HCatFieldSchema; import org.apache.hcatalog.data.schema.HCatSchema; +import org.apache.hcatalog.hbase.snapshot.TableSnapshot; import org.apache.hcatalog.mapreduce.HCatInputStorageDriver; import org.apache.hcatalog.mapreduce.HCatTableInfo; import org.apache.hcatalog.mapreduce.InputJobInfo; +import org.apache.hcatalog.mapreduce.StorerInfo; + /** * The Class HBaseInputStorageDriver enables reading of HBase tables through * HCatalog. 
*/ public class HBaseInputStorageDriver extends HCatInputStorageDriver { - private HCatTableInfo tableInfo; + + private InputJobInfo inpJobInfo; private ResultConverter converter; - private HCatSchema outputColSchema; - private HCatSchema dataSchema; - private Configuration jobConf; - private String scanColumns; + private HCatSchema outputColSchema; + private HCatSchema dataSchema; + private Configuration jobConf; + private String scanColumns; + private HCatTableSnapshot snapshot; /* * @param JobContext @@ -64,6 +74,7 @@ public class HBaseInputStorageDriver ext */ @Override public void initialize(JobContext context, Properties hcatProperties) throws IOException { + jobConf = context.getConfiguration(); String jobString = jobConf.get(HCatConstants.HCAT_KEY_JOB_INFO); if (jobString == null) { @@ -71,9 +82,8 @@ public class HBaseInputStorageDriver ext "InputJobInfo information not found in JobContext. " + "HCatInputFormat.setInput() not called?"); } - InputJobInfo jobInfo = (InputJobInfo) HCatUtil.deserialize(jobString); - tableInfo = jobInfo.getTableInfo(); - dataSchema = tableInfo.getDataColumns(); + inpJobInfo = (InputJobInfo) HCatUtil.deserialize(jobString); + dataSchema = inpJobInfo.getTableInfo().getDataColumns(); List fields = HCatUtil.getFieldSchemaList(dataSchema .getFields()); hcatProperties.setProperty(Constants.LIST_COLUMNS, @@ -83,6 +93,19 @@ public class HBaseInputStorageDriver ext converter = new HBaseSerDeResultConverter(dataSchema, outputColSchema, hcatProperties); scanColumns = converter.getHBaseScanColumns(); + String hbaseTableName = HBaseHCatStorageHandler + .getFullyQualifiedName(inpJobInfo.getTableInfo()); + String serSnapshot = (String) inpJobInfo.getProperties().get( + HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY); + if(serSnapshot == null){ + snapshot = HBaseHCatStorageHandler.createSnapshot(jobConf, + hbaseTableName); + inpJobInfo.getProperties().setProperty( + HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY, + HCatUtil.serialize(snapshot)); + } + + context.getConfiguration().set(HCatConstants.HCAT_KEY_JOB_INFO, HCatUtil.serialize(inpJobInfo)); } @@ -97,10 +120,13 @@ public class HBaseInputStorageDriver ext @Override public InputFormat getInputFormat( Properties hcatProperties) { - HBaseInputFormat tableInputFormat = new HBaseInputFormat(); - String hbaseTableName = HBaseHCatStorageHandler.getFullyQualifiedName(tableInfo); + + String hbaseTableName = HBaseHCatStorageHandler + .getFullyQualifiedName(inpJobInfo.getTableInfo()); + HBaseInputFormat tableInputFormat = new HBaseInputFormat(inpJobInfo); jobConf.set(TableInputFormat.INPUT_TABLE, hbaseTableName); jobConf.set(TableInputFormat.SCAN_COLUMNS, scanColumns); + jobConf.setInt(TableInputFormat.SCAN_MAXVERSIONS, 1); tableInputFormat.setConf(jobConf); // TODO: Make the caching configurable by the user tableInputFormat.getScan().setCaching(200); @@ -109,11 +135,11 @@ public class HBaseInputStorageDriver ext } /* - * @param baseKey + * @param baseKey The key produced by the MR job. * - * @param baseValue + * @param baseValue The value produced by the MR job. * - * @return HCatRecord + * @return HCatRecord An instance of HCatRecord produced by the key, value. * * @throws IOException * @@ -128,9 +154,9 @@ public class HBaseInputStorageDriver ext } /* - * @param jobContext + * @param jobContext The jobcontext of MR job * - * @param howlSchema + * @param howlSchema The output schema of the hcat record. 
* * @throws IOException * @@ -161,9 +187,9 @@ public class HBaseInputStorageDriver ext } /* - * @param jobContext + * @param jobContext The jobcontext of MR job. * - * @param hcatSchema + * @param hcatSchema The schema of the hcat record. * * @throws IOException * @@ -176,4 +202,74 @@ public class HBaseInputStorageDriver ext throws IOException { this.dataSchema = hcatSchema; } + + static HCatTableSnapshot convertSnapshot(TableSnapshot hbaseSnapshot, + HCatTableInfo hcatTableInfo) throws IOException { + + HCatSchema hcatTableSchema = hcatTableInfo.getDataColumns(); + Map hcatHbaseColMap = getHCatHBaseColumnMapping(hcatTableInfo); + HashMap revisionMap = new HashMap(); + + for (HCatFieldSchema fSchema : hcatTableSchema.getFields()) { + if(hcatHbaseColMap.containsKey(fSchema.getName())){ + String colFamily = hcatHbaseColMap.get(fSchema.getName()); + long revisionID = hbaseSnapshot.getRevision(colFamily); + revisionMap.put(fSchema.getName(), revisionID); + } + } + + HCatTableSnapshot hcatSnapshot = new HCatTableSnapshot( + hcatTableInfo.getDatabaseName(), hcatTableInfo.getTableName(),revisionMap); + return hcatSnapshot; + } + + static TableSnapshot convertSnapshot(HCatTableSnapshot hcatSnapshot, + HCatTableInfo hcatTableInfo) throws IOException { + + HCatSchema hcatTableSchema = hcatTableInfo.getDataColumns(); + Map revisionMap = new HashMap(); + Map hcatHbaseColMap = getHCatHBaseColumnMapping(hcatTableInfo); + for (HCatFieldSchema fSchema : hcatTableSchema.getFields()) { + String colFamily = hcatHbaseColMap.get(fSchema.getName()); + if (hcatSnapshot.containsColumn(fSchema.getName())) { + long revision = hcatSnapshot.getRevision(fSchema.getName()); + revisionMap.put(colFamily, revision); + } + } + + String fullyQualifiedName = hcatSnapshot.getDatabaseName() + "." 
+ + hcatSnapshot.getTableName(); + return new TableSnapshot(fullyQualifiedName, revisionMap); + + } + + private static Map getHCatHBaseColumnMapping( HCatTableInfo hcatTableInfo) + throws IOException { + + HCatSchema hcatTableSchema = hcatTableInfo.getDataColumns(); + StorerInfo storeInfo = hcatTableInfo.getStorerInfo(); + String hbaseColumnMapping = storeInfo.getProperties().getProperty( + HBaseConstants.PROPERTY_COLUMN_MAPPING_KEY); + + Map hcatHbaseColMap = new HashMap(); + List columnFamilies = new ArrayList(); + List columnQualifiers = new ArrayList(); + try { + HBaseSerDe.parseColumnMapping(hbaseColumnMapping, columnFamilies, + null, columnQualifiers, null); + } catch (SerDeException e) { + throw new IOException("Exception while converting snapshots.", e); + } + + for (HCatFieldSchema column : hcatTableSchema.getFields()) { + int fieldPos = hcatTableSchema.getPosition(column.getName()); + String colFamily = columnFamilies.get(fieldPos); + if (colFamily.equals(HBaseSerDe.HBASE_KEY_COL) == false) { + hcatHbaseColMap.put(column.getName(), colFamily); + } + } + + return hcatHbaseColMap; + } + } Added: incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HbaseSnapshotRecordReader.java URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HbaseSnapshotRecordReader.java?rev=1232664&view=auto ============================================================================== --- incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HbaseSnapshotRecordReader.java (added) +++ incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HbaseSnapshotRecordReader.java Wed Jan 18 00:18:03 2012 @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hcatalog.hbase; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapreduce.TableRecordReader; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hcatalog.common.HCatUtil; +import org.apache.hcatalog.hbase.snapshot.TableSnapshot; +import org.apache.hcatalog.mapreduce.InputJobInfo; + +/** + * The Class HbaseSnapshotRecordReader implements logic for filtering records + * based on snapshot. + */ +class HbaseSnapshotRecordReader extends TableRecordReader { + + static final Log LOG = LogFactory.getLog(HbaseSnapshotRecordReader.class); + private ResultScanner scanner; + private Scan scan; + private HTable htable; + private ImmutableBytesWritable key; + private Result value; + private InputJobInfo inpJobInfo; + private TableSnapshot snapshot; + private int maxRevisions; + private Iterator resultItr; + + + HbaseSnapshotRecordReader(InputJobInfo inputJobInfo) throws IOException { + this.inpJobInfo = inputJobInfo; + String snapshotString = inpJobInfo.getProperties().getProperty( + HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY); + HCatTableSnapshot hcatSnapshot = (HCatTableSnapshot) HCatUtil + .deserialize(snapshotString); + this.snapshot = HBaseInputStorageDriver.convertSnapshot(hcatSnapshot, + inpJobInfo.getTableInfo()); + this.maxRevisions = 1; + } + + /* @param firstRow The first record in the split. + /* @throws IOException + * @see org.apache.hadoop.hbase.mapreduce.TableRecordReader#restart(byte[]) + */ + @Override + public void restart(byte[] firstRow) throws IOException { + Scan newScan = new Scan(scan); + newScan.setStartRow(firstRow); + this.scanner = this.htable.getScanner(newScan); + resultItr = this.scanner.iterator(); + } + + /* @throws IOException + * @see org.apache.hadoop.hbase.mapreduce.TableRecordReader#init() + */ + @Override + public void init() throws IOException { + restart(scan.getStartRow()); + } + + /* + * @param htable The HTable ( of HBase) to use for the record reader. + * + * @see + * org.apache.hadoop.hbase.mapreduce.TableRecordReader#setHTable(org.apache + * .hadoop.hbase.client.HTable) + */ + @Override + public void setHTable(HTable htable) { + this.htable = htable; + } + + /* + * @param scan The scan to be used for reading records. + * + * @see + * org.apache.hadoop.hbase.mapreduce.TableRecordReader#setScan(org.apache + * .hadoop.hbase.client.Scan) + */ + @Override + public void setScan(Scan scan) { + this.scan = scan; + } + + /* + * @see org.apache.hadoop.hbase.mapreduce.TableRecordReader#close() + */ + @Override + public void close() { + this.resultItr = null; + this.scanner.close(); + } + + /* @return The row of hbase record. + /* @throws IOException + /* @throws InterruptedException + * @see org.apache.hadoop.hbase.mapreduce.TableRecordReader#getCurrentKey() + */ + @Override + public ImmutableBytesWritable getCurrentKey() throws IOException, + InterruptedException { + return key; + } + + /* @return Single row result of scan of HBase table. 
+ /* @throws IOException + /* @throws InterruptedException + * @see org.apache.hadoop.hbase.mapreduce.TableRecordReader#getCurrentValue() + */ + @Override + public Result getCurrentValue() throws IOException, InterruptedException { + return value; + } + + /* @return Returns whether a next key-value is available for reading. + * @see org.apache.hadoop.hbase.mapreduce.TableRecordReader#nextKeyValue() + */ + @Override + public boolean nextKeyValue() { + + if (this.resultItr == null) { + LOG.warn("The HBase result iterator is found null. It is possible" + + " that the record reader has already been closed."); + } else { + + if (key == null) + key = new ImmutableBytesWritable(); + while (resultItr.hasNext()) { + Result temp = resultItr.next(); + Result hbaseRow = prepareResult(temp.list()); + if (hbaseRow != null) { + key.set(hbaseRow.getRow()); + value = hbaseRow; + return true; + } + + } + } + return false; + } + + private Result prepareResult(List keyvalues) { + + List finalKeyVals = new ArrayList(); + Map> qualValMap = new HashMap>(); + for (KeyValue kv : keyvalues) { + byte[] cf = kv.getFamily(); + byte[] qualifier = kv.getQualifier(); + String key = Bytes.toString(cf) + ":" + Bytes.toString(qualifier); + List kvs; + if (qualValMap.containsKey(key)) { + kvs = qualValMap.get(key); + } else { + kvs = new ArrayList(); + } + + String family = Bytes.toString(kv.getFamily()); + long desiredTS = snapshot.getRevision(family); + if (kv.getTimestamp() <= desiredTS) { + kvs.add(kv); + } + qualValMap.put(key, kvs); + } + + Set keys = qualValMap.keySet(); + for (String cf : keys) { + List kvs = qualValMap.get(cf); + if (maxRevisions <= kvs.size()) { + for (int i = 0; i < maxRevisions; i++) { + finalKeyVals.add(kvs.get(i)); + } + } else { + finalKeyVals.addAll(kvs); + } + } + + if(finalKeyVals.size() == 0){ + return null; + } else { + KeyValue[] kvArray = new KeyValue[finalKeyVals.size()]; + finalKeyVals.toArray(kvArray); + return new Result(kvArray); + } + } + + /* @return The progress of the record reader. 
+ * @see org.apache.hadoop.hbase.mapreduce.TableRecordReader#getProgress() + */ + @Override + public float getProgress() { + // Progress is not tracked; it would depend on the total number of tuples, which is not known here. + return 0; + } + +} Modified: incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKBasedRevisionManager.java URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKBasedRevisionManager.java?rev=1232664&r1=1232663&r2=1232664&view=diff ============================================================================== --- incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKBasedRevisionManager.java (original) +++ incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKBasedRevisionManager.java Wed Jan 18 00:18:03 2012 @@ -38,6 +38,8 @@ public class ZKBasedRevisionManager impl public static final String HOSTLIST = "revision.manager.zk.HostList"; public static final String DATADIR = "revision.manager.zk.DataDir"; + public static final String DEFAULT_DATADIR = "/revision-management"; + public static final String DEFAULT_HOSTLIST = "localhost:2181"; private static int DEFAULT_WRITE_TRANSACTION_TIMEOUT = 14400000; private static final Log LOG = LogFactory.getLog(ZKBasedRevisionManager.class); private String zkHostList; @@ -50,8 +52,11 @@ public class ZKBasedRevisionManager impl */ @Override public void initialize(Properties properties) { - this.zkHostList = properties.getProperty(ZKBasedRevisionManager.HOSTLIST, "localhost:2181"); - this.baseDir = properties.getProperty(ZKBasedRevisionManager.DATADIR,"/revision-management"); + this.zkHostList = properties.getProperty( + ZKBasedRevisionManager.HOSTLIST, + ZKBasedRevisionManager.DEFAULT_HOSTLIST); + this.baseDir = properties.getProperty(ZKBasedRevisionManager.DATADIR, + ZKBasedRevisionManager.DEFAULT_DATADIR); } /**
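To make the revision filtering concrete before the test diffs: the reworked generatePuts() below writes the same row num times, using the loop counter both as the cell timestamp and in the cell value, and commits one write transaction per put:

    // per-put write pattern used by the test (generic types added back for readability)
    Transaction tsx = rm.beginWriteTransaction(tableName, columnFamilies);
    // ... Put stamped with the revision as its timestamp ...
    rm.commitWriteTransaction(tsx);

With five committed revisions, a snapshot taken at read time reports revision 5; HbaseSnapshotRecordReader.prepareResult() drops any newer cells and keeps at most maxRevisions (= 1) per column, so the single surviving row carries the values "textValue-5". That is why the mappers below now assert count == 1 and compare against "textValue-5" instead of counting ten distinct rows.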
Modified: incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputStorageDriver.java URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputStorageDriver.java?rev=1232664&r1=1232663&r2=1232664&view=diff ============================================================================== --- incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputStorageDriver.java (original) +++ incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputStorageDriver.java Wed Jan 18 00:18:03 2012 @@ -18,11 +18,13 @@ package org.apache.hcatalog.hbase; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import java.io.IOException; import java.net.URI; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Map; @@ -54,6 +56,8 @@ import org.apache.hcatalog.common.HCatUt import org.apache.hcatalog.data.HCatRecord; import org.apache.hcatalog.data.schema.HCatFieldSchema; import org.apache.hcatalog.data.schema.HCatSchema; +import org.apache.hcatalog.hbase.snapshot.RevisionManager; +import org.apache.hcatalog.hbase.snapshot.Transaction; import org.apache.hcatalog.mapreduce.HCatInputFormat; import org.apache.hcatalog.mapreduce.InputJobInfo; import org.junit.Test; @@ -66,20 +70,34 @@ public class TestHBaseInputStorageDriver private final byte[] QUALIFIER1 = Bytes.toBytes("testQualifier1"); private final byte[] QUALIFIER2 = Bytes.toBytes("testQualifier2"); - List generatePuts(int num) { - List myPuts = new ArrayList(); - for (int i = 0; i < num; i++) { - Put put = new Put(Bytes.toBytes("testRow" + i)); - put.add(FAMILY, QUALIFIER1, 0, - Bytes.toBytes("testQualifier1-" + "textValue-" + i)); - put.add(FAMILY, QUALIFIER2, 0, - Bytes.toBytes("testQualifier2-" + "textValue-" + i)); - myPuts.add(put); + private List generatePuts(int num, String tableName) throws IOException { + + List columnFamilies = Arrays.asList("testFamily"); + RevisionManager rm = null; + List myPuts; + try { + rm = HBaseHCatStorageHandler + .getOpenedRevisionManager(getHbaseConf()); + rm.open(); + myPuts = new ArrayList(); + for (int i = 1; i <= num; i++) { + Put put = new Put(Bytes.toBytes("testRow")); + put.add(FAMILY, QUALIFIER1, i, Bytes.toBytes("textValue-" + i)); + put.add(FAMILY, QUALIFIER2, i, Bytes.toBytes("textValue-" + i)); + myPuts.add(put); + Transaction tsx = rm.beginWriteTransaction(tableName, + columnFamilies); + rm.commitWriteTransaction(tsx); + } + } finally { + if (rm != null) + rm.close(); } + return myPuts; } - public void Initialize() throws Exception { + private void Initialize() throws Exception { hcatConf = getHiveConf(); hcatConf.set(ConfVars.SEMANTIC_ANALYZER_HOOK.varname, HCatSemanticAnalyzer.class.getName()); @@ -102,8 +120,8 @@ public class TestHBaseInputStorageDriver } - public void populateHBaseTable(String tName) throws IOException { - List myPuts = generatePuts(10); + private void populateHBaseTable(String tName, int revisions) throws IOException { + List myPuts = generatePuts(revisions, tName); HTable table = new HTable(getHbaseConf(), Bytes.toBytes(tName)); table.put(myPuts); } @@ -132,7 +150,7 @@ boolean doesTableExist = hAdmin.tableExists(hbaseTableName); assertTrue(doesTableExist); - populateHBaseTable(hbaseTableName); + populateHBaseTable(hbaseTableName, 5); Configuration conf = new Configuration(hcatConf); conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, HCatUtil.serialize(getHiveConf().getAllProperties())); @@ -160,14 +178,15 @@ job.setOutputValueClass(Text.class); job.setNumReduceTasks(0); assertTrue(job.waitForCompletion(true)); - assertTrue(MapReadHTable.error == false); + assertFalse(MapReadHTable.error); + assertEquals(MapReadHTable.count, 1); String dropTableQuery = "DROP TABLE " + hbaseTableName ; CommandProcessorResponse responseThree = hcatDriver.run(dropTableQuery); assertEquals(0, responseThree.getResponseCode()); boolean isHbaseTableThere = hAdmin.tableExists(hbaseTableName); - assertTrue(isHbaseTableThere == false); + assertFalse(isHbaseTableThere); String dropDB = "DROP DATABASE " + databaseName; CommandProcessorResponse responseFour = hcatDriver.run(dropDB); @@ -192,7 +211,7 @@ boolean doesTableExist = hAdmin.tableExists(tableName); assertTrue(doesTableExist); - populateHBaseTable(tableName); + populateHBaseTable(tableName, 5); Configuration conf = new Configuration(hcatConf); conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, @@ -222,14 +241,15 @@ job.setOutputValueClass(Text.class); job.setNumReduceTasks(0); assertTrue(job.waitForCompletion(true)); - assertTrue(MapReadHTable.error == false); + assertFalse(MapReadProjHTable.error); + assertEquals(MapReadProjHTable.count, 1); String dropTableQuery = "DROP TABLE "
+ tableName ; CommandProcessorResponse responseThree = hcatDriver.run(dropTableQuery); assertEquals(0, responseThree.getResponseCode()); boolean isHbaseTableThere = hAdmin.tableExists(tableName); - assertTrue(isHbaseTableThere == false); + assertFalse(isHbaseTableThere); } @@ -238,18 +258,20 @@ public class TestHBaseInputStorageDriver Mapper { static boolean error = false; - + static int count = 0; @Override public void map(ImmutableBytesWritable key, HCatRecord value, Context context) throws IOException, InterruptedException { + System.out.println("HCat record value" + value.toString()); boolean correctValues = (value.size() == 3) - && (value.get(0).toString()).startsWith("testRow") - && (value.get(1).toString()).startsWith("testQualifier1") - && (value.get(2).toString()).startsWith("testQualifier2"); + && (value.get(0).toString()).equalsIgnoreCase("testRow") + && (value.get(1).toString()).equalsIgnoreCase("textValue-5") + && (value.get(2).toString()).equalsIgnoreCase("textValue-5"); if (correctValues == false) { error = true; } + count++; } } @@ -258,17 +280,19 @@ public class TestHBaseInputStorageDriver Mapper { static boolean error = false; - + static int count = 0; @Override public void map(ImmutableBytesWritable key, HCatRecord value, Context context) throws IOException, InterruptedException { + System.out.println("HCat record value" + value.toString()); boolean correctValues = (value.size() == 2) - && (value.get(0).toString()).startsWith("testRow") - && (value.get(1).toString()).startsWith("testQualifier1"); + && (value.get(0).toString()).equalsIgnoreCase("testRow") + && (value.get(1).toString()).equalsIgnoreCase("textValue-5"); if (correctValues == false) { error = true; } + count++; } } Added: incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestSnapshots.java URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestSnapshots.java?rev=1232664&view=auto ============================================================================== --- incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestSnapshots.java (added) +++ incubator/hcatalog/branches/branch-0.3/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestSnapshots.java Wed Jan 18 00:18:03 2012 @@ -0,0 +1,122 @@ +package org.apache.hcatalog.hbase; + +import static org.junit.Assert.assertEquals; + +import java.net.URI; +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.cli.CliSessionState; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hcatalog.cli.HCatDriver; +import org.apache.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer; +import org.apache.hcatalog.common.HCatConstants; +import org.apache.hcatalog.common.HCatUtil; +import org.apache.hcatalog.hbase.snapshot.TableSnapshot; +import org.apache.hcatalog.mapreduce.InitializeInput; +import org.apache.hcatalog.mapreduce.InputJobInfo; +import org.junit.Test; + +public class TestSnapshots extends SkeletonHBaseTest { + private static HiveConf hcatConf; + private static HCatDriver hcatDriver; + + public void Initialize() throws Exception { + hcatConf = 
getHiveConf(); + hcatConf.set(ConfVars.SEMANTIC_ANALYZER_HOOK.varname, + HCatSemanticAnalyzer.class.getName()); + URI fsuri = getFileSystem().getUri(); + Path whPath = new Path(fsuri.getScheme(), fsuri.getAuthority(), + getTestDir()); + hcatConf.set(HiveConf.ConfVars.HADOOPFS.varname, fsuri.toString()); + hcatConf.set(ConfVars.METASTOREWAREHOUSE.varname, whPath.toString()); + + //Add hbase properties + + for (Map.Entry el : getHbaseConf()) { + if (el.getKey().startsWith("hbase.")) { + hcatConf.set(el.getKey(), el.getValue()); + } + } + + SessionState.start(new CliSessionState(hcatConf)); + hcatDriver = new HCatDriver(); + + } + + @Test + public void TestSnapshotConversion() throws Exception{ + Initialize(); + String tableName = newTableName("mytableOne"); + String databaseName = newTableName("mydatabase"); + String fullyQualTableName = databaseName + "." + tableName; + String db_dir = getTestDir() + "/hbasedb"; + String dbquery = "CREATE DATABASE IF NOT EXISTS " + databaseName + " LOCATION '" + + db_dir + "'"; + String tableQuery = "CREATE TABLE " + fullyQualTableName + + "(key string, value1 string, value2 string) STORED BY " + + "'org.apache.hcatalog.hbase.HBaseHCatStorageHandler'" + + "TBLPROPERTIES ('hbase.columns.mapping'=':key,cf1:q1,cf2:q2')" ; + + CommandProcessorResponse cmdResponse = hcatDriver.run(dbquery); + assertEquals(0, cmdResponse.getResponseCode()); + cmdResponse = hcatDriver.run(tableQuery); + assertEquals(0, cmdResponse.getResponseCode()); + + InputJobInfo inputInfo = InputJobInfo.create(databaseName, tableName, null, null, null); + Configuration conf = new Configuration(hcatConf); + conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, + HCatUtil.serialize(getHiveConf().getAllProperties())); + Job job = new Job(conf); + InitializeInput.setInput(job, inputInfo); + String modifiedInputInfo = job.getConfiguration().get(HCatConstants.HCAT_KEY_JOB_INFO); + inputInfo = (InputJobInfo) HCatUtil.deserialize(modifiedInputInfo); + + Map revMap = new HashMap(); + revMap.put("cf1", 3L); + revMap.put("cf2", 5L); + TableSnapshot hbaseSnapshot = new TableSnapshot(fullyQualTableName, revMap); + HCatTableSnapshot hcatSnapshot = HBaseInputStorageDriver.convertSnapshot(hbaseSnapshot, inputInfo.getTableInfo()); + + assertEquals(hcatSnapshot.getRevision("value1"), 3); + assertEquals(hcatSnapshot.getRevision("value2"), 5); + + String dropTable = "DROP TABLE " + fullyQualTableName; + cmdResponse = hcatDriver.run(dropTable); + assertEquals(0, cmdResponse.getResponseCode()); + + tableName = newTableName("mytableTwo"); + fullyQualTableName = databaseName + "." 
+ tableName; + tableQuery = "CREATE TABLE " + fullyQualTableName + + "(key string, value1 string, value2 string) STORED BY " + + "'org.apache.hcatalog.hbase.HBaseHCatStorageHandler'" + + "TBLPROPERTIES ('hbase.columns.mapping'=':key,cf1:q1,cf1:q2')" ; + cmdResponse = hcatDriver.run(tableQuery); + assertEquals(0, cmdResponse.getResponseCode()); + revMap.clear(); + revMap.put("cf1", 3L); + hbaseSnapshot = new TableSnapshot(fullyQualTableName, revMap); + inputInfo = InputJobInfo.create(databaseName, tableName, null, null, null); + InitializeInput.setInput(job, inputInfo); + modifiedInputInfo = job.getConfiguration().get(HCatConstants.HCAT_KEY_JOB_INFO); + inputInfo = (InputJobInfo) HCatUtil.deserialize(modifiedInputInfo); + hcatSnapshot = HBaseInputStorageDriver.convertSnapshot(hbaseSnapshot, inputInfo.getTableInfo()); + assertEquals(hcatSnapshot.getRevision("value1"), 3); + assertEquals(hcatSnapshot.getRevision("value2"), 3); + + dropTable = "DROP TABLE " + fullyQualTableName; + cmdResponse = hcatDriver.run(dropTable); + assertEquals(0, cmdResponse.getResponseCode()); + + String dropDatabase = "DROP DATABASE IF EXISTS " + databaseName + " CASCADE"; + cmdResponse = hcatDriver.run(dropDatabase); + assertEquals(0, cmdResponse.getResponseCode()); + } + +}
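For completeness, the read-side setup that these tests exercise can be sketched as follows (names are illustrative; it assumes an HBase-backed HCatalog table created as above):

    Job job = new Job(conf);
    InputJobInfo inputInfo = InputJobInfo.create("mydb", "mytable", null, null, null);
    // serializes the InputJobInfo into the job configuration
    // (HCatInputFormat.setInput() reaches the same code path)
    InitializeInput.setInput(job, inputInfo);
    // when the job runs, HBaseInputStorageDriver.initialize() creates a table snapshot
    // automatically if none was attached, so every map task sees the same revisions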