phoenix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tdsi...@apache.org
Subject [2/4] phoenix git commit: PHOENIX-3586 Add StorageScheme table property to allow users to specify their custom storage schemes
Date Wed, 25 Jan 2017 21:55:02 GMT
http://git-wip-us.apache.org/repos/asf/phoenix/blob/1f3f7323/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 67b7663..ee1af19 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -47,6 +47,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ENCODING_SCHEME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.FUNCTION_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.GUIDE_POSTS_WIDTH;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IMMUTABLE_ROWS;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IMMUTABLE_STORAGE_SCHEME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_STATE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_TYPE;
@@ -71,7 +72,6 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PK_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.RETURN_TYPE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SALT_BUCKETS;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SORT_ORDER;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STORAGE_SCHEME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STORE_NULLS;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE;
@@ -93,9 +93,9 @@ import static org.apache.phoenix.query.QueryServices.DROP_METADATA_ATTRIB;
 import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_DROP_METADATA;
 import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RUN_UPDATE_STATS_ASYNC;
 import static org.apache.phoenix.schema.PTable.EncodedCQCounter.NULL_COUNTER;
+import static org.apache.phoenix.schema.PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN;
+import static org.apache.phoenix.schema.PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS;
 import static org.apache.phoenix.schema.PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS;
-import static org.apache.phoenix.schema.PTable.StorageScheme.ONE_CELL_PER_COLUMN_FAMILY;
-import static org.apache.phoenix.schema.PTable.StorageScheme.ONE_CELL_PER_KEYVALUE_COLUMN;
 import static org.apache.phoenix.schema.PTable.ViewType.MAPPED;
 import static org.apache.phoenix.schema.PTableType.TABLE;
 import static org.apache.phoenix.schema.PTableType.VIEW;
@@ -195,11 +195,11 @@ import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PTable.EncodedCQCounter;
+import org.apache.phoenix.schema.PTable.ImmutableStorageScheme;
 import org.apache.phoenix.schema.PTable.IndexType;
 import org.apache.phoenix.schema.PTable.LinkType;
 import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
 import org.apache.phoenix.schema.PTable.QualifierEncodingScheme.QualifierOutOfRangeException;
-import org.apache.phoenix.schema.PTable.StorageScheme;
 import org.apache.phoenix.schema.PTable.ViewType;
 import org.apache.phoenix.schema.stats.GuidePostsKey;
 import org.apache.phoenix.schema.types.PDataType;
