drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jacq...@apache.org
Subject [6/9] drill git commit: DRILL-2245: Clean up query setup and execution kickoff in Foreman/WorkManager in order to ensure consistent handling, and avoid hangs and races, with the goal of improving Drillbit robustness.
Date Thu, 19 Mar 2015 21:08:16 GMT
http://git-wip-us.apache.org/repos/asf/drill/blob/2da618cd/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStatus.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStatus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStatus.java
deleted file mode 100644
index 4e18da6..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStatus.java
+++ /dev/null
@@ -1,255 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.work.foreman;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.drill.common.exceptions.DrillRuntimeException;
-import org.apache.drill.exec.proto.BitControl.FragmentStatus;
-import org.apache.drill.exec.proto.SchemaUserBitShared;
-import org.apache.drill.exec.proto.UserBitShared.MajorFragmentProfile;
-import org.apache.drill.exec.proto.UserBitShared.QueryId;
-import org.apache.drill.exec.proto.UserBitShared.QueryInfo;
-import org.apache.drill.exec.proto.UserBitShared.QueryProfile;
-import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
-import org.apache.drill.exec.proto.UserProtos.RunQuery;
-import org.apache.drill.exec.proto.helper.QueryIdHelper;
-import org.apache.drill.exec.store.sys.EStore;
-import org.apache.drill.exec.store.sys.PStore;
-import org.apache.drill.exec.store.sys.PStoreConfig;
-import org.apache.drill.exec.store.sys.PStoreProvider;
-
-import com.carrotsearch.hppc.IntObjectOpenHashMap;
-import com.google.common.collect.Lists;
-
-public class QueryStatus {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryStatus.class);
-
-  public static final PStoreConfig<QueryProfile> QUERY_PROFILE = PStoreConfig.
-          newProtoBuilder(SchemaUserBitShared.QueryProfile.WRITE, SchemaUserBitShared.QueryProfile.MERGE) //
-          .name("profiles") //
-          .blob() //
-          .max(100) //
-          .build();
-
-  public static final PStoreConfig<QueryInfo> RUNNING_QUERY_INFO = PStoreConfig.
-      newProtoBuilder(SchemaUserBitShared.QueryInfo.WRITE, SchemaUserBitShared.QueryInfo.MERGE).name("running").ephemeral().build();
-
-  // doesn't need to be thread safe as fragmentDataMap is generated in a single thread and then accessed by multiple threads for reads only.
-  private IntObjectOpenHashMap<IntObjectOpenHashMap<FragmentData>> fragmentDataMap = new IntObjectOpenHashMap<IntObjectOpenHashMap<FragmentData>>();
-  private List<FragmentData> fragmentDataSet = Lists.newArrayList();
-
-  private final String queryId;
-  private final QueryId id;
-  private RunQuery query;
-  private String planText;
-  private Foreman foreman;
-  private long startTime;
-  private long endTime;
-  private int totalFragments;
-  private int finishedFragments = 0;
-
-  private final PStore<QueryProfile> profilePStore;
-  private final PStore<QueryInfo> profileEStore;
-
-  public QueryStatus(RunQuery query, QueryId id, PStoreProvider provider, Foreman foreman) {
-    this.id = id;
-    this.query = query;
-    this.queryId = QueryIdHelper.getQueryId(id);
-    try {
-      this.profilePStore = provider.getStore(QUERY_PROFILE);
-      this.profileEStore = provider.getStore(RUNNING_QUERY_INFO);
-    } catch (IOException e) {
-      throw new DrillRuntimeException(e);
-    }
-    this.foreman = foreman;
-  }
-
-  public List<FragmentData> getFragmentData() {
-    return fragmentDataSet;
-  }
-
-  public void setPlanText(String planText) {
-    this.planText = planText;
-  }
-
-  public void markStart() {
-    this.startTime = System.currentTimeMillis();
-  }
-
-  public void setEndTime(long endTime) {
-    this.endTime = endTime;
-  }
-
-  public void setTotalFragments(int totalFragments) {
-    this.totalFragments = totalFragments;
-  }
-
-  public void incrementFinishedFragments() {
-    finishedFragments++;
-    assert finishedFragments <= totalFragments;
-  }
-
-  void add(FragmentData data) {
-    int majorFragmentId = data.getHandle().getMajorFragmentId();
-    int minorFragmentId = data.getHandle().getMinorFragmentId();
-    IntObjectOpenHashMap<FragmentData> minorMap = fragmentDataMap.get(majorFragmentId);
-    if (minorMap == null) {
-      minorMap = new IntObjectOpenHashMap<FragmentData>();
-      fragmentDataMap.put(majorFragmentId, minorMap);
-    }
-
-    minorMap.put(minorFragmentId, data);
-    fragmentDataSet.add(data);
-  }
-
-  void updateFragmentStatus(FragmentStatus fragmentStatus) {
-    int majorFragmentId = fragmentStatus.getHandle().getMajorFragmentId();
-    int minorFragmentId = fragmentStatus.getHandle().getMinorFragmentId();
-    fragmentDataMap.get(majorFragmentId).get(minorFragmentId).setStatus(fragmentStatus);
-  }
-
-  synchronized QueryState updateQueryStateInStore(QueryState queryState) {
-    switch (queryState) {
-      case PENDING:
-      case RUNNING:
-        profileEStore.put(queryId, getAsInfo());  // store as ephemeral query profile.
-        break;
-      case COMPLETED:
-      case CANCELED:
-      case FAILED:
-        try{
-          profileEStore.delete(queryId);
-        }catch(Exception e){
-          logger.warn("Failure while trying to delete the estore profile for this query.", e);
-        }
-
-        profilePStore.put(queryId, getAsProfile());
-        break;
-      default:
-        throw new IllegalStateException();
-    }
-    return queryState;
-  }
-
-  @Override
-  public String toString() {
-    return fragmentDataMap.toString();
-  }
-
-  public static class FragmentId{
-    int major;
-    int minor;
-
-    public FragmentId(FragmentStatus status) {
-      this.major = status.getHandle().getMajorFragmentId();
-      this.minor = status.getHandle().getMinorFragmentId();
-    }
-
-    public FragmentId(FragmentData data) {
-      this.major = data.getHandle().getMajorFragmentId();
-      this.minor = data.getHandle().getMinorFragmentId();
-    }
-
-    public FragmentId(int major, int minor) {
-      super();
-      this.major = major;
-      this.minor = minor;
-    }
-
-    @Override
-    public int hashCode() {
-      final int prime = 31;
-      int result = 1;
-      result = prime * result + major;
-      result = prime * result + minor;
-      return result;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if (this == obj) {
-        return true;
-      }
-      if (obj == null) {
-        return false;
-      }
-      if (getClass() != obj.getClass()) {
-        return false;
-      }
-      FragmentId other = (FragmentId) obj;
-      if (major != other.major) {
-        return false;
-      }
-      if (minor != other.minor) {
-        return false;
-      }
-      return true;
-    }
-
-    @Override
-    public String toString() {
-      return major + ":" + minor;
-    }
-  }
-
-  public QueryInfo getAsInfo() {
-    return QueryInfo.newBuilder() //
-      .setQuery(query.getPlan())
-      .setState(foreman.getState())
-      .setForeman(foreman.getContext().getCurrentEndpoint())
-      .setStart(startTime)
-      .build();
-  }
-
-  public QueryProfile getAsProfile() {
-    QueryProfile.Builder b = QueryProfile.newBuilder();
-    b.setQuery(query.getPlan());
-    b.setType(query.getType());
-    if (planText != null) {
-      b.setPlan(planText);
-    }
-    b.setId(id);
-    for (int i = 0; i < fragmentDataMap.allocated.length; i++) {
-      if (fragmentDataMap.allocated[i]) {
-        int majorFragmentId = fragmentDataMap.keys[i];
-        IntObjectOpenHashMap<FragmentData> minorMap = (IntObjectOpenHashMap<FragmentData>) ((Object[]) fragmentDataMap.values)[i];
-
-        MajorFragmentProfile.Builder fb = MajorFragmentProfile.newBuilder();
-        fb.setMajorFragmentId(majorFragmentId);
-        for (int v = 0; v < minorMap.allocated.length; v++) {
-          if (minorMap.allocated[v]) {
-            FragmentData data = (FragmentData) ((Object[]) minorMap.values)[v];
-            fb.addMinorFragmentProfile(data.getStatus().getProfile());
-          }
-        }
-        b.addFragmentProfile(fb);
-      }
-    }
-
-    b.setState(foreman.getState());
-    b.setForeman(foreman.getContext().getCurrentEndpoint());
-    b.setStart(startTime);
-    b.setEnd(endTime);
-    b.setTotalFragments(totalFragments);
-    b.setFinishedFragments(finishedFragments);
-    return b.build();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/2da618cd/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index e2f7bbf..b6176db 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -18,70 +18,65 @@
 package org.apache.drill.exec.work.fragment;
 
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.drill.common.DeferredException;
+import org.apache.drill.exec.coord.ClusterCoordinator;
 import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.ops.FragmentStats;
 import org.apache.drill.exec.physical.base.FragmentRoot;
 import org.apache.drill.exec.physical.impl.ImplCreator;
 import org.apache.drill.exec.physical.impl.RootExec;
 import org.apache.drill.exec.proto.BitControl.FragmentStatus;
 import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.UserBitShared.FragmentState;
-import org.apache.drill.exec.proto.UserBitShared.MinorFragmentProfile;
 import org.apache.drill.exec.proto.helper.QueryIdHelper;
-import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
-import org.apache.drill.exec.work.CancelableQuery;
-import org.apache.drill.exec.work.StatusProvider;
-import org.apache.drill.exec.work.WorkManager.WorkerBee;
 import org.apache.drill.exec.work.foreman.DrillbitStatusListener;
 
 /**
- * Responsible for running a single fragment on a single Drillbit. Listens/responds to status request and cancellation
- * messages.
+ * Responsible for running a single fragment on a single Drillbit. Listens/responds to status request
+ * and cancellation messages.
  */
-public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvider, Comparable<Object>{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentExecutor.class);
+public class FragmentExecutor implements Runnable {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentExecutor.class);
 
   private final AtomicInteger state = new AtomicInteger(FragmentState.AWAITING_ALLOCATION_VALUE);
   private final FragmentRoot rootOperator;
-  private final FragmentContext context;
-  private final WorkerBee bee;
+  private final FragmentContext fragmentContext;
   private final StatusReporter listener;
-  private final DrillbitStatusListener drillbitStatusListener = new FragmentDrillbitStatusListener();
-
+  private volatile boolean closed;
   private RootExec root;
-  private boolean closed;
 
-  public FragmentExecutor(FragmentContext context, WorkerBee bee, FragmentRoot rootOperator, StatusReporter listener) {
-    this.context = context;
-    this.bee = bee;
+  public FragmentExecutor(final FragmentContext context, final FragmentRoot rootOperator,
+      final StatusReporter listener) {
+    this.fragmentContext = context;
     this.rootOperator = rootOperator;
     this.listener = listener;
   }
 
-  @Override
   public FragmentStatus getStatus() {
-    // If the query is not in a running state, the operator tree is still being constructed and
-    // there is no reason to poll for intermediate results.
-
-    // Previously the call to get the operator stats with the AbstractStatusReporter was happening
-    // before this check. This caused a concurrent modification exception as the list of operator
-    // stats is iterated over while collecting info, and added to while building the operator tree.
-    if(state.get() != FragmentState.RUNNING_VALUE){
+    /*
+     * If the query is not in a running state, the operator tree is still being constructed and
+     * there is no reason to poll for intermediate results.
+     *
+     * Previously the call to get the operator stats with the AbstractStatusReporter was happening
+     * before this check. This caused a concurrent modification exception as the list of operator
+     * stats is iterated over while collecting info, and added to while building the operator tree.
+     */
+    if(state.get() != FragmentState.RUNNING_VALUE) {
       return null;
     }
-    FragmentStatus status = AbstractStatusReporter.getBuilder(context, FragmentState.RUNNING, null, null).build();
+    final FragmentStatus status =
+        AbstractStatusReporter.getBuilder(fragmentContext, FragmentState.RUNNING, null, null).build();
     return status;
   }
 
-  @Override
   public void cancel() {
+    // Note this will be called outside of run(), from another thread
     updateState(FragmentState.CANCELLED);
-    logger.debug("Cancelled Fragment {}", context.getHandle());
-    context.cancel();
+    logger.debug("Cancelled Fragment {}", fragmentContext.getHandle());
+    fragmentContext.cancel();
   }
 
   public void receivingFragmentFinished(FragmentHandle handle) {
@@ -91,98 +86,116 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid
     }
   }
 
-  public UserClientConnection getClient() {
-    return context.getConnection();
-  }
-
   @Override
   public void run() {
-    final String originalThread = Thread.currentThread().getName();
-    try {
-      String newThreadName = String.format("%s:frag:%s:%s", //
-          QueryIdHelper.getQueryId(context.getHandle().getQueryId()), //
-          context.getHandle().getMajorFragmentId(),
-          context.getHandle().getMinorFragmentId()
-          );
-      Thread.currentThread().setName(newThreadName);
+    final Thread myThread = Thread.currentThread();
+    final String originalThreadName = myThread.getName();
+    final FragmentHandle fragmentHandle = fragmentContext.getHandle();
+    final ClusterCoordinator clusterCoordinator = fragmentContext.getDrillbitContext().getClusterCoordinator();
+    final DrillbitStatusListener drillbitStatusListener = new FragmentDrillbitStatusListener();
 
-      root = ImplCreator.getExec(context, rootOperator);
+    try {
+      final String newThreadName = String.format("%s:frag:%s:%s",
+          QueryIdHelper.getQueryId(fragmentHandle.getQueryId()),
+          fragmentHandle.getMajorFragmentId(), fragmentHandle.getMinorFragmentId());
+      myThread.setName(newThreadName);
 
-      context.getDrillbitContext().getClusterCoordinator().addDrillbitStatusListener(drillbitStatusListener);
+      root = ImplCreator.getExec(fragmentContext, rootOperator);
+      clusterCoordinator.addDrillbitStatusListener(drillbitStatusListener);
 
-      logger.debug("Starting fragment runner. {}:{}", context.getHandle().getMajorFragmentId(), context.getHandle().getMinorFragmentId());
+      logger.debug("Starting fragment runner. {}:{}",
+          fragmentHandle.getMajorFragmentId(), fragmentHandle.getMinorFragmentId());
       if (!updateStateOrFail(FragmentState.AWAITING_ALLOCATION, FragmentState.RUNNING)) {
         logger.warn("Unable to set fragment state to RUNNING. Cancelled or failed?");
         return;
       }
 
-      // run the query until root.next returns false.
+      /*
+       * Run the query until root.next returns false.
+       * Note that we closeOutResources() here if we're done. That's because this can also throw
+       * exceptions that we want to treat as failures of the request, even if the request did fine
+       * up until this point. Any failures there will be caught in the catch clause below, which
+       * will be reported to the user. If they were to come from the finally clause, the uncaught
+       * exception there will simply terminate this thread without alerting the user -- the
+       * behavior then is to hang.
+       */
       while (state.get() == FragmentState.RUNNING_VALUE) {
         if (!root.next()) {
-          if (context.isFailed()) {
-            internalFail(context.getFailureCause());
-            closeOutResources(false);
+          if (fragmentContext.isFailed()) {
+            internalFail(fragmentContext.getFailureCause());
+            closeOutResources();
           } else {
-            closeOutResources(true); // make sure to close out resources before we report success.
+            /*
+             * Close out resources before we report success. We do this so that we'll get an
+             * error if there's a problem cleaning up, even though the query execution portion
+             * succeeded.
+             */
+            closeOutResources();
             updateStateOrFail(FragmentState.RUNNING, FragmentState.FINISHED);
           }
-
           break;
         }
       }
     } catch (AssertionError | Exception e) {
       logger.warn("Error while initializing or executing fragment", e);
-      context.fail(e);
+      fragmentContext.fail(e);
       internalFail(e);
     } finally {
-      bee.removeFragment(context.getHandle());
-      context.getDrillbitContext().getClusterCoordinator().removeDrillbitStatusListener(drillbitStatusListener);
+      clusterCoordinator.removeDrillbitStatusListener(drillbitStatusListener);
 
       // Final check to make sure RecordBatches are cleaned up.
-      closeOutResources(false);
+      closeOutResources();
 
-      Thread.currentThread().setName(originalThread);
+      myThread.setName(originalThreadName);
     }
   }
 
-  private void closeOutResources(boolean throwFailure) {
+  private static final String CLOSE_FAILURE = "Failure while closing out resources";
+
+  private void closeOutResources() {
+    /*
+     * Because of the way this method can be called, it needs to be idempotent; it must
+     * be safe to call it more than once. We use this flag to bypass the body if it has
+     * been called before.
+     */
     if (closed) {
       return;
     }
 
+    final DeferredException deferredException = fragmentContext.getDeferredException();
     try {
-      root.stop();
+      root.stop(); // TODO make this an AutoCloseable so we can detect lack of closure
     } catch (RuntimeException e) {
-      if (throwFailure) {
-        throw e;
-      }
-      logger.warn("Failure while closing out resources.", e);
+      logger.warn(CLOSE_FAILURE, e);
+      deferredException.addException(e);
     }
 
+    closed = true;
+
+    /*
+     * This must be last, because this may throw deferred exceptions.
+     * We are forced to wrap the checked exception (if any) so that it will be unchecked.
+     */
     try {
-      context.close();
-    } catch (Exception e) {
-      if (throwFailure) {
-        throw new RuntimeException("Error closing fragment context.", e);
-      }
-      logger.warn("Failure while closing out resources.", e);
+      fragmentContext.close();
+    } catch(Exception e) {
+      throw new RuntimeException("Error closing fragment context.", e);
     }
-
-    closed = true;
   }
 
-  private void internalFail(Throwable excep) {
+  private void internalFail(final Throwable excep) {
     state.set(FragmentState.FAILED_VALUE);
-    listener.fail(context.getHandle(), "Failure while running fragment.", excep);
+    listener.fail(fragmentContext.getHandle(), "Failure while running fragment.", excep);
   }
 
   /**
    * Updates the fragment state with the given state
+   *
    * @param to target state
    */
-  protected void updateState(FragmentState to) {;
+  private void updateState(final FragmentState to) {
     state.set(to.getNumber());
-    listener.stateChanged(context.getHandle(), to);
+    listener.stateChanged(fragmentContext.getHandle(), to);
   }
 
   /**
@@ -192,10 +205,10 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid
    * @param to target state
    * @return true only if update succeeds
    */
-  protected boolean checkAndUpdateState(FragmentState expected, FragmentState to) {
-    boolean success = state.compareAndSet(expected.getNumber(), to.getNumber());
+  private boolean checkAndUpdateState(final FragmentState expected, final FragmentState to) {
+    final boolean success = state.compareAndSet(expected.getNumber(), to.getNumber());
     if (success) {
-      listener.stateChanged(context.getHandle(), to);
+      listener.stateChanged(fragmentContext.getHandle(), to);
     } else {
       logger.debug("State change failed. Expected state: {} -- target state: {} -- current state: {}.",
           expected.name(), to.name(), FragmentState.valueOf(state.get()));
@@ -206,7 +219,7 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid
   /**
    * Returns true if the fragment is in a terminal state
    */
-  protected boolean isCompleted() {
+  private boolean isCompleted() {
     return state.get() == FragmentState.CANCELLED_VALUE
         || state.get() == FragmentState.FAILED_VALUE
         || state.get() == FragmentState.FINISHED_VALUE;
@@ -220,38 +233,32 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid
    * @param to target state
    * @return true only if update succeeds
    */
-  protected boolean updateStateOrFail(FragmentState expected, FragmentState to) {
+  private boolean updateStateOrFail(final FragmentState expected, final FragmentState to) {
     final boolean updated = checkAndUpdateState(expected, to);
     if (!updated && !isCompleted()) {
       final String msg = "State was different than expected while attempting to update state from %s to %s however current state was %s.";
-      internalFail(new StateTransitionException(String.format(msg, expected.name(), to.name(), FragmentState.valueOf(state.get()))));
+      internalFail(new StateTransitionException(
+          String.format(msg, expected.name(), to.name(), FragmentState.valueOf(state.get()))));
     }
     return updated;
   }
 
-
-  @Override
-  public int compareTo(Object o) {
-    return o.hashCode() - this.hashCode();
-  }
-
   public FragmentContext getContext() {
-    return context;
+    return fragmentContext;
   }
 
   private class FragmentDrillbitStatusListener implements DrillbitStatusListener {
-
     @Override
-    public void drillbitRegistered(Set<CoordinationProtos.DrillbitEndpoint> registeredDrillbits) {
-      // Do nothing.
+    public void drillbitRegistered(final Set<CoordinationProtos.DrillbitEndpoint> registeredDrillbits) {
     }
 
     @Override
-    public void drillbitUnregistered(Set<CoordinationProtos.DrillbitEndpoint> unregisteredDrillbits) {
-      if (unregisteredDrillbits.contains(FragmentExecutor.this.context.getForemanEndpoint())) {
-        logger.warn("Forman : {} no longer active. Cancelling fragment {}.",
-            FragmentExecutor.this.context.getForemanEndpoint().getAddress(),
-            QueryIdHelper.getQueryIdentifier(context.getHandle()));
+    public void drillbitUnregistered(final Set<CoordinationProtos.DrillbitEndpoint> unregisteredDrillbits) {
+      // if the defunct Drillbit was running our Foreman, then cancel the query
+      final DrillbitEndpoint foremanEndpoint = FragmentExecutor.this.fragmentContext.getForemanEndpoint();
+      if (unregisteredDrillbits.contains(foremanEndpoint)) {
+        logger.warn("Foreman : {} no longer active. Cancelling fragment {}.",
+            foremanEndpoint.getAddress(), QueryIdHelper.getQueryIdentifier(fragmentContext.getHandle()));
         FragmentExecutor.this.cancel();
       }
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/2da618cd/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
index 3671804..41e87cd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
@@ -30,13 +30,13 @@ import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.record.RawFragmentBatch;
 import org.apache.drill.exec.rpc.RemoteConnection;
 import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.work.WorkManager.WorkerBee;
 import org.apache.drill.exec.work.batch.IncomingBuffers;
 import org.apache.drill.exec.work.foreman.ForemanException;
 
 /**
  * This managers determines when to run a non-root fragment node.
  */
+// TODO a lot of this is the same as RootFragmentManager
 public class NonRootFragmentManager implements FragmentManager {
   private final PlanFragment fragment;
   private FragmentRoot root;
@@ -44,15 +44,13 @@ public class NonRootFragmentManager implements FragmentManager {
   private final StatusReporter runnerListener;
   private volatile FragmentExecutor runner;
   private volatile boolean cancel = false;
-  private final WorkerBee bee;
   private final FragmentContext context;
   private List<RemoteConnection> connections = new CopyOnWriteArrayList<>();
 
-  public NonRootFragmentManager(PlanFragment fragment, WorkerBee bee) throws ExecutionSetupException {
+  public NonRootFragmentManager(final PlanFragment fragment, final DrillbitContext context)
+      throws ExecutionSetupException {
     try {
       this.fragment = fragment;
-      DrillbitContext context = bee.getContext();
-      this.bee = bee;
       this.root = context.getPlanReader().readFragmentOperator(fragment.getFragmentJson());
       this.context = new FragmentContext(context, fragment, null, context.getFunctionImplementationRegistry());
       this.buffers = new IncomingBuffers(root, this.context);
@@ -84,8 +82,8 @@ public class NonRootFragmentManager implements FragmentManager {
       if (cancel) {
         return null;
       }
-      runner = new FragmentExecutor(context, bee, root, runnerListener);
-      return this.runner;
+      runner = new FragmentExecutor(context, root, runnerListener);
+      return runner;
     }
 
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/2da618cd/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
index 54fc8c4..84071c3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
@@ -28,6 +28,7 @@ import org.apache.drill.exec.record.RawFragmentBatch;
 import org.apache.drill.exec.rpc.RemoteConnection;
 import org.apache.drill.exec.work.batch.IncomingBuffers;
 
+// TODO a lot of this is the same as NonRootFragmentManager
 public class RootFragmentManager implements FragmentManager{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RootFragmentManager.class);
 

http://git-wip-us.apache.org/repos/asf/drill/blob/2da618cd/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
index 775bccb..8281b7f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
@@ -23,18 +23,13 @@ import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
 import com.google.common.base.Preconditions;
 import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.common.util.TestTools;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.ExecTest;
 import org.apache.drill.exec.client.DrillClient;
-import org.apache.drill.exec.client.PrintingResultsListener;
 import org.apache.drill.exec.client.QuerySubmitter;
-import org.apache.drill.exec.client.QuerySubmitter.Format;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.memory.TopLevelAllocator;
@@ -58,7 +53,7 @@ import org.junit.runner.Description;
 import com.google.common.base.Charsets;
 import com.google.common.io.Resources;
 
-public class BaseTestQuery extends ExecTest{
+public class BaseTestQuery extends ExecTest {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseTestQuery.class);
 
   /**
@@ -129,7 +124,7 @@ public class BaseTestQuery extends ExecTest{
   }
 
   @BeforeClass
-  public static void openClient() throws Exception{
+  public static void openClient() throws Exception {
     config = DrillConfig.create(TEST_CONFIGURATIONS);
     allocator = new TopLevelAllocator(config);
     if (config.hasPath(ENABLE_FULL_CACHE) && config.getBoolean(ENABLE_FULL_CACHE)) {
@@ -139,20 +134,15 @@ public class BaseTestQuery extends ExecTest{
     }
 
     bits = new Drillbit[drillbitCount];
-    for(int i=0; i<drillbitCount; i++) {
+    for(int i = 0; i < drillbitCount; i++) {
       bits[i] = new Drillbit(config, serviceSet);
       bits[i].run();
     }
 
-    client = new DrillClient(config, serviceSet.getCoordinator());
-    client.connect();
-    List<QueryResultBatch> results = client.runQuery(QueryType.SQL, String.format("alter session set `%s` = 2", ExecConstants.MAX_WIDTH_PER_NODE_KEY));
-    for (QueryResultBatch b : results) {
-      b.release();
-    }
+    client = QueryTestUtil.createClient(config,  serviceSet, 2);
   }
 
-  protected BufferAllocator getAllocator() {
+  protected static BufferAllocator getAllocator() {
     return allocator;
   }
 
@@ -205,27 +195,23 @@ public class BaseTestQuery extends ExecTest{
   }
 
   public static List<QueryResultBatch>  testRunAndReturn(QueryType type, String query) throws Exception{
-    query = normalizeQuery(query);
+    query = QueryTestUtil.normalizeQuery(query);
     return client.runQuery(type, query);
   }
 
-  public static int testRunAndPrint(QueryType type, String query) throws Exception{
-    query = normalizeQuery(query);
-    PrintingResultsListener resultListener = new PrintingResultsListener(client.getConfig(), Format.TSV, VectorUtil.DEFAULT_COLUMN_WIDTH);
-    client.runQuery(type, query, resultListener);
-    return resultListener.await();
+  public static int testRunAndPrint(final QueryType type, final String query) throws Exception {
+    return QueryTestUtil.testRunAndPrint(client, type, query);
   }
 
   protected static void testWithListener(QueryType type, String query, UserResultsListener resultListener) {
-    query = normalizeQuery(query);
-    client.runQuery(type, query, resultListener);
+    QueryTestUtil.testWithListener(client, type, query, resultListener);
   }
 
-  protected void testNoResult(String query, Object... args) throws Exception {
+  protected static void testNoResult(String query, Object... args) throws Exception {
     testNoResult(1, query, args);
   }
 
-  protected void testNoResult(int interation, String query, Object... args) throws Exception {
+  protected static void testNoResult(int interation, String query, Object... args) throws Exception {
     query = String.format(query, args);
     logger.debug("Running query:\n--------------\n"+query);
     for (int i = 0; i < interation; i++) {
@@ -237,27 +223,11 @@ public class BaseTestQuery extends ExecTest{
   }
 
   public static void test(String query, Object... args) throws Exception {
-    test(String.format(query, args));
+    QueryTestUtil.test(client, String.format(query, args));
   }
 
-  public static void test(String query) throws Exception{
-    query = normalizeQuery(query);
-    String[] queries = query.split(";");
-    for (String q : queries) {
-      if (q.trim().isEmpty()) {
-        continue;
-      }
-      testRunAndPrint(QueryType.SQL, q);
-    }
-  }
-
-  public static String normalizeQuery(String query) {
-    if (query.contains("${WORKING_PATH}")) {
-      return query.replaceAll(Pattern.quote("${WORKING_PATH}"), Matcher.quoteReplacement(TestTools.getWorkingPath()));
-    } else if (query.contains("[WORKING_PATH]")) {
-      return query.replaceAll(Pattern.quote("[WORKING_PATH]"), Matcher.quoteReplacement(TestTools.getWorkingPath()));
-    }
-    return query;
+  public static void test(final String query) throws Exception {
+    QueryTestUtil.test(client, query);
   }
 
   protected static int testLogical(String query) throws Exception{
@@ -272,19 +242,19 @@ public class BaseTestQuery extends ExecTest{
     return testRunAndPrint(QueryType.SQL, query);
   }
 
-  protected void testPhysicalFromFile(String file) throws Exception{
+  protected static void testPhysicalFromFile(String file) throws Exception{
     testPhysical(getFile(file));
   }
 
-  protected List<QueryResultBatch> testPhysicalFromFileWithResults(String file) throws Exception {
+  protected static List<QueryResultBatch> testPhysicalFromFileWithResults(String file) throws Exception {
     return testRunAndReturn(QueryType.PHYSICAL, getFile(file));
   }
 
-  protected void testLogicalFromFile(String file) throws Exception{
+  protected static void testLogicalFromFile(String file) throws Exception{
     testLogical(getFile(file));
   }
 
-  protected void testSqlFromFile(String file) throws Exception{
+  protected static void testSqlFromFile(String file) throws Exception{
     test(getFile(file));
   }
 
@@ -358,7 +328,8 @@ public class BaseTestQuery extends ExecTest{
     return rowCount;
   }
 
-  protected String getResultString(List<QueryResultBatch> results, String delimiter) throws SchemaChangeException {
+  protected static String getResultString(List<QueryResultBatch> results, String delimiter)
+      throws SchemaChangeException {
     StringBuilder formattedResults = new StringBuilder();
     boolean includeHeader = true;
     RecordBatchLoader loader = new RecordBatchLoader(getAllocator());

http://git-wip-us.apache.org/repos/asf/drill/blob/2da618cd/exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java
index 36091af..80b4d13 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java
@@ -18,8 +18,6 @@
 
 package org.apache.drill;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 import java.util.List;
@@ -28,7 +26,6 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.util.TestTools;
 import org.apache.drill.exec.record.RecordBatchLoader;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.rpc.user.QueryResultBatch;
@@ -38,7 +35,6 @@ import org.eigenbase.sql.SqlExplain.Depth;
 import org.eigenbase.sql.SqlExplainLevel;
 
 import com.google.common.base.Strings;
-import org.junit.Test;
 
 public class PlanTestBase extends BaseTestQuery {
 
@@ -54,9 +50,9 @@ public class PlanTestBase extends BaseTestQuery {
    * format. Then check the physical plan against the list expected substrs.
    * Verify all the expected strings are contained in the physical plan string.
    */
-  public void testPhysicalPlan(String sql, String... expectedSubstrs)
+  public static void testPhysicalPlan(String sql, String... expectedSubstrs)
       throws Exception {
-    sql = "EXPLAIN PLAN for " + normalizeQuery(sql);
+    sql = "EXPLAIN PLAN for " + QueryTestUtil.normalizeQuery(sql);
 
     String planStr = getPlanInString(sql, JSON_FORMAT);
 
@@ -79,8 +75,9 @@ public class PlanTestBase extends BaseTestQuery {
    * @throws Exception - if an inclusion or exclusion check fails, or the
    *                     planning process throws an exception
    */
-  public void testPlanMatchingPatterns(String query, String[] expectedPatterns, String[] excludedPatterns) throws Exception {
-    String plan = getPlanInString("EXPLAIN PLAN for " + normalizeQuery(query), OPTIQ_FORMAT);
+  public static void testPlanMatchingPatterns(String query, String[] expectedPatterns, String[] excludedPatterns)
+      throws Exception {
+    String plan = getPlanInString("EXPLAIN PLAN for " + QueryTestUtil.normalizeQuery(query), OPTIQ_FORMAT);
 
     Pattern p;
     Matcher m;
@@ -121,8 +118,9 @@ public class PlanTestBase extends BaseTestQuery {
    * @throws Exception - if an inclusion or exclusion check fails, or the
    *                     planning process throws an exception
    */
-  public void testPlanSubstrPatterns(String query, String[] expectedPatterns, String[] excludedPatterns) throws Exception {
-    String plan = getPlanInString("EXPLAIN PLAN for " + normalizeQuery(query), OPTIQ_FORMAT);
+  public static void testPlanSubstrPatterns(String query, String[] expectedPatterns, String[] excludedPatterns)
+      throws Exception {
+    final String plan = getPlanInString("EXPLAIN PLAN for " + QueryTestUtil.normalizeQuery(query), OPTIQ_FORMAT);
 
     // Check and make sure all expected patterns are in the plan
     if (expectedPatterns != null) {
@@ -139,15 +137,15 @@ public class PlanTestBase extends BaseTestQuery {
     }
   }
 
-  public void testPlanOneExpectedPatternOneExcluded(String query, String expectedPattern, String excludedPattern) throws Exception {
+  public static void testPlanOneExpectedPatternOneExcluded(String query, String expectedPattern, String excludedPattern) throws Exception {
     testPlanMatchingPatterns(query, new String[]{expectedPattern}, new String[]{excludedPattern});
   }
 
-  public void testPlanOneExpectedPattern(String query, String expectedPattern) throws Exception {
+  public static void testPlanOneExpectedPattern(String query, String expectedPattern) throws Exception {
     testPlanMatchingPatterns(query, new String[]{expectedPattern}, new String[]{});
   }
 
-  public void testPlanOneExcludedPattern(String query, String excludedPattern) throws Exception {
+  public static void testPlanOneExcludedPattern(String query, String excludedPattern) throws Exception {
     testPlanMatchingPatterns(query, new String[]{}, new String[]{excludedPattern});
   }
 
@@ -157,7 +155,7 @@ public class PlanTestBase extends BaseTestQuery {
    * substrs. Verify all the expected strings are contained in the physical plan
    * string.
    */
-  public void testRelLogicalJoinOrder(String sql, String... expectedSubstrs) throws Exception {
+  public static void testRelLogicalJoinOrder(String sql, String... expectedSubstrs) throws Exception {
     String planStr = getDrillRelPlanInString(sql, SqlExplainLevel.EXPPLAN_ATTRIBUTES, Depth.LOGICAL);
 
     String prefixJoinOrder = getLogicalPrefixJoinOrderFromPlan(planStr);
@@ -173,7 +171,7 @@ public class PlanTestBase extends BaseTestQuery {
    * substrs. Verify all the expected strings are contained in the physical plan
    * string.
    */
-  public void testRelPhysicalJoinOrder(String sql, String... expectedSubstrs) throws Exception {
+  public static void testRelPhysicalJoinOrder(String sql, String... expectedSubstrs) throws Exception {
     String planStr = getDrillRelPlanInString(sql, SqlExplainLevel.EXPPLAN_ATTRIBUTES, Depth.PHYSICAL);
 
     String prefixJoinOrder = getPhysicalPrefixJoinOrderFromPlan(planStr);
@@ -189,9 +187,9 @@ public class PlanTestBase extends BaseTestQuery {
    * expected substrs. Verify all the expected strings are contained in the
    * physical plan string.
    */
-  public void testRelPhysicalPlanLevDigest(String sql, String... expectedSubstrs)
+  public static void testRelPhysicalPlanLevDigest(String sql, String... expectedSubstrs)
       throws Exception {
-    String planStr = getDrillRelPlanInString(sql, SqlExplainLevel.DIGEST_ATTRIBUTES, Depth.PHYSICAL);
+    final String planStr = getDrillRelPlanInString(sql, SqlExplainLevel.DIGEST_ATTRIBUTES, Depth.PHYSICAL);
 
     for (String substr : expectedSubstrs) {
       assertTrue(planStr.contains(substr));
@@ -204,9 +202,9 @@ public class PlanTestBase extends BaseTestQuery {
    * substrs. Verify all the expected strings are contained in the physical plan
    * string.
    */
-  public void testRelLogicalPlanLevDigest(String sql, String... expectedSubstrs)
+  public static void testRelLogicalPlanLevDigest(String sql, String... expectedSubstrs)
       throws Exception {
-    String planStr = getDrillRelPlanInString(sql,
+    final String planStr = getDrillRelPlanInString(sql,
         SqlExplainLevel.DIGEST_ATTRIBUTES, Depth.LOGICAL);
 
     for (String substr : expectedSubstrs) {
@@ -220,8 +218,8 @@ public class PlanTestBase extends BaseTestQuery {
    * expected substrs. Verify all the expected strings are contained in the
    * physical plan string.
    */
-  public void testRelPhysicalPlanLevExplain(String sql, String... expectedSubstrs) throws Exception {
-    String planStr = getDrillRelPlanInString(sql, SqlExplainLevel.EXPPLAN_ATTRIBUTES, Depth.PHYSICAL);
+  public static void testRelPhysicalPlanLevExplain(String sql, String... expectedSubstrs) throws Exception {
+    final String planStr = getDrillRelPlanInString(sql, SqlExplainLevel.EXPPLAN_ATTRIBUTES, Depth.PHYSICAL);
 
     for (String substr : expectedSubstrs) {
       assertTrue(planStr.contains(substr));
@@ -234,8 +232,8 @@ public class PlanTestBase extends BaseTestQuery {
    * substrs. Verify all the expected strings are contained in the physical plan
    * string.
    */
-  public void testRelLogicalPlanLevExplain(String sql, String... expectedSubstrs) throws Exception {
-    String planStr = getDrillRelPlanInString(sql, SqlExplainLevel.EXPPLAN_ATTRIBUTES, Depth.LOGICAL);
+  public static void testRelLogicalPlanLevExplain(String sql, String... expectedSubstrs) throws Exception {
+    final String planStr = getDrillRelPlanInString(sql, SqlExplainLevel.EXPPLAN_ATTRIBUTES, Depth.LOGICAL);
 
     for (String substr : expectedSubstrs) {
       assertTrue(planStr.contains(substr));
@@ -246,7 +244,7 @@ public class PlanTestBase extends BaseTestQuery {
    * This will get the plan (either logical or physical) in Optiq RelNode
    * format, based on SqlExplainLevel and Depth.
    */
-  private String getDrillRelPlanInString(String sql, SqlExplainLevel level,
+  private static String getDrillRelPlanInString(String sql, SqlExplainLevel level,
       Depth depth) throws Exception {
     String levelStr = " ", depthStr = " ";
 
@@ -279,7 +277,7 @@ public class PlanTestBase extends BaseTestQuery {
     }
 
     sql = "EXPLAIN PLAN " + levelStr + " " + depthStr + "  for "
-        + normalizeQuery(sql);
+        + QueryTestUtil.normalizeQuery(sql);
 
     return getPlanInString(sql, OPTIQ_FORMAT);
   }
@@ -326,22 +324,21 @@ public class PlanTestBase extends BaseTestQuery {
     return builder.toString();
   }
 
-  private String getLogicalPrefixJoinOrderFromPlan(String plan) {
+  private static String getLogicalPrefixJoinOrderFromPlan(String plan) {
     return getPrefixJoinOrderFromPlan(plan, "DrillJoinRel", "DrillScanRel");
-
   }
 
-  private String getPhysicalPrefixJoinOrderFromPlan(String plan) {
+  private static String getPhysicalPrefixJoinOrderFromPlan(String plan) {
     return getPrefixJoinOrderFromPlan(plan, "JoinPrel", "ScanPrel");
   }
 
-  private String getPrefixJoinOrderFromPlan(String plan, String joinKeyWord, String scanKeyWord) {
+  private static String getPrefixJoinOrderFromPlan(String plan, String joinKeyWord, String scanKeyWord) {
     StringBuilder builder = new StringBuilder();
 
-    String[] planLines = plan.split("\n");
+    final String[] planLines = plan.split("\n");
     int cnt = 0;
 
-    Stack<Integer> s = new Stack<Integer>();
+    final Stack<Integer> s = new Stack<>();
 
     for (String line : planLines) {
       if (line.trim().isEmpty()) {
@@ -369,5 +366,4 @@ public class PlanTestBase extends BaseTestQuery {
 
     return builder.toString();
   }
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/2da618cd/exec/java-exec/src/test/java/org/apache/drill/QueryTestUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/QueryTestUtil.java b/exec/java-exec/src/test/java/org/apache/drill/QueryTestUtil.java
new file mode 100644
index 0000000..3d19229
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/QueryTestUtil.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill;
+
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.util.TestTools;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.client.DrillClient;
+import org.apache.drill.exec.client.PrintingResultsListener;
+import org.apache.drill.exec.client.QuerySubmitter.Format;
+import org.apache.drill.exec.proto.UserBitShared.QueryType;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.rpc.user.UserResultsListener;
+import org.apache.drill.exec.server.RemoteServiceSet;
+import org.apache.drill.exec.util.VectorUtil;
+
+/**
+ * Utilities useful for tests that issue SQL queries.
+ */
+public class QueryTestUtil {
+  /**
+   * Constructor. All methods are static.
+   */
+  private QueryTestUtil() {
+  }
+
+  /**
+   * Create a DrillClient that can be used to query a drill cluster.
+   *
+   * @param drillConfig drill configuration used to construct the client
+   * @param remoteServiceSet remote service set
+   * @param maxWidth maximum width per node
+   * @return the newly created client
+   * @throws RpcException if there is a problem setting up the client
+   */
+  public static DrillClient createClient(
+      final DrillConfig drillConfig, final RemoteServiceSet remoteServiceSet, final int maxWidth)
+      throws RpcException {
+    final DrillClient drillClient = new DrillClient(drillConfig, remoteServiceSet.getCoordinator());
+    drillClient.connect();
+
+    final List<QueryResultBatch> results = drillClient.runQuery(
+        QueryType.SQL, String.format("alter session set `%s` = %d",
+            ExecConstants.MAX_WIDTH_PER_NODE_KEY, maxWidth));
+    for (QueryResultBatch queryResultBatch : results) {
+      queryResultBatch.release();
+    }
+
+    return drillClient;
+  }
+
+  /**
+   * Normalize the query relative to the test environment.
+   *
+   * <p>Looks for "${WORKING_PATH}" (or else "[WORKING_PATH]") in the query string, and replaces it with
+   * the current working path obtained from {@link org.apache.drill.common.util.TestTools#getWorkingPath()}.
+   *
+   * @param query the query string
+   * @return the normalized query string
+   */
+  public static String normalizeQuery(final String query) {
+    if (query.contains("${WORKING_PATH}")) {
+      return query.replaceAll(Pattern.quote("${WORKING_PATH}"), Matcher.quoteReplacement(TestTools.getWorkingPath()));
+    } else if (query.contains("[WORKING_PATH]")) {
+      return query.replaceAll(Pattern.quote("[WORKING_PATH]"), Matcher.quoteReplacement(TestTools.getWorkingPath()));
+    }
+    return query;
+  }
+
+  /**
+   * Execute a SQL query, and print the results.
+   *
+   * @param drillClient drill client to use
+   * @param type type of the query
+   * @param queryString query string
+   * @return number of rows returned
+   * @throws Exception
+   */
+  public static int testRunAndPrint(
+      final DrillClient drillClient, final QueryType type, final String queryString) throws Exception {
+    final String query = normalizeQuery(queryString);
+    PrintingResultsListener resultListener =
+        new PrintingResultsListener(drillClient.getConfig(), Format.TSV, VectorUtil.DEFAULT_COLUMN_WIDTH);
+    drillClient.runQuery(type, query, resultListener);
+    return resultListener.await();
+  }
+
+  /**
+   * Execute one or more queries separated by semicolons, and print the results.
+   *
+   * @param drillClient drill client to use
+   * @param queryString the query string
+   * @throws Exception
+   */
+  public static void test(final DrillClient drillClient, final String queryString) throws Exception{
+    final String query = normalizeQuery(queryString);
+    String[] queries = query.split(";");
+    for (String q : queries) {
+      final String trimmedQuery = q.trim();
+      if (trimmedQuery.isEmpty()) {
+        continue;
+      }
+      testRunAndPrint(drillClient, QueryType.SQL, trimmedQuery);
+    }
+  }
+
+  /**
+   * Execute one or more queries separated by semicolons, and print the results, with the option to
+   * add formatted arguments to the query string.
+   *
+   * @param drillClient drill client to use
+   * @param query the query string; may contain formatting specifications to be used by
+   *   {@link String#format(String, Object...)}.
+   * @param args optional args to use in the formatting call for the query string
+   * @throws Exception
+   */
+  public static void test(final DrillClient drillClient, final String query, Object... args) throws Exception {
+    test(drillClient, String.format(query, args));
+  }
+
+  /**
+   * Execute a single query with a user supplied result listener.
+   *
+   * @param drillClient drill client to use
+   * @param type type of query
+   * @param queryString the query string
+   * @param resultListener the result listener
+   */
+  public static void testWithListener(final DrillClient drillClient, final QueryType type,
+      final String queryString, final UserResultsListener resultListener) {
+    final String query = QueryTestUtil.normalizeQuery(queryString);
+    drillClient.runQuery(type, query, resultListener);
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/2da618cd/exec/java-exec/src/test/java/org/apache/drill/SingleRowListener.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/SingleRowListener.java b/exec/java-exec/src/test/java/org/apache/drill/SingleRowListener.java
new file mode 100644
index 0000000..07cb833
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/SingleRowListener.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill;
+
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.UserBitShared.QueryResult;
+import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
+import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.user.ConnectionThrottle;
+import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.rpc.user.UserResultsListener;
+
+/**
+ * Result listener that is set up to receive a single row. Useful for queries
+ * such as those with a count(*) or limit 1. The abstract method {@link #rowArrived(QueryResultBatch)} provides
+ * the means for a derived class to get the expected record's data.
+ */
+public abstract class SingleRowListener implements UserResultsListener {
+  private final CountDownLatch latch = new CountDownLatch(1); // used to wait for completion
+  private final AtomicInteger nRows = new AtomicInteger(0); // counts rows received
+  private QueryState queryState = null; // last received QueryState, if any
+  private final List<DrillPBError> errorList = new LinkedList<>(); // all errors ever received
+  private Exception exception = null; // the exception captured from a submission failure
+
+  @Override
+  public void queryIdArrived(final QueryId queryId) {
+  }
+
+  @Override
+  public void submissionFailed(final RpcException ex) {
+    exception = ex;
+    latch.countDown();
+  }
+
+  @Override
+  public void resultArrived(final QueryResultBatch result, final ConnectionThrottle throttle) {
+    final QueryResult queryResult = result.getHeader();
+    if (result.hasData()) {
+      final int nRows = this.nRows.addAndGet(queryResult.getRowCount());
+      if (nRows > 1) {
+        throw new IllegalStateException("Expected exactly one row, but got " + nRows);
+      }
+
+      rowArrived(result);
+    }
+
+    // TODO this appears to never be set
+    if (queryResult.hasQueryState()) {
+      queryState = queryResult.getQueryState();
+    }
+
+    synchronized(errorList) {
+      errorList.addAll(queryResult.getErrorList());
+    }
+
+    final boolean isLastChunk = queryResult.getIsLastChunk();
+    result.release();
+
+    if (isLastChunk) {
+      cleanup();
+      latch.countDown();
+    }
+  }
+
+  /**
+   * Get the last known QueryState.
+   *
+   * @return the query state; may be null if no query state has been received
+   */
+  public QueryState getQueryState() {
+    return queryState;
+  }
+
+  /**
+   * Get an unmodifiable view of the list of all errors received so far.
+   *
+   * @return list of errors received
+   */
+  public List<DrillPBError> getErrorList() {
+    synchronized(errorList) {
+      return Collections.unmodifiableList(errorList);
+    }
+  }
+
+  /**
+   * A record has arrived and is ready for access.
+   *
+   * <p>Derived classes provide whatever implementation they require here to access
+   * the record's data.
+   *
+   * @param queryResultBatch result batch holding the row
+   */
+  protected abstract void rowArrived(QueryResultBatch queryResultBatch);
+
+  /**
+   * Wait for the completion of this query; the query is considered complete once its last
+   * result chunk has been received or its submission has failed.
+   *
+   * @throws Exception if there was any kind of problem
+   */
+  public void waitForCompletion() throws Exception {
+    latch.await();
+    if (exception != null) {
+      throw new RuntimeException("Query submission failed", exception);
+    }
+  }
+
+  /**
+   * Clean up any resources used.
+   *
+   * <p>Derived classes may use this to free things like allocators or files that were used to
+   * record data received in resultArrived().
+   */
+  public void cleanup() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/2da618cd/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java
index 978e565..13f958c 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java
@@ -27,7 +27,6 @@ import org.apache.drill.common.expression.parser.ExprLexer;
 import org.apache.drill.common.expression.parser.ExprParser;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.Types;
-import org.apache.drill.common.util.TestTools;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.proto.UserBitShared;
 
@@ -36,7 +35,6 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.regex.Pattern;
 
 import static org.junit.Assert.assertEquals;
 
@@ -124,7 +122,7 @@ public class TestBuilder {
   }
 
   public TestBuilder sqlQuery(String query) {
-    this.query = BaseTestQuery.normalizeQuery(query);
+    this.query = QueryTestUtil.normalizeQuery(query);
     this.queryType = UserBitShared.QueryType.SQL;
     return this;
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/2da618cd/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributed.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributed.java b/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributed.java
index e0f830d..b062b39 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributed.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributed.java
@@ -23,8 +23,8 @@ import org.junit.Test;
 public class TestTpchDistributed extends BaseTestQuery {
 //  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestTpchDistributed.class);
 
-  private static void testDistributed(String fileName) throws Exception {
-    String query = getFile(fileName);
+  private static void testDistributed(final String fileName) throws Exception {
+    final String query = getFile(fileName);
     test("alter session set `planner.slice_target` = 10; " + query);
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/2da618cd/exec/java-exec/src/test/java/org/apache/drill/exec/DrillSystemTestBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/DrillSystemTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/exec/DrillSystemTestBase.java
index 75ba3a9..e63f085 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/DrillSystemTestBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/DrillSystemTestBase.java
@@ -21,7 +21,6 @@ import static com.google.common.base.Throwables.propagate;
 
 import java.util.List;
 
-import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.exception.DrillbitStartupException;
 import org.apache.drill.exec.server.Drillbit;
 import org.slf4j.Logger;
@@ -33,8 +32,7 @@ import com.google.common.collect.ImmutableList;
  * Starts one or more Drillbits, an embedded ZooKeeper cluster and provides a configured client for testing.
  */
 public class DrillSystemTestBase extends TestWithZookeeper {
-
-  static final Logger logger = org.slf4j.LoggerFactory.getLogger(DrillConfig.class);
+  private static final Logger logger = org.slf4j.LoggerFactory.getLogger(DrillSystemTestBase.class);
 
   private List<Drillbit> servers;
 
@@ -62,9 +60,7 @@ public class DrillSystemTestBase extends TestWithZookeeper {
     }
   }
 
-
   public Drillbit getABit(){
     return this.servers.iterator().next();
   }
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/2da618cd/exec/java-exec/src/test/java/org/apache/drill/exec/TestWithZookeeper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/TestWithZookeeper.java b/exec/java-exec/src/test/java/org/apache/drill/exec/TestWithZookeeper.java
index bb69c9a..f65c638 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/TestWithZookeeper.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/TestWithZookeeper.java
@@ -17,65 +17,27 @@
  */
 package org.apache.drill.exec;
 
-import static com.google.common.base.Throwables.propagate;
-
-import java.io.File;
-import java.io.IOException;
-
 import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.exec.util.MiniZooKeeperCluster;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 
 public class TestWithZookeeper extends ExecTest {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestWithZookeeper.class);
+//  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestWithZookeeper.class);
 
-  private static File testDir = new File("target/test-data");
-  private static DrillConfig config;
-  private static String zkUrl;
-  private static MiniZooKeeperCluster zkCluster;
+  private static ZookeeperHelper zkHelper;
 
   @BeforeClass
   public static void setUp() throws Exception {
-    config = DrillConfig.create();
-    zkUrl = config.getString(ExecConstants.ZK_CONNECTION);
-    setupTestDir();
-    startZookeeper(1);
+    zkHelper = new ZookeeperHelper();
+    zkHelper.startZookeeper(1);
   }
 
   @AfterClass
   public static void tearDown() throws Exception {
-    stopZookeeper();
+    zkHelper.stopZookeeper();
   }
 
-  private static void setupTestDir() {
-    if (!testDir.exists()) {
-      testDir.mkdirs();
-    }
+  public static DrillConfig getConfig() {
+    return zkHelper.getConfig();
   }
-
-  private static void startZookeeper(int numServers) {
-    try {
-      zkCluster = new MiniZooKeeperCluster();
-      zkCluster.setDefaultClientPort(Integer.parseInt(zkUrl.split(":")[1]));
-      zkCluster.startup(testDir, numServers);
-    } catch (IOException e) {
-      propagate(e);
-    } catch (InterruptedException e) {
-      propagate(e);
-    }
-  }
-
-  private static void stopZookeeper() {
-    try {
-      zkCluster.shutdown();
-    } catch (IOException e) {
-      propagate(e);
-    }
-  }
-
-  public static DrillConfig getConfig(){
-    return config;
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/2da618cd/exec/java-exec/src/test/java/org/apache/drill/exec/ZookeeperHelper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/ZookeeperHelper.java b/exec/java-exec/src/test/java/org/apache/drill/exec/ZookeeperHelper.java
new file mode 100644
index 0000000..7fcf4cb
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/ZookeeperHelper.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec;
+
+import static com.google.common.base.Throwables.propagate;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.util.MiniZooKeeperCluster;
+
+/**
+ * Test utility for managing a Zookeeper instance.
+ *
+ * <p>Tests that need a Zookeeper instance can initialize a static instance of this class in
+ * their {@link org.junit.BeforeClass} section to set up Zookeeper.
+ */
+public class ZookeeperHelper {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ZookeeperHelper.class);
+
+  private final File testDir = new File("target/test-data");
+  private final DrillConfig config;
+  private final String zkUrl;
+  private MiniZooKeeperCluster zkCluster;
+
+  /**
+   * Constructor.
+   *
+   * <p>Will create a "test-data" directory for Zookeeper's use if one doesn't already exist.
+   */
+  public ZookeeperHelper() {
+    config = DrillConfig.create();
+    zkUrl = config.getString(ExecConstants.ZK_CONNECTION);
+
+    if (!testDir.exists()) {
+      testDir.mkdirs();
+    }
+  }
+
+  /**
+   * Start the Zookeeper instance.
+   *
+   * <p>This must be used before any operations that depend on the Zookeeper instance being up.
+   *
+   * @param numServers how many servers the Zookeeper instance should have
+   */
+  public void startZookeeper(final int numServers) {
+    if (zkCluster != null) {
+      throw new IllegalStateException("Zookeeper cluster already running");
+    }
+
+    try {
+      zkCluster = new MiniZooKeeperCluster();
+      zkCluster.setDefaultClientPort(Integer.parseInt(zkUrl.split(":")[1]));
+      zkCluster.startup(testDir, numServers);
+    } catch (IOException | InterruptedException e) {
+      propagate(e);
+    }
+  }
+
+  /**
+   * Shut down the Zookeeper instance.
+   *
+   * <p>This must be used before the program exits.
+   */
+  public void stopZookeeper() {
+    try {
+      zkCluster.shutdown();
+      zkCluster = null;
+    } catch (IOException e) {
+      // since this is meant to be used in a test's cleanup, we don't propagate the exception
+      final String message = "Unable to shutdown Zookeeper";
+      System.err.println(message + '.');
+      logger.warn(message, e);
+    }
+  }
+
+  /**
+   * Get the DrillConfig used for the Zookeeper instance.
+   *
+   * @return the DrillConfig used.
+   */
+  public DrillConfig getConfig() {
+    return config;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/2da618cd/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
index 0277876..933417e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
@@ -21,7 +21,6 @@ import java.util.Iterator;
 import java.util.List;
 
 import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.impl.ScreenCreator.ScreenRoot;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
@@ -35,11 +34,12 @@ import org.apache.drill.exec.vector.ValueVector;
 import com.google.common.collect.Lists;
 
 @Deprecated
-public class SimpleRootExec implements RootExec, Iterable<ValueVector>{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SimpleRootExec.class);
+public class SimpleRootExec implements RootExec, Iterable<ValueVector> {
+//  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SimpleRootExec.class);
+
+  private final RecordBatch incoming;
+  private final ScreenRoot screenRoot;
 
-  private RecordBatch incoming;
-  private ScreenRoot screenRoot;
   public SimpleRootExec(RootExec e) {
     if (e instanceof ScreenRoot) {
       incoming = ((ScreenRoot)e).getIncoming();

http://git-wip-us.apache.org/repos/asf/drill/blob/2da618cd/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestComparisonFunctions.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestComparisonFunctions.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestComparisonFunctions.java
index 609bc14..0f6fd43 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestComparisonFunctions.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestComparisonFunctions.java
@@ -42,13 +42,12 @@ import com.google.common.base.Charsets;
 import com.google.common.io.Resources;
 
 public class TestComparisonFunctions extends ExecTest {
-    static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestComparisonFunctions.class);
+//  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestComparisonFunctions.class);
 
-  DrillConfig c = DrillConfig.create();
-    String COMPARISON_TEST_PHYSICAL_PLAN = "functions/comparisonTest.json";
-  PhysicalPlanReader reader;
-  FunctionImplementationRegistry registry;
-  FragmentContext context;
+  private final DrillConfig c = DrillConfig.create();
+  private final String COMPARISON_TEST_PHYSICAL_PLAN = "functions/comparisonTest.json";
+  private PhysicalPlanReader reader;
+  private FunctionImplementationRegistry registry;
 
   public void runTest(@Injectable final DrillbitContext bitContext,
                       @Injectable UserServer.UserClientConnection connection, String expression, int expectedResults) throws Throwable {
@@ -68,21 +67,20 @@ public class TestComparisonFunctions extends ExecTest {
     if (registry == null) {
       registry = new FunctionImplementationRegistry(c);
     }
-    if(context == null) {
-      context = new FragmentContext(bitContext, PlanFragment.getDefaultInstance(), connection, registry);
-    }
+    final FragmentContext context =
+        new FragmentContext(bitContext, PlanFragment.getDefaultInstance(), connection, registry);
     PhysicalPlan plan = reader.readPhysicalPlan(planString);
     SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
 
     while(exec.next()) {
-      assertEquals(String.format("Expression: %s;", expression), expectedResults, exec.getSelectionVector2().getCount());
+      assertEquals(String.format("Expression: %s;", expression), expectedResults,
+          exec.getSelectionVector2().getCount());
 //      for (ValueVector vv: exec) {
 //        vv.close();
 //      }
     }
 
     exec.stop();
-
     context.close();
 
     if (context.getFailureCause() != null) {

http://git-wip-us.apache.org/repos/asf/drill/blob/2da618cd/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
index 2a0aedc..6bf23ec 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
@@ -40,7 +40,6 @@ import org.apache.drill.exec.planner.PhysicalPlanReader;
 import org.apache.drill.exec.proto.BitControl.PlanFragment;
 import org.apache.drill.exec.proto.CoordinationProtos;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.record.RecordBatchLoader;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.rpc.control.Controller;
@@ -104,11 +103,13 @@ public class TestOptiqPlans extends ExecTest {
       }
     };
     RemoteServiceSet lss = RemoteServiceSet.getLocalServiceSet();
-    DrillbitContext bitContext = new DrillbitContext(DrillbitEndpoint.getDefaultInstance(), context, coord, controller, com, workBus, new LocalPStoreProvider(DrillConfig.create()), null);
-    QueryContext qc = new QueryContext(UserSession.Builder.newBuilder().setSupportComplexTypes(true).build(), QueryId.getDefaultInstance(), bitContext);
+    DrillbitContext bitContext = new DrillbitContext(DrillbitEndpoint.getDefaultInstance(), context, coord, controller,
+        com, workBus, new LocalPStoreProvider(DrillConfig.create()), null);
+    QueryContext qc = new QueryContext(UserSession.Builder.newBuilder().setSupportComplexTypes(true).build(),
+        bitContext);
     PhysicalPlanReader reader = bitContext.getPlanReader();
     LogicalPlan plan = reader.readLogicalPlan(Files.toString(FileUtils.getResourceAsFile(file), Charsets.UTF_8));
-    PhysicalPlan pp = new BasicOptimizer(DrillConfig.create(), qc, connection).optimize(new BasicOptimizer.BasicOptimizationContext(qc), plan);
+    PhysicalPlan pp = new BasicOptimizer(qc).optimize(new BasicOptimizer.BasicOptimizationContext(qc), plan);
 
 
     FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);


Mime
View raw message