drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jacq...@apache.org
Subject [1/2] drill git commit: DRILL-1747: Enable fragment profile updates throughout queries rather than only at end.
Date Thu, 05 Feb 2015 01:30:26 GMT
Repository: drill
Updated Branches:
  refs/heads/master cafb7d4ff -> a9ee79110


DRILL-1747: Enable fragment profile updates throughout queries rather than only at end.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/6331d35b
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/6331d35b
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/6331d35b

Branch: refs/heads/master
Commit: 6331d35b3d09ecea06d68fc9b95567cef963d1ef
Parents: cafb7d4
Author: Jacques Nadeau <jacques@apache.org>
Authored: Tue Jan 13 23:08:03 2015 -0800
Committer: Jacques Nadeau <jacques@apache.org>
Committed: Wed Feb 4 16:28:02 2015 -0800

----------------------------------------------------------------------
 .../apache/drill/exec/work/StatusProvider.java  |  5 ++
 .../org/apache/drill/exec/work/WorkManager.java | 53 +++++++++++++++++++-
 .../work/fragment/AbstractStatusReporter.java   |  5 +-
 .../exec/work/fragment/FragmentExecutor.java    |  8 ++-
 4 files changed, 66 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/6331d35b/exec/java-exec/src/main/java/org/apache/drill/exec/work/StatusProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/StatusProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/StatusProvider.java
index 9215f43..6086f74 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/StatusProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/StatusProvider.java
@@ -20,5 +20,10 @@ package org.apache.drill.exec.work;
 import org.apache.drill.exec.proto.BitControl.FragmentStatus;
 
 public interface StatusProvider {
+
+  /**
+   * Provides the current status of the FragmentExecutor's work.
+   * @return Status if currently.  Null if in another state.
+   */
   public FragmentStatus getStatus();
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/6331d35b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
index 5bc3da1..99c6ab8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.work;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
@@ -29,11 +30,15 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.drill.exec.coord.ClusterCoordinator;
+import org.apache.drill.exec.proto.BitControl.FragmentStatus;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.drill.exec.rpc.DrillRpcFuture;
 import org.apache.drill.exec.rpc.NamedThreadFactory;
+import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.control.Controller;
 import org.apache.drill.exec.rpc.control.WorkEventBus;
 import org.apache.drill.exec.rpc.data.DataConnectionCreator;
@@ -52,6 +57,7 @@ import org.apache.drill.exec.work.user.UserWorker;
 
 import com.codahale.metrics.Gauge;
 import com.codahale.metrics.MetricRegistry;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Queues;
 
@@ -79,6 +85,8 @@ public class WorkManager implements Closeable {
   private final WorkEventBus workBus;
   private ExecutorService executor;
   private final EventThread eventThread;
+  private final StatusThread statusThread;
+  private Controller controller;
 
   public WorkManager(BootStrapContext context) {
     this.bee = new WorkerBee();
@@ -87,6 +95,7 @@ public class WorkManager implements Closeable {
     this.controlMessageWorker = new ControlHandlerImpl(bee);
     this.userWorker = new UserWorker(bee);
     this.eventThread = new EventThread();
+    this.statusThread = new StatusThread();
     this.dataHandler = new DataResponseHandlerImpl(bee);
   }
 
@@ -94,8 +103,10 @@ public class WorkManager implements Closeable {
       DataConnectionCreator data, ClusterCoordinator coord, PStoreProvider provider) {
     this.dContext = new DrillbitContext(endpoint, bContext, coord, controller, data, workBus,
provider);
     // executor = Executors.newFixedThreadPool(dContext.getConfig().getInt(ExecConstants.EXECUTOR_THREADS)
-    executor = Executors.newCachedThreadPool(new NamedThreadFactory("WorkManager-"));
-    eventThread.start();
+    this.executor = Executors.newCachedThreadPool(new NamedThreadFactory("WorkManager-"));
+    this.controller = controller;
+    this.eventThread.start();
+    this.statusThread.start();
     // TODO remove try block once metrics moved from singleton, For now catch to avoid unit
test failures
     try {
       dContext.getMetrics().register(
@@ -209,6 +220,44 @@ public class WorkManager implements Closeable {
 
   }
 
+  private class StatusThread extends Thread {
+    public StatusThread() {
+      this.setDaemon(true);
+      this.setName("WorkManager Status Reporter");
+    }
+
+    @Override
+    public void run() {
+      while(true){
+        List<DrillRpcFuture<Ack>> futures = Lists.newArrayList();
+        for(FragmentExecutor e : runningFragments.values()){
+          FragmentStatus status = e.getStatus();
+          if(status == null){
+            continue;
+          }
+          DrillbitEndpoint ep = e.getContext().getForemanEndpoint();
+          futures.add(controller.getTunnel(ep).sendFragmentStatus(status));
+        }
+
+        for(DrillRpcFuture<Ack> future : futures){
+          try{
+            future.checkedGet();
+          }catch(RpcException ex){
+            logger.info("Failure while sending intermediate fragment status to Foreman",
ex);
+          }
+        }
+
+        try{
+          Thread.sleep(5000);
+        }catch(InterruptedException e){
+          // exit status thread on interrupt.
+          break;
+        }
+      }
+    }
+
+  }
+
   private class EventThread extends Thread {
     public EventThread() {
       this.setDaemon(true);

http://git-wip-us.apache.org/repos/asf/drill/blob/6331d35b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/AbstractStatusReporter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/AbstractStatusReporter.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/AbstractStatusReporter.java
index c7ac311..8fd4b97 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/AbstractStatusReporter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/AbstractStatusReporter.java
@@ -37,9 +37,10 @@ public abstract class AbstractStatusReporter implements StatusReporter{
   }
 
   private  FragmentStatus.Builder getBuilder(FragmentState state){
-    return getBuilder(state, null, null);
+    return getBuilder(context, state, null, null);
   }
-  private  FragmentStatus.Builder getBuilder(FragmentState state, String message, Throwable
t){
+
+  public static FragmentStatus.Builder getBuilder(FragmentContext context, FragmentState
state, String message, Throwable t){
     FragmentStatus.Builder status = FragmentStatus.newBuilder();
     MinorFragmentProfile.Builder b = MinorFragmentProfile.newBuilder();
     context.getStats().addMetricsToStatus(b);

http://git-wip-us.apache.org/repos/asf/drill/blob/6331d35b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index 9ffe643..a8f07b5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.FragmentStats;
 import org.apache.drill.exec.physical.base.FragmentRoot;
 import org.apache.drill.exec.physical.impl.ImplCreator;
 import org.apache.drill.exec.physical.impl.RootExec;
@@ -29,6 +30,7 @@ import org.apache.drill.exec.proto.BitControl.FragmentStatus;
 import org.apache.drill.exec.proto.CoordinationProtos;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.UserBitShared.FragmentState;
+import org.apache.drill.exec.proto.UserBitShared.MinorFragmentProfile;
 import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
 import org.apache.drill.exec.work.CancelableQuery;
@@ -62,7 +64,11 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid
 
   @Override
   public FragmentStatus getStatus() {
-    throw new UnsupportedOperationException();
+    FragmentStatus status = AbstractStatusReporter.getBuilder(context, FragmentState.RUNNING,
null, null).build();
+    if(state.get() != FragmentState.RUNNING_VALUE){
+      return null;
+    }
+    return status;
   }
 
   @Override


Mime
View raw message