hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From asur...@apache.org
Subject [04/50] [abbrv] hadoop git commit: YARN-7585. NodeManager should go unhealthy when state store throws DBException. Contributed by Wilfred Spiegelenburg.
Date Tue, 09 Jan 2018 19:55:22 GMT
YARN-7585. NodeManager should go unhealthy when state store throws DBException. Contributed
by Wilfred Spiegelenburg.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7f515f57
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7f515f57
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7f515f57

Branch: refs/heads/YARN-6592
Commit: 7f515f57ede74dae787994f37bfafd5d20c9aa4c
Parents: 8044023
Author: Miklos Szegedi <szegedim@apache.org>
Authored: Tue Jan 2 18:03:04 2018 -0800
Committer: Miklos Szegedi <szegedim@apache.org>
Committed: Tue Jan 2 18:03:04 2018 -0800

----------------------------------------------------------------------
 .../yarn/server/nodemanager/NodeManager.java    |  1 +
 .../recovery/NMLeveldbStateStoreService.java    | 72 ++++++++++++++++++++
 .../recovery/NMStateStoreService.java           | 11 +++
 .../TestNMLeveldbStateStoreService.java         | 35 ++++++++++
 4 files changed, 119 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f515f57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
index 179b01e..6cb8560 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
@@ -451,6 +451,7 @@ public class NodeManager extends CompositeService
     // so that we make sure everything is up before registering with RM. 
     addService(nodeStatusUpdater);
     ((NMContext) context).setNodeStatusUpdater(nodeStatusUpdater);
+    nmStore.setNodeStatusUpdater(nodeStatusUpdater);
 
     super.serviceInit(conf);
     // TODO add local dirs to del

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f515f57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
index 3455874..0f659d9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainerRequestProto
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
+import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings;
 import org.apache.hadoop.yarn.server.records.Version;
@@ -155,6 +156,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
 
   private DB db;
   private boolean isNewlyCreated;
+  private boolean isHealthy;
   private Timer compactionTimer;
 
   /**
@@ -169,6 +171,8 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
 
   @Override
   protected void startStorage() throws IOException {
+    // Assume that we're healthy when we start
+    isHealthy = true;
   }
 
   @Override
@@ -187,6 +191,36 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
     return isNewlyCreated;
   }
 
+  /**
+   * If the state store throws an error after recovery has been performed
+   * then we can not trust it any more to reflect the NM state. We need to
+   * mark the store and node unhealthy.
+   * Errors during the recovery will cause a service failure and thus a NM
+   * start failure. Do not need to mark the store unhealthy for those.
+   * @param dbErr Exception
+   */
+  private void markStoreUnHealthy(DBException dbErr) {
+    // Always log the error here, we might not see the error in the caller
+    LOG.error("Statestore exception: ", dbErr);
+    // We have already been marked unhealthy so no need to do it again.
+    if (!isHealthy) {
+      return;
+    }
+    // Mark unhealthy, an out of band heartbeat will be sent and the state
+    // will remain unhealthy (not recoverable).
+    // No need to close the store: does not make any difference at this point.
+    isHealthy = false;
+    // We could get here before the nodeStatusUpdater is set
+    NodeStatusUpdater nsu = getNodeStatusUpdater();
+    if (nsu != null) {
+      nsu.reportException(dbErr);
+    }
+  }
+
+  @VisibleForTesting
+  boolean isHealthy() {
+    return isHealthy;
+  }
 
   @Override
   public List<RecoveredContainerState> loadContainersState()
@@ -354,6 +388,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
         batch.close();
       }
     } catch (DBException e) {
+      markStoreUnHealthy(e);
       throw new IOException(e);
     }
   }
@@ -378,6 +413,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
     try {
       db.put(bytes(key), EMPTY_VALUE);
     } catch (DBException e) {
+      markStoreUnHealthy(e);
       throw new IOException(e);
     }
   }
@@ -393,6 +429,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
     try {
       db.delete(bytes(key));
     } catch (DBException e) {
+      markStoreUnHealthy(e);
       throw new IOException(e);
     }
   }
@@ -408,6 +445,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
     try {
       db.put(bytes(key), EMPTY_VALUE);
     } catch (DBException e) {
+      markStoreUnHealthy(e);
       throw new IOException(e);
     }
   }
@@ -424,6 +462,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
     try {
       db.delete(bytes(key));
     } catch (DBException e) {
+      markStoreUnHealthy(e);
       throw new IOException(e);
     }
   }
