Revert "AMBARI-15671. On Ambari Agent restart currently running commands on that agent should be immediately aborted. (mpapirkovskyy)"
This reverts commit 67cd9d9ee17f59adcec358c90bad515b2590d7d1.
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/1fed70ca
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/1fed70ca
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/1fed70ca
Branch: refs/heads/trunk
Commit: 1fed70cab776015408ffae7897d7ddf307d1276f
Parents: 3f2d5be
Author: Jonathan Hurley <jhurley@hortonworks.com>
Authored: Mon Apr 11 23:35:58 2016 -0400
Committer: Jonathan Hurley <jhurley@hortonworks.com>
Committed: Mon Apr 11 23:35:58 2016 -0400
----------------------------------------------------------------------
.../server/actionmanager/ActionScheduler.java | 15 --
.../actionmanager/TestActionScheduler.java | 173 ++-----------------
2 files changed, 10 insertions(+), 178 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/1fed70ca/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
index 79e3a07..95d1763 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
@@ -689,15 +689,6 @@ class ActionScheduler implements Runnable {
processActionDeath(cluster.getClusterName(), c.getHostname(), roleStr);
}
status = HostRoleStatus.ABORTED;
- } else if (wasAgentRestartedDuringOperation(hostObj, s, roleStr)) {
- String message = String.format("Detected ambari-agent restart during command execution."
+
- "The command has been aborted." +
- "Execution command details: host: %s, role: %s, actionId: %s", host, roleStr,
s.getActionId());
- LOG.warn(message);
- if (c.getRoleCommand().equals(RoleCommand.ACTIONEXECUTE)) {
- processActionDeath(cluster.getClusterName(), c.getHostname(), roleStr);
- }
- status = HostRoleStatus.ABORTED;
} else if (timeOutActionNeeded(status, s, hostObj, roleStr, now, commandTimeout))
{
// Process command timeouts
LOG.info("Host:" + host + ", role:" + roleStr + ", actionId:" + s.getActionId()
+ " timed out");
@@ -882,12 +873,6 @@ class ActionScheduler implements Runnable {
return false;
}
- protected boolean wasAgentRestartedDuringOperation(Host host, Stage stage, String role)
{
- String hostName = (null == host) ? null : host.getHostName();
- long lastStageAttemptTime = stage.getLastAttemptTime(hostName, role);
- return lastStageAttemptTime > 0 && lastStageAttemptTime <= host.getLastRegistrationTime();
- }
-
private boolean hasCommandInProgress(Stage stage, String host) {
List<ExecutionCommandWrapper> commandWrappers = stage.getExecutionCommands(host);
for (ExecutionCommandWrapper wrapper : commandWrappers) {
http://git-wip-us.apache.org/repos/asf/ambari/blob/1fed70ca/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
index 0ee0c27..af6fb9b 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
@@ -27,9 +27,7 @@ import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -904,7 +902,6 @@ public class TestActionScheduler {
ServiceComponent scomp = mock(ServiceComponent.class);
ServiceComponentHost sch = mock(ServiceComponentHost.class);
UnitOfWork unitOfWork = mock(UnitOfWork.class);
- //Stage stage = mock(Stage.class);
when(fsm.getCluster(anyString())).thenReturn(oneClusterMock);
when(oneClusterMock.getService(anyString())).thenReturn(serviceObj);
when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
@@ -959,10 +956,8 @@ public class TestActionScheduler {
Properties properties = new Properties();
Configuration conf = new Configuration(properties);
- ActionScheduler scheduler = spy(new ActionScheduler(100, 50, db, aq, fsm, 3,
- new HostsMap((String) null), unitOfWork, null, conf));
-
- doReturn(false).when(scheduler).wasAgentRestartedDuringOperation(any(Host.class), any(Stage.class),
anyString());
+ ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
+ new HostsMap((String) null), unitOfWork, null, conf);
scheduler.doWork();
@@ -1049,12 +1044,9 @@ public class TestActionScheduler {
Properties properties = new Properties();
properties.put(Configuration.PARALLEL_STAGE_EXECUTION_KEY, "false");
Configuration conf = new Configuration(properties);
- ActionScheduler scheduler = spy(new ActionScheduler(100, 50, db, aq, fsm, 3,
+ ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
new HostsMap((String) null),
- unitOfWork, null, conf));
-
-
- doReturn(false).when(scheduler).wasAgentRestartedDuringOperation(any(Host.class), any(Stage.class),
anyString());
+ unitOfWork, null, conf);
scheduler.doWork();
@@ -1123,11 +1115,9 @@ public class TestActionScheduler {
Properties properties = new Properties();
properties.put(Configuration.PARALLEL_STAGE_EXECUTION_KEY, "true");
Configuration conf = new Configuration(properties);
- ActionScheduler scheduler = spy(new ActionScheduler(100, 50, db, aq, fsm, 3,
+ ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
new HostsMap((String) null),
- unitOfWork, null, conf));
-
- doReturn(false).when(scheduler).wasAgentRestartedDuringOperation(any(Host.class), any(Stage.class),
anyString());
+ unitOfWork, null, conf);
scheduler.doWork();
@@ -2135,145 +2125,6 @@ public class TestActionScheduler {
Assert.assertEquals(cancelCommand.getReason(), reason);
}
- @Test
- public void testCancelRequestsDueAgentRestart() throws Exception {
- final long HOST_REGISTRATION_TIME = 100L;
- final long STAGE_LAST_ATTEMPT_TIME = HOST_REGISTRATION_TIME - 1;
- ActionQueue aq = new ActionQueue();
- Clusters fsm = mock(Clusters.class);
- Cluster oneClusterMock = mock(Cluster.class);
- Service serviceObj = mock(Service.class);
- ServiceComponent scomp = mock(ServiceComponent.class);
- ServiceComponentHost sch = mock(ServiceComponentHost.class);
- UnitOfWork unitOfWork = mock(UnitOfWork.class);
- when(fsm.getCluster(anyString())).thenReturn(oneClusterMock);
- when(oneClusterMock.getService(anyString())).thenReturn(serviceObj);
- when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
- when(scomp.getServiceComponentHost(anyString())).thenReturn(sch);
- when(serviceObj.getCluster()).thenReturn(oneClusterMock);
-
- HostEntity hostEntity = new HostEntity();
- hostEntity.setHostName(hostname);
- hostDAO.create(hostEntity);
-
- HashMap<String, ServiceComponentHost> hosts =
- new HashMap<String, ServiceComponentHost>();
- hosts.put(hostname, sch);
- when(scomp.getServiceComponentHosts()).thenReturn(hosts);
-
- long requestId = 1;
-
- final List<Stage> stages = new ArrayList<Stage>();
- int namenodeCmdTaskId = 1;
- stages.add(
- getStageWithSingleTask(
- hostname, "cluster1", Role.NAMENODE, RoleCommand.START,
- Service.Type.HDFS, namenodeCmdTaskId, 1, (int)requestId));
- stages.add(
- getStageWithSingleTask(
- hostname, "cluster1", Role.DATANODE, RoleCommand.START,
- Service.Type.HDFS, 2, 2, (int)requestId));
-
- Host host = mock(Host.class);
- when(fsm.getHost(anyString())).thenReturn(host);
- when(host.getState()).thenReturn(HostState.HEALTHY);
- when(host.getHostName()).thenReturn(hostname);
- when(host.getLastRegistrationTime()).thenReturn(HOST_REGISTRATION_TIME);
-
- stages.get(0).setLastAttemptTime(host.getHostName(), Role.NAMENODE.toString(), STAGE_LAST_ATTEMPT_TIME);
- stages.get(1).setLastAttemptTime(host.getHostName(), Role.DATANODE.toString(), STAGE_LAST_ATTEMPT_TIME);
-
- ActionDBAccessor db = mock(ActionDBAccessor.class);
-
- RequestEntity request = mock(RequestEntity.class);
- when(request.isExclusive()).thenReturn(false);
- when(db.getRequestEntity(anyLong())).thenReturn(request);
-
- when(db.getCommandsInProgressCount()).thenReturn(stages.size());
- when(db.getStagesInProgress()).thenReturn(stages);
-
- List<HostRoleCommand> requestTasks = new ArrayList<HostRoleCommand>();
- for (Stage stage : stages) {
- requestTasks.addAll(stage.getOrderedHostRoleCommands());
- }
- when(db.getRequestTasks(anyLong())).thenReturn(requestTasks);
- when(db.getAllStages(anyLong())).thenReturn(stages);
- doAnswer(new Answer<Void>() {
- @Override
- public Void answer(InvocationOnMock invocation) throws Throwable {
- List<CommandReport> reports = (List<CommandReport>) invocation.getArguments()[0];
- for (CommandReport report : reports) {
- String actionId = report.getActionId();
- long[] requestStageIds = StageUtils.getRequestStage(actionId);
- Long requestId = requestStageIds[0];
- Long stageId = requestStageIds[1];
- Long id = report.getTaskId();
- for (Stage stage : stages) {
- if (requestId.equals(stage.getRequestId()) && stageId.equals(stage.getStageId()))
{
- for (HostRoleCommand hostRoleCommand : stage.getOrderedHostRoleCommands())
{
- if (hostRoleCommand.getTaskId() == id) {
- hostRoleCommand.setStatus(HostRoleStatus.valueOf(report.getStatus()));
- }
- }
- }
- }
-
- }
-
- return null;
- }
- }).when(db).updateHostRoleStates(anyCollectionOf(CommandReport.class));
-
- when(db.getTask(anyLong())).thenAnswer(new Answer<Object>() {
- @Override
- public Object answer(InvocationOnMock invocation) throws Throwable {
- Long taskId = (Long) invocation.getArguments()[0];
- for (Stage stage : stages) {
- for (HostRoleCommand command : stage.getOrderedHostRoleCommands()) {
- if (taskId.equals(command.getTaskId())) {
- return command;
- }
- }
- }
- return null;
- }
- });
- doAnswer(new Answer<Void>() {
- @Override
- public Void answer(InvocationOnMock invocation) throws Throwable {
- Long requestId = (Long) invocation.getArguments()[0];
- for (Stage stage : stages) {
- if (requestId.equals(stage.getRequestId())) {
- for (HostRoleCommand command : stage.getOrderedHostRoleCommands()) {
- if (command.getStatus() == HostRoleStatus.QUEUED ||
- command.getStatus() == HostRoleStatus.IN_PROGRESS ||
- command.getStatus() == HostRoleStatus.PENDING) {
- command.setStatus(HostRoleStatus.ABORTED);
- }
- }
- }
- }
-
- return null;
- }
- }).when(db).abortOperation(anyLong());
-
- Properties properties = new Properties();
- Configuration conf = new Configuration(properties);
-
- ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
- new HostsMap((String) null), unitOfWork, null, conf);
-
- scheduler.doWork();
-
- String reason = "Some reason";
-
- scheduler.scheduleCancellingRequest(requestId, reason);
-
- scheduler.doWork();
- Assert.assertEquals(HostRoleStatus.ABORTED, stages.get(0).getHostRoleStatus(hostname,
"NAMENODE"));
- Assert.assertEquals(HostRoleStatus.ABORTED, stages.get(1).getHostRoleStatus(hostname,
"DATANODE"));
- }
@Test
public void testExclusiveRequests() throws Exception {
@@ -2415,10 +2266,8 @@ public class TestActionScheduler {
Properties properties = new Properties();
Configuration conf = new Configuration(properties);
- ActionScheduler scheduler = spy(new ActionScheduler(100, 50, db, aq, fsm, 3,
- new HostsMap((String) null), unitOfWork, null, conf));
-
- doReturn(false).when(scheduler).wasAgentRestartedDuringOperation(any(Host.class), any(Stage.class),
anyString());
+ ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
+ new HostsMap((String) null), unitOfWork, null, conf);
// Execution of request 1
@@ -2615,10 +2464,8 @@ public class TestActionScheduler {
}
}).when(db).abortOperation(anyLong());
- ActionScheduler scheduler = spy(new ActionScheduler(100, 50, db, aq, fsm, 3,
- new HostsMap((String) null), unitOfWork, null, conf));
-
- doReturn(false).when(scheduler).wasAgentRestartedDuringOperation(any(Host.class), any(Stage.class),
anyString());
+ ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
+ new HostsMap((String) null), unitOfWork, null, conf);
scheduler.doWork();
|