hadoop-yarn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cur...@apache.org
Subject svn commit: r1619019 [3/10] - in /hadoop/common/branches/YARN-1051/hadoop-yarn-project: ./ hadoop-yarn/bin/ hadoop-yarn/conf/ hadoop-yarn/dev-support/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ hadoop-yarn/had...
Date Wed, 20 Aug 2014 01:34:59 GMT
Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java Wed Aug 20 01:34:29 2014
@@ -150,6 +150,15 @@ public abstract class ResourceCalculator
       Resource clusterResource, Resource numerator, Resource denominator);
   
   /**
+   * Determine if a resource is not suitable for use as a divisor
+   * (will result in divide by 0, etc)
+   *
+   * @param r resource
+   * @return true if divisor is invalid (should not be used), false otherwise
+   */
+  public abstract boolean isInvalidDivisor(Resource r);
+
+  /**
    * Ratio of resource <code>a</code> to resource <code>b</code>.
    * 
    * @param a resource 

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java Wed Aug 20 01:34:29 2014
@@ -184,6 +184,11 @@ public class Resources {
     return calculator.roundDown(lhs, factor);
   }
   
+  public static boolean isInvalidDivisor(
+      ResourceCalculator resourceCalculator, Resource divisor) {
+    return resourceCalculator.isInvalidDivisor(divisor);
+  }
+
   public static float ratio(
       ResourceCalculator resourceCalculator, Resource lhs, Resource rhs) {
     return resourceCalculator.ratio(lhs, rhs);

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/util/WebAppUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/util/WebAppUtils.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/util/WebAppUtils.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/util/WebAppUtils.java Wed Aug 20 01:34:29 2014
@@ -19,12 +19,12 @@ package org.apache.hadoop.yarn.webapp.ut
 
 import static org.apache.hadoop.yarn.util.StringHelper.PATH_JOINER;
 
+import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
@@ -34,11 +34,18 @@ import org.apache.hadoop.http.HttpServer
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.conf.HAUtil;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.util.RMHAUtils;
 
 @Private
 @Evolving
 public class WebAppUtils {
+  public static final String WEB_APP_TRUSTSTORE_PASSWORD_KEY =
+      "ssl.server.truststore.password";
+  public static final String WEB_APP_KEYSTORE_PASSWORD_KEY =
+      "ssl.server.keystore.password";
+  public static final String WEB_APP_KEY_PASSWORD_KEY =
+      "ssl.server.keystore.keypassword";
   public static final String HTTPS_PREFIX = "https://";
   public static final String HTTP_PREFIX = "http://";
 
@@ -170,6 +177,37 @@ public class WebAppUtils {
     return sb.toString();
   }
   
+  /**
+   * Get the URL to use for binding where bind hostname can be specified
+   * to override the hostname in the webAppURLWithoutScheme. Port specified in the
+   * webAppURLWithoutScheme will be used.
+   *
+   * @param conf the configuration
+   * @param hostProperty bind host property name
+   * @param webAppURLWithoutScheme web app URL without scheme String
+   * @return String representing bind URL
+   */
+  public static String getWebAppBindURL(
+      Configuration conf,
+      String hostProperty,
+      String webAppURLWithoutScheme) {
+
+    // If the bind-host setting exists then it overrides the hostname
+    // portion of the corresponding webAppURLWithoutScheme
+    String host = conf.getTrimmed(hostProperty);
+    if (host != null && !host.isEmpty()) {
+      if (webAppURLWithoutScheme.contains(":")) {
+        webAppURLWithoutScheme = host + ":" + webAppURLWithoutScheme.split(":")[1];
+      }
+      else {
+        throw new YarnRuntimeException("webAppURLWithoutScheme must include port specification but doesn't: " +
+                                       webAppURLWithoutScheme);
+      }
+    }
+
+    return webAppURLWithoutScheme;
+  }
+
   public static String getNMWebAppURLWithoutScheme(Configuration conf) {
     if (YarnConfiguration.useHttps(conf)) {
       return conf.get(YarnConfiguration.NM_WEBAPP_HTTPS_ADDRESS,
@@ -242,21 +280,56 @@ public class WebAppUtils {
 
   /**
    * Load the SSL keystore / truststore into the HttpServer builder.
+   * @param builder the HttpServer2.Builder to populate with ssl config
    */
   public static HttpServer2.Builder loadSslConfiguration(
       HttpServer2.Builder builder) {
-    Configuration sslConf = new Configuration(false);
+    return loadSslConfiguration(builder, null);
+  }
+
+  /**
+   * Load the SSL keystore / truststore into the HttpServer builder.
+   * @param builder the HttpServer2.Builder to populate with ssl config
+   * @param sslConf the Configuration instance to use during loading of SSL conf
+   */
+  public static HttpServer2.Builder loadSslConfiguration(
+      HttpServer2.Builder builder, Configuration sslConf) {
+    if (sslConf == null) {
+      sslConf = new Configuration(false);
+    }
     boolean needsClientAuth = YarnConfiguration.YARN_SSL_CLIENT_HTTPS_NEED_AUTH_DEFAULT;
     sslConf.addResource(YarnConfiguration.YARN_SSL_SERVER_RESOURCE_DEFAULT);
 
     return builder
         .needsClientAuth(needsClientAuth)
-        .keyPassword(sslConf.get("ssl.server.keystore.keypassword"))
+        .keyPassword(getPassword(sslConf, WEB_APP_KEY_PASSWORD_KEY))
         .keyStore(sslConf.get("ssl.server.keystore.location"),
-            sslConf.get("ssl.server.keystore.password"),
+            getPassword(sslConf, WEB_APP_KEYSTORE_PASSWORD_KEY),
             sslConf.get("ssl.server.keystore.type", "jks"))
         .trustStore(sslConf.get("ssl.server.truststore.location"),
-            sslConf.get("ssl.server.truststore.password"),
+            getPassword(sslConf, WEB_APP_TRUSTSTORE_PASSWORD_KEY),
             sslConf.get("ssl.server.truststore.type", "jks"));
   }
+
+  /**
+   * Leverages the Configuration.getPassword method to attempt to get
+   * passwords from the CredentialProvider API before falling back to
+   * clear text in config - if falling back is allowed.
+   * @param conf Configuration instance
+   * @param alias name of the credential to retrieve
+   * @return String credential value or null
+   */
+  static String getPassword(Configuration conf, String alias) {
+    String password = null;
+    try {
+      char[] passchars = conf.getPassword(alias);
+      if (passchars != null) {
+        password = new String(passchars);
+      }
+    }
+    catch (IOException ioe) {
+      password = null;
+    }
+    return password;
+  }
 }

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml Wed Aug 20 01:34:29 2014
@@ -71,6 +71,17 @@
   </property>
 
   <property>
+    <description>
+      The actual address the server will bind to. If this optional address is
+      set, the RPC and webapp servers will bind to this address and the port specified in
+      yarn.resourcemanager.address and yarn.resourcemanager.webapp.address, respectively. This
+      is most useful for making RM listen to all interfaces by setting to 0.0.0.0.
+    </description>
+    <name>yarn.resourcemanager.bind-host</name>
+    <value></value>
+  </property>
+
+  <property>
     <description>The number of threads used to handle applications manager requests.</description>
     <name>yarn.resourcemanager.client.thread-count</name>
     <value>50</value>
@@ -195,6 +206,15 @@
   </property>
 
   <property>
+    <description>Flag to enable override of the default kerberos authentication
+    filter with the RM authentication filter to allow authentication using
+    delegation tokens (fallback to kerberos if the tokens are missing). Only
+    applicable when the http authentication type is kerberos.</description>
+    <name>yarn.resourcemanager.webapp.delegation-token-auth-filter.enabled</name>
+    <value>true</value>
+  </property>
+
+  <property>
     <description>How long to wait until a node manager is considered dead.</description>
     <name>yarn.nm.liveness-monitor.expiry-interval-ms</name>
     <value>600000</value>
@@ -454,7 +474,7 @@
   <property>
     <description>Name of the cluster. In a HA setting,
       this is used to ensure the RM participates in leader
-      election fo this cluster and ensures it does not affect
+      election for this cluster and ensures it does not affect
       other clusters</description>
     <name>yarn.resourcemanager.cluster-id</name>
     <!--value>yarn-cluster</value-->
@@ -627,6 +647,17 @@
   </property>
 
   <property>
+    <description>
+      The actual address the server will bind to. If this optional address is
+      set, the RPC and webapp servers will bind to this address and the port specified in
+      yarn.nodemanager.address and yarn.nodemanager.webapp.address, respectively. This is
+      most useful for making NM listen to all interfaces by setting to 0.0.0.0.
+    </description>
+    <name>yarn.nodemanager.bind-host</name>
+    <value></value>
+  </property>
+
+  <property>
     <description>Environment variables that should be forwarded from the NodeManager's environment to the container's.</description>
     <name>yarn.nodemanager.admin-env</name>
     <value>MALLOC_ARENA_MAX=$MALLOC_ARENA_MAX</value>
@@ -1164,6 +1195,18 @@
   </property>
 
   <property>
+    <description>
+      The actual address the server will bind to. If this optional address is
+      set, the RPC and webapp servers will bind to this address and the port specified in
+      yarn.timeline-service.address and yarn.timeline-service.webapp.address, respectively.
+      This is most useful for making the service listen to all interfaces by setting to
+      0.0.0.0.
+    </description>
+    <name>yarn.timeline-service.bind-host</name>
+    <value></value>
+  </property>
+
+  <property>
     <description>Store class name for timeline store.</description>
     <name>yarn.timeline-service.store-class</name>
     <value>org.apache.hadoop.yarn.server.timeline.LeveldbTimelineStore</value>

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfiguration.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfiguration.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfiguration.java Wed Aug 20 01:34:29 2014
@@ -28,6 +28,7 @@ import java.net.SocketAddress;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
 
 public class TestYarnConfiguration {
 
@@ -75,4 +76,131 @@ public class TestYarnConfiguration {
         YarnConfiguration.DEFAULT_NM_PORT);
     assertEquals(1234, addr.getPort());
   }
+
+  @Test
+  public void testGetSocketAddr() throws Exception {
+
+    YarnConfiguration conf;
+    InetSocketAddress resourceTrackerAddress;
+
+    //all default
+    conf = new YarnConfiguration();
+    resourceTrackerAddress = conf.getSocketAddr(
+        YarnConfiguration.RM_BIND_HOST,
+        YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
+        YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS,
+        YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT);
+    assertEquals(
+      new InetSocketAddress(
+        YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS.split(":")[0],
+        YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT),
+        resourceTrackerAddress);
+
+    //with address
+    conf.set(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, "10.0.0.1");
+    resourceTrackerAddress = conf.getSocketAddr(
+        YarnConfiguration.RM_BIND_HOST,
+        YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
+        YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS,
+        YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT);
+    assertEquals(
+      new InetSocketAddress(
+        "10.0.0.1",
+        YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT),
+        resourceTrackerAddress);
+
+    //address and port
+    conf.set(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, "10.0.0.2:5001");
+    resourceTrackerAddress = conf.getSocketAddr(
+        YarnConfiguration.RM_BIND_HOST,
+        YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
+        YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS,
+        YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT);
+    assertEquals(
+      new InetSocketAddress(
+        "10.0.0.2",
+        5001),
+        resourceTrackerAddress);
+
+    //bind host only
+    conf = new YarnConfiguration();
+    conf.set(YarnConfiguration.RM_BIND_HOST, "10.0.0.3");
+    resourceTrackerAddress = conf.getSocketAddr(
+        YarnConfiguration.RM_BIND_HOST,
+        YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
+        YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS,
+        YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT);
+    assertEquals(
+      new InetSocketAddress(
+        "10.0.0.3",
+        YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT),
+        resourceTrackerAddress);
+
+    //bind host and address no port
+    conf.set(YarnConfiguration.RM_BIND_HOST, "0.0.0.0");
+    conf.set(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, "10.0.0.2");
+    resourceTrackerAddress = conf.getSocketAddr(
+        YarnConfiguration.RM_BIND_HOST,
+        YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
+        YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS,
+        YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT);
+    assertEquals(
+      new InetSocketAddress(
+        "0.0.0.0",
+        YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT),
+        resourceTrackerAddress);
+
+    //bind host and address with port
+    conf.set(YarnConfiguration.RM_BIND_HOST, "0.0.0.0");
+    conf.set(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, "10.0.0.2:5003");
+    resourceTrackerAddress = conf.getSocketAddr(
+        YarnConfiguration.RM_BIND_HOST,
+        YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
+        YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS,
+        YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT);
+    assertEquals(
+      new InetSocketAddress(
+        "0.0.0.0",
+        5003),
+        resourceTrackerAddress);
+
+  }
+
+  @Test
+  public void testUpdateConnectAddr() throws Exception {
+    YarnConfiguration conf;
+    InetSocketAddress resourceTrackerConnectAddress;
+    InetSocketAddress serverAddress;
+
+    //no override, old behavior.  Won't work on a host named "yo.yo.yo"
+    conf = new YarnConfiguration();
+    conf.set(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, "yo.yo.yo");
+    serverAddress = new InetSocketAddress(
+        YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS.split(":")[0],
+        Integer.valueOf(YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS.split(":")[1]));
+
+    resourceTrackerConnectAddress = conf.updateConnectAddr(
+        YarnConfiguration.RM_BIND_HOST,
+        YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
+        YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS,
+        serverAddress);
+
+    assertFalse(resourceTrackerConnectAddress.toString().startsWith("yo.yo.yo"));
+
+    //cause override with address
+    conf = new YarnConfiguration();
+    conf.set(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, "yo.yo.yo");
+    conf.set(YarnConfiguration.RM_BIND_HOST, "0.0.0.0");
+    serverAddress = new InetSocketAddress(
+        YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS.split(":")[0],
+        Integer.valueOf(YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS.split(":")[1]));
+
+    resourceTrackerConnectAddress = conf.updateConnectAddr(
+        YarnConfiguration.RM_BIND_HOST,
+        YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
+        YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS,
+        serverAddress);
+
+    assertTrue(resourceTrackerConnectAddress.toString().startsWith("yo.yo.yo"));
+  }
 }

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryClientService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryClientService.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryClientService.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryClientService.java Wed Aug 20 01:34:29 2014
@@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.exceptions
 import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
 import org.apache.hadoop.yarn.exceptions.ContainerNotFoundException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 
 public class ApplicationHistoryClientService extends AbstractService {
@@ -75,10 +76,11 @@ public class ApplicationHistoryClientSer
   protected void serviceStart() throws Exception {
     Configuration conf = getConfig();
     YarnRPC rpc = YarnRPC.create(conf);
-    InetSocketAddress address =
-        conf.getSocketAddr(YarnConfiguration.TIMELINE_SERVICE_ADDRESS,
-          YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ADDRESS,
-          YarnConfiguration.DEFAULT_TIMELINE_SERVICE_PORT);
+    InetSocketAddress address = conf.getSocketAddr(
+        YarnConfiguration.TIMELINE_SERVICE_BIND_HOST,
+        YarnConfiguration.TIMELINE_SERVICE_ADDRESS,
+        YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ADDRESS,
+        YarnConfiguration.DEFAULT_TIMELINE_SERVICE_PORT);
 
     server =
         rpc.getServer(ApplicationHistoryProtocol.class, protocolHandler,
@@ -88,8 +90,10 @@ public class ApplicationHistoryClientSer
 
     server.start();
     this.bindAddress =
-        conf.updateConnectAddr(YarnConfiguration.TIMELINE_SERVICE_ADDRESS,
-          server.getListenerAddress());
+        conf.updateConnectAddr(YarnConfiguration.TIMELINE_SERVICE_BIND_HOST,
+                               YarnConfiguration.TIMELINE_SERVICE_ADDRESS,
+                               YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ADDRESS,
+                               server.getListenerAddress());
     LOG.info("Instantiated ApplicationHistoryClientService at "
         + this.bindAddress);
 

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java Wed Aug 20 01:34:29 2014
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.ap
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -27,6 +28,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.metrics2.source.JvmMetrics;
+import org.apache.hadoop.security.AuthenticationFilterInitializer;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.service.Service;
@@ -39,6 +41,7 @@ import org.apache.hadoop.yarn.conf.YarnC
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.webapp.AHSWebApp;
 import org.apache.hadoop.yarn.server.timeline.LeveldbTimelineStore;
+import org.apache.hadoop.yarn.server.timeline.TimelineDataManager;
 import org.apache.hadoop.yarn.server.timeline.TimelineStore;
 import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;
 import org.apache.hadoop.yarn.server.timeline.security.TimelineAuthenticationFilterInitializer;
@@ -59,12 +62,12 @@ public class ApplicationHistoryServer ex
   private static final Log LOG = LogFactory
     .getLog(ApplicationHistoryServer.class);
 
-  protected ApplicationHistoryClientService ahsClientService;
-  protected ApplicationHistoryManager historyManager;
-  protected TimelineStore timelineStore;
-  protected TimelineDelegationTokenSecretManagerService secretManagerService;
-  protected TimelineACLsManager timelineACLsManager;
-  protected WebApp webApp;
+  private ApplicationHistoryClientService ahsClientService;
+  private ApplicationHistoryManager historyManager;
+  private TimelineStore timelineStore;
+  private TimelineDelegationTokenSecretManagerService secretManagerService;
+  private TimelineDataManager timelineDataManager;
+  private WebApp webApp;
 
   public ApplicationHistoryServer() {
     super(ApplicationHistoryServer.class.getName());
@@ -72,15 +75,18 @@ public class ApplicationHistoryServer ex
 
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
-    historyManager = createApplicationHistory();
-    ahsClientService = createApplicationHistoryClientService(historyManager);
-    addService(ahsClientService);
-    addService((Service) historyManager);
+    // init timeline services first
     timelineStore = createTimelineStore(conf);
     addIfService(timelineStore);
     secretManagerService = createTimelineDelegationTokenSecretManagerService(conf);
     addService(secretManagerService);
-    timelineACLsManager = createTimelineACLsManager(conf);
+    timelineDataManager = createTimelineDataManager(conf);
+
+    // init generic history service afterwards
+    historyManager = createApplicationHistoryManager(conf);
+    ahsClientService = createApplicationHistoryClientService(historyManager);
+    addService(ahsClientService);
+    addService((Service) historyManager);
 
     DefaultMetricsSystem.initialize("ApplicationHistoryServer");
     JvmMetrics.initSingleton("ApplicationHistoryServer", null);
@@ -111,21 +117,22 @@ public class ApplicationHistoryServer ex
 
   @Private
   @VisibleForTesting
-  public ApplicationHistoryClientService getClientService() {
+  ApplicationHistoryClientService getClientService() {
     return this.ahsClientService;
   }
 
-  protected ApplicationHistoryClientService
-      createApplicationHistoryClientService(
-          ApplicationHistoryManager historyManager) {
-    return new ApplicationHistoryClientService(historyManager);
-  }
-
-  protected ApplicationHistoryManager createApplicationHistory() {
-    return new ApplicationHistoryManagerImpl();
+  /**
+   * @return ApplicationTimelineStore
+   */
+  @Private
+  @VisibleForTesting
+  public TimelineStore getTimelineStore() {
+    return timelineStore;
   }
 
-  protected ApplicationHistoryManager getApplicationHistory() {
+  @Private
+  @VisibleForTesting
+  ApplicationHistoryManager getApplicationHistoryManager() {
     return this.historyManager;
   }
 
@@ -154,28 +161,35 @@ public class ApplicationHistoryServer ex
     launchAppHistoryServer(args);
   }
 
-  protected ApplicationHistoryManager createApplicationHistoryManager(
+  private ApplicationHistoryClientService
+      createApplicationHistoryClientService(
+          ApplicationHistoryManager historyManager) {
+    return new ApplicationHistoryClientService(historyManager);
+  }
+
+  private ApplicationHistoryManager createApplicationHistoryManager(
       Configuration conf) {
     return new ApplicationHistoryManagerImpl();
   }
 
-  protected TimelineStore createTimelineStore(
+  private TimelineStore createTimelineStore(
       Configuration conf) {
     return ReflectionUtils.newInstance(conf.getClass(
         YarnConfiguration.TIMELINE_SERVICE_STORE, LeveldbTimelineStore.class,
         TimelineStore.class), conf);
   }
 
-  protected TimelineDelegationTokenSecretManagerService
+  private TimelineDelegationTokenSecretManagerService
       createTimelineDelegationTokenSecretManagerService(Configuration conf) {
     return new TimelineDelegationTokenSecretManagerService();
   }
 
-  protected TimelineACLsManager createTimelineACLsManager(Configuration conf) {
-    return new TimelineACLsManager(conf);
+  private TimelineDataManager createTimelineDataManager(Configuration conf) {
+    return new TimelineDataManager(
+        timelineStore, new TimelineACLsManager(conf));
   }
 
-  protected void startWebApp() {
+  private void startWebApp() {
     Configuration conf = getConfig();
     // Always load pseudo authentication filter to parse "user.name" in an URL
     // to identify a HTTP request's user in insecure mode.
@@ -183,23 +197,41 @@ public class ApplicationHistoryServer ex
     // the customized filter will be loaded by the timeline server to do Kerberos
     // + DT authentication.
     String initializers = conf.get("hadoop.http.filter.initializers");
+
     initializers =
-        initializers == null || initializers.length() == 0 ? "" : ","
-            + initializers;
-    if (!initializers.contains(
-        TimelineAuthenticationFilterInitializer.class.getName())) {
-      conf.set("hadoop.http.filter.initializers",
-          TimelineAuthenticationFilterInitializer.class.getName()
-              + initializers);
+        initializers == null || initializers.length() == 0 ? "" : initializers;
+
+    if (!initializers.contains(TimelineAuthenticationFilterInitializer.class
+      .getName())) {
+      initializers =
+          TimelineAuthenticationFilterInitializer.class.getName() + ","
+              + initializers;
+    }
+
+    String[] parts = initializers.split(",");
+    ArrayList<String> target = new ArrayList<String>();
+    for (String filterInitializer : parts) {
+      filterInitializer = filterInitializer.trim();
+      if (filterInitializer.equals(AuthenticationFilterInitializer.class
+        .getName())) {
+        continue;
+      }
+      target.add(filterInitializer);
+    }
+    String actualInitializers =
+        org.apache.commons.lang.StringUtils.join(target, ",");
+    if (!actualInitializers.equals(initializers)) {
+      conf.set("hadoop.http.filter.initializers", actualInitializers);
     }
-    String bindAddress = WebAppUtils.getAHSWebAppURLWithoutScheme(conf);
+    String bindAddress = WebAppUtils.getWebAppBindURL(conf,
+                          YarnConfiguration.TIMELINE_SERVICE_BIND_HOST,
+                          WebAppUtils.getAHSWebAppURLWithoutScheme(conf));
     LOG.info("Instantiating AHSWebApp at " + bindAddress);
     try {
       AHSWebApp ahsWebApp = AHSWebApp.getInstance();
       ahsWebApp.setApplicationHistoryManager(historyManager);
-      ahsWebApp.setTimelineStore(timelineStore);
       ahsWebApp.setTimelineDelegationTokenSecretManagerService(secretManagerService);
-      ahsWebApp.setTimelineACLsManager(timelineACLsManager);
+      ahsWebApp.setTimelineDataManager(timelineDataManager);
       webApp =
           WebApps
             .$for("applicationhistory", ApplicationHistoryClientService.class,
@@ -211,14 +243,6 @@ public class ApplicationHistoryServer ex
       throw new YarnRuntimeException(msg, e);
     }
   }
-  /**
-   * @return ApplicationTimelineStore
-   */
-  @Private
-  @VisibleForTesting
-  public TimelineStore getTimelineStore() {
-    return timelineStore;
-  }
 
   private void doSecureLogin(Configuration conf) throws IOException {
     InetSocketAddress socAddr = getBindAddress(conf);

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebApp.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebApp.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebApp.java Wed Aug 20 01:34:29 2014
@@ -22,8 +22,7 @@ import static org.apache.hadoop.yarn.uti
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.yarn.server.api.ApplicationContext;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryManager;
-import org.apache.hadoop.yarn.server.timeline.TimelineStore;
-import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;
+import org.apache.hadoop.yarn.server.timeline.TimelineDataManager;
 import org.apache.hadoop.yarn.server.timeline.security.TimelineDelegationTokenSecretManagerService;
 import org.apache.hadoop.yarn.server.timeline.webapp.TimelineWebServices;
 import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
@@ -36,9 +35,8 @@ import com.google.common.annotations.Vis
 public class AHSWebApp extends WebApp implements YarnWebParams {
 
   private ApplicationHistoryManager applicationHistoryManager;
-  private TimelineStore timelineStore;
   private TimelineDelegationTokenSecretManagerService secretManagerService;
-  private TimelineACLsManager timelineACLsManager;
+  private TimelineDataManager timelineDataManager;
 
   private static AHSWebApp instance = null;
 
@@ -68,14 +66,6 @@ public class AHSWebApp extends WebApp im
     this.applicationHistoryManager = applicationHistoryManager;
   }
 
-  public TimelineStore getTimelineStore() {
-    return timelineStore;
-  }
-
-  public void setTimelineStore(TimelineStore timelineStore) {
-    this.timelineStore = timelineStore;
-  }
-
   public TimelineDelegationTokenSecretManagerService
       getTimelineDelegationTokenSecretManagerService() {
     return secretManagerService;
@@ -86,12 +76,12 @@ public class AHSWebApp extends WebApp im
     this.secretManagerService = secretManagerService;
   }
 
-  public TimelineACLsManager getTimelineACLsManager() {
-    return timelineACLsManager;
+  public TimelineDataManager getTimelineDataManager() {
+    return timelineDataManager;
   }
 
-  public void setTimelineACLsManager(TimelineACLsManager timelineACLsManager) {
-    this.timelineACLsManager = timelineACLsManager;
+  public void setTimelineDataManager(TimelineDataManager timelineDataManager) {
+    this.timelineDataManager = timelineDataManager;
   }
 
   @Override
@@ -101,10 +91,9 @@ public class AHSWebApp extends WebApp im
     bind(TimelineWebServices.class);
     bind(GenericExceptionHandler.class);
     bind(ApplicationContext.class).toInstance(applicationHistoryManager);
-    bind(TimelineStore.class).toInstance(timelineStore);
     bind(TimelineDelegationTokenSecretManagerService.class).toInstance(
         secretManagerService);
-    bind(TimelineACLsManager.class).toInstance(timelineACLsManager);
+    bind(TimelineDataManager.class).toInstance(timelineDataManager);
     route("/", AHSController.class);
     route(pajoin("/apps", APP_STATE), AHSController.class);
     route(pajoin("/app", APPLICATION_ID), AHSController.class, "app");

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/LeveldbTimelineStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/LeveldbTimelineStore.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/LeveldbTimelineStore.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/LeveldbTimelineStore.java Wed Aug 20 01:34:29 2014
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.ti
 
 import static org.apache.hadoop.yarn.server.timeline.GenericObjectMapper.readReverseOrderedLong;
 import static org.apache.hadoop.yarn.server.timeline.GenericObjectMapper.writeReverseOrderedLong;
+import static org.fusesource.leveldbjni.JniDBFactory.bytes;
 
 import java.io.ByteArrayOutputStream;
 import java.io.File;
@@ -60,8 +61,12 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
 import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
+import org.apache.hadoop.yarn.server.records.Version;
+import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
 import org.fusesource.leveldbjni.JniDBFactory;
 import org.iq80.leveldb.DB;
+import org.iq80.leveldb.DBException;
 import org.iq80.leveldb.DBIterator;
 import org.iq80.leveldb.Options;
 import org.iq80.leveldb.ReadOptions;
@@ -141,6 +146,11 @@ public class LeveldbTimelineStore extend
       "z".getBytes();
 
   private static final byte[] EMPTY_BYTES = new byte[0];
+  
+  private static final String TIMELINE_STORE_VERSION_KEY = "timeline-store-version";
+  
+  private static final Version CURRENT_VERSION_INFO = Version
+      .newInstance(1, 0);
 
   @Private
   @VisibleForTesting
@@ -193,6 +203,7 @@ public class LeveldbTimelineStore extend
     }
     LOG.info("Using leveldb path " + dbPath);
     db = factory.open(new File(dbPath.toString()), options);
+    checkVersion();
     startTimeWriteCache =
         Collections.synchronizedMap(new LRUMap(getStartTimeWriteCacheSize(
             conf)));
@@ -1270,8 +1281,6 @@ public class LeveldbTimelineStore extend
             DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE);
   }
 
-  // warning is suppressed to prevent eclipse from noting unclosed resource
-  @SuppressWarnings("resource")
   @VisibleForTesting
   List<String> getEntityTypes() throws IOException {
     DBIterator iterator = null;
@@ -1489,4 +1498,65 @@ public class LeveldbTimelineStore extend
     readOptions.fillCache(fillCache);
     return db.iterator(readOptions);
   }
+  
+  Version loadVersion() throws IOException {
+    byte[] data = db.get(bytes(TIMELINE_STORE_VERSION_KEY));
+    // If no version was stored previously, treat it as 1.0.
+    if (data == null || data.length == 0) {
+      return Version.newInstance(1, 0);
+    }
+    Version version =
+        new VersionPBImpl(VersionProto.parseFrom(data));
+    return version;
+  }
+  
+  // Only used for test
+  @VisibleForTesting
+  void storeVersion(Version state) throws IOException {
+    dbStoreVersion(state);
+  }
+  
+  private void dbStoreVersion(Version state) throws IOException {
+    String key = TIMELINE_STORE_VERSION_KEY;
+    byte[] data = 
+        ((VersionPBImpl) state).getProto().toByteArray();
+    try {
+      db.put(bytes(key), data);
+    } catch (DBException e) {
+      throw new IOException(e);
+    }
+  }
+
+  Version getCurrentVersion() {
+    return CURRENT_VERSION_INFO;
+  }
+  
+  /**
+   * 1) Versioning timeline store: major.minor, e.g. 1.0, 1.1, 1.2...1.25, 2.0 etc.
+   * 2) Any incompatible change of TS-store is a major upgrade, and any
+   *    compatible change of TS-store is a minor upgrade.
+   * 3) Within a minor upgrade, say 1.1 to 1.2:
+   *    overwrite the version info and proceed as normal.
+   * 4) Within a major upgrade, say 1.2 to 2.0:
+   *    throw an exception and tell the user to use a separate upgrade tool to
+   *    upgrade the timeline store or remove the incompatible old state.
+   */
+  private void checkVersion() throws IOException {
+    Version loadedVersion = loadVersion();
+    LOG.info("Loaded timeline store version info " + loadedVersion);
+    if (loadedVersion.equals(getCurrentVersion())) {
+      return;
+    }
+    if (loadedVersion.isCompatibleTo(getCurrentVersion())) {
+      LOG.info("Storing timeline store version info " + getCurrentVersion());
+      dbStoreVersion(CURRENT_VERSION_INFO);
+    } else {
+      String incompatibleMessage = 
+          "Incompatible version for timeline store: expecting version " 
+              + getCurrentVersion() + ", but loading version " + loadedVersion;
+      LOG.fatal(incompatibleMessage);
+      throw new IOException(incompatibleMessage);
+    }
+  }
+  
 }

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/webapp/TimelineWebServices.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/webapp/TimelineWebServices.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/webapp/TimelineWebServices.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/webapp/TimelineWebServices.java Wed Aug 20 01:34:29 2014
@@ -18,14 +18,10 @@
 
 package org.apache.hadoop.yarn.server.timeline.webapp;
 
-import static org.apache.hadoop.yarn.util.StringHelper.CSV_JOINER;
-
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.EnumSet;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 import java.util.SortedSet;
@@ -58,14 +54,11 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents;
 import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
-import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.timeline.EntityIdentifier;
 import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
 import org.apache.hadoop.yarn.server.timeline.NameValuePair;
+import org.apache.hadoop.yarn.server.timeline.TimelineDataManager;
 import org.apache.hadoop.yarn.server.timeline.TimelineReader.Field;
-import org.apache.hadoop.yarn.server.timeline.TimelineStore;
-import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;
-import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
 import org.apache.hadoop.yarn.webapp.BadRequestException;
 import org.apache.hadoop.yarn.webapp.ForbiddenException;
 import org.apache.hadoop.yarn.webapp.NotFoundException;
@@ -80,14 +73,11 @@ public class TimelineWebServices {
 
   private static final Log LOG = LogFactory.getLog(TimelineWebServices.class);
 
-  private TimelineStore store;
-  private TimelineACLsManager timelineACLsManager;
+  private TimelineDataManager timelineDataManager;
 
   @Inject
-  public TimelineWebServices(TimelineStore store,
-      TimelineACLsManager timelineACLsManager) {
-    this.store = store;
-    this.timelineACLsManager = timelineACLsManager;
+  public TimelineWebServices(TimelineDataManager timelineDataManager) {
+    this.timelineDataManager = timelineDataManager;
   }
 
   @XmlRootElement(name = "about")
@@ -148,61 +138,28 @@ public class TimelineWebServices {
       @QueryParam("limit") String limit,
       @QueryParam("fields") String fields) {
     init(res);
-    TimelineEntities entities = null;
     try {
-      EnumSet<Field> fieldEnums = parseFieldsStr(fields, ",");
-      boolean modified = extendFields(fieldEnums);
-      UserGroupInformation callerUGI = getUser(req);
-      entities = store.getEntities(
+      return timelineDataManager.getEntities(
           parseStr(entityType),
-          parseLongStr(limit),
+          parsePairStr(primaryFilter, ":"),
+          parsePairsStr(secondaryFilter, ",", ":"),
           parseLongStr(windowStart),
           parseLongStr(windowEnd),
           parseStr(fromId),
           parseLongStr(fromTs),
-          parsePairStr(primaryFilter, ":"),
-          parsePairsStr(secondaryFilter, ",", ":"),
-          fieldEnums);
-      if (entities != null) {
-        Iterator<TimelineEntity> entitiesItr =
-            entities.getEntities().iterator();
-        while (entitiesItr.hasNext()) {
-          TimelineEntity entity = entitiesItr.next();
-          try {
-            // check ACLs
-            if (!timelineACLsManager.checkAccess(callerUGI, entity)) {
-              entitiesItr.remove();
-            } else {
-              // clean up system data
-              if (modified) {
-                entity.setPrimaryFilters(null);
-              } else {
-                cleanupOwnerInfo(entity);
-              }
-            }
-          } catch (YarnException e) {
-            LOG.error("Error when verifying access for user " + callerUGI
-                + " on the events of the timeline entity "
-                + new EntityIdentifier(entity.getEntityId(),
-                    entity.getEntityType()), e);
-            entitiesItr.remove();
-          }
-        }
-      }
+          parseLongStr(limit),
+          parseFieldsStr(fields, ","),
+          getUser(req));
     } catch (NumberFormatException e) {
       throw new BadRequestException(
           "windowStart, windowEnd or limit is not a numeric value.");
     } catch (IllegalArgumentException e) {
       throw new BadRequestException("requested invalid field.");
-    } catch (IOException e) {
+    } catch (Exception e) {
       LOG.error("Error getting entities", e);
       throw new WebApplicationException(e,
           Response.Status.INTERNAL_SERVER_ERROR);
     }
-    if (entities == null) {
-      return new TimelineEntities();
-    }
-    return entities;
   }
 
   /**
@@ -220,33 +177,15 @@ public class TimelineWebServices {
     init(res);
     TimelineEntity entity = null;
     try {
-      EnumSet<Field> fieldEnums = parseFieldsStr(fields, ",");
-      boolean modified = extendFields(fieldEnums);
-      entity =
-          store.getEntity(parseStr(entityId), parseStr(entityType),
-              fieldEnums);
-      if (entity != null) {
-        // check ACLs
-        UserGroupInformation callerUGI = getUser(req);
-        if (!timelineACLsManager.checkAccess(callerUGI, entity)) {
-          entity = null;
-        } else {
-          // clean up the system data
-          if (modified) {
-            entity.setPrimaryFilters(null);
-          } else {
-            cleanupOwnerInfo(entity);
-          }
-        }
-      }
+      entity = timelineDataManager.getEntity(
+          parseStr(entityType),
+          parseStr(entityId),
+          parseFieldsStr(fields, ","),
+          getUser(req));
     } catch (IllegalArgumentException e) {
       throw new BadRequestException(
           "requested invalid field.");
-    } catch (IOException e) {
-      LOG.error("Error getting entity", e);
-      throw new WebApplicationException(e,
-          Response.Status.INTERNAL_SERVER_ERROR);
-    } catch (YarnException e) {
+    } catch (Exception e) {
       LOG.error("Error getting entity", e);
       throw new WebApplicationException(e,
           Response.Status.INTERNAL_SERVER_ERROR);
@@ -275,51 +214,23 @@ public class TimelineWebServices {
       @QueryParam("windowEnd") String windowEnd,
       @QueryParam("limit") String limit) {
     init(res);
-    TimelineEvents events = null;
     try {
-      UserGroupInformation callerUGI = getUser(req);
-      events = store.getEntityTimelines(
+      return timelineDataManager.getEvents(
           parseStr(entityType),
           parseArrayStr(entityId, ","),
-          parseLongStr(limit),
+          parseArrayStr(eventType, ","),
           parseLongStr(windowStart),
           parseLongStr(windowEnd),
-          parseArrayStr(eventType, ","));
-      if (events != null) {
-        Iterator<TimelineEvents.EventsOfOneEntity> eventsItr =
-            events.getAllEvents().iterator();
-        while (eventsItr.hasNext()) {
-          TimelineEvents.EventsOfOneEntity eventsOfOneEntity = eventsItr.next();
-          try {
-            TimelineEntity entity = store.getEntity(
-                eventsOfOneEntity.getEntityId(),
-                eventsOfOneEntity.getEntityType(),
-                EnumSet.of(Field.PRIMARY_FILTERS));
-            // check ACLs
-            if (!timelineACLsManager.checkAccess(callerUGI, entity)) {
-              eventsItr.remove();
-            }
-          } catch (Exception e) {
-            LOG.error("Error when verifying access for user " + callerUGI
-                + " on the events of the timeline entity "
-                + new EntityIdentifier(eventsOfOneEntity.getEntityId(),
-                    eventsOfOneEntity.getEntityType()), e);
-            eventsItr.remove();
-          }
-        }
-      }
+          parseLongStr(limit),
+          getUser(req));
     } catch (NumberFormatException e) {
       throw new BadRequestException(
           "windowStart, windowEnd or limit is not a numeric value.");
-    } catch (IOException e) {
+    } catch (Exception e) {
       LOG.error("Error getting entity timelines", e);
       throw new WebApplicationException(e,
           Response.Status.INTERNAL_SERVER_ERROR);
     }
-    if (events == null) {
-      return new TimelineEvents();
-    }
-    return events;
   }
 
   /**
@@ -333,9 +244,6 @@ public class TimelineWebServices {
       @Context HttpServletResponse res,
       TimelineEntities entities) {
     init(res);
-    if (entities == null) {
-      return new TimelinePutResponse();
-    }
     UserGroupInformation callerUGI = getUser(req);
     if (callerUGI == null) {
       String msg = "The owner of the posted timeline entities is not set";
@@ -343,76 +251,8 @@ public class TimelineWebServices {
       throw new ForbiddenException(msg);
     }
     try {
-      List<EntityIdentifier> entityIDs = new ArrayList<EntityIdentifier>();
-      TimelineEntities entitiesToPut = new TimelineEntities();
-      List<TimelinePutResponse.TimelinePutError> errors =
-          new ArrayList<TimelinePutResponse.TimelinePutError>();
-      for (TimelineEntity entity : entities.getEntities()) {
-        EntityIdentifier entityID =
-            new EntityIdentifier(entity.getEntityId(), entity.getEntityType());
-
-        // check if there is existing entity
-        TimelineEntity existingEntity = null;
-        try {
-          existingEntity =
-              store.getEntity(entityID.getId(), entityID.getType(),
-                  EnumSet.of(Field.PRIMARY_FILTERS));
-          if (existingEntity != null
-              && !timelineACLsManager.checkAccess(callerUGI, existingEntity)) {
-            throw new YarnException("The timeline entity " + entityID
-                + " was not put by " + callerUGI + " before");
-          }
-        } catch (Exception e) {
-          // Skip the entity which already exists and was put by others
-          LOG.warn("Skip the timeline entity: " + entityID + ", because "
-              + e.getMessage());
-          TimelinePutResponse.TimelinePutError error =
-              new TimelinePutResponse.TimelinePutError();
-          error.setEntityId(entityID.getId());
-          error.setEntityType(entityID.getType());
-          error.setErrorCode(
-              TimelinePutResponse.TimelinePutError.ACCESS_DENIED);
-          errors.add(error);
-          continue;
-        }
-
-        // inject owner information for the access check if this is the first
-        // time to post the entity, in case it's the admin who is updating
-        // the timeline data.
-        try {
-          if (existingEntity == null) {
-            injectOwnerInfo(entity, callerUGI.getShortUserName());
-          }
-        } catch (YarnException e) {
-          // Skip the entity which messes up the primary filter and record the
-          // error
-          LOG.warn("Skip the timeline entity: " + entityID + ", because "
-              + e.getMessage());
-          TimelinePutResponse.TimelinePutError error =
-              new TimelinePutResponse.TimelinePutError();
-          error.setEntityId(entityID.getId());
-          error.setEntityType(entityID.getType());
-          error.setErrorCode(
-              TimelinePutResponse.TimelinePutError.SYSTEM_FILTER_CONFLICT);
-          errors.add(error);
-          continue;
-        }
-
-        entityIDs.add(entityID);
-        entitiesToPut.addEntity(entity);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Storing the entity " + entityID + ", JSON-style content: "
-              + TimelineUtils.dumpTimelineRecordtoJSON(entity));
-        }
-      }
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Storing entities: " + CSV_JOINER.join(entityIDs));
-      }
-      TimelinePutResponse response =  store.put(entitiesToPut);
-      // add the errors of timeline system filter key conflict
-      response.addErrors(errors);
-      return response;
-    } catch (IOException e) {
+      return timelineDataManager.postEntities(entities, callerUGI);
+    } catch (Exception e) {
       LOG.error("Error putting entities", e);
       throw new WebApplicationException(e,
           Response.Status.INTERNAL_SERVER_ERROR);
@@ -423,6 +263,15 @@ public class TimelineWebServices {
     response.setContentType(null);
   }
 
+  private static UserGroupInformation getUser(HttpServletRequest req) {
+    String remoteUser = req.getRemoteUser();
+    UserGroupInformation callerUGI = null;
+    if (remoteUser != null) {
+      callerUGI = UserGroupInformation.createRemoteUser(remoteUser);
+    }
+    return callerUGI;
+  }
+
   private static SortedSet<String> parseArrayStr(String str, String delimiter) {
     if (str == null) {
       return null;
@@ -495,14 +344,6 @@ public class TimelineWebServices {
     }
   }
 
-  private static boolean extendFields(EnumSet<Field> fieldEnums) {
-    boolean modified = false;
-    if (fieldEnums != null && !fieldEnums.contains(Field.PRIMARY_FILTERS)) {
-      fieldEnums.add(Field.PRIMARY_FILTERS);
-      modified = true;
-    }
-    return modified;
-  }
   private static Long parseLongStr(String str) {
     return str == null ? null : Long.parseLong(str.trim());
   }
@@ -511,34 +352,4 @@ public class TimelineWebServices {
     return str == null ? null : str.trim();
   }
 
-  private static UserGroupInformation getUser(HttpServletRequest req) {
-    String remoteUser = req.getRemoteUser();
-    UserGroupInformation callerUGI = null;
-    if (remoteUser != null) {
-      callerUGI = UserGroupInformation.createRemoteUser(remoteUser);
-    }
-    return callerUGI;
-  }
-
-  private static void injectOwnerInfo(TimelineEntity timelineEntity,
-      String owner) throws YarnException {
-    if (timelineEntity.getPrimaryFilters() != null &&
-        timelineEntity.getPrimaryFilters().containsKey(
-            TimelineStore.SystemFilter.ENTITY_OWNER.toString())) {
-      throw new YarnException(
-          "User should not use the timeline system filter key: "
-              + TimelineStore.SystemFilter.ENTITY_OWNER);
-    }
-    timelineEntity.addPrimaryFilter(
-        TimelineStore.SystemFilter.ENTITY_OWNER
-            .toString(), owner);
-  }
-
-  private static void cleanupOwnerInfo(TimelineEntity timelineEntity) {
-    if (timelineEntity.getPrimaryFilters() != null) {
-      timelineEntity.getPrimaryFilters().remove(
-          TimelineStore.SystemFilter.ENTITY_OWNER.toString());
-    }
-  }
-
 }

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryClientService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryClientService.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryClientService.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryClientService.java Wed Aug 20 01:34:29 2014
@@ -69,7 +69,7 @@ public class TestApplicationHistoryClien
     historyServer.init(config);
     historyServer.start();
     store =
-        ((ApplicationHistoryManagerImpl) historyServer.getApplicationHistory())
+        ((ApplicationHistoryManagerImpl) historyServer.getApplicationHistoryManager())
           .getHistoryStore();
   }
 

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java Wed Aug 20 01:34:29 2014
@@ -23,11 +23,14 @@ import static org.junit.Assert.assertNot
 import static org.junit.Assert.fail;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.AuthenticationFilterInitializer;
 import org.apache.hadoop.service.Service.STATE;
 import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.webapp.AHSWebApp;
+import org.apache.hadoop.yarn.server.timeline.security.TimelineAuthenticationFilterInitializer;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Test;
 
 public class TestApplicationHistoryServer {
@@ -69,6 +72,31 @@ public class TestApplicationHistoryServe
     }
   }
 
+  @Test(timeout = 50000)
+  public void testFilteOverrides() throws Exception {
+
+    String[] filterInitializers =
+        {
+            AuthenticationFilterInitializer.class.getName(),
+            TimelineAuthenticationFilterInitializer.class.getName(),
+            AuthenticationFilterInitializer.class.getName() + ","
+                + TimelineAuthenticationFilterInitializer.class.getName(),
+            AuthenticationFilterInitializer.class.getName() + ", "
+                + TimelineAuthenticationFilterInitializer.class.getName() };
+    for (String filterInitializer : filterInitializers) {
+      historyServer = new ApplicationHistoryServer();
+      Configuration config = new YarnConfiguration();
+      config.set("hadoop.http.filter.initializers", filterInitializer);
+      historyServer.init(config);
+      historyServer.start();
+      Configuration tmp = historyServer.getConfig();
+      assertEquals(TimelineAuthenticationFilterInitializer.class.getName(),
+        tmp.get("hadoop.http.filter.initializers"));
+      historyServer.stop();
+      AHSWebApp.resetInstance();
+    }
+  }
+
   @After
   public void stop() {
     if (historyServer != null) {

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLeveldbTimelineStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLeveldbTimelineStore.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLeveldbTimelineStore.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLeveldbTimelineStore.java Wed Aug 20 01:34:29 2014
@@ -36,14 +36,17 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.service.ServiceStateException;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.records.Version;
 import org.apache.hadoop.yarn.server.timeline.LeveldbTimelineStore;
 import org.apache.hadoop.yarn.server.timeline.NameValuePair;
 import org.iq80.leveldb.DBIterator;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -52,19 +55,19 @@ import org.junit.Test;
 public class TestLeveldbTimelineStore extends TimelineStoreTestUtils {
   private FileContext fsContext;
   private File fsPath;
+  private Configuration config = new YarnConfiguration();
 
   @Before
   public void setup() throws Exception {
     fsContext = FileContext.getLocalFSFileContext();
-    Configuration conf = new YarnConfiguration();
     fsPath = new File("target", this.getClass().getSimpleName() +
         "-tmpDir").getAbsoluteFile();
     fsContext.delete(new Path(fsPath.getAbsolutePath()), true);
-    conf.set(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH,
+    config.set(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH,
         fsPath.getAbsolutePath());
-    conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_TTL_ENABLE, false);
+    config.setBoolean(YarnConfiguration.TIMELINE_SERVICE_TTL_ENABLE, false);
     store = new LeveldbTimelineStore();
-    store.init(conf);
+    store.init(config);
     store.start();
     loadTestData();
     loadVerificationData();
@@ -263,5 +266,47 @@ public class TestLeveldbTimelineStore ex
     assertEquals(1, getEntities("type_2").size());
     assertEquals(2, getEntitiesWithPrimaryFilter("type_1", userFilter).size());
   }
+  
+  @Test
+  public void testCheckVersion() throws IOException {
+    LeveldbTimelineStore dbStore = (LeveldbTimelineStore) store;
+    // default version: the started store should load its own current version
+    Version defaultVersion = dbStore.getCurrentVersion();
+    Assert.assertEquals(defaultVersion, dbStore.loadVersion());
+
+    // compatible version: same major version, higher minor version
+    Version compatibleVersion =
+        Version.newInstance(defaultVersion.getMajorVersion(),
+          defaultVersion.getMinorVersion() + 2);
+    dbStore.storeVersion(compatibleVersion);
+    Assert.assertEquals(compatibleVersion, dbStore.loadVersion());
+    restartTimelineStore();
+    dbStore = (LeveldbTimelineStore) store;
+    // after the restart the stored version should be the current one again
+    Assert.assertEquals(defaultVersion, dbStore.loadVersion());
+
+    // incompatible version: higher major version, so the restart should fail
+    Version incompatibleVersion =
+      Version.newInstance(defaultVersion.getMajorVersion() + 1,
+          defaultVersion.getMinorVersion());
+    dbStore.storeVersion(incompatibleVersion);
+    try {
+      restartTimelineStore();
+      Assert.fail("Incompatible version, should expect fail here.");
+    } catch (ServiceStateException e) {
+      Assert.assertTrue("Exception message mismatch", 
+        e.getMessage().contains("Incompatible version for timeline store"));
+    }
+  }
+  
+  private void restartTimelineStore() throws IOException {
+    // need to close so leveldb releases database lock
+    if (store != null) {
+      store.close();
+    }
+    store = new LeveldbTimelineStore();
+    store.init(config);
+    store.start();
+  }
 
 }

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/webapp/TestTimelineWebServices.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/webapp/TestTimelineWebServices.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/webapp/TestTimelineWebServices.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/webapp/TestTimelineWebServices.java Wed Aug 20 01:34:29 2014
@@ -33,6 +33,7 @@ import java.util.Map;
 import java.util.Set;
 
 import javax.servlet.FilterConfig;
+import javax.servlet.ServletContext;
 import javax.servlet.ServletException;
 import javax.ws.rs.core.MediaType;
 
@@ -48,6 +49,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.security.AdminACLsManager;
 import org.apache.hadoop.yarn.server.timeline.TestMemoryTimelineStore;
+import org.apache.hadoop.yarn.server.timeline.TimelineDataManager;
 import org.apache.hadoop.yarn.server.timeline.TimelineStore;
 import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;
 import org.apache.hadoop.yarn.server.timeline.security.TimelineAuthenticationFilter;
@@ -88,14 +90,15 @@ public class TestTimelineWebServices ext
       } catch (Exception e) {
         Assert.fail();
       }
-      bind(TimelineStore.class).toInstance(store);
       Configuration conf = new YarnConfiguration();
       conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, false);
       timelineACLsManager = new TimelineACLsManager(conf);
       conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
       conf.set(YarnConfiguration.YARN_ADMIN_ACL, "admin");
       adminACLsManager = new AdminACLsManager(conf);
-      bind(TimelineACLsManager.class).toInstance(timelineACLsManager);
+      TimelineDataManager timelineDataManager =
+          new TimelineDataManager(store, timelineACLsManager);
+      bind(TimelineDataManager.class).toInstance(timelineDataManager);
       serve("/*").with(GuiceContainer.class);
       TimelineAuthenticationFilter taFilter = new TimelineAuthenticationFilter();
       FilterConfig filterConfig = mock(FilterConfig.class);
@@ -105,6 +108,8 @@ public class TestTimelineWebServices ext
           .thenReturn("simple");
       when(filterConfig.getInitParameter(
           PseudoAuthenticationHandler.ANONYMOUS_ALLOWED)).thenReturn("true");
+      ServletContext context = mock(ServletContext.class);
+      when(filterConfig.getServletContext()).thenReturn(context);
       Enumeration<Object> names = mock(Enumeration.class);
       when(names.hasMoreElements()).thenReturn(true, true, false);
       when(names.nextElement()).thenReturn(

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto Wed Aug 20 01:34:29 2014
@@ -47,4 +47,10 @@ message NodeHealthStatusProto {
   optional bool is_node_healthy = 1;
   optional string health_report = 2;
   optional int64 last_health_report_time = 3;
-}
\ No newline at end of file
+}
+
+message VersionProto {
+  optional int32 major_version = 1;
+  optional int32 minor_version = 2;
+}
+

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java Wed Aug 20 01:34:29 2014
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.nodemanager;
 
+import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
@@ -29,17 +30,18 @@ import java.util.concurrent.locks.Reentr
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.util.Shell.ShellCommandExecutor;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch;
 import org.apache.hadoop.yarn.server.nodemanager.util.ProcessIdFileReader;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.StringUtils;
@@ -126,9 +128,76 @@ public abstract class ContainerExecutor 
   public abstract void deleteAsUser(String user, Path subDir, Path... basedirs)
       throws IOException, InterruptedException;
 
+  public abstract boolean isContainerProcessAlive(String user, String pid)
+      throws IOException;
+
+  /**
+   * Recover an already existing container. This is a blocking call and returns
+   * only when the container exits.  Note that the container must have been
+   * activated prior to this call.
+   * @param user the user of the container
+   * @param containerId The ID of the container to reacquire
+   * @return The exit code of the pre-existing container
+   * @throws IOException if the pid or the exit code cannot be determined
+   */
+  public int reacquireContainer(String user, ContainerId containerId)
+      throws IOException {
+    Path pidPath = getPidFilePath(containerId);
+    if (pidPath == null) {
+      LOG.warn(containerId + " is not active, returning terminated error");
+      return ExitCode.TERMINATED.getExitCode();
+    }
+
+    String pid = ProcessIdFileReader.getProcessId(pidPath);
+    if (pid == null) {
+      throw new IOException("Unable to determine pid for " + containerId);
+    }
+
+    LOG.info("Reacquiring " + containerId + " with pid " + pid);
+    try {
+      while(isContainerProcessAlive(user, pid)) {
+        Thread.sleep(1000);
+      }
+    } catch (InterruptedException e) {
+      throw new IOException("Interrupted while waiting for process " + pid
+          + " to exit", e);
+    }
+
+    // wait for exit code file to appear
+    String exitCodeFile = ContainerLaunch.getExitCodeFile(pidPath.toString());
+    File file = new File(exitCodeFile);
+    final int sleepMsec = 100;
+    int msecLeft = 2000;
+    while (!file.exists() && msecLeft >= 0) {
+      if (!isContainerActive(containerId)) {
+        LOG.info(containerId + " was deactivated");
+        return ExitCode.TERMINATED.getExitCode();
+      }
+      try {
+        Thread.sleep(sleepMsec);
+      } catch (InterruptedException e) {
+        throw new IOException(
+            "Interrupted while waiting for exit code from " + containerId, e);
+      }
+      msecLeft -= sleepMsec;
+    }
+    // the file may have appeared during the final sleep, so re-check it
+    if (!file.exists()) {
+      throw new IOException("Timeout while waiting for exit code from "
+          + containerId);
+    }
+
+    try {
+      return Integer.parseInt(FileUtils.readFileToString(file).trim());
+    } catch (NumberFormatException e) {
+      throw new IOException("Error parsing exit code from pid " + pid, e);
+    }
+  }
+
   public enum ExitCode {
     FORCE_KILLED(137),
-    TERMINATED(143);
+    TERMINATED(143),
+    LOST(154);
     private final int code;
 
     private ExitCode(int exitCode) {

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java Wed Aug 20 01:34:29 2014
@@ -273,25 +273,57 @@ public class DefaultContainerExecutor ex
 
   private final class UnixLocalWrapperScriptBuilder
       extends LocalWrapperScriptBuilder {
+    private final Path sessionScriptPath;
 
     public UnixLocalWrapperScriptBuilder(Path containerWorkDir) {
       super(containerWorkDir);
+      this.sessionScriptPath = new Path(containerWorkDir,
+          Shell.appendScriptExtension("default_container_executor_session"));
+    }
+
+    @Override
+    public void writeLocalWrapperScript(Path launchDst, Path pidFile)
+        throws IOException {
+      writeSessionScript(launchDst, pidFile);
+      super.writeLocalWrapperScript(launchDst, pidFile);
     }
 
     @Override
     public void writeLocalWrapperScript(Path launchDst, Path pidFile,
         PrintStream pout) {
-
-      // We need to do a move as writing to a file is not atomic
-      // Process reading a file being written to may get garbled data
-      // hence write pid to tmp file first followed by a mv
+      String exitCodeFile = ContainerLaunch.getExitCodeFile(
+          pidFile.toString());
+      String tmpFile = exitCodeFile + ".tmp";
       pout.println("#!/bin/bash");
-      pout.println();
-      pout.println("echo $$ > " + pidFile.toString() + ".tmp");
-      pout.println("/bin/mv -f " + pidFile.toString() + ".tmp " + pidFile);
-      String exec = Shell.isSetsidAvailable? "exec setsid" : "exec";
-      pout.println(exec + " /bin/bash \"" +
-        launchDst.toUri().getPath().toString() + "\"");
+      pout.println("/bin/bash \"" + sessionScriptPath.toString() + "\"");
+      pout.println("rc=$?");
+      pout.println("echo $rc > \"" + tmpFile + "\"");
+      pout.println("/bin/mv -f \"" + tmpFile + "\" \"" + exitCodeFile + "\"");
+      pout.println("exit $rc");
+    }
+
+    private void writeSessionScript(Path launchDst, Path pidFile)
+        throws IOException {
+      DataOutputStream out = null;
+      PrintStream pout = null;
+      try {
+        out = lfs.create(sessionScriptPath, EnumSet.of(CREATE, OVERWRITE));
+        pout = new PrintStream(out);
+        // Writing a file is not atomic, so a process reading the pid file
+        // while it is still being written may get garbled data. Hence the
+        // script writes the pid to a tmp file first and then mv's it in place.
+        pout.println("#!/bin/bash");
+        pout.println();
+        pout.println("echo $$ > " + pidFile.toString() + ".tmp");
+        pout.println("/bin/mv -f " + pidFile.toString() + ".tmp " + pidFile);
+        String exec = Shell.isSetsidAvailable? "exec setsid" : "exec";
+        pout.println(exec + " /bin/bash \"" +
+            launchDst.toUri().getPath().toString() + "\"");
+      } finally {
+        IOUtils.cleanup(LOG, pout, out);
+      }
+      lfs.setPermission(sessionScriptPath,
+          ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION);
+    }
   }
 
@@ -310,6 +342,7 @@ public class DefaultContainerExecutor ex
     @Override
     public void writeLocalWrapperScript(Path launchDst, Path pidFile,
         PrintStream pout) {
+      // TODO: exit code script for Windows
 
       // On Windows, the pid is the container ID, so that it can also serve as
       // the name of the job object created by winutils for task management.
@@ -342,6 +375,12 @@ public class DefaultContainerExecutor ex
     return true;
   }
 
+  @Override
+  public boolean isContainerProcessAlive(String user, String pid)
+      throws IOException {
+    return containerIsAlive(pid);
+  }
+
   /**
    * Returns true if the process with the specified pid is alive.
    * 

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java Wed Aug 20 01:34:29 2014
@@ -403,6 +403,13 @@ public class LinuxContainerExecutor exte
     }
   }
   
+  @Override
+  public boolean isContainerProcessAlive(String user, String pid)
+      throws IOException {
+    // Send a test signal to the process as the user to see if it's alive
+    return signalContainer(user, pid, Signal.NULL);
+  }
+
   public void mountCgroups(List<String> cgroupKVs, String hierarchy)
          throws IOException {
     List<String> command = new ArrayList<String>(

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java Wed Aug 20 01:34:29 2014
@@ -23,11 +23,34 @@ import org.apache.hadoop.yarn.api.record
 
 public interface NodeStatusUpdater extends Service {
 
+  /**
+   * Schedule a heartbeat to the ResourceManager outside of the normal,
+   * periodic heartbeating process. This is typically called when the state
+   * of containers on the node has changed to notify the RM sooner.
+   */
   void sendOutofBandHeartBeat();
 
+  /**
+   * Get the ResourceManager identifier received during registration
+   * @return the ResourceManager ID
+   */
   long getRMIdentifier();
   
+  /**
+   * Query if a container has recently completed
+   * @param containerId the container ID
+   * @return true if the container has recently completed
+   */
   public boolean isContainerRecentlyStopped(ContainerId containerId);
   
+  /**
+   * Add a container to the list of containers that have recently completed
+   * @param containerId the ID of the completed container
+   */
+  public void addCompletedContainer(ContainerId containerId);
+
+  /**
+   * Clear the list of recently completed containers
+   */
   public void clearFinishedContainersFromCache();
 }

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java Wed Aug 20 01:34:29 2014
@@ -364,8 +364,7 @@ public class NodeStatusUpdaterImpl exten
         // Adding to finished containers cache. Cache will keep it around at
         // least for #durationToTrackStoppedContainers duration. In the
         // subsequent call to stop container it will get removed from cache.
-        updateStoppedContainersInCache(container.getContainerId());
-        addCompletedContainer(container);
+        addCompletedContainer(container.getContainerId());
       }
     }
     if (LOG.isDebugEnabled()) {
@@ -393,8 +392,7 @@ public class NodeStatusUpdaterImpl exten
         // Adding to finished containers cache. Cache will keep it around at
         // least for #durationToTrackStoppedContainers duration. In the
         // subsequent call to stop container it will get removed from cache.
-        updateStoppedContainersInCache(container.getContainerId());
-        addCompletedContainer(container);
+        addCompletedContainer(container.getContainerId());
       }
     }
     LOG.info("Sending out " + containerStatuses.size()
@@ -402,9 +400,15 @@ public class NodeStatusUpdaterImpl exten
     return containerStatuses;
   }
 
-  private void addCompletedContainer(Container container) {
+  @Override
+  public void addCompletedContainer(ContainerId containerId) {
     synchronized (previousCompletedContainers) {
-      previousCompletedContainers.add(container.getContainerId());
+      previousCompletedContainers.add(containerId);
+    }
+    synchronized (recentlyStoppedContainers) {
+      removeVeryOldStoppedContainersFromCache();
+      recentlyStoppedContainers.put(containerId,
+        System.currentTimeMillis() + durationToTrackStoppedContainers);
     }
   }
 
@@ -451,16 +455,6 @@ public class NodeStatusUpdaterImpl exten
     }
   }
   
-  @Private
-  @VisibleForTesting
-  public void updateStoppedContainersInCache(ContainerId containerId) {
-    synchronized (recentlyStoppedContainers) {
-      removeVeryOldStoppedContainersFromCache();
-      recentlyStoppedContainers.put(containerId,
-        System.currentTimeMillis() + durationToTrackStoppedContainers);
-    }
-  }
-  
   @Override
   public void clearFinishedContainersFromCache() {
     synchronized (recentlyStoppedContainers) {
@@ -476,8 +470,14 @@ public class NodeStatusUpdaterImpl exten
       Iterator<ContainerId> i =
           recentlyStoppedContainers.keySet().iterator();
       while (i.hasNext()) {
-        if (recentlyStoppedContainers.get(i.next()) < currentTime) {
+        ContainerId cid = i.next();
+        if (recentlyStoppedContainers.get(cid) < currentTime) {
           i.remove();
+          try {
+            context.getNMStateStore().removeContainer(cid);
+          } catch (IOException e) {
+            LOG.error("Unable to remove container " + cid + " in store", e);
+          }
         } else {
           break;
         }



Mime
View raw message