From hcatalog-commits-return-760-apmail-incubator-hcatalog-commits-archive=incubator.apache.org@incubator.apache.org Tue Mar 20 16:20:26 2012 Return-Path: X-Original-To: apmail-incubator-hcatalog-commits-archive@minotaur.apache.org Delivered-To: apmail-incubator-hcatalog-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 82167982B for ; Tue, 20 Mar 2012 16:20:26 +0000 (UTC) Received: (qmail 99178 invoked by uid 500); 20 Mar 2012 16:20:26 -0000 Delivered-To: apmail-incubator-hcatalog-commits-archive@incubator.apache.org Received: (qmail 99138 invoked by uid 500); 20 Mar 2012 16:20:25 -0000 Mailing-List: contact hcatalog-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hcatalog-dev@incubator.apache.org Delivered-To: mailing list hcatalog-commits@incubator.apache.org Received: (qmail 99125 invoked by uid 99); 20 Mar 2012 16:20:25 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 20 Mar 2012 16:20:25 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 20 Mar 2012 16:20:19 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 9EA8623889E1; Tue, 20 Mar 2012 16:19:57 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1302971 - in /incubator/hcatalog/trunk: ./ src/java/org/apache/hcatalog/pig/ storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/ storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/ Date: Tue, 20 Mar 2012 16:19:57 -0000 To: hcatalog-commits@incubator.apache.org From: gates@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120320161957.9EA8623889E1@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: gates Date: Tue Mar 20 16:19:56 2012 New Revision: 1302971 URL: http://svn.apache.org/viewvc?rev=1302971&view=rev Log: HCATALOG-302 unable to write to hbase channel. HBaseHCatStorageHandler class not found Modified: incubator/hcatalog/trunk/CHANGES.txt incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatStorer.java incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputFormat.java incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HbaseSnapshotRecordReader.java incubator/hcatalog/trunk/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputFormat.java incubator/hcatalog/trunk/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputFormat.java incubator/hcatalog/trunk/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputFormat.java Modified: incubator/hcatalog/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/CHANGES.txt?rev=1302971&r1=1302970&r2=1302971&view=diff ============================================================================== --- incubator/hcatalog/trunk/CHANGES.txt (original) +++ incubator/hcatalog/trunk/CHANGES.txt Tue Mar 20 16:19:56 2012 @@ -30,6 +30,7 @@ Trunk (unreleased changes) OPTIMIZATIONS BUG FIXES + HCAT-302 unable to write to hbase channel. HBaseHCatStorageHandler class not found (rohini via gates) Release 0.4.0 - Unreleased Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatStorer.java URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatStorer.java?rev=1302971&r1=1302970&r2=1302971&view=diff ============================================================================== --- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatStorer.java (original) +++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatStorer.java Tue Mar 20 16:19:56 2012 @@ -70,27 +70,30 @@ public class HCatStorer extends HCatBase @Override public void setStoreLocation(String location, Job job) throws IOException { - job.getConfiguration().set(INNER_SIGNATURE, INNER_SIGNATURE_PREFIX + "_" + sign); Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[]{sign}); String[] userStr = location.split("\\."); OutputJobInfo outputJobInfo; - if(userStr.length == 2) { - outputJobInfo = OutputJobInfo.create(userStr[0], + String outInfoString = p.getProperty(HCatConstants.HCAT_KEY_OUTPUT_INFO); + if (outInfoString != null) { + outputJobInfo = (OutputJobInfo) HCatUtil.deserialize(outInfoString); + } else { + if(userStr.length == 2) { + outputJobInfo = OutputJobInfo.create(userStr[0], userStr[1], partitions); - } else if(userStr.length == 1) { - outputJobInfo = OutputJobInfo.create(null, + } else if(userStr.length == 1) { + outputJobInfo = OutputJobInfo.create(null, userStr[0], partitions); - } else { - throw new FrontendException("location "+location+" is invalid. It must be of the form [db.]table", PigHCatUtil.PIG_EXCEPTION_CODE); + } else { + throw new FrontendException("location "+location+" is invalid. It must be of the form [db.]table", PigHCatUtil.PIG_EXCEPTION_CODE); + } } - Configuration config = job.getConfiguration(); if(!HCatUtil.checkJobContextIfRunningFromBackend(job)){ @@ -123,6 +126,7 @@ public class HCatStorer extends HCatBase PigHCatUtil.saveConfigIntoUDFProperties(p, config,HCatConstants.HCAT_KEY_TOKEN_SIGNATURE); PigHCatUtil.saveConfigIntoUDFProperties(p, config,HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_SIGNATURE); PigHCatUtil.saveConfigIntoUDFProperties(p, config,HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_STRFORM); + PigHCatUtil.saveConfigIntoUDFProperties(p, config,HCatConstants.HCAT_KEY_OUTPUT_INFO); p.setProperty(COMPUTED_OUTPUT_SCHEMA,ObjectSerializer.serialize(computedSchema)); Modified: incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java?rev=1302971&r1=1302970&r2=1302971&view=diff ============================================================================== --- incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java (original) +++ incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java Tue Mar 20 16:19:56 2012 @@ -61,9 +61,6 @@ class HBaseBulkOutputFormat extends HBas @Override public void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException { - job.setOutputKeyClass(ImmutableBytesWritable.class); - job.setOutputValueClass(Put.class); - job.setOutputCommitter(HBaseBulkOutputCommitter.class); baseOutputFormat.checkOutputSpecs(ignored, job); HBaseUtil.addHBaseDelegationToken(job); addJTDelegationToken(job); @@ -73,6 +70,8 @@ class HBaseBulkOutputFormat extends HBas public RecordWriter, Put> getRecordWriter( FileSystem ignored, JobConf job, String name, Progressable progress) throws IOException { + job.setOutputKeyClass(ImmutableBytesWritable.class); + job.setOutputValueClass(Put.class); long version = HBaseRevisionManagerUtil.getOutputRevision(job); return new HBaseBulkRecordWriter(baseOutputFormat.getRecordWriter( ignored, job, name, progress), version); @@ -188,11 +187,21 @@ class HBaseBulkOutputFormat extends HBas try { Configuration conf = jobContext.getConfiguration(); Path srcPath = FileOutputFormat.getOutputPath(jobContext.getJobConf()); + if (!FileSystem.get(conf).exists(srcPath)) { + throw new IOException("Failed to bulk import hfiles. " + + "Intermediate data directory is cleaned up or missing. " + + "Please look at the bulk import job if it exists for failure reason"); + } Path destPath = new Path(srcPath.getParent(), srcPath.getName() + "_hfiles"); - ImportSequenceFile.runJob(jobContext, + boolean success = ImportSequenceFile.runJob(jobContext, conf.get(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY), srcPath, destPath); + if(!success) { + cleanIntermediate(jobContext); + throw new IOException("Failed to bulk import hfiles." + + " Please look at the bulk import job for failure reason"); + } rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(conf); rm.commitWriteTransaction(HBaseRevisionManagerUtil.getWriteTransaction(conf)); cleanIntermediate(jobContext); Modified: incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputFormat.java URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputFormat.java?rev=1302971&r1=1302970&r2=1302971&view=diff ============================================================================== --- incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputFormat.java (original) +++ incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputFormat.java Tue Mar 20 16:19:56 2012 @@ -63,9 +63,6 @@ class HBaseDirectOutputFormat extends HB @Override public void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException { - job.setOutputCommitter(HBaseDirectOutputCommitter.class); - job.setIfUnset(TableOutputFormat.OUTPUT_TABLE, - job.get(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY)); outputFormat.checkOutputSpecs(ignored, job); HBaseUtil.addHBaseDelegationToken(job); } Modified: incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java?rev=1302971&r1=1302970&r2=1302971&view=diff ============================================================================== --- incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java (original) +++ incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java Tue Mar 20 16:19:56 2012 @@ -19,7 +19,6 @@ package org.apache.hcatalog.hbase; import java.io.IOException; -import java.io.Serializable; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -27,6 +26,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; @@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.MasterNot import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.mapred.TableOutputFormat; import org.apache.hadoop.hbase.mapreduce.TableInputFormat; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.util.Bytes; @@ -48,14 +49,15 @@ import org.apache.hadoop.hive.ql.metadat import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider; import org.apache.hadoop.hive.serde2.SerDe; -import org.apache.hadoop.hive.serde2.SerDeException; -import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.OutputFormat; import org.apache.hadoop.util.StringUtils; import org.apache.hcatalog.common.HCatConstants; import org.apache.hcatalog.common.HCatUtil; import org.apache.hcatalog.data.schema.HCatSchema; +import org.apache.hcatalog.hbase.HBaseBulkOutputFormat.HBaseBulkOutputCommitter; +import org.apache.hcatalog.hbase.HBaseDirectOutputFormat.HBaseDirectOutputCommitter; import org.apache.hcatalog.hbase.snapshot.RevisionManager; import org.apache.hcatalog.hbase.snapshot.Transaction; import org.apache.hcatalog.hbase.snapshot.ZKBasedRevisionManager; @@ -68,20 +70,20 @@ import org.apache.thrift.TBase; import org.apache.zookeeper.ZooKeeper; import com.facebook.fb303.FacebookBase; +import com.google.common.util.concurrent.ThreadFactoryBuilder; /** * This class HBaseHCatStorageHandler provides functionality to create HBase * tables through HCatalog. The implementation is very similar to the * HiveHBaseStorageHandler, with more details to suit HCatalog. */ -//TODO remove serializable when HCATALOG-282 is fixed -public class HBaseHCatStorageHandler extends HCatStorageHandler implements HiveMetaHook, Serializable { +public class HBaseHCatStorageHandler extends HCatStorageHandler implements HiveMetaHook, Configurable { public final static String DEFAULT_PREFIX = "default."; private final static String PROPERTY_INT_OUTPUT_LOCATION = "hcat.hbase.mapreduce.intermediateOutputLocation"; - private transient Configuration hbaseConf; - private transient HBaseAdmin admin; + private Configuration hbaseConf; + private HBaseAdmin admin; @Override public void configureInputJobProperties(TableDesc tableDesc, Map jobProperties) { @@ -96,20 +98,32 @@ public class HBaseHCatStorageHandler ext jobProperties.put(TableInputFormat.INPUT_TABLE, qualifiedTableName); Configuration jobConf = getConf(); + addHbaseResources(jobConf, jobProperties); + Configuration copyOfConf = new Configuration(jobConf); + HBaseConfiguration.addHbaseResources(copyOfConf); + //Getting hbase delegation token in getInputSplits does not work with PIG. So need to + //do it here + if (jobConf instanceof JobConf) { + HBaseUtil.addHBaseDelegationToken((JobConf)jobConf); + } + String outputSchema = jobConf.get(HCatConstants.HCAT_KEY_OUTPUT_SCHEMA); jobProperties.put(TableInputFormat.SCAN_COLUMNS, getScanColumns(tableInfo, outputSchema)); String serSnapshot = (String) inputJobInfo.getProperties().get( HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY); if (serSnapshot == null) { - Configuration conf = addHbaseResources(jobConf); - HCatTableSnapshot snapshot = HBaseRevisionManagerUtil.createSnapshot(conf, + HCatTableSnapshot snapshot = HBaseRevisionManagerUtil.createSnapshot(copyOfConf, qualifiedTableName, tableInfo); jobProperties.put(HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY, HCatUtil.serialize(snapshot)); } - addHbaseResources(jobConf, jobProperties); + //This adds it directly to the jobConf. Setting in jobProperties does not get propagated + //to JobConf as of now as the jobProperties is maintained per partition + //TODO: Remove when HCAT-308 is fixed + addOutputDependencyJars(jobConf); + jobProperties.put("tmpjars", jobConf.get("tmpjars")); } catch (IOException e) { throw new IllegalStateException("Error while configuring job properties", e); @@ -128,33 +142,50 @@ public class HBaseHCatStorageHandler ext HCatTableInfo tableInfo = outputJobInfo.getTableInfo(); String qualifiedTableName = HBaseHCatStorageHandler.getFullyQualifiedName(tableInfo); jobProperties.put(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY, qualifiedTableName); + jobProperties.put(TableOutputFormat.OUTPUT_TABLE, qualifiedTableName); Configuration jobConf = getConf(); + addHbaseResources(jobConf, jobProperties); + + Configuration copyOfConf = new Configuration(jobConf); + HBaseConfiguration.addHbaseResources(copyOfConf); + String txnString = outputJobInfo.getProperties().getProperty( HBaseConstants.PROPERTY_WRITE_TXN_KEY); - if (txnString == null) { - Configuration conf = addHbaseResources(jobConf); - Transaction txn = HBaseRevisionManagerUtil.beginWriteTransaction(qualifiedTableName, tableInfo, conf); + String jobTxnString = jobConf.get(HBaseConstants.PROPERTY_WRITE_TXN_KEY); + //Pig makes 3 calls to HCatOutputFormat.setOutput(HCatStorer) with different JobConf + //which leads to creating 2 transactions. + //So apart from fixing HCatStorer to pass same OutputJobInfo, making the call idempotent for other + //cases which might call multiple times but with same JobConf. + Transaction txn = null; + if (txnString == null && jobTxnString == null) { + txn = HBaseRevisionManagerUtil.beginWriteTransaction(qualifiedTableName, tableInfo, copyOfConf); + String serializedTxn = HCatUtil.serialize(txn); outputJobInfo.getProperties().setProperty(HBaseConstants.PROPERTY_WRITE_TXN_KEY, - HCatUtil.serialize(txn)); - - if (isBulkMode(outputJobInfo) && !(outputJobInfo.getProperties() - .containsKey(PROPERTY_INT_OUTPUT_LOCATION))) { - String tableLocation = tableInfo.getTableLocation(); - String location = new Path(tableLocation, "REVISION_" + txn.getRevisionNumber()) - .toString(); - outputJobInfo.getProperties().setProperty(PROPERTY_INT_OUTPUT_LOCATION, - location); - // We are writing out an intermediate sequenceFile hence - // location is not passed in OutputJobInfo.getLocation() - // TODO replace this with a mapreduce constant when available - jobProperties.put("mapred.output.dir", location); - } + serializedTxn); + jobProperties.put(HBaseConstants.PROPERTY_WRITE_TXN_KEY, serializedTxn); + } else { + txnString = (txnString == null) ? jobTxnString : txnString; + txn = (Transaction) HCatUtil.deserialize(txnString); + outputJobInfo.getProperties().setProperty(HBaseConstants.PROPERTY_WRITE_TXN_KEY, + txnString); + jobProperties.put(HBaseConstants.PROPERTY_WRITE_TXN_KEY, txnString); + } + if (isBulkMode(outputJobInfo)) { + String tableLocation = tableInfo.getTableLocation(); + String location = new Path(tableLocation, "REVISION_" + txn.getRevisionNumber()) + .toString(); + outputJobInfo.getProperties().setProperty(PROPERTY_INT_OUTPUT_LOCATION, location); + // We are writing out an intermediate sequenceFile hence + // location is not passed in OutputJobInfo.getLocation() + // TODO replace this with a mapreduce constant when available + jobProperties.put("mapred.output.dir", location); + jobProperties.put("mapred.output.committer.class", HBaseBulkOutputCommitter.class.getName()); + } else { + jobProperties.put("mapred.output.committer.class", HBaseDirectOutputCommitter.class.getName()); } - jobProperties - .put(HCatConstants.HCAT_KEY_OUTPUT_INFO, HCatUtil.serialize(outputJobInfo)); - addHbaseResources(jobConf, jobProperties); + jobProperties.put(HCatConstants.HCAT_KEY_OUTPUT_INFO, HCatUtil.serialize(outputJobInfo)); addOutputDependencyJars(jobConf); jobProperties.put("tmpjars", jobConf.get("tmpjars")); @@ -429,7 +460,10 @@ public class HBaseHCatStorageHandler ext @Override public void setConf(Configuration conf) { - hbaseConf = HBaseConfiguration.create(conf); + //Not cloning as we want to set tmpjars on it. Putting in jobProperties does not + //get propagated to JobConf in case of InputFormat as they are maintained per partition. + //Also we need to add hbase delegation token to the Credentials. + hbaseConf = conf; } private void checkDeleteTable(Table table) throws MetaException { @@ -479,8 +513,6 @@ public class HBaseHCatStorageHandler ext */ private void addOutputDependencyJars(Configuration conf) throws IOException { TableMapReduceUtil.addDependencyJars(conf, - //hadoop-core - Writable.class, //ZK ZooKeeper.class, //HBase @@ -489,6 +521,8 @@ public class HBaseHCatStorageHandler ext HiveException.class, //HCatalog jar HCatOutputFormat.class, + //hcat hbase storage handler jar + HBaseHCatStorageHandler.class, //hive hbase storage handler jar HBaseSerDe.class, //hive jar @@ -498,18 +532,9 @@ public class HBaseHCatStorageHandler ext //hbase jar Bytes.class, //thrift-fb303 .jar - FacebookBase.class); - } - - /** - * Utility method to get a new Configuration with hbase-default.xml and hbase-site.xml added - * @param jobConf existing configuration - * @return a new Configuration with hbase-default.xml and hbase-site.xml added - */ - private Configuration addHbaseResources(Configuration jobConf) { - Configuration conf = new Configuration(jobConf); - HBaseConfiguration.addHbaseResources(conf); - return conf; + FacebookBase.class, + //guava jar + ThreadFactoryBuilder.class); } /** Modified: incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java?rev=1302971&r1=1302970&r2=1302971&view=diff ============================================================================== --- incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java (original) +++ incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java Tue Mar 20 16:19:56 2012 @@ -105,7 +105,6 @@ class HBaseInputFormat implements InputF public org.apache.hadoop.mapred.InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { inputFormat.setConf(job); - HBaseUtil.addHBaseDelegationToken(job); return convertSplits(inputFormat.getSplits(HCatMapRedUtil.createJobContext(job, null, Reporter.NULL))); } Modified: incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HbaseSnapshotRecordReader.java URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HbaseSnapshotRecordReader.java?rev=1302971&r1=1302970&r2=1302971&view=diff ============================================================================== --- incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HbaseSnapshotRecordReader.java (original) +++ incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HbaseSnapshotRecordReader.java Tue Mar 20 16:19:56 2012 @@ -80,19 +80,16 @@ class HbaseSnapshotRecordReader implemen public void restart(byte[] firstRow) throws IOException { allAbortedTransactions = getAbortedTransactions(Bytes.toString(htable.getTableName()), scan); - long maxValidRevision = snapshot.getLatestRevision(); + long maxValidRevision = getMaximumRevision(scan, snapshot); while (allAbortedTransactions.contains(maxValidRevision)) { maxValidRevision--; } - long minValidRevision = getMinimumRevision(scan, snapshot); - while (allAbortedTransactions.contains(minValidRevision)) { - minValidRevision--; - } Scan newScan = new Scan(scan); newScan.setStartRow(firstRow); //TODO: See if filters in 0.92 can be used to optimize the scan //TODO: Consider create a custom snapshot filter - newScan.setTimeRange(minValidRevision, maxValidRevision + 1); + //TODO: Make min revision a constant in RM + newScan.setTimeRange(0, maxValidRevision + 1); newScan.setMaxVersions(); this.scanner = this.htable.getScanner(newScan); resultItr = this.scanner.iterator(); @@ -120,16 +117,16 @@ class HbaseSnapshotRecordReader implemen } } - private long getMinimumRevision(Scan scan, TableSnapshot snapshot) { - long minRevision = snapshot.getLatestRevision(); + private long getMaximumRevision(Scan scan, TableSnapshot snapshot) { + long maxRevision = 0; byte[][] families = scan.getFamilies(); for (byte[] familyKey : families) { String family = Bytes.toString(familyKey); long revision = snapshot.getRevision(family); - if (revision < minRevision) - minRevision = revision; + if (revision > maxRevision) + maxRevision = revision; } - return minRevision; + return maxRevision; } /* Modified: incubator/hcatalog/trunk/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputFormat.java URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputFormat.java?rev=1302971&r1=1302970&r2=1302971&view=diff ============================================================================== --- incubator/hcatalog/trunk/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputFormat.java (original) +++ incubator/hcatalog/trunk/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputFormat.java Tue Mar 20 16:19:56 2012 @@ -54,6 +54,7 @@ import org.apache.hcatalog.common.HCatUt import org.apache.hcatalog.data.DefaultHCatRecord; import org.apache.hcatalog.data.HCatRecord; import org.apache.hcatalog.data.schema.HCatSchema; +import org.apache.hcatalog.hbase.HBaseBulkOutputFormat.HBaseBulkOutputCommitter; import org.apache.hcatalog.hbase.TestHBaseDirectOutputFormat.MapReadAbortedTransaction; import org.apache.hcatalog.hbase.TestHBaseDirectOutputFormat.MapWriteAbortTransaction; import org.apache.hcatalog.hbase.snapshot.FamilyRevision; @@ -203,6 +204,7 @@ public class TestHBaseBulkOutputFormat e job.setOutputFormat(HBaseBulkOutputFormat.class); org.apache.hadoop.mapred.SequenceFileOutputFormat.setOutputPath(job, interPath); + job.setOutputCommitter(HBaseBulkOutputCommitter.class); //manually create transaction RevisionManager rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(conf); Modified: incubator/hcatalog/trunk/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputFormat.java URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputFormat.java?rev=1302971&r1=1302970&r2=1302971&view=diff ============================================================================== --- incubator/hcatalog/trunk/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputFormat.java (original) +++ incubator/hcatalog/trunk/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputFormat.java Tue Mar 20 16:19:56 2012 @@ -138,6 +138,7 @@ public class TestHBaseDirectOutputFormat org.apache.hadoop.mapred.TextInputFormat.setInputPaths(job, inputPath); job.setOutputFormat(HBaseDirectOutputFormat.class); + job.set(TableOutputFormat.OUTPUT_TABLE, tableName); job.set(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY, tableName); //manually create transaction @@ -378,7 +379,17 @@ public class TestHBaseDirectOutputFormat TextInputFormat.setInputPaths(job, inputPath); job.setOutputFormatClass(HCatOutputFormat.class); HCatOutputFormat.setOutput(job, outputJobInfo); - + String txnString = job.getConfiguration().get(HBaseConstants.PROPERTY_WRITE_TXN_KEY); + //Test passing in same jobConf or same OutputJobInfo multiple times and verify 1 transaction is created + //Same jobConf + HCatOutputFormat.setOutput(job, outputJobInfo); + assertEquals(txnString, job.getConfiguration().get(HBaseConstants.PROPERTY_WRITE_TXN_KEY)); + String jobString = job.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO); + //Same OutputJobInfo + outputJobInfo = (OutputJobInfo) HCatUtil.deserialize(jobString); + Job job2 = new Job(conf); + HCatOutputFormat.setOutput(job2, outputJobInfo); + assertEquals(txnString, job2.getConfiguration().get(HBaseConstants.PROPERTY_WRITE_TXN_KEY)); job.setMapOutputKeyClass(BytesWritable.class); job.setMapOutputValueClass(HCatRecord.class); job.setOutputKeyClass(BytesWritable.class); Modified: incubator/hcatalog/trunk/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputFormat.java URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputFormat.java?rev=1302971&r1=1302970&r2=1302971&view=diff ============================================================================== --- incubator/hcatalog/trunk/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputFormat.java (original) +++ incubator/hcatalog/trunk/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputFormat.java Tue Mar 20 16:19:56 2012 @@ -449,11 +449,12 @@ public class TestHBaseInputFormat extend assertTrue(doesTableExist); populateHBaseTable(tableName, 2); - populateHBaseTableQualifier1(tableName, 3, null); //Running transaction - populateHBaseTableQualifier1(tableName, 4, Boolean.FALSE); //Aborted transaction - populateHBaseTableQualifier1(tableName, 5, Boolean.TRUE); //Committed transaction - populateHBaseTableQualifier1(tableName, 6, null); //Running Transaction - populateHBaseTableQualifier1(tableName, 7, Boolean.FALSE); //Aborted Transaction + populateHBaseTableQualifier1(tableName, 3, Boolean.TRUE); //Committed transaction + populateHBaseTableQualifier1(tableName, 4, null); //Running transaction + populateHBaseTableQualifier1(tableName, 5, Boolean.FALSE); //Aborted transaction + populateHBaseTableQualifier1(tableName, 6, Boolean.TRUE); //Committed transaction + populateHBaseTableQualifier1(tableName, 7, null); //Running Transaction + populateHBaseTableQualifier1(tableName, 8, Boolean.FALSE); //Aborted Transaction Configuration conf = new Configuration(hcatConf); conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, @@ -588,7 +589,7 @@ public class TestHBaseInputFormat extend System.out.println("HCat record value" + value.toString()); boolean correctValues = (value.size() == 3) && (value.get(0).toString()).equalsIgnoreCase("testRow") - && (value.get(1).toString()).equalsIgnoreCase("textValue-2") + && (value.get(1).toString()).equalsIgnoreCase("textValue-3") && (value.get(2).toString()).equalsIgnoreCase("textValue-2"); if (correctValues == false) {