flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [24/50] [abbrv] flink git commit: [FLINK-4529] [flip-6] Move TaskExecutor, JobMaster and ResourceManager out of the rpc package
Date Wed, 28 Sep 2016 08:21:07 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/171cfd30/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/SlotManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/SlotManagerTest.java
new file mode 100644
index 0000000..52d9d06
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/SlotManagerTest.java
@@ -0,0 +1,538 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.ResourceSlot;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.taskexecutor.SlotStatus;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+public class SlotManagerTest {
+
+	private static final double DEFAULT_TESTING_CPU_CORES = 1.0;
+
+	private static final long DEFAULT_TESTING_MEMORY = 512;
+
+	private static final ResourceProfile DEFAULT_TESTING_PROFILE =
+		new ResourceProfile(DEFAULT_TESTING_CPU_CORES, DEFAULT_TESTING_MEMORY);
+
+	private static final ResourceProfile DEFAULT_TESTING_BIG_PROFILE =
+		new ResourceProfile(DEFAULT_TESTING_CPU_CORES * 2, DEFAULT_TESTING_MEMORY * 2);
+
+	private ResourceManagerGateway resourceManagerGateway;
+
+	@Before
+	public void setUp() {
+		resourceManagerGateway = mock(ResourceManagerGateway.class);
+	}
+
+	/**
+	 * Tests that there are no free slots when we request, need to allocate from cluster manager master
+	 */
+	@Test
+	public void testRequestSlotWithoutFreeSlot() {
+		TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
+		slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE));
+
+		assertEquals(0, slotManager.getAllocatedSlotCount());
+		assertEquals(0, slotManager.getFreeSlotCount());
+		assertEquals(1, slotManager.getPendingRequestCount());
+		assertEquals(1, slotManager.getAllocatedContainers().size());
+		assertEquals(DEFAULT_TESTING_PROFILE, slotManager.getAllocatedContainers().get(0));
+	}
+
+	/**
+	 * Tests that there are some free slots when we request, and the request is fulfilled immediately
+	 */
+	@Test
+	public void testRequestSlotWithFreeSlot() {
+		TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
+
+		directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 1);
+		assertEquals(1, slotManager.getFreeSlotCount());
+
+		slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE));
+		assertEquals(1, slotManager.getAllocatedSlotCount());
+		assertEquals(0, slotManager.getFreeSlotCount());
+		assertEquals(0, slotManager.getPendingRequestCount());
+		assertEquals(0, slotManager.getAllocatedContainers().size());
+	}
+
+	/**
+	 * Tests that there are some free slots when we request, but none of them are suitable
+	 */
+	@Test
+	public void testRequestSlotWithoutSuitableSlot() {
+		TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
+
+		directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 2);
+		assertEquals(2, slotManager.getFreeSlotCount());
+
+		slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_BIG_PROFILE));
+		assertEquals(0, slotManager.getAllocatedSlotCount());
+		assertEquals(2, slotManager.getFreeSlotCount());
+		assertEquals(1, slotManager.getPendingRequestCount());
+		assertEquals(1, slotManager.getAllocatedContainers().size());
+		assertEquals(DEFAULT_TESTING_BIG_PROFILE, slotManager.getAllocatedContainers().get(0));
+	}
+
+	/**
+	 * Tests that we send duplicated slot request
+	 */
+	@Test
+	public void testDuplicatedSlotRequest() {
+		TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
+		directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 1);
+
+		SlotRequest request1 = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
+		SlotRequest request2 = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_BIG_PROFILE);
+
+		slotManager.requestSlot(request1);
+		slotManager.requestSlot(request2);
+		slotManager.requestSlot(request2);
+		slotManager.requestSlot(request1);
+
+		assertEquals(1, slotManager.getAllocatedSlotCount());
+		assertEquals(0, slotManager.getFreeSlotCount());
+		assertEquals(1, slotManager.getPendingRequestCount());
+		assertEquals(1, slotManager.getAllocatedContainers().size());
+		assertEquals(DEFAULT_TESTING_BIG_PROFILE, slotManager.getAllocatedContainers().get(0));
+	}
+
+	/**
+	 * Tests that we send multiple slot requests
+	 */
+	@Test
+	public void testRequestMultipleSlots() {
+		TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
+		directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 5);
+
+		// request 3 normal slots
+		for (int i = 0; i < 3; ++i) {
+			slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE));
+		}
+
+		// request 2 big slots
+		for (int i = 0; i < 2; ++i) {
+			slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_BIG_PROFILE));
+		}
+
+		// request 1 normal slot again
+		slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE));
+
+		assertEquals(4, slotManager.getAllocatedSlotCount());
+		assertEquals(1, slotManager.getFreeSlotCount());
+		assertEquals(2, slotManager.getPendingRequestCount());
+		assertEquals(2, slotManager.getAllocatedContainers().size());
+		assertEquals(DEFAULT_TESTING_BIG_PROFILE, slotManager.getAllocatedContainers().get(0));
+		assertEquals(DEFAULT_TESTING_BIG_PROFILE, slotManager.getAllocatedContainers().get(1));
+	}
+
+	/**
+	 * Tests that a new slot appeared in SlotReport, and we used it to fulfill a pending request
+	 */
+	@Test
+	public void testNewlyAppearedFreeSlotFulfillPendingRequest() {
+		TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
+		slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE));
+		assertEquals(1, slotManager.getPendingRequestCount());
+
+		SlotID slotId = SlotID.generate();
+		SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE);
+		slotManager.updateSlotStatus(slotStatus);
+
+		assertEquals(1, slotManager.getAllocatedSlotCount());
+		assertEquals(0, slotManager.getFreeSlotCount());
+		assertEquals(0, slotManager.getPendingRequestCount());
+		assertTrue(slotManager.isAllocated(slotId));
+	}
+
+	/**
+	 * Tests that a new slot appeared in SlotReport, but we have no pending request
+	 */
+	@Test
+	public void testNewlyAppearedFreeSlot() {
+		TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
+		SlotID slotId = SlotID.generate();
+		SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE);
+		slotManager.updateSlotStatus(slotStatus);
+
+		assertEquals(0, slotManager.getAllocatedSlotCount());
+		assertEquals(1, slotManager.getFreeSlotCount());
+	}
+
+	/**
+	 * Tests that a new slot appeared in SlotReport, but it't not suitable for all the pending requests
+	 */
+	@Test
+	public void testNewlyAppearedFreeSlotNotMatchPendingRequests() {
+		TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
+		slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_BIG_PROFILE));
+		assertEquals(1, slotManager.getPendingRequestCount());
+
+		SlotID slotId = SlotID.generate();
+		SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE);
+		slotManager.updateSlotStatus(slotStatus);
+
+		assertEquals(0, slotManager.getAllocatedSlotCount());
+		assertEquals(1, slotManager.getFreeSlotCount());
+		assertEquals(1, slotManager.getPendingRequestCount());
+		assertFalse(slotManager.isAllocated(slotId));
+	}
+
+	/**
+	 * Tests that a new slot appeared in SlotReport, and it's been reported using by some job
+	 */
+	@Test
+	public void testNewlyAppearedInUseSlot() {
+		TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
+
+		SlotID slotId = SlotID.generate();
+		SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE, new AllocationID(), new JobID());
+		slotManager.updateSlotStatus(slotStatus);
+
+		assertEquals(1, slotManager.getAllocatedSlotCount());
+		assertEquals(0, slotManager.getFreeSlotCount());
+		assertTrue(slotManager.isAllocated(slotId));
+	}
+
+	/**
+	 * Tests that we had a slot in-use, and it's confirmed by SlotReport
+	 */
+	@Test
+	public void testExistingInUseSlotUpdateStatus() {
+		TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
+		SlotRequest request = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
+		slotManager.requestSlot(request);
+
+		// make this slot in use
+		SlotID slotId = SlotID.generate();
+		SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE);
+		slotManager.updateSlotStatus(slotStatus);
+
+		assertEquals(1, slotManager.getAllocatedSlotCount());
+		assertEquals(0, slotManager.getFreeSlotCount());
+		assertTrue(slotManager.isAllocated(slotId));
+
+		// slot status is confirmed
+		SlotStatus slotStatus2 = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE,
+			request.getAllocationId(), request.getJobId());
+		slotManager.updateSlotStatus(slotStatus2);
+
+		assertEquals(1, slotManager.getAllocatedSlotCount());
+		assertEquals(0, slotManager.getFreeSlotCount());
+		assertTrue(slotManager.isAllocated(slotId));
+	}
+
+	/**
+	 * Tests that we had a slot in-use, but it's empty according to the SlotReport
+	 */
+	@Test
+	public void testExistingInUseSlotAdjustedToEmpty() {
+		TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
+		SlotRequest request1 = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
+		slotManager.requestSlot(request1);
+
+		// make this slot in use
+		SlotID slotId = SlotID.generate();
+		SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE);
+		slotManager.updateSlotStatus(slotStatus);
+
+		// another request pending
+		SlotRequest request2 = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
+		slotManager.requestSlot(request2);
+
+		assertEquals(1, slotManager.getAllocatedSlotCount());
+		assertEquals(0, slotManager.getFreeSlotCount());
+		assertEquals(1, slotManager.getPendingRequestCount());
+		assertTrue(slotManager.isAllocated(slotId));
+		assertTrue(slotManager.isAllocated(request1.getAllocationId()));
+
+
+		// but slot is reported empty again, request2 will be fulfilled, request1 will be missing
+		slotManager.updateSlotStatus(slotStatus);
+
+		assertEquals(1, slotManager.getAllocatedSlotCount());
+		assertEquals(0, slotManager.getFreeSlotCount());
+		assertEquals(0, slotManager.getPendingRequestCount());
+		assertTrue(slotManager.isAllocated(slotId));
+		assertTrue(slotManager.isAllocated(request2.getAllocationId()));
+	}
+
+	/**
+	 * Tests that we had a slot in use, and it's also reported in use by TaskManager, but the allocation
+	 * information didn't match.
+	 */
+	@Test
+	public void testExistingInUseSlotWithDifferentAllocationInfo() {
+		TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
+		SlotRequest request = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
+		slotManager.requestSlot(request);
+
+		// make this slot in use
+		SlotID slotId = SlotID.generate();
+		SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE);
+		slotManager.updateSlotStatus(slotStatus);
+
+		assertEquals(1, slotManager.getAllocatedSlotCount());
+		assertEquals(0, slotManager.getFreeSlotCount());
+		assertEquals(0, slotManager.getPendingRequestCount());
+		assertTrue(slotManager.isAllocated(slotId));
+		assertTrue(slotManager.isAllocated(request.getAllocationId()));
+
+		SlotStatus slotStatus2 = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE, new AllocationID(), new JobID());
+		// update slot status with different allocation info
+		slotManager.updateSlotStatus(slotStatus2);
+
+		// original request is missing and won't be allocated
+		assertEquals(1, slotManager.getAllocatedSlotCount());
+		assertEquals(0, slotManager.getFreeSlotCount());
+		assertEquals(0, slotManager.getPendingRequestCount());
+		assertTrue(slotManager.isAllocated(slotId));
+		assertFalse(slotManager.isAllocated(request.getAllocationId()));
+		assertTrue(slotManager.isAllocated(slotStatus2.getAllocationID()));
+	}
+
+	/**
+	 * Tests that we had a free slot, and it's confirmed by SlotReport
+	 */
+	@Test
+	public void testExistingEmptySlotUpdateStatus() {
+		TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
+		ResourceSlot slot = new ResourceSlot(SlotID.generate(), DEFAULT_TESTING_PROFILE);
+		slotManager.addFreeSlot(slot);
+
+		SlotStatus slotStatus = new SlotStatus(slot.getSlotId(), DEFAULT_TESTING_PROFILE);
+		slotManager.updateSlotStatus(slotStatus);
+
+		assertEquals(0, slotManager.getAllocatedSlotCount());
+		assertEquals(1, slotManager.getFreeSlotCount());
+		assertEquals(0, slotManager.getPendingRequestCount());
+	}
+
+	/**
+	 * Tests that we had a free slot, and it's reported in-use by TaskManager
+	 */
+	@Test
+	public void testExistingEmptySlotAdjustedToInUse() {
+		TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
+		ResourceSlot slot = new ResourceSlot(SlotID.generate(), DEFAULT_TESTING_PROFILE);
+		slotManager.addFreeSlot(slot);
+
+		SlotStatus slotStatus = new SlotStatus(slot.getSlotId(), DEFAULT_TESTING_PROFILE,
+			new AllocationID(), new JobID());
+		slotManager.updateSlotStatus(slotStatus);
+
+		assertEquals(1, slotManager.getAllocatedSlotCount());
+		assertEquals(0, slotManager.getFreeSlotCount());
+		assertEquals(0, slotManager.getPendingRequestCount());
+		assertTrue(slotManager.isAllocated(slot.getSlotId()));
+	}
+
+	/**
+	 * Tests that we did some allocation but failed / rejected by TaskManager, request will retry
+	 */
+	@Test
+	public void testSlotAllocationFailedAtTaskManager() {
+		TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
+		ResourceSlot slot = new ResourceSlot(SlotID.generate(), DEFAULT_TESTING_PROFILE);
+		slotManager.addFreeSlot(slot);
+
+		SlotRequest request = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
+		slotManager.requestSlot(request);
+
+		assertEquals(1, slotManager.getAllocatedSlotCount());
+		assertEquals(0, slotManager.getFreeSlotCount());
+		assertEquals(0, slotManager.getPendingRequestCount());
+		assertTrue(slotManager.isAllocated(slot.getSlotId()));
+
+		slotManager.handleSlotRequestFailedAtTaskManager(request, slot.getSlotId());
+
+		assertEquals(1, slotManager.getAllocatedSlotCount());
+		assertEquals(0, slotManager.getFreeSlotCount());
+		assertEquals(0, slotManager.getPendingRequestCount());
+	}
+
+
+	/**
+	 * Tests that we did some allocation but failed / rejected by TaskManager, and slot is occupied by another request
+	 */
+	@Test
+	public void testSlotAllocationFailedAtTaskManagerOccupiedByOther() {
+		TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
+		ResourceSlot slot = new ResourceSlot(SlotID.generate(), DEFAULT_TESTING_PROFILE);
+		slotManager.addFreeSlot(slot);
+
+		SlotRequest request = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
+		slotManager.requestSlot(request);
+
+		// slot is set empty by heartbeat
+		SlotStatus slotStatus = new SlotStatus(slot.getSlotId(), slot.getResourceProfile());
+		slotManager.updateSlotStatus(slotStatus);
+
+		// another request took this slot
+		SlotRequest request2 = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
+		slotManager.requestSlot(request2);
+
+		assertEquals(1, slotManager.getAllocatedSlotCount());
+		assertEquals(0, slotManager.getFreeSlotCount());
+		assertEquals(0, slotManager.getPendingRequestCount());
+		assertFalse(slotManager.isAllocated(request.getAllocationId()));
+		assertTrue(slotManager.isAllocated(request2.getAllocationId()));
+
+		// original request should be pended
+		slotManager.handleSlotRequestFailedAtTaskManager(request, slot.getSlotId());
+
+		assertEquals(1, slotManager.getAllocatedSlotCount());
+		assertEquals(0, slotManager.getFreeSlotCount());
+		assertEquals(1, slotManager.getPendingRequestCount());
+		assertFalse(slotManager.isAllocated(request.getAllocationId()));
+		assertTrue(slotManager.isAllocated(request2.getAllocationId()));
+	}
+
+	@Test
+	public void testNotifyTaskManagerFailure() {
+		TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
+
+		ResourceID resource1 = ResourceID.generate();
+		ResourceID resource2 = ResourceID.generate();
+
+		ResourceSlot slot11 = new ResourceSlot(new SlotID(resource1, 1), DEFAULT_TESTING_PROFILE);
+		ResourceSlot slot12 = new ResourceSlot(new SlotID(resource1, 2), DEFAULT_TESTING_PROFILE);
+		ResourceSlot slot21 = new ResourceSlot(new SlotID(resource2, 1), DEFAULT_TESTING_PROFILE);
+		ResourceSlot slot22 = new ResourceSlot(new SlotID(resource2, 2), DEFAULT_TESTING_PROFILE);
+
+		slotManager.addFreeSlot(slot11);
+		slotManager.addFreeSlot(slot21);
+
+		slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE));
+		slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE));
+
+		assertEquals(2, slotManager.getAllocatedSlotCount());
+		assertEquals(0, slotManager.getFreeSlotCount());
+		assertEquals(0, slotManager.getPendingRequestCount());
+
+		slotManager.addFreeSlot(slot12);
+		slotManager.addFreeSlot(slot22);
+
+		assertEquals(2, slotManager.getAllocatedSlotCount());
+		assertEquals(2, slotManager.getFreeSlotCount());
+		assertEquals(0, slotManager.getPendingRequestCount());
+
+		slotManager.notifyTaskManagerFailure(resource2);
+
+		assertEquals(1, slotManager.getAllocatedSlotCount());
+		assertEquals(1, slotManager.getFreeSlotCount());
+		assertEquals(0, slotManager.getPendingRequestCount());
+
+		// notify an not exist resource failure
+		slotManager.notifyTaskManagerFailure(ResourceID.generate());
+
+		assertEquals(1, slotManager.getAllocatedSlotCount());
+		assertEquals(1, slotManager.getFreeSlotCount());
+		assertEquals(0, slotManager.getPendingRequestCount());
+	}
+
+	// ------------------------------------------------------------------------
+	//  testing utilities
+	// ------------------------------------------------------------------------
+
+	private void directlyProvideFreeSlots(
+		final SlotManager slotManager,
+		final ResourceProfile resourceProfile,
+		final int freeSlotNum)
+	{
+		for (int i = 0; i < freeSlotNum; ++i) {
+			slotManager.addFreeSlot(new ResourceSlot(SlotID.generate(), new ResourceProfile(resourceProfile)));
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  testing classes
+	// ------------------------------------------------------------------------
+
+	private static class TestingSlotManager extends SlotManager {
+
+		private final List<ResourceProfile> allocatedContainers;
+
+		TestingSlotManager(ResourceManagerGateway resourceManagerGateway) {
+			super(resourceManagerGateway);
+			this.allocatedContainers = new LinkedList<>();
+		}
+
+		/**
+		 * Choose slot randomly if it matches requirement
+		 *
+		 * @param request   The slot request
+		 * @param freeSlots All slots which can be used
+		 * @return The chosen slot or null if cannot find a match
+		 */
+		@Override
+		protected ResourceSlot chooseSlotToUse(SlotRequest request, Map<SlotID, ResourceSlot> freeSlots) {
+			for (ResourceSlot slot : freeSlots.values()) {
+				if (slot.isMatchingRequirement(request.getResourceProfile())) {
+					return slot;
+				}
+			}
+			return null;
+		}
+
+		/**
+		 * Choose request randomly if offered slot can match its requirement
+		 *
+		 * @param offeredSlot     The free slot
+		 * @param pendingRequests All the pending slot requests
+		 * @return The chosen request's AllocationID or null if cannot find a match
+		 */
+		@Override
+		protected SlotRequest chooseRequestToFulfill(ResourceSlot offeredSlot,
+			Map<AllocationID, SlotRequest> pendingRequests)
+		{
+			for (Map.Entry<AllocationID, SlotRequest> pendingRequest : pendingRequests.entrySet()) {
+				if (offeredSlot.isMatchingRequirement(pendingRequest.getValue().getResourceProfile())) {
+					return pendingRequest.getValue();
+				}
+			}
+			return null;
+		}
+
+		@Override
+		protected void allocateContainer(ResourceProfile resourceProfile) {
+			allocatedContainers.add(resourceProfile);
+		}
+
+		List<ResourceProfile> getAllocatedContainers() {
+			return allocatedContainers;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/171cfd30/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
index 2790cf8..f55069e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
@@ -21,28 +21,14 @@ package org.apache.flink.runtime.rpc.akka;
 import akka.actor.ActorSystem;
 import akka.util.Timeout;
 import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.highavailability.NonHaServices;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.rpc.jobmaster.JobMaster;
-import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway;
-import org.apache.flink.runtime.rpc.resourcemanager.ResourceManager;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.AfterClass;
 import org.junit.Test;
 
-import org.mockito.Mockito;
-import scala.concurrent.duration.Deadline;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.TimeUnit;
 
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 public class AkkaRpcServiceTest extends TestLogger {

http://git-wip-us.apache.org/repos/asf/flink/blob/171cfd30/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/registration/RetryingRegistrationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/registration/RetryingRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/registration/RetryingRegistrationTest.java
deleted file mode 100644
index 9508825..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/registration/RetryingRegistrationTest.java
+++ /dev/null
@@ -1,336 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.registration;
-
-import akka.dispatch.Futures;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.rpc.TestingRpcService;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Test;
-
-import org.slf4j.LoggerFactory;
-
-import scala.concurrent.Await;
-import scala.concurrent.ExecutionContext$;
-import scala.concurrent.Future;
-import scala.concurrent.Promise;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.util.UUID;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeoutException;
-
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
-
-/**
- * Tests for the generic retrying registration class, validating the failure, retry, and back-off behavior.
- */
-public class RetryingRegistrationTest extends TestLogger {
-
-	@Test
-	public void testSimpleSuccessfulRegistration() throws Exception {
-		final String testId = "laissez les bon temps roulez";
-		final String testEndpointAddress = "<test-address>";
-		final UUID leaderId = UUID.randomUUID();
-
-		// an endpoint that immediately returns success
-		TestRegistrationGateway testGateway = new TestRegistrationGateway(new TestRegistrationSuccess(testId));
-		TestingRpcService rpc = new TestingRpcService();
-
-		try {
-			rpc.registerGateway(testEndpointAddress, testGateway);
-
-			TestRetryingRegistration registration = new TestRetryingRegistration(rpc, testEndpointAddress, leaderId);
-			registration.startRegistration();
-
-			Future<Tuple2<TestRegistrationGateway, TestRegistrationSuccess>> future = registration.getFuture();
-			assertNotNull(future);
-
-			// multiple accesses return the same future
-			assertEquals(future, registration.getFuture());
-
-			Tuple2<TestRegistrationGateway, TestRegistrationSuccess> success = 
-					Await.result(future, new FiniteDuration(10, SECONDS));
-
-			// validate correct invocation and result
-			assertEquals(testId, success.f1.getCorrelationId());
-			assertEquals(leaderId, testGateway.getInvocations().take().leaderId());
-		}
-		finally {
-			testGateway.stop();
-			rpc.stopService();
-		}
-	}
-	
-	@Test
-	public void testPropagateFailures() throws Exception {
-		final String testExceptionMessage = "testExceptionMessage";
-
-		// RPC service that fails with exception upon the connection
-		RpcService rpc = mock(RpcService.class);
-		when(rpc.connect(anyString(), any(Class.class))).thenThrow(new RuntimeException(testExceptionMessage));
-
-		TestRetryingRegistration registration = new TestRetryingRegistration(rpc, "testaddress", UUID.randomUUID());
-		registration.startRegistration();
-
-		Future<?> future = registration.getFuture();
-		assertTrue(future.failed().isCompleted());
-
-		assertEquals(testExceptionMessage, future.failed().value().get().get().getMessage());
-	}
-
-	@Test
-	public void testRetryConnectOnFailure() throws Exception {
-		final String testId = "laissez les bon temps roulez";
-		final UUID leaderId = UUID.randomUUID();
-
-		ExecutorService executor = Executors.newCachedThreadPool();
-		TestRegistrationGateway testGateway = new TestRegistrationGateway(new TestRegistrationSuccess(testId));
-
-		try {
-			// RPC service that fails upon the first connection, but succeeds on the second
-			RpcService rpc = mock(RpcService.class);
-			when(rpc.connect(anyString(), any(Class.class))).thenReturn(
-					Futures.failed(new Exception("test connect failure")),  // first connection attempt fails
-					Futures.successful(testGateway)                         // second connection attempt succeeds
-			);
-			when(rpc.getExecutionContext()).thenReturn(ExecutionContext$.MODULE$.fromExecutor(executor));
-
-			TestRetryingRegistration registration = new TestRetryingRegistration(rpc, "foobar address", leaderId);
-			registration.startRegistration();
-
-			Tuple2<TestRegistrationGateway, TestRegistrationSuccess> success =
-					Await.result(registration.getFuture(), new FiniteDuration(10, SECONDS));
-
-			// validate correct invocation and result
-			assertEquals(testId, success.f1.getCorrelationId());
-			assertEquals(leaderId, testGateway.getInvocations().take().leaderId());
-		}
-		finally {
-			testGateway.stop();
-			executor.shutdown();
-		}
-	}
-
-	@Test
-	public void testRetriesOnTimeouts() throws Exception {
-		final String testId = "rien ne va plus";
-		final String testEndpointAddress = "<test-address>";
-		final UUID leaderId = UUID.randomUUID();
-
-		// an endpoint that immediately returns futures with timeouts before returning a successful future
-		TestRegistrationGateway testGateway = new TestRegistrationGateway(
-				null, // timeout
-				null, // timeout
-				new TestRegistrationSuccess(testId) // success
-		);
-
-		TestingRpcService rpc = new TestingRpcService();
-
-		try {
-			rpc.registerGateway(testEndpointAddress, testGateway);
-	
-			TestRetryingRegistration registration = new TestRetryingRegistration(rpc, testEndpointAddress, leaderId);
-	
-			long started = System.nanoTime();
-			registration.startRegistration();
-	
-			Future<Tuple2<TestRegistrationGateway, TestRegistrationSuccess>> future = registration.getFuture();
-			Tuple2<TestRegistrationGateway, TestRegistrationSuccess> success =
-					Await.result(future, new FiniteDuration(10, SECONDS));
-	
-			long finished = System.nanoTime();
-			long elapsedMillis = (finished - started) / 1000000;
-	
-			// validate correct invocation and result
-			assertEquals(testId, success.f1.getCorrelationId());
-			assertEquals(leaderId, testGateway.getInvocations().take().leaderId());
-	
-			// validate that some retry-delay / back-off behavior happened
-			assertTrue("retries did not properly back off", elapsedMillis >= 3 * TestRetryingRegistration.INITIAL_TIMEOUT);
-		}
-		finally {
-			rpc.stopService();
-			testGateway.stop();
-		}
-	}
-
-	@Test
-	public void testDecline() throws Exception {
-		final String testId = "qui a coupe le fromage";
-		final String testEndpointAddress = "<test-address>";
-		final UUID leaderId = UUID.randomUUID();
-
-		TestingRpcService rpc = new TestingRpcService();
-
-		TestRegistrationGateway testGateway = new TestRegistrationGateway(
-				null, // timeout
-				new RegistrationResponse.Decline("no reason "),
-				null, // timeout
-				new TestRegistrationSuccess(testId) // success
-		);
-
-		try {
-			rpc.registerGateway(testEndpointAddress, testGateway);
-
-			TestRetryingRegistration registration = new TestRetryingRegistration(rpc, testEndpointAddress, leaderId);
-
-			long started = System.nanoTime();
-			registration.startRegistration();
-	
-			Future<Tuple2<TestRegistrationGateway, TestRegistrationSuccess>> future = registration.getFuture();
-			Tuple2<TestRegistrationGateway, TestRegistrationSuccess> success =
-					Await.result(future, new FiniteDuration(10, SECONDS));
-
-			long finished = System.nanoTime();
-			long elapsedMillis = (finished - started) / 1000000;
-
-			// validate correct invocation and result
-			assertEquals(testId, success.f1.getCorrelationId());
-			assertEquals(leaderId, testGateway.getInvocations().take().leaderId());
-
-			// validate that some retry-delay / back-off behavior happened
-			assertTrue("retries did not properly back off", elapsedMillis >= 
-					2 * TestRetryingRegistration.INITIAL_TIMEOUT + TestRetryingRegistration.DELAY_ON_DECLINE);
-		}
-		finally {
-			testGateway.stop();
-			rpc.stopService();
-		}
-	}
-	
-	@Test
-	@SuppressWarnings("unchecked")
-	public void testRetryOnError() throws Exception {
-		final String testId = "Petit a petit, l'oiseau fait son nid";
-		final String testEndpointAddress = "<test-address>";
-		final UUID leaderId = UUID.randomUUID();
-
-		TestingRpcService rpc = new TestingRpcService();
-
-		try {
-			// gateway that upon calls first responds with a failure, then with a success
-			TestRegistrationGateway testGateway = mock(TestRegistrationGateway.class);
-
-			when(testGateway.registrationCall(any(UUID.class), anyLong())).thenReturn(
-					Futures.<RegistrationResponse>failed(new Exception("test exception")),
-					Futures.<RegistrationResponse>successful(new TestRegistrationSuccess(testId)));
-			
-			rpc.registerGateway(testEndpointAddress, testGateway);
-
-			TestRetryingRegistration registration = new TestRetryingRegistration(rpc, testEndpointAddress, leaderId);
-
-			long started = System.nanoTime();
-			registration.startRegistration();
-
-			Future<Tuple2<TestRegistrationGateway, TestRegistrationSuccess>> future = registration.getFuture();
-			Tuple2<TestRegistrationGateway, TestRegistrationSuccess> success =
-					Await.result(future, new FiniteDuration(10, SECONDS));
-
-			long finished = System.nanoTime();
-			long elapsedMillis = (finished - started) / 1000000;
-			
-			assertEquals(testId, success.f1.getCorrelationId());
-
-			// validate that some retry-delay / back-off behavior happened
-			assertTrue("retries did not properly back off",
-					elapsedMillis >= TestRetryingRegistration.DELAY_ON_ERROR);
-		}
-		finally {
-			rpc.stopService();
-		}
-	}
-
-	@Test
-	public void testCancellation() throws Exception {
-		final String testEndpointAddress = "my-test-address";
-		final UUID leaderId = UUID.randomUUID();
-
-		TestingRpcService rpc = new TestingRpcService();
-
-		try {
-			Promise<RegistrationResponse> result = Futures.promise();
-
-			TestRegistrationGateway testGateway = mock(TestRegistrationGateway.class);
-			when(testGateway.registrationCall(any(UUID.class), anyLong())).thenReturn(result.future());
-
-			rpc.registerGateway(testEndpointAddress, testGateway);
-
-			TestRetryingRegistration registration = new TestRetryingRegistration(rpc, testEndpointAddress, leaderId);
-			registration.startRegistration();
-
-			// cancel and fail the current registration attempt
-			registration.cancel();
-			result.failure(new TimeoutException());
-
-			// there should not be a second registration attempt
-			verify(testGateway, atMost(1)).registrationCall(any(UUID.class), anyLong());
-		}
-		finally {
-			rpc.stopService();
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  test registration
-	// ------------------------------------------------------------------------
-
-	private static class TestRegistrationSuccess extends RegistrationResponse.Success {
-		private static final long serialVersionUID = 5542698790917150604L;
-
-		private final String correlationId;
-
-		private TestRegistrationSuccess(String correlationId) {
-			this.correlationId = correlationId;
-		}
-
-		public String getCorrelationId() {
-			return correlationId;
-		}
-	}
-
-	private static class TestRetryingRegistration extends RetryingRegistration<TestRegistrationGateway, TestRegistrationSuccess> {
-
-		// we use shorter timeouts here to speed up the tests
-		static final long INITIAL_TIMEOUT = 20;
-		static final long MAX_TIMEOUT = 200;
-		static final long DELAY_ON_ERROR = 200;
-		static final long DELAY_ON_DECLINE = 200;
-
-		public TestRetryingRegistration(RpcService rpc, String targetAddress, UUID leaderId) {
-			super(LoggerFactory.getLogger(RetryingRegistrationTest.class),
-					rpc, "TestEndpoint",
-					TestRegistrationGateway.class,
-					targetAddress, leaderId,
-					INITIAL_TIMEOUT, MAX_TIMEOUT, DELAY_ON_ERROR, DELAY_ON_DECLINE);
-		}
-
-		@Override
-		protected Future<RegistrationResponse> invokeRegistration(
-				TestRegistrationGateway gateway, UUID leaderId, long timeoutMillis) {
-			return gateway.registrationCall(leaderId, timeoutMillis);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/171cfd30/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/registration/TestRegistrationGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/registration/TestRegistrationGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/registration/TestRegistrationGateway.java
deleted file mode 100644
index a049e48..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/registration/TestRegistrationGateway.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.registration;
-
-import akka.dispatch.Futures;
-
-import org.apache.flink.runtime.rpc.TestingGatewayBase;
-import org.apache.flink.util.Preconditions;
-
-import scala.concurrent.Future;
-
-import java.util.UUID;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-public class TestRegistrationGateway extends TestingGatewayBase {
-
-	private final BlockingQueue<RegistrationCall> invocations;
-
-	private final RegistrationResponse[] responses;
-
-	private int pos;
-
-	public TestRegistrationGateway(RegistrationResponse... responses) {
-		Preconditions.checkArgument(responses != null && responses.length > 0);
-
-		this.invocations = new LinkedBlockingQueue<>();
-		this.responses = responses;
-		
-	}
-
-	// ------------------------------------------------------------------------
-
-	public Future<RegistrationResponse> registrationCall(UUID leaderId, long timeout) {
-		invocations.add(new RegistrationCall(leaderId, timeout));
-
-		RegistrationResponse response = responses[pos];
-		if (pos < responses.length - 1) {
-			pos++;
-		}
-
-		// return a completed future (for a proper value), or one that never completes and will time out (for null)
-		return response != null ? Futures.successful(response) : this.<RegistrationResponse>futureWithTimeout(timeout);
-	}
-
-	public BlockingQueue<RegistrationCall> getInvocations() {
-		return invocations;
-	}
-
-	// ------------------------------------------------------------------------
-
-	public static class RegistrationCall {
-		private final UUID leaderId;
-		private final long timeout;
-
-		public RegistrationCall(UUID leaderId, long timeout) {
-			this.leaderId = leaderId;
-			this.timeout = timeout;
-		}
-
-		public UUID leaderId() {
-			return leaderId;
-		}
-
-		public long timeout() {
-			return timeout;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/171cfd30/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerHATest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerHATest.java
deleted file mode 100644
index dfffeda..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerHATest.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.resourcemanager;
-
-import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
-import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
-import org.apache.flink.runtime.rpc.MainThreadExecutor;
-import org.apache.flink.runtime.rpc.RpcEndpoint;
-import org.apache.flink.runtime.rpc.RpcGateway;
-import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.rpc.StartStoppable;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.UUID;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doCallRealMethod;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-/**
- * resourceManager HA test, including grant leadership and revoke leadership
- */
-public class ResourceManagerHATest {
-
-	@Test
-	public void testGrantAndRevokeLeadership() throws Exception {
-		// mock a RpcService which will return a special RpcGateway when call its startServer method, the returned RpcGateway directly execute runAsync call
-		TestingResourceManagerGatewayProxy gateway = mock(TestingResourceManagerGatewayProxy.class);
-		doCallRealMethod().when(gateway).runAsync(any(Runnable.class));
-
-		RpcService rpcService = mock(RpcService.class);
-		when(rpcService.startServer(any(RpcEndpoint.class))).thenReturn(gateway);
-
-		TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService();
-		TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
-		highAvailabilityServices.setResourceManagerLeaderElectionService(leaderElectionService);
-
-		final ResourceManager resourceManager = new ResourceManager(rpcService, highAvailabilityServices);
-		resourceManager.start();
-		// before grant leadership, resourceManager's leaderId is null
-		Assert.assertNull(resourceManager.getLeaderSessionID());
-		final UUID leaderId = UUID.randomUUID();
-		leaderElectionService.isLeader(leaderId);
-		// after grant leadership, resourceManager's leaderId has value
-		Assert.assertEquals(leaderId, resourceManager.getLeaderSessionID());
-		// then revoke leadership, resourceManager's leaderId is null again
-		leaderElectionService.notLeader();
-		Assert.assertNull(resourceManager.getLeaderSessionID());
-	}
-
-	private static abstract class TestingResourceManagerGatewayProxy implements MainThreadExecutor, StartStoppable, RpcGateway {
-		@Override
-		public void runAsync(Runnable runnable) {
-			runnable.run();
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/171cfd30/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java
deleted file mode 100644
index 25a670c..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.taskexecutor;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.highavailability.NonHaServices;
-import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
-import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
-import org.apache.flink.runtime.rpc.TestingRpcService;
-import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Test;
-
-import scala.concurrent.duration.FiniteDuration;
-
-import java.util.UUID;
-
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
-
-public class TaskExecutorTest extends TestLogger {
-
-	@Test
-	public void testImmediatelyRegistersIfLeaderIsKnown() throws Exception {
-		final ResourceID resourceID = ResourceID.generate();
-		final String resourceManagerAddress = "/resource/manager/address/one";
-
-		final TestingRpcService rpc = new TestingRpcService();
-		try {
-			// register a mock resource manager gateway
-			ResourceManagerGateway rmGateway = mock(ResourceManagerGateway.class);
-			rpc.registerGateway(resourceManagerAddress, rmGateway);
-
-			NonHaServices haServices = new NonHaServices(resourceManagerAddress);
-			TaskExecutor taskManager = TaskExecutor.startTaskManagerComponentsAndActor(
-				new Configuration(), resourceID, rpc, "localhost", haServices, true);
-			String taskManagerAddress = taskManager.getAddress();
-			taskManager.start();
-
-			verify(rmGateway, timeout(5000)).registerTaskExecutor(
-					any(UUID.class), eq(taskManagerAddress), eq(resourceID), any(FiniteDuration.class));
-		}
-		finally {
-			rpc.stopService();
-		}
-	}
-
-	@Test
-	public void testTriggerRegistrationOnLeaderChange() throws Exception {
-		final ResourceID resourceID = ResourceID.generate();
-
-		final String address1 = "/resource/manager/address/one";
-		final String address2 = "/resource/manager/address/two";
-		final UUID leaderId1 = UUID.randomUUID();
-		final UUID leaderId2 = UUID.randomUUID();
-
-		final TestingRpcService rpc = new TestingRpcService();
-		try {
-			// register the mock resource manager gateways
-			ResourceManagerGateway rmGateway1 = mock(ResourceManagerGateway.class);
-			ResourceManagerGateway rmGateway2 = mock(ResourceManagerGateway.class);
-			rpc.registerGateway(address1, rmGateway1);
-			rpc.registerGateway(address2, rmGateway2);
-
-			TestingLeaderRetrievalService testLeaderService = new TestingLeaderRetrievalService();
-
-			TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
-			haServices.setResourceManagerLeaderRetriever(testLeaderService);
-
-			TaskExecutor taskManager = TaskExecutor.startTaskManagerComponentsAndActor(
-				new Configuration(), resourceID, rpc, "localhost", haServices, true);
-			String taskManagerAddress = taskManager.getAddress();
-			taskManager.start();
-
-			// no connection initially, since there is no leader
-			assertNull(taskManager.getResourceManagerConnection());
-
-			// define a leader and see that a registration happens
-			testLeaderService.notifyListener(address1, leaderId1);
-
-			verify(rmGateway1, timeout(5000)).registerTaskExecutor(
-					eq(leaderId1), eq(taskManagerAddress), eq(resourceID), any(FiniteDuration.class));
-			assertNotNull(taskManager.getResourceManagerConnection());
-
-			// cancel the leader 
-			testLeaderService.notifyListener(null, null);
-
-			// set a new leader, see that a registration happens 
-			testLeaderService.notifyListener(address2, leaderId2);
-
-			verify(rmGateway2, timeout(5000)).registerTaskExecutor(
-					eq(leaderId2), eq(taskManagerAddress), eq(resourceID), any(FiniteDuration.class));
-			assertNotNull(taskManager.getResourceManagerConnection());
-		}
-		finally {
-			rpc.stopService();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/171cfd30/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
new file mode 100644
index 0000000..a8d5bd7
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.highavailability.NonHaServices;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.UUID;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+public class TaskExecutorTest extends TestLogger {
+
+	@Test
+	public void testImmediatelyRegistersIfLeaderIsKnown() throws Exception {
+		final ResourceID resourceID = ResourceID.generate();
+		final String resourceManagerAddress = "/resource/manager/address/one";
+
+		final TestingRpcService rpc = new TestingRpcService();
+		try {
+			// register a mock resource manager gateway
+			ResourceManagerGateway rmGateway = mock(ResourceManagerGateway.class);
+			rpc.registerGateway(resourceManagerAddress, rmGateway);
+
+			NonHaServices haServices = new NonHaServices(resourceManagerAddress);
+			TaskExecutor taskManager = TaskExecutor.startTaskManagerComponentsAndActor(
+				new Configuration(), resourceID, rpc, "localhost", haServices, true);
+			String taskManagerAddress = taskManager.getAddress();
+			taskManager.start();
+
+			verify(rmGateway, timeout(5000)).registerTaskExecutor(
+					any(UUID.class), eq(taskManagerAddress), eq(resourceID), any(FiniteDuration.class));
+		}
+		finally {
+			rpc.stopService();
+		}
+	}
+
+	@Test
+	public void testTriggerRegistrationOnLeaderChange() throws Exception {
+		final ResourceID resourceID = ResourceID.generate();
+
+		final String address1 = "/resource/manager/address/one";
+		final String address2 = "/resource/manager/address/two";
+		final UUID leaderId1 = UUID.randomUUID();
+		final UUID leaderId2 = UUID.randomUUID();
+
+		final TestingRpcService rpc = new TestingRpcService();
+		try {
+			// register the mock resource manager gateways
+			ResourceManagerGateway rmGateway1 = mock(ResourceManagerGateway.class);
+			ResourceManagerGateway rmGateway2 = mock(ResourceManagerGateway.class);
+			rpc.registerGateway(address1, rmGateway1);
+			rpc.registerGateway(address2, rmGateway2);
+
+			TestingLeaderRetrievalService testLeaderService = new TestingLeaderRetrievalService();
+
+			TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
+			haServices.setResourceManagerLeaderRetriever(testLeaderService);
+
+			TaskExecutor taskManager = TaskExecutor.startTaskManagerComponentsAndActor(
+				new Configuration(), resourceID, rpc, "localhost", haServices, true);
+			String taskManagerAddress = taskManager.getAddress();
+			taskManager.start();
+
+			// no connection initially, since there is no leader
+			assertNull(taskManager.getResourceManagerConnection());
+
+			// define a leader and see that a registration happens
+			testLeaderService.notifyListener(address1, leaderId1);
+
+			verify(rmGateway1, timeout(5000)).registerTaskExecutor(
+					eq(leaderId1), eq(taskManagerAddress), eq(resourceID), any(FiniteDuration.class));
+			assertNotNull(taskManager.getResourceManagerConnection());
+
+			// cancel the leader 
+			testLeaderService.notifyListener(null, null);
+
+			// set a new leader, see that a registration happens 
+			testLeaderService.notifyListener(address2, leaderId2);
+
+			verify(rmGateway2, timeout(5000)).registerTaskExecutor(
+					eq(leaderId2), eq(taskManagerAddress), eq(resourceID), any(FiniteDuration.class));
+			assertNotNull(taskManager.getResourceManagerConnection());
+		}
+		finally {
+			rpc.stopService();
+		}
+	}
+}


Mime
View raw message