hadoop-yarn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ji...@apache.org
Subject svn commit: r1617377 [1/2] - in /hadoop/common/branches/HDFS-6584/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/ hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/asy...
Date Mon, 11 Aug 2014 22:27:53 GMT
Author: jing9
Date: Mon Aug 11 22:27:50 2014
New Revision: 1617377

URL: http://svn.apache.org/r1617377
Log:
Merging r1616894 through r1617376 from trunk.

Added:
    hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineDataManager.java
      - copied unchanged from r1617376, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineDataManager.java
    hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerFairShare.java
      - copied unchanged from r1617376, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerFairShare.java
Removed:
    hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppNewSavedEvent.java
    hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppUpdateSavedEvent.java
    hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptNewSavedEvent.java
    hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptUpdateSavedEvent.java
Modified:
    hadoop/common/branches/HDFS-6584/hadoop-yarn-project/CHANGES.txt
    hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
    hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
    hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java
    hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
    hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java
    hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebApp.java
    hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/webapp/TimelineWebServices.java
    hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryClientService.java
    hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/webapp/TestTimelineWebServices.java
    hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
    hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
    hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
    hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
    hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java
    hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java
    hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
    hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
    hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
    hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
    hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java

Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/CHANGES.txt?rev=1617377&r1=1617376&r2=1617377&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/CHANGES.txt Mon Aug 11 22:27:50 2014
@@ -100,6 +100,19 @@ Release 2.6.0 - UNRELEASED
     YARN-2212. ApplicationMaster needs to find a way to update the AMRMToken
     periodically. (xgong)
 
+    YARN-2026. Fair scheduler: Consider only active queues for computing fairshare. 
+    (Ashwin Shankar via kasha)
+
+    YARN-1954. Added waitFor to AMRMClient(Async). (Tsuyoshi Ozawa via zjshen)
+
+    YARN-2302. Refactor TimelineWebServices. (Zhijie Shen via junping_du)
+
+    YARN-2337. ResourceManager sets ClientRMService in RMContext multiple times.
+    (Zhihai Xu via kasha)
+
+    YARN-2138. Cleaned up notifyDone* APIs in RMStateStore. (Varun Saxena via
+    jianhe)
+
   OPTIMIZATIONS
 
   BUG FIXES
@@ -154,6 +167,11 @@ Release 2.6.0 - UNRELEASED
     YARN-2008. Fixed CapacityScheduler to calculate headroom based on max available
     capacity instead of configured max capacity. (Craig Welch via jianhe)
 
+    YARN-2400. Fixed TestAMRestart fails intermittently. (Jian He via xgong)
+
+    YARN-2361. RMAppAttempt state machine entries for KILLED state has duplicate
+    event entries. (Zhihai Xu via kasha)
+
 Release 2.5.0 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java?rev=1617377&r1=1617376&r2=1617377&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java Mon Aug 11 22:27:50 2014
