incubator-hcatalog-commits mailing list archives

From ga...@apache.org
Subject svn commit: r1179746 - in /incubator/hcatalog/trunk: ./ src/java/org/apache/hcatalog/cli/SemanticAnalysis/ src/java/org/apache/hcatalog/common/ src/java/org/apache/hcatalog/storagehandler/
Date Thu, 06 Oct 2011 18:22:14 GMT
Author: gates
Date: Thu Oct  6 18:22:14 2011
New Revision: 1179746

URL: http://svn.apache.org/viewvc?rev=1179746&view=rev
Log:
HCATALOG-89 Support for creating non-native tables

Added:
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/storagehandler/
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/storagehandler/HCatStorageHandler.java
Modified:
    incubator/hcatalog/trunk/CHANGES.txt
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/cli/SemanticAnalysis/CreateTableHook.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatUtil.java

Modified: incubator/hcatalog/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/CHANGES.txt?rev=1179746&r1=1179745&r2=1179746&view=diff
==============================================================================
--- incubator/hcatalog/trunk/CHANGES.txt (original)
+++ incubator/hcatalog/trunk/CHANGES.txt Thu Oct  6 18:22:14 2011
@@ -23,6 +23,8 @@ Trunk (unreleased changes)
   INCOMPATIBLE CHANGES
 
   NEW FEATURES
+  HCAT-89. Support for creating non-native tables (avandana via gates)
+
   HCAT-60. Refactor HCatalog to support non-filebased outputformats (toffer via gates)
 
   HCAT-63. RPM package integration with Hadoop (khorgath via hashutosh)

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/cli/SemanticAnalysis/CreateTableHook.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/cli/SemanticAnalysis/CreateTableHook.java?rev=1179746&r1=1179745&r2=1179746&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/cli/SemanticAnalysis/CreateTableHook.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/cli/SemanticAnalysis/CreateTableHook.java Thu Oct  6 18:22:14 2011
@@ -22,11 +22,11 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.hive.metastore.Warehouse;
-import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.ql.exec.DDLTask;
@@ -42,172 +42,258 @@ import org.apache.hadoop.hive.ql.parse.H
 import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHookContext;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.CreateTableDesc;
+import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
 import org.apache.hcatalog.common.AuthUtils;
 import org.apache.hcatalog.common.HCatConstants;
 import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.common.HCatUtil;
 import org.apache.hcatalog.rcfile.RCFileInputDriver;
 import org.apache.hcatalog.rcfile.RCFileOutputDriver;
+import org.apache.hcatalog.storagehandler.HCatStorageHandler;
 
-final class CreateTableHook  extends AbstractSemanticAnalyzerHook{
+final class CreateTableHook extends AbstractSemanticAnalyzerHook {
 
-  private String inStorageDriver, outStorageDriver, tableName;
+    private String inStorageDriver, outStorageDriver, tableName;
 
-  @Override
-  public ASTNode preAnalyze(HiveSemanticAnalyzerHookContext context, ASTNode ast)
-  throws SemanticException {
-
-    Hive db;
-    try {
-      db = context.getHive();
-    } catch (HiveException e) {
-      throw new SemanticException("Couldn't get Hive DB instance in semantic analysis phase.", e);
-    }
+    @Override
+    public ASTNode preAnalyze(HiveSemanticAnalyzerHookContext context,
+            ASTNode ast) throws SemanticException {
 
-    // Analyze and create tbl properties object
-    int numCh = ast.getChildCount();
+        Hive db;
+        try {
+            db = context.getHive();
+        } catch (HiveException e) {
+            throw new SemanticException(
+                    "Couldn't get Hive DB instance in semantic analysis phase.",
+                    e);
+        }
 
-    String inputFormat = null, outputFormat = null;
-    tableName = BaseSemanticAnalyzer.getUnescapedName((ASTNode)ast.getChild(0));
+        // Analyze and create tbl properties object
+        int numCh = ast.getChildCount();
 
-    for (int num = 1; num < numCh; num++) {
-      ASTNode child = (ASTNode) ast.getChild(num);
+        String inputFormat = null, outputFormat = null;
+        tableName = BaseSemanticAnalyzer.getUnescapedName((ASTNode) ast
+                .getChild(0));
+
+        for (int num = 1; num < numCh; num++) {
+            ASTNode child = (ASTNode) ast.getChild(num);
+
+            switch (child.getToken().getType()) {
+
+                case HiveParser.TOK_QUERY: // CTAS
+                    throw new SemanticException(
+                            "Operation not supported. Create table as " +
+                            "Select is not a valid operation.");
+
+                case HiveParser.TOK_TABLEBUCKETS:
+                    throw new SemanticException(
+                            "Operation not supported. HCatalog doesn't " +
+                            "allow Clustered By in create table.");
+
+                case HiveParser.TOK_TBLSEQUENCEFILE:
+                    throw new SemanticException(
+                            "Operation not supported. HCatalog doesn't support " +
+                            "Sequence File by default yet. "
+                             + "You may specify it through INPUT/OUTPUT storage drivers.");
+
+                case HiveParser.TOK_TBLTEXTFILE:
+                    throw new SemanticException(
+                            "Operation not supported. HCatalog doesn't support " +
+                            "Text File by default yet. "
+                            + "You may specify it through INPUT/OUTPUT storage drivers.");
+
+                case HiveParser.TOK_LIKETABLE:
+
+                    String likeTableName;
+                    if (child.getChildCount() > 0
+                            && (likeTableName = BaseSemanticAnalyzer
+                                    .getUnescapedName((ASTNode) ast.getChild(0))) != null) {
+
+                        throw new SemanticException(
+                                "Operation not supported. CREATE TABLE LIKE is not supported.");
+                        // Map<String, String> tblProps;
+                        // try {
+                        // tblProps =
+                        // db.getTable(MetaStoreUtils.DEFAULT_DATABASE_NAME,
+                        // likeTableName).getParameters();
+                        // } catch (HiveException he) {
+                        // throw new SemanticException(he);
+                        // }
+                        // if(!(tblProps.containsKey(InitializeInput.HOWL_ISD_CLASS)
+                        // &&
+                        // tblProps.containsKey(InitializeInput.HOWL_OSD_CLASS))){
+                        // throw new
+                        // SemanticException("Operation not supported. Table "+likeTableName+" should have been created through HCat. Seems like its not.");
+                        // }
+                        // return ast;
+                    }
+                    break;
+
+                case HiveParser.TOK_IFNOTEXISTS:
+                    try {
+                        List<String> tables = db.getTablesByPattern(tableName);
+                        if (tables != null && tables.size() > 0) { // table
+                                                                   // exists
+                            return ast;
+                        }
+                    } catch (HiveException e) {
+                        throw new SemanticException(e);
+                    }
+                    break;
+
+                case HiveParser.TOK_TABLEPARTCOLS:
+                    List<FieldSchema> partCols = BaseSemanticAnalyzer
+                            .getColumns((ASTNode) child.getChild(0), false);
+                    for (FieldSchema fs : partCols) {
+                        if (!fs.getType().equalsIgnoreCase("string")) {
+                            throw new SemanticException(
+                                    "Operation not supported. HCatalog only " +
+                                    "supports partition columns of type string. "
+                                            + "For column: "
+                                            + fs.getName()
+                                            + " Found type: " + fs.getType());
+                        }
+                    }
+                    break;
+
+                case HiveParser.TOK_STORAGEHANDLER:
+                    String storageHandler = BaseSemanticAnalyzer
+                            .unescapeSQLString(child.getChild(0).getText());
+                    if (org.apache.commons.lang.StringUtils
+                            .isNotEmpty(storageHandler)) {
+                        return ast;
+                    }
+
+                    break;
+
+                case HiveParser.TOK_TABLEFILEFORMAT:
+                    if (child.getChildCount() < 4) {
+                        throw new SemanticException(
+                                "Incomplete specification of File Format. " +
+                                "You must provide InputFormat, OutputFormat, " +
+                                "InputDriver, OutputDriver.");
+                    }
+                    inputFormat = BaseSemanticAnalyzer.unescapeSQLString(child
+                            .getChild(0).getText());
+                    outputFormat = BaseSemanticAnalyzer.unescapeSQLString(child
+                            .getChild(1).getText());
+                    inStorageDriver = BaseSemanticAnalyzer
+                            .unescapeSQLString(child.getChild(2).getText());
+                    outStorageDriver = BaseSemanticAnalyzer
+                            .unescapeSQLString(child.getChild(3).getText());
+                    break;
+
+                case HiveParser.TOK_TBLRCFILE:
+                    inputFormat = RCFileInputFormat.class.getName();
+                    outputFormat = RCFileOutputFormat.class.getName();
+                    inStorageDriver = RCFileInputDriver.class.getName();
+                    outStorageDriver = RCFileOutputDriver.class.getName();
+                    break;
 
-      switch (child.getToken().getType()) {
+            }
+        }
 
-      case HiveParser.TOK_QUERY: // CTAS
-        throw new SemanticException("Operation not supported. Create table as Select is not a valid operation.");
+        if (inputFormat == null || outputFormat == null
+                || inStorageDriver == null || outStorageDriver == null) {
+            throw new SemanticException(
+                    "STORED AS specification is either incomplete or incorrect.");
+        }
 
-      case HiveParser.TOK_TABLEBUCKETS:
-        throw new SemanticException("Operation not supported. HCatalog doesn't allow Clustered By in create table.");
+        return ast;
+    }
 
-      case HiveParser.TOK_TBLSEQUENCEFILE:
-        throw new SemanticException("Operation not supported. HCatalog doesn't support Sequence File by default yet. " +
-        "You may specify it through INPUT/OUTPUT storage drivers.");
+    @Override
+    public void postAnalyze(HiveSemanticAnalyzerHookContext context,
+            List<Task<? extends Serializable>> rootTasks)
+            throws SemanticException {
+
+        if (rootTasks.size() == 0) {
+            // There will be no DDL task created if this is CREATE TABLE IF
+            // NOT EXISTS and the table already exists.
+            return;
+        }
+        CreateTableDesc desc = ((DDLTask) rootTasks.get(rootTasks.size() - 1))
+                .getWork().getCreateTblDesc();
 
-      case HiveParser.TOK_TBLTEXTFILE:
-        throw new SemanticException("Operation not supported. HCatalog doesn't support Text File by default yet. " +
-        "You may specify it through INPUT/OUTPUT storage drivers.");
+        Map<String, String> tblProps = desc.getTblProps();
+        if (tblProps == null) {
+            // tblProps will be null if the user did not specify TBLPROPERTIES
+            // in the CREATE TABLE command.
+            tblProps = new HashMap<String, String>();
 
-      case HiveParser.TOK_LIKETABLE:
+        }
 
-        String likeTableName;
-        if (child.getChildCount() > 0 && (likeTableName = BaseSemanticAnalyzer.getUnescapedName((ASTNode)ast.getChild(0))) != null) {
+        // first check if we will allow the user to create table.
+        String storageHandler = desc.getStorageHandler();
+        if (StringUtils.isEmpty(storageHandler)) {
+
+            authorize(context, desc.getLocation());
+            tblProps.put(HCatConstants.HCAT_ISD_CLASS, inStorageDriver);
+            tblProps.put(HCatConstants.HCAT_OSD_CLASS, outStorageDriver);
+
+        } else {
+            // Create an instance of HCatStorageHandler, obtain the
+            // HiveAuthorizationProvider for the handler, and use it
+            // to authorize.
+            try {
+                HCatStorageHandler storageHandlerInst = HCatUtil
+                        .getStorageHandler(context.getConf(), storageHandler);
+                HiveAuthorizationProvider auth = storageHandlerInst
+                        .getAuthorizationProvider();
+
+                // TBD: To pass in the exact read and write privileges.
+                auth.authorize(context.getHive().getTable(tableName), null,
+                        null);
+
+                tblProps.put(HCatConstants.HCAT_ISD_CLASS, storageHandlerInst
+                        .getInputStorageDriver().toString());
+                tblProps.put(HCatConstants.HCAT_OSD_CLASS, storageHandlerInst
+                        .getOutputStorageDriver().toString());
+
+            } catch (HiveException e) {
+                throw new SemanticException(e);
+            }
 
-          throw new SemanticException("Operation not supported. CREATE TABLE LIKE is not supported.");
-//          Map<String, String> tblProps;
-//          try {
-//            tblProps = db.getTable(MetaStoreUtils.DEFAULT_DATABASE_NAME, likeTableName).getParameters();
-//          } catch (HiveException he) {
-//            throw new SemanticException(he);
-//          }
-//          if(!(tblProps.containsKey(InitializeInput.HOWL_ISD_CLASS) && tblProps.containsKey(InitializeInput.HOWL_OSD_CLASS))){
-//            throw new SemanticException("Operation not supported. Table "+likeTableName+" should have been created through HCat. Seems like its not.");
-//          }
-//          return ast;
         }
-        break;
 
-      case HiveParser.TOK_IFNOTEXISTS:
-        try {
-          List<String> tables = db.getTablesByPattern(tableName);
-          if (tables != null && tables.size() > 0) { // table exists
-            return ast;
-          }
-        } catch (HiveException e) {
-          throw new SemanticException(e);
+        if (desc == null) {
+            // desc will be null for CREATE TABLE LIKE, where the information
+            // is carried in a CreateTableLikeDesc instead. Since HCat disallows
+            // CTLT in the pre-hook, desc can never actually be null here.
+            return;
         }
-        break;
-
-      case HiveParser.TOK_TABLEPARTCOLS:
-        List<FieldSchema> partCols = BaseSemanticAnalyzer.getColumns((ASTNode) child.getChild(0), false);
-        for(FieldSchema fs : partCols){
-          if(!fs.getType().equalsIgnoreCase("string")){
-            throw new SemanticException("Operation not supported. HCatalog only supports partition columns of type string. " +
-                "For column: "+fs.getName()+" Found type: "+fs.getType());
-          }
-        }
-        break;
-
-      case HiveParser.TOK_TABLEFILEFORMAT:
-        if(child.getChildCount() < 4) {
-          throw new SemanticException("Incomplete specification of File Format. You must provide InputFormat, OutputFormat, InputDriver, OutputDriver.");
-        }
-        inputFormat      = BaseSemanticAnalyzer.unescapeSQLString(child.getChild(0).getText());
-        outputFormat     = BaseSemanticAnalyzer.unescapeSQLString(child.getChild(1).getText());
-        inStorageDriver  = BaseSemanticAnalyzer.unescapeSQLString(child.getChild(2).getText());
-        outStorageDriver = BaseSemanticAnalyzer.unescapeSQLString(child.getChild(3).getText());
-        break;
-
-      case HiveParser.TOK_TBLRCFILE:
-        inputFormat      = RCFileInputFormat.class.getName();
-        outputFormat     = RCFileOutputFormat.class.getName();
-        inStorageDriver  = RCFileInputDriver.class.getName();
-        outStorageDriver = RCFileOutputDriver.class.getName();
-        break;
 
-      }
+        desc.setTblProps(tblProps);
+        context.getConf().set(HCatConstants.HCAT_CREATE_TBL_NAME, tableName);
     }
 
-    if(inputFormat == null || outputFormat == null || inStorageDriver == null || outStorageDriver == null){
-      throw new SemanticException("STORED AS specification is either incomplete or incorrect.");
-    }
-
-    return ast;
-  }
-
-  @Override
-  public void postAnalyze(HiveSemanticAnalyzerHookContext context, List<Task<? extends Serializable>> rootTasks) throws SemanticException {
-
-    if(rootTasks.size() == 0){
-      // There will be no DDL task created in case if its CREATE TABLE IF NOT EXISTS
-      return;
-    }
-    CreateTableDesc desc = ((DDLTask)rootTasks.get(rootTasks.size()-1)).getWork().getCreateTblDesc();
-
-    // first check if we will allow the user to create table.
-    authorize(context, desc.getLocation());
+    private void authorize(HiveSemanticAnalyzerHookContext context, String loc)
+            throws SemanticException {
 
-    if(desc == null){
-      // Desc will be null if its CREATE TABLE LIKE. Desc will be contained
-      // in CreateTableLikeDesc. Currently, HCat disallows CTLT in pre-hook.
-      // So, desc can never be null.
-      return;
-    }
-    Map<String,String> tblProps = desc.getTblProps();
-    if(tblProps == null) {
-      // tblProps will be null if user didnt use tblprops in his CREATE TABLE cmd.
-      tblProps = new HashMap<String, String>();
-    }
-    tblProps.put(HCatConstants.HCAT_ISD_CLASS, inStorageDriver);
-    tblProps.put(HCatConstants.HCAT_OSD_CLASS, outStorageDriver);
-    desc.setTblProps(tblProps);
-    context.getConf().set(HCatConstants.HCAT_CREATE_TBL_NAME, tableName);
-  }
-
-  private void authorize(HiveSemanticAnalyzerHookContext context, String loc) throws SemanticException{
-
-    Path tblDir;
-    Configuration conf = context.getConf();
-    try {
-      Warehouse wh = new Warehouse(conf);
-      if (loc == null || loc.isEmpty()){
-    	Hive hive = context.getHive();
-        tblDir = wh.getTablePath(hive.getDatabase(hive.getCurrentDatabase()), tableName).getParent();
-      }
-      else{
-        tblDir = wh.getDnsPath(new Path(loc));
-      }
-
-      try {
-        AuthUtils.authorize(tblDir, FsAction.WRITE, conf);
-      } catch (HCatException e) {
-        throw new SemanticException(e);
-      }
-    }
-    catch (MetaException e) {
-      throw new SemanticException(e);
-    } catch (HiveException e) {
-      throw new SemanticException(e);
+        Path tblDir;
+        Configuration conf = context.getConf();
+        try {
+            Warehouse wh = new Warehouse(conf);
+            if (loc == null || loc.isEmpty()) {
+                Hive hive = context.getHive();
+                tblDir = wh.getTablePath(
+                        hive.getDatabase(hive.getCurrentDatabase()), tableName)
+                        .getParent();
+            } else {
+                tblDir = wh.getDnsPath(new Path(loc));
+            }
+
+            try {
+                AuthUtils.authorize(tblDir, FsAction.WRITE, conf);
+            } catch (HCatException e) {
+                throw new SemanticException(e);
+            }
+        } catch (MetaException e) {
+            throw new SemanticException(e);
+        } catch (HiveException e) {
+            throw new SemanticException(e);
+        }
     }
-  }
 }
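
The hook's postAnalyze now takes one of two paths: a plain STORED AS table is
authorized against its filesystem location and gets the configured input/output
storage drivers recorded in its table properties, while a STORED BY table is
authorized through its handler's own provider. Below is a minimal sketch of that
decision in isolation; the class and method names (AuthorizationResolver,
resolveProvider) are illustrative only, while HCatUtil.getStorageHandler and
HCatStorageHandler.getAuthorizationProvider come from this patch.

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
import org.apache.hcatalog.common.HCatUtil;
import org.apache.hcatalog.storagehandler.HCatStorageHandler;

public class AuthorizationResolver {

    /**
     * Mirrors the decision in CreateTableHook.postAnalyze: a table without a
     * storage handler is authorized against its filesystem location, while a
     * STORED BY table delegates to the handler's authorization provider.
     * Returns null for the native case to signal "use path-based checks".
     */
    public static HiveAuthorizationProvider resolveProvider(Configuration conf,
            String storageHandlerClass) throws HiveException {
        if (StringUtils.isEmpty(storageHandlerClass)) {
            // Native table: the caller falls back to AuthUtils.authorize
            // on the table directory, as the hook does.
            return null;
        }
        HCatStorageHandler handler =
                HCatUtil.getStorageHandler(conf, storageHandlerClass);
        return handler.getAuthorizationProvider();
    }
}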

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatUtil.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatUtil.java?rev=1179746&r1=1179745&r2=1179746&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatUtil.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatUtil.java Thu Oct  6 18:22:14 2011
@@ -33,363 +33,430 @@ import java.util.Map.Entry;
 import java.util.Set;
 
 import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hcatalog.data.schema.HCatFieldSchema;
 import org.apache.hcatalog.data.schema.HCatSchema;
 import org.apache.hcatalog.data.schema.HCatSchemaUtils;
 import org.apache.hcatalog.mapreduce.HCatOutputFormat;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
+import org.apache.hcatalog.storagehandler.HCatStorageHandler;
 import org.apache.thrift.TException;
 
 public class HCatUtil {
 
-//  static final private Log LOG = LogFactory.getLog(HCatUtil.class);
+    // static final private Log LOG = LogFactory.getLog(HCatUtil.class);
 
-  public static boolean checkJobContextIfRunningFromBackend(JobContext j){
-    if (j.getConfiguration().get("mapred.task.id", "").equals("")){
-      return false;
+    public static boolean checkJobContextIfRunningFromBackend(JobContext j) {
+        if (j.getConfiguration().get("mapred.task.id", "").equals("")) {
+            return false;
+        }
+        return true;
     }
-    return true;
-  }
 
-  public static String serialize(Serializable obj) throws IOException {
-    if (obj == null) {
-      return "";
+    public static String serialize(Serializable obj) throws IOException {
+        if (obj == null) {
+            return "";
+        }
+        try {
+            ByteArrayOutputStream serialObj = new ByteArrayOutputStream();
+            ObjectOutputStream objStream = new ObjectOutputStream(serialObj);
+            objStream.writeObject(obj);
+            objStream.close();
+            return encodeBytes(serialObj.toByteArray());
+        } catch (Exception e) {
+            throw new IOException("Serialization error: " + e.getMessage(), e);
+        }
     }
-    try {
-      ByteArrayOutputStream serialObj = new ByteArrayOutputStream();
-      ObjectOutputStream objStream = new ObjectOutputStream(serialObj);
-      objStream.writeObject(obj);
-      objStream.close();
-      return encodeBytes(serialObj.toByteArray());
-    } catch (Exception e) {
-      throw new IOException("Serialization error: " + e.getMessage(), e);
+
+    public static Object deserialize(String str) throws IOException {
+        if (str == null || str.length() == 0) {
+            return null;
+        }
+        try {
+            ByteArrayInputStream serialObj = new ByteArrayInputStream(
+                    decodeBytes(str));
+            ObjectInputStream objStream = new ObjectInputStream(serialObj);
+            return objStream.readObject();
+        } catch (Exception e) {
+            throw new IOException("Deserialization error: " + e.getMessage(), e);
+        }
     }
-  }
 
-  public static Object deserialize(String str) throws IOException {
-    if (str == null || str.length() == 0) {
-      return null;
+    public static String encodeBytes(byte[] bytes) {
+        StringBuffer strBuf = new StringBuffer();
+
+        for (int i = 0; i < bytes.length; i++) {
+            strBuf.append((char) (((bytes[i] >> 4) & 0xF) + ('a')));
+            strBuf.append((char) (((bytes[i]) & 0xF) + ('a')));
+        }
+
+        return strBuf.toString();
     }
-    try {
-      ByteArrayInputStream serialObj = new ByteArrayInputStream(decodeBytes(str));
-      ObjectInputStream objStream = new ObjectInputStream(serialObj);
-      return objStream.readObject();
-    } catch (Exception e) {
-      throw new IOException("Deserialization error: " + e.getMessage(), e);
+
+    public static byte[] decodeBytes(String str) {
+        byte[] bytes = new byte[str.length() / 2];
+        for (int i = 0; i < str.length(); i += 2) {
+            char c = str.charAt(i);
+            bytes[i / 2] = (byte) ((c - 'a') << 4);
+            c = str.charAt(i + 1);
+            bytes[i / 2] += (c - 'a');
+        }
+        return bytes;
     }
-  }
 
-  public static String encodeBytes(byte[] bytes) {
-    StringBuffer strBuf = new StringBuffer();
+    public static List<HCatFieldSchema> getHCatFieldSchemaList(
+            FieldSchema... fields) throws HCatException {
+        List<HCatFieldSchema> result = new ArrayList<HCatFieldSchema>(
+                fields.length);
+
+        for (FieldSchema f : fields) {
+            result.add(HCatSchemaUtils.getHCatFieldSchema(f));
+        }
+
+        return result;
+    }
 
-    for (int i = 0; i < bytes.length; i++) {
-      strBuf.append((char) (((bytes[i] >> 4) & 0xF) + ('a')));
-      strBuf.append((char) (((bytes[i]) & 0xF) + ('a')));
+    public static List<HCatFieldSchema> getHCatFieldSchemaList(
+            List<FieldSchema> fields) throws HCatException {
+        if (fields == null) {
+            return null;
+        } else {
+            List<HCatFieldSchema> result = new ArrayList<HCatFieldSchema>();
+            for (FieldSchema f : fields) {
+                result.add(HCatSchemaUtils.getHCatFieldSchema(f));
+            }
+            return result;
+        }
     }
 
-    return strBuf.toString();
-  }
+    public static HCatSchema extractSchemaFromStorageDescriptor(
+            StorageDescriptor sd) throws HCatException {
+        if (sd == null) {
+            throw new HCatException(
+                    "Cannot construct partition info from an empty storage descriptor.");
+        }
+        HCatSchema schema = new HCatSchema(HCatUtil.getHCatFieldSchemaList(sd
+                .getCols()));
+        return schema;
+    }
 
-  public static byte[] decodeBytes(String str) {
-    byte[] bytes = new byte[str.length() / 2];
-    for (int i = 0; i < str.length(); i+=2) {
-      char c = str.charAt(i);
-      bytes[i/2] = (byte) ((c - 'a') << 4);
-      c = str.charAt(i+1);
-      bytes[i/2] += (c - 'a');
+    public static List<FieldSchema> getFieldSchemaList(
+            List<HCatFieldSchema> hcatFields) {
+        if (hcatFields == null) {
+            return null;
+        } else {
+            List<FieldSchema> result = new ArrayList<FieldSchema>();
+            for (HCatFieldSchema f : hcatFields) {
+                result.add(HCatSchemaUtils.getFieldSchema(f));
+            }
+            return result;
+        }
     }
-    return bytes;
-  }
 
-  public static List<HCatFieldSchema> getHCatFieldSchemaList(FieldSchema ... fields) throws HCatException {
-    List<HCatFieldSchema> result = new ArrayList<HCatFieldSchema>(fields.length);
+    public static Table getTable(HiveMetaStoreClient client, String dbName,
+            String tableName) throws Exception {
+        return client.getTable(dbName, tableName);
+    }
+
+    public static HCatSchema getTableSchemaWithPtnCols(Table table)
+            throws IOException {
+        HCatSchema tableSchema = extractSchemaFromStorageDescriptor(table
+                .getSd());
+
+        if (table.getPartitionKeys().size() != 0) {
+
+            // add partition keys to table schema
+            // NOTE : this assumes that we do not ever have ptn keys as columns
+            // inside the table schema as well!
+            for (FieldSchema fs : table.getPartitionKeys()) {
+                tableSchema.append(HCatSchemaUtils.getHCatFieldSchema(fs));
+            }
+        }
+        return tableSchema;
+    }
 
-    for(FieldSchema f : fields) {
-      result.add(HCatSchemaUtils.getHCatFieldSchema(f));
+    /**
+     * return the partition columns from a table instance
+     *
+     * @param table the instance to extract partition columns from
+     * @return HCatSchema instance which contains the partition columns
+     * @throws IOException
+     */
+    public static HCatSchema getPartitionColumns(Table table)
+            throws IOException {
+        HCatSchema cols = new HCatSchema(new LinkedList<HCatFieldSchema>());
+        if (table.getPartitionKeys().size() != 0) {
+            for (FieldSchema fs : table.getPartitionKeys()) {
+                cols.append(HCatSchemaUtils.getHCatFieldSchema(fs));
+            }
+        }
+        return cols;
     }
 
-    return result;
-  }
+    /**
+     * Validate partition schema, checks if the column types match between the
+     * partition and the existing table schema. Returns the list of columns
+     * present in the partition but not in the table.
+     *
+     * @param table the table
+     * @param partitionSchema the partition schema
+     * @return the list of newly added fields
+     * @throws IOException Signals that an I/O exception has occurred.
+     */
+    public static List<FieldSchema> validatePartitionSchema(Table table,
+            HCatSchema partitionSchema) throws IOException {
+        Map<String, FieldSchema> partitionKeyMap = new HashMap<String, FieldSchema>();
+
+        for (FieldSchema field : table.getPartitionKeys()) {
+            partitionKeyMap.put(field.getName().toLowerCase(), field);
+        }
+
+        List<FieldSchema> tableCols = table.getSd().getCols();
+        List<FieldSchema> newFields = new ArrayList<FieldSchema>();
 
-  public static List<HCatFieldSchema> getHCatFieldSchemaList(List<FieldSchema> fields) throws HCatException {
-      if(fields == null) {
-          return null;
-      } else {
-          List<HCatFieldSchema> result = new ArrayList<HCatFieldSchema>();
-          for(FieldSchema f: fields) {
-              result.add(HCatSchemaUtils.getHCatFieldSchema(f));
-          }
-          return result;
-      }
-  }
+        for (int i = 0; i < partitionSchema.getFields().size(); i++) {
 
+            FieldSchema field = HCatSchemaUtils.getFieldSchema(partitionSchema
+                    .getFields().get(i));
 
-  public static HCatSchema extractSchemaFromStorageDescriptor(StorageDescriptor sd) throws HCatException {
-      if (sd == null){
-          throw new HCatException("Cannot construct partition info from an empty storage descriptor.");
+            FieldSchema tableField;
+            if (i < tableCols.size()) {
+                tableField = tableCols.get(i);
+
+                if (!tableField.getName().equalsIgnoreCase(field.getName())) {
+                    throw new HCatException(
+                            ErrorType.ERROR_SCHEMA_COLUMN_MISMATCH,
+                            "Expected column <" + tableField.getName()
+                                    + "> at position " + (i + 1)
+                                    + ", found column <" + field.getName()
+                                    + ">");
+                }
+            } else {
+                tableField = partitionKeyMap.get(field.getName().toLowerCase());
+
+                if (tableField != null) {
+                    throw new HCatException(
+                            ErrorType.ERROR_SCHEMA_PARTITION_KEY, "Key <"
+                                    + field.getName() + ">");
+                }
+            }
+
+            if (tableField == null) {
+                // field present in partition but not in table
+                newFields.add(field);
+            } else {
+                // field present in both. validate type has not changed
+                TypeInfo partitionType = TypeInfoUtils
+                        .getTypeInfoFromTypeString(field.getType());
+                TypeInfo tableType = TypeInfoUtils
+                        .getTypeInfoFromTypeString(tableField.getType());
+
+                if (!partitionType.equals(tableType)) {
+                    throw new HCatException(
+                            ErrorType.ERROR_SCHEMA_TYPE_MISMATCH, "Column <"
+                                    + field.getName() + ">, expected <"
+                                    + tableType.getTypeName() + ">, got <"
+                                    + partitionType.getTypeName() + ">");
+                }
+            }
         }
-        HCatSchema schema = new HCatSchema(HCatUtil.getHCatFieldSchemaList(sd.getCols()));
-        return schema;
-  }
 
-  public static List<FieldSchema> getFieldSchemaList(List<HCatFieldSchema> hcatFields) {
-      if(hcatFields == null) {
-          return null;
-      } else {
-          List<FieldSchema> result = new ArrayList<FieldSchema>();
-          for(HCatFieldSchema f: hcatFields) {
-              result.add(HCatSchemaUtils.getFieldSchema(f));
-          }
-          return result;
-      }
-  }
-
-  public static Table getTable(HiveMetaStoreClient client, String dbName, String tableName) throws Exception{
-    return client.getTable(dbName,tableName);
-  }
-
-  public static HCatSchema getTableSchemaWithPtnCols(Table table) throws IOException{
-      HCatSchema tableSchema = extractSchemaFromStorageDescriptor(table.getSd());
-
-      if( table.getPartitionKeys().size() != 0 ) {
-
-        // add partition keys to table schema
-        // NOTE : this assumes that we do not ever have ptn keys as columns inside the table schema as well!
-        for (FieldSchema fs : table.getPartitionKeys()){
-            tableSchema.append(HCatSchemaUtils.getHCatFieldSchema(fs));
-        }
-      }
-      return tableSchema;
-    }
-
-  /**
-   * return the partition columns from a table instance
-   * @param table the instance to extract partition columns from
-   * @return HCatSchema instance which contains the partition columns
-   * @throws IOException
-   */
-  public static HCatSchema getPartitionColumns(Table table) throws IOException{
-    HCatSchema cols = new HCatSchema(new LinkedList<HCatFieldSchema>());
-    if( table.getPartitionKeys().size() != 0 ) {
-      for (FieldSchema fs : table.getPartitionKeys()){
-          cols.append(HCatSchemaUtils.getHCatFieldSchema(fs));
-      }
-    }
-    return cols;
-  }
-
-  /**
-   * Validate partition schema, checks if the column types match between the partition
-   * and the existing table schema. Returns the list of columns present in the partition
-   * but not in the table.
-   * @param table the table
-   * @param partitionSchema the partition schema
-   * @return the list of newly added fields
-   * @throws IOException Signals that an I/O exception has occurred.
-   */
-  public static List<FieldSchema> validatePartitionSchema(Table table, HCatSchema partitionSchema) throws IOException {
-    Map<String, FieldSchema> partitionKeyMap = new HashMap<String, FieldSchema>();
-
-    for(FieldSchema field : table.getPartitionKeys()) {
-      partitionKeyMap.put(field.getName().toLowerCase(), field);
-    }
-
-    List<FieldSchema> tableCols = table.getSd().getCols();
-    List<FieldSchema> newFields = new ArrayList<FieldSchema>();
-
-    for(int i = 0;i <  partitionSchema.getFields().size();i++) {
-
-      FieldSchema field = HCatSchemaUtils.getFieldSchema(partitionSchema.getFields().get(i));
-
-      FieldSchema tableField;
-      if( i < tableCols.size() ) {
-        tableField = tableCols.get(i);
-
-        if( ! tableField.getName().equalsIgnoreCase(field.getName())) {
-          throw new HCatException(ErrorType.ERROR_SCHEMA_COLUMN_MISMATCH, "Expected column <" + tableField.getName() +
-              "> at position " + (i + 1) + ", found column <" + field.getName() + ">");
-        }
-      } else {
-        tableField = partitionKeyMap.get(field.getName().toLowerCase());
-
-        if( tableField != null ) {
-          throw new HCatException(ErrorType.ERROR_SCHEMA_PARTITION_KEY, "Key <" +  field.getName() + ">");
-        }
-      }
-
-      if( tableField == null ) {
-        //field present in partition but not in table
-        newFields.add(field);
-      } else {
-        //field present in both. validate type has not changed
-        TypeInfo partitionType = TypeInfoUtils.getTypeInfoFromTypeString(field.getType());
-        TypeInfo tableType = TypeInfoUtils.getTypeInfoFromTypeString(tableField.getType());
-
-        if( ! partitionType.equals(tableType) ) {
-          throw new HCatException(ErrorType.ERROR_SCHEMA_TYPE_MISMATCH, "Column <" + field.getName() + ">, expected <" +
-              tableType.getTypeName() + ">, got <" + partitionType.getTypeName() + ">");
-        }
-      }
-    }
-
-    return newFields;
-  }
-
-  /**
-   * Test if the first FsAction is more permissive than the second. This is useful in cases where
-   * we want to ensure that a file owner has more permissions than the group they belong to, for eg.
-   * More completely(but potentially more cryptically)
-   *  owner-r >= group-r >= world-r : bitwise and-masked with 0444 => 444 >= 440 >= 400 >= 000
-   *  owner-w >= group-w >= world-w : bitwise and-masked with &0222 => 222 >= 220 >= 200 >= 000
-   *  owner-x >= group-x >= world-x : bitwise and-masked with &0111 => 111 >= 110 >= 100 >= 000
-   * @return true if first FsAction is more permissive than the second, false if not.
-   */
-  public static boolean validateMorePermissive(FsAction first, FsAction second) {
-    if ((first == FsAction.ALL) ||
-        (second == FsAction.NONE) ||
-        (first == second)) {
-      return true;
-    }
-    switch (first){
-      case READ_EXECUTE : return ((second == FsAction.READ) || (second == FsAction.EXECUTE));
-      case READ_WRITE : return ((second == FsAction.READ) || (second == FsAction.WRITE));
-      case WRITE_EXECUTE : return ((second == FsAction.WRITE) || (second == FsAction.EXECUTE));
-    }
-    return false;
-  }
-
-  /**
-   * Ensure that read or write permissions are not granted without also granting execute permissions.
-   * Essentially, r-- , rw- and -w- are invalid,
-   * r-x, -wx, rwx, ---, --x are valid
-   *
-   * @param perms The FsAction to verify
-   * @return true if the presence of read or write permission is accompanied by execute permissions
-   */
-  public static boolean validateExecuteBitPresentIfReadOrWrite(FsAction perms){
-    if ((perms == FsAction.READ) || (perms == FsAction.WRITE) || (perms == FsAction.READ_WRITE)){
-      return false;
-    }
-    return true;
-  }
-
-  public static Token<org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier> getJobTrackerDelegationToken(Configuration conf, String userName) throws Exception {
-//    LOG.info("getJobTrackerDelegationToken("+conf+","+userName+")");
-    JobClient jcl = new JobClient(new JobConf(conf, HCatOutputFormat.class));
-    Token<org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier> t = jcl.getDelegationToken(new Text(userName));
-//    LOG.info("got "+t);
-    return t;
-    
-//    return null;
-  }
-
-  public static void cancelJobTrackerDelegationToken(String tokenStrForm, String tokenSignature) throws Exception {
-//    LOG.info("cancelJobTrackerDelegationToken("+tokenStrForm+","+tokenSignature+")");
-    JobClient jcl = new JobClient(new JobConf(new Configuration(), HCatOutputFormat.class));
-    Token<org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier> t = extractJobTrackerToken(tokenStrForm,tokenSignature);
-//    LOG.info("canceling "+t);
-    try {
-      jcl.cancelDelegationToken(t);
-    }catch(Exception e){
-//      HCatUtil.logToken(LOG, "jcl token to cancel", t);
-      // ignore if token has already been invalidated.
-    }
-  }
-  
-  
-  public static Token<? extends AbstractDelegationTokenIdentifier> 
-      extractThriftToken(String tokenStrForm, String tokenSignature) throws MetaException, TException, IOException {
-//    LOG.info("extractThriftToken("+tokenStrForm+","+tokenSignature+")");
-    Token<? extends AbstractDelegationTokenIdentifier> t = new Token<DelegationTokenIdentifier>();
-    t.decodeFromUrlString(tokenStrForm);
-    t.setService(new Text(tokenSignature));
-//    LOG.info("returning "+t);
-    return t;
-  }
-
-  public static Token<org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier> 
-      extractJobTrackerToken(String tokenStrForm, String tokenSignature) throws MetaException, TException, IOException {
-//    LOG.info("extractJobTrackerToken("+tokenStrForm+","+tokenSignature+")");
-    Token<org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier> t = 
-        new Token<org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier>();
-    t.decodeFromUrlString(tokenStrForm);
-    t.setService(new Text(tokenSignature));
-//    LOG.info("returning "+t);
-    return t;
-  }
-
-  /**
-   * Logging stack trace
-   * @param logger
-   */
-  public static void logStackTrace(Log logger) {
-    StackTraceElement[] stackTrace = new Exception().getStackTrace();
-    for (int i = 1 ; i < stackTrace.length ; i++){
-      logger.info("\t"+stackTrace[i].toString());
-    }
-  }
-
-  /**
-   * debug log the hive conf
-   * @param logger
-   * @param hc
-   */
-  public static void logHiveConf(Log logger, HiveConf hc){
-    logEntrySet(logger,"logging hiveconf:",hc.getAllProperties().entrySet());
-  }
-
-  
-  public static void logList(Log logger, String itemName, List<? extends Object> list){
-      logger.info(itemName+":");
-      for (Object item : list){
-          logger.info("\t["+item+"]");
-      }
-  }
-  
-  public static void logMap(Log logger, String itemName, Map<? extends Object,? extends Object> map){
-    logEntrySet(logger,itemName,map.entrySet());
-  }
-
-  public static void logEntrySet(Log logger, String itemName, Set<? extends Entry> entrySet) {
-    logger.info(itemName+":");
-    for (Entry e : entrySet){
-      logger.info("\t["+e.getKey()+"]=>["+e.getValue()+"]");
-    }
-  }
-
-  public static void logAllTokens(Log logger, JobContext context) throws IOException {
-    for (Token<? extends TokenIdentifier>t : context.getCredentials().getAllTokens()){
-      logToken(logger,"token",t);
-    }
-  }
-
-  public static void logToken(Log logger, String itemName, Token<? extends TokenIdentifier> t) throws IOException {
-    logger.info(itemName+":");
-    logger.info("\tencodeToUrlString : "+t.encodeToUrlString());
-    logger.info("\ttoString : "+t.toString());
-    logger.info("\tkind : "+t.getKind());
-    logger.info("\tservice : "+t.getService());
-  }
-  
+        return newFields;
+    }
+
+    /**
+     * Test if the first FsAction is more permissive than the second. This is
+     * useful in cases where we want to ensure that a file owner has more
+     * permissions than the group they belong to, for eg. More completely(but
+     * potentially more cryptically) owner-r >= group-r >= world-r : bitwise
+     * and-masked with 0444 => 444 >= 440 >= 400 >= 000 owner-w >= group-w >=
+     * world-w : bitwise and-masked with &0222 => 222 >= 220 >= 200 >= 000
+     * owner-x >= group-x >= world-x : bitwise and-masked with &0111 => 111 >=
+     * 110 >= 100 >= 000
+     *
+     * @return true if first FsAction is more permissive than the second, false
+     *         if not.
+     */
+    public static boolean validateMorePermissive(FsAction first, FsAction second) {
+        if ((first == FsAction.ALL) || (second == FsAction.NONE)
+                || (first == second)) {
+            return true;
+        }
+        switch (first) {
+            case READ_EXECUTE:
+                return ((second == FsAction.READ) || (second == FsAction.EXECUTE));
+            case READ_WRITE:
+                return ((second == FsAction.READ) || (second == FsAction.WRITE));
+            case WRITE_EXECUTE:
+                return ((second == FsAction.WRITE) || (second == FsAction.EXECUTE));
+        }
+        return false;
+    }
+
+    /**
+     * Ensure that read or write permissions are not granted without also
+     * granting execute permissions. Essentially, r-- , rw- and -w- are invalid,
+     * r-x, -wx, rwx, ---, --x are valid
+     *
+     * @param perms The FsAction to verify
+     * @return true if the presence of read or write permission is accompanied
+     *         by execute permissions
+     */
+    public static boolean validateExecuteBitPresentIfReadOrWrite(FsAction perms) {
+        if ((perms == FsAction.READ) || (perms == FsAction.WRITE)
+                || (perms == FsAction.READ_WRITE)) {
+            return false;
+        }
+        return true;
+    }
+
+    public static Token<org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier> getJobTrackerDelegationToken(
+            Configuration conf, String userName) throws Exception {
+        // LOG.info("getJobTrackerDelegationToken("+conf+","+userName+")");
+        JobClient jcl = new JobClient(new JobConf(conf, HCatOutputFormat.class));
+        Token<org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier> t = jcl
+                .getDelegationToken(new Text(userName));
+        // LOG.info("got "+t);
+        return t;
+
+        // return null;
+    }
+
+    public static void cancelJobTrackerDelegationToken(String tokenStrForm,
+            String tokenSignature) throws Exception {
+        // LOG.info("cancelJobTrackerDelegationToken("+tokenStrForm+","+tokenSignature+")");
+        JobClient jcl = new JobClient(new JobConf(new Configuration(),
+                HCatOutputFormat.class));
+        Token<org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier> t = extractJobTrackerToken(
+                tokenStrForm, tokenSignature);
+        // LOG.info("canceling "+t);
+        try {
+            jcl.cancelDelegationToken(t);
+        } catch (Exception e) {
+            // HCatUtil.logToken(LOG, "jcl token to cancel", t);
+            // ignore if token has already been invalidated.
+        }
+    }
+
+    public static Token<? extends AbstractDelegationTokenIdentifier> extractThriftToken(
+            String tokenStrForm, String tokenSignature) throws MetaException,
+            TException, IOException {
+        // LOG.info("extractThriftToken("+tokenStrForm+","+tokenSignature+")");
+        Token<? extends AbstractDelegationTokenIdentifier> t = new Token<DelegationTokenIdentifier>();
+        t.decodeFromUrlString(tokenStrForm);
+        t.setService(new Text(tokenSignature));
+        // LOG.info("returning "+t);
+        return t;
+    }
+
+    public static Token<org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier> extractJobTrackerToken(
+            String tokenStrForm, String tokenSignature) throws MetaException,
+            TException, IOException {
+        // LOG.info("extractJobTrackerToken("+tokenStrForm+","+tokenSignature+")");
+        Token<org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier> t = new Token<org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier>();
+        t.decodeFromUrlString(tokenStrForm);
+        t.setService(new Text(tokenSignature));
+        // LOG.info("returning "+t);
+        return t;
+    }
+
+    /**
+     * Logging stack trace
+     *
+     * @param logger
+     */
+    public static void logStackTrace(Log logger) {
+        StackTraceElement[] stackTrace = new Exception().getStackTrace();
+        for (int i = 1; i < stackTrace.length; i++) {
+            logger.info("\t" + stackTrace[i].toString());
+        }
+    }
+
+    /**
+     * debug log the hive conf
+     *
+     * @param logger
+     * @param hc
+     */
+    public static void logHiveConf(Log logger, HiveConf hc) {
+        logEntrySet(logger, "logging hiveconf:", hc.getAllProperties()
+                .entrySet());
+    }
+
+    public static void logList(Log logger, String itemName,
+            List<? extends Object> list) {
+        logger.info(itemName + ":");
+        for (Object item : list) {
+            logger.info("\t[" + item + "]");
+        }
+    }
+
+    public static void logMap(Log logger, String itemName,
+            Map<? extends Object, ? extends Object> map) {
+        logEntrySet(logger, itemName, map.entrySet());
+    }
+
+    public static void logEntrySet(Log logger, String itemName,
+            Set<? extends Entry> entrySet) {
+        logger.info(itemName + ":");
+        for (Entry e : entrySet) {
+            logger.info("\t[" + e.getKey() + "]=>[" + e.getValue() + "]");
+        }
+    }
+
+    public static void logAllTokens(Log logger, JobContext context)
+            throws IOException {
+        for (Token<? extends TokenIdentifier> t : context.getCredentials()
+                .getAllTokens()) {
+            logToken(logger, "token", t);
+        }
+    }
+
+    public static void logToken(Log logger, String itemName,
+            Token<? extends TokenIdentifier> t) throws IOException {
+        logger.info(itemName + ":");
+        logger.info("\tencodeToUrlString : " + t.encodeToUrlString());
+        logger.info("\ttoString : " + t.toString());
+        logger.info("\tkind : " + t.getKind());
+        logger.info("\tservice : " + t.getService());
+    }
+
+    public static HCatStorageHandler getStorageHandler(Configuration conf,
+            String className) throws HiveException {
+
+        if (className == null) {
+            return null;
+        }
+        try {
+            Class<? extends HCatStorageHandler> handlerClass = (Class<? extends HCatStorageHandler>) Class
+                    .forName(className, true, JavaUtils.getClassLoader());
+            HCatStorageHandler storageHandler = (HCatStorageHandler) ReflectionUtils
+                    .newInstance(handlerClass, conf);
+            return storageHandler;
+        } catch (ClassNotFoundException e) {
+            throw new HiveException("Error in loading storage handler."
+                    + e.getMessage(), e);
+        }
+    }
+
 }
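
The Javadoc on validateMorePermissive and validateExecuteBitPresentIfReadOrWrite
above describes the intended permission ordering; a few spot checks make it
concrete. The class name PermissionChecks is hypothetical, and the expected
results follow directly from the method bodies in this file (run with -ea so the
asserts are evaluated):

import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hcatalog.common.HCatUtil;

public class PermissionChecks {
    public static void main(String[] args) {
        // rw- is at least as permissive as r-- ...
        assert HCatUtil.validateMorePermissive(FsAction.READ_WRITE, FsAction.READ);
        // ... but r-- is not at least as permissive as rw-.
        assert !HCatUtil.validateMorePermissive(FsAction.READ, FsAction.READ_WRITE);
        // ALL dominates everything; anything dominates NONE.
        assert HCatUtil.validateMorePermissive(FsAction.ALL, FsAction.WRITE_EXECUTE);
        assert HCatUtil.validateMorePermissive(FsAction.EXECUTE, FsAction.NONE);

        // r-- and rw- grant read/write without execute, so they are rejected;
        // r-x is accepted.
        assert !HCatUtil.validateExecuteBitPresentIfReadOrWrite(FsAction.READ);
        assert !HCatUtil.validateExecuteBitPresentIfReadOrWrite(FsAction.READ_WRITE);
        assert HCatUtil.validateExecuteBitPresentIfReadOrWrite(FsAction.READ_EXECUTE);
    }
}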

Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/storagehandler/HCatStorageHandler.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/storagehandler/HCatStorageHandler.java?rev=1179746&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/storagehandler/HCatStorageHandler.java (added)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/storagehandler/HCatStorageHandler.java Thu Oct  6 18:22:14 2011
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.storagehandler;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.HiveMetaHook;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.mapreduce.HCatInputStorageDriver;
+import org.apache.hcatalog.mapreduce.HCatOutputStorageDriver;
+
+/**
+ * The abstract class HCatStorageHandler serves as the base class for all
+ * the storage handlers required for non-native tables in HCatalog.
+ */
+public abstract class HCatStorageHandler implements HiveMetaHook,
+        HiveStorageHandler {
+    
+    /**
+     * Gets the input storage driver.
+     * 
+     * @return the input storage driver
+     */
+    public abstract Class<? extends HCatInputStorageDriver> getInputStorageDriver();
+    
+    /**
+     * Gets the output storage driver.
+     * 
+     * @return the output storage driver
+     */
+    public abstract Class<? extends HCatOutputStorageDriver> getOutputStorageDriver();
+    
+    /**
+     * 
+     * 
+     * @return authorization provider
+     * @throws HiveException
+     */
+    public abstract HiveAuthorizationProvider getAuthorizationProvider()
+            throws HiveException;
+    
+    /*
+     * (non-Javadoc)
+     * 
+     * @see
+     * org.apache.hadoop.hive.metastore.HiveMetaHook#commitCreateTable(org.apache
+     * .hadoop.hive.metastore.api.Table)
+     */
+    @Override
+    public abstract void commitCreateTable(Table table) throws MetaException;
+    
+    /*
+     * (non-Javadoc)
+     * 
+     * @see
+     * org.apache.hadoop.hive.metastore.HiveMetaHook#commitDropTable(org.apache
+     * .hadoop.hive.metastore.api.Table, boolean)
+     */
+    @Override
+    public abstract void commitDropTable(Table table, boolean deleteData)
+            throws MetaException;
+    
+    /*
+     * (non-Javadoc)
+     * 
+     * @see
+     * org.apache.hadoop.hive.metastore.HiveMetaHook#preCreateTable(org.apache
+     * .hadoop.hive.metastore.api.Table)
+     */
+    @Override
+    public abstract void preCreateTable(Table table) throws MetaException;
+    
+    /*
+     * (non-Javadoc)
+     * 
+     * @see
+     * org.apache.hadoop.hive.metastore.HiveMetaHook#preDropTable(org.apache
+     * .hadoop.hive.metastore.api.Table)
+     */
+    @Override
+    public abstract void preDropTable(Table table) throws MetaException;
+    
+    /*
+     * (non-Javadoc)
+     * 
+     * @see
+     * org.apache.hadoop.hive.metastore.HiveMetaHook#rollbackCreateTable(org
+     * .apache.hadoop.hive.metastore.api.Table)
+     */
+    @Override
+    public abstract void rollbackCreateTable(Table table) throws MetaException;
+    
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.hadoop.hive.metastore.HiveMetaHook#rollbackDropTable
+     * (org.apache.hadoop.hive.metastore.api.Table)
+     */
+    @Override
+    public abstract void rollbackDropTable(Table table) throws MetaException;
+    
+    @Override
+    public abstract HiveMetaHook getMetaHook();
+    
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.hadoop.hive.ql.metadata.HiveStorageHandler#
+     * configureTableJobProperties(org.apache.hadoop.hive.ql.plan.TableDesc,
+     * java.util.Map)
+     */
+    @Override
+    public abstract void configureTableJobProperties(TableDesc tableDesc,
+            Map<String, String> jobProperties);
+    
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.hadoop.conf.Configurable#getConf()
+     */
+    @Override
+    public abstract Configuration getConf();
+    
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.hadoop.conf.Configurable#setConf(org.apache.hadoop.conf.
+     * Configuration)
+     */
+    @Override
+    public abstract void setConf(Configuration conf);
+    
+    /*
+     * (non-Javadoc)
+     * 
+     * @see
+     * org.apache.hadoop.hive.ql.metadata.HiveStorageHandler#getSerDeClass()
+     */
+    @Override
+    public abstract Class<? extends SerDe> getSerDeClass();
+    
+    /*
+     * (non-Javadoc)
+     * 
+     * @see
+     * org.apache.hadoop.hive.ql.metadata.HiveStorageHandler#getInputFormatClass
+     * ()
+     */
+    @Override
+    public final Class<? extends InputFormat> getInputFormatClass() {
+        return DummyInputFormat.class;
+    }
+    
+    /*
+     * (non-Javadoc)
+     * 
+     * @see
+     * org.apache.hadoop.hive.ql.metadata.HiveStorageHandler#getOutputFormatClass
+     * ()
+     */
+    @Override
+    public final Class<? extends OutputFormat> getOutputFormatClass() {
+        return DummyOutputFormat.class;
+    }
+    
+    /**
+     * The Class DummyInputFormat is a dummy implementation of the old hadoop
+     * mapred.InputFormat required by HiveStorageHandler.
+     */
+    class DummyInputFormat implements
+            InputFormat<WritableComparable, HCatRecord> {
+        
+        /*
+         * @see
+         * org.apache.hadoop.mapred.InputFormat#getRecordReader(org.apache.hadoop
+         * .mapred.InputSplit, org.apache.hadoop.mapred.JobConf,
+         * org.apache.hadoop.mapred.Reporter)
+         */
+        @Override
+        public RecordReader<WritableComparable, HCatRecord> getRecordReader(
+                InputSplit split, JobConf jobconf, Reporter reporter)
+                throws IOException {
+            throw new IOException("This operation is not supported.");
+        }
+        
+        /*
+         * @see
+         * org.apache.hadoop.mapred.InputFormat#getSplits(org.apache.hadoop.
+         * mapred .JobConf, int)
+         */
+        @Override
+        public InputSplit[] getSplits(JobConf jobconf, int number)
+                throws IOException {
+            throw new IOException("This operation is not supported.");
+        }
+    }
+    
+    /**
+     * The Class DummyOutputFormat is a dummy implementation of the old hadoop
+     * mapred.OutputFormat and HiveOutputFormat required by HiveStorageHandler.
+     */
+    class DummyOutputFormat implements
+            OutputFormat<WritableComparable<?>, HCatRecord>,
+            HiveOutputFormat<WritableComparable<?>, HCatRecord> {
+        
+        /*
+         * @see
+         * org.apache.hadoop.mapred.OutputFormat#checkOutputSpecs(org.apache
+         * .hadoop .fs.FileSystem, org.apache.hadoop.mapred.JobConf)
+         */
+        @Override
+        public void checkOutputSpecs(FileSystem fs, JobConf jobconf)
+                throws IOException {
+            throw new IOException("This operation is not supported.");
+            
+        }
+        
+        /*
+         * @see
+         * org.apache.hadoop.mapred.OutputFormat#getRecordWriter(org.apache.
+         * hadoop .fs.FileSystem, org.apache.hadoop.mapred.JobConf,
+         * java.lang.String, org.apache.hadoop.util.Progressable)
+         */
+        @Override
+        public RecordWriter<WritableComparable<?>, HCatRecord> getRecordWriter(
+                FileSystem fs, JobConf jobconf, String str,
+                Progressable progress) throws IOException {
+            throw new IOException("This operation is not supported.");
+        }
+        
+        /*
+         * @see
+         * org.apache.hadoop.hive.ql.io.HiveOutputFormat#getHiveRecordWriter(org
+         * .apache.hadoop.mapred.JobConf, org.apache.hadoop.fs.Path,
+         * java.lang.Class, boolean, java.util.Properties,
+         * org.apache.hadoop.util.Progressable)
+         */
+        @Override
+        public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getHiveRecordWriter(
+                JobConf jc, Path finalOutPath,
+                Class<? extends Writable> valueClass, boolean isCompressed,
+                Properties tableProperties, Progressable progress)
+                throws IOException {
+            throw new IOException("This operation is not supported.");
+        }
+        
+    }
+    
+}
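
For a sense of what a concrete handler built on this class might look like, here
is a minimal hypothetical subclass. The name ExampleStorageHandler, the RCFile
driver and ColumnarSerDe choices, and the use of Hive's
DefaultHiveAuthorizationProvider are stand-ins for illustration, not anything
this patch prescribes; a real handler would supply drivers, a SerDe, and an
authorization provider for its own storage system.

import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.HiveMetaHook;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.security.authorization.DefaultHiveAuthorizationProvider;
import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
import org.apache.hadoop.hive.serde2.SerDe;
import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
import org.apache.hcatalog.mapreduce.HCatInputStorageDriver;
import org.apache.hcatalog.mapreduce.HCatOutputStorageDriver;
import org.apache.hcatalog.rcfile.RCFileInputDriver;
import org.apache.hcatalog.rcfile.RCFileOutputDriver;
import org.apache.hcatalog.storagehandler.HCatStorageHandler;

public class ExampleStorageHandler extends HCatStorageHandler {

    private Configuration conf;

    @Override
    public Class<? extends HCatInputStorageDriver> getInputStorageDriver() {
        return RCFileInputDriver.class;   // stand-in driver choice
    }

    @Override
    public Class<? extends HCatOutputStorageDriver> getOutputStorageDriver() {
        return RCFileOutputDriver.class;  // stand-in driver choice
    }

    @Override
    public HiveAuthorizationProvider getAuthorizationProvider()
            throws HiveException {
        // Assumption: fall back to Hive's default provider; a real handler
        // would return one that understands its underlying storage system.
        return new DefaultHiveAuthorizationProvider();
    }

    @Override
    public HiveMetaHook getMetaHook() {
        return this; // HCatStorageHandler itself implements HiveMetaHook
    }

    @Override
    public Class<? extends SerDe> getSerDeClass() {
        return ColumnarSerDe.class;       // stand-in SerDe choice
    }

    @Override
    public void configureTableJobProperties(TableDesc tableDesc,
            Map<String, String> jobProperties) {
        // Nothing to configure in this sketch.
    }

    @Override
    public Configuration getConf() {
        return conf;
    }

    @Override
    public void setConf(Configuration conf) {
        this.conf = conf;
    }

    // Metastore lifecycle callbacks: no-ops in this sketch.
    @Override public void preCreateTable(Table table) throws MetaException { }
    @Override public void rollbackCreateTable(Table table) throws MetaException { }
    @Override public void commitCreateTable(Table table) throws MetaException { }
    @Override public void preDropTable(Table table) throws MetaException { }
    @Override public void rollbackDropTable(Table table) throws MetaException { }
    @Override public void commitDropTable(Table table, boolean deleteData) throws MetaException { }
}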


