drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [7/7] drill git commit: DRILL-2406: part 2 - Allow interpreted expression evaluation at planning time.
Date Tue, 17 Mar 2015 22:09:12 GMT
DRILL-2406: part 2 - Allow interpreted expression evaluation at planning time.

Changes needed after rebase to expose function determinism to calcite appropriately.

Address Jacques review comments.

Address chris' review comments.

Make things work now that BufferManager is AutoClosable.

Fixes tests that were creating plan fragments directly to create their own query start time,
as this information is now passed along from QueryContext during standard query initialization
(this enables the query start time and timezone to be available to planning time expression
evaluation).

Fix docs in BufferManger.

Update UDF interface to track determinism rather than randomness.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/3f93454f
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/3f93454f
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/3f93454f

Branch: refs/heads/master
Commit: 3f93454f014196a4da198ce012b605b70081fde0
Parents: 0aa8b19
Author: Jason Altekruse <altekrusejason@gmail.com>
Authored: Mon Mar 9 11:50:17 2015 -0700
Committer: Jason Altekruse <altekrusejason@gmail.com>
Committed: Tue Mar 17 13:49:31 2015 -0700

----------------------------------------------------------------------
 .../drill/exec/expr/fn/DrillFuncHolder.java     | 16 ++--
 .../exec/expr/fn/DrillFunctionRegistry.java     |  2 +-
 .../drill/exec/memory/BufferAllocator.java      | 10 +++
 .../drill/exec/memory/TopLevelAllocator.java    |  5 +-
 .../apache/drill/exec/ops/BufferManager.java    | 82 ++++++++++++++++++++
 .../apache/drill/exec/ops/FragmentContext.java  | 29 +++----
 .../org/apache/drill/exec/ops/QueryContext.java | 67 +++++++++++++---
 .../planner/fragment/SimpleParallelizer.java    | 11 +--
 .../logical/DrillReduceAggregatesRule.java      |  2 +-
 .../visitor/InsertLocalExchangeVisitor.java     |  4 +-
 .../exec/planner/sql/DrillSqlOperator.java      | 12 ++-
 .../apache/drill/exec/work/foreman/Foreman.java | 21 ++++-
 .../exec/work/fragment/FragmentExecutor.java    |  4 +-
 .../exec/physical/impl/TestLocalExchange.java   |  8 +-
 .../drill/exec/pop/TestFragmentChecker.java     |  9 ++-
 15 files changed, 225 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/3f93454f/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFuncHolder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFuncHolder.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFuncHolder.java
index 1b1348e..4e0f167 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFuncHolder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFuncHolder.java
@@ -21,7 +21,6 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
-import io.netty.buffer.DrillBuf;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.expression.ExpressionPosition;
 import org.apache.drill.common.expression.FunctionHolderExpression;
@@ -41,7 +40,6 @@ import org.apache.drill.exec.expr.annotations.FunctionTemplate;
 import org.apache.drill.exec.expr.annotations.FunctionTemplate.FunctionCostCategory;
 import org.apache.drill.exec.expr.annotations.FunctionTemplate.FunctionScope;
 import org.apache.drill.exec.expr.annotations.FunctionTemplate.NullHandling;
-import org.apache.drill.exec.ops.QueryDateTimeInfo;
 import org.apache.drill.exec.ops.UdfUtilities;
 import org.apache.drill.exec.vector.complex.reader.FieldReader;
 
@@ -54,6 +52,7 @@ import com.sun.codemodel.JExpr;
 import com.sun.codemodel.JType;
 import com.sun.codemodel.JVar;
 
