flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tillrohrmann <...@git.apache.org>
Subject [GitHub] flink pull request #5403: [WIP] Reschedule failed tasks to previous allocati...
Date Fri, 23 Feb 2018 14:29:06 GMT
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5403#discussion_r170262113
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java ---
    @@ -1343,63 +1330,27 @@ boolean contains(AllocationID slotId) {
     		 * Poll a slot which matches the required resource profile. The polling tries to satisfy the
     		 * location preferences, by TaskManager and by host.
     		 *
    -		 * @param resourceProfile      The required resource profile.
    -		 * @param locationPreferences  The location preferences, in order to be checked.
    +		 * @param slotProfile slot profile that specifies the requirements for the slot
     		 *
     		 * @return Slot which matches the resource profile, null if we can't find a match
     		 */
    -		SlotAndLocality poll(ResourceProfile resourceProfile, Collection<TaskManagerLocation> locationPreferences) {
    +		SlotAndLocality poll(SlotProfile slotProfile) {
     			// fast path if no slots are available
     			if (availableSlots.isEmpty()) {
     				return null;
     			}
     
    -			boolean hadLocationPreference = false;
    +			SlotProfile.ProfileToSlotContextMatcher matcher = slotProfile.matcher();
     
    -			if (locationPreferences != null && !locationPreferences.isEmpty()) {
    -
    -				// first search by TaskManager
    -				for (TaskManagerLocation location : locationPreferences) {
    -					hadLocationPreference = true;
    -
    -					final Set<AllocatedSlot> onTaskManager = availableSlotsByTaskManager.get(location.getResourceID());
    -					if (onTaskManager != null) {
    -						for (AllocatedSlot candidate : onTaskManager) {
    -							if (candidate.getResourceProfile().isMatching(resourceProfile)) {
    -								remove(candidate.getAllocationId());
    -								return new SlotAndLocality(candidate, Locality.LOCAL);
    -							}
    -						}
    -					}
    -				}
    -
    -				// now, search by host
    -				for (TaskManagerLocation location : locationPreferences) {
    -					final Set<AllocatedSlot> onHost = availableSlotsByHost.get(location.getFQDNHostname());
    -					if (onHost != null) {
    -						for (AllocatedSlot candidate : onHost) {
    -							if (candidate.getResourceProfile().isMatching(resourceProfile)) {
    -								remove(candidate.getAllocationId());
    -								return new SlotAndLocality(candidate, Locality.HOST_LOCAL);
    -							}
    -						}
    -					}
    -				}
    -			}
    -
    -			// take any slot
    -			for (SlotAndTimestamp candidate : availableSlots.values()) {
    -				final AllocatedSlot slot = candidate.slot();
    -
    -				if (slot.getResourceProfile().isMatching(resourceProfile)) {
    +			return matcher.findMatchWithLocality(
    +				availableSlots.values().stream(),
    +				SlotAndTimestamp::slot,
    +				(slot) -> slot.slot().getResourceProfile().isMatching(slotProfile.getResourceProfile()),
    +				((slotAndTimestamp, locality) -> {
    --- End diff --
    
    It's always a bit easier to understand the code if the variables have type annotations.


---

Mime
View raw message