ambari-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From swa...@apache.org
Subject [1/2] git commit: AMBARI-4758. Failure tolerance parameter doesn't seem to work for Rolling Restarts. (swagle)
Date Thu, 20 Feb 2014 05:42:09 GMT
Repository: ambari
Updated Branches:
  refs/heads/trunk c3751de81 -> 9199f1c60


AMBARI-4758. Failure tolerance parameter doesn't seem to work for Rolling Restarts. (swagle)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/b7856cbc
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/b7856cbc
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/b7856cbc

Branch: refs/heads/trunk
Commit: b7856cbcf7544dad98881870369fa3de393d76b3
Parents: c3751de
Author: Siddharth Wagle <swagle@hortonworks.com>
Authored: Wed Feb 19 21:00:05 2014 -0800
Committer: Siddharth Wagle <swagle@hortonworks.com>
Committed: Wed Feb 19 21:00:38 2014 -0800

----------------------------------------------------------------------
 .../scheduler/AbstractLinearExecutionJob.java   | 10 +++-
 .../scheduler/ExecutionScheduleManager.java     |  5 +-
 .../server/state/scheduler/BatchRequestJob.java | 14 +++--
 .../state/scheduler/BatchRequestJobTest.java    | 63 ++++++++++++++++++--
 4 files changed, 77 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/b7856cbc/ambari-server/src/main/java/org/apache/ambari/server/scheduler/AbstractLinearExecutionJob.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/scheduler/AbstractLinearExecutionJob.java b/ambari-server/src/main/java/org/apache/ambari/server/scheduler/AbstractLinearExecutionJob.java
index d4602a2..2847891 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/scheduler/AbstractLinearExecutionJob.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/scheduler/AbstractLinearExecutionJob.java
@@ -18,6 +18,7 @@
 package org.apache.ambari.server.scheduler;
 
 import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.state.scheduler.BatchRequestJob;
 import org.quartz.DateBuilder;
 import org.quartz.DisallowConcurrentExecution;
 import org.quartz.JobDataMap;