@@ -441,6 +480,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
     try {
       db.put(bytes(key), bytes(diagnostics.toString()));
     } catch (DBException e) {
+      markStoreUnHealthy(e);
       throw new IOException(e);
     }
   }
@@ -459,6 +499,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
     try {
       db.put(bytes(key), EMPTY_VALUE);
     } catch (DBException e) {
+      markStoreUnHealthy(e);
       throw new IOException(e);
     }
   }
@@ -488,6 +529,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
         batch.close();
       }
     } catch (DBException e) {
+      markStoreUnHealthy(e);
       throw new IOException(e);
     }
   }
@@ -504,6 +546,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
     try {
       db.put(bytes(key), EMPTY_VALUE);
     } catch (DBException e) {
+      markStoreUnHealthy(e);
       throw new IOException(e);
     }
   }
@@ -520,6 +563,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
     try {
       db.put(bytes(key), bytes(Integer.toString(exitCode)));
     } catch (DBException e) {
+      markStoreUnHealthy(e);
       throw new IOException(e);
     }
   }
@@ -532,6 +576,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
     try {
       db.put(bytes(key), bytes(Integer.toString(remainingRetryAttempts)));
     } catch (DBException e) {
+      markStoreUnHealthy(e);
       throw new IOException(e);
     }
   }
@@ -544,6 +589,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
     try {
       db.put(bytes(key), bytes(workDir));
     } catch (DBException e) {
+      markStoreUnHealthy(e);
       throw new IOException(e);
     }
   }
@@ -556,6 +602,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
     try {
       db.put(bytes(key), bytes(logDir));
     } catch (DBException e) {
+      markStoreUnHealthy(e);
       throw new IOException(e);
     }
   }
@@ -589,6 +636,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
         batch.close();
       }
     } catch (DBException e) {
+      markStoreUnHealthy(e);
       throw new IOException(e);
     }
   }
@@ -638,6 +686,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
     try {
       db.put(bytes(key), p.toByteArray());
     } catch (DBException e) {
+      markStoreUnHealthy(e);
       throw new IOException(e);
     }
   }
@@ -659,6 +708,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
         batch.close();
       }
     } catch (DBException e) {
+      markStoreUnHealthy(e);
       throw new IOException(e);
     }
   }
@@ -815,6 +865,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
     try {
       db.put(bytes(key), proto.toByteArray());
     } catch (DBException e) {
+      markStoreUnHealthy(e);
       throw new IOException(e);
     }
   }
@@ -838,6 +889,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
         batch.close();
       }
     } catch (DBException e) {
+      markStoreUnHealthy(e);
       throw new IOException(e);
     }
   }
@@ -861,6 +913,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
         batch.close();
       }
     } catch (DBException e) {
+      markStoreUnHealthy(e);
       throw new IOException(e);
     }
   }
@@ -926,6 +979,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
     try {
       db.put(bytes(key), taskProto.toByteArray());
     } catch (DBException e) {
+      markStoreUnHealthy(e);
       throw new IOException(e);
     }
   }
@@ -936,6 +990,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
     try {
       db.delete(bytes(key));
     } catch (DBException e) {
+      markStoreUnHealthy(e);
       throw new IOException(e);
     }
   }
@@ -1009,6 +1064,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
     try {
       db.delete(bytes(key));
     } catch (DBException e) {
+      markStoreUnHealthy(e);
       throw new IOException(e);
     }
   }
@@ -1023,6 +1079,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
     try {
       db.put(bytes(dbKey), pb.getProto().toByteArray());
     } catch (DBException e) {
+      markStoreUnHealthy(e);
       throw new IOException(e);
     }
   }
@@ -1096,6 +1153,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
     try {
       db.put(bytes(key), bytes(expTime.toString()));
     } catch (DBException e) {
+      markStoreUnHealthy(e);
       throw new IOException(e);
     }
   }
@@ -1107,6 +1165,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
     try {
       db.delete(bytes(key));
     } catch (DBException e) {
+      markStoreUnHealthy(e);
       throw new IOException(e);
     }
   }
@@ -1157,6 +1216,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
     try {
       db.put(bytes(key), proto.toByteArray());
     } catch (DBException e) {
+      markStoreUnHealthy(e);
       throw new IOException(e);
     }
   }
