drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ve...@apache.org
Subject [1/5] drill git commit: DRILL-3190: Check for transitions from CANCELLATION_REQUESTED state + Moved state transition checks to QueryManager
Date Mon, 08 Jun 2015 19:07:26 GMT
Repository: drill
Updated Branches:
  refs/heads/master 1de6aed93 -> 6796006f2


DRILL-3190: Check for transitions from CANCELLATION_REQUESTED state
+ Moved state transition checks to QueryManager


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/d9452d97
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/d9452d97
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/d9452d97

Branch: refs/heads/master
Commit: d9452d973e1bfd7f0aa2e6c28b3e8c72817628a3
Parents: 1de6aed
Author: Sudheesh Katkam <skatkam@maprtech.com>
Authored: Mon Jun 1 09:42:33 2015 -0700
Committer: vkorukanti <venki.korukanti@gmail.com>
Committed: Fri Jun 5 07:47:09 2015 -0700

----------------------------------------------------------------------
 .../drill/exec/work/foreman/FragmentData.java   | 28 +++++---------------
 .../drill/exec/work/foreman/QueryManager.java   | 25 ++++++++++++++---
 2 files changed, 27 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/d9452d97/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentData.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentData.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentData.java
index ceb77f0..9e07210 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentData.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentData.java
@@ -23,10 +23,9 @@ import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.UserBitShared.FragmentState;
 import org.apache.drill.exec.proto.UserBitShared.MinorFragmentProfile;
 import org.apache.drill.exec.proto.UserBitShared.OperatorProfile;
-import org.apache.drill.exec.proto.helper.QueryIdHelper;
 
 public class FragmentData {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentData.class);
+//  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentData.class);
 
   private final boolean isLocal;
   private volatile FragmentStatus status;
@@ -49,31 +48,16 @@ public class FragmentData {
   }
 
   /**
-   * Update the status for this fragment.  Also records last update and last progress time.
-   * @param status Updated status
-   * @return Whether or not the status update resulted in a FragmentState change.
+   * Update the status for this fragment. Also records last update and last progress time.
+   * @param newStatus Updated status
    */
-  public boolean setStatus(final FragmentStatus newStatus) {
+  public void setStatus(final FragmentStatus newStatus) {
     final long time = System.currentTimeMillis();
-    final FragmentState oldState = status.getProfile().getState();
-    final boolean inTerminalState = oldState == FragmentState.FAILED || oldState == FragmentState.FINISHED
|| oldState == FragmentState.CANCELLED;
-    final FragmentState currentState = newStatus.getProfile().getState();
-    final boolean stateChanged = currentState != oldState;
-
-    if (inTerminalState) {
-      // already in a terminal state. This shouldn't happen.
-      logger.warn(String.format("Received status message for fragment %s after fragment was
in state %s. New state was %s",
-          QueryIdHelper.getQueryIdentifier(getHandle()), oldState, currentState));
-      return false;
-    }
-
-    this.lastStatusUpdate = time;
+    lastStatusUpdate = time;
     if (madeProgress(status, newStatus)) {
-      this.lastProgress = time;
+      lastProgress = time;
     }
     status = newStatus;
-
-    return stateChanged;
   }
 
   public FragmentState getState() {

http://git-wip-us.apache.org/repos/asf/drill/blob/d9452d97/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
index 71b77c6..a7dfe85 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
@@ -35,6 +35,7 @@ import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.proto.SchemaUserBitShared;
+import org.apache.drill.exec.proto.UserBitShared.FragmentState;
 import org.apache.drill.exec.proto.UserBitShared.MajorFragmentProfile;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserBitShared.QueryInfo;
@@ -50,10 +51,7 @@ import org.apache.drill.exec.store.sys.PStore;
 import org.apache.drill.exec.store.sys.PStoreConfig;
 import org.apache.drill.exec.store.sys.PStoreProvider;
 import org.apache.drill.exec.work.EndpointListener;
-import org.apache.drill.exec.work.WorkManager;
 import org.apache.drill.exec.work.foreman.Foreman.StateListener;
-import org.apache.drill.exec.work.fragment.AbstractStatusReporter;
-import org.apache.drill.exec.work.fragment.FragmentExecutor;
 import org.apache.drill.exec.work.fragment.NonRootStatusReporter;
 import org.apache.drill.exec.work.fragment.StatusReporter;
 
@@ -126,12 +124,31 @@ public class QueryManager {
     }
   }
 
+  private static boolean isTerminal(final FragmentState state) {
+    return state == FragmentState.FAILED
+        || state == FragmentState.FINISHED
+        || state == FragmentState.CANCELLED;
+  }
+
   private boolean updateFragmentStatus(final FragmentStatus fragmentStatus) {
     final FragmentHandle fragmentHandle = fragmentStatus.getHandle();
     final int majorFragmentId = fragmentHandle.getMajorFragmentId();
     final int minorFragmentId = fragmentHandle.getMinorFragmentId();
     final FragmentData data = fragmentDataMap.get(majorFragmentId).get(minorFragmentId);
-    return data.setStatus(fragmentStatus);
+
+    final FragmentState oldState = data.getState();
+    final boolean inTerminalState = isTerminal(oldState);
+    final FragmentState currentState = fragmentStatus.getProfile().getState();
+
+    if (inTerminalState || (oldState == FragmentState.CANCELLATION_REQUESTED && !isTerminal(currentState)))
{
+      // Already in a terminal state, or invalid state transition from CANCELLATION_REQUESTED.
This shouldn't happen.
+      logger.warn(String.format("Received status message for fragment %s after fragment was
in state %s. New state was %s",
+        QueryIdHelper.getQueryIdentifier(fragmentHandle), oldState, currentState));
+      return false;
+    }
+
+    data.setStatus(fragmentStatus);
+    return oldState != currentState;
   }
 
   private void fragmentDone(final FragmentStatus status) {


Mime
View raw message