flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tillrohrmann <...@git.apache.org>
Subject [GitHub] flink pull request #5403: [WIP] Reschedule failed tasks to previous allocati...
Date Fri, 23 Feb 2018 14:29:23 GMT
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5403#discussion_r170253550
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotProfile.java
---
    @@ -0,0 +1,275 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.clusterframework.types;
    +
    +import org.apache.flink.annotation.VisibleForTesting;
    +import org.apache.flink.runtime.jobmanager.scheduler.Locality;
    +import org.apache.flink.runtime.jobmaster.SlotContext;
    +import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
    +import org.apache.flink.util.Preconditions;
    +
    +import javax.annotation.Nonnull;
    +import javax.annotation.Nullable;
    +
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.function.BiFunction;
    +import java.util.function.Function;
    +import java.util.function.Predicate;
    +import java.util.stream.Stream;
    +
    +/**
    + * A slot profile describes the profile of a slot into which a task wants to be scheduled.
The profile contains
    + * attributes such as resource or locality constraints, some of which may be hard or
soft. A matcher can be generated
    + * to filter out candidate slots by matching their {@link SlotContext} against the slot
profile and, potentially,
    + * further requirements.
    + */
    +public class SlotProfile {
    +
    +	/** Singleton object for a slot profile without any requirements. */
    +	private static final SlotProfile NO_REQUIREMENTS = noLocality(ResourceProfile.UNKNOWN);
    +
    +	/** This specifies the desired resource profile for the slot. */
    +	@Nonnull
    +	private final ResourceProfile resourceProfile;
    +
    +	/** This specifies the preferred locations for the slot. */
    +	@Nonnull
    +	private final Collection<TaskManagerLocation> preferredLocations;
    +
    +	/** This contains desired allocation ids of the slot. */
    +	@Nonnull
    +	private final Collection<AllocationID> priorAllocations;
    +
    +	public SlotProfile(
    +		@Nonnull ResourceProfile resourceProfile,
    +		@Nonnull Collection<TaskManagerLocation> preferredLocations,
    +		@Nonnull Collection<AllocationID> priorAllocations) {
    +		this.resourceProfile = resourceProfile;
    +		this.preferredLocations = preferredLocations;
    +		this.priorAllocations = priorAllocations;
    +	}
    +
    +	/**
    +	 * Returns the desired resource profile for the slot.
    +	 */
    +	@Nonnull
    +	public ResourceProfile getResourceProfile() {
    +		return resourceProfile;
    +	}
    +
    +	/**
    +	 * Returns the preferred locations for the slot.
    +	 */
    +	@Nonnull
    +	public Collection<TaskManagerLocation> getPreferredLocations() {
    +		return preferredLocations;
    +	}
    +
    +	/**
    +	 * Returns the desired allocation ids for the slot.
    +	 */
    +	@Nonnull
    +	public Collection<AllocationID> getPriorAllocations() {
    +		return priorAllocations;
    +	}
    +
    +	public ProfileToSlotContextMatcher matcher() {
    +		if (priorAllocations.isEmpty()) {
    +			return new LocalityAwareRequirementsToSlotMatcher(preferredLocations);
    +		} else {
    +			return new PreviousAllocationProfileToSlotContextMatcher(priorAllocations);
    +		}
    +	}
    +
    +	/**
    +	 * Classes that implement this interface provide a method to match objects that somehow
represent slot candidates
    +	 * against the {@link SlotProfile} that produced the matcher object. A matching candidate
is transformed into a
    +	 * desired result. If the matcher does not find a matching candidate, it returns null.
    +	 */
    +	public interface ProfileToSlotContextMatcher {
    +
    +		/**
    +		 * @param candidates                   stream of candidates to match against.
    +		 * @param contextExtractor             function to extract the {@link SlotContext}
from the candidates.
    +		 * @param additionalRequirementsFilter predicate to specify additional requirements
for each candidate.
    +		 * @param resultProducer               function to produce a result from a matching
candidate input.
    +		 * @param <IN>                         type of the objects against which we match the
profile.
    +		 * @param <OUT>                        type of the produced output from a matching
object.
    +		 * @return the result produced by resultProducer if a matching candidate was found
or null otherwise.
    +		 */
    +		@Nullable
    +		<IN, OUT> OUT findMatchWithLocality(
    +			@Nonnull Stream<IN> candidates,
    +			@Nonnull Function<IN, SlotContext> contextExtractor,
    +			@Nonnull Predicate<IN> additionalRequirementsFilter,
    +			@Nonnull BiFunction<IN, Locality, OUT> resultProducer);
    +	}
    +
    +	/**
    +	 * This matcher implementation is used in the presence of prior allocations. Prior allocations
are supposed to overrule
    +	 * other locality requirements, such as preferred locations. Prior allocations also
require strict matching and
    +	 * this matcher returns null if it cannot find a candidate for the same prior allocation.
The background is that
    +	 * this will force the scheduler to request a new slot that is guaranteed to be not
the prior location of any
    +	 * other subtask, so that subtasks do not steal another subtask's prior allocation in
case their own prior
    +	 * allocation is no longer available (e.g. machine failure). This is important to enable
local recovery for all
    +	 * tasks that can still return to their prior allocation.
    +	 */
    +	@VisibleForTesting
    +	public static class PreviousAllocationProfileToSlotContextMatcher implements ProfileToSlotContextMatcher
{
    +
    +		/** Set of prior allocations. */
    +		private final HashSet<AllocationID> priorAllocations;
    +
    +		@VisibleForTesting
    +		PreviousAllocationProfileToSlotContextMatcher(Collection<AllocationID> priorAllocations)
{
    +			this.priorAllocations = new HashSet<>(priorAllocations);
    +			Preconditions.checkState(
    +				this.priorAllocations.size() > 0,
    +				"This matcher should only be used if there are prior allocations!");
    +		}
    +
    +		public <I, O> O findMatchWithLocality(
    +			@Nonnull Stream<I> candidates,
    +			@Nonnull Function<I, SlotContext> contextExtractor,
    +			@Nonnull Predicate<I> additionalRequirementsFilter,
    +			@Nonnull BiFunction<I, Locality, O> resultProducer) {
    +
    +			Predicate<I> filterByAllocation =
    +				(candidate) -> priorAllocations.contains(contextExtractor.apply(candidate).getAllocationId());
    +
    +			return candidates
    +				.filter(filterByAllocation.and(additionalRequirementsFilter))
    +				.findFirst()
    +				.map((result) -> resultProducer.apply(result, Locality.LOCAL)) // TODO introduce
special locality?
    +				.orElse(null);
    +		}
    +	}
    +
    +	/**
    +	 * This matcher is used whenever no prior allocation was specified in the {@link SlotProfile}.
This implementation
    +	 * tries to achieve best possible locality if a preferred location is specified in the
profile.
    +	 */
    +	@VisibleForTesting
    +	public static class LocalityAwareRequirementsToSlotMatcher implements ProfileToSlotContextMatcher
{
    +
    +		private final Collection<TaskManagerLocation> locationPreferences;
    +
    +		@VisibleForTesting
    +		public LocalityAwareRequirementsToSlotMatcher(Collection<TaskManagerLocation>
locationPreferences) {
    --- End diff --
    
    The `locationPreferences` parameter is missing a `@Nonnull` annotation; alternatively, add a null check in the constructor body.


---

Mime
View raw message