ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From anovi...@apache.org
Subject [02/50] [abbrv] incubator-ignite git commit: #IGNITE-857 Removed Grizzly dependecy. Added tests.
Date Mon, 01 Jun 2015 02:56:53 GMT
#IGNITE-857 Removed Grizzly dependecy. Added tests.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/639c9128
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/639c9128
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/639c9128

Branch: refs/heads/ignite-843
Commit: 639c912870dccde404d99407c445dfe723b2bdba
Parents: 5e3bcb2
Author: nikolay tikhonov <ntikhonov@gridgain.com>
Authored: Tue May 26 16:05:25 2015 +0300
Committer: nikolay tikhonov <ntikhonov@gridgain.com>
Committed: Tue May 26 16:05:25 2015 +0300

----------------------------------------------------------------------
 modules/mesos/licenses/jetty-epl-license.txt    |  69 ++++++
 modules/mesos/pom.xml                           |  19 +-
 .../apache/ignite/mesos/ClusterProperties.java  |  37 ++-
 .../apache/ignite/mesos/IgniteFramework.java    |  16 +-
 .../apache/ignite/mesos/IgniteScheduler.java    | 107 ++++----
 .../org/apache/ignite/mesos/IgniteTask.java     |   8 +
 .../ignite/mesos/resource/IgniteProvider.java   |   6 +-
 .../ignite/mesos/resource/JettyServer.java      |  61 +++++
 .../mesos/resource/ResourceController.java      | 127 ----------
 .../ignite/mesos/resource/ResourceHandler.java  | 142 +++++++++++
 .../ignite/mesos/resource/ResourceProvider.java |   2 +-
 .../main/resources/ignite-default-config.xml    |   6 +-
 .../ignite/mesos/IgniteSchedulerSelfTest.java   | 242 ++++++++++++++++++-
 13 files changed, 636 insertions(+), 206 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/639c9128/modules/mesos/licenses/jetty-epl-license.txt
----------------------------------------------------------------------
diff --git a/modules/mesos/licenses/jetty-epl-license.txt b/modules/mesos/licenses/jetty-epl-license.txt
new file mode 100644
index 0000000..f5f0c89
--- /dev/null
+++ b/modules/mesos/licenses/jetty-epl-license.txt
@@ -0,0 +1,69 @@
+Eclipse Public License, Version 1.0 (EPL-1.0)
+(plain text)
+THE ACCOMPANYING PROGRAM IS PROVIDED UNDER THE TERMS OF THIS ECLIPSE PUBLIC LICENSE ("AGREEMENT"). ANY USE, REPRODUCTION OR DISTRIBUTION OF THE PROGRAM CONSTITUTES RECIPIENT'S ACCEPTANCE OF THIS AGREEMENT.
+
+1. DEFINITIONS
+
+"Contribution" means:
+
+a) in the case of the initial Contributor, the initial code and documentation distributed under this Agreement, and
+b) in the case of each subsequent Contributor:
+i) changes to the Program, and
+ii) additions to the Program;
+where such changes and/or additions to the Program originate from and are distributed by that particular Contributor. A Contribution 'originates' from a Contributor if it was added to the Program by such Contributor itself or anyone acting on such Contributor's behalf. Contributions do not include additions to the Program which: (i) are separate modules of software distributed in conjunction with the Program under their own license agreement, and (ii) are not derivative works of the Program.
+"Contributor" means any person or entity that distributes the Program.
+
+"Licensed Patents " mean patent claims licensable by a Contributor which are necessarily infringed by the use or sale of its Contribution alone or when combined with the Program.
+
+"Program" means the Contributions distributed in accordance with this Agreement.
+
+"Recipient" means anyone who receives the Program under this Agreement, including all Contributors.
+
+2. GRANT OF RIGHTS
+
+a) Subject to the terms of this Agreement, each Contributor hereby grants Recipient a non-exclusive, worldwide, royalty-free copyright license to reproduce, prepare derivative works of, publicly display, publicly perform, distribute and sublicense the Contribution of such Contributor, if any, and such derivative works, in source code and object code form.
+b) Subject to the terms of this Agreement, each Contributor hereby grants Recipient a non-exclusive, worldwide, royalty-free patent license under Licensed Patents to make, use, sell, offer to sell, import and otherwise transfer the Contribution of such Contributor, if any, in source code and object code form. This patent license shall apply to the combination of the Contribution and the Program if, at the time the Contribution is added by the Contributor, such addition of the Contribution causes such combination to be covered by the Licensed Patents. The patent license shall not apply to any other combinations which include the Contribution. No hardware per se is licensed hereunder.
+c) Recipient understands that although each Contributor grants the licenses to its Contributions set forth herein, no assurances are provided by any Contributor that the Program does not infringe the patent or other intellectual property rights of any other entity. Each Contributor disclaims any liability to Recipient for claims brought by any other entity based on infringement of intellectual property rights or otherwise. As a condition to exercising the rights and licenses granted hereunder, each Recipient hereby assumes sole responsibility to secure any other intellectual property rights needed, if any. For example, if a third party patent license is required to allow Recipient to distribute the Program, it is Recipient's responsibility to acquire that license before distributing the Program.
+d) Each Contributor represents that to its knowledge it has sufficient copyright rights in its Contribution, if any, to grant the copyright license set forth in this Agreement.
+3. REQUIREMENTS
+
+A Contributor may choose to distribute the Program in object code form under its own license agreement, provided that:
+
+a) it complies with the terms and conditions of this Agreement; and
+b) its license agreement:
+i) effectively disclaims on behalf of all Contributors all warranties and conditions, express and implied, including warranties or conditions of title and non-infringement, and implied warranties or conditions of merchantability and fitness for a particular purpose;
+ii) effectively excludes on behalf of all Contributors all liability for damages, including direct, indirect, special, incidental and consequential damages, such as lost profits;
+iii) states that any provisions which differ from this Agreement are offered by that Contributor alone and not by any other party; and
+iv) states that source code for the Program is available from such Contributor, and informs licensees how to obtain it in a reasonable manner on or through a medium customarily used for software exchange.
+When the Program is made available in source code form:
+
+a) it must be made available under this Agreement; and
+b) a copy of this Agreement must be included with each copy of the Program.
+Contributors may not remove or alter any copyright notices contained within the Program.
+Each Contributor must identify itself as the originator of its Contribution, if any, in a manner that reasonably allows subsequent Recipients to identify the originator of the Contribution.
+
+4. COMMERCIAL DISTRIBUTION
+
+Commercial distributors of software may accept certain responsibilities with respect to end users, business partners and the like. While this license is intended to facilitate the commercial use of the Program, the Contributor who includes the Program in a commercial product offering should do so in a manner which does not create potential liability for other Contributors. Therefore, if a Contributor includes the Program in a commercial product offering, such Contributor ("Commercial Contributor") hereby agrees to defend and indemnify every other Contributor ("Indemnified Contributor") against any losses, damages and costs (collectively "Losses") arising from claims, lawsuits and other legal actions brought by a third party against the Indemnified Contributor to the extent caused by the acts or omissions of such Commercial Contributor in connection with its distribution of the Program in a commercial product offering. The obligations in this section do not apply to any claims or Los
 ses relating to any actual or alleged intellectual property infringement. In order to qualify, an Indemnified Contributor must: a) promptly notify the Commercial Contributor in writing of such claim, and b) allow the Commercial Contributor to control, and cooperate with the Commercial Contributor in, the defense and any related settlement negotiations. The Indemnified Contributor may participate in any such claim at its own expense.