@@ -42,8 +43,6 @@ import static org.quartz.TriggerBuilder.newTrigger;
  * template method "doWork()" (where the extending Job class's real work goes)
  * and then it schedules the follow-up job.
  */
-@PersistJobDataAfterExecution
-@DisallowConcurrentExecution
 public abstract class AbstractLinearExecutionJob implements ExecutionJob {
   private static Logger LOG = LoggerFactory.getLogger(AbstractLinearExecutionJob.class);
   protected ExecutionScheduleManager executionScheduleManager;
@@ -129,13 +128,20 @@ public abstract class AbstractLinearExecutionJob implements ExecutionJob {
     }
 
     int separationSeconds = jobDataMap.getIntValue(NEXT_EXECUTION_SEPARATION_SECONDS);
+    Object failedCount = properties.get(BatchRequestJob.BATCH_REQUEST_FAILED_TASKS_KEY);
+    Object totalCount = properties.get(BatchRequestJob.BATCH_REQUEST_TOTAL_TASKS_KEY);
 
     // Create trigger for next job execution
+    // Persist counts with trigger, so that they apply to current batch only
     Trigger trigger = newTrigger()
       .forJob(nextJobName, nextJobGroup)
       .withIdentity("TriggerForJob-" + nextJobName, LINEAR_EXECUTION_TRIGGER_GROUP)
       .withSchedule(simpleSchedule().withMisfireHandlingInstructionFireNow())
       .startAt(futureDate(separationSeconds, DateBuilder.IntervalUnit.SECOND))
+      .usingJobData(BatchRequestJob.BATCH_REQUEST_FAILED_TASKS_KEY,
+        failedCount != null ? (Integer) failedCount : 0)
+      .usingJobData(BatchRequestJob.BATCH_REQUEST_TOTAL_TASKS_KEY,
+        totalCount != null ? (Integer) totalCount : 0)
       .build();
 
     executionScheduleManager.scheduleJob(trigger);

http://git-wip-us.apache.org/repos/asf/ambari/blob/b7856cbc/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionScheduleManager.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionScheduleManager.java b/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionScheduleManager.java
index ef8dc9d..83deb06 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionScheduleManager.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionScheduleManager.java
@@ -43,6 +43,7 @@ import org.apache.ambari.server.state.scheduler.Schedule;
 import org.apache.ambari.server.utils.DateUtils;
 import org.apache.commons.lang.text.StrBuilder;
 import org.quartz.CronExpression;
+import org.quartz.JobDataMap;
 import org.quartz.JobDetail;
 import org.quartz.JobExecutionContext;
 import org.quartz.JobKey;
@@ -300,8 +301,7 @@ public class ExecutionScheduleManager {
       List<BatchRequest> batchRequests = batch.getBatchRequests();
       if (batchRequests != null) {
         Collections.sort(batchRequests);
-        ListIterator<BatchRequest> iterator = batchRequests.listIterator
-          (batchRequests.size());
+        ListIterator<BatchRequest> iterator = batchRequests.listIterator(batchRequests.size());
         String nextJobName = null;
         while (iterator.hasPrevious()) {
           BatchRequest batchRequest = iterator.previous();
@@ -726,3 +726,4 @@ public class ExecutionScheduleManager {
     }
   }
 }
+

http://git-wip-us.apache.org/repos/asf/ambari/blob/b7856cbc/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequestJob.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequestJob.java b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequestJob.java
index 0718dcc..bd52d70 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequestJob.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequestJob.java
@@ -23,12 +23,15 @@ import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.actionmanager.HostRoleStatus;
 import org.apache.ambari.server.scheduler.AbstractLinearExecutionJob;
 import org.apache.ambari.server.scheduler.ExecutionScheduleManager;
+import org.quartz.DisallowConcurrentExecution;
+import org.quartz.PersistJobDataAfterExecution;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import java.util.HashMap;
 import java.util.Map;
 
+@PersistJobDataAfterExecution
+@DisallowConcurrentExecution
 public class BatchRequestJob extends AbstractLinearExecutionJob {
   private static final Logger LOG = LoggerFactory.getLogger(BatchRequestJob.class);
 
@@ -106,7 +109,7 @@ public class BatchRequestJob extends AbstractLinearExecutionJob {
             + ", execution_id = " + executionId
             + ", processed batch_id = " + batchId
             + ", failed tasks = " + aggregateCounts.get(BATCH_REQUEST_FAILED_TASKS_KEY)
-            + ", total tasks = " + aggregateCounts.get(BATCH_REQUEST_TOTAL_TASKS_KEY));
+            + ", total tasks completed = " + aggregateCounts.get(BATCH_REQUEST_TOTAL_TASKS_KEY));
       }
     }
   }
