hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rohithsharm...@apache.org
Subject hadoop git commit: YARN-4209. RMStateStore FENCED state doesn’t work due to updateFencedState called by stateMachine.doTransition. (Zhihai Xu via rohithsharmaks)
Date Wed, 07 Oct 2015 04:06:23 GMT
Repository: hadoop
Updated Branches:
  refs/heads/trunk 1bca1bb51 -> 9156fc60c


YARN-4209. RMStateStore FENCED state doesn’t work due to updateFencedState called by stateMachine.doTransition.
(Zhihai Xu via rohithsharmaks)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9156fc60
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9156fc60
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9156fc60

Branch: refs/heads/trunk
Commit: 9156fc60c654e9305411686878acb443f3be1e67
Parents: 1bca1bb
Author: Rohith Sharma K S <rohithsharmaks@apache.org>
Authored: Wed Oct 7 09:34:59 2015 +0530
Committer: Rohith Sharma K S <rohithsharmaks@apache.org>
Committed: Wed Oct 7 09:34:59 2015 +0530

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../resourcemanager/recovery/RMStateStore.java  | 261 +++++++++++++------
 .../recovery/TestMemoryRMStateStore.java        |  65 +++++
 3 files changed, 243 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/9156fc60/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 9d12b82..de81879 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -1020,6 +1020,9 @@ Release 2.7.2 - UNRELEASED
     YARN-3619. ContainerMetrics unregisters during getMetrics and leads to
     ConcurrentModificationException (Zhihai Xu via jlowe)
 
+    YARN-4209. RMStateStore FENCED state doesn’t work due to updateFencedState called 
+    by stateMachine.doTransition. (Zhihai Xu via rohithsharmaks)
+
 Release 2.7.1 - 2015-07-06
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9156fc60/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
index aa5caf9..a2cf517 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
@@ -67,7 +67,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
 import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
-import org.apache.hadoop.yarn.state.SingleArcTransition;
+import org.apache.hadoop.yarn.state.MultipleArcTransition;
 import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
 
@@ -100,7 +100,10 @@ public abstract class RMStateStore extends AbstractService {
 
   public static final Log LOG = LogFactory.getLog(RMStateStore.class);
 
-  private enum RMStateStoreState {
+  /**
+   * The enum defines state of RMStateStore.
+   */
+  public enum RMStateStoreState {
     ACTIVE,
     FENCED
   };
@@ -114,41 +117,57 @@ public abstract class RMStateStore extends AbstractService {
                                                     RMStateStoreEventType,
                                                     RMStateStoreEvent>(
       RMStateStoreState.ACTIVE)
-      .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE,
+      .addTransition(RMStateStoreState.ACTIVE,
+          EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
           RMStateStoreEventType.STORE_APP, new StoreAppTransition())
-      .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE,
-              RMStateStoreEventType.UPDATE_APP, new UpdateAppTransition())
-      .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE,
+      .addTransition(RMStateStoreState.ACTIVE,
+          EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
+          RMStateStoreEventType.UPDATE_APP, new UpdateAppTransition())
+      .addTransition(RMStateStoreState.ACTIVE,
+          EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
           RMStateStoreEventType.REMOVE_APP, new RemoveAppTransition())
-      .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE,
-          RMStateStoreEventType.STORE_APP_ATTEMPT, new StoreAppAttemptTransition())
-      .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE,
-          RMStateStoreEventType.UPDATE_APP_ATTEMPT, new UpdateAppAttemptTransition())
-      .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE,
+      .addTransition(RMStateStoreState.ACTIVE,
+          EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
+          RMStateStoreEventType.STORE_APP_ATTEMPT,
+          new StoreAppAttemptTransition())
+      .addTransition(RMStateStoreState.ACTIVE,
+          EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
+          RMStateStoreEventType.UPDATE_APP_ATTEMPT,
+          new UpdateAppAttemptTransition())
+      .addTransition(RMStateStoreState.ACTIVE,
+          EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
           RMStateStoreEventType.STORE_MASTERKEY,
-              new StoreRMDTMasterKeyTransition())
-      .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE,
+          new StoreRMDTMasterKeyTransition())
+      .addTransition(RMStateStoreState.ACTIVE,
+          EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
           RMStateStoreEventType.REMOVE_MASTERKEY,
-              new RemoveRMDTMasterKeyTransition())
-      .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE,
+          new RemoveRMDTMasterKeyTransition())
+      .addTransition(RMStateStoreState.ACTIVE,
+          EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
           RMStateStoreEventType.STORE_DELEGATION_TOKEN,
-              new StoreRMDTTransition())
-      .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE,
+          new StoreRMDTTransition())
+      .addTransition(RMStateStoreState.ACTIVE,
+          EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
           RMStateStoreEventType.REMOVE_DELEGATION_TOKEN,
-              new RemoveRMDTTransition())
-      .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE,
+          new RemoveRMDTTransition())
+      .addTransition(RMStateStoreState.ACTIVE,
+          EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
           RMStateStoreEventType.UPDATE_DELEGATION_TOKEN,
-              new UpdateRMDTTransition())
-       .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE,
-           RMStateStoreEventType.UPDATE_AMRM_TOKEN,
-           new StoreOrUpdateAMRMTokenTransition())
-      .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE,
+          new UpdateRMDTTransition())
+      .addTransition(RMStateStoreState.ACTIVE,
+          EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
+          RMStateStoreEventType.UPDATE_AMRM_TOKEN,
+          new StoreOrUpdateAMRMTokenTransition())
+      .addTransition(RMStateStoreState.ACTIVE,
+          EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
           RMStateStoreEventType.STORE_RESERVATION,
           new StoreReservationAllocationTransition())
