ambari-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aonis...@apache.org
Subject [2/2] ambari git commit: AMBARI-17559. NN HA enabling failed at 'Stop all Services' step (aonishuk)
Date Tue, 05 Jul 2016 16:55:01 GMT
AMBARI-17559. NN HA enabling failed at 'Stop all Services' step (aonishuk)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/a7d5038a
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/a7d5038a
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/a7d5038a

Branch: refs/heads/branch-2.4
Commit: a7d5038ad4ecee5d1d3dfe908fc9ef978c236bc9
Parents: 5a78997
Author: Andrew Onishuk <aonishuk@hortonworks.com>
Authored: Tue Jul 5 19:54:18 2016 +0300
Committer: Andrew Onishuk <aonishuk@hortonworks.com>
Committed: Tue Jul 5 19:54:18 2016 +0300

----------------------------------------------------------------------
 .../src/main/python/ambari_agent/ActionQueue.py |  5 +-
 .../server/actionmanager/ActionManager.java     | 11 +++--
 .../ambari/server/agent/HeartbeatProcessor.java | 31 +++++++------
 .../ambari/server/utils/CommandUtils.java       | 48 ++++++++++++++++++++
 .../actionmanager/TestActionDBAccessorImpl.java |  6 ++-
 .../server/actionmanager/TestActionManager.java |  7 +--
 .../actionmanager/TestActionScheduler.java      |  9 ++--
 .../server/agent/HeartbeatProcessorTest.java    |  3 +-
 8 files changed, 90 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/a7d5038a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index 3832b81..1e7b1b6 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -318,11 +318,10 @@ class ActionQueue(threading.Thread):
         if commandresult['exitcode'] == 0:
           status = self.COMPLETED_STATUS
         else:
+          status = self.FAILED_STATUS
           if (commandresult['exitcode'] == -signal.SIGTERM) or (commandresult['exitcode']
== -signal.SIGKILL):
             logger.info('Command {cid} was canceled!'.format(cid=taskId))
-            return
-          else:
-            status = self.FAILED_STATUS
+            break
 
       if status != self.COMPLETED_STATUS and retryAble and retryDuration > 0:
         delay = self.get_retry_delay(delay)

http://git-wip-us.apache.org/repos/asf/ambari/blob/a7d5038a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java
index 2b121dc..582b4de 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -35,6 +36,7 @@ import org.apache.ambari.server.controller.HostsMap;
 import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
 import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.topology.TopologyManager;
+import org.apache.ambari.server.utils.CommandUtils;
 import org.apache.ambari.server.utils.StageUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -137,7 +139,7 @@ public class ActionManager {
    * twice
    */
   public void processTaskResponse(String hostname, List<CommandReport> reports,
-                                  Collection<HostRoleCommand> commands) {
+                                  Map<Long, HostRoleCommand> commands) {
     if (reports == null) {
       return;
     }
@@ -149,10 +151,9 @@ public class ActionManager {
       }
     });
     List<CommandReport> reportsToProcess = new ArrayList<CommandReport>();