@@ -22,6 +22,8 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
@@ -37,12 +39,14 @@ import org.apache.hadoop.yarn.client.api
 import org.apache.hadoop.yarn.exceptions.YarnException;
 
 import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableList;
 
 @InterfaceAudience.Public
 @InterfaceStability.Stable
 public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
     AbstractService {
+  private static final Log LOG = LogFactory.getLog(AMRMClient.class);
 
   /**
    * Create a new instance of AMRMClient.
@@ -336,4 +340,63 @@ public abstract class AMRMClient<T exten
     return nmTokenCache;
   }
 
+  /**
+   * Wait for <code>check</code> to return true for each 1000 ms.
+   * See also {@link #waitFor(com.google.common.base.Supplier, int)}
+   * and {@link #waitFor(com.google.common.base.Supplier, int, int)}
+   * @param check
+   */
+  public void waitFor(Supplier<Boolean> check) throws InterruptedException {
+    waitFor(check, 1000);
+  }
+
+  /**
+   * Wait for <code>check</code> to return true for each
+   * <code>checkEveryMillis</code> ms.
+   * See also {@link #waitFor(com.google.common.base.Supplier, int, int)}
+   * @param check user defined checker
+   * @param checkEveryMillis interval to call <code>check</code>
+   */
+  public void waitFor(Supplier<Boolean> check, int checkEveryMillis)
+      throws InterruptedException {
+    waitFor(check, checkEveryMillis, 1);
+  }
+
+  /**
+   * Wait for <code>check</code> to return true for each
+   * <code>checkEveryMillis</code> ms. In the main loop, this method will log
+   * the message "waiting in main loop" for each <code>logInterval</code> times
+   * iteration to confirm the thread is alive.
+   * @param check user defined checker
+   * @param checkEveryMillis interval to call <code>check</code>
+   * @param logInterval interval to log for each
+   */
+  public void waitFor(Supplier<Boolean> check, int checkEveryMillis,
+      int logInterval) throws InterruptedException {
+    Preconditions.checkNotNull(check, "check should not be null");
+    Preconditions.checkArgument(checkEveryMillis >= 0,
+        "checkEveryMillis should be positive value");
+    Preconditions.checkArgument(logInterval >= 0,
+        "logInterval should be positive value");
+
+    int loggingCounter = logInterval;
+    do {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Check the condition for main loop.");
+      }
+
+      boolean result = check.get();
+      if (result) {
+        LOG.info("Exits the main loop.");
+        return;
+      }
+      if (--loggingCounter <= 0) {
+        LOG.info("Waiting in main loop.");
+        loggingCounter = logInterval;
+      }
+
+      Thread.sleep(checkEveryMillis);
+    } while (true);
+  }
+
 }

Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java?rev=1617377&r1=1617376&r2=1617377&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java Mon Aug 11 22:27:50 2014
@@ -18,11 +18,15 @@
 
 package org.apache.hadoop.yarn.client.api.async;
 
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Stable;
@@ -90,6 +94,7 @@ import com.google.common.annotations.Vis
 @Stable
 public abstract class AMRMClientAsync<T extends ContainerRequest> 
 extends AbstractService {
+  private static final Log LOG = LogFactory.getLog(AMRMClientAsync.class);
   
   protected final AMRMClient<T> client;
   protected final CallbackHandler handler;
@@ -189,6 +194,65 @@ extends AbstractService {
    */
   public abstract int getClusterNodeCount();
 
+  /**
+   * Wait for <code>check</code> to return true for each 1000 ms.
+   * See also {@link #waitFor(com.google.common.base.Supplier, int)}
+   * and {@link #waitFor(com.google.common.base.Supplier, int, int)}
+   * @param check
+   */
+  public void waitFor(Supplier<Boolean> check) throws InterruptedException {
+    waitFor(check, 1000);
+  }
+
+  /**
+   * Wait for <code>check</code> to return true for each
+   * <code>checkEveryMillis</code> ms.
+   * See also {@link #waitFor(com.google.common.base.Supplier, int, int)}
+   * @param check user defined checker
+   * @param checkEveryMillis interval to call <code>check</code>
+   */
+  public void waitFor(Supplier<Boolean> check, int checkEveryMillis)
+      throws InterruptedException {
+    waitFor(check, checkEveryMillis, 1);
+  };
+
+  /**
+   * Wait for <code>check</code> to return true for each
+   * <code>checkEveryMillis</code> ms. In the main loop, this method will log
+   * the message "waiting in main loop" for each <code>logInterval</code> times
+   * iteration to confirm the thread is alive.
+   * @param check user defined checker
+   * @param checkEveryMillis interval to call <code>check</code>
+   * @param logInterval interval to log for each
+   */
+  public void waitFor(Supplier<Boolean> check, int checkEveryMillis,
+      int logInterval) throws InterruptedException {
+    Preconditions.checkNotNull(check, "check should not be null");
+    Preconditions.checkArgument(checkEveryMillis >= 0,
+        "checkEveryMillis should be positive value");
+    Preconditions.checkArgument(logInterval >= 0,
+        "logInterval should be positive value");
+
+    int loggingCounter = logInterval;
+    do {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Check the condition for main loop.");
+      }
+
+      boolean result = check.get();
+      if (result) {
+        LOG.info("Exits the main loop.");
+        return;
+      }
+      if (--loggingCounter <= 0) {
+        LOG.info("Waiting in main loop.");
+        loggingCounter = logInterval;
+      }
+
+      Thread.sleep(checkEveryMillis);
+    } while (true);
+  }
+
   public interface CallbackHandler {
     
     /**

Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java?rev=1617377&r1=1617376&r2=1617377&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java Mon Aug 11 22:27:50 2014
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.client.api.async.impl;
 
+import com.google.common.base.Supplier;
 import static org.mockito.Matchers.anyFloat;
 import static org.mockito.Matchers.anyInt;
 import static org.mockito.Matchers.anyString;
@@ -180,7 +181,7 @@ public class TestAMRMClientAsync {
     AMRMClient<ContainerRequest> client = mock(AMRMClientImpl.class);
     when(client.allocate(anyFloat())).thenThrow(ex);
 
-    AMRMClientAsync<ContainerRequest> asyncClient = 
+    AMRMClientAsync<ContainerRequest> asyncClient =
         AMRMClientAsync.createAMRMClientAsync(client, 20, callbackHandler);
     asyncClient.init(conf);
     asyncClient.start();
@@ -228,6 +229,41 @@ public class TestAMRMClientAsync {
     asyncClient.stop();
   }
 
+  @Test (timeout = 10000)
+  public void testAMRMClientAsyncShutDownWithWaitFor() throws Exception {
+    Configuration conf = new Configuration();
+    final TestCallbackHandler callbackHandler = new TestCallbackHandler();
+    @SuppressWarnings("unchecked")
+    AMRMClient<ContainerRequest> client = mock(AMRMClientImpl.class);
+
+    final AllocateResponse shutDownResponse = createAllocateResponse(
+        new ArrayList<ContainerStatus>(), new ArrayList<Container>(), null);
+    shutDownResponse.setAMCommand(AMCommand.AM_SHUTDOWN);
+    when(client.allocate(anyFloat())).thenReturn(shutDownResponse);
+
+    AMRMClientAsync<ContainerRequest> asyncClient =
+        AMRMClientAsync.createAMRMClientAsync(client, 10, callbackHandler);
+    asyncClient.init(conf);
+    asyncClient.start();
+
+    Supplier<Boolean> checker = new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        return callbackHandler.reboot;
+      }
+    };
+
+    asyncClient.registerApplicationMaster("localhost", 1234, null);
+    asyncClient.waitFor(checker);
+
+    asyncClient.stop();
+    // stopping should have joined all threads and completed all callbacks
+    Assert.assertTrue(callbackHandler.callbackCount == 0);
+
+    verify(client, times(1)).allocate(anyFloat());
+    asyncClient.stop();
+  }
+
   @Test (timeout = 5000)
   public void testCallAMRMClientAsyncStopFromCallbackHandler()
       throws YarnException, IOException, InterruptedException {
@@ -262,6 +298,40 @@ public class TestAMRMClientAsync {
     }
   }
 
+  @Test (timeout = 5000)
+  public void testCallAMRMClientAsyncStopFromCallbackHandlerWithWaitFor()
+      throws YarnException, IOException, InterruptedException {
+    Configuration conf = new Configuration();
+    final TestCallbackHandler2 callbackHandler = new TestCallbackHandler2();
+    @SuppressWarnings("unchecked")
+    AMRMClient<ContainerRequest> client = mock(AMRMClientImpl.class);
+
+    List<ContainerStatus> completed = Arrays.asList(
+        ContainerStatus.newInstance(newContainerId(0, 0, 0, 0),
+            ContainerState.COMPLETE, "", 0));
+    final AllocateResponse response = createAllocateResponse(completed,
+        new ArrayList<Container>(), null);
+
+    when(client.allocate(anyFloat())).thenReturn(response);
+
+    AMRMClientAsync<ContainerRequest> asyncClient =
+        AMRMClientAsync.createAMRMClientAsync(client, 20, callbackHandler);
+    callbackHandler.asynClient = asyncClient;
+    asyncClient.init(conf);
+    asyncClient.start();
+
+    Supplier<Boolean> checker = new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        return callbackHandler.notify;
+      }
+    };
+
+    asyncClient.registerApplicationMaster("localhost", 1234, null);
+    asyncClient.waitFor(checker);
+    Assert.assertTrue(checker.get());
+  }
+
   void runCallBackThrowOutException(TestCallbackHandler2 callbackHandler) throws
         InterruptedException, YarnException, IOException {
     Configuration conf = new Configuration();
@@ -342,7 +412,7 @@ public class TestAMRMClientAsync {
     private volatile List<ContainerStatus> completedContainers;
     private volatile List<Container> allocatedContainers;
     Exception savedException = null;
-    boolean reboot = false;
+    volatile boolean reboot = false;
     Object notifier = new Object();
     
     int callbackCount = 0;
@@ -432,7 +502,7 @@ public class TestAMRMClientAsync {
     @SuppressWarnings("rawtypes")
     AMRMClientAsync asynClient;
     boolean stop = true;
-    boolean notify = false;
+    volatile boolean notify = false;
     boolean throwOutException = false;
 
     @Override

Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java?rev=1617377&r1=1617376&r2=1617377&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java Mon Aug 11 22:27:50 2014
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.client.api.impl;
 
+import com.google.common.base.Supplier;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -814,6 +815,40 @@ public class TestAMRMClient {
     assertEquals(0, amClient.ask.size());
     assertEquals(0, amClient.release.size());
   }
+
+  class CountDownSupplier implements Supplier<Boolean> {
+    int counter = 0;
+    @Override
+    public Boolean get() {
+      counter++;
+      if (counter >= 3) {
+        return true;
+      } else {
+        return false;
+      }
+    }
+  };
+
+  @Test
+  public void testWaitFor() throws InterruptedException {
+    AMRMClientImpl<ContainerRequest> amClient = null;
+    CountDownSupplier countDownChecker = new CountDownSupplier();
+
+    try {
+      // start am rm client
+      amClient =
+          (AMRMClientImpl<ContainerRequest>) AMRMClient
+              .<ContainerRequest> createAMRMClient();
+      amClient.init(new YarnConfiguration());
+      amClient.start();
+      amClient.waitFor(countDownChecker, 1000);
+      assertEquals(3, countDownChecker.counter);
+    } finally {
+      if (amClient != null) {
+        amClient.stop();
+      }
+    }
+  }
   
   private void sleep(int sleepTime) {
     try {

Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java?rev=1617377&r1=1617376&r2=1617377&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java Mon Aug 11 22:27:50 2014
@@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.conf.YarnC
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.webapp.AHSWebApp;
 import org.apache.hadoop.yarn.server.timeline.LeveldbTimelineStore;
+import org.apache.hadoop.yarn.server.timeline.TimelineDataManager;
 import org.apache.hadoop.yarn.server.timeline.TimelineStore;
 import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;
 import org.apache.hadoop.yarn.server.timeline.security.TimelineAuthenticationFilterInitializer;
@@ -59,12 +60,12 @@ public class ApplicationHistoryServer ex
   private static final Log LOG = LogFactory
     .getLog(ApplicationHistoryServer.class);
 
-  protected ApplicationHistoryClientService ahsClientService;
-  protected ApplicationHistoryManager historyManager;
-  protected TimelineStore timelineStore;
-  protected TimelineDelegationTokenSecretManagerService secretManagerService;
-  protected TimelineACLsManager timelineACLsManager;
-  protected WebApp webApp;
+  private ApplicationHistoryClientService ahsClientService;
+  private ApplicationHistoryManager historyManager;
+  private TimelineStore timelineStore;
+  private TimelineDelegationTokenSecretManagerService secretManagerService;
+  private TimelineDataManager timelineDataManager;
+  private WebApp webApp;
 
   public ApplicationHistoryServer() {
     super(ApplicationHistoryServer.class.getName());
@@ -72,15 +73,18 @@ public class ApplicationHistoryServer ex
 
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
-    historyManager = createApplicationHistory();
-    ahsClientService = createApplicationHistoryClientService(historyManager);
-    addService(ahsClientService);
-    addService((Service) historyManager);
+    // init timeline services first
     timelineStore = createTimelineStore(conf);
     addIfService(timelineStore);
     secretManagerService = createTimelineDelegationTokenSecretManagerService(conf);
     addService(secretManagerService);
-    timelineACLsManager = createTimelineACLsManager(conf);
+    timelineDataManager = createTimelineDataManager(conf);
+
+    // init generic history service afterwards
+    historyManager = createApplicationHistoryManager(conf);
+    ahsClientService = createApplicationHistoryClientService(historyManager);
+    addService(ahsClientService);
+    addService((Service) historyManager);
 
     DefaultMetricsSystem.initialize("ApplicationHistoryServer");
     JvmMetrics.initSingleton("ApplicationHistoryServer", null);
@@ -111,21 +115,22 @@ public class ApplicationHistoryServer ex
 
   @Private
   @VisibleForTesting
-  public ApplicationHistoryClientService getClientService() {
+  ApplicationHistoryClientService getClientService() {
     return this.ahsClientService;
   }
 
-  protected ApplicationHistoryClientService
-      createApplicationHistoryClientService(
-          ApplicationHistoryManager historyManager) {
-    return new ApplicationHistoryClientService(historyManager);
-  }
-
-  protected ApplicationHistoryManager createApplicationHistory() {
-    return new ApplicationHistoryManagerImpl();
+  /**
+   * @return ApplicationTimelineStore
+   */
+  @Private
+  @VisibleForTesting
+  public TimelineStore getTimelineStore() {
+    return timelineStore;
   }
 
-  protected ApplicationHistoryManager getApplicationHistory() {
+  @Private
+  @VisibleForTesting
+  ApplicationHistoryManager getApplicationHistoryManager() {
     return this.historyManager;
   }
 
@@ -154,28 +159,35 @@ public class ApplicationHistoryServer ex
     launchAppHistoryServer(args);
   }
 
-  protected ApplicationHistoryManager createApplicationHistoryManager(
+  private ApplicationHistoryClientService
+      createApplicationHistoryClientService(
+          ApplicationHistoryManager historyManager) {
+    return new ApplicationHistoryClientService(historyManager);
+  }
+
+  private ApplicationHistoryManager createApplicationHistoryManager(
       Configuration conf) {
     return new ApplicationHistoryManagerImpl();
   }
 
-  protected TimelineStore createTimelineStore(
+  private TimelineStore createTimelineStore(
       Configuration conf) {
     return ReflectionUtils.newInstance(conf.getClass(
         YarnConfiguration.TIMELINE_SERVICE_STORE, LeveldbTimelineStore.class,
         TimelineStore.class), conf);
   }
 
-  protected TimelineDelegationTokenSecretManagerService
+  private TimelineDelegationTokenSecretManagerService
       createTimelineDelegationTokenSecretManagerService(Configuration conf) {
     return new TimelineDelegationTokenSecretManagerService();
   }
 
-  protected TimelineACLsManager createTimelineACLsManager(Configuration conf) {
-    return new TimelineACLsManager(conf);
+  private TimelineDataManager createTimelineDataManager(Configuration conf) {
+    return new TimelineDataManager(
+        timelineStore, new TimelineACLsManager(conf));
   }
 
-  protected void startWebApp() {
+  private void startWebApp() {
     Configuration conf = getConfig();
     // Always load pseudo authentication filter to parse "user.name" in an URL
     // to identify a HTTP request's user in insecure mode.
@@ -199,9 +211,8 @@ public class ApplicationHistoryServer ex
     try {
       AHSWebApp ahsWebApp = AHSWebApp.getInstance();
       ahsWebApp.setApplicationHistoryManager(historyManager);
-      ahsWebApp.setTimelineStore(timelineStore);
       ahsWebApp.setTimelineDelegationTokenSecretManagerService(secretManagerService);
-      ahsWebApp.setTimelineACLsManager(timelineACLsManager);
+      ahsWebApp.setTimelineDataManager(timelineDataManager);
       webApp =
           WebApps
             .$for("applicationhistory", ApplicationHistoryClientService.class,
@@ -213,14 +224,6 @@ public class ApplicationHistoryServer ex
       throw new YarnRuntimeException(msg, e);
     }
   }
-  /**
-   * @return ApplicationTimelineStore
-   */
-  @Private
-  @VisibleForTesting
-  public TimelineStore getTimelineStore() {
-    return timelineStore;
-  }
 
   private void doSecureLogin(Configuration conf) throws IOException {
     InetSocketAddress socAddr = getBindAddress(conf);

Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebApp.java?rev=1617377&r1=1617376&r2=1617377&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebApp.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebApp.java Mon Aug 11 22:27:50 2014
@@ -22,8 +22,7 @@ import static org.apache.hadoop.yarn.uti
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.yarn.server.api.ApplicationContext;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryManager;
-import org.apache.hadoop.yarn.server.timeline.TimelineStore;
-import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;
+import org.apache.hadoop.yarn.server.timeline.TimelineDataManager;
 import org.apache.hadoop.yarn.server.timeline.security.TimelineDelegationTokenSecretManagerService;
 import org.apache.hadoop.yarn.server.timeline.webapp.TimelineWebServices;
 import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
@@ -36,9 +35,8 @@ import com.google.common.annotations.Vis
 public class AHSWebApp extends WebApp implements YarnWebParams {
 
   private ApplicationHistoryManager applicationHistoryManager;
-  private TimelineStore timelineStore;
   private TimelineDelegationTokenSecretManagerService secretManagerService;
-  private TimelineACLsManager timelineACLsManager;
+  private TimelineDataManager timelineDataManager;
 
   private static AHSWebApp instance = null;
 
@@ -68,14 +66,6 @@ public class AHSWebApp extends WebApp im
     this.applicationHistoryManager = applicationHistoryManager;
   }
 
-  public TimelineStore getTimelineStore() {
-    return timelineStore;
-  }
-
-  public void setTimelineStore(TimelineStore timelineStore) {
-    this.timelineStore = timelineStore;
-  }
-
   public TimelineDelegationTokenSecretManagerService
       getTimelineDelegationTokenSecretManagerService() {
     return secretManagerService;
@@ -86,12 +76,12 @@ public class AHSWebApp extends WebApp im
     this.secretManagerService = secretManagerService;
   }
 
-  public TimelineACLsManager getTimelineACLsManager() {
-    return timelineACLsManager;
+  public TimelineDataManager getTimelineDataManager() {
+    return timelineDataManager;
   }
 
-  public void setTimelineACLsManager(TimelineACLsManager timelineACLsManager) {
-    this.timelineACLsManager = timelineACLsManager;
+  public void setTimelineDataManager(TimelineDataManager timelineDataManager) {
+    this.timelineDataManager = timelineDataManager;
   }
 
   @Override
@@ -101,10 +91,9 @@ public class AHSWebApp extends WebApp im
     bind(TimelineWebServices.class);
     bind(GenericExceptionHandler.class);
     bind(ApplicationContext.class).toInstance(applicationHistoryManager);
-    bind(TimelineStore.class).toInstance(timelineStore);
     bind(TimelineDelegationTokenSecretManagerService.class).toInstance(
         secretManagerService);
-    bind(TimelineACLsManager.class).toInstance(timelineACLsManager);
+    bind(TimelineDataManager.class).toInstance(timelineDataManager);
     route("/", AHSController.class);
     route(pajoin("/apps", APP_STATE), AHSController.class);
     route(pajoin("/app", APPLICATION_ID), AHSController.class, "app");

Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/webapp/TimelineWebServices.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/webapp/TimelineWebServices.java?rev=1617377&r1=1617376&r2=1617377&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/webapp/TimelineWebServices.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/webapp/TimelineWebServices.java Mon Aug 11 22:27:50 2014
@@ -18,14 +18,10 @@
 
 package org.apache.hadoop.yarn.server.timeline.webapp;
 
-import static org.apache.hadoop.yarn.util.StringHelper.CSV_JOINER;
-
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.EnumSet;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 import java.util.SortedSet;
@@ -58,14 +54,11 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents;
 import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
-import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.timeline.EntityIdentifier;
 import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
 import org.apache.hadoop.yarn.server.timeline.NameValuePair;
+import org.apache.hadoop.yarn.server.timeline.TimelineDataManager;
 import org.apache.hadoop.yarn.server.timeline.TimelineReader.Field;
-import org.apache.hadoop.yarn.server.timeline.TimelineStore;
-import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;
-import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
 import org.apache.hadoop.yarn.webapp.BadRequestException;
 import org.apache.hadoop.yarn.webapp.ForbiddenException;
 import org.apache.hadoop.yarn.webapp.NotFoundException;
@@ -80,14 +73,11 @@ public class TimelineWebServices {
 
   private static final Log LOG = LogFactory.getLog(TimelineWebServices.class);
 
-  private TimelineStore store;
-  private TimelineACLsManager timelineACLsManager;
+  private TimelineDataManager timelineDataManager;
 
   @Inject
-  public TimelineWebServices(TimelineStore store,
-      TimelineACLsManager timelineACLsManager) {
-    this.store = store;
-    this.timelineACLsManager = timelineACLsManager;
+  public TimelineWebServices(TimelineDataManager timelineDataManager) {
+    this.timelineDataManager = timelineDataManager;
   }
 
   @XmlRootElement(name = "about")
@@ -148,61 +138,28 @@ public class TimelineWebServices {
       @QueryParam("limit") String limit,
       @QueryParam("fields") String fields) {
     init(res);
-    TimelineEntities entities = null;
     try {
-      EnumSet<Field> fieldEnums = parseFieldsStr(fields, ",");
-      boolean modified = extendFields(fieldEnums);
-      UserGroupInformation callerUGI = getUser(req);
-      entities = store.getEntities(
+      return timelineDataManager.getEntities(
           parseStr(entityType),
-          parseLongStr(limit),
+          parsePairStr(primaryFilter, ":"),
+          parsePairsStr(secondaryFilter, ",", ":"),
           parseLongStr(windowStart),
           parseLongStr(windowEnd),
           parseStr(fromId),
           parseLongStr(fromTs),
-          parsePairStr(primaryFilter, ":"),
-          parsePairsStr(secondaryFilter, ",", ":"),
-          fieldEnums);
-      if (entities != null) {
-        Iterator<TimelineEntity> entitiesItr =
-            entities.getEntities().iterator();
-        while (entitiesItr.hasNext()) {
-          TimelineEntity entity = entitiesItr.next();
-          try {
-            // check ACLs
-            if (!timelineACLsManager.checkAccess(callerUGI, entity)) {
-              entitiesItr.remove();
-            } else {
-              // clean up system data
-              if (modified) {
-                entity.setPrimaryFilters(null);
-              } else {
-                cleanupOwnerInfo(entity);
-              }
-            }
-          } catch (YarnException e) {
-            LOG.error("Error when verifying access for user " + callerUGI
-                + " on the events of the timeline entity "
-                + new EntityIdentifier(entity.getEntityId(),
-                    entity.getEntityType()), e);
-            entitiesItr.remove();
-          }
-        }
-      }
+          parseLongStr(limit),
+          parseFieldsStr(fields, ","),
+          getUser(req));
     } catch (NumberFormatException e) {
       throw new BadRequestException(
           "windowStart, windowEnd or limit is not a numeric value.");
     } catch (IllegalArgumentException e) {
       throw new BadRequestException("requested invalid field.");
-    } catch (IOException e) {
+    } catch (Exception e) {
       LOG.error("Error getting entities", e);
       throw new WebApplicationException(e,
           Response.Status.INTERNAL_SERVER_ERROR);
     }
-    if (entities == null) {
-      return new TimelineEntities();
-    }
-    return entities;
   }
 
   /**
@@ -220,33 +177,15 @@ public class TimelineWebServices {
     init(res);
     TimelineEntity entity = null;
     try {
-      EnumSet<Field> fieldEnums = parseFieldsStr(fields, ",");
-      boolean modified = extendFields(fieldEnums);
-      entity =
-          store.getEntity(parseStr(entityId), parseStr(entityType),
-              fieldEnums);
-      if (entity != null) {
-        // check ACLs
-        UserGroupInformation callerUGI = getUser(req);
-        if (!timelineACLsManager.checkAccess(callerUGI, entity)) {
-          entity = null;
-        } else {
-          // clean up the system data
-          if (modified) {
-            entity.setPrimaryFilters(null);
-          } else {
-            cleanupOwnerInfo(entity);
-          }
-        }
-      }
+      entity = timelineDataManager.getEntity(
+          parseStr(entityType),
+          parseStr(entityId),
+          parseFieldsStr(fields, ","),
+          getUser(req));
     } catch (IllegalArgumentException e) {
       throw new BadRequestException(
           "requested invalid field.");
-    } catch (IOException e) {
-      LOG.error("Error getting entity", e);
-      throw new WebApplicationException(e,
-          Response.Status.INTERNAL_SERVER_ERROR);
-    } catch (YarnException e) {
+    } catch (Exception e) {
       LOG.error("Error getting entity", e);
       throw new WebApplicationException(e,
           Response.Status.INTERNAL_SERVER_ERROR);
@@ -275,51 +214,23 @@ public class TimelineWebServices {
       @QueryParam("windowEnd") String windowEnd,
       @QueryParam("limit") String limit) {
     init(res);
-    TimelineEvents events = null;
     try {
-      UserGroupInformation callerUGI = getUser(req);
-      events = store.getEntityTimelines(
+      return timelineDataManager.getEvents(
           parseStr(entityType),
           parseArrayStr(entityId, ","),
-          parseLongStr(limit),
+          parseArrayStr(eventType, ","),
           parseLongStr(windowStart),
           parseLongStr(windowEnd),
-          parseArrayStr(eventType, ","));
-      if (events != null) {
-        Iterator<TimelineEvents.EventsOfOneEntity> eventsItr =
-            events.getAllEvents().iterator();
-        while (eventsItr.hasNext()) {
-          TimelineEvents.EventsOfOneEntity eventsOfOneEntity = eventsItr.next();
-          try {
-            TimelineEntity entity = store.getEntity(
-                eventsOfOneEntity.getEntityId(),
-                eventsOfOneEntity.getEntityType(),
-                EnumSet.of(Field.PRIMARY_FILTERS));
-            // check ACLs
-            if (!timelineACLsManager.checkAccess(callerUGI, entity)) {
-              eventsItr.remove();
-            }
-          } catch (Exception e) {
-            LOG.error("Error when verifying access for user " + callerUGI
-                + " on the events of the timeline entity "
-                + new EntityIdentifier(eventsOfOneEntity.getEntityId(),
-                    eventsOfOneEntity.getEntityType()), e);
-            eventsItr.remove();
-          }
-        }
-      }
+          parseLongStr(limit),
+          getUser(req));
     } catch (NumberFormatException e) {
       throw new BadRequestException(
           "windowStart, windowEnd or limit is not a numeric value.");
-    } catch (IOException e) {
+    } catch (Exception e) {
       LOG.error("Error getting entity timelines", e);
       throw new WebApplicationException(e,
           Response.Status.INTERNAL_SERVER_ERROR);
     }
-    if (events == null) {
-      return new TimelineEvents();
-    }
-    return events;
   }
 
   /**
@@ -333,9 +244,6 @@ public class TimelineWebServices {
       @Context HttpServletResponse res,
       TimelineEntities entities) {
     init(res);
-    if (entities == null) {
-      return new TimelinePutResponse();
-    }
     UserGroupInformation callerUGI = getUser(req);
     if (callerUGI == null) {
       String msg = "The owner of the posted timeline entities is not set";
@@ -343,76 +251,8 @@ public class TimelineWebServices {
       throw new ForbiddenException(msg);
     }
     try {
-      List<EntityIdentifier> entityIDs = new ArrayList<EntityIdentifier>();
-      TimelineEntities entitiesToPut = new TimelineEntities();
-      List<TimelinePutResponse.TimelinePutError> errors =
-          new ArrayList<TimelinePutResponse.TimelinePutError>();
-      for (TimelineEntity entity : entities.getEntities()) {
-        EntityIdentifier entityID =
-            new EntityIdentifier(entity.getEntityId(), entity.getEntityType());
-
-        // check if there is existing entity
-        TimelineEntity existingEntity = null;
-        try {
-          existingEntity =
-              store.getEntity(entityID.getId(), entityID.getType(),
-                  EnumSet.of(Field.PRIMARY_FILTERS));
-          if (existingEntity != null
-              && !timelineACLsManager.checkAccess(callerUGI, existingEntity)) {
-            throw new YarnException("The timeline entity " + entityID
-                + " was not put by " + callerUGI + " before");
-          }
-        } catch (Exception e) {
-          // Skip the entity which already exists and was put by others
-          LOG.warn("Skip the timeline entity: " + entityID + ", because "
-              + e.getMessage());
-          TimelinePutResponse.TimelinePutError error =
-              new TimelinePutResponse.TimelinePutError();
-          error.setEntityId(entityID.getId());
-          error.setEntityType(entityID.getType());
-          error.setErrorCode(
-              TimelinePutResponse.TimelinePutError.ACCESS_DENIED);
-          errors.add(error);
-          continue;
-        }
-
-        // inject owner information for the access check if this is the first
-        // time to post the entity, in case it's the admin who is updating
-        // the timeline data.
-        try {
-          if (existingEntity == null) {
-            injectOwnerInfo(entity, callerUGI.getShortUserName());
-          }
-        } catch (YarnException e) {
-          // Skip the entity which messes up the primary filter and record the
-          // error
-          LOG.warn("Skip the timeline entity: " + entityID + ", because "
-              + e.getMessage());
-          TimelinePutResponse.TimelinePutError error =
-              new TimelinePutResponse.TimelinePutError();
-          error.setEntityId(entityID.getId());
-          error.setEntityType(entityID.getType());
-          error.setErrorCode(
-              TimelinePutResponse.TimelinePutError.SYSTEM_FILTER_CONFLICT);
-          errors.add(error);
-          continue;
-        }
-
-        entityIDs.add(entityID);
-        entitiesToPut.addEntity(entity);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Storing the entity " + entityID + ", JSON-style content: "
-              + TimelineUtils.dumpTimelineRecordtoJSON(entity));
-        }
-      }
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Storing entities: " + CSV_JOINER.join(entityIDs));
-      }
-      TimelinePutResponse response =  store.put(entitiesToPut);
-      // add the errors of timeline system filter key conflict
-      response.addErrors(errors);
-      return response;
-    } catch (IOException e) {
+      return timelineDataManager.postEntities(entities, callerUGI);
+    } catch (Exception e) {
       LOG.error("Error putting entities", e);
       throw new WebApplicationException(e,
           Response.Status.INTERNAL_SERVER_ERROR);
@@ -423,6 +263,15 @@ public class TimelineWebServices {
     response.setContentType(null);
   }
 
+  private static UserGroupInformation getUser(HttpServletRequest req) {
+    String remoteUser = req.getRemoteUser();
+    UserGroupInformation callerUGI = null;
+    if (remoteUser != null) {
+      callerUGI = UserGroupInformation.createRemoteUser(remoteUser);
+    }
+    return callerUGI;
+  }
+
   private static SortedSet<String> parseArrayStr(String str, String delimiter) {
     if (str == null) {
       return null;
@@ -495,14 +344,6 @@ public class TimelineWebServices {
     }
   }
 
-  private static boolean extendFields(EnumSet<Field> fieldEnums) {
-    boolean modified = false;
-    if (fieldEnums != null && !fieldEnums.contains(Field.PRIMARY_FILTERS)) {
-      fieldEnums.add(Field.PRIMARY_FILTERS);
-      modified = true;
-    }
-    return modified;
-  }
   private static Long parseLongStr(String str) {
     return str == null ? null : Long.parseLong(str.trim());
   }
@@ -511,34 +352,4 @@ public class TimelineWebServices {
     return str == null ? null : str.trim();
   }
 
-  private static UserGroupInformation getUser(HttpServletRequest req) {
-    String remoteUser = req.getRemoteUser();
-    UserGroupInformation callerUGI = null;
-    if (remoteUser != null) {
-      callerUGI = UserGroupInformation.createRemoteUser(remoteUser);
-    }
-    return callerUGI;
-  }
-
-  private static void injectOwnerInfo(TimelineEntity timelineEntity,
-      String owner) throws YarnException {
-    if (timelineEntity.getPrimaryFilters() != null &&
-        timelineEntity.getPrimaryFilters().containsKey(
-            TimelineStore.SystemFilter.ENTITY_OWNER.toString())) {
-      throw new YarnException(
-          "User should not use the timeline system filter key: "
-              + TimelineStore.SystemFilter.ENTITY_OWNER);
-    }
-    timelineEntity.addPrimaryFilter(
-        TimelineStore.SystemFilter.ENTITY_OWNER
-            .toString(), owner);
-  }
-
-  private static void cleanupOwnerInfo(TimelineEntity timelineEntity) {
-    if (timelineEntity.getPrimaryFilters() != null) {
-      timelineEntity.getPrimaryFilters().remove(
-          TimelineStore.SystemFilter.ENTITY_OWNER.toString());
-    }
-  }
-
 }

Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryClientService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryClientService.java?rev=1617377&r1=1617376&r2=1617377&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryClientService.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryClientService.java Mon Aug 11 22:27:50 2014
@@ -69,7 +69,7 @@ public class TestApplicationHistoryClien
     historyServer.init(config);
     historyServer.start();
     store =
-        ((ApplicationHistoryManagerImpl) historyServer.getApplicationHistory())
+        ((ApplicationHistoryManagerImpl) historyServer.getApplicationHistoryManager())
           .getHistoryStore();
   }
 

Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/webapp/TestTimelineWebServices.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/webapp/TestTimelineWebServices.java?rev=1617377&r1=1617376&r2=1617377&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/webapp/TestTimelineWebServices.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/webapp/TestTimelineWebServices.java Mon Aug 11 22:27:50 2014
@@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.security.AdminACLsManager;
 import org.apache.hadoop.yarn.server.timeline.TestMemoryTimelineStore;
+import org.apache.hadoop.yarn.server.timeline.TimelineDataManager;
 import org.apache.hadoop.yarn.server.timeline.TimelineStore;
 import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;
 import org.apache.hadoop.yarn.server.timeline.security.TimelineAuthenticationFilter;
@@ -89,14 +90,15 @@ public class TestTimelineWebServices ext
       } catch (Exception e) {
         Assert.fail();
       }
-      bind(TimelineStore.class).toInstance(store);
       Configuration conf = new YarnConfiguration();
       conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, false);
       timelineACLsManager = new TimelineACLsManager(conf);
       conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
       conf.set(YarnConfiguration.YARN_ADMIN_ACL, "admin");
       adminACLsManager = new AdminACLsManager(conf);
-      bind(TimelineACLsManager.class).toInstance(timelineACLsManager);
+      TimelineDataManager timelineDataManager =
+          new TimelineDataManager(store, timelineACLsManager);
+      bind(TimelineDataManager.class).toInstance(timelineDataManager);
       serve("/*").with(GuiceContainer.class);
       TimelineAuthenticationFilter taFilter = new TimelineAuthenticationFilter();
       FilterConfig filterConfig = mock(FilterConfig.class);

Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java?rev=1617377&r1=1617376&r2=1617377&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java Mon Aug 11 22:27:50 2014
@@ -461,7 +461,6 @@ public class ResourceManager extends Com
       rmDispatcher.register(RMAppManagerEventType.class, rmAppManager);
 
       clientRM = createClientRMService();
-      rmContext.setClientRMService(clientRM);
       addService(clientRM);
       rmContext.setClientRMService(clientRM);
 

Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java?rev=1617377&r1=1617376&r2=1617377&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java Mon Aug 11 22:27:50 2014
@@ -52,13 +52,13 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNewSavedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppUpdateSavedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent;
 import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
 import org.apache.hadoop.yarn.state.SingleArcTransition;
 import org.apache.hadoop.yarn.state.StateMachine;
@@ -132,7 +132,8 @@ public abstract class RMStateStore exten
       LOG.info("Storing info for app: " + appId);
       try {
         store.storeApplicationStateInternal(appId, appStateData);
-        store.notifyDoneStoringApplication(appId, null);
+        store.notifyApplication(new RMAppEvent(appId,
+               RMAppEventType.APP_NEW_SAVED));
       } catch (Exception e) {
         LOG.error("Error storing app: " + appId, e);
         store.notifyStoreOperationFailed(e);
@@ -156,7 +157,8 @@ public abstract class RMStateStore exten
       LOG.info("Updating info for app: " + appId);
       try {
         store.updateApplicationStateInternal(appId, appStateData);
-        store.notifyDoneUpdatingApplication(appId, null);
+        store.notifyApplication(new RMAppEvent(appId,
+               RMAppEventType.APP_UPDATE_SAVED));
       } catch (Exception e) {
         LOG.error("Error updating app: " + appId, e);
         store.notifyStoreOperationFailed(e);
@@ -205,8 +207,9 @@ public abstract class RMStateStore exten
         }
         store.storeApplicationAttemptStateInternal(attemptState.getAttemptId(),
             attemptStateData);
-        store.notifyDoneStoringApplicationAttempt(attemptState.getAttemptId(),
-            null);
+        store.notifyApplicationAttempt(new RMAppAttemptEvent
+               (attemptState.getAttemptId(),
+               RMAppAttemptEventType.ATTEMPT_NEW_SAVED));
       } catch (Exception e) {
         LOG.error("Error storing appAttempt: " + attemptState.getAttemptId(), e);
         store.notifyStoreOperationFailed(e);
@@ -233,8 +236,9 @@ public abstract class RMStateStore exten
         }
         store.updateApplicationAttemptStateInternal(attemptState.getAttemptId(),
             attemptStateData);
-        store.notifyDoneUpdatingApplicationAttempt(attemptState.getAttemptId(),
-            null);
+        store.notifyApplicationAttempt(new RMAppAttemptEvent
+               (attemptState.getAttemptId(),
+               RMAppAttemptEventType.ATTEMPT_UPDATE_SAVED));
       } catch (Exception e) {
         LOG.error("Error updating appAttempt: " + attemptState.getAttemptId(), e);
         store.notifyStoreOperationFailed(e);
@@ -801,47 +805,28 @@ public abstract class RMStateStore exten
     }
     rmDispatcher.getEventHandler().handle(new RMFatalEvent(type, failureCause));
   }
-
+ 
   @SuppressWarnings("unchecked")
   /**
-   * In (@link handleStoreEvent}, this method is called to notify the
-   * application that new application is stored in state store
-   * @param appId id of the application that has been saved
-   * @param storedException the exception that is thrown when storing the
-   * application
-   */
-  private void notifyDoneStoringApplication(ApplicationId appId,
-                                                  Exception storedException) {
-    rmDispatcher.getEventHandler().handle(
-        new RMAppNewSavedEvent(appId, storedException));
-  }
-
-  @SuppressWarnings("unchecked")
-  private void notifyDoneUpdatingApplication(ApplicationId appId,
-      Exception storedException) {
-    rmDispatcher.getEventHandler().handle(
-      new RMAppUpdateSavedEvent(appId, storedException));
+   * This method is called to notify the application that
+   * new application is stored or updated in state store
+   * @param event App event containing the app id and event type
+   */
+  private void notifyApplication(RMAppEvent event) {
+    rmDispatcher.getEventHandler().handle(event);
   }
-
+  
   @SuppressWarnings("unchecked")
   /**
-   * In (@link handleStoreEvent}, this method is called to notify the
-   * application attempt that new attempt is stored in state store
-   * @param appAttempt attempt that has been saved
-   */
-  private void notifyDoneStoringApplicationAttempt(ApplicationAttemptId attemptId,
-                                                  Exception storedException) {
-    rmDispatcher.getEventHandler().handle(
-        new RMAppAttemptNewSavedEvent(attemptId, storedException));
-  }
-
-  @SuppressWarnings("unchecked")
-  private void notifyDoneUpdatingApplicationAttempt(ApplicationAttemptId attemptId,
-      Exception updatedException) {
-    rmDispatcher.getEventHandler().handle(
-      new RMAppAttemptUpdateSavedEvent(attemptId, updatedException));
+   * This method is called to notify the application attempt
+   * that new attempt is stored or updated in state store
+   * @param event App attempt event containing the app attempt
+   * id and event type
+   */
+  private void notifyApplicationAttempt(RMAppAttemptEvent event) {
+    rmDispatcher.getEventHandler().handle(event);
   }
-
+  
   /**
    * EventHandler implementation which forward events to the FSRMStateStore
    * This hides the EventHandle methods of the store from its public interface 

Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java?rev=1617377&r1=1617376&r2=1617377&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java Mon Aug 11 22:27:50 2014
@@ -820,17 +820,6 @@ public class RMAppImpl implements RMApp,
       RMAppTransition {
     @Override
     public void transition(RMAppImpl app, RMAppEvent event) {
-      if (event instanceof RMAppNewSavedEvent) {
-        RMAppNewSavedEvent storeEvent = (RMAppNewSavedEvent) event;
-        // For HA this exception needs to be handled by giving up
-        // master status if we got fenced
-        if (((RMAppNewSavedEvent) event).getStoredException() != null) {
-          LOG.error(
-            "Failed to store application: " + storeEvent.getApplicationId(),
-            storeEvent.getStoredException());
-          ExitUtil.terminate(1, storeEvent.getStoredException());
-        }
-      }
       app.handler.handle(new AppAddedSchedulerEvent(app.applicationId,
         app.submissionContext.getQueue(), app.user));
     }
@@ -848,13 +837,6 @@ public class RMAppImpl implements RMApp,
 
     @Override
     public RMAppState transition(RMAppImpl app, RMAppEvent event) {
-      RMAppUpdateSavedEvent storeEvent = (RMAppUpdateSavedEvent) event;
-      if (storeEvent.getUpdatedException() != null) {
-        LOG.error("Failed to update the final state of application"
-              + storeEvent.getApplicationId(), storeEvent.getUpdatedException());
-        ExitUtil.terminate(1, storeEvent.getUpdatedException());
-      }
-
       if (app.transitionTodo instanceof SingleArcTransition) {
         ((SingleArcTransition) app.transitionTodo).transition(app,
           app.eventCausingFinalSaving);

Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java?rev=1617377&r1=1617376&r2=1617377&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java Mon Aug 11 22:27:50 2014
@@ -80,11 +80,9 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
@@ -398,7 +396,6 @@ public class RMAppAttemptImpl implements
           RMAppAttemptState.KILLED,
           RMAppAttemptState.KILLED,
           EnumSet.of(RMAppAttemptEventType.ATTEMPT_ADDED,
-              RMAppAttemptEventType.EXPIRE,
               RMAppAttemptEventType.LAUNCHED,
               RMAppAttemptEventType.LAUNCH_FAILED,
               RMAppAttemptEventType.EXPIRE,
@@ -906,8 +903,6 @@ public class RMAppAttemptImpl implements
     @Override
     public void transition(RMAppAttemptImpl appAttempt,
                                                     RMAppAttemptEvent event) {
-      appAttempt.checkAttemptStoreError(event);
-
       appAttempt.launchAttempt();
     }
   }
@@ -1059,14 +1054,6 @@ public class RMAppAttemptImpl implements
     @Override
     public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
         RMAppAttemptEvent event) {
-      RMAppAttemptUpdateSavedEvent storeEvent = (RMAppAttemptUpdateSavedEvent) event;
-      if (storeEvent.getUpdatedException() != null) {
-        LOG.error("Failed to update the final state of application attempt: "
-            + storeEvent.getApplicationAttemptId(),
-          storeEvent.getUpdatedException());
-        ExitUtil.terminate(1, storeEvent.getUpdatedException());
-      }
-
       RMAppAttemptEvent causeEvent = appAttempt.eventCausingFinalSaving;
 
       if (appAttempt.transitionTodo instanceof SingleArcTransition) {
@@ -1196,8 +1183,6 @@ public class RMAppAttemptImpl implements
     @Override
     public void transition(RMAppAttemptImpl appAttempt,
                             RMAppAttemptEvent event) {
-      appAttempt.checkAttemptStoreError(event);
-
       // create AMRMToken
       appAttempt.amrmToken =
           appAttempt.rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken(
@@ -1690,18 +1675,6 @@ public class RMAppAttemptImpl implements
     rmContext.getAMLivelinessMonitor().register(getAppAttemptId());
   }
   
-  private void checkAttemptStoreError(RMAppAttemptEvent event) {
-    RMAppAttemptNewSavedEvent storeEvent = (RMAppAttemptNewSavedEvent) event;
-    if(storeEvent.getStoredException() != null)
-    {
-      // This needs to be handled for HA and give up master status if we got
-      // fenced
-      LOG.error("Failed to store attempt: " + getAppAttemptId(),
-                storeEvent.getStoredException());
-      ExitUtil.terminate(1, storeEvent.getStoredException());
-    }
-  }
-
   private void storeAttempt() {
     // store attempt data in a non-blocking manner to prevent dispatcher
     // thread starvation and wait for state to be saved

Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java?rev=1617377&r1=1617376&r2=1617377&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java Mon Aug 11 22:27:50 2014
@@ -116,6 +116,18 @@ public abstract class Schedulable {
     return fairShare;
   }
 
+  /**
+   * Returns true if queue has atleast one app running. Always returns true for
+   * AppSchedulables.
+   */
+  public boolean isActive() {
+    if (this instanceof FSQueue) {
+      FSQueue queue = (FSQueue) this;
+      return queue.getNumRunnableApps() > 0;
+    }
+    return true;
+  }
+
   /** Convenient toString implementation for debugging. */
   @Override
   public String toString() {

Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java?rev=1617377&r1=1617376&r2=1617377&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java Mon Aug 11 22:27:50 2014
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies;
 
+import java.util.ArrayList;
 import java.util.Collection;
 
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -33,7 +34,31 @@ import org.apache.hadoop.yarn.server.res
 public class ComputeFairShares {
   
   private static final int COMPUTE_FAIR_SHARES_ITERATIONS = 25;
-  
+
+  /**
+   * Compute fair share of the given schedulables.Fair share is an allocation of
+   * shares considering only active schedulables ie schedulables which have
+   * running apps.
+   * 
+   * @param schedulables
+   * @param totalResources
+   * @param type
+   */
+  public static void computeShares(
+      Collection<? extends Schedulable> schedulables, Resource totalResources,
+      ResourceType type) {
+    Collection<Schedulable> activeSchedulables = new ArrayList<Schedulable>();
+    for (Schedulable sched : schedulables) {
+      if (sched.isActive()) {
+        activeSchedulables.add(sched);
+      } else {
+        setResourceValue(0, sched.getFairShare(), type);
+      }
+    }
+
+    computeSharesInternal(activeSchedulables, totalResources, type);
+  }
+
   /**
    * Given a set of Schedulables and a number of slots, compute their weighted
    * fair shares. The min and max shares and of the Schedulables are assumed to
@@ -75,7 +100,7 @@ public class ComputeFairShares {
    * because resourceUsedWithWeightToResourceRatio is linear-time and the number of
    * iterations of binary search is a constant (dependent on desired precision).
    */
-  public static void computeShares(
+  private static void computeSharesInternal(
       Collection<? extends Schedulable> schedulables, Resource totalResources,
       ResourceType type) {
     if (schedulables.isEmpty()) {

Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java?rev=1617377&r1=1617376&r2=1617377&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java Mon Aug 11 22:27:50 2014
@@ -386,7 +386,8 @@ public class TestAMRestart {
     ApplicationState appState =
         memStore.getState().getApplicationState().get(app1.getApplicationId());
     // AM should be restarted even though max-am-attempt is 1.
-    MockAM am2 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+    MockAM am2 =
+        rm1.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 2, nm1);
     RMAppAttempt attempt2 = app1.getCurrentAppAttempt();
     Assert.assertTrue(((RMAppAttemptImpl) attempt2).mayBeLastAttempt());
 
@@ -398,7 +399,8 @@ public class TestAMRestart {
     am2.waitForState(RMAppAttemptState.FAILED);
     Assert.assertTrue(! attempt2.shouldCountTowardsMaxAttemptRetry());
     rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
-    MockAM am3 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+    MockAM am3 =
+        rm1.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 3, nm1);
     RMAppAttempt attempt3 = app1.getCurrentAppAttempt();
     Assert.assertTrue(((RMAppAttemptImpl) attempt3).mayBeLastAttempt());
 
@@ -421,7 +423,8 @@ public class TestAMRestart {
         .getAMContainerExitStatus());
 
     rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
-    MockAM am4 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+    MockAM am4 =
+        rm1.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 4, nm1);
     RMAppAttempt attempt4 = app1.getCurrentAppAttempt();
     Assert.assertTrue(((RMAppAttemptImpl) attempt4).mayBeLastAttempt());
 

Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java?rev=1617377&r1=1617376&r2=1617377&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java Mon Aug 11 22:27:50 2014
@@ -65,8 +65,8 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.security.MasterKeyData;
@@ -77,10 +77,9 @@ public class RMStateStoreTestBase extend
   public static final Log LOG = LogFactory.getLog(RMStateStoreTestBase.class);
 
   static class TestDispatcher implements
-      Dispatcher, EventHandler<RMAppAttemptNewSavedEvent> {
+      Dispatcher, EventHandler<RMAppAttemptEvent> {
 
     ApplicationAttemptId attemptId;
-    Exception storedException;
 
     boolean notified = false;
 
@@ -91,9 +90,8 @@ public class RMStateStoreTestBase extend
     }
 
     @Override
-    public void handle(RMAppAttemptNewSavedEvent event) {
+    public void handle(RMAppAttemptEvent event) {
       assertEquals(attemptId, event.getApplicationAttemptId());
-      assertEquals(storedException, event.getStoredException());
       notified = true;
       synchronized (this) {
         notifyAll();
@@ -163,7 +161,6 @@ public class RMStateStoreTestBase extend
     when(mockAttempt.getClientTokenMasterKey())
         .thenReturn(clientTokenMasterKey);
     dispatcher.attemptId = attemptId;
-    dispatcher.storedException = null;
     store.storeNewApplicationAttempt(mockAttempt);
     waitNotify(dispatcher);
     return container.getId();

Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java?rev=1617377&r1=1617376&r2=1617377&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java Mon Aug 11 22:27:50 2014
@@ -60,7 +60,6 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
@@ -328,15 +327,15 @@ public class TestRMAppTransitions {
 
   private void sendAppUpdateSavedEvent(RMApp application) {
     RMAppEvent event =
-        new RMAppUpdateSavedEvent(application.getApplicationId(), null);
+        new RMAppEvent(application.getApplicationId(), RMAppEventType.APP_UPDATE_SAVED);
     application.handle(event);
     rmDispatcher.await();
   }
 
   private void sendAttemptUpdateSavedEvent(RMApp application) {
     application.getCurrentAppAttempt().handle(
-      new RMAppAttemptUpdateSavedEvent(application.getCurrentAppAttempt()
-        .getAppAttemptId(), null));
+        new RMAppAttemptEvent(application.getCurrentAppAttempt().getAppAttemptId(),
+            RMAppAttemptEventType.ATTEMPT_UPDATE_SAVED));
   }
 
   protected RMApp testCreateAppNewSaving(
@@ -357,7 +356,7 @@ public class TestRMAppTransitions {
   RMApp application = testCreateAppNewSaving(submissionContext);
     // NEW_SAVING => SUBMITTED event RMAppEventType.APP_SAVED
     RMAppEvent event =
-        new RMAppNewSavedEvent(application.getApplicationId(), null);
+        new RMAppEvent(application.getApplicationId(), RMAppEventType.APP_NEW_SAVED);
     application.handle(event);
     assertStartTimeSet(application);
     assertAppState(RMAppState.SUBMITTED, application);
@@ -422,7 +421,7 @@ public class TestRMAppTransitions {
     RMApp application = testCreateAppFinalSaving(submissionContext);
     // FINAL_SAVING => FINISHING event RMAppEventType.APP_UPDATED
     RMAppEvent appUpdated =
-        new RMAppUpdateSavedEvent(application.getApplicationId(), null);
+        new RMAppEvent(application.getApplicationId(), RMAppEventType.APP_UPDATE_SAVED);
     application.handle(appUpdated);
     assertAppState(RMAppState.FINISHING, application);
     assertTimesAtFinish(application);
@@ -763,7 +762,7 @@ public class TestRMAppTransitions {
     application.handle(event);
     assertAppState(RMAppState.FINAL_SAVING, application);
     RMAppEvent appUpdated =
-        new RMAppUpdateSavedEvent(application.getApplicationId(), null);
+        new RMAppEvent(application.getApplicationId(), RMAppEventType.APP_UPDATE_SAVED);
     application.handle(appUpdated);
     assertAppState(RMAppState.FINISHED, application);
 

Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java?rev=1617377&r1=1617376&r2=1617377&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java Mon Aug 11 22:27:50 2014
@@ -81,10 +81,8 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
@@ -570,15 +568,15 @@ public class TestRMAppAttemptTransitions
     submitApplicationAttempt();
     applicationAttempt.handle(
         new RMAppAttemptEvent(
-            applicationAttempt.getAppAttemptId(), 
+            applicationAttempt.getAppAttemptId(),
             RMAppAttemptEventType.ATTEMPT_ADDED));
     
     if(unmanagedAM){
       assertEquals(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING, 
           applicationAttempt.getAppAttemptState());
       applicationAttempt.handle(
-          new RMAppAttemptNewSavedEvent(
-              applicationAttempt.getAppAttemptId(), null));
+        new RMAppAttemptEvent(applicationAttempt.getAppAttemptId(),
+            RMAppAttemptEventType.ATTEMPT_NEW_SAVED));
     }
     
     testAppAttemptScheduledState();
@@ -616,8 +614,8 @@ public class TestRMAppAttemptTransitions
     assertEquals(RMAppAttemptState.ALLOCATED_SAVING, 
         applicationAttempt.getAppAttemptState());
     applicationAttempt.handle(
-        new RMAppAttemptNewSavedEvent(
-            applicationAttempt.getAppAttemptId(), null));
+        new RMAppAttemptEvent(applicationAttempt.getAppAttemptId(),
+            RMAppAttemptEventType.ATTEMPT_NEW_SAVED));
     
     testAppAttemptAllocatedState(container);
     
@@ -696,8 +694,8 @@ public class TestRMAppAttemptTransitions
     assertEquals(RMAppAttemptState.FINAL_SAVING,
       applicationAttempt.getAppAttemptState());
     applicationAttempt.handle(
-      new RMAppAttemptUpdateSavedEvent(
-          applicationAttempt.getAppAttemptId(), null));
+      new RMAppAttemptEvent(applicationAttempt.getAppAttemptId(), 
+          RMAppAttemptEventType.ATTEMPT_UPDATE_SAVED));
   }
 
   @Test



Mime
View raw message