incubator-hcatalog-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ga...@apache.org
Subject svn commit: r1239809 - in /incubator/hcatalog/branches/branch-0.3: ./ src/java/org/apache/hcatalog/common/ src/java/org/apache/hcatalog/mapreduce/ src/java/org/apache/hcatalog/pig/
Date Thu, 02 Feb 2012 20:29:37 GMT
Author: gates
Date: Thu Feb  2 20:29:37 2012
New Revision: 1239809

URL: http://svn.apache.org/viewvc?rev=1239809&view=rev
Log:
HCATALOG-207 Changes to current HCat subsystem to allow it to work with hive

Modified:
    incubator/hcatalog/branches/branch-0.3/CHANGES.txt
    incubator/hcatalog/branches/branch-0.3/src/java/org/apache/hcatalog/common/HCatConstants.java
    incubator/hcatalog/branches/branch-0.3/src/java/org/apache/hcatalog/common/HCatUtil.java
    incubator/hcatalog/branches/branch-0.3/src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java
    incubator/hcatalog/branches/branch-0.3/src/java/org/apache/hcatalog/mapreduce/HCatSplit.java
    incubator/hcatalog/branches/branch-0.3/src/java/org/apache/hcatalog/mapreduce/InitializeInput.java
    incubator/hcatalog/branches/branch-0.3/src/java/org/apache/hcatalog/pig/PigHCatUtil.java

Modified: incubator/hcatalog/branches/branch-0.3/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.3/CHANGES.txt?rev=1239809&r1=1239808&r2=1239809&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.3/CHANGES.txt (original)
+++ incubator/hcatalog/branches/branch-0.3/CHANGES.txt Thu Feb  2 20:29:37 2012
@@ -23,6 +23,8 @@ Release 0.3.0 (unreleased changes)
   INCOMPATIBLE CHANGES
 
   NEW FEATURES
+  HCAT-207. Changes to current HCat subsystem to allow it to work with hive (khorgath via gates)
+
   HCAT-204. HCatRecord SerDe (khorgath via gates)
 
  HCAT-192. HBase output storage driver integration with zookeeper based revision manager (toffer via hashutosh)

