Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 79B0C1878A for ; Thu, 28 May 2015 16:45:12 +0000 (UTC) Received: (qmail 51538 invoked by uid 500); 28 May 2015 16:45:12 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 51504 invoked by uid 500); 28 May 2015 16:45:12 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 51495 invoked by uid 99); 28 May 2015 16:45:12 -0000 Received: from Unknown (HELO spamd3-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 28 May 2015 16:45:12 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd3-us-west.apache.org (ASF Mail Server at spamd3-us-west.apache.org) with ESMTP id 8853A182202 for ; Thu, 28 May 2015 16:45:11 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd3-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.771 X-Spam-Level: * X-Spam-Status: No, score=1.771 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, T_RP_MATCHES_RCVD=-0.01, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-us-west.apache.org ([10.40.0.8]) by localhost (spamd3-us-west.apache.org [10.40.0.10]) (amavisd-new, port 10024) with ESMTP id WKXZ78eJN71S for ; Thu, 28 May 2015 16:45:04 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-us-west.apache.org (ASF Mail Server at mx1-us-west.apache.org) with SMTP id E9B6225FA7 for ; Thu, 28 May 2015 16:44:59 +0000 (UTC) Received: (qmail 50659 invoked by uid 99); 28 May 2015 16:44:59 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 28 May 2015 16:44:59 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id AD10EE10F7; Thu, 28 May 2015 16:44:59 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.incubator.apache.org Date: Thu, 28 May 2015 16:45:05 -0000 Message-Id: In-Reply-To: <8f06749aba50497bacb1fd8a9da63f26@git.apache.org> References: <8f06749aba50497bacb1fd8a9da63f26@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [07/35] incubator-ignite git commit: #IGNITE-857 Added resource limit. #IGNITE-857 Added resource limit. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/e3208738 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/e3208738 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/e3208738 Branch: refs/heads/ignite-218-hdfs-only Commit: e320873828284fb86fb4d6e52cee98a6bb87b4af Parents: 55c166a Author: nikolay tikhonov Authored: Tue May 19 19:42:22 2015 +0300 Committer: nikolay tikhonov Committed: Tue May 19 19:42:22 2015 +0300 ---------------------------------------------------------------------- modules/mesos/pom.xml | 6 +- .../apache/ignite/mesos/ClusterResources.java | 130 +++++++++++++++++++ .../apache/ignite/mesos/IgniteFramework.java | 4 +- .../apache/ignite/mesos/IgniteScheduler.java | 120 ++++++++--------- .../org/apache/ignite/mesos/IgniteTask.java | 78 +++++++++++ .../ignite/mesos/IgniteSchedulerSelfTest.java | 1 - 6 files changed, 272 insertions(+), 67 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e3208738/modules/mesos/pom.xml ---------------------------------------------------------------------- diff --git a/modules/mesos/pom.xml b/modules/mesos/pom.xml index ef73c0b..5ce3e5c 100644 --- a/modules/mesos/pom.xml +++ b/modules/mesos/pom.xml @@ -68,6 +68,11 @@ jar-with-dependencies + + + org.apache.ignite.mesos.IgniteFramework + + @@ -79,7 +84,6 @@ - http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e3208738/modules/mesos/src/main/java/org/apache/ignite/mesos/ClusterResources.java ---------------------------------------------------------------------- diff --git a/modules/mesos/src/main/java/org/apache/ignite/mesos/ClusterResources.java b/modules/mesos/src/main/java/org/apache/ignite/mesos/ClusterResources.java new file mode 100644 index 0000000..0a2193f --- /dev/null +++ b/modules/mesos/src/main/java/org/apache/ignite/mesos/ClusterResources.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.mesos; + +import java.io.*; +import java.util.*; + +/** + * Cluster settings. + */ +public class ClusterResources { + /** Unlimited. */ + public static final int DEFAULT_VALUE = -1; + + /** */ + public static final String IGNITE_RESOURCE_CPU_CORES = "IGNITE_RESOURCE_CPU_CORES"; + + /** CPU limit. */ + private double cpu = DEFAULT_VALUE; + + /** */ + public static final String IGNITE_RESOURCE_MEM_MB = "IGNITE_RESOURCE_MEM_MB"; + + /** Memory limit. */ + private double mem = DEFAULT_VALUE; + + /** */ + public static final String IGNITE_RESOURCE_DISK_MB = "IGNITE_RESOURCE_DISK_MB"; + + /** Disk space limit. */ + private double disk = DEFAULT_VALUE; + + /** */ + public static final String IGNITE_RESOURCE_NODE_CNT = "IGNITE_RESOURCE_NODE_CNT"; + + /** Node count limit. */ + private double nodeCnt = DEFAULT_VALUE; + + /** */ + public ClusterResources() { + // No-op. + } + + /** + * @return CPU count limit. + */ + public double cpus(){ + return cpu; + } + + /** + * @return mem limit. + */ + public double memory() { + return mem; + } + + /** + * @return disk limit. + */ + public double disk() { + return disk; + } + + /** + * @return instance count limit. + */ + public double instances() { + return nodeCnt; + } + + /** + * @param config path to config file. + * @return Cluster configuration. + */ + public static ClusterResources from(String config) { + try { + Properties props = new Properties(); + + props.load(new FileInputStream(config)); + + ClusterResources resources = new ClusterResources(); + + resources.cpu = getProperty(IGNITE_RESOURCE_CPU_CORES, props); + resources.mem = getProperty(IGNITE_RESOURCE_MEM_MB, props); + resources.disk = getProperty(IGNITE_RESOURCE_DISK_MB, props); + resources.nodeCnt = getProperty(IGNITE_RESOURCE_NODE_CNT, props); + + return resources; + } + catch (IOException e) { + throw new RuntimeException(e); + } + } + + /** + * @param name Property name. + * @param fileProps Property file. + * @return Property value. + */ + private static double getProperty(String name, Properties fileProps) { + if (fileProps.containsKey(name)) + return Double.valueOf(fileProps.getProperty(name)); + + String property = System.getProperty(name); + + if (property == null) + System.getenv(name); + + if (property == null) + return DEFAULT_VALUE; + + return Double.valueOf(property); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e3208738/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteFramework.java ---------------------------------------------------------------------- diff --git a/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteFramework.java b/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteFramework.java index 5c556a1..3d309f3 100644 --- a/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteFramework.java +++ b/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteFramework.java @@ -25,7 +25,7 @@ import org.apache.mesos.*; */ public class IgniteFramework { /** - * @param args Args + * @param args Args [host:port] [resource limit] */ public static void main(String[] args) { checkArgs(args); @@ -43,7 +43,7 @@ public class IgniteFramework { } // create the scheduler - final Scheduler scheduler = new IgniteScheduler(); + final Scheduler scheduler = new IgniteScheduler(ClusterResources.from(args[1])); // create the driver MesosSchedulerDriver driver; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e3208738/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteScheduler.java ---------------------------------------------------------------------- diff --git a/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteScheduler.java b/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteScheduler.java index 7b5623b..fcbab87 100644 --- a/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteScheduler.java +++ b/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteScheduler.java @@ -40,26 +40,39 @@ public class IgniteScheduler implements Scheduler { /** Mem. */ public static final String MEM = "mem"; + /** Disk. */ + public static final String DISK = "disk"; + /** Default port range. */ public static final String DEFAULT_PORT = ":47500..47510"; + /** Min of memory required. */ + public static final int MIN_MEMORY = 256; + /** Delimiter to use in IP names. */ public static final String DELIM = ","; - /** ID generator. */ - private AtomicInteger taskIdGenerator = new AtomicInteger(); - /** Logger. */ private static final Logger log = LoggerFactory.getLogger(IgniteScheduler.class); - /** Min of memory required. */ - public static final int MIN_MEMORY = 256; - /** Mutex. */ private static final Object mux = new Object(); + /** ID generator. */ + private AtomicInteger taskIdGenerator = new AtomicInteger(); + /** Task on host. */ - private ConcurrentMap tasks = new ConcurrentHashMap<>(); + private ConcurrentMap tasks = new ConcurrentHashMap<>(); + + /** Cluster resources. */ + private ClusterResources clusterLimit; + + /** + * @param clusterLimit Resources limit. + */ + public IgniteScheduler(ClusterResources clusterLimit) { + this.clusterLimit = clusterLimit; + } /** {@inheritDoc} */ @Override public void registered(SchedulerDriver schedulerDriver, Protos.FrameworkID frameworkID, @@ -78,10 +91,10 @@ public class IgniteScheduler implements Scheduler { log.info("resourceOffers() with {} offers", offers.size()); for (Protos.Offer offer : offers) { - Tuple cpuMem = checkOffer(offer); + IgniteTask igniteTask = checkOffer(offer); // Decline offer which doesn't match by mem or cpu. - if (cpuMem == null) { + if (igniteTask == null) { schedulerDriver.declineOffer(offer.getId()); continue; @@ -94,13 +107,13 @@ public class IgniteScheduler implements Scheduler { log.info("Launching task {}", taskId.getValue()); // Create task to run. - Protos.TaskInfo task = createTask(offer, cpuMem, taskId); + Protos.TaskInfo task = createTask(offer, igniteTask, taskId); schedulerDriver.launchTasks(Collections.singletonList(offer.getId()), Collections.singletonList(task), Protos.Filters.newBuilder().setRefuseSeconds(1).build()); - tasks.put(taskId.getValue(), offer.getHostname()); + tasks.put(taskId.getValue(), igniteTask); } } } @@ -109,11 +122,11 @@ public class IgniteScheduler implements Scheduler { * Create Task. * * @param offer Offer. - * @param cpuMem Cpu and mem on slave. + * @param igniteTask Task description. * @param taskId Task id. * @return Task. */ - protected Protos.TaskInfo createTask(Protos.Offer offer, Tuple cpuMem, Protos.TaskID taskId) { + protected Protos.TaskInfo createTask(Protos.Offer offer, IgniteTask igniteTask, Protos.TaskID taskId) { // Docker image info. Protos.ContainerInfo.DockerInfo.Builder docker = Protos.ContainerInfo.DockerInfo.newBuilder() .setImage(IMAGE) @@ -131,16 +144,16 @@ public class IgniteScheduler implements Scheduler { .addResources(Protos.Resource.newBuilder() .setName(CPUS) .setType(Protos.Value.Type.SCALAR) - .setScalar(Protos.Value.Scalar.newBuilder().setValue(cpuMem.get1()))) + .setScalar(Protos.Value.Scalar.newBuilder().setValue(igniteTask.cpuCores()))) .addResources(Protos.Resource.newBuilder() .setName(MEM) .setType(Protos.Value.Type.SCALAR) - .setScalar(Protos.Value.Scalar.newBuilder().setValue(cpuMem.get2()))) + .setScalar(Protos.Value.Scalar.newBuilder().setValue(igniteTask.mem()))) .setContainer(cont) .setCommand(Protos.CommandInfo.newBuilder() .setShell(false) .addArguments(STARTUP_SCRIPT) - .addArguments(String.valueOf(cpuMem.get2().intValue())) + .addArguments(String.valueOf(igniteTask.mem())) .addArguments(getAddress())) .build(); } @@ -154,8 +167,8 @@ public class IgniteScheduler implements Scheduler { StringBuilder sb = new StringBuilder(); - for (String host : tasks.values()) - sb.append(host).append(DEFAULT_PORT).append(DELIM); + for (IgniteTask task : tasks.values()) + sb.append(task.host()).append(DEFAULT_PORT).append(DELIM); return sb.substring(0, sb.length() - 1); } @@ -164,11 +177,15 @@ public class IgniteScheduler implements Scheduler { * Check slave resources and return resources infos. * * @param offer Offer request. - * @return Pair where first is cpus, second is memory. + * @return Ignite task description. */ - private Tuple checkOffer(Protos.Offer offer) { - double cpus = -1; - double mem = -1; + private IgniteTask checkOffer(Protos.Offer offer) { + if (checkLimit(clusterLimit.instances(), tasks.size())) + return null; + + double cpus = -2; + double mem = -2; + double disk = -2; for (Protos.Resource resource : offer.getResourcesList()) { if (resource.getName().equals(CPUS)) { @@ -183,17 +200,18 @@ public class IgniteScheduler implements Scheduler { else log.debug("Mem resource was not a scalar: " + resource.getType().toString()); } - else if (resource.getName().equals("disk")) - log.debug("Ignoring disk resources from offer"); + else if (resource.getType().equals(Protos.Value.Type.SCALAR)) + disk = resource.getScalar().getValue(); + else + log.debug("Disk resource was not a scalar: " + resource.getType().toString()); } - if (cpus < 0) - log.debug("No cpus resource present"); - if (mem < 0) - log.debug("No mem resource present"); + if (checkLimit(clusterLimit.memory(), mem) && + checkLimit(clusterLimit.cpus(), cpus) && + checkLimit(clusterLimit.disk(), disk) && + MIN_MEMORY <= mem) - if (cpus >= 1 && MIN_MEMORY <= mem) - return new Tuple<>(cpus, mem); + return new IgniteTask(offer.getHostname(), cpus, mem, disk); else { log.info("Offer not sufficient for slave request:\n" + offer.getResourcesList().toString() + "\n" + offer.getAttributesList().toString() + @@ -205,6 +223,15 @@ public class IgniteScheduler implements Scheduler { } } + /** + * @param limit Limit. + * @param value Value. + * @return {@code True} if limit isn't violated else {@code false}. + */ + private boolean checkLimit(double limit, double value) { + return limit == ClusterResources.DEFAULT_VALUE || limit <= value; + } + /** {@inheritDoc} */ @Override public void offerRescinded(SchedulerDriver schedulerDriver, Protos.OfferID offerID) { log.info("offerRescinded()"); @@ -250,37 +277,4 @@ public class IgniteScheduler implements Scheduler { @Override public void error(SchedulerDriver schedulerDriver, String s) { log.error("error() {}", s); } - - /** - * Tuple. - */ - public static class Tuple { - /** */ - private final A val1; - - /** */ - private final B val2; - - /** - * - */ - public Tuple(A val1, B val2) { - this.val1 = val1; - this.val2 = val2; - } - - /** - * @return val1 - */ - public A get1() { - return val1; - } - - /** - * @return val2 - */ - public B get2() { - return val2; - } - } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e3208738/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteTask.java ---------------------------------------------------------------------- diff --git a/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteTask.java b/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteTask.java new file mode 100644 index 0000000..bad9996 --- /dev/null +++ b/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteTask.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.mesos; + +/** + * TODO + */ +public class IgniteTask { + /** */ + public final String host; + + /** */ + public final double cpuCores; + + /** */ + public final double mem; + + /** */ + public final double disk; + + /** + * Ignite launched task. + * + * @param host Host. + * @param cpuCores Cpu cores count. + * @param mem Memory. + * @param disk Disk. + */ + public IgniteTask(String host, double cpuCores, double mem, double disk) { + this.host = host; + this.cpuCores = cpuCores; + this.mem = mem; + this.disk = disk; + } + + /** + * @return Host. + */ + public String host() { + return host; + } + + /** + * @return Cores count. + */ + public double cpuCores() { + return cpuCores; + } + + /** + * @return Memory. + */ + public double mem() { + return mem; + } + + /** + * @return Disk. + */ + public double disk() { + return disk; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e3208738/modules/mesos/src/test/java/org/apache/ignite/mesos/IgniteSchedulerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/mesos/src/test/java/org/apache/ignite/mesos/IgniteSchedulerSelfTest.java b/modules/mesos/src/test/java/org/apache/ignite/mesos/IgniteSchedulerSelfTest.java index 5534b2c..2c4b6ee 100644 --- a/modules/mesos/src/test/java/org/apache/ignite/mesos/IgniteSchedulerSelfTest.java +++ b/modules/mesos/src/test/java/org/apache/ignite/mesos/IgniteSchedulerSelfTest.java @@ -160,6 +160,5 @@ public class IgniteSchedulerSelfTest extends TestCase { return null; } - } } \ No newline at end of file