phoenix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ramkris...@apache.org
Subject git commit: Phoenix-10
Date Tue, 11 Feb 2014 10:25:37 GMT
Updated Branches:
  refs/heads/master 191d097bd -> 7a6a46c0a


Phoenix-10


Project: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/commit/7a6a46c0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/tree/7a6a46c0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/diff/7a6a46c0

Branch: refs/heads/master
Commit: 7a6a46c0adddf48946b5d1d52e3de3a99304c61d
Parents: 191d097
Author: Ramkrishna <ramkrishna.s.vasudevan@intel.com>
Authored: Tue Feb 11 15:55:09 2014 +0530
Committer: Ramkrishna <ramkrishna.s.vasudevan@intel.com>
Committed: Tue Feb 11 15:55:09 2014 +0530

----------------------------------------------------------------------
 .../phoenix/compile/ProjectionCompiler.java     | 150 +++++++++++++++++-
 .../phoenix/coprocessor/ScanRegionObserver.java | 125 +++++++++++++--
 .../apache/phoenix/query/QueryConstants.java    |   5 +-
 .../org/apache/phoenix/end2end/ArrayTest.java   | 157 ++++++++++++++++++-
 4 files changed, 412 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/7a6a46c0/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java
index d617b5e..2869a25 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java
@@ -19,6 +19,9 @@
  */
 package org.apache.phoenix.compile;
 
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -29,32 +32,38 @@ import java.util.NavigableSet;
 import java.util.Set;
 
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
-
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
+import org.apache.hadoop.io.WritableUtils;
 import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
 import org.apache.phoenix.coprocessor.GroupedAggregateRegionObserver;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.expression.BaseTerminalExpression;
 import org.apache.phoenix.expression.CoerceExpression;
 import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.KeyValueColumnExpression;
 import org.apache.phoenix.expression.aggregator.ClientAggregators;
 import org.apache.phoenix.expression.aggregator.ServerAggregators;
+import org.apache.phoenix.expression.function.ArrayIndexFunction;
 import org.apache.phoenix.expression.function.SingleAggregateFunction;
+import org.apache.phoenix.expression.visitor.KeyValueExpressionVisitor;
 import org.apache.phoenix.expression.visitor.SingleAggregateFunctionVisitor;
 import org.apache.phoenix.parse.AliasedNode;
 import org.apache.phoenix.parse.BindParseNode;
 import org.apache.phoenix.parse.ColumnParseNode;
 import org.apache.phoenix.parse.FamilyWildcardParseNode;
+import org.apache.phoenix.parse.FunctionParseNode;
 import org.apache.phoenix.parse.ParseNode;
 import org.apache.phoenix.parse.SelectStatement;
 import org.apache.phoenix.parse.SequenceValueParseNode;
 import org.apache.phoenix.parse.WildcardParseNode;
+import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.ArgumentTypeMismatchException;
 import org.apache.phoenix.schema.ColumnNotFoundException;
 import org.apache.phoenix.schema.ColumnRef;
+import org.apache.phoenix.schema.KeyValueSchema;
+import org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PColumnFamily;
 import org.apache.phoenix.schema.PDataType;
