Return-Path: X-Original-To: apmail-myriad-commits-archive@minotaur.apache.org Delivered-To: apmail-myriad-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 138DF182BC for ; Wed, 28 Oct 2015 18:20:29 +0000 (UTC) Received: (qmail 3967 invoked by uid 500); 28 Oct 2015 18:20:28 -0000 Delivered-To: apmail-myriad-commits-archive@myriad.apache.org Received: (qmail 3804 invoked by uid 500); 28 Oct 2015 18:20:28 -0000 Mailing-List: contact commits-help@myriad.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@myriad.incubator.apache.org Delivered-To: mailing list commits@myriad.incubator.apache.org Received: (qmail 3602 invoked by uid 99); 28 Oct 2015 18:20:28 -0000 Received: from Unknown (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 28 Oct 2015 18:20:28 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id EC47EC0FD1 for ; Wed, 28 Oct 2015 18:20:27 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.771 X-Spam-Level: * X-Spam-Status: No, score=1.771 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, T_RP_MATCHES_RCVD=-0.01, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-us-east.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id xNzsSSdpzz6U for ; Wed, 28 Oct 2015 18:20:21 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-us-east.apache.org (ASF Mail Server at mx1-us-east.apache.org) with SMTP id D0056429A6 for ; Wed, 28 Oct 2015 18:20:20 +0000 (UTC) Received: (qmail 2668 invoked by uid 99); 28 Oct 2015 18:20:20 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 28 Oct 2015 18:20:20 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 3AB12DFCCE; Wed, 28 Oct 2015 18:20:20 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: smarella@apache.org To: commits@myriad.incubator.apache.org Message-Id: <9b7f58ff7ce84642991130112e3542d0@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: incubator-myriad git commit: MYRIAD-160 Randomizing Mesos ports assigned to NMs to make sure that in case of flexdown and subsequent flexup NM does not use previously used port ports clashes can still happen, but this minimizes those events. Date: Wed, 28 Oct 2015 18:20:20 +0000 (UTC) Repository: incubator-myriad Updated Branches: refs/heads/master 101bcad35 -> a3ef8c791 MYRIAD-160 Randomizing Mesos ports assigned to NMs to make sure that in case of flexdown and subsequent flexup NM does not use previously used port ports clashes can still happen, but this minimizes those events. This closes: #20 Review: #20 Project: http://git-wip-us.apache.org/repos/asf/incubator-myriad/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-myriad/commit/a3ef8c79 Tree: http://git-wip-us.apache.org/repos/asf/incubator-myriad/tree/a3ef8c79 Diff: http://git-wip-us.apache.org/repos/asf/incubator-myriad/diff/a3ef8c79 Branch: refs/heads/master Commit: a3ef8c791c77afad609deacda7bcade7e25fae80 Parents: 101bcad Author: Yuliya Feldman Authored: Mon Oct 26 11:22:13 2015 -0700 Committer: Santosh Marella Committed: Wed Oct 28 11:19:53 2015 -0700 ---------------------------------------------------------------------- .../apache/myriad/scheduler/TaskFactory.java | 67 +++--- .../myriad/scheduler/TestRandomPorts.java | 204 +++++++++++++++++++ 2 files changed, 248 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/a3ef8c79/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskFactory.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskFactory.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskFactory.java index 33bc832..ad0ec0d 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskFactory.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskFactory.java @@ -18,14 +18,17 @@ */ package org.apache.myriad.scheduler; +import java.util.ArrayList; import java.util.HashSet; -import java.util.Iterator; +import java.util.List; import java.util.Objects; +import java.util.Random; import javax.inject.Inject; import org.apache.myriad.configuration.MyriadConfiguration; import org.apache.myriad.configuration.MyriadExecutorConfiguration; +import org.apache.myriad.state.NodeTask; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -39,6 +42,7 @@ import org.apache.mesos.Protos.Resource; import org.apache.mesos.Protos.TaskInfo; import org.apache.mesos.Protos.TaskID; import org.apache.mesos.Protos.Value; +import org.apache.mesos.Protos.Value.Range; import org.apache.mesos.Protos.Value.Scalar; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,7 +58,7 @@ public interface TaskFactory { static final String YARN_HTTP_POLICY = "yarn.http.policy"; static final String YARN_HTTP_POLICY_HTTPS_ONLY = "HTTPS_ONLY"; - TaskInfo createTask(Offer offer, FrameworkID frameworkId, TaskID taskId, org.apache.myriad.state.NodeTask nodeTask); + TaskInfo createTask(Offer offer, FrameworkID frameworkId, TaskID taskId, NodeTask nodeTask); // TODO(Santosh): This is needed because the ExecutorInfo constructed // to launch NM needs to be specified to launch placeholder tasks for @@ -72,6 +76,7 @@ public interface TaskFactory { public static final String YARN_NODEMANAGER_OPTS_KEY = "YARN_NODEMANAGER_OPTS"; private static final Logger LOGGER = LoggerFactory.getLogger(NMTaskFactoryImpl.class); + private static final Random rand = new Random(); private MyriadConfiguration cfg; private TaskUtils taskUtils; private ExecutorCommandLineGenerator clGenerator; @@ -85,32 +90,47 @@ public interface TaskFactory { this.constraints = new NMTaskConstraints(); } + @VisibleForTesting + protected static HashSet getNMPorts(Resource resource) { + HashSet ports = new HashSet<>(); + if (resource.getName().equals("ports")){ + /* + ranges.getRangeList() returns a list of ranges, each range specifies a begin and end only. + so must loop though each range until we get all ports needed. We exit each loop as soon as all + ports are found so bounded by NMPorts.expectedNumPorts. + */ + final List ranges = resource.getRanges().getRangeList(); + final List allAvailablePorts = new ArrayList<>(); + for (Range range : ranges) { + if (range.hasBegin() && range.hasEnd()) { + for (long i = range.getBegin(); i <= range.getEnd(); i++) { + allAvailablePorts.add(i); + } + } + } + final int allAvailablePortsSize = allAvailablePorts.size(); + Preconditions.checkState(allAvailablePorts.size() >= NMPorts.expectedNumPorts(), "Not enough ports in offer"); + + while (ports.size() < NMPorts.expectedNumPorts()) { + int portIndex = rand.nextInt(allAvailablePortsSize); + ports.add(allAvailablePorts.get(portIndex)); + } + } + return ports; + } + //Utility function to get the first NMPorts.expectedNumPorts number of ports of an offer - private static NMPorts getPorts(Offer offer) { + @VisibleForTesting + protected static NMPorts getPorts(Offer offer) { HashSet ports = new HashSet<>(); for (Resource resource : offer.getResourcesList()) { if (resource.getName().equals("ports")) { - /* - ranges.getRangeList() returns a list of ranges, each range specifies a begin and end only. - so must loop though each range until we get all ports needed. We exit each loop as soon as all - ports are found so bounded by NMPorts.expectedNumPorts. - */ - Iterator itr = resource.getRanges().getRangeList().iterator(); - while (itr.hasNext() && ports.size() < NMPorts.expectedNumPorts()) { - Value.Range range = itr.next(); - if (range.getBegin() <= range.getEnd()) { - long i = range.getBegin(); - while (i <= range.getEnd() && ports.size() < NMPorts.expectedNumPorts()) { - ports.add(i); - i++; - } - } - } + ports = getNMPorts(resource); + break; } } - Preconditions.checkState(ports.size() == NMPorts.expectedNumPorts(), "Not enough ports in offer"); - Long[] portArray = ports.toArray(new Long[ports.size()]); + Long [] portArray = ports.toArray(new Long [ports.size()]); return new NMPorts(portArray); } @@ -123,7 +143,8 @@ public interface TaskFactory { if (myriadExecutorConfiguration.getNodeManagerUri().isPresent()) { //Both FrameworkUser and FrameworkSuperuser to get all of the directory permissions correct. if (!(cfg.getFrameworkUser().isPresent() && cfg.getFrameworkSuperUser().isPresent())) { - throw new RuntimeException("Trying to use remote distribution, but frameworkUser" + "and/or frameworkSuperUser not set!"); + throw new RuntimeException("Trying to use remote distribution, but frameworkUser" + + "and/or frameworkSuperUser not set!"); } String nodeManagerUri = myriadExecutorConfiguration.getNodeManagerUri().get(); cmd = clGenerator.generateCommandLine(profile, ports); @@ -153,7 +174,7 @@ public interface TaskFactory { } @Override - public TaskInfo createTask(Offer offer, FrameworkID frameworkId, TaskID taskId, org.apache.myriad.state.NodeTask nodeTask) { + public TaskInfo createTask(Offer offer, FrameworkID frameworkId, TaskID taskId, NodeTask nodeTask) { Objects.requireNonNull(offer, "Offer should be non-null"); Objects.requireNonNull(nodeTask, "NodeTask should be non-null"); http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/a3ef8c79/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/TestRandomPorts.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/TestRandomPorts.java b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/TestRandomPorts.java new file mode 100644 index 0000000..e8c0e58 --- /dev/null +++ b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/TestRandomPorts.java @@ -0,0 +1,204 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.myriad.scheduler; + + +import static org.junit.Assert.*; + +import java.util.Collections; +import java.util.List; +import java.util.Set; + +import org.apache.mesos.Protos; +import org.apache.mesos.Protos.Resource; +import org.apache.mesos.Protos.Value.Range; +import org.apache.mesos.Protos.Value.Ranges; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.myriad.scheduler.TaskFactory.NMTaskFactoryImpl; +import com.google.common.collect.Lists; + +/** + * Test Class to test NM ports randomization + * + */ +public class TestRandomPorts { + + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + } + + @Test + public void testRandomPorts() { + Range range1 = Range.newBuilder().setBegin(100).setEnd(200).build(); + Range range2 = Range.newBuilder().setBegin(250).setEnd(300).build(); + Range range3 = Range.newBuilder().setBegin(310).setEnd(500).build(); + Range range4 = Range.newBuilder().setBegin(520).setEnd(720).build(); + Range range5 = Range.newBuilder().setBegin(750).setEnd(1000).build(); + + Ranges ranges = Ranges.newBuilder().addRange(range1) + .addRange(range2) + .addRange(range3) + .addRange(range4) + .addRange(range5).build(); + + + Resource resource = Resource.newBuilder().setType(Protos.Value.Type.RANGES).setRanges(ranges).setName("ports").build(); + + Set ports = NMTaskFactoryImpl.getNMPorts(resource); + + assertEquals(NMPorts.expectedNumPorts(), ports.size()); + List sortedList = Lists.newArrayList(ports); + + Collections.sort(sortedList); + + for (Long port : sortedList) { + assertTrue((port >= 100 && port <= 200) || + (port >= 250 && port <= 300) || + (port >= 310 && port <= 500) || + (port >= 520 && port <= 720) || + (port >= 750 && port <= 1000)); + } + } + + @Test + public void testRandomPortsNotEnough() { + Range range1 = Range.newBuilder().setBegin(100).setEnd(200).build(); + Range range2 = Range.newBuilder().setBegin(250).setEnd(300).build(); + + Ranges ranges = Ranges.newBuilder().addRange(range1) + .addRange(range2) + .build(); + + + Resource resource = Resource.newBuilder().setType(Protos.Value.Type.RANGES).setRanges(ranges).setName("ports").build(); + + Set ports = NMTaskFactoryImpl.getNMPorts(resource); + + assertEquals(NMPorts.expectedNumPorts(), ports.size()); + List sortedList = Lists.newArrayList(ports); + + Collections.sort(sortedList); + + for (Long port : sortedList) { + assertTrue((port >= 100 && port <= 200) || + (port >= 250 && port <= 300)); + } + } + + @Test + public void testRandomPortsNotEnoughPercentKickIn() { + Range range1 = Range.newBuilder().setBegin(100).setEnd(200).build(); + Range range2 = Range.newBuilder().setBegin(250).setEnd(335).build(); + + Ranges ranges = Ranges.newBuilder().addRange(range1) + .addRange(range2) + .build(); + + + Resource resource = Resource.newBuilder().setType(Protos.Value.Type.RANGES).setRanges(ranges).setName("ports").build(); + + Set ports = NMTaskFactoryImpl.getNMPorts(resource); + + assertEquals(NMPorts.expectedNumPorts(), ports.size()); + List sortedList = Lists.newArrayList(ports); + + Collections.sort(sortedList); + + for (int i = 0; i < sortedList.size(); i++) { + assertTrue((sortedList.get(i) >= 100 && sortedList.get(i) <= 200) || + (sortedList.get(i) >= 250 && sortedList.get(i) <= 335)); + } + } + + @Test + public void testRandomPortsLargeRange() { + Range range1 = Range.newBuilder().setBegin(100).setEnd(500).build(); + Range range2 = Range.newBuilder().setBegin(550).setEnd(835).build(); + + Ranges ranges = Ranges.newBuilder().addRange(range1) + .addRange(range2) + .build(); + + + Resource resource = Resource.newBuilder().setType(Protos.Value.Type.RANGES).setRanges(ranges).setName("ports").build(); + + Set ports = NMTaskFactoryImpl.getNMPorts(resource); + + assertEquals(NMPorts.expectedNumPorts(), ports.size()); + List sortedList = Lists.newArrayList(ports); + + Collections.sort(sortedList); + + for (int i = 0; i < sortedList.size(); i++) { + assertTrue((sortedList.get(i) >= 100 && sortedList.get(i) <= 500) || + (sortedList.get(i) >= 550 && sortedList.get(i) <= 835)); + } + } + + @Test + public void testRandomPortsSmallRange() { + Range range1 = Range.newBuilder().setBegin(100).setEnd(100).build(); + Range range2 = Range.newBuilder().setBegin(110).setEnd(115).build(); + + Ranges ranges = Ranges.newBuilder().addRange(range1) + .addRange(range2) + .build(); + + Resource resource = Resource.newBuilder().setType(Protos.Value.Type.RANGES).setRanges(ranges).setName("ports").build(); + + Set ports = NMTaskFactoryImpl.getNMPorts(resource); + + assertEquals(NMPorts.expectedNumPorts(), ports.size()); + List sortedList = Lists.newArrayList(ports); + + Collections.sort(sortedList); + + for (int i = 0; i < sortedList.size(); i++) { + assertTrue(sortedList.get(i) == 100 || (sortedList.get(i) <= 115 && sortedList.get(i) >= 110)); + } + } + + @Test + public void notEnoughPorts() throws Exception { + Range range1 = Range.newBuilder().setBegin(100).setEnd(100).build(); + Range range2 = Range.newBuilder().setBegin(110).setEnd(111).build(); + + Ranges ranges = Ranges.newBuilder().addRange(range1) + .addRange(range2) + .build(); + + Resource resource = Resource.newBuilder().setType(Protos.Value.Type.RANGES).setRanges(ranges).setName("ports").build(); + + try { + NMTaskFactoryImpl.getNMPorts(resource); + fail("Should fail, as number of ports is not enough"); + } catch (IllegalStateException ise) { + // should get here + } + + } +}