ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [06/50] [abbrv] incubator-ignite git commit: #YARN Code cleanup. Added tests.
Date Thu, 16 Jul 2015 09:28:29 GMT
#YARN Code cleanup. Added tests.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/960e19dd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/960e19dd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/960e19dd

Branch: refs/heads/ignite-gg-9615
Commit: 960e19dda15d58ddc403a8e6856d0eb19d7794c1
Parents: 858d2a3
Author: nikolay tikhonov <ntikhonov@gridgain.com>
Authored: Tue Jun 9 16:38:07 2015 +0300
Committer: nikolay tikhonov <ntikhonov@gridgain.com>
Committed: Tue Jun 9 16:38:07 2015 +0300

----------------------------------------------------------------------
 .../apache/ignite/yarn/ApplicationMaster.java   |  89 +++--
 .../apache/ignite/yarn/ClusterProperties.java   |  53 +--
 .../apache/ignite/yarn/IgniteYarnClient.java    |  30 +-
 .../org/apache/ignite/IgniteMesosTestSuite.java |  38 ---
 .../org/apache/ignite/IgniteYarnTestSuite.java  |  38 +++
 .../yarn/IgniteApplicationMasterSelfTest.java   | 324 +++++++++++++++++++
 .../ignite/yarn/IgniteSchedulerSelfTest.java    |  29 --
 7 files changed, 460 insertions(+), 141 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/960e19dd/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java
----------------------------------------------------------------------
diff --git a/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java b/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java
index 3bf0521..c552ea0 100644
--- a/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java
+++ b/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java
@@ -47,27 +47,30 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
     public static final String DELIM = ",";
 
     /** */
+    private long schedulerTimeout = TimeUnit.SECONDS.toMillis(1);
+
+    /** Yarn configuration. */
     private YarnConfiguration conf;
 
-    /** */
+    /** Cluster properties. */
     private ClusterProperties props;
 
-    /** */
+    /** Network manager. */
     private NMClient nmClient;
 
-    /** */
-    AMRMClientAsync<AMRMClient.ContainerRequest> rmClient;
+    /** Resource manager. */
+    private AMRMClientAsync<AMRMClient.ContainerRequest> rmClient;
 
-    /** */
+    /** Ignite path. */
     private Path ignitePath;
 
-    /** */
+    /** Config path. */
     private Path cfgPath;
 
-    /** */
+    /** Hadoop file system. */
     private FileSystem fs;
 
-    /** */
+    /** Running containers. */
     private Map<ContainerId, IgniteContainer> containers = new ConcurrentHashMap<>();
 
     /**
@@ -76,13 +79,7 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
     public ApplicationMaster(String ignitePath, ClusterProperties props) throws Exception {
         this.conf = new YarnConfiguration();
         this.props = props;
-        this.fs = FileSystem.get(conf);
         this.ignitePath = new Path(ignitePath);
-
-        nmClient = NMClient.createNMClient();
-
-        nmClient.init(conf);
-        nmClient.start();
     }
 
     /** {@inheritDoc} */
@@ -103,11 +100,16 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
                     resources.put("ignite", IgniteYarnUtils.setupFile(ignitePath, fs, LocalResourceType.ARCHIVE));
                     resources.put("ignite-config.xml", IgniteYarnUtils.setupFile(cfgPath, fs, LocalResourceType.FILE));
 
