drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From s..@apache.org
Subject [2/2] drill git commit: DRILL-4182 TopN schema changes support.
Date Mon, 14 Dec 2015 07:27:58 GMT
DRILL-4182 TopN schema changes support.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/e529df46
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/e529df46
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/e529df46

Branch: refs/heads/master
Commit: e529df46054b685d72ff424c58e38a5dcdbc381a
Parents: cc9175c
Author: Amit Hadke <amit.hadke@gmail.com>
Authored: Thu Dec 10 00:21:52 2015 -0800
Committer: Steven Phillips <smp@apache.org>
Committed: Sun Dec 13 23:23:22 2015 -0800

----------------------------------------------------------------------
 .../impl/TopN/PriorityQueueTemplate.java        |  16 +-
 .../exec/physical/impl/TopN/TopNBatch.java      |  75 ++++++-
 .../impl/svremover/RemovingRecordBatch.java     |   2 +-
 .../drill/exec/record/HyperVectorWrapper.java   |   3 +-
 .../apache/drill/exec/record/SchemaUtil.java    | 104 +++++----
 .../drill/exec/record/VectorContainer.java      |   6 +-
 .../apache/drill/exec/util/BatchPrinter.java    |  20 +-
 .../impl/TopN/TestTopNSchemaChanges.java        | 211 +++++++++++++++++++
 8 files changed, 384 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/e529df46/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java
index 5cdfc5d..2b1830e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java
@@ -70,12 +70,17 @@ public abstract class PriorityQueueTemplate implements PriorityQueue {
       newContainer.add(container.getValueAccessorById(field.getValueClass(), ids).getValueVectors());
     }
     newContainer.buildSchema(BatchSchema.SelectionVectorMode.FOUR_BYTE);
+    // Cleanup before recreating hyperbatch and sv4.
+    cleanup();
     hyperBatch = new ExpandableHyperContainer(newContainer);
     batchCount = hyperBatch.iterator().next().getValueVectors().length;
     final DrillBuf drillBuf = allocator.buffer(4 * (limit + 1));
     heapSv4 = new SelectionVector4(drillBuf, limit, Character.MAX_VALUE);
+    // Reset queue size (most likely to be set to limit).
+    queueSize = 0;
     for (int i = 0; i < v4.getTotalCount(); i++) {
       heapSv4.set(i, v4.get(i));
+      ++queueSize;
     }
     v4.clear();
     doSetup(context, hyperBatch, null);
@@ -146,8 +151,15 @@ public abstract class PriorityQueueTemplate implements PriorityQueue {
 
   @Override
   public void cleanup() {
-    heapSv4.clear();
-    hyperBatch.clear();
+    if (heapSv4 != null) {
+      heapSv4.clear();
+    }
+    if (hyperBatch != null) {
+      hyperBatch.clear();
+    }
+    if (finalSv4 != null) {
+      finalSv4.clear();
+    }
   }
 
   private void siftUp() {

http://git-wip-us.apache.org/repos/asf/drill/blob/e529df46/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
index 54d2839..8c4cf21 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
@@ -49,6 +49,7 @@ import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.record.ExpandableHyperContainer;
 import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.SchemaUtil;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorContainer;
@@ -76,6 +77,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
 
   private final RecordBatch incoming;
   private BatchSchema schema;
+  private boolean schemaChanged = false;
   private PriorityQueue priorityQueue;
   private TopN config;
   SelectionVector4 sv4;
@@ -143,6 +145,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
         }
         container.buildSchema(SelectionVectorMode.NONE);
         container.setRecordCount(0);
+
         return;
       case STOP:
         state = BatchState.STOP;
@@ -202,9 +205,16 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
          // only change in the case that the schema truly changes.  Artificial schema changes are ignored.
           if (!incoming.getSchema().equals(schema)) {
             if (schema != null) {
-              throw new UnsupportedOperationException("Sort doesn't currently support sorts with changing schemas.");
+              if (!unionTypeEnabled) {
+                throw new UnsupportedOperationException("Sort doesn't currently support sorts with changing schemas.");
+              } else {
+                this.schema = SchemaUtil.mergeSchemas(this.schema, incoming.getSchema());
+                purgeAndResetPriorityQueue();
+                this.schemaChanged = true;
+              }
+            } else {
+              this.schema = incoming.getSchema();
             }
-            this.schema = incoming.getSchema();
           }
           // fall through.
         case OK:
@@ -216,11 +226,17 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
           }
           countSincePurge += incoming.getRecordCount();
           batchCount++;
