phoenix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From els...@apache.org
Subject [03/10] phoenix git commit: PHOENIX-2743 HivePhoenixHandler for big-big join with predicate push down
Date Tue, 19 Apr 2016 22:42:07 GMT
http://git-wip-us.apache.org/repos/asf/phoenix/blob/537b90be/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixRecordWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixRecordWriter.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixRecordWriter.java
new file mode 100644
index 0000000..c6884df
--- /dev/null
+++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixRecordWriter.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hive.mapreduce;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.RecordUpdater;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeStats;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.phoenix.hive.PhoenixSerializer;
+import org.apache.phoenix.hive.PhoenixSerializer.DmlType;
+import org.apache.phoenix.hive.constants.PhoenixStorageHandlerConstants;
+import org.apache.phoenix.hive.util.PhoenixConnectionUtil;
+import org.apache.phoenix.hive.util.PhoenixStorageHandlerUtil;
+import org.apache.phoenix.hive.util.PhoenixUtil;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.schema.ConcurrentTableMutationException;
+import org.apache.phoenix.schema.MetaDataClient;
+import org.apache.phoenix.util.QueryUtil;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.Properties;
+
+/**
+ * RecordWriter implementation that writes Hive rows to a Phoenix table through JDBC.
+ * <p>
+ * Rows are upserted (or deleted) one statement at a time and committed every
+ * {@code batchSize} rows; {@link #close(Reporter)} and {@link #flush()} commit whatever is
+ * still pending. The class also implements Hive's {@link RecordUpdater}; the insert / update /
+ * delete entry points use the {@link PhoenixSerializer} that is only created by the
+ * {@code (Path, AcidOutputFormat.Options)} constructor.
+ * <p>
+ * WARNING : The WAL-disable setting may not work reliably, because several tasks can enable
+ * and disable the WAL of the same table concurrently.
+ */
+public class PhoenixRecordWriter<T extends DBWritable> implements RecordWriter<NullWritable, T>,
+        org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter, RecordUpdater {
+
+    private static final Log LOG = LogFactory.getLog(PhoenixRecordWriter.class);
+
+    private Connection conn;
+    // Upsert statement, prepared once in initialize() and reused for every row.
+    private PreparedStatement pstmt;
+    // Number of rows between two intermediate commits.
+    private long batchSize;
+    // Rows upserted or deleted so far; drives the intermediate commits.
+    private long numRecords = 0;
+
+    private Configuration config;
+    private String tableName;
+    private MetaDataClient metaDataClient;
+    // True when this writer disabled the WAL itself and has to re-enable it in close().
+    private boolean restoreWalMode;
+
+    // For RecordUpdater
+    private long rowCountDelta = 0;
+    private PhoenixSerializer phoenixSerializer;
+    private ObjectInspector objInspector;
+    // Delete statement, prepared lazily on the first delete.
+    private PreparedStatement pstmtForDelete;
+
+    /**
+     * Creates a writer used as a {@link RecordUpdater}.
+     *
+     * @param path    not used by this implementation
+     * @param options supplies the configuration, the object inspector and the table properties
+     * @throws IOException if the Phoenix connection or the serializer cannot be set up
+     */
+    public PhoenixRecordWriter(Path path, AcidOutputFormat.Options options) throws IOException {
+        Configuration config = options.getConfiguration();
+        Properties props = new Properties();
+
+        try {
+            initialize(config, props);
+        } catch (SQLException e) {
+            throw new IOException(e);
+        }
+
+        this.objInspector = options.getInspector();
+        try {
+            phoenixSerializer = new PhoenixSerializer(config, options.getTableProperties());
+        } catch (SerDeException e) {
+            // The connection is already open at this point; do not leak it.
+            closeQuietly();
+            throw new IOException(e);
+        }
+    }
+
+    /**
+     * Creates a plain record writer (no serializer, so the {@link RecordUpdater} entry points
+     * are not usable on such an instance).
+     *
+     * @param configuration job configuration holding the Phoenix table name and options
+     * @param props         connection properties; may be modified (batch mode) by this call
+     * @throws SQLException if the connection or the upsert statement cannot be created
+     */
+    public PhoenixRecordWriter(final Configuration configuration, final Properties props) throws
+            SQLException {
+        initialize(configuration, props);
+    }
+
+    /**
+     * Opens the connection and prepares everything needed for writing. If any step after the
+     * connection has been opened fails, the resources acquired so far are released before the
+     * failure is propagated.
+     */
+    private void initialize(Configuration config, Properties properties) throws SQLException {
+        this.config = config;
+        tableName = config.get(PhoenixStorageHandlerConstants.PHOENIX_TABLE_NAME);
+
+        // Disable WAL
+        String walConfigName = tableName.toLowerCase() + PhoenixStorageHandlerConstants.DISABLE_WAL;
+        boolean disableWal = config.getBoolean(walConfigName, false);
+        if (disableWal) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Property " + walConfigName + " is true. batch.mode will be set true. ");
+            }
+
+            properties.setProperty(PhoenixStorageHandlerConstants.BATCH_MODE, "true");
+        }
+
+        this.conn = PhoenixConnectionUtil.getInputConnection(config, properties);
+
+        try {
+            prepareForWrite(disableWal);
+        } catch (SQLException | RuntimeException e) {
+            closeQuietly();
+            throw e;
+        }
+    }
+
+    /**
+     * Disables the table's WAL when requested, reads the batch size and prepares the upsert
+     * statement. Expects {@code conn} to be open.
+     */
+    private void prepareForWrite(boolean disableWal) throws SQLException {
+        if (disableWal) {
+            metaDataClient = new MetaDataClient((PhoenixConnection) conn);
+
+            if (!PhoenixUtil.isDisabledWal(metaDataClient, tableName)) {
+                // execute alter table statement if disable_wal is not true.
+                try {
+                    PhoenixUtil.alterTableForWalDisable(conn, tableName, true);
+                } catch (ConcurrentTableMutationException e) {
+                    if (LOG.isWarnEnabled()) {
+                        LOG.warn("Another mapper or task processing wal disable");
+                    }
+                }
+
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug(tableName + "s wal disabled.");
+                }
+
+                // restore original value of disable_wal at the end.
+                restoreWalMode = true;
+            }
+        }
+
+        this.batchSize = PhoenixConfigurationUtil.getBatchSize(config);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Batch-size : " + batchSize);
+        }
+
+        String upsertQuery = QueryUtil.constructUpsertStatement(tableName, PhoenixUtil
+                .getColumnInfoList(conn, tableName));
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Upsert-query : " + upsertQuery);
+        }
+        this.pstmt = this.conn.prepareStatement(upsertQuery);
+    }
+
+    /**
+     * Upserts one record and commits when a full batch has been written.
+     *
+     * @param key    ignored
+     * @param record row to bind to the upsert statement
+     * @throws IOException wrapping any {@link SQLException} raised while writing
+     */
+    @Override
+    public void write(NullWritable key, T record) throws IOException {
+        try {
+            record.write(pstmt);
+            numRecords++;
+            pstmt.executeUpdate();
+
+            if (numRecords % batchSize == 0) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Commit called on a batch of size : " + batchSize);
+                }
+                conn.commit();
+            }
+        } catch (SQLException e) {
+            throw new IOException("Exception while writing to table.", e);
+        }
+    }
+
+    /**
+     * Commits pending rows, re-enables the WAL if this writer disabled it, optionally flushes
+     * the table and finally releases the statements and the connection. The JDBC resources are
+     * released even when the WAL restore or the flush fails.
+     *
+     * @param reporter not used
+     * @throws IOException wrapping any {@link SQLException} raised on the way
+     */
+    @Override
+    public void close(Reporter reporter) throws IOException {
+        try {
+            conn.commit();
+
+            if (LOG.isInfoEnabled()) {
+                LOG.info("Wrote row : " + numRecords);
+            }
+        } catch (SQLException e) {
+            LOG.error("SQLException while performing the commit for the task.");
+            throw new IOException(e);
+        } finally {
+            try {
+                try {
+                    if (restoreWalMode && PhoenixUtil.isDisabledWal(metaDataClient, tableName)) {
+                        try {
+                            PhoenixUtil.alterTableForWalDisable(conn, tableName, false);
+                        } catch (ConcurrentTableMutationException e) {
+                            if (LOG.isWarnEnabled()) {
+                                LOG.warn("Another mapper or task processing wal enable");
+                            }
+                        }
+
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug(tableName + "s wal enabled.");
+                        }
+                    }
+
+                    // flush if [table-name].auto.flush is true.
+                    String autoFlushConfigName = tableName.toLowerCase() +
+                            PhoenixStorageHandlerConstants.AUTO_FLUSH;
+                    boolean autoFlush = config.getBoolean(autoFlushConfigName, false);
+                    if (autoFlush) {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("autoFlush is true.");
+                        }
+
+                        PhoenixUtil.flush(conn, tableName);
+                    }
+                } finally {
+                    // Always release JDBC resources, whatever happened above.
+                    closeResources();
+                }
+            } catch (SQLException ex) {
+                LOG.error("SQLException while closing the connection for the task.");
+                throw new IOException(ex);
+            }
+        }
+    }
+
+    /**
+     * Closes both statements and the connection; every close is attempted even if an earlier
+     * one fails.
+     */
+    private void closeResources() throws IOException, SQLException {
+        try {
+            PhoenixUtil.closeResource(pstmt);
+        } finally {
+            try {
+                PhoenixUtil.closeResource(pstmtForDelete);
+            } finally {
+                PhoenixUtil.closeResource(conn);
+            }
+        }
+    }
+
+    /**
+     * Releases the JDBC resources on an error path; a failure to close is only logged so that
+     * it does not hide the error being propagated.
+     */
+    private void closeQuietly() {
+        try {
+            closeResources();
+        } catch (IOException | SQLException e) {
+            LOG.warn("Failed to release Phoenix resources after an initialization error.", e);
+        }
+    }
+
+    // For Testing
+    public boolean isRestoreWalMode() {
+        return restoreWalMode;
+    }
+
+    /**
+     * Writes a row handed over by Hive's file sink; the row must be a
+     * {@link PhoenixResultWritable}.
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public void write(Writable w) throws IOException {
+        PhoenixResultWritable row = (PhoenixResultWritable) w;
+
+        write(NullWritable.get(), (T) row);
+    }
+
+    /**
+     * Closes the writer. The {@code abort} flag is not consulted: pending rows are committed in
+     * both cases.
+     * NOTE(review): committing on abort may be unintended - confirm against the Hive contract.
+     */
+    @Override
+    public void close(boolean abort) throws IOException {
+        close(Reporter.NULL);
+    }
+
+    /**
+     * Serializes the row as an INSERT, upserts it and increments the row-count delta.
+     */
+    @Override
+    public void insert(long currentTransaction, Object row) throws IOException {
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("insert transaction : " + currentTransaction + ", row : " +
+                    PhoenixStorageHandlerUtil.toString(row));
+        }
+
+        PhoenixResultWritable pResultWritable = (PhoenixResultWritable) phoenixSerializer
+                .serialize(row, objInspector, DmlType.INSERT);
+
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Data : " + pResultWritable.getValueList());
+        }
+
+        write(pResultWritable);
+        rowCountDelta++;
+    }
+
+    /**
+     * Serializes the row as an UPDATE and upserts it; the row-count delta is left unchanged.
+     */
+    @Override
+    public void update(long currentTransaction, Object row) throws IOException {
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("update transaction : " + currentTransaction + ", row : " +
+                    PhoenixStorageHandlerUtil
+                            .toString(row));
+        }
+
+        PhoenixResultWritable pResultWritable = (PhoenixResultWritable) phoenixSerializer
+                .serialize(row, objInspector, DmlType.UPDATE);
+
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Data : " + pResultWritable.getValueList());
+        }
+
+        write(pResultWritable);
+    }
+
+    /**
+     * Serializes the row as a DELETE, executes the delete statement (preparing it on first
+     * use) and decrements the row-count delta.
+     */
+    @Override
+    public void delete(long currentTransaction, Object row) throws IOException {
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("delete transaction : " + currentTransaction + ", row : " +
+                    PhoenixStorageHandlerUtil.toString(row));
+        }
+
+        PhoenixResultWritable pResultWritable = (PhoenixResultWritable) phoenixSerializer
+                .serialize(row, objInspector, DmlType.DELETE);
+
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Data : " + pResultWritable.getValueList());
+        }
+
+        if (pstmtForDelete == null) {
+            try {
+                String deleteQuery = PhoenixUtil.constructDeleteStatement(conn, tableName);
+
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Delete query : " + deleteQuery);
+                }
+
+                pstmtForDelete = conn.prepareStatement(deleteQuery);
+            } catch (SQLException e) {
+                throw new IOException(e);
+            }
+        }
+
+        delete(pResultWritable);
+
+        rowCountDelta--;
+    }
+
+    /**
+     * Binds the row to the delete statement, executes it and commits when a full batch has been
+     * processed (deletes share the record counter with upserts).
+     */
+    private void delete(PhoenixResultWritable pResultWritable) throws IOException {
+        try {
+            pResultWritable.delete(pstmtForDelete);
+            numRecords++;
+            pstmtForDelete.executeUpdate();
+
+            if (numRecords % batchSize == 0) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Commit called on a batch of size : " + batchSize);
+                }
+                conn.commit();
+            }
+        } catch (SQLException e) {
+            throw new IOException("Exception while deleting to table.", e);
+        }
+    }
+
+    /**
+     * Commits everything written so far without closing the writer.
+     */
+    @Override
+    public void flush() throws IOException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Flush called");
+        }
+
+        try {
+            conn.commit();
+
+            if (LOG.isInfoEnabled()) {
+                LOG.info("Written row : " + numRecords);
+            }
+        } catch (SQLException e) {
+            LOG.error("SQLException while performing the commit for the task.");
+            throw new IOException(e);
+        }
+    }
+
+    /**
+     * Returns statistics whose row count is the number of inserts minus the number of deletes
+     * performed through the {@link RecordUpdater} methods.
+     */
+    @Override
+    public SerDeStats getStats() {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("getStats called");
+        }
+
+        SerDeStats stats = new SerDeStats();
+        stats.setRowCount(rowCountDelta);
+        // Don't worry about setting raw data size diff.  There is no reasonable way  to calculate
+        // that without finding the row we are updating or deleting, which would be a mess.
+        return stats;
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/537b90be/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixResultWritable.java
----------------------------------------------------------------------
diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixResultWritable.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixResultWritable.java
new file mode 100644
index 0000000..18ded89
--- /dev/null
+++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixResultWritable.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hive.mapreduce;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.phoenix.hive.PhoenixRowKey;
+import org.apache.phoenix.hive.constants.PhoenixStorageHandlerConstants;
+import org.apache.phoenix.hive.util.PhoenixStorageHandlerUtil;
+import org.apache.phoenix.hive.util.PhoenixUtil;
+import org.apache.phoenix.util.ColumnInfo;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Serialized class for SerDe
+ *
+ */
+public class PhoenixResultWritable implements Writable, DBWritable, Configurable {
+
+    private static final Log LOG = LogFactory.getLog(PhoenixResultWritable.class);
+
+    private List<ColumnInfo> columnMetadataList;
+    private List<Object> valueList;    // for output
+    private Map<String, Object> rowMap = Maps.newHashMap();  // for input
+
+    private int columnCount = -1;
+
+    private Configuration config;
+    private boolean isTransactional;
+    private Map<String, Object> rowKeyMap = Maps.newLinkedHashMap();
+    private List<String> primaryKeyColumnList;
+
+    public PhoenixResultWritable() {
+    }
+
+    public PhoenixResultWritable(Configuration config) throws IOException {
+        setConf(config);
+    }
+
+    public PhoenixResultWritable(Configuration config, List<ColumnInfo> columnMetadataList)
+            throws IOException {
+        this(config);
+        this.columnMetadataList = columnMetadataList;
+
+        valueList = Lists.newArrayListWithExpectedSize(columnMetadataList.size());
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    // for write
+    public void clear() {
+        valueList.clear();
+    }
+
+    // for write
+    public void add(Object value) {
+        valueList.add(value);
+    }
+
+    @Override
+    public void write(PreparedStatement statement) throws SQLException {
+        ColumnInfo columnInfo = null;
+        Object value = null;
+
+        try {
+            for (int i = 0, limit = columnMetadataList.size(); i < limit; i++) {
+                columnInfo = columnMetadataList.get(i);
+
+                if (valueList.size() > i) {
+                    value = valueList.get(i);
+                } else {
+                    value = null;
+                }
+
+                if (value == null) {
+                    statement.setNull(i + 1, columnInfo.getSqlType());
+                } else {
+                    statement.setObject(i + 1, value, columnInfo.getSqlType());
+                }
+            }
+        } catch (SQLException | RuntimeException e) {
+            LOG.error("[column-info, value] : " + columnInfo + ", " + value);
+            throw e;
+        }
+    }
+
+    public void delete(PreparedStatement statement) throws SQLException {
+        ColumnInfo columnInfo = null;
+        Object value = null;
+
+        try {
+            for (int i = 0, limit = primaryKeyColumnList.size(); i < limit; i++) {
+                columnInfo = columnMetadataList.get(i);
+
+                if (valueList.size() > i) {
+                    value = valueList.get(i);
+                } else {
+                    value = null;
+                }
+
+                if (value == null) {
+                    statement.setNull(i + 1, columnInfo.getSqlType());
+                } else {
+                    statement.setObject(i + 1, value, columnInfo.getSqlType());
+                }
+            }
+        } catch (SQLException | RuntimeException e) {
+            LOG.error("[column-info, value] : " + columnInfo + ", " + value);
+            throw e;
+        }
+    }
+
+    @Override
+    public void readFields(ResultSet resultSet) throws SQLException {
+        ResultSetMetaData rsmd = resultSet.getMetaData();
+        if (columnCount == -1) {
+            this.columnCount = rsmd.getColumnCount();
+        }
+        rowMap.clear();
+
+        for (int i = 0; i < columnCount; i++) {
+            Object value = resultSet.getObject(i + 1);
+
+            rowMap.put(rsmd.getColumnName(i + 1), value);
+        }
+
+        // Adding row__id column.
+        if (isTransactional) {
+            rowKeyMap.clear();
+
+            for (String pkColumn : primaryKeyColumnList) {
+                rowKeyMap.put(pkColumn, rowMap.get(pkColumn));
+            }
+        }
+    }
+
+    public void readPrimaryKey(PhoenixRowKey rowKey) {
+        rowKey.setRowKeyMap(rowKeyMap);
+    }
+
+    public List<ColumnInfo> getColumnMetadataList() {
+        return columnMetadataList;
+    }
+
+    public void setColumnMetadataList(List<ColumnInfo> columnMetadataList) {
+        this.columnMetadataList = columnMetadataList;
+    }
+
+    public Map<String, Object> getResultMap() {
+        return rowMap;
+    }
+
+    public List<Object> getValueList() {
+        return valueList;
+    }
+
+    @Override
+    public void setConf(Configuration conf) {
+        config = conf;
+
+        isTransactional = PhoenixStorageHandlerUtil.isTransactionalTable(config);
+
+        if (isTransactional) {
+            primaryKeyColumnList = PhoenixUtil.getPrimaryKeyColumnList(config, config.get
+                    (PhoenixStorageHandlerConstants.PHOENIX_TABLE_NAME));
+        }
+    }
+
+    @Override
+    public Configuration getConf() {
+        return config;
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/537b90be/phoenix-hive/src/main/java/org/apache/phoenix/hive/objectinspector/AbstractPhoenixObjectInspector.java
----------------------------------------------------------------------
diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/objectinspector/AbstractPhoenixObjectInspector.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/objectinspector/AbstractPhoenixObjectInspector.java
new file mode 100644
index 0000000..1de1cc7
--- /dev/null
+++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/objectinspector/AbstractPhoenixObjectInspector.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hive.objectinspector;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.serde2.lazy.objectinspector.primitive
+        .AbstractPrimitiveLazyObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * AbstractPhoenixObjectInspector for a LazyPrimitive object.
+ * <p>
+ * Provides the pass-through {@link #getPrimitiveJavaObject(Object)} and a shared debug-logging
+ * helper for the concrete Phoenix inspectors.
+ */
+public abstract class AbstractPhoenixObjectInspector<T extends Writable>
+        extends AbstractPrimitiveLazyObjectInspector<T> {
+
+    // Per-subclass logger (created from the runtime class).
+    private final Log log;
+
+    public AbstractPhoenixObjectInspector() {
+        super();
+
+        log = LogFactory.getLog(getClass());
+    }
+
+    protected AbstractPhoenixObjectInspector(PrimitiveTypeInfo typeInfo) {
+        super(typeInfo);
+
+        log = LogFactory.getLog(getClass());
+    }
+
+    /**
+     * Returns the given object unchanged ({@code null} stays {@code null}).
+     */
+    @Override
+    public Object getPrimitiveJavaObject(Object o) {
+        return o;
+    }
+
+    /**
+     * Logs, at debug level and with a stack trace, that a value did not fit the expected type.
+     *
+     * @param value    offending value; may be {@code null}
+     * @param dataType name of the expected data type, used in the message only
+     */
+    public void logExceptionMessage(Object value, String dataType) {
+        if (log.isDebugEnabled()) {
+            // String concatenation instead of value.toString() keeps this null-safe.
+            log.debug("Data not in the " + dataType + " data type range so converted to null. " +
+                    "Given data is :"
+                    + value, new Exception("For debugging purposes"));
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/537b90be/phoenix-hive/src/main/java/org/apache/phoenix/hive/objectinspector/PhoenixBinaryObjectInspector.java
----------------------------------------------------------------------
diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/objectinspector/PhoenixBinaryObjectInspector.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/objectinspector/PhoenixBinaryObjectInspector.java
new file mode 100644
index 0000000..2c642d2
--- /dev/null
+++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/objectinspector/PhoenixBinaryObjectInspector.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hive.objectinspector;
+
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.io.BytesWritable;
+
+/**
+ * ObjectInspector for Binary type. The inspected objects are plain {@code byte[]} values.
+ */
+
+public class PhoenixBinaryObjectInspector extends AbstractPhoenixObjectInspector<BytesWritable>
+        implements BinaryObjectInspector {
+
+    public PhoenixBinaryObjectInspector() {
+        super(TypeInfoFactory.binaryTypeInfo);
+    }
+
+    /**
+     * Returns an independent copy of the given byte array, or {@code null} for {@code null}.
+     */
+    @Override
+    public Object copyObject(Object o) {
+        return o == null ? null : ((byte[]) o).clone();
+    }
+
+    /**
+     * Returns the given object as a byte array (no copy is made).
+     */
+    @Override
+    public byte[] getPrimitiveJavaObject(Object o) {
+        return (byte[]) o;
+    }
+
+    /**
+     * Wraps the given byte array in a {@link BytesWritable}; {@code null} yields {@code null}
+     * instead of failing inside the {@code BytesWritable} constructor.
+     */
+    @Override
+    public BytesWritable getPrimitiveWritableObject(Object o) {
+        return o == null ? null : new BytesWritable((byte[]) o);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/537b90be/phoenix-hive/src/main/java/org/apache/phoenix/hive/objectinspector/PhoenixBooleanObjectInspector.java
----------------------------------------------------------------------
diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/objectinspector/PhoenixBooleanObjectInspector.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/objectinspector/PhoenixBooleanObjectInspector.java
new file mode 100644
index 0000000..0795e14
--- /dev/null
+++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/objectinspector/PhoenixBooleanObjectInspector.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hive.objectinspector;
+
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.io.BooleanWritable;
+
+/**
+ * ObjectInspector for boolean type. The inspected objects are plain {@link Boolean} values.
+ */
+public class PhoenixBooleanObjectInspector extends AbstractPhoenixObjectInspector<BooleanWritable>
+        implements BooleanObjectInspector {
+
+    public PhoenixBooleanObjectInspector() {
+        super(TypeInfoFactory.booleanTypeInfo);
+    }
+
+    /**
+     * Returns a {@link Boolean} equal to the given one, or {@code null} for {@code null}.
+     */
+    @Override
+    public Object copyObject(Object o) {
+        return o == null ? null : Boolean.valueOf((Boolean) o);
+    }
+
+    /**
+     * Returns the primitive value of the given object.
+     * <p>
+     * A primitive cannot carry {@code null}, so {@code null} and values that are not a
+     * {@link Boolean} yield {@code false} (the latter is also logged at debug level) rather than
+     * failing with a {@link NullPointerException} while unboxing.
+     */
+    @Override
+    public boolean get(Object o) {
+        if (o instanceof Boolean) {
+            return (Boolean) o;
+        }
+
+        if (o != null) {
+            logExceptionMessage(o, "BOOLEAN");
+        }
+
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/537b90be/phoenix-hive/src/main/java/org/apache/phoenix/hive/objectinspector/PhoenixByteObjectInspector.java
----------------------------------------------------------------------
diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/objectinspector/PhoenixByteObjectInspector.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/objectinspector/PhoenixByteObjectInspector.java
new file mode 100644
index 0000000..c6c5e95
--- /dev/null
+++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/objectinspector/PhoenixByteObjectInspector.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hive.objectinspector;
+
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.ByteObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.io.ByteWritable;
+
+/**
+ * ObjectInspector for byte type. The inspected objects are plain {@link Byte} values.
+ */
+public class PhoenixByteObjectInspector extends AbstractPhoenixObjectInspector<ByteWritable>
+        implements ByteObjectInspector {
+
+    public PhoenixByteObjectInspector() {
+        super(TypeInfoFactory.byteTypeInfo);
+    }
+
+    /**
+     * Returns a {@link Byte} equal to the given one, or {@code null} for {@code null}.
+     */
+    @Override
+    public Object copyObject(Object o) {
+        return o == null ? null : Byte.valueOf((Byte) o);
+    }
+
+    /**
+     * Returns the primitive value of the given object.
+     * <p>
+     * A primitive cannot carry {@code null}, so {@code null} and values that are not a
+     * {@link Byte} yield {@code 0} (the latter is also logged at debug level) rather than
+     * failing with a {@link NullPointerException} while unboxing.
+     */
+    @Override
+    public byte get(Object o) {
+        if (o instanceof Byte) {
+            return (Byte) o;
+        }
+
+        if (o != null) {
+            logExceptionMessage(o, "BYTE");
+        }
+
+        return 0;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/537b90be/phoenix-hive/src/main/java/org/apache/phoenix/hive/objectinspector/PhoenixCharObjectInspector.java
----------------------------------------------------------------------
diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/objectinspector/PhoenixCharObjectInspector.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/objectinspector/PhoenixCharObjectInspector.java
new file mode 100644
index 0000000..8d6aa8c
--- /dev/null
+++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/objectinspector/PhoenixCharObjectInspector.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hive.objectinspector;
+
+import org.apache.hadoop.hive.common.type.HiveChar;
+import org.apache.hadoop.hive.serde2.io.HiveCharWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveCharObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+
+/**
+ * ObjectInspector for char type
+ */
+public class PhoenixCharObjectInspector extends AbstractPhoenixObjectInspector<HiveCharWritable>
+        implements HiveCharObjectInspector {
+
+    public PhoenixCharObjectInspector() {
+        super(TypeInfoFactory.charTypeInfo);
+    }
+
+    /**
+     * Copies the given value.
+     *
+     * @param o a {@link String}, or null
+     * @return a new {@link String} with the same content, or null when {@code o} is null
+     */
+    @Override
+    public Object copyObject(Object o) {
+        return o == null ? null : new String((String) o);
+    }
+
+    /**
+     * Wraps the given value in a {@link HiveCharWritable}.
+     *
+     * @param o a {@link String}, or null
+     * @return the writable form of the value, or null when {@code o} is null
+     */
+    @Override
+    public HiveCharWritable getPrimitiveWritableObject(Object o) {
+        HiveChar value = getPrimitiveJavaObject(o);
+        return value == null ? null : new HiveCharWritable(value);
+    }
+
+    /**
+     * Converts the given value to a {@link HiveChar} whose maximum length equals the length
+     * of the string, so no truncation is applied here.
+     *
+     * @param o a {@link String}, or null
+     * @return the {@link HiveChar} form of the value, or null when {@code o} is null
+     */
+    @Override
+    public HiveChar getPrimitiveJavaObject(Object o) {
+        // Null column values must map to null; calling value.length() on them would throw.
+        if (o == null) {
+            return null;
+        }
+
+        String value = (String) o;
+        return new HiveChar(value, value.length());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/537b90be/phoenix-hive/src/main/java/org/apache/phoenix/hive/objectinspector/PhoenixDateObjectInspector.java
----------------------------------------------------------------------
diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/objectinspector/PhoenixDateObjectInspector.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/objectinspector/PhoenixDateObjectInspector.java
new file mode 100644
index 0000000..d97993e
--- /dev/null
+++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/objectinspector/PhoenixDateObjectInspector.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hive.objectinspector;
+
+import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.DateObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+
+import java.sql.Date;
+
+/**
+ * Object inspector that exposes {@link java.sql.Date} values as the Hive date type.
+ */
+public class PhoenixDateObjectInspector extends AbstractPhoenixObjectInspector<DateWritable>
+        implements DateObjectInspector {
+
+    public PhoenixDateObjectInspector() {
+        super(TypeInfoFactory.dateTypeInfo);
+    }
+
+    /**
+     * Produces an independent {@link Date} carrying the same millisecond value.
+     *
+     * @param o a {@link Date}, or null
+     * @return a new {@link Date}, or null when {@code o} is null
+     */
+    @Override
+    public Object copyObject(Object o) {
+        if (o == null) {
+            return null;
+        }
+
+        Date source = (Date) o;
+        return new Date(source.getTime());
+    }
+
+    /**
+     * Wraps the given value in a {@link DateWritable}.
+     *
+     * @param o expected to be a {@link Date}
+     * @return null for null input; otherwise the writable form, or an empty
+     *         {@link DateWritable} when the conversion fails (the failure is logged)
+     */
+    @Override
+    public DateWritable getPrimitiveWritableObject(Object o) {
+        if (o == null) {
+            return null;
+        }
+
+        try {
+            return new DateWritable((Date) o);
+        } catch (Exception e) {
+            logExceptionMessage(o, "DATE");
+            return new DateWritable();
+        }
+    }
+
+    /**
+     * Returns the value itself, cast to {@link Date}.
+     */
+    @Override
+    public Date getPrimitiveJavaObject(Object o) {
+        return (Date) o;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/537b90be/phoenix-hive/src/main/java/org/apache/phoenix/hive/objectinspector/PhoenixDecimalObjectInspector.java
----------------------------------------------------------------------
diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/objectinspector/PhoenixDecimalObjectInspector.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/objectinspector/PhoenixDecimalObjectInspector.java
new file mode 100644
index 0000000..388863a
--- /dev/null
+++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/objectinspector/PhoenixDecimalObjectInspector.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hive.objectinspector;
+
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.metastore.api.Decimal;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+
+import java.math.BigDecimal;
+
+/**
+ * ObjectInspector for decimal type. Values handled by this inspector are
+ * {@link BigDecimal} instances, as {@link #getPrimitiveJavaObject(Object)} shows.
+ */
+public class PhoenixDecimalObjectInspector extends
+        AbstractPhoenixObjectInspector<HiveDecimalWritable>
+        implements HiveDecimalObjectInspector {
+
+    public PhoenixDecimalObjectInspector() {
+        super(TypeInfoFactory.decimalTypeInfo);
+    }
+
+    /**
+     * Copies the given value.
+     *
+     * @param o a {@link BigDecimal}, or null
+     * @return the same {@link BigDecimal}, or null when {@code o} is null
+     */
+    @Override
+    public Object copyObject(Object o) {
+        // The value is a BigDecimal, not the Thrift metastore Decimal struct, so casting to
+        // the latter could never succeed. BigDecimal is immutable, so the instance itself
+        // serves as its own copy.
+        return o == null ? null : (BigDecimal) o;
+    }
+
+    /**
+     * Converts the given value to a {@link HiveDecimal}.
+     *
+     * @param o a {@link BigDecimal}, or null
+     * @return the {@link HiveDecimal} form of the value, or null when {@code o} is null
+     */
+    @Override
+    public HiveDecimal getPrimitiveJavaObject(Object o) {
+        return o == null ? null : HiveDecimal.create((BigDecimal) o);
+    }
+
+    /**
+     * Wraps the given value in a {@link HiveDecimalWritable}.
+     *
+     * @param o expected to be a {@link BigDecimal}
+     * @return the writable form of the value, or null when {@code o} is null or cannot be
+     *         converted (the failure is logged)
+     */
+    @Override
+    public HiveDecimalWritable getPrimitiveWritableObject(Object o) {
+        HiveDecimalWritable value = null;
+
+        if (o != null) {
+            try {
+                // Go through the BigDecimal -> HiveDecimal conversion; the incoming object is
+                // not a HiveDecimalWritable, so casting it to one always failed.
+                HiveDecimal decimal = getPrimitiveJavaObject(o);
+
+                // NOTE(review): HiveDecimal.create presumably yields null for values Hive
+                // cannot represent - confirm against the Hive version in use.
+                if (decimal != null) {
+                    value = new HiveDecimalWritable(decimal);
+                }
+            } catch (Exception e) {
+                logExceptionMessage(o, "DECIMAL");
+            }
+        }
+
+        return value;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/537b90be/phoenix-hive/src/main/java/org/apache/phoenix/hive/objectinspector/PhoenixDoubleObjectInspector.java
----------------------------------------------------------------------
diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/objectinspector/PhoenixDoubleObjectInspector.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/objectinspector/PhoenixDoubleObjectInspector.java
new file mode 100644
index 0000000..25ae793
--- /dev/null
+++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/objectinspector/PhoenixDoubleObjectInspector.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hive.objectinspector;
+
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.io.DoubleWritable;
+
+/**
+ * ObjectInspector for double type
+ */
+public class PhoenixDoubleObjectInspector extends AbstractPhoenixObjectInspector<DoubleWritable>
+        implements DoubleObjectInspector {
+
+    public PhoenixDoubleObjectInspector() {
+        super(TypeInfoFactory.doubleTypeInfo);
+    }
+
+    @Override
+    public Object copyObject(Object o) {
+        return o == null ? null : new Double((Double) o);
+    }
+
+    @Override
+    public double get(Object o) {
+        Double value = null;
+
+        if (o != null) {
+            try {
+                value = ((Double) o).doubleValue();
+            } catch (Exception e) {
+                logExceptionMessage(o, "LONG");
+            }
+        }
+
+        return value;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/537b90be/phoenix-hive/src/main/java/org/apache/phoenix/hive/objectinspector/PhoenixFloatObjectInspector.java
----------------------------------------------------------------------
diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/objectinspector/PhoenixFloatObjectInspector.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/objectinspector/PhoenixFloatObjectInspector.java
new file mode 100644
index 0000000..83ad2b0
--- /dev/null
+++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/objectinspector/PhoenixFloatObjectInspector.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hive.objectinspector;
+
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.FloatObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.io.FloatWritable;
+
+/**
+ * ObjectInspector for float type
+ */
+
+public class PhoenixFloatObjectInspector extends AbstractPhoenixObjectInspector<FloatWritable>
+        implements FloatObjectInspector {
+
+    public PhoenixFloatObjectInspector() {
+        super(TypeInfoFactory.floatTypeInfo);
+    }
+
+    @Override
+    public Object copyObject(Object o) {
+        return o == null ? null : new Float((Float) o);
+    }
+
+    @Override
+    public float get(Object o) {
+        Float value = null;
+
+        if (o != null) {
+            try {
+                value = ((Float) o).floatValue();
+            } catch (Exception e) {
+                logExceptionMessage(o, "LONG");
+            }
+        }
+
+        return value;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/537b90be/phoenix-hive/src/main/java/org/apache/phoenix/hive/objectinspector/PhoenixIntObjectInspector.java
----------------------------------------------------------------------
diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/objectinspector/PhoenixIntObjectInspector.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/objectinspector/PhoenixIntObjectInspector.java
new file mode 100644
index 0000000..fc9e7d0
--- /dev/null
+++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/objectinspector/PhoenixIntObjectInspector.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hive.objectinspector;
+
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.io.IntWritable;
+
+public class PhoenixIntObjectInspector extends AbstractPhoenixObjectInspector<IntWritable>
+        implements IntObjectInspector {
+
+    public PhoenixIntObjectInspector() {
+        super(TypeInfoFactory.intTypeInfo);
+    }
+
+    @Override
+    public Object copyObject(Object o) {
+        return o == null ? null : new Integer((Integer) o);
+    }
+
+    @Override
+    public int get(Object o) {
+        Integer value = null;
+
+        if (o != null) {
+            try {
+                value = ((Integer) o).intValue();
+            } catch (Exception e) {
+                logExceptionMessage(o, "INT");
+            }
+        }
+
+        return value;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/537b90be/phoenix-hive/src/main/java/org/apache/phoenix/hive/objectinspector/PhoenixListObjectInspector.java
----------------------------------------------------------------------
diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/objectinspector/PhoenixListObjectInspector.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/objectinspector/PhoenixListObjectInspector.java
new file mode 100644
index 0000000..c4f2d51
--- /dev/null
+++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/objectinspector/PhoenixListObjectInspector.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hive.objectinspector;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.hive.serde2.lazy.objectinspector.primitive.LazyObjectInspectorParameters;
+import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.phoenix.schema.types.PhoenixArray;
+
+import java.util.List;
+
+/**
+ * ObjectInspector for list objects. The inspected data is expected to be a
+ * {@link PhoenixArray}.
+ */
+public class PhoenixListObjectInspector implements ListObjectInspector {
+
+    private final ObjectInspector listElementObjectInspector;
+    private final byte separator;
+    private final LazyObjectInspectorParameters lazyParams;
+
+    /**
+     * @param listElementObjectInspector inspector used for the elements of the list
+     * @param separator                  separator byte, exposed via {@link #getSeparator()}
+     * @param lazyParams                 lazy parameters, exposed via {@link #getLazyParams()}
+     */
+    public PhoenixListObjectInspector(ObjectInspector listElementObjectInspector,
+                                      byte separator, LazyObjectInspectorParameters lazyParams) {
+        this.listElementObjectInspector = listElementObjectInspector;
+        this.separator = separator;
+        this.lazyParams = lazyParams;
+    }
+
+    /**
+     * @return the list type name followed by the element type name in angle brackets
+     */
+    @Override
+    public String getTypeName() {
+        return org.apache.hadoop.hive.serde.serdeConstants.LIST_TYPE_NAME + "<" +
+                listElementObjectInspector.getTypeName() + ">";
+    }
+
+    @Override
+    public Category getCategory() {
+        return Category.LIST;
+    }
+
+    @Override
+    public ObjectInspector getListElementObjectInspector() {
+        return listElementObjectInspector;
+    }
+
+    /**
+     * Returns the element at the given position.
+     *
+     * @param data  a {@link PhoenixArray}, or null
+     * @param index zero-based element position
+     * @return the element, or null when {@code data} is null or {@code index} is out of range
+     */
+    @Override
+    public Object getListElement(Object data, int index) {
+        if (data == null) {
+            return null;
+        }
+
+        PhoenixArray array = (PhoenixArray) data;
+
+        // The ListObjectInspector contract asks for null on an out-of-range index rather
+        // than an exception. getDimensions() is the element count used throughout this class.
+        if (index < 0 || index >= array.getDimensions()) {
+            return null;
+        }
+
+        return array.getElement(index);
+    }
+
+    /**
+     * @param data a {@link PhoenixArray}, or null
+     * @return the value of {@code getDimensions()} for the array, or -1 when {@code data}
+     *         is null
+     */
+    @Override
+    public int getListLength(Object data) {
+        if (data == null) {
+            return -1;
+        }
+
+        PhoenixArray array = (PhoenixArray) data;
+        return array.getDimensions();
+    }
+
+    /**
+     * Copies the elements of the array into a new list.
+     *
+     * @param data a {@link PhoenixArray}, or null
+     * @return a new list holding the elements in order, or null when {@code data} is null
+     */
+    @Override
+    public List<?> getList(Object data) {
+        if (data == null) {
+            return null;
+        }
+
+        PhoenixArray array = (PhoenixArray) data;
+        int valueLength = array.getDimensions();
+        List<Object> valueList = Lists.newArrayListWithExpectedSize(valueLength);
+
+        for (int i = 0; i < valueLength; i++) {
+            valueList.add(array.getElement(i));
+        }
+
+        return valueList;
+    }
+
+    public byte getSeparator() {
+        return separator;
+    }
+
+    public LazyObjectInspectorParameters getLazyParams() {
+        return lazyParams;
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/537b90be/phoenix-hive/src/main/java/org/apache/phoenix/hive/objectinspector/PhoenixLongObjectInspector.java
----------------------------------------------------------------------
diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/objectinspector/PhoenixLongObjectInspector.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/objectinspector/PhoenixLongObjectInspector.java
new file mode 100644
index 0000000..ad5cd05
--- /dev/null
+++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/objectinspector/PhoenixLongObjectInspector.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hive.objectinspector;
+
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.io.LongWritable;
+
+public class PhoenixLongObjectInspector extends AbstractPhoenixObjectInspector<LongWritable>
+        implements LongObjectInspector {
+
+    public PhoenixLongObjectInspector() {
+        super(TypeInfoFactory.longTypeInfo);
+    }
+
+    @Override
+    public Object copyObject(Object o) {
+        return o == null ? null : new Long((Long) o);
+    }
+
+    @Override
+    public long get(Object o) {
+        Long value = null;
+
+        if (o != null) {
+            try {
+                value = ((Long) o).longValue();
+            } catch (Exception e) {
+                logExceptionMessage(o, "LONG");
+            }
+        }
+
+        return value;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/537b90be/phoenix-hive/src/main/java/org/apache/phoenix/hive/objectinspector/PhoenixObjectInspectorFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/objectinspector/PhoenixObjectInspectorFactory.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/objectinspector/PhoenixObjectInspectorFactory.java
new file mode 100644
index 0000000..928dede
--- /dev/null
+++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/objectinspector/PhoenixObjectInspectorFactory.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hive.objectinspector;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.serde2.lazy.LazySerDeParameters;
+import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazyObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory.ObjectInspectorOptions;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Factory for object inspectors. Matches hive type to the corresponding Phoenix object inspector.
+ */
+
+public class PhoenixObjectInspectorFactory {
+
+    private static final Log LOG = LogFactory.getLog(PhoenixObjectInspectorFactory.class);
+
+    private PhoenixObjectInspectorFactory() {
+
+    }
+
+    public static LazySimpleStructObjectInspector createStructObjectInspector(TypeInfo type,
+                                                                              LazySerDeParameters
+                                                                                      serdeParams) {
+        StructTypeInfo structTypeInfo = (StructTypeInfo) type;
+        List<String> fieldNames = structTypeInfo.getAllStructFieldNames();
+        List<TypeInfo> fieldTypeInfos = structTypeInfo.getAllStructFieldTypeInfos();
+        List<ObjectInspector> fieldObjectInspectors = new ArrayList<ObjectInspector>
+                (fieldTypeInfos.size());
+
+        for (int i = 0; i < fieldTypeInfos.size(); i++) {
+            fieldObjectInspectors.add(createObjectInspector(fieldTypeInfos.get(i), serdeParams));
+        }
+
+        return LazyObjectInspectorFactory.getLazySimpleStructObjectInspector(
+                fieldNames, fieldObjectInspectors, null,
+                serdeParams.getSeparators()[1],
+                serdeParams, ObjectInspectorOptions.JAVA);
+    }
+
+    public static ObjectInspector createObjectInspector(TypeInfo type, LazySerDeParameters
+            serdeParams) {
+        ObjectInspector oi = null;
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Type : " + type);
+        }
+
+        switch (type.getCategory()) {
+            case PRIMITIVE:
+                switch (((PrimitiveTypeInfo) type).getPrimitiveCategory()) {
+                    case BOOLEAN:
+                        oi = new PhoenixBooleanObjectInspector();
+                        break;
+                    case BYTE:
+                        oi = new PhoenixByteObjectInspector();
+                        break;
+                    case SHORT:
+                        oi = new PhoenixShortObjectInspector();
+                        break;
+                    case INT:
+                        oi = new PhoenixIntObjectInspector();
+                        break;
+                    case LONG:
+                        oi = new PhoenixLongObjectInspector();
+                        break;
+                    case FLOAT:
+                        oi = new PhoenixFloatObjectInspector();
+                        break;
+                    case DOUBLE:
+                        oi = new PhoenixDoubleObjectInspector();
+                        break;
+                    case VARCHAR:
+                        // same string
+                    case STRING:
+                        oi = new PhoenixStringObjectInspector(serdeParams.isEscaped(),
+                                serdeParams.getEscapeChar());
+                        break;
+                    case CHAR:
+                        oi = new PhoenixCharObjectInspector();
+                        break;
+                    case DATE:
+                        oi = new PhoenixDateObjectInspector();
+                        break;
+                    case TIMESTAMP:
+                        oi = new PhoenixTimestampObjectInspector();
+                        break;
+                    case DECIMAL:
+                        oi = new PhoenixDecimalObjectInspector();
+                        break;
+                    case BINARY:
+                        oi = new PhoenixBinaryObjectInspector();
+                        break;
+                    default:
+                        throw new RuntimeException("Hive internal error. not supported data type " +
+                                ": " + type);
+                }
+
+                break;
+            case LIST:
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("List type started");
+                }
+
+                ObjectInspector listElementObjectInspector = createObjectInspector((
+                        (ListTypeInfo) type).getListElementTypeInfo(), serdeParams);
+
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("List type ended");
+                }
+
+                oi = new PhoenixListObjectInspector(listElementObjectInspector, serdeParams
+                        .getSeparators()[0], serdeParams);
+
+                break;
+            default:
+                throw new RuntimeException("Hive internal error. not supported data type : " +
+                        type);
+        }
+
+        return oi;
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/537b90be/phoenix-hive/src/main/java/org/apache/phoenix/hive/objectinspector/PhoenixShortObjectInspector.java
----------------------------------------------------------------------
diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/objectinspector/PhoenixShortObjectInspector.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/objectinspector/PhoenixShortObjectInspector.java
new file mode 100644
index 0000000..1b7ec13
--- /dev/null
+++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/objectinspector/PhoenixShortObjectInspector.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hive.objectinspector;
+
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.io.ShortWritable;
+
+public class PhoenixShortObjectInspector extends AbstractPhoenixObjectInspector<ShortWritable>
+        implements ShortObjectInspector {
+
+    public PhoenixShortObjectInspector() {
+        super(TypeInfoFactory.shortTypeInfo);
+    }
+
+    @Override
+    public Object copyObject(Object o) {
+        return o == null ? null : new Short((Short) o);
+    }
+
+    @Override
+    public short get(Object o) {
+        Short value = null;
+
+        if (o != null) {
+            try {
+                value = ((Short) o).shortValue();
+            } catch (Exception e) {
+                logExceptionMessage(o, "SHORT");
+            }
+        }
+
+        return value;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/537b90be/phoenix-hive/src/main/java/org/apache/phoenix/hive/objectinspector/PhoenixStringObjectInspector.java
----------------------------------------------------------------------
diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/objectinspector/PhoenixStringObjectInspector.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/objectinspector/PhoenixStringObjectInspector.java
new file mode 100644
index 0000000..e409e1d
--- /dev/null
+++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/objectinspector/PhoenixStringObjectInspector.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hive.objectinspector;
+
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.io.Text;
+
+/**
+ * {@link StringObjectInspector} for values that are held as {@link String} instances.
+ * The escape settings passed at construction are only stored and exposed through getters.
+ */
+public class PhoenixStringObjectInspector extends AbstractPhoenixObjectInspector<Text>
+        implements StringObjectInspector {
+
+    private boolean escaped;
+    private byte escapeChar;
+
+    /**
+     * @param escaped    value later reported by {@link #isEscaped()}
+     * @param escapeChar value later reported by {@link #getEscapeChar()}
+     */
+    public PhoenixStringObjectInspector(boolean escaped, byte escapeChar) {
+        super(TypeInfoFactory.stringTypeInfo);
+        this.escapeChar = escapeChar;
+        this.escaped = escaped;
+    }
+
+    /**
+     * @return a new {@link String} with the same content as {@code o}, or {@code null} when
+     * {@code o} is {@code null}
+     */
+    @Override
+    public Object copyObject(Object o) {
+        if (o == null) {
+            return null;
+        }
+        return new String((String) o);
+    }
+
+    /**
+     * @return {@code o} cast to {@link String}
+     */
+    @Override
+    public String getPrimitiveJavaObject(Object o) {
+        return (String) o;
+    }
+
+    /**
+     * @return a {@link Text} built from {@code o}; {@code null} when {@code o} is {@code null}
+     * or when the conversion fails (the failure is reported through
+     * {@code logExceptionMessage})
+     */
+    @Override
+    public Text getPrimitiveWritableObject(Object o) {
+        if (o == null) {
+            return null;
+        }
+
+        try {
+            return new Text((String) o);
+        } catch (Exception e) {
+            logExceptionMessage(o, "STRING");
+            return null;
+        }
+    }
+
+    /**
+     * @return the {@code escaped} flag supplied to the constructor
+     */
+    public boolean isEscaped() {
+        return escaped;
+    }
+
+    /**
+     * @return the escape character supplied to the constructor
+     */
+    public byte getEscapeChar() {
+        return escapeChar;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/537b90be/phoenix-hive/src/main/java/org/apache/phoenix/hive/objectinspector/PhoenixTimestampObjectInspector.java
----------------------------------------------------------------------
diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/objectinspector/PhoenixTimestampObjectInspector.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/objectinspector/PhoenixTimestampObjectInspector.java
new file mode 100644
index 0000000..7b13f2b
--- /dev/null
+++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/objectinspector/PhoenixTimestampObjectInspector.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hive.objectinspector;
+
+import org.apache.hadoop.hive.serde2.io.TimestampWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+
+import java.sql.Timestamp;
+
+/**
+ * ObjectInspector for timestamp type, operating on {@link Timestamp} values.
+ */
+public class PhoenixTimestampObjectInspector extends
+        AbstractPhoenixObjectInspector<TimestampWritable>
+        implements TimestampObjectInspector {
+
+    /**
+     * Creates an inspector typed as {@link TypeInfoFactory#timestampTypeInfo}.
+     */
+    public PhoenixTimestampObjectInspector() {
+        super(TypeInfoFactory.timestampTypeInfo);
+    }
+
+    /**
+     * @return {@code o} cast to {@link Timestamp}
+     */
+    @Override
+    public Timestamp getPrimitiveJavaObject(Object o) {
+        return (Timestamp) o;
+    }
+
+    /**
+     * Creates an independent copy of the given timestamp, including its nanosecond field.
+     *
+     * @param o a {@link Timestamp}, or {@code null}
+     * @return a new {@link Timestamp} equal to {@code o}, or {@code null} when {@code o} is
+     * {@code null}
+     * @throws ClassCastException if {@code o} is non-null and not a {@link Timestamp}
+     */
+    @Override
+    public Object copyObject(Object o) {
+        if (o == null) {
+            return null;
+        }
+
+        Timestamp source = (Timestamp) o;
+        Timestamp copy = new Timestamp(source.getTime());
+        // getTime() only carries millisecond precision; without this the copy would silently
+        // drop the sub-millisecond part of the fractional seconds.
+        copy.setNanos(source.getNanos());
+        return copy;
+    }
+
+    /**
+     * @return a {@link TimestampWritable} built from {@code o}; {@code null} when {@code o} is
+     * {@code null} or when the conversion fails (the failure is reported through
+     * {@code logExceptionMessage})
+     */
+    @Override
+    public TimestampWritable getPrimitiveWritableObject(Object o) {
+        TimestampWritable value = null;
+
+        if (o != null) {
+            try {
+                value = new TimestampWritable((Timestamp) o);
+            } catch (Exception e) {
+                logExceptionMessage(o, "TIMESTAMP");
+            }
+        }
+
+        return value;
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/537b90be/phoenix-hive/src/main/java/org/apache/phoenix/hive/ppd/PhoenixPredicateDecomposer.java
----------------------------------------------------------------------
diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/ppd/PhoenixPredicateDecomposer.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/ppd/PhoenixPredicateDecomposer.java
new file mode 100644
index 0000000..b94e4df
--- /dev/null
+++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/ppd/PhoenixPredicateDecomposer.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hive.ppd;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler.DecomposedPredicate;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.phoenix.hive.ql.index.IndexPredicateAnalyzer;
+import org.apache.phoenix.hive.ql.index.IndexSearchCondition;
+import org.apache.phoenix.hive.ql.index.PredicateAnalyzerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Supporting class that generates the {@link DecomposedPredicate} for a predicate, based on
+ * the search conditions extracted from it, and remembers those conditions for later retrieval.
+ */
+public class PhoenixPredicateDecomposer {
+
+    private static final Log LOG = LogFactory.getLog(PhoenixPredicateDecomposer.class);
+
+    private final List<String> columnNameList;
+    private boolean calledPPD;
+
+    private List<IndexSearchCondition> searchConditionList;
+
+    /**
+     * @param columnNameList column names handed to the predicate analyzer
+     */
+    public PhoenixPredicateDecomposer(List<String> columnNameList) {
+        this.columnNameList = columnNameList;
+    }
+
+    /**
+     * Splits the given predicate into a pushed part and a residual part.
+     * <p>
+     * When at least one search condition is extracted and translated successfully, the
+     * conditions are recorded (see {@link #getSearchConditionList()}) and
+     * {@link #isCalledPPD()} starts returning {@code true}.
+     *
+     * @param predicate predicate to analyze
+     * @return the decomposed predicate; its pushed part is left unset when no search condition
+     * was extracted. Returns {@code null} when translating the extracted conditions fails, in
+     * which case no state is recorded on this instance.
+     */
+    public DecomposedPredicate decomposePredicate(ExprNodeDesc predicate) {
+        IndexPredicateAnalyzer analyzer = PredicateAnalyzerFactory.createPredicateAnalyzer
+                (columnNameList, getFieldValidator());
+        DecomposedPredicate decomposed = new DecomposedPredicate();
+
+        List<IndexSearchCondition> conditions = new ArrayList<IndexSearchCondition>();
+        decomposed.residualPredicate = (ExprNodeGenericFuncDesc) analyzer.analyzePredicate
+                (predicate, conditions);
+        if (!conditions.isEmpty()) {
+            try {
+                // The translation is the step that can actually fail, so it is what the
+                // handler must guard.
+                decomposed.pushedPredicate = analyzer.translateSearchConditions(conditions);
+            } catch (RuntimeException e) {
+                LOG.warn("Failed to decompose predicates", e);
+                return null;
+            }
+
+            // Only publish the conditions once the pushed predicate has been built.
+            searchConditionList = conditions;
+            calledPPD = true;
+        }
+
+        return decomposed;
+    }
+
+    /**
+     * @return the search conditions recorded by the last successful push-down, or
+     * {@code null} if none has been recorded
+     */
+    public List<IndexSearchCondition> getSearchConditionList() {
+        return searchConditionList;
+    }
+
+    /**
+     * @return {@code true} once {@link #decomposePredicate(ExprNodeDesc)} has recorded search
+     * conditions
+     */
+    public boolean isCalledPPD() {
+        return calledPPD;
+    }
+
+    /**
+     * Hook for subclasses to supply a field validator to the predicate analyzer.
+     *
+     * @return {@code null} in this implementation
+     */
+    protected IndexPredicateAnalyzer.FieldValidator getFieldValidator() {
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/537b90be/phoenix-hive/src/main/java/org/apache/phoenix/hive/ppd/PhoenixPredicateDecomposerManager.java
----------------------------------------------------------------------
diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/ppd/PhoenixPredicateDecomposerManager.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/ppd/PhoenixPredicateDecomposerManager.java
new file mode 100644
index 0000000..2faef73
--- /dev/null
+++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/ppd/PhoenixPredicateDecomposerManager.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hive.ppd;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Support class that produces PredicateDecomposer for PhoenixStorageHandler.
+ * <p>
+ * Decomposers are registered per predicate key by
+ * {@link #createPredicateDecomposer(String, List)} and handed back, each exactly once and in
+ * creation order, by {@link #getPredicateDecomposer(String)}. Both methods are synchronized
+ * on this class so that registration and retrieval are atomic with respect to each other.
+ */
+
+public class PhoenixPredicateDecomposerManager {
+
+    private static final Log LOG = LogFactory.getLog(PhoenixPredicateDecomposerManager.class);
+
+    // When a query has no WHERE clause, StorageHandler.decomposePredicate is never invoked, so
+    // no PhoenixPredicateDecomposer is created for it.
+
+    private static final Map<String, List<PhoenixPredicateDecomposer>> PREDICATE_DECOMPOSER_MAP =
+            Maps.newConcurrentMap();
+
+    /**
+     * Creates a new decomposer and queues it under the given key.
+     *
+     * @param predicateKey   key the decomposer is registered under
+     * @param columnNameList column names passed to the new decomposer
+     * @return the newly created decomposer
+     */
+    public static synchronized PhoenixPredicateDecomposer createPredicateDecomposer(
+            String predicateKey, List<String> columnNameList) {
+        List<PhoenixPredicateDecomposer> predicateDecomposerList = PREDICATE_DECOMPOSER_MAP.get
+                (predicateKey);
+        if (predicateDecomposerList == null) {
+            predicateDecomposerList = Lists.newArrayList();
+            PREDICATE_DECOMPOSER_MAP.put(predicateKey, predicateDecomposerList);
+        }
+
+        PhoenixPredicateDecomposer predicateDecomposer = new PhoenixPredicateDecomposer
+                (columnNameList);
+        predicateDecomposerList.add(predicateDecomposer);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Predicate-decomposer : " + PREDICATE_DECOMPOSER_MAP + " [" +
+                    predicateKey + "] : " + predicateDecomposer);
+        }
+
+        return predicateDecomposer;
+    }
+
+    /**
+     * Removes and returns the oldest decomposer queued under the given key.
+     *
+     * @param predicateKey key the decomposer was registered under
+     * @return the oldest queued decomposer, or {@code null} when none is queued for the key
+     */
+    public static synchronized PhoenixPredicateDecomposer getPredicateDecomposer(
+            String predicateKey) {
+        List<PhoenixPredicateDecomposer> predicateDecomposerList = PREDICATE_DECOMPOSER_MAP.get
+                (predicateKey);
+
+        PhoenixPredicateDecomposer predicateDecomposer = null;
+        if (predicateDecomposerList != null) {
+            if (!predicateDecomposerList.isEmpty()) {
+                predicateDecomposer = predicateDecomposerList.remove(0);
+            }
+            // Drop drained entries so the static registry does not keep growing with keys
+            // that will never be asked for again.
+            if (predicateDecomposerList.isEmpty()) {
+                PREDICATE_DECOMPOSER_MAP.remove(predicateKey);
+            }
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Predicate-decomposer : " + PREDICATE_DECOMPOSER_MAP + " [" + predicateKey
+                    + "] : " + predicateDecomposer);
+        }
+
+        return predicateDecomposer;
+    }
+
+    private PhoenixPredicateDecomposerManager() {
+    }
+}


Mime
View raw message