flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject flink git commit: [FLINK-8174] Mesos RM unable to accept offers for unreserved resources
Date Wed, 06 Dec 2017 10:50:10 GMT
Repository: flink
Updated Branches:
  refs/heads/release-1.4 9ba9e809b -> 4bc4f3265


[FLINK-8174] Mesos RM unable to accept offers for unreserved resources

[FLINK-8174] Mesos RM unable to accept offers for unreserved resources
- added test `OfferTest`

[FLINK-8174] Mesos RM unable to accept offers for unreserved resources
- fix `LaunchCoordinatorTest`

[FLINK-8174] Mesos RM unable to accept offers for unreserved resources
- improved javadocs

[FLINK-8174] Mesos RM unable to accept offers for unreserved resources
- rename `print` to `toString`
- precalculate offer resource values
- extend TestLogger

This closes #5114.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4bc4f326
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4bc4f326
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4bc4f326

Branch: refs/heads/release-1.4
Commit: 4bc4f326580bbea290630999273d9d90711b44c3
Parents: 9ba9e80
Author: Eron Wright <eronwright@gmail.com>
Authored: Sun Dec 3 03:14:41 2017 -0800
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Wed Dec 6 11:49:45 2017 +0100

----------------------------------------------------------------------
 .../main/java/org/apache/flink/mesos/Utils.java | 161 ++++++++++-
 .../clusterframework/LaunchableMesosWorker.java |  39 +--
 .../flink/mesos/scheduler/LaunchableTask.java   |   7 +-
 .../org/apache/flink/mesos/scheduler/Offer.java | 187 +++++++++++++
 .../flink/mesos/util/MesosConfiguration.java    |  10 +
 .../mesos/util/MesosResourceAllocation.java     | 206 ++++++++++++++
 .../mesos/scheduler/LaunchCoordinator.scala     |  25 +-
 .../apache/flink/mesos/scheduler/OfferTest.java | 174 ++++++++++++
 .../mesos/util/MesosResourceAllocationTest.java | 275 +++++++++++++++++++
 .../mesos/scheduler/LaunchCoordinatorTest.scala |   6 +-
 10 files changed, 1058 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4bc4f326/flink-mesos/src/main/java/org/apache/flink/mesos/Utils.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/Utils.java b/flink-mesos/src/main/java/org/apache/flink/mesos/Utils.java