-          RecordBatchData batch = new RecordBatchData(incoming);
+          RecordBatchData batch;
+          if (schemaChanged) {
+            batch = new RecordBatchData(SchemaUtil.coerceContainer(incoming, this.schema, oContext));
+          } else {
+            batch = new RecordBatchData(incoming);
+          }
           boolean success = false;
           try {
             batch.canonicalize();
             if (priorityQueue == null) {
+              assert !schemaChanged;
              priorityQueue = createNewPriorityQueue(context, config.getOrderings(), new ExpandableHyperContainer(batch.getContainer()), MAIN_MAPPING, LEFT_MAPPING, RIGHT_MAPPING);
             }
             priorityQueue.add(context, batch);
@@ -255,7 +271,6 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
         container.add(w.getValueVectors());
       }
       container.buildSchema(BatchSchema.SelectionVectorMode.FOUR_BYTE);
-
       recordCount = sv4.getCount();
       return IterOutcome.OK_NEW_SCHEMA;
 
@@ -323,7 +338,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
     for (Ordering od : orderings) {
       // first, we rewrite the evaluation stack for each side of the comparison.
       ErrorCollector collector = new ErrorCollectorImpl();
-      final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), batch, collector, context.getFunctionRegistry());
+      final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), batch, collector, context.getFunctionRegistry(), unionTypeEnabled);
       if (collector.hasErrors()) {
         throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
       }
@@ -356,6 +371,56 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
     return q;
   }
 
