hive-commits mailing list archives

From: khorg...@apache.org
Subject: svn commit: r1520466 [2/18] - in /hive/trunk/hcatalog: core/src/main/java/org/apache/hcatalog/cli/ core/src/main/java/org/apache/hcatalog/cli/SemanticAnalysis/ core/src/main/java/org/apache/hcatalog/common/ core/src/main/java/org/apache/hcatalog/data/ ...
Date: Fri, 06 Sep 2013 00:49:17 GMT
Added: hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/common/HCatConstants.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/common/HCatConstants.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/common/HCatConstants.java (added)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/common/HCatConstants.java Fri Sep  6 00:49:14 2013
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hcatalog.common;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+
+public final class HCatConstants {
+
+    public static final String HIVE_RCFILE_IF_CLASS = "org.apache.hadoop.hive.ql.io.RCFileInputFormat";
+    public static final String HIVE_RCFILE_OF_CLASS = "org.apache.hadoop.hive.ql.io.RCFileOutputFormat";
+
+    public static final String SEQUENCEFILE_INPUT = SequenceFileInputFormat.class.getName();
+    public static final String SEQUENCEFILE_OUTPUT = SequenceFileOutputFormat.class.getName();
+
+    public static final String HCAT_PIG_STORAGE_CLASS = "org.apache.pig.builtin.PigStorage";
+    public static final String HCAT_PIG_LOADER = "hcat.pig.loader";
+    public static final String HCAT_PIG_LOADER_LOCATION_SET = HCAT_PIG_LOADER + ".location.set";
+    public static final String HCAT_PIG_LOADER_ARGS = "hcat.pig.loader.args";
+    public static final String HCAT_PIG_STORER = "hcat.pig.storer";
+    public static final String HCAT_PIG_STORER_ARGS = "hcat.pig.storer.args";
+    public static final String HCAT_PIG_ARGS_DELIMIT = "hcat.pig.args.delimiter";
+    public static final String HCAT_PIG_ARGS_DELIMIT_DEFAULT = ",";
+    public static final String HCAT_PIG_STORER_LOCATION_SET = HCAT_PIG_STORER + ".location.set";
+    public static final String HCAT_PIG_INNER_TUPLE_NAME = "hcat.pig.inner.tuple.name";
+    public static final String HCAT_PIG_INNER_TUPLE_NAME_DEFAULT = "innertuple";
+    public static final String HCAT_PIG_INNER_FIELD_NAME = "hcat.pig.inner.field.name";
+    public static final String HCAT_PIG_INNER_FIELD_NAME_DEFAULT = "innerfield";
+
+    /**
+     * {@value} (default: null)
+     * When the property is set in the UDFContext of the org.apache.hcatalog.pig.HCatStorer, HCatStorer writes
+     * to the location it specifies instead of the default HCatalog location format. An example can be found
+     * in org.apache.hcatalog.pig.HCatStorerWrapper.
+     */
+    public static final String HCAT_PIG_STORER_EXTERNAL_LOCATION = HCAT_PIG_STORER + ".external.location";
+
+    //The keys used to store info into the job Configuration
+    public static final String HCAT_KEY_BASE = "mapreduce.lib.hcat";
+
+    public static final String HCAT_KEY_OUTPUT_SCHEMA = HCAT_KEY_BASE + ".output.schema";
+
+    public static final String HCAT_KEY_JOB_INFO = HCAT_KEY_BASE + ".job.info";
+
+    // hcatalog specific configurations, that can be put in hive-site.xml
+    public static final String HCAT_HIVE_CLIENT_EXPIRY_TIME = "hcatalog.hive.client.cache.expiry.time";
+
+    private HCatConstants() { // restrict instantiation
+    }
+
+    public static final String HCAT_TABLE_SCHEMA = "hcat.table.schema";
+
+    public static final String HCAT_METASTORE_URI = HiveConf.ConfVars.METASTOREURIS.varname;
+
+    public static final String HCAT_PERMS = "hcat.perms";
+
+    public static final String HCAT_GROUP = "hcat.group";
+
+    public static final String HCAT_CREATE_TBL_NAME = "hcat.create.tbl.name";
+
+    public static final String HCAT_CREATE_DB_NAME = "hcat.create.db.name";
+
+    public static final String HCAT_METASTORE_PRINCIPAL
+        = HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname;
+
+    /**
+     * The desired number of input splits produced for each partition. When the
+     * input files are large and few, we want to split them into many splits,
+     * so as to increase the parallelism of loading the splits. Try also two
+     * other parameters, mapred.min.split.size and mapred.max.split.size, to
+     * control the number of input splits.
+     */
+    public static final String HCAT_DESIRED_PARTITION_NUM_SPLITS =
+        "hcat.desired.partition.num.splits";
+
+    // IMPORTANT IMPORTANT IMPORTANT!!!!!
+    //The keys used to store info into the job Configuration.
+    //If any new keys are added, the HCatStorer needs to be updated. The HCatStorer
+    //updates the job configuration in the backend to insert these keys to avoid
+    //having to call setOutput from the backend (which would cause a metastore call
+    //from the map jobs)
+    public static final String HCAT_KEY_OUTPUT_BASE = "mapreduce.lib.hcatoutput";
+    public static final String HCAT_KEY_OUTPUT_INFO = HCAT_KEY_OUTPUT_BASE + ".info";
+    public static final String HCAT_KEY_HIVE_CONF = HCAT_KEY_OUTPUT_BASE + ".hive.conf";
+    public static final String HCAT_KEY_TOKEN_SIGNATURE = HCAT_KEY_OUTPUT_BASE + ".token.sig";
+
+    public static final String[] OUTPUT_CONFS_TO_SAVE = {
+        HCAT_KEY_OUTPUT_INFO,
+        HCAT_KEY_HIVE_CONF,
+        HCAT_KEY_TOKEN_SIGNATURE
+    };
+
+
+    public static final String HCAT_MSG_CLEAN_FREQ = "hcat.msg.clean.freq";
+    public static final String HCAT_MSG_EXPIRY_DURATION = "hcat.msg.expiry.duration";
+
+    public static final String HCAT_MSGBUS_TOPIC_NAME = "hcat.msgbus.topic.name";
+    public static final String HCAT_MSGBUS_TOPIC_NAMING_POLICY = "hcat.msgbus.topic.naming.policy";
+    public static final String HCAT_MSGBUS_TOPIC_PREFIX = "hcat.msgbus.topic.prefix";
+
+    public static final String HCAT_DYNAMIC_PTN_JOBID = HCAT_KEY_OUTPUT_BASE + "dynamic.jobid";
+    public static final boolean HCAT_IS_DYNAMIC_MAX_PTN_CHECK_ENABLED = false;
+
+    // Message Bus related properties.
+    public static final String HCAT_DEFAULT_TOPIC_PREFIX = "hcat";
+    public static final String HCAT_EVENT = "HCAT_EVENT";
+    public static final String HCAT_ADD_PARTITION_EVENT = "ADD_PARTITION";
+    public static final String HCAT_DROP_PARTITION_EVENT = "DROP_PARTITION";
+    public static final String HCAT_PARTITION_DONE_EVENT = "PARTITION_DONE";
+    public static final String HCAT_CREATE_TABLE_EVENT = "CREATE_TABLE";
+    public static final String HCAT_DROP_TABLE_EVENT = "DROP_TABLE";
+    public static final String HCAT_CREATE_DATABASE_EVENT = "CREATE_DATABASE";
+    public static final String HCAT_DROP_DATABASE_EVENT = "DROP_DATABASE";
+    public static final String HCAT_MESSAGE_VERSION = "HCAT_MESSAGE_VERSION";
+    public static final String HCAT_MESSAGE_FORMAT = "HCAT_MESSAGE_FORMAT";
+    public static final String CONF_LABEL_HCAT_MESSAGE_FACTORY_IMPL_PREFIX = "hcatalog.message.factory.impl.";
+    public static final String CONF_LABEL_HCAT_MESSAGE_FORMAT = "hcatalog.message.format";
+    public static final String DEFAULT_MESSAGE_FACTORY_IMPL = "org.apache.hcatalog.messaging.json.JSONMessageFactory";
+
+    // System environment variables
+    public static final String SYSENV_HADOOP_TOKEN_FILE_LOCATION = "HADOOP_TOKEN_FILE_LOCATION";
+
+    // Hadoop Conf Var Names
+    public static final String CONF_MAPREDUCE_JOB_CREDENTIALS_BINARY = "mapreduce.job.credentials.binary";
+
+    //***************************************************************************
+    // Data-related configuration properties.
+    //***************************************************************************
+
+    /**
+     * {@value} (default: {@value #HCAT_DATA_CONVERT_BOOLEAN_TO_INTEGER_DEFAULT}).
+     * Pig < 0.10.0 does not have boolean support, and scripts written for pre-boolean Pig versions
+     * will not expect boolean values when upgrading Pig. To ease integration, this Hadoop
+     * configuration key can be set to convert boolean fields to integers.
+     */
+    public static final String HCAT_DATA_CONVERT_BOOLEAN_TO_INTEGER =
+        "hcat.data.convert.boolean.to.integer";
+    public static final boolean HCAT_DATA_CONVERT_BOOLEAN_TO_INTEGER_DEFAULT = false;
+
+    /**
+     * {@value} (default: {@value #HCAT_DATA_TINY_SMALL_INT_PROMOTION_DEFAULT}).
+     * Hive tables support tinyint and smallint columns, while not all processing frameworks support
+     * these types (Pig only has integer for example). Enable this property to promote tinyint and
+     * smallint columns to integer at runtime. Note that writes to tinyint and smallint columns
+     * enforce bounds checking and jobs will fail if attempting to write values outside the column
+     * bounds.
+     */
+    public static final String HCAT_DATA_TINY_SMALL_INT_PROMOTION =
+        "hcat.data.tiny.small.int.promotion";
+    public static final boolean HCAT_DATA_TINY_SMALL_INT_PROMOTION_DEFAULT = false;
+
+    /**
+     * {@value} (default: {@value #HCAT_INPUT_BAD_RECORD_THRESHOLD_DEFAULT}).
+     * Threshold for the ratio of bad records that will be silently skipped without causing a task
+     * failure. This is useful when processing large data sets with corrupt records, when it is
+     * acceptable to skip some bad records.
+     */
+    public static final String HCAT_INPUT_BAD_RECORD_THRESHOLD_KEY = "hcat.input.bad.record.threshold";
+    public static final float HCAT_INPUT_BAD_RECORD_THRESHOLD_DEFAULT = 0.0001f;
+
+    /**
+     * {@value} (default: {@value #HCAT_INPUT_BAD_RECORD_MIN_DEFAULT}).
+     * Number of bad records that will be accepted before applying
+     * {@value #HCAT_INPUT_BAD_RECORD_THRESHOLD_KEY}. This is necessary to prevent an initial bad
+     * record from causing a task failure.
+     */
+    public static final String HCAT_INPUT_BAD_RECORD_MIN_KEY = "hcat.input.bad.record.min";
+    public static final int HCAT_INPUT_BAD_RECORD_MIN_DEFAULT = 2;
+}
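
For illustration only (not part of this commit): a minimal sketch of how a job might set some of the data-related keys declared above in an ordinary Hadoop Configuration. The wrapper class name is hypothetical; the constants are the ones defined in HCatConstants.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hcatalog.common.HCatConstants;

    public class HCatConstantsExample {
        public static Configuration newJobConf() {
            Configuration conf = new Configuration();
            // Promote tinyint/smallint columns to int for frameworks that lack those types.
            conf.setBoolean(HCatConstants.HCAT_DATA_TINY_SMALL_INT_PROMOTION, true);
            // Silently skip up to 0.01% bad input records, but only after at least 2 are seen.
            conf.setFloat(HCatConstants.HCAT_INPUT_BAD_RECORD_THRESHOLD_KEY, 0.0001f);
            conf.setInt(HCatConstants.HCAT_INPUT_BAD_RECORD_MIN_KEY, 2);
            return conf;
        }
    }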

Added: hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/common/HCatContext.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/common/HCatContext.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/common/HCatContext.java (added)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/common/HCatContext.java Fri Sep  6 00:49:14 2013
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.common;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.classification.InterfaceAudience;
+import org.apache.hadoop.hive.common.classification.InterfaceStability;
+
+import java.util.Map;
+
+/**
+ * HCatContext is a singleton that provides global access to configuration data.
+ *
+ * <p>HCatalog provides a variety of functionality that users can configure at runtime through
+ * configuration properties. Available configuration properties are defined in
+ * {@link HCatConstants}. HCatContext allows users to enable optional functionality by
+ * setting properties in a provided configuration.</p>
+ *
+ * <p>HCatalog <em>users</em> (MR apps, processing framework adapters) should set properties
+ * in a configuration that has been provided to
+ * {@link #setConf(org.apache.hadoop.conf.Configuration)} to enable optional functionality.
+ * The job configuration must be used to ensure properties are passed to the backend MR tasks.</p>
+ *
+ * <p>HCatalog <em>developers</em> should enable optional functionality by checking properties
+ * from {@link #getConf()}. Since users are not obligated to set a configuration, optional
+ * functionality must provide a sensible default.</p>
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public enum HCatContext {
+    INSTANCE;
+
+    private Configuration conf = null;
+
+    /**
+     * Use the given configuration for optional behavior. Any hcat.* keys present only in the
+     * previously held config are copied into the new conf. The job conf must be used to ensure
+     * properties are passed to backend MR tasks.
+     */
+    public synchronized HCatContext setConf(Configuration newConf) {
+        Preconditions.checkNotNull(newConf, "Required parameter 'newConf' must not be null.");
+
+        if (conf == null) {
+            conf = newConf;
+            return this;
+        }
+
+        if (conf != newConf) {
+            for (Map.Entry<String, String> entry : conf) {
+                if ((entry.getKey().matches("hcat.*")) && (newConf.get(entry.getKey()) == null)) {
+                    newConf.set(entry.getKey(), entry.getValue());
+                }
+            }
+            conf = newConf;
+        }
+        return this;
+    }
+
+    /**
+     * Get the configuration, if there is one. Users are not required to set up HCatContext
+     * unless they wish to override default behavior, so the configuration may not be present.
+     *
+     * @return an Optional that might contain a Configuration
+     */
+    public Optional<Configuration> getConf() {
+        return Optional.fromNullable(conf);
+    }
+}
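
For illustration only (not part of this commit): a minimal sketch of the user/developer split described in the HCatContext javadoc above. The wrapper class is hypothetical; HCatContext and HCatConstants are the classes added in this commit.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hcatalog.common.HCatConstants;
    import org.apache.hcatalog.common.HCatContext;

    public class HCatContextExample {
        public static void main(String[] args) {
            // An HCatalog user (e.g. an MR app) hands its job configuration to HCatContext.
            Configuration jobConf = new Configuration();
            jobConf.setBoolean(HCatConstants.HCAT_DATA_CONVERT_BOOLEAN_TO_INTEGER, true);
            HCatContext.INSTANCE.setConf(jobConf);

            // HCatalog-internal code checks the optional setting, falling back to the default
            // because users are not obligated to provide a configuration at all.
            boolean convert = HCatContext.INSTANCE.getConf().isPresent()
                && HCatContext.INSTANCE.getConf().get().getBoolean(
                    HCatConstants.HCAT_DATA_CONVERT_BOOLEAN_TO_INTEGER,
                    HCatConstants.HCAT_DATA_CONVERT_BOOLEAN_TO_INTEGER_DEFAULT);
            System.out.println("boolean-to-integer conversion enabled: " + convert);
        }
    }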

Added: hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/common/HCatException.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/common/HCatException.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/common/HCatException.java (added)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/common/HCatException.java Fri Sep  6 00:49:14 2013
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hcatalog.common;
+
+import java.io.IOException;
+
+/**
+ * Class representing exceptions thrown by HCat.
+ */
+public class HCatException extends IOException {
+
+    private static final long serialVersionUID = 1L;
+
+    /** The error type enum for this exception. */
+    private final ErrorType errorType;
+
+    /**
+     * Instantiates a new hcat exception.
+     * @param errorType the error type
+     */
+    public HCatException(ErrorType errorType) {
+        this(errorType, null, null);
+    }
+
+
+    /**
+     * Instantiates a new hcat exception.
+     * @param errorType the error type
+     * @param cause the cause
+     */
+    public HCatException(ErrorType errorType, Throwable cause) {
+        this(errorType, null, cause);
+    }
+
+    /**
+     * Instantiates a new hcat exception.
+     * @param errorType the error type
+     * @param extraMessage extra messages to add to the message string
+     */
+    public HCatException(ErrorType errorType, String extraMessage) {
+        this(errorType, extraMessage, null);
+    }
+
+    /**
+     * Instantiates a new hcat exception.
+     * @param errorType the error type
+     * @param extraMessage extra messages to add to the message string
+     * @param cause the cause
+     */
+    public HCatException(ErrorType errorType, String extraMessage, Throwable cause) {
+        super(buildErrorMessage(
+            errorType,
+            extraMessage,
+            cause), cause);
+        this.errorType = errorType;
+    }
+
+
+    //TODO : remove default error type constructors after all exceptions
+    //are changed to use error types
+
+    /**
+     * Instantiates a new hcat exception.
+     * @param message the error message
+     */
+    public HCatException(String message) {
+        this(ErrorType.ERROR_INTERNAL_EXCEPTION, message, null);
+    }
+
+    /**
+     * Instantiates a new hcat exception.
+     * @param message the error message
+     * @param cause the cause
+     */
+    public HCatException(String message, Throwable cause) {
+        this(ErrorType.ERROR_INTERNAL_EXCEPTION, message, cause);
+    }
+
+
+    /**
+     * Builds the error message string. The extra message is appended to the error type message. If appendCause
+     * is true for the error type, then the message of the cause is also added to the message.
+     * @param type the error type
+     * @param extraMessage the extra message string
+     * @param cause the cause for the exception
+     * @return the exception message string
+     */
+    public static String buildErrorMessage(ErrorType type, String extraMessage, Throwable cause) {
+
+        //Initial message is just the error type message
+        StringBuffer message = new StringBuffer(HCatException.class.getName());
+        message.append(" : " + type.getErrorCode());
+        message.append(" : " + type.getErrorMessage());
+
+        if (extraMessage != null) {
+            //Add the extra message value to buffer
+            message.append(" : " + extraMessage);
+        }
+
+        if (type.appendCauseMessage()) {
+            if (cause != null) {
+                //Add the cause message to buffer
+                message.append(". Cause : " + cause.toString());
+            }
+        }
+
+        return message.toString();
+    }
+
+
+    /**
+     * Whether this is a retriable error.
+     * @return true if the error is retriable
+     */
+    public boolean isRetriable() {
+        return errorType.isRetriable();
+    }
+
+    /**
+     * Gets the error type.
+     * @return the error type enum
+     */
+    public ErrorType getErrorType() {
+        return errorType;
+    }
+
+    /**
+     * Gets the error code.
+     * @return the error code
+     */
+    public int getErrorCode() {
+        return errorType.getErrorCode();
+    }
+
+    /* (non-Javadoc)
+    * @see java.lang.Throwable#toString()
+    */
+    @Override
+    public String toString() {
+        return getMessage();
+    }
+
+}
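
For illustration only (not part of this commit): a minimal sketch of how the error-typed constructors and accessors above are meant to be used. ErrorType comes from the same package but is defined in another file of this commit; the wrapper class is hypothetical.

    import org.apache.hcatalog.common.ErrorType;
    import org.apache.hcatalog.common.HCatException;

    public class HCatExceptionExample {
        static void failInternally(String detail) throws HCatException {
            // The message is built from the class name, the error code,
            // the error type message and the extra detail.
            throw new HCatException(ErrorType.ERROR_INTERNAL_EXCEPTION, detail);
        }

        public static void main(String[] args) {
            try {
                failInternally("simulated failure");
            } catch (HCatException e) {
                // HCatException extends IOException, so generic IOException handlers catch it too.
                System.out.println("code=" + e.getErrorCode() + ", retriable=" + e.isRetriable());
                System.out.println(e.getMessage());
            }
        }
    }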

Added: hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/common/HCatUtil.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/common/HCatUtil.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/common/HCatUtil.java (added)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/common/HCatUtil.java Fri Sep  6 00:49:14 2013
@@ -0,0 +1,627 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.common;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hcatalog.data.Pair;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.data.schema.HCatSchemaUtils;
+import org.apache.hcatalog.mapreduce.FosterStorageHandler;
+import org.apache.hcatalog.mapreduce.HCatOutputFormat;
+import org.apache.hcatalog.mapreduce.HCatStorageHandler;
+import org.apache.hcatalog.mapreduce.InputJobInfo;
+import org.apache.hcatalog.mapreduce.OutputJobInfo;
+import org.apache.hcatalog.mapreduce.PartInfo;
+import org.apache.hcatalog.mapreduce.StorerInfo;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.login.LoginException;
+
+public class HCatUtil {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HCatUtil.class);
+    private static volatile HiveClientCache hiveClientCache;
+    private final static int DEFAULT_HIVE_CACHE_EXPIRY_TIME_SECONDS = 2 * 60;
+
+    public static boolean checkJobContextIfRunningFromBackend(JobContext j) {
+        if (j.getConfiguration().get("mapred.task.id", "").equals("") &&
+                !("true".equals(j.getConfiguration().get("pig.illustrating")))) {
+            return false;
+        }
+        return true;
+    }
+
+    public static String serialize(Serializable obj) throws IOException {
+        if (obj == null) {
+            return "";
+        }
+        try {
+            ByteArrayOutputStream serialObj = new ByteArrayOutputStream();
+            ObjectOutputStream objStream = new ObjectOutputStream(serialObj);
+            objStream.writeObject(obj);
+            objStream.close();
+            return encodeBytes(serialObj.toByteArray());
+        } catch (Exception e) {
+            throw new IOException("Serialization error: " + e.getMessage(), e);
+        }
+    }
+
+    public static Object deserialize(String str) throws IOException {
+        if (str == null || str.length() == 0) {
+            return null;
+        }
+        try {
+            ByteArrayInputStream serialObj = new ByteArrayInputStream(
+                decodeBytes(str));
+            ObjectInputStream objStream = new ObjectInputStream(serialObj);
+            return objStream.readObject();
+        } catch (Exception e) {
+            throw new IOException("Deserialization error: " + e.getMessage(), e);
+        }
+    }
+
+    public static String encodeBytes(byte[] bytes) {
+        StringBuffer strBuf = new StringBuffer();
+
+        for (int i = 0; i < bytes.length; i++) {
+            strBuf.append((char) (((bytes[i] >> 4) & 0xF) + ('a')));
+            strBuf.append((char) (((bytes[i]) & 0xF) + ('a')));
+        }
+
+        return strBuf.toString();
+    }
+
+    public static byte[] decodeBytes(String str) {
+        byte[] bytes = new byte[str.length() / 2];
+        for (int i = 0; i < str.length(); i += 2) {
+            char c = str.charAt(i);
+            bytes[i / 2] = (byte) ((c - 'a') << 4);
+            c = str.charAt(i + 1);
+            bytes[i / 2] += (c - 'a');
+        }
+        return bytes;
+    }
+
+    public static List<HCatFieldSchema> getHCatFieldSchemaList(
+        FieldSchema... fields) throws HCatException {
+        List<HCatFieldSchema> result = new ArrayList<HCatFieldSchema>(
+            fields.length);
+
+        for (FieldSchema f : fields) {
+            result.add(HCatSchemaUtils.getHCatFieldSchema(f));
+        }
+
+        return result;
+    }
+
+    public static List<HCatFieldSchema> getHCatFieldSchemaList(
+        List<FieldSchema> fields) throws HCatException {
+        if (fields == null) {
+            return null;
+        } else {
+            List<HCatFieldSchema> result = new ArrayList<HCatFieldSchema>();
+            for (FieldSchema f : fields) {
+                result.add(HCatSchemaUtils.getHCatFieldSchema(f));
+            }
+            return result;
+        }
+    }
+
+    public static HCatSchema extractSchema(Table table) throws HCatException {
+        return new HCatSchema(HCatUtil.getHCatFieldSchemaList(table.getCols()));
+    }
+
+    public static HCatSchema extractSchema(Partition partition) throws HCatException {
+        return new HCatSchema(HCatUtil.getHCatFieldSchemaList(partition.getCols()));
+    }
+
+    public static List<FieldSchema> getFieldSchemaList(
+        List<HCatFieldSchema> hcatFields) {
+        if (hcatFields == null) {
+            return null;
+        } else {
+            List<FieldSchema> result = new ArrayList<FieldSchema>();
+            for (HCatFieldSchema f : hcatFields) {
+                result.add(HCatSchemaUtils.getFieldSchema(f));
+            }
+            return result;
+        }
+    }
+
+    public static Table getTable(HiveMetaStoreClient client, String dbName, String tableName)
+        throws NoSuchObjectException, TException, MetaException {
+        return new Table(client.getTable(dbName, tableName));
+    }
+
+    public static HCatSchema getTableSchemaWithPtnCols(Table table) throws IOException {
+        HCatSchema tableSchema = new HCatSchema(HCatUtil.getHCatFieldSchemaList(table.getCols()));
+
+        if (table.getPartitionKeys().size() != 0) {
+
+            // add partition keys to table schema
+            // NOTE : this assumes that we do not ever have ptn keys as columns
+            // inside the table schema as well!
+            for (FieldSchema fs : table.getPartitionKeys()) {
+                tableSchema.append(HCatSchemaUtils.getHCatFieldSchema(fs));
+            }
+        }
+        return tableSchema;
+    }
+
+    /**
+     * return the partition columns from a table instance
+     *
+     * @param table the instance to extract partition columns from
+     * @return HCatSchema instance which contains the partition columns
+     * @throws IOException
+     */
+    public static HCatSchema getPartitionColumns(Table table) throws IOException {
+        HCatSchema cols = new HCatSchema(new LinkedList<HCatFieldSchema>());
+        if (table.getPartitionKeys().size() != 0) {
+            for (FieldSchema fs : table.getPartitionKeys()) {
+                cols.append(HCatSchemaUtils.getHCatFieldSchema(fs));
+            }
+        }
+        return cols;
+    }
+
+    /**
+     * Validates the partition schema, checking whether the column types match between the
+     * partition and the existing table schema. Returns the list of columns
+     * present in the partition but not in the table.
+     *
+     * @param table the table
+     * @param partitionSchema the partition schema
+     * @return the list of newly added fields
+     * @throws IOException Signals that an I/O exception has occurred.
+     */
+    public static List<FieldSchema> validatePartitionSchema(Table table,
+                                                            HCatSchema partitionSchema) throws IOException {
+        Map<String, FieldSchema> partitionKeyMap = new HashMap<String, FieldSchema>();
+
+        for (FieldSchema field : table.getPartitionKeys()) {
+            partitionKeyMap.put(field.getName().toLowerCase(), field);
+        }
+
+        List<FieldSchema> tableCols = table.getCols();
+        List<FieldSchema> newFields = new ArrayList<FieldSchema>();
+
+        for (int i = 0; i < partitionSchema.getFields().size(); i++) {
+
+            FieldSchema field = HCatSchemaUtils.getFieldSchema(partitionSchema
+                .getFields().get(i));
+
+            FieldSchema tableField;
+            if (i < tableCols.size()) {
+                tableField = tableCols.get(i);
+
+                if (!tableField.getName().equalsIgnoreCase(field.getName())) {
+                    throw new HCatException(
+                        ErrorType.ERROR_SCHEMA_COLUMN_MISMATCH,
+                        "Expected column <" + tableField.getName()
+                            + "> at position " + (i + 1)
+                            + ", found column <" + field.getName()
+                            + ">");
+                }
+            } else {
+                tableField = partitionKeyMap.get(field.getName().toLowerCase());
+
+                if (tableField != null) {
+                    throw new HCatException(
+                        ErrorType.ERROR_SCHEMA_PARTITION_KEY, "Key <"
+                        + field.getName() + ">");
+                }
+            }
+
+            if (tableField == null) {
+                // field present in partition but not in table
+                newFields.add(field);
+            } else {
+                // field present in both. validate type has not changed
+                TypeInfo partitionType = TypeInfoUtils
+                    .getTypeInfoFromTypeString(field.getType());
+                TypeInfo tableType = TypeInfoUtils
+                    .getTypeInfoFromTypeString(tableField.getType());
+
+                if (!partitionType.equals(tableType)) {
+                    throw new HCatException(
+                        ErrorType.ERROR_SCHEMA_TYPE_MISMATCH, "Column <"
+                        + field.getName() + ">, expected <"
+                        + tableType.getTypeName() + ">, got <"
+                        + partitionType.getTypeName() + ">");
+                }
+            }
+        }
+
+        return newFields;
+    }
+
+    /**
+     * Test if the first FsAction is more permissive than the second. This is
+     * useful in cases where we want to ensure that a file owner has at least
+     * as many permissions as the group it belongs to. More completely (but
+     * potentially more cryptically):
+     * owner-r >= group-r >= world-r : bitwise AND-masked with 0444 => 444 >= 440 >= 400 >= 000
+     * owner-w >= group-w >= world-w : bitwise AND-masked with 0222 => 222 >= 220 >= 200 >= 000
+     * owner-x >= group-x >= world-x : bitwise AND-masked with 0111 => 111 >= 110 >= 100 >= 000
+     *
+     * @return true if first FsAction is more permissive than the second, false
+     *         if not.
+     */
+    public static boolean validateMorePermissive(FsAction first, FsAction second) {
+        if ((first == FsAction.ALL) || (second == FsAction.NONE)
+            || (first == second)) {
+            return true;
+        }
+        switch (first) {
+        case READ_EXECUTE:
+            return ((second == FsAction.READ) || (second == FsAction.EXECUTE));
+        case READ_WRITE:
+            return ((second == FsAction.READ) || (second == FsAction.WRITE));
+        case WRITE_EXECUTE:
+            return ((second == FsAction.WRITE) || (second == FsAction.EXECUTE));
+        }
+        return false;
+    }
+
+    /**
+     * Ensure that read or write permissions are not granted without also
+     * granting execute permissions. Essentially, r--, rw- and -w- are invalid;
+     * r-x, -wx, rwx, ---, --x are valid.
+     *
+     * @param perms The FsAction to verify
+     * @return true if the presence of read or write permission is accompanied
+     *         by execute permissions
+     */
+    public static boolean validateExecuteBitPresentIfReadOrWrite(FsAction perms) {
+        if ((perms == FsAction.READ) || (perms == FsAction.WRITE)
+            || (perms == FsAction.READ_WRITE)) {
+            return false;
+        }
+        return true;
+    }
+
+    public static Token<org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier> getJobTrackerDelegationToken(
+        Configuration conf, String userName) throws Exception {
+        // LOG.info("getJobTrackerDelegationToken("+conf+","+userName+")");
+        JobClient jcl = new JobClient(new JobConf(conf, HCatOutputFormat.class));
+        Token<org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier> t = jcl
+            .getDelegationToken(new Text(userName));
+        // LOG.info("got "+t);
+        return t;
+
+        // return null;
+    }
+
+    public static Token<? extends AbstractDelegationTokenIdentifier> extractThriftToken(
+        String tokenStrForm, String tokenSignature) throws MetaException,
+        TException, IOException {
+        // LOG.info("extractThriftToken("+tokenStrForm+","+tokenSignature+")");
+        Token<? extends AbstractDelegationTokenIdentifier> t = new Token<DelegationTokenIdentifier>();
+        t.decodeFromUrlString(tokenStrForm);
+        t.setService(new Text(tokenSignature));
+        // LOG.info("returning "+t);
+        return t;
+    }
+
+    /**
+     * Create an instance of a storage handler defined in storerInfo. If one cannot be found
+     * then FosterStorageHandler is used to encapsulate the InputFormat, OutputFormat and SerDe.
+     * This StorageHandler assumes the other supplied storage artifacts are for a file-based storage system.
+     * @param conf job's configuration will be used to configure the Configurable StorageHandler
+     * @param storerInfo StorerInfo defining the StorageHandler and InputFormat, OutputFormat and SerDe
+     * @return storageHandler instance
+     * @throws IOException
+     */
+    public static HCatStorageHandler getStorageHandler(Configuration conf, StorerInfo storerInfo) throws IOException {
+        return getStorageHandler(conf,
+            storerInfo.getStorageHandlerClass(),
+            storerInfo.getSerdeClass(),
+            storerInfo.getIfClass(),
+            storerInfo.getOfClass());
+    }
+
+    public static HCatStorageHandler getStorageHandler(Configuration conf, PartInfo partitionInfo) throws IOException {
+        return HCatUtil.getStorageHandler(
+            conf,
+            partitionInfo.getStorageHandlerClassName(),
+            partitionInfo.getSerdeClassName(),
+            partitionInfo.getInputFormatClassName(),
+            partitionInfo.getOutputFormatClassName());
+    }
+
+    /**
+     * Create an instance of a storage handler. If storageHandler == null,
+     * then a surrogate StorageHandler is used to encapsulate the InputFormat, OutputFormat and SerDe.
+     * This StorageHandler assumes the other supplied storage artifacts are for a file-based storage system.
+     * @param conf job's configuration will be used to configure the Configurable StorageHandler
+     * @param storageHandler fully qualified class name of the desired StorageHandler instance
+     * @param serDe fully qualified class name of the desired SerDe instance
+     * @param inputFormat fully qualified class name of the desired InputFormat instance
+     * @param outputFormat fully qualified class name of the desired outputFormat instance
+     * @return storageHandler instance
+     * @throws IOException
+     */
+    public static HCatStorageHandler getStorageHandler(Configuration conf,
+                                                       String storageHandler,
+                                                       String serDe,
+                                                       String inputFormat,
+                                                       String outputFormat)
+        throws IOException {
+
+        if ((storageHandler == null) || (storageHandler.equals(FosterStorageHandler.class.getName()))) {
+            try {
+                FosterStorageHandler fosterStorageHandler =
+                    new FosterStorageHandler(inputFormat, outputFormat, serDe);
+                fosterStorageHandler.setConf(conf);
+                return fosterStorageHandler;
+            } catch (ClassNotFoundException e) {
+                throw new IOException("Failed to load "
+                    + "foster storage handler", e);
+            }
+        }
+
+        try {
+            Class<? extends HCatStorageHandler> handlerClass =
+                (Class<? extends HCatStorageHandler>) Class
+                    .forName(storageHandler, true, JavaUtils.getClassLoader());
+            return (HCatStorageHandler) ReflectionUtils.newInstance(
+                handlerClass, conf);
+        } catch (ClassNotFoundException e) {
+            throw new IOException("Error in loading storage handler."
+                + e.getMessage(), e);
+        }
+    }
+
+    public static Pair<String, String> getDbAndTableName(String tableName) throws IOException {
+        String[] dbTableNametokens = tableName.split("\\.");
+        if (dbTableNametokens.length == 1) {
+            return new Pair<String, String>(MetaStoreUtils.DEFAULT_DATABASE_NAME, tableName);
+        } else if (dbTableNametokens.length == 2) {
+            return new Pair<String, String>(dbTableNametokens[0], dbTableNametokens[1]);
+        } else {
+            throw new IOException("tableName expected in the form "
+                + "<databasename>.<table name> or <table name>. Got " + tableName);
+        }
+    }
+
+    public static Map<String, String>
+    getInputJobProperties(HCatStorageHandler storageHandler,
+                          InputJobInfo inputJobInfo) {
+        TableDesc tableDesc = new TableDesc(storageHandler.getSerDeClass(),
+            storageHandler.getInputFormatClass(),
+            storageHandler.getOutputFormatClass(),
+            inputJobInfo.getTableInfo().getStorerInfo().getProperties());
+        if (tableDesc.getJobProperties() == null) {
+            tableDesc.setJobProperties(new HashMap<String, String>());
+        }
+
+        Map<String, String> jobProperties = new HashMap<String, String>();
+        try {
+            tableDesc.getJobProperties().put(
+                HCatConstants.HCAT_KEY_JOB_INFO,
+                HCatUtil.serialize(inputJobInfo));
+
+            storageHandler.configureInputJobProperties(tableDesc,
+                jobProperties);
+
+        } catch (IOException e) {
+            throw new IllegalStateException(
+                "Failed to configure StorageHandler", e);
+        }
+
+        return jobProperties;
+    }
+
+    @InterfaceAudience.Private
+    @InterfaceStability.Evolving
+    public static void
+    configureOutputStorageHandler(HCatStorageHandler storageHandler,
+                                  Configuration conf,
+                                  OutputJobInfo outputJobInfo) {
+        //TODO replace IgnoreKeyTextOutputFormat with a
+        //HiveOutputFormatWrapper in StorageHandler
+        TableDesc tableDesc = new TableDesc(storageHandler.getSerDeClass(),
+            storageHandler.getInputFormatClass(),
+            IgnoreKeyTextOutputFormat.class,
+            outputJobInfo.getTableInfo().getStorerInfo().getProperties());
+        if (tableDesc.getJobProperties() == null)
+            tableDesc.setJobProperties(new HashMap<String, String>());
+        for (Map.Entry<String, String> el : conf) {
+            tableDesc.getJobProperties().put(el.getKey(), el.getValue());
+        }
+
+        Map<String, String> jobProperties = new HashMap<String, String>();
+        try {
+            tableDesc.getJobProperties().put(
+                HCatConstants.HCAT_KEY_OUTPUT_INFO,
+                HCatUtil.serialize(outputJobInfo));
+
+            storageHandler.configureOutputJobProperties(tableDesc,
+                jobProperties);
+
+            for (Map.Entry<String, String> el : jobProperties.entrySet()) {
+                conf.set(el.getKey(), el.getValue());
+            }
+        } catch (IOException e) {
+            throw new IllegalStateException(
+                "Failed to configure StorageHandler", e);
+        }
+    }
+
+    /**
+     * Replace the contents of dest with the contents of src
+     * @param src
+     * @param dest
+     */
+    public static void copyConf(Configuration src, Configuration dest) {
+        dest.clear();
+        for (Map.Entry<String, String> el : src) {
+            dest.set(el.getKey(), el.getValue());
+        }
+    }
+
+    /**
+     * Get or create a hive client depending on whether it exists in the cache or not
+     * @param hiveConf The hive configuration
+     * @return the client
+     * @throws MetaException When HiveMetaStoreClient couldn't be created
+     * @throws IOException
+     */
+    public static HiveMetaStoreClient getHiveClient(HiveConf hiveConf)
+        throws MetaException, IOException {
+
+        // Singleton behaviour: create the cache instance if required. The cache needs to be created lazily,
+        // using the expiry time available in hiveConf.
+
+        if (hiveClientCache == null) {
+            synchronized (HiveMetaStoreClient.class) {
+                if (hiveClientCache == null) {
+                    hiveClientCache = new HiveClientCache(hiveConf.getInt(HCatConstants.HCAT_HIVE_CLIENT_EXPIRY_TIME,
+                        DEFAULT_HIVE_CACHE_EXPIRY_TIME_SECONDS));
+                }
+            }
+        }
+        try {
+            return hiveClientCache.get(hiveConf);
+        } catch (LoginException e) {
+            throw new IOException("Couldn't create hiveMetaStoreClient, Error getting UGI for user", e);
+        }
+    }
+
+    public static void closeHiveClientQuietly(HiveMetaStoreClient client) {
+        try {
+            if (client != null)
+                client.close();
+        } catch (Exception e) {
+            LOG.debug("Error closing metastore client. Ignored the error.", e);
+        }
+    }
+
+    public static HiveConf getHiveConf(Configuration conf)
+        throws IOException {
+
+        HiveConf hiveConf = new HiveConf(conf, HCatUtil.class);
+
+        //copy the hive conf into the job conf and restore it
+        //in the backend context
+        if (conf.get(HCatConstants.HCAT_KEY_HIVE_CONF) == null) {
+            conf.set(HCatConstants.HCAT_KEY_HIVE_CONF,
+                HCatUtil.serialize(hiveConf.getAllProperties()));
+        } else {
+            //Copy configuration properties into the hive conf
+            Properties properties = (Properties) HCatUtil.deserialize(
+                conf.get(HCatConstants.HCAT_KEY_HIVE_CONF));
+
+            for (Map.Entry<Object, Object> prop : properties.entrySet()) {
+                if (prop.getValue() instanceof String) {
+                    hiveConf.set((String) prop.getKey(), (String) prop.getValue());
+                } else if (prop.getValue() instanceof Integer) {
+                    hiveConf.setInt((String) prop.getKey(),
+                        (Integer) prop.getValue());
+                } else if (prop.getValue() instanceof Boolean) {
+                    hiveConf.setBoolean((String) prop.getKey(),
+                        (Boolean) prop.getValue());
+                } else if (prop.getValue() instanceof Long) {
+                    hiveConf.setLong((String) prop.getKey(), (Long) prop.getValue());
+                } else if (prop.getValue() instanceof Float) {
+                    hiveConf.setFloat((String) prop.getKey(),
+                        (Float) prop.getValue());
+                }
+            }
+        }
+
+        if (conf.get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) {
+            hiveConf.set("hive.metastore.token.signature",
+                conf.get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE));
+        }
+
+        return hiveConf;
+    }
+
+
+    public static JobConf getJobConfFromContext(JobContext jobContext) {
+        JobConf jobConf;
+        // we need to convert the jobContext into a jobConf
+        // 0.18 jobConf (Hive) vs 0.20+ jobContext (HCat)
+        // begin conversion..
+        jobConf = new JobConf(jobContext.getConfiguration());
+        // ..end of conversion
+
+
+        return jobConf;
+    }
+
+    public static void copyJobPropertiesToJobConf(
+        Map<String, String> jobProperties, JobConf jobConf) {
+        for (Map.Entry<String, String> entry : jobProperties.entrySet()) {
+            jobConf.set(entry.getKey(), entry.getValue());
+        }
+    }
+    
+
+    public static boolean isHadoop23() {
+        String version = org.apache.hadoop.util.VersionInfo.getVersion();
+        if (version.matches("\\b0\\.23\\..+\\b")||version.matches("\\b2\\..*"))
+            return true;
+        return false;
+    }
+}
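
For illustration only (not part of this commit): a minimal sketch exercising the serialization helpers above. It only calls static methods defined in HCatUtil; the wrapper class is hypothetical.

    import java.io.IOException;
    import org.apache.hcatalog.common.HCatUtil;

    public class HCatUtilExample {
        public static void main(String[] args) throws IOException {
            // Round-trip a Serializable object through the string form used to stash
            // job info into a Hadoop Configuration.
            String encoded = HCatUtil.serialize("some job info");
            String decoded = (String) HCatUtil.deserialize(encoded);
            System.out.println(decoded);  // prints: some job info

            // The underlying byte encoding maps each nibble to a character in 'a'..'p'.
            byte[] bytes = HCatUtil.decodeBytes(HCatUtil.encodeBytes(new byte[]{1, 2, 3}));
            System.out.println(bytes.length);  // prints: 3
        }
    }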

Added: hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/common/HiveClientCache.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/common/HiveClientCache.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/common/HiveClientCache.java (added)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/common/HiveClientCache.java Fri Sep  6 00:49:14 2013
@@ -0,0 +1,337 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hcatalog.common;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import org.apache.commons.lang.builder.EqualsBuilder;
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.login.LoginException;
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A thread-safe, time-expiring cache for HiveMetaStoreClient
+ */
+class HiveClientCache {
+    final private Cache<HiveClientCacheKey, CacheableHiveMetaStoreClient> hiveCache;
+    private static final Logger LOG = LoggerFactory.getLogger(HiveClientCache.class);
+    private final int timeout;
+    // This lock is used to make sure the removalListener won't close a client that is about to be returned by get()
+    private final Object CACHE_TEARDOWN_LOCK = new Object();
+
+    private static final AtomicInteger nextId = new AtomicInteger(0);
+
+    // Since HiveMetaStoreClient is not thread-safe, hive clients are not shared across threads.
+    // A thread-local variable containing each thread's unique ID is used as one of the keys for the cache,
+    // causing each thread to get a different client even if the hiveConf is the same.
+    private static final ThreadLocal<Integer> threadId =
+        new ThreadLocal<Integer>() {
+            @Override
+            protected Integer initialValue() {
+                return nextId.getAndIncrement();
+            }
+        };
+
+    private int getThreadId() {
+        return threadId.get();
+    }
+
+    /**
+     * @param timeout the length of time in seconds after a client is created that it should be automatically removed
+     */
+    public HiveClientCache(final int timeout) {
+        this.timeout = timeout;
+        RemovalListener<HiveClientCacheKey, CacheableHiveMetaStoreClient> removalListener =
+            new RemovalListener<HiveClientCacheKey, CacheableHiveMetaStoreClient>() {
+                public void onRemoval(RemovalNotification<HiveClientCacheKey, CacheableHiveMetaStoreClient> notification) {
+                    CacheableHiveMetaStoreClient hiveMetaStoreClient = notification.getValue();
+                    if (hiveMetaStoreClient != null) {
+                        synchronized (CACHE_TEARDOWN_LOCK) {
+                            hiveMetaStoreClient.setExpiredFromCache();
+                            hiveMetaStoreClient.tearDownIfUnused();
+                        }
+                    }
+                }
+            };
+        hiveCache = CacheBuilder.newBuilder()
+            .expireAfterWrite(timeout, TimeUnit.SECONDS)
+            .removalListener(removalListener)
+            .build();
+
+        // Add a shutdown hook for cleanup, in case there are elements remaining in the cache which were not cleaned up.
+        // This is a best-effort approach; any error while doing so is ignored. Note that most of the clients
+        // get cleaned up via either the removalListener or the close() call; only the active clients
+        // that are in the cache, or expired but still being used in other threads, won't get cleaned. The following code
+        // only cleans the active cached ones. The ones expired from the cache but still held by other threads are at the
+        // mercy of finalize() being called.
+        Thread cleanupHiveClientShutdownThread = new Thread() {
+            @Override
+            public void run() {
+                LOG.debug("Cleaning up hive client cache in ShutDown hook");
+                closeAllClientsQuietly();
+            }
+        };
+        Runtime.getRuntime().addShutdownHook(cleanupHiveClientShutdownThread);
+    }
+
+    /**
+     * Note: This doesn't check whether the clients are being used or not; it is meant only to be called during shutdown etc.
+     */
+    void closeAllClientsQuietly() {
+        try {
+            ConcurrentMap<HiveClientCacheKey, CacheableHiveMetaStoreClient> elements = hiveCache.asMap();
+            for (CacheableHiveMetaStoreClient cacheableHiveMetaStoreClient : elements.values()) {
+                cacheableHiveMetaStoreClient.tearDown();
+            }
+        } catch (Exception e) {
+            LOG.warn("Clean up of hive clients in the cache failed. Ignored", e);
+        }
+    }
+
+    public void cleanup() {
+        hiveCache.cleanUp();
+    }
+
+    /**
+     * Returns a cached client if one exists, or else creates one, caches it and returns it. It also checks that the
+     * client is healthy and can be reused.
+     * @param hiveConf
+     * @return the hive client
+     * @throws MetaException
+     * @throws IOException
+     * @throws LoginException
+     */
+    public HiveMetaStoreClient get(final HiveConf hiveConf) throws MetaException, IOException, LoginException {
+        final HiveClientCacheKey cacheKey = HiveClientCacheKey.fromHiveConf(hiveConf, getThreadId());
+        CacheableHiveMetaStoreClient hiveMetaStoreClient = null;
+        // the hmsc is not shared across threads. So the only way it could get closed while we are doing healthcheck
+        // is if removalListener closes it. The synchronization takes care that removalListener won't do it
+        synchronized (CACHE_TEARDOWN_LOCK) {
+            hiveMetaStoreClient = getOrCreate(cacheKey);
+            hiveMetaStoreClient.acquire();
+        }
+        if (!hiveMetaStoreClient.isOpen()) {
+            synchronized (CACHE_TEARDOWN_LOCK) {
+                hiveCache.invalidate(cacheKey);
+                hiveMetaStoreClient.close();
+                hiveMetaStoreClient = getOrCreate(cacheKey);
+                hiveMetaStoreClient.acquire();
+            }
+        }
+        return hiveMetaStoreClient;
+    }
+
+    /**
+     * Return the client from the cache if it exists; otherwise create, cache and return it
+     * @param cacheKey
+     * @return
+     * @throws IOException
+     * @throws MetaException
+     * @throws LoginException
+     */
+    private CacheableHiveMetaStoreClient getOrCreate(final HiveClientCacheKey cacheKey) throws IOException, MetaException, LoginException {
+        try {
+            return hiveCache.get(cacheKey, new Callable<CacheableHiveMetaStoreClient>() {
+                @Override
+                public CacheableHiveMetaStoreClient call() throws MetaException {
+                    return new CacheableHiveMetaStoreClient(cacheKey.getHiveConf(), timeout);
+                }
+            });
+        } catch (ExecutionException e) {
+            Throwable t = e.getCause();
+            if (t instanceof IOException) {
+                throw (IOException) t;
+            } else if (t instanceof MetaException) {
+                throw (MetaException) t;
+            } else if (t instanceof LoginException) {
+                throw (LoginException) t;
+            } else {
+                throw new IOException("Error creating hiveMetaStoreClient", t);
+            }
+        }
+    }
+
+    /**
+     * A class to wrap HiveConf and expose equality based only on UserGroupInformation and the metaStoreURIs.
+     * This becomes the key for the cache, so that the same HiveMetaStoreClient is returned if the
+     * UserGroupInformation and metaStoreURIs are the same. This class can evolve to express
+     * the cases where the HiveConf differs but the same hiveMetaStoreClient can be used.
+     */
+    public static class HiveClientCacheKey {
+        final private String metaStoreURIs;
+        final private UserGroupInformation ugi;
+        final private HiveConf hiveConf;
+        final private int threadId;
+
+        private HiveClientCacheKey(HiveConf hiveConf, final int threadId) throws IOException, LoginException {
+            this.metaStoreURIs = hiveConf.getVar(HiveConf.ConfVars.METASTOREURIS);
+            ugi = ShimLoader.getHadoopShims().getUGIForConf(hiveConf);
+            this.hiveConf = hiveConf;
+            this.threadId = threadId;
+        }
+
+        public static HiveClientCacheKey fromHiveConf(HiveConf hiveConf, final int threadId) throws IOException, LoginException {
+            return new HiveClientCacheKey(hiveConf, threadId);
+        }
+
+        public HiveConf getHiveConf() {
+            return hiveConf;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            HiveClientCacheKey that = (HiveClientCacheKey) o;
+            return new EqualsBuilder().
+                append(this.metaStoreURIs,
+                    that.metaStoreURIs).
+                append(this.ugi, that.ugi).
+                append(this.threadId, that.threadId).isEquals();
+        }
+
+        @Override
+        public int hashCode() {
+            return new HashCodeBuilder().
+                append(metaStoreURIs).
+                append(ugi).
+                append(threadId).toHashCode();
+        }
+    }
+
+    /**
+     * A HiveMetaStoreClient that tracks its number of current users, so that the client can be cleaned up once no one is using it.
+     */
+    public static class CacheableHiveMetaStoreClient extends HiveMetaStoreClient {
+        private AtomicInteger users = new AtomicInteger(0);
+        private volatile boolean expiredFromCache = false;
+        private boolean isClosed = false;
+        private final long expiryTime;
+        private static final int EXPIRY_TIME_EXTENSION_IN_MILLIS = 60 * 1000;
+
+        public CacheableHiveMetaStoreClient(final HiveConf conf, final int timeout) throws MetaException {
+            super(conf);
+            // Extend the expiry time a little beyond the guava cache expiry so that entries this client
+            // treats as expired have definitely been evicted and can never be returned by guava.
+            this.expiryTime = System.currentTimeMillis() + timeout * 1000 + EXPIRY_TIME_EXTENSION_IN_MILLIS;
+        }
+
+        private void acquire() {
+            users.incrementAndGet();
+        }
+
+        private void release() {
+            users.decrementAndGet();
+        }
+
+        public void setExpiredFromCache() {
+            expiredFromCache = true;
+        }
+
+        public boolean isClosed() {
+            return isClosed;
+        }
+
+        /**
+         * Make a call to the hive metastore and see if the client is still usable. Some calls in which the user
+         * provides invalid data render the client unusable for future use (example: creating a table with a very long table name).
+         * @return true if the client is still usable, false otherwise
+         */
+        protected boolean isOpen() {
+            try {
+                // Look for an unlikely database name and see if either MetaException or TException is thrown
+                this.getDatabase("NonExistentDatabaseUsedForHealthCheck");
+            } catch (NoSuchObjectException e) {
+                return true; // It is okay if the database doesn't exist
+            } catch (MetaException e) {
+                return false;
+            } catch (TException e) {
+                return false;
+            }
+            return true;
+        }
+
+        /**
+         * Decrement the user count, piggyback setting the expiry flag, and then tear down the client if the conditions are met.
+         * This *MUST* be called by anyone who uses this client.
+         */
+        @Override
+        public void close() {
+            release();
+            if (System.currentTimeMillis() >= expiryTime)
+                setExpiredFromCache();
+            tearDownIfUnused();
+        }
+
+        /**
+         * Tear down only if
+         *  1. There are no active users
+         *  2. It has expired from the cache
+         */
+        private void tearDownIfUnused() {
+            if (users.get() == 0 && expiredFromCache) {
+                this.tearDown();
+            }
+        }
+
+        /**
+         * Close if not closed already
+         */
+        protected synchronized void tearDown() {
+            try {
+                if (!isClosed) {
+                    super.close();
+                }
+                isClosed = true;
+            } catch (Exception e) {
+                LOG.warn("Error closing hive metastore client. Ignored.", e);
+            }
+        }
+
+        /**
+         * Last effort to clean up, may not even get called.
+         * @throws Throwable
+         */
+        @Override
+        protected void finalize() throws Throwable {
+            try {
+                this.tearDown();
+            } finally {
+                super.finalize();
+            }
+        }
+    }
+}
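
For reference, a minimal usage sketch of the acquire/release pattern this cache expects. It assumes the example lives alongside HiveClientCache and that the class exposes a HiveClientCache(int timeoutSeconds) constructor (defined earlier in this file, outside the lines shown here); the timeout value and the metastore call are illustrative only.

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;

public class HiveClientCacheExample {
    public static void main(String[] args) throws Exception {
        HiveConf conf = new HiveConf();
        // Assumed constructor: cache entries expire 120 seconds after creation.
        HiveClientCache cache = new HiveClientCache(120);
        HiveMetaStoreClient client = cache.get(conf);
        try {
            client.getAllDatabases();
        } finally {
            // close() only releases this caller's hold; the underlying connection is
            // torn down once the entry has expired from the cache and has no users.
            client.close();
        }
    }
}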

Added: hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/DataType.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/DataType.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/DataType.java (added)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/DataType.java Fri Sep  6 00:49:14 2013
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.data;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.Map.Entry;
+
+
+public abstract class DataType {
+
+    public static final byte NULL = 1;
+    public static final byte BOOLEAN = 5;
+    public static final byte BYTE = 6;
+    public static final byte INTEGER = 10;
+    public static final byte SHORT = 11;
+    public static final byte LONG = 15;
+    public static final byte FLOAT = 20;
+    public static final byte DOUBLE = 25;
+    public static final byte STRING = 55;
+    public static final byte BINARY = 60;
+
+    public static final byte MAP = 100;
+    public static final byte STRUCT = 110;
+    public static final byte LIST = 120;
+    public static final byte ERROR = -1;
+
+    /**
+     * Determine the datatype of an object.
+     * @param o Object to test.
+     * @return byte code of the type, or ERROR if we don't know.
+     */
+    public static byte findType(Object o) {
+        if (o == null) {
+            return NULL;
+        }
+
+        Class<?> clazz = o.getClass();
+
+        // Try to put the most common first
+        if (clazz == String.class) {
+            return STRING;
+        } else if (clazz == Integer.class) {
+            return INTEGER;
+        } else if (clazz == Long.class) {
+            return LONG;
+        } else if (clazz == Float.class) {
+            return FLOAT;
+        } else if (clazz == Double.class) {
+            return DOUBLE;
+        } else if (clazz == Boolean.class) {
+            return BOOLEAN;
+        } else if (clazz == Byte.class) {
+            return BYTE;
+        } else if (clazz == Short.class) {
+            return SHORT;
+        } else if (o instanceof List<?>) {
+            return LIST;
+        } else if (o instanceof Map<?, ?>) {
+            return MAP;
+        } else if (o instanceof byte[]) {
+            return BINARY;
+        } else {
+            return ERROR;
+        }
+    }
+
+    public static int compare(Object o1, Object o2) {
+
+        return compare(o1, o2, findType(o1), findType(o2));
+    }
+
+    public static int compare(Object o1, Object o2, byte dt1, byte dt2) {
+        if (dt1 == dt2) {
+            switch (dt1) {
+            case NULL:
+                return 0;
+
+            case BOOLEAN:
+                return ((Boolean) o1).compareTo((Boolean) o2);
+
+            case BYTE:
+                return ((Byte) o1).compareTo((Byte) o2);
+
+            case INTEGER:
+                return ((Integer) o1).compareTo((Integer) o2);
+
+            case LONG:
+                return ((Long) o1).compareTo((Long) o2);
+
+            case FLOAT:
+                return ((Float) o1).compareTo((Float) o2);
+
+            case DOUBLE:
+                return ((Double) o1).compareTo((Double) o2);
+
+            case STRING:
+                return ((String) o1).compareTo((String) o2);
+
+            case SHORT:
+                return ((Short) o1).compareTo((Short) o2);
+
+            case BINARY:
+                return compareByteArray((byte[]) o1, (byte[]) o2);
+
+            case LIST:
+                List<?> l1 = (List<?>) o1;
+                List<?> l2 = (List<?>) o2;
+                int len = l1.size();
+                if (len != l2.size()) {
+                    return len - l2.size();
+                } else {
+                    for (int i = 0; i < len; i++) {
+                        int cmpVal = compare(l1.get(i), l2.get(i));
+                        if (cmpVal != 0) {
+                            return cmpVal;
+                        }
+                    }
+                    return 0;
+                }
+
+            case MAP: {
+                Map<?, ?> m1 = (Map<?, ?>) o1;
+                Map<?, ?> m2 = (Map<?, ?>) o2;
+                int sz1 = m1.size();
+                int sz2 = m2.size();
+                if (sz1 < sz2) {
+                    return -1;
+                } else if (sz1 > sz2) {
+                    return 1;
+                } else {
+                    // This is unfortunate, but we have to sort the map keys so that the
+                    // comparison does not depend on the maps' iteration order.
+                    TreeMap<Object, Object> tm1 = new TreeMap<Object, Object>(m1);
+                    TreeMap<Object, Object> tm2 = new TreeMap<Object, Object>(m2);
+                    Iterator<Entry<Object, Object>> i1 = tm1.entrySet().iterator();
+                    Iterator<Entry<Object, Object>> i2 = tm2.entrySet().iterator();
+                    while (i1.hasNext()) {
+                        Map.Entry<Object, Object> entry1 = i1.next();
+                        Map.Entry<Object, Object> entry2 = i2.next();
+                        // Compare keys first, then values; the maps are the same size here.
+                        int c = compare(entry1.getKey(), entry2.getKey());
+                        if (c != 0) {
+                            return c;
+                        }
+                        c = compare(entry1.getValue(), entry2.getValue());
+                        if (c != 0) {
+                            return c;
+                        }
+                    }
+                    return 0;
+                }
+            }
+
+            default:
+                throw new RuntimeException("Unkown type " + dt1 +
+                    " in compare");
+            }
+        } else {
+            return dt1 < dt2 ? -1 : 1;
+        }
+    }
+
+    private static int compareByteArray(byte[] o1, byte[] o2) {
+
+        for (int i = 0; i < o1.length; i++) {
+            if (i == o2.length) {
+                return 1;
+            }
+            if (o1[i] == o2[i]) {
+                continue;
+            }
+            if (o1[i] > o2[i]) {
+                return 1;
+            } else {
+                return -1;
+            }
+        }
+
+        //bytes in o1 are same as o2
+        //in case o2 was longer
+        if (o2.length > o1.length) {
+            return -1;
+        }
+        return 0; //equals
+    }
+
+}
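
A short sketch of how findType and compare behave on the plain Java values this class recognizes; it uses only the constants and methods defined above and assumes it sits in the same package as DataType.

import java.util.Arrays;
import java.util.List;

public class DataTypeExample {
    public static void main(String[] args) {
        // Primitive lookups map plain Java wrappers onto the byte codes above.
        System.out.println(DataType.findType("hello") == DataType.STRING);   // true
        System.out.println(DataType.findType(42) == DataType.INTEGER);       // true
        System.out.println(DataType.findType(null) == DataType.NULL);        // true

        // Values of the same type compare with their natural ordering ...
        System.out.println(DataType.compare(1, 2) < 0);                      // true

        // ... and values of different types are ordered by their type codes,
        // so any INTEGER (10) sorts before any STRING (55).
        System.out.println(DataType.compare(7, "abc") < 0);                  // true

        // Lists compare first by size, then element by element.
        List<Integer> a = Arrays.asList(1, 2, 3);
        List<Integer> b = Arrays.asList(1, 2, 4);
        System.out.println(DataType.compare(a, b) < 0);                      // true
    }
}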

Added: hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/DefaultHCatRecord.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/DefaultHCatRecord.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/DefaultHCatRecord.java (added)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/DefaultHCatRecord.java Fri Sep  6 00:49:14 2013
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.data;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.data.schema.HCatSchema;
+
+public class DefaultHCatRecord extends HCatRecord {
+
+    private List<Object> contents;
+
+    public DefaultHCatRecord() {
+        contents = new ArrayList<Object>();
+    }
+
+    public DefaultHCatRecord(int size) {
+        contents = new ArrayList<Object>(size);
+        for (int i = 0; i < size; i++) {
+            contents.add(null);
+        }
+    }
+
+    @Override
+    public void remove(int idx) throws HCatException {
+        contents.remove(idx);
+    }
+
+    public DefaultHCatRecord(List<Object> list) {
+        contents = list;
+    }
+
+    @Override
+    public Object get(int fieldNum) {
+        return contents.get(fieldNum);
+    }
+
+    @Override
+    public List<Object> getAll() {
+        return contents;
+    }
+
+    @Override
+    public void set(int fieldNum, Object val) {
+        contents.set(fieldNum, val);
+    }
+
+    @Override
+    public int size() {
+        return contents.size();
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+
+        contents.clear();
+        int len = in.readInt();
+        for (int i = 0; i < len; i++) {
+            contents.add(ReaderWriter.readDatum(in));
+        }
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        int sz = size();
+        out.writeInt(sz);
+        for (int i = 0; i < sz; i++) {
+            ReaderWriter.writeDatum(out, contents.get(i));
+        }
+
+    }
+
+    @Override
+    public int hashCode() {
+        int hash = 1;
+        for (Object o : contents) {
+            if (o != null) {
+                hash = 31 * hash + o.hashCode();
+            }
+        }
+        return hash;
+    }
+
+    @Override
+    public String toString() {
+
+        StringBuilder sb = new StringBuilder();
+        for (Object o : contents) {
+            sb.append(o + "\t");
+        }
+        return sb.toString();
+    }
+
+    @Override
+    public Object get(String fieldName, HCatSchema recordSchema) throws HCatException {
+        return get(recordSchema.getPosition(fieldName));
+    }
+
+    @Override
+    public void set(String fieldName, HCatSchema recordSchema, Object value) throws HCatException {
+        set(recordSchema.getPosition(fieldName), value);
+    }
+
+    @Override
+    public void copy(HCatRecord r) throws HCatException {
+        this.contents = r.getAll();
+    }
+
+}
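
A minimal round-trip sketch for DefaultHCatRecord's DataInput/DataOutput serialization, assuming the example sits in the same package and that ReaderWriter (not shown in this hunk) supports the integer, string, and list values used here.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.util.Arrays;

public class DefaultHCatRecordExample {
    public static void main(String[] args) throws Exception {
        // Build a three-field record positionally.
        DefaultHCatRecord rec = new DefaultHCatRecord(3);
        rec.set(0, 100);
        rec.set(1, "hello");
        rec.set(2, Arrays.asList(1, 2, 3));

        // Serialize with write(), then read it back into a fresh record.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        rec.write(new DataOutputStream(bos));

        DefaultHCatRecord copy = new DefaultHCatRecord();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));

        System.out.println(copy.get(1));   // hello
        System.out.println(copy.size());   // 3
    }
}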

Added: hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/HCatRecord.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/HCatRecord.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/HCatRecord.java (added)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/HCatRecord.java Fri Sep  6 00:49:14 2013
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.data;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.data.schema.HCatSchema;
+
+/**
+ * Abstract class exposing get and set semantics for basic record usage.
+ * Note :
+ *   HCatRecord is designed to be used as an in-memory representation only.
+ *   Don't use it to store data on a physical device.
+ */
+public abstract class HCatRecord implements HCatRecordable {
+
+    public abstract Object get(String fieldName, HCatSchema recordSchema) throws HCatException;
+
+    public abstract void set(String fieldName, HCatSchema recordSchema, Object value) throws HCatException;
+
+    public abstract void remove(int idx) throws HCatException;
+
+    public abstract void copy(HCatRecord r) throws HCatException;
+
+    protected Object get(String fieldName, HCatSchema recordSchema, Class<?> clazz) throws HCatException {
+        // TODO : if needed, verify that recordschema entry for fieldname matches appropriate type.
+        return get(fieldName, recordSchema);
+    }
+
+    public Boolean getBoolean(String fieldName, HCatSchema recordSchema) throws HCatException {
+        return (Boolean) get(fieldName, recordSchema, Boolean.class);
+    }
+
+    public void setBoolean(String fieldName, HCatSchema recordSchema, Boolean value) throws HCatException {
+        set(fieldName, recordSchema, value);
+    }
+
+    public byte[] getByteArray(String fieldName, HCatSchema recordSchema) throws HCatException {
+        return (byte[]) get(fieldName, recordSchema, byte[].class);
+    }
+
+    public void setByteArray(String fieldName, HCatSchema recordSchema, byte[] value) throws HCatException {
+        set(fieldName, recordSchema, value);
+    }
+
+    public Byte getByte(String fieldName, HCatSchema recordSchema) throws HCatException {
+        //TINYINT
+        return (Byte) get(fieldName, recordSchema, Byte.class);
+    }
+
+    public void setByte(String fieldName, HCatSchema recordSchema, Byte value) throws HCatException {
+        set(fieldName, recordSchema, value);
+    }
+
+    public Short getShort(String fieldName, HCatSchema recordSchema) throws HCatException {
+        // SMALLINT
+        return (Short) get(fieldName, recordSchema, Short.class);
+    }
+
+    public void setShort(String fieldName, HCatSchema recordSchema, Short value) throws HCatException {
+        set(fieldName, recordSchema, value);
+    }
+
+    public Integer getInteger(String fieldName, HCatSchema recordSchema) throws HCatException {
+        return (Integer) get(fieldName, recordSchema, Integer.class);
+    }
+
+    public void setInteger(String fieldName, HCatSchema recordSchema, Integer value) throws HCatException {
+        set(fieldName, recordSchema, value);
+    }
+
+    public Long getLong(String fieldName, HCatSchema recordSchema) throws HCatException {
+        // BIGINT
+        return (Long) get(fieldName, recordSchema, Long.class);
+    }
+
+    public void setLong(String fieldName, HCatSchema recordSchema, Long value) throws HCatException {
+        set(fieldName, recordSchema, value);
+    }
+
+    public Float getFloat(String fieldName, HCatSchema recordSchema) throws HCatException {
+        return (Float) get(fieldName, recordSchema, Float.class);
+    }
+
+    public void setFloat(String fieldName, HCatSchema recordSchema, Float value) throws HCatException {
+        set(fieldName, recordSchema, value);
+    }
+
+    public Double getDouble(String fieldName, HCatSchema recordSchema) throws HCatException {
+        return (Double) get(fieldName, recordSchema, Double.class);
+    }
+
+    public void setDouble(String fieldName, HCatSchema recordSchema, Double value) throws HCatException {
+        set(fieldName, recordSchema, value);
+    }
+
+    public String getString(String fieldName, HCatSchema recordSchema) throws HCatException {
+        return (String) get(fieldName, recordSchema, String.class);
+    }
+
+    public void setString(String fieldName, HCatSchema recordSchema, String value) throws HCatException {
+        set(fieldName, recordSchema, value);
+    }
+
+    @SuppressWarnings("unchecked")
+    public List<? extends Object> getStruct(String fieldName, HCatSchema recordSchema) throws HCatException {
+        return (List<? extends Object>) get(fieldName, recordSchema, List.class);
+    }
+
+    public void setStruct(String fieldName, HCatSchema recordSchema, List<? extends Object> value) throws HCatException {
+        set(fieldName, recordSchema, value);
+    }
+
+    public List<?> getList(String fieldName, HCatSchema recordSchema) throws HCatException {
+        return (List<?>) get(fieldName, recordSchema, List.class);
+    }
+
+    public void setList(String fieldName, HCatSchema recordSchema, List<?> value) throws HCatException {
+        set(fieldName, recordSchema, value);
+    }
+
+    public Map<?, ?> getMap(String fieldName, HCatSchema recordSchema) throws HCatException {
+        return (Map<?, ?>) get(fieldName, recordSchema, Map.class);
+    }
+
+    public void setMap(String fieldName, HCatSchema recordSchema, Map<?, ?> value) throws HCatException {
+        set(fieldName, recordSchema, value);
+    }
+
+}
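
A hedged sketch of the schema-based accessors. The HCatFieldSchema(String, Type, String) and HCatSchema(List<HCatFieldSchema>) constructors from org.apache.hcatalog.data.schema are not part of this hunk, so treat them as assumptions; the two-column layout is illustrative.

import java.util.Arrays;

import org.apache.hcatalog.data.schema.HCatFieldSchema;
import org.apache.hcatalog.data.schema.HCatSchema;

public class HCatRecordAccessExample {
    public static void main(String[] args) throws Exception {
        // An assumed two-column schema: (id int, name string).
        HCatSchema schema = new HCatSchema(Arrays.asList(
            new HCatFieldSchema("id", HCatFieldSchema.Type.INT, null),
            new HCatFieldSchema("name", HCatFieldSchema.Type.STRING, null)));

        HCatRecord rec = new DefaultHCatRecord(2);

        // The typed setters/getters resolve field names to positions via the schema.
        rec.setInteger("id", schema, 7);
        rec.setString("name", schema, "alice");

        System.out.println(rec.getInteger("id", schema));   // 7
        System.out.println(rec.getString("name", schema));  // alice
    }
}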

Added: hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/HCatRecordObjectInspector.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/HCatRecordObjectInspector.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/HCatRecordObjectInspector.java (added)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/HCatRecordObjectInspector.java Fri Sep  6 00:49:14 2013
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hcatalog.data;
+
+import java.util.List;
+
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+
+public class HCatRecordObjectInspector extends StandardStructObjectInspector {
+
+    protected HCatRecordObjectInspector(List<String> structFieldNames,
+                                        List<ObjectInspector> structFieldObjectInspectors) {
+        super(structFieldNames, structFieldObjectInspectors);
+    }
+
+    @Override
+    public Object getStructFieldData(Object data, StructField fieldRef) {
+        if (data == null) {
+            throw new IllegalArgumentException("Data passed in to get field from was null!");
+        }
+
+        int fieldID = ((MyField) fieldRef).getFieldID();
+        if (!(fieldID >= 0 && fieldID < fields.size())) {
+            throw new IllegalArgumentException("Invalid field index [" + fieldID + "]");
+        }
+
+        return ((HCatRecord) data).get(fieldID);
+    }
+
+    @Override
+    public List<Object> getStructFieldsDataAsList(Object o) {
+        return ((HCatRecord) o).getAll();
+    }
+
+}

Added: hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/HCatRecordObjectInspectorFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/HCatRecordObjectInspectorFactory.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/HCatRecordObjectInspectorFactory.java (added)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/HCatRecordObjectInspectorFactory.java Fri Sep  6 00:49:14 2013
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hcatalog.data;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ObjectInspectorFactory for HCatRecordObjectInspectors (and associated helper inspectors)
+ */
+public class HCatRecordObjectInspectorFactory {
+
+    private final static Logger LOG = LoggerFactory.getLogger(HCatRecordObjectInspectorFactory.class);
+
+    static HashMap<TypeInfo, HCatRecordObjectInspector> cachedHCatRecordObjectInspectors =
+        new HashMap<TypeInfo, HCatRecordObjectInspector>();
+    static HashMap<TypeInfo, ObjectInspector> cachedObjectInspectors =
+        new HashMap<TypeInfo, ObjectInspector>();
+
+    /**
+     * Returns HCatRecordObjectInspector given a StructTypeInfo type definition for the record to look into
+     * @param typeInfo Type definition for the record to look into
+     * @return appropriate HCatRecordObjectInspector
+     * @throws SerDeException
+     */
+    public static HCatRecordObjectInspector getHCatRecordObjectInspector(
+        StructTypeInfo typeInfo) throws SerDeException {
+        HCatRecordObjectInspector oi = cachedHCatRecordObjectInspectors.get(typeInfo);
+        if (oi == null) {
+
+            LOG.debug("Got asked for OI for {} [{} ]", typeInfo.getCategory(), typeInfo.getTypeName());
+            switch (typeInfo.getCategory()) {
+            case STRUCT:
+                StructTypeInfo structTypeInfo = (StructTypeInfo) typeInfo;
+                List<String> fieldNames = structTypeInfo.getAllStructFieldNames();
+                List<TypeInfo> fieldTypeInfos = structTypeInfo.getAllStructFieldTypeInfos();
+                List<ObjectInspector> fieldObjectInspectors = new ArrayList<ObjectInspector>(fieldTypeInfos.size());
+                for (int i = 0; i < fieldTypeInfos.size(); i++) {
+                    fieldObjectInspectors.add(getStandardObjectInspectorFromTypeInfo(fieldTypeInfos.get(i)));
+                }
+                oi = new HCatRecordObjectInspector(fieldNames, fieldObjectInspectors);
+
+                break;
+            default:
+                // The only type expected here is STRUCT, which maps to HCatRecord;
+                // anything else is an error, so fail with a SerDeException.
+                throw new SerDeException("TypeInfo [" + typeInfo.getTypeName()
+                    + "] was not of struct type - HCatRecord expected struct type, got ["
+                    + typeInfo.getCategory().toString() + "]");
+            }
+            cachedHCatRecordObjectInspectors.put(typeInfo, oi);
+        }
+        return oi;
+    }
+
+    public static ObjectInspector getStandardObjectInspectorFromTypeInfo(TypeInfo typeInfo) {
+
+        ObjectInspector oi = cachedObjectInspectors.get(typeInfo);
+        if (oi == null) {
+
+            LOG.debug("Got asked for OI for {}, [{}]", typeInfo.getCategory(), typeInfo.getTypeName());
+            switch (typeInfo.getCategory()) {
+            case PRIMITIVE:
+                oi = PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(
+                    ((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory());
+                break;
+            case STRUCT:
+                StructTypeInfo structTypeInfo = (StructTypeInfo) typeInfo;
+                List<String> fieldNames = structTypeInfo.getAllStructFieldNames();
+                List<TypeInfo> fieldTypeInfos = structTypeInfo.getAllStructFieldTypeInfos();
+                List<ObjectInspector> fieldObjectInspectors =
+                    new ArrayList<ObjectInspector>(fieldTypeInfos.size());
+                for (int i = 0; i < fieldTypeInfos.size(); i++) {
+                    fieldObjectInspectors.add(getStandardObjectInspectorFromTypeInfo(fieldTypeInfos.get(i)));
+                }
+                oi = ObjectInspectorFactory.getStandardStructObjectInspector(
+                    fieldNames, fieldObjectInspectors
+                );
+                break;
+            case LIST:
+                ObjectInspector elementObjectInspector = getStandardObjectInspectorFromTypeInfo(
+                    ((ListTypeInfo) typeInfo).getListElementTypeInfo());
+                oi = ObjectInspectorFactory.getStandardListObjectInspector(elementObjectInspector);
+                break;
+            case MAP:
+                ObjectInspector keyObjectInspector = getStandardObjectInspectorFromTypeInfo(
+                    ((MapTypeInfo) typeInfo).getMapKeyTypeInfo());
+                ObjectInspector valueObjectInspector = getStandardObjectInspectorFromTypeInfo(
+                    ((MapTypeInfo) typeInfo).getMapValueTypeInfo());
+                oi = ObjectInspectorFactory.getStandardMapObjectInspector(keyObjectInspector, valueObjectInspector);
+                break;
+            default:
+                oi = null;
+            }
+            cachedObjectInspectors.put(typeInfo, oi);
+        }
+        return oi;
+    }
+
+
+}
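
To show how the two inspector classes above fit together, a small sketch that derives a StructTypeInfo from a Hive type string and reads a record through the resulting inspector. TypeInfoUtils is the standard Hive serde2 utility; the record layout and field values are illustrative, and the example assumes it sits in the same package as the HCat classes.

import java.util.Arrays;

import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;

public class HCatRecordInspectionExample {
    public static void main(String[] args) throws Exception {
        // Describe the record layout as a Hive struct type string.
        StructTypeInfo typeInfo = (StructTypeInfo)
            TypeInfoUtils.getTypeInfoFromTypeString("struct<id:int,name:string>");

        HCatRecordObjectInspector oi =
            HCatRecordObjectInspectorFactory.getHCatRecordObjectInspector(typeInfo);

        HCatRecord rec = new DefaultHCatRecord(Arrays.<Object>asList(7, "alice"));

        // Field access goes through the inspector, which simply indexes into the record.
        StructField nameField = oi.getStructFieldRef("name");
        System.out.println(oi.getStructFieldData(rec, nameField));   // alice
        System.out.println(oi.getStructFieldsDataAsList(rec));       // [7, alice]
    }
}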