-      .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE,
+      .addTransition(RMStateStoreState.ACTIVE,
+          EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
           RMStateStoreEventType.UPDATE_RESERVATION,
           new UpdateReservationAllocationTransition())
-      .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE,
+      .addTransition(RMStateStoreState.ACTIVE,
+          EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
           RMStateStoreEventType.REMOVE_RESERVATION,
           new RemoveReservationAllocationTransition())
       .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED,
@@ -176,14 +195,17 @@ public abstract class RMStateStore extends AbstractService {
                              RMStateStoreEvent> stateMachine;
 
   private static class StoreAppTransition
-      implements SingleArcTransition<RMStateStore, RMStateStoreEvent> {
+      implements MultipleArcTransition<RMStateStore, RMStateStoreEvent,
+          RMStateStoreState> {
     @Override
-    public void transition(RMStateStore store, RMStateStoreEvent event) {
+    public RMStateStoreState transition(RMStateStore store,
+        RMStateStoreEvent event) {
       if (!(event instanceof RMStateStoreAppEvent)) {
         // should never happen
         LOG.error("Illegal event type: " + event.getClass());
-        return;
+        return RMStateStoreState.ACTIVE;
       }
+      boolean isFenced = false;
       ApplicationStateData appState =
           ((RMStateStoreAppEvent) event).getAppState();
       ApplicationId appId =
@@ -195,20 +217,24 @@ public abstract class RMStateStore extends AbstractService {
                RMAppEventType.APP_NEW_SAVED));
       } catch (Exception e) {
         LOG.error("Error storing app: " + appId, e);
-        store.notifyStoreOperationFailed(e);
+        isFenced = store.notifyStoreOperationFailedInternal(e);
       }
+      return finalState(isFenced);
     };
   }
 
   private static class UpdateAppTransition implements
-      SingleArcTransition<RMStateStore, RMStateStoreEvent> {
+      MultipleArcTransition<RMStateStore, RMStateStoreEvent,
+          RMStateStoreState> {
     @Override
-    public void transition(RMStateStore store, RMStateStoreEvent event) {
+    public RMStateStoreState transition(RMStateStore store,
+        RMStateStoreEvent event) {
       if (!(event instanceof RMStateUpdateAppEvent)) {
         // should never happen
         LOG.error("Illegal event type: " + event.getClass());
-        return;
+        return RMStateStoreState.ACTIVE;
       }
+      boolean isFenced = false;
       ApplicationStateData appState =
           ((RMStateUpdateAppEvent) event).getAppState();
       ApplicationId appId =
@@ -222,20 +248,24 @@ public abstract class RMStateStore extends AbstractService {
         }
       } catch (Exception e) {
         LOG.error("Error updating app: " + appId, e);
-        store.notifyStoreOperationFailed(e);
+        isFenced = store.notifyStoreOperationFailedInternal(e);
       }
+      return finalState(isFenced);
     };
   }
 
   private static class RemoveAppTransition implements
-      SingleArcTransition<RMStateStore, RMStateStoreEvent> {
+      MultipleArcTransition<RMStateStore, RMStateStoreEvent,
+          RMStateStoreState> {
     @Override
-    public void transition(RMStateStore store, RMStateStoreEvent event) {
+    public RMStateStoreState transition(RMStateStore store,
+        RMStateStoreEvent event) {
       if (!(event instanceof RMStateStoreRemoveAppEvent)) {
         // should never happen
         LOG.error("Illegal event type: " + event.getClass());
-        return;
+        return RMStateStoreState.ACTIVE;
       }
+      boolean isFenced = false;
       ApplicationStateData appState =
           ((RMStateStoreRemoveAppEvent) event).getAppState();
       ApplicationId appId =
@@ -245,20 +275,24 @@ public abstract class RMStateStore extends AbstractService {
         store.removeApplicationStateInternal(appState);
       } catch (Exception e) {
         LOG.error("Error removing app: " + appId, e);
-        store.notifyStoreOperationFailed(e);
+        isFenced = store.notifyStoreOperationFailedInternal(e);
       }
+      return finalState(isFenced);
     };
   }
 
   private static class StoreAppAttemptTransition implements
-      SingleArcTransition<RMStateStore, RMStateStoreEvent> {
+      MultipleArcTransition<RMStateStore, RMStateStoreEvent,
+          RMStateStoreState> {
     @Override
-    public void transition(RMStateStore store, RMStateStoreEvent event) {
+    public RMStateStoreState transition(RMStateStore store,
+        RMStateStoreEvent event) {
       if (!(event instanceof RMStateStoreAppAttemptEvent)) {
         // should never happen
         LOG.error("Illegal event type: " + event.getClass());
-        return;
+        return RMStateStoreState.ACTIVE;
       }
+      boolean isFenced = false;
       ApplicationAttemptStateData attemptState =
           ((RMStateStoreAppAttemptEvent) event).getAppAttemptState();
       try {
@@ -272,20 +306,24 @@ public abstract class RMStateStore extends AbstractService {
                RMAppAttemptEventType.ATTEMPT_NEW_SAVED));
       } catch (Exception e) {
         LOG.error("Error storing appAttempt: " + attemptState.getAttemptId(), e);
-        store.notifyStoreOperationFailed(e);
+        isFenced = store.notifyStoreOperationFailedInternal(e);
       }
+      return finalState(isFenced);
     };
   }
 
   private static class UpdateAppAttemptTransition implements
-      SingleArcTransition<RMStateStore, RMStateStoreEvent> {
+      MultipleArcTransition<RMStateStore, RMStateStoreEvent,
+          RMStateStoreState> {
     @Override
-    public void transition(RMStateStore store, RMStateStoreEvent event) {
+    public RMStateStoreState transition(RMStateStore store,
+        RMStateStoreEvent event) {
       if (!(event instanceof RMStateUpdateAppAttemptEvent)) {
         // should never happen
         LOG.error("Illegal event type: " + event.getClass());
-        return;
+        return RMStateStoreState.ACTIVE;
       }
+      boolean isFenced = false;
       ApplicationAttemptStateData attemptState =
           ((RMStateUpdateAppAttemptEvent) event).getAppAttemptState();
       try {
@@ -299,20 +337,24 @@ public abstract class RMStateStore extends AbstractService {
                RMAppAttemptEventType.ATTEMPT_UPDATE_SAVED));
       } catch (Exception e) {
         LOG.error("Error updating appAttempt: " + attemptState.getAttemptId(), e);
-        store.notifyStoreOperationFailed(e);
+        isFenced = store.notifyStoreOperationFailedInternal(e);
       }
+      return finalState(isFenced);
     };
   }
 
   private static class StoreRMDTTransition implements
-      SingleArcTransition<RMStateStore, RMStateStoreEvent> {
+      MultipleArcTransition<RMStateStore, RMStateStoreEvent,
+          RMStateStoreState> {
     @Override
-    public void transition(RMStateStore store, RMStateStoreEvent event) {
+    public RMStateStoreState transition(RMStateStore store,
+        RMStateStoreEvent event) {
       if (!(event instanceof RMStateStoreRMDTEvent)) {
         // should never happen
         LOG.error("Illegal event type: " + event.getClass());
-        return;
+        return RMStateStoreState.ACTIVE;
       }
+      boolean isFenced = false;
       RMStateStoreRMDTEvent dtEvent = (RMStateStoreRMDTEvent) event;
       try {
         LOG.info("Storing RMDelegationToken and SequenceNumber");
@@ -321,20 +363,24 @@ public abstract class RMStateStore extends AbstractService {
       } catch (Exception e) {
         LOG.error("Error While Storing RMDelegationToken and SequenceNumber ",
             e);
-        store.notifyStoreOperationFailed(e);
+        isFenced = store.notifyStoreOperationFailedInternal(e);
       }
+      return finalState(isFenced);
     }
   }
 
   private static class RemoveRMDTTransition implements
-      SingleArcTransition<RMStateStore, RMStateStoreEvent> {
+      MultipleArcTransition<RMStateStore, RMStateStoreEvent,
+          RMStateStoreState> {
     @Override
-    public void transition(RMStateStore store, RMStateStoreEvent event) {
+    public RMStateStoreState transition(RMStateStore store,
+        RMStateStoreEvent event) {
       if (!(event instanceof RMStateStoreRMDTEvent)) {
         // should never happen
         LOG.error("Illegal event type: " + event.getClass());
-        return;
+        return RMStateStoreState.ACTIVE;
       }
+      boolean isFenced = false;
       RMStateStoreRMDTEvent dtEvent = (RMStateStoreRMDTEvent) event;
       try {
         LOG.info("Removing RMDelegationToken and SequenceNumber");
@@ -342,21 +388,24 @@ public abstract class RMStateStore extends AbstractService {
       } catch (Exception e) {
         LOG.error("Error While Removing RMDelegationToken and SequenceNumber ",
             e);
-        store.notifyStoreOperationFailed(e);
+        isFenced = store.notifyStoreOperationFailedInternal(e);
       }
+      return finalState(isFenced);
     }
   }
 
   private static class UpdateRMDTTransition implements
-      SingleArcTransition<RMStateStore, RMStateStoreEvent> {
+      MultipleArcTransition<RMStateStore, RMStateStoreEvent,
+          RMStateStoreState> {
     @Override
-    public void transition(RMStateStore store, RMStateStoreEvent event) {
+    public RMStateStoreState transition(RMStateStore store,
+        RMStateStoreEvent event) {
       if (!(event instanceof RMStateStoreRMDTEvent)) {
         // should never happen
         LOG.error("Illegal event type: " + event.getClass());
-        return;
+        return RMStateStoreState.ACTIVE;
       }
-
+      boolean isFenced = false;
       RMStateStoreRMDTEvent dtEvent = (RMStateStoreRMDTEvent) event;
       try {
         LOG.info("Updating RMDelegationToken and SequenceNumber");
@@ -365,20 +414,24 @@ public abstract class RMStateStore extends AbstractService {
       } catch (Exception e) {
         LOG.error("Error While Updating RMDelegationToken and SequenceNumber ",
             e);
-        store.notifyStoreOperationFailed(e);
+        isFenced = store.notifyStoreOperationFailedInternal(e);
       }
+      return finalState(isFenced);
     }
   }
 
   private static class StoreRMDTMasterKeyTransition implements
-      SingleArcTransition<RMStateStore, RMStateStoreEvent> {
+      MultipleArcTransition<RMStateStore, RMStateStoreEvent,
+          RMStateStoreState> {
     @Override
-    public void transition(RMStateStore store, RMStateStoreEvent event) {
+    public RMStateStoreState transition(RMStateStore store,
+        RMStateStoreEvent event) {
       if (!(event instanceof RMStateStoreRMDTMasterKeyEvent)) {
         // should never happen
         LOG.error("Illegal event type: " + event.getClass());
-        return;
+        return RMStateStoreState.ACTIVE;
       }
+      boolean isFenced = false;
       RMStateStoreRMDTMasterKeyEvent dtEvent =
           (RMStateStoreRMDTMasterKeyEvent) event;
       try {
@@ -386,20 +439,24 @@ public abstract class RMStateStore extends AbstractService {
         store.storeRMDTMasterKeyState(dtEvent.getDelegationKey());
       } catch (Exception e) {
         LOG.error("Error While Storing RMDTMasterKey.", e);
-        store.notifyStoreOperationFailed(e);
+        isFenced = store.notifyStoreOperationFailedInternal(e);
       }
+      return finalState(isFenced);
     }
   }
 
   private static class RemoveRMDTMasterKeyTransition implements
-      SingleArcTransition<RMStateStore, RMStateStoreEvent> {
+      MultipleArcTransition<RMStateStore, RMStateStoreEvent,
+          RMStateStoreState> {
     @Override
-    public void transition(RMStateStore store, RMStateStoreEvent event) {
+    public RMStateStoreState transition(RMStateStore store,
+        RMStateStoreEvent event) {
       if (!(event instanceof RMStateStoreRMDTMasterKeyEvent)) {
         // should never happen
         LOG.error("Illegal event type: " + event.getClass());
-        return;
+        return RMStateStoreState.ACTIVE;
       }
+      boolean isFenced = false;
       RMStateStoreRMDTMasterKeyEvent dtEvent =
           (RMStateStoreRMDTMasterKeyEvent) event;
       try {
@@ -407,42 +464,49 @@ public abstract class RMStateStore extends AbstractService {
         store.removeRMDTMasterKeyState(dtEvent.getDelegationKey());
       } catch (Exception e) {
         LOG.error("Error While Removing RMDTMasterKey.", e);
-        store.notifyStoreOperationFailed(e);
+        isFenced = store.notifyStoreOperationFailedInternal(e);
       }
+      return finalState(isFenced);
     }
   }
 
   private static class StoreOrUpdateAMRMTokenTransition implements
-      SingleArcTransition<RMStateStore, RMStateStoreEvent> {
+      MultipleArcTransition<RMStateStore, RMStateStoreEvent,
+          RMStateStoreState> {
     @Override
-    public void transition(RMStateStore store, RMStateStoreEvent event) {
+    public RMStateStoreState transition(RMStateStore store,
+        RMStateStoreEvent event) {
       if (!(event instanceof RMStateStoreAMRMTokenEvent)) {
         // should never happen
         LOG.error("Illegal event type: " + event.getClass());
-        return;
+        return RMStateStoreState.ACTIVE;
       }
       RMStateStoreAMRMTokenEvent amrmEvent = (RMStateStoreAMRMTokenEvent) event;
-
+      boolean isFenced = false;
       try {
         LOG.info("Updating AMRMToken");
         store.storeOrUpdateAMRMTokenSecretManagerState(
             amrmEvent.getAmrmTokenSecretManagerState(), amrmEvent.isUpdate());
       } catch (Exception e) {
         LOG.error("Error storing info for AMRMTokenSecretManager", e);
-        store.notifyStoreOperationFailed(e);
+        isFenced = store.notifyStoreOperationFailedInternal(e);
       }
+      return finalState(isFenced);
     }
   }
 
   private static class StoreReservationAllocationTransition implements
-      SingleArcTransition<RMStateStore, RMStateStoreEvent> {
+      MultipleArcTransition<RMStateStore, RMStateStoreEvent,
+          RMStateStoreState> {
     @Override
-    public void transition(RMStateStore store, RMStateStoreEvent event) {
+    public RMStateStoreState transition(RMStateStore store,
+        RMStateStoreEvent event) {
       if (!(event instanceof RMStateStoreStoreReservationEvent)) {
         // should never happen
         LOG.error("Illegal event type: " + event.getClass());
-        return;
+        return RMStateStoreState.ACTIVE;
       }
+      boolean isFenced = false;
       RMStateStoreStoreReservationEvent reservationEvent =
           (RMStateStoreStoreReservationEvent) event;
       try {
@@ -454,20 +518,24 @@ public abstract class RMStateStore extends AbstractService {
             reservationEvent.getReservationIdName());
       } catch (Exception e) {
         LOG.error("Error while storing reservation allocation.", e);
-        store.notifyStoreOperationFailed(e);
+        isFenced = store.notifyStoreOperationFailedInternal(e);
       }
+      return finalState(isFenced);
     }
   }
 
   private static class UpdateReservationAllocationTransition implements
-      SingleArcTransition<RMStateStore, RMStateStoreEvent> {
+      MultipleArcTransition<RMStateStore, RMStateStoreEvent,
+          RMStateStoreState> {
     @Override
-    public void transition(RMStateStore store, RMStateStoreEvent event) {
+    public RMStateStoreState transition(RMStateStore store,
+        RMStateStoreEvent event) {
       if (!(event instanceof RMStateStoreStoreReservationEvent)) {
         // should never happen
         LOG.error("Illegal event type: " + event.getClass());
-        return;
+        return RMStateStoreState.ACTIVE;
       }
+      boolean isFenced = false;
       RMStateStoreStoreReservationEvent reservationEvent =
           (RMStateStoreStoreReservationEvent) event;
       try {
@@ -479,20 +547,24 @@ public abstract class RMStateStore extends AbstractService {
             reservationEvent.getReservationIdName());
       } catch (Exception e) {
         LOG.error("Error while updating reservation allocation.", e);
-        store.notifyStoreOperationFailed(e);
+        isFenced = store.notifyStoreOperationFailedInternal(e);
       }
+      return finalState(isFenced);
     }
   }
 
   private static class RemoveReservationAllocationTransition implements
-      SingleArcTransition<RMStateStore, RMStateStoreEvent> {
+      MultipleArcTransition<RMStateStore, RMStateStoreEvent,
+          RMStateStoreState> {
     @Override
-    public void transition(RMStateStore store, RMStateStoreEvent event) {
+    public RMStateStoreState transition(RMStateStore store,
+        RMStateStoreEvent event) {
       if (!(event instanceof RMStateStoreStoreReservationEvent)) {
         // should never happen
         LOG.error("Illegal event type: " + event.getClass());
-        return;
+        return RMStateStoreState.ACTIVE;
       }
+      boolean isFenced = false;
       RMStateStoreStoreReservationEvent reservationEvent =
           (RMStateStoreStoreReservationEvent) event;
       try {
@@ -503,11 +575,16 @@ public abstract class RMStateStore extends AbstractService {
             reservationEvent.getReservationIdName());
       } catch (Exception e) {
         LOG.error("Error while removing reservation allocation.", e);
-        store.notifyStoreOperationFailed(e);
+        isFenced = store.notifyStoreOperationFailedInternal(e);
       }
+      return finalState(isFenced);
     }
   }
 
+  private static RMStateStoreState finalState(boolean isFenced) {
+    return isFenced ? RMStateStoreState.FENCED : RMStateStoreState.ACTIVE;
+  }
+
   public RMStateStore() {
     super(RMStateStore.class.getName());
     ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
@@ -1006,17 +1083,28 @@ public abstract class RMStateStore extends AbstractService {
     }
   }
 
-  @SuppressWarnings("unchecked")
   /**
    * This method is called to notify the ResourceManager that the store
    * operation has failed.
    * @param failureCause the exception due to which the operation failed
    */
   protected void notifyStoreOperationFailed(Exception failureCause) {
+    if (isFencedState()) {
+      return;
+    }
+    if (notifyStoreOperationFailedInternal(failureCause)) {
+      updateFencedState();
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private boolean notifyStoreOperationFailedInternal(
+      Exception failureCause) {
+    boolean isFenced = false;
     LOG.error("State store operation failed ", failureCause);
     if (HAUtil.isHAEnabled(getConfig())) {
       LOG.warn("State-store fenced ! Transitioning RM to standby");
-      updateFencedState();
+      isFenced = true;
       Thread standByTransitionThread =
           new Thread(new StandByTransitionThread());
       standByTransitionThread.setName("StandByTransitionThread Handler");
@@ -1029,6 +1117,7 @@ public abstract class RMStateStore extends AbstractService {
     } else {
       LOG.warn("Skip the state-store error.");
     }
+    return isFenced;
   }
  
   @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9156fc60/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestMemoryRMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestMemoryRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestMemoryRMStateStore.java
new file mode 100644
index 0000000..89b9e2b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestMemoryRMStateStore.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.recovery;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+public class TestMemoryRMStateStore {
+
+  @Test
+  public void testNotifyStoreOperationFailed() throws Exception {
+    RMStateStore store = new MemoryRMStateStore() {
+      @Override
+      public synchronized void removeRMDelegationTokenState(
+          RMDelegationTokenIdentifier rmDTIdentifier) throws Exception {
+        throw new Exception("testNotifyStoreOperationFailed");
+      }
+    };
+    Configuration conf = new Configuration();
+    conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
+    store.init(conf);
+    ResourceManager mockRM = mock(ResourceManager.class);
+    store.setResourceManager(mockRM);
+    RMDelegationTokenIdentifier mockTokenId =
+        mock(RMDelegationTokenIdentifier.class);
+    store.removeRMDelegationToken(mockTokenId);
+    assertTrue("RMStateStore should have been in fenced state",
+        store.isFencedState());
+    store = new MemoryRMStateStore() {
+      @Override
+      public synchronized void removeRMDelegationToken(
+          RMDelegationTokenIdentifier rmDTIdentifier) {
+        notifyStoreOperationFailed(new Exception(
+            "testNotifyStoreOperationFailed"));
+      }
+    };
+    store.init(conf);
+    store.setResourceManager(mockRM);
+    store.removeRMDelegationToken(mockTokenId);
+    assertTrue("RMStateStore should have been in fenced state",
+        store.isFencedState());
+  }
+}


Mime
View raw message