+                    if (props.userLibs() != null)
+                        resources.put("libs", IgniteYarnUtils.setupFile(new Path(props.userLibs()), fs,
+                            LocalResourceType.FILE));
+
                     ctx.setLocalResources(resources);
 
                     ctx.setCommands(
                         Collections.singletonList(
-                            "./ignite/*/bin/ignite.sh "
+                            "cp -r ./libs/* ./ignite/*/libs/ || true && "
+                            + "./ignite/*/bin/ignite.sh "
                             + "./ignite-config.xml"
                             + " -J-Xmx" + c.getResource().getMemory() + "m"
                             + " -J-Xms" + c.getResource().getMemory() + "m"
@@ -153,7 +155,9 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
         // Check that slave satisfies min requirements.
         if (cont.getResource().getVirtualCores() < props.cpusPerNode()
             || cont.getResource().getMemory() < props.memoryPerNode()) {
-            //log.log(Level.FINE, "Offer not sufficient for slave request: {0}", offer.getResourcesList());
+            log.log(Level.FINE, "Container resources not sufficient requirements. Host: {0}, cpu: {1}, mem: {2}",
+                new Object[]{cont.getNodeId().getHost(), cont.getResource().getVirtualCores(),
+                   cont.getResource().getMemory()});
 
             return false;
         }
@@ -185,7 +189,8 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
         for (ContainerStatus status : statuses) {
             containers.remove(status.getContainerId());
 
-            //log.log(Level.FINE, "Offer not sufficient for slave request: {0}", offer.getResourcesList());
+            log.log(Level.INFO, "Container stopped. Container id: {0}. State: {1}.",
+                new Object[]{status.getContainerId(), status.getState()});
         }
     }
 
@@ -243,9 +248,6 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
      * @throws Exception If failed.
      */
     public void run() throws Exception {
-        rmClient.init(conf);
-        rmClient.start();
-
         // Register with ResourceManager
         rmClient.registerApplicationMaster("", 0, "");
 
@@ -260,7 +262,7 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
             while (!nmClient.isInState(Service.STATE.STOPPED)) {
                 int runningCnt = containers.size();
 
-                if (runningCnt < props.instances() && checkAvailableResource(rmClient.getAvailableResources())) {
+                if (runningCnt < props.instances() && checkAvailableResource()) {
                     // Resource requirements for worker containers.
                     Resource capability = Records.newRecord(Resource.class);
 
@@ -279,7 +281,7 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
                     }
                 }
 
-                TimeUnit.SECONDS.sleep(5);
+                TimeUnit.MILLISECONDS.sleep(schedulerTimeout);
             }
         }
         catch (Exception e) {
@@ -294,10 +296,11 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
     }
 
     /**
-     * @param availableRes Available resources.
      * @return {@code True} if cluster contains available resources.
      */
-    private boolean checkAvailableResource(Resource availableRes) {
+    private boolean checkAvailableResource() {
+        Resource availableRes = rmClient.getAvailableResources();
+
         return availableRes == null || availableRes.getMemory() >= props.memoryPerNode()
             && availableRes.getVirtualCores() >= props.cpusPerNode();
     }
@@ -306,10 +309,17 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
      * @throws IOException
      */
     public void init() throws IOException {
+        fs = FileSystem.get(conf);
+
+        nmClient = NMClient.createNMClient();
+
+        nmClient.init(conf);
+        nmClient.start();
+
         // Create async application master.
         rmClient = AMRMClientAsync.createAMRMClientAsync(300, this);
 
-        if (props.igniteConfigUrl() == null || props.igniteConfigUrl().isEmpty()) {
+        if (props.igniteCfg() == null || props.igniteCfg().isEmpty()) {
             InputStream input = Thread.currentThread().getContextClassLoader()
                 .getResourceAsStream(IgniteYarnUtils.DEFAULT_IGNITE_CONFIG);
 
@@ -325,6 +335,33 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
             IOUtils.closeQuietly(outputStream);
         }
         else
-            cfgPath = new Path(props.igniteConfigUrl());
+            cfgPath = new Path(props.igniteCfg());
+    }
+
+    /**
+     * Sets NMClient.
+     *
+     * @param nmClient NMClient.
+     */
+    public void setNmClient(NMClient nmClient) {
+        this.nmClient = nmClient;
+    }
+
+    /**
+     * Sets RMClient
+     *
+     * @param rmClient AMRMClientAsync.
+     */
+    public void setRmClient(AMRMClientAsync<AMRMClient.ContainerRequest> rmClient) {
+        this.rmClient = rmClient;
+    }
+
+    /**
+     * Sets scheduler timeout.
+     *
+     * @param schedulerTimeout Scheduler timeout.
+     */
+    public void setSchedulerTimeout(long schedulerTimeout) {
+        this.schedulerTimeout = schedulerTimeout;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/960e19dd/modules/yarn/src/main/java/org/apache/ignite/yarn/ClusterProperties.java
----------------------------------------------------------------------
diff --git a/modules/yarn/src/main/java/org/apache/ignite/yarn/ClusterProperties.java b/modules/yarn/src/main/java/org/apache/ignite/yarn/ClusterProperties.java
index f9fdb59..d021d45 100644
--- a/modules/yarn/src/main/java/org/apache/ignite/yarn/ClusterProperties.java
+++ b/modules/yarn/src/main/java/org/apache/ignite/yarn/ClusterProperties.java
@@ -59,8 +59,11 @@ public class ClusterProperties {
     /** */
     public static final String IGNITE_NODE_COUNT = "IGNITE_NODE_COUNT";
 
+    /** */
+    public static final int DEFAULT_IGNITE_NODE_COUNT = 3;
+
     /** Node count limit. */
-    private double nodeCnt = 3;
+    private double nodeCnt = DEFAULT_IGNITE_NODE_COUNT;
 
     /** */
     public static final String IGNITE_VERSION = "IGNITE_VERSION";
@@ -105,24 +108,12 @@ public class ClusterProperties {
     private String userLibs = null;
 
     /** */
-    public static final String IGNITE_USERS_LIBS_URL = "IGNITE_USERS_LIBS_URL";
-
-    /** URL to users libs. */
-    private String userLibsUrl = null;
-
-    /** */
     public static final String IGNITE_CONFIG_XML = "IGNITE_XML_CONFIG";
 
     /** Ignite config. */
     private String igniteCfg = null;
 
     /** */
-    public static final String IGNITE_CONFIG_XML_URL = "IGNITE_CONFIG_XML_URL";
-
-    /** Url to ignite config. */
-    private String igniteCfgUrl = null;
-
-    /** */
     public static final String IGNITE_HOSTNAME_CONSTRAINT = "IGNITE_HOSTNAME_CONSTRAINT";
 
     /** Url to ignite config. */
@@ -179,6 +170,13 @@ public class ClusterProperties {
     }
 
     /**
+     * Sets instance count limit.
+     */
+    public void instances(int nodeCnt) {
+        this.nodeCnt = nodeCnt;
+    }
+
+    /**
      * Sets hostname constraint.
      *
      * @param pattern Hostname pattern.
@@ -230,20 +228,6 @@ public class ClusterProperties {
     }
 
     /**
-     * @return Url to ignite configuration.
-     */
-    public String igniteConfigUrl() {
-        return igniteCfgUrl;
-    }
-
-    /**
-     * @return Url to users libs configuration.
-     */
-    public String usersLibsUrl() {
-        return userLibsUrl;
-    }
-
-    /**
      * @return Host name constraint.
      */
     public Pattern hostnameConstraint() {
@@ -268,15 +252,14 @@ public class ClusterProperties {
 
             prop.clusterName = getStringProperty(IGNITE_CLUSTER_NAME, props, DEFAULT_CLUSTER_NAME);
 
-            prop.userLibsUrl = getStringProperty(IGNITE_USERS_LIBS_URL, props, null);
-            prop.igniteCfgUrl = getStringProperty(IGNITE_CONFIG_XML_URL, props, null);
-
             prop.cpuPerNode = getDoubleProperty(IGNITE_RUN_CPU_PER_NODE, props, 1.0);
             prop.memPerNode = getDoubleProperty(IGNITE_MEMORY_PER_NODE, props, 2048.0);
             prop.nodeCnt = getDoubleProperty(IGNITE_NODE_COUNT, props, 2.0);
 
             prop.igniteVer = getStringProperty(IGNITE_VERSION, props, DEFAULT_IGNITE_VERSION);
             prop.igniteWorkDir = getStringProperty(IGNITE_WORKING_DIR, props, DEFAULT_IGNITE_WORK_DIR);
+            prop.igniteLocalWorkDir = getStringProperty(IGNITE_LOCAL_WORK_DIR, props, DEFAULT_IGNITE_LOCAL_WORK_DIR);
+            prop.igniteReleasesDir = getStringProperty(IGNITE_RELEASES_DIR, props, DEFAULT_IGNITE_RELEASES_DIR);
             prop.igniteCfg = getStringProperty(IGNITE_CONFIG_XML, props, null);
             prop.userLibs = getStringProperty(IGNITE_USERS_LIBS, props, null);
 
@@ -306,15 +289,14 @@ public class ClusterProperties {
 
         prop.clusterName = getStringProperty(IGNITE_CLUSTER_NAME, null, DEFAULT_CLUSTER_NAME);
 
-        prop.userLibsUrl = getStringProperty(IGNITE_USERS_LIBS_URL, null, null);
-        prop.igniteCfgUrl = getStringProperty(IGNITE_CONFIG_XML_URL, null, null);
-
         prop.cpuPerNode = getDoubleProperty(IGNITE_RUN_CPU_PER_NODE, null, 1.0);
         prop.memPerNode = getDoubleProperty(IGNITE_MEMORY_PER_NODE, null, 2048.0);
         prop.nodeCnt = getDoubleProperty(IGNITE_NODE_COUNT, null, 2.0);
 
         prop.igniteVer = getStringProperty(IGNITE_VERSION, null, DEFAULT_IGNITE_VERSION);
         prop.igniteWorkDir = getStringProperty(IGNITE_WORKING_DIR, null, DEFAULT_IGNITE_WORK_DIR);
+        prop.igniteLocalWorkDir = getStringProperty(IGNITE_LOCAL_WORK_DIR, null, DEFAULT_IGNITE_LOCAL_WORK_DIR);
+        prop.igniteReleasesDir = getStringProperty(IGNITE_RELEASES_DIR, null, DEFAULT_IGNITE_RELEASES_DIR);
         prop.igniteCfg = getStringProperty(IGNITE_CONFIG_XML, null, null);
         prop.userLibs = getStringProperty(IGNITE_USERS_LIBS, null, null);
 
@@ -342,15 +324,14 @@ public class ClusterProperties {
 
         envs.put(IGNITE_CLUSTER_NAME, toEnvVal(clusterName));
 
-        envs.put(IGNITE_USERS_LIBS_URL, toEnvVal(userLibsUrl));
-        envs.put(IGNITE_CONFIG_XML_URL, toEnvVal(igniteCfgUrl));
-
         envs.put(IGNITE_RUN_CPU_PER_NODE, toEnvVal(cpuPerNode));
         envs.put(IGNITE_MEMORY_PER_NODE, toEnvVal(memPerNode));
         envs.put(IGNITE_NODE_COUNT, toEnvVal(nodeCnt));
 
         envs.put(IGNITE_VERSION, toEnvVal(igniteVer));
         envs.put(IGNITE_WORKING_DIR, toEnvVal(igniteWorkDir));
+        envs.put(IGNITE_LOCAL_WORK_DIR, toEnvVal(igniteLocalWorkDir));
+        envs.put(IGNITE_RELEASES_DIR, toEnvVal(igniteReleasesDir));
         envs.put(IGNITE_CONFIG_XML, toEnvVal(igniteCfg));
         envs.put(IGNITE_USERS_LIBS, toEnvVal(userLibs));
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/960e19dd/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java
----------------------------------------------------------------------
diff --git a/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java b/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java
index f74890d..764e717 100644
--- a/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java
+++ b/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java
@@ -39,9 +39,9 @@ public class IgniteYarnClient {
     public static final Logger log = Logger.getLogger(IgniteYarnClient.class.getSimpleName());
 
     /**
-     * Main methods has only one optional parameter - path to properties file.
+     * Main methods has one mandatory parameter and one optional parameter.
      *
-     * @param args Args.
+     * @param args Path to jar mandatory parameter and property file is optional.
      */
     public static void main(String[] args) throws Exception {
         checkArguments(args);
@@ -107,24 +107,27 @@ public class IgniteYarnClient {
 
         yarnClient.submitApplication(appContext);
 
-        log.log(Level.INFO, "Submitted application. Application id: [{0}]", appId);
+        log.log(Level.INFO, "Submitted application. Application id: {0}", appId);
 
         ApplicationReport appReport = yarnClient.getApplicationReport(appId);
         YarnApplicationState appState = appReport.getYarnApplicationState();
 
-        while (appState != YarnApplicationState.FINISHED &&
-                appState != YarnApplicationState.KILLED &&
-                appState != YarnApplicationState.FAILED) {
+        while (appState == YarnApplicationState.NEW ||
+            appState == YarnApplicationState.NEW_SAVING ||
+            appState == YarnApplicationState.SUBMITTED ||
+            appState == YarnApplicationState.ACCEPTED) {
             TimeUnit.SECONDS.sleep(1L);
 
             appReport = yarnClient.getApplicationReport(appId);
 
+            if (appState != YarnApplicationState.ACCEPTED
+                && appReport.getYarnApplicationState() == YarnApplicationState.ACCEPTED)
+                log.log(Level.INFO, "Application {0} is ACCEPTED.", appId);
+
             appState = appReport.getYarnApplicationState();
         }
 
-        yarnClient.killApplication(appId);
-
-        log.log(Level.INFO, "Application [{0}] finished with state [{1}]", new Object[]{appId, appState});
+        log.log(Level.INFO, "Application {0} is {1}.", new Object[]{appId, appState});
     }
 
     /**
@@ -134,7 +137,7 @@ public class IgniteYarnClient {
      */
     private static void checkArguments(String[] args) {
         if (args.length < 1)
-            throw new IllegalArgumentException();
+            throw new IllegalArgumentException("Invalid arguments.");
     }
 
     /**
@@ -146,11 +149,14 @@ public class IgniteYarnClient {
     private static Path getIgnite(ClusterProperties props, FileSystem fileSystem) throws Exception {
         IgniteProvider provider = new IgniteProvider(props, fileSystem);
 
-        return provider.getIgnite();
+        if (props.igniteVer() == null
+            || props.igniteVer().equalsIgnoreCase(ClusterProperties.DEFAULT_IGNITE_VERSION))
+            return provider.getIgnite();
+        else
+            return provider.getIgnite(props.igniteVer());
     }
 
     /**
-     *
      * @param envs Environment variables.
      * @param conf Yarn configuration.
      */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/960e19dd/modules/yarn/src/test/java/org/apache/ignite/IgniteMesosTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/yarn/src/test/java/org/apache/ignite/IgniteMesosTestSuite.java b/modules/yarn/src/test/java/org/apache/ignite/IgniteMesosTestSuite.java
deleted file mode 100644
index e6920b3..0000000
--- a/modules/yarn/src/test/java/org/apache/ignite/IgniteMesosTestSuite.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite;
-
-import junit.framework.*;
-import org.apache.ignite.yarn.*;
-
-/**
- * Apache Mesos integration tests.
- */
-public class IgniteMesosTestSuite extends TestSuite {
-    /**
-     * @return Test suite.
-     * @throws Exception Thrown in case of the failure.
-     */
-    public static TestSuite suite() throws Exception {
-        TestSuite suite = new TestSuite("Apache Mesos Integration Test Suite");
-
-        suite.addTest(new TestSuite(IgniteSchedulerSelfTest.class));
-
-        return suite;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/960e19dd/modules/yarn/src/test/java/org/apache/ignite/IgniteYarnTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/yarn/src/test/java/org/apache/ignite/IgniteYarnTestSuite.java b/modules/yarn/src/test/java/org/apache/ignite/IgniteYarnTestSuite.java
new file mode 100644
index 0000000..aa31774
--- /dev/null
+++ b/modules/yarn/src/test/java/org/apache/ignite/IgniteYarnTestSuite.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite;
+
+import junit.framework.*;
+import org.apache.ignite.yarn.*;
+
+/**
+ * Apache Hadoop Yarn integration tests.
+ */
+public class IgniteYarnTestSuite extends TestSuite {
+    /**
+     * @return Test suite.
+     * @throws Exception Thrown in case of the failure.
+     */
+    public static TestSuite suite() throws Exception {
+        TestSuite suite = new TestSuite("Apache Yarn Integration Test Suite");
+
+        suite.addTest(new TestSuite(IgniteApplicationMasterSelfTest.class));
+
+        return suite;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/960e19dd/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteApplicationMasterSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteApplicationMasterSelfTest.java b/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteApplicationMasterSelfTest.java
new file mode 100644
index 0000000..d865659
--- /dev/null
+++ b/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteApplicationMasterSelfTest.java
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.yarn;
+
+import junit.framework.*;
+import org.apache.curator.utils.ThreadUtils;
+import org.apache.hadoop.util.ThreadUtil;
+import org.apache.hadoop.yarn.api.protocolrecords.*;
+import org.apache.hadoop.yarn.api.records.*;
+import org.apache.hadoop.yarn.client.api.*;
+import org.apache.hadoop.yarn.client.api.async.*;
+import org.apache.hadoop.yarn.exceptions.*;
+
+import java.io.*;
+import java.nio.*;
+import java.util.*;
+
+/**
+ * Application master tests.
+ */
+public class IgniteApplicationMasterSelfTest extends TestCase {
+    /** */
+    private ApplicationMaster appMaster;
+
+    /** */
+    private ClusterProperties props;
+
+    /** */
+    private RMMock rmMock = new RMMock();
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Override protected void setUp() throws Exception {
+        super.setUp();
+
+        props = new ClusterProperties();
+        appMaster = new ApplicationMaster("test", props);
+
+        appMaster.setSchedulerTimeout(100000);
+
+        rmMock.clear();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testContainerAllocate() throws Exception {
+        appMaster.setRmClient(rmMock);
+        appMaster.setNmClient(new NMMock());
+
+        props.cpusPerNode(2);
+        props.memoryPerNode(1024);
+        props.instances(3);
+
+        Thread thread = runAppMaster(appMaster);
+
+        List<AMRMClient.ContainerRequest> contRequests = collectRequests(rmMock, 2, 1000);
+
+        interruptedThread(thread);
+
+        assertEquals(3, contRequests.size());
+
+        for (AMRMClient.ContainerRequest req : contRequests) {
+            assertEquals(2, req.getCapability().getVirtualCores());
+            assertEquals(1024, req.getCapability().getMemory());
+        }
+    }
+
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testClusterResource() throws Exception {
+        rmMock.availableRes(new MockResource(1024, 2));
+
+        appMaster.setRmClient(rmMock);
+        appMaster.setNmClient(new NMMock());
+
+        props.cpusPerNode(8);
+        props.memoryPerNode(10240);
+        props.instances(3);
+
+        Thread thread = runAppMaster(appMaster);
+
+        interruptedThread(thread);
+
+        List<AMRMClient.ContainerRequest> contRequests = collectRequests(rmMock, 1, 1000);
+
+        assertEquals(0, contRequests.size());
+    }
+
+    /**
+     * @param rmMock RM mock.
+     * @param expectedCnt Expected cnt.
+     * @param timeOut Timeout.
+     * @return Requests.
+     */
+    private List<AMRMClient.ContainerRequest> collectRequests(RMMock rmMock, int expectedCnt, int timeOut) {
+        long startTime = System.currentTimeMillis();
+
+        List<AMRMClient.ContainerRequest> requests = rmMock.requests();
+
+        while (requests.size() < expectedCnt
+           && (System.currentTimeMillis() - startTime) < timeOut)
+            requests = rmMock.requests();
+
+        return requests;
+    }
+
+    /**
+     * Runs appMaster other thread.
+     *
+     * @param appMaster Application master.
+     * @return Thread.
+     */
+    private static Thread runAppMaster(final ApplicationMaster appMaster) {
+        Thread thread = new Thread(new Runnable() {
+            @Override public void run() {
+                try {
+                    appMaster.run();
+                }
+                catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        });
+
+        thread.start();
+
+        return thread;
+    }
+
+    /**
+     * Interrupt thread and wait.
+     *
+     * @param thread Thread.
+     */
+    private static void interruptedThread(Thread thread) throws InterruptedException {
+        thread.interrupt();
+
+        thread.join();
+    }
+
+    /**
+     * Resource manager mock.
+     */
+    private static class RMMock extends AMRMClientAsync {
+        /** */
+        private List<AMRMClient.ContainerRequest> contRequests = new ArrayList<>();
+
+        /** */
+        private Resource availableRes;
+
+        /** */
+        public RMMock() {
+            super(0, null);
+        }
+
+        /**
+         * @return Requests.
+         */
+        public List<AMRMClient.ContainerRequest> requests() {
+            return contRequests;
+        }
+
+        /**
+         * Sets resource.
+         *
+         * @param availableRes Available resource.
+         */
+        public void availableRes(Resource availableRes) {
+            this.availableRes = availableRes;
+        }
+
+        /**
+         * Clear internal state.
+         */
+        public void clear() {
+            contRequests.clear();
+            availableRes = null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public List<? extends Collection> getMatchingRequests(Priority priority, String resourceName,
+            Resource capability) {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public RegisterApplicationMasterResponse registerApplicationMaster(String appHostName,
+            int appHostPort, String appTrackingUrl) throws YarnException, IOException {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void unregisterApplicationMaster(FinalApplicationStatus appStatus, String appMessage,
+            String appTrackingUrl) throws YarnException, IOException {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void addContainerRequest(AMRMClient.ContainerRequest req) {
+            contRequests.add(req);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void removeContainerRequest(AMRMClient.ContainerRequest req) {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void releaseAssignedContainer(ContainerId containerId) {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public Resource getAvailableResources() {
+            return availableRes;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int getClusterNodeCount() {
+            return 0;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void updateBlacklist(List blacklistAdditions, List blacklistRemovals) {
+            // No-op.
+        }
+    }
+
+    /**
+     * Network manager mock.
+     */
+    public static class NMMock extends NMClient {
+        /** */
+        protected NMMock() {
+            super("name");
+        }
+
+        /** {@inheritDoc} */
+        @Override public Map<String, ByteBuffer> startContainer(Container container,
+            ContainerLaunchContext containerLaunchContext) throws YarnException, IOException {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void stopContainer(ContainerId containerId, NodeId nodeId) throws YarnException, IOException {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public ContainerStatus getContainerStatus(ContainerId containerId, NodeId nodeId)
+            throws YarnException, IOException {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void cleanupRunningContainersOnStop(boolean enabled) {
+            // No-op.
+        }
+    }
+
+    /**
+     * Resource.
+     */
+    public static class MockResource extends Resource {
+        /** Memory. */
+        private int mem;
+
+        /** CPU. */
+        private int cpu;
+
+        /**
+         * @param mem Memory.
+         * @param cpu CPU.
+         */
+        public MockResource(int mem, int cpu) {
+            this.mem = mem;
+            this.cpu = cpu;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int getMemory() {
+            return mem;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void setMemory(int memory) {
+            this.mem = memory;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int getVirtualCores() {
+            return cpu;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void setVirtualCores(int vCores) {
+            this.cpu = vCores;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int compareTo(Resource resource) {
+            return 0;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/960e19dd/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteSchedulerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteSchedulerSelfTest.java b/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteSchedulerSelfTest.java
deleted file mode 100644
index 04d3492..0000000
--- a/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteSchedulerSelfTest.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.yarn;
-
-import junit.framework.*;
-
-/**
- * Scheduler tests.
- */
-public class IgniteSchedulerSelfTest extends TestCase {
-    public void testName() throws Exception {
-
-    }
-}


Mime
View raw message