incubator-hcatalog-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hashut...@apache.org
Subject svn commit: r1234271 - in /incubator/hcatalog/trunk: ./ storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/ storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/
Date Sat, 21 Jan 2012 05:11:35 GMT
Author: hashutosh
Date: Sat Jan 21 05:11:34 2012
New Revision: 1234271

URL: http://svn.apache.org/viewvc?rev=1234271&view=rev
Log:
HCATALOG-192: HBase output storage driver integration with zookeeper based revision manager (toffer via hashutosh)

Modified:
    incubator/hcatalog/trunk/CHANGES.txt
    incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputStorageDriver.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputStorageDriver.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseConstants.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputFormat.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseSerDeResultConverter.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputStorageDriver.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputStorageDriver.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseSerDeResultConverter.java

Modified: incubator/hcatalog/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/CHANGES.txt?rev=1234271&r1=1234270&r2=1234271&view=diff
==============================================================================
--- incubator/hcatalog/trunk/CHANGES.txt (original)
+++ incubator/hcatalog/trunk/CHANGES.txt Sat Jan 21 05:11:34 2012
@@ -23,6 +23,8 @@ Trunk (unreleased changes)
   INCOMPATIBLE CHANGES
 
   NEW FEATURES
+  HCAT-192. HBase output storage driver integration with zookeeper based revision manager (toffer via hashutosh) 
+
   HCAT-191. HBase input storage driver integration with zookeeper based revision manager. (avandana via toffer)
 
   HCAT-193. Snapshot class for HCatalog tables. (avandana via toffer)