@@ -64,10 +73,17 @@ import org.apache.phoenix.schema.PTable.ViewType;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.RowKeySchema;
 import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.ValueBitSet;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.SizedUtil;
 
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
 
 /**
  * 
@@ -78,7 +94,8 @@ import org.apache.phoenix.util.SizedUtil;
  * @since 0.1
  */
 public class ProjectionCompiler {
-    
+    private static ValueBitSet arrayIndexesBitSet; 
+    private static KeyValueSchema arrayIndexesSchema;
     private ProjectionCompiler() {
     }
     
@@ -179,9 +196,11 @@ public class ProjectionCompiler {
      * @throws SQLException 
      */
     public static RowProjector compile(StatementContext context, SelectStatement statement, GroupBy groupBy, List<? extends PDatum> targetColumns) throws SQLException {
+        List<KeyValueColumnExpression> arrayKVRefs = new ArrayList<KeyValueColumnExpression>();
+        List<Expression> arrayKVFuncs = new ArrayList<Expression>();
         List<AliasedNode> aliasedNodes = statement.getSelect();
         // Setup projected columns in Scan
-        SelectClauseVisitor selectVisitor = new SelectClauseVisitor(context, groupBy);
+        SelectClauseVisitor selectVisitor = new SelectClauseVisitor(context, groupBy, arrayKVRefs, arrayKVFuncs, statement);
         List<ExpressionProjector> projectedColumns = new ArrayList<ExpressionProjector>();
         TableRef tableRef = context.getResolver().getTables().get(0);
         PTable table = tableRef.getTable();
@@ -248,6 +267,20 @@ public class ProjectionCompiler {
                 String name = columnAlias == null ? expression.toString() : columnAlias;
                 projectedColumns.add(new ExpressionProjector(name, table.getName().getString(), expression, isCaseSensitive));
             }
+            if(arrayKVFuncs.size() > 0 && arrayKVRefs.size() > 0) {
+                serailizeArrayIndexInformationAndSetInScan(context, arrayKVFuncs, arrayKVRefs);
+                KeyValueSchemaBuilder builder = new KeyValueSchemaBuilder(0);
+                for (Expression expression : arrayKVRefs) {
+                    builder.addField(expression);
+                }
+                KeyValueSchema kvSchema = builder.build();
+                arrayIndexesBitSet = ValueBitSet.newInstance(kvSchema);
+                builder = new KeyValueSchemaBuilder(0);
+                for (Expression expression : arrayKVFuncs) {
+                    builder.addField(expression);
+                }
+                arrayIndexesSchema = builder.build();
+            }
             selectVisitor.reset();
             index++;
         }
@@ -291,7 +324,67 @@ public class ProjectionCompiler {
         }
         return new RowProjector(projectedColumns, estimatedByteSize, isProjectEmptyKeyValue);
     }
-        
+
+    // A replaced ArrayIndex function that retrieves the exact array value retrieved from the server
+    static class ArrayIndexExpression extends BaseTerminalExpression {
+        private final int position;
+        private final PDataType type;
+
+        public ArrayIndexExpression(int position, PDataType type) {
+            this.position = position;
+            this.type =  type;
+        }
+
+        @Override
+        public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+            if (!tuple.getValue(QueryConstants.ARRAY_VALUE_COLUMN_FAMILY, QueryConstants.ARRAY_VALUE_COLUMN_QUALIFIER,
+                    ptr)) { 
+              return false; 
+            }
+            int maxOffset = ptr.getOffset() + ptr.getLength();
+            arrayIndexesBitSet.or(ptr);
+            arrayIndexesSchema.iterator(ptr, position, arrayIndexesBitSet);
+            Boolean hasValue = arrayIndexesSchema.next(ptr, position, maxOffset, arrayIndexesBitSet);
+            arrayIndexesBitSet.clear();
+            if (hasValue == null) {
+                ptr.set(ByteUtil.EMPTY_BYTE_ARRAY);
+            }
+            return true;
+        }
+
+        @Override
+        public PDataType getDataType() {
+            return this.type;
+        }
+    }
+    private static void serailizeArrayIndexInformationAndSetInScan(StatementContext context, List<Expression> arrayKVFuncs,
+            List<KeyValueColumnExpression> arrayKVRefs) {
+        ByteArrayOutputStream stream = new ByteArrayOutputStream();
+        try {
+            DataOutputStream output = new DataOutputStream(stream);
+            // Write the arrayKVRef size followed by the keyvalues that needs to be of type arrayindex function
+            WritableUtils.writeVInt(output, arrayKVRefs.size());
+            for (Expression expression : arrayKVRefs) {
+                    expression.write(output);
+            }
+            // then write the number of arrayindex functions followeed by the expression itself
+            WritableUtils.writeVInt(output, arrayKVFuncs.size());
+            for (Expression expression : arrayKVFuncs) {
+                    expression.write(output);
+            }
+            
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        } finally {
+            try {
+                stream.close();
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+        context.getScan().setAttribute(QueryConstants.SPECIFIC_ARRAY_INDEX, stream.toByteArray());
+    }
+
     private static class SelectClauseVisitor extends ExpressionCompiler {
         private static int getMinNullableIndex(List<SingleAggregateFunction> aggFuncs, boolean isUngroupedAggregation) {
             int minNullableIndex = aggFuncs.size();
@@ -311,9 +404,16 @@ public class ProjectionCompiler {
          */
         private boolean isCaseSensitive;
         private int elementCount;
+        private List<KeyValueColumnExpression> arrayKVRefs;
+        private List<Expression> arrayKVFuncs;
+        private SelectStatement statement; 
         
-        private SelectClauseVisitor(StatementContext context, GroupBy groupBy) {
+        private SelectClauseVisitor(StatementContext context, GroupBy groupBy, 
+                List<KeyValueColumnExpression> arrayKVRefs, List<Expression> arrayKVFuncs, SelectStatement statement) {
             super(context, groupBy);
+            this.arrayKVRefs = arrayKVRefs;
+            this.arrayKVFuncs = arrayKVFuncs;
+            this.statement = statement;
             reset();
         }
 
@@ -381,5 +481,39 @@ public class ProjectionCompiler {
             }
             return context.getSequenceManager().newSequenceReference(node);
         }
+        
+        @Override
+        public Expression visitLeave(FunctionParseNode node, List<Expression> children) throws SQLException {
+            Expression func = super.visitLeave(node,children);
+            // this need not be done for group by clause with array. Hence the below check
+            if (!statement.isAggregate() && ArrayIndexFunction.NAME.equals(node.getName())) {
+                 final List<KeyValueColumnExpression> indexKVs = Lists.newArrayList();
+                 // Create anon visitor to find reference to array in a generic way
+                 children.get(0).accept(new KeyValueExpressionVisitor() {
+                     @Override
+                     public Void visit(KeyValueColumnExpression expression) {
+                         if (expression.getDataType().isArrayType()) {
+                             indexKVs.add(expression);
+                         }
+                         return null;
+                     }
+                 });
+                 // Add the keyvalues which is of type array
+                 if (!indexKVs.isEmpty()) {
+                    arrayKVRefs.addAll(indexKVs);
+                    // Track the array index function also 
+                    arrayKVFuncs.add(func);
+                    // Store the index of the array index function in the select query list
+                    func = replaceArrayIndexFunction(func, arrayKVFuncs.size() - 1);
+                    return func;
+                }
+            }
+            return func;
+        }
+        
+        public Expression replaceArrayIndexFunction(Expression func, int size) {
+            return new ArrayIndexExpression(size, func.getDataType());
+        }
     }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/7a6a46c0/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
index 543cde0..87588fb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
@@ -24,10 +24,13 @@ import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.Type;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
@@ -36,22 +39,30 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.WritableUtils;
-
-import com.google.common.collect.Lists;
 import org.apache.phoenix.cache.GlobalCache;
 import org.apache.phoenix.cache.TenantCache;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.KeyValueColumnExpression;
 import org.apache.phoenix.expression.OrderByExpression;
+import org.apache.phoenix.expression.function.ArrayIndexFunction;
 import org.apache.phoenix.iterate.OrderedResultIterator;
 import org.apache.phoenix.iterate.RegionScannerResultIterator;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.join.HashJoinInfo;
 import org.apache.phoenix.join.ScanProjector;
 import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.KeyValueSchema;
+import org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder;
 import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.ValueBitSet;
+import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.ServerUtil;
 
+import com.google.common.collect.Lists;
+
 
 /**
  * 
@@ -66,7 +77,9 @@ import org.apache.phoenix.util.ServerUtil;
 public class ScanRegionObserver extends BaseScannerRegionObserver {
     public static final String NON_AGGREGATE_QUERY = "NonAggregateQuery";
     private static final String TOPN = "TopN";
-
+    private ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+    private KeyValueSchema kvSchema = null;
+    private ValueBitSet kvSchemaBitSet;
     public static void serializeIntoScan(Scan scan, int thresholdBytes, int limit, List<OrderByExpression> orderByExpressions, int estimatedRowSize) {
         ByteArrayOutputStream stream = new ByteArrayOutputStream(); // TODO: size?
         try {
@@ -120,6 +133,44 @@ public class ScanRegionObserver extends BaseScannerRegionObserver {
             }
         }
     }
+    
+    private Expression[] deserializeArrayPostionalExpressionInfoFromScan(Scan scan, RegionScanner s,
+            List<KeyValueColumnExpression> arrayKVRefs) {
+        byte[] specificArrayIdx = scan.getAttribute(QueryConstants.SPECIFIC_ARRAY_INDEX);
+        if (specificArrayIdx == null) {
+            return null;
+        }
+        KeyValueSchemaBuilder builder = new KeyValueSchemaBuilder(0);
+        ByteArrayInputStream stream = new ByteArrayInputStream(specificArrayIdx);
+        try {
+            DataInputStream input = new DataInputStream(stream);
+            int arrayKVRefSize = WritableUtils.readVInt(input);
+            for (int i = 0; i < arrayKVRefSize; i++) {
+                KeyValueColumnExpression kvExp = new KeyValueColumnExpression();
+                kvExp.readFields(input);
+                arrayKVRefs.add(kvExp);
+            }
+            int arrayKVFuncSize = WritableUtils.readVInt(input);
+            Expression[] arrayFuncRefs = new Expression[arrayKVFuncSize];
+            for (int i = 0; i < arrayKVFuncSize; i++) {
+                ArrayIndexFunction arrayIdxFunc = new ArrayIndexFunction();
+                arrayIdxFunc.readFields(input);
+                arrayFuncRefs[i] = arrayIdxFunc;
+                builder.addField(arrayIdxFunc);
+            }
+            kvSchema = builder.build();
+            kvSchemaBitSet = ValueBitSet.newInstance(kvSchema);
+            return arrayFuncRefs;
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        } finally {
+            try {
+                stream.close();
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
 
     @Override
     protected RegionScanner doPostScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final RegionScanner s) throws Throwable {
@@ -139,10 +190,14 @@ public class ScanRegionObserver extends BaseScannerRegionObserver {
         }
         
         final OrderedResultIterator iterator = deserializeFromScan(scan,innerScanner);
+        List<KeyValueColumnExpression> arrayKVRefs = new ArrayList<KeyValueColumnExpression>();
+        Expression[] arrayFuncRefs = deserializeArrayPostionalExpressionInfoFromScan(
+                scan, innerScanner, arrayKVRefs);
+        innerScanner = getWrappedScanner(c, innerScanner, arrayKVRefs, arrayFuncRefs);
         if (iterator == null) {
-            return getWrappedScanner(c, innerScanner);
+            return innerScanner;
         }
-        
+        // TODO:the above wrapped scanner should be used here also
         return getTopNScanner(c, innerScanner, iterator, tenantId);
     }
     
@@ -219,8 +274,11 @@ public class ScanRegionObserver extends BaseScannerRegionObserver {
      * re-throws as DoNotRetryIOException to prevent needless retrying hanging the query
      * for 30 seconds. Unfortunately, until HBASE-7481 gets fixed, there's no way to do
      * the same from a custom filter.
+     * @param arrayFuncRefs 
+     * @param arrayKVRefs 
      */
-    private RegionScanner getWrappedScanner(final ObserverContext<RegionCoprocessorEnvironment> c, final RegionScanner s) {
+    private RegionScanner getWrappedScanner(final ObserverContext<RegionCoprocessorEnvironment> c, final RegionScanner s, 
+           final List<KeyValueColumnExpression> arrayKVRefs, final Expression[] arrayFuncRefs) {
         return new RegionScanner() {
 
             @Override
@@ -291,23 +349,72 @@ public class ScanRegionObserver extends BaseScannerRegionObserver {
             @Override
             public boolean nextRaw(List<KeyValue> result, String metric) throws IOException {
                 try {
-                    return s.nextRaw(result, metric);
+                    boolean next = s.nextRaw(result, metric);
+                    if(result.size() == 0) {
+                        return next;
+                    } else if((arrayFuncRefs != null && arrayFuncRefs.length == 0) || arrayKVRefs.size() == 0) {
+                        return next;
+                    }
+                    replaceArrayIndexElement(arrayKVRefs, arrayFuncRefs, result);
+                    return next;
                 } catch (Throwable t) {
                     ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionNameAsString(), t);
                     return false; // impossible
                 }
             }
 
+            
+
             @Override
             public boolean nextRaw(List<KeyValue> result, int limit, String metric) throws IOException {
                 try {
-                    return s.nextRaw(result, limit, metric);
+                    boolean next = s.nextRaw(result, limit, metric);
+                    if (result.size() == 0) {
+                        return next;
+                    } else if ((arrayFuncRefs != null && arrayFuncRefs.length == 0) || arrayKVRefs.size() == 0) { 
+                        return next; 
+                    }
+                    // There is a scanattribute set to retrieve the specific array element
+                    replaceArrayIndexElement(arrayKVRefs, arrayFuncRefs, result);
+                    return next;
                 } catch (Throwable t) {
                     ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionNameAsString(), t);
                     return false; // impossible
                 }
             }
+
+            private void replaceArrayIndexElement(final List<KeyValueColumnExpression> arrayKVRefs,
+                    final Expression[] arrayFuncRefs, List<KeyValue> result) {
+                MultiKeyValueTuple tuple = new MultiKeyValueTuple(result);
+                // The size of both the arrays would be same?
+                // Using KeyValueSchema to set and retrieve the value
+                // collect the first kv to get the row
+                KeyValue rowKv = result.get(0);
+                for (int i = 0; i < arrayKVRefs.size(); i++) {
+                    KeyValueColumnExpression kvExp = arrayKVRefs.get(i);
+                    if (kvExp.evaluate(tuple, ptr)) {
+                        for (int idx = tuple.size() - 1; idx >= 0; idx--) {
+                            KeyValue kv = tuple.getValue(idx);
+                            if (Bytes.equals(kvExp.getColumnFamily(), kv.getFamily())
+                                    && Bytes.equals(kvExp.getColumnName(), kv.getQualifier())) {
+                                // remove the kv that has the full array values.
+                                result.remove(idx);
+                                break;
+                            }
+                        }
+                    }
+                }
+                byte[] value = kvSchema.toBytes(tuple, arrayFuncRefs,
+                        kvSchemaBitSet, ptr);
+                // Add a dummy kv with the exact value of the array index
+                result.add(new KeyValue(rowKv.getBuffer(), rowKv.getRowOffset(), rowKv.getRowLength(),
+                        QueryConstants.ARRAY_VALUE_COLUMN_FAMILY, 0, QueryConstants.ARRAY_VALUE_COLUMN_FAMILY.length,
+                        QueryConstants.ARRAY_VALUE_COLUMN_QUALIFIER, 0,
+                        QueryConstants.ARRAY_VALUE_COLUMN_QUALIFIER.length, HConstants.LATEST_TIMESTAMP,
+                        Type.codeToType(rowKv.getType()), value, 0, value.length));
+            }
         };
     }
-
+    
+    
 }

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/7a6a46c0/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
index 28b77a8..d560ec8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -115,6 +115,9 @@ public interface QueryConstants {
     public final static byte[] UNGROUPED_AGG_ROW_KEY = Bytes.toBytes("a");
     public final static PName AGG_COLUMN_NAME = SINGLE_COLUMN_NAME;
     public final static PName AGG_COLUMN_FAMILY_NAME = SINGLE_COLUMN_FAMILY_NAME;
+    
+    public static final byte[] ARRAY_VALUE_COLUMN_FAMILY = Bytes.toBytes("_arr_v");
+    public static final byte[] ARRAY_VALUE_COLUMN_QUALIFIER = Bytes.toBytes("_arr_v");
 
     public static final byte[] TRUE = new byte[] {1};
 
@@ -144,7 +147,7 @@ public interface QueryConstants {
     public static final double MILLIS_TO_NANOS_CONVERTOR = Math.pow(10, 6);
     public static final BigDecimal BD_MILLIS_NANOS_CONVERSION = BigDecimal.valueOf(MILLIS_TO_NANOS_CONVERTOR);
     public static final BigDecimal BD_MILLIS_IN_DAY = BigDecimal.valueOf(QueryConstants.MILLIS_IN_DAY);
-    
+    public static final String SPECIFIC_ARRAY_INDEX = "SpecificArrayIndex";
 
     public static final String CREATE_TABLE_METADATA =
             // Do not use IF NOT EXISTS as we sometimes catch the TableAlreadyExists exception

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/7a6a46c0/phoenix-core/src/test/java/org/apache/phoenix/end2end/ArrayTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/end2end/ArrayTest.java b/phoenix-core/src/test/java/org/apache/phoenix/end2end/ArrayTest.java
index 8b7fd1e..a7e46e3 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/end2end/ArrayTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/end2end/ArrayTest.java
@@ -221,7 +221,7 @@ public class ArrayTest extends BaseClientManagedTimeTest {
 			doubleArr[0] = 36.763;
 			conn.createArrayOf("DOUBLE", doubleArr);
 			Double result =  rs.getDouble(1);
-			assertEquals(result, doubleArr[0]);
+			assertEquals(doubleArr[0], result);
 			assertFalse(rs.next());
 		} finally {
 			conn.close();
@@ -248,7 +248,7 @@ public class ArrayTest extends BaseClientManagedTimeTest {
 			Double[] doubleArr = new Double[1];
 			doubleArr[0] = 37.56;
 			Double result =  rs.getDouble(1);
-			assertEquals(result, doubleArr[0]);
+			assertEquals(doubleArr[0], result);
 			assertFalse(rs.next());
 		} finally {
 			conn.close();
@@ -275,7 +275,7 @@ public class ArrayTest extends BaseClientManagedTimeTest {
             Double[] doubleArr = new Double[1];
             doubleArr[0] = 37.56;
             Double result =  rs.getDouble(1);
-            assertEquals(result, doubleArr[0]);
+            assertEquals(doubleArr[0], result);
             assertFalse(rs.next());
         } finally {
             conn.close();
@@ -313,7 +313,7 @@ public class ArrayTest extends BaseClientManagedTimeTest {
             doubleArr[0] = 345.8d;
             conn.createArrayOf("DOUBLE", doubleArr);
             Double result = rs.getDouble(1);
-            assertEquals(result, doubleArr[0]);
+            assertEquals(doubleArr[0], result);
             assertFalse(rs.next());
         } finally {
             conn.close();
@@ -492,7 +492,7 @@ public class ArrayTest extends BaseClientManagedTimeTest {
 			doubleArr = new Double[1];
 			doubleArr[0] = 36.763;
 			Double result =  rs.getDouble(1);
-			assertEquals(result, doubleArr[0]);
+			assertEquals(doubleArr[0], result);
 			assertFalse(rs.next());
 		} finally {
 			conn.close();
@@ -521,7 +521,7 @@ public class ArrayTest extends BaseClientManagedTimeTest {
 			doubleArr = new Double[1];
 			doubleArr[0] = 36.763;
 			Double result =  rs.getDouble(1);
-			assertEquals(result, doubleArr[0]);
+			assertEquals(doubleArr[0], result);
 			assertFalse(rs.next());
 		} finally {
 			conn.close();
@@ -552,6 +552,149 @@ public class ArrayTest extends BaseClientManagedTimeTest {
 			conn.close();
 		}
 	}
+	
+	@Test
+	public void testSelectSpecificIndexOfAVariableArrayAlongWithAnotherColumn1() throws Exception {
+	    long ts = nextTimestamp();
+        String tenantId = getOrganizationId();
+        createTableWithArray(BaseConnectedQueryTest.getUrl(),
+                getDefaultSplits(tenantId), null, ts - 2);
+        initTablesWithArrays(tenantId, null, ts, false);
+        String query = "SELECT a_string_array[2],A_INTEGER FROM table_with_array";
+        Properties props = new Properties(TEST_PROPERTIES);
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB,
+                Long.toString(ts + 2)); // Execute at timestamp 2
+        Connection conn = DriverManager.getConnection(PHOENIX_JDBC_URL, props);
+        try {
+            PreparedStatement statement = conn.prepareStatement(query);
+            ResultSet rs = statement.executeQuery();
+            assertTrue(rs.next());
+            String[] strArr = new String[1];
+            strArr[0] = "XYZWER";
+            String result = rs.getString(1);
+            assertEquals(strArr[0], result);
+            int a_integer = rs.getInt(2);
+            assertEquals(1, a_integer);
+            assertFalse(rs.next());
+        } finally {
+            conn.close();
+        }
+	}
+	   
+    @Test
+    public void testSelectSpecificIndexOfAVariableArrayAlongWithAnotherColumn2() throws Exception {
+        long ts = nextTimestamp();
+        String tenantId = getOrganizationId();
+        createTableWithArray(BaseConnectedQueryTest.getUrl(),
+                getDefaultSplits(tenantId), null, ts - 2);
+        initTablesWithArrays(tenantId, null, ts, false);
+        String query = "SELECT A_INTEGER, a_string_array[2] FROM table_with_array";
+        Properties props = new Properties(TEST_PROPERTIES);
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB,
+                Long.toString(ts + 2)); // Execute at timestamp 2
+        Connection conn = DriverManager.getConnection(PHOENIX_JDBC_URL, props);
+        try {
+            PreparedStatement statement = conn.prepareStatement(query);
+            ResultSet rs = statement.executeQuery();
+            assertTrue(rs.next());
+            String[] strArr = new String[1];
+            strArr[0] = "XYZWER";
+            int a_integer = rs.getInt(1);
+            assertEquals(1, a_integer);
+            String result = rs.getString(2);
+            assertEquals(strArr[0], result);
+            assertFalse(rs.next());
+        } finally {
+            conn.close();
+        }
+    }
+    
+    @Test
+    public void testSelectMultipleArrayColumns() throws Exception {
+        long ts = nextTimestamp();
+        String tenantId = getOrganizationId();
+        createTableWithArray(BaseConnectedQueryTest.getUrl(),
+                getDefaultSplits(tenantId), null, ts - 2);
+        initTablesWithArrays(tenantId, null, ts, false);
+        String query = "SELECT  a_string_array[2], a_double_array[1] FROM table_with_array";
+        Properties props = new Properties(TEST_PROPERTIES);
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB,
+                Long.toString(ts + 2)); // Execute at timestamp 2
+        Connection conn = DriverManager.getConnection(PHOENIX_JDBC_URL, props);
+        try {
+            PreparedStatement statement = conn.prepareStatement(query);
+            ResultSet rs = statement.executeQuery();
+            assertTrue(rs.next());
+            String[] strArr = new String[1];
+            strArr[0] = "XYZWER";
+            Double[] doubleArr = new Double[1];
+            doubleArr[0] = 36.763d;
+            Double a_double = rs.getDouble(2);
+            assertEquals(doubleArr[0], a_double);
+            String result = rs.getString(1);
+            assertEquals(strArr[0], result);
+            assertFalse(rs.next());
+        } finally {
+            conn.close();
+        } 
+    }
+    
+    @Test
+    public void testSelectSameArrayColumnMultipleTimesWithDifferentIndices() throws Exception {
+        long ts = nextTimestamp();
+        String tenantId = getOrganizationId();
+        createTableWithArray(BaseConnectedQueryTest.getUrl(),
+                getDefaultSplits(tenantId), null, ts - 2);
+        initTablesWithArrays(tenantId, null, ts, false);
+        String query = "SELECT a_string_array[0], a_string_array[2] FROM table_with_array";
+        Properties props = new Properties(TEST_PROPERTIES);
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB,
+                Long.toString(ts + 2)); // Execute at timestamp 2
+        Connection conn = DriverManager.getConnection(PHOENIX_JDBC_URL, props);
+        try {
+            PreparedStatement statement = conn.prepareStatement(query);
+            ResultSet rs = statement.executeQuery();
+            assertTrue(rs.next());
+            String[] strArr = new String[2];
+            strArr[0] = "ABC";
+            strArr[1] = "XYZWER";
+            String result = rs.getString(1);
+            assertEquals(strArr[0], result);
+            result = rs.getString(2);
+            assertEquals(strArr[1], result);
+            assertFalse(rs.next());
+        } finally {
+            conn.close();
+        } 
+    }
+    
+    @Test
+    public void testSelectSameArrayColumnMultipleTimesWithSameIndices() throws Exception {
+        long ts = nextTimestamp();
+        String tenantId = getOrganizationId();
+        createTableWithArray(BaseConnectedQueryTest.getUrl(),
+                getDefaultSplits(tenantId), null, ts - 2);
+        initTablesWithArrays(tenantId, null, ts, false);
+        String query = "SELECT a_string_array[2], a_string_array[2] FROM table_with_array";
+        Properties props = new Properties(TEST_PROPERTIES);
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB,
+                Long.toString(ts + 2)); // Execute at timestamp 2
+        Connection conn = DriverManager.getConnection(PHOENIX_JDBC_URL, props);
+        try {
+            PreparedStatement statement = conn.prepareStatement(query);
+            ResultSet rs = statement.executeQuery();
+            assertTrue(rs.next());
+            String[] strArr = new String[1];
+            strArr[0] = "XYZWER";
+            String result = rs.getString(1);
+            assertEquals(strArr[0], result);
+            result = rs.getString(2);
+            assertEquals(strArr[0], result);
+            assertFalse(rs.next());
+        } finally {
+            conn.close();
+        } 
+    }
 
 	@Test
 	public void testSelectSpecificIndexOfAVariableArray() throws Exception {
@@ -572,7 +715,7 @@ public class ArrayTest extends BaseClientManagedTimeTest {
 			String[] strArr = new String[1];
 			strArr[0] = "XYZWER";
 			String result = rs.getString(1);
-			assertEquals(result, strArr[0]);
+			assertEquals(strArr[0], result);
 			assertFalse(rs.next());
 		} finally {
 			conn.close();


Mime
View raw message