Modified: incubator/hcatalog/branches/branch-0.3/src/java/org/apache/hcatalog/common/HCatConstants.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.3/src/java/org/apache/hcatalog/common/HCatConstants.java?rev=1239809&r1=1239808&r2=1239809&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.3/src/java/org/apache/hcatalog/common/HCatConstants.java (original)
+++ incubator/hcatalog/branches/branch-0.3/src/java/org/apache/hcatalog/common/HCatConstants.java Thu Feb  2 20:29:37 2012
@@ -75,6 +75,15 @@ public final class HCatConstants {
  public static final String HCAT_KEY_JOBCLIENT_TOKEN_SIGNATURE = HCAT_KEY_OUTPUT_BASE + ".jobclient.token.sig";
   public static final String HCAT_KEY_JOBCLIENT_TOKEN_STRFORM = HCAT_KEY_OUTPUT_BASE + ".jobclient.token.strform";
 
+  public static final String[] OUTPUT_CONFS_TO_SAVE = {
+    HCAT_KEY_OUTPUT_INFO,
+    HCAT_KEY_HIVE_CONF,
+    HCAT_KEY_TOKEN_SIGNATURE,
+    HCAT_KEY_JOBCLIENT_TOKEN_SIGNATURE,
+    HCAT_KEY_JOBCLIENT_TOKEN_STRFORM
+    };
+
+
   public static final String HCAT_MSG_CLEAN_FREQ = "hcat.msg.clean.freq";
   public static final String HCAT_MSG_EXPIRY_DURATION = "hcat.msg.expiry.duration";
   

Modified: incubator/hcatalog/branches/branch-0.3/src/java/org/apache/hcatalog/common/HCatUtil.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.3/src/java/org/apache/hcatalog/common/HCatUtil.java?rev=1239809&r1=1239808&r2=1239809&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.3/src/java/org/apache/hcatalog/common/HCatUtil.java (original)
+++ incubator/hcatalog/branches/branch-0.3/src/java/org/apache/hcatalog/common/HCatUtil.java Thu Feb  2 20:29:37 2012
@@ -26,6 +26,7 @@ import java.io.ObjectOutputStream;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -38,6 +39,7 @@ import org.apache.hadoop.fs.permission.F
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
@@ -55,6 +57,7 @@ import org.apache.hadoop.security.token.
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hcatalog.data.Pair;
 import org.apache.hcatalog.data.schema.HCatFieldSchema;
 import org.apache.hcatalog.data.schema.HCatSchema;
 import org.apache.hcatalog.data.schema.HCatSchemaUtils;
@@ -418,12 +421,17 @@ public class HCatUtil {
 
     public static void logEntrySet(Log logger, String itemName,
             Set<? extends Entry> entrySet) {
-        logger.info(itemName + ":");
-        for (Entry e : entrySet) {
-            logger.info("\t[" + e.getKey() + "]=>[" + e.getValue() + "]");
-        }
+        logIterableSet(logger,itemName,entrySet.iterator());
     }
 
+    public static void logIterableSet(Log logger, String itemName, Iterator<? extends Entry> iterator){
+      logger.info(itemName + ":");
+      while (iterator.hasNext()){
+        Entry e = iterator.next();
+        logger.debug("\t[" + e.getKey() + "]=>[" + e.getValue() + "]");
+      }
+    }
+    
     public static void logAllTokens(Log logger, JobContext context)
             throws IOException {
         for (Token<? extends TokenIdentifier> t : context.getCredentials()
@@ -459,4 +467,15 @@ public class HCatUtil {
         }
     }
 
+    public static Pair<String,String> getDbAndTableName(String tableName) throws IOException{
+      String[] dbTableNametokens = tableName.split("\\.");
+      if(dbTableNametokens.length == 1) {
+        return new Pair<String,String>(MetaStoreUtils.DEFAULT_DATABASE_NAME,tableName);
+      }else if (dbTableNametokens.length == 2) {
+        return new Pair<String, String>(dbTableNametokens[0], dbTableNametokens[1]);
+      }else{
+        throw new IOException("tableName expected in the form "
+            +"<databasename>.<table name> or <table name>. Got " + tableName);
+      }
+    }
 }

Modified: incubator/hcatalog/branches/branch-0.3/src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.3/src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java?rev=1239809&r1=1239808&r2=1239809&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.3/src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java (original)
+++ incubator/hcatalog/branches/branch-0.3/src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java Thu Feb  2 20:29:37 2012
@@ -19,17 +19,24 @@ package org.apache.hcatalog.mapreduce;
 
 import java.io.IOException;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hcatalog.data.DefaultHCatRecord;
 import org.apache.hcatalog.data.HCatRecord;
 
 /** The HCat wrapper for the underlying RecordReader, this ensures that the initialize on
  * the underlying record reader is done with the underlying split, not with HCatSplit.
  */
-class HCatRecordReader extends RecordReader<WritableComparable, HCatRecord> {
+class HCatRecordReader extends RecordReader<WritableComparable, HCatRecord> 
+  implements org.apache.hadoop.mapred.RecordReader {
+  
+    Log LOG = LogFactory.getLog(HCatRecordReader.class);
+    int lineCount = 0;
 
     /** The underlying record reader to delegate to. */
     private final RecordReader<? extends WritableComparable, ? extends Writable> baseRecordReader;
@@ -74,15 +81,25 @@ class HCatRecordReader extends RecordRea
      */
     @Override
     public HCatRecord getCurrentValue() throws IOException, InterruptedException {
-        return storageDriver.convertToHCatRecord(baseRecordReader.getCurrentKey(),baseRecordReader.getCurrentValue());
+        HCatRecord r = storageDriver.convertToHCatRecord(baseRecordReader.getCurrentKey(),baseRecordReader.getCurrentValue());
+        return r; 
     }
 
     /* (non-Javadoc)
      * @see org.apache.hadoop.mapreduce.RecordReader#getProgress()
      */
     @Override
-    public float getProgress() throws IOException, InterruptedException {
-        return baseRecordReader.getProgress();
+    public float getProgress()  {
+        try {
+          return baseRecordReader.getProgress();
+        } catch (IOException e) {
+          LOG.warn(e.getMessage());
+          LOG.warn(e.getStackTrace());
+        } catch (InterruptedException e) {
+          LOG.warn(e.getMessage());
+          LOG.warn(e.getStackTrace());
+        }
+        return 0.0f; // errored
     }
 
     /* (non-Javadoc)
@@ -90,6 +107,7 @@ class HCatRecordReader extends RecordRea
      */
     @Override
     public boolean nextKeyValue() throws IOException, InterruptedException {
+        lineCount++;
         return baseRecordReader.nextKeyValue();
     }
 
@@ -100,4 +118,46 @@ class HCatRecordReader extends RecordRea
     public void close() throws IOException {
         baseRecordReader.close();
     }
+
+    @Override
+    public Object createKey() {
+      WritableComparable o = null;
+      try {
+        o = getCurrentKey();
+      } catch (IOException e) {
+        LOG.warn(e.getMessage());
+        LOG.warn(e.getStackTrace());
+      } catch (InterruptedException e) {
+        LOG.warn(e.getMessage());
+        LOG.warn(e.getStackTrace());
+      }
+      return o;
+    }
+
+    @Override
+    public Object createValue() {
+      return new DefaultHCatRecord();
+    }
+
+    @Override
+    public long getPos() throws IOException {
+      return lineCount;
+    }
+
+    @Override
+    public boolean next(Object key, Object value) throws IOException {
+      try {
+        if (!nextKeyValue()){
+          return false;
+        }
+        
+        ((HCatRecord)value).copy(getCurrentValue());
+        
+        return true;
+      } catch (InterruptedException e) {
+        LOG.warn(e.getMessage());
+        LOG.warn(e.getStackTrace());
+      }
+      return false;
+    }
 }

Modified: incubator/hcatalog/branches/branch-0.3/src/java/org/apache/hcatalog/mapreduce/HCatSplit.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.3/src/java/org/apache/hcatalog/mapreduce/HCatSplit.java?rev=1239809&r1=1239808&r2=1239809&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.3/src/java/org/apache/hcatalog/mapreduce/HCatSplit.java (original)
+++ incubator/hcatalog/branches/branch-0.3/src/java/org/apache/hcatalog/mapreduce/HCatSplit.java Thu Feb  2 20:29:37 2012
@@ -22,6 +22,8 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.lang.reflect.Constructor;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -29,8 +31,10 @@ import org.apache.hcatalog.common.HCatUt
 import org.apache.hcatalog.data.schema.HCatSchema;
 
 /** The HCatSplit wrapper around the InputSplit returned by the underlying InputFormat */
-class HCatSplit extends InputSplit implements Writable {
+public class HCatSplit extends InputSplit implements Writable,org.apache.hadoop.mapred.InputSplit {
 
+    Log LOG = LogFactory.getLog(HCatSplit.class);
+    
     /** The partition info for the split. */
     private PartInfo partitionInfo;
 
@@ -94,16 +98,34 @@ class HCatSplit extends InputSplit imple
      * @see org.apache.hadoop.mapreduce.InputSplit#getLength()
      */
     @Override
-    public long getLength() throws IOException, InterruptedException {
-        return baseSplit.getLength();
+    public long getLength() {
+        try {
+          return baseSplit.getLength();
+        } catch (IOException e) {
+          LOG.warn(e.getMessage());
+          LOG.warn(e.getStackTrace());
+        } catch (InterruptedException e) {
+          LOG.warn(e.getMessage());
+          LOG.warn(e.getStackTrace());
+        }
+        return 0; // we errored
     }
 
     /* (non-Javadoc)
      * @see org.apache.hadoop.mapreduce.InputSplit#getLocations()
      */
     @Override
-    public String[] getLocations() throws IOException, InterruptedException {
-        return baseSplit.getLocations();
+    public String[] getLocations() {
+        try {
+          return baseSplit.getLocations();
+        } catch (IOException e) {
+          LOG.warn(e.getMessage());
+          LOG.warn(e.getStackTrace());
+        } catch (InterruptedException e) {
+          LOG.warn(e.getMessage());
+          LOG.warn(e.getStackTrace());
+        }
+        return new String[0]; // we errored
     }
 
     /* (non-Javadoc)

Modified: incubator/hcatalog/branches/branch-0.3/src/java/org/apache/hcatalog/mapreduce/InitializeInput.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.3/src/java/org/apache/hcatalog/mapreduce/InitializeInput.java?rev=1239809&r1=1239808&r2=1239809&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.3/src/java/org/apache/hcatalog/mapreduce/InitializeInput.java (original)
+++ incubator/hcatalog/branches/branch-0.3/src/java/org/apache/hcatalog/mapreduce/InitializeInput.java Thu Feb  2 20:29:37 2012
@@ -24,6 +24,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
@@ -46,6 +48,8 @@ import org.apache.hcatalog.data.schema.H
  * info required in the client process context.
  */
 public class InitializeInput {
+  
+  private static final Log LOG = LogFactory.getLog(InitializeInput.class);
 
   /** The prefix for keys used for storage driver arguments */
   static final String HCAT_KEY_PREFIX = "hcat.";
@@ -69,10 +73,23 @@ public class InitializeInput {
     //* Create and initialize an InputJobInfo object
     //* Serialize the InputJobInfo and save in the Job's Configuration object
 
+    job.getConfiguration().set(
+        HCatConstants.HCAT_KEY_JOB_INFO, 
+        getSerializedHcatKeyJobInfo(job, inputJobInfo,null));
+  }
+
+  public static String getSerializedHcatKeyJobInfo(Job job, InputJobInfo inputJobInfo, String locationFilter) throws Exception {
+    //* Create and initialize an InputJobInfo object
+
     HiveMetaStoreClient client = null;
 
     try {
-      client = createHiveMetaClient(job.getConfiguration(),inputJobInfo);
+      if (job != null){
+        client = createHiveMetaClient(job.getConfiguration(),inputJobInfo);
+      } else {
+        hiveConf = new HiveConf(HCatInputFormat.class);
+        client = new HiveMetaStoreClient(hiveConf, null);
+      }
       Table table = client.getTable(inputJobInfo.getDatabaseName(),
                                                 inputJobInfo.getTableName());
 
@@ -107,17 +124,15 @@ public class InitializeInput {
       inputJobInfo.setPartitions(partInfoList);
       inputJobInfo.setTableInfo(HCatTableInfo.valueOf(table));
 
-      job.getConfiguration().set(
-          HCatConstants.HCAT_KEY_JOB_INFO,
-          HCatUtil.serialize(inputJobInfo)
-      );
+      return HCatUtil.serialize(inputJobInfo);
     } finally {
       if (client != null ) {
         client.close();
       }
     }
-  }
 
+  }
+  
   private static Map<String, String> createPtnKeyValueMap(Table table, Partition ptn) throws IOException{
     List<String> values = ptn.getValues();
     if( values.size() != table.getPartitionKeys().size() ) {

Modified: incubator/hcatalog/branches/branch-0.3/src/java/org/apache/hcatalog/pig/PigHCatUtil.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.3/src/java/org/apache/hcatalog/pig/PigHCatUtil.java?rev=1239809&r1=1239808&r2=1239809&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.3/src/java/org/apache/hcatalog/pig/PigHCatUtil.java (original)
+++ incubator/hcatalog/branches/branch-0.3/src/java/org/apache/hcatalog/pig/PigHCatUtil.java Thu Feb  2 20:29:37 2012
@@ -35,6 +35,7 @@ import org.apache.hadoop.hive.serde2.laz
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
 import org.apache.hcatalog.data.HCatArrayBag;
 import org.apache.hcatalog.data.HCatRecord;
 import org.apache.hcatalog.data.Pair;
@@ -68,12 +69,9 @@ public class PigHCatUtil {
     // <database name>.<table name> - parse it and
     // communicate the information to HCatInputFormat
 
-    String[] dbTableNametokens = location.split("\\.");
-    if(dbTableNametokens.length == 1) {
-      return new Pair<String,String>(DEFAULT_DB,location);
-    }else if (dbTableNametokens.length == 2) {
-      return new Pair<String, String>(dbTableNametokens[0], dbTableNametokens[1]);
-    }else{
+    try {
+      return HCatUtil.getDbAndTableName(location);
+    } catch (IOException e) {
       String locationErrMsg = "The input location in load statement " +
       "should be of the form " +
       "<databasename>.<table name> or <table name>. Got " + location;



Mime
View raw message