+
+For example, a Contributor might include the Program in a commercial product offering, Product X. That Contributor is then a Commercial Contributor. If that Commercial Contributor then makes performance claims, or offers warranties related to Product X, those performance claims and warranties are such Commercial Contributor's responsibility alone. Under this section, the Commercial Contributor would have to defend claims against the other Contributors related to those performance claims and warranties, and if a court requires any other Contributor to pay any damages as a result, the Commercial Contributor must pay those damages.
+
+5. NO WARRANTY
+
+EXCEPT AS EXPRESSLY SET FORTH IN THIS AGREEMENT, THE PROGRAM IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, EITHER EXPRESS OR IMPLIED INCLUDING, WITHOUT LIMITATION, ANY WARRANTIES OR CONDITIONS OF TITLE, NON-INFRINGEMENT, MERCHANTABILITY OR FITNESS FOR A PARTICULAR PURPOSE. Each Recipient is solely responsible for determining the appropriateness of using and distributing the Program and assumes all risks associated with its exercise of rights under this Agreement , including but not limited to the risks and costs of program errors, compliance with applicable laws, damage to or loss of data, programs or equipment, and unavailability or interruption of operations.
+
+6. DISCLAIMER OF LIABILITY
+
+EXCEPT AS EXPRESSLY SET FORTH IN THIS AGREEMENT, NEITHER RECIPIENT NOR ANY CONTRIBUTORS SHALL HAVE ANY LIABILITY FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING WITHOUT LIMITATION LOST PROFITS), HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OR DISTRIBUTION OF THE PROGRAM OR THE EXERCISE OF ANY RIGHTS GRANTED HEREUNDER, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGES.
+
+7. GENERAL
+
+If any provision of this Agreement is invalid or unenforceable under applicable law, it shall not affect the validity or enforceability of the remainder of the terms of this Agreement, and without further action by the parties hereto, such provision shall be reformed to the minimum extent necessary to make such provision valid and enforceable.
+
+If Recipient institutes patent litigation against any entity (including a cross-claim or counterclaim in a lawsuit) alleging that the Program itself (excluding combinations of the Program with other software or hardware) infringes such Recipient's patent(s), then such Recipient's rights granted under Section 2(b) shall terminate as of the date such litigation is filed.
+
+All Recipient's rights under this Agreement shall terminate if it fails to comply with any of the material terms or conditions of this Agreement and does not cure such failure in a reasonable period of time after becoming aware of such noncompliance. If all Recipient's rights under this Agreement terminate, Recipient agrees to cease use and distribution of the Program as soon as reasonably practicable. However, Recipient's obligations under this Agreement and any licenses granted by Recipient relating to the Program shall continue and survive.
+
+Everyone is permitted to copy and distribute copies of this Agreement, but in order to avoid inconsistency the Agreement is copyrighted and may only be modified in the following manner. The Agreement Steward reserves the right to publish new versions (including revisions) of this Agreement from time to time. No one other than the Agreement Steward has the right to modify this Agreement. The Eclipse Foundation is the initial Agreement Steward. The Eclipse Foundation may assign the responsibility to serve as the Agreement Steward to a suitable separate entity. Each new version of the Agreement will be given a distinguishing version number. The Program (including Contributions) may always be distributed subject to the version of the Agreement under which it was received. In addition, after a new version of the Agreement is published, Contributor may elect to distribute the Program (including its Contributions) under the new version. Except as expressly stated in Sections 2(a) and 2(b) 
 above, Recipient receives no rights or licenses to the intellectual property of any Contributor under this Agreement, whether expressly, by implication, estoppel or otherwise. All rights in the Program not expressly granted under this Agreement are reserved.
+
+This Agreement is governed by the laws of the State of New York and the intellectual property laws of the United States of America. No party to this Agreement will bring a legal action under this Agreement more than one year after the cause of action arose. Each party waives its rights to a jury trial in any resulting litigation.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/639c9128/modules/mesos/pom.xml
----------------------------------------------------------------------
diff --git a/modules/mesos/pom.xml b/modules/mesos/pom.xml
index 4aa0dae..9079c66 100644
--- a/modules/mesos/pom.xml
+++ b/modules/mesos/pom.xml
@@ -28,38 +28,41 @@
     <version>1.1.0-SNAPSHOT</version>
 
     <properties>
-        <version.grizzly>2.16</version.grizzly>
+        <jetty.version>9.2.10.v20150310</jetty.version>
+        <mesos.version>0.22.0</mesos.version>
+        <slf4j.version>1.7.12</slf4j.version>
+        <log4j.version>2.0.2</log4j.version>
     </properties>
 
     <dependencies>
         <dependency>
             <groupId>org.apache.mesos</groupId>
             <artifactId>mesos</artifactId>
-            <version>0.22.0</version>
+            <version>${mesos.version}</version>
         </dependency>
 
         <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-api</artifactId>
-            <version>1.7.12</version>
+            <version>${slf4j.version}</version>
         </dependency>
 
         <dependency>
             <groupId>org.apache.logging.log4j</groupId>
             <artifactId>log4j-core</artifactId>
-            <version>2.0.2</version>
+            <version>${log4j.version}</version>
         </dependency>
 
         <dependency>
             <groupId>org.apache.logging.log4j</groupId>
             <artifactId>log4j-slf4j-impl</artifactId>
-            <version>2.0.2</version>
+            <version>${log4j.version}</version>
         </dependency>
 
         <dependency>