Modified: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputStorageDriver.java?rev=1234271&r1=1234270&r2=1234271&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputStorageDriver.java
(original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputStorageDriver.java
Sat Jan 21 05:11:34 2012
@@ -30,11 +30,14 @@ import org.apache.hcatalog.common.HCatCo
 import org.apache.hcatalog.common.HCatUtil;
 import org.apache.hcatalog.data.HCatRecord;
 import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.hbase.snapshot.RevisionManager;
+import org.apache.hcatalog.hbase.snapshot.Transaction;
 import org.apache.hcatalog.mapreduce.HCatOutputStorageDriver;
 import org.apache.hcatalog.mapreduce.HCatTableInfo;
 import org.apache.hcatalog.mapreduce.OutputJobInfo;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -62,7 +65,6 @@ abstract  class HBaseBaseOutputStorageDr
         hcatProperties = (Properties)hcatProperties.clone();
         super.initialize(context, hcatProperties);
 
-
         String jobString = context.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO);
         if( jobString == null ) {
             throw new IOException("OutputJobInfo information not found in JobContext. HCatInputFormat.setOutput()
not called?");
@@ -75,16 +77,9 @@ abstract  class HBaseBaseOutputStorageDr
         outputJobInfo.getProperties().putAll(hcatProperties);
         hcatProperties = outputJobInfo.getProperties();
 
-
-        String revision = outputJobInfo.getProperties().getProperty(HBaseConstants.PROPERTY_OUTPUT_VERSION_KEY);
-        if(revision == null) {
-            outputJobInfo.getProperties()
-                         .setProperty(HBaseConstants.PROPERTY_OUTPUT_VERSION_KEY,
-                                            Long.toString(System.currentTimeMillis()));
-        }
-
         tableInfo = outputJobInfo.getTableInfo();
         schema = tableInfo.getDataColumns();
+        String qualifiedTableName = HBaseHCatStorageHandler.getFullyQualifiedName(tableInfo);
 
         List<FieldSchema> fields = HCatUtil.getFieldSchemaList(outputSchema.getFields());
         hcatProperties.setProperty(Constants.LIST_COLUMNS,
@@ -92,11 +87,36 @@ abstract  class HBaseBaseOutputStorageDr
         hcatProperties.setProperty(Constants.LIST_COLUMN_TYPES,
                 MetaStoreUtils.getColumnTypesFromFieldSchema(fields));
 
-        //outputSchema should be set by HCatOutputFormat calling setSchema, prior to initialize
being called
-        converter = new HBaseSerDeResultConverter(schema,
-                                                  outputSchema,
-                                                  hcatProperties);
-        context.getConfiguration().set(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY, HBaseHCatStorageHandler.getFullyQualifiedName(tableInfo));
+        context.getConfiguration().set(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY, qualifiedTableName);
+
+        String txnString = outputJobInfo.getProperties().getProperty(HBaseConstants.PROPERTY_WRITE_TXN_KEY);
+        if(txnString == null) {
+            //outputSchema should be set by HCatOutputFormat calling setSchema, prior to
initialize being called
+            //TODO reconcile output_revision passing to HBaseSerDeResultConverter
+            //on the first call to this method hcatProperties will not contain an OUTPUT_VERSION
but that doesn't
+            //matter since we won't use any facilities that require that property set during
that run
+            converter = new HBaseSerDeResultConverter(schema,
+                                                                                outputSchema,
+                                                                                hcatProperties);
+            RevisionManager rm = HBaseHCatStorageHandler.getOpenedRevisionManager(context.getConfiguration());
+            Transaction txn = null;
+            try {
+                txn = rm.beginWriteTransaction(qualifiedTableName,
+                                               Arrays.asList(converter.getHBaseScanColumns().split("
")));
+            } finally {
+                rm.close();
+            }
+            outputJobInfo.getProperties()
+                         .setProperty(HBaseConstants.PROPERTY_WRITE_TXN_KEY,
+                                      HCatUtil.serialize(txn));
+        }
+        else {
+            Transaction txn = (Transaction)HCatUtil.deserialize(txnString);
+            converter = new HBaseSerDeResultConverter(schema,
+                                                                                outputSchema,
+                                                                                hcatProperties,
+                                                                                txn.getRevisionNumber());
+        }
     }
 
     @Override

Modified: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java?rev=1234271&r1=1234270&r2=1234271&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java
(original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java
Sat Jan 21 05:11:34 2012
@@ -1,5 +1,7 @@
 package org.apache.hcatalog.hbase;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -17,6 +19,12 @@ import org.apache.hadoop.mapreduce.Recor
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.hbase.snapshot.RevisionManager;
+import org.apache.hcatalog.hbase.snapshot.RevisionManagerFactory;
+import org.apache.hcatalog.hbase.snapshot.Transaction;
+import org.apache.hcatalog.mapreduce.HCatOutputFormat;
+import org.apache.hcatalog.mapreduce.OutputJobInfo;
 
 import java.io.IOException;
 
@@ -28,6 +36,7 @@ import java.io.IOException;
 class HBaseBulkOutputFormat extends OutputFormat<WritableComparable<?>,Put> {
     private final static ImmutableBytesWritable EMPTY_LIST = new ImmutableBytesWritable(new byte[0]);
     private SequenceFileOutputFormat<WritableComparable<?>,Put> baseOutputFormat;
+    private final static Log LOG = LogFactory.getLog(HBaseBulkOutputFormat.class);
 
     public HBaseBulkOutputFormat() {
         baseOutputFormat = new SequenceFileOutputFormat<WritableComparable<?>,Put>();
@@ -110,35 +119,36 @@ class HBaseBulkOutputFormat extends Outp
 
         @Override
         public void abortJob(JobContext jobContext, JobStatus.State state) throws IOException
{
+            RevisionManager rm = null;
             try {
                 baseOutputCommitter.abortJob(jobContext,state);
+                rm = HBaseHCatStorageHandler.getOpenedRevisionManager(jobContext.getConfiguration());
+                rm.abortWriteTransaction(HBaseHCatStorageHandler.getWriteTransaction(jobContext.getConfiguration()));
             } finally {
                 cleanIntermediate(jobContext);
-            }
-        }
-
-        @Override
-        public void cleanupJob(JobContext context) throws IOException {
-            try {
-                baseOutputCommitter.cleanupJob(context);
-            } finally {
-                cleanIntermediate(context);
+                if(rm != null)
+                    rm.close();
             }
         }
 
         @Override
         public void commitJob(JobContext jobContext) throws IOException {
+            RevisionManager rm = null;
             try {
                 baseOutputCommitter.commitJob(jobContext);
                 Configuration conf = jobContext.getConfiguration();
                 Path srcPath = FileOutputFormat.getOutputPath(jobContext);
                 Path destPath = new Path(srcPath.getParent(),srcPath.getName()+"_hfiles");
                 ImportSequenceFile.runJob(jobContext,
-                                                        conf.get(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY),
-                                                        srcPath,
-                                                        destPath);
-            } finally {
+                                          conf.get(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY),
+                                          srcPath,
+                                          destPath);
+                rm = HBaseHCatStorageHandler.getOpenedRevisionManager(jobContext.getConfiguration());
+                rm.commitWriteTransaction(HBaseHCatStorageHandler.getWriteTransaction(jobContext.getConfiguration()));
                 cleanIntermediate(jobContext);
+            } finally {
+                if(rm != null)
+                    rm.close();
             }
         }
 

Modified: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputStorageDriver.java?rev=1234271&r1=1234270&r2=1234271&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputStorageDriver.java
(original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputStorageDriver.java
Sat Jan 21 05:11:34 2012
@@ -30,6 +30,7 @@ import org.apache.hadoop.mapreduce.Outpu
 import org.apache.hcatalog.common.HCatConstants;
 import org.apache.hcatalog.common.HCatUtil;
 import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.hbase.snapshot.Transaction;
 
 import java.io.IOException;
 import java.util.List;
@@ -55,10 +56,10 @@ public class HBaseBulkOutputStorageDrive
         //initialize() gets called multiple time in the lifecycle of an MR job, client, mapper,
reducer, etc
         //depending on the case we have to make sure for some context variables we set here
that they don't get set again
         if(!outputJobInfo.getProperties().containsKey(PROPERTY_INT_OUTPUT_LOCATION)) {
+            Transaction txn = (Transaction)
+                    HCatUtil.deserialize(outputJobInfo.getProperties().getProperty(HBaseConstants.PROPERTY_WRITE_TXN_KEY));
             String tableLocation = context.getConfiguration().get(PROPERTY_TABLE_LOCATION);
-            String location = new  Path(tableLocation,
-                                                    "REVISION_"+outputJobInfo.getProperties()
-                                                                                        
      .getProperty(HBaseConstants.PROPERTY_OUTPUT_VERSION_KEY)).toString();
+            String location = new  Path(tableLocation, "REVISION_"+txn.getRevisionNumber()).toString();
             outputJobInfo.getProperties().setProperty(PROPERTY_INT_OUTPUT_LOCATION, location);
             //We are writing out an intermediate sequenceFile hence location is not passed
in OutputJobInfo.getLocation()
             //TODO replace this with a mapreduce constant when available

Modified: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseConstants.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseConstants.java?rev=1234271&r1=1234270&r2=1234271&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseConstants.java
(original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseConstants.java
Sat Jan 21 05:11:34 2012
@@ -26,8 +26,8 @@ import org.apache.hcatalog.common.HCatCo
  */
 class HBaseConstants {
 
-    /** key used to define th version number HBaseOutputStorage driver to use when writing
out data for a job */
-    public static final String PROPERTY_OUTPUT_VERSION_KEY = HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX+".hbase.mapreduce.outputVersion";
+    /** key used to store write transaction object */
+    public static final String PROPERTY_WRITE_TXN_KEY = HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX+".hbase.mapreduce.writeTxn";
 
     /** key used to define the name of the table to write to */
     public static final String PROPERTY_OUTPUT_TABLE_NAME_KEY = HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX+".hbase.mapreduce.outputTableName";

Modified: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputFormat.java?rev=1234271&r1=1234270&r2=1234271&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputFormat.java
(original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputFormat.java
Sat Jan 21 05:11:34 2012
@@ -24,10 +24,12 @@ import org.apache.hadoop.hbase.mapreduce
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hcatalog.hbase.snapshot.RevisionManager;
 
 
 import java.io.IOException;
@@ -57,7 +59,7 @@ class HBaseDirectOutputFormat extends Ou
 
     @Override
     public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException,
InterruptedException {
-        return outputFormat.getOutputCommitter(context);
+        return new HBaseDirectOutputCommitter(outputFormat.getOutputCommitter(context));
     }
 
     @Override
@@ -72,4 +74,63 @@ class HBaseDirectOutputFormat extends Ou
     public Configuration getConf() {
         return outputFormat.getConf();
     }
+
+    private static class HBaseDirectOutputCommitter extends OutputCommitter {
+        private OutputCommitter baseOutputCommitter;
+
+        public HBaseDirectOutputCommitter(OutputCommitter baseOutputCommitter) throws IOException
{
+            this.baseOutputCommitter = baseOutputCommitter;
+        }
+
+        @Override
+        public void abortTask(TaskAttemptContext context) throws IOException {
+            baseOutputCommitter.abortTask(context);
+        }
+
+        @Override
+        public void commitTask(TaskAttemptContext context) throws IOException {
+            baseOutputCommitter.commitTask(context);
+        }
+
+        @Override
+        public boolean needsTaskCommit(TaskAttemptContext context) throws IOException {
+            return baseOutputCommitter.needsTaskCommit(context);
+        }
+
+        @Override
+        public void setupJob(JobContext context) throws IOException {
+            baseOutputCommitter.setupJob(context);
+        }
+
+        @Override
+        public void setupTask(TaskAttemptContext context) throws IOException {
+            baseOutputCommitter.setupTask(context);
+        }
+
+        @Override
+        public void abortJob(JobContext jobContext, JobStatus.State state) throws IOException
{
+            RevisionManager rm = null;
+            try {
+                baseOutputCommitter.abortJob(jobContext, state);
+                rm = HBaseHCatStorageHandler.getOpenedRevisionManager(jobContext.getConfiguration());
+                rm.abortWriteTransaction(HBaseHCatStorageHandler.getWriteTransaction(jobContext.getConfiguration()));
+            } finally {
+                if(rm != null)
+                    rm.close();
+            }
+        }
+
+        @Override
+        public void commitJob(JobContext jobContext) throws IOException {
+            RevisionManager rm = null;
+            try {
+                baseOutputCommitter.commitJob(jobContext);
+                rm = HBaseHCatStorageHandler.getOpenedRevisionManager(jobContext.getConfiguration());
+                rm.commitWriteTransaction(HBaseHCatStorageHandler.getWriteTransaction(jobContext.getConfiguration()));
+            } finally {
+                if(rm != null)
+                    rm.close();
+            }
+        }
+    }
 }

Modified: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java?rev=1234271&r1=1234270&r2=1234271&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java
(original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java
Sat Jan 21 05:11:34 2012
@@ -55,12 +55,14 @@ import org.apache.hcatalog.common.HCatUt
 import org.apache.hcatalog.hbase.snapshot.RevisionManager;
 import org.apache.hcatalog.hbase.snapshot.RevisionManagerFactory;
 import org.apache.hcatalog.hbase.snapshot.TableSnapshot;
+import org.apache.hcatalog.hbase.snapshot.Transaction;
 import org.apache.hcatalog.hbase.snapshot.ZKBasedRevisionManager;
 import org.apache.hcatalog.mapreduce.HCatInputStorageDriver;
 import org.apache.hcatalog.mapreduce.HCatOutputFormat;
 import org.apache.hcatalog.mapreduce.HCatOutputStorageDriver;
 import org.apache.hcatalog.mapreduce.HCatTableInfo;
 import org.apache.hcatalog.mapreduce.InputJobInfo;
+import org.apache.hcatalog.mapreduce.OutputJobInfo;
 import org.apache.hcatalog.storagehandler.HCatStorageHandler;
 import org.apache.thrift.TBase;
 import org.apache.zookeeper.ZooKeeper;
@@ -572,5 +574,26 @@ public class HBaseHCatStorageHandler ext
                 HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY, serializedSnp);
     }
 
+    static Transaction getWriteTransaction(Configuration conf) throws IOException {
+        OutputJobInfo outputJobInfo = (OutputJobInfo)HCatUtil.deserialize(conf.get(HCatConstants.HCAT_KEY_OUTPUT_INFO));
+        return (Transaction) HCatUtil.deserialize(outputJobInfo.getProperties()
+                                                               .getProperty(HBaseConstants.PROPERTY_WRITE_TXN_KEY));
+    }
+
+    static void setWriteTransaction(Configuration conf, Transaction txn) throws IOException
{
+        OutputJobInfo outputJobInfo = (OutputJobInfo)HCatUtil.deserialize(conf.get(HCatConstants.HCAT_KEY_OUTPUT_INFO));
+        outputJobInfo.getProperties().setProperty(HBaseConstants.PROPERTY_WRITE_TXN_KEY,
HCatUtil.serialize(txn));
+        conf.set(HCatConstants.HCAT_KEY_OUTPUT_INFO, HCatUtil.serialize(outputJobInfo));
+    }
+
+    /**
+     * Get the Revision number that will be assigned to this job's output data
+     * @param conf configuration of the job
+     * @return the revision number used
+     * @throws IOException
+     */
+    public static long getOutputRevision(Configuration conf) throws IOException {
+        return getWriteTransaction(conf).getRevisionNumber();
+    }
 
 }

Modified: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseSerDeResultConverter.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseSerDeResultConverter.java?rev=1234271&r1=1234270&r2=1234271&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseSerDeResultConverter.java
(original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseSerDeResultConverter.java
Sat Jan 21 05:11:34 2012
@@ -39,6 +39,7 @@ import org.apache.hadoop.hive.serde2.typ
 import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hcatalog.common.HCatConstants;
 import org.apache.hcatalog.data.DefaultHCatRecord;
 import org.apache.hcatalog.data.HCatRecord;
 import org.apache.hcatalog.data.schema.HCatFieldSchema;
@@ -58,6 +59,7 @@ import java.util.Properties;
  * {@link HBaseConstants.PROPERTY_COLUMN_MAPPING_KEY}
  */
 class HBaseSerDeResultConverter implements  ResultConverter {
+
     private HBaseSerDe serDe;
     private HCatSchema schema;
     private HCatSchema outputSchema;
@@ -75,14 +77,25 @@ class HBaseSerDeResultConverter implemen
     HBaseSerDeResultConverter(HCatSchema schema,
                                      HCatSchema outputSchema,
                                      Properties hcatProperties) throws IOException {
+        this(schema,outputSchema,hcatProperties,null);
+    }
+
+    /**
+     * @param schema table schema
+     * @param outputSchema schema of projected output
+     * @param hcatProperties table properties
+     * @param outputVersion value to write in timestamp field
+     * @throws IOException thrown if hive's HBaseSerDe couldn't be initialized
+     */
+    HBaseSerDeResultConverter(HCatSchema schema,
+                                     HCatSchema outputSchema,
+                                     Properties hcatProperties,
+                                     Long outputVersion) throws IOException {
 
         hbaseColumnMapping =  hcatProperties.getProperty(HBaseConstants.PROPERTY_COLUMN_MAPPING_KEY);
         hcatProperties.setProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING,hbaseColumnMapping);
 
-        if(hcatProperties.containsKey(HBaseConstants.PROPERTY_OUTPUT_VERSION_KEY))
-            outputVersion = Long.parseLong(hcatProperties.getProperty(HBaseConstants.PROPERTY_OUTPUT_VERSION_KEY));
-        else
-            outputVersion = null;
+        this.outputVersion = outputVersion;
 
         this.schema = schema;
         if(outputSchema == null) {

Modified: incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputStorageDriver.java?rev=1234271&r1=1234270&r2=1234271&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputStorageDriver.java
(original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputStorageDriver.java
Sat Jan 21 05:11:34 2012
@@ -30,12 +30,16 @@ import org.apache.hcatalog.common.HCatUt
 import org.apache.hcatalog.data.DefaultHCatRecord;
 import org.apache.hcatalog.data.HCatRecord;
 import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.hbase.snapshot.RevisionManager;
+import org.apache.hcatalog.hbase.snapshot.TableSnapshot;
+import org.apache.hcatalog.hbase.snapshot.Transaction;
 import org.apache.hcatalog.mapreduce.HCatOutputFormat;
 import org.apache.hcatalog.mapreduce.OutputJobInfo;
 
 import org.junit.Test;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
@@ -145,6 +149,19 @@ public class TestHBaseBulkOutputStorageD
         job.setOutputFormatClass(HBaseBulkOutputFormat.class);
         SequenceFileOutputFormat.setOutputPath(job,interPath);
 
+        //manually create transaction
+        RevisionManager rm = HBaseHCatStorageHandler.getOpenedRevisionManager(conf);
+        try {
+            OutputJobInfo outputJobInfo = OutputJobInfo.create("default", tableName, null,
null, null);
+            Transaction txn = rm.beginWriteTransaction(tableName, Arrays.asList(familyName));
+            outputJobInfo.getProperties().setProperty(HBaseConstants.PROPERTY_WRITE_TXN_KEY,
+                                                      HCatUtil.serialize(txn));
+            job.getConfiguration().set(HCatConstants.HCAT_KEY_OUTPUT_INFO,
+                                       HCatUtil.serialize(outputJobInfo));
+        } finally {
+            rm.close();
+        }
+
         job.setMapOutputKeyClass(ImmutableBytesWritable.class);
         job.setMapOutputValueClass(HCatRecord.class);
 
@@ -323,113 +340,16 @@ public class TestHBaseBulkOutputStorageD
 
         job.setNumReduceTasks(0);
 
-        long ubTimestamp = System.currentTimeMillis();
-        long lbTimestamp = ubTimestamp;
-
         assertTrue(job.waitForCompletion(true));
-
-        ubTimestamp = System.currentTimeMillis();
-
-        //verify
-        HTable table = new HTable(conf, databaseName+"."+tableName);
-        Scan scan = new Scan();
-        scan.addFamily(familyNameBytes);
-        ResultScanner scanner = table.getScanner(scan);
-        long prevTimestamp = -1;
-        int index=0;
-        for(Result result: scanner) {
-            String vals[] = data[index].toString().split(",");
-            for(int i=1;i<vals.length;i++) {
-                String pair[] = vals[i].split(":");
-                assertTrue(result.containsColumn(familyNameBytes,Bytes.toBytes(pair[0])));
-                assertEquals(pair[1],Bytes.toString(result.getValue(familyNameBytes,Bytes.toBytes(pair[0]))));
-
-                //verify revision
-                long timestamp = result.getColumn(familyNameBytes,Bytes.toBytes(pair[0])).get(0).getTimestamp();
-                if(prevTimestamp == -1) {
-                    prevTimestamp = timestamp;
-                }
-                else {
-                    assertEquals(prevTimestamp+"="+timestamp,
-                                       prevTimestamp,
-                                       timestamp);
-                }
-                assertTrue(lbTimestamp+"<="+timestamp+"<="+ubTimestamp,
-                                 timestamp >= lbTimestamp && timestamp <= ubTimestamp);
+        RevisionManager rm = HBaseHCatStorageHandler.getOpenedRevisionManager(conf);
+        try {
+            TableSnapshot snapshot = rm.createSnapshot(databaseName+"."+tableName);
+            for(String el: snapshot.getColumnFamilies()) {
+                assertEquals(1,snapshot.getRevision(el));
             }
-            index++;
+        } finally {
+            rm.close();
         }
-        //test if load count is the same
-        assertEquals(data.length,index);
-    }
-
-    @Test
-    public void hbaseBulkOutputStorageDriverTestWithRevision() throws Exception {
-        String testName = "hbaseBulkOutputStorageDriverTestWithRevision";
-        Path methodTestDir = new Path(getTestDir(),testName);
-        LOG.info("starting: "+testName);
-
-        String databaseName = testName.toLowerCase();
-        String dbDir = new Path(methodTestDir,"DB_"+testName).toString();
-        String tableName = newTableName(testName).toLowerCase();
-        byte[] tableNameBytes = Bytes.toBytes(tableName);
-        String familyName = "my_family";
-        byte[] familyNameBytes = Bytes.toBytes(familyName);
-
-
-        //include hbase config in conf file
-        Configuration conf = new Configuration(allConf);
-        conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, HCatUtil.serialize(allConf.getAllProperties()));
-
-
-        String dbquery = "CREATE DATABASE IF NOT EXISTS " + databaseName + " LOCATION '"
+ dbDir + "'";
-        String tableQuery = "CREATE TABLE " + databaseName + "." + tableName +
-                              "(key int, english string, spanish string) STORED BY " +
-                              "'org.apache.hcatalog.hbase.HBaseHCatStorageHandler'" +
-                              "TBLPROPERTIES ('hbase.columns.mapping'=':key,"+familyName+":english,"+familyName+":spanish')"
;
-
-        assertEquals(0, hcatDriver.run(dbquery).getResponseCode());
-        assertEquals(0, hcatDriver.run(tableQuery).getResponseCode());
-
-        String data[] = {"1,english:ONE,spanish:UNO",
-                               "2,english:ONE,spanish:DOS",
-                               "3,english:ONE,spanish:TRES"};
-
-        // input/output settings
-        Path inputPath = new Path(methodTestDir,"mr_input");
-        getFileSystem().mkdirs(inputPath);
-        //create multiple files so we can test with multiple mappers
-        for(int i=0;i<data.length;i++) {
-            FSDataOutputStream os = getFileSystem().create(new Path(inputPath,"inputFile"+i+".txt"));
-            os.write(Bytes.toBytes(data[i] + "\n"));
-            os.close();
-        }
-
-        //create job
-        Job job = new Job(conf,testName);
-        HBaseHCatStorageHandler.addDependencyJars(job.getConfiguration());
-        job.setWorkingDirectory(new Path(methodTestDir,"mr_work"));
-        job.setJarByClass(this.getClass());
-        job.setMapperClass(MapHCatWrite.class);
-
-        job.setInputFormatClass(TextInputFormat.class);
-        TextInputFormat.setInputPaths(job, inputPath);
-
-
-        job.setOutputFormatClass(HCatOutputFormat.class);
-        OutputJobInfo outputJobInfo = OutputJobInfo.create(databaseName,tableName,null,null,null);
-        outputJobInfo.getProperties().put(HBaseConstants.PROPERTY_OUTPUT_VERSION_KEY, "1");
-        HCatOutputFormat.setOutput(job,outputJobInfo);
-
-        job.setMapOutputKeyClass(BytesWritable.class);
-        job.setMapOutputValueClass(HCatRecord.class);
-
-        job.setOutputKeyClass(BytesWritable.class);
-        job.setOutputValueClass(HCatRecord.class);
-
-        job.setNumReduceTasks(0);
-
-        assertTrue(job.waitForCompletion(true));
 
         //verify
         HTable table = new HTable(conf, databaseName+"."+tableName);

Modified: incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputStorageDriver.java?rev=1234271&r1=1234270&r2=1234271&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputStorageDriver.java
(original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputStorageDriver.java
Sat Jan 21 05:11:34 2012
@@ -39,11 +39,15 @@ import org.apache.hcatalog.common.HCatUt
 import org.apache.hcatalog.data.DefaultHCatRecord;
 import org.apache.hcatalog.data.HCatRecord;
 import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.hbase.snapshot.RevisionManager;
+import org.apache.hcatalog.hbase.snapshot.TableSnapshot;
+import org.apache.hcatalog.hbase.snapshot.Transaction;
 import org.apache.hcatalog.mapreduce.HCatOutputFormat;
 import org.apache.hcatalog.mapreduce.OutputJobInfo;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
@@ -117,6 +121,19 @@ public class TestHBaseDirectOutputStorag
         job.setOutputFormatClass(HBaseDirectOutputFormat.class);
         job.getConfiguration().set(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY, tableName);
 
+        //manually create transaction
+        RevisionManager rm = HBaseHCatStorageHandler.getOpenedRevisionManager(conf);
+        try {
+            OutputJobInfo outputJobInfo = OutputJobInfo.create("default", tableName, null, null, null);
+            Transaction txn = rm.beginWriteTransaction(tableName, Arrays.asList(familyName));
+            outputJobInfo.getProperties().setProperty(HBaseConstants.PROPERTY_WRITE_TXN_KEY,
+                                                      HCatUtil.serialize(txn));
+            job.getConfiguration().set(HCatConstants.HCAT_KEY_OUTPUT_INFO,
+                                       HCatUtil.serialize(outputJobInfo));
+        } finally {
+            rm.close();
+        }
+
         job.setMapOutputKeyClass(BytesWritable.class);
         job.setMapOutputValueClass(HCatRecord.class);
 
@@ -149,7 +166,7 @@ public class TestHBaseDirectOutputStorag
         String testName = "directOutputStorageDriverTest";
         Path methodTestDir = new Path(getTestDir(),testName);
 
-        String databaseName = "default";
+        String databaseName = testName.toLowerCase();
         String dbDir = new Path(methodTestDir,"DB_"+testName).toString();
         String tableName = newTableName(testName).toLowerCase();
         byte[] tableNameBytes = Bytes.toBytes(tableName);
@@ -198,116 +215,29 @@ public class TestHBaseDirectOutputStorag
 
         job.setOutputFormatClass(HCatOutputFormat.class);
         OutputJobInfo outputJobInfo = OutputJobInfo.create(databaseName,tableName,null,null,null);
-        long lbTimestamp = System.currentTimeMillis();
         HCatOutputFormat.setOutput(job,outputJobInfo);
 
         job.setMapOutputKeyClass(BytesWritable.class);
         job.setMapOutputValueClass(HCatRecord.class);
 
         job.setOutputKeyClass(BytesWritable.class);
-        job.setOutputValueClass(Put.class);
+        job.setOutputValueClass(HCatRecord.class);
 
         job.setNumReduceTasks(0);
-
         assertTrue(job.waitForCompletion(true));
-        long ubTimestamp = System.currentTimeMillis();
 
-        //verify
-        HTable table = new HTable(conf, tableName);
-        Scan scan = new Scan();
-        scan.addFamily(familyNameBytes);
-        ResultScanner scanner = table.getScanner(scan);
-        int index = 0;
-        long prevTimestamp = -1;
-        for(Result result: scanner) {
-            String vals[] = data[index].toString().split(",");
-            for(int i=1;i<vals.length;i++) {
-                String pair[] = vals[i].split(":");
-                assertTrue(result.containsColumn(familyNameBytes,Bytes.toBytes(pair[0])));
-                assertEquals(pair[1],Bytes.toString(result.getValue(familyNameBytes,Bytes.toBytes(pair[0]))));
-                long timestamp = result.getColumn(familyNameBytes,Bytes.toBytes(pair[0])).get(0).getTimestamp();
-                if(prevTimestamp < 1)
-                    prevTimestamp = timestamp;
-                else
-                    assertEquals(prevTimestamp+"="+timestamp,
-                                       prevTimestamp,
-                                       timestamp);
-                assertTrue(lbTimestamp+"<="+timestamp+"<="+ubTimestamp,
-                           timestamp >= lbTimestamp && timestamp <= ubTimestamp);
+        RevisionManager rm = HBaseHCatStorageHandler.getOpenedRevisionManager(conf);
+        try {
+            TableSnapshot snapshot = rm.createSnapshot(databaseName+"."+tableName);
+            for(String el: snapshot.getColumnFamilies()) {
+                assertEquals(1,snapshot.getRevision(el));
             }
-            index++;
+        } finally {
+            rm.close();
         }
-        assertEquals(data.length,index);
-    }
-
-    @Test
-    public void directOutputStorageDriverTestWithRevision() throws Exception {
-        String testName = "directOutputStorageDriverTestWithRevision";
-        Path methodTestDir = new Path(getTestDir(),testName);
-
-        String databaseName = "default";
-        String dbDir = new Path(methodTestDir,"DB_"+testName).toString();
-        String tableName = newTableName(testName).toLowerCase();
-        byte[] tableNameBytes = Bytes.toBytes(tableName);
-        String familyName = "my_family";
-        byte[] familyNameBytes = Bytes.toBytes(familyName);
-
-
-        //include hbase config in conf file
-        Configuration conf = new Configuration(allConf);
-        conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, HCatUtil.serialize(allConf.getAllProperties()));
-
-
-        String dbquery = "CREATE DATABASE IF NOT EXISTS " + databaseName + " LOCATION '" + dbDir + "'";
-        String tableQuery = "CREATE TABLE " + databaseName + "." + tableName +
-                              "(key int, english string, spanish string) STORED BY " +
-                              "'org.apache.hcatalog.hbase.HBaseHCatStorageHandler'" +
-                              "TBLPROPERTIES ('"+HBaseConstants.PROPERTY_OSD_BULK_MODE_KEY+"'='false',"+
-                              "'hbase.columns.mapping'=':key,"+familyName+":english,"+familyName+":spanish')"
;
-
-        assertEquals(0, hcatDriver.run(dbquery).getResponseCode());
-        assertEquals(0, hcatDriver.run(tableQuery).getResponseCode());
-
-        String data[] = {"1,english:ONE,spanish:UNO",
-                               "2,english:ONE,spanish:DOS",
-                               "3,english:ONE,spanish:TRES"};
-
-        // input/output settings
-        Path inputPath = new Path(methodTestDir,"mr_input");
-        getFileSystem().mkdirs(inputPath);
-        //create multiple files so we can test with multiple mappers
-        for(int i=0;i<data.length;i++) {
-            FSDataOutputStream os = getFileSystem().create(new Path(inputPath,"inputFile"+i+".txt"));
-            os.write(Bytes.toBytes(data[i] + "\n"));
-            os.close();
-        }
-
-        //create job
-        Job job = new Job(conf, testName);
-        job.setWorkingDirectory(new Path(methodTestDir,"mr_work"));
-        job.setJarByClass(this.getClass());
-        job.setMapperClass(MapHCatWrite.class);
-
-        job.setInputFormatClass(TextInputFormat.class);
-        TextInputFormat.setInputPaths(job, inputPath);
-
-
-        job.setOutputFormatClass(HCatOutputFormat.class);
-        OutputJobInfo outputJobInfo = OutputJobInfo.create(databaseName,tableName,null,null,null);
-        outputJobInfo.getProperties().put(HBaseConstants.PROPERTY_OUTPUT_VERSION_KEY, "1");
-        HCatOutputFormat.setOutput(job,outputJobInfo);
-
-        job.setMapOutputKeyClass(BytesWritable.class);
-        job.setMapOutputValueClass(HCatRecord.class);
-
-        job.setOutputKeyClass(BytesWritable.class);
-        job.setOutputValueClass(HCatRecord.class);
-
-        job.setNumReduceTasks(0);
-        assertTrue(job.waitForCompletion(true));
 
         //verify
-        HTable table = new HTable(conf, tableName);
+        HTable table = new HTable(conf, databaseName+"."+tableName);
         Scan scan = new Scan();
         scan.addFamily(familyNameBytes);
         ResultScanner scanner = table.getScanner(scan);

Modified: incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseSerDeResultConverter.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseSerDeResultConverter.java?rev=1234271&r1=1234270&r2=1234271&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseSerDeResultConverter.java
(original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseSerDeResultConverter.java
Sat Jan 21 05:11:34 2012
@@ -55,7 +55,6 @@ public class TestHBaseSerDeResultConvert
         tbl.setProperty(HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX+"."+ HBaseSerDe.HBASE_COLUMNS_MAPPING,
                 ":key,my_family:my_qualifier1,my_family:my_qualifier2,my_family2:");
         tbl.setProperty(Constants.SERIALIZATION_NULL_FORMAT, "NULL");
-        tbl.setProperty(HBaseConstants.PROPERTY_OUTPUT_VERSION_KEY,"1");
         return tbl;
     }
 
@@ -74,8 +73,9 @@ public class TestHBaseSerDeResultConvert
     @Test
     public void testDeserialize() throws IOException {
         HBaseSerDeResultConverter converter = new HBaseSerDeResultConverter(createHCatSchema(),
-                null,
-                createProperties());
+                                                                            null,
+                                                                            createProperties(),
+                                                                            1l);
         //test integer
         Result result = new Result(new KeyValue[]{new KeyValue(Bytes.toBytes("row"),
                 Bytes.toBytes("my_family"),
@@ -127,8 +127,9 @@ public class TestHBaseSerDeResultConvert
     public void testSerialize() throws IOException {
         HCatSchema schema = createHCatSchema();
         HBaseSerDeResultConverter converter = new HBaseSerDeResultConverter(schema,
-                null,
-                createProperties());
+                                                                            null,
+                                                                            createProperties(),
+                                                                            1l);
         HCatRecord in = new DefaultHCatRecord(4);
         //row key
         in.set(0,"row");



Mime
View raw message