-    Iterator<HostRoleCommand> commandIterator = commands.iterator();
     //persist the action response into the db.
     for (CommandReport report : reports) {
-      HostRoleCommand command = commandIterator.next();
+      HostRoleCommand command = commands.get(report.getTaskId());
       if (LOG.isDebugEnabled()) {
         LOG.debug("Processing command report : " + report.toString());
       }
@@ -212,6 +213,10 @@ public class ActionManager {
     return db.getTasks(taskIds);
   }
 
+  public Map<Long, HostRoleCommand> getTasksMap(Collection<Long> taskIds) {
+    return CommandUtils.convertToTaskIdCommandMap(getTasks(taskIds));
+  }
+
   /**
    * Get first or last maxResults requests that are in the specified status
    *

http://git-wip-us.apache.org/repos/asf/ambari/blob/a7d5038a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatProcessor.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatProcessor.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatProcessor.java
index c6036c2..2448c99 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatProcessor.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatProcessor.java
@@ -363,9 +363,8 @@ public class HeartbeatProcessor extends AbstractService{
     for (CommandReport report : reports) {
       taskIds.add(report.getTaskId());
     }
-    Collection<HostRoleCommand> commands = actionManager.getTasks(taskIds);
+    Map<Long, HostRoleCommand> commands = actionManager.getTasksMap(taskIds);
 
-    Iterator<HostRoleCommand> hostRoleCommandIterator = commands.iterator();
     for (CommandReport report : reports) {
 
       Long clusterId = null;
@@ -378,8 +377,6 @@ public class HeartbeatProcessor extends AbstractService{
       }
 
       LOG.debug("Received command report: " + report);
-      // Fetch HostRoleCommand that corresponds to a given task ID
-      HostRoleCommand hostRoleCommand = hostRoleCommandIterator.next();
       Host host = clusterFsm.getHost(hostname);
 //      HostEntity hostEntity = hostDAO.findByName(hostname); //don't touch database
       if (host == null) {
@@ -395,17 +392,23 @@ public class HeartbeatProcessor extends AbstractService{
         ambariEventPublisher.publish(event);
       }
 
-      // Skip sending events for command reports for ABORTed commands
-      if (hostRoleCommand.getStatus() == HostRoleStatus.ABORTED) {
-        continue;
-      }
-      if (hostRoleCommand.getStatus() == HostRoleStatus.QUEUED &&
-          report.getStatus().equals("IN_PROGRESS")) {
-        hostRoleCommand.setStartTime(now);
+      // Fetch HostRoleCommand that corresponds to a given task ID
+      HostRoleCommand hostRoleCommand = commands.get(report.getTaskId());
+      if (hostRoleCommand == null) {
+        LOG.warn("Can't fetch HostRoleCommand with taskId = " + report.getTaskId());
+      } else {
+        // Skip sending events for command reports for ABORTed commands
+        if (hostRoleCommand.getStatus() == HostRoleStatus.ABORTED) {
+          continue;
+        }
+        if (hostRoleCommand.getStatus() == HostRoleStatus.QUEUED &&
+            report.getStatus().equals("IN_PROGRESS")) {
+          hostRoleCommand.setStartTime(now);
 
-        // Because the task may be retried several times, set the original start time only
once.
-        if (hostRoleCommand.getOriginalStartTime() == -1) {
-          hostRoleCommand.setOriginalStartTime(now);
+          // Because the task may be retried several times, set the original start time only
once.
+          if (hostRoleCommand.getOriginalStartTime() == -1) {
+            hostRoleCommand.setOriginalStartTime(now);
+          }
         }
       }
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/a7d5038a/ambari-server/src/main/java/org/apache/ambari/server/utils/CommandUtils.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/utils/CommandUtils.java b/ambari-server/src/main/java/org/apache/ambari/server/utils/CommandUtils.java
new file mode 100644
index 0000000..5a8d938
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/utils/CommandUtils.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.utils;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.ambari.server.actionmanager.HostRoleCommand;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CommandUtils {
+  private static final Logger LOG = LoggerFactory.getLogger(CommandUtils.class);
+
+  /**
+   * Returns {@code commands} indexed by {@link HostRoleCommand#getTaskId()}; a later duplicate ID wins.
+   * A null or empty {@code commands} yields an immutable empty map, otherwise a new mutable HashMap.
+   */
+  public static Map<Long, HostRoleCommand> convertToTaskIdCommandMap(Collection<HostRoleCommand> commands) {
+    if (commands == null || commands.isEmpty()) {
+      return Collections.emptyMap();
+    }
+    Map<Long, HostRoleCommand> result = new HashMap<>();
+    for (HostRoleCommand command : commands) {
+      result.put(command.getTaskId(), command);
+    }
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/a7d5038a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java
index 50021c2..2292293 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java
@@ -50,6 +50,7 @@ import org.apache.ambari.server.serveraction.MockServerAction;
 import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.state.StackId;
 import org.apache.ambari.server.state.svccomphost.ServiceComponentHostStartEvent;
+import org.apache.ambari.server.utils.CommandUtils;
 import org.apache.ambari.server.utils.StageUtils;
 import org.easymock.EasyMock;
 import org.junit.After;
@@ -153,7 +154,8 @@ public class TestActionDBAccessorImpl {
     cr.setStdOut("");
     cr.setExitCode(215);
     reports.add(cr);
-    am.processTaskResponse(hostname, reports, stage.getOrderedHostRoleCommands());
+    am.processTaskResponse(hostname, reports, CommandUtils.convertToTaskIdCommandMap(stage.getOrderedHostRoleCommands()));
+    am.processTaskResponse(hostname, reports, CommandUtils.convertToTaskIdCommandMap(stage.getOrderedHostRoleCommands()));
     assertEquals(215,
         am.getAction(requestId, stageId).getExitCode(hostname, "HBASE_MASTER"));
     assertEquals(HostRoleStatus.COMPLETED, am.getAction(requestId, stageId)
@@ -180,7 +182,7 @@ public class TestActionDBAccessorImpl {
     cr.setStdOut("");
     cr.setExitCode(0);
     reports.add(cr);
-    am.processTaskResponse(hostname, reports, stage.getOrderedHostRoleCommands());
+    am.processTaskResponse(hostname, reports, CommandUtils.convertToTaskIdCommandMap(stage.getOrderedHostRoleCommands()));
     assertEquals(0,
             am.getAction(requestId, stageId).getExitCode(hostname, "HBASE_MASTER"));
     assertEquals("HostRoleStatus should remain ABORTED " +

http://git-wip-us.apache.org/repos/asf/ambari/blob/a7d5038a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionManager.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionManager.java b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionManager.java
index f85b95d..3634794 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionManager.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionManager.java
@@ -44,6 +44,7 @@ import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
 import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.state.StackId;
 import org.apache.ambari.server.state.svccomphost.ServiceComponentHostStartEvent;
+import org.apache.ambari.server.utils.CommandUtils;
 import org.apache.ambari.server.utils.StageUtils;
 import org.easymock.EasyMock;
 import org.junit.After;
@@ -112,7 +113,7 @@ public class TestActionManager {
     cr.setStructuredOut("STRUCTURED_OUTPUT");
     cr.setExitCode(215);
     reports.add(cr);
-    am.processTaskResponse(hostname, reports, stage.getOrderedHostRoleCommands());
+    am.processTaskResponse(hostname, reports, CommandUtils.convertToTaskIdCommandMap(stage.getOrderedHostRoleCommands()));
     assertEquals(215,
         am.getAction(requestId, stageId).getExitCode(hostname, "HBASE_MASTER"));
     assertEquals(HostRoleStatus.COMPLETED, am.getAction(requestId, stageId)
@@ -165,7 +166,7 @@ public class TestActionManager {
     cr2.setStructuredOut("STRUCTURED_OUTPUT");
     cr2.setExitCode(215);
     reports.add(cr2);
-    am.processTaskResponse(hostname, reports, am.getTasks(Arrays.asList(new Long[]{1L, 2L})));
+    am.processTaskResponse(hostname, reports, CommandUtils.convertToTaskIdCommandMap(am.getTasks(Arrays.asList(new
Long[]{1L, 2L}))));
     assertEquals(HostRoleStatus.IN_PROGRESS, am.getAction(requestId, stageId)
         .getHostRoleStatus(hostname, "HBASE_MASTER"));
     assertEquals(HostRoleStatus.PENDING, am.getAction(requestId, stageId)
@@ -196,7 +197,7 @@ public class TestActionManager {
     cr.setStructuredOut(outLog);
     cr.setExitCode(215);
     reports.add(cr);
-    am.processTaskResponse(hostname, reports, stage.getOrderedHostRoleCommands());
+    am.processTaskResponse(hostname, reports, CommandUtils.convertToTaskIdCommandMap(stage.getOrderedHostRoleCommands()));
     assertEquals(215,
         am.getAction(requestId, stageId).getExitCode(hostname, "HBASE_MASTER"));
     assertEquals(HostRoleStatus.COMPLETED, am.getAction(requestId, stageId)

http://git-wip-us.apache.org/repos/asf/ambari/blob/a7d5038a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
index e0f67af..0b24765 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
@@ -89,6 +89,7 @@ import org.apache.ambari.server.state.svccomphost.ServiceComponentHostInstallEve
 import org.apache.ambari.server.state.svccomphost.ServiceComponentHostOpFailedEvent;
 import org.apache.ambari.server.state.svccomphost.ServiceComponentHostServerActionEvent;
 import org.apache.ambari.server.state.svccomphost.ServiceComponentHostUpgradeEvent;
+import org.apache.ambari.server.utils.CommandUtils;
 import org.apache.ambari.server.utils.StageUtils;
 import org.easymock.Capture;
 import org.easymock.EasyMock;
@@ -1410,7 +1411,7 @@ public class TestActionScheduler {
 
     List<CommandReport> reports = new ArrayList<CommandReport>();
     reports.add(getCommandReport(HostRoleStatus.FAILED, Role.NAMENODE, Service.Type.HDFS,
"1-1", 1));
-    am.processTaskResponse(hostname, reports, stages.get(0).getOrderedHostRoleCommands());
+    am.processTaskResponse(hostname, reports, CommandUtils.convertToTaskIdCommandMap(stages.get(0).getOrderedHostRoleCommands()));
 
     scheduler.doWork();
     Assert.assertEquals(HostRoleStatus.FAILED, stages.get(0).getHostRoleStatus(hostname,
"NAMENODE"));
@@ -1763,15 +1764,15 @@ public class TestActionScheduler {
 
     List<CommandReport> reports = new ArrayList<CommandReport>();
     reports.add(getCommandReport(HostRoleStatus.FAILED, Role.DATANODE, Service.Type.HDFS,
"1-1", 1));
-    am.processTaskResponse("host1", reports, stage.getOrderedHostRoleCommands());
+    am.processTaskResponse("host1", reports, CommandUtils.convertToTaskIdCommandMap(stage.getOrderedHostRoleCommands()));
 
     reports.clear();
     reports.add(getCommandReport(HostRoleStatus.FAILED, Role.DATANODE, Service.Type.HDFS,
"1-1", 2));
-    am.processTaskResponse("host2", reports, stage.getOrderedHostRoleCommands());
+    am.processTaskResponse("host2", reports, CommandUtils.convertToTaskIdCommandMap(stage.getOrderedHostRoleCommands()));
 
     reports.clear();
     reports.add(getCommandReport(HostRoleStatus.COMPLETED, Role.DATANODE, Service.Type.HDFS,
"1-1", 3));
-    am.processTaskResponse("host3", reports, stage.getOrderedHostRoleCommands());
+    am.processTaskResponse("host3", reports, CommandUtils.convertToTaskIdCommandMap(stage.getOrderedHostRoleCommands()));
 
     scheduler.doWork();
     Assert.assertEquals(HostRoleStatus.ABORTED, stages.get(1).getHostRoleStatus("host1",
"HDFS_CLIENT"));

http://git-wip-us.apache.org/repos/asf/ambari/blob/a7d5038a/ambari-server/src/test/java/org/apache/ambari/server/agent/HeartbeatProcessorTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/agent/HeartbeatProcessorTest.java b/ambari-server/src/test/java/org/apache/ambari/server/agent/HeartbeatProcessorTest.java
index 913c4ea..261eb37 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/agent/HeartbeatProcessorTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/agent/HeartbeatProcessorTest.java
@@ -61,6 +61,7 @@ import org.apache.ambari.server.state.State;
 import org.apache.ambari.server.state.fsm.InvalidStateTransitionException;
 import org.apache.ambari.server.state.svccomphost.ServiceComponentHostInstallEvent;
 import org.apache.ambari.server.state.svccomphost.ServiceComponentHostUpgradeEvent;
+import org.apache.ambari.server.utils.CommandUtils;
 import org.apache.ambari.server.utils.EventBusSynchronizer;
 import org.apache.ambari.server.utils.StageUtils;
 import org.easymock.EasyMock;
@@ -563,7 +564,7 @@ public class HeartbeatProcessorTest {
 
 
     reports.add(cr);
-    am.processTaskResponse(DummyHostname1, reports, stage.getOrderedHostRoleCommands());
+    am.processTaskResponse(DummyHostname1, reports, CommandUtils.convertToTaskIdCommandMap(stage.getOrderedHostRoleCommands()));
     assertEquals(215,
         am.getAction(requestId, stageId).getExitCode(DummyHostname1, HBASE_MASTER));
     assertEquals(HostRoleStatus.COMPLETED, am.getAction(requestId, stageId)


Mime
View raw message