+
 public abstract class DrillFuncHolder extends AbstractFuncHolder {
 
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FunctionImplementationRegistry.class);
@@ -62,7 +61,7 @@ public abstract class DrillFuncHolder extends AbstractFuncHolder {
   protected final FunctionTemplate.NullHandling nullHandling;
   protected final FunctionTemplate.FunctionCostCategory costCategory;
   protected final boolean isBinaryCommutative;
-  protected final boolean isRandom;
+  protected final boolean isDeterministic;
   protected final String[] registeredNames;
   protected final ImmutableList<String> imports;
   protected final WorkspaceReference[] workspaceVars;
@@ -78,7 +77,7 @@ public abstract class DrillFuncHolder extends AbstractFuncHolder {
     this.nullHandling = nullHandling;
     this.workspaceVars = workspaceVars;
     this.isBinaryCommutative = isBinaryCommutative;
-    this.isRandom = isRandom;
+    this.isDeterministic = ! isRandom;
     this.registeredNames = registeredNames;
     this.methodMap = ImmutableMap.copyOf(methods);
     this.parameters = parameters;
@@ -116,10 +115,17 @@ public abstract class DrillFuncHolder extends AbstractFuncHolder {
     return false;
   }
 
+  /**
+   * @deprecated - use isDeterministic method instead
+   */
+  @Deprecated
   public boolean isRandom() {
-    return isRandom;
+    return ! isDeterministic;
   }
 
+  public boolean isDeterministic() {
+    return isDeterministic;
+  }
 
   protected JVar[] declareWorkspaceVariables(ClassGenerator<?> g) {
     JVar[] workspaceJVars = new JVar[workspaceVars.length];

http://git-wip-us.apache.org/repos/asf/drill/blob/3f93454f/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFunctionRegistry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFunctionRegistry.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFunctionRegistry.java
index fafc286..32cf362 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFunctionRegistry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFunctionRegistry.java
@@ -106,7 +106,7 @@ public class DrillFunctionRegistry {
           if (func.isAggregating()) {
             op = new DrillSqlAggOperator(name, func.getParamCount());
           } else {
-            op = new DrillSqlOperator(name, func.getParamCount(), func.getReturnType());
+            op = new DrillSqlOperator(name, func.getParamCount(), func.getReturnType(), func.isDeterministic());
           }
           operatorTable.add(function.getKey(), op);
         }

http://git-wip-us.apache.org/repos/asf/drill/blob/3f93454f/exec/java-exec/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java
index 30b905f..c233ac5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java
@@ -54,6 +54,16 @@ public interface BufferAllocator extends Closeable {
 
   public abstract ByteBufAllocator getUnderlyingAllocator();
 
+  /**
+   * Create a child allocator nested below this one.
+   *
+   * @param context - owning fragment for this allocator
+   * @param initialReservation - specified in bytes
+   * @param maximumReservation - specified in bytes
+   * @param applyFragmentLimit - flag to conditionally enable fragment memory limits
+   * @return - a new buffer allocator owned by the parent it was spawned from
+   * @throws OutOfMemoryException - when off-heap memory has been exhausted
+   */
   public abstract BufferAllocator getChildAllocator(FragmentContext context, long initialReservation,
       long maximumReservation, boolean applyFragmentLimit) throws OutOfMemoryException;
 

http://git-wip-us.apache.org/repos/asf/drill/blob/3f93454f/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
index 2a28bcb..af8c1dc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
@@ -191,9 +191,10 @@ public class TopLevelAllocator implements BufferAllocator {
                           boolean applyFragmentLimit) throws OutOfMemoryException{
       assert max >= pre;
       this.applyFragmentLimit=applyFragmentLimit;
-      childAcct = new Accountor(context.getConfig(), errorOnLeak, context, parentAccountor,
max, pre, applyFragmentLimit);
+      DrillConfig drillConf = context != null ? context.getConfig() : null;
+      childAcct = new Accountor(drillConf, errorOnLeak, context, parentAccountor, max, pre,
applyFragmentLimit);
       this.fragmentContext=context;
-      this.handle = context.getHandle();
+      this.handle = context != null ? context.getHandle() : null;
       thisMap = map;
       this.empty = DrillBuf.getEmpty(this, childAcct);
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/3f93454f/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BufferManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BufferManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BufferManager.java
new file mode 100644
index 0000000..536f6fd
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BufferManager.java
@@ -0,0 +1,82 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ops;
+
+import com.carrotsearch.hppc.LongObjectOpenHashMap;
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.exec.memory.BufferAllocator;
+
+import java.io.Closeable;
+
+/**
+ * Manages a list of {@link DrillBuf}s that can be reallocated as needed. Upon
+ * re-allocation the old buffer will be freed. Managing a list of these buffers
+ * prevents some parts of the system from needing to define a correct location
+ * to place the final call to free them.
+ *
+ * The current uses of these types of buffers are within the pluggable components of Drill.
+ * In UDFs, memory management should not be a concern. We provide access to re-allocatable
+ * DrillBufs to give UDF writers general purpose buffers we can account for. To prevent the
need
+ * for UDFs to contain boilerplate to close all of the buffers they request, this list
+ * is tracked at a higher level and all of the buffers are freed once we are sure that
+ * the code depending on them is done executing (currently {@link FragmentContext}
+ * and {@link QueryContext}.
+ */
+public class BufferManager implements AutoCloseable {
+
+  private LongObjectOpenHashMap<DrillBuf> managedBuffers = new LongObjectOpenHashMap<>();
+  private final BufferAllocator allocator;
+
+  // fragment context associated with this buffer manager, if the buffer
+  // manager is owned by another type, this can be null
+  private final FragmentContext fragmentContext;
+
+  public BufferManager(BufferAllocator allocator, FragmentContext fragmentContext) {
+    this.allocator = allocator;
+    this.fragmentContext = fragmentContext;
+  }
+
+  public void close() throws Exception {
+    Object[] mbuffers = ((LongObjectOpenHashMap<Object>)(Object)managedBuffers).values;
+    for (int i =0; i < mbuffers.length; i++) {
+      if (managedBuffers.allocated[i]) {
+        ((DrillBuf)mbuffers[i]).release();
+      }
+    }
+  }
+
+  public DrillBuf replace(DrillBuf old, int newSize) {
+    if (managedBuffers.remove(old.memoryAddress()) == null) {
+      throw new IllegalStateException("Tried to remove unmanaged buffer.");
+    }
+    old.release();
+    return getManagedBuffer(newSize);
+  }
+
+  public DrillBuf getManagedBuffer() {
+    return getManagedBuffer(256);
+  }
+
+  public DrillBuf getManagedBuffer(int size) {
+    DrillBuf newBuf = allocator.buffer(size);
+    managedBuffers.put(newBuf.memoryAddress(), newBuf);
+    newBuf.setFragmentContext(fragmentContext);
+    return newBuf;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/3f93454f/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index aa1dffd..9fc9ad1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -56,7 +56,7 @@ import com.google.common.collect.Maps;
 /**
  * Contextual objects required for execution of a particular fragment.
  */
-public class FragmentContext implements Closeable, UdfUtilities {
+public class FragmentContext implements AutoCloseable, UdfUtilities {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentContext.class);
 
 
@@ -74,7 +74,7 @@ public class FragmentContext implements Closeable, UdfUtilities {
   private IncomingBuffers buffers;
   private final OptionManager fragmentOptions;
   private final UserCredentials credentials;
-  private LongObjectOpenHashMap<DrillBuf> managedBuffers = new LongObjectOpenHashMap<>();
+  private final BufferManager bufferManager;
 
   private volatile Throwable failureCause;
   private volatile FragmentContextState state = FragmentContextState.OK;
@@ -117,6 +117,7 @@ public class FragmentContext implements Closeable, UdfUtilities {
       throw new ExecutionSetupException("Failure while getting memory allocator for fragment.",
e);
     }
     this.stats = new FragmentStats(allocator, dbContext.getMetrics(), fragment.getAssignment());
+    this.bufferManager = new BufferManager(this.allocator, this);
 
     this.loader = new QueryClassLoader(dbContext.getConfig(), fragmentOptions);
   }
@@ -291,16 +292,11 @@ public class FragmentContext implements Closeable, UdfUtilities {
   }
 
   @Override
-  public void close() {
+  public void close() throws Exception {
     for (Thread thread: daemonThreads) {
-     thread.interrupt();
-    }
-    Object[] mbuffers = ((LongObjectOpenHashMap<Object>)(Object)managedBuffers).values;
-    for (int i =0; i < mbuffers.length; i++) {
-      if (managedBuffers.allocated[i]) {
-        ((DrillBuf)mbuffers[i]).release();
-      }
+      thread.interrupt();
     }
+    bufferManager.close();
 
     if (buffers != null) {
       buffers.close();
@@ -313,22 +309,15 @@ public class FragmentContext implements Closeable, UdfUtilities {
   }
 
   public DrillBuf replace(DrillBuf old, int newSize) {
-    if (managedBuffers.remove(old.memoryAddress()) == null) {
-      throw new IllegalStateException("Tried to remove unmanaged buffer.");
-    }
-    old.release();
-    return getManagedBuffer(newSize);
+    return bufferManager.replace(old, newSize);
   }
 
   public DrillBuf getManagedBuffer() {
-    return getManagedBuffer(256);
+    return bufferManager.getManagedBuffer();
   }
 
   public DrillBuf getManagedBuffer(int size) {
-    DrillBuf newBuf = allocator.buffer(size);
-    managedBuffers.put(newBuf.memoryAddress(), newBuf);
-    newBuf.setFragmentContext(this);
-    return newBuf;
+    return bufferManager.getManagedBuffer(size);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/3f93454f/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
index c881432..fb6b4aa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
@@ -19,13 +19,18 @@ package org.apache.drill.exec.ops;
 
 import java.util.Collection;
 
+import io.netty.buffer.DrillBuf;
 import net.hydromatic.optiq.SchemaPlus;
 import net.hydromatic.optiq.jdbc.SimpleOptiqSchema;
 
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.coord.ClusterCoordinator;
 import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
+import org.apache.drill.exec.expr.fn.impl.DateUtility;
 import org.apache.drill.exec.planner.PhysicalPlanReader;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.planner.sql.DrillOperatorTable;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
@@ -37,9 +42,10 @@ import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.server.options.QueryOptionManager;
 import org.apache.drill.exec.store.StoragePluginRegistry;
-import org.apache.drill.exec.store.sys.PStoreProvider;
 
-public class QueryContext{
+// TODO - consider re-name to PlanningContext, as the query execution context actually appears
+// in fragment contexts
+public class QueryContext implements AutoCloseable, UdfUtilities{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryContext.class);
 
   private final QueryId queryId;
@@ -47,28 +53,45 @@ public class QueryContext{
   private final WorkEventBus workBus;
   private UserSession session;
   private OptionManager queryOptions;
-  public final Multitimer<QuerySetup> timer;
   private final PlannerSettings plannerSettings;
   private final DrillOperatorTable table;
 
-  public QueryContext(UserSession session, QueryId queryId, DrillbitContext drllbitContext)
{
+  private final BufferAllocator allocator;
+  private final BufferManager bufferManager;
+  private final QueryDateTimeInfo queryDateTimeInfo;
+  private static final int INITIAL_OFF_HEAP_ALLOCATION_IN_BYTES = 1024 * 1024;
+  private static final int MAX_OFF_HEAP_ALLOCATION_IN_BYTES = 16 * 1024 * 1024;
+
+  // flag to indicate if close has been called, after calling close the first
+  // time this is set to true and the close method becomes a no-op
+  private boolean closed = false;
+
+  public QueryContext(final UserSession session, QueryId queryId, final DrillbitContext drllbitContext)
{
     super();
     this.queryId = queryId;
     this.drillbitContext = drllbitContext;
     this.workBus = drllbitContext.getWorkBus();
     this.session = session;
-    this.timer = new Multitimer<>(QuerySetup.class);
     this.queryOptions = new QueryOptionManager(session.getOptions());
     this.plannerSettings = new PlannerSettings(queryOptions, getFunctionRegistry());
-    this.plannerSettings.setNumEndPoints(this.getActiveEndpoints().size());
+    plannerSettings.setNumEndPoints(drillbitContext.getBits().size());
     this.table = new DrillOperatorTable(getFunctionRegistry());
-  }
 
-  public PStoreProvider getPersistentStoreProvider(){
-    return drillbitContext.getPersistentStoreProvider();
+    long queryStartTime = System.currentTimeMillis();
+    int timeZone = DateUtility.getIndex(System.getProperty("user.timezone"));
+    this.queryDateTimeInfo = new QueryDateTimeInfo(queryStartTime, timeZone);
+
+    try {
+      this.allocator = drllbitContext.getAllocator().getChildAllocator(null, INITIAL_OFF_HEAP_ALLOCATION_IN_BYTES,
MAX_OFF_HEAP_ALLOCATION_IN_BYTES, false);
+    } catch (OutOfMemoryException e) {
+      throw new DrillRuntimeException("Error creating off-heap allocator for planning context.",e);
+    }
+    // TODO(DRILL-1942) the new allocator has this capability built-in, so this can be removed
once that is available
+    this.bufferManager = new BufferManager(this.allocator, null);
   }
 
-  public PlannerSettings getPlannerSettings(){
+
+  public PlannerSettings getPlannerSettings() {
     return plannerSettings;
   }
 
@@ -76,6 +99,10 @@ public class QueryContext{
     return session;
   }
 
+  public BufferAllocator getAllocator() {
+    return allocator;
+  }
+
   public SchemaPlus getNewDefaultSchema(){
     SchemaPlus rootSchema = getRootSchema();
     SchemaPlus defaultSchema = session.getDefaultSchema(rootSchema);
@@ -143,4 +170,24 @@ public class QueryContext{
   public ClusterCoordinator getClusterCoordinator() {
     return drillbitContext.getClusterCoordinator();
   }
+
+  @Override
+  public QueryDateTimeInfo getQueryDateTimeInfo() {
+    return queryDateTimeInfo;
+  }
+
+  @Override
+  public DrillBuf getManagedBuffer() {
+    return bufferManager.getManagedBuffer();
+  }
+
+  @Override
+  public void close() throws Exception {
+    if (!closed) {
+      // TODO(DRILL-1942) the new allocator has this capability built-in, so this can be
removed once that is available
+      bufferManager.close();
+      allocator.close();
+      closed = true;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/3f93454f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
index f8d1803..eebd40e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
@@ -35,6 +35,7 @@ import org.apache.drill.common.util.DrillStringUtils;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.expr.fn.impl.DateUtility;
 import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.ops.QueryDateTimeInfo;
 import org.apache.drill.exec.physical.EndpointAffinity;
 import org.apache.drill.exec.physical.PhysicalOperatorSetupException;
 import org.apache.drill.exec.physical.base.Exchange.ParallelizationDependency;
@@ -112,7 +113,7 @@ public class SimpleParallelizer {
    */
   public QueryWorkUnit getFragments(OptionList options, DrillbitEndpoint foremanNode, QueryId
queryId,
       Collection<DrillbitEndpoint> activeEndpoints, PhysicalPlanReader reader, Fragment
rootFragment,
-      UserSession session) throws ExecutionSetupException {
+      UserSession session, QueryDateTimeInfo queryDateTimeInfo) throws ExecutionSetupException
{
 
     final PlanningSet planningSet = new PlanningSet();
 
@@ -125,7 +126,7 @@ public class SimpleParallelizer {
       parallelizeFragment(wrapper, planningSet, activeEndpoints);
     }
 
-    return generateWorkUnit(options, foremanNode, queryId, reader, rootFragment, planningSet,
session);
+    return generateWorkUnit(options, foremanNode, queryId, reader, rootFragment, planningSet,
session, queryDateTimeInfo);
   }
 
   // For every fragment, create a Wrapper in PlanningSet.
@@ -320,14 +321,14 @@ public class SimpleParallelizer {
 
   private QueryWorkUnit generateWorkUnit(OptionList options, DrillbitEndpoint foremanNode,
QueryId queryId,
       PhysicalPlanReader reader, Fragment rootNode, PlanningSet planningSet,
-      UserSession session) throws ExecutionSetupException {
+      UserSession session, QueryDateTimeInfo queryDateTimeInfo) throws ExecutionSetupException
{
     List<PlanFragment> fragments = Lists.newArrayList();
 
     PlanFragment rootFragment = null;
     FragmentRoot rootOperator = null;
 
-    long queryStartTime = System.currentTimeMillis();
-    int timeZone = DateUtility.getIndex(System.getProperty("user.timezone"));
+    long queryStartTime = queryDateTimeInfo.getQueryStartTime();
+    int timeZone = queryDateTimeInfo.getRootFragmentTimeZone();
 
     // now we generate all the individual plan fragments and associated assignments. Note,
we need all endpoints
     // assigned before we can materialize, so we start a new loop here rather than utilizing
the previous one.

http://git-wip-us.apache.org/repos/asf/drill/blob/3f93454f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillReduceAggregatesRule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillReduceAggregatesRule.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillReduceAggregatesRule.java
index 93fff35..94fed87 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillReduceAggregatesRule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillReduceAggregatesRule.java
@@ -66,7 +66,7 @@ public class DrillReduceAggregatesRule extends RelOptRule {
   public static final DrillReduceAggregatesRule INSTANCE =
       new DrillReduceAggregatesRule(operand(AggregateRel.class, any()));
 
-  private static final DrillSqlOperator CastHighOp = new DrillSqlOperator("CastHigh", 1);
+  private static final DrillSqlOperator CastHighOp = new DrillSqlOperator("CastHigh", 1,
false);
 
   //~ Constructors -----------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/drill/blob/3f93454f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/InsertLocalExchangeVisitor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/InsertLocalExchangeVisitor.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/InsertLocalExchangeVisitor.java
index 907fcb1..b28198b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/InsertLocalExchangeVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/InsertLocalExchangeVisitor.java
@@ -81,8 +81,8 @@ public class InsertLocalExchangeVisitor extends BasePrelVisitor<Prel,
Void, Runt
     if ( isMuxEnabled ) {
       // Insert Project Operator with new column that will be a hash for HashToRandomExchange
fields
       List<DistributionField> fields = hashPrel.getFields();
-      final DrillSqlOperator sqlOpH = new DrillSqlOperator("hash", 1, MajorType.getDefaultInstance());
-      final DrillSqlOperator sqlOpX = new DrillSqlOperator("xor", 2, MajorType.getDefaultInstance());
+      final DrillSqlOperator sqlOpH = new DrillSqlOperator("hash", 1, MajorType.getDefaultInstance(),
true);
+      final DrillSqlOperator sqlOpX = new DrillSqlOperator("xor", 2, MajorType.getDefaultInstance(),
true);
       RexNode prevRex = null;
       List<String> outputFieldNames = Lists.newArrayList(childFields);
       final RexBuilder rexBuilder = prel.getCluster().getRexBuilder();

http://git-wip-us.apache.org/repos/asf/drill/blob/3f93454f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlOperator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlOperator.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlOperator.java
index 6b54c43..7bd48c8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlOperator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlOperator.java
@@ -38,14 +38,20 @@ public class DrillSqlOperator extends SqlFunction {
 
   private static final MajorType NONE = MajorType.getDefaultInstance();
   private final MajorType returnType;
+  private final boolean isDeterministic;
 
-  public DrillSqlOperator(String name, int argCount) {
-    this(name, argCount, MajorType.getDefaultInstance());
+  public DrillSqlOperator(String name, int argCount, boolean isDeterministic) {
+    this(name, argCount, MajorType.getDefaultInstance(), isDeterministic);
   }
 
-  public DrillSqlOperator(String name, int argCount, MajorType returnType) {
+  public DrillSqlOperator(String name, int argCount, MajorType returnType, boolean isDeterminisitic)
{
     super(new SqlIdentifier(name, SqlParserPos.ZERO), DynamicReturnType.INSTANCE, null, new
Checker(argCount), null, SqlFunctionCategory.USER_DEFINED_FUNCTION);
     this.returnType = Preconditions.checkNotNull(returnType);
+    this.isDeterministic = isDeterminisitic;
+  }
+
+  public boolean isDeterministic() {
+    return isDeterministic;
   }
 
   protected RelDataType getReturnDataType(final RelDataTypeFactory factory) {

http://git-wip-us.apache.org/repos/asf/drill/blob/3f93454f/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 409450f..8e0780b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -39,6 +39,7 @@ import org.apache.drill.exec.coord.DistributedSemaphore.DistributedLease;
 import org.apache.drill.exec.exception.OptimizerException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.ops.QueryDateTimeInfo;
 import org.apache.drill.exec.opt.BasicOptimizer;
 import org.apache.drill.exec.physical.PhysicalPlan;
 import org.apache.drill.exec.physical.base.FragmentRoot;
@@ -159,10 +160,21 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>
{
     bee.retireForeman(this);
     context.getWorkBus().removeFragmentStatusListener(queryId);
     context.getClusterCoordinator().removeDrillbitStatusListener(queryManager);
-    if(result != null){
-      initiatingClient.sendResult(responseListener, new QueryWritableBatch(result), true);
+
+    try {
+      try {
+        context.close();
+      } catch (Exception e) {
+        moveToState(QueryState.FAILED, e);
+        return;
+      }
+
+      if (result != null) {
+        initiatingClient.sendResult(responseListener, new QueryWritableBatch(result), true);
+      }
+    } finally {
+      releaseLease();
     }
-    releaseLease();
   }
 
   /**
@@ -372,7 +384,8 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>
{
 
     SimpleParallelizer parallelizer = new SimpleParallelizer(context);
     return parallelizer.getFragments(context.getOptions().getOptionList(), context.getCurrentEndpoint(),
-        queryId, context.getActiveEndpoints(), context.getPlanReader(), rootFragment, initiatingClient.getSession());
+        queryId, context.getActiveEndpoints(), context.getPlanReader(), rootFragment, initiatingClient.getSession(),
+        context.getQueryDateTimeInfo());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/drill/blob/3f93454f/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index 4ab3cc0..e2f7bbf 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -161,9 +161,9 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid
 
     try {
       context.close();
-    } catch (RuntimeException e) {
+    } catch (Exception e) {
       if (throwFailure) {
-        throw e;
+        throw new RuntimeException("Error closing fragment context.", e);
       }
       logger.warn("Failure while closing out resources.", e);
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/3f93454f/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestLocalExchange.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestLocalExchange.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestLocalExchange.java
index fc1c6b9..7ccb882 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestLocalExchange.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestLocalExchange.java
@@ -24,6 +24,8 @@ import com.google.common.collect.Lists;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.drill.PlanTestBase;
 import org.apache.drill.TestBuilder;
+import org.apache.drill.exec.expr.fn.impl.DateUtility;
+import org.apache.drill.exec.ops.QueryDateTimeInfo;
 import org.apache.drill.exec.physical.base.Exchange;
 import org.apache.drill.exec.physical.config.UnorderedDeMuxExchange;
 import org.apache.drill.exec.physical.config.HashToRandomExchange;
@@ -405,9 +407,13 @@ public class TestLocalExchange extends PlanTestBase {
 
     findFragmentsWithPartitionSender(rootFragment, planningSet, deMuxFragments, htrFragments);
 
+    long queryStartTime = System.currentTimeMillis();
+    int timeZone = DateUtility.getIndex(System.getProperty("user.timezone"));
+    QueryDateTimeInfo queryDateTimeInfo = new QueryDateTimeInfo(queryStartTime, timeZone);
+
     QueryWorkUnit qwu = PARALLELIZER.getFragments(new OptionList(), drillbitContext.getEndpoint(),
         QueryId.getDefaultInstance(),
-        drillbitContext.getBits(), planReader, rootFragment, USER_SESSION);
+        drillbitContext.getBits(), planReader, rootFragment, USER_SESSION, queryDateTimeInfo);
 
     // Make sure the number of minor fragments with HashPartitioner within a major fragment
is not more than the
     // number of Drillbits in cluster

http://git-wip-us.apache.org/repos/asf/drill/blob/3f93454f/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java
index 07d310a..32e3bf9 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java
@@ -19,6 +19,8 @@ package org.apache.drill.exec.pop;
 
 import java.util.List;
 
+import org.apache.drill.exec.expr.fn.impl.DateUtility;
+import org.apache.drill.exec.ops.QueryDateTimeInfo;
 import org.apache.drill.exec.planner.PhysicalPlanReader;
 import org.apache.drill.exec.planner.fragment.Fragment;
 import org.apache.drill.exec.planner.fragment.PlanningSet;
@@ -60,8 +62,13 @@ public class TestFragmentChecker extends PopUnitTestBase{
       endpoints.add(b1);
     }
 
+    long queryStartTime = System.currentTimeMillis();
+    int timeZone = DateUtility.getIndex(System.getProperty("user.timezone"));
+    QueryDateTimeInfo queryDateTimeInfo = new QueryDateTimeInfo(queryStartTime, timeZone);
+
     QueryWorkUnit qwu = par.getFragments(new OptionList(), localBit, QueryId.getDefaultInstance(),
endpoints, ppr, fragmentRoot,
-        UserSession.Builder.newBuilder().withCredentials(UserBitShared.UserCredentials.newBuilder().setUserName("foo").build()).build());
+        UserSession.Builder.newBuilder().withCredentials(UserBitShared.UserCredentials.newBuilder().setUserName("foo").build()).build(),
+        queryDateTimeInfo);
     System.out.println(String.format("=========ROOT FRAGMENT [%d:%d] =========", qwu.getRootFragment().getHandle().getMajorFragmentId(),
qwu.getRootFragment().getHandle().getMinorFragmentId()));
 
     System.out.print(qwu.getRootFragment().getFragmentJson());


Mime
View raw message