phoenix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sama...@apache.org
Subject [06/11] phoenix git commit: Encode column names and take advantage of encoding in group by and order by
Date Mon, 08 Aug 2016 01:42:05 GMT
http://git-wip-us.apache.org/repos/asf/phoenix/blob/450dbc59/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
index 847979a..bb71367 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.coprocessor.generated.PTableProtos;
 import org.apache.phoenix.exception.DataExceedsCapacityException;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
@@ -52,8 +53,10 @@ import org.apache.phoenix.schema.types.PChar;
 import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.types.PDouble;
 import org.apache.phoenix.schema.types.PFloat;
+import org.apache.phoenix.schema.types.PInteger;
 import org.apache.phoenix.schema.types.PVarchar;
 import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.EncodedColumnsUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.SizedUtil;
 import org.apache.phoenix.util.StringUtil;
@@ -70,6 +73,7 @@ import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
+
 /**
  *
  * Base class for PTable implementors.  Provides abstraction for
@@ -98,7 +102,8 @@ public class PTableImpl implements PTable {
     private List<PColumnFamily> families;
     private Map<byte[], PColumnFamily> familyByBytes;
     private Map<String, PColumnFamily> familyByString;
-    private ListMultimap<String,PColumn> columnsByName;
+    private ListMultimap<String, PColumn> columnsByName;
+    private ListMultimap<Integer, PColumn> kvColumnsByEncodedColumnNames;
     private PName pkName;
     private Integer bucketNum;
     private RowKeySchema rowKeySchema;
@@ -130,6 +135,8 @@ public class PTableImpl implements PTable {
     private boolean isNamespaceMapped;
     private String autoPartitionSeqName;
     private boolean isAppendOnlySchema;
+    private StorageScheme storageScheme;
+    private EncodedCQCounter encodedCQCounter;
 
     public PTableImpl() {
         this.indexes = Collections.emptyList();
@@ -161,8 +168,9 @@ public class PTableImpl implements PTable {
         this.isNamespaceMapped = isNamespaceMapped;
     }
     
+    // For indexes stored in shared physical tables
     public PTableImpl(PName tenantId, PName schemaName, PName tableName, long timestamp, List<PColumnFamily> families, 
-            List<PColumn> columns, List<PName> physicalNames, Short viewIndexId, boolean multiTenant, boolean isNamespaceMpped) throws SQLException { // For indexes stored in shared physical tables
+            List<PColumn> columns, List<PName> physicalNames, Short viewIndexId, boolean multiTenant, boolean isNamespaceMpped, StorageScheme storageScheme, EncodedCQCounter encodedCQCounter) throws SQLException {
         this.pkColumns = this.allColumns = Collections.emptyList();
         this.rowKeySchema = RowKeySchema.EMPTY_SCHEMA;
         this.indexes = Collections.emptyList();
@@ -176,7 +184,7 @@ public class PTableImpl implements PTable {
         init(tenantId, this.schemaName, this.tableName, PTableType.INDEX, state, timeStamp, sequenceNumber, pkName, bucketNum, columns,
             this.schemaName, parentTableName, indexes, isImmutableRows, physicalNames, defaultFamilyName,
             null, disableWAL, multiTenant, storeNulls, viewType, viewIndexId, indexType, baseColumnCount, rowKeyOrderOptimizable,
-            isTransactional, updateCacheFrequency, indexDisableTimestamp, isNamespaceMpped, null, false);
+            isTransactional, updateCacheFrequency, indexDisableTimestamp, isNamespaceMpped, null, false, storageScheme, encodedCQCounter);
     }
 
     public PTableImpl(long timeStamp) { // For delete marker
@@ -220,7 +228,7 @@ public class PTableImpl implements PTable {
                 indexes, table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), viewStatement,
                 table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(),
                 table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(),
-                table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema());
+                table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getStorageScheme(), table.getEncodedCQCounter());
     }
 
     public static PTableImpl makePTable(PTable table, List<PColumn> columns) throws SQLException {
@@ -230,7 +238,7 @@ public class PTableImpl implements PTable {
                 table.getIndexes(), table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(),
                 table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(),
                 table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(),
-                table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema());
+                table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getStorageScheme(), table.getEncodedCQCounter());
     }
 
     public static PTableImpl makePTable(PTable table, long timeStamp, long sequenceNumber, List<PColumn> columns) throws SQLException {
@@ -240,7 +248,7 @@ public class PTableImpl implements PTable {
                 table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(), table.isWALDisabled(),
                 table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(),
                 table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), 
-                table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema());
+                table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getStorageScheme(), table.getEncodedCQCounter());
     }
 
     public static PTableImpl makePTable(PTable table, long timeStamp, long sequenceNumber, List<PColumn> columns, boolean isImmutableRows) throws SQLException {
@@ -250,7 +258,7 @@ public class PTableImpl implements PTable {
                 table.getIndexes(), isImmutableRows, table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(),
                 table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(),
                 table.getIndexType(), table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(),
-                table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema());
+                table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getStorageScheme(), table.getEncodedCQCounter());
     }
     
     public static PTableImpl makePTable(PTable table, long timeStamp, long sequenceNumber, List<PColumn> columns, boolean isImmutableRows, boolean isWalDisabled,
@@ -261,7 +269,7 @@ public class PTableImpl implements PTable {
                 table.getIndexes(), isImmutableRows, table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(),
                 isWalDisabled, isMultitenant, storeNulls, table.getViewType(), table.getViewIndexId(), table.getIndexType(),
                 table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), isTransactional, updateCacheFrequency, table.getIndexDisableTimestamp(), 
-                isNamespaceMapped, table.getAutoPartitionSeqName(), table.isAppendOnlySchema());
+                isNamespaceMapped, table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getStorageScheme(), table.getEncodedCQCounter());
     }
     
     public static PTableImpl makePTable(PTable table, PIndexState state) throws SQLException {
@@ -272,7 +280,7 @@ public class PTableImpl implements PTable {
                 table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(),
                 table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(),
                 table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(),
-                table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema());
+                table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getStorageScheme(), table.getEncodedCQCounter());
     }
 
     public static PTableImpl makePTable(PTable table, boolean rowKeyOrderOptimizable) throws SQLException {
@@ -283,7 +291,7 @@ public class PTableImpl implements PTable {
                 table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(),
                 table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(),
                 table.getBaseColumnCount(), rowKeyOrderOptimizable, table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.isNamespaceMapped(), 
-                table.getAutoPartitionSeqName(), table.isAppendOnlySchema());
+                table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getStorageScheme(), table.getEncodedCQCounter());
     }
 
     public static PTableImpl makePTable(PTable table) throws SQLException {
@@ -294,7 +302,7 @@ public class PTableImpl implements PTable {
                 table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(),
                 table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(),
                 table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), 
-                table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema());
+                table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getStorageScheme(), table.getEncodedCQCounter());
     }
 
     public static PTableImpl makePTable(PName tenantId, PName schemaName, PName tableName, PTableType type,
@@ -303,12 +311,12 @@ public class PTableImpl implements PTable {
             boolean isImmutableRows, List<PName> physicalNames, PName defaultFamilyName, String viewExpression,
             boolean disableWAL, boolean multiTenant, boolean storeNulls, ViewType viewType, Short viewIndexId,
             IndexType indexType, boolean rowKeyOrderOptimizable, boolean isTransactional, long updateCacheFrequency,
-            long indexDisableTimestamp, boolean isNamespaceMapped, String autoPartitionSeqName, boolean isAppendOnlySchema) throws SQLException {
+            long indexDisableTimestamp, boolean isNamespaceMapped, String autoPartitionSeqName, boolean isAppendOnlySchema, StorageScheme storageScheme, EncodedCQCounter encodedCQCounter) throws SQLException {
         return new PTableImpl(tenantId, schemaName, tableName, type, state, timeStamp, sequenceNumber, pkName, bucketNum, columns, dataSchemaName,
                 dataTableName, indexes, isImmutableRows, physicalNames, defaultFamilyName,
                 viewExpression, disableWAL, multiTenant, storeNulls, viewType, viewIndexId,
                 indexType, QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT, rowKeyOrderOptimizable, isTransactional,
-                updateCacheFrequency,indexDisableTimestamp, isNamespaceMapped, autoPartitionSeqName, isAppendOnlySchema);
+                updateCacheFrequency,indexDisableTimestamp, isNamespaceMapped, autoPartitionSeqName, isAppendOnlySchema, storageScheme, encodedCQCounter);
     }
 
     public static PTableImpl makePTable(PName tenantId, PName schemaName, PName tableName, PTableType type,
@@ -318,13 +326,13 @@ public class PTableImpl implements PTable {
             boolean disableWAL, boolean multiTenant, boolean storeNulls, ViewType viewType, Short viewIndexId,
             IndexType indexType, boolean rowKeyOrderOptimizable, boolean isTransactional, long updateCacheFrequency,
             int baseColumnCount, long indexDisableTimestamp, boolean isNamespaceMapped,
-            String autoPartitionSeqName, boolean isAppendOnlySchema)
+            String autoPartitionSeqName, boolean isAppendOnlySchema, StorageScheme storageScheme, EncodedCQCounter encodedCQCounter)
             throws SQLException {
         return new PTableImpl(tenantId, schemaName, tableName, type, state, timeStamp, sequenceNumber, pkName,
                 bucketNum, columns, dataSchemaName, dataTableName, indexes, isImmutableRows, physicalNames,
                 defaultFamilyName, viewExpression, disableWAL, multiTenant, storeNulls, viewType, viewIndexId,
                 indexType, baseColumnCount, rowKeyOrderOptimizable, isTransactional, updateCacheFrequency, 
-                indexDisableTimestamp, isNamespaceMapped, autoPartitionSeqName, isAppendOnlySchema);
+                indexDisableTimestamp, isNamespaceMapped, autoPartitionSeqName, isAppendOnlySchema, storageScheme, encodedCQCounter);
     }
 
     private PTableImpl(PName tenantId, PName schemaName, PName tableName, PTableType type, PIndexState state,
@@ -333,11 +341,11 @@ public class PTableImpl implements PTable {
             List<PName> physicalNames, PName defaultFamilyName, String viewExpression, boolean disableWAL, boolean multiTenant,
             boolean storeNulls, ViewType viewType, Short viewIndexId, IndexType indexType,
             int baseColumnCount, boolean rowKeyOrderOptimizable, boolean isTransactional, long updateCacheFrequency,
-            long indexDisableTimestamp, boolean isNamespaceMapped, String autoPartitionSeqName, boolean isAppendOnlySchema) throws SQLException {
+            long indexDisableTimestamp, boolean isNamespaceMapped, String autoPartitionSeqName, boolean isAppendOnlySchema, StorageScheme storageScheme, EncodedCQCounter encodedCQCounter) throws SQLException {
         init(tenantId, schemaName, tableName, type, state, timeStamp, sequenceNumber, pkName, bucketNum, columns,
                 schemaName, parentTableName, indexes, isImmutableRows, physicalNames, defaultFamilyName,
                 viewExpression, disableWAL, multiTenant, storeNulls, viewType, viewIndexId, indexType, baseColumnCount, rowKeyOrderOptimizable,
-                isTransactional, updateCacheFrequency, indexDisableTimestamp, isNamespaceMapped, autoPartitionSeqName, isAppendOnlySchema);
+                isTransactional, updateCacheFrequency, indexDisableTimestamp, isNamespaceMapped, autoPartitionSeqName, isAppendOnlySchema, storageScheme, encodedCQCounter);
     }
     
     @Override
@@ -371,7 +379,7 @@ public class PTableImpl implements PTable {
             List<PTable> indexes, boolean isImmutableRows, List<PName> physicalNames, PName defaultFamilyName, String viewExpression, boolean disableWAL,
             boolean multiTenant, boolean storeNulls, ViewType viewType, Short viewIndexId,
             IndexType indexType , int baseColumnCount, boolean rowKeyOrderOptimizable, boolean isTransactional, long updateCacheFrequency, long indexDisableTimestamp, 
-            boolean isNamespaceMapped, String autoPartitionSeqName, boolean isAppendOnlySchema) throws SQLException {
+            boolean isNamespaceMapped, String autoPartitionSeqName, boolean isAppendOnlySchema, StorageScheme storageScheme, EncodedCQCounter encodedCQCounter) throws SQLException {
         Preconditions.checkNotNull(schemaName);
         Preconditions.checkArgument(tenantId==null || tenantId.getBytes().length > 0); // tenantId should be null or not empty
         int estimatedSize = SizedUtil.OBJECT_SIZE * 2 + 23 * SizedUtil.POINTER_SIZE + 4 * SizedUtil.INT_SIZE + 2 * SizedUtil.LONG_SIZE + 2 * SizedUtil.INT_OBJECT_SIZE +
@@ -407,10 +415,12 @@ public class PTableImpl implements PTable {
         this.isNamespaceMapped = isNamespaceMapped;
         this.autoPartitionSeqName = autoPartitionSeqName;
         this.isAppendOnlySchema = isAppendOnlySchema;
+        this.storageScheme = storageScheme;
         List<PColumn> pkColumns;
         PColumn[] allColumns;
         
         this.columnsByName = ArrayListMultimap.create(columns.size(), 1);
+        this.kvColumnsByEncodedColumnNames = (storageScheme == StorageScheme.ENCODED_COLUMN_NAMES ? ArrayListMultimap.<Integer, PColumn>create(columns.size(), 1) : null);
         int numPKColumns = 0;
         if (bucketNum != null) {
             // Add salt column to allColumns and pkColumns, but don't add to
@@ -437,7 +447,22 @@ public class PTableImpl implements PTable {
                     if (Objects.equal(familyName, dupColumn.getFamilyName())) {
                         count++;
                         if (count > 1) {
-                            throw new ColumnAlreadyExistsException(null, name.getString(), columnName);
+                            throw new ColumnAlreadyExistsException(schemaName.getString(), name.getString(), columnName);
+                        }
+                    }
+                }
+            }
+            Integer cq = column.getEncodedColumnQualifier();
+            //TODO: samarth understand the implication of this.
+            if (kvColumnsByEncodedColumnNames != null && cq != null) {
+                if (kvColumnsByEncodedColumnNames.put(cq, column)) {
+                    int count = 0;
+                    for (PColumn dupColumn : kvColumnsByEncodedColumnNames.get(cq)) {
+                        if (Objects.equal(familyName, dupColumn.getFamilyName())) {
+                            count++;
+                            if (count > 1) {
+                                throw new ColumnAlreadyExistsException(schemaName.getString(), name.getString(), columnName);
+                            }
                         }
                     }
                 }
@@ -501,7 +526,7 @@ public class PTableImpl implements PTable {
                 .orderedBy(Bytes.BYTES_COMPARATOR);
         for (int i = 0; i < families.length; i++) {
             Map.Entry<PName,List<PColumn>> entry = iterator.next();
-            PColumnFamily family = new PColumnFamilyImpl(entry.getKey(), entry.getValue());
+            PColumnFamily family = new PColumnFamilyImpl(entry.getKey(), entry.getValue(), storageScheme == StorageScheme.ENCODED_COLUMN_NAMES);
             families[i] = family;
             familyByString.put(family.getName().getString(), family);
             familyByBytes.put(family.getName().getBytes(), family);
@@ -527,9 +552,9 @@ public class PTableImpl implements PTable {
         for (PName name : this.physicalNames) {
             estimatedSize += name.getEstimatedSize();
         }
-
         this.estimatedSize = estimatedSize;
         this.baseColumnCount = baseColumnCount;
+        this.encodedCQCounter = encodedCQCounter;
     }
 
     @Override
@@ -687,7 +712,7 @@ public class PTableImpl implements PTable {
     }
 
     @Override
-    public PColumn getColumn(String name) throws ColumnNotFoundException, AmbiguousColumnException {
+    public PColumn getPColumnForColumnName(String name) throws ColumnNotFoundException, AmbiguousColumnException {
         List<PColumn> columns = columnsByName.get(name);
         int size = columns.size();
         if (size == 0) {
@@ -706,6 +731,36 @@ public class PTableImpl implements PTable {
         }
         return columns.get(0);
     }
+    
+    @Override
+    public PColumn getPColumnForColumnQualifier(byte[] cq) throws ColumnNotFoundException, AmbiguousColumnException {
+        Preconditions.checkNotNull(cq);
+        if (!EncodedColumnsUtil.usesEncodedColumnNames(this)) {
+            String columnName = (String)PVarchar.INSTANCE.toObject(cq);
+            return getPColumnForColumnName(columnName);
+        } else {
+            Integer qualifier = (Integer)PInteger.INSTANCE.toObject(cq);
+            List<PColumn> columns = kvColumnsByEncodedColumnNames.get(qualifier);
+            int size = columns.size();
+            if (size == 0) {
+                //TODO: samarth should we have a column qualifier not found exception?
+                throw new ColumnNotFoundException(Bytes.toString(cq));
+            }
+            if (size > 1) {
+                for (PColumn column : columns) {
+                    if (column.getFamilyName() == null || QueryConstants.DEFAULT_COLUMN_FAMILY.equals(column.getFamilyName().getString())) {
+                        // Allow ambiguity with PK column or column in the default column family,
+                        // since a PK column cannot be prefixed and a user would not know how to
+                        // prefix a column in the default column family.
+                        return column;
+                    }
+                }
+                //TODO: samarth should we have a column qualifier not found exception?
+                throw new AmbiguousColumnException(columns.get(0).getName().getString());
+            }
+            return columns.get(0);
+        }
+    }
 
     /**
      *
@@ -761,10 +816,11 @@ public class PTableImpl implements PTable {
                 // Because we cannot enforce a not null constraint on a KV column (since we don't know if the row exists when
                 // we upsert it), se instead add a KV that is always emtpy. This allows us to imitate SQL semantics given the
                 // way HBase works.
+                Pair<byte[], byte[]> emptyKvInfo = EncodedColumnsUtil.getEmptyKeyValueInfo(PTableImpl.this);
                 addQuietly(setValues, kvBuilder, kvBuilder.buildPut(keyPtr,
                     SchemaUtil.getEmptyColumnFamilyPtr(PTableImpl.this),
-                    QueryConstants.EMPTY_COLUMN_BYTES_PTR, ts,
-                    QueryConstants.EMPTY_COLUMN_VALUE_BYTES_PTR));
+                    new ImmutableBytesPtr(emptyKvInfo.getFirst()), ts,
+                    new ImmutableBytesPtr(emptyKvInfo.getSecond())));
                 mutations.add(setValues);
                 if (!unsetValues.isEmpty()) {
                     mutations.add(unsetValues);
@@ -798,7 +854,8 @@ public class PTableImpl implements PTable {
         public void setValue(PColumn column, byte[] byteValue) {
             deleteRow = null;
             byte[] family = column.getFamilyName().getBytes();
-            byte[] qualifier = column.getName().getBytes();
+            byte[] qualifier = getColumnQualifier(column);
+            ImmutableBytesPtr qualifierPtr = new ImmutableBytesPtr(qualifier);
             PDataType type = column.getDataType();
             // Check null, since some types have no byte representation for null
             boolean isNull = type.isNull(byteValue);
@@ -808,7 +865,7 @@ public class PTableImpl implements PTable {
                 }
                 removeIfPresent(setValues, family, qualifier);
                 deleteQuietly(unsetValues, kvBuilder, kvBuilder.buildDeleteColumns(keyPtr, column
-                            .getFamilyName().getBytesPtr(), column.getName().getBytesPtr(), ts));
+                            .getFamilyName().getBytesPtr(), qualifierPtr, ts));
             } else {
                 ImmutableBytesWritable ptr = new ImmutableBytesWritable(byteValue == null ?
                         HConstants.EMPTY_BYTE_ARRAY : byteValue);
@@ -822,7 +879,7 @@ public class PTableImpl implements PTable {
             	}
                 removeIfPresent(unsetValues, family, qualifier);
                 addQuietly(setValues, kvBuilder, kvBuilder.buildPut(keyPtr,
-                        column.getFamilyName().getBytesPtr(), column.getName().getBytesPtr(),
+                        column.getFamilyName().getBytesPtr(), qualifierPtr,
                         ts, ptr));
             }
         }
@@ -856,6 +913,11 @@ public class PTableImpl implements PTable {
                 deleteRow.setDurability(Durability.SKIP_WAL);
             }
         }
+        
+        private byte[] getColumnQualifier(PColumn column) {
+            return EncodedColumnsUtil.getColumnQualifier(column, PTableImpl.this);
+        }
+        
     }
 
     @Override
@@ -1013,114 +1075,125 @@ public class PTableImpl implements PTable {
     public IndexType getIndexType() {
         return indexType;
     }
-
+    
+    //FIXME: samarth change the proto here
     /**
      * Construct a PTable instance from ProtoBuffered PTable instance
      * @param table
      */
     public static PTable createFromProto(PTableProtos.PTable table) {
-      PName tenantId = null;
-      if(table.hasTenantId()){
-        tenantId = PNameFactory.newName(table.getTenantId().toByteArray());
-      }
-      PName schemaName = PNameFactory.newName(table.getSchemaNameBytes().toByteArray());
-      PName tableName = PNameFactory.newName(table.getTableNameBytes().toByteArray());
-      PTableType tableType = PTableType.values()[table.getTableType().ordinal()];
-      PIndexState indexState = null;
-      if (table.hasIndexState()) {
-        indexState = PIndexState.fromSerializedValue(table.getIndexState());
-      }
-      Short viewIndexId = null;
-      if(table.hasViewIndexId()){
-    	  viewIndexId = (short)table.getViewIndexId();
-      }
-      IndexType indexType = IndexType.getDefault();
-      if(table.hasIndexType()){
-          indexType = IndexType.fromSerializedValue(table.getIndexType().toByteArray()[0]);
-      }
-      long sequenceNumber = table.getSequenceNumber();
-      long timeStamp = table.getTimeStamp();
-      long indexDisableTimestamp = table.getIndexDisableTimestamp();
-      PName pkName = null;
-      if (table.hasPkNameBytes()) {
-        pkName = PNameFactory.newName(table.getPkNameBytes().toByteArray());
-      }
-      int bucketNum = table.getBucketNum();
-      List<PColumn> columns = Lists.newArrayListWithExpectedSize(table.getColumnsCount());
-      for (PTableProtos.PColumn curPColumnProto : table.getColumnsList()) {
-        columns.add(PColumnImpl.createFromProto(curPColumnProto));
-      }
-      List<PTable> indexes = Lists.newArrayListWithExpectedSize(table.getIndexesCount());
-      for (PTableProtos.PTable curPTableProto : table.getIndexesList()) {
-        indexes.add(createFromProto(curPTableProto));
-      }
+        PName tenantId = null;
+        if(table.hasTenantId()){
+            tenantId = PNameFactory.newName(table.getTenantId().toByteArray());
+        }
+        PName schemaName = PNameFactory.newName(table.getSchemaNameBytes().toByteArray());
+        PName tableName = PNameFactory.newName(table.getTableNameBytes().toByteArray());
+        PTableType tableType = PTableType.values()[table.getTableType().ordinal()];
+        PIndexState indexState = null;
+        if (table.hasIndexState()) {
+            indexState = PIndexState.fromSerializedValue(table.getIndexState());
+        }
+        Short viewIndexId = null;
+        if(table.hasViewIndexId()){
+            viewIndexId = (short)table.getViewIndexId();
+        }
+        IndexType indexType = IndexType.getDefault();
+        if(table.hasIndexType()){
+            indexType = IndexType.fromSerializedValue(table.getIndexType().toByteArray()[0]);
+        }
+        long sequenceNumber = table.getSequenceNumber();
+        long timeStamp = table.getTimeStamp();
+        long indexDisableTimestamp = table.getIndexDisableTimestamp();
+        PName pkName = null;
+        if (table.hasPkNameBytes()) {
+            pkName = PNameFactory.newName(table.getPkNameBytes().toByteArray());
+        }
+        int bucketNum = table.getBucketNum();
+        List<PColumn> columns = Lists.newArrayListWithExpectedSize(table.getColumnsCount());
+        for (PTableProtos.PColumn curPColumnProto : table.getColumnsList()) {
+            columns.add(PColumnImpl.createFromProto(curPColumnProto));
+        }
+        List<PTable> indexes = Lists.newArrayListWithExpectedSize(table.getIndexesCount());
+        for (PTableProtos.PTable curPTableProto : table.getIndexesList()) {
+            indexes.add(createFromProto(curPTableProto));
+        }
 
-      boolean isImmutableRows = table.getIsImmutableRows();
-      PName dataTableName = null;
-      if (table.hasDataTableNameBytes()) {
-        dataTableName = PNameFactory.newName(table.getDataTableNameBytes().toByteArray());
-      }
-      PName defaultFamilyName = null;
-      if (table.hasDefaultFamilyName()) {
-        defaultFamilyName = PNameFactory.newName(table.getDefaultFamilyName().toByteArray());
-      }
-      boolean disableWAL = table.getDisableWAL();
-      boolean multiTenant = table.getMultiTenant();
-      boolean storeNulls = table.getStoreNulls();
-      boolean isTransactional = table.getTransactional();
-      ViewType viewType = null;
-      String viewStatement = null;
-      List<PName> physicalNames = Collections.emptyList();
-      if (tableType == PTableType.VIEW) {
-        viewType = ViewType.fromSerializedValue(table.getViewType().toByteArray()[0]);
-        if(table.hasViewStatement()){
-          viewStatement = (String) PVarchar.INSTANCE.toObject(table.getViewStatement().toByteArray());
+        boolean isImmutableRows = table.getIsImmutableRows();
+        PName dataTableName = null;
+        if (table.hasDataTableNameBytes()) {
+            dataTableName = PNameFactory.newName(table.getDataTableNameBytes().toByteArray());
         }
-      }
-      if (tableType == PTableType.VIEW || viewIndexId != null) {
-        physicalNames = Lists.newArrayListWithExpectedSize(table.getPhysicalNamesCount());
-        for(int i = 0; i < table.getPhysicalNamesCount(); i++){
-          physicalNames.add(PNameFactory.newName(table.getPhysicalNames(i).toByteArray()));
+        PName defaultFamilyName = null;
+        if (table.hasDefaultFamilyName()) {
+            defaultFamilyName = PNameFactory.newName(table.getDefaultFamilyName().toByteArray());
+        }
+        boolean disableWAL = table.getDisableWAL();
+        boolean multiTenant = table.getMultiTenant();
+        boolean storeNulls = table.getStoreNulls();
+        boolean isTransactional = table.getTransactional();
+        ViewType viewType = null;
+        String viewStatement = null;
+        List<PName> physicalNames = Collections.emptyList();
+        if (tableType == PTableType.VIEW) {
+            viewType = ViewType.fromSerializedValue(table.getViewType().toByteArray()[0]);
+            if(table.hasViewStatement()){
+                viewStatement = (String) PVarchar.INSTANCE.toObject(table.getViewStatement().toByteArray());
+            }
+        }
+        if (tableType == PTableType.VIEW || viewIndexId != null) {
+            physicalNames = Lists.newArrayListWithExpectedSize(table.getPhysicalNamesCount());
+            for(int i = 0; i < table.getPhysicalNamesCount(); i++){
+                physicalNames.add(PNameFactory.newName(table.getPhysicalNames(i).toByteArray()));
+            }
         }
-      }
-      
-      int baseColumnCount = -1;
-      if (table.hasBaseColumnCount()) {
-          baseColumnCount = table.getBaseColumnCount();
-      }
 
-      boolean rowKeyOrderOptimizable = false;
-      if (table.hasRowKeyOrderOptimizable()) {
-          rowKeyOrderOptimizable = table.getRowKeyOrderOptimizable();
-      }
-      long updateCacheFrequency = 0;
-      if (table.hasUpdateCacheFrequency()) {
-          updateCacheFrequency = table.getUpdateCacheFrequency();
-      }
-      boolean isNamespaceMapped=false;
-      if (table.hasIsNamespaceMapped()) {
-          isNamespaceMapped = table.getIsNamespaceMapped();
-      }
-      String autoParititonSeqName = null;
-      if (table.hasAutoParititonSeqName()) {
-          autoParititonSeqName = table.getAutoParititonSeqName();
-      }
-      boolean isAppendOnlySchema = false;
-      if (table.hasIsAppendOnlySchema()) {
-          isAppendOnlySchema = table.getIsAppendOnlySchema();
-      }
-      
-      try {
-        PTableImpl result = new PTableImpl();
-        result.init(tenantId, schemaName, tableName, tableType, indexState, timeStamp, sequenceNumber, pkName,
-            (bucketNum == NO_SALTING) ? null : bucketNum, columns, schemaName,dataTableName, indexes,
-            isImmutableRows, physicalNames, defaultFamilyName, viewStatement, disableWAL,
-            multiTenant, storeNulls, viewType, viewIndexId, indexType, baseColumnCount, rowKeyOrderOptimizable,
-            isTransactional, updateCacheFrequency, indexDisableTimestamp, isNamespaceMapped, autoParititonSeqName, isAppendOnlySchema);
-        return result;
-      } catch (SQLException e) {
-        throw new RuntimeException(e); // Impossible
-      }
+        int baseColumnCount = -1;
+        if (table.hasBaseColumnCount()) {
+            baseColumnCount = table.getBaseColumnCount();
+        }
+
+        boolean rowKeyOrderOptimizable = false;
+        if (table.hasRowKeyOrderOptimizable()) {
+            rowKeyOrderOptimizable = table.getRowKeyOrderOptimizable();
+        }
+        long updateCacheFrequency = 0;
+        if (table.hasUpdateCacheFrequency()) {
+            updateCacheFrequency = table.getUpdateCacheFrequency();
+        }
+        boolean isNamespaceMapped=false;
+        if (table.hasIsNamespaceMapped()) {
+            isNamespaceMapped = table.getIsNamespaceMapped();
+        }
+        String autoParititonSeqName = null;
+        if (table.hasAutoParititonSeqName()) {
+            autoParititonSeqName = table.getAutoParititonSeqName();
+        }
+        boolean isAppendOnlySchema = false;
+        if (table.hasIsAppendOnlySchema()) {
+            isAppendOnlySchema = table.getIsAppendOnlySchema();
+        }
+        StorageScheme storageScheme = null;
+        if (table.hasStorageScheme()) {
+            storageScheme = StorageScheme.fromSerializedValue(table.getStorageScheme().toByteArray()[0]);
+        }
+        EncodedCQCounter encodedColumnQualifierCounter = null;
+        if (table.hasEncodedColumnQualifierCounter()) {
+            encodedColumnQualifierCounter = new EncodedCQCounter(table.getEncodedColumnQualifierCounter());
+        } else {
+            encodedColumnQualifierCounter = PTable.EncodedCQCounter.NULL_COUNTER;
+        }
+
+        try {
+            PTableImpl result = new PTableImpl();
+            result.init(tenantId, schemaName, tableName, tableType, indexState, timeStamp, sequenceNumber, pkName,
+                    (bucketNum == NO_SALTING) ? null : bucketNum, columns, schemaName,dataTableName, indexes,
+                            isImmutableRows, physicalNames, defaultFamilyName, viewStatement, disableWAL,
+                            multiTenant, storeNulls, viewType, viewIndexId, indexType, baseColumnCount, rowKeyOrderOptimizable,
+                            isTransactional, updateCacheFrequency, indexDisableTimestamp, isNamespaceMapped, autoParititonSeqName, isAppendOnlySchema, storageScheme, encodedColumnQualifierCounter);
+            return result;
+        } catch (SQLException e) {
+            throw new RuntimeException(e); // Impossible
+        }
     }
 
     public static PTableProtos.PTable toProto(PTable table) {
@@ -1193,10 +1266,16 @@ public class PTableImpl implements PTable {
       builder.setUpdateCacheFrequency(table.getUpdateCacheFrequency());
       builder.setIndexDisableTimestamp(table.getIndexDisableTimestamp());
       builder.setIsNamespaceMapped(table.isNamespaceMapped());
-      if (table.getAutoPartitionSeqName()!= null) {
+      if (table.getAutoPartitionSeqName() != null) {
           builder.setAutoParititonSeqName(table.getAutoPartitionSeqName());
       }
       builder.setIsAppendOnlySchema(table.isAppendOnlySchema());
+      if (table.getStorageScheme() != null) {
+          builder.setStorageScheme(ByteStringer.wrap(new byte[]{table.getStorageScheme().getSerializedValue()}));
+      }
+      if (table.getEncodedCQCounter() != PTable.EncodedCQCounter.NULL_COUNTER) {
+          builder.setEncodedColumnQualifierCounter(table.getEncodedCQCounter().getValue());
+      }
       return builder.build();
     }
 
@@ -1244,4 +1323,14 @@ public class PTableImpl implements PTable {
     public boolean isAppendOnlySchema() {
         return isAppendOnlySchema;
     }
+    
+    @Override
+    public StorageScheme getStorageScheme() {
+        return storageScheme;
+    }
+    
+    @Override
+    public EncodedCQCounter getEncodedCQCounter() {
+        return encodedCQCounter;
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/450dbc59/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableKey.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableKey.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableKey.java
index 42699d9..017c75d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableKey.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableKey.java
@@ -28,7 +28,11 @@ public class PTableKey {
     public PTableKey(PName tenantId, String name) {
         Preconditions.checkNotNull(name);
         this.tenantId = tenantId;
-        this.name = name;
+        if (name.indexOf(QueryConstants.NAMESPACE_SEPARATOR) != -1) {
+            this.name = name.replace(QueryConstants.NAMESPACE_SEPARATOR, QueryConstants.NAME_SEPARATOR);
+        } else {
+            this.name = name;
+        }
     }
 
     public PName getTenantId() {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/450dbc59/phoenix-core/src/main/java/org/apache/phoenix/schema/ProjectedColumn.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/ProjectedColumn.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/ProjectedColumn.java
index 19dd1c1..9336938 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/ProjectedColumn.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/ProjectedColumn.java
@@ -39,6 +39,7 @@ public class ProjectedColumn extends DelegateColumn {
         return name;
     }
     
+    @Override
     public PName getFamilyName() {
         return familyName;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/450dbc59/phoenix-core/src/main/java/org/apache/phoenix/schema/SaltingUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/SaltingUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/SaltingUtil.java
index 734a9ed..23cfd1b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/SaltingUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/SaltingUtil.java
@@ -38,7 +38,7 @@ public class SaltingUtil {
     public static final String SALTING_COLUMN_NAME = "_SALT";
     public static final String SALTED_ROW_KEY_NAME = "_SALTED_KEY";
     public static final PColumnImpl SALTING_COLUMN = new PColumnImpl(
-            PNameFactory.newName(SALTING_COLUMN_NAME), null, PBinary.INSTANCE, 1, 0, false, 0, SortOrder.getDefault(), 0, null, false, null, false, false);
+            PNameFactory.newName(SALTING_COLUMN_NAME), null, PBinary.INSTANCE, 1, 0, false, 0, SortOrder.getDefault(), 0, null, false, null, false, false, null);
     public static final RowKeySchema VAR_BINARY_SALTED_SCHEMA = new RowKeySchemaBuilder(2)
         .addField(SALTING_COLUMN, false, SortOrder.getDefault())
         .addField(SchemaUtil.VAR_BINARY_DATUM, false, SortOrder.getDefault()).build();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/450dbc59/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/BaseTuple.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/BaseTuple.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/BaseTuple.java
index a8dc487..92371e7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/BaseTuple.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/BaseTuple.java
@@ -17,6 +17,10 @@
  */
 package org.apache.phoenix.schema.tuple;
 
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+
 
 public abstract class BaseTuple implements Tuple {
 
@@ -24,4 +28,9 @@ public abstract class BaseTuple implements Tuple {
     public long getSequenceValue(int index) {
         throw new UnsupportedOperationException();
     }
+    
+    @Override
+    public void setKeyValues(List<Cell> values) {
+        throw new UnsupportedOperationException();
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/450dbc59/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/DelegateTuple.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/DelegateTuple.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/DelegateTuple.java
index 58b1eda..3430f5b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/DelegateTuple.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/DelegateTuple.java
@@ -17,6 +17,8 @@
  */
 package org.apache.phoenix.schema.tuple;
 
+import java.util.List;
+
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 
@@ -61,4 +63,9 @@ public class DelegateTuple implements Tuple {
     public long getSequenceValue(int index) {
         return delegate.getSequenceValue(index);
     }
+
+    @Override
+    public void setKeyValues(List<Cell> values) {
+        delegate.setKeyValues(values);
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/450dbc59/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/EncodedColumnQualiferCellsList.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/EncodedColumnQualiferCellsList.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/EncodedColumnQualiferCellsList.java
new file mode 100644
index 0000000..2110125
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/EncodedColumnQualiferCellsList.java
@@ -0,0 +1,553 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema.tuple;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.phoenix.query.QueryConstants.ENCODED_CQ_COUNTER_INITIAL_VALUE;
+import static org.apache.phoenix.query.QueryConstants.ENCODED_EMPTY_COLUMN_NAME;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.NoSuchElementException;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.PTable.StorageScheme;
+import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.types.PInteger;
+
+/**
+ * List implementation that provides indexed based look up when the cell column qualifiers are generated using the
+ * {@link StorageScheme#ENCODED_COLUMN_NAMES} scheme. The api methods in this list assume that the caller wants to see
+ * and add only non null elements in the list. Such an assumption makes the implementation mimic the behavior that one
+ * would get when passing an {@link ArrayList} to hbase for filling in the key values returned by scanners. This
+ * implementation doesn't implement all the optional methods of the {@link List} interface which should be OK. A lot of
+ * things would be screwed up if HBase starts expecting that the the list implementation passed in to scanners
+ * implements all the optional methods of the interface too.
+ * 
+ * For getting elements out o
+ */
+@NotThreadSafe
+public class EncodedColumnQualiferCellsList implements List<Cell> {
+
+    private int minQualifier;
+    private int maxQualifier;
+    private final Cell[] array;
+    private int numNonNullElements;
+    private int firstNonNullElementIdx = -1;
+    private static final int RESERVED_RANGE_MIN = ENCODED_EMPTY_COLUMN_NAME;
+    private static final int RESERVED_RANGE_MAX = ENCODED_CQ_COUNTER_INITIAL_VALUE - 1;
+    private static final String RESERVED_RANGE = "(" + RESERVED_RANGE_MIN + ", " + RESERVED_RANGE_MAX + ")";
+    
+    
+    public EncodedColumnQualiferCellsList(int minQualifier, int maxQualifier) {
+        checkArgument(minQualifier <= maxQualifier, "Invalid arguments. Min: " + minQualifier + ". Max: " + maxQualifier);
+        if (!(minQualifier == maxQualifier && minQualifier == ENCODED_EMPTY_COLUMN_NAME)) {
+            checkArgument(minQualifier >= ENCODED_CQ_COUNTER_INITIAL_VALUE, "Argument minQualifier " + minQualifier + " needs to lie outside of the reserved range: " + RESERVED_RANGE);
+        }
+        this.minQualifier = minQualifier;
+        this.maxQualifier = maxQualifier;
+        int reservedRangeSize = RESERVED_RANGE_MAX - RESERVED_RANGE_MIN + 1;
+        int qualifierRangeSize = minQualifier > RESERVED_RANGE_MAX ? (maxQualifier - minQualifier + 1) : 0;
+        this.array = new Cell[reservedRangeSize + qualifierRangeSize];
+    }
+
+    @Override
+    public int size() {
+        return numNonNullElements;
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return numNonNullElements == 0;
+    }
+
+    @Override
+    public boolean contains(Object o) {
+        return indexOf(o) >= 0;
+    }
+
+
+    /**
+     * This implementation only returns an array of non-null elements in the list.
+     */
+    @Override
+    public Object[] toArray() {
+        Object[] toReturn = new Object[numNonNullElements];
+        int counter = 0;
+        for (int i = 0; i < array.length; i++) {
+            if (array[i] != null) {
+                toReturn[counter++] = array[i];
+            }
+        }
+        return toReturn;
+    }
+
+
+    /**
+     * This implementation only returns an array of non-null elements in the list.
+     * This is not the most efficient way of copying elemts into an array 
+     */
+    @Override
+    @SuppressWarnings("unchecked")
+    public <T> T[] toArray(T[] a) {
+        T[] toReturn = (T[])java.lang.reflect.Array.newInstance(a.getClass().getComponentType(), numNonNullElements);
+        int counter = 0;
+        for (int i = 0; i < array.length; i++) {
+            if (array[i] != null) {
+                toReturn[counter++] = (T)array[i];
+            }
+        }
+        return toReturn;
+    }
+
+    @Override
+    public boolean add(Cell e) {
+        if (e == null) {
+            throw new NullPointerException();
+        }
+        int columnQualifier = PInteger.INSTANCE.getCodec().decodeInt(e.getQualifierArray(), e.getQualifierOffset(), SortOrder.ASC);
+        checkQualifierRange(columnQualifier);
+        int idx = getArrayIndex(columnQualifier);
+        array[idx] = e;
+        numNonNullElements++;
+        if (firstNonNullElementIdx == -1) {
+            firstNonNullElementIdx = idx;
+        }
+        return true;
+    }
+
+    @Override
+    public boolean remove(Object o) {
+        if (o == null) {
+            return false;
+        }
+        Cell e = (Cell)o;
+        int i = 0;
+        while (i < array.length) {
+            if (array[i] != null && array[i].equals(e)) {
+                array[i] = null;
+                numNonNullElements--;
+                if (numNonNullElements == 0) {
+                    firstNonNullElementIdx = -1;
+                } else if (firstNonNullElementIdx == i) {
+                    // the element being removed was the first non-null element we knew
+                    while (i < array.length && (array[i]) == null) {
+                        i++;
+                    }
+                    if (i < array.length) {
+                        firstNonNullElementIdx = i;
+                    } else {
+                        firstNonNullElementIdx = -1;
+                    }
+                }
+                return true;
+            }
+            i++;
+        }
+        return false;
+    }
+
+    @Override
+    public boolean containsAll(Collection<?> c) {
+        boolean containsAll = true;
+        Iterator<?> itr = c.iterator();
+        while (itr.hasNext()) {
+            containsAll &= (indexOf(itr.next()) >= 0);
+        }
+        return containsAll;
+    }
+
+    @Override
+    public boolean addAll(Collection<? extends Cell> c) {
+        boolean changed = false;
+        for (Cell cell : c) {
+            if (c == null) {
+                throw new NullPointerException();
+            }
+            changed |= add(cell);
+        }
+        return changed;
+    }
+
+    @Override
+    public boolean addAll(int index, Collection<? extends Cell> c) {
+        throwUnsupportedOperationException();
+        return false;
+    }
+
+    @Override
+    public boolean removeAll(Collection<?> c) {
+        Iterator<?> itr = c.iterator();
+        boolean changed = false;
+        while (itr.hasNext()) {
+            changed |= remove(itr.next());
+        }
+        return changed;
+    }
+
+    @Override
+    public boolean retainAll(Collection<?> c) {
+        throwUnsupportedOperationException();
+        return false;
+    }
+
+    @Override
+    public void clear() {
+        for (int i = 0; i < array.length; i++) {
+            array[i] = null;
+        }
+        firstNonNullElementIdx = -1;
+        numNonNullElements = 0;
+    }
+    
+    @Override
+    public Cell get(int index) {
+        rangeCheck(index);
+        int numNonNullElementsFound = 0;
+        int i = 0;
+        for (; i < array.length; i++) {
+            if (array[i] != null) {
+                numNonNullElementsFound++;
+                if (numNonNullElementsFound - 1 == index) {
+                    break;
+                }
+            }
+            
+        }
+        return (numNonNullElementsFound - 1) != index ? null : array[i];
+    }
+
+    @Override
+    public Cell set(int index, Cell e) {
+        int columnQualifier = PInteger.INSTANCE.getCodec().decodeInt(e.getQualifierArray(), e.getQualifierOffset(), SortOrder.ASC);
+        checkQualifierRange(columnQualifier);
+        int idx = getArrayIndex(columnQualifier);
+        if (idx != index) {
+            throw new IllegalArgumentException("Attempt made to add cell with encoded column qualifier " + columnQualifier +  "  to the encodedcolumnqualifier list at index " + index);
+        }
+        Cell prev = array[idx];
+        array[idx] = e;
+        numNonNullElements++;
+        if (firstNonNullElementIdx == -1) {
+            firstNonNullElementIdx = idx;
+        }
+        return prev;
+    }
+
+    @Override
+    public void add(int index, Cell element) {
+        throwUnsupportedOperationException();
+    }
+
+    @Override
+    public Cell remove(int index) {
+        throwUnsupportedOperationException();
+        return null;
+    }
+
+    @Override
+    public int indexOf(Object o) {
+        if (o == null) {
+            return -1;
+        } else {
+            for (int i = 0; i < array.length; i++)
+                if (o.equals(array[i])) {
+                    return i;
+                }
+        }
+        return -1;
+    }
+
+    @Override
+    public int lastIndexOf(Object o) {
+        if (o == null) {
+            return -1;
+        }
+        for (int i = array.length - 1; i >=0 ; i--) {
+            if (o.equals(array[i])) {
+                return i;
+            }
+        }
+        return -1;
+    }
+
+    @Override
+    public ListIterator<Cell> listIterator() {
+        throwUnsupportedOperationException();
+        return null;
+    }
+
+    @Override
+    public ListIterator<Cell> listIterator(int index) {
+        throwUnsupportedOperationException();
+        return null;
+    }
+
+    @Override
+    public List<Cell> subList(int fromIndex, int toIndex) {
+        throwUnsupportedOperationException();
+        return null;
+    }
+    
+    @Override
+    public Iterator<Cell> iterator() {
+        return new Itr();
+    }
+    
+    //TODO: samarth confirm that this method can return null.
+    public Cell getCellForColumnQualifier(int columnQualifier) {
+        checkQualifierRange(columnQualifier);
+        int idx = getArrayIndex(columnQualifier);
+        Cell c =  array[idx];
+        return c;
+    }
+    
+    public Cell getFirstCell()  {
+        if (firstNonNullElementIdx == -1) {
+            throw new NoSuchElementException("No elements present in the list");
+        }
+        return array[firstNonNullElementIdx];
+    }
+
+    private void checkQualifierRange(int qualifier) {
+        if (!(isReservedQualifier(qualifier) || isQualifierInMinMaxRange(qualifier))) { 
+            throw new IndexOutOfBoundsException(
+                "Qualifier " + qualifier + " is out of the valid range. Reserved: " + RESERVED_RANGE + ". Table column qualifier range: ("
+                        + minQualifier + ", " + maxQualifier + ")"); 
+        }
+    }
+    
+    private boolean isReservedQualifier(int qualifier) {
+        return qualifier >= RESERVED_RANGE_MIN && qualifier <= RESERVED_RANGE_MAX; 
+    }
+    
+    private boolean isQualifierInMinMaxRange(int qualifier) {
+        return qualifier >= minQualifier && qualifier <= maxQualifier;
+    }
+
+    private void rangeCheck(int index) {
+        if (index < 0 || index > size() - 1) {
+            throw new IndexOutOfBoundsException();
+        }
+    }
+    
+    private int getArrayIndex(int columnQualifier) {
+        return columnQualifier < ENCODED_CQ_COUNTER_INITIAL_VALUE ? columnQualifier : ENCODED_CQ_COUNTER_INITIAL_VALUE
+                + (columnQualifier - minQualifier);
+    }
+    
+    private void throwUnsupportedOperationException() {
+        throw new UnsupportedOperationException("Operation cannot be supported because it potentially violates the invariance contract of this list implementation");
+    }
+
+    private class Itr implements Iterator<Cell> {
+        private Cell current;
+        private int currentIdx = 0;
+        private boolean exhausted = false;
+        private Itr() {
+            moveToNextNonNullCell(true);
+        }
+
+        @Override
+        public boolean hasNext() {
+            return !exhausted;
+        }
+
+        @Override
+        public Cell next() {
+            if (exhausted) {
+                return null;
+            }
+            Cell next = current;
+            moveToNextNonNullCell(false);
+            return next;
+        }
+
+        @Override
+        public void remove() {
+            throwUnsupportedOperationException();            
+        }
+
+        private void moveToNextNonNullCell(boolean init) {
+            int i = init ? 0 : currentIdx + 1;
+            while (i < array.length && (current = array[i]) == null) {
+                i++;
+            }
+            if (i < array.length) {
+                currentIdx = i;
+            } else {
+                currentIdx = -1;
+                exhausted = true;
+            }
+        }
+
+    }
+
+    private class ListItr implements ListIterator<Cell> {
+        private int previousIndex;
+        private int nextIndex;
+        private Cell previous;
+        private Cell next;
+        
+        private ListItr() {
+            movePointersForward(true);
+            previous = null;
+            if (nextIndex != -1) {
+                next = array[nextIndex];
+            }
+        }
+        
+        @Override
+        public boolean hasNext() {
+            return next != null;
+        }
+
+        @Override
+        public Cell next() {
+            Cell toReturn = next;
+            if (toReturn == null) {
+                throw new NoSuchElementException();
+            }
+            movePointersForward(false);
+            return toReturn;
+        }
+
+        @Override
+        public boolean hasPrevious() {
+            return previous != null;
+        }
+
+        @Override
+        public Cell previous() {
+            Cell toReturn = previous;
+            if (toReturn == null) {
+                throw new NoSuchElementException();
+            }
+            movePointersBackward(false);
+            return toReturn;
+        }
+
+        @Override
+        public int nextIndex() {
+            return nextIndex;
+        }
+
+        @Override
+        public int previousIndex() {
+            return previousIndex;
+        }
+
+        @Override
+        public void remove() {
+            // TODO Auto-generated method stub
+            
+        }
+        
+        // TODO: samarth this is one of these ouch methods that can make our implementation frgaile.
+        // It is a non-optional method and can't really be supported 
+        @Override
+        public void set(Cell e) {
+            // TODO Auto-generated method stub
+            
+        }
+
+        @Override
+        public void add(Cell e) {
+            // TODO Auto-generated method stub
+            
+        }
+        
+        private void movePointersForward(boolean init) {
+            int i = init ? 0 : nextIndex;
+            if (!init) {
+                previousIndex = nextIndex;
+                previous = next;
+            } else {
+                previousIndex = -1;
+                previous = null;
+            }
+            while (i < array.length && (array[i]) == null) {
+                i++;
+            }
+            if (i < array.length) {
+                nextIndex = i;
+                next = array[i];
+            } else {
+                nextIndex = -1;
+                next = null;
+            }
+        }
+        
+        private void movePointersBackward(boolean init) {
+            int i = init ? 0 : previousIndex;
+        }
+        
+    }
+
+    public static void main (String args[]) throws Exception {
+        EncodedColumnQualiferCellsList list = new EncodedColumnQualiferCellsList(11, 16); // list of 6 elements
+        System.out.println(list.size());
+        
+        byte[] row = Bytes.toBytes("row");
+        byte[] cf = Bytes.toBytes("cf");
+        
+        // add elements in reserved range
+        list.add(KeyValue.createFirstOnRow(row, cf, PInteger.INSTANCE.toBytes(0)));
+        list.add(KeyValue.createFirstOnRow(row, cf, PInteger.INSTANCE.toBytes(5)));
+        list.add(KeyValue.createFirstOnRow(row, cf, PInteger.INSTANCE.toBytes(10)));
+        System.out.println(list.size());
+        for (Cell c : list) {
+            //System.out.println(c);
+        }
+        
+        // add elements in qualifier range
+        list.add(KeyValue.createFirstOnRow(row, cf, PInteger.INSTANCE.toBytes(12)));
+        list.add(KeyValue.createFirstOnRow(row, cf, PInteger.INSTANCE.toBytes(14)));
+        list.add(KeyValue.createFirstOnRow(row, cf, PInteger.INSTANCE.toBytes(16)));
+        System.out.println(list.size());
+        for (Cell c : list) {
+            //System.out.println(c);
+        }
+        
+        list.add(KeyValue.createFirstOnRow(row, cf, PInteger.INSTANCE.toBytes(11)));
+        System.out.println(list.size());
+        for (Cell c : list) {
+            //System.out.println(c);
+        }
+        
+        System.out.println(list.get(0));
+        System.out.println(list.get(1));
+        System.out.println(list.get(2));
+        System.out.println(list.get(3));
+        System.out.println(list.get(4));
+        System.out.println(list.get(5));
+        System.out.println(list.get(6));
+        System.out.println(list.remove(KeyValue.createFirstOnRow(row, cf, PInteger.INSTANCE.toBytes(5))));
+        System.out.println(list.get(5));
+        System.out.println(list.size());
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/450dbc59/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/MultiKeyValueTuple.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/MultiKeyValueTuple.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/MultiKeyValueTuple.java
index 53f155b..d946870 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/MultiKeyValueTuple.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/MultiKeyValueTuple.java
@@ -36,6 +36,7 @@ public class MultiKeyValueTuple extends BaseTuple {
     }
 
     /** Caller must not modify the list that is passed here */
+    @Override
     public void setKeyValues(List<Cell> values) {
         this.values = values;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/450dbc59/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/PositionBasedMultiKeyValueTuple.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/PositionBasedMultiKeyValueTuple.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/PositionBasedMultiKeyValueTuple.java
new file mode 100644
index 0000000..0c6ae22
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/PositionBasedMultiKeyValueTuple.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema.tuple;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.types.PInteger;
+
+/**
+ * Tuple that uses the 
+ */
+public class PositionBasedMultiKeyValueTuple extends BaseTuple {
+    private EncodedColumnQualiferCellsList values;
+
+    public PositionBasedMultiKeyValueTuple() {}
+    
+    public PositionBasedMultiKeyValueTuple(List<Cell> values) {
+        checkArgument(values instanceof EncodedColumnQualiferCellsList, "PositionBasedMultiKeyValueTuple only works with lists of type BoundedSkipNullCellsList");
+        this.values = (EncodedColumnQualiferCellsList)values;
+    }
+    
+    /** Caller must not modify the list that is passed here */
+    @Override
+    public void setKeyValues(List<Cell> values) {
+        checkArgument(values instanceof EncodedColumnQualiferCellsList, "PositionBasedMultiKeyValueTuple only works with lists of type BoundedSkipNullCellsList");
+        this.values = (EncodedColumnQualiferCellsList)values;
+    }
+
+    @Override
+    public void getKey(ImmutableBytesWritable ptr) {
+        Cell value = values.getFirstCell();
+        ptr.set(value.getRowArray(), value.getRowOffset(), value.getRowLength());
+    }
+
+    @Override
+    public boolean isImmutable() {
+        return true;
+    }
+
+    @Override
+    public Cell getValue(byte[] family, byte[] qualifier) {
+        return values.getCellForColumnQualifier(PInteger.INSTANCE.getCodec().decodeInt(qualifier, 0, SortOrder.ASC));
+    }
+
+    @Override
+    public String toString() {
+        return values.toString();
+    }
+
+    @Override
+    public int size() {
+        return values.size();
+    }
+
+    @Override
+    public Cell getValue(int index) {
+        return values.get(index);
+    }
+
+    @Override
+    public boolean getValue(byte[] family, byte[] qualifier,
+            ImmutableBytesWritable ptr) {
+        Cell kv = getValue(family, qualifier);
+        if (kv == null)
+            return false;
+        ptr.set(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength());
+        return true;
+    }}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/450dbc59/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/PositionBasedResultTuple.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/PositionBasedResultTuple.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/PositionBasedResultTuple.java
new file mode 100644
index 0000000..8f4a846
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/PositionBasedResultTuple.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema.tuple;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.types.PInteger;
+
+public class PositionBasedResultTuple extends BaseTuple {
+    private final EncodedColumnQualiferCellsList cells;
+    
+    //TODO: samarth see if we can get rid of this constructor altogether.
+    public PositionBasedResultTuple(List<Cell> list) {
+        checkArgument(list instanceof EncodedColumnQualiferCellsList, "Invalid list type");
+        this.cells = (EncodedColumnQualiferCellsList)list;
+    }
+    
+    @Override
+    public void getKey(ImmutableBytesWritable ptr) {
+        Cell value = cells.getFirstCell();
+        ptr.set(value.getRowArray(), value.getRowOffset(), value.getRowLength());
+    }
+
+    @Override
+    public boolean isImmutable() {
+        return true;
+    }
+
+    @Override
+    public KeyValue getValue(byte[] family, byte[] qualifier) {
+        int columnQualifier = PInteger.INSTANCE.getCodec().decodeInt(qualifier, 0, SortOrder.ASC);
+        return org.apache.hadoop.hbase.KeyValueUtil.ensureKeyValue(cells.getCellForColumnQualifier(columnQualifier));
+    }
+
+    //TODO: samarth implement this.
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder();
+      sb.append("keyvalues=");
+      return sb.toString();
+    }
+
+    @Override
+    public int size() {
+        return cells.size();
+    }
+
+    @Override
+    public KeyValue getValue(int index) {
+        return org.apache.hadoop.hbase.KeyValueUtil.ensureKeyValue(cells.get(index));
+    }
+
+    @Override
+    public boolean getValue(byte[] family, byte[] qualifier,
+            ImmutableBytesWritable ptr) {
+        KeyValue kv = getValue(family, qualifier);
+        if (kv == null)
+            return false;
+        ptr.set(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength());
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/450dbc59/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/ResultTuple.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/ResultTuple.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/ResultTuple.java
index c28a2bf..845b113 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/ResultTuple.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/ResultTuple.java
@@ -17,33 +17,44 @@
  */
 package org.apache.phoenix.schema.tuple;
 
+import static org.apache.phoenix.query.QueryConstants.ENCODED_CQ_COUNTER_INITIAL_VALUE;
+
+import java.util.Collections;
+
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.types.PInteger;
 import org.apache.phoenix.util.KeyValueUtil;
 
-
+/**
+ * 
+ * Wrapper around {@link Result} that implements Phoenix's {@link Tuple} interface.
+ *
+ */
 public class ResultTuple extends BaseTuple {
-    private Result result;
+    private final Result result;
+    public static final ResultTuple EMPTY_TUPLE = new ResultTuple(Result.create(Collections.<Cell>emptyList()));
     
+    //TODO: samarth see if we can get rid of this constructor altogether.
     public ResultTuple(Result result) {
         this.result = result;
     }
     
-    public ResultTuple() {
-    }
+//    public ResultTuple(Result result, boolean useQualifierAsIndex) {
+//        this.result = result;
+//        this.useQualifierAsIndex = useQualifierAsIndex;
+//    }
     
     public Result getResult() {
         return this.result;
     }
 
-    public void setResult(Result result) {
-        this.result = result;
-    }
-    
     @Override
     public void getKey(ImmutableBytesWritable ptr) {
         ptr.set(result.getRow());
@@ -56,6 +67,12 @@ public class ResultTuple extends BaseTuple {
 
     @Override
     public KeyValue getValue(byte[] family, byte[] qualifier) {
+//        if (useQualifierAsIndex) {
+//            int index = PInteger.INSTANCE.getCodec().decodeInt(qualifier, 0, SortOrder.ASC);
+//            //TODO: samarth this seems like a hack here at this place. Think more. Maybe we should use a new tuple here?
+//            index = index >= ENCODED_CQ_COUNTER_INITIAL_VALUE ? (index - ENCODED_CQ_COUNTER_INITIAL_VALUE) : index;
+//            return org.apache.hadoop.hbase.KeyValueUtil.ensureKeyValue(result.rawCells()[index]);
+//        }
         Cell cell = KeyValueUtil.getColumnLatest(GenericKeyValueBuilder.INSTANCE, 
           result.rawCells(), family, qualifier);
         return org.apache.hadoop.hbase.KeyValueUtil.ensureKeyValue(cell);
@@ -104,4 +121,4 @@ public class ResultTuple extends BaseTuple {
         ptr.set(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength());
         return true;
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/450dbc59/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/Tuple.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/Tuple.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/Tuple.java
index 61b2a4f..e4a887b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/Tuple.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/Tuple.java
@@ -17,6 +17,8 @@
  */
 package org.apache.phoenix.schema.tuple;
 
+import java.util.List;
+
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 
@@ -87,4 +89,6 @@ public interface Tuple {
      * @return the current or next sequence value
      */
     public long getSequenceValue(int index);
+    
+    public void setKeyValues(List<Cell> values);
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/450dbc59/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/UnboundedSkipNullCellsList.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/UnboundedSkipNullCellsList.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/UnboundedSkipNullCellsList.java
new file mode 100644
index 0000000..02a85a5
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/UnboundedSkipNullCellsList.java
@@ -0,0 +1,488 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema.tuple;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.NoSuchElementException;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.types.PInteger;
+
+import com.google.common.base.Preconditions;
+
+public class UnboundedSkipNullCellsList implements List<Cell> {
+    private int minQualifier;
+    private int maxQualifier;
+    private Cell[] array;
+    private int numNonNullElements;
+    private int firstNonNullElementIdx = -1;
+    private int leftBoundary;
+    private int rightBoundary;
+    
+    // extra capacity we have either at the start or at the end or at at both extremes
+    // to accommodate column qualifiers outside of the range (minQualifier, maxQualifier)
+    private static final int INIITAL_EXTRA_BUFFER = 10;  
+
+    public UnboundedSkipNullCellsList(int minQualifier, int maxQualifier) {
+        checkArgument(maxQualifier - minQualifier > 0, "Illegal arguments. MinQualifier: " + minQualifier + ". MaxQualifier: " + maxQualifier);
+        this.minQualifier = minQualifier;
+        this.maxQualifier = maxQualifier;
+        int minIndex = Math.max(0, minQualifier - INIITAL_EXTRA_BUFFER);
+        int maxIndex = maxQualifier + INIITAL_EXTRA_BUFFER;
+        int size = maxIndex - minIndex + 1;
+        this.array = new Cell[size];
+    }
+    
+    
+    @Override
+    public int size() {
+        return numNonNullElements;
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return numNonNullElements == 0;
+    }
+
+    @Override
+    public boolean contains(Object o) {
+        return indexOf(o) >= 0;
+    }
+
+
+    /**
+     * This implementation only returns an array of non-null elements in the list.
+     */
+    @Override
+    public Object[] toArray() {
+        Object[] toReturn = new Object[numNonNullElements];
+        int counter = 0;
+        for (int i = 0; i < array.length; i++) {
+            if (array[i] != null) {
+                toReturn[counter++] = array[i];
+            }
+        }
+        return toReturn;
+    }
+
+
+    /**
+     * This implementation only returns an array of non-null elements in the list.
+     * This is not the most efficient way of copying elemts into an array 
+     */
+    @Override
+    @SuppressWarnings("unchecked")
+    public <T> T[] toArray(T[] a) {
+        T[] toReturn = (T[])java.lang.reflect.Array.newInstance(a.getClass().getComponentType(), numNonNullElements);
+        int counter = 0;
+        for (int i = 0; i < array.length; i++) {
+            if (array[i] != null) {
+                toReturn[counter++] = (T)array[i];
+            }
+        }
+        return toReturn;
+    }
+
+    @Override
+    public boolean add(Cell e) {
+        if (e == null) {
+            throw new NullPointerException();
+        }
+        int columnQualifier = PInteger.INSTANCE.getCodec().decodeInt(e.getQualifierArray(), e.getQualifierOffset(), SortOrder.ASC);
+        if (columnQualifier < 0) {
+            throw new IllegalArgumentException("Invalid column qualifier " + columnQualifier + " for cell " + e);
+        }
+        ensureCapacity(columnQualifier);
+        int idx = getArrayIndex(columnQualifier);
+        array[idx] = e;
+        numNonNullElements++;
+        if (firstNonNullElementIdx == -1) {
+            firstNonNullElementIdx = idx;
+        }
+        return true;
+    }
+
+    @Override
+    public boolean remove(Object o) {
+        if (o == null) {
+            return false;
+        }
+        Cell e = (Cell)o;
+        int i = 0;
+        while (i < array.length) {
+            if (array[i] != null && array[i].equals(e)) {
+                array[i] = null;
+                numNonNullElements--;
+                if (numNonNullElements == 0) {
+                    firstNonNullElementIdx = -1;
+                } else if (firstNonNullElementIdx == i) {
+                    // the element being removed was the first non-null element we knew
+                    while (i < array.length && (array[i]) == null) {
+                        i++;
+                    }
+                    if (i < array.length) {
+                        firstNonNullElementIdx = i;
+                    } else {
+                        firstNonNullElementIdx = -1;
+                    }
+                }
+                return true;
+            }
+            i++;
+        }
+        return false;
+    }
+
+    @Override
+    public boolean containsAll(Collection<?> c) {
+        boolean containsAll = true;
+        Iterator<?> itr = c.iterator();
+        while (itr.hasNext()) {
+            containsAll &= (indexOf(itr.next()) >= 0);
+        }
+        return containsAll;
+    }
+
+    @Override
+    public boolean addAll(Collection<? extends Cell> c) {
+        boolean changed = false;
+        for (Cell cell : c) {
+            if (c == null) {
+                throw new NullPointerException();
+            }
+            changed |= add(cell);
+        }
+        return changed;
+    }
+
+    @Override
+    public boolean addAll(int index, Collection<? extends Cell> c) {
+        throwUnsupportedOperationException();
+        return false;
+    }
+
+    @Override
+    public boolean removeAll(Collection<?> c) {
+        Iterator<?> itr = c.iterator();
+        boolean changed = false;
+        while (itr.hasNext()) {
+            changed |= remove(itr.next());
+        }
+        return changed;
+    }
+
+    @Override
+    public boolean retainAll(Collection<?> c) {
+        throwUnsupportedOperationException();
+        return false;
+    }
+
+    @Override
+    public void clear() {
+        Arrays.fill(array, null);
+    }
+    
+    @Override
+    public Cell get(int index) {
+        rangeCheck(index);
+        int counter = 0;
+        for (; counter < array.length; counter++) {
+            if (array[counter] != null && counter == index) {
+                break;
+            }
+        }
+        return array[counter];
+    }
+
+    @Override
+    public Cell set(int index, Cell element) {
+        throwUnsupportedOperationException();
+        return null;
+    }
+
+    @Override
+    public void add(int index, Cell element) {
+        throwUnsupportedOperationException();
+    }
+
+    @Override
+    public Cell remove(int index) {
+        throwUnsupportedOperationException();
+        return null;
+    }
+
+    @Override
+    public int indexOf(Object o) {
+        if (o == null) {
+            return -1;
+        } else {
+            for (int i = 0; i < array.length; i++)
+                if (o.equals(array[i])) {
+                    return i;
+                }
+        }
+        return -1;
+    }
+
+    @Override
+    public int lastIndexOf(Object o) {
+        if (o == null) {
+            return -1;
+        }
+        for (int i = array.length - 1; i >=0 ; i--) {
+            if (o.equals(array[i])) {
+                return i;
+            }
+        }
+        return -1;
+    }
+
+    @Override
+    public ListIterator<Cell> listIterator() {
+        throwUnsupportedOperationException();
+        return null;
+    }
+
+    @Override
+    public ListIterator<Cell> listIterator(int index) {
+        throwUnsupportedOperationException();
+        return null;
+    }
+
+    @Override
+    public List<Cell> subList(int fromIndex, int toIndex) {
+        throwUnsupportedOperationException();
+        return null;
+    }
+    
+    @Override
+    public Iterator<Cell> iterator() {
+        return new Itr();
+    }
+    
+    public Cell getCellForColumnQualifier(int columnQualifier) {
+        int idx = getArrayIndex(columnQualifier);
+        return array[idx];
+    }
+
+    //TODO: samarth need to handle overflow conditions and integer growing beyond sizeofint.
+    private void ensureCapacity(int qualifier) {
+        if (qualifier >= 0 && qualifier < leftBoundary) {
+            // This should happen very rarely.  
+            //TODO: samarth implement this case.
+        } else if (qualifier >= 0 && qualifier > rightBoundary) {
+            // TODO: samarth implement this case.
+        } 
+    }
+
+    private void rangeCheck(int index) {
+        if (index < 0 || index > size() - 1) {
+            throw new IndexOutOfBoundsException();
+        }
+    }
+
+    private void throwUnsupportedOperationException() {
+        throw new UnsupportedOperationException("Operation cannot be supported because it violates invariance");
+    }
+
+    private class Itr implements Iterator<Cell> {
+        private Cell current;
+        private int currentIdx = 0;
+        private boolean exhausted = false;
+        private Itr() {
+            moveToNextNonNullCell(true);
+        }
+
+        @Override
+        public boolean hasNext() {
+            return !exhausted;
+        }
+
+        @Override
+        public Cell next() {
+            if (exhausted) {
+                return null;
+            }
+            Cell next = current;
+            moveToNextNonNullCell(false);
+            return next;
+        }
+
+        @Override
+        public void remove() {
+            throwUnsupportedOperationException();            
+        }
+
+        private void moveToNextNonNullCell(boolean init) {
+            int i = init ? minQualifier : currentIdx + 1;
+            while (i < array.length && (current = array[i]) == null) {
+                i++;
+            }
+            if (i < array.length) {
+                currentIdx = i;
+            } else {
+                currentIdx = -1;
+                exhausted = true;
+            }
+        }
+
+    }
+
+    public Cell getFirstCell()  {
+        if (firstNonNullElementIdx == -1) {
+            throw new IllegalStateException("List doesn't have any non-null cell present");
+        }
+        return array[firstNonNullElementIdx];
+    }
+
+    private int getArrayIndex(int columnQualifier) {
+        return columnQualifier - minQualifier;
+    }
+    
+    private class ListItr implements ListIterator<Cell> {
+        private int previousIndex;
+        private int nextIndex;
+        private Cell previous;
+        private Cell next;
+        
+        private ListItr() {
+            movePointersForward(true);
+            previous = null;
+            if (nextIndex != -1) {
+                next = array[nextIndex];
+            }
+        }
+        
+        @Override
+        public boolean hasNext() {
+            return next != null;
+        }
+
+        @Override
+        public Cell next() {
+            Cell toReturn = next;
+            if (toReturn == null) {
+                throw new NoSuchElementException();
+            }
+            movePointersForward(false);
+            return toReturn;
+        }
+
+        @Override
+        public boolean hasPrevious() {
+            return previous != null;
+        }
+
+        @Override
+        public Cell previous() {
+            Cell toReturn = previous;
+            if (toReturn == null) {
+                throw new NoSuchElementException();
+            }
+            movePointersBackward(false);
+            return toReturn;
+        }
+
+        @Override
+        public int nextIndex() {
+            return nextIndex;
+        }
+
+        @Override
+        public int previousIndex() {
+            return previousIndex;
+        }
+
+        @Override
+        public void remove() {
+            // TODO Auto-generated method stub
+            
+        }
+        
+        // TODO: samarth this is one of these ouch methods that can make our implementation frgaile.
+        // It is a non-optional method and can't really be supported 
+        @Override
+        public void set(Cell e) {
+            // TODO Auto-generated method stub
+            
+        }
+
+        @Override
+        public void add(Cell e) {
+            // TODO Auto-generated method stub
+            
+        }
+        
+        private void movePointersForward(boolean init) {
+            int i = init ? 0 : nextIndex;
+            if (!init) {
+                previousIndex = nextIndex;
+                previous = next;
+            } else {
+                previousIndex = -1;
+                previous = null;
+            }
+            while (i < array.length && (array[i]) == null) {
+                i++;
+            }
+            if (i < array.length) {
+                nextIndex = i;
+                next = array[i];
+            } else {
+                nextIndex = -1;
+                next = null;
+            }
+        }
+        
+        private void movePointersBackward(boolean init) {
+            int i = init ? 0 : previousIndex;
+        }
+        
+    }
+
+    public static void main (String args[]) throws Exception {
+        UnboundedSkipNullCellsList list = new UnboundedSkipNullCellsList(0, 3); // list of eleven elements
+        System.out.println(list.size());
+        byte[] row = Bytes.toBytes("row");
+        byte[] cf = Bytes.toBytes("cf");
+        list.add(KeyValue.createFirstOnRow(row, cf, PInteger.INSTANCE.toBytes(0)));
+        list.add(KeyValue.createFirstOnRow(row, cf, PInteger.INSTANCE.toBytes(5)));
+        list.add(KeyValue.createFirstOnRow(row, cf, PInteger.INSTANCE.toBytes(10)));
+
+        for (Cell c : list) {
+            System.out.println(c);
+        }
+        System.out.println(list.size());
+        System.out.println(list.get(0));
+        System.out.println(list.get(5));
+        System.out.println(list.get(10));
+        System.out.println(list.get(1));
+        System.out.println(list.remove(KeyValue.createFirstOnRow(row, cf, PInteger.INSTANCE.toBytes(5))));
+        System.out.println(list.get(5));
+        System.out.println(list.size());
+    }
+}


Mime
View raw message