-            <groupId>org.glassfish.jersey.containers</groupId>
-            <artifactId>jersey-container-grizzly2-http</artifactId>
-            <version>${version.grizzly}</version>
+            <groupId>org.eclipse.jetty</groupId>
+            <artifactId>jetty-server</artifactId>
+            <version>${jetty.version}</version>
         </dependency>
 
         <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/639c9128/modules/mesos/src/main/java/org/apache/ignite/mesos/ClusterProperties.java
----------------------------------------------------------------------
diff --git a/modules/mesos/src/main/java/org/apache/ignite/mesos/ClusterProperties.java b/modules/mesos/src/main/java/org/apache/ignite/mesos/ClusterProperties.java
index 63ef27c..de0afcf 100644
--- a/modules/mesos/src/main/java/org/apache/ignite/mesos/ClusterProperties.java
+++ b/modules/mesos/src/main/java/org/apache/ignite/mesos/ClusterProperties.java
@@ -98,7 +98,7 @@ public class ClusterProperties {
     public static final String IGNITE_RESOURCE_MIN_CPU_CNT_PER_NODE = "IGNITE_RESOURCE_MIN_CPU_CNT_PER_NODE";
 
     /** */
-    public static final double DEFAULT_RESOURCE_MIN_CPU = 2;
+    public static final double DEFAULT_RESOURCE_MIN_CPU = 1;
 
     /** Min memory per node. */
     private double minCpu = DEFAULT_RESOURCE_MIN_CPU;
@@ -155,6 +155,13 @@ public class ClusterProperties {
     }
 
     /**
+     * Set CPU count limit.
+     */
+    public void cpus(double cpu){
+        this.cpu = cpu;
+    }
+
+    /**
      * @return CPU count limit.
      */
     public double cpusPerNode(){
@@ -169,6 +176,14 @@ public class ClusterProperties {
     }
 
     /**
+     * Set mem limit.
+     */
+    public void memory(double mem) {
+        this.mem = mem;
+    }
+
+
+    /**
      * @return mem limit.
      */
     public double memoryPerNode() {
@@ -204,6 +219,15 @@ public class ClusterProperties {
     }
 
     /**
+     * Sets min memory.
+     *
+     * @param minMemory Min memory.
+     */
+    public void minMemoryPerNode(double minMemory) {
+        this.minMemory = minMemory;
+    }
+
+    /**
      * @return min cpu count per node.
      */
     public double minCpuPerNode() {
@@ -211,6 +235,15 @@ public class ClusterProperties {
     }
 
     /**
+     * Sets min cpu count per node.
+     *
+     * @param minCpu min cpu count per node.
+     */
+    public void minCpuPerNode(double minCpu) {
+        this.minCpu = minCpu;
+    }
+
+    /**
      * @return Ignite version.
      */
     public String igniteVer() {
@@ -286,7 +319,7 @@ public class ClusterProperties {
             prop.mem = getDoubleProperty(IGNITE_RESOURCE_MEM_MB, props, UNLIMITED);
             prop.memPerNode = getDoubleProperty(IGNITE_RESOURCE_MEM_MB_PER_NODE, props, UNLIMITED);
             prop.disk = getDoubleProperty(IGNITE_RESOURCE_DISK_MB, props, UNLIMITED);
-            prop.diskPerNode = getDoubleProperty(IGNITE_RESOURCE_DISK_MB_PER_NODE, props, UNLIMITED);
+            prop.diskPerNode = getDoubleProperty(IGNITE_RESOURCE_DISK_MB_PER_NODE, props, 1024.0);
             prop.nodeCnt = getDoubleProperty(IGNITE_RESOURCE_NODE_CNT, props, UNLIMITED);
             prop.minCpu = getDoubleProperty(IGNITE_RESOURCE_MIN_CPU_CNT_PER_NODE, props, DEFAULT_RESOURCE_MIN_CPU);
             prop.minMemory = getDoubleProperty(IGNITE_RESOURCE_MIN_MEMORY_PER_NODE, props, DEFAULT_RESOURCE_MIN_MEM);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/639c9128/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteFramework.java
----------------------------------------------------------------------
diff --git a/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteFramework.java b/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteFramework.java
index b385bc9..154385b 100644
--- a/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteFramework.java
+++ b/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteFramework.java
@@ -20,9 +20,6 @@ package org.apache.ignite.mesos;
 import com.google.protobuf.*;
 import org.apache.ignite.mesos.resource.*;
 import org.apache.mesos.*;
-import org.glassfish.grizzly.http.server.*;
-import org.glassfish.jersey.grizzly2.httpserver.*;
-import org.glassfish.jersey.server.*;
 import org.slf4j.*;
 
 import java.net.*;
@@ -61,13 +58,12 @@ public class IgniteFramework {
 
         String baseUrl = String.format("http://%s:%d", clusterProps.httpServerHost(), clusterProps.httpServerPort());
 
-        URI httpServerBaseUri = URI.create(baseUrl);
+        JettyServer httpServer = new JettyServer();
 
-        ResourceConfig rc = new ResourceConfig()
-            .registerInstances(new ResourceController(clusterProps.userLibs(), clusterProps.igniteCfg(),
-                clusterProps.igniteWorkDir()));
-
-        HttpServer httpServer = GrizzlyHttpServerFactory.createHttpServer(httpServerBaseUri, rc);
+        httpServer.start(
+            new InetSocketAddress(clusterProps.httpServerHost(), clusterProps.httpServerPort()),
+            new ResourceHandler(clusterProps.userLibs(), clusterProps.igniteCfg(), clusterProps.igniteWorkDir())
+        );
 
         ResourceProvider provider = new ResourceProvider();
 
@@ -113,7 +109,7 @@ public class IgniteFramework {
 
         int status = driver.run() == Protos.Status.DRIVER_STOPPED ? 0 : 1;
 
-        httpServer.shutdown();
+        httpServer.stop();
 
         // Ensure that the driver process terminates.
         driver.stop();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/639c9128/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteScheduler.java
----------------------------------------------------------------------
diff --git a/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteScheduler.java b/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteScheduler.java
index 7d713cd..6b165e4 100644
--- a/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteScheduler.java
+++ b/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteScheduler.java
@@ -56,24 +56,24 @@ public class IgniteScheduler implements Scheduler {
     private Map<String, IgniteTask> tasks = new HashMap<>();
 
     /** Cluster resources. */
-    private ClusterProperties clusterLimit;
+    private ClusterProperties clusterProps;
 
     /** Resource provider. */
     private ResourceProvider resourceProvider;
 
     /**
-     * @param clusterLimit Cluster limit.
+     * @param clusterProps Cluster limit.
      * @param resourceProvider Resource provider.
      */
-    public IgniteScheduler(ClusterProperties clusterLimit, ResourceProvider resourceProvider) {
-        this.clusterLimit = clusterLimit;
+    public IgniteScheduler(ClusterProperties clusterProps, ResourceProvider resourceProvider) {
+        this.clusterProps = clusterProps;
         this.resourceProvider = resourceProvider;
     }
 
     /** {@inheritDoc} */
     @Override public void resourceOffers(SchedulerDriver schedulerDriver, List<Protos.Offer> offers) {
         synchronized (mux) {
-            log.info("resourceOffers() with {} offers", offers.size());
+            log.debug("Offers resources: {} ", offers.size());
 
             for (Protos.Offer offer : offers) {
                 IgniteTask igniteTask = checkOffer(offer);
@@ -89,7 +89,7 @@ public class IgniteScheduler implements Scheduler {
                 Protos.TaskID taskId = Protos.TaskID.newBuilder()
                     .setValue(Integer.toString(taskIdGenerator.incrementAndGet())).build();
 
-                log.info("Launching task {}", taskId.getValue());
+                log.info("Launching task: [{}]", igniteTask);
 
                 // Create task to run.
                 Protos.TaskInfo task = createTask(offer, igniteTask, taskId);
@@ -103,7 +103,6 @@ public class IgniteScheduler implements Scheduler {
         }
     }
 
-
     /**
      * Create Task.
      *
@@ -116,7 +115,7 @@ public class IgniteScheduler implements Scheduler {
         Protos.CommandInfo.Builder builder = Protos.CommandInfo.newBuilder()
             .setEnvironment(Protos.Environment.newBuilder().addVariables(Protos.Environment.Variable.newBuilder()
                 .setName("IGNITE_TCP_DISCOVERY_ADDRESSES")
-                .setValue(getAddress())))
+                .setValue(getAddress(offer.getHostname()))))
             .addUris(Protos.CommandInfo.URI.newBuilder()
                 .setValue(resourceProvider.igniteUrl())
                 .setExtract(true))
@@ -129,14 +128,14 @@ public class IgniteScheduler implements Scheduler {
             builder.setValue("cp *.jar ./gridgain-community-*/libs/ "
                 + "&& ./gridgain-community-*/bin/ignite.sh "
                 + resourceProvider.configName()
-                + " -J-Xmx" + String.valueOf((int) igniteTask.mem() + "m")
-                + " -J-Xms" + String.valueOf((int) igniteTask.mem()) + "m");
+                + " -J-Xmx" + String.valueOf((int)igniteTask.mem() + "m")
+                + " -J-Xms" + String.valueOf((int)igniteTask.mem()) + "m");
         }
         else
             builder.setValue("./gridgain-community-*/bin/ignite.sh "
                 + resourceProvider.configName()
-                + " -J-Xmx" + String.valueOf((int) igniteTask.mem() + "m")
-                + " -J-Xms" + String.valueOf((int) igniteTask.mem()) + "m");
+                + " -J-Xmx" + String.valueOf((int)igniteTask.mem() + "m")
+                + " -J-Xms" + String.valueOf((int)igniteTask.mem()) + "m");
 
         return Protos.TaskInfo.newBuilder()
             .setName("Ignite node " + taskId.getValue())
@@ -151,15 +150,23 @@ public class IgniteScheduler implements Scheduler {
                 .setName(MEM)
                 .setType(Protos.Value.Type.SCALAR)
                 .setScalar(Protos.Value.Scalar.newBuilder().setValue(igniteTask.mem())))
+            .addResources(Protos.Resource.newBuilder()
+                .setName(DISK)
+                .setType(Protos.Value.Type.SCALAR)
+                .setScalar(Protos.Value.Scalar.newBuilder().setValue(igniteTask.disk())))
                 .build();
     }
 
     /**
      * @return Address running nodes.
      */
-    protected String getAddress() {
-        if (tasks.isEmpty())
+    protected String getAddress(String address) {
+        if (tasks.isEmpty()) {
+            if (address != null && !address.isEmpty())
+                return address + DEFAULT_PORT;
+
             return "";
+        }
 
         StringBuilder sb = new StringBuilder();
 
@@ -177,7 +184,7 @@ public class IgniteScheduler implements Scheduler {
      */
     private IgniteTask checkOffer(Protos.Offer offer) {
         // Check limit on running nodes.
-        if (clusterLimit.instances() <= tasks.size())
+        if (clusterProps.instances() <= tasks.size())
             return null;
 
         double cpus = -1;
@@ -206,7 +213,7 @@ public class IgniteScheduler implements Scheduler {
         }
 
         // Check that slave satisfies min requirements.
-        if (cpus < clusterLimit.minCpuPerNode()  && mem < clusterLimit.minMemoryPerNode() ) {
+        if (cpus < clusterProps.minCpuPerNode() || mem < clusterProps.minMemoryPerNode() ) {
             log.debug("Offer not sufficient for slave request: {}", offer.getResourcesList());
 
             return null;
@@ -223,9 +230,9 @@ public class IgniteScheduler implements Scheduler {
             totalDisk += task.disk();
         }
 
-        cpus = Math.min(clusterLimit.cpus() - totalCpus, Math.min(cpus, clusterLimit.cpusPerNode()));
-        mem = Math.min(clusterLimit.memory() - totalMem, Math.min(mem, clusterLimit.memoryPerNode()));
-        disk = Math.min(clusterLimit.disk() - totalDisk, Math.min(disk, clusterLimit.diskPerNode()));
+        cpus = Math.min(clusterProps.cpus() - totalCpus, Math.min(cpus, clusterProps.cpusPerNode()));
+        mem = Math.min(clusterProps.memory() - totalMem, Math.min(mem, clusterProps.memoryPerNode()));
+        disk = Math.min(clusterProps.disk() - totalDisk, Math.min(disk, clusterProps.diskPerNode()));
 
         if (cpus > 0 && mem > 0)
             return new IgniteTask(offer.getHostname(), cpus, mem, disk);
@@ -240,37 +247,45 @@ public class IgniteScheduler implements Scheduler {
     @Override public void statusUpdate(SchedulerDriver schedulerDriver, Protos.TaskStatus taskStatus) {
         final String taskId = taskStatus.getTaskId().getValue();
 
-        log.info("statusUpdate() task {} is in state {}", taskId, taskStatus.getState());
-
-        switch (taskStatus.getState()) {
-            case TASK_FAILED:
-            case TASK_FINISHED:
-                synchronized (mux) {
-                    IgniteTask failedTask = tasks.remove(taskId);
-
-                    if (failedTask != null) {
-                        List<Protos.Request> requests = new ArrayList<>();
-
-                        Protos.Request request = Protos.Request.newBuilder()
-                            .addResources(Protos.Resource.newBuilder()
-                                .setType(Protos.Value.Type.SCALAR)
-                                .setName(MEM)
-                                .setScalar(Protos.Value.Scalar.newBuilder().setValue(failedTask.mem())))
-                            .addResources(Protos.Resource.newBuilder()
-                                .setType(Protos.Value.Type.SCALAR)
-                                .setName(CPUS)
-                                .setScalar(Protos.Value.Scalar.newBuilder().setValue(failedTask.cpuCores())))
-                            .build();
-
-                        requests.add(request);
-
-                        schedulerDriver.requestResources(requests);
-                    }
+        log.info("Received update event task: [{}] is in state: [{}]", taskId, taskStatus.getState());
+
+        if (taskStatus.getState().equals(Protos.TaskState.TASK_FAILED)
+            || taskStatus.getState().equals(Protos.TaskState.TASK_ERROR)
+            || taskStatus.getState().equals(Protos.TaskState.TASK_FINISHED)
+            || taskStatus.getState().equals(Protos.TaskState.TASK_KILLED)
+            || taskStatus.getState().equals(Protos.TaskState.TASK_LOST)) {
+            synchronized (mux) {
+                IgniteTask failedTask = tasks.remove(taskId);
+
+                if (failedTask != null) {
+                    List<Protos.Request> requests = new ArrayList<>();
+
+                    Protos.Request request = Protos.Request.newBuilder()
+                        .addResources(Protos.Resource.newBuilder()
+                            .setType(Protos.Value.Type.SCALAR)
+                            .setName(MEM)
+                            .setScalar(Protos.Value.Scalar.newBuilder().setValue(failedTask.mem())))
+                        .addResources(Protos.Resource.newBuilder()
+                            .setType(Protos.Value.Type.SCALAR)
+                            .setName(CPUS)
+                            .setScalar(Protos.Value.Scalar.newBuilder().setValue(failedTask.cpuCores())))
+                        .build();
+
+                    requests.add(request);
+
+                    schedulerDriver.requestResources(requests);
                 }
-                break;
+            }
         }
     }
 
+    /**
+     * @param clusterProps Cluster properties.
+     */
+    public void setClusterProps(ClusterProperties clusterProps) {
+        this.clusterProps = clusterProps;
+    }
+
     /** {@inheritDoc} */
     @Override public void registered(SchedulerDriver schedulerDriver, Protos.FrameworkID frameworkID,
         Protos.MasterInfo masterInfo) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/639c9128/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteTask.java
----------------------------------------------------------------------
diff --git a/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteTask.java b/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteTask.java
index c41ff49..ecd2272 100644
--- a/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteTask.java
+++ b/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteTask.java
@@ -75,4 +75,12 @@ public class IgniteTask {
     public double disk() {
         return disk;
     }
+
+    @Override
+    public String toString() {
+        return "IgniteTask " +
+            "host: [" + host + ']' +
+            ", cpuCores: [" + cpuCores + "]" +
+            ", mem: [" + mem + "]";
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/639c9128/modules/mesos/src/main/java/org/apache/ignite/mesos/resource/IgniteProvider.java
----------------------------------------------------------------------
diff --git a/modules/mesos/src/main/java/org/apache/ignite/mesos/resource/IgniteProvider.java b/modules/mesos/src/main/java/org/apache/ignite/mesos/resource/IgniteProvider.java
index 9a27539..2887112 100644
--- a/modules/mesos/src/main/java/org/apache/ignite/mesos/resource/IgniteProvider.java
+++ b/modules/mesos/src/main/java/org/apache/ignite/mesos/resource/IgniteProvider.java
@@ -66,9 +66,9 @@ public class IgniteProvider {
                             String[] ver1 = parseVersion(f1).split("\\.");
                             String[] ver2 = parseVersion(f2).split("\\.");
 
-                            if (Integer.valueOf(ver1[0]) > Integer.valueOf(ver2[0])
-                                && Integer.valueOf(ver1[1]) > Integer.valueOf(ver2[1])
-                                && Integer.valueOf(ver1[2]) > Integer.valueOf(ver2[2]))
+                            if (Integer.valueOf(ver1[0]) >= Integer.valueOf(ver2[0])
+                                && Integer.valueOf(ver1[1]) >= Integer.valueOf(ver2[1])
+                                && Integer.valueOf(ver1[2]) >= Integer.valueOf(ver2[2]))
 
                                 return 1;
                             else

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/639c9128/modules/mesos/src/main/java/org/apache/ignite/mesos/resource/JettyServer.java
----------------------------------------------------------------------
diff --git a/modules/mesos/src/main/java/org/apache/ignite/mesos/resource/JettyServer.java b/modules/mesos/src/main/java/org/apache/ignite/mesos/resource/JettyServer.java
new file mode 100644
index 0000000..fb27963
--- /dev/null
+++ b/modules/mesos/src/main/java/org/apache/ignite/mesos/resource/JettyServer.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.mesos.resource;
+
+import org.eclipse.jetty.server.*;
+
+import java.net.*;
+
+/**
+ * Embedded jetty server.
+ */
+public class JettyServer {
+    /** */
+    private Server server;
+
+    /**
+     * Starts jetty server.
+     *
+     * @param address Inter socket address.
+     * @param handler Handler.
+     * @throws Exception If failed.
+     */
+    public void start(InetSocketAddress address, Handler handler) throws Exception {
+        if (server == null) {
+            server = new Server(address);
+
+            server.setHandler(handler);
+
+            server.start();
+        }
+        else
+            throw new IllegalStateException("Failed. Jetty server has been started already.");
+    }
+
+    /**
+     * Stops server.
+     *
+     * @throws Exception If failed.
+     */
+    public void stop() throws Exception {
+        if (server != null)
+            server.stop();
+        else
+            throw new IllegalStateException("Failed. Jetty server has not been started yet.");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/639c9128/modules/mesos/src/main/java/org/apache/ignite/mesos/resource/ResourceController.java
----------------------------------------------------------------------
diff --git a/modules/mesos/src/main/java/org/apache/ignite/mesos/resource/ResourceController.java b/modules/mesos/src/main/java/org/apache/ignite/mesos/resource/ResourceController.java
deleted file mode 100644
index 8f0a2af..0000000
--- a/modules/mesos/src/main/java/org/apache/ignite/mesos/resource/ResourceController.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.mesos.resource;
-
-import javax.ws.rs.*;
-import javax.ws.rs.core.*;
-import java.io.*;
-
-/**
- * HTTP controller which provides on slave resources.
- */
-@Path("/")
-public class ResourceController {
-    /** */
-    public static final String IGNITE_PREFIX = "/ignite/";
-
-    /** */
-    public static final String LIBS_PREFIX = "/libs/";
-
-    /** */
-    public static final String CONFIG_PREFIX = "/config/";
-
-    /** */
-    public static final String DEFAULT_CONFIG = CONFIG_PREFIX + "default/";
-
-    /** */
-    private String libsDir;
-
-    /** */
-    private String cfgPath;
-
-    /** */
-    private String igniteDir;
-
-    /**
-     * @param libsDir Path to directory with user libs.
-     * @param cfgPath Path to config file.
-     */
-    public ResourceController(String libsDir, String cfgPath, String igniteDir) {
-        this.libsDir = libsDir;
-        this.cfgPath = cfgPath;
-        this.igniteDir = igniteDir;
-    }
-
-    /**
-     * @param ignite Ignite jar name.
-     * @return Http response.
-     */
-    @GET
-    @Path(IGNITE_PREFIX + "{ignite-dist}")
-    public Response ignite(@PathParam("ignite-dist") String ignite) {
-        return handleRequest(new File(igniteDir + "/" + ignite), "application/zip-archive", ignite);
-    }
-
-    /**
-     * @param lib user's jar.
-     * @return Http response.
-     */
-    @GET
-    @Path(LIBS_PREFIX + "{lib}")
-    public Response lib(@PathParam("lib") String lib) {
-        return handleRequest(new File(libsDir + "/" + lib), "application/java-archive", lib);
-    }
-
-    /**
-     *
-     * @param cfg Config file.
-     * @return Http response.
-     */
-    @GET
-    @Path(CONFIG_PREFIX + "{cfg}")
-    public Response config(@PathParam("cfg") String cfg) {
-        return handleRequest(new File(cfgPath), "application/xml", cfg);
-    }
-
-    /**
-     * @param cfg Config file.
-     * @return Http response.
-     */
-    @GET
-    @Path(DEFAULT_CONFIG + "{cfg}")
-    public Response defaultConfig(@PathParam("cfg") String cfg) {
-        return handleRequest(Thread.currentThread().getContextClassLoader().getResourceAsStream(cfg),
-            "application/xml", cfg);
-    }
-
-    /**
-     *
-     * @param resource File resource.
-     * @param type Type.
-     * @param attachmentName Attachment name.
-     * @return Http response.
-     */
-    private static Response handleRequest(File resource, String type, String attachmentName) {
-        final Response.ResponseBuilder builder = Response.ok(resource, type);
-        builder.header("Content-Disposition", "attachment; filename=\"" + attachmentName + "\"");
-        return builder.build();
-    }
-
-    /**
-     *
-     * @param resource File resource.
-     * @param type Type.
-     * @param attachmentName Attachment name.
-     * @return Http response.
-     */
-    private static Response handleRequest(InputStream resource, String type, String attachmentName) {
-        final Response.ResponseBuilder builder = Response.ok(resource, type);
-        builder.header("Content-Disposition", "attachment; filename=\"" + attachmentName + "\"");
-        return builder.build();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/639c9128/modules/mesos/src/main/java/org/apache/ignite/mesos/resource/ResourceHandler.java
----------------------------------------------------------------------
diff --git a/modules/mesos/src/main/java/org/apache/ignite/mesos/resource/ResourceHandler.java b/modules/mesos/src/main/java/org/apache/ignite/mesos/resource/ResourceHandler.java
new file mode 100644
index 0000000..ea883e3
--- /dev/null
+++ b/modules/mesos/src/main/java/org/apache/ignite/mesos/resource/ResourceHandler.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.mesos.resource;
+
+import org.eclipse.jetty.server.*;
+import org.eclipse.jetty.server.handler.*;
+
+import javax.servlet.*;
+import javax.servlet.http.*;
+import java.io.*;
+import java.nio.channels.*;
+import java.nio.file.*;
+
+/**
+ * HTTP controller which provides on slave resources.
+ */
+public class ResourceHandler extends AbstractHandler {
+    /** */
+    public static final String IGNITE_PREFIX = "/ignite/";
+
+    /** */
+    public static final String LIBS_PREFIX = "/libs/";
+
+    /** */
+    public static final String CONFIG_PREFIX = "/config/";
+
+    /** */
+    public static final String DEFAULT_CONFIG = CONFIG_PREFIX + "default/";
+
+    /** */
+    private String libsDir;
+
+    /** */
+    private String cfgPath;
+
+    /** */
+    private String igniteDir;
+
+    /**
+     * @param libsDir Directory with user's libs.
+     * @param cfgPath Path to config file.
+     * @param igniteDir Directory with ignites.
+     */
+    public ResourceHandler(String libsDir, String cfgPath, String igniteDir) {
+        this.libsDir = libsDir;
+        this.cfgPath = cfgPath;
+        this.igniteDir = igniteDir;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override public void handle(
+        String url,
+        Request request,
+        HttpServletRequest httpServletRequest,
+        HttpServletResponse response) throws IOException, ServletException {
+
+        String[] path = url.split("/");
+
+        String fileName = path[path.length -1];
+
+        String servicePath = url.substring(0, url.length() - fileName.length());
+
+        switch (servicePath) {
+            case IGNITE_PREFIX:
+                handleRequest(response, "application/zip-archive", igniteDir + "/" + fileName);
+
+                request.setHandled(true);
+                break;
+
+            case LIBS_PREFIX:
+                handleRequest(response, "application/java-archive", libsDir + "/" + fileName);
+
+                request.setHandled(true);
+                break;
+
+            case CONFIG_PREFIX:
+                handleRequest(response, "application/xml", cfgPath);
+
+                request.setHandled(true);
+                break;
+
+            case DEFAULT_CONFIG:
+                handleRequest(response, "application/xml",
+                    Thread.currentThread().getContextClassLoader().getResourceAsStream(fileName),
+                    fileName);
+
+                request.setHandled(true);
+                break;
+        }
+    }
+
+    /**
+     * @param response Http response.
+     * @param type Type.
+     * @param path Path to file.
+     * @throws IOException If failed.
+     */
+    private static void handleRequest(HttpServletResponse response, String type, String path) throws IOException {
+        Path path0 = Paths.get(path);
+
+        response.setContentType(type);
+        response.setHeader("Content-Disposition", "attachment; filename=\"" + path0.getFileName() + "\"");
+
+        try (HttpOutput out = (HttpOutput)response.getOutputStream()) {
+            out.sendContent(FileChannel.open(path0, StandardOpenOption.READ));
+        }
+    }
+
+    /**
+     * @param response Http response.
+     * @param type Type.
+     * @param stream Stream.
+     * @param attachmentName Attachment name.
+     * @throws IOException If failed.
+     */
+    private static void handleRequest(HttpServletResponse response, String type, InputStream stream,
+                                      String attachmentName) throws IOException {
+        response.setContentType(type);
+        response.setHeader("Content-Disposition", "attachment; filename=\"" + attachmentName + "\"");
+
+        try (HttpOutput out = (HttpOutput)response.getOutputStream()) {
+            out.sendContent(stream);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/639c9128/modules/mesos/src/main/java/org/apache/ignite/mesos/resource/ResourceProvider.java
----------------------------------------------------------------------
diff --git a/modules/mesos/src/main/java/org/apache/ignite/mesos/resource/ResourceProvider.java b/modules/mesos/src/main/java/org/apache/ignite/mesos/resource/ResourceProvider.java
index 1b1f615..f02d1bf 100644
--- a/modules/mesos/src/main/java/org/apache/ignite/mesos/resource/ResourceProvider.java
+++ b/modules/mesos/src/main/java/org/apache/ignite/mesos/resource/ResourceProvider.java
@@ -22,7 +22,7 @@ import org.apache.ignite.mesos.*;
 import java.io.*;
 import java.util.*;
 
-import static org.apache.ignite.mesos.resource.ResourceController.*;
+import static org.apache.ignite.mesos.resource.ResourceHandler.*;
 
 /**
  * Provides path to user's libs and config file.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/639c9128/modules/mesos/src/main/resources/ignite-default-config.xml
----------------------------------------------------------------------
diff --git a/modules/mesos/src/main/resources/ignite-default-config.xml b/modules/mesos/src/main/resources/ignite-default-config.xml
index 9fcce97..2f26398 100644
--- a/modules/mesos/src/main/resources/ignite-default-config.xml
+++ b/modules/mesos/src/main/resources/ignite-default-config.xml
@@ -25,10 +25,10 @@
         <property name="discoverySpi">
             <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
                 <property name="ipFinder">
-                    <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
-                        <property name="shared" value="true"/>
-                    </bean>
+                    <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder"/>
                 </property>
+
+                <property name="joinTimeout" value="60000"/>
             </bean>
         </property>
     </bean>

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/639c9128/modules/mesos/src/test/java/org/apache/ignite/mesos/IgniteSchedulerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/mesos/src/test/java/org/apache/ignite/mesos/IgniteSchedulerSelfTest.java b/modules/mesos/src/test/java/org/apache/ignite/mesos/IgniteSchedulerSelfTest.java
index 4124331..277e0db 100644
--- a/modules/mesos/src/test/java/org/apache/ignite/mesos/IgniteSchedulerSelfTest.java
+++ b/modules/mesos/src/test/java/org/apache/ignite/mesos/IgniteSchedulerSelfTest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.mesos;
 
 import junit.framework.*;
+import org.apache.ignite.mesos.resource.*;
 import org.apache.mesos.*;
 
 import java.util.*;
@@ -33,27 +34,230 @@ public class IgniteSchedulerSelfTest extends TestCase {
     @Override public void setUp() throws Exception {
         super.setUp();
 
-        //scheduler = new IgniteScheduler();
+        ClusterProperties clustProp = new ClusterProperties();
+
+        scheduler = new IgniteScheduler(clustProp, new ResourceProvider() {
+            @Override public String configName() {
+                return "config.xml";
+            }
+
+            @Override public String igniteUrl() {
+                return "ignite.jar";
+            }
+
+            @Override public String igniteConfigUrl() {
+                return "config.xml";
+            }
+
+            @Override public Collection<String> resourceUrl() {
+                return null;
+            }
+        });
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testHostRegister() throws Exception {
-        //Protos.Offer offer = createOffer("hostname", 4, 1024);
+        Protos.Offer offer = createOffer("hostname", 4, 1024);
+
+        DriverMock mock = new DriverMock();
+
+        scheduler.resourceOffers(mock, Arrays.asList(offer));
+
+        assertNotNull(mock.launchedTask);
+        assertEquals(1, mock.launchedTask.size());
+
+        Protos.TaskInfo taskInfo = mock.launchedTask.iterator().next();
+
+        assertEquals(4.0, resources(taskInfo.getResourcesList(), IgniteScheduler.CPUS));
+        assertEquals(1024.0, resources(taskInfo.getResourcesList(), IgniteScheduler.MEM));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDeclineByCpu() throws Exception {
+        Protos.Offer offer = createOffer("hostname", 4, 1024);
+
+        DriverMock mock = new DriverMock();
+
+        ClusterProperties clustProp = new ClusterProperties();
+        clustProp.cpus(2);
+
+        scheduler.setClusterProps(clustProp);
+
+        scheduler.resourceOffers(mock, Arrays.asList(offer));
+
+        assertNotNull(mock.launchedTask);
+        assertEquals(1, mock.launchedTask.size());
+
+        Protos.TaskInfo taskInfo = mock.launchedTask.iterator().next();
+
+        assertEquals(2.0, resources(taskInfo.getResourcesList(), IgniteScheduler.CPUS));
+        assertEquals(1024.0, resources(taskInfo.getResourcesList(), IgniteScheduler.MEM));
+
+        mock.clear();
+
+        scheduler.resourceOffers(mock, Arrays.asList(offer));
+
+        assertNull(mock.launchedTask);
+
+        Protos.OfferID declinedOffer = mock.declinedOffer;
+
+        assertEquals(offer.getId(), declinedOffer);
+    }
+
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDeclineByMem() throws Exception {
+        Protos.Offer offer = createOffer("hostname", 4, 1024);
+
+        DriverMock mock = new DriverMock();
+
+        ClusterProperties clustProp = new ClusterProperties();
+        clustProp.memory(512);
+
+        scheduler.setClusterProps(clustProp);
+
+        scheduler.resourceOffers(mock, Arrays.asList(offer));
+
+        assertNotNull(mock.launchedTask);
+        assertEquals(1, mock.launchedTask.size());
+
+        Protos.TaskInfo taskInfo = mock.launchedTask.iterator().next();
+
+        assertEquals(4.0, resources(taskInfo.getResourcesList(), IgniteScheduler.CPUS));
+        assertEquals(512.0, resources(taskInfo.getResourcesList(), IgniteScheduler.MEM));
 
-        //scheduler.resourceOffers(DriverStub.INSTANCE, Lists.);
+        mock.clear();
+
+        scheduler.resourceOffers(mock, Arrays.asList(offer));
+
+        assertNull(mock.launchedTask);
+
+        Protos.OfferID declinedOffer = mock.declinedOffer;
+
+        assertEquals(offer.getId(), declinedOffer);
     }
 
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDeclineByMemCpu() throws Exception {
+        Protos.Offer offer = createOffer("hostname", 1, 1024);
+
+        DriverMock mock = new DriverMock();
+
+        ClusterProperties clustProp = new ClusterProperties();
+        clustProp.cpus(4);
+        clustProp.memory(2000);
+
+        scheduler.setClusterProps(clustProp);
+
+        double totalMem = 0, totalCpu = 0;
+
+        for (int i = 0; i < 2; i++) {
+            scheduler.resourceOffers(mock, Arrays.asList(offer));
+
+            assertNotNull(mock.launchedTask);
+            assertEquals(1, mock.launchedTask.size());
+
+            Protos.TaskInfo taskInfo = mock.launchedTask.iterator().next();
+
+            totalCpu += resources(taskInfo.getResourcesList(), IgniteScheduler.CPUS);
+            totalMem += resources(taskInfo.getResourcesList(), IgniteScheduler.MEM);
+
+            mock.clear();
+        }
+
+        assertEquals(2.0, totalCpu);
+        assertEquals(2000.0, totalMem);
+
+        scheduler.resourceOffers(mock, Arrays.asList(offer));
+
+        assertNull(mock.launchedTask);
+
+        Protos.OfferID declinedOffer = mock.declinedOffer;
+
+        assertEquals(offer.getId(), declinedOffer);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDeclineByCpuMinRequirements() throws Exception {
+        Protos.Offer offer = createOffer("hostname", 8, 10240);
+
+        DriverMock mock = new DriverMock();
+
+        ClusterProperties clustProp = new ClusterProperties();
+        clustProp.minCpuPerNode(12);
+
+        scheduler.setClusterProps(clustProp);
+
+        scheduler.resourceOffers(mock, Arrays.asList(offer));
+
+        assertNotNull(mock.declinedOffer);
+
+        assertEquals(offer.getId(), mock.declinedOffer);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDeclineByMemMinRequirements() throws Exception {
+        Protos.Offer offer = createOffer("hostname", 8, 10240);
+
+        DriverMock mock = new DriverMock();
+
+        ClusterProperties clustProp = new ClusterProperties();
+        clustProp.minMemoryPerNode(15000);
+
+        scheduler.setClusterProps(clustProp);
+
+        scheduler.resourceOffers(mock, Arrays.asList(offer));
+
+        assertNotNull(mock.declinedOffer);
+
+        assertEquals(offer.getId(), mock.declinedOffer);
+    }
+
+
+    /**
+     * @param resourceType Resource type.
+     * @return Value.
+     */
+    private Double resources(List<Protos.Resource> resources, String resourceType) {
+        for (Protos.Resource resource : resources) {
+            if (resource.getName().equals(resourceType))
+                return resource.getScalar().getValue();
+        }
+
+        return null;
+    }
+
+    /**
+     * @param hostname Hostname
+     * @param cpu Cpu count.
+     * @param mem Mem size.
+     * @return Offer.
+     */
     private Protos.Offer createOffer(String hostname, double cpu, double mem) {
         return Protos.Offer.newBuilder()
-            .setSlaveId(Protos.SlaveID.newBuilder().setValue("1").build())
+            .setId(Protos.OfferID.newBuilder().setValue("1"))
+            .setSlaveId(Protos.SlaveID.newBuilder().setValue("1"))
+            .setFrameworkId(Protos.FrameworkID.newBuilder().setValue("1"))
             .setHostname(hostname)
             .addResources(Protos.Resource.newBuilder()
+                .setType(Protos.Value.Type.SCALAR)
                 .setName(IgniteScheduler.CPUS)
                 .setScalar(Protos.Value.Scalar.newBuilder().setValue(cpu).build())
                 .build())
             .addResources(Protos.Resource.newBuilder()
+                .setType(Protos.Value.Type.SCALAR)
                 .setName(IgniteScheduler.MEM)
                 .setScalar(Protos.Value.Scalar.newBuilder().setValue(mem).build())
                 .build())
@@ -63,8 +267,22 @@ public class IgniteSchedulerSelfTest extends TestCase {
     /**
      * No-op implementation.
      */
-    public static class DriverStub implements SchedulerDriver {
-        private static final DriverStub INSTANCE = new DriverStub();
+    public static class DriverMock implements SchedulerDriver {
+        /**
+         *
+         */
+        Collection<Protos.TaskInfo> launchedTask;
+
+        /** */
+        Protos.OfferID declinedOffer;
+
+        /**
+         * Clear launched task.
+         */
+        public void clear() {
+            launchedTask = null;
+            declinedOffer = null;
+        }
 
         /** {@inheritDoc} */
         @Override public Protos.Status start() {
@@ -104,23 +322,31 @@ public class IgniteSchedulerSelfTest extends TestCase {
         /** {@inheritDoc} */
         @Override public Protos.Status launchTasks(Collection<Protos.OfferID> offerIds,
             Collection<Protos.TaskInfo> tasks, Protos.Filters filters) {
+            launchedTask = tasks;
+
             return null;
         }
 
         /** {@inheritDoc} */
         @Override public Protos.Status launchTasks(Collection<Protos.OfferID> offerIds,
             Collection<Protos.TaskInfo> tasks) {
+            launchedTask = tasks;
+
             return null;
         }
 
         /** {@inheritDoc} */
         @Override public Protos.Status launchTasks(Protos.OfferID offerId, Collection<Protos.TaskInfo> tasks,
             Protos.Filters filters) {
+            launchedTask = tasks;
+
             return null;
         }
 
         /** {@inheritDoc} */
         @Override public Protos.Status launchTasks(Protos.OfferID offerId, Collection<Protos.TaskInfo> tasks) {
+            launchedTask = tasks;
+
             return null;
         }
 
@@ -131,11 +357,15 @@ public class IgniteSchedulerSelfTest extends TestCase {
 
         /** {@inheritDoc} */
         @Override public Protos.Status declineOffer(Protos.OfferID offerId, Protos.Filters filters) {
+            declinedOffer = offerId;
+
             return null;
         }
 
         /** {@inheritDoc} */
         @Override public Protos.Status declineOffer(Protos.OfferID offerId) {
+            declinedOffer = offerId;
+
             return null;
         }
 


Mime
View raw message