index eac73e0..0e63b9b 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/Utils.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/Utils.java
@@ -25,17 +25,30 @@ import org.apache.mesos.Protos;
 
 import java.net.URL;
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
 
 import scala.Option;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * Collection of utility methods.
  */
 public class Utils {
+
+	/**
+	 * The special 'unreserved' role.
+	 */
+	public static final String UNRESERVED_ROLE = "*";
+
 	/**
 	 * Construct a Mesos environment variable.
 	 */
 	public static Protos.Environment.Variable variable(String name, String value) {
+		checkNotNull(name);
 		return Protos.Environment.Variable.newBuilder()
 			.setName(name)
 			.setValue(value)
@@ -46,6 +59,7 @@ public class Utils {
 	 * Construct a Mesos URI.
 	 */
 	public static Protos.CommandInfo.URI uri(URL url, boolean cacheable) {
+		checkNotNull(url);
 		return Protos.CommandInfo.URI.newBuilder()
 			.setValue(url.toExternalForm())
 			.setExtract(false)
@@ -57,6 +71,8 @@ public class Utils {
 	 * Construct a Mesos URI.
 	 */
 	public static Protos.CommandInfo.URI uri(MesosArtifactResolver resolver, ContainerSpecification.Artifact artifact) {
+		checkNotNull(resolver);
+		checkNotNull(artifact);
 		Option<URL> url = resolver.resolve(artifact.dest);
 		if (url.isEmpty()) {
 			throw new IllegalArgumentException("Unresolvable artifact: " + artifact.dest);
@@ -72,9 +88,90 @@ public class Utils {
 	}
 
 	/**
-	 * Construct a scalar resource value.
+	 * Construct a list of resources.
+	 */
+	public static List<Protos.Resource> resources(Protos.Resource... resources) {
+		checkNotNull(resources);
+		return Arrays.asList(resources);
+	}
+
+	/**
+	 * Construct a cpu resource.
+	 */
+	public static Protos.Resource cpus(double amount) {
+		return cpus(UNRESERVED_ROLE, amount);
+	}
+
+	/**
+	 * Construct a cpu resource.
+	 */
+	public static Protos.Resource cpus(String role, double amount) {
+		return scalar("cpus", role, amount);
+	}
+
+	/**
+	 * Construct a mem resource.
+	 */
+	public static Protos.Resource mem(double amount) {
+		return mem(UNRESERVED_ROLE, amount);
+	}
+
+	/**
+	 * Construct a mem resource.
+	 */
+	public static Protos.Resource mem(String role, double amount) {
+		return scalar("mem", role, amount);
+	}
+
+	/**
+	 * Construct a network resource.
+	 */
+	public static Protos.Resource network(double amount) {
+		return network(UNRESERVED_ROLE, amount);
+	}
+
+	/**
+	 * Construct a network resource.
+	 */
+	public static Protos.Resource network(String role, double amount) {
+		return scalar("network", role, amount);
+	}
+
+	/**
+	 * Construct a disk resource.
+	 */
+	public static Protos.Resource disk(double amount) {
+		return disk(UNRESERVED_ROLE, amount);
+	}
+
+	/**
+	 * Construct a disk resource.
+	 */
+	public static Protos.Resource disk(String role, double amount) {
+		return scalar("disk", role, amount);
+	}
+
+	/**
+	 * Construct a port resource.
+	 */
+	public static Protos.Resource ports(Protos.Value.Range... ranges) {
+		return ports(UNRESERVED_ROLE, ranges);
+	}
+
+	/**
+	 * Construct a port resource.
+	 */
+	public static Protos.Resource ports(String role, Protos.Value.Range... ranges) {
+		return ranges("ports", role, ranges);
+	}
+
+	/**
+	 * Construct a scalar resource.
 	 */
 	public static Protos.Resource scalar(String name, String role, double value) {
+		checkNotNull(name);
+		checkNotNull(role);
+		checkNotNull(value);
 		return Protos.Resource.newBuilder()
 			.setName(name)
 			.setType(Protos.Value.Type.SCALAR)
@@ -91,9 +188,12 @@ public class Utils {
 	}
 
 	/**
-	 * Construct a ranges resource value.
+	 * Construct a range resource.
 	 */
 	public static Protos.Resource ranges(String name, String role, Protos.Value.Range... ranges) {
+		checkNotNull(name);
+		checkNotNull(role);
+		checkNotNull(ranges);
 		return Protos.Resource.newBuilder()
 			.setName(name)
 			.setType(Protos.Value.Type.RANGES)
@@ -101,4 +201,61 @@ public class Utils {
 			.setRole(role)
 			.build();
 	}
+
+	/**
+	 * Gets a stream of values from a collection of range resources.
+	 */
+	public static LongStream rangeValues(Collection<Protos.Resource> resources) {
+		checkNotNull(resources);
+		return resources.stream()
+			.filter(Protos.Resource::hasRanges)
+			.flatMap(r -> r.getRanges().getRangeList().stream())
+			.flatMapToLong(Utils::rangeValues);
+	}
+
+	/**
+	 * Gets a stream of values from a range.
+	 */
+	public static LongStream rangeValues(Protos.Value.Range range) {
+		checkNotNull(range);
+		return LongStream.rangeClosed(range.getBegin(), range.getEnd());
+	}
+
+	/**
+	 * Gets a string representation of a collection of resources.
+	 */
+	public static String toString(Collection<Protos.Resource> resources) {
+		checkNotNull(resources);
+		return resources.stream().map(Utils::toString).collect(Collectors.joining("; ", "[", "]"));
+	}
+
+	/**
+	 * Gets a string representation of a resource.
+	 */
+	public static String toString(Protos.Resource resource) {
+		checkNotNull(resource);
+		if (resource.hasScalar()) {
+			return String.format("%s(%s):%.1f", resource.getName(), resource.getRole(), resource.getScalar().getValue());
+		}
+		if (resource.hasRanges()) {
+			return String.format("%s(%s):%s", resource.getName(), resource.getRole(), toString(resource.getRanges()));
+		}
+		return resource.toString();
+	}
+
+	/**
+	 * Gets a string representation of a collection of ranges.
+	 */
+	public static String toString(Protos.Value.Ranges ranges) {
+		checkNotNull(ranges);
+		return ranges.getRangeList().stream().map(Utils::toString).collect(Collectors.joining(",", "[", "]"));
+	}
+
+	/**
+	 * Gets a string representation of a range.
+	 */
+	public static String toString(Protos.Value.Range range) {
+		checkNotNull(range);
+		return String.format("%d-%d", range.getBegin(), range.getEnd());
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4bc4f326/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
index 125258b..e71c703 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
@@ -25,12 +25,14 @@ import org.apache.flink.mesos.scheduler.LaunchableTask;
 import org.apache.flink.mesos.util.MesosArtifactResolver;
 import org.apache.flink.mesos.util.MesosArtifactServer;
 import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.mesos.util.MesosResourceAllocation;
 import org.apache.flink.runtime.clusterframework.ContainerSpecification;
 import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
 import org.apache.flink.util.Preconditions;
 
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterators;
+
 import com.netflix.fenzo.ConstraintEvaluator;
-import com.netflix.fenzo.TaskAssignmentResult;
 import com.netflix.fenzo.TaskRequest;
 import com.netflix.fenzo.VMTaskFitnessCalculator;
 import org.apache.mesos.Protos;
@@ -39,16 +41,16 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Matcher;
 
 import scala.Option;
 
-import static org.apache.flink.mesos.Utils.range;
-import static org.apache.flink.mesos.Utils.ranges;
-import static org.apache.flink.mesos.Utils.scalar;
+import static org.apache.flink.mesos.Utils.rangeValues;
 import static org.apache.flink.mesos.Utils.variable;
 
 /**
@@ -180,11 +182,11 @@ public class LaunchableMesosWorker implements LaunchableTask {
 	/**
 	 * Construct the TaskInfo needed to launch the worker.
 	 * @param slaveId the assigned slave.
-	 * @param assignment the assignment details.
+	 * @param allocation the resource allocation (available resources).
 	 * @return a fully-baked TaskInfo.
 	 */
 	@Override
-	public Protos.TaskInfo launch(Protos.SlaveID slaveId, TaskAssignmentResult assignment) {
+	public Protos.TaskInfo launch(Protos.SlaveID slaveId, MesosResourceAllocation allocation) {
 
 		ContaineredTaskManagerParameters tmParams = params.containeredParameters();
 
@@ -197,9 +199,12 @@ public class LaunchableMesosWorker implements LaunchableTask {
 		final Protos.TaskInfo.Builder taskInfo = Protos.TaskInfo.newBuilder()
 			.setSlaveId(slaveId)
 			.setTaskId(taskID)
-			.setName(taskID.getValue())
-			.addResources(scalar("cpus", mesosConfiguration.frameworkInfo().getRole(), assignment.getRequest().getCPUs()))
-			.addResources(scalar("mem", mesosConfiguration.frameworkInfo().getRole(), assignment.getRequest().getMemory()));
+			.setName(taskID.getValue());
+
+		// take needed resources from the overall allocation, under the assumption of adequate resources
+		Set<String> roles = mesosConfiguration.roles();
+		taskInfo.addAllResources(allocation.takeScalar("cpus", taskRequest.getCPUs(), roles));
+		taskInfo.addAllResources(allocation.takeScalar("mem", taskRequest.getMemory(), roles));
 
 		final Protos.CommandInfo.Builder cmd = taskInfo.getCommandBuilder();
 		final Protos.Environment.Builder env = cmd.getEnvironmentBuilder();
@@ -217,15 +222,13 @@ public class LaunchableMesosWorker implements LaunchableTask {
 			dynamicProperties.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, taskManagerHostname);
 		}
 
-		// use the assigned ports for the TM
-		if (assignment.getAssignedPorts().size() < TM_PORT_KEYS.length) {
-			throw new IllegalArgumentException("unsufficient # of ports assigned");
-		}
-		for (int i = 0; i < TM_PORT_KEYS.length; i++) {
-			int port = assignment.getAssignedPorts().get(i);
-			String key = TM_PORT_KEYS[i];
-			taskInfo.addResources(ranges("ports", mesosConfiguration.frameworkInfo().getRole(), range(port, port)));
-			dynamicProperties.setInteger(key, port);
+		// take needed ports for the TM
+		List<Protos.Resource> portResources = allocation.takeRanges("ports", TM_PORT_KEYS.length, roles);
+		taskInfo.addAllResources(portResources);
+		Iterator<String> portsToAssign = Iterators.forArray(TM_PORT_KEYS);
+		rangeValues(portResources).forEach(port -> dynamicProperties.setLong(portsToAssign.next(), port));
+		if (portsToAssign.hasNext()) {
+			throw new IllegalArgumentException("insufficient # of ports assigned");
 		}
 
 		// ship additional files

http://git-wip-us.apache.org/repos/asf/flink/blob/4bc4f326/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/LaunchableTask.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/LaunchableTask.java b/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/LaunchableTask.java
index 203708b..fe8f2e3 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/LaunchableTask.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/LaunchableTask.java
@@ -18,7 +18,8 @@
 
 package org.apache.flink.mesos.scheduler;
 
-import com.netflix.fenzo.TaskAssignmentResult;
+import org.apache.flink.mesos.util.MesosResourceAllocation;
+
 import com.netflix.fenzo.TaskRequest;
 import org.apache.mesos.Protos;
 
@@ -35,8 +36,8 @@ public interface LaunchableTask {
 	/**
 	 * Prepare to launch the task by producing a Mesos TaskInfo record.
 	 * @param slaveId the slave assigned to the task.
-	 * @param taskAssignmentResult the task assignment details.
+	 * @param allocation the resource allocation to take from.
      * @return a TaskInfo.
      */
-	Protos.TaskInfo launch(Protos.SlaveID slaveId, TaskAssignmentResult taskAssignmentResult);
+	Protos.TaskInfo launch(Protos.SlaveID slaveId, MesosResourceAllocation allocation);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4bc4f326/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/Offer.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/Offer.java b/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/Offer.java
new file mode 100644
index 0000000..64436fb
--- /dev/null
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/Offer.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.scheduler;
+
+import org.apache.flink.mesos.Utils;
+
+import com.netflix.fenzo.VirtualMachineLease;
+import org.apache.mesos.Protos;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An adapter class to transform a Mesos resource offer to a Fenzo {@link VirtualMachineLease}.
+ *
+ * <p>The default implementation provided by Fenzo isn't compatible with reserved resources.
+ * This implementation properly combines resources, e.g. a combination of reserved and unreserved cpus.
+ *
+ */
+public class Offer implements VirtualMachineLease {
+
+	private static final Logger logger = LoggerFactory.getLogger(Offer.class);
+
+	private final Protos.Offer offer;
+	private final String hostname;
+	private final String vmID;
+	private final long offeredTime;
+
+	private final List<Protos.Resource> resources;
+	private final Map<String, Protos.Attribute> attributeMap;
+
+	private final double cpuCores;
+	private final double memoryMB;
+	private final double networkMbps;
+	private final double diskMB;
+	private final List<Range> portRanges;
+
+	public Offer(Protos.Offer offer) {
+		this.offer = checkNotNull(offer);
+		this.hostname = offer.getHostname();
+		this.vmID = offer.getSlaveId().getValue();
+		this.offeredTime = System.currentTimeMillis();
+
+		List<Protos.Resource> resources = new ArrayList<>(offer.getResourcesList().size());
+		Map<String, List<Protos.Resource>> resourceMap = new HashMap<>();
+		for (Protos.Resource resource : offer.getResourcesList()) {
+			switch (resource.getType()) {
+				case SCALAR:
+				case RANGES:
+					resources.add(resource);
+					resourceMap.computeIfAbsent(resource.getName(), k -> new ArrayList<>(2)).add(resource);
+					break;
+				default:
+					logger.debug("Unknown resource type " + resource.getType() + " for resource " + resource.getName() +
+						" in offer, hostname=" + hostname + ", offerId=" + offer.getId());
+			}
+		}
+		this.resources = Collections.unmodifiableList(resources);
+
+		this.cpuCores = aggregateScalarResource(resourceMap, "cpus");
+		this.memoryMB = aggregateScalarResource(resourceMap, "mem");
+		this.networkMbps = aggregateScalarResource(resourceMap, "network");
+		this.diskMB = aggregateScalarResource(resourceMap, "disk");
+		this.portRanges = Collections.unmodifiableList(aggregateRangesResource(resourceMap, "ports"));
+
+		if (offer.getAttributesCount() > 0) {
+			Map<String, Protos.Attribute> attributeMap = new HashMap<>();
+			for (Protos.Attribute attribute: offer.getAttributesList()) {
+				attributeMap.put(attribute.getName(), attribute);
+			}
+			this.attributeMap = Collections.unmodifiableMap(attributeMap);
+		} else {
+			this.attributeMap = Collections.emptyMap();
+		}
+	}
+
+	public List<Protos.Resource> getResources() {
+		return resources;
+	}
+
+	@Override
+	public String hostname() {
+		return hostname;
+	}
+
+	@Override
+	public String getVMID() {
+		return vmID;
+	}
+
+	@Override
+	public double cpuCores() {
+		return cpuCores;
+	}
+
+	@Override
+	public double memoryMB() {
+		return memoryMB;
+	}
+
+	@Override
+	public double networkMbps() {
+		return networkMbps;
+	}
+
+	@Override
+	public double diskMB() {
+		return diskMB;
+	}
+
+	public Protos.Offer getOffer(){
+		return offer;
+	}
+
+	@Override
+	public String getId() {
+		return offer.getId().getValue();
+	}
+
+	@Override
+	public long getOfferedTime() {
+		return offeredTime;
+	}
+
+	@Override
+	public List<Range> portRanges() {
+		return portRanges;
+	}
+
+	@Override
+	public Map<String, Protos.Attribute> getAttributeMap() {
+		return attributeMap;
+	}
+
+	@Override
+	public String toString() {
+		return "Offer{" +
+			"offer=" + offer +
+			", resources='" + Utils.toString(resources) + '\'' +
+			", hostname='" + hostname + '\'' +
+			", vmID='" + vmID + '\'' +
+			", attributeMap=" + attributeMap +
+			", offeredTime=" + offeredTime +
+			'}';
+	}
+
+	private static double aggregateScalarResource(Map<String, List<Protos.Resource>> resourceMap, String resourceName) {
+		if (resourceMap.get(resourceName) == null) {
+			return 0.0;
+		}
+		return resourceMap.get(resourceName).stream().mapToDouble(r -> r.getScalar().getValue()).sum();
+	}
+
+	private static List<Range> aggregateRangesResource(Map<String, List<Protos.Resource>> resourceMap, String resourceName) {
+		if (resourceMap.get(resourceName) == null) {
+			return Collections.emptyList();
+		}
+		return resourceMap.get(resourceName).stream()
+			.flatMap(r -> r.getRanges().getRangeList().stream())
+			.map(r -> new Range((int) r.getBegin(), (int) r.getEnd()))
+			.collect(Collectors.toList());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4bc4f326/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosConfiguration.java b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosConfiguration.java
index 7660e9c..4db5e25 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosConfiguration.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosConfiguration.java
@@ -24,7 +24,9 @@ import org.apache.mesos.Scheduler;
 import org.apache.mesos.SchedulerDriver;
 import org.slf4j.Logger;
 
+import java.util.Collections;
 import java.util.Map;
+import java.util.Set;
 
 import scala.Option;
 
@@ -90,6 +92,14 @@ public class MesosConfiguration {
 	}
 
 	/**
+	 * Gets the roles associated with the framework.
+	 */
+	public Set<String> roles() {
+		return frameworkInfo.hasRole() && !"*".equals(frameworkInfo.getRole()) ?
+			Collections.singleton(frameworkInfo.getRole()) : Collections.emptySet();
+	}
+
+	/**
 	 * Create the Mesos scheduler driver based on this configuration.
 	 * @param scheduler the scheduler to use.
 	 * @param implicitAcknowledgements whether to configure the driver for implicit acknowledgements.

http://git-wip-us.apache.org/repos/asf/flink/blob/4bc4f326/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosResourceAllocation.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosResourceAllocation.java b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosResourceAllocation.java
new file mode 100644
index 0000000..9ad5234
--- /dev/null
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosResourceAllocation.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.util;
+
+import org.apache.flink.mesos.Utils;
+
+import org.apache.mesos.Protos;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Set;
+
+import static org.apache.flink.mesos.Utils.UNRESERVED_ROLE;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An allocation of resources on a particular host from one or more Mesos offers, to be portioned out to tasks.
+ *
+ * <p>A typical offer contains a mix of reserved and unreserved resources.
+ * The below example depicts 2 cpus reserved for 'myrole' plus 3 unreserved cpus for a total of 5 cpus:
+ * <pre>{@code
+ *   cpus(myrole):2.0; mem(myrole):4096.0; ports(myrole):[1025-2180];
+ *   disk(*):28829.0; cpus(*):3.0; mem(*):10766.0; ports(*):[2182-3887,8082-8180,8182-32000]
+ * }</pre>
+ *
+ * <p>This class assumes that the resources were offered <b>without</b> the {@code RESERVATION_REFINEMENT} capability,
+ * as detailed in the "Resource Format" section of the Mesos protocol definition.
+ *
+ * <p>This class is not thread-safe.
+ */
+public class MesosResourceAllocation {
+
+	protected static final Logger LOG = LoggerFactory.getLogger(MesosResourceAllocation.class);
+
+	static final double EPSILON = 1e-5;
+
+	private final List<Protos.Resource> resources;
+
+	/**
+	 * Creates an allocation of resources for tasks to take.
+	 *
+	 * @param resources the resources to add to the allocation.
+	 */
+	public MesosResourceAllocation(Collection<Protos.Resource> resources) {
+		this.resources = new ArrayList<>(checkNotNull(resources));
+
+		// sort the resources to prefer reserved resources
+		this.resources.sort(Comparator.comparing(r -> UNRESERVED_ROLE.equals(r.getRole())));
+	}
+
+	/**
+	 * Gets the remaining resources.
+	 */
+	public List<Protos.Resource> getRemaining() {
+		return Collections.unmodifiableList(resources);
+	}
+
+	/**
+	 * Takes some amount of scalar resources (e.g. cpus, mem).
+	 *
+	 * @param amount the (approximate) amount to take from the available quantity.
+	 * @param roles the roles to accept
+	 */
+	public List<Protos.Resource> takeScalar(String resourceName, double amount, Set<String> roles) {
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Allocating {} {}", amount, resourceName);
+		}
+
+		List<Protos.Resource> result = new ArrayList<>(1);
+		for (ListIterator<Protos.Resource> i = resources.listIterator(); i.hasNext();) {
+			if (amount <= EPSILON) {
+				break;
+			}
+
+			// take from next available scalar resource that is unreserved or reserved for an applicable role
+			Protos.Resource available = i.next();
+			if (!resourceName.equals(available.getName()) || !available.hasScalar()) {
+				continue;
+			}
+			if (!UNRESERVED_ROLE.equals(available.getRole()) && !roles.contains(available.getRole())) {
+				continue;
+			}
+
+			double amountToTake = Math.min(available.getScalar().getValue(), amount);
+			Protos.Resource taken = available.toBuilder().setScalar(Protos.Value.Scalar.newBuilder().setValue(amountToTake)).build();
+			amount -= amountToTake;
+			result.add(taken);
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Taking {} from {}", amountToTake, Utils.toString(available));
+			}
+
+			// keep remaining amount (if any)
+			double remaining = available.getScalar().getValue() - taken.getScalar().getValue();
+			if (remaining > EPSILON) {
+				i.set(available.toBuilder().setScalar(Protos.Value.Scalar.newBuilder().setValue(remaining)).build());
+			}
+			else {
+				i.remove();
+			}
+		}
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Allocated: {}, unsatisfied: {}", Utils.toString(result), amount);
+		}
+		return result;
+	}
+
+	/**
+	 * Takes some amount of range resources (e.g. ports).
+	 *
+	 * @param amount the number of values to take from the available range(s).
+	 * @param roles the roles to accept
+	 */
+	public List<Protos.Resource> takeRanges(String resourceName, int amount, Set<String> roles) {
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Allocating {} {}", amount, resourceName);
+		}
+
+		List<Protos.Resource> result = new ArrayList<>(1);
+		for (ListIterator<Protos.Resource> i = resources.listIterator(); i.hasNext();) {
+			if (amount <= 0) {
+				break;
+			}
+
+			// take from next available range resource that is unreserved or reserved for an applicable role
+			Protos.Resource available = i.next();
+			if (!resourceName.equals(available.getName()) || !available.hasRanges()) {
+				continue;
+			}
+			if (!UNRESERVED_ROLE.equals(available.getRole()) && !roles.contains(available.getRole())) {
+				continue;
+			}
+
+			List<Protos.Value.Range> takenRanges = new ArrayList<>();
+			List<Protos.Value.Range> remainingRanges = new ArrayList<>(available.getRanges().getRangeList());
+			for (ListIterator<Protos.Value.Range> j = remainingRanges.listIterator(); j.hasNext();) {
+				if (amount <= 0) {
+					break;
+				}
+
+				// take from next available range (note: ranges are inclusive)
+				Protos.Value.Range availableRange = j.next();
+				long amountToTake = Math.min(availableRange.getEnd() - availableRange.getBegin() + 1, amount);
+				Protos.Value.Range takenRange = availableRange.toBuilder().setEnd(availableRange.getBegin() + amountToTake - 1).build();
+				amount -= amountToTake;
+				takenRanges.add(takenRange);
+
+				// keep remaining range (if any)
+				long remaining = availableRange.getEnd() - takenRange.getEnd();
+				if (remaining > 0) {
+					j.set(availableRange.toBuilder().setBegin(takenRange.getEnd() + 1).build());
+				}
+				else {
+					j.remove();
+				}
+			}
+			Protos.Resource taken = available.toBuilder().setRanges(Protos.Value.Ranges.newBuilder().addAllRange(takenRanges)).build();
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Taking {} from {}", Utils.toString(taken.getRanges()), Utils.toString(available));
+			}
+			result.add(taken);
+
+			// keep remaining ranges (if any)
+			if (remainingRanges.size() > 0) {
+				i.set(available.toBuilder().setRanges(Protos.Value.Ranges.newBuilder().addAllRange(remainingRanges)).build());
+			}
+			else {
+				i.remove();
+			}
+		}
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Allocated: {}, unsatisfied: {}", Utils.toString(result), amount);
+		}
+		return result;
+	}
+
+	@Override
+	public String toString() {
+		return "MesosResourceAllocation{" +
+			"resources=" + Utils.toString(resources) +
+			'}';
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4bc4f326/flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/LaunchCoordinator.scala
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/LaunchCoordinator.scala b/flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/LaunchCoordinator.scala
index 1024b5c..124022a 100644
--- a/flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/LaunchCoordinator.scala
+++ b/flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/LaunchCoordinator.scala
@@ -23,12 +23,13 @@ import java.util.Collections
 import akka.actor.{Actor, ActorRef, FSM, Props}
 import com.netflix.fenzo._
 import com.netflix.fenzo.functions.Action1
-import com.netflix.fenzo.plugins.VMLeaseObject
 import grizzled.slf4j.Logger
 import org.apache.flink.api.java.tuple.{Tuple2=>FlinkTuple2}
 import org.apache.flink.configuration.Configuration
+import org.apache.flink.mesos.Utils
 import org.apache.flink.mesos.scheduler.LaunchCoordinator._
 import org.apache.flink.mesos.scheduler.messages._
+import org.apache.flink.mesos.util.MesosResourceAllocation
 import org.apache.mesos.{SchedulerDriver, Protos}
 
 import scala.collection.JavaConverters._
@@ -147,15 +148,17 @@ class LaunchCoordinator(
       goto(Suspended) using data.copy(newLeases = Nil)
 
     case Event(offers: ResourceOffers, data: GatherData) =>
-      val leases = offers.offers().asScala.map(
-        new VMLeaseObject(_).asInstanceOf[VirtualMachineLease])
+      val leases = offers.offers().asScala.map(new Offer(_))
       if(LOG.isInfoEnabled) {
         val (cpus, mem) = leases.foldLeft((0.0,0.0)) {
           (z,o) => (z._1 + o.cpuCores(), z._2 + o.memoryMB())
         }
         LOG.info(s"Received offer(s) of $mem MB, $cpus cpus:")
         for(l <- leases) {
-          LOG.info(s"  ${l.getId} from ${l.hostname()} of ${l.memoryMB()} MB, ${l.cpuCores()} cpus")
+          val reservations = l.getResources.asScala.map(_.getRole).toSet
+          LOG.info(
+            s"  ${l.getId} from ${l.hostname()} of ${l.memoryMB()} MB, ${l.cpuCores()} cpus" +
+            s" for ${reservations.mkString("[", ",", "]")}")
         }
       }
       stay using data.copy(newLeases = data.newLeases ++ leases) forMax (1 seconds)
@@ -185,7 +188,7 @@ class LaunchCoordinator(
         // process the assignments into a set of operations (reserve and/or launch)
         val slaveId = assignments.getLeasesUsed.get(0).getOffer.getSlaveId
         val offerIds = assignments.getLeasesUsed.asScala.map(_.getOffer.getId)
-        val operations = processAssignments(slaveId, assignments, remaining.toMap)
+        val operations = processAssignments(LOG, slaveId, assignments, remaining.toMap)
 
         // update the state to reflect the launched tasks
         val launchedTasks = operations
@@ -316,18 +319,26 @@ object LaunchCoordinator {
     *
     * The operations may include reservations and task launches.
     *
+    * @param log the logger to use.
     * @param slaveId the slave associated with the given assignments.
     * @param assignments the task assignments as provided by the optimizer.
     * @param allTasks all known tasks, keyed by taskId.
     * @return the operations to perform.
     */
   private def processAssignments(
+      log: Logger,
       slaveId: Protos.SlaveID,
       assignments: VMAssignmentResult,
       allTasks: Map[String, LaunchableTask]): Seq[Protos.Offer.Operation] = {
 
+    val resources =
+      assignments.getLeasesUsed.asScala.flatMap(_.asInstanceOf[Offer].getResources.asScala)
+    val allocation = new MesosResourceAllocation(resources.asJava)
+    log.debug(s"Assigning resources: ${Utils.toString(allocation.getRemaining)}")
+
     def taskInfo(assignment: TaskAssignmentResult): Protos.TaskInfo = {
-      allTasks(assignment.getTaskId).launch(slaveId, assignment)
+      log.debug(s"Processing task ${assignment.getTaskId}")
+      allTasks(assignment.getTaskId).launch(slaveId, allocation)
     }
 
     val launches = Protos.Offer.Operation.newBuilder()
@@ -338,6 +349,8 @@ object LaunchCoordinator {
         ))
       .build()
 
+    log.debug(s"Remaining resources: ${Utils.toString(allocation.getRemaining)}")
+
     Seq(launches)
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4bc4f326/flink-mesos/src/test/java/org/apache/flink/mesos/scheduler/OfferTest.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/scheduler/OfferTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/scheduler/OfferTest.java
new file mode 100644
index 0000000..cf84e8e
--- /dev/null
+++ b/flink-mesos/src/test/java/org/apache/flink/mesos/scheduler/OfferTest.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.scheduler;
+
+import org.apache.flink.util.TestLogger;
+
+import com.netflix.fenzo.VirtualMachineLease;
+import org.apache.mesos.Protos;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.mesos.Utils.UNRESERVED_ROLE;
+import static org.apache.flink.mesos.Utils.cpus;
+import static org.apache.flink.mesos.Utils.disk;
+import static org.apache.flink.mesos.Utils.mem;
+import static org.apache.flink.mesos.Utils.network;
+import static org.apache.flink.mesos.Utils.ports;
+import static org.apache.flink.mesos.Utils.range;
+import static org.apache.flink.mesos.Utils.resources;
+import static org.apache.flink.mesos.Utils.scalar;
+
+/**
+ * Tests {@link Offer} which adapts a Mesos offer as a lease for use with Fenzo.
+ */
+public class OfferTest extends TestLogger {
+
+	private static final double EPSILON = 1e-5;
+
+	private static final Protos.FrameworkID FRAMEWORK_ID = Protos.FrameworkID.newBuilder().setValue("framework-1").build();
+	private static final Protos.OfferID OFFER_ID = Protos.OfferID.newBuilder().setValue("offer-1").build();
+	private static final String HOSTNAME = "host-1";
+	private static final Protos.SlaveID AGENT_ID = Protos.SlaveID.newBuilder().setValue("agent-1").build();
+
+	private static final String ROLE_A = "A";
+
+	private static final String ATTR_1 = "A1";
+
+	// region Resources
+
+	/**
+	 * Tests basic properties (other than those of specific resources, covered elsewhere).
+	 */
+	@Test
+	public void testResourceProperties() {
+		Offer offer = new Offer(offer(resources(), attrs()));
+		Assert.assertNotNull(offer.getResources());
+		Assert.assertEquals(HOSTNAME, offer.hostname());
+		Assert.assertEquals(AGENT_ID.getValue(), offer.getVMID());
+		Assert.assertNotNull(offer.getOffer());
+		Assert.assertEquals(OFFER_ID.getValue(), offer.getId());
+		Assert.assertNotEquals(0L, offer.getOfferedTime());
+		Assert.assertNotNull(offer.getAttributeMap());
+		Assert.assertNotNull(offer.toString());
+	}
+
+	/**
+	 * Tests aggregation of resources in the presence of unreserved plus reserved resources.
+	 */
+	@Test
+	public void testResourceAggregation() {
+		Offer offer;
+
+		offer = new Offer(offer(resources(), attrs()));
+		Assert.assertEquals(0.0, offer.cpuCores(), EPSILON);
+		Assert.assertEquals(Arrays.asList(), ranges(offer.portRanges()));
+
+		offer = new Offer(offer(resources(
+			cpus(ROLE_A, 1.0), cpus(UNRESERVED_ROLE, 1.0),
+			ports(ROLE_A, range(80, 80), range(443, 444)), ports(UNRESERVED_ROLE, range(8080, 8081)),
+			otherScalar(42.0)), attrs()));
+		Assert.assertEquals(2.0, offer.cpuCores(), EPSILON);
+		Assert.assertEquals(Arrays.asList(range(80, 80), range(443, 444), range(8080, 8081)), ranges(offer.portRanges()));
+	}
+
+	@Test
+	public void testCpuCores() {
+		Offer offer = new Offer(offer(resources(cpus(1.0)), attrs()));
+		Assert.assertEquals(1.0, offer.cpuCores(), EPSILON);
+	}
+
+	@Test
+	public void testMemoryMB() {
+		Offer offer = new Offer(offer(resources(mem(1024.0)), attrs()));
+		Assert.assertEquals(1024.0, offer.memoryMB(), EPSILON);
+	}
+
+	@Test
+	public void testNetworkMbps() {
+		Offer offer = new Offer(offer(resources(network(10.0)), attrs()));
+		Assert.assertEquals(10.0, offer.networkMbps(), EPSILON);
+	}
+
+	@Test
+	public void testDiskMB() {
+		Offer offer = new Offer(offer(resources(disk(1024.0)), attrs()));
+		Assert.assertEquals(1024.0, offer.diskMB(), EPSILON);
+	}
+
+	@Test
+	public void testPortRanges() {
+		Offer offer = new Offer(offer(resources(ports(range(8080, 8081))), attrs()));
+		Assert.assertEquals(Collections.singletonList(range(8080, 8081)), ranges(offer.portRanges()));
+	}
+
+	// endregion
+
+	// region Attributes
+
+	@Test
+	public void testAttributeIndexing() {
+		Offer offer = new Offer(offer(resources(), attrs(attr(ATTR_1, 42.0))));
+		Assert.assertEquals(attr(ATTR_1, 42.0), offer.getAttributeMap().get(ATTR_1));
+	}
+
+	// endregion
+
+	// region Utilities
+
+	private static Protos.Offer offer(List<Protos.Resource> resources, List<Protos.Attribute> attributes) {
+		return Protos.Offer.newBuilder()
+			.setId(OFFER_ID)
+			.setFrameworkId(FRAMEWORK_ID)
+			.setHostname(HOSTNAME)
+			.setSlaveId(AGENT_ID)
+			.addAllAttributes(attributes)
+			.addAllResources(resources)
+			.build();
+	}
+
+	private static Protos.Attribute attr(String name, double scalar) {
+		return Protos.Attribute.newBuilder()
+			.setName(name)
+			.setType(Protos.Value.Type.SCALAR)
+			.setScalar(Protos.Value.Scalar.newBuilder().setValue(scalar))
+			.build();
+	}
+
+	private static List<Protos.Attribute> attrs(Protos.Attribute... attributes) {
+		return Arrays.asList(attributes);
+	}
+
+	private static List<Protos.Value.Range> ranges(List<VirtualMachineLease.Range> ranges) {
+		return ranges.stream()
+			.map(r -> Protos.Value.Range.newBuilder().setBegin(r.getBeg()).setEnd(r.getEnd()).build())
+			.collect(Collectors.toList());
+	}
+
+	private static Protos.Resource otherScalar(double value) {
+		return scalar("mem", UNRESERVED_ROLE, value);
+	}
+
+	// endregion
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4bc4f326/flink-mesos/src/test/java/org/apache/flink/mesos/util/MesosResourceAllocationTest.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/util/MesosResourceAllocationTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/util/MesosResourceAllocationTest.java
new file mode 100644
index 0000000..fe2e444
--- /dev/null
+++ b/flink-mesos/src/test/java/org/apache/flink/mesos/util/MesosResourceAllocationTest.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.util;
+
+import org.apache.flink.util.TestLogger;
+
+import org.apache.mesos.Protos;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.flink.mesos.Utils.UNRESERVED_ROLE;
+import static org.apache.flink.mesos.Utils.cpus;
+import static org.apache.flink.mesos.Utils.ports;
+import static org.apache.flink.mesos.Utils.range;
+import static org.apache.flink.mesos.Utils.resources;
+
+/**
+ * Tests {@link MesosResourceAllocation}.
+ */
+public class MesosResourceAllocationTest extends TestLogger {
+
+	// possible roles
+	private static final String ROLE_A = "A";
+	private static final String ROLE_B = "B";
+
+	// possible framework configurations
+	private static final Set<String> AS_ROLE_A = Collections.singleton(ROLE_A);
+	private static final Set<String> AS_NO_ROLE = Collections.emptySet();
+
+	// region Reservations
+
+	/**
+	 * Tests that reserved resources are prioritized.
+	 */
+	@Test
+	public void testReservationPrioritization() {
+		MesosResourceAllocation allocation = new MesosResourceAllocation(
+			resources(cpus(ROLE_A, 1.0), cpus(UNRESERVED_ROLE, 1.0), cpus(ROLE_B, 1.0)));
+		Assert.assertEquals(
+			resources(cpus(ROLE_A, 1.0), cpus(ROLE_B, 1.0), cpus(UNRESERVED_ROLE, 1.0)),
+			allocation.getRemaining());
+	}
+
+	/**
+	 * Tests that resources are filtered according to the framework role (if any).
+	 */
+	@Test
+	public void testReservationFiltering() {
+		MesosResourceAllocation allocation;
+
+		// unreserved resources
+		allocation = new MesosResourceAllocation(
+			resources(cpus(UNRESERVED_ROLE, 1.0), ports(UNRESERVED_ROLE, range(80, 80))));
+		Assert.assertEquals(resources(cpus(UNRESERVED_ROLE, 1.0)),
+			allocation.takeScalar("cpus", 1.0, AS_NO_ROLE));
+		Assert.assertEquals(resources(ports(UNRESERVED_ROLE, range(80, 80))),
+			allocation.takeRanges("ports", 1, AS_NO_ROLE));
+		allocation = new MesosResourceAllocation(
+			resources(cpus(UNRESERVED_ROLE, 1.0), ports(UNRESERVED_ROLE, range(80, 80))));
+		Assert.assertEquals(resources(cpus(UNRESERVED_ROLE, 1.0)),
+			allocation.takeScalar("cpus", 1.0, AS_ROLE_A));
+		Assert.assertEquals(resources(ports(UNRESERVED_ROLE, range(80, 80))),
+			allocation.takeRanges("ports", 1, AS_ROLE_A));
+
+		// reserved for the framework role
+		allocation = new MesosResourceAllocation(
+			resources(cpus(ROLE_A, 1.0), ports(ROLE_A, range(80, 80))));
+		Assert.assertEquals(resources(),
+			allocation.takeScalar("cpus", 1.0, AS_NO_ROLE));
+		Assert.assertEquals(resources(),
+			allocation.takeRanges("ports", 1, AS_NO_ROLE));
+		Assert.assertEquals(resources(cpus(ROLE_A, 1.0)),
+			allocation.takeScalar("cpus", 1.0, AS_ROLE_A));
+		Assert.assertEquals(resources(ports(ROLE_A, range(80, 80))),
+			allocation.takeRanges("ports", 1, AS_ROLE_A));
+
+		// reserved for a different role
+		allocation = new MesosResourceAllocation(
+			resources(cpus(ROLE_B, 1.0), ports(ROLE_B, range(80, 80))));
+		Assert.assertEquals(resources(),
+			allocation.takeScalar("cpus", 1.0, AS_NO_ROLE));
+		Assert.assertEquals(resources(),
+			allocation.takeRanges("ports", 1, AS_NO_ROLE));
+		Assert.assertEquals(resources(),
+			allocation.takeScalar("cpus", 1.0, AS_ROLE_A));
+		Assert.assertEquals(resources(),
+			allocation.takeRanges("ports", 1, AS_ROLE_A));
+	}
+
+	// endregion
+
+	// region General
+
+	/**
+	 * Tests resource naming and typing.
+	 */
+	@Test
+	public void testResourceSpecificity() {
+		MesosResourceAllocation allocation = new MesosResourceAllocation(
+			resources(cpus(1.0), ports(range(80, 80))));
+
+		// mismatched name
+		Assert.assertEquals(
+			resources(),
+			allocation.takeScalar("other", 1.0, AS_NO_ROLE));
+		Assert.assertEquals(
+			resources(),
+			allocation.takeRanges("other", 1, AS_NO_ROLE));
+
+		// mismatched type
+		Assert.assertEquals(
+			resources(),
+			allocation.takeScalar("ports", 1.0, AS_NO_ROLE));
+		Assert.assertEquals(
+			resources(),
+			allocation.takeRanges("cpus", 1, AS_NO_ROLE));
+
+		// nothing lost
+		Assert.assertEquals(
+			resources(cpus(1.0), ports(range(80, 80))),
+			allocation.getRemaining());
+	}
+
+	// endregion
+
+	// region Scalar Resources
+
+	/**
+	 * Tests scalar resource accounting.
+	 */
+	@Test
+	public void testScalarResourceAccounting() {
+		MesosResourceAllocation allocation;
+
+		// take part of a resource
+		allocation = new MesosResourceAllocation(resources(cpus(1.0)));
+		Assert.assertEquals(
+			resources(cpus(0.25)),
+			allocation.takeScalar("cpus", 0.25, AS_NO_ROLE));
+		Assert.assertEquals(
+			resources(cpus(0.75)),
+			allocation.getRemaining());
+
+		// take a whole resource
+		allocation = new MesosResourceAllocation(resources(cpus(1.0)));
+		Assert.assertEquals(
+			resources(cpus(1.0)),
+			allocation.takeScalar("cpus", 1.0, AS_NO_ROLE));
+		Assert.assertEquals(resources(), allocation.getRemaining());
+
+		// take multiple resources
+		allocation = new MesosResourceAllocation(
+			resources(cpus(ROLE_A, 1.0), cpus(UNRESERVED_ROLE, 1.0)));
+		Assert.assertEquals(
+			resources(cpus(ROLE_A, 1.0), cpus(UNRESERVED_ROLE, 0.25)),
+			allocation.takeScalar("cpus", 1.25, AS_ROLE_A));
+		Assert.assertEquals(
+			resources(cpus(UNRESERVED_ROLE, 0.75)),
+			allocation.getRemaining());
+	}
+
+	/**
+	 * Tests scalar resource exhaustion (i.e. insufficient resources).
+	 */
+	@Test
+	public void testScalarResourceExhaustion() {
+		MesosResourceAllocation allocation = new MesosResourceAllocation(resources(cpus(1.0)));
+		Assert.assertEquals(
+			resources(cpus(1.0)),
+			allocation.takeScalar("cpus", 2.0, AS_NO_ROLE));
+		Assert.assertEquals(resources(), allocation.getRemaining());
+	}
+
+	// endregion
+
+	// region Range Resources
+
+	/**
+	 * Tests range resource accounting.
+	 */
+	@Test
+	public void testRangeResourceAccounting() {
+		MesosResourceAllocation allocation;
+		List<Protos.Resource> ports = resources(
+			ports(ROLE_A, range(80, 81), range(443, 444)),
+			ports(UNRESERVED_ROLE, range(1024, 1025), range(8080, 8081)));
+
+		// take a partial range of one resource
+		allocation = new MesosResourceAllocation(ports);
+		Assert.assertEquals(
+			resources(ports(ROLE_A, range(80, 80))),
+			allocation.takeRanges("ports", 1, AS_ROLE_A));
+		Assert.assertEquals(
+			resources(
+				ports(ROLE_A, range(81, 81), range(443, 444)),
+				ports(UNRESERVED_ROLE, range(1024, 1025), range(8080, 8081))),
+			allocation.getRemaining());
+
+		// take a whole range of one resource
+		allocation = new MesosResourceAllocation(ports);
+		Assert.assertEquals(
+			resources(ports(ROLE_A, range(80, 81))),
+			allocation.takeRanges("ports", 2, AS_ROLE_A));
+		Assert.assertEquals(
+			resources(
+				ports(ROLE_A, range(443, 444)),
+				ports(UNRESERVED_ROLE, range(1024, 1025), range(8080, 8081))),
+			allocation.getRemaining());
+
+		// take numerous ranges of one resource
+		allocation = new MesosResourceAllocation(ports);
+		Assert.assertEquals(
+			resources(ports(ROLE_A, range(80, 81), range(443, 443))),
+			allocation.takeRanges("ports", 3, AS_ROLE_A));
+		Assert.assertEquals(
+			resources(
+				ports(ROLE_A, range(444, 444)),
+				ports(UNRESERVED_ROLE, range(1024, 1025), range(8080, 8081))),
+			allocation.getRemaining());
+
+		// take a whole resource
+		allocation = new MesosResourceAllocation(ports);
+		Assert.assertEquals(
+			resources(ports(ROLE_A, range(80, 81), range(443, 444))),
+			allocation.takeRanges("ports", 4, AS_ROLE_A));
+		Assert.assertEquals(
+			resources(ports(UNRESERVED_ROLE, range(1024, 1025), range(8080, 8081))),
+			allocation.getRemaining());
+
+		// take numerous resources
+		allocation = new MesosResourceAllocation(ports);
+		Assert.assertEquals(
+			resources(
+				ports(ROLE_A, range(80, 81), range(443, 444)),
+				ports(UNRESERVED_ROLE, range(1024, 1024))),
+			allocation.takeRanges("ports", 5, AS_ROLE_A));
+		Assert.assertEquals(
+			resources(ports(UNRESERVED_ROLE, range(1025, 1025), range(8080, 8081))),
+			allocation.getRemaining());
+	}
+
+	/**
+	 * Tests range resource exhaustion (i.e. insufficient resources).
+	 */
+	@Test
+	public void testRangeResourceExhaustion() {
+		MesosResourceAllocation allocation = new MesosResourceAllocation(resources(ports(range(80, 80))));
+		Assert.assertEquals(
+			resources(ports(range(80, 80))),
+			allocation.takeRanges("ports", 2, AS_NO_ROLE));
+		Assert.assertEquals(resources(), allocation.getRemaining());
+	}
+
+	// endregion
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4bc4f326/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/LaunchCoordinatorTest.scala
----------------------------------------------------------------------
diff --git a/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/LaunchCoordinatorTest.scala b/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/LaunchCoordinatorTest.scala
index f18c07d..eb8259a 100644
--- a/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/LaunchCoordinatorTest.scala
+++ b/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/LaunchCoordinatorTest.scala
@@ -26,7 +26,6 @@ import akka.testkit._
 import com.netflix.fenzo.TaskRequest.{AssignedResources, NamedResourceSetRequest}
 import com.netflix.fenzo._
 import com.netflix.fenzo.functions.{Action1, Action2}
-import com.netflix.fenzo.plugins.VMLeaseObject
 import org.apache.flink.api.java.tuple.{Tuple2=>FlinkTuple2}
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.mesos.scheduler.LaunchCoordinator._
@@ -47,6 +46,7 @@ import scala.collection.JavaConverters._
 import org.apache.flink.mesos.Utils.range
 import org.apache.flink.mesos.Utils.ranges
 import org.apache.flink.mesos.Utils.scalar
+import org.apache.flink.mesos.util.MesosResourceAllocation
 
 @RunWith(classOf[JUnitRunner])
 class LaunchCoordinatorTest
@@ -95,7 +95,7 @@ class LaunchCoordinatorTest
       override def taskRequest: TaskRequest = generateTaskRequest
       override def launch(
           slaveId: SlaveID,
-          taskAssignment: TaskAssignmentResult): Protos.TaskInfo = {
+          allocation: MesosResourceAllocation): Protos.TaskInfo = {
         Protos.TaskInfo.newBuilder
           .setTaskId(taskID).setName(taskID.getValue)
           .setCommand(Protos.CommandInfo.newBuilder.setValue("whoami"))
@@ -129,7 +129,7 @@ class LaunchCoordinatorTest
   }
 
   def lease(offer: Protos.Offer) = {
-    new VMLeaseObject(offer)
+    new Offer(offer)
   }
 
   /**


Mime
View raw message