flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [2/2] flink git commit: [FLINK-1487] [runtime][tests] Fixes SchedulerIsolatedTasksTest.testScheduleQueueing by waiting for the released resources to be registered at the Scheduler again.
Date Sun, 15 Feb 2015 18:33:25 GMT
[FLINK-1487] [runtime][tests] Fixes SchedulerIsolatedTasksTest.testScheduleQueueing by waiting
for the released resources to be registered at the Scheduler again.

This closes #398


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1b40386d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1b40386d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1b40386d

Branch: refs/heads/master
Commit: 1b40386d39d7848eff0d1e6cec7fa17a96ef4f6e
Parents: 81a7837
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Sat Feb 14 17:38:57 2015 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Sun Feb 15 18:31:47 2015 +0100

----------------------------------------------------------------------
 .../runtime/jobmanager/scheduler/Scheduler.java | 100 +++++++++++--------
 .../scheduler/SchedulerIsolatedTasksTest.java   |  52 +++++-----
 2 files changed, 85 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1b40386d/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
index 3d7d29a..de8adb3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
@@ -102,38 +102,6 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 		}
 	}
 
-	/**
-	 * 
-	 * NOTE: In the presence of multi-threaded operations, this number may be inexact.
-	 * 
-	 * @return The number of empty slots, for tasks.
-	 */
-	public int getNumberOfAvailableSlots() {
-		int count = 0;
-		
-		synchronized (globalLock) {
-			for (Instance instance : instancesWithAvailableResources) {
-				count += instance.getNumberOfAvailableSlots();
-			}
-		}
-		
-		return count;
-	}
-	
-	public int getTotalNumberOfSlots() {
-		int count = 0;
-		
-		synchronized (globalLock) {
-			for (Instance instance : allInstances) {
-				if (instance.isAlive()) {
-					count += instance.getTotalNumberOfSlots();
-				}
-			}
-		}
-		
-		return count;
-	}
-	
 	// --------------------------------------------------------------------------------------------
 	//  Scheduling
 	// --------------------------------------------------------------------------------------------
@@ -209,7 +177,8 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 				}
 
 				SimpleSlot newSlot = null;