@@ -273,7 +273,7 @@ public class MetaDataClient {
                     AUTO_PARTITION_SEQ +  "," +
                     APPEND_ONLY_SCHEMA + "," +
                     GUIDE_POSTS_WIDTH + "," +
-                    STORAGE_SCHEME + "," +
+                    IMMUTABLE_STORAGE_SCHEME + "," +
                     ENCODING_SCHEME + 
                     ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
 
@@ -1695,7 +1695,8 @@ public class MetaDataClient {
                     ? SchemaUtil.isNamespaceMappingEnabled(tableType, connection.getQueryServices().getProps())
                     : parent.isNamespaceMapped();
             boolean isLocalIndex = indexType == IndexType.LOCAL;
-            QualifierEncodingScheme encodingScheme = QualifierEncodingScheme.NON_ENCODED_QUALIFIERS;
+            QualifierEncodingScheme encodingScheme = NON_ENCODED_QUALIFIERS;
+            ImmutableStorageScheme immutableStorageScheme = ONE_CELL_PER_COLUMN;
             if (parent != null && tableType == PTableType.INDEX) {
                 timestamp = TransactionUtil.getTableTimestamp(connection, transactional);
                 storeNulls = parent.getStoreNulls();
@@ -2033,7 +2034,6 @@ public class MetaDataClient {
             }
             int pkPositionOffset = pkColumns.size();
             int position = positionOffset;
-            StorageScheme storageScheme = ONE_CELL_PER_KEYVALUE_COLUMN;
             EncodedCQCounter cqCounter = NULL_COUNTER;
             PTable viewPhysicalTable = null;
             if (tableType == PTableType.VIEW) {
@@ -2048,7 +2048,7 @@ public class MetaDataClient {
                      * encoded column qualifier.
                      */
                     viewPhysicalTable = PhoenixRuntime.getTable(connection, physicalNames.get(0).getString());
-                    storageScheme = viewPhysicalTable.getStorageScheme();
+                    immutableStorageScheme = viewPhysicalTable.getImmutableStorageScheme();
                     encodingScheme = viewPhysicalTable.getEncodingScheme();
 					if (EncodedColumnsUtil.usesEncodedColumnNames(viewPhysicalTable)) {
                         cqCounter  = viewPhysicalTable.getEncodedCQCounter();
@@ -2088,21 +2088,29 @@ public class MetaDataClient {
                 }
                 if (tableExists) {
                     encodingScheme = NON_ENCODED_QUALIFIERS;
+                    immutableStorageScheme = ONE_CELL_PER_COLUMN;
                 } else if (parent != null) {
                     encodingScheme = parent.getEncodingScheme();
+                    immutableStorageScheme = parent.getImmutableStorageScheme();
                 } else {
                 	Byte encodingSchemeSerializedByte = (Byte) TableProperty.COLUMN_ENCODED_BYTES.getValue(tableProps);
                     if (encodingSchemeSerializedByte == null) {
                     	encodingSchemeSerializedByte = (byte)connection.getQueryServices().getProps().getInt(QueryServices.DEFAULT_COLUMN_ENCODED_BYTES_ATRRIB, QueryServicesOptions.DEFAULT_COLUMN_ENCODED_BYTES);
                     } 
                     encodingScheme =  QualifierEncodingScheme.fromSerializedValue(encodingSchemeSerializedByte);
+                    if (isImmutableRows) {
+                        immutableStorageScheme = (ImmutableStorageScheme) TableProperty.IMMUTABLE_STORAGE_SCHEME.getValue(tableProps);
+                        if (immutableStorageScheme == null) {
+                            immutableStorageScheme = ImmutableStorageScheme.valueOf(connection.getQueryServices().getProps().get(QueryServices.DEFAULT_IMMUTABLE_STORAGE_SCHEME_ATTRIB, QueryServicesOptions.DEFAULT_IMMUTABLE_STORAGE_SCHEME));
+                        } 
+                        if (immutableStorageScheme!=ONE_CELL_PER_COLUMN && encodingScheme == NON_ENCODED_QUALIFIERS) {
+                            throw new SQLExceptionInfo.Builder(SQLExceptionCode.INVALID_IMMUTABLE_STORAGE_SCHEME_AND_COLUMN_QUALIFIER_BYTES).setSchemaName(schemaName).setTableName(tableName).build().buildException();
+                        }
+                    } 
                 }
-                if (isImmutableRows && encodingScheme != NON_ENCODED_QUALIFIERS) {
-                    storageScheme = ONE_CELL_PER_COLUMN_FAMILY;
-                } 
                 cqCounter = encodingScheme != NON_ENCODED_QUALIFIERS ? new EncodedCQCounter() : NULL_COUNTER;
             }
-                        
+
             Map<String, Integer> changedCqCounters = new HashMap<>(colDefs.size());
             for (ColumnDef colDef : colDefs) {
                 rowTimeStampColumnAlreadyFound = checkAndValidateRowTimestampCol(colDef, pkConstraint, rowTimeStampColumnAlreadyFound, tableType);
@@ -2127,7 +2135,7 @@ public class MetaDataClient {
                 boolean isPkColumn = isPkColumn(pkConstraint, colDef, columnDefName);
                 String cqCounterFamily = null;
                 if (!isPkColumn) {
-                    if (storageScheme == ONE_CELL_PER_COLUMN_FAMILY && encodingScheme != NON_ENCODED_QUALIFIERS) {
+                    if (immutableStorageScheme == SINGLE_CELL_ARRAY_WITH_OFFSETS && encodingScheme != NON_ENCODED_QUALIFIERS) {
                         // For this scheme we track column qualifier counters at the column family level.
                         cqCounterFamily = colDefFamily != null ? colDefFamily : (defaultFamilyName != null ? defaultFamilyName : DEFAULT_COLUMN_FAMILY);
                     } else {
@@ -2272,7 +2280,7 @@ public class MetaDataClient {
                         Collections.<PTable>emptyList(), isImmutableRows,
                         Collections.<PName>emptyList(), defaultFamilyName == null ? null :
                                 PNameFactory.newName(defaultFamilyName), null,
-                        Boolean.TRUE.equals(disableWAL), false, false, null, null, indexType, true, false, 0, 0L, isNamespaceMapped, autoPartitionSeq, isAppendOnlySchema, ONE_CELL_PER_KEYVALUE_COLUMN, NON_ENCODED_QUALIFIERS, PTable.EncodedCQCounter.NULL_COUNTER);
+                        Boolean.TRUE.equals(disableWAL), false, false, null, null, indexType, true, false, 0, 0L, isNamespaceMapped, autoPartitionSeq, isAppendOnlySchema, ONE_CELL_PER_COLUMN, NON_ENCODED_QUALIFIERS, PTable.EncodedCQCounter.NULL_COUNTER);
                 connection.addTable(table, MetaDataProtocol.MIN_TABLE_TIMESTAMP);
             }
             
@@ -2426,7 +2434,7 @@ public class MetaDataClient {
             } else {
                 tableUpsert.setLong(25, guidePostsWidth);
             }
-            tableUpsert.setByte(26, storageScheme.getSerializedMetadataValue()); //TODO: samarth should there be a null check here?
+            tableUpsert.setByte(26, immutableStorageScheme.getSerializedMetadataValue()); //TODO: samarth should there be a null check here?
             tableUpsert.setByte(27, encodingScheme.getSerializedMetadataValue());
             tableUpsert.execute();
 
@@ -2532,7 +2540,7 @@ public class MetaDataClient {
                         PTable.INITIAL_SEQ_NUM, pkName == null ? null : PNameFactory.newName(pkName), saltBucketNum, columns.values(),
                         parent == null ? null : parent.getSchemaName(), parent == null ? null : parent.getTableName(), Collections.<PTable>emptyList(), isImmutableRows,
                         physicalNames, defaultFamilyName == null ? null : PNameFactory.newName(defaultFamilyName), viewStatement, Boolean.TRUE.equals(disableWAL), multiTenant, storeNulls, viewType,
-                        result.getViewIndexId(), indexType, rowKeyOrderOptimizable, transactional, updateCacheFrequency, 0L, isNamespaceMapped, autoPartitionSeq, isAppendOnlySchema, storageScheme, encodingScheme, cqCounterToBe);
+                        result.getViewIndexId(), indexType, rowKeyOrderOptimizable, transactional, updateCacheFrequency, 0L, isNamespaceMapped, autoPartitionSeq, isAppendOnlySchema, immutableStorageScheme, encodingScheme, cqCounterToBe);
                 result = new MetaDataMutationResult(code, result.getMutationTime(), table, true);
                 addTableToCache(result);
                 return table;
@@ -2716,7 +2724,7 @@ public class MetaDataClient {
                                 PTable viewIndexTable = new PTableImpl(null,
                                         SchemaUtil.getSchemaNameFromFullName(viewIndexPhysicalName),
                                         SchemaUtil.getTableNameFromFullName(viewIndexPhysicalName), ts,
-                                        table.getColumnFamilies(),table.isNamespaceMapped(), table.getStorageScheme(), table.getEncodingScheme());
+                                        table.getColumnFamilies(),table.isNamespaceMapped(), table.getImmutableStorageScheme(), table.getEncodingScheme());
                                 tableRefs.add(new TableRef(null, viewIndexTable, ts, false));
                             }
                         }
@@ -2837,12 +2845,12 @@ public class MetaDataClient {
     }
 
     private  long incrementTableSeqNum(PTable table, PTableType expectedType, int columnCountDelta, Boolean isTransactional, Long updateCacheFrequency) throws SQLException {
-        return incrementTableSeqNum(table, expectedType, columnCountDelta, isTransactional, updateCacheFrequency, null, null, null, null, -1L, null);
+        return incrementTableSeqNum(table, expectedType, columnCountDelta, isTransactional, updateCacheFrequency, null, null, null, null, -1L, null, null);
     }
 
     private long incrementTableSeqNum(PTable table, PTableType expectedType, int columnCountDelta,
             Boolean isTransactional, Long updateCacheFrequency, Boolean isImmutableRows, Boolean disableWAL,
-            Boolean isMultiTenant, Boolean storeNulls, Long guidePostWidth, Boolean appendOnlySchema)
+            Boolean isMultiTenant, Boolean storeNulls, Long guidePostWidth, Boolean appendOnlySchema, ImmutableStorageScheme immutableStorageScheme)
             throws SQLException {
         String schemaName = table.getSchemaName().getString();
         String tableName = table.getTableName().getString();
@@ -2886,6 +2894,10 @@ public class MetaDataClient {
         if (appendOnlySchema !=null) {
             mutateBooleanProperty(tenantId, schemaName, tableName, APPEND_ONLY_SCHEMA, appendOnlySchema);
         }
+        if (immutableStorageScheme !=null) {
+            mutateStringProperty(tenantId, schemaName, tableName, IMMUTABLE_STORAGE_SCHEME, immutableStorageScheme.name());
+        }
+        
         return seqNum;
     }
 
@@ -2926,6 +2938,23 @@ public class MetaDataClient {
             tableBoolUpsert.execute();
         }
     }
+    
+    private void mutateStringProperty(String tenantId, String schemaName, String tableName,
+            String propertyName, String propertyValue) throws SQLException {
+        String updatePropertySql = "UPSERT INTO " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_CATALOG_TABLE + "\"( " +
+                TENANT_ID + "," +
+                TABLE_SCHEM + "," +
+                TABLE_NAME + "," +
+                propertyName +
+                ") VALUES (?, ?, ?, ?)";
+        try (PreparedStatement tableBoolUpsert = connection.prepareStatement(updatePropertySql)) {
+            tableBoolUpsert.setString(1, tenantId);
+            tableBoolUpsert.setString(2, schemaName);
+            tableBoolUpsert.setString(3, tableName);
+            tableBoolUpsert.setString(4, propertyValue);
+            tableBoolUpsert.execute();
+        }
+    }
 
     public MutationState addColumn(AddColumnStatement statement) throws SQLException {
         PTable table = FromCompiler.getResolver(statement, connection).getTables().get(0).getTable();
@@ -2951,6 +2980,7 @@ public class MetaDataClient {
             Long updateCacheFrequencyProp = null;
             Boolean appendOnlySchemaProp = null;
             Long guidePostWidth = -1L;
+            ImmutableStorageScheme immutableStorageSchemeProp = null;
 
             Map<String, List<Pair<String, Object>>> properties = new HashMap<>(stmtProperties.size());
             List<ColumnDef> columnDefs = null;
@@ -3013,6 +3043,8 @@ public class MetaDataClient {
                             guidePostWidth = (Long)value;
                         } else if (propName.equals(APPEND_ONLY_SCHEMA)) {
                             appendOnlySchemaProp = (Boolean) value;
+                        } else if (propName.equalsIgnoreCase(IMMUTABLE_STORAGE_SCHEME)) {
+                            immutableStorageSchemeProp = (ImmutableStorageScheme)value;
                         }
                     }
                     // if removeTableProps is true only add the property if it is not a HTable or Phoenix Table property
@@ -3055,7 +3087,7 @@ public class MetaDataClient {
                 Boolean isImmutableRows = null;
                 if (isImmutableRowsProp != null) {
                     if (isImmutableRowsProp.booleanValue() != table.isImmutableRows()) {
-                    	if (table.getStorageScheme() != StorageScheme.ONE_CELL_PER_KEYVALUE_COLUMN) {
+                    	if (table.getImmutableStorageScheme() != ImmutableStorageScheme.ONE_CELL_PER_COLUMN) {
                     		throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_ALTER_IMMUTABLE_ROWS_PROPERTY)
                     		.setSchemaName(schemaName).setTableName(tableName).build().buildException();
                     	}
@@ -3091,6 +3123,18 @@ public class MetaDataClient {
                         changingPhoenixTableProperty = true;
                     }
                 }
+                ImmutableStorageScheme immutableStorageScheme = null;
+                if (immutableStorageSchemeProp!=null) {
+                    if (table.getImmutableStorageScheme() == ONE_CELL_PER_COLUMN || 
+                            immutableStorageSchemeProp == ONE_CELL_PER_COLUMN) {
+                        throw new SQLExceptionInfo.Builder(SQLExceptionCode.INVALID_IMMUTABLE_STORAGE_SCHEME_CHANGE)
+                        .setSchemaName(schemaName).setTableName(tableName).build().buildException();
+                    }
+                    else if (immutableStorageSchemeProp != table.getImmutableStorageScheme()) {
+                        immutableStorageScheme = immutableStorageSchemeProp;
+                        changingPhoenixTableProperty = true;
+                    }
+                }
             
                 if (guidePostWidth == null || guidePostWidth >= 0) {
                     changingPhoenixTableProperty = true;
@@ -3170,13 +3214,13 @@ public class MetaDataClient {
                             if (!colDef.isPK()) {
                                 String colDefFamily = colDef.getColumnDefName().getFamilyName();
                                 String familyName = null;
-                                StorageScheme storageScheme = table.getStorageScheme();
+                                ImmutableStorageScheme storageScheme = table.getImmutableStorageScheme();
                                 String defaultColumnFamily = tableForCQCounters.getDefaultFamilyName() != null && !Strings.isNullOrEmpty(tableForCQCounters.getDefaultFamilyName().getString()) ? 
                                         tableForCQCounters.getDefaultFamilyName().getString() : DEFAULT_COLUMN_FAMILY;
                                     if (table.getType() == PTableType.INDEX && table.getIndexType() == IndexType.LOCAL) {
                                         defaultColumnFamily = QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX + defaultColumnFamily;
                                     }
-                                if (storageScheme == ONE_CELL_PER_COLUMN_FAMILY) {
+                                if (storageScheme == SINGLE_CELL_ARRAY_WITH_OFFSETS) {
                                     familyName = colDefFamily != null ? colDefFamily : defaultColumnFamily;
                                 } else {
                                     familyName = defaultColumnFamily;
@@ -3271,10 +3315,9 @@ public class MetaDataClient {
                     connection.rollback();
                 }
                 
-                long seqNum = table.getSequenceNumber();
                 if (changingPhoenixTableProperty || columnDefs.size() > 0) {
-                    seqNum = incrementTableSeqNum(table, tableType, columnDefs.size(), isTransactional, updateCacheFrequency, isImmutableRows,
-                            disableWAL, multiTenant, storeNulls, guidePostWidth, appendOnlySchema);
+                    incrementTableSeqNum(table, tableType, columnDefs.size(), isTransactional, updateCacheFrequency, isImmutableRows,
+                            disableWAL, multiTenant, storeNulls, guidePostWidth, appendOnlySchema, immutableStorageScheme);
                     tableMetaData.addAll(connection.getMutationState().toMutations(timeStamp).next().getSecond());
                     connection.rollback();
                 }
@@ -3371,7 +3414,7 @@ public class MetaDataClient {
                             PTable viewIndexTable = new PTableImpl(null,
                                     SchemaUtil.getSchemaNameFromFullName(viewIndexPhysicalName),
                                     SchemaUtil.getTableNameFromFullName(viewIndexPhysicalName), ts,
-                                    table.getColumnFamilies(), table.isNamespaceMapped(), table.getStorageScheme(), table.getEncodingScheme());
+                                    table.getColumnFamilies(), table.isNamespaceMapped(), table.getImmutableStorageScheme(), table.getEncodingScheme());
                             List<TableRef> tableRefs = Collections.singletonList(new TableRef(null, viewIndexTable, ts, false));
                             MutationPlan plan = new PostDDLCompiler(connection).compile(tableRefs, null, null,
                                     Collections.<PColumn> emptyList(), ts);
@@ -3637,7 +3680,7 @@ public class MetaDataClient {
                                         sharedTableState.getSchemaName(), sharedTableState.getTableName(), ts,
                                         table.getColumnFamilies(), sharedTableState.getColumns(),
                                         sharedTableState.getPhysicalNames(), sharedTableState.getViewIndexId(),
-                                        table.isMultiTenant(), table.isNamespaceMapped(), table.getStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter());
+                                        table.isMultiTenant(), table.isNamespaceMapped(), table.getImmutableStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter());
                                 TableRef indexTableRef = new TableRef(viewIndexTable);
                                 PName indexTableTenantId = sharedTableState.getTenantId();
                                 if (indexTableTenantId==null) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1f3f7323/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
index 5e9608b..4a02e54 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
@@ -19,6 +19,7 @@ package org.apache.phoenix.schema;
 
 import static org.apache.phoenix.query.QueryConstants.ENCODED_CQ_COUNTER_INITIAL_VALUE;
 
+import java.io.DataOutputStream;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -32,6 +33,12 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
 import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.schema.types.PArrayDataType;
+import org.apache.phoenix.schema.types.PArrayDataTypeDecoder;
+import org.apache.phoenix.schema.types.PArrayDataTypeEncoder;
+import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.schema.types.PVarbinary;
+import org.apache.phoenix.util.TrustedByteArrayOutputStream;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -164,32 +171,57 @@ public interface PTable extends PMetaDataEntity {
         }
     }
     
-    public enum StorageScheme {
-        ONE_CELL_PER_KEYVALUE_COLUMN((byte)1),
-        ONE_CELL_PER_COLUMN_FAMILY((byte)2);
+    public enum ImmutableStorageScheme implements ColumnValueEncoderDecoderSupplier {
+        ONE_CELL_PER_COLUMN((byte)1) {
+            @Override
+            public ColumnValueEncoder getEncoder(int numElements) {
+                throw new UnsupportedOperationException();
+            }
+            
+            @Override
+            public ColumnValueDecoder getDecoder() {
+                throw new UnsupportedOperationException();
+            }
+        },
+        // stores a single cell per column family that contains all serialized column values
+        SINGLE_CELL_ARRAY_WITH_OFFSETS((byte)2) {
+            @Override
+            public ColumnValueEncoder getEncoder(int numElements) {
+                PDataType type = PVarbinary.INSTANCE;
+                int estimatedSize = PArrayDataType.estimateSize(numElements, type);
+                TrustedByteArrayOutputStream byteStream = new TrustedByteArrayOutputStream(estimatedSize);
+                DataOutputStream oStream = new DataOutputStream(byteStream);
+                return new PArrayDataTypeEncoder(byteStream, oStream, numElements, type, SortOrder.ASC, false, PArrayDataType.IMMUTABLE_SERIALIZATION_VERSION);
+            }
+            
+            @Override
+            public ColumnValueDecoder getDecoder() {
+                return new PArrayDataTypeDecoder();
+            }
+        };
 
-        private final byte[] byteValue;
         private final byte serializedValue;
         
-        StorageScheme(byte serializedValue) {
+        private ImmutableStorageScheme(byte serializedValue) {
             this.serializedValue = serializedValue;
-            this.byteValue = Bytes.toBytes(this.name());
-        }
-
-        public byte[] getBytes() {
-            return byteValue;
         }
 
         public byte getSerializedMetadataValue() {
             return this.serializedValue;
         }
 
-        public static StorageScheme fromSerializedValue(byte serializedValue) {
-            if (serializedValue < 1 || serializedValue > StorageScheme.values().length) {
+        public static ImmutableStorageScheme fromSerializedValue(byte serializedValue) {
+            if (serializedValue < 1 || serializedValue > ImmutableStorageScheme.values().length) {
                 return null;
             }
-            return StorageScheme.values()[serializedValue-1];
+            return ImmutableStorageScheme.values()[serializedValue-1];
         }
+
+    }
+    
+    interface ColumnValueEncoderDecoderSupplier {
+        ColumnValueEncoder getEncoder(int numElements);
+        ColumnValueDecoder getDecoder();
     }
     
     public enum QualifierEncodingScheme implements QualifierEncoderDecoder {
@@ -393,7 +425,7 @@ public interface PTable extends PMetaDataEntity {
         int decode(byte[] bytes, int offset, int length);
         Integer getMaxQualifier();
     }
-
+    
     long getTimeStamp();
     long getSequenceNumber();
     long getIndexDisableTimestamp();
@@ -607,7 +639,7 @@ public interface PTable extends PMetaDataEntity {
      * you are also not allowed to delete the table  
      */
     boolean isAppendOnlySchema();
-    StorageScheme getStorageScheme();
+    ImmutableStorageScheme getImmutableStorageScheme();
     QualifierEncodingScheme getEncodingScheme();
     EncodedCQCounter getEncodedCQCounter();
     

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1f3f7323/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
index e84e529..0816fea 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
@@ -50,10 +50,9 @@ import org.apache.phoenix.compile.ExpressionCompiler;
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.coprocessor.generated.PTableProtos;
 import org.apache.phoenix.exception.DataExceedsCapacityException;
-import org.apache.phoenix.expression.ArrayConstructorExpression;
-import org.apache.phoenix.expression.DelegateExpression;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.LiteralExpression;
+import org.apache.phoenix.expression.SingleCellConstructorExpression;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
 import org.apache.phoenix.index.IndexMaintainer;
@@ -63,16 +62,12 @@ import org.apache.phoenix.parse.ParseNode;
 import org.apache.phoenix.parse.SQLParser;
 import org.apache.phoenix.protobuf.ProtobufUtil;
 import org.apache.phoenix.query.QueryConstants;
-import org.apache.phoenix.schema.PTable.EncodedCQCounter;
 import org.apache.phoenix.schema.RowKeySchema.RowKeySchemaBuilder;
-import org.apache.phoenix.schema.tuple.Tuple;
-import org.apache.phoenix.schema.types.PArrayDataType;
 import org.apache.phoenix.schema.types.PBinary;
 import org.apache.phoenix.schema.types.PChar;
 import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.types.PDouble;
 import org.apache.phoenix.schema.types.PFloat;
-import org.apache.phoenix.schema.types.PVarbinary;
 import org.apache.phoenix.schema.types.PVarchar;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.EncodedColumnsUtil;
@@ -153,7 +148,7 @@ public class PTableImpl implements PTable {
     private boolean isNamespaceMapped;
     private String autoPartitionSeqName;
     private boolean isAppendOnlySchema;
-    private StorageScheme storageScheme;
+    private ImmutableStorageScheme immutableStorageScheme;
     private QualifierEncodingScheme qualifierEncodingScheme;
     private EncodedCQCounter encodedCQCounter;
 
@@ -188,7 +183,7 @@ public class PTableImpl implements PTable {
         this.isNamespaceMapped = isNamespaceMapped;
     }
     
-    public PTableImpl(PName tenantId, String schemaName, String tableName, long timestamp, List<PColumnFamily> families, boolean isNamespaceMapped, StorageScheme storageScheme, QualifierEncodingScheme encodingScheme) { // For base table of mapped VIEW
+    public PTableImpl(PName tenantId, String schemaName, String tableName, long timestamp, List<PColumnFamily> families, boolean isNamespaceMapped, ImmutableStorageScheme storageScheme, QualifierEncodingScheme encodingScheme) { // For base table of mapped VIEW
         Preconditions.checkArgument(tenantId==null || tenantId.getBytes().length > 0); // tenantId should be null or not empty
         this.tenantId = tenantId;
         this.name = PNameFactory.newName(SchemaUtil.getTableName(schemaName, tableName));
@@ -210,13 +205,13 @@ public class PTableImpl implements PTable {
         this.families = families;
         this.physicalNames = Collections.emptyList();
         this.isNamespaceMapped = isNamespaceMapped;
-        this.storageScheme = storageScheme;
+        this.immutableStorageScheme = storageScheme;
         this.qualifierEncodingScheme = encodingScheme;
     }
     
     // For indexes stored in shared physical tables
     public PTableImpl(PName tenantId, PName schemaName, PName tableName, long timestamp, List<PColumnFamily> families, 
-            List<PColumn> columns, List<PName> physicalNames, Short viewIndexId, boolean multiTenant, boolean isNamespaceMpped, StorageScheme storageScheme, QualifierEncodingScheme qualifierEncodingScheme, 
+            List<PColumn> columns, List<PName> physicalNames, Short viewIndexId, boolean multiTenant, boolean isNamespaceMpped, ImmutableStorageScheme storageScheme, QualifierEncodingScheme qualifierEncodingScheme, 
             EncodedCQCounter encodedCQCounter) throws SQLException {
         this.pkColumns = this.allColumns = Collections.emptyList();
         this.rowKeySchema = RowKeySchema.EMPTY_SCHEMA;
@@ -275,7 +270,7 @@ public class PTableImpl implements PTable {
                     indexes, table.isImmutableRows(), physicalNames, table.getDefaultFamilyName(), viewStatement,
                     table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(),
                     table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), updateCacheFrequency,
-                    table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter());
+                    table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getImmutableStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter());
         }
 
     public static PTableImpl makePTable(PTable table, long timeStamp, List<PTable> indexes, PName parentSchemaName, String viewStatement) throws SQLException {
@@ -285,7 +280,7 @@ public class PTableImpl implements PTable {
                 indexes, table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), viewStatement,
                 table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(),
                 table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(),
-                table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter());
+                table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getImmutableStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter());
     }
 
     public static PTableImpl makePTable(PTable table, Collection<PColumn> columns) throws SQLException {
@@ -295,7 +290,7 @@ public class PTableImpl implements PTable {
                 table.getIndexes(), table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(),
                 table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(),
                 table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(),
-                table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter());
+                table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getImmutableStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter());
     }
 
     public static PTableImpl makePTable(PTable table, long timeStamp, long sequenceNumber, Collection<PColumn> columns) throws SQLException {
@@ -305,7 +300,7 @@ public class PTableImpl implements PTable {
                 table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(), table.isWALDisabled(),
                 table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(),
                 table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), 
-                table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter());
+                table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getImmutableStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter());
     }
 
     public static PTableImpl makePTable(PTable table, long timeStamp, long sequenceNumber, Collection<PColumn> columns, boolean isImmutableRows) throws SQLException {
@@ -315,7 +310,7 @@ public class PTableImpl implements PTable {
                 table.getIndexes(), isImmutableRows, table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(),
                 table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(),
                 table.getIndexType(), table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(),
-                table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter());
+                table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getImmutableStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter());
     }
     
     public static PTableImpl makePTable(PTable table, long timeStamp, long sequenceNumber, Collection<PColumn> columns, boolean isImmutableRows, boolean isWalDisabled,
@@ -326,7 +321,7 @@ public class PTableImpl implements PTable {
                 table.getIndexes(), isImmutableRows, table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(),
                 isWalDisabled, isMultitenant, storeNulls, table.getViewType(), table.getViewIndexId(), table.getIndexType(),
                 table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), isTransactional, updateCacheFrequency, table.getIndexDisableTimestamp(), 
-                isNamespaceMapped, table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter());
+                isNamespaceMapped, table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getImmutableStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter());
     }
     
     public static PTableImpl makePTable(PTable table, PIndexState state) throws SQLException {
@@ -337,7 +332,7 @@ public class PTableImpl implements PTable {
                 table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(),
                 table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(),
                 table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(),
-                table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter());
+                table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getImmutableStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter());
     }
 
     public static PTableImpl makePTable(PTable table, boolean rowKeyOrderOptimizable) throws SQLException {
@@ -348,7 +343,7 @@ public class PTableImpl implements PTable {
                 table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(),
                 table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(),
                 table.getBaseColumnCount(), rowKeyOrderOptimizable, table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.isNamespaceMapped(), 
-                table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter());
+                table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getImmutableStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter());
     }
 
     public static PTableImpl makePTable(PTable table) throws SQLException {
@@ -359,7 +354,7 @@ public class PTableImpl implements PTable {
                 table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(),
                 table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(),
                 table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), 
-                table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter());
+                table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getImmutableStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter());
     }
 
     public static PTableImpl makePTable(PName tenantId, PName schemaName, PName tableName, PTableType type,
@@ -368,7 +363,7 @@ public class PTableImpl implements PTable {
             boolean isImmutableRows, List<PName> physicalNames, PName defaultFamilyName, String viewExpression,
             boolean disableWAL, boolean multiTenant, boolean storeNulls, ViewType viewType, Short viewIndexId,
             IndexType indexType, boolean rowKeyOrderOptimizable, boolean isTransactional, long updateCacheFrequency,
-            long indexDisableTimestamp, boolean isNamespaceMapped, String autoPartitionSeqName, boolean isAppendOnlySchema, StorageScheme storageScheme, QualifierEncodingScheme qualifierEncodingScheme, EncodedCQCounter encodedCQCounter) throws SQLException {
+            long indexDisableTimestamp, boolean isNamespaceMapped, String autoPartitionSeqName, boolean isAppendOnlySchema, ImmutableStorageScheme storageScheme, QualifierEncodingScheme qualifierEncodingScheme, EncodedCQCounter encodedCQCounter) throws SQLException {
         return new PTableImpl(tenantId, schemaName, tableName, type, state, timeStamp, sequenceNumber, pkName, bucketNum, columns, dataSchemaName,
                 dataTableName, indexes, isImmutableRows, physicalNames, defaultFamilyName,
                 viewExpression, disableWAL, multiTenant, storeNulls, viewType, viewIndexId,
@@ -383,7 +378,7 @@ public class PTableImpl implements PTable {
             boolean disableWAL, boolean multiTenant, boolean storeNulls, ViewType viewType, Short viewIndexId,
             IndexType indexType, boolean rowKeyOrderOptimizable, boolean isTransactional, long updateCacheFrequency,
             int baseColumnCount, long indexDisableTimestamp, boolean isNamespaceMapped,
-            String autoPartitionSeqName, boolean isAppendOnlySchema, StorageScheme storageScheme, QualifierEncodingScheme qualifierEncodingScheme, EncodedCQCounter encodedCQCounter)
+            String autoPartitionSeqName, boolean isAppendOnlySchema, ImmutableStorageScheme storageScheme, QualifierEncodingScheme qualifierEncodingScheme, EncodedCQCounter encodedCQCounter)
             throws SQLException {
         return new PTableImpl(tenantId, schemaName, tableName, type, state, timeStamp, sequenceNumber, pkName,
                 bucketNum, columns, dataSchemaName, dataTableName, indexes, isImmutableRows, physicalNames,
@@ -398,7 +393,7 @@ public class PTableImpl implements PTable {
             List<PName> physicalNames, PName defaultFamilyName, String viewExpression, boolean disableWAL, boolean multiTenant,
             boolean storeNulls, ViewType viewType, Short viewIndexId, IndexType indexType,
             int baseColumnCount, boolean rowKeyOrderOptimizable, boolean isTransactional, long updateCacheFrequency,
-            long indexDisableTimestamp, boolean isNamespaceMapped, String autoPartitionSeqName, boolean isAppendOnlySchema, StorageScheme storageScheme, 
+            long indexDisableTimestamp, boolean isNamespaceMapped, String autoPartitionSeqName, boolean isAppendOnlySchema, ImmutableStorageScheme storageScheme, 
             QualifierEncodingScheme qualifierEncodingScheme, EncodedCQCounter encodedCQCounter) throws SQLException {
         init(tenantId, schemaName, tableName, type, state, timeStamp, sequenceNumber, pkName, bucketNum, columns,
                 parentSchemaName, parentTableName, indexes, isImmutableRows, physicalNames, defaultFamilyName,
@@ -438,7 +433,7 @@ public class PTableImpl implements PTable {
             List<PTable> indexes, boolean isImmutableRows, List<PName> physicalNames, PName defaultFamilyName, String viewExpression, boolean disableWAL,
             boolean multiTenant, boolean storeNulls, ViewType viewType, Short viewIndexId,
             IndexType indexType , int baseColumnCount, boolean rowKeyOrderOptimizable, boolean isTransactional, long updateCacheFrequency, long indexDisableTimestamp, 
-            boolean isNamespaceMapped, String autoPartitionSeqName, boolean isAppendOnlySchema, StorageScheme storageScheme, QualifierEncodingScheme qualifierEncodingScheme, 
+            boolean isNamespaceMapped, String autoPartitionSeqName, boolean isAppendOnlySchema, ImmutableStorageScheme storageScheme, QualifierEncodingScheme qualifierEncodingScheme, 
             EncodedCQCounter encodedCQCounter) throws SQLException {
         Preconditions.checkNotNull(schemaName);
         Preconditions.checkArgument(tenantId==null || tenantId.getBytes().length > 0); // tenantId should be null or not empty
@@ -475,7 +470,7 @@ public class PTableImpl implements PTable {
         this.isNamespaceMapped = isNamespaceMapped;
         this.autoPartitionSeqName = autoPartitionSeqName;
         this.isAppendOnlySchema = isAppendOnlySchema;
-        this.storageScheme = storageScheme;
+        this.immutableStorageScheme = storageScheme;
         this.qualifierEncodingScheme = qualifierEncodingScheme;
         List<PColumn> pkColumns;
         PColumn[] allColumns;
@@ -913,7 +908,7 @@ public class PTableImpl implements PTable {
                 mutations.add(deleteRow);
             } else {
                 // store all columns for a given column family in a single cell instead of one column per cell in order to improve write performance
-                if (storageScheme == StorageScheme.ONE_CELL_PER_COLUMN_FAMILY) {
+                if (immutableStorageScheme != ImmutableStorageScheme.ONE_CELL_PER_COLUMN) {
                     Put put = new Put(this.key);
                     if (isWALDisabled()) {
                         put.setDurability(Durability.SKIP_WAL);
@@ -927,28 +922,19 @@ public class PTableImpl implements PTable {
                             int qualifier = qualifierEncodingScheme.decode(column.getColumnQualifierBytes());
                             maxEncodedColumnQualifier = Math.max(maxEncodedColumnQualifier, qualifier);
                         }
-                        Expression[] colValues = new Expression[maxEncodedColumnQualifier+1];
-                        Arrays.fill(colValues, new DelegateExpression(LiteralExpression.newConstant(null)) {
-                        			@Override
-                        		    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
-                        		        return false;
-                        		    }
-                        		});
-                        // 0 is a reserved position, set it to a non-null value so that we can represent absence of a value using a negative offset
-                        colValues[0]=LiteralExpression.newConstant(QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
+                        Expression[] colValues = EncodedColumnsUtil.createColumnExpressionArray(maxEncodedColumnQualifier);
                         for (PColumn column : columns) {
                         	if (columnToValueMap.containsKey(column)) {
-                        	    int qualifier = qualifierEncodingScheme.decode(column.getColumnQualifierBytes());
-                        		colValues[qualifier] = new LiteralExpression(columnToValueMap.get(column));
+                        	    int colIndex = qualifierEncodingScheme.decode(column.getColumnQualifierBytes())-QueryConstants.ENCODED_CQ_COUNTER_INITIAL_VALUE+1;
+                        	    colValues[colIndex] = new LiteralExpression(columnToValueMap.get(column));
                         	}
                         }
                         
                         List<Expression> children = Arrays.asList(colValues);
-                        // we use ArrayConstructorExpression to serialize multiple columns into a single byte[]
-                        // construct the ArrayConstructorExpression with a variable length data type since columns can be of fixed or variable length 
-                        ArrayConstructorExpression arrayExpression = new ArrayConstructorExpression(children, PVarbinary.INSTANCE, rowKeyOrderOptimizable, PArrayDataType.IMMUTABLE_SERIALIZATION_VERSION);
+                        // we use SingleCellConstructorExpression to serialize all the columns into a single byte[]
+                        SingleCellConstructorExpression singleCellConstructorExpression = new SingleCellConstructorExpression(immutableStorageScheme, children);
                         ImmutableBytesWritable ptr = new ImmutableBytesWritable();
-                        arrayExpression.evaluate(null, ptr);
+                        singleCellConstructorExpression.evaluate(null, ptr);
                         ImmutableBytesPtr colFamilyPtr = new ImmutableBytesPtr(columnFamily);
                         addQuietly(put, kvBuilder, kvBuilder.buildPut(keyPtr,
                             colFamilyPtr, QueryConstants.SINGLE_KEYVALUE_COLUMN_QUALIFIER_BYTES_PTR, ts, ptr));
@@ -1027,7 +1013,7 @@ public class PTableImpl implements PTable {
                 removeIfPresent(unsetValues, family, qualifier);
                 // store all columns for a given column family in a single cell instead of one column per cell in order to improve write performance
                 // we don't need to do anything with unsetValues as it is only used when storeNulls is false, storeNulls is always true when storeColsInSingleCell is true
-                if (storageScheme == StorageScheme.ONE_CELL_PER_COLUMN_FAMILY) {
+                if (immutableStorageScheme == ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS) {
                     columnToValueMap.put(column, ptr.get());
                 }
                 else {
@@ -1326,9 +1312,9 @@ public class PTableImpl implements PTable {
         if (table.hasIsAppendOnlySchema()) {
             isAppendOnlySchema = table.getIsAppendOnlySchema();
         }
-        StorageScheme storageScheme = null;
+        ImmutableStorageScheme storageScheme = null;
         if (table.hasStorageScheme()) {
-            storageScheme = StorageScheme.fromSerializedValue(table.getStorageScheme().toByteArray()[0]);
+            storageScheme = ImmutableStorageScheme.fromSerializedValue(table.getStorageScheme().toByteArray()[0]);
         }
         QualifierEncodingScheme qualifierEncodingScheme = null;
         if (table.hasEncodingScheme()) {
@@ -1440,8 +1426,8 @@ public class PTableImpl implements PTable {
           builder.setAutoParititonSeqName(table.getAutoPartitionSeqName());
       }
       builder.setIsAppendOnlySchema(table.isAppendOnlySchema());
-      if (table.getStorageScheme() != null) {
-          builder.setStorageScheme(ByteStringer.wrap(new byte[]{table.getStorageScheme().getSerializedMetadataValue()}));
+      if (table.getImmutableStorageScheme() != null) {
+          builder.setStorageScheme(ByteStringer.wrap(new byte[]{table.getImmutableStorageScheme().getSerializedMetadataValue()}));
       }
       if (table.getEncodedCQCounter() != null) {
           Map<String, Integer> values = table.getEncodedCQCounter().values();
@@ -1526,8 +1512,8 @@ public class PTableImpl implements PTable {
     }
     
     @Override
-    public StorageScheme getStorageScheme() {
-        return storageScheme;
+    public ImmutableStorageScheme getImmutableStorageScheme() {
+        return immutableStorageScheme;
     }
     
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1f3f7323/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java
index 36df961..67ff1a5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.schema.PTable.ImmutableStorageScheme;
 import org.apache.phoenix.util.SchemaUtil;
 
 public enum TableProperty {
@@ -165,7 +166,27 @@ public enum TableProperty {
 			return table.getEncodingScheme();
 		}	
 	    
-	}
+	},
+    
+    IMMUTABLE_STORAGE_SCHEME(PhoenixDatabaseMetaData.IMMUTABLE_STORAGE_SCHEME, COLUMN_FAMILY_NOT_ALLOWED_TABLE_PROPERTY, true, false, false) {
+        @Override
+        public ImmutableStorageScheme getValue(Object value) {
+            if (value == null) {
+                return null;
+            } else if (value instanceof String) {
+                String strValue = (String) value;
+                return ImmutableStorageScheme.valueOf(strValue.toUpperCase());
+            } else {
+                throw new IllegalArgumentException("Immutable storage scheme table property must be a string");
+            }
+        }
+
+        @Override
+        public Object getPTableValue(PTable table) {
+            return table.getImmutableStorageScheme();
+        }   
+        
+    }
     ;
 	
 	private final String propertyName;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1f3f7323/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/EncodedColumnQualiferCellsList.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/EncodedColumnQualiferCellsList.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/EncodedColumnQualiferCellsList.java
index d3065a7..e99003f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/EncodedColumnQualiferCellsList.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/EncodedColumnQualiferCellsList.java
@@ -31,11 +31,12 @@ import java.util.NoSuchElementException;
 import javax.annotation.concurrent.NotThreadSafe;
 
 import org.apache.hadoop.hbase.Cell;
+import org.apache.phoenix.schema.PTable.ImmutableStorageScheme;
 import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
 
 /**
  * List implementation that provides indexed based look up when the cell column qualifiers are positive numbers. 
- * These qualifiers are generated by using one of the column qualifier encoding schemes specified in {@link QualifierEncodingScheme}. 
+ * These qualifiers are generated by using one of the column qualifier encoding schemes specified in {@link ImmutableStorageScheme}. 
  * The api methods in this list assume that the caller wants to see
  * and add only non null elements in the list. 
  * <p>

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1f3f7323/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PArrayDataType.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PArrayDataType.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PArrayDataType.java
index fede7d8..f31f272 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PArrayDataType.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PArrayDataType.java
@@ -22,20 +22,15 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.sql.Types;
 import java.text.Format;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
 import java.util.regex.Pattern;
 
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
-import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.ConstraintViolationException;
 import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.ValueSchema;
-import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.TrustedByteArrayOutputStream;
@@ -354,130 +349,11 @@ public abstract class PArrayDataType<T> extends PDataType<T> {
         return createPhoenixArray(bytes, offset, length, sortOrder, baseType, maxLength, desiredDataType);
     }
 
-    public static boolean positionAtArrayElement(Tuple tuple, ImmutableBytesWritable ptr, int index,
-            Expression arrayExpr, PDataType pDataType, Integer maxLen) {
-        if (!arrayExpr.evaluate(tuple, ptr)) {
-            return false;
-        } else if (ptr.getLength() == 0) { return true; }
-
-        // Given a ptr to the entire array, set ptr to point to a particular element within that array
-        // given the type of an array element (see comments in PDataTypeForArray)
-        return positionAtArrayElement(ptr, index - 1, pDataType, maxLen);
-    }
-
-    public static boolean positionAtArrayElement(ImmutableBytesWritable ptr, int arrayIndex, PDataType baseDataType,
-            Integer byteSize) {
-        byte[] bytes = ptr.get();
-        int initPos = ptr.getOffset();
-        if (!baseDataType.isFixedWidth()) {
-        	byte serializationVersion = bytes[ptr.getOffset() + ptr.getLength() - Bytes.SIZEOF_BYTE];
-            int noOfElements = Bytes.toInt(bytes,
-                    (ptr.getOffset() + ptr.getLength() - (Bytes.SIZEOF_BYTE + Bytes.SIZEOF_INT)), Bytes.SIZEOF_INT);
-            boolean useShort = true;
-            if (noOfElements < 0) {
-                noOfElements = -noOfElements;
-                useShort = false;
-            }
-            if (arrayIndex >= noOfElements) {
-                ptr.set(ByteUtil.EMPTY_BYTE_ARRAY);
-                return false;
-            }
-
-            int indexOffset = Bytes.toInt(bytes,
-                    (ptr.getOffset() + ptr.getLength() - (Bytes.SIZEOF_BYTE + 2 * Bytes.SIZEOF_INT))) + ptr.getOffset();
-            if (arrayIndex >= noOfElements) {
-                ptr.set(ByteUtil.EMPTY_BYTE_ARRAY);
-                return false;
-            } else {
-                // Skip those many offsets as given in the arrayIndex
-                // If suppose there are 5 elements in the array and the arrayIndex = 3
-                // This means we need to read the 4th element of the array
-                // So inorder to know the length of the 4th element we will read the offset of 4th element and the
-                // offset of 5th element.
-                // Subtracting the offset of 5th element and 4th element will give the length of 4th element
-                // So we could just skip reading the other elements.
-                int currOffset = getSerializedOffset(bytes, arrayIndex, useShort, indexOffset, serializationVersion);
-                if (currOffset<0) {
-                	ptr.set(ByteUtil.EMPTY_BYTE_ARRAY);
-                    return false;
-                }
-                int elementLength = 0;
-                if (arrayIndex == (noOfElements - 1)) {
-                    int separatorBytes =  serializationVersion == SORTABLE_SERIALIZATION_VERSION ? 3 : 0;
-                    elementLength = (bytes[currOffset + initPos] == QueryConstants.SEPARATOR_BYTE || bytes[currOffset + initPos] == QueryConstants.DESC_SEPARATOR_BYTE) ? 0 : indexOffset
-                            - (currOffset + initPos) - separatorBytes;
-                } else {
-                    int separatorByte =  serializationVersion == SORTABLE_SERIALIZATION_VERSION ? 1 : 0;
-                    elementLength = (bytes[currOffset + initPos] == QueryConstants.SEPARATOR_BYTE || bytes[currOffset + initPos] == QueryConstants.DESC_SEPARATOR_BYTE) ? 0 : getOffset(bytes,
-                            arrayIndex + 1, useShort, indexOffset, serializationVersion) - currOffset - separatorByte;
-                }
-                ptr.set(bytes, currOffset + initPos, elementLength);
-            }
-        } else {
-            int elemByteSize = (byteSize == null ? baseDataType.getByteSize() : byteSize);
-            int offset = arrayIndex * elemByteSize;
-            if (offset >= ptr.getLength()) {
-                ptr.set(ByteUtil.EMPTY_BYTE_ARRAY);
-            } else {
-                ptr.set(bytes, ptr.getOffset() + offset, elemByteSize);
-            }
-        }
-        return true;
-    }
-
-    public static void positionAtArrayElement(ImmutableBytesWritable ptr, int arrayIndex, PDataType baseDataType,
-            Integer byteSize, int offset, int length, int noOfElements, boolean first) {
-        byte[] bytes = ptr.get();
-        if (!baseDataType.isFixedWidth()) {
-        	byte serializationVersion = bytes[ptr.getOffset() + ptr.getLength() - Bytes.SIZEOF_BYTE];
-            int indexOffset = Bytes.toInt(bytes, (offset + length - (Bytes.SIZEOF_BYTE + 2 * Bytes.SIZEOF_INT)))
-                    + offset;
-            boolean useShort = true;
-            if (first) {
-                int count = Bytes.toInt(bytes,
-                        (ptr.getOffset() + ptr.getLength() - (Bytes.SIZEOF_BYTE + Bytes.SIZEOF_INT)), Bytes.SIZEOF_INT);
-                if (count < 0) {
-                    count = -count;
-                    useShort = false;
-                }
-            }
-            if (arrayIndex >= noOfElements) {
-                return;
-            } else {
-                // Skip those many offsets as given in the arrayIndex
-                // If suppose there are 5 elements in the array and the arrayIndex = 3
-                // This means we need to read the 4th element of the array
-                // So inorder to know the length of the 4th element we will read the offset of 4th element and the
-                // offset of 5th element.
-                // Subtracting the offset of 5th element and 4th element will give the length of 4th element
-                // So we could just skip reading the other elements.
-                int currOffset = getOffset(bytes, arrayIndex, useShort, indexOffset, serializationVersion);
-                int elementLength = 0;
-                if (arrayIndex == (noOfElements - 1)) {
-                    elementLength = (bytes[currOffset + offset] == QueryConstants.SEPARATOR_BYTE || bytes[currOffset + offset] == QueryConstants.DESC_SEPARATOR_BYTE) ? 0 : indexOffset
-                            - (currOffset + offset) - 3;
-                } else {
-                    elementLength = (bytes[currOffset + offset] == QueryConstants.SEPARATOR_BYTE || bytes[currOffset + offset] == QueryConstants.DESC_SEPARATOR_BYTE) ? 0 : getOffset(bytes,
-                            arrayIndex + 1, useShort, indexOffset, serializationVersion) - currOffset - 1;
-                }
-                ptr.set(bytes, currOffset + offset, elementLength);
-            }
-        } else {
-            int elemByteSize = (byteSize == null ? baseDataType.getByteSize() : byteSize);
-            offset += arrayIndex * elemByteSize;
-            if (offset >= offset + length) {
-                return;
-            } else {
-                ptr.set(bytes, offset, elemByteSize);
-            }
-        }
-    }
-
-    private static int getOffset(byte[] bytes, int arrayIndex, boolean useShort, int indexOffset, byte serializationVersion) {
+    static int getOffset(byte[] bytes, int arrayIndex, boolean useShort, int indexOffset, byte serializationVersion) {
         return Math.abs(getSerializedOffset(bytes, arrayIndex, useShort, indexOffset, serializationVersion));
     }
 
-	private static int getSerializedOffset(byte[] bytes, int arrayIndex, boolean useShort, int indexOffset, byte serializationVersion) {
+	static int getSerializedOffset(byte[] bytes, int arrayIndex, boolean useShort, int indexOffset, byte serializationVersion) {
 		int offset;
         if (useShort) {
             offset = indexOffset + (Bytes.SIZEOF_SHORT * arrayIndex);
@@ -514,13 +390,13 @@ public abstract class PArrayDataType<T> extends PDataType<T> {
      */
     private byte[] createArrayBytes(TrustedByteArrayOutputStream byteStream, DataOutputStream oStream,
             PhoenixArray array, int noOfElements, PDataType baseType, SortOrder sortOrder, boolean rowKeyOrderOptimizable) {
-        PArrayDataTypeBytesArrayBuilder builder =
-                new PArrayDataTypeBytesArrayBuilder(byteStream, oStream, noOfElements, baseType, sortOrder, rowKeyOrderOptimizable);
+        PArrayDataTypeEncoder builder =
+                new PArrayDataTypeEncoder(byteStream, oStream, noOfElements, baseType, sortOrder, rowKeyOrderOptimizable);
         for (int i = 0; i < noOfElements; i++) {
             byte[] bytes = array.toBytes(i);
-            builder.appendElem(bytes);
+            builder.appendValue(bytes);
         }
-        return builder.getBytesAndClose();
+        return builder.encode();
     }
 
     public static boolean appendItemToArray(ImmutableBytesWritable ptr, int length, int offset, byte[] arrayBytes,
@@ -1211,140 +1087,4 @@ public abstract class PArrayDataType<T> extends PDataType<T> {
         buf.append(']');
         return buf.toString();
     }
-
-    static public class PArrayDataTypeBytesArrayBuilder {
-        static private final int BYTE_ARRAY_DEFAULT_SIZE = 128;
-
-        private PDataType baseType;
-        private SortOrder sortOrder;
-        private List<Integer> offsetPos;
-        private TrustedByteArrayOutputStream byteStream;
-        private DataOutputStream oStream;
-        private int nulls;
-        private byte serializationVersion;
-        private boolean rowKeyOrderOptimizable;
-
-        public PArrayDataTypeBytesArrayBuilder(PDataType baseType, SortOrder sortOrder) {
-            this(new TrustedByteArrayOutputStream(BYTE_ARRAY_DEFAULT_SIZE), new LinkedList<Integer>(), baseType, sortOrder, true);
-        }
-        
-        public PArrayDataTypeBytesArrayBuilder(TrustedByteArrayOutputStream byteStream, DataOutputStream oStream,
-                int numElements, PDataType baseType, SortOrder sortOrder, boolean rowKeyOrderOptimizable, byte serializationVersion) {
-            this(byteStream, oStream, new ArrayList<Integer>(numElements), baseType, sortOrder, rowKeyOrderOptimizable, serializationVersion);
-        }
-        
-        public PArrayDataTypeBytesArrayBuilder(TrustedByteArrayOutputStream byteStream, DataOutputStream oStream,
-                int numElements, PDataType baseType, SortOrder sortOrder, boolean rowKeyOrderOptimizable) {
-            this(byteStream, oStream, new ArrayList<Integer>(numElements), baseType, sortOrder, rowKeyOrderOptimizable, PArrayDataType.SORTABLE_SERIALIZATION_VERSION);
-        }
-        
-        public PArrayDataTypeBytesArrayBuilder(TrustedByteArrayOutputStream byteStream, 
-                List<Integer> offsetPos, PDataType baseType, SortOrder sortOrder, boolean rowKeyOrderOptimizable) {
-            this(byteStream, new DataOutputStream(byteStream), offsetPos, baseType, sortOrder, rowKeyOrderOptimizable, PArrayDataType.SORTABLE_SERIALIZATION_VERSION);
-        }
-        
-        public PArrayDataTypeBytesArrayBuilder(TrustedByteArrayOutputStream byteStream, DataOutputStream oStream,
-                List<Integer> offsetPos, PDataType baseType, SortOrder sortOrder, boolean rowKeyOrderOptimizable, byte serializationVersion) {
-            this.baseType = baseType;
-            this.sortOrder = sortOrder;
-            this.offsetPos = offsetPos;
-            this.byteStream = byteStream;
-            this.oStream = oStream;
-            this.nulls = 0;
-            this.serializationVersion = serializationVersion;
-            this.rowKeyOrderOptimizable = rowKeyOrderOptimizable;
-        }
-
-        private void close() {
-            try {
-                if (byteStream != null) byteStream.close();
-                if (oStream != null) oStream.close();
-                byteStream = null;
-                oStream = null;
-            } catch (IOException ioe) {}
-        }
-        
-        // used to represent the absence of a value 
-        public void appendMissingElement() {
-            if (serializationVersion == PArrayDataType.IMMUTABLE_SERIALIZATION_VERSION && !baseType.isFixedWidth()) {
-                offsetPos.add(-byteStream.size());
-                nulls++;
-            }
-        }
-
-        public boolean appendElem(byte[] bytes) {
-            return appendElem(bytes, 0, bytes.length);
-        }
-
-        public boolean appendElem(byte[] bytes, int offset, int len) {
-            if (oStream == null || byteStream == null) return false;
-            try {
-                // track the offset position here from the size of the byteStream
-                if (!baseType.isFixedWidth()) {
-                    // Any variable length array would follow the below order
-                    // Every element would be seperated by a seperator byte '0'
-                    // Null elements are counted and once a first non null element appears we
-                    // write the count of the nulls prefixed with a seperator byte
-                    // Trailing nulls are not taken into account
-                    // The last non null element is followed by two seperator bytes
-                    // For eg
-                    // a, b, null, null, c, null would be 
-                    // 65 0 66 0 0 2 67 0 0 0
-                    // a null null null b c null d would be
-                    // 65 0 0 3 66 0 67 0 0 1 68 0 0 0
-                    if (len == 0) {
-                        offsetPos.add(byteStream.size());
-                        nulls++;
-                    } else {
-                        nulls = serializeNulls(oStream, nulls);
-                        offsetPos.add(byteStream.size());
-                        if (sortOrder == SortOrder.DESC) {
-                            SortOrder.invert(bytes, offset, bytes, offset, len);
-                            offset = 0;
-                        }
-                        oStream.write(bytes, offset, len);
-                        if (serializationVersion == SORTABLE_SERIALIZATION_VERSION) {
-                            oStream.write(getSeparatorByte(rowKeyOrderOptimizable, sortOrder));
-                        }
-                    }
-                } else {
-                    // No nulls for fixed length
-                    if (sortOrder == SortOrder.DESC) {
-                        SortOrder.invert(bytes, offset, bytes, offset, len);
-                        offset = 0;
-                    }
-                    oStream.write(bytes, offset, len);
-                }
-                return true;
-            } catch (IOException e) {}
-            return false;
-        }
-
-        public byte[] getBytesAndClose() {
-            try {
-                if (!baseType.isFixedWidth()) {
-                    int noOfElements = offsetPos.size();
-                    int[] offsetPosArray = new int[noOfElements];
-                    int index = 0;
-                    for (Integer i : offsetPos) {
-                        offsetPosArray[index] = i;
-                        ++index;
-                    }
-                    if (serializationVersion == SORTABLE_SERIALIZATION_VERSION) {
-                        // Double seperator byte to show end of the non null array
-                        writeEndSeperatorForVarLengthArray(oStream, sortOrder, rowKeyOrderOptimizable);
-                    }
-                    noOfElements = PArrayDataType.serializeOffsetArrayIntoStream(oStream, byteStream, noOfElements,
-                            offsetPosArray[offsetPosArray.length - 1], offsetPosArray, serializationVersion);
-                    serializeHeaderInfoIntoStream(oStream, noOfElements, serializationVersion);
-                }
-                ImmutableBytesWritable ptr = new ImmutableBytesWritable();
-                ptr.set(byteStream.getBuffer(), 0, byteStream.size());
-                return ByteUtil.copyKeyBytesIfNecessary(ptr);
-            } catch (IOException e) {} finally {
-                close();
-            }
-            return null;
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1f3f7323/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PArrayDataTypeDecoder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PArrayDataTypeDecoder.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PArrayDataTypeDecoder.java
new file mode 100644
index 0000000..7a6ea91
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PArrayDataTypeDecoder.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema.types;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.ColumnValueDecoder;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.ByteUtil;
+
+
+public class PArrayDataTypeDecoder implements ColumnValueDecoder {
+    
+    @Override
+    public boolean decode(ImmutableBytesWritable ptr, int index) {
+        return PArrayDataTypeDecoder.positionAtArrayElement(ptr, index, PVarbinary.INSTANCE, null);
+    }
+
+    public static boolean positionAtArrayElement(Tuple tuple, ImmutableBytesWritable ptr, int index,
+            Expression arrayExpr, PDataType pDataType, Integer maxLen) {
+        if (!arrayExpr.evaluate(tuple, ptr)) {
+            return false;
+        } else if (ptr.getLength() == 0) { return true; }
+    
+        // Given a ptr to the entire array, set ptr to point to a particular element within that array
+        // given the type of an array element (see comments in PDataTypeForArray)
+        return positionAtArrayElement(ptr, index - 1, pDataType, maxLen);
+    }
+
+    public static boolean positionAtArrayElement(ImmutableBytesWritable ptr, int arrayIndex, PDataType baseDataType,
+            Integer byteSize) {
+        byte[] bytes = ptr.get();
+        int initPos = ptr.getOffset();
+        if (!baseDataType.isFixedWidth()) {
+        	byte serializationVersion = bytes[ptr.getOffset() + ptr.getLength() - Bytes.SIZEOF_BYTE];
+            int noOfElements = Bytes.toInt(bytes,
+                    (ptr.getOffset() + ptr.getLength() - (Bytes.SIZEOF_BYTE + Bytes.SIZEOF_INT)), Bytes.SIZEOF_INT);
+            boolean useShort = true;
+            if (noOfElements < 0) {
+                noOfElements = -noOfElements;
+                useShort = false;
+            }
+            if (arrayIndex >= noOfElements) {
+                ptr.set(ByteUtil.EMPTY_BYTE_ARRAY);
+                return false;
+            }
+    
+            int indexOffset = Bytes.toInt(bytes,
+                    (ptr.getOffset() + ptr.getLength() - (Bytes.SIZEOF_BYTE + 2 * Bytes.SIZEOF_INT))) + ptr.getOffset();
+            // Skip those many offsets as given in the arrayIndex
+            // If suppose there are 5 elements in the array and the arrayIndex = 3
+            // This means we need to read the 4th element of the array
+            // So inorder to know the length of the 4th element we will read the offset of 4th element and the
+            // offset of 5th element.
+            // Subtracting the offset of 5th element and 4th element will give the length of 4th element
+            // So we could just skip reading the other elements.
+            int currOffset = PArrayDataType.getSerializedOffset(bytes, arrayIndex, useShort, indexOffset, serializationVersion);
+            if (currOffset<0) {
+            	ptr.set(ByteUtil.EMPTY_BYTE_ARRAY);
+                return false;
+            }
+            int elementLength = 0;
+            if (arrayIndex == (noOfElements - 1)) {
+                int separatorBytes =  serializationVersion == PArrayDataType.SORTABLE_SERIALIZATION_VERSION ? 3 : 0;
+                elementLength = (bytes[currOffset + initPos] == QueryConstants.SEPARATOR_BYTE || bytes[currOffset + initPos] == QueryConstants.DESC_SEPARATOR_BYTE) ? 0 : indexOffset
+                        - (currOffset + initPos) - separatorBytes;
+            } else {
+                int separatorByte =  serializationVersion == PArrayDataType.SORTABLE_SERIALIZATION_VERSION ? 1 : 0;
+                elementLength = (bytes[currOffset + initPos] == QueryConstants.SEPARATOR_BYTE || bytes[currOffset + initPos] == QueryConstants.DESC_SEPARATOR_BYTE) ? 0 : PArrayDataType.getOffset(bytes,
+                        arrayIndex + 1, useShort, indexOffset, serializationVersion) - currOffset - separatorByte;
+            }
+            ptr.set(bytes, currOffset + initPos, elementLength);
+        } else {
+            int elemByteSize = (byteSize == null ? baseDataType.getByteSize() : byteSize);
+            int offset = arrayIndex * elemByteSize;
+            if (offset >= ptr.getLength()) {
+                ptr.set(ByteUtil.EMPTY_BYTE_ARRAY);
+            } else {
+                ptr.set(bytes, ptr.getOffset() + offset, elemByteSize);
+            }
+        }
+        return true;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1f3f7323/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PArrayDataTypeEncoder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PArrayDataTypeEncoder.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PArrayDataTypeEncoder.java
new file mode 100644
index 0000000..bb293bb
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PArrayDataTypeEncoder.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema.types;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.schema.ColumnValueEncoder;
+import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.TrustedByteArrayOutputStream;
+
+public class PArrayDataTypeEncoder implements ColumnValueEncoder {
+    static private final int BYTE_ARRAY_DEFAULT_SIZE = 128;
+
+    private PDataType baseType;
+    private SortOrder sortOrder;
+    private List<Integer> offsetPos;
+    private TrustedByteArrayOutputStream byteStream;
+    private DataOutputStream oStream;
+    private int nulls;
+    private byte serializationVersion;
+    private boolean rowKeyOrderOptimizable;
+
+    public PArrayDataTypeEncoder(PDataType baseType, SortOrder sortOrder) {
+        this(new TrustedByteArrayOutputStream(BYTE_ARRAY_DEFAULT_SIZE), new LinkedList<Integer>(), baseType, sortOrder, true);
+    }
+    
+    public PArrayDataTypeEncoder(TrustedByteArrayOutputStream byteStream, DataOutputStream oStream,
+            int numElements, PDataType baseType, SortOrder sortOrder, boolean rowKeyOrderOptimizable, byte serializationVersion) {
+        this(byteStream, oStream, new ArrayList<Integer>(numElements), baseType, sortOrder, rowKeyOrderOptimizable, serializationVersion);
+    }
+    
+    public PArrayDataTypeEncoder(TrustedByteArrayOutputStream byteStream, DataOutputStream oStream,
+            int numElements, PDataType baseType, SortOrder sortOrder, boolean rowKeyOrderOptimizable) {
+        this(byteStream, oStream, new ArrayList<Integer>(numElements), baseType, sortOrder, rowKeyOrderOptimizable, PArrayDataType.SORTABLE_SERIALIZATION_VERSION);
+    }
+    
+    public PArrayDataTypeEncoder(TrustedByteArrayOutputStream byteStream, 
+            List<Integer> offsetPos, PDataType baseType, SortOrder sortOrder, boolean rowKeyOrderOptimizable) {
+        this(byteStream, new DataOutputStream(byteStream), offsetPos, baseType, sortOrder, rowKeyOrderOptimizable, PArrayDataType.SORTABLE_SERIALIZATION_VERSION);
+    }
+    
+    public PArrayDataTypeEncoder(TrustedByteArrayOutputStream byteStream, DataOutputStream oStream,
+            List<Integer> offsetPos, PDataType baseType, SortOrder sortOrder, boolean rowKeyOrderOptimizable, byte serializationVersion) {
+        this.baseType = baseType;
+        this.sortOrder = sortOrder;
+        this.offsetPos = offsetPos;
+        this.byteStream = byteStream;
+        this.oStream = oStream;
+        this.nulls = 0;
+        this.serializationVersion = serializationVersion;
+        this.rowKeyOrderOptimizable = rowKeyOrderOptimizable;
+    }
+
+    private void close() {
+        try {
+            if (byteStream != null) byteStream.close();
+            if (oStream != null) oStream.close();
+            byteStream = null;
+            oStream = null;
+        } catch (IOException ioe) {}
+    }
+    
+    // used to represent the absence of a value 
+    @Override
+    public void appendAbsentValue() {
+        if (serializationVersion == PArrayDataType.IMMUTABLE_SERIALIZATION_VERSION && !baseType.isFixedWidth()) {
+            offsetPos.add(-byteStream.size());
+            nulls++;
+        }
+        else {
+            throw new UnsupportedOperationException("Cannot represent an absent element");
+        }
+    }
+
+    public void appendValue(byte[] bytes) {
+        appendValue(bytes, 0, bytes.length);
+    }
+
+    @Override
+    public void appendValue(byte[] bytes, int offset, int len) {
+        try {
+            // track the offset position here from the size of the byteStream
+            if (!baseType.isFixedWidth()) {
+                // Any variable length array would follow the below order
+                // Every element would be seperated by a seperator byte '0'
+                // Null elements are counted and once a first non null element appears we
+                // write the count of the nulls prefixed with a seperator byte
+                // Trailing nulls are not taken into account
+                // The last non null element is followed by two seperator bytes
+                // For eg
+                // a, b, null, null, c, null would be 
+                // 65 0 66 0 0 2 67 0 0 0
+                // a null null null b c null d would be
+                // 65 0 0 3 66 0 67 0 0 1 68 0 0 0
+                if (len == 0) {
+                    offsetPos.add(byteStream.size());
+                    nulls++;
+                } else {
+                    nulls = PArrayDataType.serializeNulls(oStream, nulls);
+                    offsetPos.add(byteStream.size());
+                    if (sortOrder == SortOrder.DESC) {
+                        SortOrder.invert(bytes, offset, bytes, offset, len);
+                        offset = 0;
+                    }
+                    oStream.write(bytes, offset, len);
+                    if (serializationVersion == PArrayDataType.SORTABLE_SERIALIZATION_VERSION) {
+                        oStream.write(PArrayDataType.getSeparatorByte(rowKeyOrderOptimizable, sortOrder));
+                    }
+                }
+            } else {
+                // No nulls for fixed length
+                if (sortOrder == SortOrder.DESC) {
+                    SortOrder.invert(bytes, offset, bytes, offset, len);
+                    offset = 0;
+                }
+                oStream.write(bytes, offset, len);
+            }
+        } catch (IOException e) {}
+    }
+
+    @Override
+    public byte[] encode() {
+        try {
+            if (!baseType.isFixedWidth()) {
+                int noOfElements = offsetPos.size();
+                int[] offsetPosArray = new int[noOfElements];
+                int index = 0;
+                for (Integer i : offsetPos) {
+                    offsetPosArray[index] = i;
+                    ++index;
+                }
+                if (serializationVersion == PArrayDataType.SORTABLE_SERIALIZATION_VERSION) {
+                    // Double seperator byte to show end of the non null array
+                    PArrayDataType.writeEndSeperatorForVarLengthArray(oStream, sortOrder, rowKeyOrderOptimizable);
+                }
+                noOfElements = PArrayDataType.serializeOffsetArrayIntoStream(oStream, byteStream, noOfElements,
+                        offsetPosArray[offsetPosArray.length - 1], offsetPosArray, serializationVersion);
+                PArrayDataType.serializeHeaderInfoIntoStream(oStream, noOfElements, serializationVersion);
+            }
+            ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+            ptr.set(byteStream.getBuffer(), 0, byteStream.size());
+            return ByteUtil.copyKeyBytesIfNecessary(ptr);
+        } catch (IOException e) {} finally {
+            close();
+        }
+        return null;
+    }
+    
+}
\ No newline at end of file


Mime
View raw message