@@ -134,7 +137,7 @@ public class BatchRequestJob extends AbstractLinearExecutionJob {
 
   private Map<String, Integer> addTaskCountToProperties(Map<String, Object> properties,
                                         Map<String, Integer> oldCounts,
-                                        BatchRequestResponse batchRequestResponse) {
+                                        BatchRequestResponse batchRequestResponse) throws AmbariException {
 
     Map<String, Integer> taskCounts = new HashMap<String, Integer>();
 
@@ -147,10 +150,11 @@ public class BatchRequestJob extends AbstractLinearExecutionJob {
       Integer totalCount = oldCounts.get(BATCH_REQUEST_TOTAL_TASKS_KEY) +
         batchRequestResponse.getTotalTaskCount();
 
-      properties.put(BATCH_REQUEST_FAILED_TASKS_KEY, failedCount);
       taskCounts.put(BATCH_REQUEST_FAILED_TASKS_KEY, failedCount);
-      properties.put(BATCH_REQUEST_TOTAL_TASKS_KEY, totalCount);
       taskCounts.put(BATCH_REQUEST_TOTAL_TASKS_KEY, totalCount);
+
+      properties.put(BATCH_REQUEST_FAILED_TASKS_KEY, failedCount);
+      properties.put(BATCH_REQUEST_TOTAL_TASKS_KEY, totalCount);
     }
 
     return taskCounts;

http://git-wip-us.apache.org/repos/asf/ambari/blob/b7856cbc/ambari-server/src/test/java/org/apache/ambari/server/state/scheduler/BatchRequestJobTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/scheduler/BatchRequestJobTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/scheduler/BatchRequestJobTest.java
index 7ed183f..76ad2f8 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/state/scheduler/BatchRequestJobTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/state/scheduler/BatchRequestJobTest.java
@@ -18,20 +18,29 @@
 
 package org.apache.ambari.server.state.scheduler;
 
-import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.actionmanager.HostRoleStatus;
-import org.apache.ambari.server.scheduler.AbstractLinearExecutionJob;
 import org.apache.ambari.server.scheduler.ExecutionScheduleManager;
 import org.easymock.Capture;
 import org.junit.Assert;
 import org.junit.Test;
+import org.quartz.JobDataMap;
+import org.quartz.JobDetail;
 import org.quartz.JobExecutionContext;
-
-import java.lang.reflect.Method;
+import org.quartz.JobKey;
+import org.quartz.Trigger;
 import java.util.HashMap;
 import java.util.Map;
-
-import static org.easymock.EasyMock.*;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.captureLong;
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.createMockBuilder;
+import static org.easymock.EasyMock.createNiceMock;
+import static org.easymock.EasyMock.eq;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
 
 public class BatchRequestJobTest {
 
@@ -93,4 +102,46 @@ public class BatchRequestJobTest {
     Assert.assertEquals(batchId, batchIdCapture.getValue().longValue());
     Assert.assertEquals(clusterName, clusterNameCapture.getValue());
   }
+
+  @Test
+  public void testTaskCountsPersistedWithTrigger() throws Exception {
+    ExecutionScheduleManager scheduleManagerMock = createNiceMock
+      (ExecutionScheduleManager.class);
+    BatchRequestJob batchRequestJobMock = createMockBuilder
+      (BatchRequestJob.class).withConstructor(scheduleManagerMock, 100L)
+      .addMockedMethods("doWork")
+      .createMock();
+    JobExecutionContext executionContext = createNiceMock(JobExecutionContext.class);
+    JobDataMap jobDataMap = createNiceMock(JobDataMap.class);
+    JobDetail jobDetail = createNiceMock(JobDetail.class);
+    Map<String, Object> properties = new HashMap<String, Object>();
+    properties.put(BatchRequestJob.BATCH_REQUEST_FAILED_TASKS_KEY, 10);
+    properties.put(BatchRequestJob.BATCH_REQUEST_TOTAL_TASKS_KEY, 20);
+
+    expect(scheduleManagerMock.continueOnMisfire(executionContext)).andReturn(true);
+    expect(executionContext.getMergedJobDataMap()).andReturn(jobDataMap);
+    expect(executionContext.getJobDetail()).andReturn(jobDetail);
+    expect(jobDetail.getKey()).andReturn(JobKey.jobKey("testJob", "testGroup"));
+    expect(jobDataMap.getWrappedMap()).andReturn(properties);
+    expect(jobDataMap.getString((String) anyObject())).andReturn("testJob").anyTimes();
+
+    Capture<Trigger> triggerCapture = new Capture<Trigger>();
+    scheduleManagerMock.scheduleJob(capture(triggerCapture));
+    expectLastCall().once();
+
+    replay(scheduleManagerMock, executionContext, jobDataMap, jobDetail);
+
+    batchRequestJobMock.execute(executionContext);
+
+    verify(scheduleManagerMock, executionContext, jobDataMap, jobDetail);
+
+    Trigger trigger = triggerCapture.getValue();
+    Assert.assertNotNull(trigger);
+    JobDataMap savedMap = trigger.getJobDataMap();
+    Assert.assertNotNull(savedMap);
+    Assert.assertEquals(10, savedMap.getIntValue
+      (BatchRequestJob.BATCH_REQUEST_FAILED_TASKS_KEY));
+    Assert.assertEquals(20, savedMap.getIntValue
+      (BatchRequestJob.BATCH_REQUEST_TOTAL_TASKS_KEY));
+  }
 }


Mime
View raw message