-				
+				SimpleSlot toUse = null;
+
 				// the following needs to make sure any allocated slot is released in case of an error
 				try {
 					
@@ -228,8 +197,6 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 					// get a new slot, since we could not place it into the group, or we could not place it locally
 					newSlot = getFreeSubSlotForTask(vertex, locations, assignment, constraint, forceExternalLocation);
 
-					SimpleSlot toUse;
-					
 					if (newSlot == null) {
 						if (slotFromGroup == null) {
 							// both null
@@ -283,7 +250,6 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 					}
 					
 					updateLocalityCounters(toUse.getLocality());
-					return toUse;
 				}
 				catch (NoResourceAvailableException e) {
 					throw e;
@@ -295,13 +261,13 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 					if (newSlot != null) {
 						newSlot.releaseSlot();
 					}
-					
+
 					ExceptionUtils.rethrow(t, "An error occurred while allocating a slot in a sharing group");
 				}
-			}
-		
-			// 2) === schedule without hints and sharing ===
-			{
+
+				return toUse;
+			} else {
+				// 2) === schedule without hints and sharing ===
 				SimpleSlot slot = getFreeSlotForTask(vertex, preferredLocations, forceExternalLocation);
 				if (slot != null) {
 					updateLocalityCounters(slot.getLocality());
@@ -698,6 +664,40 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 	//  Status reporting
 	// --------------------------------------------------------------------------------------------
 
+	/**
+	 *
+	 * NOTE: In the presence of multi-threaded operations, this number may be inexact.
+	 *
+	 * @return The number of empty slots, for tasks.
+	 */
+	public int getNumberOfAvailableSlots() {
+		int count = 0;
+
+		synchronized (globalLock) {
+			processNewlyAvailableInstances();
+
+			for (Instance instance : instancesWithAvailableResources) {
+				count += instance.getNumberOfAvailableSlots();
+			}
+		}
+
+		return count;
+	}
+
+	public int getTotalNumberOfSlots() {
+		int count = 0;
+
+		synchronized (globalLock) {
+			for (Instance instance : allInstances) {
+				if (instance.isAlive()) {
+					count += instance.getTotalNumberOfSlots();
+				}
+			}
+		}
+
+		return count;
+	}
+
 	public int getNumberOfAvailableInstances() {
 		int numberAvailableInstances = 0;
 		synchronized (this.globalLock) {
@@ -712,7 +712,11 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 	}
 	
 	public int getNumberOfInstancesWithAvailableSlots() {
-		return instancesWithAvailableResources.size();
+		synchronized (globalLock) {
+			processNewlyAvailableInstances();
+
+			return instancesWithAvailableResources.size();
+		}
 	}
 	
 	public Map<String, List<Instance>> getInstancesByHost() {
@@ -739,6 +743,18 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 	}
 	
 	// --------------------------------------------------------------------------------------------
+
+	private void processNewlyAvailableInstances() {
+		synchronized (globalLock) {
+			Instance instance;
+
+			while((instance = newlyAvailableInstances.poll()) != null){
+				if(instance.hasResourcesAvailable()){
+					instancesWithAvailableResources.add(instance);
+				}
+			}
+		}
+	}
 	
 	private static final class QueuedTask {
 		

http://git-wip-us.apache.org/repos/asf/flink/blob/1b40386d/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
index a7f0f04..d19299b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
@@ -198,29 +198,29 @@ public class SchedulerIsolatedTasksTest {
 		final int NUM_TASKS_TO_SCHEDULE = 2000;
 
 		TestingUtils.setGlobalExecutionContext();
-		
+
 		try {
 			// note: since this test asynchronously releases slots, the executor needs release workers.
 			// doing the release call synchronous can lead to a deadlock
 			Scheduler scheduler = new Scheduler();
-			
-			for (int i = 0;i < NUM_INSTANCES; i++) {
+
+			for (int i = 0; i < NUM_INSTANCES; i++) {
 				scheduler.newInstanceAvailable(getRandomInstance((int) (Math.random() * NUM_SLOTS_PER_INSTANCE) + 1));
 			}
-			
+
 			assertEquals(NUM_INSTANCES, scheduler.getNumberOfAvailableInstances());
 			final int totalSlots = scheduler.getNumberOfAvailableSlots();
-			
+
 			// all slots we ever got.
 			List<SlotAllocationFuture> allAllocatedSlots = new ArrayList<SlotAllocationFuture>();
-			
+
 			// slots that need to be released
 			final Set<SimpleSlot> toRelease = new HashSet<SimpleSlot>();
-			
+
 			// flag to track errors in the concurrent thread
 			final AtomicBoolean errored = new AtomicBoolean(false);
-			
-			
+
+
 			SlotAllocationFutureAction action = new SlotAllocationFutureAction() {
 				@Override
 				public void slotAllocated(SimpleSlot slot) {
@@ -230,10 +230,10 @@ public class SchedulerIsolatedTasksTest {
 					}
 				}
 			};
-			
+
 			// thread to asynchronously release slots
 			Runnable disposer = new Runnable() {
-				
+
 				@Override
 				public void run() {
 					try {
@@ -243,25 +243,24 @@ public class SchedulerIsolatedTasksTest {
 								while (toRelease.isEmpty()) {
 									toRelease.wait();
 								}
-								
+
 								Iterator<SimpleSlot> iter = toRelease.iterator();
 								SimpleSlot next = iter.next();
 								iter.remove();
-								
+
 								next.releaseSlot();
 								recycled++;
 							}
 						}
-					}
-					catch (Throwable t) {
+					} catch (Throwable t) {
 						errored.set(true);
 					}
 				}
 			};
-			
+
 			Thread disposeThread = new Thread(disposer);
 			disposeThread.start();
-			
+
 			for (int i = 0; i < NUM_TASKS_TO_SCHEDULE; i++) {
 				SlotAllocationFuture future = scheduler.scheduleQueued(new ScheduledUnit(getDummyTask()));
 				future.setFutureAction(action);
@@ -269,23 +268,26 @@ public class SchedulerIsolatedTasksTest {
 			}
 
 			disposeThread.join();
-			
+
 			assertFalse("The slot releasing thread caused an error.", errored.get());
-			
+
 			List<SimpleSlot> slotsAfter = new ArrayList<SimpleSlot>();
 			for (SlotAllocationFuture future : allAllocatedSlots) {
 				slotsAfter.add(future.waitTillAllocated());
 			}
-			
+
+			assertEquals("All instances should have available slots.", NUM_INSTANCES,
+					scheduler.getNumberOfInstancesWithAvailableSlots());
+
 			// the slots should all be different
 			assertTrue(areAllDistinct(slotsAfter.toArray()));
-			
-			assertEquals(totalSlots, scheduler.getNumberOfAvailableSlots());
-		}
-		catch (Exception e) {
+
+			assertEquals("All slots should be available.", totalSlots,
+					scheduler.getNumberOfAvailableSlots());
+		} catch (Exception e) {
 			e.printStackTrace();
 			fail(e.getMessage());
-		}finally{
+		} finally {
 			TestingUtils.setCallingThreadDispatcher(system);
 		}
 	}


Mime
View raw message