drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [1/2] drill git commit: DRILL-2602: Throw an error on schema change during streaming aggregation
Date Sat, 09 May 2015 01:10:57 GMT
Repository: drill
Updated Branches:
  refs/heads/master c2bd69879 -> d4f9bf2e9


DRILL-2602: Throw an error on schema change during streaming aggregation


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/099a35b8
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/099a35b8
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/099a35b8

Branch: refs/heads/master
Commit: 099a35b8775bbbe727d0be7123b5a80755c4e030
Parents: c2bd698
Author: adeneche <adeneche@gmail.com>
Authored: Tue May 5 13:40:41 2015 -0700
Committer: Jason Altekruse <altekrusejason@gmail.com>
Committed: Fri May 8 17:14:48 2015 -0700

----------------------------------------------------------------------
 .../exec/physical/impl/aggregate/HashAggBatch.java      |  4 +++-
 .../exec/physical/impl/aggregate/StreamingAggBatch.java |  5 ++++-
 .../exec/physical/impl/xsort/ExternalSortBatch.java     | 12 +++++++++---
 .../physical/impl/xsort/SingleBatchSorterTemplate.java  |  6 +++++-
 4 files changed, 21 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/099a35b8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
index b753574..2f68faf 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.impl.aggregate;
 import java.io.IOException;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.ErrorCollector;
 import org.apache.drill.common.expression.ErrorCollectorImpl;
 import org.apache.drill.common.expression.LogicalExpression;
@@ -133,7 +134,8 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate>
{
         IterOutcome outcome = aggregator.getOutcome();
         return aggregator.getOutcome();
       case UPDATE_AGGREGATOR:
-        context.fail(new SchemaChangeException("Hash aggregate does not support schema changes"));
+        context.fail(UserException.unsupportedError()
+          .message("Hash aggregate does not support schema changes").build());
         close();
         killIncoming(false);
         return IterOutcome.STOP;

http://git-wip-us.apache.org/repos/asf/drill/blob/099a35b8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
index c1c5cb9..46b3721 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.impl.aggregate;
 import java.io.IOException;
 
 import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.ErrorCollector;
 import org.apache.drill.common.expression.ErrorCollectorImpl;
 import org.apache.drill.common.expression.LogicalExpression;
@@ -175,7 +176,9 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate>
{
         first = false;
         return outcome;
       case UPDATE_AGGREGATOR:
-        context.fail(new SchemaChangeException("Streaming aggregate does not support schema
changes"));
+        context.fail(UserException.unsupportedError()
+          .message("Streaming aggregate does not support schema changes")
+          .build());
         close();
         killIncoming(false);
         return IterOutcome.STOP;

http://git-wip-us.apache.org/repos/asf/drill/blob/099a35b8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index e88bc67..d08c86c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.ErrorCollector;
 import org.apache.drill.common.expression.ErrorCollectorImpl;
 import org.apache.drill.common.expression.LogicalExpression;
@@ -60,6 +61,7 @@ import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.WritableBatch;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.drill.exec.store.ischema.Records;
 import org.apache.drill.exec.vector.CopyUtil;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.complex.AbstractContainerVector;
@@ -244,7 +246,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort>
{
           // only change in the case that the schema truly changes.  Artificial schema changes
are ignored.
           if (!incoming.getSchema().equals(schema)) {
             if (schema != null) {
-              throw new UnsupportedOperationException("Sort doesn't currently support sorts
with changing schemas.");
+              throw new SchemaChangeException();
             }
             this.schema = incoming.getSchema();
             this.sorter = createNewSorter(context, incoming);
@@ -378,9 +380,13 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort>
{
 
       return IterOutcome.OK_NEW_SCHEMA;
 
-    } catch(SchemaChangeException | ClassTransformationException | IOException ex) {
+    } catch (SchemaChangeException ex) {
+      kill(false);
+      context.fail(UserException.unsupportedError(ex)
+        .message("Sort doesn't currently support sorts with changing schemas").build());
+      return IterOutcome.STOP;
+    } catch(ClassTransformationException | IOException ex) {
       kill(false);
-      logger.error("Failure during query", ex);
       context.fail(ex);
       return IterOutcome.STOP;
     } catch (UnsupportedOperationException e) {

http://git-wip-us.apache.org/repos/asf/drill/blob/099a35b8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java
index 75892f9..6c32f48 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java
@@ -39,7 +39,11 @@ public abstract class SingleBatchSorterTemplate implements SingleBatchSorter,
In
   public void setup(FragmentContext context, SelectionVector2 vector2, RecordBatch incoming)
throws SchemaChangeException{
     Preconditions.checkNotNull(vector2);
     this.vector2 = vector2;
-    doSetup(context, incoming, null);
+    try {
+      doSetup(context, incoming, null);
+    } catch (IllegalStateException e) {
+      throw new SchemaChangeException(e);
+    }
   }
 
   @Override


Mime
View raw message