+  /**
+   * Handle schema changes during execution.
+   * 1. Purge existing batches
+   * 2. Promote newly created container for new schema.
+   * 3. Recreate priority queue and reset with coerced container.
+   * @throws SchemaChangeException
+   */
+  public void purgeAndResetPriorityQueue() throws SchemaChangeException, ClassTransformationException, IOException {
+    final Stopwatch watch = new Stopwatch();
+    watch.start();
+    final VectorContainer c = priorityQueue.getHyperBatch();
+    final VectorContainer newContainer = new VectorContainer(oContext);
+    final SelectionVector4 selectionVector4 = priorityQueue.getHeapSv4();
+    final SimpleRecordBatch batch = new SimpleRecordBatch(c, selectionVector4, context);
+    final SimpleRecordBatch newBatch = new SimpleRecordBatch(newContainer, null, context);
+    copier = RemovingRecordBatch.getGenerated4Copier(batch, context, oContext.getAllocator(),  newContainer, newBatch, null);
+    SortRecordBatchBuilder builder = new SortRecordBatchBuilder(oContext.getAllocator());
+    try {
+      do {
+        final int count = selectionVector4.getCount();
+        final int copiedRecords = copier.copyRecords(0, count);
+        assert copiedRecords == count;
+        for (VectorWrapper<?> v : newContainer) {
+          ValueVector.Mutator m = v.getValueVector().getMutator();
+          m.setValueCount(count);
+        }
+        newContainer.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+        newContainer.setRecordCount(count);
+        builder.add(newBatch);
+      } while (selectionVector4.next());
+      selectionVector4.clear();
+      c.clear();
+      final VectorContainer oldSchemaContainer = new VectorContainer(oContext);
+      builder.canonicalize();
+      builder.build(context, oldSchemaContainer);
+      oldSchemaContainer.setRecordCount(builder.getSv4().getCount());
+      final VectorContainer newSchemaContainer =  SchemaUtil.coerceContainer(oldSchemaContainer, this.schema, oContext);
+      // Canonicalize new container since we canonicalize incoming batches before adding to queue.
+      final VectorContainer canonicalizedContainer = VectorContainer.canonicalize(newSchemaContainer);
+      canonicalizedContainer.buildSchema(SelectionVectorMode.FOUR_BYTE);
+      priorityQueue.cleanup();
+      priorityQueue = createNewPriorityQueue(context, config.getOrderings(), canonicalizedContainer, MAIN_MAPPING, LEFT_MAPPING, RIGHT_MAPPING);
+      priorityQueue.resetQueue(canonicalizedContainer, builder.getSv4().createNewWrapperCurrent());
+    } finally {
+      builder.clear();
+      builder.close();
+    }
+    logger.debug("Took {} us to purge and recreate queue for new schema", watch.elapsed(TimeUnit.MICROSECONDS));
+  }
+
   @Override
   public WritableBatch getWritableBatch() {
     throw new UnsupportedOperationException("A sort batch is not writable.");

http://git-wip-us.apache.org/repos/asf/drill/blob/e529df46/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
index fa6001b..5faaf58 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
@@ -62,7 +62,7 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
 
   @Override
   protected boolean setupNewSchema() throws SchemaChangeException {
-    container.zeroVectors();
+    container.clear();
     switch(incoming.getSchema().getSelectionVectorMode()){
     case NONE:
       this.copier = getStraightCopier();

http://git-wip-us.apache.org/repos/asf/drill/blob/e529df46/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java
index a1557e6..7fc7960 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java
@@ -132,7 +132,8 @@ public class HyperVectorWrapper<T extends ValueVector> implements VectorWrapper<
   }
 
   public void addVector(ValueVector v) {
-    Preconditions.checkArgument(v.getClass() == this.getVectorClass(), String.format("Cannot add vector type %s to hypervector type %s", v.getClass(), this.getVectorClass()));
+    Preconditions.checkArgument(v.getClass() == this.getVectorClass(), String.format("Cannot add vector type %s to hypervector type %s for field %s",
+      v.getClass(), this.getVectorClass(), v.getField()));
     vectors = (T[]) ArrayUtils.add(vectors, v);// TODO optimize this so not copying every time
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/e529df46/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java
index 8a0954e..8cf90ab 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java
@@ -96,6 +96,49 @@ public class SchemaUtil {
     return s;
   }
 
+  private static  ValueVector coerceVector(ValueVector v, VectorContainer c, MaterializedField field,
+                                           int recordCount, OperatorContext context) {
+    if (v != null) {
+      int valueCount = v.getAccessor().getValueCount();
+      TransferPair tp = v.getTransferPair();
+      tp.transfer();
+      if (v.getField().getType().getMinorType().equals(field.getType().getMinorType())) {
+        if (field.getType().getMinorType() == MinorType.UNION) {
+          UnionVector u = (UnionVector) tp.getTo();
+          for (MinorType t : field.getType().getSubTypeList()) {
+            if (u.getField().getType().getSubTypeList().contains(t)) {
+              continue;
+            }
+            u.addSubType(t);
+          }
+        }
+        return tp.getTo();
+      } else {
+        ValueVector newVector = TypeHelper.getNewVector(field, context.getAllocator());
+        Preconditions.checkState(field.getType().getMinorType() == MinorType.UNION, "Can only convert vector to Union vector");
+        UnionVector u = (UnionVector) newVector;
+        u.addVector(tp.getTo());
+        MinorType type = v.getField().getType().getMinorType();
+        for (int i = 0; i < valueCount; i++) {
+          u.getMutator().setType(i, type);
+        }
+        for (MinorType t : field.getType().getSubTypeList()) {
+          if (u.getField().getType().getSubTypeList().contains(t)) {
+            continue;
+          }
+          u.addSubType(t);
+        }
+        u.getMutator().setValueCount(valueCount);
+        return u;
+      }
+    } else {
+      v = TypeHelper.getNewVector(field, context.getAllocator());
+      v.allocateNew();
+      v.getMutator().setValueCount(recordCount);
+      return v;
+    }
+  }
+
   /**
    * Creates a copy a record batch, converting any fields as necessary to coerce it into the provided schema
    * @param in
@@ -105,54 +148,39 @@ public class SchemaUtil {
    */
   public static VectorContainer coerceContainer(VectorAccessible in, BatchSchema toSchema, OperatorContext context) {
     int recordCount = in.getRecordCount();
-    Map<SchemaPath,ValueVector> vectorMap = Maps.newHashMap();
+    boolean isHyper = false;
+    Map<SchemaPath, Object> vectorMap = Maps.newHashMap();
     for (VectorWrapper w : in) {
-      ValueVector v = w.getValueVector();
-      vectorMap.put(v.getField().getPath(), v);
+      if (w.isHyper()) {
+        isHyper = true;
+        final ValueVector[] vvs = w.getValueVectors();
+        vectorMap.put(vvs[0].getField().getPath(), vvs);
+      } else {
+        assert !isHyper;
+        final ValueVector v = w.getValueVector();
+        vectorMap.put(v.getField().getPath(), v);
+      }
     }
 
     VectorContainer c = new VectorContainer(context);
 
     for (MaterializedField field : toSchema) {
-      ValueVector v = vectorMap.remove(field.getPath());
-      if (v != null) {
-        int valueCount = v.getAccessor().getValueCount();
-        TransferPair tp = v.getTransferPair();
-        tp.transfer();
-        if (v.getField().getType().getMinorType().equals(field.getType().getMinorType())) {
-          if (field.getType().getMinorType() == MinorType.UNION) {
-            UnionVector u = (UnionVector) tp.getTo();
-            for (MinorType t : field.getType().getSubTypeList()) {
-              if (u.getField().getType().getSubTypeList().contains(t)) {
-                continue;
-              }
-              u.addSubType(t);
-            }
-          }
-          c.add(tp.getTo());
+      if (isHyper) {
+        final ValueVector[] vvs = (ValueVector[]) vectorMap.remove(field.getPath());
+        final ValueVector[] vvsOut;
+        if (vvs == null) {
+          vvsOut = new ValueVector[1];
+          vvsOut[0] = coerceVector(null, c, field, recordCount, context);
         } else {
-          ValueVector newVector = TypeHelper.getNewVector(field, context.getAllocator());
-          Preconditions.checkState(field.getType().getMinorType() == MinorType.UNION, "Can only convert vector to Union vector");
-          UnionVector u = (UnionVector) newVector;
-          u.addVector(tp.getTo());
-          MinorType type = v.getField().getType().getMinorType();
-          for (int i = 0; i < valueCount; i++) {
-            u.getMutator().setType(i, type);
-          }
-          for (MinorType t : field.getType().getSubTypeList()) {
-            if (u.getField().getType().getSubTypeList().contains(t)) {
-              continue;
-            }
-            u.addSubType(t);
+          vvsOut = new ValueVector[vvs.length];
+          for (int i = 0; i < vvs.length; ++i) {
+            vvsOut[i] = coerceVector(vvs[i], c, field, recordCount, context);
           }
-          u.getMutator().setValueCount(valueCount);
-          c.add(u);
         }
+        c.add(vvsOut);
       } else {
-        v = TypeHelper.getNewVector(field, context.getAllocator());
-        v.allocateNew();
-        v.getMutator().setValueCount(recordCount);
-        c.add(v);
+        final ValueVector v = (ValueVector) vectorMap.remove(field.getPath());
+        c.add(coerceVector(v, c, field, recordCount, context));
       }
     }
     c.buildSchema(in.getSchema().getSelectionVectorMode());

http://git-wip-us.apache.org/repos/asf/drill/blob/e529df46/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
index ccc05ff..c483650 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
@@ -187,7 +187,11 @@ public class VectorContainer implements Iterable<VectorWrapper<?>>, VectorAccess
     });
 
     for (VectorWrapper<?> w : canonicalWrappers) {
-      vc.add(w.getValueVector());
+      if (w.isHyper()) {
+        vc.add(w.getValueVectors());
+      } else {
+        vc.add(w.getValueVector());
+      }
     }
     vc.oContext = original.oContext;
     return vc;

http://git-wip-us.apache.org/repos/asf/drill/blob/e529df46/exec/java-exec/src/main/java/org/apache/drill/exec/util/BatchPrinter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/BatchPrinter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/BatchPrinter.java
index 198c0b5..2a1db01 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/BatchPrinter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/BatchPrinter.java
@@ -42,15 +42,25 @@ public class BatchPrinter {
     }
     int width = columns.size();
     for (int j = 0; j < sv4.getCount(); j++) {
+      if (j%50 == 0) {
+        System.out.println(StringUtils.repeat("-", width * 17 + 1));
+        for (String column : columns) {
+          System.out.printf("| %-15s", width <= 15 ? column : column.substring(0, 14));
+        }
+        System.out.printf("|\n");
+        System.out.println(StringUtils.repeat("-", width*17 + 1));
+      }
       for (VectorWrapper vw : batch) {
         Object o = vw.getValueVectors()[sv4.get(j) >>> 16].getAccessor().getObject(sv4.get(j) & 65535);
-        if (o instanceof byte[]) {
-          String value = new String((byte[]) o);
-          System.out.printf("| %-15s",value.length() <= 15 ? value : value.substring(0, 14));
+        String value;
+        if (o == null) {
+          value = "null";
+        } else if (o instanceof byte[]) {
+          value = new String((byte[]) o);
         } else {
-          String value = o.toString();
-          System.out.printf("| %-15s",value.length() <= 15 ? value : value.substring(0,14));
+          value = o.toString();
         }
+        System.out.printf("| %-15s",value.length() <= 15 ? value : value.substring(0,14));
       }
       System.out.printf("|\n");
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/e529df46/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TestTopNSchemaChanges.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TestTopNSchemaChanges.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TestTopNSchemaChanges.java
new file mode 100644
index 0000000..0f65bab
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TestTopNSchemaChanges.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.TopN;
+
+import org.apache.drill.BaseTestQuery;
+import org.apache.drill.TestBuilder;
+import org.apache.drill.exec.physical.impl.aggregate.InternalBatch;
+import org.junit.Test;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+
+public class TestTopNSchemaChanges extends BaseTestQuery {
+
+  @Test
+  public void testNumericTypes() throws Exception {
+    final File data_dir = new File(BaseTestQuery.getTempDir("topn-schemachanges"));
+    data_dir.mkdirs();
+
+    // left side int and strings
+    BufferedWriter writer = new BufferedWriter(new FileWriter(new File(data_dir, "d1.json")));
+    for (int i = 0; i < 10000; i+=2) {
+      writer.write(String.format("{ \"kl\" : %d , \"vl\": %d }\n", i, i));
+    }
+    writer.close();
+    writer = new BufferedWriter(new FileWriter(new File(data_dir, "d2.json")));
+    for (int i = 1; i < 10000; i+=2) {
+      writer.write(String.format("{ \"kl\" : %f , \"vl\": %f }\n", (float)i, (float)i));
+    }
+    writer.close();
+    String query = String.format("select * from dfs_test.`%s` order by kl limit 12", data_dir.toPath().toString());
+
+    TestBuilder builder = testBuilder()
+      .sqlQuery(query)
+      .optionSettingQueriesForTestQuery("alter session set `exec.enable_union_type` = true")
+      .ordered()
+      .baselineColumns("kl", "vl");
+
+    for (long i = 0; i< 12 ; ++i) {
+      if (i %2 == 0) {
+        builder.baselineValues(i, i);
+      } else {
+        builder.baselineValues((double)i, (double)i);
+      }
+    }
+    builder.go();
+  }
+
+  @Test
+  public void testNumericAndStringTypes() throws Exception {
+    final File data_dir = new File(BaseTestQuery.getTempDir("topn-schemachanges"));
+    data_dir.mkdirs();
+
+    // left side int and strings
+    BufferedWriter writer = new BufferedWriter(new FileWriter(new File(data_dir, "d1.json")));
+    for (int i = 0; i < 1000; i+=2) {
+      writer.write(String.format("{ \"kl\" : %d , \"vl\": %d }\n", i, i));
+    }
+    writer.close();
+    writer = new BufferedWriter(new FileWriter(new File(data_dir, "d2.json")));
+    for (int i = 1; i < 1000; i+=2) {
+      writer.write(String.format("{ \"kl\" : \"%s\" , \"vl\": \"%s\" }\n", i, i));
+    }
+    writer.close();
+    String query = String.format("select * from dfs_test.`%s` order by kl limit 12", data_dir.toPath().toString());
+
+    TestBuilder builder = testBuilder()
+      .sqlQuery(query)
+      .optionSettingQueriesForTestQuery("alter session set `exec.enable_union_type` = true")
+      .ordered()
+      .baselineColumns("kl", "vl");
+
+    for (long i = 0; i< 24 ; i+=2) {
+        builder.baselineValues(i, i);
+    }
+
+    query = String.format("select * from dfs_test.`%s` order by kl desc limit 12", data_dir.toPath().toString());
+    builder = testBuilder()
+      .sqlQuery(query)
+      .optionSettingQueriesForTestQuery("alter session set `exec.enable_union_type` = true")
+      .ordered()
+      .baselineColumns("kl", "vl")
+      .baselineValues("999", "999")
+      .baselineValues("997", "997")
+      .baselineValues("995", "995")
+      .baselineValues("993", "993")
+      .baselineValues("991", "991")
+      .baselineValues("99", "99")
+      .baselineValues("989", "989")
+      .baselineValues("987", "987")
+      .baselineValues("985", "985")
+      .baselineValues("983", "983")
+      .baselineValues("981", "981")
+      .baselineValues("979", "979");
+    builder.go();
+  }
+
+  @Test
+  public void testUnionTypes() throws Exception {
+    final File data_dir = new File(BaseTestQuery.getTempDir("topn-schemachanges"));
+    data_dir.mkdirs();
+
+    // union of int and float and string.
+    BufferedWriter writer = new BufferedWriter(new FileWriter(new File(data_dir, "d1.json")));
+    for (int i = 0; i <= 9; ++i) {
+      switch (i%3) {
+        case 0: // 0, 3, 6, 9
+          writer.write(String.format("{ \"kl\" : %d , \"vl\": %d }\n", i, i));
+          break;
+        case 1: // 1, 4, 7
+          writer.write(String.format("{ \"kl\" : %f , \"vl\": %f }\n", (float)i, (float)i));
+          break;
+        case 2: // 2, 5, 8
+          writer.write(String.format("{ \"kl\" : \"%s\" , \"vl\": \"%s\" }\n", i, i));
+          break;
+      }
+    }
+    writer.close();
+    String query = String.format("select * from dfs_test.`%s` order by kl limit 8", data_dir.toPath().toString());
+
+    TestBuilder builder = testBuilder()
+      .sqlQuery(query)
+      .optionSettingQueriesForTestQuery("alter session set `exec.enable_union_type` = true")
+      .ordered()
+      .baselineColumns("kl", "vl");
+
+    builder.baselineValues(0l, 0l);
+    builder.baselineValues(1.0d, 1.0d);
+    builder.baselineValues(3l, 3l);
+    builder.baselineValues(4.0d, 4.0d);
+    builder.baselineValues(6l, 6l);
+    builder.baselineValues(7.0d, 7.0d);
+    builder.baselineValues(9l, 9l);
+    builder.baselineValues("2", "2");
+    builder.go();
+  }
+
+  @Test
+  public void testMissingColumn() throws Exception {
+    final File data_dir = new File(BaseTestQuery.getTempDir("topn-schemachanges"));
+    data_dir.mkdirs();
+    System.out.println(data_dir);
+    BufferedWriter writer = new BufferedWriter(new FileWriter(new File(data_dir, "d1.json")));
+    for (int i = 0; i < 100; i++) {
+      writer.write(String.format("{ \"kl1\" : %d , \"vl1\": %d }\n", i, i));
+    }
+    writer.close();
+    writer = new BufferedWriter(new FileWriter(new File(data_dir, "d2.json")));
+    for (int i = 100; i < 200; i++) {
+      writer.write(String.format("{ \"kl\" : %f , \"vl\": %f }\n", (float)i, (float)i));
+    }
+    writer.close();
+    writer = new BufferedWriter(new FileWriter(new File(data_dir, "d3.json")));
+    for (int i = 200; i < 300; i++) {
+      writer.write(String.format("{ \"kl2\" : \"%s\" , \"vl2\": \"%s\" }\n", i, i));
+    }
+    writer.close();
+
+    String query = String.format("select kl, vl, kl1, vl1, kl2, vl2 from dfs_test.`%s` order by kl limit 3", data_dir.toPath().toString());
+    TestBuilder builder = testBuilder()
+      .sqlQuery(query)
+      .optionSettingQueriesForTestQuery("alter session set `exec.enable_union_type` = true")
+      .ordered()
+      .baselineColumns("kl", "vl", "kl1", "vl1", "kl2", "vl2")
+      .baselineValues(100.0d, 100.0d, null, null, null, null)
+      .baselineValues(101.0d, 101.0d, null, null, null, null)
+      .baselineValues(102.0d, 102.0d, null, null, null, null);
+    builder.go();
+
+    query = String.format("select kl, vl, kl1, vl1, kl2, vl2  from dfs_test.`%s` order by kl1 limit 3", data_dir.toPath().toString());
+    builder = testBuilder()
+      .sqlQuery(query)
+      .optionSettingQueriesForTestQuery("alter session set `exec.enable_union_type` = true")
+      .ordered()
+      .baselineColumns("kl", "vl", "kl1", "vl1", "kl2", "vl2")
+      .baselineValues(null, null, 0l, 0l, null, null)
+      .baselineValues(null, null, 1l, 1l, null, null)
+      .baselineValues(null, null, 2l, 2l, null, null);
+    builder.go();
+
+    query = String.format("select kl, vl, kl1, vl1, kl2, vl2 from dfs_test.`%s` order by kl2 desc limit 3", data_dir.toPath().toString());
+    builder = testBuilder()
+      .sqlQuery(query)
+      .optionSettingQueriesForTestQuery("alter session set `exec.enable_union_type` = true")
+      .ordered()
+      .baselineColumns("kl", "vl", "kl1", "vl1", "kl2", "vl2")
+      .baselineValues(null, null, null, null, "299", "299")
+      .baselineValues(null, null, null, null, "298", "298")
+      .baselineValues(null, null, null, null, "297", "297");
+    builder.go();
+    // Since client can't handle new columns which are not in first batch, we won't test output of query.
+    // Query should run w/o any errors.
+    test(String.format("select * from dfs_test.`%s` order by kl limit 3", data_dir.toPath().toString()));
+  }
+}


Mime
View raw message