@@ -1167,6 +1227,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
     try {
       db.delete(bytes(key));
     } catch (DBException e) {
+      markStoreUnHealthy(e);
       throw new IOException(e);
     }
   }
@@ -1198,6 +1259,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
         batch.close();
       }
     } catch (DBException e) {
+      markStoreUnHealthy(e);
       throw new IOException(e);
     }
 
@@ -1361,6 +1423,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
       try {
         db.delete(bytes(dbkey));
       } catch (DBException e) {
+        markStoreUnHealthy(e);
         throw new IOException(e);
       }
       return;
@@ -1375,6 +1438,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
     try {
       db.put(bytes(fullkey), data);
     } catch (DBException e) {
+      markStoreUnHealthy(e);
       throw new IOException(e);
     }
   }
@@ -1386,6 +1450,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
     try {
       db.delete(bytes(fullkey));
     } catch (DBException e) {
+      markStoreUnHealthy(e);
       throw new IOException(e);
     }
   }
@@ -1409,6 +1474,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
         candidates.add(key);
       }
     } catch (DBException e) {
+      markStoreUnHealthy(e);
       throw new IOException(e);
     } finally {
       if (iter != null) {
@@ -1422,6 +1488,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
         db.delete(bytes(key));
       }
     } catch (DBException e) {
+      markStoreUnHealthy(e);
       throw new IOException(e);
     }
   }
@@ -1555,6 +1622,11 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
     return db;
   }
 
+  @VisibleForTesting
+  void setDB(DB testDb) {
+    this.db = testDb;
+  }
+
   /**
    * 1) Versioning scheme: major.minor. For e.g. 1.0, 1.1, 1.2...1.25, 2.0 etc.
    * 2) Any incompatible change of state-store is a major upgrade, and any

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f515f57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
index 598ea9e..f9b86bf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.Localize
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
+import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings;
 
@@ -51,10 +52,20 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Reso
 @Unstable
 public abstract class NMStateStoreService extends AbstractService {
 
+  private NodeStatusUpdater nodeStatusUpdater = null;
+
   public NMStateStoreService(String name) {
     super(name);
   }
 
+  protected NodeStatusUpdater getNodeStatusUpdater() {
+    return nodeStatusUpdater;
+  }
+
+  public void setNodeStatusUpdater(NodeStatusUpdater nodeStatusUpdater) {
+    this.nodeStatusUpdater = nodeStatusUpdater;
+  }
+
   public static class RecoveredApplicationsState {
     List<ContainerManagerApplicationProto> applications;
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f515f57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
index 3cac5b4..de667d1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.isNull;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.timeout;
@@ -89,10 +90,12 @@ import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.security.BaseNMTokenSecretManager;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.iq80.leveldb.DB;
+import org.iq80.leveldb.DBException;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 public class TestNMLeveldbStateStoreService {
   private static final File TMP_DIR = new File(
@@ -1165,6 +1168,38 @@ public class TestNMLeveldbStateStoreService {
         resourceMappings.getAssignedResources("numa").equals(numaRes));
   }
 
+  @Test
+  public void testStateStoreNodeHealth() throws IOException {
+    // keep the working DB clean, break a temp DB
+    DB keepDB = stateStore.getDB();
+    DB myMocked = mock(DB.class);
+    stateStore.setDB(myMocked);
+
+    ApplicationId appId = ApplicationId.newInstance(1234, 1);
+    ApplicationAttemptId appAttemptId =
+        ApplicationAttemptId.newInstance(appId, 1);
+    DBException toThrow = new DBException();
+    Mockito.doThrow(toThrow).when(myMocked).
+        put(any(byte[].class), any(byte[].class));
+    // write some data
+    try {
+      // chosen a simple method could be any of the "void" methods
+      ContainerId containerId = ContainerId.newContainerId(appAttemptId, 1);
+      stateStore.storeContainerKilled(containerId);
+    } catch (IOException ioErr) {
+      // Cause should be wrapped DBException
+      assertTrue(ioErr.getCause() instanceof DBException);
+      // check the store is marked unhealthy
+      assertFalse("Statestore should have been unhealthy",
+          stateStore.isHealthy());
+      return;
+    } finally {
+      // restore the working DB
+      stateStore.setDB(keepDB);
+    }
+    Assert.fail("Expected exception not thrown");
+  }
+
   private StartContainerRequest storeMockContainer(ContainerId containerId)
       throws IOException {
     // create a container request


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message